[KAFKA-131] Copy existing configuration with pipeline Created: 03/Aug/20 Updated: 28/Oct/23 Resolved: 08/Sep/20 |
|
| Status: | Closed |
| Project: | Kafka Connector |
| Component/s: | Source |
| Affects Version/s: | 1.2.0 |
| Fix Version/s: | 1.3.0 |
| Type: | Improvement | Priority: | Major - P3 |
| Reporter: | Sabari Gandhi | Assignee: | Ross Lawley |
| Resolution: | Fixed | Votes: | 1 |
| Labels: | None | ||
| Remaining Estimate: | Not Specified | ||
| Time Spent: | Not Specified | ||
| Original Estimate: | Not Specified | ||
| Environment: |
Kafka Connector: 1.2.0 |
||
| Issue Links: |
|
||||||||||||
| Documentation Changes: | Needed | ||||||||||||
| Documentation Changes Summary: | Added a new configuration: copy.existing.pipeline=[{"$match": {"closed": "false"}}] An inline JSON array with objects describing the pipeline operations to run when copying existing data. This can improve the use of indexes by the copying manager and make copying more efficient. Use if there is any filtering of collection data in the `pipeline` configuration to speed up the copying process |
||||||||||||
| Description |
|
We are trying to do copy existing data in huge collections(around 6 million documents). our requirement is such that we need a specific set of data and not all data. so in the configuration, we provide pipeline similar to:
Mongodb logs show the lookup seems to be very expensive. From the connector code, it looks up the entire collection and applies the filter https://github.com/mongodb/mongo-kafka/blob/master/src/main/java/com/mongodb/kafka/connect/source/MongoCopyDataManager.java#L147 The pipeline configuration is added at the end so it looks up the entire collection and applies the data. Is there an option or a way to add the provided pipeline configuration at the beginning of the list. Also, please provide us other configuration option available to make the copy data effective. Thanks |
| Comments |
| Comment by Sabari Gandhi [ 01/Oct/20 ] | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Hi Rob/Ross, we saw that 1.3.0 was released today. Thanks | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Comment by Sabari Gandhi [ 01/Oct/20 ] | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Hi Robert, Thanks for providing us with snapshot build information for testing. Can you also please let us know the planned date of release this month? Thanks. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Comment by Robert Walters [ 15/Sep/20 ] | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
You can test this now by grabbing the latest build here: We will officially be releasing 1.3 in October. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Comment by Urvish Saraiya [ 15/Sep/20 ] | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Hi Ross, Any updates on when is 1.3.0 release going to be available for consumption ? Thanks, Urvish | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Comment by Githook User [ 08/Sep/20 ] | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Author: {'name': 'Ross Lawley', 'email': 'ross.lawley@gmail.com', 'username': 'rozza'}Message: Added `copy.existing.pipeline` configuration. Allows indexes to be used during the copying process, use when there is any filtering done by the main pipeline
| |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Comment by Ross Lawley [ 03/Sep/20 ] | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Comment by Urvish Saraiya [ 28/Aug/20 ] | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Hi Ross, Any updates on when 1.3.0 release is coming out ? We are looking to use feature to load the existing data. Thanks, Urvish | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Comment by Ross Lawley [ 04/Aug/20 ] | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
To clarify, the initial analysis you've done is correct, your pipeline query is not efficient for the copying process and is causing a full collection scan, which would increase the load on your system. Once the data is copied the pipeline wouldn't cause an issue with the change stream. So there is a feature missing to improve the efficiency of the copying process in your scenario. I plan on adding a new config to add a pipeline stage at the start of the copying data pipeline - which will enable the use of the indexes correctly. That should fix the issue for you. The 1.3.0 release is scheduled for this quarter so should be released sometime next month. I will update this ticket once this improvement has been added to source connector and SNAPSHOT builds will be available for testing. Ross | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Comment by Urvish Saraiya [ 04/Aug/20 ] | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Thanks Ross for the response. Just wanted to clarify our scenario here, When we tried to do copy existing data we did not notice any flow of data or cursor getting created so we deleted the connector after 10 minutes. Than we looked into mongodb logs and found that query took 10 hours. See below log statement
So do you think that there would be initial delay of 10 hours for such a big collection ? We had concerns that such a long running query can cause other issues on operational db.
We did another test with 3 million records and we saw that it opened cursor immediately saw that data was flowing through continuously with out any initial delay.
Any explanation or insights why would data not flow for large collection ? I am fine if it takes more time to execute as far as we can throttle requests on to mongodb.
| |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Comment by Ross Lawley [ 04/Aug/20 ] | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Copying a full 60 million document collection, isn't necessarily an issue if the pipeline is just a transformation of the documents shape. It will take time and that is dependent on the network latency. Although the copying process can utilize multiple threads, they are currently limited to 1 per namespace. That is all that is currently available. Collections could be partitioned so that multiple cursors could be used to copy the data but that also comes with a set of limitations. I've added KAFKA-135 to track that. The only workaround currently would be to write a custom kafka producer and manually handle the copying process. Ross | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Comment by Urvish Saraiya [ 04/Aug/20 ] | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Thank you Ross for the prompt response. Do you know what is time line around the release ? Is there any workaround available ? If copy existing is used to bring over entire collection, but if the collection is large in our case it was 60 million records, still it won't work efficiently. Do you have any recommendation on how to go around this issue ? | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Comment by Ross Lawley [ 04/Aug/20 ] | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Marking as an improvement as this will be a new feature. Scheduled for the 1.3 release of the connector. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Comment by Ross Lawley [ 04/Aug/20 ] | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Hi sabari.mgn@gmail.com & saraiya.urvish@gmail.com, Thanks for the ticket and feedback. Unfortunately, the current mongodb query plan optimizer does not derive the intent here of the user supplied pipeline, when added to the conversion to a Change Stream Event pipeline. Given that the user supplied pipeline is intended to operate solely on the outputted change stream events it can't be set as the first pipeline stage. The only guaranteed way for it to work is for it to be applied after the conversion of a document to a change stream event. Having looked at your pipeline, it does appear to be for filtering / limiting the initial set of events for the copying process. This is not a scenario that was necessarily envisaged when developing this feature. However, it definitely is something we can look to support in a future release. This most likely will require the addition of a new configuration parameter for a pipeline just for the copying stage. I think that would solve the issue here. Thanks again for your feedback and helping make the Mongo Kafka Connector even better. Ross | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Comment by Urvish Saraiya [ 03/Aug/20 ] | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
The current functionality does a collection scan which is highly inefficient You can see an example of query plan. Even if we try to load entire collection, it will take a lot of time. Can you share some statistics on the testing of this feature like it took 10 hours to load 50 million documents ? How would you filter documents to avoid Collection scan ?
If we change the order in pipeline and have match upfront than it does the IXSCAN , see below
|