Details
Bug
Resolution: Works as Designed
Major - P3
Description
I use output.json.formatter=com.mongodb.kafka.connect.source.json.formatter.DefaultJson.
Given the following change stream document:
{
  "_id": {
    "_id": {
      "$oid": "5d485086aadbfa00430a9653"
    },
    "copyingData": true
  },
  "operationType": "insert",
  "ns": {
    "db": "db",
    "coll": "coll"
  },
  "documentKey": {
    "_id": {
      "$oid": "5d485086aadbfa00430a9653"
    }
  },
  "fullDocument": {
    "name": "val"
  }
}
I want to use only the documentKey._id.$oid field as the key.
With the following key schema:
{
  "type": "record",
  "name": "keySchema",
  "fields": [
    {
      "name": "documentKey._id.$oid",
      "type": "string",
      "default": "key"
    }
  ]
}
Instead of the ObjectId value, I get the schema default ("key"):
SchemaAndValue{schema=Schema{keySchema:STRUCT}, value=Struct{documentKey._id.$oid=key}}
This could be fixed by changing com.mongodb.kafka.connect.source.schema.BsonValueToSchemaAndValue#fieldLookup to handle ObjectId, e.g.:
private Optional<BsonValue> fieldLookup(final String fieldName, final BsonDocument document) {
  if (document.containsKey(fieldName)) {
    return Optional.of(document.get(fieldName));
  } else if (fieldName.contains(".") && !fieldName.endsWith(".")) {
    String subDocumentName = fieldName.substring(0, fieldName.indexOf("."));
    String subDocumentFieldName = fieldName.substring(fieldName.indexOf(".") + 1);
    if (document.isDocument(subDocumentName)) {
      return fieldLookup(subDocumentFieldName, document.getDocument(subDocumentName));
    } else if (document.isObjectId(subDocumentName) &&
        subDocumentFieldName.equals("$oid")) {
      String hexString = document.getObjectId(subDocumentName).getValue().toHexString();
      return Optional.of(new BsonString(hexString));
    }
  }

  return Optional.empty();
}
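To illustrate the idea without the connector or the BSON library on the classpath, here is a minimal standalone sketch of the same dotted-path lookup. It assumes nested documents are modeled as plain Map instances and uses a hypothetical FakeObjectId class in place of org.bson.types.ObjectId; neither is part of the connector. The point it shows is that "$oid" only exists in the Extended JSON rendering, so the lookup needs a dedicated branch for ObjectId values.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

public class DottedLookupSketch {

    /** Hypothetical stand-in for org.bson.types.ObjectId; it only carries the hex string. */
    static final class FakeObjectId {
        final String hex;

        FakeObjectId(final String hex) {
            this.hex = hex;
        }
    }

    /** Resolves a dotted field name against nested maps, treating "x.$oid" as the hex of an id. */
    static Optional<Object> fieldLookup(final String fieldName, final Map<String, Object> document) {
        if (document.containsKey(fieldName)) {
            return Optional.of(document.get(fieldName));
        }
        int dot = fieldName.indexOf('.');
        if (dot >= 0 && !fieldName.endsWith(".")) {
            String head = fieldName.substring(0, dot);
            String rest = fieldName.substring(dot + 1);
            Object child = document.get(head);
            if (child instanceof Map) {
                // Descend into the sub-document with the remainder of the path.
                @SuppressWarnings("unchecked")
                Map<String, Object> sub = (Map<String, Object>) child;
                return fieldLookup(rest, sub);
            } else if (child instanceof FakeObjectId && rest.equals("$oid")) {
                // The id is a scalar, not a sub-document, so "$oid" is handled explicitly.
                return Optional.of(((FakeObjectId) child).hex);
            }
        }
        return Optional.empty();
    }

    public static void main(final String[] args) {
        Map<String, Object> documentKey = new LinkedHashMap<>();
        documentKey.put("_id", new FakeObjectId("5d485086aadbfa00430a9653"));
        Map<String, Object> event = new LinkedHashMap<>();
        event.put("documentKey", documentKey);

        // Falls back to "key" (the schema default in this report) when the lookup is empty.
        System.out.println(fieldLookup("documentKey._id.$oid", event).orElse("key"));
    }
}
```

If the FakeObjectId branch is removed, the lookup returns Optional.empty() for "documentKey._id.$oid" and the fallback "key" is printed, which matches the behaviour described above.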
Or maybe you could suggest another way?
In the end, I want to get a flattened document, something like this:
{"documentKey._id.$oid": "5d485086aadbfa00430a9653"}
or
{"documentKey._id": "5d485086aadbfa00430a9653"}