[KAFKA-105] Support errors.tolerance Created: 06/May/20  Updated: 28/Oct/23  Resolved: 21/Sep/20

Status: Closed
Project: Kafka Connector
Component/s: None
Affects Version/s: 1.1
Fix Version/s: 1.3.0

Type: Improvement Priority: Major - P3
Reporter: Ross Lawley Assignee: Ross Lawley
Resolution: Fixed Votes: 5
Labels: None
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified

Issue Links:
Documented
Duplicate
is duplicated by KAFKA-96 Source Connector: The resume token UU... Closed
is duplicated by KAFKA-127 Kafka Source connector handling docum... Closed
is duplicated by KAFKA-140 Add behavior.on.malformed.documents c... Closed
Related
related to KAFKA-115 Connector stop itself after db timeout. Closed
is related to KAFKA-215 New names for errors tolerance config... Closed
Epic Link: Error Handling
Case:
Documentation Changes: Needed
Documentation Changes Summary:

There will be a change in configuration


 Description   

Look to support errors.tolerance configuration.

For the Sink, bulk write errors (e.g. duplicate key) should be routed to the DLQ if configured, rather than stopping the world.

For the Source connector there is no DLQ, but perhaps the original data could be pushed to a Sink-style DLQ if any of the conversions fail. Also test messages over 16MB (KAFKA-127).
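For reference, a minimal sketch of the standard Kafka Connect (KIP-298) error-handling properties this ticket refers to, as they could be set on the sink connector; the DLQ topic name and replication factor below are illustrative, not taken from this ticket:

# continue past record-level failures instead of failing the task
errors.tolerance=all
# log the failing records for later inspection
errors.log.enable=true
errors.log.include.messages=true
# sink connectors only: route failed records to a DLQ topic (name and replication factor are illustrative)
errors.deadletterqueue.topic.name=mongo-sink-dlq
errors.deadletterqueue.topic.replication.factor=1
errors.deadletterqueue.context.headers.enable=true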



 Comments   
Comment by Githook User [ 30/Sep/20 ]

Author:

{'name': 'Ross Lawley', 'email': 'ross.lawley@gmail.com', 'username': 'rozza'}

Message: Ensure resume token missing errors respect errors.tolerance

Throw an exception if there is no errors tolerance and
the resume token is missing.

KAFKA-105
Branch: master
https://github.com/mongodb/mongo-kafka/commit/a5b269c084b057fbe65ecebc263dcc37cc67d016

Comment by Githook User [ 21/Sep/20 ]

Author:

{'name': 'Ross Lawley', 'email': 'ross.lawley@gmail.com', 'username': 'rozza'}

Message: Support errors.tolerance

Scenarios covered:

Source

  • Missing / invalid / not found Resume Tokens (Integration test with a mocked offsetStorageReader)
  • Poison pill message - invalid schema

Sink

  • Poison pill messages - Invalid Key / Values types & invalid documents
  • Errors thrown by PostProcessors
  • Debezium CDC handler errors / poison pills

KAFKA-105
Branch: master
https://github.com/mongodb/mongo-kafka/commit/17ef77bc10174500f9629f26c04216a95d5bb0fc

Comment by Ross Lawley [ 16/Sep/20 ]

PR: https://github.com/mongodb/mongo-kafka/pull/38

Comment by Ross Lawley [ 16/Jul/20 ]

sabari.mgn@gmail.com There is no set release date for 1.3, but work on 1.3 has started.

Comment by Sabari Gandhi [ 19/Jun/20 ]

Hi Ross, yes, I see that dead letter queue support isn't available for source connectors: https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues/. But errors.tolerance can ensure that connectors are resilient to failures. Can you please share your thoughts on whether it will be supported? Also, can you please provide a tentative release date for 1.3?

Comment by Sabari Gandhi [ 18/Jun/20 ]

Hi Ross, thanks for the response. Yes, max.request.size also has to be configured on the broker side. But my question was that even with errors.tolerance: all enabled in the connector, it goes into a failed state. I was checking whether processing can still continue after the error.
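For reference, a hedged sketch of the size-related settings under discussion; these are standard Kafka/Kafka Connect properties, and the 16MB value is illustrative, not taken from this ticket:

# broker (server.properties): largest record batch the broker will accept (illustrative value)
message.max.bytes=16777216
# Connect worker: allow per-connector producer overrides (Kafka 2.3+)
connector.client.config.override.policy=All
# source connector config: raise the producer request size for this connector (illustrative value)
producer.override.max.request.size=16777216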

Comment by Ross Lawley [ 17/Jun/20 ]

sabari.mgn@gmail.com that is a Kafka limitation of how it supports errors.tolerance and the DLQ.

Comment by Sabari Gandhi [ 16/Jun/20 ]

The source connector also lacks support for errors.tolerance and a dead letter queue. If there are conversion issues or a max.request.size issue, the connector fails.

Comment by Rajaramesh Yaramati [ 22/May/20 ]

Moving my comment from https://jira.mongodb.org/browse/KAFKA-98 to here.

My Environment:

Kafka version: 2.4.0
Source MongoDB: 3.6.8
Target MongoDB: 3.6.8
Source connector: Debezium
Sink connector: MongoDB Kafka Sink Connector version 1.0

 

As per my understanding, the fix should allow the sink connector to continue with the next message after logging the error message to the log file.
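For what it's worth, a minimal sketch of the "log and continue" behaviour being asked for, assuming the connector-level support delivered by this ticket in 1.3.0; with the connector version in the log below, the bulk write error is still raised rather than skipped:

errors.tolerance=all
# log the failing record and carry on with the next message
errors.log.enable=true
errors.log.include.messages=true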

Error from sink connector:

[2020-05-20 20:50:49,176] INFO [mongo-sink-assets-shard24-new|task-0] Opened connection [connectionId{localValue:2}] to 10.74.1.50:27021 (org.mongodb.driver.connection:71)
[2020-05-20 20:50:49,176] INFO [mongo-sink-assets-shard24-new|task-0] Opened connection [connectionId{localValue:2}] to 10.74.1.50:27021 (org.mongodb.driver.connection:71)
[2020-05-20 20:50:49,260] ERROR [mongo-sink-assets-shard24-new|task-0] Mongodb bulk write (partially) failed (com.mongodb.kafka.connect.sink.MongoSinkTask:184)
com.mongodb.MongoBulkWriteException: Bulk write operation error on server xxx.xxx.xxx.xxx:27021. Write errors: [BulkWriteError{index=67, code=28, message='Cannot create field 'renditions' in element {12: null}', details={}}].
    at com.mongodb.connection.BulkWriteBatchCombiner.getError(BulkWriteBatchCombiner.java:173)
    at com.mongodb.connection.BulkWriteBatchCombiner.throwOnError(BulkWriteBatchCombiner.java:202)
    at com.mongodb.connection.BulkWriteBatchCombiner.getResult(BulkWriteBatchCombiner.java:143)
    at com.mongodb.operation.BulkWriteBatch.getResult(BulkWriteBatch.java:227)
    at com.mongodb.operation.MixedBulkWriteOperation.executeBulkWriteBatch(MixedBulkWriteOperation.java:282)
    at com.mongodb.operation.MixedBulkWriteOperation.access$700(MixedBulkWriteOperation.java:72)
    at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:205)
    at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:196)
    at com.mongodb.operation.OperationHelper.withReleasableConnection(OperationHelper.java:501)
    at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:196)
    at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:71)
    at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:213)
    at com.mongodb.client.internal.MongoCollectionImpl.executeBulkWrite(MongoCollectionImpl.java:476)
    at com.mongodb.client.internal.MongoCollectionImpl.bulkWrite(MongoCollectionImpl.java:456)
    at com.mongodb.kafka.connect.sink.MongoSinkTask.processSinkRecords(MongoSinkTask.java:180)
    at com.mongodb.kafka.connect.sink.MongoSinkTask.lambda$null$2(MongoSinkTask.java:120)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1507)
    at com.mongodb.kafka.connect.sink.MongoSinkTask.lambda$put$3(MongoSinkTask.java:119)
    at java.base/java.util.HashMap.forEach(HashMap.java:1338)
    at com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:117)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:830)
[2020-05-20 20:50:49,261] ERROR [mongo-sink-assets-shard24-new|task-0] WriteResult: AcknowledgedBulkWriteResult{insertedCount=0, matchedCount=38, removedCount=0, modifiedCount=9, upserts=[]} (com.mongodb.kafka.connect.sink.MongoSinkTask:185)
[2020-05-20 20:50:49,261] ERROR [mongo-sink-assets-shard24-new|task-0] WriteErrors: [BulkWriteError{index=67, code=28, message='Cannot create field 'renditions' in element {12: null}', details={}}] (com.mongodb.kafka.connect.sink.MongoSinkTask:186)
[2020-05-20 20:50:49,261] ERROR [mongo-sink-assets-shard24-new|task-0] WriteConcernError: null (com.mongodb.kafka.connect.sink.MongoSinkTask:187)
[2020-05-20 20:50:49,262] ERROR [mongo-sink-assets-shard24-new|task-0] WorkerSinkTask{id=mongo-sink-assets-shard24-new-0} RetriableException from SinkTask: (org.apache.kafka.connect.runtime.WorkerSinkTask:552)
org.apache.kafka.connect.errors.RetriableException: Bulk write operation error on server xxx.xxx.xxx.xxx:27021. Write errors: [BulkWriteError{index=67, code=28, message='Cannot create field 'renditions' in element {12: null}', details={}}].
    at com.mongodb.kafka.connect.sink.MongoSinkTask.checkRetriableException(MongoSinkTask.java:212)
    at com.mongodb.kafka.connect.sink.MongoSinkTask.processSinkRecords(MongoSinkTask.java:188)

 

Error from mongod log:

2020-05-20T20:33:37.358-0700 D NETWORK [conn658] Compressing message with snappy
2020-05-20T20:33:37.359-0700 D WRITE [conn894] UpdateResult – upserted: {} modifiers: 1 existing: 0 numDocsModified: 0 numMatched: 0
2020-05-20T20:33:37.359-0700 D STORAGE [conn894] WT rollback_transaction for snapshot id 980803764
2020-05-20T20:33:37.359-0700 I WRITE [conn894] update poc_oz_prod.poc_assets_new command: { q: { _id: "50efcb5b9d0a431cbe7c3162bac6374e" }, u: { $set: { dna: { match: false, date: "2020-05-07T00:01:57.957556Z" } } }, multi: false, upsert: false } planSummary: IDHACK keysExamined:0 docsExamined:0 nMatched:0 nModified:0 numYields:0 locks:{ Global: { acquireCount: { r: 1, w: 1 } }, Database: { acquireCount: { w: 1 } }, Collection: { acquireCount: { w: 1 } } } 0ms
2020-05-20T20:33:37.359-0700 D QUERY [conn894] Using idhack: { _id: "84725dbcc52bd2019a08f0f9327e3fee" }
2020-05-20T20:33:37.359-0700 D STORAGE [conn894] WT begin_transaction for snapshot id 980803766
2020-05-20T20:33:37.359-0700 D - [conn894] User Assertion: 28:Cannot create field 'renditions' in element {12: null} src/mongo/db/update/modifier_node.cpp 239
2020-05-20T20:33:37.359-0700 D STORAGE [conn894] WT rollback_transaction for snapshot id 980803766
2020-05-20T20:33:37.359-0700 D WRITE [conn894] Caught Assertion in update: PathNotViable: Cannot create field 'renditions' in element {12: null}
2020-05-20T20:33:37.359-0700 I WRITE [conn894] update poc_oz_prod.poc_assets_new command: { q: { _id: "84725dbcc52bd2019a08f0f9327e3fee" }, u: { $set: { revisions.12.renditions.thumbnail2x: { id: "1e5dbcda2d290412a0f88928821c310c", created_by: "d49d14c1bfb74dbf2334c027fe3c1c61", created: 1588809718024706, created_by_ip: "73.185.39.99, 10.92.211.1, 10.92.214.94, 34.213.157.249, 35.160.101.122" }, revisions.12.updated: 1588809718024706 } }, multi: false, upsert: false } planSummary: IDHACK exception: Cannot create field 'renditions' in element {12: null} code:PathNotViable numYields:0 locks:{ Global: { acquireCount: { r: 2, w: 2 } }, Database: { acquireCount: { w: 2 } }, Collection: { acquireCount: { w: 2 } } } 0ms
2020-05-20T20:33:37.359-0700 D REPL [conn894] Waiting for write concern. OpTime: { ts: Timestamp(1590032017, 49), t: 2 }, write concern: { w: 1, wtimeout: 0 }
2020-05-20T20:33:37.359-0700 I COMMAND [conn894] command poc_oz_prod.$cmd command: update { update: "poc_assets_new", bypassDocumentValidation: false, ordered: true, stmtIds: [ 0, 1, 2, 3, 4 ], updates: 5, shardVersion: [ Timestamp(1, 92037), ObjectId('5ea31e40cd8c9318b72b5e16') ], lsid: { id: UUID("0a2dba1e-be97-4eb8-aff8-978182ff58c6"), uid: BinData(0, E3B0C44298FC1C149AFBF4C8996FB92427AE41E4649B934CA495991B7852B855) }, txnNumber: 35, $clusterTime: { clusterTime: Timestamp(1590032017, 49), signature: { hash: BinData(0, 9782D442613737E518ABEAB61EC775B9B4E30F9B), keyId: 6818528695733452825 } }, $client: { driver: { name: "mongo-java-driver|sync|mongo-kafka|sink", version: "3.12.4|1.1.0-5-g32f5458-dirty" }, os: { type: "Linux", name: "Linux", architecture: "amd64", version: "4.4.0-1095-aws" }, platform: "Java/Oracle Corporation/13.0.2+8", mongos: { host: "poc-config-mongos:27021", client: "10.74.1.240:33674", version: "3.6.8" } }, $configServerState: { opTime: { ts: Timestamp(1590032011, 1), t: 1 } }, $db: "poc_oz_prod" } numYields:0 reslen:484 locks:{ Global: { acquireCount: { r: 2, w: 2 } }, Database: { acquireCount: { w: 2 } }, Collection: { acquireCount: { w: 2 } } } protocol:op_msg 1ms
2020-05-20T20:33:37.359-0700 D NETWORK [conn894] Compressing message with snappy
2020-05-20T20:33:37.361-0700 D NETWORK [conn658] Decompressing message with snappy
2020-05-20T20:33:37.361-0700 D COMMAND [conn658] run command local.$cmd { getMore: 38469818473, collection: "oplog.rs", batchSize: 13981010, maxTimeMS: 5000, term: 2, lastKnownCommittedOpTime: { ts: Timestamp(1590032017, 48), t: 2 }, $replData: 1, $oplogQueryData: 1, $readPreference: { mode: "secondaryPreferred" }, $clusterTime: { clusterTime: Timestamp(1590032017, 49), signature: { hash: BinData(0, 9782D442613737E518ABEAB61EC775B9B4E30F9B), keyId: 6818528695733452825 } }, $db: "local" }
2020-05-20T20:33:37.361-0700 D STORAGE [conn658] NamespaceUUIDCache: registered namespace local.oplog.rs with UUID 86258ee6-3d26-4417-886e-fcaaf736a2c4
2020-05-20T20:33:37.361-0700 D STORAGE [conn658] WT begin_transaction for snapshot id 980803768
2020-05-20T20:33:37.361-0700 D STORAGE [conn658] WT rollback_transaction for snapshot id 980803768
2020-05-20T20:33:37.361-0700 D STORAGE [conn658] WT begin_transaction for snapshot id 980803769
2020-05-20T20:33:37.361-0700 D STORAGE [conn658] WT rollback_transaction for snapshot id 980803769
2020-05-20T20:33:37.365-0700 D NETWORK [conn885] Decompressing message with snappy

Comment by Martin Andersson [ 20/May/20 ]

Duplicate of KAFKA-78?

Generated at Thu Feb 08 09:05:35 UTC 2024 using Jira 9.7.1#970001-sha1:2222b88b221c4928ef0de3161136cc90c8356a66.