[KAFKA-182] Source Connector ability to skip bad resumeAfter token Created: 08/Jan/21  Updated: 27/Oct/23  Resolved: 11/Jan/21

Status: Closed
Project: Kafka Connector
Component/s: Source
Affects Version/s: 1.3.0
Fix Version/s: None

Type: Improvement Priority: Major - P3
Reporter: Branden Makana Assignee: Ross Lawley
Resolution: Works as Designed Votes: 0
Labels: None
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified


 Description   

As per the documentation on how to recover from an invalid resume token, the only way to recover from an invalid token is to stop the connector, delete the saved offsets, and restart the connector. The connector then starts a new change stream.

This is a critical weakness, as it causes data loss when an unrecoverable event is encountered - for example, a change stream event that exceeds the 16MB BSON document size limit.

The source connector already supports passing a pipeline; could it also be enhanced to accept one or more of the options db.watch() supports, such as startAtOperationTime, startAfter, or even resumeAfter? These values could be passed via the JSON configuration used to create the connector. Supporting startAfter would make it easy to skip a bad record, provided the error message printed the complete resume token (I believe it is currently truncated).
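The kind of configuration being requested might look something like the sketch below (written here in Kafka Connect .properties form for brevity). The change.stream.* property names are invented purely to illustrate the idea and do not exist in the 1.3.0 connector:

# Hypothetical properties, not supported by the connector as of 1.3.0:
# start a brand new change stream immediately after the given token,
# skipping the event that token points at...
change.stream.start.after={"_data": "8260..."}
# ...or start from a point in time instead of a token.
change.stream.start.at.operation.time=1610000000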



 Comments   
Comment by Branden Makana [ 27/Apr/21 ]

Thanks Ross - I have opened https://jira.mongodb.org/browse/KAFKA-219

Comment by Ross Lawley [ 27/Apr/21 ]

Hi bmakana@indeed.com,

The issue is that the resume token itself is valid, but the server fails to start the change stream due to the document size limitation.

Can I ask you to open a new ticket for that? I think we may need to look into both the server and the connector to determine how best to handle that scenario.

Ross

Comment by Branden Makana [ 27/Apr/21 ]

Hi Ross,

I wonder if this could be re-opened, as it doesn't seem that errors.tolerance=all works for this scenario? 

Here's my connector config (errors part): 

"errors.tolerance": "all",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"errors.deadletterqueue.topic.name": "dlq",
"errors.deadletterqueue.topic.replication.factor": "1",
"errors.deadletterqueue.context.headers.enable": "true"

But when an event exceeds the limit (here, ~20MB), the connector gets stuck in an unrecoverable retry loop:

[2021-04-27 03:35:42,633] INFO An exception occurred when trying to get the next item from the Change Stream: Query failed with error code 10334 and error message 'BSONObj size: 20793516 (0x13D48AC) is invalid. Size must be between 0 and 16793600(16MB) First element: _id: { _data: "826087866E000000682B022C0100296E5A100411C5B2DDA8794637A32244ED8485B866463C5F70726F664B65792E5F616363744964003C3466353736383061003C5F70726F664B65792E5F..." }' on server mongo:27097 (com.mongodb.kafka.connect.source.MongoSourceTask)
[2021-04-27 03:35:44,107] INFO Watching for collection changes on 'the_collection' (com.mongodb.kafka.connect.source.MongoSourceTask)
[2021-04-27 03:35:44,108] INFO Resuming the change stream after the previous offset: {"_data": "826087866E0000005D2B022C0100296E5A100411C5B2DDA8794637A32244ED8485B866463C5F70726F664B65792E5F616363744964003C3338323730313430003C5F70726F664B65792E5F73003C70726F66696F003C5F6964003C316634386C6472633875347265383030000004"} (com.mongodb.kafka.connect.source.MongoSourceTask)
[2021-04-27 03:35:44,696] WARN Failed to resume change stream: BSONObj size: 20793516 (0x13D48AC) is invalid. Size must be between 0 and 16793600(16MB) First element: _id: { _data: "826087866E000000682B022C0100296E5A100411C5B2DDA8794637A32244ED8485B866463C5F70726F664B65792E5F616363744964003C3466353736383061003C5F70726F664B65792E5F..." } 10334
=====================================================================================
If the resume token is no longer available then there is the potential for data loss.
Saved resume tokens are managed by Kafka and stored with the offset data.

To restart the change stream with no resume token either:
  * Create a new partition name using the `offset.partition.name` configuration.
  * Set `errors.tolerance=all` and ignore the erroring resume token.
  * Manually remove the old offset from its configured storage.

Resetting the offset will allow the connector to resume from the latest resume
token. Using `copy.existing=true` ensures that all data will be output by the
connector, but it will duplicate existing data.
=====================================================================================
 (com.mongodb.kafka.connect.source.MongoSourceTask)
[2021-04-27 03:35:49,093] INFO Watching for collection changes on 'the_collection' (com.mongodb.kafka.connect.source.MongoSourceTask)
[2021-04-27 03:35:49,094] INFO Resuming the change stream after the previous offset: {"_data": "826087866E0000005D2B022C0100296E5A100411C5B2DDA8794637A32244ED8485B866463C5F70726F664B65792E5F616363744964003C3338323730313430003C5F70726F664B65792E5F73003C70726F66696F003C5F6964003C316634386C6472633875347265383030000004"} (com.mongodb.kafka.connect.source.MongoSourceTask)
[2021-04-27 03:35:49,684] WARN Failed to resume change stream: BSONObj size: 20793516 (0x13D48AC) is invalid. Size must be betwe... (repeats until killed) 

The behavior still exists with connector v1.5 and Kafka Connect 6.1.1.
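For reference, the first recovery option listed in the warning above (pointing the connector at a fresh offset partition) might look like the following sketch; the connection details and database name are placeholders:

# Sketch only: connection details and database name below are placeholders.
# A new, previously unused offset.partition.name makes the connector ignore the
# stored (unusable) offset and open a fresh change stream.
name=mongo-source
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=mongodb://mongo:27097
database=the_database
collection=the_collection
offset.partition.name=the_collection-restart-2021-04-27
# Optionally re-copy existing data, at the cost of duplicate records (see the warning text).
copy.existing=true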

Comment by Ross Lawley [ 11/Jan/21 ]

Hi bmakana@indeed.com,

Thanks for the ticket. There are a couple of improvements in the 1.3.0 release that you may not be aware of:

  • KAFKA-105 added source connector support for errors.tolerance=all so that an invalid resume token or change stream event can be ignored or logged.
  • KAFKA-76 added heartbeats, allowing the source connector to consume post-batch resume tokens (useful when watching a quiet collection in a busy database); a config sketch showing both options follows below.
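A minimal sketch of how those two features are enabled in a source connector configuration (connection details, names, and the interval below are placeholders):

# Sketch only: placeholder connection details and names.
name=mongo-source
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=mongodb://localhost:27017
database=mydb
collection=mycoll
# KAFKA-105: tolerate, log, and skip records the connector cannot process.
errors.tolerance=all
errors.log.enable=true
# KAFKA-76: emit heartbeat records so the stored resume token keeps advancing
# even while the watched collection is quiet.
heartbeat.interval.ms=10000
heartbeat.topic.name=__mongodb_heartbeats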

By default, startAfter is the preferred mechanism for resuming a change stream (as long as the MongoDB version supports it). However, with an unrecoverable event, such as one exceeding the 16MB document size limit, the last offset saved by Kafka would be from before that error, so on retry the change stream would load from the last valid position and hit the error again. The workaround in that scenario is the errors.tolerance=all configuration.

I hope that helps,

Ross
