Kafka Connector / KAFKA-364

Kafka Connector: How to make delete.on.null work?

    • Type: Improvement
    • Resolution: Won't Do
    • Priority: Major - P3
    • Affects Version/s: None
    • Component/s: Sink

      Kafka Connector v1.10.0

      The user is trying to make the following configuration work:

      "config": {
           "connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
           "tasks.max":"1",
           "topics": "'"$OUTPUT_TOPIC"'",
           "connection.uri":"mongodb://mongodb:27017",
           "database": "'"$ER_DB"'",
           "collection": "'"$ER_COLLECTION"'",
           "key.converter": "org.apache.kafka.connect.storage.StringConverter",
           "value.converter": "org.apache.kafka.connect.storage.StringConverter",
           "key.converter.schemas.enable": "false",
           "value.converter.schemas.enable": "false",
           "max.batch.size": "1000",
           "delete.on.null.values": "true",
           "writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneBusinessKeyTimestampStrategy",
           "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.PartialKeyStrategy",
           "document.id.strategy.partial.key.projection.list":"field1,field2",
           "document.id.strategy.partial.key.projection.type":"ALLOWLIST",
           "tasks.max": "1"
      ...
      

      The expectation is that when a message with a null value arrives, the sink connector deletes the matching document. This is expected to work because, according to the documentation, delete.on.null.values supports PartialKeyStrategy.
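
To make the expectation concrete, here is a minimal Python sketch of the behaviour the user is hoping for. It is a model of the expectation only, not the connector's implementation: the function names and the shape of the returned dictionaries are hypothetical, and the assumption that the projected key fields become the `_id` used in the delete filter is taken from the reading of the documentation described above.

```python
from typing import Any, Optional

# Mirrors document.id.strategy.partial.key.projection.list in the config above.
PROJECTION = ("field1", "field2")


def project_key(key: dict[str, Any], allow: tuple[str, ...] = PROJECTION) -> dict[str, Any]:
    """Keep only the allow-listed fields of the record key (ALLOWLIST projection)."""
    return {name: key[name] for name in allow if name in key}


def expected_operation(key: dict[str, Any], value: Optional[dict[str, Any]]) -> dict[str, Any]:
    """Describe what the user expects the sink to do for one record (hypothetical model)."""
    doc_id = project_key(key)
    if value is None:
        # Expected effect of delete.on.null.values=true: delete by the projected key.
        return {"op": "deleteOne", "filter": {"_id": doc_id}}
    # Any non-null value goes through the configured write model strategy instead.
    return {"op": "write", "id": doc_id, "document": value}
```

For the key shown in the error log, `expected_operation({"field1": "0", "field2": "0123456789ab0123456789ab"}, None)` would describe a delete filtered on those two fields.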

      Instead, an exception is thrown:

      [2023-04-11 01:11:37,264] ERROR Unable to process record SinkRecord{kafkaOffset=0, timestampType=CreateTime} ConnectRecord{topic='test-resolved-entities', kafkaPartition=0, key={"field1":"0","field2":"0123456789ab0123456789ab"}, keySchema=Schema{STRING}, value=null, valueSchema=Schema{STRING}, timestamp=1681175496937, headers=ConnectHeaders(headers=)} (com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData)
      
      org.apache.kafka.connect.errors.DataException: Could not convert value `null` into a BsonDocument.
      	at com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.getUnwrapped(LazyBsonDocument.java:169)
      	at com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.containsKey(LazyBsonDocument.java:83)
      	at com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.shouldAppend(DocumentIdAdder.java:68)
      	at com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.lambda$process$0(DocumentIdAdder.java:51)
      	at java.base/java.util.Optional.ifPresent(Optional.java:183)
      	at com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.process(DocumentIdAdder.java:49)
      	at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModel$1(MongoProcessedSinkRecordData.java:90)
      	at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
      	at java.base/java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1085)
      	at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModel$2(MongoProcessedSinkRecordData.java:90)
      	at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.tryProcess(MongoProcessedSinkRecordData.java:105)
      	at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.buildWriteModel(MongoProcessedSinkRecordData.java:85)
      	at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.createWriteModel(MongoProcessedSinkRecordData.java:81)
      	at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.<init>(MongoProcessedSinkRecordData.java:51)
      	at com.mongodb.kafka.connect.sink.MongoSinkRecordProcessor.orderedGroupByTopicAndNamespace(MongoSinkRecordProcessor.java:45)
      	at com.mongodb.kafka.connect.sink.StartedMongoSinkTask.put(StartedMongoSinkTask.java:113)
      	at com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:91)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
      	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
      	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
      	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
      	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
      	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
      	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      	at java.base/java.lang.Thread.run(Thread.java:829)
      Caused by: org.bson.BsonInvalidOperationException: readStartDocument can only be called when CurrentBSONType is DOCUMENT, not when CurrentBSONType is NULL.
      	at org.bson.AbstractBsonReader.verifyBSONType(AbstractBsonReader.java:689)
      	at org.bson.AbstractBsonReader.checkPreconditions(AbstractBsonReader.java:721)
      	at org.bson.AbstractBsonReader.readStartDocument(AbstractBsonReader.java:449)
      	at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:84)
      	at org.bson.BsonDocument.parse(BsonDocument.java:66)
      	at com.mongodb.kafka.connect.sink.converter.StringRecordConverter.convert(StringRecordConverter.java:35)
      	at com.mongodb.kafka.connect.sink.converter.SinkConverter.lambda$convert$1(SinkConverter.java:60)
      	at com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.getUnwrapped(LazyBsonDocument.java:166)
      

      I can reproduce similar behaviour in my own environment with:

      $ bin/kafka-console-producer.sh --topic my-topic --bootstrap-server localhost:9092 --property parse.key=true --property key.separator="|"
      >{ "field1": "a", "field2": "b" }|null
      
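
One thing that may be worth checking in this reproduction, offered as a possible reading rather than a confirmed diagnosis: the text typed after the key separator may be sent as the four-character string `null` rather than as a record with no value. That would be consistent with the "Caused by" line in the trace, where `BsonDocument.parse` reports `CurrentBSONType is NULL`, which suggests a JSON `null` literal was parsed. The sketch below, with a hypothetical helper name, shows the distinction between the two payloads.

```python
import json
from typing import Optional


def classify_value(raw: Optional[bytes]) -> str:
    """Classify a record value payload (hypothetical helper for illustration)."""
    if raw is None:
        # No payload at all: this is what a null-valued (tombstone) record carries.
        return "tombstone"
    parsed = json.loads(raw.decode("utf-8"))
    if parsed is None:
        # The payload is the text "null": valid JSON, but not a document.
        return "json-null-literal"
    if isinstance(parsed, dict):
        return "document"
    return "other-json"


print(classify_value(None))         # record with no value
print(classify_value(b"null"))      # the text typed in the console
print(classify_value(b'{"a": 1}'))  # an ordinary JSON document
```

If the reproduction falls into the second case, the record reaching the sink would carry a string value rather than a null one, so it would not exercise delete.on.null.values at all.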

      Help Needed

      Could the Driver team clarify what we're missing here? Is it meant to work as described, or is the conclusion drawn from the quoted documentation incorrect?

            Assignee:
            valentin.kovalenko@mongodb.com Valentin Kavalenka
            Reporter:
            robert.walters@mongodb.com Robert Walters
            Votes:
            1
            Watchers:
            5
