Proposed interface:
resharding_data_copy_util.h
/**
 * Returns the largest _id value in the collection.
 *
 * NOTE(review): the result for an empty collection is not specified in this proposal; confirm
 * what Value is returned in that case.
 */
Value findHighestInsertedId(OperationContext* opCtx, const CollectionPtr& collection);

/**
 * Returns a batch of documents whose total size is at most batchSizeLimitBytes.
 *
 * NOTE(review): presumably the documents are pulled from 'pipeline' and an empty result means
 * the pipeline is exhausted; confirm. The handling of a single document that is larger than
 * batchSizeLimitBytes is also not stated here.
 */
std::vector<InsertStatement> fillBatchForInsert(Pipeline& pipeline, int batchSizeLimitBytes);

/**
 * Atomically inserts a batch of documents in a single storage transaction. Returns the number of
 * bytes inserted.
 *
 * Throws NamespaceNotFound if the collection doesn't already exist.
 *
 * NOTE(review): 'batch' is taken by non-const reference; whether it is modified or consumed by
 * the call is not stated in this proposal; confirm.
 */
int insertBatch(OperationContext* opCtx,
                const NamespaceString& nss,
                std::vector<InsertStatement>& batch);
resharding_collection_cloner.h
/**
 * Proposed interface for fetching documents and inserting them in batches.
 *
 * NOTE(review): presumably run() drives repeated calls to doOneBatch(); the proposal does not
 * say so explicitly; confirm. The "..." lines are elisions in the proposal, not code.
 */
class ReshardingCollectionCloner {
public:
    ...

    /**
     * Schedules work to repeatedly fetch and insert batches of documents.
     *
     * Returns a future that becomes ready when either:
     *   (a) all documents have been fetched and inserted, or
     *   (b) the cancellation token was canceled due to a stepdown or abort.
     */
    SemiFuture<void> run(std::shared_ptr<executor::TaskExecutor> executor,
                         CancelationToken cancelToken);

    /**
     * Fetches and inserts a single batch of documents.
     *
     * Returns true if there are more documents to be fetched and inserted, and returns false
     * otherwise.
     */
    bool doOneBatch(OperationContext* opCtx, Pipeline& pipeline);

private:
    // NOTE(review): undocumented in the proposal. Presumably builds a new pipeline so that
    // cloning can continue after an interruption; confirm the intended resume semantics.
    std::unique_ptr<Pipeline, PipelineDeleter> _restartPipeline(OperationContext* opCtx);

    ...
};
- is depended on by
-
SERVER-55330 Rewrite ReshardingOplogFetcher to use Pipeline instead of Fetcher
- Backlog