- Type: Bug
- Resolution: Gone away
- Priority: Major - P3
- Affects Version/s: None
- Component/s: None
- Atlas Streams
- ALL
- Sprint: Sprint 61, Sprint 72
The Kafka-to-$merge test below produces DLQ messages for "ConflictingUpdateOperators" in the $merge stage, which is likely a bug in our code. I was able to reproduce this consistently when running a release build (though it might reproduce in a debug build as well). Note that the document in the DLQ message below contains the "_ts" field twice, which is consistent with the truncated error reason beginning with "Updating the path '_...":
[j0:prim] dlqMessage: { _stream_meta: { source: { type: "kafka", topic: "outputTopic1", partition: 8, offset: 10919, key: BinData(0, ), headers: [] } }, errInfo: { reason: "Failed to process an input document in the current batch in MergeOperator with error: code = ConflictingUpdateOperators, reason = Updating the path '_..." }, operatorName: "MergeOperator", doc: { a: 2, arr: { i: 918 }, str: "vAU6Eur0OfRtQy7I1mBAleGM4ttjz1jyVMGBRMORlme7WjWDGLPJ5LnWWPZbvE3S8cGisLrh1PZdWnAC4iCrFeQAGYMHjaMde9gGsc9ByLcyFsdszsdVmBCCeT3nKX1V", _ts: new Date(1730334396868), _stream_meta: { source: { type: "kafka", topic: "outputTopic1", partition: 8, offset: 10919 } }, _ts: new Date(1730334396871) }, processorName: "reader", dlqTime: new Date(1730334401320) }
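For context on the server error itself: ConflictingUpdateOperators (error code 40) is what mongod raises when a single update targets the same path more than once. A minimal mongosh sketch (standalone, with a made-up collection name) that triggers the same "Updating the path ... would create a conflict at ..." message:

db.sandbox.updateOne(
    {_id: 1},
    // Two assignments in one $set target the overlapping paths 'a' and 'a.c'.
    {$set: {a: {b: 1}, "a.c": 2}},
    {upsert: true});
// => MongoServerError: Updating the path 'a.c' would create a conflict at 'a'

If MergeOperator builds its update from a document that carries "_ts" twice, it would hit this same class of error on the duplicated path.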
To reproduce, add this test to kafka.js:
runKafkaTest(kafka, () => {
    Random.setRandomSeed(42);

    // Write a bunch of large documents to a 32-partition Kafka topic.
    const numDocsInBatch = 5001;
    let arr = [];
    const str = makeRandomString(128);
    for (let i = 0; i < numDocsInBatch; i += 1) {
        arr.push({i: i});
    }
    let inputData = [];
    const numInput = 10;
    for (let i = 0; i < numInput; i += 1) {
        inputData.push({a: i, arr: arr, str: str});
    }

    // The writer unwinds each input document, so the topic ends up with
    // arr.length * inputData.length messages in total.
    let spName = "writer";
    const totalInput = arr.length * inputData.length;
    sp.createStreamProcessor(spName, [
        {$source: {connectionName: '__testMemory'}},
        {$unwind: "$arr"},
        {
            $emit: {
                connectionName: kafkaPlaintextName,
                topic: topicName1,
            }
        }
    ]);
    sp[spName].start();
    for (const doc of inputData) {
        sp[spName].testInsert(doc);
    }
    assert.soon(() => {
        return sp[spName].stats().outputMessageCount == totalInput;
    });
    sp[spName].stop();

    // Now read from that topic into a $merge.
    spName = "reader";
    sp.createStreamProcessor(spName, [
        {
            $source: {
                connectionName: kafkaPlaintextName,
                topic: topicName1,
                config: {auto_offset_reset: "earliest"},
            }
        },
        {$project: {_id: 0}},
        {$merge: {into: {connectionName: dbConnName, db: dbName, coll: sinkCollName1}}},
    ]);
    sp[spName].start();
    assert.soon(() => {
        jsTestLog(sp[spName].stats());
        return sp[spName].stats().outputMessageCount == totalInput;
    });
    sp[spName].stop();
}, 32 /* partitionCount */);
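If DLQ'd documents still count toward outputMessageCount, the reader's assert.soon above will pass even though some documents never reached the sink. A final count check along these lines would make the loss explicit (a sketch reusing the names from the test; it assumes the harness exposes a db handle pointing at the $merge target cluster):

// Every unwound message should have landed in the sink collection;
// any shortfall corresponds to documents routed to the DLQ.
assert.eq(totalInput, db.getSiblingDB(dbName)[sinkCollName1].countDocuments({}));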