[KAFKA-162] Scheme with DefaultJson formatter Created: 21/Sep/20 Updated: 27/Oct/23 Resolved: 22/Sep/20 |
|
| Status: | Closed |
| Project: | Kafka Connector |
| Component/s: | None |
| Affects Version/s: | None |
| Fix Version/s: | None |
| Type: | Bug | Priority: | Major - P3 |
| Reporter: | Andrey B | Assignee: | Ross Lawley |
| Resolution: | Works as Designed | Votes: | 0 |
| Labels: | None | ||
| Remaining Estimate: | Not Specified | ||
| Time Spent: | Not Specified | ||
| Original Estimate: | Not Specified | ||
| Description |
|
I use output.json.formatter=com.mongodb.kafka.connect.source.json.formatter.DefaultJson
and I want to use only the documentKey._id.$oid field as the key. With the following schema:
I get
It could be fixed by changing com.mongodb.kafka.connect.source.schema.BsonValueToSchemaAndValue#fieldLookup to handle ObjectId, e.g.:
Or maybe you could suggest another way? In the end, I want to get a flattened document, something like this:
|
| Comments |
| Comment by Andrey B [ 22/Sep/20 ] | ||||||||||||
|
Yeah, it makes sense. With the schema that you provided and the com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson formatter, it's possible to get what I want. So I guess I'll try to extend my pipeline to use $project or $convert. | ||||||||||||
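A rough sketch of the pipeline idea mentioned in this comment, written in Python only to build the JSON string. It is untested against a live change stream (as is the original suggestion), and it swaps in $addFields for $project so that the other change stream fields are kept; that choice is an assumption, not something from the ticket. The `pipeline` property is the connector's setting for aggregation stages.

```python
import json

# Hypothetical pipeline: overwrite documentKey._id with its string form.
# Untested against a real change stream; the stage choice is an assumption.
pipeline = [
    {
        "$addFields": {
            "documentKey._id": {
                "$convert": {"input": "$documentKey._id", "to": "string"}
            }
        }
    }
]

# The connector's `pipeline` setting takes the stages as a JSON array string.
pipeline_setting = json.dumps(pipeline)
print(pipeline_setting)
```

If this works as hoped, documentKey._id would arrive as a plain string regardless of which JSON formatter is configured.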
| Comment by Ross Lawley [ 22/Sep/20 ] | ||||||||||||
|
So a couple of points here to note. Firstly, the documentation of new features is still in progress and won't be available until the release. Secondly, the configuration output.json.formatter is used when converting non-string BSON types into strings. So "$oid" is not itself a field of the change stream document; rather, it's used as part of the extended JSON string representation of an ObjectId. To get the output:
You would need to set the schema to be:
And use the "com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson" JSON formatter. Alternatively, I haven't tested this, but you may be able to use the aggregation pipeline with $project and $convert to cast types to their required form and then use that field as the basis for the key. I hope that makes sense. Just to let you know for future reference, for usage questions the MongoDB community portal, located here, is preferred. Ross |
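To illustrate the point about "$oid" being part of the extended JSON representation rather than a real field, here is a small standalone sketch in Python. It is not the connector's SimplifiedJson implementation; the helper name `simplify_oids` and the ObjectId value are made up, and the code only mimics the effect on ObjectId values.

```python
from typing import Any


def simplify_oids(value: Any) -> Any:
    """Replace every {"$oid": "<hex>"} wrapper with the bare hex string."""
    if isinstance(value, dict):
        # A dict whose only key is "$oid" is the extended JSON form of an ObjectId.
        if set(value) == {"$oid"}:
            return value["$oid"]
        return {k: simplify_oids(v) for k, v in value.items()}
    if isinstance(value, list):
        return [simplify_oids(v) for v in value]
    return value


# Extended JSON form of a document key; the ObjectId hex value is made up.
extended = {"documentKey": {"_id": {"$oid": "5f6b1c2e9d3e2a0012345678"}}}
simplified = simplify_oids(extended)
print(simplified)
```

In the extended form, documentKey._id is a wrapper object, so a key schema that expects a string at documentKey._id only lines up once the value is rendered in the simplified form.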