-
Type: Bug
-
Resolution: Works as Designed
-
Priority: Major - P3
-
None
-
Affects Version/s: None
-
Component/s: None
-
None
I use output.json.formatter=com.mongodb.kafka.connect.source.json.formatter.DefaultJson
There is a document:
{ "_id": { "_id": { "$oid": "5d485086aadbfa00430a9653" }, "copyingData": true }, "operationType": "insert", "ns": { "db": "db", "coll": "coll" }, "documentKey": { "_id": { "$oid": "5d485086aadbfa00430a9653" } }, "fullDocument": { "name": "val" } }
And I want to use the only documentKey._id.$oid field as the key.
With following scheme:
{ "type": "record", "name": "keySchema", "fields": [ { "name": "documentKey._id.$oid", "type": "string", "default": "key" } ] }
I get
SchemaAndValue{schema=Schema{keySchema:STRUCT}, value=Struct{documentKey._id.$oid=key}}
It could be fixed via changing com.mongodb.kafka.connect.source.schema.BsonValueToSchemaAndValue#fieldLookup to handle ObjectId, e.g:
private Optional<BsonValue> fieldLookup(final String fieldName, final BsonDocument document) { if (document.containsKey(fieldName)) { return Optional.of(document.get(fieldName)); } else if (fieldName.contains(".") && !fieldName.endsWith(".")) { String subDocumentName = fieldName.substring(0, fieldName.indexOf(".")); String subDocumentFieldName = fieldName.substring(fieldName.indexOf(".") + 1); if (document.isDocument(subDocumentName)) { return fieldLookup(subDocumentFieldName, document.getDocument(subDocumentName)); } else if (document.isObjectId(subDocumentName) && subDocumentFieldName.equals("$oid")) { String hexString = document.getObjectId(subDocumentName).getValue().toHexString(); return Optional.of(new BsonString(hexString)); } } return Optional.empty(); }
Or maybe you could suggest another way?
In the end, I want to get a flattened document, something like that:
{"documentKey._id.$oid": "5d485086aadbfa00430a9653"} or {"documentKey._id": "5d485086aadbfa00430a9653"}