  1. Java Driver
  2. JAVA-5279

ResizingByteBufferFlux remains active after outer Mono Subscription cancellation

    • Type: Bug
    • Resolution: Unresolved
    • Priority: Minor - P4
    • Affects Version/s: None
    • Component/s: GridFS

      Description: The createSaveChunksMono method of GridFSUploadPublisher appears to contain a bug. It creates a ResizingByteBufferFlux inside another Mono and subscribes to it immediately, so the inner subscription is not linked to the outer Mono and does not react when the outer Mono is cancelled. As a result, the ResizingByteBufferFlux stays active and keeps processing data that is never written to the database, consuming resources for no benefit.

      Steps to Reproduce:

      1. Set up a MongoDB client.
      2. Create a GridFSBucket.
      3. Initiate an upload process using uploadFromPublisher.
      4. Emit data to the upload source and then cancel the subscription.
      5. Observe that even after cancellation, ResizingByteBufferFlux continues processing data without saving it to the database.

      Expected Result: Upon cancellation of the subscription, ResizingByteBufferFlux should also cease its operations, avoiding unnecessary CPU usage.

      Actual Result: ResizingByteBufferFlux continues to process data from the source after the subscription is cancelled, wasting resources. The problem compounds when GridFSUploadPublisher is subscribed to again: a new ResizingByteBufferFlux is created while the previous one remains active.

      Possible Fix: Change the subscription logic so that the Disposable returned by the subscribe method is registered with the sink via sink.onDispose. Cancellation of the downstream subscription would then dispose the inner subscription, allowing ResizingByteBufferFlux to stop.

      Disposable subscribe = new ResizingByteBufferFlux(source, chunkSizeBytes)
              .flatMap(...)
              .subscribe(null, sink::error, () -> sink.success(lengthInBytes.get()));

      sink.onDispose(subscribe);
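
      The effect of the proposed change can be illustrated with a simplified, dependency-free model. This is only a sketch under stated assumptions: OuterSink, InnerPipeline, and Handle are hypothetical stand-ins for the outer MonoSink, the ResizingByteBufferFlux subscription, and Reactor's Disposable. It is neither the driver's code nor Reactor's API. It shows only that an inner pipeline keeps processing after the outer cancellation unless its handle is registered with the sink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class CancellationPropagationSketch {

    /** Hypothetical stand-in for a disposable subscription handle. */
    interface Handle {
        void dispose();
    }

    /** Hypothetical stand-in for the outer sink: runs registered hooks when cancelled. */
    static final class OuterSink {
        private final List<Handle> hooks = new ArrayList<>();

        void onDispose(Handle handle) {
            hooks.add(handle);
        }

        void cancel() {
            hooks.forEach(Handle::dispose);
        }
    }

    /** Hypothetical stand-in for the inner chunking pipeline: counts chunks until disposed. */
    static final class InnerPipeline implements Handle {
        private final AtomicBoolean disposed = new AtomicBoolean();
        private final AtomicInteger processed = new AtomicInteger();

        void offerChunk() {
            if (!disposed.get()) {
                processed.incrementAndGet();
            }
        }

        @Override
        public void dispose() {
            disposed.set(true);
        }

        int processed() {
            return processed.get();
        }
    }

    /**
     * Offers three chunks, cancels the outer sink, then offers three more.
     * Returns how many chunks the inner pipeline actually processed.
     */
    static int run(boolean linkInnerToSink) {
        OuterSink sink = new OuterSink();
        InnerPipeline inner = new InnerPipeline();
        if (linkInnerToSink) {
            sink.onDispose(inner); // the proposed fix: tie the inner handle to the sink
        }
        for (int i = 0; i < 3; i++) {
            inner.offerChunk();
        }
        sink.cancel(); // downstream cancels the outer subscription
        for (int i = 0; i < 3; i++) {
            inner.offerChunk(); // the source keeps emitting after cancellation
        }
        return inner.processed();
    }

    public static void main(String[] args) {
        System.out.println("unlinked: " + run(false)); // prints "unlinked: 6"
        System.out.println("linked: " + run(true));    // prints "linked: 3"
    }
}
```

      In the unlinked run the inner pipeline processes all six chunks, including the three that arrive after cancellation. In the linked run it stops at three, which is the behavior described under Expected Result.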

       

       

            Assignee: Unassigned
            Reporter: Slav Babanin (slav.babanin@mongodb.com)
            Votes: 0
            Watchers: 2

              Created:
              Updated: