-
Type: New Feature
-
Resolution: Unresolved
-
Priority: Major - P3
-
None
-
Affects Version/s: None
-
Component/s: Internal Code
-
Labels:
-
Service Arch
-
5
This ticket is to implement a producer-consumer queue with asynchronous blocking on an empty queue.
Fairly often, we have found cases in sharding where it would be helpful to have a blocking producer-consumer queue, but need it to be used in a context where it's not acceptable to block a thread. This creates the need for ungainly workarounds. We should come up with an abstraction (probably based on futures) that handles this.
Example idea (rough sketch):
template <typename T> class Stream { public: // Pulls message from stream. Alternate name pop/receive ExecutorFuture<T> getNext(); // Alternate names push/send void emplace(T t); // Shuts down the stream, interrupts waiter with an error code on the future void close(); }; // Usage Stream<Message> stream(executor); // Consumer: AsyncTry([stream]{ return stream.getNext().then([] (Message m) { // Handle message }); }).until([](Status s) { return s == ErrorCodes::StreamClosed /* or some existing error code */ }).on(executor); // Producer: stream.emplace(Message{});
Things to consider:
- Should it support more than one consumer? I think single producer, single consumer is probably fine for most cases.
- Should we support a max size for streams, in which case emplacing a message could block as well?
- Is futures really the best way to do this? It looks cute, but it might make more sense to just set a single callback on the stream that gets called whenever something is added.
- Are there cases where we want to just call a callback every time something is added to the stream, rather than executing them sequentially?
I think whether we call this a stream or a channel depends on the above considerations.
Acceptance Criteria:
Design a class for Create a Futures-compatible Stream/Channel abstraction by answering the questions above.