[KAFKA-97] Configuration for MongoDB Kafka Connect Connector to avoid {"_id": {"$numberLong": "11111111"}} Created: 09/Apr/20  Updated: 17/Jun/20  Resolved: 17/Jun/20

Status: Closed
Project: Kafka Connector
Component/s: None
Affects Version/s: 1.1
Fix Version/s: None

Type: New Feature Priority: Major - P3
Reporter: Sabari Gandhi Assignee: Unassigned
Resolution: Duplicate Votes: 2
Labels: None
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified
Environment:

MongoDB: 3.6.17, Kafka Connector 1.1


Attachments: File sink_config.json     File source_config.json    
Issue Links:
Duplicate
duplicates KAFKA-99 Support relaxed Json format in the so... Closed

 Description   

I am using the attached configuration on the source side, and I am seeing the following in the Kafka topic:

"somefield": {"$numberLong": "2342423432432432434324"}

Is there a way to have the data formatted like this: "somefield": 2342423432432432434324?
 
The attached config uses StringConverter, but I have also tried the JSON converter as well as Avro with the following configuration:
 
"key.converter":"io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url":"http://schema-registry:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://schema-registry:8081"
 
Thanks in advance.
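For reference, a minimal source configuration along these lines reproduces the behavior (a hypothetical sketch in standalone .properties form; the names and connection details are placeholders, and the actual attached source_config.json may differ):

# Hypothetical minimal MongoDB source connector config (placeholders, not the attachment)
name=mongo-source-example
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=mongodb://mongo:27017
database=test
collection=investigate1
# With StringConverter the change stream document is passed through as extended JSON text
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter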
 
 



 Comments   
Comment by Ross Lawley [ 17/Jun/20 ]

Will fix as part of KAFKA-99
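For anyone landing here later: KAFKA-99 adds a pluggable JSON formatter on the source connector. A sketch of the relevant setting, assuming the formatter class introduced by that fix (verify the class name against the release you are using):

# Emit relaxed/simplified JSON instead of canonical extended JSON
# (assumes the formatter shipped with the KAFKA-99 fix)
output.json.formatter=com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson

With the simplified formatter, {"$numberLong": "298384262959477"} is rendered as the plain number 298384262959477.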

Comment by Sabari Gandhi [ 10/Apr/20 ]

Update: Looking further into the Kafka code, I can see the validation that happens on the message for transformations: https://github.com/a0x8o/kafka/blob/master/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/Requirements.java#L52

 

Transformations expect the record to be a Struct with the relevant data, but the Mongo source connector doesn't produce a Struct when it publishes messages to the Kafka topic. Is there any way to enable this via configuration? Thanks in advance.
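To illustrate the failure mode (a hypothetical sink-side transform, not taken from the attached configs): a standard SMT such as ExtractField runs into exactly this check, because with StringConverter the record value arrives as a plain String rather than a Struct or Map:

# Hypothetical SMT on the sink connector; fails the Struct/Map requirement
# in Requirements.java because the value is a plain String
transforms=extractDoc
transforms.extractDoc.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.extractDoc.field=fullDocument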

Comment by Sabari Gandhi [ 10/Apr/20 ]

To summarize: I need clean JSON in S3. If I use StringConverter in the source config and JsonConverter in the sink config, I get clean JSON output, but if I insert something like db.investigate1.insert({"_id" : NumberLong("298384262959477"), "name" : "sample", "type" : "pipeline"})

the output I see in S3 is as follows:

{
  "fullDocument": {"name": "sample", "_id": {"$numberLong": "298384262959477"}, "type": "pipeline"},
  "ns": {"coll": "investigate1", "db": "test"},
  "documentKey": {"_id": {"$numberLong": "298384262959477"}},
  "operationType": "insert",
  "_id": {
    "_data": {"$binary": "gl6Qw3cAAAADRh5faWQAMQIewgAAIuoAWhAEs+M48TaZTnOzj+bC1yVP2QQ=", "$type": "00"},
    "_typeBits": {"$binary": "Ag==", "$type": "00"}
  }
}
 
I would like to see something like "fullDocument":{"name":"sample","_id":298384262959477, "type":"pipeline"}... in my S3.
 
I also looked into the transforms available in the sink config, but they do not work here the way they do for the Debezium connector. Looking into it further, I suspect the message format the source connector produces in the Kafka topic is not the same as Debezium's. Is there transform support for the source connector? Please advise.
 

Comment by Sabari Gandhi [ 10/Apr/20 ]

We sink messages from the Kafka topic to S3, and we can't run Athena queries because they break on the $. Also FYI, I have attached the sink configuration to the ticket.

 

Also, I tried transformations in the S3 sink configuration, but they don't readily work with the Mongo source connector. Is that because of the message format the source connector produces to Kafka? Can we please get some information on the transformations supported by the Mongo source connector? I confirmed that some transformations work for Debezium, hence my question about the message format produced to Kafka. Thanks
