-
Type: Improvement
-
Resolution: Fixed
-
Priority: Major - P3
-
Affects Version/s: 0.2
-
Component/s: None
-
None
I dropped a collection and recreated it with the same name. The startup of the source connector cannot happen, due to an invalid resume token:
{{[2019-08-21 09:23:02,602] ERROR WorkerSourceTask
{id=mongo-atlas-democluster-inventory-source-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)com.mongodb.MongoCommandException: Command failed with error 260 (InvalidResumeToken): 'Attempted to resume a stream on a collection which has been dropped. The change stream's pipeline may need to make comparisons which should respect the collection's default collation, which can no longer be determined.' on server democluster-shard-00-02-9xl5i.mongodb.net:27017. The full response is {"operationTime": {"$timestamp": {"t": 1566379378, "i": 1}}, "ok": 0.0, "errmsg": "Attempted to resume a stream on a collection which has been dropped. The change stream's pipeline may need to make comparisons which should respect the collection's default collation, which can no longer be determined.", "code": 260, "codeName": "InvalidResumeToken", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1566379378, "i": 1}}, "signature": {"hash": {"$binary": "8KpkHutFLiWBW8QQNNCS9bxIS0c=", "$type": "00"}, "keyId": {"$numberLong": "6683096432586522625"}}}}
at com.mongodb.internal.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:179)
at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:299)
at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:255)
at com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:99)
at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:444)
at com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:72)
at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:200)
at com.mongodb.internal.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:269)
at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:131)
at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:123)
at com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol(CommandOperationHelper.java:243)
at com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol(CommandOperationHelper.java:234)
at com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol(CommandOperationHelper.java:137)
at com.mongodb.operation.AggregateOperationImpl$1.call(AggregateOperationImpl.java:196)
at com.mongodb.operation.AggregateOperationImpl$1.call(AggregateOperationImpl.java:192)
at com.mongodb.operation.OperationHelper.withConnectionSource(OperationHelper.java:462)
at com.mongodb.operation.OperationHelper.withConnection(OperationHelper.java:406)
at com.mongodb.operation.AggregateOperationImpl.execute(AggregateOperationImpl.java:192)
at com.mongodb.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:265)
at com.mongodb.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:55)
at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:179)
at com.mongodb.client.internal.MongoIterableImpl.execute(MongoIterableImpl.java:132)
at com.mongodb.client.internal.MongoIterableImpl.iterator(MongoIterableImpl.java:86)
at com.mongodb.kafka.connect.source.MongoSourceTask.createCursor(MongoSourceTask.java:203)
at com.mongodb.kafka.connect.source.MongoSourceTask.start(MongoSourceTask.java:92)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:199)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2019-08-21 09:23:02,608] ERROR WorkerSourceTask{id=mongo-atlas-democluster-inventory-source-0}
Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:180)
[2019-08-21 09:23:02,612] INFO Closed connection [connectionId
] to democluster-shard-00-02-9xl5i.mongodb.net:27017 because the pool has been closed. (org.mongodb.driver.connection:71)
}}
The source connector should listen to invalidate events and clear the resume token (plus necessary logging). What additional cases could appear that we should handle in case of invalidate events?