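Description
In KAFKA-201, the aggregation pipeline used for `copy.existing` was changed.
MR: https://github.com/mongodb/mongo-kafka/pull/59
The current version can lead to unexpected behavior when the user-supplied `pipeline` filters on the `ns` field: with `copy.existing` enabled, the pre-existing documents are not produced to Kafka, while documents inserted afterwards (via the change stream) are.
Here is an example test that can be added to and run in `MongoSourceConnectorIntegrationTest`: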
@Test |
@DisplayName("Ensure source loads and stream data from certain collections") |
void testSourceLoadsAndStreamDataFromCertainCollections() { |
assumeTrue(isGreaterThanThreeDotSix());
|
MongoDatabase db = getDatabaseWithPostfix();
|
MongoCollection<Document> coll1 = db.getCollection("coll1"); |
MongoCollection<Document> coll2 = db.getCollection("coll2"); |
MongoCollection<Document> coll3 = db.getCollection("coll3"); |
|
|
insertMany(rangeClosed(1, 50), coll1); |
insertMany(rangeClosed(1, 50), coll2); |
insertMany(rangeClosed(1, 50), coll3); |
|
|
Properties sourceProperties = new Properties(); |
sourceProperties.put(MongoSourceConfig.COPY_EXISTING_CONFIG, "true"); |
String namespaceRegex = String.format("(%s\\.coll(1|2))", db.getName()); |
sourceProperties.put(MongoSourceConfig.COPY_EXISTING_NAMESPACE_REGEX_CONFIG, namespaceRegex);
|
sourceProperties.put(PIPELINE_CONFIG, "[{\"$match\": {\"ns.coll\": {\"$regex\": /^coll(1|2)$/}}}]"); |
/* |
Works with such pipeline
|
sourceProperties.put(
|
PIPELINE_CONFIG,
|
"[{\"$match\": {\"$or\": [{\"ns.coll\": {\"$regex\": /^coll(1|2)$/}}, {\"__.coll\": {\"$regex\": /^coll(1|2)$/}}]}}]");
|
*/
|
|
|
addSourceConnector(sourceProperties);
|
|
|
insertMany(rangeClosed(51, 100), coll1); |
insertMany(rangeClosed(51, 100), coll2); |
insertMany(rangeClosed(51, 100), coll3); |
|
|
assertProduced(createInserts(1, 100), coll1); // test fails. Existing documents (1, 50) weren't produced to kafka |
assertProduced(createInserts(1, 100), coll2); |
// TODO: assertProducedNone(coll3) |
}
|
See also: https://github.com/mongodb/mongo-kafka/pull/59#discussion_r606671922