[KAFKA-180] PartialValueStrategy clone key which is not interested Created: 21/Dec/20  Updated: 28/Oct/23  Resolved: 13/Jan/21

Status: Closed
Project: Kafka Connector
Component/s: None
Affects Version/s: None
Fix Version/s: 1.4.0

Type: Bug Priority: Major - P3
Reporter: Yj hwang Assignee: Ross Lawley
Resolution: Fixed Votes: 0
Labels: None
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified


 Description   

The PartialValueStrategy component clones both the key and the value of the Kafka message, and cloning triggers BSON parsing of each. As a result, BSON parsing is attempted on the key, which fails with a parsing error if the key is not a valid document. This error stops message consumption entirely. I think this is unintended: PartialValueStrategy should only attempt BSON parsing on the value.

For example, suppose the Kafka key is a plain string. When the key is cloned, the connector tries to convert it to a BSON document, but because it is just a string, the conversion (BSON parsing) fails.

Cloning only the value in PartialValueStrategy is clearer and sufficient.

The same applies to PartialKeyStrategy, which should only need to parse the key.
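The behavior proposed above can be sketched as follows. This is a simplified, hypothetical model: `SimpleRecord` and `generateId` are illustrative names, not the connector's API, and plain Java `Map`s stand in for BSON documents so the sketch runs with only the JDK (Java 16+ for records). The point is that the id is built from allow-listed fields of the value, and the raw key is never read or parsed.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, simplified model of an id strategy that projects fields from
 * the record value only. The real connector works on SinkDocument/BsonDocument;
 * plain Maps are used here so the sketch runs with the JDK alone.
 */
public class ValueOnlyIdSketch {

    /** Hypothetical stand-in for a sink record: raw key string plus already-converted value. */
    record SimpleRecord(String rawKey, Map<String, Object> value) {}

    /** Builds an id from the allow-listed value fields; the key is never touched. */
    static Map<String, Object> generateId(SimpleRecord record, List<String> allowList) {
        Map<String, Object> id = new LinkedHashMap<>();
        for (String field : allowList) {
            if (record.value().containsKey(field)) {
                id.put(field, record.value().get(field));
            }
        }
        return id;
    }

    public static void main(String[] args) {
        // A key like this is not a JSON document, so parsing it as BSON would fail.
        SimpleRecord record = new SimpleRecord(
                "my_kafka_key_string",
                Map.of("eventId", "e-1", "payload", "..."));
        System.out.println(generateId(record, List.of("eventId")));
    }
}
```

Because the key is carried along but never converted, an unparseable string key cannot stop consumption in this model.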



 Comments   
Comment by Githook User [ 13/Jan/21 ]

Author: Ross Lawley <ross.lawley@gmail.com> (username: rozza)

Message: Fix LazyBsonDocument clone (#46)

Clone the LazyBsonDocument rather than the unwrapped BsonDocument,
as not all LazyBsonDocument instances are valid BsonDocuments
or need unwrapping.

KAFKA-180
Branch: master
https://github.com/mongodb/mongo-kafka/commit/a8e1927117874ae94da499c5d7ff246d9f3b9700
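The idea in the commit message can be illustrated with a minimal, hypothetical lazy wrapper. `LazyDoc`, `cloneEagerly` and `cloneLazily` are illustrative names, not the connector's `LazyBsonDocument` API, and a converter that always throws stands in for `BsonDocument.parse` rejecting a plain string key. Cloning eagerly forces the conversion and fails, much like the reported stack trace; cloning lazily copies the wrapper and defers conversion until the data is actually read.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

/**
 * Hypothetical sketch of a lazily converted document. It mirrors the idea of
 * "clone the lazy wrapper, not the unwrapped document" but is not the
 * connector's LazyBsonDocument.
 */
public class LazyCloneSketch {

    static final class LazyDoc {
        private final Supplier<Map<String, Object>> converter;
        private Map<String, Object> unwrapped; // null until first access

        LazyDoc(Supplier<Map<String, Object>> converter) {
            this.converter = converter;
        }

        /** Forces conversion; throws if the underlying data is not a valid document. */
        Map<String, Object> getUnwrapped() {
            if (unwrapped == null) {
                unwrapped = converter.get();
            }
            return unwrapped;
        }

        /** Problematic variant: forces conversion just to make a copy. */
        LazyDoc cloneEagerly() {
            Map<String, Object> copy = new LinkedHashMap<>(getUnwrapped()); // shallow copy is enough here
            return new LazyDoc(() -> copy);
        }

        /** Fixed variant: copies converted data if present, otherwise defers conversion to the clone. */
        LazyDoc cloneLazily() {
            if (unwrapped == null) {
                return new LazyDoc(converter);
            }
            Map<String, Object> copy = new LinkedHashMap<>(unwrapped);
            return new LazyDoc(() -> copy);
        }

        boolean isUnwrapped() {
            return unwrapped != null;
        }
    }

    public static void main(String[] args) {
        // Simulates a string key that cannot be parsed as a JSON document.
        LazyDoc key = new LazyDoc(() -> {
            throw new IllegalArgumentException("not a document: my_kafka_key_string");
        });

        LazyDoc copy = key.cloneLazily(); // succeeds: nothing is parsed yet
        System.out.println("lazy clone ok, unwrapped=" + copy.isUnwrapped());

        try {
            key.cloneEagerly(); // fails at clone time, before anyone reads the key
        } catch (IllegalArgumentException e) {
            System.out.println("eager clone failed: " + e.getMessage());
        }
    }
}
```

With the lazy variant, a key that is never read (as with PartialValueStrategy) is never converted, so an unparseable key no longer causes an error.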

Comment by Ross Lawley [ 12/Jan/21 ]

https://github.com/mongodb/mongo-kafka/pull/46

Comment by Ross Lawley [ 04/Jan/21 ]

Scheduling for 1.4.0

Comment by Yj hwang [ 22/Dec/20 ]

This is my suggestion. If I misunderstood or missed something, please let me know.
https://github.com/mongodb/mongo-kafka/pull/44

Comment by Yj hwang [ 21/Dec/20 ]

A sample configuration and the resulting error when the key is of `string` type:

...
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=schema_registry_url
value.converter.schemas.enabled=true
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
document.id.strategy.partial.value.projection.list=eventId
document.id.strategy.partial.value.projection.type=AllowList
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy

org.apache.kafka.connect.errors.DataException: Unexpected data conversion exception.
 	at com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.getUnwrapped(LazyBsonDocument.java:133)
 	at com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.clone(LazyBsonDocument.java:125)
 	at com.mongodb.kafka.connect.sink.converter.SinkDocument.clone(SinkDocument.java:45)
 	at com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy.generateId(PartialValueStrategy.java:55)
 	at com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.lambda$process$0(DocumentIdAdder.java:49)
 	at java.util.Optional.ifPresent(Optional.java:159)
 	at com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.process(DocumentIdAdder.java:46)
 	at com.mongodb.kafka.connect.sink.MongoSinkTask.lambda$buildWriteModel$5(MongoSinkTask.java:277)
 	at java.util.ArrayList.forEach(ArrayList.java:1257)
 	at java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1082)
 	at com.mongodb.kafka.connect.sink.MongoSinkTask.lambda$buildWriteModel$6(MongoSinkTask.java:277)
 	at java.util.ArrayList.forEach(ArrayList.java:1257)
 	at com.mongodb.kafka.connect.sink.MongoSinkTask.buildWriteModel(MongoSinkTask.java:274)
 	at com.mongodb.kafka.connect.sink.MongoSinkTask.processSinkRecords(MongoSinkTask.java:194)
 	at com.mongodb.kafka.connect.sink.MongoSinkTask.lambda$put$2(MongoSinkTask.java:134)
 	at java.util.ArrayList.forEach(ArrayList.java:1257)
 	at com.mongodb.kafka.connect.sink.MongoSinkTask.lambda$put$3(MongoSinkTask.java:132)
 	at java.util.HashMap.forEach(HashMap.java:1289)
 	at com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:127)
 	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
 	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
 	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
 	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
 	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
 	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 	at java.lang.Thread.run(Thread.java:748)
 Caused by: org.bson.json.JsonParseException: JSON reader was expecting a value but found 'my_kafka_key_string'.
 	at org.bson.json.JsonReader.readBsonType(JsonReader.java:270)
 	at org.bson.AbstractBsonReader.verifyBSONType(AbstractBsonReader.java:680)
 	at org.bson.AbstractBsonReader.checkPreconditions(AbstractBsonReader.java:722)
 	at org.bson.AbstractBsonReader.readStartDocument(AbstractBsonReader.java:450)
 	at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:81)
 	at org.bson.BsonDocument.parse(BsonDocument.java:62)
 	at com.mongodb.kafka.connect.sink.converter.JsonRawStringRecordConverter.convert(JsonRawStringRecordConverter.java:34)
 	at com.mongodb.kafka.connect.sink.converter.SinkConverter.lambda$convert$0(SinkConverter.java:48)
 	at com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.getUnwrapped(LazyBsonDocument.java:131)
 	... 29 more
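With the sample configuration above, PartialValueStrategy builds the document `_id` from the `eventId` field of the value, and ReplaceOneBusinessKeyStrategy then uses that `_id` as the filter of an upserting replace. The following is a rough model of that second step, assuming (as we understand the connector) that the business key is read from the value's `_id` field and removed from the replacement document. `ReplaceOne` and `buildWrite` are hypothetical names, and plain `Map`s stand in for BSON documents.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical model of a business-key replace: the value's "_id" (a document
 * built by a partial value id strategy) becomes the filter, and the rest of the
 * value becomes the replacement. Not the connector's actual implementation.
 */
public class BusinessKeyWriteSketch {

    /** Hypothetical stand-in for a replace-one write: filter, replacement, upsert flag. */
    record ReplaceOne(Map<String, Object> filter, Map<String, Object> replacement, boolean upsert) {}

    @SuppressWarnings("unchecked")
    static ReplaceOne buildWrite(Map<String, Object> valueDoc) {
        Object id = valueDoc.get("_id");
        if (!(id instanceof Map)) {
            throw new IllegalArgumentException("_id must be a document holding the business key fields");
        }
        // Copy the value and drop _id so the replacement does not overwrite the key document.
        Map<String, Object> replacement = new LinkedHashMap<>(valueDoc);
        replacement.remove("_id");
        return new ReplaceOne((Map<String, Object>) id, replacement, true);
    }

    public static void main(String[] args) {
        Map<String, Object> value = new LinkedHashMap<>();
        value.put("_id", Map.of("eventId", "e-1")); // business key from the AllowList projection
        value.put("eventId", "e-1");
        value.put("status", "CREATED");
        System.out.println(buildWrite(value));
    }
}
```

In this model only the value document is consulted; the Kafka key plays no part in building either the filter or the replacement.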

Comment by Esha Bhargava [ 21/Dec/20 ]

lala7573@gmail.com Thanks for reporting this issue! We'll look into it and get back to you.

Generated at Thu Feb 08 09:05:46 UTC 2024 using Jira 9.7.1#970001-sha1:2222b88b221c4928ef0de3161136cc90c8356a66.