[KAFKA-239] Source connector support for Json Schema Created: 21/Jul/21  Updated: 04/May/22  Resolved: 29/Jul/21

Status: Closed
Project: Kafka Connector
Component/s: Source, Test
Affects Version/s: 1.5.0
Fix Version/s: None

Type: Bug Priority: Unknown
Reporter: Rachit Agrawal Assignee: Ross Lawley
Resolution: Duplicate Votes: 0
Labels: external-user
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified

Issue Links:
Cloners
clones KAFKA-220 Source connector with infer value ove... Closed
Duplicate
duplicates KAFKA-220 Source connector with infer value ove... Closed

 Description   

When a document contains a BSON Timestamp value, the document stored in MongoDB differs from the message we receive on the Kafka topic.

 

Connector config: 

name=mongo-source
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1
 
# Connection and source configuration
connection.uri=mongodb://localhost:27018
database=foo
collection=baz
 
# Output configuration
output.schema.infer.value=true
output.format.value=schema
 
topic.prefix=
topic.suffix=
poll.max.batch.size=1000
poll.await.time.ms=5000
 
# Change stream options
pipeline=[]
batch.size=0
change.stream.full.document=updateLookup
collation=

 

  

Input:

rs1:PRIMARY> db.baz.insert({"ts" : Timestamp(1620742106, 2)})

Output:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": true,
            "field": "_data"
          }
        ],
        "optional": true,
        "name": "_id",
        "field": "_id"
      },
      {
        "type": "int64",
        "optional": true,
        "name": "org.apache.kafka.connect.data.Timestamp",
        "version": 1,
        "field": "clusterTime"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": true,
            "field": "_id"
          }
        ],
        "optional": true,
        "name": "documentKey",
        "field": "documentKey"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": true,
            "field": "_id"
          },
          {
            "type": "int64",
            "optional": true,
            "name": "org.apache.kafka.connect.data.Timestamp",
            "version": 1,
            "field": "ts"
          }
        ],
        "optional": true,
        "name": "fullDocument",
        "field": "fullDocument"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": true,
            "field": "coll"
          },
          {
            "type": "string",
            "optional": true,
            "field": "db"
          }
        ],
        "optional": true,
        "name": "ns",
        "field": "ns"
      },
      {
        "type": "string",
        "optional": true,
        "field": "operationType"
      }
    ],
    "optional": false,
    "name": "default"
  },
  "payload": {
    "_id": {
      "_data": "82609AA32D000000012B022C0100296E5A10047DB96B417494423E8B1A351D894EDFC246645F69640064609AA32D485F85468421F0F30004"
    },
    "clusterTime": 1544382408,
    "documentKey": {
      "_id": "609aa32d485f85468421f0f3"
    },
    "fullDocument": {
      "_id": "609aa32d485f85468421f0f3",
      "ts": 1539435408
    },
    "ns": {
      "coll": "baz",
      "db": "foo"
    },
    "operationType": "insert"
  }
}

 

We expect `ts` to be `1620742106000` (epoch milliseconds), but we get `1539435408` instead.
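As a worked check, assume the conversion multiplies the 32-bit seconds field `t` by an `int` literal `1000`. The product then exceeds `Integer.MAX_VALUE` (2^31 − 1), and Java keeps only the low 32 bits. The reported value is exactly that wrapped result:

```latex
\begin{aligned}
1620742106 \times 1000 &= 1\,620\,742\,106\,000 \\
1\,620\,742\,106\,000 - 377 \cdot 2^{32} &= 1\,620\,742\,106\,000 - 1\,619\,202\,670\,592 \\
&= 1\,539\,435\,408 < 2^{31}
\end{aligned}
```

Because the remainder is below 2^31, the wrapped `int` stays positive. The bad value therefore looks like a plausible number instead of an obviously invalid negative one.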

 

Further investigation shows that the timestamp value overflows at this line: https://github.com/mongodb/mongo-kafka/blob/621394f2197e31e0b6b07d8390bf6ee40e8cd501/src/main/java/com/mongodb/kafka/connect/source/schema/BsonValueToSchemaAndValue.java#L141
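The overflow can be reproduced without the connector. The class below is a hypothetical standalone demo: `TimestampOverflowDemo` and its methods are illustrative names, not connector code. It assumes that the linked line multiplies the `int` returned by `BsonTimestamp.getTime()` by an `int` 1000, as the `1000L` fix in this report suggests. It also shows `Math.multiplyExact`, which throws on overflow instead of wrapping silently.

```java
// Hypothetical demo class; not part of the MongoDB Kafka connector.
public class TimestampOverflowDemo {

    // Mirrors the suspected bug: both operands are int, so the product is
    // computed in 32-bit arithmetic and wraps before being widened to long.
    public static long toMillisIntMultiplier(int seconds) {
        return seconds * 1000;
    }

    // Same product with a long literal: seconds is widened first, so no wrap.
    public static long toMillisLongMultiplier(int seconds) {
        return seconds * 1000L;
    }

    // Math.multiplyExact(int, int) throws ArithmeticException on int overflow.
    public static boolean overflowsInt(int seconds) {
        try {
            Math.multiplyExact(seconds, 1000);
            return false;
        } catch (ArithmeticException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        int seconds = 1620742106; // "t" from Timestamp(1620742106, 2)
        System.out.println(toMillisIntMultiplier(seconds));  // 1539435408
        System.out.println(toMillisLongMultiplier(seconds)); // 1620742106000
        System.out.println(overflowsInt(seconds));           // true
    }
}
```

The first printed value matches the `ts` reported in the output, which supports the `int` multiplication hypothesis.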

 

Testing with the following test method in `SchemaAndValueProducerTest.java`:

 
@Test
@DisplayName("test infer schema and value producer")
void testInferSchemaAndValueProducer2() {
  // Expected: an optional Kafka Connect Timestamp field holding epoch milliseconds
  Schema expectedSchema =
      SchemaBuilder.struct()
          .name(DEFAULT_FIELD_NAME)
          .field("timestamp", Timestamp.builder().optional().build())
          .build();

  // 1620742106 seconds * 1000 = 1620742106000 ms
  SchemaAndValue expectedSchemaAndValue =
      new SchemaAndValue(
          expectedSchema, new Struct(expectedSchema).put("timestamp", new Date(1620742106000L)));

  SchemaAndValueProducer valueProducer =
      new InferSchemaAndValueProducer(SIMPLE_JSON_WRITER_SETTINGS);

  // Extended JSON for a BSON Timestamp: t = seconds since epoch, i = increment
  final String FULL_DOCUMENT_JSON =
      "{" + "\"timestamp\": {\"$timestamp\": {\"t\": 1620742106, \"i\": 2}} " + "}";

  assertSchemaAndValueEquals(
      expectedSchemaAndValue, valueProducer.get(BsonDocument.parse(FULL_DOCUMENT_JSON)));
}

 

This test fails because the inferred `timestamp` value does not match the expected `new Date(1620742106000L)`.

 

Changing the multiplier to `1000L` returns the expected epoch milliseconds.
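The following is a minimal sketch of the corrected conversion. It assumes that only the seconds field (`t`) is used and that the increment (`i`), which orders operations within the same second, is dropped, as the expected value in the test above implies. `TimestampFixSketch` and its methods are hypothetical names. The actual change ships with the KAFKA-220 fix in release 1.6.0.

```java
import java.util.Date;
import java.util.concurrent.TimeUnit;

// Hypothetical helper illustrating the 1000L fix; names are not from the connector.
public class TimestampFixSketch {

    // Converts the seconds field of a BSON Timestamp to a java.util.Date.
    // The long literal widens seconds before multiplying, so nothing wraps.
    public static Date toDate(int seconds) {
        return new Date(seconds * 1000L);
    }

    // Alternative: TimeUnit.toMillis takes a long, so the int is widened first.
    public static Date toDateViaTimeUnit(int seconds) {
        return new Date(TimeUnit.SECONDS.toMillis(seconds));
    }

    public static void main(String[] args) {
        Date expected = new Date(1620742106000L); // value the test expects
        System.out.println(toDate(1620742106).equals(expected));            // true
        System.out.println(toDateViaTimeUnit(1620742106).equals(expected)); // true
    }
}
```

`TimeUnit.SECONDS.toMillis` saturates to `Long.MAX_VALUE` instead of wrapping if it ever overflows. That cannot happen for 32-bit seconds, but it makes the intent explicit.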



 Comments   
Comment by Ross Lawley [ 29/Jul/21 ]

Closing as a duplicate of KAFKA-220.

Note that the fix is included in the 1.6.0 release.

Comment by Rachit Agrawal [ 21/Jul/21 ]

I was hoping to clone and edit this issue, but I don't seem to have permission. Please ignore this one; I will create another.

Generated at Thu Feb 08 09:05:55 UTC 2024 using Jira 9.7.1#970001-sha1:2222b88b221c4928ef0de3161136cc90c8356a66.