[2023-04-11 01:11:37,264] ERROR Unable to process record SinkRecord{kafkaOffset=0, timestampType=CreateTime} ConnectRecord{topic='test-resolved-entities', kafkaPartition=0, key={"field1":"0","field2":"0123456789ab0123456789ab"}, keySchema=Schema{STRING}, value=null, valueSchema=Schema{STRING}, timestamp=1681175496937, headers=ConnectHeaders(headers=)} (com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData)
|
|
org.apache.kafka.connect.errors.DataException: Could not convert value `null` into a BsonDocument.
|
at com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.getUnwrapped(LazyBsonDocument.java:169)
|
at com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.containsKey(LazyBsonDocument.java:83)
|
at com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.shouldAppend(DocumentIdAdder.java:68)
|
at com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.lambda$process$0(DocumentIdAdder.java:51)
|
at java.base/java.util.Optional.ifPresent(Optional.java:183)
|
at com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.process(DocumentIdAdder.java:49)
|
at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModel$1(MongoProcessedSinkRecordData.java:90)
|
at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
|
at java.base/java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1085)
|
at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModel$2(MongoProcessedSinkRecordData.java:90)
|
at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.tryProcess(MongoProcessedSinkRecordData.java:105)
|
at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.buildWriteModel(MongoProcessedSinkRecordData.java:85)
|
at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.createWriteModel(MongoProcessedSinkRecordData.java:81)
|
at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.<init>(MongoProcessedSinkRecordData.java:51)
|
at com.mongodb.kafka.connect.sink.MongoSinkRecordProcessor.orderedGroupByTopicAndNamespace(MongoSinkRecordProcessor.java:45)
|
at com.mongodb.kafka.connect.sink.StartedMongoSinkTask.put(StartedMongoSinkTask.java:113)
|
at com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:91)
|
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
|
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
|
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
|
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
|
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
|
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
|
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
|
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
|
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
|
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
|
at java.base/java.lang.Thread.run(Thread.java:829)
|
Caused by: org.bson.BsonInvalidOperationException: readStartDocument can only be called when CurrentBSONType is DOCUMENT, not when CurrentBSONType is NULL.
|
at org.bson.AbstractBsonReader.verifyBSONType(AbstractBsonReader.java:689)
|
at org.bson.AbstractBsonReader.checkPreconditions(AbstractBsonReader.java:721)
|
at org.bson.AbstractBsonReader.readStartDocument(AbstractBsonReader.java:449)
|
at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:84)
|
at org.bson.BsonDocument.parse(BsonDocument.java:66)
|
at com.mongodb.kafka.connect.sink.converter.StringRecordConverter.convert(StringRecordConverter.java:35)
|
at com.mongodb.kafka.connect.sink.converter.SinkConverter.lambda$convert$1(SinkConverter.java:60)
|
at com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.getUnwrapped(LazyBsonDocument.java:166)
|