Details
- Type: Bug
- Resolution: Fixed
- Priority: Unknown
- Affects Version: 1.5.0
Description
When using the BSON timestamp type, the message we receive from the Kafka topic differs from the document written to MongoDB.
Connector config:

```properties
name=mongo-source
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1

# Connection and source configuration
connection.uri=mongodb://localhost:27018
database=foo
collection=baz

# Output configuration
output.schema.infer.value=true
output.format.value=schema

topic.prefix=
topic.suffix=
poll.max.batch.size=1000
poll.await.time.ms=5000

# Change stream options
pipeline=[]
batch.size=0
change.stream.full.document=updateLookup
collation=
```
Input:

```
rs1:PRIMARY> db.baz.insert({"ts" : Timestamp(1620742106, 2)})
```
Output:

```json
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": true,
            "field": "_data"
          }
        ],
        "optional": true,
        "name": "_id",
        "field": "_id"
      },
      {
        "type": "int64",
        "optional": true,
        "name": "org.apache.kafka.connect.data.Timestamp",
        "version": 1,
        "field": "clusterTime"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": true,
            "field": "_id"
          }
        ],
        "optional": true,
        "name": "documentKey",
        "field": "documentKey"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": true,
            "field": "_id"
          },
          {
            "type": "int64",
            "optional": true,
            "name": "org.apache.kafka.connect.data.Timestamp",
            "version": 1,
            "field": "ts"
          }
        ],
        "optional": true,
        "name": "fullDocument",
        "field": "fullDocument"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": true,
            "field": "coll"
          },
          {
            "type": "string",
            "optional": true,
            "field": "db"
          }
        ],
        "optional": true,
        "name": "ns",
        "field": "ns"
      },
      {
        "type": "string",
        "optional": true,
        "field": "operationType"
      }
    ],
    "optional": false,
    "name": "default"
  },
  "payload": {
    "_id": {
      "_data": "82609AA32D000000012B022C0100296E5A10047DB96B417494423E8B1A351D894EDFC246645F69640064609AA32D485F85468421F0F30004"
    },
    "clusterTime": 1544382408,
    "documentKey": {
      "_id": "609aa32d485f85468421f0f3"
    },
    "fullDocument": {
      "_id": "609aa32d485f85468421f0f3",
      "ts": 1539435408
    },
    "ns": {
      "coll": "baz",
      "db": "foo"
    },
    "operationType": "insert"
  }
}
```
Since the Kafka Connect `Timestamp` logical type represents epoch milliseconds, the value of `ts` we expect is `1620742106000` (1620742106 seconds × 1000), but we get `1539435408` instead.
After further investigation, the timestamp value overflows at this line: https://github.com/mongodb/mongo-kafka/blob/621394f2197e31e0b6b07d8390bf6ee40e8cd501/src/main/java/com/mongodb/kafka/connect/source/schema/BsonValueToSchemaAndValue.java#L141. `BsonTimestamp.getTime()` returns the seconds component as a 32-bit `int`, and multiplying it by the `int` literal `1000` is evaluated in 32-bit arithmetic, so the product wraps modulo 2^32 (1620742106000 mod 2^32 = 1539435408) before being widened to `long`.
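The wraparound can be demonstrated in plain Java, independent of the connector code (a minimal sketch):

```java
public class TimestampOverflowDemo {
  public static void main(String[] args) {
    // BsonTimestamp.getTime() returns the seconds component as an int.
    int seconds = 1620742106;

    // int * int is computed in 32-bit arithmetic: the product wraps
    // modulo 2^32 before being widened to long for the assignment.
    long overflowed = seconds * 1000; // 1539435408, the value seen on the topic
    long expected = seconds * 1000L;  // 1620742106000, the correct epoch millis

    System.out.println(overflowed);
    System.out.println(expected);
  }
}
```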
Testing with the following test in `SchemaAndValueProducerTest.java`:

```java
@Test
@DisplayName("test infer schema and value producer")
void testInferSchemaAndValueProducer2() {
  Schema expectedSchema =
      SchemaBuilder.struct()
          .name(DEFAULT_FIELD_NAME)
          .field("timestamp", Timestamp.builder().optional().build())
          .build();

  SchemaAndValue expectedSchemaAndValue =
      new SchemaAndValue(
          expectedSchema, new Struct(expectedSchema).put("timestamp", new Date(1620742106000L)));

  SchemaAndValueProducer valueProducer =
      new InferSchemaAndValueProducer(SIMPLE_JSON_WRITER_SETTINGS);

  final String FULL_DOCUMENT_JSON =
      "{" + "\"timestamp\": {\"$timestamp\": {\"t\": 1620742106, \"i\": 2}} " + "}";

  assertSchemaAndValueEquals(
      expectedSchemaAndValue, valueProducer.get(BsonDocument.parse(FULL_DOCUMENT_JSON)));
}
```
This test fails against the current code; changing the multiplier to `1000L` yields the expected epoch milliseconds.
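For reference, a minimal sketch of the corrected conversion (a hypothetical helper for illustration; the actual code in `BsonValueToSchemaAndValue.java` may be structured differently):

```java
import java.util.Date;

import org.bson.BsonTimestamp;

// Hypothetical helper illustrating the one-character fix; not the
// verbatim connector source.
final class BsonTimestampToDate {
  static Date toDate(final BsonTimestamp timestamp) {
    // Buggy: timestamp.getTime() * 1000  (int arithmetic, wraps mod 2^32)
    // Fixed: the long literal promotes the multiplication to 64 bits.
    return new Date(timestamp.getTime() * 1000L);
  }
}
```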