[KAFKA-162] Scheme with DefaultJson formatter Created: 21/Sep/20  Updated: 27/Oct/23  Resolved: 22/Sep/20

Status: Closed
Project: Kafka Connector
Component/s: None
Affects Version/s: None
Fix Version/s: None

Type: Bug Priority: Major - P3
Reporter: Andrey B Assignee: Ross Lawley
Resolution: Works as Designed Votes: 0
Labels: None
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified


 Description   

I use output.json.formatter=com.mongodb.kafka.connect.source.json.formatter.DefaultJson
There is a document:

{
  "_id": {
    "_id": {
      "$oid": "5d485086aadbfa00430a9653"
    },
    "copyingData": true
  },
  "operationType": "insert",
  "ns": {
    "db": "db",
    "coll": "coll"
  },
  "documentKey": {
    "_id": {
      "$oid": "5d485086aadbfa00430a9653"
    }
  },
  "fullDocument": {
    "name": "val"
  }
}

I want to use only the documentKey._id.$oid field as the key.

With the following schema:

{
  "type": "record",
  "name": "keySchema",
  "fields": [
    {
      "name": "documentKey._id.$oid",
      "type": "string",
      "default": "key"
    }
  ]
}

I get

SchemaAndValue{schema=Schema{keySchema:STRUCT}, value=Struct{documentKey._id.$oid=key}}

It could be fixed by changing com.mongodb.kafka.connect.source.schema.BsonValueToSchemaAndValue#fieldLookup to handle ObjectId, e.g.:

private Optional<BsonValue> fieldLookup(final String fieldName, final BsonDocument document) {
  if (document.containsKey(fieldName)) {
    return Optional.of(document.get(fieldName));
  } else if (fieldName.contains(".") && !fieldName.endsWith(".")) {
    String subDocumentName = fieldName.substring(0, fieldName.indexOf("."));
    String subDocumentFieldName = fieldName.substring(fieldName.indexOf(".") + 1);
    if (document.isDocument(subDocumentName)) {
      return fieldLookup(subDocumentFieldName, document.getDocument(subDocumentName));
    } else if (document.isObjectId(subDocumentName) && subDocumentFieldName.equals("$oid")) {
      // Proposed addition: resolve "$oid" on an ObjectId field to its hex string.
      String hexString = document.getObjectId(subDocumentName).getValue().toHexString();
      return Optional.of(new BsonString(hexString));
    }
  }

  return Optional.empty();
}

Or maybe you could suggest another way?

In the end, I want to get a flattened document, something like this:

{"documentKey._id.$oid": "5d485086aadbfa00430a9653"}
or
{"documentKey._id": "5d485086aadbfa00430a9653"}



 Comments   
Comment by Andrey B [ 22/Sep/20 ]

Yeah, it makes sense.

With the schema that you provided and the com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson formatter, it's possible to get what I want.
But I don't want to use SimplifiedJson, since I already have some code that works with the DefaultJson formatter.

So I guess I'll try to extend my pipeline to use $project or $convert.

Comment by Ross Lawley [ 22/Sep/20 ]

Hi andreworty@gmail.com,

So a couple of points here to note. Firstly, the documentation of new features is still in progress and won't be available until the release.

Secondly, the output.json.formatter configuration is used when converting non-string BSON types into strings. So "$oid" is not itself a field of the change stream document; rather, it's used as part of the extended JSON string representation of an ObjectId.

To get the output:
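As a rough illustration of the point above, here is a minimal, standard-library-only Java sketch. It is not the connector's code: `OidLookupSketch`, `ObjectIdValue`, `extendedJson`, and `simplifiedJson` are hypothetical names, and nested `Map`s stand in for BSON documents. It assumes a dotted-path lookup that descends only into sub-documents, and shows that "$oid" exists only in the string rendering of the value, not as a field that a lookup can reach.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class OidLookupSketch {

    /** Hypothetical stand-in for a BSON ObjectId: a scalar value, not a sub-document. */
    public static final class ObjectIdValue {
        final String hex;
        public ObjectIdValue(String hex) { this.hex = hex; }
    }

    /** Dotted-path lookup that descends only into nested maps (sub-documents). */
    public static Optional<Object> fieldLookup(String fieldName, Map<String, Object> document) {
        if (document.containsKey(fieldName)) {
            return Optional.of(document.get(fieldName));
        }
        int dot = fieldName.indexOf('.');
        if (dot >= 0 && !fieldName.endsWith(".")) {
            Object sub = document.get(fieldName.substring(0, dot));
            if (sub instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> subDocument = (Map<String, Object>) sub;
                return fieldLookup(fieldName.substring(dot + 1), subDocument);
            }
        }
        return Optional.empty();
    }

    /** Extended-JSON-style rendering: "$oid" appears only in this string form. */
    public static String extendedJson(ObjectIdValue id) {
        return "{\"$oid\": \"" + id.hex + "\"}";
    }

    /** Simplified rendering: the bare hex string. */
    public static String simplifiedJson(ObjectIdValue id) {
        return id.hex;
    }

    /** Builds a cut-down version of the change stream document from the description. */
    public static Map<String, Object> sampleDocument() {
        Map<String, Object> documentKey = new HashMap<>();
        documentKey.put("_id", new ObjectIdValue("5d485086aadbfa00430a9653"));
        Map<String, Object> document = new HashMap<>();
        document.put("operationType", "insert");
        document.put("documentKey", documentKey);
        return document;
    }

    public static void main(String[] args) {
        Map<String, Object> document = sampleDocument();

        // No such field: the ObjectId is a scalar, so a schema default would be used instead.
        System.out.println(fieldLookup("documentKey._id.$oid", document).isPresent()); // false

        // The ObjectId itself is found; the formatter decides its string form.
        ObjectIdValue id = (ObjectIdValue) fieldLookup("documentKey._id", document).get();
        System.out.println(extendedJson(id));   // {"$oid": "5d485086aadbfa00430a9653"}
        System.out.println(simplifiedJson(id)); // 5d485086aadbfa00430a9653
    }
}
```

In this sketch the path "documentKey._id.$oid" resolves to nothing, which mirrors the reported behaviour where the schema default "key" ends up in the struct, while "documentKey._id" resolves to the ObjectId and its final string form depends only on the formatter.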

SchemaAndValue{schema=Schema{keySchema:STRUCT}, value=Struct{documentKey._id=5d485086aadbfa00430a9653}}

You would need to set the schema to be:

{
  "type": "record",
  "name": "keySchema",
  "fields": [
    {
      "name": "documentKey._id",
      "type": "string",
      "default": "key"
    }
  ]
}

And use "com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson" as the JSON formatter.

Alternatively, though I haven't tested this, you may be able to use the aggregation pipeline with $project and $convert to cast types to their required form and then use that field as the basis for the key.

I hope that makes sense.

For future reference, the MongoDB community portal is the preferred place for usage questions.

Ross

Generated at Thu Feb 08 09:05:43 UTC 2024 using Jira 9.7.1#970001-sha1:2222b88b221c4928ef0de3161136cc90c8356a66.