[KAFKA-131] Copy existing configuration with pipeline Created: 03/Aug/20  Updated: 28/Oct/23  Resolved: 08/Sep/20

Status: Closed
Project: Kafka Connector
Component/s: Source
Affects Version/s: 1.2.0
Fix Version/s: 1.3.0

Type: Improvement Priority: Major - P3
Reporter: Sabari Gandhi Assignee: Ross Lawley
Resolution: Fixed Votes: 1
Labels: None
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified
Environment:

Kafka Connector: 1.2.0
MongoDB version: 3.6.17


Issue Links:
Documented
Duplicate
is duplicated by KAFKA-150 When copy exist, support config pipel... Closed
Documentation Changes: Needed
Documentation Changes Summary:

Added a new configuration:

copy.existing.pipeline=[{"$match": {"closed": "false"}}]

An inline JSON array with objects describing the pipeline operations to run when copying existing data. This can improve the use of indexes by the copying manager and make copying more efficient.

Use this if the `pipeline` configuration filters collection data, in order to speed up the copying process.
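
For illustration, a minimal sketch of how this setting might be combined with the main `pipeline` setting in a source connector configuration. The `createdDate` field and the dates are taken from the scenario described in this ticket, and the date literals use extended JSON, which the pipeline parser is expected to accept; adjust to your own schema and indexes:

copy.existing=true
# Runs against the original documents during the copy, so an index on createdDate can be used
copy.existing.pipeline=[{"$match": {"createdDate": {"$gt": {"$date": "2019-03-31T13:44:54.791Z"}}}}]
# Runs against change stream events (and, while copying, after documents are converted to that shape)
pipeline=[{"$match": {"fullDocument.createdDate": {"$gt": {"$date": "2019-03-31T13:44:54.791Z"}}}}, {"$project": {"updateDescription": 0}}]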


 Description   

We are trying to copy existing data from huge collections (around 6 million documents). Our requirement is that we only need a specific subset of the data, not all of it, so in the configuration we provide a pipeline similar to:

"pipeline": "[
  { $project: { "updateDescription":0 } }, 
  { $match: {"fullDocument.createdDate":{ "$gt": ISODate("2019-03-31T13:44:54.791Z"), "$lt": ISODate("2020-07-23T13:44:54.791Z")} } } 
]". 

The MongoDB logs show the lookup is very expensive. From the connector code (https://github.com/mongodb/mongo-kafka/blob/master/src/main/java/com/mongodb/kafka/connect/source/MongoCopyDataManager.java#L147), the copy data manager scans the entire collection and only then applies the filter: the user-supplied pipeline configuration is appended at the end of the list, so the whole collection is read before the filter runs (see the sketch below). Is there an option or a way to add the provided pipeline configuration at the beginning of the list?
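
For reference, a rough sketch (mongo shell syntax; the database, collection and field names are taken from the logs further down in this ticket) of the aggregation the copy data manager effectively runs today, with the user pipeline appended after the generated $replaceRoot stage. Because the $match then refers to the rewritten change-stream shape, no index on createdDate can be used and the server falls back to a COLLSCAN:

// Sketch of the copy aggregation as currently generated (user pipeline appended last)
db.abc.aggregate([
  { $replaceRoot: { newRoot: {
      _id: { _id: "$_id", copyingData: true },
      operationType: "insert",
      ns: { db: "center0", coll: "abc" },
      documentKey: { _id: "$_id" },
      fullDocument: "$$ROOT" } } },
  // The user-supplied $match now targets "fullDocument.createdDate",
  // which no index covers, so the whole collection is scanned
  { $match: { "fullDocument.createdDate": { $gt: ISODate("2019-03-31T13:44:54.791Z") } } },
  { $project: { updateDescription: 0 } }
])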

Also, please let us know about any other configuration options available to make copying data more efficient. Thanks.



 Comments   
Comment by Sabari Gandhi [ 01/Oct/20 ]

Hi Rob/Ross, we saw that 1.3.0 was released today. Thanks 

Comment by Sabari Gandhi [ 01/Oct/20 ]

Hi Robert, thanks for providing us with the snapshot build information for testing. Can you also please let us know the planned release date this month? Thanks.

Comment by Robert Walters [ 15/Sep/20 ]

You can test this now by grabbing the latest build here:
https://oss.sonatype.org/content/repositories/snapshots/org/mongodb/kafka/mongo-kafka-connect/1.3.0-SNAPSHOT/

We will officially be releasing 1.3 in October.

Comment by Urvish Saraiya [ 15/Sep/20 ]

Hi Ross,

Any update on when the 1.3.0 release is going to be available for consumption?

Thanks,

Urvish

Comment by Githook User [ 08/Sep/20 ]

Author:

{'name': 'Ross Lawley', 'email': 'ross.lawley@gmail.com', 'username': 'rozza'}

Message: Added `copy.existing.pipeline` configuration.

Allows indexes to be used during the copying process, use when there is any filtering done by the main pipeline

KAFKA-131
Branch: master
https://github.com/mongodb/mongo-kafka/commit/96fb1ad05a79c22a6de4359ca75a9086a2881f8b

Comment by Ross Lawley [ 03/Sep/20 ]

PR: https://github.com/mongodb/mongo-kafka/pull/33

Comment by Urvish Saraiya [ 28/Aug/20 ]

Hi Ross,

Any update on when the 1.3.0 release is coming out? We are looking to use this feature to load the existing data.

Thanks,

Urvish

Comment by Ross Lawley [ 04/Aug/20 ]

Hi saraiya.urvish@gmail.com,

To clarify, your initial analysis is correct: the pipeline query is not efficient for the copying process and causes a full collection scan, which increases the load on your system. Once the data is copied, the pipeline doesn't cause an issue for the change stream. So there is a missing feature for making the copying process efficient in your scenario.

I plan on adding a new config that adds a pipeline stage at the start of the copy data pipeline, which will enable indexes to be used correctly. That should fix the issue for you.
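
As a rough illustration only (mongo shell syntax, reusing the names from the logs in this ticket; the exact stages the connector generates may differ), the copy aggregation would then look something like this, allowing an IXSCAN on createdDate instead of a COLLSCAN:

// The copy.existing.pipeline stage runs first, against the original document shape,
// so an index on createdDate can be used
db.abc.aggregate([
  { $match: { createdDate: { $gt: ISODate("2019-03-31T13:44:54.791Z") } } },
  { $replaceRoot: { newRoot: {
      _id: { _id: "$_id", copyingData: true },
      operationType: "insert",
      ns: { db: "center0", coll: "abc" },
      documentKey: { _id: "$_id" },
      fullDocument: "$$ROOT" } } },
  // The existing `pipeline` configuration is still applied afterwards,
  // now over a much smaller set of documents
  { $match: { "fullDocument.createdDate": { $gt: ISODate("2019-03-31T13:44:54.791Z") } } },
  { $project: { updateDescription: 0 } }
])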

The 1.3.0 release is scheduled for this quarter so should be released sometime next month.

I will update this ticket once this improvement has been added to the source connector; SNAPSHOT builds will then be available for testing.

Ross

Comment by Urvish Saraiya [ 04/Aug/20 ]

Thanks Ross for the response.

Just wanted to clarify our scenario here: when we tried to copy existing data, we did not notice any data flowing or any cursor being created, so we deleted the connector after 10 minutes. Then we looked into the MongoDB logs and found that the query took 10 hours. See the log statement below:

command center0.abc command: aggregate { aggregate: "abc", pipeline: [ { $replaceRoot: { newRoot: { _id: { _id: "$_id", copyingData: true }, operationType: "insert", ns: { db: "center0", coll: "abc" }, documentKey: { _id: "$_id" }, fullDocument: "$$ROOT" } } }, { $match: { fullDocument.createdDate: { $gt: new Date(1596350748791) } } }, { $project: { updateDescription: 0 } } ], cursor: {}, $db: "center0", $clusterTime: { clusterTime: Timestamp(1596415285, 333), signature: { hash: BinData(0, REDACTED), keyId: REDACTED } }, lsid: { id: UUID("e63b613c-5c7b-4455-89ad-17dc279cd088") } } planSummary: COLLSCAN cursorid:8388662097805689831 keysExamined:0 docsExamined:61935946 numYields:1635016 nreturned:101 reslen:3370489 locks:{ Global: { acquireCount: { r: 5429114 } }, Database: { acquireCount: { r: 2714557 } }, Collection: { acquireCount: { r: 2714557 } } } protocol:op_msg 39121496ms  

 

So do you think there would be an initial delay of 10 hours for such a big collection? We had concerns that such a long-running query could cause other issues on the operational DB.

We did another test with 3 million records and saw that a cursor was opened immediately and that data flowed through continuously without any initial delay.

Any explanation or insight into why data would not flow for a large collection? I am fine if it takes more time to execute, as long as we can throttle the requests to MongoDB.

Comment by Ross Lawley [ 04/Aug/20 ]

Hi saraiya.urvish@gmail.com,

Copying a full 60 million document collection isn't necessarily an issue if the pipeline is just a transformation of the document shape. It will take time, and that time depends on the network latency. Although the copying process can utilize multiple threads, they are currently limited to 1 per namespace. That is all that is currently available.

Collections could be partitioned so that multiple cursors could be used to copy the data, but that also comes with a set of limitations. I've added KAFKA-135 to track that.

The only workaround currently would be to write a custom Kafka producer and manually handle the copying process.

Ross

Comment by Urvish Saraiya [ 04/Aug/20 ]

Thank you, Ross, for the prompt response. Do you know the timeline for the release?

Is there any workaround available? Even if copy existing is used to bring over the entire collection, it won't work efficiently when the collection is large; in our case it was 60 million records. Do you have any recommendation on how to work around this issue?

Comment by Ross Lawley [ 04/Aug/20 ]

Marking as an improvement as this will be a new feature.  Scheduled for the 1.3 release of the connector.

Comment by Ross Lawley [ 04/Aug/20 ]

Hi sabari.mgn@gmail.com & saraiya.urvish@gmail.com,

Thanks for the ticket and feedback. Unfortunately, the current MongoDB query plan optimizer cannot derive the intent of the user-supplied pipeline once it is appended to the pipeline that converts documents into change stream events. Given that the user-supplied pipeline is intended to operate solely on the outputted change stream events, it can't be set as the first pipeline stage; the only guaranteed way for it to work is to apply it after a document has been converted to a change stream event.

Having looked at your pipeline, it does appear to be for filtering / limiting the initial set of events for the copying process. This is not a scenario that was necessarily envisaged when developing this feature, but it is definitely something we can look to support in a future release. It will most likely require adding a new configuration parameter for a pipeline that applies only to the copying stage. I think that would solve the issue here.

Thanks again for your feedback and helping make the Mongo Kafka Connector even better.

Ross

Comment by Urvish Saraiya [ 03/Aug/20 ]

The current functionality does a collection scan, which is highly inefficient.

You can see an example of the query plan below. Even if we try to load the entire collection, it will take a lot of time.

Can you share some statistics from the testing of this feature, e.g. that it took 10 hours to load 50 million documents?

How would you filter documents to avoid a collection scan?

MongoDB Enterprise uscore01:SECONDARY> db.abc.explain().aggregate( [ { $replaceRoot: { newRoot: { _id: { _id: "$_id", copyingData: true }, operationType: "insert", ns: { db: "center0", coll: "abc" }, documentKey: { _id: "$_id" }, fullDocument: "$$ROOT" } } }, { $match: { "fullDocument.createdDate": { $gt: new Date(1596350748791) } } }, { $project: { updateDescription: 0 } } ])
{
	"stages" : [
		{
			"$cursor" : {
				"query" : {				},
				"queryPlanner" : {
					"plannerVersion" : 1,
					"namespace" : "db.abc",
					"indexFilterSet" : false,
					"parsedQuery" : {					},
					"winningPlan" : {
						"stage" : "COLLSCAN",
						"direction" : "forward"
					},
					"rejectedPlans" : [ ]
				}
			}
		},
		{
			"$replaceRoot" : {
				"newRoot" : {
					"_id" : {
						"_id" : "$_id",
						"copyingData" : {
							"$const" : true
						}
					},
					"operationType" : {
						"$const" : "insert"
					},
					"ns" : {
						"db" : {
							"$const" : "center0"
						},
						"coll" : {
							"$const" : "notification"
						}
					},
					"documentKey" : {
						"_id" : "$_id"
					},
					"fullDocument" : "$$ROOT"
				}
			}
		},
		{
			"$match" : {
				"fullDocument.createdDate" : {
					"$gt" : ISODate("2020-08-02T06:45:48.791Z")
				}
			}
		},
		{
			"$project" : {
				"updateDescription" : false
			}
		}
	],
	"ok" : 1,
	"operationTime" : Timestamp(1596487352, 75),
	"$clusterTime" : {
		"clusterTime" : Timestamp(1596487352, 75),
		"signature" : {
			"hash" : BinData(0,"REDACTED="),
			"keyId" : NumberLong("REDACTED")
		}
	}
} 

command center0.abc command: aggregate { aggregate: "abc", pipeline: [ { $replaceRoot: { newRoot: { _id: { _id: "$_id", copyingData: true }, operationType: "insert", ns: { db: "center0", coll: "abc" }, documentKey: { _id: "$_id" }, fullDocument: "$$ROOT" } } }, { $match: { fullDocument.createdDate: { $gt: new Date(1596350748791) } } }, { $project: { updateDescription: 0 } } ], cursor: {}, $db: "center0", $clusterTime: { clusterTime: Timestamp(1596415285, 333), signature: { hash: BinData(0, REDACTED), keyId: REDACTED } }, lsid: { id: UUID("e63b613c-5c7b-4455-89ad-17dc279cd088") } } planSummary: COLLSCAN cursorid:8388662097805689831 keysExamined:0 docsExamined:61935946 numYields:1635016 nreturned:101 reslen:3370489 locks:{ Global: { acquireCount: { r: 5429114 } }, Database: { acquireCount: { r: 2714557 } }, Collection: { acquireCount: { r: 2714557 } } } protocol:op_msg 39121496ms 

 

If we change the order in the pipeline and put the $match upfront, it does an IXSCAN; see below:

 

MongoDB Enterprise db:PRIMARY> db.abc.explain("executionStats").aggregate([{$match: {createdDate:{$gt:new ISODate("2019-03-01T13:44:54.791Z"), $lt: new ISODate("2020-07-23T13:44:54.791Z")}}},{$replaceRoot: { newRoot: { _id: { _id: "$_id", copyingData: true }, operationType: "insert", ns: { db: "center0", coll: "abc" }, documentKey: { _id: "$_id" }, fullDocument: "$$ROOT" } }}])
{
	"stages" : [
		{
			"$cursor" : {
				"query" : {
					"createdDate" : {
						"$gt" : ISODate("2019-03-01T13:44:54.791Z"),
						"$lt" : ISODate("2020-07-23T13:44:54.791Z")
					}
				},
				"queryPlanner" : {
					"plannerVersion" : 1,
					"namespace" : "db.abc",
					"indexFilterSet" : false,
					"parsedQuery" : {
						"$and" : [
							{
								"createdDate" : {
									"$lt" : ISODate("2020-07-23T13:44:54.791Z")
								}
							},
							{
								"createdDate" : {
									"$gt" : ISODate("2019-03-01T13:44:54.791Z")
								}
							}
						]
					},
					"winningPlan" : {
						"stage" : "FETCH",
						"inputStage" : {
							"stage" : "IXSCAN",
							"keyPattern" : {
								"createdDate" : -1
							},
							"indexName" : "createdDate_-1",
							"isMultiKey" : false,
							"multiKeyPaths" : {
								"createdDate" : [ ]
							},
							"isUnique" : false,
							"isSparse" : false,
							"isPartial" : false,
							"indexVersion" : 1,
							"direction" : "forward",
							"indexBounds" : {
								"createdDate" : [
									"(new Date(1595511894791), new Date(1551447894791))"
								]
							}
						}
					},
					"rejectedPlans" : [ ]
				},
				"executionStats" : {
					"executionSuccess" : true,
					"nReturned" : 2905239,
					"executionTimeMillis" : 51124,
					"totalKeysExamined" : 2905239,
					"totalDocsExamined" : 2905239,
					"executionStages" : {
						"stage" : "FETCH",
						"nReturned" : 2905239,
						"executionTimeMillisEstimate" : 50582,
						"works" : 2905240,
						"advanced" : 2905239,
						"needTime" : 0,
						"needYield" : 0,
						"saveState" : 22900,
						"restoreState" : 22900,
						"isEOF" : 1,
						"invalidates" : 0,
						"docsExamined" : 2905239,
						"alreadyHasObj" : 0,
						"inputStage" : {
							"stage" : "IXSCAN",
							"nReturned" : 2905239,
							"executionTimeMillisEstimate" : 1530,
							"works" : 2905240,
							"advanced" : 2905239,
							"needTime" : 0,
							"needYield" : 0,
							"saveState" : 22900,
							"restoreState" : 22900,
							"isEOF" : 1,
							"invalidates" : 0,
							"keyPattern" : {
								"createdDate" : -1
							},
							"indexName" : "createdDate_-1",
							"isMultiKey" : false,
							"multiKeyPaths" : {
								"createdDate" : [ ]
							},
							"isUnique" : false,
							"isSparse" : false,
							"isPartial" : false,
							"indexVersion" : 1,
							"direction" : "forward",
							"indexBounds" : {
								"createdDate" : [
									"(new Date(1595511894791), new Date(1551447894791))"
								]
							},
							"keysExamined" : 2905239,
							"seeks" : 1,
							"dupsTested" : 0,
							"dupsDropped" : 0,
							"seenInvalidated" : 0
						}
					}
				}
			}
		},
		{
			"$replaceRoot" : {
				"newRoot" : {
					"_id" : {
						"_id" : "$_id",
						"copyingData" : {
							"$const" : true
						}
					},
					"operationType" : {
						"$const" : "insert"
					},
					"ns" : {
						"db" : {
							"$const" : "center0"
						},
						"coll" : {
							"$const" : "notification"
						}
					},
					"documentKey" : {
						"_id" : "$_id"
					},
					"fullDocument" : "$$ROOT"
				}
			}
		}
	],
	"ok" : 1,
	"operationTime" : Timestamp(1596485978, 1),
	"$clusterTime" : {
		"clusterTime" : Timestamp(1596485978, 1),
		"signature" : {
			"hash" : BinData(0,"GcDfjc9e1PiCP17T3FA7F5zP4kA="),
			"keyId" : NumberLong("6815180618402365454")
		}
	}
} 
