[KAFKA-97] Configuration for MongoDB Kafka Connect Connector to avoid {"_id": {"$numberLong": "11111111"}} Created: 09/Apr/20 Updated: 17/Jun/20 Resolved: 17/Jun/20 |
|
| Status: | Closed |
| Project: | Kafka Connector |
| Component/s: | None |
| Affects Version/s: | 1.1 |
| Fix Version/s: | None |
| Type: | New Feature | Priority: | Major - P3 |
| Reporter: | Sabari Gandhi | Assignee: | Unassigned |
| Resolution: | Duplicate | Votes: | 2 |
| Labels: | None |
| Remaining Estimate: | Not Specified |
| Time Spent: | Not Specified |
| Original Estimate: | Not Specified |
| Environment: | MongoDB 3.6.17, Kafka Connector 1.1 |
| Attachments: | |
| Issue Links: | |
| Description |
|
I am using the attached configuration on the source side, and I am seeing extended-JSON values such as {"_id": {"$numberLong": "11111111"}} in the Kafka topic. |
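For context, the `$numberLong` wrapper is MongoDB Extended JSON (canonical mode), in which a BSON 64-bit integer is serialized as `{"$numberLong": "..."}`. A minimal sketch of post-processing that unwraps such values into plain JSON numbers — the function name `unwrap_extended_json` is mine, not part of the connector:

```python
import json

def unwrap_extended_json(value):
    """Recursively replace {"$numberLong": "n"} / {"$numberInt": "n"}
    wrappers with plain ints so the output is 'clean' JSON."""
    if isinstance(value, dict):
        if set(value) in ({"$numberLong"}, {"$numberInt"}):
            return int(next(iter(value.values())))
        return {k: unwrap_extended_json(v) for k, v in value.items()}
    if isinstance(value, list):
        return [unwrap_extended_json(v) for v in value]
    return value

doc = json.loads('{"_id": {"$numberLong": "298384262959477"}, "name": "sample"}')
print(unwrap_extended_json(doc))  # {'_id': 298384262959477, 'name': 'sample'}
```

This is a downstream workaround, not a connector configuration; it simply shows the shape of the transformation being asked for.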
| Comments |
| Comment by Ross Lawley [ 17/Jun/20 ] |
|
Will fix as part of |
| Comment by Sabari Gandhi [ 10/Apr/20 ] |
|
Update: Looking further into the Kafka Connect code, I do see the validation that runs on the message before transformations are applied: https://github.com/a0x8o/kafka/blob/master/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/Requirements.java#L52.
Transformations expect the record value to be a Struct with the relevant data, but the Mongo source connector doesn't produce the Struct format when it publishes messages to the Kafka topic. Is there any way to enable this via configuration? Thanks in advance |
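The linked `Requirements.requireStruct` guard does roughly the following — sketched here in Python rather than Java, with all names mine: if the record value is not a Struct, the transformation rejects it with a DataException. Since the source connector (with StringConverter) hands the framework a plain JSON string, the check fails before the transformation ever runs.

```python
class DataException(Exception):
    """Stand-in for org.apache.kafka.connect.errors.DataException."""
    pass

class Struct:
    """Stand-in for org.apache.kafka.connect.data.Struct."""
    def __init__(self, fields):
        self.fields = fields

def require_struct(value, purpose):
    # Mirrors the guard in connect/transforms .../util/Requirements.java:
    # anything that is not a Struct is refused.
    if not isinstance(value, Struct):
        raise DataException(
            f"Only Struct objects supported for [{purpose}], "
            f"found: {type(value).__name__}")
    return value

require_struct(Struct({"name": "sample"}), "field extraction")  # passes
try:
    # A raw JSON string, as produced via StringConverter, is rejected.
    require_struct('{"_id": {"$numberLong": "1"}}', "field extraction")
except DataException as exc:
    print(exc)
```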
| Comment by Sabari Gandhi [ 10/Apr/20 ] |
|
To summarize: I need clean JSON in S3. With StringConverter in the source config and JsonConverter in the sink config I get clean JSON output, but if I insert something like db.investigate1.insert({"_id" : NumberLong("298384262959477"), "name" : "sample", "type" : "pipeline"}), the output I see in S3 is as follows (truncated): ,"type":"pipeline"},"ns":{"coll":"investigate1","db":"test"},"documentKey":{"_id":{"$numberLong":"298384262959477"}},"operationType":"insert","_id":{"_data": {"$binary":"gl6Qw3cAAAADRh5faWQAMQIewgAAIuoAWhAEs+M48TaZTnOzj+bC1yVP2QQ=","$type":"00"},"_typeBits":{"$binary":"Ag==","$type":"00"}}} |
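One commonly suggested mitigation for the change-stream envelope shown above (the `ns`, `documentKey`, and resume-token `_id` fields) is the source connector's `publish.full.document.only` option, which publishes only the `fullDocument` portion of each event. A sketch of such a source config — connection details are placeholders, and note this does not by itself remove the `$numberLong` wrappers inside the document:

```json
{
  "name": "mongo-source",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "connection.uri": "mongodb://localhost:27017",
    "database": "test",
    "collection": "investigate1",
    "publish.full.document.only": "true",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter"
  }
}
```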
| Comment by Sabari Gandhi [ 10/Apr/20 ] |
|
We sink messages from the Kafka topic to S3, and we can't run Athena queries because they break on the $ keys. Also FYI, I have attached the sink configuration to the ticket.
I also tried transformations in the S3 sink configuration, but they don't readily work with the Mongo source connector. Is that because of the message format the source connector produces to Kafka? Can we please get some information on which transformations the Mongo source connector supports? I confirmed that some transformations work for Debezium, hence my question about the message format produced to Kafka. Thanks |
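Since Athena rejects the `$`-prefixed keys, one workaround independent of the connector (the function name `sanitize_keys` is mine) is to strip the leading `$` from key names before the records land in S3, or in a post-processing step:

```python
import json

def sanitize_keys(value):
    """Recursively strip the leading '$' from keys so engines like
    Athena, which reject '$' in column names, can read the JSON."""
    if isinstance(value, dict):
        return {k.lstrip("$"): sanitize_keys(v) for k, v in value.items()}
    if isinstance(value, list):
        return [sanitize_keys(v) for v in value]
    return value

raw = '{"documentKey": {"_id": {"$numberLong": "298384262959477"}}}'
print(json.dumps(sanitize_keys(json.loads(raw))))
```

This only renames keys; combining it with unwrapping of the Extended JSON number wrappers would give fully "clean" JSON for querying.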