[KAFKA-364] Kafka Connector: How to make delete.on.null work? Created: 13/Apr/23  Updated: 26/Aug/23  Resolved: 13/Jul/23

Status: Closed
Project: Kafka Connector
Component/s: Sink
Affects Version/s: None
Fix Version/s: None

Type: Improvement Priority: Major - P3
Reporter: Robert Walters Assignee: Valentin Kavalenka
Resolution: Won't Do Votes: 1
Labels: None
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified

Description

Kafka Connector v1.10.0

The user is trying to make the following configuration work:

"config": {
     "connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
     "tasks.max":"1",
     "topics": "'"$OUTPUT_TOPIC"'",
     "connection.uri":"mongodb://mongodb:27017",
     "database": "'"$ER_DB"'",
     "collection": "'"$ER_COLLECTION"'",
     "key.converter": "org.apache.kafka.connect.storage.StringConverter",
     "value.converter": "org.apache.kafka.connect.storage.StringConverter",
     "key.converter.schemas.enable": "false",
     "value.converter.schemas.enable": "false",
     "max.batch.size": "1000",
     "delete.on.null.values": "true",
     "writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneBusinessKeyTimestampStrategy",
     "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.PartialKeyStrategy",
     "document.id.strategy.partial.key.projection.list":"field1,field2",
     "document.id.strategy.partial.key.projection.type":"ALLOWLIST",
     "tasks.max": "1"
...

The expectation is that when a message with a NULL value arrives, the sink connector should delete the matching document. This is expected to work because delete.on.null.values supports PartialKeyStrategy:

However, what happens instead is that an exception is thrown:

[2023-04-11 01:11:37,264] ERROR Unable to process record SinkRecord{kafkaOffset=0, timestampType=CreateTime} ConnectRecord{topic='test-resolved-entities', kafkaPartition=0, key={"field1":"0","field2":"0123456789ab0123456789ab"}, keySchema=Schema{STRING}, value=null, valueSchema=Schema{STRING}, timestamp=1681175496937, headers=ConnectHeaders(headers=)} (com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData)
 
org.apache.kafka.connect.errors.DataException: Could not convert value `null` into a BsonDocument.
	at com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.getUnwrapped(LazyBsonDocument.java:169)
	at com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.containsKey(LazyBsonDocument.java:83)
	at com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.shouldAppend(DocumentIdAdder.java:68)
	at com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.lambda$process$0(DocumentIdAdder.java:51)
	at java.base/java.util.Optional.ifPresent(Optional.java:183)
	at com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.process(DocumentIdAdder.java:49)
	at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModel$1(MongoProcessedSinkRecordData.java:90)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
	at java.base/java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1085)
	at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModel$2(MongoProcessedSinkRecordData.java:90)
	at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.tryProcess(MongoProcessedSinkRecordData.java:105)
	at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.buildWriteModel(MongoProcessedSinkRecordData.java:85)
	at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.createWriteModel(MongoProcessedSinkRecordData.java:81)
	at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.<init>(MongoProcessedSinkRecordData.java:51)
	at com.mongodb.kafka.connect.sink.MongoSinkRecordProcessor.orderedGroupByTopicAndNamespace(MongoSinkRecordProcessor.java:45)
	at com.mongodb.kafka.connect.sink.StartedMongoSinkTask.put(StartedMongoSinkTask.java:113)
	at com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:91)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.bson.BsonInvalidOperationException: readStartDocument can only be called when CurrentBSONType is DOCUMENT, not when CurrentBSONType is NULL.
	at org.bson.AbstractBsonReader.verifyBSONType(AbstractBsonReader.java:689)
	at org.bson.AbstractBsonReader.checkPreconditions(AbstractBsonReader.java:721)
	at org.bson.AbstractBsonReader.readStartDocument(AbstractBsonReader.java:449)
	at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:84)
	at org.bson.BsonDocument.parse(BsonDocument.java:66)
	at com.mongodb.kafka.connect.sink.converter.StringRecordConverter.convert(StringRecordConverter.java:35)
	at com.mongodb.kafka.connect.sink.converter.SinkConverter.lambda$convert$1(SinkConverter.java:60)
	at com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.getUnwrapped(LazyBsonDocument.java:166)

I am able to reproduce similar behaviour in my own environment as follows:

$ bin/kafka-console-producer.sh --topic my-topic --bootstrap-server localhost:9092 --property parse.key=true --property key.separator="|"
>{ "field1": "a", "field2": "b" }|null

Help Needed

Could the Driver team clarify what we're missing here? Is it meant to work as described, or is the conclusion drawn from the quoted documentation incorrect?



Comments
Comment by Valentin Kavalenka [ 14/Jul/23 ]

I created https://github.com/mongodb/mongo-kafka/pull/140 to address the ambiguity in the error message.

Comment by Valentin Kavalenka [ 13/Jul/23 ]

The reported exception is unrelated to PartialKeyStrategy and delete.on.null.values=true; it is caused by SinkRecord.value() being "null" (the double quotes are not part of the value; I use them here to make clear that I am talking about a string, namely the characters within the quotes) when value.converter=org.apache.kafka.connect.storage.StringConverter is used. Unfortunately, the error message does not show whether the value is a Java null reference or the string "null", but from the code it is clear that this exception can happen only when the value is the string "null". Additionally, I was able to reproduce the exception with an integration test, and to show that delete.on.null.values=true does work with PartialKeyStrategy when the value is a Java null reference.

The documentation of the MongoDB Kafka Connector misleadingly says the following: "Your sink connector must receive valid JSON strings from your Apache Kafka topic even when using a String converter." Despite "null" being valid JSON text, it is not a valid value in the context of the MongoDB Kafka Connector when org.apache.kafka.connect.storage.StringConverter is used. In reality, when a string converter is used, each value must be either a Java null reference or a string parseable by the org.bson.BsonDocument.parse method, i.e., "a string in MongoDB Extended JSON format". "null" is not valid Extended JSON text because an Extended JSON text is always a JSON object.
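
A minimal sketch of that distinction, using only the org.bson classes already on the connector's classpath (the class name ParseDemo is hypothetical):

import org.bson.BsonDocument;
import org.bson.BsonInvalidOperationException;

public class ParseDemo {
    public static void main(String[] args) {
        // A JSON object parses fine:
        BsonDocument ok = BsonDocument.parse("{\"field1\": \"a\", \"field2\": \"b\"}");
        System.out.println(ok.toJson());

        // "null" is valid JSON text, but not a JSON object, so parse rejects it
        // with the very exception seen in the stack trace above:
        try {
            BsonDocument.parse("null");
        } catch (BsonInvalidOperationException e) {
            System.out.println(e.getMessage());
        }
    }
}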

In other words, the exception is caused by a SinkRecord that is not valid given the value.converter=org.apache.kafka.connect.storage.StringConverter configuration. A custom post-processor added via post.processor.chain cannot be reliably used to fix this user-side issue because our post-processors are applied after converters.
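
One possible user-side workaround (my suggestion here, not something this ticket proposes or the connector ships) is a custom single-message transform: in a sink pipeline, SMTs run after value.converter but before the connector's internal SinkConverter, so a transform can rewrite the string "null" into a real tombstone before the connector tries to parse it. A sketch, with the class name hypothetical:

import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

public class StringNullToTombstone<R extends ConnectRecord<R>> implements Transformation<R> {
    @Override
    public R apply(R record) {
        // Replace the literal string "null" with a null value so that
        // delete.on.null.values can fire downstream in the sink connector.
        if ("null".equals(record.value())) {
            return record.newRecord(record.topic(), record.kafkaPartition(),
                    record.keySchema(), record.key(), null, null, record.timestamp());
        }
        return record;
    }

    @Override
    public ConfigDef config() { return new ConfigDef(); }

    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public void close() {}
}

It would be registered through the standard transforms=<alias> and transforms.<alias>.type=<class> properties in the sink configuration.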
