[KAFKA-149] After applying the update 'immutable' was found to have been altered Created: 28/Aug/20  Updated: 28/Aug/20  Resolved: 28/Aug/20

Status: Closed
Project: Kafka Connector
Component/s: Sink
Affects Version/s: 1.2.0
Fix Version/s: None

Type: Bug Priority: Major - P3
Reporter: Rajaramesh Yaramati Assignee: Ross Lawley
Resolution: Duplicate Votes: 0
Labels: None
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified
Environment:

Kafka 2.13
MongoDB source and target versions: 3.6.8


Issue Links:
Duplicate
duplicates KAFKA-78 Publish error messages to a topic Closed
Documentation Changes Summary:

 Description   

After the initial sync completes, the connector terminates with the error below. Shouldn't these errors be tolerated, as described in https://docs.mongodb.com/kafka-connector/master/kafka-sink-properties/#dead-letter-queue-configuration-settings? My understanding of the DLQ settings is that any such error should be tolerated and the connector should proceed to the next message, but I don't see that behavior.

 

Connector task status:

{
      "id": 1,
      "state": "FAILED",
      "worker_id": "x.x.x.x:9083",
      "trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:588)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:830)\nCaused by: org.apache.kafka.connect.errors.DataException: Failed to write mongodb documents despite retrying\n\tat com.mongodb.kafka.connect.sink.MongoSinkTask.checkRetriableException(MongoSinkTask.java:237)\n\tat com.mongodb.kafka.connect.sink.MongoSinkTask.processSinkRecords(MongoSinkTask.java:213)\n\tat com.mongodb.kafka.connect.sink.MongoSinkTask.lambda$put$2(MongoSinkTask.java:134)\n\tat java.base/java.util.ArrayList.forEach(ArrayList.java:1507)\n\tat com.mongodb.kafka.connect.sink.MongoSinkTask.lambda$put$3(MongoSinkTask.java:132)\n\tat java.base/java.util.HashMap.forEach(HashMap.java:1338)\n\tat com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:127)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)\n\t... 10 more\nCaused by: com.mongodb.MongoBulkWriteException: Bulk write operation error on server mongos-pool.adbephotos-next.com:27017. Write errors: [BulkWriteError{index=154, code=66, message=c, the (immutable) field 'wcd_guid' was found to have been altered to wcd_guid: \"4F7876CA5F06E8400A491622\"', details={}}]. 
\n\tat com.mongodb.connection.BulkWriteBatchCombiner.getError(BulkWriteBatchCombiner.java:173)\n\tat com.mongodb.connection.BulkWriteBatchCombiner.throwOnError(BulkWriteBatchCombiner.java:202)\n\tat com.mongodb.connection.BulkWriteBatchCombiner.getResult(BulkWriteBatchCombiner.java:143)\n\tat com.mongodb.operation.BulkWriteBatch.getResult(BulkWriteBatch.java:227)\n\tat com.mongodb.operation.MixedBulkWriteOperation.executeBulkWriteBatch(MixedBulkWriteOperation.java:282)\n\tat com.mongodb.operation.MixedBulkWriteOperation.access$700(MixedBulkWriteOperation.java:72)\n\tat com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:205)\n\tat com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:196)\n\tat com.mongodb.operation.OperationHelper.withReleasableConnection(OperationHelper.java:501)\n\tat com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:196)\n\tat com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:71)\n\tat com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:213)\n\tat com.mongodb.client.internal.MongoCollectionImpl.executeBulkWrite(MongoCollectionImpl.java:476)\n\tat com.mongodb.client.internal.MongoCollectionImpl.bulkWrite(MongoCollectionImpl.java:456)\n\tat com.mongodb.kafka.connect.sink.MongoSinkTask.processSinkRecords(MongoSinkTask.java:205)\n\t... 16 more\n"
    }
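For background on the write error itself: MongoDB error code 66 (ImmutableField) is raised when an update or replace would change a field the server treats as immutable, such as _id or, on a sharded MongoDB 3.6 collection, a shard-key field. Below is a minimal, hypothetical Java driver sketch of how such an error can be reproduced, assuming 'wcd_guid' is part of the target collection's shard key (an assumption, not confirmed by the ticket). The connector performs the write as a bulkWrite, which is why the same failure surfaces above as a MongoBulkWriteException.

```java
import com.mongodb.MongoWriteException;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import org.bson.Document;

public class ImmutableFieldRepro {
    public static void main(String[] args) {
        // Placeholder connection string; the ticket's cluster is behind a mongos router.
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
            MongoCollection<Document> coll = client
                    .getDatabase("poc_oz_next")      // database/collection names from the sink config
                    .getCollection("poc_accounts");

            // Replacing the whole document with a different value for the (assumed)
            // shard-key field 'wcd_guid' triggers write error 66:
            // "After applying the update, the (immutable) field 'wcd_guid'
            //  was found to have been altered ..."
            Document replacement = new Document("_id", "some-id")   // hypothetical _id
                    .append("wcd_guid", "4F7876CA5F06E8400A491622");
            try {
                coll.replaceOne(Filters.eq("_id", "some-id"), replacement);
            } catch (MongoWriteException e) {
                System.out.println("code=" + e.getError().getCode()
                        + " message=" + e.getError().getMessage());
            }
        }
    }
}
```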

 

 

Source & Sink configuration:

Source:
{
  "name": "deb-mongo-source-accounts",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "max.queue.size": "9056",
    "tasks.max": "2",
    "initial.sync.max.threads": "3",
    "mongodb.name": "deb_oz_mongo",
    "internal.key.converter.schemas.enable": "false",
    "collection.whitelist": "oz_next.accounts",
    "key.converter.schemas.enable": "false",
    "internal.key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "mongodb.members.auto.discover": "true",
    "internal.value.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "internal.value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "name": "deb-mongo-source-accounts",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "mongodb.hosts": "mongos-pool.adbephotos-next.com:27017",
    "linger.ms": "500"
  },
  "tasks": [
    {
      "connector": "deb-mongo-source-accounts",
      "task": 0
    },
    {
      "connector": "deb-mongo-source-accounts",
      "task": 1
    }
  ],
  "type": "source"
}
 
Sink:
{
  "name": "deb-mongo-sink-accounts",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "errors.log.include.messages": "true",
    "topics": "deb_oz_mongo.oz_next.accounts",
    "tasks.max": "2",
    "max.num.retries": "3",
    "collection": "poc_accounts",
    "errors.deadletterqueue.context.headers.enable": "true",
    "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbHandler",
    "key.converter.schemas.enable": "false",
    "database": "poc_oz_next",
    "errors.deadletterqueue.topic.name": "dlq_poc_oz_next.poc_accounts",
    "value.converter.schemas.enable": "false",
    "connection.uri": "mongodb://mongos-pool.adbephotos-next.com:27017",
    "name": "deb-mongo-sink-accounts",
    "errors.tolerance": "all",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "max.batch.size": "16000",
    "post.processor.chain": "com.mongodb.kafka.connect.sink.processor.BlockListValueProjector",
    "retries.defer.timeout": "5000",
    "session.timeout.ms": "25000",
    "errors.log.enable": "true",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.projection.type": "blocklist",
    "value.projection.list": "trial, entitlements, settings"
  },
  "tasks": [
    {
      "connector": "deb-mongo-sink-accounts",
      "task": 0
    },
    {
      "connector": "deb-mongo-sink-accounts",
      "task": 1
    }
  ],
  "type": "sink"
}

 

Thanks,

Rajaramesh

 



 Comments   
Comment by Ross Lawley [ 28/Aug/20 ]

Hi yaramati@adobe.com,

Please see the excellent blog post Kafka Connect Deep Dive – Error Handling and Dead Letter Queues for more information about Kafka Connect's dead letter queue feature, specifically the "Where is error handling NOT provided by Kafka Connect?" section. As it explains, the put stage of a sink connector is not covered by Kafka's dead letter queues. KAFKA-78 aims to improve upon this and will hopefully extend the Sink and Source connectors to explicitly handle failure scenarios such as a failed write.
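To make the distinction concrete, here is a heavily simplified, hypothetical sketch of the sink-side delivery flow. It is not the actual Kafka Connect or connector source code; the stub types Converter, DeadLetterQueue, and SinkTask stand in for the real framework classes. It only illustrates that the errors.tolerance / errors.deadletterqueue.* handling wraps conversion and transformation, while an exception thrown from the connector's put() (such as the DataException in the trace above) fails the task.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified, hypothetical illustration of where Kafka Connect's dead letter
 * queue applies on the sink side. Not the real WorkerSinkTask implementation.
 */
public class DlqScopeSketch {

    // Stand-ins for the real framework pieces.
    interface Converter { Object convertAndTransform(byte[] raw) throws Exception; }
    interface DeadLetterQueue { void report(byte[] raw, Exception error); }
    interface SinkTask { void put(List<Object> records); }  // may throw, e.g. DataException

    static void deliver(List<byte[]> polled, Converter converter,
                        DeadLetterQueue dlq, SinkTask task) {
        List<Object> batch = new ArrayList<>();
        for (byte[] raw : polled) {
            try {
                // Stage covered by errors.tolerance=all: key/value converters and SMTs.
                batch.add(converter.convertAndTransform(raw));
            } catch (Exception e) {
                // The bad record is skipped and, if configured, written to
                // errors.deadletterqueue.topic.name.
                dlq.report(raw, e);
            }
        }
        // Stage NOT covered by the DLQ: the connector's own write. In this ticket,
        // MongoSinkTask.put() retries the bulk write (max.num.retries) and then throws
        // DataException("Failed to write mongodb documents despite retrying"),
        // which fails the task instead of routing the record to the DLQ.
        task.put(batch);
    }
}
```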

For future reference, as this sounds like a question / support issue, I wanted to give you some resources to get this question answered more quickly:

  • our MongoDB support portal, located at support.mongodb.com
  • our MongoDB community portal, located here
  • If you are an Atlas customer, there is free support offered 24/7 in the lower right hand corner of the UI.

Just in case you have already opened a support case and are not receiving sufficient help, please let me know and I can facilitate escalating your issue.

Thank you!
