[KAFKA-57] Allow restart sink connector in case of drop and recreate of collection, i.e. proper handling of invalidate events Created: 21/Aug/19 Updated: 28/Oct/23 Resolved: 06/Dec/19 |
|
| Status: | Closed |
| Project: | Kafka Connector |
| Component/s: | None |
| Affects Version/s: | 0.2 |
| Fix Version/s: | 1.0 |
| Type: | Improvement | Priority: | Major - P3 |
| Reporter: | Christian Kurze (Inactive) | Assignee: | Ross Lawley |
| Resolution: | Fixed | Votes: | 0 |
| Labels: | None | ||
| Remaining Estimate: | Not Specified | ||
| Time Spent: | Not Specified | ||
| Original Estimate: | Not Specified | ||
| Epic Link: | Resilient Source Connector |
| Description |
|
I dropped a collection and recreated it with the same name. The source connector then fails to start because of an invalid resume token:

{{[2019-08-21 09:23:02,602] ERROR WorkerSourceTask{id=mongo-atlas-democluster-inventory-source-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
com.mongodb.MongoCommandException: Command failed with error 260 (InvalidResumeToken): 'Attempted to resume a stream on a collection which has been dropped. The change stream's pipeline may need to make comparisons which should respect the collection's default collation, which can no longer be determined.' on server democluster-shard-00-02-9xl5i.mongodb.net:27017. The full response is {"operationTime": {"$timestamp": {"t": 1566379378, "i": 1}}, "ok": 0.0, "errmsg": "Attempted to resume a stream on a collection which has been dropped. The change stream's pipeline may need to make comparisons which should respect the collection's default collation, which can no longer be determined.", "code": 260, "codeName": "InvalidResumeToken", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1566379378, "i": 1}}, "signature": {"hash": {"$binary": "8KpkHutFLiWBW8QQNNCS9bxIS0c=", "$type": "00"}, "keyId": {"$numberLong": "6683096432586522625"}}}}
    at com.mongodb.internal.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:179)
    at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:299)
    at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:255)
    at com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:99)
    at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:444)
    at com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:72)
    at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:200)
    at com.mongodb.internal.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:269)
    at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:131)
    at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:123)
    at com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol(CommandOperationHelper.java:243)
    at com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol(CommandOperationHelper.java:234)
    at com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol(CommandOperationHelper.java:137)
    at com.mongodb.operation.AggregateOperationImpl$1.call(AggregateOperationImpl.java:196)
    at com.mongodb.operation.AggregateOperationImpl$1.call(AggregateOperationImpl.java:192)
    at com.mongodb.operation.OperationHelper.withConnectionSource(OperationHelper.java:462)
    at com.mongodb.operation.OperationHelper.withConnection(OperationHelper.java:406)
    at com.mongodb.operation.AggregateOperationImpl.execute(AggregateOperationImpl.java:192)
    at com.mongodb.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:265)
    at com.mongodb.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:55)
    at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:179)
    at com.mongodb.client.internal.MongoIterableImpl.execute(MongoIterableImpl.java:132)
    at com.mongodb.client.internal.MongoIterableImpl.iterator(MongoIterableImpl.java:86)
    at com.mongodb.kafka.connect.source.MongoSourceTask.createCursor(MongoSourceTask.java:203)
    at com.mongodb.kafka.connect.source.MongoSourceTask.start(MongoSourceTask.java:92)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:199)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
[2019-08-21 09:23:02,608] ERROR WorkerSourceTask{id=mongo-atlas-democluster-inventory-source-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:180)
] to democluster-shard-00-02-9xl5i.mongodb.net:27017 because the pool has been closed. (org.mongodb.driver.connection:71)}}

The source connector should listen for invalidate events, clear the resume token, and log that it did so. What other cases should we handle when an invalidate event arrives? |
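
The requested behaviour (clear a resume token the server rejects, log it, and start a fresh stream) could be sketched roughly as below. This is an illustration only and not the connector's actual implementation: `ResumeTokenFallback`, `CommandFailure`, `TokenStore`, and `openCursor` are hypothetical names, and `CommandFailure` stands in for the driver's `MongoCommandException` so the sketch runs without the MongoDB driver. The only detail taken from the report is error code 260 (InvalidResumeToken).

```java
import java.util.Optional;
import java.util.function.Function;

// Hypothetical sketch: retry opening a change stream without the stored
// resume token when the server rejects it with error 260.
class ResumeTokenFallback {
    // Error code reported in the stack trace above (InvalidResumeToken).
    static final int INVALID_RESUME_TOKEN = 260;

    // Stand-in for the driver's command exception (assumption, not the real class).
    static class CommandFailure extends RuntimeException {
        final int code;
        CommandFailure(int code, String message) {
            super(message);
            this.code = code;
        }
    }

    // Stand-in for wherever the task keeps its last resume token (assumption).
    static class TokenStore {
        private String token;
        TokenStore(String token) { this.token = token; }
        Optional<String> get() { return Optional.ofNullable(token); }
        void clear() { token = null; }
    }

    // Calls opener with the stored token. If that fails with InvalidResumeToken,
    // logs, clears the token, and calls opener once more with no token.
    // Any other failure is rethrown unchanged.
    static <C> C openCursor(TokenStore store, Function<Optional<String>, C> opener) {
        try {
            return opener.apply(store.get());
        } catch (CommandFailure e) {
            if (e.code != INVALID_RESUME_TOKEN || !store.get().isPresent()) {
                throw e;
            }
            System.err.println("Resume token rejected (" + e.getMessage()
                    + "); clearing it and starting a new change stream");
            store.clear();
            return opener.apply(Optional.empty());
        }
    }

    public static void main(String[] args) {
        TokenStore store = new TokenStore("stale-token");
        // Simulated server: rejects any resume token, accepts a fresh start.
        String cursor = openCursor(store, token -> {
            if (token.isPresent()) {
                throw new CommandFailure(INVALID_RESUME_TOKEN, "collection dropped");
            }
            return "fresh-cursor";
        });
        System.out.println(cursor + " tokenPresent=" + store.get().isPresent());
    }
}
```

One trade-off of this approach: a stream opened without a resume token starts from the current point in time, so changes made between the drop and the restart are not replayed.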
| Comments |
| Comment by Githook User [ 06/Dec/19 ] |
|
Author: {'email': 'ross.lawley@gmail.com', 'name': 'Ross Lawley', 'username': 'rozza'}
Message: Resilient source Can now support non-existent collections / databases
|
| Comment by Ross Lawley [ 28/Nov/19 ] |