In KAFKA-201, the pipeline used for copy.existing was changed.
MR: https://github.com/mongodb/mongo-kafka/pull/59
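The current implementation can lead to unexpected behavior when the user's pipeline filters on the ns field. With a $match on ns.coll, the documents that already exist in the matching collections are never produced during the copy-existing phase, even though they match the namespace regex.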
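To make the failure mode concrete, here is a simplified Java model. It assumes that while existing data is being copied, the namespace is still stored in a temporary __ field when the user pipeline runs, whereas change stream events carry it under ns. That assumption is inferred from the __.coll workaround in the integration test. Documents are plain nested Maps. The class NsMatchModel and its methods are hypothetical stand-ins for the $match stage, not connector code.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.regex.Pattern;

// Hypothetical model, not connector code: documents are nested Maps and a
// $match on a two-level dotted path is a lookup followed by a regex test.
final class NsMatchModel {
    // Same pattern as the test pipeline: /^coll(1|2)$/
    static final Pattern COLL_REGEX = Pattern.compile("^coll(1|2)$");

    // Resolves a path such as "ns.coll"; returns null if either level is absent.
    static Object get(Map<String, Object> doc, String outer, String inner) {
        Object sub = doc.get(outer);
        return (sub instanceof Map) ? ((Map<?, ?>) sub).get(inner) : null;
    }

    static boolean regexMatches(Object value) {
        return value instanceof String && COLL_REGEX.matcher((String) value).matches();
    }

    // Models {"$match": {"ns.coll": {"$regex": /^coll(1|2)$/}}}
    static boolean matchesNsOnly(Map<String, Object> doc) {
        return regexMatches(get(doc, "ns", "coll"));
    }

    // Models the $or workaround that also checks "__.coll"
    static boolean matchesNsOrTemp(Map<String, Object> doc) {
        return matchesNsOnly(doc) || regexMatches(get(doc, "__", "coll"));
    }

    // Assumption: during copy.existing the namespace is still under "__"
    static Map<String, Object> copyExistingDoc(String coll) {
        return Map.<String, Object>of("__", Map.of("db", "test", "coll", coll));
    }

    // A change stream event carries its namespace under "ns"
    static Map<String, Object> changeStreamEvent(String coll) {
        return Map.<String, Object>of("ns", Map.of("db", "test", "coll", coll));
    }

    // Counts the documents that pass the given $match model
    static long count(List<Map<String, Object>> docs, Predicate<Map<String, Object>> filter) {
        return docs.stream().filter(filter).count();
    }
}
```

Under this model, the ns-only filter keeps the coll1 and coll2 change events but drops every copied document, which matches the symptom in the failing assertion. The $or variant keeps coll1 and coll2 in both phases and still excludes coll3.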
The following test reproduces the issue. It can be added to MongoSourceConnectorIntegrationTest and run there:
```java
@Test
@DisplayName("Ensure source loads and stream data from certain collections")
void testSourceLoadsAndStreamDataFromCertainCollections() {
  assumeTrue(isGreaterThanThreeDotSix());
  MongoDatabase db = getDatabaseWithPostfix();
  MongoCollection<Document> coll1 = db.getCollection("coll1");
  MongoCollection<Document> coll2 = db.getCollection("coll2");
  MongoCollection<Document> coll3 = db.getCollection("coll3");

  insertMany(rangeClosed(1, 50), coll1);
  insertMany(rangeClosed(1, 50), coll2);
  insertMany(rangeClosed(1, 50), coll3);

  Properties sourceProperties = new Properties();
  sourceProperties.put(MongoSourceConfig.COPY_EXISTING_CONFIG, "true");
  String namespaceRegex = String.format("(%s\\.coll(1|2))", db.getName());
  sourceProperties.put(MongoSourceConfig.COPY_EXISTING_NAMESPACE_REGEX_CONFIG, namespaceRegex);
  sourceProperties.put(PIPELINE_CONFIG, "[{\"$match\": {\"ns.coll\": {\"$regex\": /^coll(1|2)$/}}}]");
  /* Works with this pipeline:
  sourceProperties.put(
      PIPELINE_CONFIG,
      "[{\"$match\": {\"$or\": [{\"ns.coll\": {\"$regex\": /^coll(1|2)$/}}, {\"__.coll\": {\"$regex\": /^coll(1|2)$/}}]}}]");
  */
  addSourceConnector(sourceProperties);

  insertMany(rangeClosed(51, 100), coll1);
  insertMany(rangeClosed(51, 100), coll2);
  insertMany(rangeClosed(51, 100), coll3);

  // The test fails here: the existing documents (1 to 50) were not produced to Kafka.
  assertProduced(createInserts(1, 100), coll1);
  assertProduced(createInserts(1, 100), coll2);
  // TODO: assertProducedNone(coll3)
}
```
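Until the ns field is handled consistently, the $or workaround has to be written out for every collection filter. A small hypothetical helper can generate that pipeline string instead of duplicating the regex by hand. WorkaroundPipeline.forCollectionRegex is not part of the connector. This is a sketch that assumes the regex contains no unescaped / and is inserted verbatim into a /.../ regex literal, as in the commented-out pipeline.

```java
// Hypothetical helper, not part of the connector API: builds the $or
// workaround pipeline so the same collection regex is checked against both
// "ns.coll" (change stream events) and "__.coll" (copy.existing documents).
final class WorkaroundPipeline {
    static String forCollectionRegex(String collRegex) {
        // The regex body is placed verbatim inside a /.../ literal
        String regex = "{\"$regex\": /" + collRegex + "/}";
        return "[{\"$match\": {\"$or\": [{\"ns.coll\": " + regex + "}, "
            + "{\"__.coll\": " + regex + "}]}}]";
    }
}
```

For example, sourceProperties.put(PIPELINE_CONFIG, WorkaroundPipeline.forCollectionRegex("^coll(1|2)$")) produces exactly the commented-out workaround pipeline from the test.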
See also https://github.com/mongodb/mongo-kafka/pull/59#discussion_r606671922