Uploaded image for project: 'Kafka Connector'
  1. Kafka Connector
  2. KAFKA-217

Using ns filed in copy.existing pipeline bug

    • Type: Icon: Bug Bug
    • Resolution: Fixed
    • Priority: Icon: Major - P3 Major - P3
    • 1.5.1
    • Affects Version/s: 1.5.0
    • Component/s: Source
    • Labels:
      None

      In KAFKA-201 pipeline for copy.existing has been changed.
      MR: https://github.com/mongodb/mongo-kafka/pull/59
      The current version could lead to unexpected behavior if the user using the ns field in the pipeline.

      Here is an example that could be added and run in MongoSourceConnectorIntegrationTest

        @Test
        @DisplayName("Ensure source loads and stream data from certain collections")
        void testSourceLoadsAndStreamDataFromCertainCollections() {
          assumeTrue(isGreaterThanThreeDotSix());
          MongoDatabase db = getDatabaseWithPostfix();
          MongoCollection<Document> coll1 = db.getCollection("coll1");
          MongoCollection<Document> coll2 = db.getCollection("coll2");
          MongoCollection<Document> coll3 = db.getCollection("coll3");
      
          insertMany(rangeClosed(1, 50), coll1);
          insertMany(rangeClosed(1, 50), coll2);
          insertMany(rangeClosed(1, 50), coll3);
      
          Properties sourceProperties = new Properties();
          sourceProperties.put(MongoSourceConfig.COPY_EXISTING_CONFIG, "true");
          String namespaceRegex = String.format("(%s\\.coll(1|2))", db.getName());
          sourceProperties.put(MongoSourceConfig.COPY_EXISTING_NAMESPACE_REGEX_CONFIG, namespaceRegex);
          sourceProperties.put(PIPELINE_CONFIG, "[{\"$match\": {\"ns.coll\": {\"$regex\": /^coll(1|2)$/}}}]");
          /* 
          Works with such pipeline
          sourceProperties.put(
              PIPELINE_CONFIG,
              "[{\"$match\": {\"$or\": [{\"ns.coll\": {\"$regex\": /^coll(1|2)$/}}, {\"__.coll\": {\"$regex\": /^coll(1|2)$/}}]}}]");
           */
      
          addSourceConnector(sourceProperties);
      
          insertMany(rangeClosed(51, 100), coll1);
          insertMany(rangeClosed(51, 100), coll2);
          insertMany(rangeClosed(51, 100), coll3);
      
          assertProduced(createInserts(1, 100), coll1); // test fails. Existing documents (1, 50) weren't produced to kafka
          assertProduced(createInserts(1, 100), coll2);
          // TODO: assertProducedNone(coll3)
        }
      

      see also https://github.com/mongodb/mongo-kafka/pull/59#discussion_r606671922

            Assignee:
            ross@mongodb.com Ross Lawley
            Reporter:
            andreworty@gmail.com Andrey B
            Votes:
            0 Vote for this issue
            Watchers:
            1 Start watching this issue

              Created:
              Updated:
              Resolved: