[KAFKA-182] Source Connector ability to skip bad resumeAfter token Created: 08/Jan/21 Updated: 27/Oct/23 Resolved: 11/Jan/21
| Status: | Closed |
| Project: | Kafka Connector |
| Component/s: | Source |
| Affects Version/s: | 1.3.0 |
| Fix Version/s: | None |
| Type: | Improvement | Priority: | Major - P3 |
| Reporter: | Branden Makana | Assignee: | Ross Lawley |
| Resolution: | Works as Designed | Votes: | 0 |
| Labels: | None |
| Remaining Estimate: | Not Specified |
| Time Spent: | Not Specified |
| Original Estimate: | Not Specified |
| Description |
As per the documentation on how to recover from an invalid resume token, the only way to recover from an invalid token is to stop the connector, delete the saved offsets, and restart the connector, which starts a new change stream. This is a critical weakness: it causes data loss whenever an unrecoverable event is encountered, for example a change stream event that exceeds the 16MB BSON document size limit. The source connector already supports passing a pipeline; could it also be enhanced to accept one or more of the options that db.watch() supports, such as startAtOperationTime, startAfter, or even resumeAfter? These values could be passed via the JSON configuration used to create the connector. Supporting startAfter would make it easy to skip a bad record, provided the error message printed the complete resumeAfter token (I believe it is currently truncated).
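For illustration only, a sketch of how such options might be surfaced, shown in .properties form (the same keys could equally appear in the JSON submitted to the Connect REST API). The change.stream.* property names below are hypothetical and do not exist in the connector; connector.class, connection.uri, database, and collection are existing source connector settings.

```properties
# Hypothetical sketch of the requested feature: the change.stream.* keys
# below are NOT existing connector options, they only illustrate the idea.
name=mongo-source-example
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=mongodb://localhost:27017
database=test
collection=orders

# Resume immediately AFTER a known-bad event, using the full resume token
# that the error message would need to print untruncated.
change.stream.start.after={"_data": "<full resume token>"}

# Or restart from a point in time instead of a token (maps to the
# startAtOperationTime option of db.watch()).
# change.stream.start.at.operation.time=2021-01-08T00:00:00Z
```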
| Comments |
| Comment by Branden Makana [ 27/Apr/21 ] |
Thanks Ross - I have opened https://jira.mongodb.org/browse/KAFKA-219
| Comment by Ross Lawley [ 27/Apr/21 ] |
The issue is that the resumeToken is valid, but the server fails to start the change stream due to the size limitation. Can I ask you to open a new ticket for that? I think we may need to look at both the server and the connector to work out how best to deal with that scenario. Ross
| Comment by Branden Makana [ 27/Apr/21 ] |
Hi Ross, I wonder if this could be re-opened, as errors.tolerance=all does not seem to work for this scenario. Here is my connector config (errors part):
but when an event greater than the limit (here, 20MB) occurs, the connector is stuck in an unrecoverable loop:
The behavior still exists on connector v1.5 and Kafka Connect 6.1.1.
| Comment by Ross Lawley [ 11/Jan/21 ] |
Thanks for the ticket. There are a couple of improvements in the 1.0.3 release that you may not be aware of:
By default, startAfter is the preferred mechanism for resuming a change stream (as long as the MongoDB version supports it). However, with an unrecoverable event such as a change stream event that exceeds the 16MB document size limit, the last offset saved by Kafka would be from before that error, so on retry the change stream would resume from the last valid position and hit the error again. The workaround in that scenario would be to use the errors.tolerance=all configuration. I hope that helps, Ross
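For reference, a minimal sketch of that workaround, assuming the standard Kafka Connect error-handling properties (errors.*) alongside typical MongoDB source connector settings; the connector name, database, and collection values are placeholders.

```properties
# Minimal sketch of the errors.tolerance workaround: skip records the
# connector cannot process instead of failing the task, and log them.
name=mongo-source-example
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=mongodb://localhost:27017
database=test
collection=orders

# Standard Kafka Connect error-handling properties.
errors.tolerance=all
errors.log.enable=true
errors.log.include.messages=true
```

Note that, as the April 2021 comments above describe, this did not help when the server itself refuses to resume the stream because of the 16MB limit; that scenario was split out into KAFKA-219.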