- Type: Improvement
- Resolution: Won't Do
- Priority: Major - P3
- Affects Version/s: None
- Component/s: Sink
- (copied to CRM)
Kafka Connector v1.10.0
The user is trying to make the following configuration work:
"config": { "connector.class":"com.mongodb.kafka.connect.MongoSinkConnector", "tasks.max":"1", "topics": "'"$OUTPUT_TOPIC"'", "connection.uri":"mongodb://mongodb:27017", "database": "'"$ER_DB"'", "collection": "'"$ER_COLLECTION"'", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.storage.StringConverter", "key.converter.schemas.enable": "false", "value.converter.schemas.enable": "false", "max.batch.size": "1000", "delete.on.null.values": "true", "writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneBusinessKeyTimestampStrategy", "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.PartialKeyStrategy", "document.id.strategy.partial.key.projection.list":"field1,field2", "document.id.strategy.partial.key.projection.type":"ALLOWLIST", "tasks.max": "1" ...
The expectation is that when a message with a NULL value arrives, the Sink connector deletes the matching document. This is expected to work because, per the quoted documentation, delete.on.null.values supports PartialKeyStrategy.
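For concreteness, here is a minimal sketch, using the plain MongoDB Java driver rather than the connector's internals, of the delete the user expects the sink to issue for such a tombstone (the class name and the _id value are illustrative, taken from the record key in the log below):

import org.bson.BsonDocument;
import com.mongodb.client.model.DeleteOneModel;
import com.mongodb.client.model.Filters;

public class ExpectedTombstoneDelete {
    public static void main(String[] args) {
        // Hypothetical _id that PartialKeyStrategy would project from the record key
        // {"field1":"0","field2":"0123456789ab0123456789ab"} with the projection
        // list "field1,field2" in ALLOWLIST mode.
        BsonDocument projectedId = BsonDocument.parse(
                "{\"field1\": \"0\", \"field2\": \"0123456789ab0123456789ab\"}");

        // The write model the user expects the sink to build for the tombstone:
        // delete the single document whose _id matches the projected key.
        DeleteOneModel<BsonDocument> expectedDelete =
                new DeleteOneModel<>(Filters.eq("_id", projectedId));
        System.out.println("would issue: deleteOne(filter = {_id: " + projectedId.toJson() + "})");
    }
}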
However, what happens instead is that an exception is thrown:
[2023-04-11 01:11:37,264] ERROR Unable to process record SinkRecord{kafkaOffset=0, timestampType=CreateTime} ConnectRecord{topic='test-resolved-entities', kafkaPartition=0, key={"field1":"0","field2":"0123456789ab0123456789ab"}, keySchema=Schema{STRING}, value=null, valueSchema=Schema{STRING}, timestamp=1681175496937, headers=ConnectHeaders(headers=)} (com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData)
org.apache.kafka.connect.errors.DataException: Could not convert value `null` into a BsonDocument.
    at com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.getUnwrapped(LazyBsonDocument.java:169)
    at com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.containsKey(LazyBsonDocument.java:83)
    at com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.shouldAppend(DocumentIdAdder.java:68)
    at com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.lambda$process$0(DocumentIdAdder.java:51)
    at java.base/java.util.Optional.ifPresent(Optional.java:183)
    at com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.process(DocumentIdAdder.java:49)
    at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModel$1(MongoProcessedSinkRecordData.java:90)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
    at java.base/java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1085)
    at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModel$2(MongoProcessedSinkRecordData.java:90)
    at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.tryProcess(MongoProcessedSinkRecordData.java:105)
    at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.buildWriteModel(MongoProcessedSinkRecordData.java:85)
    at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.createWriteModel(MongoProcessedSinkRecordData.java:81)
    at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.<init>(MongoProcessedSinkRecordData.java:51)
    at com.mongodb.kafka.connect.sink.MongoSinkRecordProcessor.orderedGroupByTopicAndNamespace(MongoSinkRecordProcessor.java:45)
    at com.mongodb.kafka.connect.sink.StartedMongoSinkTask.put(StartedMongoSinkTask.java:113)
    at com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:91)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.bson.BsonInvalidOperationException: readStartDocument can only be called when CurrentBSONType is DOCUMENT, not when CurrentBSONType is NULL.
    at org.bson.AbstractBsonReader.verifyBSONType(AbstractBsonReader.java:689)
    at org.bson.AbstractBsonReader.checkPreconditions(AbstractBsonReader.java:721)
    at org.bson.AbstractBsonReader.readStartDocument(AbstractBsonReader.java:449)
    at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:84)
    at org.bson.BsonDocument.parse(BsonDocument.java:66)
    at com.mongodb.kafka.connect.sink.converter.StringRecordConverter.convert(StringRecordConverter.java:35)
    at com.mongodb.kafka.connect.sink.converter.SinkConverter.lambda$convert$1(SinkConverter.java:60)
    at com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.getUnwrapped(LazyBsonDocument.java:166)
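The top frames show DocumentIdAdder.shouldAppend calling containsKey on the lazily converted value document, which forces conversion of the null value before any delete handling runs. Assuming the value reaching StringRecordConverter is the JSON literal null (as the Caused by frames indicate), the root-cause failure can be reproduced in isolation with only the BSON library:

import org.bson.BsonDocument;

public class NullValueParseRepro {
    public static void main(String[] args) {
        // "null" is valid JSON, but it decodes to the BSON NULL type, not a
        // document, so BsonDocumentCodec's readStartDocument call throws
        // BsonInvalidOperationException -- the same Caused by as above.
        BsonDocument.parse("null");
    }
}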
I am able to reproduce similar behaviour in my own environment as follows:
$ bin/kafka-console-producer.sh --topic my-topic --bootstrap-server localhost:9092 --property parse.key=true --property key.separator="|"
>{ "field1": "a", "field2": "b" }|null
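Note that with parse.key and key.separator, the text after the | is delivered as the literal string "null", not as a true Kafka tombstone (a record whose value is null); that matches the StringRecordConverter path in the Caused by above. Producing a real tombstone from the console producer would, as far as I know, require the null.marker property introduced by KIP-810 in Kafka 3.2; a sketch assuming that version:

$ bin/kafka-console-producer.sh --topic my-topic --bootstrap-server localhost:9092 --property parse.key=true --property key.separator="|" --property null.marker=NULL
>{ "field1": "a", "field2": "b" }|NULL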
Help Needed
Could the Driver team clarify what we're missing here? Is it meant to work as described, or is the conclusion drawn from the quoted documentation incorrect?