Uploaded image for project: 'Kafka Connector'
  1. Kafka Connector
  2. KAFKA-402

com.mongodb.kafka.connect.source.json.formatter.DefaultJson can produce invalid JSON with NaN and Infinity

    • Type: Icon: Bug Bug
    • Resolution: Unresolved
    • Priority: Icon: Unknown Unknown
    • None
    • Affects Version/s: 1.11.1
    • Component/s: Source
    • Labels:
      None
    • Java Drivers

      Create a source connector with the configuration:

      name=test
      connector.class=com.mongodb.kafka.connect.MongoSourceConnector
      tasks.max=1connection.uri=mongodb://localhost
      database=test
      collection=testtopic.prefix=mongo.changes
      topic.suffix=
      topic.creation.default.replication.factor=-1
      topic.creation.default.partitions=3pipeline=[{"$set": {"partitionKey": {"$convert": {"input": "$documentKey._id", "to": "string", "onError": "", "onNull": ""}}}}]
      collation=
      change.stream.full.document=updateLookupkey.converter=org.apache.kafka.connect.storage.StringConverter
      value.converter=org.apache.kafka.connect.storage.StringConverter
      output.format.key=schema
      output.schema.key={"type": "record", "name": "PartitionKey", "fields": [{"name": "partitionKey", "type": ["string", "null"]}]}
      output.format.value=json
      output.json.formatter=com.mongodb.kafka.connect.source.json.formatter.DefaultJsonerrors.tolerance=all
      errors.log.enable=true
      errors.log.include.messages=true
      heartbeat.interval.ms=600000
      heartbeat.topic.name=mongo.heartbeats.test.testpredicates=isHeartbeat
      predicates.isHeartbeat.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
      predicates.isHeartbeat.pattern=mongo\\.heartbeats\\..*transforms=extractPartitionKey
      transforms.extractPartitionKey.type=org.apache.kafka.connect.transforms.ExtractField$Key
      transforms.extractPartitionKey.field=partitionKey
      transforms.extractPartitionKey.predicate=isHeartbeat
      transforms.extractPartitionKey.negate=true 

      Perform database operations:

      test> db.test.insertOne({ a: NaN })
      { acknowledged: true, insertedId: ObjectId('65c06f52b838acbfff95e33c') }
      test> db.test.updateOne({ _id: ObjectId('65c06f52b838acbfff95e33c') }, { $set: { b: NaN } })
      { acknowledged: true, insertedId: null, matchedCount: 1, modifiedCount: 1, upsertedCount: 0 }
      test> db.test.updateOne({ _id: ObjectId('65c06f52b838acbfff95e33c') }, { $set: { c: Infinity } })
      { acknowledged: true, insertedId: null, matchedCount: 1, modifiedCount: 1, upsertedCount: 0 }
      test> db.test.updateOne({ _id: ObjectId('65c06f52b838acbfff95e33c') }, { $set: { d: -Infinity } })
      { acknowledged: true, insertedId: null, matchedCount: 1, modifiedCount: 1, upsertedCount: 0 }

      The source connector produce messages with invalid JSON to Kafka:

      {"_id": {"_data": "8265C06F52000000012B042C0100296E5A10043AF945FB8C864B1F8FFD7A58466A7985463C6F7065726174696F6E54797065003C696E736572740046646F63756D656E744B65790046645F6964006465C06F52B838ACBFFF95E33C000004"}, "operationType": "insert", "clusterTime": {"$timestamp": {"t": 1707110226, "i": 1}}, "wallTime": {"$date": 1707110226156}, "fullDocument": {"_id": {"$oid": "65c06f52b838acbfff95e33c"}, "a": NaN}, "ns": {"db": "test", "coll": "test"}, "documentKey": {"_id": {"$oid": "65c06f52b838acbfff95e33c"}}, "partitionKey": "65c06f52b838acbfff95e33c"}
      {"_id": {"_data": "8265C06F72000000012B042C0100296E5A10043AF945FB8C864B1F8FFD7A58466A7985463C6F7065726174696F6E54797065003C7570646174650046646F63756D656E744B65790046645F6964006465C06F52B838ACBFFF95E33C000004"}, "operationType": "update", "clusterTime": {"$timestamp": {"t": 1707110258, "i": 1}}, "wallTime": {"$date": 1707110258884}, "fullDocument": {"_id": {"$oid": "65c06f52b838acbfff95e33c"}, "a": NaN, "b": NaN}, "ns": {"db": "test", "coll": "test"}, "documentKey": {"_id": {"$oid": "65c06f52b838acbfff95e33c"}}, "updateDescription": {"updatedFields": {"b": NaN}, "removedFields": [], "truncatedArrays": []}, "partitionKey": "65c06f52b838acbfff95e33c"}
      {"_id": {"_data": "8265C06F78000000012B042C0100296E5A10043AF945FB8C864B1F8FFD7A58466A7985463C6F7065726174696F6E54797065003C7570646174650046646F63756D656E744B65790046645F6964006465C06F52B838ACBFFF95E33C000004"}, "operationType": "update", "clusterTime": {"$timestamp": {"t": 1707110264, "i": 1}}, "wallTime": {"$date": 1707110264226}, "fullDocument": {"_id": {"$oid": "65c06f52b838acbfff95e33c"}, "a": NaN, "b": NaN, "c": Infinity}, "ns": {"db": "test", "coll": "test"}, "documentKey": {"_id": {"$oid": "65c06f52b838acbfff95e33c"}}, "updateDescription": {"updatedFields": {"c": Infinity}, "removedFields": [], "truncatedArrays": []}, "partitionKey": "65c06f52b838acbfff95e33c"}
      {"_id": {"_data": "8265C06F94000000012B042C0100296E5A10043AF945FB8C864B1F8FFD7A58466A7985463C6F7065726174696F6E54797065003C7570646174650046646F63756D656E744B65790046645F6964006465C06F52B838ACBFFF95E33C000004"}, "operationType": "update", "clusterTime": {"$timestamp": {"t": 1707110292, "i": 1}}, "wallTime": {"$date": 1707110292362}, "fullDocument": {"_id": {"$oid": "65c06f52b838acbfff95e33c"}, "a": NaN, "b": NaN, "c": Infinity, "d": -Infinity}, "ns": {"db": "test", "coll": "test"}, "documentKey": {"_id": {"$oid": "65c06f52b838acbfff95e33c"}}, "updateDescription": {"updatedFields": {"d": -Infinity}, "removedFields": [], "truncatedArrays": []}, "partitionKey": "65c06f52b838acbfff95e33c"}
      

      Node.js cannot parse messages produced by the source connector:

      Uncaught SyntaxError: Unexpected token N in JSON at position 401
      Uncaught SyntaxError: Unexpected token N in JSON at position 401
      Uncaught SyntaxError: Unexpected token N in JSON at position 401
      Uncaught SyntaxError: Unexpected token N in JSON at position 401

       

            Assignee:
            robert.walters@mongodb.com Robert Walters
            Reporter:
            tingweilan@noodoe.com 挺瑋 藍
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

              Created:
              Updated: