Uploaded image for project: 'Core Server'
  1. Core Server
  2. SERVER-51765

Create a Futures-compatible Stream/Channel abstraction

    • Service Arch
    • 5

      This ticket is to implement a producer-consumer queue with asynchronous blocking on an empty queue.

      Fairly often, we have found cases in sharding where it would be helpful to have a blocking producer-consumer queue, but need it to be used in a context where it's not acceptable to block a thread. This creates the need for ungainly workarounds. We should come up with an abstraction (probably based on futures) that handles this.

      Example idea (rough sketch):

      template <typename T>
      class Stream {
      public:
          // Pulls message from stream. Alternate name pop/receive
          ExecutorFuture<T> getNext();
          // Alternate names push/send
          void emplace(T t);
         // Shuts down the stream, interrupts waiter with an error code on the future
          void close();
      };
      
      // Usage
      Stream<Message> stream(executor);
      
      // Consumer:
      AsyncTry([stream]{
              return stream.getNext().then([] (Message m) {
                  // Handle message
              });
          }).until([](Status s) { 
              return s == ErrorCodes::StreamClosed /* or some existing error code */ 
          }).on(executor);
      
      // Producer:
      stream.emplace(Message{});
      

      Things to consider:

      • Should it support more than one consumer? I think single producer, single consumer is probably fine for most cases.
      • Should we support a max size for streams, in which case emplacing a message could block as well?
      • Is futures really the best way to do this? It looks cute, but it might make more sense to just set a single callback on the stream that gets called whenever something is added.
      • Are there cases where we want to just call a callback every time something is added to the stream, rather than executing them sequentially?

      I think whether we call this a stream or a channel depends on the above considerations.

      Acceptance Criteria: 

      Design a class for Create a Futures-compatible Stream/Channel abstraction by answering the questions above. 

            Assignee:
            backlog-server-servicearch [DO NOT USE] Backlog - Service Architecture
            Reporter:
            matthew.saltz@mongodb.com Matthew Saltz (Inactive)
            Votes:
            0 Vote for this issue
            Watchers:
            4 Start watching this issue

              Created:
              Updated: