Kafka Connector / KAFKA-57

Allow restarting the source connector after a collection is dropped and recreated, i.e. handle invalidate events properly


Details

    • Type: Improvement
    • Resolution: Fixed
    • Priority: Major - P3
    • 1.0
    • 0.2
    • None
    • None

    Description

      I dropped a collection and recreated it with the same name. The source connector then fails to start because of an invalid resume token:

      {{[2019-08-21 09:23:02,602] ERROR WorkerSourceTask{id=mongo-atlas-democluster-inventory-source-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
      com.mongodb.MongoCommandException: Command failed with error 260 (InvalidResumeToken): 'Attempted to resume a stream on a collection which has been dropped. The change stream's pipeline may need to make comparisons which should respect the collection's default collation, which can no longer be determined.' on server democluster-shard-00-02-9xl5i.mongodb.net:27017. The full response is {"operationTime": {"$timestamp": {"t": 1566379378, "i": 1}}, "ok": 0.0, "errmsg": "Attempted to resume a stream on a collection which has been dropped. The change stream's pipeline may need to make comparisons which should respect the collection's default collation, which can no longer be determined.", "code": 260, "codeName": "InvalidResumeToken", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1566379378, "i": 1}}, "signature": {"hash": {"$binary": "8KpkHutFLiWBW8QQNNCS9bxIS0c=", "$type": "00"}, "keyId": {"$numberLong": "6683096432586522625"}}}}
      at com.mongodb.internal.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:179)
      at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:299)
      at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:255)
      at com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:99)
      at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:444)
      at com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:72)
      at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:200)
      at com.mongodb.internal.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:269)
      at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:131)
      at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:123)
      at com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol(CommandOperationHelper.java:243)
      at com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol(CommandOperationHelper.java:234)
      at com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol(CommandOperationHelper.java:137)
      at com.mongodb.operation.AggregateOperationImpl$1.call(AggregateOperationImpl.java:196)
      at com.mongodb.operation.AggregateOperationImpl$1.call(AggregateOperationImpl.java:192)
      at com.mongodb.operation.OperationHelper.withConnectionSource(OperationHelper.java:462)
      at com.mongodb.operation.OperationHelper.withConnection(OperationHelper.java:406)
      at com.mongodb.operation.AggregateOperationImpl.execute(AggregateOperationImpl.java:192)
      at com.mongodb.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:265)
      at com.mongodb.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:55)
      at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:179)
      at com.mongodb.client.internal.MongoIterableImpl.execute(MongoIterableImpl.java:132)
      at com.mongodb.client.internal.MongoIterableImpl.iterator(MongoIterableImpl.java:86)
      at com.mongodb.kafka.connect.source.MongoSourceTask.createCursor(MongoSourceTask.java:203)
      at com.mongodb.kafka.connect.source.MongoSourceTask.start(MongoSourceTask.java:92)
      at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:199)
      at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
      at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      at java.lang.Thread.run(Thread.java:748)
      [2019-08-21 09:23:02,608] ERROR WorkerSourceTask{id=mongo-atlas-democluster-inventory-source-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:180)
      [2019-08-21 09:23:02,612] INFO Closed connection [connectionId{localValue:4, serverValue:531}] to democluster-shard-00-02-9xl5i.mongodb.net:27017 because the pool has been closed. (org.mongodb.driver.connection:71)
      }}

      The source connector should listen for invalidate events and clear the stored resume token, logging when it does so. What other cases should we handle when an invalidate event arrives?
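The startup half of this request could be sketched roughly as follows. This is only an illustration of the idea, not the connector's actual fix: `CommandFailure`, `ResumeTokenFallback`, and `openCursor` are hypothetical names, the driver is replaced by a plain `Function` so the sketch runs on its own, and the only detail taken from the log above is error code 260 (InvalidResumeToken). It does not cover invalidate events that arrive on an already running stream.

```java
import java.util.Optional;
import java.util.function.Function;

/** Hypothetical stand-in for the driver's command failure; only the error code matters here. */
class CommandFailure extends RuntimeException {
    final int code;

    CommandFailure(int code, String message) {
        super(message);
        this.code = code;
    }
}

class ResumeTokenFallback {
    // Error code reported as "InvalidResumeToken" in the log above.
    static final int INVALID_RESUME_TOKEN = 260;

    /**
     * Tries to open a cursor with the stored token. If the server rejects the
     * token, logs a warning and opens a fresh cursor without one.
     * The opener is an assumption: it stands in for whatever creates the change stream.
     */
    static <C> C openCursor(Optional<String> storedToken,
                            Function<Optional<String>, C> opener) {
        try {
            return opener.apply(storedToken);
        } catch (CommandFailure e) {
            if (storedToken.isPresent() && e.code == INVALID_RESUME_TOKEN) {
                System.err.println("WARN: resume token rejected, restarting the change stream without it: "
                        + e.getMessage());
                return opener.apply(Optional.empty());
            }
            throw e; // unrelated failure, or no token left to clear
        }
    }

    public static void main(String[] args) {
        // Simulated server: every resume attempt fails, as it would after a drop.
        Function<Optional<String>, String> opener = token -> {
            if (token.isPresent()) {
                throw new CommandFailure(INVALID_RESUME_TOKEN, "collection has been dropped");
            }
            return "fresh cursor";
        };
        System.out.println(openCursor(Optional.of("stale-token"), opener));
    }
}
```

Restarting without a token means any changes made between the drop and the restart are not replayed, which is why the fallback logs a warning rather than recovering silently. Other error codes are rethrown so that unrelated failures still stop the task.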

          People

            ross@mongodb.com Ross Lawley
            christian.kurze@mongodb.com Christian Kurze (Inactive)