-
Type: Bug
-
Resolution: Duplicate
-
Priority: Major - P3
-
Affects Version/s: 1.2.0
-
Component/s: Sink
-
Environment: Kafka 2.13
MongoDB source and target versions are 3.6.8
After the initial sync completes, the connector terminates with the error below. Also, shouldn't these errors be tolerated, as per https://docs.mongodb.com/kafka-connector/master/kafka-sink-properties/#dead-letter-queue-configuration-settings? My understanding of the DLQ settings is that any such error should be tolerated and the connector should proceed to the next message, but I don't see that behavior.
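To make the expectation concrete, here is a minimal sketch of the tolerate-and-continue behavior described above. It is not the connector's implementation: `write_with_dlq`, `write_one`, and the in-memory dead letter list are hypothetical names used only for illustration.

```python
from typing import Callable, Iterable, List, Tuple


def write_with_dlq(
    records: Iterable[dict],
    write_one: Callable[[dict], None],
) -> Tuple[int, List[Tuple[dict, str]]]:
    """Write records one at a time; collect failures instead of aborting.

    Returns the number of records written and a list of
    (record, error message) pairs standing in for a dead letter queue.
    """
    written = 0
    dead_letters: List[Tuple[dict, str]] = []
    for record in records:
        try:
            write_one(record)  # hypothetical single-record write
            written += 1
        except Exception as exc:
            # Expected behavior: park the bad record and move on.
            dead_letters.append((record, str(exc)))
    return written, dead_letters
```

With this shape, one record that violates an immutable-field constraint would end up in the dead letter list while the rest of the batch is still written.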
{ "id": 1, "state": "FAILED", "worker_id": "x.x.x.x:9083", "trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:588)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:830)\nCaused by: org.apache.kafka.connect.errors.DataException: Failed to write mongodb documents despite retrying\n\tat com.mongodb.kafka.connect.sink.MongoSinkTask.checkRetriableException(MongoSinkTask.java:237)\n\tat com.mongodb.kafka.connect.sink.MongoSinkTask.processSinkRecords(MongoSinkTask.java:213)\n\tat com.mongodb.kafka.connect.sink.MongoSinkTask.lambda$put$2(MongoSinkTask.java:134)\n\tat java.base/java.util.ArrayList.forEach(ArrayList.java:1507)\n\tat com.mongodb.kafka.connect.sink.MongoSinkTask.lambda$put$3(MongoSinkTask.java:132)\n\tat java.base/java.util.HashMap.forEach(HashMap.java:1338)\n\tat com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:127)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)\n\t... 
10 more\nCaused by: com.mongodb.MongoBulkWriteException: Bulk write operation error on server mongos-pool.adbephotos-next.com:27017. Write errors: [BulkWriteError{index=154, code=66, message=c, the (immutable) field 'wcd_guid' was found to have been altered to wcd_guid: \"4F7876CA5F06E8400A491622\"', details={}}]. \n\tat com.mongodb.connection.BulkWriteBatchCombiner.getError(BulkWriteBatchCombiner.java:173)\n\tat com.mongodb.connection.BulkWriteBatchCombiner.throwOnError(BulkWriteBatchCombiner.java:202)\n\tat com.mongodb.connection.BulkWriteBatchCombiner.getResult(BulkWriteBatchCombiner.java:143)\n\tat com.mongodb.operation.BulkWriteBatch.getResult(BulkWriteBatch.java:227)\n\tat com.mongodb.operation.MixedBulkWriteOperation.executeBulkWriteBatch(MixedBulkWriteOperation.java:282)\n\tat com.mongodb.operation.MixedBulkWriteOperation.access$700(MixedBulkWriteOperation.java:72)\n\tat com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:205)\n\tat com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:196)\n\tat com.mongodb.operation.OperationHelper.withReleasableConnection(OperationHelper.java:501)\n\tat com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:196)\n\tat com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:71)\n\tat com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:213)\n\tat com.mongodb.client.internal.MongoCollectionImpl.executeBulkWrite(MongoCollectionImpl.java:476)\n\tat com.mongodb.client.internal.MongoCollectionImpl.bulkWrite(MongoCollectionImpl.java:456)\n\tat com.mongodb.kafka.connect.sink.MongoSinkTask.processSinkRecords(MongoSinkTask.java:205)\n\t... 16 more\n" }
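The trace is hard to read as a single escaped string. As a reading aid, the following sketch pulls out the top-level exception and each "Caused by:" line. It assumes the status is available as valid JSON with a `trace` field, as in the output above; `cause_chain` is a hypothetical helper name.

```python
import json
from typing import List

CAUSED_BY = "Caused by: "


def cause_chain(status_json: str) -> List[str]:
    """Return the first exception line followed by each 'Caused by:' line."""
    trace = json.loads(status_json).get("trace", "")
    lines = [line.strip() for line in trace.split("\n") if line.strip()]
    if not lines:
        return []
    chain = [lines[0]]
    # Stack frames start with "at ..." and are skipped; only causes are kept.
    chain += [line[len(CAUSED_BY):] for line in lines[1:] if line.startswith(CAUSED_BY)]
    return chain
```

Applied to the status above, the last entry of the chain would be the `MongoBulkWriteException` line, which carries the actual write error (code 66, immutable field `wcd_guid`).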
Source & Sink configuration:
Source:
{
  "name": "deb-mongo-source-accounts",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "max.queue.size": "9056",
    "tasks.max": "2",
    "initial.sync.max.threads": "3",
    "mongodb.name": "deb_oz_mongo",
    "internal.key.converter.schemas.enable": "false",
    "collection.whitelist": "oz_next.accounts",
    "key.converter.schemas.enable": "false",
    "internal.key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "mongodb.members.auto.discover": "true",
    "internal.value.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "internal.value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "name": "deb-mongo-source-accounts",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "mongodb.hosts": "mongos-pool.adbephotos-next.com:27017",
    "linger.ms": "500"
  },
  "tasks": [
    { "connector": "deb-mongo-source-accounts", "task": 0 },
    { "connector": "deb-mongo-source-accounts", "task": 1 }
  ],
  "type": "source"
}
Sink:
{
  "name": "deb-mongo-sink-accounts",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "errors.log.include.messages": "true",
    "topics": "deb_oz_mongo.oz_next.accounts",
    "tasks.max": "2",
    "max.num.retries": "3",
    "collection": "poc_accounts",
    "errors.deadletterqueue.context.headers.enable": "true",
    "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbHandler",
    "key.converter.schemas.enable": "false",
    "database": "poc_oz_next",
    "errors.deadletterqueue.topic.name": "dlq_poc_oz_next.poc_accounts",
    "value.converter.schemas.enable": "false",
    "connection.uri": "mongodb://mongos-pool.adbephotos-next.com:27017",
    "name": "deb-mongo-sink-accounts",
    "errors.tolerance": "all",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "max.batch.size": "16000",
    "post.processor.chain": "com.mongodb.kafka.connect.sink.processor.BlockListValueProjector",
    "retries.defer.timeout": "5000",
    "session.timeout.ms": "25000",
    "errors.log.enable": "true",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.projection.type": "blocklist",
    "value.projection.list": "trial, entitlements, settings"
  },
  "tasks": [
    { "connector": "deb-mongo-sink-accounts", "task": 0 },
    { "connector": "deb-mongo-sink-accounts", "task": 1 }
  ],
  "type": "sink"
}
Thanks,
Rajaramesh
- duplicates
-
KAFKA-78 Publish error messages to a topic
- Closed