[KAFKA-57] Allow restart sink connector in case of drop and recreate of collection, i.e. proper handling of invalidate events Created: 21/Aug/19  Updated: 28/Oct/23  Resolved: 06/Dec/19

Status: Closed
Project: Kafka Connector
Component/s: None
Affects Version/s: 0.2
Fix Version/s: 1.0

Type: Improvement Priority: Major - P3
Reporter: Christian Kurze (Inactive) Assignee: Ross Lawley
Resolution: Fixed Votes: 0
Labels: None
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified

Epic Link: Resilient Source Connector

 Description   

I dropped a collection and recreated it with the same name. The source connector now fails to start because its stored resume token is no longer valid:

{{[2019-08-21 09:23:02,602] ERROR WorkerSourceTask{id=mongo-atlas-democluster-inventory-source-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
com.mongodb.MongoCommandException: Command failed with error 260 (InvalidResumeToken): 'Attempted to resume a stream on a collection which has been dropped. The change stream's pipeline may need to make comparisons which should respect the collection's default collation, which can no longer be determined.' on server democluster-shard-00-02-9xl5i.mongodb.net:27017. The full response is {"operationTime": {"$timestamp": {"t": 1566379378, "i": 1}}, "ok": 0.0, "errmsg": "Attempted to resume a stream on a collection which has been dropped. The change stream's pipeline may need to make comparisons which should respect the collection's default collation, which can no longer be determined.", "code": 260, "codeName": "InvalidResumeToken", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1566379378, "i": 1}}, "signature": {"hash": {"$binary": "8KpkHutFLiWBW8QQNNCS9bxIS0c=", "$type": "00"}, "keyId": {"$numberLong": "6683096432586522625"}}}}
at com.mongodb.internal.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:179)
at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:299)
at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:255)
at com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:99)
at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:444)
at com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:72)
at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:200)
at com.mongodb.internal.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:269)
at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:131)
at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:123)
at com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol(CommandOperationHelper.java:243)
at com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol(CommandOperationHelper.java:234)
at com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol(CommandOperationHelper.java:137)
at com.mongodb.operation.AggregateOperationImpl$1.call(AggregateOperationImpl.java:196)
at com.mongodb.operation.AggregateOperationImpl$1.call(AggregateOperationImpl.java:192)
at com.mongodb.operation.OperationHelper.withConnectionSource(OperationHelper.java:462)
at com.mongodb.operation.OperationHelper.withConnection(OperationHelper.java:406)
at com.mongodb.operation.AggregateOperationImpl.execute(AggregateOperationImpl.java:192)
at com.mongodb.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:265)
at com.mongodb.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:55)
at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:179)
at com.mongodb.client.internal.MongoIterableImpl.execute(MongoIterableImpl.java:132)
at com.mongodb.client.internal.MongoIterableImpl.iterator(MongoIterableImpl.java:86)
at com.mongodb.kafka.connect.source.MongoSourceTask.createCursor(MongoSourceTask.java:203)
at com.mongodb.kafka.connect.source.MongoSourceTask.start(MongoSourceTask.java:92)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:199)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2019-08-21 09:23:02,608] ERROR WorkerSourceTask{id=mongo-atlas-democluster-inventory-source-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:180)
[2019-08-21 09:23:02,612] INFO Closed connection [connectionId{localValue:4, serverValue:531}] to democluster-shard-00-02-9xl5i.mongodb.net:27017 because the pool has been closed. (org.mongodb.driver.connection:71)
}}

The source connector should listen for invalidate events and clear the stored resume token (with appropriate logging) so the task can restart cleanly. What additional cases should we handle when an invalidate event occurs?
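The proposed behaviour can be sketched as follows. This is an illustrative example only, assuming a hypothetical `ResumeTokenCache` helper (not the connector's actual API): a change stream emits an `invalidate` event when the watched collection is dropped or renamed, and resuming from any token captured after that fails with error 260 (InvalidResumeToken), so the cached token is discarded and the next cursor starts fresh.

```java
// Illustrative sketch only: ResumeTokenCache is a hypothetical helper,
// not part of the connector's actual source. It shows the proposed fix:
// clear the cached resume token when an invalidate event arrives, so the
// next cursor starts fresh instead of dying with InvalidResumeToken (260).
public class ResumeTokenCache {
    private String resumeToken; // last token seen; null means "start fresh"

    /** Change streams emit "invalidate" when the watched collection is
     *  dropped or renamed; resuming from a stored token after that fails. */
    public static boolean shouldClearOnEvent(String operationType) {
        return "invalidate".equals(operationType);
    }

    /** Update the cache from a change event's operationType and token. */
    public void onEvent(String operationType, String token) {
        if (shouldClearOnEvent(operationType)) {
            resumeToken = null; // collection was dropped/recreated
        } else {
            resumeToken = token;
        }
    }

    public String get() {
        return resumeToken;
    }
}
```

In the real connector the cached token corresponds to the source offset stored by Kafka Connect, so "clearing" it also means logging that the stream was invalidated and that some events may be skipped between the drop and the new cursor.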



 Comments   
Comment by Githook User [ 06/Dec/19 ]

Author:

{'email': 'ross.lawley@gmail.com', 'name': 'Ross Lawley', 'username': 'rozza'}

Message: Resilient source

Can now support non-existent collections / databases
And collection drops

KAFKA-57 KAFKA-59
Branch: master
https://github.com/mongodb/mongo-kafka/commit/f440809510ad5dc934a81ae0731b9d6646500703
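A hedged sketch of the recovery decision the commit message describes (illustrative names only; the real logic lives in `MongoSourceTask`): when the server rejects the cursor with InvalidResumeToken (code 260, as in the log above), the task can recover by discarding the stored offset and opening a new change stream, rather than staying dead until a manual restart.

```java
// Illustrative classifier, not the connector's actual code. Error 260
// (InvalidResumeToken) is taken from the log in the description; treating
// it as recoverable lets the task recreate the cursor without a resume
// token instead of failing permanently.
public class ResumeErrorClassifier {
    static final int INVALID_RESUME_TOKEN = 260; // server error code from the log

    /** True when the cursor should be recreated without a resume token. */
    public static boolean isRecoverableResumeError(int serverErrorCode) {
        return serverErrorCode == INVALID_RESUME_TOKEN;
    }
}
```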

Comment by Ross Lawley [ 28/Nov/19 ]

PR: https://github.com/rozza/mongo-kafka/pull/7

Generated at Thu Feb 08 09:05:28 UTC 2024 using Jira 9.7.1#970001-sha1:2222b88b221c4928ef0de3161136cc90c8356a66.