[KAFKA-402] com.mongodb.kafka.connect.source.json.formatter.DefaultJson can produce invalid JSON with NaN and Infinity Created: 05/Feb/24  Updated: 05/Feb/24

Status: Needs Triage
Project: Kafka Connector
Component/s: Source
Affects Version/s: 1.11.1
Fix Version/s: None

Type: Bug Priority: Unknown
Reporter: 挺瑋 藍 Assignee: Unassigned
Resolution: Unresolved Votes: 0
Labels: None
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified

Assigned Teams:
Java Drivers

 Description   

Create a source connector with the configuration:

name=test
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1connection.uri=mongodb://localhost
database=test
collection=testtopic.prefix=mongo.changes
topic.suffix=
topic.creation.default.replication.factor=-1
topic.creation.default.partitions=3pipeline=[{"$set": {"partitionKey": {"$convert": {"input": "$documentKey._id", "to": "string", "onError": "", "onNull": ""}}}}]
collation=
change.stream.full.document=updateLookupkey.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
output.format.key=schema
output.schema.key={"type": "record", "name": "PartitionKey", "fields": [{"name": "partitionKey", "type": ["string", "null"]}]}
output.format.value=json
output.json.formatter=com.mongodb.kafka.connect.source.json.formatter.DefaultJsonerrors.tolerance=all
errors.log.enable=true
errors.log.include.messages=true
heartbeat.interval.ms=600000
heartbeat.topic.name=mongo.heartbeats.test.testpredicates=isHeartbeat
predicates.isHeartbeat.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.isHeartbeat.pattern=mongo\\.heartbeats\\..*transforms=extractPartitionKey
transforms.extractPartitionKey.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractPartitionKey.field=partitionKey
transforms.extractPartitionKey.predicate=isHeartbeat
transforms.extractPartitionKey.negate=true 

Perform database operations:

test> db.test.insertOne({ a: NaN })
{ acknowledged: true, insertedId: ObjectId('65c06f52b838acbfff95e33c') }
test> db.test.updateOne({ _id: ObjectId('65c06f52b838acbfff95e33c') }, { $set: { b: NaN } })
{ acknowledged: true, insertedId: null, matchedCount: 1, modifiedCount: 1, upsertedCount: 0 }
test> db.test.updateOne({ _id: ObjectId('65c06f52b838acbfff95e33c') }, { $set: { c: Infinity } })
{ acknowledged: true, insertedId: null, matchedCount: 1, modifiedCount: 1, upsertedCount: 0 }
test> db.test.updateOne({ _id: ObjectId('65c06f52b838acbfff95e33c') }, { $set: { d: -Infinity } })
{ acknowledged: true, insertedId: null, matchedCount: 1, modifiedCount: 1, upsertedCount: 0 }

The source connector produce messages with invalid JSON to Kafka:

{"_id": {"_data": "8265C06F52000000012B042C0100296E5A10043AF945FB8C864B1F8FFD7A58466A7985463C6F7065726174696F6E54797065003C696E736572740046646F63756D656E744B65790046645F6964006465C06F52B838ACBFFF95E33C000004"}, "operationType": "insert", "clusterTime": {"$timestamp": {"t": 1707110226, "i": 1}}, "wallTime": {"$date": 1707110226156}, "fullDocument": {"_id": {"$oid": "65c06f52b838acbfff95e33c"}, "a": NaN}, "ns": {"db": "test", "coll": "test"}, "documentKey": {"_id": {"$oid": "65c06f52b838acbfff95e33c"}}, "partitionKey": "65c06f52b838acbfff95e33c"}
{"_id": {"_data": "8265C06F72000000012B042C0100296E5A10043AF945FB8C864B1F8FFD7A58466A7985463C6F7065726174696F6E54797065003C7570646174650046646F63756D656E744B65790046645F6964006465C06F52B838ACBFFF95E33C000004"}, "operationType": "update", "clusterTime": {"$timestamp": {"t": 1707110258, "i": 1}}, "wallTime": {"$date": 1707110258884}, "fullDocument": {"_id": {"$oid": "65c06f52b838acbfff95e33c"}, "a": NaN, "b": NaN}, "ns": {"db": "test", "coll": "test"}, "documentKey": {"_id": {"$oid": "65c06f52b838acbfff95e33c"}}, "updateDescription": {"updatedFields": {"b": NaN}, "removedFields": [], "truncatedArrays": []}, "partitionKey": "65c06f52b838acbfff95e33c"}
{"_id": {"_data": "8265C06F78000000012B042C0100296E5A10043AF945FB8C864B1F8FFD7A58466A7985463C6F7065726174696F6E54797065003C7570646174650046646F63756D656E744B65790046645F6964006465C06F52B838ACBFFF95E33C000004"}, "operationType": "update", "clusterTime": {"$timestamp": {"t": 1707110264, "i": 1}}, "wallTime": {"$date": 1707110264226}, "fullDocument": {"_id": {"$oid": "65c06f52b838acbfff95e33c"}, "a": NaN, "b": NaN, "c": Infinity}, "ns": {"db": "test", "coll": "test"}, "documentKey": {"_id": {"$oid": "65c06f52b838acbfff95e33c"}}, "updateDescription": {"updatedFields": {"c": Infinity}, "removedFields": [], "truncatedArrays": []}, "partitionKey": "65c06f52b838acbfff95e33c"}
{"_id": {"_data": "8265C06F94000000012B042C0100296E5A10043AF945FB8C864B1F8FFD7A58466A7985463C6F7065726174696F6E54797065003C7570646174650046646F63756D656E744B65790046645F6964006465C06F52B838ACBFFF95E33C000004"}, "operationType": "update", "clusterTime": {"$timestamp": {"t": 1707110292, "i": 1}}, "wallTime": {"$date": 1707110292362}, "fullDocument": {"_id": {"$oid": "65c06f52b838acbfff95e33c"}, "a": NaN, "b": NaN, "c": Infinity, "d": -Infinity}, "ns": {"db": "test", "coll": "test"}, "documentKey": {"_id": {"$oid": "65c06f52b838acbfff95e33c"}}, "updateDescription": {"updatedFields": {"d": -Infinity}, "removedFields": [], "truncatedArrays": []}, "partitionKey": "65c06f52b838acbfff95e33c"}

Node.js cannot parse messages produced by the source connector:

Uncaught SyntaxError: Unexpected token N in JSON at position 401
Uncaught SyntaxError: Unexpected token N in JSON at position 401
Uncaught SyntaxError: Unexpected token N in JSON at position 401
Uncaught SyntaxError: Unexpected token N in JSON at position 401

 



 Comments   
Comment by PM Bot [ 05/Feb/24 ]

Hi tingweilan@noodoe.com, thank you for reporting this issue! The team will look into it and get back to you soon.

Generated at Thu Feb 08 09:06:17 UTC 2024 using Jira 9.7.1#970001-sha1:2222b88b221c4928ef0de3161136cc90c8356a66.