[KAFKA-317] Could not convert value `java.nio.HeapByteBuffer[pos=0 lim=164 cap=164]` into a BsonDocument Created: 20/Jun/22  Updated: 27/Oct/23  Resolved: 05/Jul/22

Status: Closed
Project: Kafka Connector
Component/s: None
Affects Version/s: None
Fix Version/s: None

Type: Question Priority: Unknown
Reporter: Paweł Wangryn Assignee: Ross Lawley
Resolution: Gone away Votes: 0
Labels: None
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified


 Description   

Hi, I'm trying to use Kafka Connect to save data to MongoDB, with Schema Registry to serialize/deserialize Avro messages from Kafka. This works fine in my Java app, but when I try to create a MongoSinkConnector I run into this error:

[2022-06-13 16:29:47,136] ERROR Unable to process record SinkRecord{kafkaOffset=863932, timestampType=CreateTime} ConnectRecord{topic='zzz', kafkaPartition=1, key=null, keySchema=null, value=java.nio.HeapByteBuffer[pos=0 lim=164 cap=164], valueSchema=Schema{BYTES}, timestamp=1655136458764, headers=ConnectHeaders(headers=[ConnectHeader(key=scst_partition, value=1, schema=Schema{INT8}), ConnectHeader(key=contentType, value=application/vnd.networkpacket.v1+avro, schema=Schema{STRING}), ConnectHeader(key=spring_json_header_types, value={scst_partition=java.lang.Integer, contentType=java.lang.String}, schema=Schema{MAP})])} (com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData)
org.apache.kafka.connect.errors.DataException: Could not convert value `java.nio.HeapByteBuffer[pos=0 lim=164 cap=164]` into a BsonDocument.
    at com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.getUnwrapped(LazyBsonDocument.java:169)
    at com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.containsKey(LazyBsonDocument.java:83)
    at com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.shouldAppend(DocumentIdAdder.java:68)
    at com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.lambda$process$0(DocumentIdAdder.java:51)
    at java.base/java.util.Optional.ifPresent(Optional.java:183)
    at com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.process(DocumentIdAdder.java:49)
    at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModel$1(MongoProcessedSinkRecordData.java:90)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
    at java.base/java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1085)
    at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModel$2(MongoProcessedSinkRecordData.java:90)
    at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.tryProcess(MongoProcessedSinkRecordData.java:105)
    at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.buildWriteModel(MongoProcessedSinkRecordData.java:85)
    at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.createWriteModel(MongoProcessedSinkRecordData.java:81)
    at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.<init>(MongoProcessedSinkRecordData.java:51)
    at com.mongodb.kafka.connect.sink.MongoSinkRecordProcessor.orderedGroupByTopicAndNamespace(MongoSinkRecordProcessor.java:45)
    at com.mongodb.kafka.connect.sink.StartedMongoSinkTask.put(StartedMongoSinkTask.java:75)
    at com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:90)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:582)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:330)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: No converter present due to unexpected object type: java.nio.HeapByteBuffer
    at com.mongodb.kafka.connect.sink.converter.SinkConverter.getRecordConverter(SinkConverter.java:92)
    at com.mongodb.kafka.connect.sink.converter.SinkConverter.lambda$convert$1(SinkConverter.java:60)
    at com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.getUnwrapped(LazyBsonDocument.java:166)
    ... 27 more
[2022-06-13 16:29:47,136] ERROR WorkerSinkTask{id=mongo-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Could not convert value `java.nio.HeapByteBuffer[pos=0 lim=164 cap=164]` into a BsonDocument. (org.apache.kafka.connect.runtime.WorkerSinkTask)
org.apache.kafka.connect.errors.DataException: Could not convert value `java.nio.HeapByteBuffer[pos=0 lim=164 cap=164]` into a BsonDocument.
    [stack trace and cause identical to the error above]
[2022-06-13 16:29:47,137] ERROR WorkerSinkTask{id=mongo-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:610)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:330)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: Could not convert value `java.nio.HeapByteBuffer[pos=0 lim=164 cap=164]` into a BsonDocument.
    [same frames as the first stack trace above, through WorkerSinkTask.deliverMessages(WorkerSinkTask.java:582)]
    ... 10 more
Caused by: org.apache.kafka.connect.errors.DataException: No converter present due to unexpected object type: java.nio.HeapByteBuffer
    at com.mongodb.kafka.connect.sink.converter.SinkConverter.getRecordConverter(SinkConverter.java:92)
    at com.mongodb.kafka.connect.sink.converter.SinkConverter.lambda$convert$1(SinkConverter.java:60)
    at com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.getUnwrapped(LazyBsonDocument.java:166)
    ... 27 more
[2022-06-13 16:29:47,137] INFO Stopping MongoDB sink task (com.mongodb.kafka.connect.sink.MongoSinkTask)
[2022-06-13 16:29:47,139] INFO [Consumer clientId=connector-consumer-mongo-sink-0, groupId=connect-mongo-sink] Revoke previously assigned partitions zzz-1 pzzz-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

Part of my configuration:

{
   "connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
   "connection.uri":"mongodb://mongodb:27017",
   "database":"xxx",
   "collection":"yyy",
   "topics":"zzz",
   "value.converter":"io.confluent.connect.avro.AvroConverter",
   "value.converter.schema.registry.url":"http://schema-registry:8081",
   "key.converter":"org.apache.kafka.connect.storage.StringConverter",
   "key.converter.schemas.enable":"false",
   "value.converter.schemas.enable":"false"
}
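
For context, the `valueSchema=Schema{BYTES}` in the log above indicates the value reached the sink as a raw ByteBuffer: the Avro schema registered for the topic is apparently the primitive bytes type rather than a record. A hypothetical illustration of the difference (the actual networkpacket schema is not included in this ticket): a primitive schema such as

{ "type": "bytes" }

deserializes to a ByteBuffer, which the sink cannot map to a document, whereas a record schema such as

{
   "type": "record",
   "name": "NetworkPacket",
   "fields": [
      { "name": "sourceIp", "type": "string" },
      { "name": "payloadSize", "type": "int" }
   ]
}

deserializes to a Connect Struct, which the sink can convert to BSON. (The field names here are placeholders, not the reporter's actual schema.)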



 Comments   
Comment by PM Bot [ 05/Jul/22 ]

There hasn't been any recent activity on this ticket, so we're resolving it. Thanks for reaching out! Please feel free to comment on this if you're able to provide more information.

Comment by Ross Lawley [ 20/Jun/22 ]

Hi pawelwangryn0@gmail.com,

I noticed that the Avro data has a schema of Bytes - the connector only supports structs for Avro data at present.

If the Avro data is raw BSON, then extracting the raw byte array would allow the connector to convert the data into a BSON document.
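
A minimal sketch of that approach, assuming the payload bytes are valid BSON: swap the value converter for Kafka's built-in ByteArrayConverter so the sink receives a plain byte[] (which it can parse as a raw BSON document) rather than a ByteBuffer. The placeholder names (xxx, yyy, zzz) are carried over from the configuration above:

{
   "connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
   "connection.uri":"mongodb://mongodb:27017",
   "database":"xxx",
   "collection":"yyy",
   "topics":"zzz",
   "key.converter":"org.apache.kafka.connect.storage.StringConverter",
   "value.converter":"org.apache.kafka.connect.converters.ByteArrayConverter"
}

If the payload is Avro-encoded rather than raw BSON, the longer-term fix is to register a record (struct) schema for the topic so that AvroConverter produces a Struct the sink can handle.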

All the best,

Ross

Comment by Ross Lawley [ 20/Jun/22 ]

Hi Paweł Wangryn,

Thank you for reaching out. As this sounds like a support issue, I wanted to give you some resources to get this question answered more quickly:

Just in case you have already opened a support case and are not receiving sufficient help, please let me know and I can facilitate escalating your issue.

All the best,

Ross Lawley
