Kafka Connector / KAFKA-317

Could not convert value `java.nio.HeapByteBuffer[pos=0 lim=164 cap=164]` into a BsonDocument

Details

    • Type: Question
    • Resolution: Gone away
    • Priority: Unknown

    Description

      Hi, I am trying to use Kafka Connect to save data to MongoDB. I also use Schema Registry to serialize and deserialize Avro messages from Kafka. This works fine in my Java app, but when I try to create a MongoSinkConnector I run into this error:

      [2022-06-13 16:29:47,136] ERROR Unable to process record SinkRecord{kafkaOffset=863932, timestampType=CreateTime} ConnectRecord{topic='zzz', kafkaPartition=1, key=null, keySchema=null, 
      value=java.nio.HeapByteBuffer[pos=0 lim=164 cap=164], valueSchema=Schema{BYTES}, timestamp=1655136458764, headers=ConnectHeaders(headers=[ConnectHeader(key=scst_partition, value=1, 
      schema=Schema{INT8}), ConnectHeader(key=contentType, value=application/vnd.networkpacket.v1+avro, schema=Schema{STRING}), ConnectHeader(key=spring_json_header_types, 
      value={scst_partition=java.lang.Integer, contentType=java.lang.String}, schema=Schema{MAP})])} (com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData) org.apache.kafka.connect.errors.DataException: 
      Could not convert value `java.nio.HeapByteBuffer[pos=0 lim=164 cap=164]` into a BsonDocument. at com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.getUnwrapped(LazyBsonDocument.java:169) 
      at com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.containsKey(LazyBsonDocument.java:83) at 
      com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.shouldAppend(DocumentIdAdder.java:68) at 
      com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.lambda$process$0(DocumentIdAdder.java:51) at java.base/java.util.Optional.ifPresent(Optional.java:183) at 
      com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.process(DocumentIdAdder.java:49) at 
      com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModel$1(MongoProcessedSinkRecordData.java:90) at java.base/java.util.ArrayList.forEach(ArrayList.java:1541) at 
      java.base/java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1085) at 
      com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModel$2(MongoProcessedSinkRecordData.java:90) at 
      com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.tryProcess(MongoProcessedSinkRecordData.java:105) at 
      com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.buildWriteModel(MongoProcessedSinkRecordData.java:85) at 
      com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.createWriteModel(MongoProcessedSinkRecordData.java:81) at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.<init>
      (MongoProcessedSinkRecordData.java:51) at com.mongodb.kafka.connect.sink.MongoSinkRecordProcessor.orderedGroupByTopicAndNamespace(MongoSinkRecordProcessor.java:45) at 
      com.mongodb.kafka.connect.sink.StartedMongoSinkTask.put(StartedMongoSinkTask.java:75) at com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:90) at 
      org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:582) at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:330) at 
      org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232) at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201) at 
      org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237) at 
      java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at 
      java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at 
      java.base/java.lang.Thread.run(Thread.java:829) Caused by: org.apache.kafka.connect.errors.DataException: No converter present due to unexpected object type: java.nio.HeapByteBuffer at 
      com.mongodb.kafka.connect.sink.converter.SinkConverter.getRecordConverter(SinkConverter.java:92) at com.mongodb.kafka.connect.sink.converter.SinkConverter.lambda$convert$1(SinkConverter.java:60) at 
      com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.getUnwrapped(LazyBsonDocument.java:166) ... 27 more [2022-06-13 16:29:47,136] ERROR WorkerSinkTask{id=mongo-sink-0} Task threw an 
      uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Could not convert value `java.nio.HeapByteBuffer[pos=0 lim=164 cap=164]` into a BsonDocument. 
      (org.apache.kafka.connect.runtime.WorkerSinkTask) org.apache.kafka.connect.errors.DataException: Could not convert value `java.nio.HeapByteBuffer[pos=0 lim=164 cap=164]` into a BsonDocument. at 
      com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.getUnwrapped(LazyBsonDocument.java:169) at 
      com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.containsKey(LazyBsonDocument.java:83) at 
      com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.shouldAppend(DocumentIdAdder.java:68) at 
      com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.lambda$process$0(DocumentIdAdder.java:51) at java.base/java.util.Optional.ifPresent(Optional.java:183) at 
      com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.process(DocumentIdAdder.java:49) at 
      com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModel$1(MongoProcessedSinkRecordData.java:90) at java.base/java.util.ArrayList.forEach(ArrayList.java:1541) at 
      java.base/java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1085) at 
      com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModel$2(MongoProcessedSinkRecordData.java:90) at 
      com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.tryProcess(MongoProcessedSinkRecordData.java:105) at 
      com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.buildWriteModel(MongoProcessedSinkRecordData.java:85) at 
      com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.createWriteModel(MongoProcessedSinkRecordData.java:81) at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.<init>
      (MongoProcessedSinkRecordData.java:51) at com.mongodb.kafka.connect.sink.MongoSinkRecordProcessor.orderedGroupByTopicAndNamespace(MongoSinkRecordProcessor.java:45) at 
      com.mongodb.kafka.connect.sink.StartedMongoSinkTask.put(StartedMongoSinkTask.java:75) at com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:90) at 
      org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:582) at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:330) at 
      org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232) at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201) at 
      org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237) at 
      java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at 
      java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at 
      java.base/java.lang.Thread.run(Thread.java:829) Caused by: org.apache.kafka.connect.errors.DataException: No converter present due to unexpected object type: java.nio.HeapByteBuffer at 
      com.mongodb.kafka.connect.sink.converter.SinkConverter.getRecordConverter(SinkConverter.java:92) at com.mongodb.kafka.connect.sink.converter.SinkConverter.lambda$convert$1(SinkConverter.java:60) at 
      com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.getUnwrapped(LazyBsonDocument.java:166) ... 27 more [2022-06-13 16:29:47,137] ERROR WorkerSinkTask{id=mongo-sink-0} Task threw an 
      uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) org.apache.kafka.connect.errors.ConnectException: Exiting 
      WorkerSinkTask due to unrecoverable exception. at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:610) at 
      org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:330) at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232) at 
      org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188) at 
      org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at 
      java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at 
      java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:829) Caused by: org.apache.kafka.connect.errors.DataException: Could 
      not convert value `java.nio.HeapByteBuffer[pos=0 lim=164 cap=164]` into a BsonDocument. at com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.getUnwrapped(LazyBsonDocument.java:169) at 
      com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.containsKey(LazyBsonDocument.java:83) at 
      com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.shouldAppend(DocumentIdAdder.java:68) at 
      com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.lambda$process$0(DocumentIdAdder.java:51) at java.base/java.util.Optional.ifPresent(Optional.java:183) at 
      com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.process(DocumentIdAdder.java:49) at 
      com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModel$1(MongoProcessedSinkRecordData.java:90) at java.base/java.util.ArrayList.forEach(ArrayList.java:1541) at 
      java.base/java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1085) at 
      com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModel$2(MongoProcessedSinkRecordData.java:90) at 
      com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.tryProcess(MongoProcessedSinkRecordData.java:105) at 
      com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.buildWriteModel(MongoProcessedSinkRecordData.java:85) at 
      com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.createWriteModel(MongoProcessedSinkRecordData.java:81) at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.<init>
      (MongoProcessedSinkRecordData.java:51) at com.mongodb.kafka.connect.sink.MongoSinkRecordProcessor.orderedGroupByTopicAndNamespace(MongoSinkRecordProcessor.java:45) at 
      com.mongodb.kafka.connect.sink.StartedMongoSinkTask.put(StartedMongoSinkTask.java:75) at com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:90) at 
      org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:582) ... 10 more Caused by: org.apache.kafka.connect.errors.DataException: No converter present due to unexpected 
      object type: java.nio.HeapByteBuffer at com.mongodb.kafka.connect.sink.converter.SinkConverter.getRecordConverter(SinkConverter.java:92) at 
      com.mongodb.kafka.connect.sink.converter.SinkConverter.lambda$convert$1(SinkConverter.java:60) at 
      com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.getUnwrapped(LazyBsonDocument.java:166) ... 27 more [2022-06-13 16:29:47,137] INFO Stopping MongoDB sink task 
      (com.mongodb.kafka.connect.sink.MongoSinkTask) [2022-06-13 16:29:47,139] INFO [Consumer clientId=connector-consumer-mongo-sink-0, groupId=connect-mongo-sink] Revoke previously assigned partitions 
      zzz-1 pzzz-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) 
      

      Part of my configuration:

      {
         "connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
         "connection.uri":"mongodb://mongodb:27017",
         "database":"xxx",
         "collection":"yyy",
         "topics":"zzz",
         "value.converter":"io.confluent.connect.avro.AvroConverter",
         "value.converter.schema.registry.url":"http://schema-registry:8081",
         "key.converter":"org.apache.kafka.connect.storage.StringConverter",
         "key.converter.schemas.enable":"false",
         "value.converter.schemas.enable":"false"
      }
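
      A possible diagnostic step, offered as a sketch rather than a confirmed cause: the log shows valueSchema=Schema{BYTES}, so the sink received a raw byte buffer instead of a structured record. Assuming the raw value bytes of one record can be captured (for example with a plain byte-array consumer), the snippet below checks for the Confluent wire-format header (magic byte 0 followed by a 4-byte big-endian schema ID) and extracts the ID. That ID could then be looked up in Schema Registry to see which schema the producer actually registered. The class name, method name, and sample bytes are hypothetical and are not part of the connector or of the original report.

```java
import java.nio.ByteBuffer;
import java.util.OptionalInt;

// Hypothetical helper: inspects the first bytes of a raw Kafka record value.
final class WireFormatCheck {
    // Confluent wire format: 1 magic byte (0), then a 4-byte big-endian schema ID.
    private static final byte MAGIC_BYTE = 0x0;
    private static final int HEADER_LENGTH = 5;

    // Returns the schema ID if the header is present, otherwise an empty OptionalInt.
    public static OptionalInt schemaId(byte[] rawValue) {
        if (rawValue == null || rawValue.length < HEADER_LENGTH || rawValue[0] != MAGIC_BYTE) {
            return OptionalInt.empty();
        }
        // ByteBuffer reads in big-endian order by default.
        return OptionalInt.of(ByteBuffer.wrap(rawValue, 1, 4).getInt());
    }

    public static void main(String[] args) {
        // Made-up sample bytes: header with schema ID 42, then two payload bytes.
        byte[] sample = {0, 0, 0, 0, 42, 10, 20};
        OptionalInt id = schemaId(sample);
        System.out.println(id.isPresent()
                ? "Confluent header found, schema ID " + id.getAsInt()
                : "No Confluent header; value was not written in the Confluent wire format");
    }
}
```

      If a schema ID is found and the registered schema turns out to be a plain bytes type, that would be consistent with the Schema{BYTES} value in the log, but this is an assumption to verify against the actual producer setup.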
      

          People

            ross@mongodb.com Ross Lawley
            pawelwangryn0@gmail.com Paweł Wangryn