[JAVA-5279] ResizingByteBufferFlux remains active after outer Mono Subscription cancellation Created: 02/Jan/24 Updated: 04/Jan/24 |
|
| Status: | Backlog |
| Project: | Java Driver |
| Component/s: | GridFS |
| Affects Version/s: | None |
| Fix Version/s: | None |
| Type: | Bug | Priority: | Minor - P4 |
| Reporter: | Slav Babanin | Assignee: | Unassigned |
| Resolution: | Unresolved | Votes: | 0 |
| Labels: | None | ||
| Remaining Estimate: | Not Specified | ||
| Time Spent: | Not Specified | ||
| Original Estimate: | Not Specified | ||
| Description |
|
Description: A potential bug has been identified in the GridFSUploadPublisher implementation, specifically in the createSaveChunksMono method. The ResizingByteBufferFlux is created inside another Mono and subscribed to immediately, so it does not react to cancellation of the outer Mono. As a result, the ResizingByteBufferFlux remains active and keeps processing data that never contributes to the intended database operations, consuming resources needlessly.
Steps to Reproduce:
Expected Result: Upon cancellation of the subscription, ResizingByteBufferFlux should also cease its operations, avoiding unnecessary CPU usage.
Actual Result: ResizingByteBufferFlux continues to process data from the source after cancellation of the subscription, resulting in wasted resources. The issue is compounded when GridFSUploadPublisher is subscribed to again: an additional ResizingByteBufferFlux is created while the previous one remains active.
Possible Fix: A potential fix would be to modify the subscription logic so that the Disposable returned by the subscribe method is passed to sink.doOnCancel. This change would ensure that cancellation of the downstream subscription is properly propagated, allowing ResizingByteBufferFlux to terminate its operations accordingly.
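The idea behind the possible fix can be illustrated with a minimal, dependency-free sketch. It is not the driver's code and does not use Reactor: it uses the JDK's `java.util.concurrent.Flow` interfaces as a stand-in, and `ManualSource`, `Relay` and `run` are hypothetical names introduced only for this illustration. `Relay` plays the role of the outer publisher that subscribes internally to a source. When `propagateCancel` is false, downstream cancellation never reaches the inner subscription and the relay keeps processing items. When it is true, the cancel call is forwarded, which is assumed to be analogous to registering the inner Disposable with the sink's cancel hook in Reactor.

```java
import java.util.concurrent.Flow;

public class CancelPropagationSketch {

    /** Hypothetical push-style source, driven manually through emit(). */
    static final class ManualSource implements Flow.Publisher<Integer> {
        private Flow.Subscriber<? super Integer> subscriber;
        private boolean cancelled;

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> s) {
            subscriber = s;
            s.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) { /* demand ignored: push-style source */ }
                @Override public void cancel() { cancelled = true; }
            });
        }

        /** Pushes one value unless the subscription has been cancelled. */
        void emit(int value) {
            if (subscriber != null && !cancelled) {
                subscriber.onNext(value);
            }
        }
    }

    /** Hypothetical outer publisher that subscribes internally to a source. */
    static final class Relay implements Flow.Publisher<Integer> {
        private final Flow.Publisher<Integer> source;
        private final boolean propagateCancel;
        int processed; // items handled by the inner subscriber, delivered or not

        Relay(Flow.Publisher<Integer> source, boolean propagateCancel) {
            this.source = source;
            this.propagateCancel = propagateCancel;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> downstream) {
            source.subscribe(new Flow.Subscriber<Integer>() {
                private boolean downstreamCancelled;

                @Override
                public void onSubscribe(Flow.Subscription inner) {
                    downstream.onSubscribe(new Flow.Subscription() {
                        @Override public void request(long n) { inner.request(n); }
                        @Override public void cancel() {
                            downstreamCancelled = true;
                            if (propagateCancel) {
                                inner.cancel(); // the fix: forward cancellation upstream
                            }
                        }
                    });
                }

                @Override
                public void onNext(Integer item) {
                    processed++; // work happens even if nobody is listening
                    if (!downstreamCancelled) downstream.onNext(item);
                }

                @Override public void onError(Throwable t) { if (!downstreamCancelled) downstream.onError(t); }
                @Override public void onComplete() { if (!downstreamCancelled) downstream.onComplete(); }
            });
        }
    }

    /** Emits two items, cancels downstream, emits two more; returns items processed. */
    static int run(boolean propagateCancel) {
        ManualSource source = new ManualSource();
        Relay relay = new Relay(source, propagateCancel);
        Flow.Subscription[] handle = new Flow.Subscription[1];
        relay.subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { handle[0] = s; }
            @Override public void onNext(Integer item) { }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        });
        source.emit(1);
        source.emit(2);
        handle[0].cancel();
        source.emit(3);
        source.emit(4);
        return relay.processed;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // prints 4: inner pipeline keeps working after cancel
        System.out.println(run(true));  // prints 2: cancellation reaches the source
    }
}
```

Without propagation the relay still processes the two items emitted after cancellation, mirroring the wasted work described above; with propagation the source stops pushing as soon as the downstream cancels.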
|
| Comments |
| Comment by Ross Lawley [ 04/Jan/24 ] |
|
Having looked at the ResizingByteBufferFlux, it utilizes Flux.<ByteBuffer>push(), which according to the docs:
So it is expected behavior for it to consume greedily. The question is whether an alternative internal implementation should be added so that it can respond to cancellation effectively. The reactive stream spec mandates:
This is the case, although the internally used Flux.push may consume more data than needed. The following test shows that cancellation is eventually honoured for the async Flux<ByteBuffer>:
However, with a synchronous Flux all the data is consumed greedily:
This is because BaseSubscriber.hookOnNext essentially creates a sync loop, so the cancellation call is done after the loop has consumed (buffered) all resources. Ross |
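The synchronous-loop effect described in the comment above can be reproduced in isolation with a small sketch. It is an analogue built on the JDK's `java.util.concurrent.Flow` interfaces, not the driver's ResizingByteBufferFlux and not Reactor's BaseSubscriber; `SyncRange`, `lateCancel` and `cancelInOnNext` are hypothetical names. `SyncRange` emits every item inside the `request` call, so a cancel issued after `subscribe` has returned arrives only once the whole source has been consumed. A cancel issued from inside `onNext` is seen by the loop and stops it early. For brevity, the source treats the first `request` as unbounded demand.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class SyncLoopCancellation {

    /** Hypothetical synchronous source: emits 0..count-1 inside request(). */
    static final class SyncRange implements Flow.Publisher<Integer> {
        private final int count;
        final AtomicInteger produced = new AtomicInteger();

        SyncRange(int count) {
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private final AtomicBoolean cancelled = new AtomicBoolean();
                private boolean started;

                @Override
                public void request(long n) {
                    // Simplification: the first request is treated as unbounded demand.
                    if (started) return;
                    started = true;
                    for (int i = 0; i < count && !cancelled.get(); i++) {
                        produced.incrementAndGet();
                        subscriber.onNext(i);
                    }
                    if (!cancelled.get()) subscriber.onComplete();
                }

                @Override
                public void cancel() {
                    cancelled.set(true);
                }
            });
        }
    }

    /** Cancels after subscribe() returns; returns how many items were produced. */
    static int lateCancel(int count) {
        SyncRange source = new SyncRange(count);
        Flow.Subscription[] held = new Flow.Subscription[1];
        source.subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) {
                held[0] = s;
                s.request(Long.MAX_VALUE); // the whole loop runs inside this call
            }
            @Override public void onNext(Integer item) { }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        });
        held[0].cancel(); // too late: the source is already drained
        return source.produced.get();
    }

    /** Cancels from inside onNext after 'limit' items; returns how many were produced. */
    static int cancelInOnNext(int count, int limit) {
        SyncRange source = new SyncRange(count);
        source.subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;
            private int received;

            @Override public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(Long.MAX_VALUE);
            }
            @Override public void onNext(Integer item) {
                if (++received >= limit) subscription.cancel(); // seen by the running loop
            }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        });
        return source.produced.get();
    }

    public static void main(String[] args) {
        System.out.println(lateCancel(1000));        // prints 1000
        System.out.println(cancelInOnNext(1000, 3)); // prints 3
    }
}
```

In this sketch the only difference between the two cases is where the cancel call happens relative to the emission loop, which is the distinction the comment draws between the async and the synchronous Flux.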