Uploaded image for project: 'Kafka Connector'
  1. Kafka Connector
  2. KAFKA-57

Allow restart sink connector in case of drop and recreate of collection, i.e. proper handling of invalidate events

    • Type: Icon: Improvement Improvement
    • Resolution: Fixed
    • Priority: Icon: Major - P3 Major - P3
    • 1.0
    • Affects Version/s: 0.2
    • Component/s: None
    • None

      I dropped a collection and recreated it with the same name. The startup of the source connector cannot happen, due to an invalid resume token:

      {{[2019-08-21 09:23:02,602] ERROR WorkerSourceTask

      {id=mongo-atlas-democluster-inventory-source-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
      com.mongodb.MongoCommandException: Command failed with error 260 (InvalidResumeToken): 'Attempted to resume a stream on a collection which has been dropped. The change stream's pipeline may need to make comparisons which should respect the collection's default collation, which can no longer be determined.' on server democluster-shard-00-02-9xl5i.mongodb.net:27017. The full response is {"operationTime": {"$timestamp": {"t": 1566379378, "i": 1}}, "ok": 0.0, "errmsg": "Attempted to resume a stream on a collection which has been dropped. The change stream's pipeline may need to make comparisons which should respect the collection's default collation, which can no longer be determined.", "code": 260, "codeName": "InvalidResumeToken", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1566379378, "i": 1}}, "signature": {"hash": {"$binary": "8KpkHutFLiWBW8QQNNCS9bxIS0c=", "$type": "00"}, "keyId": {"$numberLong": "6683096432586522625"}}}}
      at com.mongodb.internal.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:179)
      at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:299)
      at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:255)
      at com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:99)
      at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:444)
      at com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:72)
      at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:200)
      at com.mongodb.internal.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:269)
      at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:131)
      at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:123)
      at com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol(CommandOperationHelper.java:243)
      at com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol(CommandOperationHelper.java:234)
      at com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol(CommandOperationHelper.java:137)
      at com.mongodb.operation.AggregateOperationImpl$1.call(AggregateOperationImpl.java:196)
      at com.mongodb.operation.AggregateOperationImpl$1.call(AggregateOperationImpl.java:192)
      at com.mongodb.operation.OperationHelper.withConnectionSource(OperationHelper.java:462)
      at com.mongodb.operation.OperationHelper.withConnection(OperationHelper.java:406)
      at com.mongodb.operation.AggregateOperationImpl.execute(AggregateOperationImpl.java:192)
      at com.mongodb.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:265)
      at com.mongodb.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:55)
      at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:179)
      at com.mongodb.client.internal.MongoIterableImpl.execute(MongoIterableImpl.java:132)
      at com.mongodb.client.internal.MongoIterableImpl.iterator(MongoIterableImpl.java:86)
      at com.mongodb.kafka.connect.source.MongoSourceTask.createCursor(MongoSourceTask.java:203)
      at com.mongodb.kafka.connect.source.MongoSourceTask.start(MongoSourceTask.java:92)
      at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:199)
      at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
      at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      at java.lang.Thread.run(Thread.java:748)
      [2019-08-21 09:23:02,608] ERROR WorkerSourceTask{id=mongo-atlas-democluster-inventory-source-0}

      Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:180)
      [2019-08-21 09:23:02,612] INFO Closed connection [connectionId

      {localValue:4, serverValue:531}

      ] to democluster-shard-00-02-9xl5i.mongodb.net:27017 because the pool has been closed. (org.mongodb.driver.connection:71)
      }}

      The source connector should listen to invalidate events and clear the resume token (plus necessary logging). What additional cases could appear that we should handle in case of invalidate events?

            Assignee:
            ross@mongodb.com Ross Lawley
            Reporter:
            christian.kurze@mongodb.com Christian Kurze (Inactive)
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

              Created:
              Updated:
              Resolved: