[KAFKA-217] Using ns field in copy.existing pipeline bug Created: 06/Apr/21  Updated: 28/Oct/23  Resolved: 01/Jun/21

Status: Closed
Project: Kafka Connector
Component/s: Source
Affects Version/s: 1.5.0
Fix Version/s: 1.5.1

Type: Bug Priority: Major - P3
Reporter: Andrey B Assignee: Ross Lawley
Resolution: Fixed Votes: 0
Labels: None
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified

Case:

 Description   

In KAFKA-201, the pipeline for copy.existing was changed.
MR: https://github.com/mongodb/mongo-kafka/pull/59
The current version can lead to unexpected behavior if the user references the ns field in the pipeline: existing documents are not copied.

Here is an example that can be added to and run in MongoSourceConnectorIntegrationTest:

  @Test
  @DisplayName("Ensure source loads and stream data from certain collections")
  void testSourceLoadsAndStreamDataFromCertainCollections() {
    assumeTrue(isGreaterThanThreeDotSix());
    MongoDatabase db = getDatabaseWithPostfix();
    MongoCollection<Document> coll1 = db.getCollection("coll1");
    MongoCollection<Document> coll2 = db.getCollection("coll2");
    MongoCollection<Document> coll3 = db.getCollection("coll3");
 
    insertMany(rangeClosed(1, 50), coll1);
    insertMany(rangeClosed(1, 50), coll2);
    insertMany(rangeClosed(1, 50), coll3);
 
    Properties sourceProperties = new Properties();
    sourceProperties.put(MongoSourceConfig.COPY_EXISTING_CONFIG, "true");
    String namespaceRegex = String.format("(%s\\.coll(1|2))", db.getName());
    sourceProperties.put(MongoSourceConfig.COPY_EXISTING_NAMESPACE_REGEX_CONFIG, namespaceRegex);
    sourceProperties.put(PIPELINE_CONFIG, "[{\"$match\": {\"ns.coll\": {\"$regex\": /^coll(1|2)$/}}}]");
    /*
    The test passes with the following pipeline, which also matches on "__.coll":
    sourceProperties.put(
        PIPELINE_CONFIG,
        "[{\"$match\": {\"$or\": [{\"ns.coll\": {\"$regex\": /^coll(1|2)$/}}, {\"__.coll\": {\"$regex\": /^coll(1|2)$/}}]}}]");
     */
 
    addSourceConnector(sourceProperties);
 
    insertMany(rangeClosed(51, 100), coll1);
    insertMany(rangeClosed(51, 100), coll2);
    insertMany(rangeClosed(51, 100), coll3);
 
    assertProduced(createInserts(1, 100), coll1); // Fails: existing documents (1 to 50) were not produced to Kafka
    assertProduced(createInserts(1, 100), coll2);
    // TODO: assertProducedNone(coll3)
  }

See also https://github.com/mongodb/mongo-kafka/pull/59#discussion_r606671922
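
The commented-out pipeline in the test above suggests a workaround for 1.5.0: match the collection name under either ns.coll or __.coll. The following is a minimal sketch of a helper that builds that pipeline string for a given collection regex. The class and method names are hypothetical and not part of the connector, and the __ field name is taken solely from the reporter's test, not from documented behavior.

```java
public final class CopyExistingPipelineWorkaround {

  private CopyExistingPipelineWorkaround() {}

  /**
   * Builds a single-stage pipeline whose $match accepts the collection regex
   * under either "ns.coll" or "__.coll" (the latter is an assumption based on
   * the workaround shown in the report).
   */
  public static String matchCollection(String collRegex) {
    // Regex literal in the same form the report's test uses.
    String regex = "{\"$regex\": /" + collRegex + "/}";
    return "[{\"$match\": {\"$or\": ["
        + "{\"ns.coll\": " + regex + "}, "
        + "{\"__.coll\": " + regex + "}]}}]";
  }

  public static void main(String[] args) {
    // Prints the same pipeline string as the commented-out workaround.
    System.out.println(matchCollection("^coll(1|2)$"));
  }
}
```

In the test, the result would be passed as the PIPELINE_CONFIG value in place of the single ns.coll match. Upgrading to 1.5.1, where the issue is marked fixed, should make the workaround unnecessary.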



 Comments   
Comment by Githook User [ 01/Jun/21 ]

Author:

{'name': 'Ross Lawley', 'email': 'ross.lawley@gmail.com', 'username': 'rozza'}

Message: Fixed using `ns` field in `copy.existing` pipeline bug

KAFKA-217
Branch: 1.5.x
https://github.com/mongodb/mongo-kafka/commit/47835f1b8a059fa4ca29018483e4902f01d7bb9e

Comment by Githook User [ 01/Jun/21 ]

Author:

{'name': 'Ross Lawley', 'email': 'ross.lawley@gmail.com', 'username': 'rozza'}

Message: Fixed using `ns` field in `copy.existing` pipeline bug

KAFKA-217
Branch: master
https://github.com/mongodb/mongo-kafka/commit/0da64c7f80a36581af4dec945649a72dfe99b7b5

Comment by Esha Bhargava [ 13/Apr/21 ]

andreworty@gmail.com Thank you for reporting this issue! We'll look into it and get back to you soon.
