Create a source connector with the configuration:
name=test
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1
connection.uri=mongodb://localhost
database=test
collection=test
topic.prefix=mongo.changes
topic.suffix=
topic.creation.default.replication.factor=-1
topic.creation.default.partitions=3
pipeline=[{"$set": {"partitionKey": {"$convert": {"input": "$documentKey._id", "to": "string", "onError": "", "onNull": ""}}}}]
collation=
change.stream.full.document=updateLookup
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
output.format.key=schema
output.schema.key={"type": "record", "name": "PartitionKey", "fields": [{"name": "partitionKey", "type": ["string", "null"]}]}
output.format.value=json
output.json.formatter=com.mongodb.kafka.connect.source.json.formatter.DefaultJson
errors.tolerance=all
errors.log.enable=true
errors.log.include.messages=true
heartbeat.interval.ms=600000
heartbeat.topic.name=mongo.heartbeats.test.test
predicates=isHeartbeat
predicates.isHeartbeat.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.isHeartbeat.pattern=mongo\\.heartbeats\\..*
transforms=extractPartitionKey
transforms.extractPartitionKey.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractPartitionKey.field=partitionKey
transforms.extractPartitionKey.predicate=isHeartbeat
transforms.extractPartitionKey.negate=true
Perform database operations:
test> db.test.insertOne({ a: NaN })
{ acknowledged: true, insertedId: ObjectId('65c06f52b838acbfff95e33c') }
test> db.test.updateOne({ _id: ObjectId('65c06f52b838acbfff95e33c') }, { $set: { b: NaN } })
{ acknowledged: true, insertedId: null, matchedCount: 1, modifiedCount: 1, upsertedCount: 0 }
test> db.test.updateOne({ _id: ObjectId('65c06f52b838acbfff95e33c') }, { $set: { c: Infinity } })
{ acknowledged: true, insertedId: null, matchedCount: 1, modifiedCount: 1, upsertedCount: 0 }
test> db.test.updateOne({ _id: ObjectId('65c06f52b838acbfff95e33c') }, { $set: { d: -Infinity } })
{ acknowledged: true, insertedId: null, matchedCount: 1, modifiedCount: 1, upsertedCount: 0 }
The source connector produces messages containing invalid JSON (bare NaN, Infinity, and -Infinity tokens) to Kafka:
{"_id": {"_data": "8265C06F52000000012B042C0100296E5A10043AF945FB8C864B1F8FFD7A58466A7985463C6F7065726174696F6E54797065003C696E736572740046646F63756D656E744B65790046645F6964006465C06F52B838ACBFFF95E33C000004"}, "operationType": "insert", "clusterTime": {"$timestamp": {"t": 1707110226, "i": 1}}, "wallTime": {"$date": 1707110226156}, "fullDocument": {"_id": {"$oid": "65c06f52b838acbfff95e33c"}, "a": NaN}, "ns": {"db": "test", "coll": "test"}, "documentKey": {"_id": {"$oid": "65c06f52b838acbfff95e33c"}}, "partitionKey": "65c06f52b838acbfff95e33c"}
{"_id": {"_data": "8265C06F72000000012B042C0100296E5A10043AF945FB8C864B1F8FFD7A58466A7985463C6F7065726174696F6E54797065003C7570646174650046646F63756D656E744B65790046645F6964006465C06F52B838ACBFFF95E33C000004"}, "operationType": "update", "clusterTime": {"$timestamp": {"t": 1707110258, "i": 1}}, "wallTime": {"$date": 1707110258884}, "fullDocument": {"_id": {"$oid": "65c06f52b838acbfff95e33c"}, "a": NaN, "b": NaN}, "ns": {"db": "test", "coll": "test"}, "documentKey": {"_id": {"$oid": "65c06f52b838acbfff95e33c"}}, "updateDescription": {"updatedFields": {"b": NaN}, "removedFields": [], "truncatedArrays": []}, "partitionKey": "65c06f52b838acbfff95e33c"}
{"_id": {"_data": "8265C06F78000000012B042C0100296E5A10043AF945FB8C864B1F8FFD7A58466A7985463C6F7065726174696F6E54797065003C7570646174650046646F63756D656E744B65790046645F6964006465C06F52B838ACBFFF95E33C000004"}, "operationType": "update", "clusterTime": {"$timestamp": {"t": 1707110264, "i": 1}}, "wallTime": {"$date": 1707110264226}, "fullDocument": {"_id": {"$oid": "65c06f52b838acbfff95e33c"}, "a": NaN, "b": NaN, "c": Infinity}, "ns": {"db": "test", "coll": "test"}, "documentKey": {"_id": {"$oid": "65c06f52b838acbfff95e33c"}}, "updateDescription": {"updatedFields": {"c": Infinity}, "removedFields": [], "truncatedArrays": []}, "partitionKey": "65c06f52b838acbfff95e33c"}
{"_id": {"_data": "8265C06F94000000012B042C0100296E5A10043AF945FB8C864B1F8FFD7A58466A7985463C6F7065726174696F6E54797065003C7570646174650046646F63756D656E744B65790046645F6964006465C06F52B838ACBFFF95E33C000004"}, "operationType": "update", "clusterTime": {"$timestamp": {"t": 1707110292, "i": 1}}, "wallTime": {"$date": 1707110292362}, "fullDocument": {"_id": {"$oid": "65c06f52b838acbfff95e33c"}, "a": NaN, "b": NaN, "c": Infinity, "d": -Infinity}, "ns": {"db": "test", "coll": "test"}, "documentKey": {"_id": {"$oid": "65c06f52b838acbfff95e33c"}}, "updateDescription": {"updatedFields": {"d": -Infinity}, "removedFields": [], "truncatedArrays": []}, "partitionKey": "65c06f52b838acbfff95e33c"}
Node.js cannot parse messages produced by the source connector:
Uncaught SyntaxError: Unexpected token N in JSON at position 401
Uncaught SyntaxError: Unexpected token N in JSON at position 401
Uncaught SyntaxError: Unexpected token N in JSON at position 401
Uncaught SyntaxError: Unexpected token N in JSON at position 401
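The failure can be reproduced without Kafka. The sketch below is a minimal illustration, not the consumer code from this report: the `samples` strings are hypothetical, shortened stand-ins for the `fullDocument` fragments in the messages above, and `isValidJson` is a helper name introduced here. It shows that `JSON.parse` rejects each bare non-finite token, and that `JSON.stringify` itself never emits them.

```javascript
// Hypothetical, shortened samples modeled on the "fullDocument"
// fragments in the connector output (not the full messages).
const samples = [
  '{"a": NaN}',
  '{"c": Infinity}',
  '{"d": -Infinity}',
];

// Helper introduced for this sketch: returns true if JSON.parse accepts
// the text, false if it throws a SyntaxError.
function isValidJson(text) {
  try {
    JSON.parse(text);
    return true;
  } catch (err) {
    if (err instanceof SyntaxError) return false;
    throw err; // anything else is unexpected, so re-throw it
  }
}

// One result per sample, in the same order as `samples`.
const results = samples.map(isValidJson);
console.log(results);

// For comparison: JSON has no literal for non-finite numbers, so
// JSON.stringify writes null in their place.
const roundTrip = JSON.stringify({ a: NaN, c: Infinity, d: -Infinity });
console.log(roundTrip);
```

The exact wording and position in the `SyntaxError` message depend on the Node.js version and on the message length, so the sketch checks only the error type.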