[KAFKA-105] Support errors.tolerance Created: 06/May/20 Updated: 28/Oct/23 Resolved: 21/Sep/20 |
|
| Status: | Closed |
| Project: | Kafka Connector |
| Component/s: | None |
| Affects Version/s: | 1.1 |
| Fix Version/s: | 1.3.0 |
| Type: | Improvement | Priority: | Major - P3 |
| Reporter: | Ross Lawley | Assignee: | Ross Lawley |
| Resolution: | Fixed | Votes: | 5 |
| Labels: | None |
| Remaining Estimate: | Not Specified |
| Time Spent: | Not Specified |
| Original Estimate: | Not Specified |
| Issue Links: | |
| Epic Link: | Error Handling |
| Case: | (copied to CRM) |
| Documentation Changes: | Needed |
| Documentation Changes Summary: | There will be a change in configuration |
| Description |
|
Look to support the errors.tolerance configuration. For the Sink connector, bulk write errors (e.g. duplicate key) should be routed to the DLQ, if one is configured, rather than stopping the world. For the Source connector there is no DLQ, but perhaps invalid configurations could push the original data to a Sink-style DLQ if any of the conversions fail. Also, test messages over 16MB ( |
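For context, a minimal sketch of the kind of sink configuration this improvement targets, using the standard Kafka Connect error-handling properties (errors.tolerance plus the dead letter queue settings). The connector name, topic, connection URI, database and collection below are placeholders, and routing of bulk write errors such as duplicate keys to the DLQ assumes the behaviour described in this ticket; out of the box, the framework DLQ only covers conversion/transformation failures.

    name=mongo-sink-with-dlq
    connector.class=com.mongodb.kafka.connect.MongoSinkConnector
    topics=orders
    connection.uri=mongodb://localhost:27017
    database=test
    collection=orders
    # Standard Kafka Connect error handling (framework-level)
    errors.tolerance=all
    errors.log.enable=true
    errors.log.include.messages=true
    # Dead letter queue (sink connectors only)
    errors.deadletterqueue.topic.name=orders.dlq
    errors.deadletterqueue.topic.replication.factor=1
    errors.deadletterqueue.context.headers.enable=true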
| Comments |
| Comment by Githook User [ 30/Sep/20 ] |
|
Author: {'name': 'Ross Lawley', 'email': 'ross.lawley@gmail.com', 'username': 'rozza'}
Message: Ensure resume token missing errors respect errors.tolerance

Throw an exception if there is no errors tolerance and
|
| Comment by Githook User [ 21/Sep/20 ] |
|
Author: {'name': 'Ross Lawley', 'email': 'ross.lawley@gmail.com', 'username': 'rozza'}
Message: Support errors.tolerance

Scenarios covered:
Source
Sink
|
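For the Source connector, where Kafka Connect provides no dead letter queue, a hedged sketch of a configuration that relies on errors.tolerance and error logging only; the connector name, connection URI, database, collection and topic prefix are placeholders.

    name=mongo-source-tolerant
    connector.class=com.mongodb.kafka.connect.MongoSourceConnector
    connection.uri=mongodb://localhost:27017
    database=test
    collection=orders
    topic.prefix=mongo
    # Framework-level error handling; errors.deadletterqueue.* is not available for source connectors
    errors.tolerance=all
    errors.log.enable=true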
| Comment by Ross Lawley [ 16/Sep/20 ] |
| Comment by Ross Lawley [ 16/Jul/20 ] |
|
sabari.mgn@gmail.com there is no set release date for 1.3, but work on 1.3 has started. |
| Comment by Sabari Gandhi [ 19/Jun/20 ] |
|
Hi Ross, yes, I see that dead letter queue support isn't available for source connectors: https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues/ . But errors.tolerance can still make connectors resilient to failures. Can you please share your thoughts on whether it will be supported? Also, can you please provide a tentative release date for 1.3? |
| Comment by Sabari Gandhi [ 18/Jun/20 ] |
|
Hi Ross, thanks for the response. Yes, max.request.size also has to be configured on the broker side. But my question was that even with errors.tolerance: all enabled in the connector configuration, the connector goes into a failed state. I was checking whether processing can still continue after the error. |
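One hedged sketch of raising the producer request-size limit for a source connector, assuming Kafka 2.3+ where the worker allows per-connector client overrides; the 16 MB value is illustrative, and the broker's message.max.bytes (and topic-level max.message.bytes) must also permit the larger messages. This addresses the size limit only, not the failed-state behaviour discussed above.

    # Connect worker configuration (enables per-connector client overrides)
    connector.client.config.override.policy=All

    # Source connector configuration (raise the producer's request size limit)
    producer.override.max.request.size=16777216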
| Comment by Ross Lawley [ 17/Jun/20 ] |
|
sabari.mgn@gmail.com that is a Kafka limitation in how it supports errors.tolerance and the DLQ. |
| Comment by Sabari Gandhi [ 16/Jun/20 ] |
|
The source connector also lacks support for errors.tolerance and a dead letter queue. If there are conversion issues or a max.request.size issue, the connector fails. |
| Comment by Rajaramesh Yaramati [ 22/May/20 ] |
|
Moving my comment from https://jira.mongodb.org/browse/KAFKA-98 to here. My environment: Kafka version 2.4.0, source MongoDB 3.6.8.
As per my understanding, the fix should allow the sink connector to continue with the next message after logging the error message to the log file.

Error from sink connector:

[2020-05-20 20:50:49,176] INFO [mongo-sink-assets-shard24-new|task-0] Opened connection [connectionId ] to 10.74.1.50:27021 (org.mongodb.driver.connection:71)
[2020-05-20 20:50:49,176] INFO [mongo-sink-assets-shard24-new|task-0] Opened connection [connectionId ] to 10.74.1.50:27021 (org.mongodb.driver.connection:71)
[2020-05-20 20:50:49,260] ERROR [mongo-sink-assets-shard24-new|task-0] Mongodb bulk write (partially) failed (com.mongodb.kafka.connect.sink.MongoSinkTask:184)
com.mongodb.MongoBulkWriteException: Bulk write operation error on server xxx.xxx.xxx.xxx:27021. Write errors: [BulkWriteError{index=67, code=28, message='Cannot create field 'renditions' in element {12: null}', details={}}].
    at com.mongodb.connection.BulkWriteBatchCombiner.getError(BulkWriteBatchCombiner.java:173)
    at com.mongodb.connection.BulkWriteBatchCombiner.throwOnError(BulkWriteBatchCombiner.java:202)
    at com.mongodb.connection.BulkWriteBatchCombiner.getResult(BulkWriteBatchCombiner.java:143)
    at com.mongodb.operation.BulkWriteBatch.getResult(BulkWriteBatch.java:227)
    at com.mongodb.operation.MixedBulkWriteOperation.executeBulkWriteBatch(MixedBulkWriteOperation.java:282)
    at com.mongodb.operation.MixedBulkWriteOperation.access$700(MixedBulkWriteOperation.java:72)
    at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:205)
    at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:196)
    at com.mongodb.operation.OperationHelper.withReleasableConnection(OperationHelper.java:501)
    at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:196)
    at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:71)
    at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:213)
    at com.mongodb.client.internal.MongoCollectionImpl.executeBulkWrite(MongoCollectionImpl.java:476)
    at com.mongodb.client.internal.MongoCollectionImpl.bulkWrite(MongoCollectionImpl.java:456)
    at com.mongodb.kafka.connect.sink.MongoSinkTask.processSinkRecords(MongoSinkTask.java:180)
    at com.mongodb.kafka.connect.sink.MongoSinkTask.lambda$null$2(MongoSinkTask.java:120)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1507)
    at com.mongodb.kafka.connect.sink.MongoSinkTask.lambda$put$3(MongoSinkTask.java:119)
    at java.base/java.util.HashMap.forEach(HashMap.java:1338)
    at com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:117)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:830)
[2020-05-20 20:50:49,261] ERROR [mongo-sink-assets-shard24-new|task-0] WriteResult: AcknowledgedBulkWriteResult{insertedCount=0, matchedCount=38, removedCount=0, modifiedCount=9, upserts=[]} (com.mongodb.kafka.connect.sink.MongoSinkTask:185)
[2020-05-20 20:50:49,261] ERROR [mongo-sink-assets-shard24-new|task-0] WriteErrors: [BulkWriteError{index=67, code=28, message='Cannot create field 'renditions' in element {12: null}', details={}}] (com.mongodb.kafka.connect.sink.MongoSinkTask:186)
[2020-05-20 20:50:49,261] ERROR [mongo-sink-assets-shard24-new|task-0] WriteConcernError: null (com.mongodb.kafka.connect.sink.MongoSinkTask:187)
[2020-05-20 20:50:49,262] ERROR [mongo-sink-assets-shard24-new|task-0] WorkerSinkTask{id=mongo-sink-assets-shard24-new-0} RetriableException from SinkTask: (org.apache.kafka.connect.runtime.WorkerSinkTask:552)
org.apache.kafka.connect.errors.RetriableException: Bulk write operation error on server xxx.xxx.xxx.xxx:27021. Write errors: [BulkWriteError{index=67, code=28, message='Cannot create field 'renditions' in element {12: null}', details={}}].
    at com.mongodb.kafka.connect.sink.MongoSinkTask.checkRetriableException(MongoSinkTask.java:212)
    at com.mongodb.kafka.connect.sink.MongoSinkTask.processSinkRecords(MongoSinkTask.java:188)

Error from mongod log:

2020-05-20T20:33:37.358-0700 D NETWORK [conn658] Compressing message with snappy
, u: { $set: { dna: { match: false, date: "2020-05-07T00:01:57.957556Z" }} }, multi: false, upsert: false } planSummary: IDHACK keysExamined:0 docsExamined:0 nMatched:0 nModified:0 numYields:0 locks:{ Global: { acquireCount: { r: 1, w: 1 }}, Database: { acquireCount: { w: 1 }}, Collection: { acquireCount: { w: 1 }} } 0ms
, u: { $set: { revisions.12.renditions.thumbnail2x: { id: "1e5dbcda2d290412a0f88928821c310c", created_by: "d49d14c1bfb74dbf2334c027fe3c1c61", created: 1588809718024706, created_by_ip: "73.185.39.99, 10.92.211.1, 10.92.214.94, 34.213.157.249, 35.160.101.122" }, revisions.12.updated: 1588809718024706 } }, multi: false, upsert: false } planSummary: IDHACK exception: Cannot create field 'renditions' in element {12: null} code:PathNotViable numYields:0 locks:{ Global: { acquireCount: { r: 2, w: 2 }}, Database: { acquireCount: { w: 2 }}, Collection: { acquireCount: { w: 2 }} } 0ms
, txnNumber: 35, $clusterTime: { clusterTime: Timestamp(1590032017, 49), signature: { hash: BinData(0, 9782D442613737E518ABEAB61EC775B9B4E30F9B), keyId: 6818528695733452825 }}, $client: { driver: { name: "mongo-java-driver|sync|mongo-kafka|sink", version: "3.12.4|1.1.0-5-g32f5458-dirty" }, os: { type: "Linux", name: "Linux", architecture: "amd64", version: "4.4.0-1095-aws" }, platform: "Java/Oracle Corporation/13.0.2+8", mongos: { host: "poc-config-mongos:27021", client: "10.74.1.240:33674", version: "3.6.8" } }, $configServerState: { opTime: { ts: Timestamp(1590032011, 1), t: 1 }}, $db: "poc_oz_prod" } numYields:0 reslen:484 locks:{ Global: { acquireCount: { r: 2, w: 2 }}, Database: { acquireCount: { w: 2 }}, Collection: { acquireCount: { w: 2 }} } protocol:op_msg 1ms
, $replData: 1, $oplogQueryData: 1, $readPreference: { mode: "secondaryPreferred" }, $clusterTime: { clusterTime: Timestamp(1590032017, 49), signature: { hash: BinData(0, 9782D442613737E518ABEAB61EC775B9B4E30F9B), keyId: 6818528695733452825 }}, $db: "local" } |
| Comment by Martin Andersson [ 20/May/20 ] |
|
Duplicate of |