Type: Bug
Resolution: Works as Designed
Priority: Major - P3
Affects Version/s: None
Component/s: None
I am using output.json.formatter=com.mongodb.kafka.connect.source.json.formatter.DefaultJson with the following change stream document:
{
  "_id": {
    "_id": {
      "$oid": "5d485086aadbfa00430a9653"
    },
    "copyingData": true
  },
  "operationType": "insert",
  "ns": {
    "db": "db",
    "coll": "coll"
  },
  "documentKey": {
    "_id": {
      "$oid": "5d485086aadbfa00430a9653"
    }
  },
  "fullDocument": {
    "name": "val"
  }
}
I want to use only the documentKey._id.$oid field as the key, with the following key schema:
{
  "type": "record",
  "name": "keySchema",
  "fields": [
    {
      "name": "documentKey._id.$oid",
      "type": "string",
      "default": "key"
    }
  ]
}
Instead of the ObjectId hex string, the key contains the schema's default value:
SchemaAndValue{schema=Schema{keySchema:STRUCT}, value=Struct{documentKey._id.$oid=key}}
This could be fixed by changing com.mongodb.kafka.connect.source.schema.BsonValueToSchemaAndValue#fieldLookup to handle ObjectId values, e.g.:
private Optional<BsonValue> fieldLookup(final String fieldName, final BsonDocument document) {
    // An exact key match takes precedence over dotted-path resolution.
    if (document.containsKey(fieldName)) {
        return Optional.of(document.get(fieldName));
    } else if (fieldName.contains(".") && !fieldName.endsWith(".")) {
        // Split "a.b.c" into the first segment "a" and the remainder "b.c".
        String subDocumentName = fieldName.substring(0, fieldName.indexOf("."));
        String subDocumentFieldName = fieldName.substring(fieldName.indexOf(".") + 1);
        if (document.isDocument(subDocumentName)) {
            // Recurse into the embedded document.
            return fieldLookup(subDocumentFieldName, document.getDocument(subDocumentName));
        } else if (document.isObjectId(subDocumentName) && subDocumentFieldName.equals("$oid")) {
            // New branch: resolve "<field>.$oid" to the ObjectId's hex string.
            String hexString = document.getObjectId(subDocumentName).getValue().toHexString();
            return Optional.of(new BsonString(hexString));
        }
    }
    return Optional.empty();
}
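The proposed lookup logic can be tried in isolation with the minimal sketch below. It is a standalone re-creation, not the connector's code. Plain Map<String, Object> values stand in for BsonDocument, and the hypothetical Oid record stands in for BsonObjectId. Without the Oid branch, the path documentKey._id.$oid reaches an ObjectId rather than a sub-document and resolves to empty. That is presumably why the schema default "key" appears in the output.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for BsonObjectId: holds the 24-character hex string.
record Oid(String hex) {}

final class DottedPathLookup {

    // Resolves a dotted path such as "documentKey._id.$oid" against nested maps.
    // Plain Map<String, Object> values stand in for BsonDocument (an assumption).
    static Optional<Object> lookup(String fieldName, Map<String, Object> document) {
        if (document.containsKey(fieldName)) {
            // An exact key match wins, mirroring the fieldLookup shown above.
            return Optional.of(document.get(fieldName));
        }
        int dot = fieldName.indexOf('.');
        if (dot < 0 || fieldName.endsWith(".")) {
            return Optional.empty();
        }
        String head = fieldName.substring(0, dot);
        String rest = fieldName.substring(dot + 1);
        Object child = document.get(head);
        if (child instanceof Map<?, ?> sub) {
            // Recurse into the sub-document with the remaining path.
            @SuppressWarnings("unchecked")
            Map<String, Object> subDocument = (Map<String, Object>) sub;
            return lookup(rest, subDocument);
        }
        if (child instanceof Oid oid && rest.equals("$oid")) {
            // The proposed special case: expose the ObjectId as its hex string.
            return Optional.of(oid.hex());
        }
        return Optional.empty();
    }
}
```

For example, looking up "documentKey._id.$oid" in a map shaped like the documentKey above yields the hex string. Looking up "documentKey._id" yields the Oid value itself.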
Or is there another way to achieve this?
Ultimately, I want a flattened document, something like this:
{"documentKey._id.$oid": "5d485086aadbfa00430a9653"}
or
{"documentKey._id": "5d485086aadbfa00430a9653"}
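The second shape can be illustrated by a minimal sketch that flattens a nested key document into dotted field names. This is a standalone illustration, not a connector feature or configuration option. Plain maps stand in for BsonDocument, and HexObjectId is a hypothetical stand-in for BsonObjectId that is rendered as its hex string.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for BsonObjectId, holding the 24-character hex string.
record HexObjectId(String hex) {}

final class KeyFlattener {

    // Flattens nested maps into a single level with dotted keys, e.g.
    // {"documentKey": {"_id": HexObjectId}} becomes {"documentKey._id": "<hex>"}.
    static Map<String, Object> flatten(Map<String, Object> document) {
        Map<String, Object> out = new LinkedHashMap<>();
        flattenInto("", document, out);
        return out;
    }

    private static void flattenInto(String prefix, Map<?, ?> document, Map<String, Object> out) {
        for (Map.Entry<?, ?> entry : document.entrySet()) {
            String key = prefix.isEmpty()
                    ? String.valueOf(entry.getKey())
                    : prefix + "." + entry.getKey();
            Object value = entry.getValue();
            if (value instanceof Map<?, ?> nested) {
                // Recurse into sub-documents, extending the dotted prefix.
                flattenInto(key, nested, out);
            } else if (value instanceof HexObjectId oid) {
                // Render the ObjectId stand-in as its hex string.
                out.put(key, oid.hex());
            } else {
                // Keep other leaf values unchanged.
                out.put(key, value);
            }
        }
    }
}
```

A LinkedHashMap preserves the source field order in the flattened result. Note that dotted names like these are not valid Avro field names, so this shape suits JSON or schemaless keys better than an Avro key schema.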