Kafka Connector / KAFKA-162

Scheme with DefaultJson formatter

    • Type: Bug
    • Resolution: Works as Designed
    • Priority: Major - P3
    • Affects Version/s: None
    • Component/s: None

      I use output.json.formatter=com.mongodb.kafka.connect.source.json.formatter.DefaultJson
      There is a document:

      {
        "_id": {
          "_id": {
            "$oid": "5d485086aadbfa00430a9653"
          },
          "copyingData": true
        },
        "operationType": "insert",
        "ns": {
          "db": "db",
          "coll": "coll"
        },
        "documentKey": {
          "_id": {
            "$oid": "5d485086aadbfa00430a9653"
          }
        },
        "fullDocument": {
          "name": "val"
        }
      }

      I want to use only the documentKey._id.$oid field as the key.

      With the following schema:

      {
        "type": "record",
        "name": "keySchema",
        "fields": [
          {
            "name": "documentKey._id.$oid",
            "type": "string",
            "default": "key"
          }
        ]
      }

      I get the schema's default value ("key") instead of the ObjectId's hex string:

      SchemaAndValue{schema=Schema{keySchema:STRUCT}, value=Struct{documentKey._id.$oid=key}}

      This could be fixed by changing com.mongodb.kafka.connect.source.schema.BsonValueToSchemaAndValue#fieldLookup to handle ObjectId values, e.g.:

      private Optional<BsonValue> fieldLookup(final String fieldName, final BsonDocument document) {
        // Exact match: the field exists at this level.
        if (document.containsKey(fieldName)) {
          return Optional.of(document.get(fieldName));
        } else if (fieldName.contains(".") && !fieldName.endsWith(".")) {
          // Dotted name: split into the first segment and the remainder.
          String subDocumentName = fieldName.substring(0, fieldName.indexOf("."));
          String subDocumentFieldName = fieldName.substring(fieldName.indexOf(".") + 1);
          if (document.isDocument(subDocumentName)) {
            return fieldLookup(subDocumentFieldName, document.getDocument(subDocumentName));
          } else if (document.isObjectId(subDocumentName)
              && subDocumentFieldName.equals("$oid")) {
            // Proposed addition: resolve "<field>.$oid" on an ObjectId to its hex string.
            String hexString = document.getObjectId(subDocumentName).getValue().toHexString();
            return Optional.of(new BsonString(hexString));
          }
        }

        return Optional.empty();
      }

      Or maybe you could suggest another way?

      In the end, I want to get a flattened document, something like this:

      {"documentKey._id.$oid": "5d485086aadbfa00430a9653"}
      or
      {"documentKey._id": "5d485086aadbfa00430a9653"}
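
      To make the desired behaviour concrete, here is a minimal standalone sketch of the same dotted-name lookup producing the first flattened form above. It is not the connector's code: it assumes plain java.util.Map values in place of BsonDocument, and the DottedKeyLookup class, the ObjectIdStandIn class, and the flattenKey helper are hypothetical names used only for illustration.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

final class DottedKeyLookup {

    // Hypothetical stand-in for org.bson.types.ObjectId: holds only the hex string.
    static final class ObjectIdStandIn {
        final String hex;

        ObjectIdStandIn(String hex) {
            this.hex = hex;
        }
    }

    // Resolves a dotted field name against nested maps. A trailing "$oid"
    // segment on an ObjectId stand-in yields its hex string.
    @SuppressWarnings("unchecked")
    static Optional<Object> lookup(String fieldName, Map<String, Object> document) {
        if (document.containsKey(fieldName)) {
            return Optional.ofNullable(document.get(fieldName));
        }
        if (fieldName.contains(".") && !fieldName.endsWith(".")) {
            int dot = fieldName.indexOf('.');
            String head = fieldName.substring(0, dot);
            String rest = fieldName.substring(dot + 1);
            Object child = document.get(head);
            if (child instanceof Map) {
                return lookup(rest, (Map<String, Object>) child);
            } else if (child instanceof ObjectIdStandIn && rest.equals("$oid")) {
                return Optional.of(((ObjectIdStandIn) child).hex);
            }
        }
        return Optional.empty();
    }

    // Builds the flattened single-entry document keyed by the dotted name,
    // or an empty Optional when the field cannot be resolved.
    static Optional<Map<String, Object>> flattenKey(String fieldName, Map<String, Object> document) {
        return lookup(fieldName, document).map(v -> Collections.singletonMap(fieldName, v));
    }

    public static void main(String[] args) {
        Map<String, Object> documentKey = new LinkedHashMap<>();
        documentKey.put("_id", new ObjectIdStandIn("5d485086aadbfa00430a9653"));

        Map<String, Object> event = new LinkedHashMap<>();
        event.put("operationType", "insert");
        event.put("documentKey", documentKey);

        // Prints: {documentKey._id.$oid=5d485086aadbfa00430a9653}
        System.out.println(flattenKey("documentKey._id.$oid", event).orElse(Collections.emptyMap()));
    }
}
```

      In this sketch an unresolved name gives an empty Optional, which corresponds to the case where the schema default ("key") ends up being used.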

            Assignee: Ross Lawley (ross@mongodb.com)
            Reporter: Andrey B (andreworty@gmail.com)
            Votes: 0
            Watchers: 1
