[KAFKA-98] MongoSinkTask stops with java.lang.NullPointerException Created: 20/Apr/20 Updated: 28/Oct/23 Resolved: 07/May/20 |
|
| Status: | Closed |
| Project: | Kafka Connector |
| Component/s: | Sink |
| Affects Version/s: | 1.0 |
| Fix Version/s: | 1.2.0 |
| Type: | Bug | Priority: | Major - P3 |
| Reporter: | Rajaramesh Yaramati | Assignee: | Ross Lawley |
| Resolution: | Fixed | Votes: | 0 |
| Labels: | None | ||
| Remaining Estimate: | Not Specified | ||
| Time Spent: | Not Specified | ||
| Original Estimate: | Not Specified | ||
| Environment: |
Source MongoDB 3.6.8 |
||
| Description |
|
I am migrating about 330 million records, and after roughly 85% completion I got the exception below from the sink connector.

Error from connect.log:
[2020-04-19 17:09:41,322] ERROR Mongodb bulk write (partially) failed (com.mongodb.kafka.connect.sink.MongoSinkTask:181) ', details={}}].

I am using the default "writemodel.strategy" value. Can you please suggest what could cause this? Thanks, Rajaramesh. |
| Comments |
| Comment by Ross Lawley [ 22/May/20 ] | ||||||
|
The NPE has been fixed but not yet released, so this ticket has been closed. However, the underlying cause remains: the bulk operation itself failed. Please could you post your config in | ||||||
| Comment by Rajaramesh Yaramati [ 21/May/20 ] | ||||||
|
Ross Lawley, I just tested your changes related to "Fix possible NPE when logging" and I am still getting the same issue; the sink connector terminates once the retry limit is reached. As I understand it, your fix should let the sink connector continue with the next message after logging the error to the logfile. I have log files to share with you, but I am not able to attach them here (I get a "token missing" error).

Error from sink connector:
[2020-05-20 20:50:49,176] INFO [mongo-sink-assets-shard24-new|task-0] Opened connection [connectionId{localValue:2}] to 10.74.1.50:27021 (org.mongodb.driver.connection:71)
[2020-05-20 20:50:49,260] ERROR [mongo-sink-assets-shard24-new|task-0] Mongodb bulk write (partially) failed (com.mongodb.kafka.connect.sink.MongoSinkTask:184)
com.mongodb.MongoBulkWriteException: Bulk write operation error on server 10.74.1.50:27021. Write errors: [BulkWriteError{index=67, code=28, message='Cannot create field 'renditions' in element {12: null}', details={}}].
    at com.mongodb.connection.BulkWriteBatchCombiner.getError(BulkWriteBatchCombiner.java:173)
    at com.mongodb.connection.BulkWriteBatchCombiner.throwOnError(BulkWriteBatchCombiner.java:202)
    at com.mongodb.connection.BulkWriteBatchCombiner.getResult(BulkWriteBatchCombiner.java:143)
    at com.mongodb.operation.BulkWriteBatch.getResult(BulkWriteBatch.java:227)
    at com.mongodb.operation.MixedBulkWriteOperation.executeBulkWriteBatch(MixedBulkWriteOperation.java:282)
    at com.mongodb.operation.MixedBulkWriteOperation.access$700(MixedBulkWriteOperation.java:72)
    at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:205)
    at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:196)
    at com.mongodb.operation.OperationHelper.withReleasableConnection(OperationHelper.java:501)
    at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:196)
    at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:71)
    at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:213)
    at com.mongodb.client.internal.MongoCollectionImpl.executeBulkWrite(MongoCollectionImpl.java:476)
    at com.mongodb.client.internal.MongoCollectionImpl.bulkWrite(MongoCollectionImpl.java:456)
    at com.mongodb.kafka.connect.sink.MongoSinkTask.processSinkRecords(MongoSinkTask.java:180)
    at com.mongodb.kafka.connect.sink.MongoSinkTask.lambda$null$2(MongoSinkTask.java:120)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1507)
    at com.mongodb.kafka.connect.sink.MongoSinkTask.lambda$put$3(MongoSinkTask.java:119)
    at java.base/java.util.HashMap.forEach(HashMap.java:1338)
    at com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:117)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:830)
[2020-05-20 20:50:49,261] ERROR [mongo-sink-assets-shard24-new|task-0] WriteResult: AcknowledgedBulkWriteResult{insertedCount=0, matchedCount=38, removedCount=0, modifiedCount=9, upserts=[]} (com.mongodb.kafka.connect.sink.MongoSinkTask:185)
[2020-05-20 20:50:49,261] ERROR [mongo-sink-assets-shard24-new|task-0] WriteErrors: [BulkWriteError{index=67, code=28, message='Cannot create field 'renditions' in element {12: null}', details={}}] (com.mongodb.kafka.connect.sink.MongoSinkTask:186)
[2020-05-20 20:50:49,261] ERROR [mongo-sink-assets-shard24-new|task-0] WriteConcernError: null (com.mongodb.kafka.connect.sink.MongoSinkTask:187)
[2020-05-20 20:50:49,262] ERROR [mongo-sink-assets-shard24-new|task-0] WorkerSinkTask{id=mongo-sink-assets-shard24-new-0} RetriableException from SinkTask: (org.apache.kafka.connect.runtime.WorkerSinkTask:552)
org.apache.kafka.connect.errors.RetriableException: Bulk write operation error on server 10.74.1.50:27021. Write errors: [BulkWriteError{index=67, code=28, message='Cannot create field 'renditions' in element {12: null}', details={}}].
    at com.mongodb.kafka.connect.sink.MongoSinkTask.checkRetriableException(MongoSinkTask.java:212)
    at com.mongodb.kafka.connect.sink.MongoSinkTask.processSinkRecords(MongoSinkTask.java:188)
Error from mongod log:
2020-05-20T20:33:37.358-0700 D NETWORK [conn658] Compressing message with snappy
, u: { $set: { dna: { match: false, date: "2020-05-07T00:01:57.957556Z" } } }, multi: false, upsert: false } planSummary: IDHACK keysExamined:0 docsExamined:0 nMatched:0 nModified:0 numYields:0 locks:{ Global: { acquireCount: { r: 1, w: 1 } }, Database: { acquireCount: { w: 1 } }, Collection: { acquireCount: { w: 1 } } } 0ms
, u: { $set: { revisions.12.renditions.thumbnail2x: { id: "1e5dbcda2d290412a0f88928821c310c", created_by: "d49d14c1bfb74dbf2334c027fe3c1c61", created: 1588809718024706, created_by_ip: "73.185.39.99, 10.92.211.1, 10.92.214.94, 34.213.157.249, 35.160.101.122" }, revisions.12.updated: 1588809718024706 } }, multi: false, upsert: false } planSummary: IDHACK exception: Cannot create field 'renditions' in element {12: null} code:PathNotViable numYields:0 locks:{ Global: { acquireCount: { r: 2, w: 2 } }, Database: { acquireCount: { w: 2 } }, Collection: { acquireCount: { w: 2 } } } 0ms
, txnNumber: 35, $clusterTime: { clusterTime: Timestamp(1590032017, 49), signature: { hash: BinData(0, 9782D442613737E518ABEAB61EC775B9B4E30F9B), keyId: 6818528695733452825 } }, $client: { driver: { name: "mongo-java-driver|sync|mongo-kafka|sink", version: "3.12.4|1.1.0-5-g32f5458-dirty" }, os: { type: "Linux", name: "Linux", architecture: "amd64", version: "4.4.0-1095-aws" }, platform: "Java/Oracle Corporation/13.0.2+8", mongos: { host: "poc-config-mongos:27021", client: "10.74.1.240:33674", version: "3.6.8" } }, $configServerState: { opTime: { ts: Timestamp(1590032011, 1), t: 1 } }, $db: "poc_oz_prod" } numYields:0 reslen:484 locks:{ Global: { acquireCount: { r: 2, w: 2 } }, Database: { acquireCount: { w: 2 } }, Collection: { acquireCount: { w: 2 } } } protocol:op_msg 1ms
, $replData: 1, $oplogQueryData: 1, $readPreference: { mode: "secondaryPreferred" }, $clusterTime: { clusterTime: Timestamp(1590032017, 49), signature: { hash: BinData(0, 9782D442613737E518ABEAB61EC775B9B4E30F9B), keyId: 6818528695733452825 } }, $db: "local" } | ||||||
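The mongod log pinpoints the real failure: code 28 (PathNotViable). The CDC update uses dot notation (`revisions.12.renditions.thumbnail2x`), but array element 12 of `revisions` is null in the target document, and MongoDB cannot create a sub-field inside a null value. As a rough illustration only (this is a toy sketch, not MongoDB's actual implementation, and `set_dotted_path` is a hypothetical helper name), the path-resolution rule behaves roughly like:

```python
def set_dotted_path(doc, path, value):
    """Toy model of how a dotted $set path is resolved (sketch only).

    Walks each path segment; if an intermediate element already exists
    but holds null, the server cannot create a sub-field inside it and
    fails with PathNotViable (code 28) -- the error seen in this ticket.
    """
    parts = path.split(".")
    current = doc
    for i, part in enumerate(parts[:-1]):
        if isinstance(current, list):
            nxt = current[int(part)]      # numeric segment indexes an array
        else:
            nxt = current.setdefault(part, {})  # missing dict fields are created
        if nxt is None:
            # Element exists but is null: cannot descend into it.
            raise ValueError(
                "Cannot create field '%s' in element {%s: null}"
                % (parts[i + 1], part))
        current = nxt
    if isinstance(current, list):
        current[int(parts[-1])] = value
    else:
        current[parts[-1]] = value
    return doc
```

With `revisions[12]` null, descending from segment `12` into `renditions` fails exactly as the mongod log shows; the connector's NPE then happened afterwards, while logging that bulk-write error.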
| Comment by Githook User [ 07/May/20 ] | ||||||
|
Author: {'name': 'Ross Lawley', 'email': 'ross.lawley@gmail.com', 'username': 'rozza'}
Message: Fix possible NPE when logging
| ||||||
| Comment by Rajaramesh Yaramati [ 28/Apr/20 ] | ||||||
|
Thank you, Ross Lawley, for looking into this and for the update. Do you know the expected release date for 1.2.0? And just to check, is there a plan to handle errors.tolerance in future releases?
Thanks, Rajaramesh. | ||||||
| Comment by Ross Lawley [ 28/Apr/20 ] | ||||||
|
Looks like there was a write error when trying to write the data to MongoDB.
It looks like the NPE was caused by a logging error, which will be fixed in the next release. Unfortunately, errors.tolerance=all does not cover errors raised inside a connector. Ross | ||||||
| Comment by Rajaramesh Yaramati [ 20/Apr/20 ] | ||||||
|
Also, I am using the parameter errors.tolerance=all so that the sink continues, but the sink connector still stops on this error. Here is my sink configuration in case it is useful:

{
  "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
  "errors.log.include.messages": "true",
  "topics": "shard24.oz_mongo.oz_prod.assets",
  "tasks.max": "1",
  "max.num.retries": "3",
  "collection": "poc_assets",
  "internal.key.converter.schemas.enable": "false",
  "errors.deadletterqueue.context.headers.enable": "true",
  "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbHandler",
  "key.converter.schemas.enable": "false",
  "internal.key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "database": "poc_oz_prod",
  "errors.deadletterqueue.topic.name": "error-messages",
  "internal.value.converter.schemas.enable": "false",
  "value.converter.schemas.enable": "false",
  "internal.value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "connection.uri": "mongodb://xxx.xxx.xxx.xxx:27021",
  "name": "mongo-sink-assets-shard24",
  "errors.tolerance": "all",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "retries.defer.timeout": "5000",
  "session.timeout.ms": "25000",
  "errors.log.enable": "true",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter"
} |