[KAFKA-265] Add support for copy existing to set allow disk use flag Created: 09/Nov/21  Updated: 28/Oct/23  Resolved: 19/Jan/22

Status: Closed
Project: Kafka Connector
Component/s: Source
Affects Version/s: 1.6.1
Fix Version/s: 1.7.0

Type: Bug Priority: Unknown
Reporter: Nitin Kapoor Assignee: Ross Lawley
Resolution: Fixed Votes: 0
Labels: Bug, external-user
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified

Issue Links:
Documented
Documentation Changes: Needed
Documentation Changes Summary:

Added a new source configuration for the copy existing process.

Added: copy.existing.allow.disk.use, which defaults to true.


 Description   

Add copy.existing.allow.disk.use configuration.

Allow the copy existing aggregation to use temporary disk storage if required. Defaults to true, but can be disabled if the user doesn't have the permissions for disk access.
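
For example, a user without disk-access permissions could opt out while still copying existing data. A minimal sketch; the copy.existing.allow.disk.use property and its default come from this ticket, the other line is illustrative:

copy.existing = true
copy.existing.allow.disk.use = false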

============
Was: Exceeded memory limit for $group, but didn't allow external sort

Hi Team,

The MongoDB source connector is failing with the error:

connect | org.apache.kafka.connect.errors.ConnectException: com.mongodb.MongoCommandException: Command failed with error 16945 (Location16945): 'Exceeded memory limit for $group, but didn't allow external sort. Pass allowDiskUse:true to opt in.' on server inclusio-1-shard-00-02.kwe2e.mongodb.net:27017. The full response is {"operationTime": {"$timestamp": {"t": 1636449949, "i": 857}}, "ok": 0.0, "errmsg": "Exceeded memory limit for $group, but didn't allow external sort. Pass allowDiskUse:true to opt in.", "code": 16945, "codeName": "Location16945", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1636449949, "i": 857}}, "signature": {"hash": {"$binary": {"base64": "TZhmmTYxktZ6ivXdEWZkImFgqVA=", "subType": "00"}}, "keyId": 6991155560347336706}}} 
connect | Caused by: com.mongodb.MongoCommandException: Command failed with error 16945 (Location16945): 'Exceeded memory limit for $group, but didn't allow external sort. Pass allowDiskUse:true to opt in.' on server inclusio-1-shard-00-02.kwe2e.mongodb.net:27017. The full response is {"operationTime": {"$timestamp": {"t": 1636449949, "i": 857}}, "ok": 0.0, "errmsg": "Exceeded memory limit for $group, but didn't allow external sort. Pass allowDiskUse:true to opt in.", "code": 16945, "codeName": "Location16945", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1636449949, "i": 857}}, "signature": {"hash": {"$binary": {"base64": "TZhmmTYxktZ6ivXdEWZkImFgqVA=", "subType": "00"}}, "keyId": 6991155560347336706}}}

The source connector properties are fairly simple:

name = prodMongoSrc
connector.class = com.mongodb.kafka.connect.MongoSourceConnector
tasks.max = 1
errors.retry.timeout = -1
errors.retry.delay.max.ms = 10000
errors.log.enable = true
errors.log.include.messages = true
connection.uri = mongodb+srv://<######>
topic.prefix = prod
copy.existing = true
offset.partition.name = partition-03

I have hashed the credentials.

When I relaunch the connector it works, but it fails again after some time.

Looking for support and help on this.



 Comments   
Comment by Nitin Kapoor [ 20/Jan/22 ]

Thanks @Ross for the resolution; I will surely update and try it.

I will keep you posted on the results and feedback.

Again, I really appreciate the quick resolution and fix.

Comment by Ross Lawley [ 19/Jan/22 ]

Hi nitin.kapoor@inclusio.io,

A configuration will be included and defaulted to true in the 1.7.0 release of the connector, which should be released in the near future.

Ross

Comment by Githook User [ 19/Jan/22 ]

Author:

{'name': 'Ross Lawley', 'email': 'ross.lawley@gmail.com', 'username': 'rozza'}

Message: Use the default value from the configuration in MongoCopyDataManagerTest

KAFKA-265
Branch: master
https://github.com/mongodb/mongo-kafka/commit/febaad85c57953e74600fcd8c3c27cc2dc7b70d3

Comment by Githook User [ 19/Jan/22 ]

Author:

{'name': 'Ross Lawley', 'email': 'ross.lawley@gmail.com', 'username': 'rozza'}

Message: Added `copy.existing.allow.disk.use` configuration

Allows the copy existing aggregation to use temporary
disk storage if required. Defaults to true but can be
disabled if the user doesn't have the permissions
for disk access.

KAFKA-265
Branch: master
https://github.com/mongodb/mongo-kafka/commit/3faf4ddde1f78b998d7255f1362e8d5f50946122

Comment by Nitin Kapoor [ 18/Nov/21 ]

Hi Ross,

The MongoDB version is v4.2.17 and it is hosted on MongoDB Cloud; it is an M10 cluster.

Best Regards

Nitin

Comment by Ross Lawley [ 17/Nov/21 ]

Thanks nitin.kapoor@inclusio.io,

Can you confirm the MongoDB version you are using?

The connector doesn't use any of the operators listed in the aggregation pipeline limits section of the documentation, so I'm surprised that it's hitting the limitation. I need to check whether this is a known issue and whether the documentation requires updating.

Ross

Comment by Nitin Kapoor [ 17/Nov/21 ]

Hi Ross,

This issue occurs when the "copy existing data" flag is true, not when it is false.

Best Regards

Nitin

Comment by Ross Lawley [ 17/Nov/21 ]

Hi nitin.kapoor@inclusio.io,

Thanks for confirming. This is strange as the connector doesn't use a $group stage and we haven't had this reported before by other users.

Do you know if the error occurs during the copy existing data process?
Also, what version of MongoDB are you running?

Ross

Comment by Nitin Kapoor [ 17/Nov/21 ]

Hi ross@mongodb.com

Thanks for taking a look at it. We are not using any pipelines.

The event stream used is the basic one from the MongoDB change stream. I checked online and found it mentioned that the "allowDiskUse" flag can be used to work around this, but there is no option for me to configure it in the source/sink connectors.
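
For reference, a minimal sketch of how allowDiskUse is enabled at the driver level with the MongoDB Java sync driver; the URI placeholder, database, collection, and $group pipeline below are illustrative, not the connector's actual internals:

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import org.bson.Document;
import java.util.List;

public class AllowDiskUseExample {
    public static void main(String[] args) {
        try (MongoClient client = MongoClients.create("mongodb+srv://<######>")) {
            MongoCollection<Document> events =
                    client.getDatabase("test").getCollection("events");
            // allowDiskUse(true) lets memory-heavy stages such as $group spill
            // to temporary disk storage instead of failing with error 16945
            // ("Exceeded memory limit for $group, but didn't allow external sort").
            events.aggregate(List.of(
                            Document.parse("{\"$group\": {\"_id\": \"$type\", \"count\": {\"$sum\": 1}}}")))
                    .allowDiskUse(true)
                    .forEach(doc -> System.out.println(doc.toJson()));
        }
    }
}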

It would be a big help if you could suggest something here.

Best Regards
Nitin

Comment by Ross Lawley [ 16/Nov/21 ]

Hi nitin.kapoor@inclusio.io,

Thanks for the ticket. A $group stage on what is essentially an infinite stream seems unusual. Are you providing your own pipeline to Kafka?

Kind Regards,

Ross

Comment by Esha Bhargava [ 11/Nov/21 ]

nitin.kapoor@inclusio.io Thank you for reporting the issue! We'll look into it and get back to you soon.

Comment by Nitin Kapoor [ 09/Nov/21 ]

The MongoDB cluster used is an M10, which supports these aggregations.
