[JAVA-5279] ResizingByteBufferFlux remains active after outer Mono Subscription cancellation Created: 02/Jan/24  Updated: 04/Jan/24

Status: Backlog
Project: Java Driver
Component/s: GridFS
Affects Version/s: None
Fix Version/s: None

Type: Bug Priority: Minor - P4
Reporter: Slav Babanin Assignee: Unassigned
Resolution: Unresolved Votes: 0
Labels: None
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified


 Description   

Description: A potential bug has been identified in the GridFSUploadPublisher implementation, specifically in the createSaveChunksMono method. The core of the issue lies in the behavior of ResizingByteBufferFlux which, when created within another Mono and immediately subscribed to, does not react to cancellation of the outer Mono. As a result, the ResizingByteBufferFlux remains active and continues processing data that no longer contributes to the intended database operations, causing unnecessary resource consumption.
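A minimal sketch of the pattern in question (hypothetical and simplified; the real createSaveChunksMono is more involved, saveChunk is an illustrative placeholder, and source, chunkSizeBytes and lengthInBytes are as in the snippet under Possible Fix):

    Mono<Long> saveChunks = Mono.create(sink -> {
        // The inner flux is subscribed immediately, but the resulting
        // Disposable is never tied to the sink, so cancelling the outer
        // Mono leaves this inner subscription running.
        new ResizingByteBufferFlux(source, chunkSizeBytes)
                .flatMap(byteBuffer -> saveChunk(byteBuffer)) // saveChunk: hypothetical per-chunk write
                .subscribe(null, sink::error, () -> sink.success(lengthInBytes.get()));
    });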

Steps to Reproduce (a code sketch follows the list):

  1. Set up a MongoDB client.
  2. Create a GridFSBucket.
  3. Initiate an upload process using uploadFromPublisher.
  4. Emit data to the upload source and then cancel the subscription.
  5. Observe that even after cancellation, ResizingByteBufferFlux continues processing data without saving it to the database.
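A minimal reproduction sketch, assuming a local mongod on the default port (the client and bucket classes are from com.mongodb.reactivestreams.client; the database name, file name, sizes and timings are illustrative):

    MongoClient client = MongoClients.create("mongodb://localhost:27017");
    GridFSBucket bucket = GridFSBuckets.create(client.getDatabase("test"));

    // An unbounded source, so the upload is still in flight when we cancel.
    Flux<ByteBuffer> source = Flux.interval(Duration.ofMillis(10))
            .map(i -> ByteBuffer.wrap(new byte[1024]));

    Disposable upload = Mono.from(bucket.uploadFromPublisher("repro", source))
            .subscribe();

    Thread.sleep(100);   // let a few chunks flow
    upload.dispose();    // cancel the subscription
    // Even after dispose(), ResizingByteBufferFlux keeps pulling from
    // `source`, although no further chunks are written to the database.

Instrumenting source with doOnRequest makes the continued demand after cancellation visible.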

Expected Result: Upon cancellation of the subscription, ResizingByteBufferFlux should also cease its operations, avoiding unnecessary CPU usage.

Actual Result: ResizingByteBufferFlux continues to process data from the source after cancellation of the subscription, resulting in wasted resources. The issue is compounded when GridFSUploadPublisher is subscribed to again: a new ResizingByteBufferFlux is created while the previous one remains active.

Possible Fix: A potential fix would be to modify the subscription logic so that the Disposable returned by the subscribe method is registered with the sink via sink.onDispose. This change would ensure that cancellation of the downstream subscription is properly propagated, allowing ResizingByteBufferFlux to terminate its operations accordingly.

    Disposable subscribe = new ResizingByteBufferFlux(source, chunkSizeBytes)
            .flatMap(...)
            .subscribe(null, sink::error, () -> sink.success(lengthInBytes.get()));

    sink.onDispose(subscribe);


 Comments   
Comment by Ross Lawley [ 04/Jan/24 ]

Looking at the ResizingByteBufferFlux implementation, it utilizes Flux.<ByteBuffer>push(), which according to the docs:

This Flux factory is useful if one wants to adapt some other single-threaded multi-valued async API and not worry about cancellation and backpressure (which is handled by buffering all signals if the downstream can't keep up).

So it is expected behavior for it to consume greedily. The question is whether an alternative internal implementation should be added to ensure that it can respond to cancellation effectively. The Reactive Streams spec mandates:

If a Subscription is cancelled its Subscriber MUST eventually stop being signaled.

This is the case, although the internally used Flux.push may consume more data than needed.
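One possible shape for such a cancellation-aware alternative, sketched with Flux.create (hypothetical and simplified: source stands for the upstream Flux<ByteBuffer>, and buffers are passed through one-for-one instead of being re-chunked as the real class must do):

    Flux<ByteBuffer> resized = Flux.create(sink ->
            source.subscribe(new BaseSubscriber<ByteBuffer>() {
                @Override
                protected void hookOnSubscribe(Subscription subscription) {
                    sink.onCancel(this::cancel);    // downstream cancel stops the source
                    sink.onRequest(this::request);  // demand flows through; nothing is drained greedily
                }

                @Override
                protected void hookOnNext(ByteBuffer value) {
                    sink.next(value); // the real implementation would re-chunk here
                }

                @Override
                protected void hookOnComplete() {
                    sink.complete();
                }

                @Override
                protected void hookOnError(Throwable throwable) {
                    sink.error(throwable);
                }
            }));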

The following test shows that cancellation is eventually honoured for the async Flux<ByteBuffer>:

    @Test
    public void testAndVerifyResizingByteBufferPublisherCancellation() {
        List<Long> internalRequests = new ArrayList<>();
 
        String[] stringArray = {"fo", "ob", "ar", "foo", "bar", "ba", "z"};
 
        Flux<ByteBuffer> internal = Flux.interval(Duration.ofSeconds(1))
                .take(stringArray.length)
                .map(i -> stringArray[i.intValue()])
                .map(STRING_BYTE_BUFFER_FUNCTION)
                .doOnRequest(internalRequests::add);
        Flux<ByteBuffer> publisher = new ResizingByteBufferFlux(internal, 3);
 
        Duration waitDuration = Duration.ofMillis(500);
        StepVerifier.create(publisher, 0)
                .expectSubscription()
                .expectNoEvent(waitDuration)
                .thenRequest(1)
                .expectNext(STRING_BYTE_BUFFER_FUNCTION.apply("foo"))
                .thenCancel()
                .verify();
 
        assertIterableEquals(asList(1L, 1L, 1L), internalRequests); // Only three requests are made on the async flux (see: internal)
    }

However, with a synchronous Flux all the data is consumed greedily:

    @Test
    public void testAndVerifyResizingByteBufferPublisherCancellationSync() {
        List<Long> internalRequests = new ArrayList<>();
 
        String[] stringArray = {"fo", "ob", "ar", "foo", "bar", "ba", "z"};
 
        Flux<ByteBuffer> internal = Flux.fromArray(stringArray)
                .map(STRING_BYTE_BUFFER_FUNCTION)
                .doOnRequest(internalRequests::add);
        Flux<ByteBuffer> publisher = new ResizingByteBufferFlux(internal, 3);
 
        Duration waitDuration = Duration.ofMillis(500);
        StepVerifier.create(publisher, 0)
                .expectSubscription()
                .expectNoEvent(waitDuration)
                .thenRequest(1)
                .expectNext(STRING_BYTE_BUFFER_FUNCTION.apply("foo"))
                .thenCancel()
                .verify();
 
        assertIterableEquals(asList(1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L), internalRequests);
    }

This is because BaseSubscriber.hookOnNext essentially creates a synchronous loop, so by the time the cancellation call is made, the loop has already consumed (buffered) the entire source.
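A minimal sketch of that pattern (hypothetical; the actual class is more involved). BaseSubscriber's default hookOnSubscribe requests unbounded demand, so a synchronous source is drained completely inside subscribe, before any cancellation can interleave:

    Flux<ByteBuffer> greedy = Flux.push(sink ->
            source.subscribe(new BaseSubscriber<ByteBuffer>() {
                @Override
                protected void hookOnNext(ByteBuffer value) {
                    sink.next(value); // buffered by push(); the sync source keeps emitting
                }

                @Override
                protected void hookOnComplete() {
                    sink.complete();
                }
            }));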

Ross
