- 
    Type:Bug 
- 
    Resolution: Fixed
- 
    Priority:Unknown 
- 
    Affects Version/s: 1.5.0
- 
        None
- 
        None
- 
        None
- 
        None
- 
        None
- 
        None
- 
        None
When using timestamp type, message written to mongodb is different with what we're receiving from Kafka Topic.
Connector config:
name=mongo-source connector.class=com.mongodb.kafka.connect.MongoSourceConnector tasks.max=1 # Connection and source configuration connection.uri=mongodb://localhost:27018 database=foo collection=baz # Output configuration output.schema.infer.value=true output.format.value=schema topic.prefix= topic.suffix= poll.max.batch.size=1000 poll.await.time.ms=5000 # Change stream options pipeline=[] batch.size=0 change.stream.full.document=updateLookup collation=
Input:
rs1:PRIMARY> db.baz.insert({"ts" : Timestamp(1620742106, 2)})
Output:
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": true,
            "field": "_data"
          }
        ],
        "optional": true,
        "name": "_id",
        "field": "_id"
      },
      {
        "type": "int64",
        "optional": true,
        "name": "org.apache.kafka.connect.data.Timestamp",
        "version": 1,
        "field": "clusterTime"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": true,
            "field": "_id"
          }
        ],
        "optional": true,
        "name": "documentKey",
        "field": "documentKey"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": true,
            "field": "_id"
          },
          {
            "type": "int64",
            "optional": true,
            "name": "org.apache.kafka.connect.data.Timestamp",
            "version": 1,
            "field": "ts"
          }
        ],
        "optional": true,
        "name": "fullDocument",
        "field": "fullDocument"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": true,
            "field": "coll"
          },
          {
            "type": "string",
            "optional": true,
            "field": "db"
          }
        ],
        "optional": true,
        "name": "ns",
        "field": "ns"
      },
      {
        "type": "string",
        "optional": true,
        "field": "operationType"
      }
    ],
    "optional": false,
    "name": "default"
  },
  "payload": {
    "_id": {
      "_data": "82609AA32D000000012B022C0100296E5A10047DB96B417494423E8B1A351D894EDFC246645F69640064609AA32D485F85468421F0F30004"
    },
    "clusterTime": 1544382408,
    "documentKey": {
      "_id": "609aa32d485f85468421f0f3"
    },
    "fullDocument": {
      "_id": "609aa32d485f85468421f0f3",
      "ts": 1539435408
    },
    "ns": {
      "coll": "baz",
      "db": "foo"
    },
    "operationType": "insert"
  }
}
The value of `ts` we're expecting is `1620742106000`, but we're getting `1539435408` instead.
After further investigation, timestamp value is overflown in this section https://github.com/mongodb/mongo-kafka/blob/621394f2197e31e0b6b07d8390bf6ee40e8cd501/src/main/java/com/mongodb/kafka/connect/source/schema/BsonValueToSchemaAndValue.java#L141
Testing with this function in `SchemaAndValueProducerTest.java`
@Test @DisplayName("test infer schema and value producer") void testInferSchemaAndValueProducer2() { Schema expectedSchema = SchemaBuilder.struct() .name(DEFAULT_FIELD_NAME) .field("timestamp", Timestamp.builder().optional().build()) .build(); SchemaAndValue expectedSchemaAndValue = new SchemaAndValue( expectedSchema, new Struct(expectedSchema).put("timestamp", new Date(1620742106000L))); SchemaAndValueProducer valueProducer = new InferSchemaAndValueProducer(SIMPLE_JSON_WRITER_SETTINGS); final String FULL_DOCUMENT_JSON = "{" + "\"timestamp\": {\"$timestamp\": {\"t\": 1620742106, \"i\": 2}} " + "}"; assertSchemaAndValueEquals( expectedSchemaAndValue, valueProducer.get(BsonDocument.parse(FULL_DOCUMENT_JSON))); }
yield this result 
Changing the multiplier to `1000L` return expected epoch millis.
