[KAFKA-98] MongoSinkTask stops with java.lang.NullPointerException Created: 20/Apr/20  Updated: 28/Oct/23  Resolved: 07/May/20

Status: Closed
Project: Kafka Connector
Component/s: Sink
Affects Version/s: 1.0
Fix Version/s: 1.2.0

Type: Bug Priority: Major - P3
Reporter: Rajaramesh Yaramati Assignee: Ross Lawley
Resolution: Fixed Votes: 0
Labels: None
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified
Environment:

Source MongoDB 3.6.8
Target MongoDB 3.6.8
Source connector debezium
Sink connector MongoDB Kafka Sink Connector



 Description   

I am migrating about 330 million records, and after roughly 85% completion I got the exception below from the sink connector.

{
  "name": "mongo-sink-assets-shard24",
  "connector": { "state": "RUNNING", "worker_id": "xxx.xxx.xxx.xxx:9083" },
  "tasks": [
    { "id": 0, "state": "FAILED", "worker_id": "xxx.xxx.xxx.xxx:9083", "trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:561)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:830)\nCaused by: java.lang.NullPointerException\n\tat com.mongodb.kafka.connect.sink.MongoSinkTask.processSinkRecords(MongoSinkTask.java:184)\n\tat com.mongodb.kafka.connect.sink.MongoSinkTask.lambda$put$2(MongoSinkTask.java:117)\n\tat java.base/java.util.ArrayList.forEach(ArrayList.java:1507)\n\tat com.mongodb.kafka.connect.sink.MongoSinkTask.lambda$put$3(MongoSinkTask.java:116)\n\tat java.base/java.util.HashMap.forEach(HashMap.java:1338)\n\tat com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:114)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)\n\t... 10 more\n" }
  ],
  "type": "sink"
}

Error from connect.log

[2020-04-19 17:09:41,322] ERROR Mongodb bulk write (partially) failed (com.mongodb.kafka.connect.sink.MongoSinkTask:181)
com.mongodb.MongoBulkWriteException: Bulk write operation error on server 10.74.1.50:27021. Write errors: [BulkWriteError{index=305, code=28, message='Cannot create field 'sha256' in element {xmpCameraRaw: "<x:xmpmeta xmlns:x="adobe:ns:meta/" x:xmptk="Adobe XMP Core 5.6-c140 79.160451, 2017/05/06-01:08:21 "> <rdf:RDF xmlns:rdf="http://www.w3.org/1..."}', details={}}].
at com.mongodb.connection.BulkWriteBatchCombiner.getError(BulkWriteBatchCombiner.java:173)
at com.mongodb.connection.BulkWriteBatchCombiner.throwOnError(BulkWriteBatchCombiner.java:202)
at com.mongodb.connection.BulkWriteBatchCombiner.getResult(BulkWriteBatchCombiner.java:143)
at com.mongodb.operation.BulkWriteBatch.getResult(BulkWriteBatch.java:227)
at com.mongodb.operation.MixedBulkWriteOperation.executeBulkWriteBatch(MixedBulkWriteOperation.java:282)
at com.mongodb.operation.MixedBulkWriteOperation.access$700(MixedBulkWriteOperation.java:72)
at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:205)
at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:196)
at com.mongodb.operation.OperationHelper.withReleasableConnection(OperationHelper.java:501)
at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:196)
at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:71)
at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:213)
at com.mongodb.client.internal.MongoCollectionImpl.executeBulkWrite(MongoCollectionImpl.java:476)
at com.mongodb.client.internal.MongoCollectionImpl.bulkWrite(MongoCollectionImpl.java:456)
at com.mongodb.kafka.connect.sink.MongoSinkTask.processSinkRecords(MongoSinkTask.java:177)
at com.mongodb.kafka.connect.sink.MongoSinkTask.lambda$put$2(MongoSinkTask.java:117)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1507)
at com.mongodb.kafka.connect.sink.MongoSinkTask.lambda$put$3(MongoSinkTask.java:116)
at java.base/java.util.HashMap.forEach(HashMap.java:1338)
at com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:114)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:830)

I am using "writemodel.strategy" default value. 

Can you please suggest what could cause this? 

Thanks,

Rajaramesh. 



 Comments   
Comment by Ross Lawley [ 22/May/20 ]

Hi yaramati@adobe.com,

The NPE has been fixed but not yet released, so this ticket has been closed.

However, the underlying cause remains: the bulk operation is failing. KAFKA-105 and KAFKA-106 are tickets that look at error tolerance and error handling. The expected behaviour is that the error should still be logged and Kafka itself should then proceed according to the errors.tolerance configuration.

Could you please post your config and the error in KAFKA-105, as that ticket is open and ready for future work? Please also state the Kafka version you are using.

Comment by Rajaramesh Yaramati [ 21/May/20 ]

Ross Lawley,

I just tested your changes related to "Fix possible NPE when logging" and I am still getting the same issue: the sink connector terminates once the retry limit is reached. My understanding was that with your fix the sink connector would log the error message to the log file and then continue with the next message.

I have log files to share with you, but I am not able to attach them here; I get a "token missing" error.

Error from sink connector:

[2020-05-20 20:50:49,176] INFO [mongo-sink-assets-shard24-new|task-0] Opened connection [connectionId{localValue:2}] to 10.74.1.50:27021 (org.mongodb.driver.connection:71)
[2020-05-20 20:50:49,260] ERROR [mongo-sink-assets-shard24-new|task-0] Mongodb bulk write (partially) failed (com.mongodb.kafka.connect.sink.MongoSinkTask:184)
com.mongodb.MongoBulkWriteException: Bulk write operation error on server 10.74.1.50:27021. Write errors: [BulkWriteError{index=67, code=28, message='Cannot create field 'renditions' in element {12: null}', details={}}].
at com.mongodb.connection.BulkWriteBatchCombiner.getError(BulkWriteBatchCombiner.java:173)
at com.mongodb.connection.BulkWriteBatchCombiner.throwOnError(BulkWriteBatchCombiner.java:202)
at com.mongodb.connection.BulkWriteBatchCombiner.getResult(BulkWriteBatchCombiner.java:143)
at com.mongodb.operation.BulkWriteBatch.getResult(BulkWriteBatch.java:227)
at com.mongodb.operation.MixedBulkWriteOperation.executeBulkWriteBatch(MixedBulkWriteOperation.java:282)
at com.mongodb.operation.MixedBulkWriteOperation.access$700(MixedBulkWriteOperation.java:72)
at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:205)
at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:196)
at com.mongodb.operation.OperationHelper.withReleasableConnection(OperationHelper.java:501)
at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:196)
at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:71)
at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:213)
at com.mongodb.client.internal.MongoCollectionImpl.executeBulkWrite(MongoCollectionImpl.java:476)
at com.mongodb.client.internal.MongoCollectionImpl.bulkWrite(MongoCollectionImpl.java:456)
at com.mongodb.kafka.connect.sink.MongoSinkTask.processSinkRecords(MongoSinkTask.java:180)
at com.mongodb.kafka.connect.sink.MongoSinkTask.lambda$null$2(MongoSinkTask.java:120)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1507)
at com.mongodb.kafka.connect.sink.MongoSinkTask.lambda$put$3(MongoSinkTask.java:119)
at java.base/java.util.HashMap.forEach(HashMap.java:1338)
at com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:117)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:830)
[2020-05-20 20:50:49,261] ERROR [mongo-sink-assets-shard24-new|task-0] WriteResult: AcknowledgedBulkWriteResult{insertedCount=0, matchedCount=38, removedCount=0, modifiedCount=9, upserts=[]} (com.mongodb.kafka.connect.sink.MongoSinkTask:185)
[2020-05-20 20:50:49,261] ERROR [mongo-sink-assets-shard24-new|task-0] WriteErrors: [BulkWriteError{index=67, code=28, message='Cannot create field 'renditions' in element {12: null}', details={}}] (com.mongodb.kafka.connect.sink.MongoSinkTask:186)
[2020-05-20 20:50:49,261] ERROR [mongo-sink-assets-shard24-new|task-0] WriteConcernError: null (com.mongodb.kafka.connect.sink.MongoSinkTask:187)
[2020-05-20 20:50:49,262] ERROR [mongo-sink-assets-shard24-new|task-0] WorkerSinkTask{id=mongo-sink-assets-shard24-new-0} RetriableException from SinkTask: (org.apache.kafka.connect.runtime.WorkerSinkTask:552)
org.apache.kafka.connect.errors.RetriableException: Bulk write operation error on server 10.74.1.50:27021. Write errors: [BulkWriteError{index=67, code=28, message='Cannot create field 'renditions' in element {12: null}', details={}}].
at com.mongodb.kafka.connect.sink.MongoSinkTask.checkRetriableException(MongoSinkTask.java:212)
at com.mongodb.kafka.connect.sink.MongoSinkTask.processSinkRecords(MongoSinkTask.java:188)

 

Error from mongod log:

2020-05-20T20:33:37.358-0700 D NETWORK [conn658] Compressing message with snappy
2020-05-20T20:33:37.359-0700 D WRITE [conn894] UpdateResult – upserted: {} modifiers: 1 existing: 0 numDocsModified: 0 numMatched: 0
2020-05-20T20:33:37.359-0700 D STORAGE [conn894] WT rollback_transaction for snapshot id 980803764
2020-05-20T20:33:37.359-0700 I WRITE [conn894] update poc_oz_prod.poc_assets_new command: { q: { _id: "50efcb5b9d0a431cbe7c3162bac6374e" }, u: { $set: { dna: { match: false, date: "2020-05-07T00:01:57.957556Z" } } }, multi: false, upsert: false } planSummary: IDHACK keysExamined:0 docsExamined:0 nMatched:0 nModified:0 numYields:0 locks:{ Global: { acquireCount: { r: 1, w: 1 } }, Database: { acquireCount: { w: 1 } }, Collection: { acquireCount: { w: 1 } } } 0ms
2020-05-20T20:33:37.359-0700 D QUERY [conn894] Using idhack: { _id: "84725dbcc52bd2019a08f0f9327e3fee" }
2020-05-20T20:33:37.359-0700 D STORAGE [conn894] WT begin_transaction for snapshot id 980803766
2020-05-20T20:33:37.359-0700 D - [conn894] User Assertion: 28:Cannot create field 'renditions' in element {12: null} src/mongo/db/update/modifier_node.cpp 239
2020-05-20T20:33:37.359-0700 D STORAGE [conn894] WT rollback_transaction for snapshot id 980803766
2020-05-20T20:33:37.359-0700 D WRITE [conn894] Caught Assertion in update: PathNotViable: Cannot create field 'renditions' in element {12: null}
2020-05-20T20:33:37.359-0700 I WRITE [conn894] update poc_oz_prod.poc_assets_new command: { q: { _id: "84725dbcc52bd2019a08f0f9327e3fee" }, u: { $set: { revisions.12.renditions.thumbnail2x: { id: "1e5dbcda2d290412a0f88928821c310c", created_by: "d49d14c1bfb74dbf2334c027fe3c1c61", created: 1588809718024706, created_by_ip: "73.185.39.99, 10.92.211.1, 10.92.214.94, 34.213.157.249, 35.160.101.122" }, revisions.12.updated: 1588809718024706 } }, multi: false, upsert: false } planSummary: IDHACK exception: Cannot create field 'renditions' in element {12: null} code:PathNotViable numYields:0 locks:{ Global: { acquireCount: { r: 2, w: 2 } }, Database: { acquireCount: { w: 2 } }, Collection: { acquireCount: { w: 2 } } } 0ms
2020-05-20T20:33:37.359-0700 D REPL [conn894] Waiting for write concern. OpTime: { ts: Timestamp(1590032017, 49), t: 2 }, write concern: { w: 1, wtimeout: 0 }
2020-05-20T20:33:37.359-0700 I COMMAND [conn894] command poc_oz_prod.$cmd command: update { update: "poc_assets_new", bypassDocumentValidation: false, ordered: true, stmtIds: [ 0, 1, 2, 3, 4 ], updates: 5, shardVersion: [ Timestamp(1, 92037), ObjectId('5ea31e40cd8c9318b72b5e16') ], lsid: { id: UUID("0a2dba1e-be97-4eb8-aff8-978182ff58c6"), uid: BinData(0, E3B0C44298FC1C149AFBF4C8996FB92427AE41E4649B934CA495991B7852B855) }, txnNumber: 35, $clusterTime: { clusterTime: Timestamp(1590032017, 49), signature: { hash: BinData(0, 9782D442613737E518ABEAB61EC775B9B4E30F9B), keyId: 6818528695733452825 } }, $client: { driver: { name: "mongo-java-driver|sync|mongo-kafka|sink", version: "3.12.4|1.1.0-5-g32f5458-dirty" }, os: { type: "Linux", name: "Linux", architecture: "amd64", version: "4.4.0-1095-aws" }, platform: "Java/Oracle Corporation/13.0.2+8", mongos: { host: "poc-config-mongos:27021", client: "10.74.1.240:33674", version: "3.6.8" } }, $configServerState: { opTime: { ts: Timestamp(1590032011, 1), t: 1 } }, $db: "poc_oz_prod" } numYields:0 reslen:484 locks:{ Global: { acquireCount: { r: 2, w: 2 } }, Database: { acquireCount: { w: 2 } }, Collection: { acquireCount: { w: 2 } } } protocol:op_msg 1ms
2020-05-20T20:33:37.359-0700 D NETWORK [conn894] Compressing message with snappy
2020-05-20T20:33:37.361-0700 D NETWORK [conn658] Decompressing message with snappy
2020-05-20T20:33:37.361-0700 D COMMAND [conn658] run command local.$cmd { getMore: 38469818473, collection: "oplog.rs", batchSize: 13981010, maxTimeMS: 5000, term: 2, lastKnownCommittedOpTime: { ts: Timestamp(1590032017, 48), t: 2 }, $replData: 1, $oplogQueryData: 1, $readPreference: { mode: "secondaryPreferred" }, $clusterTime: { clusterTime: Timestamp(1590032017, 49), signature: { hash: BinData(0, 9782D442613737E518ABEAB61EC775B9B4E30F9B), keyId: 6818528695733452825 } }, $db: "local" }
2020-05-20T20:33:37.361-0700 D STORAGE [conn658] NamespaceUUIDCache: registered namespace local.oplog.rs with UUID 86258ee6-3d26-4417-886e-fcaaf736a2c4
2020-05-20T20:33:37.361-0700 D STORAGE [conn658] WT begin_transaction for snapshot id 980803768
2020-05-20T20:33:37.361-0700 D STORAGE [conn658] WT rollback_transaction for snapshot id 980803768
2020-05-20T20:33:37.361-0700 D STORAGE [conn658] WT begin_transaction for snapshot id 980803769
2020-05-20T20:33:37.361-0700 D STORAGE [conn658] WT rollback_transaction for snapshot id 980803769
2020-05-20T20:33:37.365-0700 D NETWORK [conn885] Decompressing message with snappy
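
For context, the failing write is MongoDB error code 28 (PathNotViable): the update tries to $set the path revisions.12.renditions.thumbnail2x, but in the existing target document the element revisions.12 is null, so the server cannot create the field 'renditions' inside it. Below is a minimal, self-contained sketch that reproduces the same server error with the sync Java driver; the connection string, database, and collection names are hypothetical and for illustration only.

import com.mongodb.MongoBulkWriteException;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.Updates;
import org.bson.Document;

import java.util.Collections;

public class PathNotViableRepro {
    public static void main(String[] args) {
        // Hypothetical local deployment; point this at a throwaway test cluster.
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
            MongoCollection<Document> coll =
                    client.getDatabase("test").getCollection("pathNotViableRepro");
            coll.drop();

            // Existing document in which the parent of the target path is null,
            // mirroring the {12: null} element seen in the mongod log above.
            coll.insertOne(new Document("_id", "84725dbcc52bd2019a08f0f9327e3fee")
                    .append("revisions", new Document("12", null)));

            try {
                // A $set below the null element fails server-side with code 28 (PathNotViable):
                // "Cannot create field 'renditions' in element {12: null}"
                coll.bulkWrite(Collections.singletonList(new UpdateOneModel<>(
                        Filters.eq("_id", "84725dbcc52bd2019a08f0f9327e3fee"),
                        Updates.set("revisions.12.renditions.thumbnail2x",
                                new Document("id", "1e5dbcda2d290412a0f88928821c310c")))));
            } catch (MongoBulkWriteException e) {
                // Prints: 28: Cannot create field 'renditions' in element {12: null}
                e.getWriteErrors().forEach(err ->
                        System.out.println(err.getCode() + ": " + err.getMessage()));
            }
        }
    }
}

Such a $set can only succeed once revisions.12 is a document (or the field is absent entirely) rather than an explicit null.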

Comment by Githook User [ 07/May/20 ]

Author:

{'name': 'Ross Lawley', 'email': 'ross.lawley@gmail.com', 'username': 'rozza'}

Message: Fix possible NPE when logging

KAFKA-98
Branch: master
https://github.com/mongodb/mongo-kafka/commit/68ea3055c346d5a8b23b0b391c854d2c2a92431f

Comment by Rajaramesh Yaramati [ 28/Apr/20 ]

Thank you, Ross Lawley, for looking into this and for the update.

Do you know the expected release date for 1.2.0?

And I just want to check: is there a plan to handle errors.tolerance in a future release?

 

Thanks,

Rajaramesh.

Comment by Ross Lawley [ 28/Apr/20 ]

Hi yaramati@adobe.com,

 

It looks like there was a write error when trying to write the data to MongoDB:

com.mongodb.MongoBulkWriteException: Bulk write operation error on server 10.74.1.50:27021. Write errors: [BulkWriteError{index=305, code=28, message='Cannot create field 'sha256' in element {xmpCameraRaw: "<x:xmpmeta xmlns:x="adobe:ns:meta/" x:xmptk="Adobe XMP Core 5.6-c140 79.160451, 2017/05/06-01:08:21 "> <rdf:RDF xmlns:rdf="http://www.w3.org/1..."}', details={}}].
 

It looks like the NPE was caused by a logging error which will be fixed in the next release.

Unfortunately, errors.tolerance=all does not include errors created inside a connector.
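
For reference, these are the Connect error-handling settings in question (the values below simply repeat the ones in the reporter's configuration further down). Broadly speaking, they cover failures in Connect's converter and transformation stages of the sink pipeline; an exception thrown from the connector's own put(), such as this bulk-write error, is not routed through them, which is why the task still stops:

{
  "errors.tolerance": "all",
  "errors.log.enable": "true",
  "errors.log.include.messages": "true",
  "errors.deadletterqueue.topic.name": "error-messages",
  "errors.deadletterqueue.context.headers.enable": "true"
}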

Ross

Comment by Rajaramesh Yaramati [ 20/Apr/20 ]

Also, I have set errors.tolerance=all so that the sink continues, but the sink connector still stops on this error. Here is my sink configuration in case it is useful to you.

 

{ "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector", "errors.log.include.messages": "true", "topics": "shard24.oz_mongo.oz_prod.assets", "tasks.max": "1", "max.num.retries": "3", "collection": "poc_assets", "internal.key.converter.schemas.enable": "false", "errors.deadletterqueue.context.headers.enable": "true", "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbHandler", "key.converter.schemas.enable": "false", "internal.key.converter": "org.apache.kafka.connect.json.JsonConverter", "database": "poc_oz_prod", "errors.deadletterqueue.topic.name": "error-messages", "internal.value.converter.schemas.enable": "false", "value.converter.schemas.enable": "false", "internal.value.converter": "org.apache.kafka.connect.json.JsonConverter", "connection.uri": "mongodb://xxx.xxx.xxx.xxx:27021", "name": "mongo-sink-assets-shard24", "errors.tolerance": "all", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "retries.defer.timeout": "5000", "session.timeout.ms": "25000", "errors.log.enable": "true", "key.converter": "org.apache.kafka.connect.json.JsonConverter" }