  1. Kafka Connector
  2. KAFKA-149

After applying the update 'immutable' was found to have been altered

    • Type: Bug
    • Resolution: Duplicate
    • Priority: Major - P3
    • None
    • Affects Version/s: 1.2.0
    • Component/s: Sink
    • None
    • Environment:
      kafka 2.13
      mongodb source & target versions are 3.6.8

      After the initial sync completes, the connector terminates with the error below. Shouldn't these errors be tolerated, as described in https://docs.mongodb.com/kafka-connector/master/kafka-sink-properties/#dead-letter-queue-configuration-settings? My understanding of the DLQ settings is that any such error should be tolerated and the connector should proceed to the next message, but I don't see that behavior.

       

      {
            "id": 1,
            "state": "FAILED",
            "worker_id": "x.x.x.x:9083",
            "trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:588)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:830)\nCaused by: org.apache.kafka.connect.errors.DataException: Failed to write mongodb documents despite retrying\n\tat com.mongodb.kafka.connect.sink.MongoSinkTask.checkRetriableException(MongoSinkTask.java:237)\n\tat com.mongodb.kafka.connect.sink.MongoSinkTask.processSinkRecords(MongoSinkTask.java:213)\n\tat com.mongodb.kafka.connect.sink.MongoSinkTask.lambda$put$2(MongoSinkTask.java:134)\n\tat java.base/java.util.ArrayList.forEach(ArrayList.java:1507)\n\tat com.mongodb.kafka.connect.sink.MongoSinkTask.lambda$put$3(MongoSinkTask.java:132)\n\tat java.base/java.util.HashMap.forEach(HashMap.java:1338)\n\tat com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:127)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)\n\t... 10 more\nCaused by: com.mongodb.MongoBulkWriteException: Bulk write operation error on server mongos-pool.adbephotos-next.com:27017. 
Write errors: [BulkWriteError{index=154, code=66, message=c, the (immutable) field 'wcd_guid' was found to have been altered to wcd_guid: \"4F7876CA5F06E8400A491622\"', details={}}]. \n\tat com.mongodb.connection.BulkWriteBatchCombiner.getError(BulkWriteBatchCombiner.java:173)\n\tat com.mongodb.connection.BulkWriteBatchCombiner.throwOnError(BulkWriteBatchCombiner.java:202)\n\tat com.mongodb.connection.BulkWriteBatchCombiner.getResult(BulkWriteBatchCombiner.java:143)\n\tat com.mongodb.operation.BulkWriteBatch.getResult(BulkWriteBatch.java:227)\n\tat com.mongodb.operation.MixedBulkWriteOperation.executeBulkWriteBatch(MixedBulkWriteOperation.java:282)\n\tat com.mongodb.operation.MixedBulkWriteOperation.access$700(MixedBulkWriteOperation.java:72)\n\tat com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:205)\n\tat com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:196)\n\tat com.mongodb.operation.OperationHelper.withReleasableConnection(OperationHelper.java:501)\n\tat com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:196)\n\tat com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:71)\n\tat com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:213)\n\tat com.mongodb.client.internal.MongoCollectionImpl.executeBulkWrite(MongoCollectionImpl.java:476)\n\tat com.mongodb.client.internal.MongoCollectionImpl.bulkWrite(MongoCollectionImpl.java:456)\n\tat com.mongodb.kafka.connect.sink.MongoSinkTask.processSinkRecords(MongoSinkTask.java:205)\n\t... 16 more\n"
          }
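When reading a trace like the one above, the useful part is usually the last `Caused by:` entry and the server error code inside it. The following is a minimal Python sketch for pulling both out of the `trace` string of a task status; the helper names `caused_by_chain` and `write_error_codes` are hypothetical, and `sample_trace` is a shortened copy of the trace above, not the full output.

```python
import re


def caused_by_chain(trace: str) -> list[str]:
    """Return the top-level exception followed by each 'Caused by:' entry."""
    lines = trace.splitlines()
    chain = [lines[0].strip()] if lines else []
    for line in lines[1:]:
        if line.startswith("Caused by: "):
            chain.append(line[len("Caused by: "):].strip())
    return chain


def write_error_codes(trace: str) -> list[int]:
    """Extract the server error codes reported in BulkWriteError entries."""
    pattern = r"BulkWriteError\{index=\d+, code=(\d+)"
    return [int(code) for code in re.findall(pattern, trace)]


# Shortened copy of the trace from the status output above (stack frames trimmed).
sample_trace = (
    "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n"
    "\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:588)\n"
    "Caused by: org.apache.kafka.connect.errors.DataException: Failed to write mongodb documents despite retrying\n"
    "\tat com.mongodb.kafka.connect.sink.MongoSinkTask.checkRetriableException(MongoSinkTask.java:237)\n"
    "Caused by: com.mongodb.MongoBulkWriteException: Bulk write operation error on server mongos-pool.adbephotos-next.com:27017. \n"
    "Write errors: [BulkWriteError{index=154, code=66, message=c, the (immutable) field 'wcd_guid' "
    "was found to have been altered to wcd_guid: \"4F7876CA5F06E8400A491622\"', details={}}]. \n"
)

chain = caused_by_chain(sample_trace)
codes = write_error_codes(sample_trace)  # code 66 is MongoDB's ImmutableField error

print(chain[-1])
print(codes)
```

Here the innermost cause is the `MongoBulkWriteException`, and the extracted code is 66, which matches the immutable-field message in the trace.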
      

       

       

      Source & Sink configuration:

       

       

      Source:
      {
        "name": "deb-mongo-source-accounts",
        "config": {
          "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
          "max.queue.size": "9056",
          "tasks.max": "2",
          "initial.sync.max.threads": "3",
          "mongodb.name": "deb_oz_mongo",
          "internal.key.converter.schemas.enable": "false",
          "collection.whitelist": "oz_next.accounts",
          "key.converter.schemas.enable": "false",
          "internal.key.converter": "org.apache.kafka.connect.json.JsonConverter",
          "mongodb.members.auto.discover": "true",
          "internal.value.converter.schemas.enable": "false",
          "value.converter.schemas.enable": "false",
          "internal.value.converter": "org.apache.kafka.connect.json.JsonConverter",
          "name": "deb-mongo-source-accounts",
          "value.converter": "org.apache.kafka.connect.json.JsonConverter",
          "key.converter": "org.apache.kafka.connect.json.JsonConverter",
          "mongodb.hosts": "mongos-pool.adbephotos-next.com:27017",
          "linger.ms": "500"
        },
        "tasks": [
          {
            "connector": "deb-mongo-source-accounts",
            "task": 0
          },
          {
            "connector": "deb-mongo-source-accounts",
            "task": 1
          }
        ],
        "type": "source"
      }
       
      Sink:
      {
        "name": "deb-mongo-sink-accounts",
        "config": {
          "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
          "errors.log.include.messages": "true",
          "topics": "deb_oz_mongo.oz_next.accounts",
          "tasks.max": "2",
          "max.num.retries": "3",
          "collection": "poc_accounts",
          "errors.deadletterqueue.context.headers.enable": "true",
          "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbHandler",
          "key.converter.schemas.enable": "false",
          "database": "poc_oz_next",
          "errors.deadletterqueue.topic.name": "dlq_poc_oz_next.poc_accounts",
          "value.converter.schemas.enable": "false",
          "connection.uri": "mongodb://mongos-pool.adbephotos-next.com:27017",
          "name": "deb-mongo-sink-accounts",
          "errors.tolerance": "all",
          "value.converter": "org.apache.kafka.connect.json.JsonConverter",
          "max.batch.size": "16000",
          "post.processor.chain": "com.mongodb.kafka.connect.sink.processor.BlockListValueProjector",
          "retries.defer.timeout": "5000",
          "session.timeout.ms": "25000",
          "errors.log.enable": "true",
          "key.converter": "org.apache.kafka.connect.json.JsonConverter",
          "value.projection.type": "blocklist",
          "value.projection.list": "trial, entitlements, settings"
        },
        "tasks": [
          {
            "connector": "deb-mongo-sink-accounts",
            "task": 0
          },
          {
            "connector": "deb-mongo-sink-accounts",
            "task": 1
          }
        ],
        "type": "sink"
      }
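For anyone reproducing this, the status of the failed sink task can be re-queried, and the task restarted, through the Kafka Connect REST API. The sketch below only builds the requests and does not send them. The base URL is an assumption (the worker address is redacted in the status output, only port 9083 is visible), and the helper names are hypothetical.

```python
from urllib.request import Request

# Assumption: the Connect worker's REST API is reachable here; the real host
# is redacted as x.x.x.x in the status output.
BASE_URL = "http://localhost:9083"


def task_status_request(connector: str, task_id: int, base_url: str = BASE_URL) -> Request:
    """Build GET /connectors/{name}/tasks/{id}/status."""
    url = f"{base_url}/connectors/{connector}/tasks/{task_id}/status"
    return Request(url, method="GET")


def task_restart_request(connector: str, task_id: int, base_url: str = BASE_URL) -> Request:
    """Build POST /connectors/{name}/tasks/{id}/restart."""
    url = f"{base_url}/connectors/{connector}/tasks/{task_id}/restart"
    return Request(url, method="POST")


# Task 1 of the sink connector is the one reported as FAILED.
status = task_status_request("deb-mongo-sink-accounts", 1)
restart = task_restart_request("deb-mongo-sink-accounts", 1)

print(status.get_method(), status.full_url)
print(restart.get_method(), restart.full_url)
```

Passing either request to `urllib.request.urlopen` would send it. A restart alone may simply hit the same failing write again if the offending record is re-delivered.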
      

       

      Thanks,

      Rajaramesh

       

            Assignee:
            ross@mongodb.com Ross Lawley
            Reporter:
            yaramati@adobe.com Rajaramesh Yaramati