git grep -P '(\$change|change stream|change streams|ChangeStream)' src/mongo/s/query/cluster_aggregate.cpp src/mongo/db/pipeline/sharded_agg_helpers.* src/mongo/s/query/cluster_aggregation_planner.cpp
|
|
src/mongo/db/pipeline/sharded_agg_helpers.cpp: // $changeStream aggregations ignore NamespaceNotFound in order to allow streams to be opened on
|
src/mongo/db/pipeline/sharded_agg_helpers.cpp: // present, then $changeStream should immediately return an empty cursor just as other
|
src/mongo/db/pipeline/sharded_agg_helpers.cpp: // - Any aggregation which begins with a $changeStream stage.
|
src/mongo/db/pipeline/sharded_agg_helpers.cpp: return nss.isCollectionlessAggregateNS() || litePipe.hasChangeStream();
|
src/mongo/db/pipeline/sharded_agg_helpers.cpp: // If this is a $changeStream, we swallow NamespaceNotFound exceptions and continue.
|
src/mongo/db/pipeline/sharded_agg_helpers.cpp: if (!(litePipe.hasChangeStream() &&
|
src/mongo/db/pipeline/sharded_agg_helpers.cpp: // In order for a $changeStream to work reliably, we need the shard registry to be at least as
|
src/mongo/db/pipeline/sharded_agg_helpers.cpp: // that must run on all shards (e.g. $currentOp) because, unlike $changeStream, those pipelines
|
src/mongo/db/pipeline/sharded_agg_helpers.cpp: if (litePipe.hasChangeStream()) {
|
src/mongo/s/query/cluster_aggregate.cpp: invariant(!litePipe.hasChangeStream());
|
src/mongo/s/query/cluster_aggregate.cpp: } catch (const ExceptionFor<ErrorCodes::CloseChangeStream>&) {
|
src/mongo/s/query/cluster_aggregate.cpp: // This exception is thrown when a $changeStream stage encounters an event
|
src/mongo/s/query/cluster_aggregate.cpp: // Set the postBatchResumeToken. For non-$changeStream aggregations, this will be empty.
|
src/mongo/s/query/cluster_aggregate.cpp: // If this is a change stream or a collectionless aggregation, we immediately return the user-
|
src/mongo/s/query/cluster_aggregate.cpp: if (litePipe.hasChangeStream() || nss.isCollectionlessAggregateNS()) {
|
src/mongo/s/query/cluster_aggregate.cpp: // If we are not merging on mongoS, then this is not a $changeStream aggregation, and we
|
src/mongo/s/query/cluster_aggregate.cpp: // $changeStream, we allow the operation to continue so that stream cursors can be established
|
src/mongo/s/query/cluster_aggregate.cpp: // does not exist and this is not a $changeStream, then we return an empty cursor.
|
src/mongo/s/query/cluster_aggregate.cpp: } else if (!(litePipe.hasChangeStream() &&
|
src/mongo/s/query/cluster_aggregate.cpp: // If we don't have a routing table, then this is a $changeStream which must run on all shards.
|
src/mongo/s/query/cluster_aggregate.cpp: invariant(routingInfo || (mustRunOnAll && litePipe.hasChangeStream()));
|
src/mongo/s/query/cluster_aggregate.cpp: auto tailMode = liteParsedPipeline.hasChangeStream()
|
src/mongo/s/query/cluster_aggregation_planner.cpp: // For change streams, we need to set up a custom stage to establish cursors on new shards when
|
src/mongo/s/query/cluster_aggregation_planner.cpp: if (liteParsedPipeline.hasChangeStream()) {
|