[SERVER-51765] Create a Futures-compatible Stream/Channel abstraction Created: 20/Oct/20  Updated: 09/Feb/23

Status: Backlog
Project: Core Server
Component/s: Internal Code
Affects Version/s: None
Fix Version/s: None

Type: New Feature Priority: Major - P3
Reporter: Matthew Saltz (Inactive) Assignee: Backlog - Service Architecture
Resolution: Unresolved Votes: 0
Labels: servicearch-wfbf-day
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified

Assigned Teams:
Service Arch
Participants:
Story Points: 5

 Description   

This ticket is to implement a producer-consumer queue with asynchronous blocking on an empty queue.

Fairly often in sharding, we have found cases where a blocking producer-consumer queue would be helpful, but it has to be used in a context where blocking a thread is not acceptable. This forces ungainly workarounds. We should come up with an abstraction (probably based on futures) that handles this.

Example idea (rough sketch):

template <typename T>
class Stream {
public:
    // Pulls the next message from the stream. Alternate names: pop/receive.
    ExecutorFuture<T> getNext();
    // Adds a message to the stream. Alternate names: push/send.
    void emplace(T t);
    // Shuts down the stream and interrupts any waiter by setting an error code on its future.
    void close();
};
 
// Usage
Stream<Message> stream(executor);
 
// Consumer:
AsyncTry([&stream] {
        return stream.getNext().then([](Message m) {
            // Handle message
        });
    }).until([](Status s) {
        return s == ErrorCodes::StreamClosed; /* or some existing error code */
    }).on(executor);
 
// Producer:
stream.emplace(Message{});
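
To make the hand-off between producer and consumer concrete, here is a minimal self-contained approximation of the sketch above using only the standard library. It is not the proposed implementation. std::future stands in for ExecutorFuture (it has no .then(), so a real version would need the server's future types to avoid blocking), the StreamClosed exception type is a hypothetical stand-in for an error code, and throwing from emplace() after close() is an assumption.

```cpp
#include <cassert>
#include <deque>
#include <exception>
#include <future>
#include <mutex>
#include <optional>
#include <stdexcept>
#include <utility>

// Hypothetical error type standing in for something like ErrorCodes::StreamClosed.
struct StreamClosed : std::runtime_error {
    StreamClosed() : std::runtime_error("stream closed") {}
};

// Single-producer, single-consumer stream. At most one getNext() may be
// outstanding at a time.
template <typename T>
class Stream {
public:
    // Returns a future for the next item. The future is ready immediately if an
    // item is buffered; otherwise it becomes ready on the next emplace() or close().
    std::future<T> getNext() {
        std::lock_guard<std::mutex> lk(_mutex);
        std::promise<T> promise;
        auto future = promise.get_future();
        if (!_items.empty()) {
            promise.set_value(std::move(_items.front()));
            _items.pop_front();
        } else if (_closed) {
            promise.set_exception(std::make_exception_ptr(StreamClosed{}));
        } else {
            assert(!_waiter && "only one outstanding getNext() is supported");
            _waiter.emplace(std::move(promise));
        }
        return future;
    }

    // Hands the item to a waiting consumer if there is one, else buffers it.
    // Assumption: adding to a closed stream is an error.
    void emplace(T t) {
        std::lock_guard<std::mutex> lk(_mutex);
        if (_closed) {
            throw StreamClosed{};
        }
        if (_waiter) {
            // std::future runs no continuations, so fulfilling under the lock is
            // safe here; with continuation-based futures this should happen
            // after releasing the lock.
            std::promise<T> waiter = std::move(*_waiter);
            _waiter.reset();
            waiter.set_value(std::move(t));
        } else {
            _items.push_back(std::move(t));
        }
    }

    // Marks the stream closed and fails the pending getNext(), if any.
    void close() {
        std::lock_guard<std::mutex> lk(_mutex);
        _closed = true;
        if (_waiter) {
            _waiter->set_exception(std::make_exception_ptr(StreamClosed{}));
            _waiter.reset();
        }
    }

private:
    std::mutex _mutex;
    std::deque<T> _items;                    // Items not yet consumed.
    std::optional<std::promise<T>> _waiter;  // Pending getNext(), if any.
    bool _closed = false;
};
```

With this shape, a getNext() issued before any emplace() yields a future that becomes ready on the next emplace(). Items buffered before close() are still delivered; only a consumer waiting on an empty, closed stream sees the error.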

Things to consider:

  • Should it support more than one consumer? I think single producer, single consumer is probably fine for most cases.
  • Should we support a max size for streams, in which case emplacing a message could block as well?
  • Are futures really the best way to do this? It looks cute, but it might make more sense to just set a single callback on the stream that gets called whenever something is added.
  • Are there cases where we want to just call a callback every time something is added to the stream, rather than executing them sequentially?
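
For comparison, the callback-based alternative raised in the last two questions could look roughly like the sketch below. Everything in it is hypothetical: CallbackStream, Handler, and the Executor alias (a plain function that runs a task) are illustrative names, not existing server types, and silently dropping items added after close() is an assumption.

```cpp
#include <atomic>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-in for an executor: anything that can run a task.
using Executor = std::function<void(std::function<void()>)>;

template <typename T>
class CallbackStream {
public:
    using Handler = std::function<void(T)>;

    // The single consumer callback is fixed at construction.
    CallbackStream(Executor executor, Handler handler)
        : _executor(std::move(executor)),
          _handler(std::make_shared<Handler>(std::move(handler))) {}

    // Schedules the handler for this item on the executor. The producer never
    // waits for the consumer.
    void emplace(T t) {
        if (_closed.load()) {
            return;  // Assumption: items added after close() are dropped.
        }
        auto handler = _handler;
        // std::function needs a copyable callable, so the item is held by shared_ptr.
        auto item = std::make_shared<T>(std::move(t));
        _executor([handler, item] { (*handler)(std::move(*item)); });
    }

    // Stops accepting new items; already scheduled handlers still run.
    void close() {
        _closed.store(true);
    }

private:
    Executor _executor;
    std::shared_ptr<Handler> _handler;
    std::atomic<bool> _closed{false};
};

// Usage: an inline executor runs each handler immediately on the caller's thread.
inline std::vector<int> collectDemo() {
    std::vector<int> seen;
    CallbackStream<int> stream([](std::function<void()> task) { task(); },
                               [&seen](int v) { seen.push_back(v); });
    stream.emplace(1);
    stream.emplace(2);
    stream.close();
    stream.emplace(3);  // Dropped: the stream is closed.
    return seen;
}
```

In this variant, whether handlers run sequentially or concurrently is decided entirely by the executor: the inline executor above runs them in order on the producer's thread, while a thread pool could run them in parallel. There is also no natural place for back-pressure, which is one argument for the futures-based shape if a max size is needed.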

I think whether we call this a stream or a channel depends on the above considerations.

Acceptance Criteria: 

Design a class for a Futures-compatible Stream/Channel abstraction by answering the questions above.



 Comments   
Comment by Lauren Lewis (Inactive) [ 24/Feb/22 ]

We haven’t heard back from you for at least one calendar year, so this issue is being closed. If this is still an issue for you, please provide additional information and we will reopen the ticket.

Generated at Thu Feb 08 05:26:22 UTC 2024 using Jira 9.7.1#970001-sha1:2222b88b221c4928ef0de3161136cc90c8356a66.