/// <summary>
/// Rebuilds the aggregation pipeline behind a LINQ query, forces every <c>$unwind</c> stage to
/// preserve null and empty arrays, appends any caller-supplied stages, and executes the result.
/// </summary>
/// <typeparam name="TQuery">Document type of the collection and of the original query.</typeparam>
/// <typeparam name="TResult">Type the pipeline output is deserialized into.</typeparam>
/// <param name="collection">Collection the rebuilt pipeline is executed against.</param>
/// <param name="originalQuery">Query whose <c>ToString()</c> output is parsed to recover the pipeline stages.</param>
/// <param name="additionalPipeline">Stages appended after the rewritten ones; ignored when null or empty.</param>
/// <param name="batchSize">Cursor batch size; a null or non-positive value leaves the option unset.</param>
/// <returns>The cursor returned by <c>collection.Aggregate</c> for the rebuilt pipeline.</returns>
private static IAsyncCursor<TResult> GetAggregateCursor<TQuery, TResult>(IMongoCollection<TQuery> collection, IMongoQueryable<TQuery> originalQuery, List<BsonDocument> additionalPipeline, int? batchSize) where TResult : new()
{
    // NOTE(review): this relies on the queryable's ToString() rendering as "aggregate([ ... ])";
    // confirm that still holds for the driver / LINQ provider version in use.
    string originalQueryString = originalQuery.ToString();

    // Strip the "aggregate(" wrapper and trailing ")" so only the JSON array of stages remains.
    // The `, "_id" : 0` fragment is removed as well — presumably to undo the _id exclusion the
    // LINQ provider adds to projections; verify against callers.
    var aggregateString = originalQueryString.Replace("aggregate(", "").Replace(", \"_id\" : 0", "").RemoveFromEnd(")");

    var originalPipeline = BsonSerializer.Deserialize<List<BsonDocument>>(aggregateString);
    var newList = new List<BsonDocument>(originalPipeline.Count);

    foreach (BsonDocument stage in originalPipeline)
    {
        // The stage operator is the name of the stage document's first element.
        string stageName = stage?.Elements?.FirstOrDefault().Name;

        if (stageName != "$unwind")
        {
            newList.Add(stage);
            continue;
        }

        var unwindValue = stage["$unwind"];
        BsonDocument unwindSpec;

        if (unwindValue.IsBsonDocument)
        {
            // Already in document form: keep its fields and only force the flag,
            // working on a copy so the deserialized original stage is not mutated.
            unwindSpec = (BsonDocument)unwindValue.DeepClone();
            unwindSpec["preserveNullAndEmptyArrays"] = true;
        }
        else
        {
            // Short form ({ $unwind: "$field" }): expand to the document form.
            // Built as BSON rather than interpolated JSON so the path never needs escaping.
            unwindSpec = new BsonDocument
            {
                { "path", unwindValue.ToString() },
                { "preserveNullAndEmptyArrays", true }
            };
        }

        newList.Add(new BsonDocument("$unwind", unwindSpec));
    }

    // Explicit null check so a missing extra pipeline never reaches the extension method.
    if (additionalPipeline != null && additionalPipeline.IsNotEmpty())
    {
        newList.AddRange(additionalPipeline);
    }

    PipelineDefinition<TQuery, TResult> query = new BsonDocumentStagePipelineDefinition<TQuery, TResult>(newList);

    var options = new AggregateOptions()
    {
        // Only positive batch sizes are forwarded; anything else leaves BatchSize null.
        BatchSize = batchSize > 0 ? batchSize : null
    };

    return collection.Aggregate(query, options);
}
|
|