Details
- New Feature
- Resolution: Unresolved
- Major - P3
- None
- None
- Service Arch
- 5
Description
This ticket is to implement a producer-consumer queue with asynchronous blocking on an empty queue.
We have often found cases in sharding where a blocking producer-consumer queue would be helpful, but the code runs in a context where blocking a thread is not acceptable. This forces ungainly workarounds. We should come up with an abstraction (probably based on futures) that handles this case.
Example idea (rough sketch):
template <typename T>
class Stream {
public:
    // Pulls the next message from the stream. Alternate names: pop/receive
    ExecutorFuture<T> getNext();

    // Adds a message to the stream. Alternate names: push/send
    void emplace(T t);

    // Shuts down the stream and interrupts the waiter with an error code on the future
    void close();
};

// Usage
Stream<Message> stream(executor);

// Consumer:
AsyncTry([&stream] {
    return stream.getNext().then([](Message m) {
        // Handle message
    });
}).until([](Status s) {
    return s == ErrorCodes::StreamClosed;  /* or some existing error code */
}).on(executor);

// Producer:
stream.emplace(Message{});
Things to consider:
- Should it support more than one consumer? I think single producer, single consumer is probably fine for most cases.
- Should we support a max size for streams, in which case emplacing a message could block as well?
- Are futures really the best way to do this? It looks cute, but it might make more sense to just set a single callback on the stream that gets called whenever something is added.
- Are there cases where we want to just call a callback every time something is added to the stream, rather than executing them sequentially?
I think whether we call this a stream or a channel depends on the above considerations.
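For comparison, the single-callback alternative raised in the list above could look roughly like the following. This is a hypothetical sketch: the CallbackStream name, the constructor-installed handler, and the bool return from emplace() are all assumptions made for illustration. The handler runs inline on the producer's thread while a mutex is held, which keeps invocations sequential; a real version would presumably schedule the handler on an executor instead.

```cpp
#include <cassert>
#include <functional>
#include <mutex>
#include <utility>

// Hypothetical callback-based variant: no futures, one handler per stream.
template <typename T>
class CallbackStream {
public:
    explicit CallbackStream(std::function<void(T)> onMessage)
        : _onMessage(std::move(onMessage)) {
        assert(_onMessage != nullptr);  // A handler must be provided up front.
    }

    // Invokes the handler with the message. Returns false once closed.
    bool emplace(T t) {
        std::lock_guard<std::mutex> lk(_mutex);
        if (_closed) {
            return false;
        }
        // Holding the lock serializes handler calls across producers.
        // The handler must not call back into this stream, or it deadlocks.
        _onMessage(std::move(t));
        return true;
    }

    // After close(), later emplace() calls are rejected.
    void close() {
        std::lock_guard<std::mutex> lk(_mutex);
        _closed = true;
    }

private:
    std::mutex _mutex;
    std::function<void(T)> _onMessage;
    bool _closed = false;
};
```

This version needs no buffer and no waiter bookkeeping, but it moves the handler's cost onto the producer. Dropping the lock around the handler call would give the non-sequential behavior asked about in the last question.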
Acceptance Criteria:
Design a class for a futures-compatible Stream/Channel abstraction by answering the questions above.