[SERVER-50964] Create temporary swappable promise construct in order to receive messages on the RecipientStateMachine Created: 15/Sep/20  Updated: 06/Dec/22  Resolved: 21/Oct/20

Status: Closed
Project: Core Server
Component/s: Sharding
Affects Version/s: None
Fix Version/s: None

Type: Task Priority: Major - P3
Reporter: Blake Oler Assignee: [DO NOT USE] Backlog - Sharding Team
Resolution: Won't Do Votes: 0
Labels: PM-234-M2, PM-234-T-lifecycle
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified

Issue Links:
Related
is related to SERVER-49900 Update config.localReshardingOperatio... Closed
is related to SERVER-49908 Update config.localReshardingOperatio... Closed
Assigned Teams:
Sharding
Participants:

 Description   

Current rough draft implementation:

  • Define the different message types, each with its own IDL struct.
  • Thread all message types through a single swappable SharedPromise of type std::variant<All, Message, Types>.
  • Use a single mutex to synchronize emplacing a value onto the promise and replacing the promise.
  • Under the mutex, producer threads emplace a message onto the promise.
  • Under the mutex, the consumer thread picks up the message from the promise, then swaps in a fresh promise.

This will get us roughly a producer-consumer queue or mailbox of size one.
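
A minimal standalone sketch of that mailbox, using plain std::promise/std::future in place of the server's SharedPromise type; the message structs and the MessageMailbox class below are purely illustrative and are not the actual IDL types or interface:

#include <future>
#include <mutex>
#include <utility>
#include <variant>

// Stand-ins for the IDL-generated message structs (illustrative only).
struct DonorProgressMessage { /* fields from the IDL struct */ };
struct AbortMessage { /* fields from the IDL struct */ };
using AnyMessage = std::variant<DonorProgressMessage, AbortMessage>;

// Size-one mailbox built from a swappable promise. A second deliver() before
// the consumer swaps in a fresh promise throws promise_already_satisfied,
// which is what bounds the "queue" at one message.
class MessageMailbox {
public:
    MessageMailbox() : _future(_promise.get_future()) {}

    // Producer side: under the mutex, emplace the message onto the current promise.
    void deliver(AnyMessage msg) {
        std::lock_guard<std::mutex> lk(_mutex);
        _promise.set_value(std::move(msg));
    }

    // Consumer side: wait for the message, then (under the mutex) swap in a
    // fresh promise so the next message can be delivered.
    AnyMessage receive() {
        AnyMessage msg = _future.get();
        std::lock_guard<std::mutex> lk(_mutex);
        _promise = std::promise<AnyMessage>();
        _future = _promise.get_future();
        return msg;
    }

private:
    std::mutex _mutex;
    std::promise<AnyMessage> _promise;
    std::future<AnyMessage> _future;
};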



 Comments   
Comment by Blake Oler [ 21/Oct/20 ]

renctan has completed this with SERVER-50120.

Comment by Janna Golden [ 24/Sep/20 ]

I vote for alternative #2!

Comment by Max Hirschhorn [ 24/Sep/20 ]

It sounds like the design for primary-only services intended the Instance to be the exclusive writer to the state document. SERVER-49900 and SERVER-49908 propose having a progressDonor array in the config.localReshardingOperations.recipient document, which resharding's config.transactions cloner, donor oplog applier, etc. would all be attempting to update. Rather than having those components communicate in-memory with the primary-only service to get it to perform this update to the state document, I'd like to consider some alternatives.

Alternative #2 - store progress in separate documents

We could have a config.localReshardingOperations.recipient.progress_applier collection which stores a document

{
    _id: {reshardingUUID: <reshardingUUID>, shardId: <donor1's shard ID>},
    progress: <donor1's progress {clusterTime, ts}>,
}

The ReshardingOplogApplier would update that document after applying each batch of oplog entries from that donor shard. Note that being able to apply batches from different donors independently is a recent outcome of changes to the rules for resharding's oplog application in the design document.

Consider the following interface for the ReshardingOplogApplier:

class ReshardingOplogApplier {
public:
    /**
     * The ReshardingOplogApplier would be given a ScopedTaskExecutor distinct
     * from the one the RecipientStateMachine is using. But its lifetime would
     * be managed by the RecipientStateMachine. Meaning that when interrupt()
     * is called, the RecipientStateMachine would shutdown the
     * ReshardingOplogApplier's executor.
     *
     * In replication's initial sync, cloneStopTimestamp is the timestamp of the
     * sync source's last oplog entry. In initial sync it would be an OpTime, but
     * resharding ignores the term component because the reads from the donor's
     * oplog happen with level "majority" read concern.
     *
     * For resharding, it may be sufficient to use the value of the
     * "operationTime" field in the response of the last batch of documents
     * fetched during cloning.
     */
    ReshardingOplogApplier(
        std::shared_ptr<executor::ScopedTaskExecutor> executor,
        Timestamp cloneStopTimestamp);
 
    /**
     * Returns a future which becomes ready when at least one oplog entry
     * beyond cloneStopTimestamp has been applied.
     */
    SharedSemiFuture<void> awaitDoneApplyPhase();
 
    /**
     * Returns a future which becomes ready when all oplog entries from the
     * donor shard have been applied.
     */
    SharedSemiFuture<void> awaitDoneCatchUpPhase();
};

The idea would be to have the RecipientStateMachine create a ReshardingOplogApplier instance for each donor shard upon transitioning into the "applying" state. The transition into the "steady-state" state would then be blocked on the futures returned by awaitDoneApplyPhase() becoming ready for each of the ReshardingOplogAppliers. In other words, _cloneThenTransitionToApplying() would create the ReshardingOplogAppliers, _applyThenTransitionToSteadyState() would wait for them all to ready awaitDoneApplyPhase(), and _awaitAllDonorsMirroringThenTransitionToStrictConsistency() would wait for them all to ready awaitDoneCatchUpPhase(). Note that I'm not sure we would need RecipientStateMachine::_allDonorsMirroring because the ReshardingOplogApplier would implicitly need to wait for that state by waiting for the final oplog entry to come through.
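
A rough sketch of that wiring, assuming a whenAllSucceed()-style future combinator is usable with these futures; the member names (_oplogAppliers, _transitionState()) and the enum value are placeholders rather than the actual RecipientStateMachine interface:

// Hypothetical sketch only. Assumes _oplogAppliers holds one
// ReshardingOplogApplier per donor shard and that a whenAllSucceed()-style
// combinator accepting these futures is available.
ExecutorFuture<void> RecipientStateMachine::_applyThenTransitionToSteadyState(
    std::shared_ptr<executor::ScopedTaskExecutor> executor) {
    std::vector<SharedSemiFuture<void>> doneApplying;
    for (auto&& applier : _oplogAppliers) {
        doneApplying.push_back(applier->awaitDoneApplyPhase());
    }

    // Block the transition into "steady-state" until every donor's applier has
    // applied at least one oplog entry beyond its cloneStopTimestamp.
    return whenAllSucceed(std::move(doneApplying))
        .thenRunOn(**executor)
        .then([this] { _transitionState(RecipientStateEnum::kSteadyState); });
}

The same pattern, waiting on awaitDoneCatchUpPhase(), would gate the transition into strict consistency.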

It is important that during recovery for the primary-only service the ReshardingOplogApplier gets instantiated so it can recover, based on the contents of its progress document, whether to immediately ready its SharedPromises. I'm not prepared to make the leap of saying ReshardingOplogApplier ought to be its own separate primary-only service because I wasn't sure how well a primary-only service Instance deals with being co-managed by another primary-only service. We could explore that avenue too.

Alternative #3 - have an OpObserver invalidate the recipient state document

(Not going into as much depth here because I'm more in favor of Alternative #2.)

RecoveryUnit::onCommit() handlers are called after the storage transaction has committed but while still holding locks. This means that for IX writes to a collection, the onCommit() handlers may be called in a different order than the optimes assigned to the storage transactions.

We could do as SERVER-49900 and SERVER-49908 propose and store the progress information in the config.localReshardingOperations.recipient document. ReshardingOpObserver would then notify the RecipientStateMachine when the config.localReshardingOperations.recipient document has changed. ReshardingOpObserver wouldn't be able to supply the most up-to-date view of the config.localReshardingOperations.recipient document because it is possible for an onCommit() handler to run for the most up-to-date view and then, later in wall-clock time, for an onCommit() handler to run for an earlier view. We could attempt to order these views by OpTime, but we don't currently always have the point in time the state document was read from storage or written at. Invalidating the state document to tell the RecipientStateMachine "the config.localReshardingOperations.recipient document has changed, but we cannot say exactly how" is similar to how the DurableViewCatalog is invalidated via DurableViewCatalog::onExternalChange() in OpObserverImpl.
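
A loose sketch of that invalidation pattern; the notification method on RecipientStateMachine and the way the op observer reaches the instance are assumptions, not actual interfaces:

// Hypothetical sketch only: register an onCommit() handler that tells the
// state machine "the recipient state document changed" without handing it a
// particular view of the document, since onCommit() handlers for IX writes can
// run in a different order than their storage transactions' optimes.
void notifyRecipientStateDocChanged(OperationContext* opCtx,
                                    std::shared_ptr<RecipientStateMachine> machine) {
    opCtx->recoveryUnit()->onCommit(
        [machine = std::move(machine)](boost::optional<Timestamp> /*commitTime*/) {
            // The state machine re-reads the document from storage itself,
            // analogous to how DurableViewCatalog::onExternalChange() works.
            machine->onStateDocumentInvalidated();
        });
}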
