  Kafka Connector / KAFKA-89

Kafka source connector swallows change stream failures

    • Type: Bug
    • Resolution: Duplicate
    • Priority: Major - P3
    • Affects Version/s: None
    • Component/s: None

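      If the MongoDB Kafka source connector encounters a change stream failure, the failure is ignored and the connector continues to run. Both the connector and its task still report RUNNING, even though the change stream can no longer be resumed.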
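
The failure mode described above can be illustrated with a minimal Python sketch. It is not the connector's code: `ChangeStreamError`, `failing_cursor`, `poll_swallowing`, `poll_propagating` and `run_once` are hypothetical names invented for the illustration. It assumes only the general Kafka Connect behavior that a task is marked FAILED when an exception escapes its poll method.

```python
import logging

log = logging.getLogger("sketch")


class ChangeStreamError(Exception):
    """Hypothetical stand-in for a change stream failure such as error 40576."""


def failing_cursor():
    # Hypothetical cursor read that always fails, as when the resume point
    # is no longer in the oplog.
    raise ChangeStreamError("Resume of change stream was not possible")


def poll_swallowing(read):
    # The failure is logged at DEBUG and dropped; the caller sees "no records".
    try:
        return read()
    except ChangeStreamError as exc:
        log.debug("change stream failed: %s", exc)
        return None


def poll_propagating(read):
    # The failure is allowed to escape so the caller can act on it.
    return read()


def run_once(poll, read):
    # Hypothetical runner: reports FAILED only if poll raises.
    try:
        poll(read)
        return "RUNNING"
    except ChangeStreamError:
        return "FAILED"


print(run_once(poll_swallowing, failing_cursor))   # RUNNING
print(run_once(poll_propagating, failing_cursor))  # FAILED
```

With the swallowing variant the runner never learns that anything went wrong, which matches the symptom reported here.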

      Connector status:
      $ curl xxx.xxx.xxx.xxx:8083/connectors/mongo-ordertrackinginfo/status
      {"name":"mongo-ordertrackinginfo","connector":{"state":"RUNNING","worker_id":"xxx.xxx.xxx.xxx:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"xxx.xxx.xxx.xxx:8083"}],"type":"source"}
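
To make the symptom concrete, here is a small Python sketch of a state-based health check run against the status payload from this report. The helper `unhealthy_parts` is a hypothetical name, not part of Kafka Connect. The payload is copied from the output above with the redacted worker address kept as-is.

```python
import json

# Status payload as reported in this issue (worker address redacted).
status_json = (
    '{"name":"mongo-ordertrackinginfo",'
    '"connector":{"state":"RUNNING","worker_id":"xxx.xxx.xxx.xxx:8083"},'
    '"tasks":[{"id":0,"state":"RUNNING","worker_id":"xxx.xxx.xxx.xxx:8083"}],'
    '"type":"source"}'
)


def unhealthy_parts(status):
    """Return labels for the connector or tasks whose state is not RUNNING."""
    bad = []
    if status["connector"]["state"] != "RUNNING":
        bad.append("connector")
    for task in status["tasks"]:
        if task["state"] != "RUNNING":
            bad.append(f"task {task['id']}")
    return bad


status = json.loads(status_json)
print(unhealthy_parts(status))  # []
```

The check flags nothing, so monitoring that relies only on the status endpoint would not notice this failure.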

      Logs:
      [2020-02-26 23:25:21,270] DEBUG Execution of command with request id 93193 failed to complete successfully in 292.53 ms on connection [connectionId{localValue:2}] to server xxx.xxx.xxx.xxx:27017 (org.mongodb.driver.protocol.command)
      com.mongodb.MongoCommandException: Command failed with error 40576 (Location40576): 'Resume of change stream was not possible, as the resume point may no longer be in the oplog. ' on server xxx.xxx.xxx.xxx:27017. The full response is {"ok": 0.0, "errmsg": "Resume of change stream was not possible, as the resume point may no longer be in the oplog. ", "code": 40576, "codeName": "Location40576", "operationTime": {"$timestamp": {"t": 1582759521, "i": 52}}, "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1582759521, "i": 594}}, "signature": {"hash": {"$binary": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "$type": "00"}, "keyId": {"$numberLong": "0"}}}}
      at com.mongodb.internal.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:175)
      at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:303)
      at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:259)
      at com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:99)
      at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:450)
      at com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:72)
      at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:226)
      at com.mongodb.internal.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:269)
      at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:131)
      at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:123)
      at com.mongodb.operation.QueryBatchCursor.getMore(QueryBatchCursor.java:260)
      at com.mongodb.operation.QueryBatchCursor.tryHasNext(QueryBatchCursor.java:216)
      at com.mongodb.operation.QueryBatchCursor.tryNext(QueryBatchCursor.java:200)
      at com.mongodb.operation.ChangeStreamBatchCursor$3.apply(ChangeStreamBatchCursor.java:86)
      at com.mongodb.operation.ChangeStreamBatchCursor$3.apply(ChangeStreamBatchCursor.java:83)
      at com.mongodb.operation.ChangeStreamBatchCursor.resumeableOperation(ChangeStreamBatchCursor.java:166)
      at com.mongodb.operation.ChangeStreamBatchCursor.tryNext(ChangeStreamBatchCursor.java:83)
      at com.mongodb.client.internal.MongoChangeStreamCursorImpl.tryNext(MongoChangeStreamCursorImpl.java:78)
      at com.mongodb.kafka.connect.source.MongoSourceTask.getNextDocument(MongoSourceTask.java:320)
      at com.mongodb.kafka.connect.source.MongoSourceTask.poll(MongoSourceTask.java:154)
      at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:265)
      at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:232)
      at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
      at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      at java.lang.Thread.run(Thread.java:748)
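
Since the status endpoint gives no signal, one possible workaround is to scan the worker log for the driver's failure message. The Python sketch below assumes the message format shown in the log above; `COMMAND_FAILED` and `find_command_failures` are hypothetical names. The entry in this report was logged at DEBUG by `org.mongodb.driver.protocol.command`, so this approach would only work with DEBUG logging enabled for that logger.

```python
import re

# Matches the driver's "Command failed with error <code> (<codeName>)" text.
COMMAND_FAILED = re.compile(r"Command failed with error (\d+) \((\w+)\)")


def find_command_failures(log_text):
    """Return (code, code_name) pairs for every command failure in the text."""
    return [(int(code), name) for code, name in COMMAND_FAILED.findall(log_text)]


# Excerpt of the exception message from this report.
sample = (
    "com.mongodb.MongoCommandException: Command failed with error 40576 "
    "(Location40576): 'Resume of change stream was not possible, as the "
    "resume point may no longer be in the oplog. '"
)
print(find_command_failures(sample))  # [(40576, 'Location40576')]
```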

            Assignee:
            ross@mongodb.com Ross Lawley
            Reporter:
            james.kovacs@mongodb.com James Kovacs
            Votes:
            0
            Watchers:
            2
