[SERVER-62940] DocumentSourceGroup returns different results for in-memory group and spilled group when using $sum Created: 24/Jan/22  Updated: 29/Oct/23  Resolved: 10/Mar/22

Status: Closed
Project: Core Server
Component/s: None
Affects Version/s: 5.2.0, 4.2.17, 5.0.5, 5.1.1, 4.4.12
Fix Version/s: 6.0.0-rc0

Type: Bug Priority: Major - P3
Reporter: Yoon Soo Kim Assignee: Yoon Soo Kim
Resolution: Fixed Votes: 0
Labels: query-director-triage
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified

Issue Links:
Gantt Dependency
has to be done before SERVER-64227 Remove $sum/$avg merging logic which ... Closed
Related
is related to SERVER-63260 Fix the issue that $group returns dif... Closed
Backwards Compatibility: Fully Compatible
Operating System: ALL
Steps To Reproduce:

// Inserts 1000 groups each of which has 4 documents.
for (let i = 0; i < 1000; ++i) {
	db.spilling.insert([
		{k: i, n: 1e+34},
		{k: i, n: NumberDecimal("0.1")},
		{k: i, n: NumberDecimal("0.01")},
		{k: i, n: -1e+34}]);
}
 
// Forces the classic engine.
db.adminCommand({setParameter: 1, internalQueryForceClassicEngine: true});
// Lowers the memory limit so that DocumentSourceGroup has to spill.
db.adminCommand({setParameter: 1, internalDocumentSourceGroupMaxMemoryBytes: 1000});
// Confirms that DocumentSourceGroup will need to spill: without allowDiskUse the query fails with a memory-limit error.
db.spilling.aggregate([{$group: {_id: "$k", o: {$sum: "$n"}}}, {$group: {_id: "$o"}}], {allowDiskUse: false});
Error: command failed: {
	"ok" : 0,
	"errmsg" : "PlanExecutor error during aggregation :: caused by :: Exceeded memory limit for $group, but didn't allow external sort. Pass allowDiskUse:true to opt in.",
	"code" : 292,
	"codeName" : "QueryExceededMemoryLimitNoDiskUseAllowed"
 
// With spilling, the second $group sees two different sums for identical groups.
db.spilling.aggregate([{$group: {_id: "$k", o: {$sum: "$n"}}}, {$group: {_id: "$o"}}], {allowDiskUse: true}).toArray();
[ { "_id" : NumberDecimal("0.11") }, { "_id" : NumberDecimal("0") } ]
// Raises the memory limit so that DocumentSourceGroup does not spill.
db.adminCommand({setParameter: 1, internalDocumentSourceGroupMaxMemoryBytes: 100000000});
{ "was" : NumberLong(1000), "ok" : 1 }
// A single result, as expected.
db.spilling.aggregate([{$group: {_id: "$k", o: {$sum: "$n"}}}, {$group: {_id: "$o"}}], {allowDiskUse: true}).toArray();
[ { "_id" : NumberDecimal("0.11") } ]

Sprint: QE 2022-02-07, QE 2022-02-21, QE 2022-03-07, QE 2022-03-21
Participants:

 Description   

DocumentSourceGroup may return different results for an in-memory group and a spilled group because, when it spills, it writes out the accumulator's finalized state rather than its intermediate state. For example, the $sum accumulator keeps the decimal total and the non-decimal total separately, but when DocumentSourceGroup spills data it calls AccumulatorState::getValue(), which returns the finalized value decimalTotal.add(nonDecimalTotal.getDecimal()).
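For illustration, here is a minimal sketch of that precision loss (assumptions: a standalone mongod, an illustrative collection named spillDemo, and a hand-picked "batch" split standing in for what gets accumulated before and after a spill). Finalizing a partial sum folds the huge double total into the Decimal128 total, whose 34 significant digits cannot also hold the 0.11, so the small part is rounded away before the cancelling -1e+34 is ever merged in:

// Batch 1 stands in for the documents accumulated before a spill; batch 2 for the rest.
db.spillDemo.drop();
db.spillDemo.insert([
	{batch: 1, n: 1e+34},
	{batch: 1, n: NumberDecimal("0.1")},
	{batch: 1, n: NumberDecimal("0.01")},
	{batch: 2, n: -1e+34}]);

// "Spilled" path: finalize each batch separately, then re-sum the finalized partials.
db.spillDemo.aggregate([
	{$group: {_id: "$batch", partial: {$sum: "$n"}}},  // batch 1's 0.11 is rounded away here
	{$group: {_id: null, merged: {$sum: "$partial"}}}
]).toArray();
// merged comes back as a decimal zero: the 0.11 cannot be recovered from the finalized partials.

// "In-memory" path: a single $sum keeps the decimal and non-decimal totals separate until
// the very end, so the two doubles cancel exactly and the result is NumberDecimal("0.11").
db.spillDemo.aggregate([{$group: {_id: null, s: {$sum: "$n"}}}]).toArray();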

We may have similar issues in a sharded environment for accumulators that need intermediate state; that needs further investigation, and I haven't yet dug into this aspect. I'm also not sure what MongoDB guarantees about aggregation results in a sharded environment.
If similar issues exist in a sharded environment, they would affect both the classic engine and SBE. To fix them (assuming they exist), we would need to send the full intermediate accumulator state over the wire; currently we send a semi-processed intermediate accumulator state.
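One quick way to see where that merging happens is the split pipeline that mongos reports in explain output for a sharded collection (a sketch only; partial_sum is the sharded collection from the repro in the comments, and the exact explain output shape varies by version):

// The shardsPart runs the $group on each shard and ships a per-key partial sum; the
// mergerPart re-accumulates those partials on the merging node. If the shipped partial is
// already finalized (semi-processed), the merger cannot undo any rounding done on the shard.
const explain = db.partial_sum.explain().aggregate([{$group: {_id: "$k", s: {$sum: "$n"}}}]);
printjson(explain.splitPipeline.shardsPart);  // per-shard $group producing partial sums
printjson(explain.splitPipeline.mergerPart);  // merging $group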



 Comments   
Comment by Githook User [ 10/Mar/22 ]

Author:

{'name': 'Yoonsoo Kim', 'email': 'yoonsoo.kim@mongodb.com', 'username': 'yun-soo'}

Message: SERVER-62940 Have $sum accumulator serialize the full state of partial result
Branch: master
https://github.com/mongodb/mongo/commit/1e2a94cd7396709295f615de0437d854d72e54cd

Comment by Yoon Soo Kim [ 04/Feb/22 ]

Here's a repro for the sharded $sum/$avg issue. It should be run on an optimized build, because on a debug build the classic engine deliberately spills data for testing purposes and therefore always produces a wrong result.

(function() {
'use strict';
 
const st = new ShardingTest({shards: 2, shardOptions: {setParameter: "featureFlagSBEGroupPushdown=true"}});
 
const db = st.getDB("sharding");
const dbAtShard0 = st.shard0.getDB("sharding");
const dbAtShard1 = st.shard1.getDB("sharding");
 
assert(
    assert.commandWorked(dbAtShard0.adminCommand({getParameter: 1, featureFlagSBEGroupPushdown: 1}))
        .featureFlagSBEGroupPushdown.value);
assert(
    assert.commandWorked(dbAtShard1.adminCommand({getParameter: 1, featureFlagSBEGroupPushdown: 1}))
        .featureFlagSBEGroupPushdown.value);
 
assert.commandWorked(st.s0.adminCommand({enableSharding: db.getName()}));
 
let runShardedGroupOnBothEngine = (coll, pipeline) => {
    // Switches the shards to the classic engine before collecting the expected result.
    assert.commandWorked(
        dbAtShard0.adminCommand({setParameter: 1, internalQueryForceClassicEngine: true}));
    assert.commandWorked(
        dbAtShard1.adminCommand({setParameter: 1, internalQueryForceClassicEngine: true}));
 
    // Collects the classic engine's result as the expected result, executing the pipeline at the
    // mongos.
    const classicalRes =
        coll.runCommand({aggregate: coll.getName(), pipeline: pipeline, cursor: {}})
            .cursor.firstBatch;
 
    jsTestLog("classic result ---");
    jsTestLog(classicalRes);
 
    // Switches the shards to SBE.
    assert.commandWorked(
        dbAtShard0.adminCommand({setParameter: 1, internalQueryForceClassicEngine: false}));
    assert.commandWorked(
        dbAtShard1.adminCommand({setParameter: 1, internalQueryForceClassicEngine: false}));
 
    const sbeRes = coll.runCommand({aggregate: coll.getName(), pipeline: pipeline, cursor: {}})
                       .cursor.firstBatch;
 
    jsTestLog("SBE result ---");
    jsTestLog(sbeRes);
};
 
let prepareCollection = coll => {
    coll.drop();
 
    // Makes sure that the collection is sharded.
    assert.commandWorked(st.s0.adminCommand({shardCollection: coll.getFullName(), key: {_id: "hashed"}}));
 
    return coll;
};
 
// Hash-sharded collection
let coll = prepareCollection(db.partial_sum);
// Unsharded collection
let coll2 = db.partial_sum2;
 
for (let i = 0; i < 3; ++i) {
    assert.commandWorked(
        coll.insert([
            {k: i, n: 1e+34},
            {k: i, n: NumberDecimal("0.1")},
            {k: i, n: NumberDecimal("0.01")},
            {k: i, n: -1e+34}]));
    assert.commandWorked(
        coll2.insert([
            {k: i, n: 1e+34},
            {k: i, n: NumberDecimal("0.1")},
            {k: i, n: NumberDecimal("0.01")},
            {k: i, n: -1e+34}]));
}
 
runShardedGroupOnBothEngine(coll, [{$group: {_id: "$k", s: {$sum: "$n"}}}, {$group: {_id: "$s"}}]);
// classic: [ { "_id" : NumberDecimal("0.11") }, { "_id" : NumberDecimal("0") } ]
// SBE: [ { "_id" : NumberDecimal("0") }, { "_id" : NumberDecimal("0.11") } ]
runShardedGroupOnBothEngine(coll2, [{$group: {_id: "$k", s: {$sum: "$n"}}}, {$group: {_id: "$s"}}]);
// classic: [ { "_id" : NumberDecimal("0.11") } ]
// SBE: [ { "_id" : NumberDecimal("0.11") } ]
 
runShardedGroupOnBothEngine(coll, [{$group: {_id: "$k", a: {$avg: "$n"}}}, {$group: {_id: "$a"}}]);
// classic: [ { "_id" : NumberDecimal("0.0275") }, { "_id" : NumberDecimal("0") } ]
// SBE: [ { "_id" : NumberDecimal("0") }, { "_id" : NumberDecimal("0.0275") } ]
runShardedGroupOnBothEngine(coll2, [{$group: {_id: "$k", a: {$avg: "$n"}}}, {$group: {_id: "$a"}}]);
// classic: [ { "_id" : NumberDecimal("0.0275") } ]
// SBE: [ { "_id" : NumberDecimal("0.0275") } ]
 
st.stop();
}());

Comment by Yoon Soo Kim [ 04/Feb/22 ]

mihai.andrei
About backporting:
I think we should not backport this fix all the way down to 4.0 because it is essentially a breaking change. Instead, as you proposed offline, it should be applied to 5.3 and later, gated on FCV 5.3.

When we discontinue 5.2 (or 5.0?), we can remove the old code that sends the old over-the-wire format.

It sounds like we should add upgrade/downgrade scenario tests.

What would happen to the multiversion agg fuzzer tests?

Comment by Yoon Soo Kim [ 04/Feb/22 ]

mihai.andrei, Thanks for posting the more detailed proposed fix.

Actually, I think we missed one thing. To merge partial results properly, the merging side needs to know the type of the non-decimal partial result. For example, suppose most of the data is int32 and a single datum happens to be a double, and that datum lands on one particular shard: that shard should report a partial result type of double, because it added int32s and a double, and the merging side should produce a double result in the end. But if shards do not send the partial result type for the non-decimal sum, the merging side can decide neither in which type to add the non-decimal partial result into the global non-decimal total nor which result type to produce. So I think we should always send 4 elements over the wire:

  1. decimal total: may be Nothing if no decimal input has been encountered
  2. 1st double from DoubleDoubleSummation
  3. 2nd double from DoubleDoubleSummation
  4. partial result type for non-decimal total: we use type information only from this element.

Actually, the SBE $sum algorithm maintains these 4 elements.
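For illustration, a partial result carrying these four elements could be shaped roughly like this (the field names and layout are purely illustrative, not the actual over-the-wire format):

// Hypothetical partial $sum state sent by a shard that has added many int32s and one double,
// but no decimals (so the decimal total is absent). Numbering matches the list above.
const partialSumState = {
    decimalTotal: null,                   // (1) set only if a decimal input was encountered
    nonDecimalTotalSum: 1234567890123.5,  // (2) 1st double of the DoubleDoubleSummation
    nonDecimalTotalAddend: 0,             // (3) 2nd double: the compensation (error) term
    nonDecimalTotalType: "double"         // (4) widest non-decimal type seen on this shard
};

The merging side would then fold (2) and (3) into its own DoubleDoubleSummation, keep the widest type seen across shards for (4), add (1) into its decimal total when present, and defer decimalTotal.add(nonDecimalTotal.getDecimal()) until every shard's partial has been merged.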

I think this is a long-standing issue that we simply haven't noticed until now.

Comment by Yoon Soo Kim [ 31/Jan/22 ]

steve.la This is specific to the classic engine's DocumentSourceGroup, which returns different $sum results for an in-memory group and a spilled group.

Removed the SBE-related part from the repro steps to clarify that this issue applies only to the classic engine.

The repro steps show only the classic engine's bug.

I think we need to investigate the behavior of the sharded accumulators ($sum / $avg / $stdDevPop / $stdDevSamp) for both the classic engine and SBE. Note that we reuse the merge-side DocumentSourceGroup to implement the SBE sharded accumulators.

I haven't had a chance to investigate the sharded accumulators' behavior yet.
