- Type: Bug
- Resolution: Won't Fix
- Priority: Major - P3
- Affects Version/s: None
- Component/s: None
- Atlas Streams
- ALL
- Sprint 61
The Kafka-to-$merge test below produces DLQ messages for "ConflictingUpdateOperators" in the $merge stage. This is likely a bug in our code. I was able to repro it consistently when running a release build (though it might repro in a debug build as well).
[j0:prim] dlqMessage: { _stream_meta: { source: { type: "kafka", topic: "outputTopic1", partition: 8, offset: 10919, key: BinData(0, ), headers: [] } }, errInfo: { reason: "Failed to process an input document in the current batch in MergeOperator with error: code = ConflictingUpdateOperators, reason = Updating the path '_..." }, operatorName: "MergeOperator", doc: { a: 2, arr: { i: 918 }, str: "vAU6Eur0OfRtQy7I1mBAleGM4ttjz1jyVMGBRMORlme7WjWDGLPJ5LnWWPZbvE3S8cGisLrh1PZdWnAC4iCrFeQAGYMHjaMde9gGsc9ByLcyFsdszsdVmBCCeT3nKX1V", _ts: new Date(1730334396868), _stream_meta: { source: { type: "kafka", topic: "outputTopic1", partition: 8, offset: 10919 } }, _ts: new Date(1730334396871) }, processorName: "reader", dlqTime: new Date(1730334401320) }
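For context, ConflictingUpdateOperators is the generic server error raised when a single update statement names the same path more than once (or a path and its prefix). A minimal shell sketch of that error class, independent of Atlas Streams and using an illustrative collection name:

// Raises ConflictingUpdateOperators:
// "Updating the path '_ts' would create a conflict at '_ts'"
db.sink.updateOne({_id: 1}, {$set: {_ts: new Date()}, $unset: {_ts: ""}});

The truncated reason string in the DLQ entry ("Updating the path '_...") matches that message shape, and the dead-lettered document above contains _ts twice, so MergeOperator may be generating two update operators on the same path.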
Add this test to kafka.js to reproduce:
runKafkaTest(kafka, () => {
    Random.setRandomSeed(42);

    // Write a bunch of documents to a 32-partition Kafka topic.
    // 10 input docs x 5001 array elements = 50010 unwound documents in total.
    const numDocsInBatch = 5001;
    let arr = [];
    const str = makeRandomString(128);
    for (let i = 0; i < numDocsInBatch; i += 1) {
        arr.push({i: i});
    }
    let inputData = [];
    const numInput = 10;
    for (let i = 0; i < numInput; i += 1) {
        inputData.push({a: i, arr: arr, str: str});
    }

    let spName = "writer";
    const totalInput = arr.length * inputData.length;
    sp.createStreamProcessor(spName, [
        {$source: {connectionName: '__testMemory'}},
        {$unwind: "$arr"},
        {$emit: {connectionName: kafkaPlaintextName, topic: topicName1}}
    ]);
    sp[spName].start();
    for (const doc of inputData) {
        sp[spName].testInsert(doc);
    }
    assert.soon(() => {
        return sp[spName].stats().outputMessageCount == totalInput;
    });
    sp[spName].stop();

    // Now read from that topic into a $merge.
    spName = "reader";
    sp.createStreamProcessor(spName, [
        {$source: {
            connectionName: kafkaPlaintextName,
            topic: topicName1,
            config: {auto_offset_reset: "earliest"}
        }},
        {$project: {_id: 0}},
        {$merge: {into: {connectionName: dbConnName, db: dbName, coll: sinkCollName1}}}
    ]);
    sp[spName].start();
    assert.soon(() => {
        jsTestLog(sp[spName].stats());
        return sp[spName].stats().outputMessageCount == totalInput;
    });
    sp[spName].stop();
}, 32 /* partitionCount */);
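One caveat with the repro as written: if documents are dead-lettered, the reader's final assert.soon just times out, because outputMessageCount never reaches totalInput. A sketch of a fail-fast variant of that last wait, assuming stats() exposes a dlqMessageCount field and that the shell's db handle can reach the sink database:

assert.soon(() => {
    const stats = sp[spName].stats();
    jsTestLog(stats);
    // Fail immediately with a clear message instead of hanging until timeout.
    assert.eq(0, stats.dlqMessageCount, "MergeOperator dead-lettered documents");
    return stats.outputMessageCount == totalInput;
});
// Every unwound document should have landed in the sink collection exactly once.
assert.eq(totalInput, db.getSiblingDB(dbName)[sinkCollName1].countDocuments({}));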