Kafka Connector / KAFKA-239

Source connector support for JSON Schema

    • Type: Bug
    • Resolution: Duplicate
    • Priority: Unknown
    • Fix Version/s: None
    • Affects Version/s: 1.5.0
    • Component/s: Source, Test
    • Labels: None

      When using the timestamp type, the message we receive from the Kafka topic differs from the document written to MongoDB.

       

      Connector config: 

      name=mongo-source
      connector.class=com.mongodb.kafka.connect.MongoSourceConnector
      tasks.max=1
      
      # Connection and source configuration
      connection.uri=mongodb://localhost:27018
      database=foo
      collection=baz
      
      # Output configuration
      output.schema.infer.value=true
      output.format.value=schema
      
      topic.prefix=
      topic.suffix=
      poll.max.batch.size=1000
      poll.await.time.ms=5000
      
      # Change stream options
      pipeline=[]
      batch.size=0
      change.stream.full.document=updateLookup
      collation=
      
      Input:

      rs1:PRIMARY> db.baz.insert({"ts" : Timestamp(1620742106, 2)})
      

      Output:

      {
        "schema": {
          "type": "struct",
          "fields": [
            {
              "type": "struct",
              "fields": [
                {
                  "type": "string",
                  "optional": true,
                  "field": "_data"
                }
              ],
              "optional": true,
              "name": "_id",
              "field": "_id"
            },
            {
              "type": "int64",
              "optional": true,
              "name": "org.apache.kafka.connect.data.Timestamp",
              "version": 1,
              "field": "clusterTime"
            },
            {
              "type": "struct",
              "fields": [
                {
                  "type": "string",
                  "optional": true,
                  "field": "_id"
                }
              ],
              "optional": true,
              "name": "documentKey",
              "field": "documentKey"
            },
            {
              "type": "struct",
              "fields": [
                {
                  "type": "string",
                  "optional": true,
                  "field": "_id"
                },
                {
                  "type": "int64",
                  "optional": true,
                  "name": "org.apache.kafka.connect.data.Timestamp",
                  "version": 1,
                  "field": "ts"
                }
              ],
              "optional": true,
              "name": "fullDocument",
              "field": "fullDocument"
            },
            {
              "type": "struct",
              "fields": [
                {
                  "type": "string",
                  "optional": true,
                  "field": "coll"
                },
                {
                  "type": "string",
                  "optional": true,
                  "field": "db"
                }
              ],
              "optional": true,
              "name": "ns",
              "field": "ns"
            },
            {
              "type": "string",
              "optional": true,
              "field": "operationType"
            }
          ],
          "optional": false,
          "name": "default"
        },
        "payload": {
          "_id": {
            "_data": "82609AA32D000000012B022C0100296E5A10047DB96B417494423E8B1A351D894EDFC246645F69640064609AA32D485F85468421F0F30004"
          },
          "clusterTime": 1544382408,
          "documentKey": {
            "_id": "609aa32d485f85468421f0f3"
          },
          "fullDocument": {
            "_id": "609aa32d485f85468421f0f3",
            "ts": 1539435408
          },
          "ns": {
            "coll": "baz",
            "db": "foo"
          },
          "operationType": "insert"
        }
      }

       

      The value of `ts` we expect is `1620742106000` (the seconds component `1620742106` converted to epoch millis), but we get `1539435408` instead.

       

      After further investigation, the timestamp value overflows at this line: https://github.com/mongodb/mongo-kafka/blob/621394f2197e31e0b6b07d8390bf6ee40e8cd501/src/main/java/com/mongodb/kafka/connect/source/schema/BsonValueToSchemaAndValue.java#L141
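
      The received value is exactly the expected millis truncated to 32 bits: 1620742106000 mod 2^32 = 1539435408 (the `clusterTime` value in the payload appears truncated the same way). A minimal sketch reproducing the overflow — the class name `TimestampOverflowDemo` is ours, for illustration only:

      import org.bson.BsonTimestamp;
      
      public class TimestampOverflowDemo {
        public static void main(String[] args) {
          // BsonTimestamp.getTime() returns the seconds component as an int
          int seconds = new BsonTimestamp(1620742106, 2).getTime();
      
          long overflowed = seconds * 1000;  // int * int wraps around before widening: 1539435408
          long correct = seconds * 1000L;    // long arithmetic: 1620742106000
      
          System.out.println(overflowed + " vs " + correct);
        }
      }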

       

      Testing with this function in `SchemaAndValueProducerTest.java`

      
      @Test
      @DisplayName("test infer schema and value producer")
      void testInferSchemaAndValueProducer2() {
        Schema expectedSchema =
            SchemaBuilder.struct()
                .name(DEFAULT_FIELD_NAME)
                .field("timestamp", Timestamp.builder().optional().build())
                .build();
      
        SchemaAndValue expectedSchemaAndValue =
            new SchemaAndValue(
                expectedSchema, new Struct(expectedSchema).put("timestamp", new Date(1620742106000L)));
      
        SchemaAndValueProducer valueProducer =
            new InferSchemaAndValueProducer(SIMPLE_JSON_WRITER_SETTINGS);
      
        final String FULL_DOCUMENT_JSON =
            "{" + "\"timestamp\": {\"$timestamp\": {\"t\": 1620742106, \"i\": 2}} " + "}";
      
        assertSchemaAndValueEquals(
            expectedSchemaAndValue, valueProducer.get(BsonDocument.parse(FULL_DOCUMENT_JSON)));
      }
      

       

      yields a failing assertion: the produced `ts` value reflects the 32-bit overflow instead of the expected `1620742106000`.

       

      Changing the multiplier to `1000L` returns the expected epoch millis.
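
      A minimal sketch of the fix, assuming the conversion at the linked line multiplies the int returned by `BsonTimestamp.getTime()` by the int literal `1000` (the helper name `toEpochMillis` is ours, for illustration):

      import org.bson.BsonTimestamp;
      
      final class TimestampToMillis {
        // The long literal promotes the multiplication to 64-bit arithmetic,
        // so the seconds-to-millis conversion no longer overflows the int range.
        static long toEpochMillis(final BsonTimestamp timestamp) {
          return timestamp.getTime() * 1000L; // was: * 1000 (int overflow)
        }
      }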

      Assignee: Ross Lawley (ross@mongodb.com)
      Reporter: Rachit Agrawal (rachitagrawa@hotmail.com)
      Votes: 0
      Watchers: 2
