[JAVA-3118] Provide an option to use pooled NIO direct buffers to minimise impact on GC Created: 09/Dec/18  Updated: 14/Sep/23

Status: Backlog
Project: Java Driver
Component/s: Performance
Affects Version/s: None
Fix Version/s: None

Type: New Feature
Priority: Major - P3
Reporter: Farzad Pezeshkpour
Assignee: Unassigned
Resolution: Unresolved
Votes: 0
Labels: performance
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified


 Description   

When the driver is used with GridFS to serve binary data (files) over HTTP, the ByteBuffers it returns are allocated on-heap. Servers such as Vert.x can serve ByteBuffers directly, using Netty's ByteBuf abstraction. By using on-heap buffers, we place undue pressure on the JVM GC as the concurrent load on the service using the driver increases. Could we at least have an option to direct the driver to use pooled direct buffers? Thanks.
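For context, the request hinges on the distinction between heap-backed and direct ByteBuffers. A minimal sketch using only java.nio (the class name here is illustrative, not driver API; pooling itself would come from an allocator such as Netty's, which this sketch does not use):

```java
import java.nio.ByteBuffer;

public class BufferKinds {
    public static void main(String[] args) {
        // A heap buffer is backed by a byte[] inside the Java heap,
        // so every buffer served per request adds to GC pressure.
        ByteBuffer heap = ByteBuffer.allocate(1024);

        // A direct buffer lives in native memory outside the heap;
        // Netty-based servers like Vert.x can hand it to the socket
        // without an extra copy, and pooling avoids per-request allocation.
        ByteBuffer direct = ByteBuffer.allocateDirect(1024);

        System.out.println(heap.isDirect());    // false
        System.out.println(direct.isDirect());  // true
        System.out.println(heap.hasArray());    // true: accessible byte[] backing
        System.out.println(direct.hasArray());  // false: no accessible array
    }
}
```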



 Comments   
Comment by Jeffrey Yemin [ 10/Dec/18 ]

For #2: yes, please open a separate issue.

Thanks,
Jeff

Comment by Farzad Pezeshkpour [ 10/Dec/18 ]

Hi Jeff,

Specifically, we use:
GridFSBucket#downloadToStream(String, AsyncOutputStream)
We implement the AsyncOutputStream to wrap a Vert.x RoutingContext as follows (apologies, it's in Kotlin):

// Imports assume the pre-4.x reactive streams driver, Vert.x Web, and Netty;
// loggerFor is our own logging helper.
import com.mongodb.reactivestreams.client.Success
import com.mongodb.reactivestreams.client.gridfs.AsyncOutputStream
import io.netty.buffer.Unpooled
import io.vertx.core.buffer.Buffer
import io.vertx.ext.web.RoutingContext
import org.reactivestreams.Publisher
import org.reactivestreams.Subscription
import java.nio.ByteBuffer

class RoutingContextAsyncOutputStream(private val routingContext: RoutingContext) : AsyncOutputStream {
  companion object {
    private val log = loggerFor<RoutingContextAsyncOutputStream>()
  }
 
  override fun write(src: ByteBuffer?): Publisher<Int> {
    return when (src) {
      // case: when no source passed in - we should comply to reactive streams spec
      null -> Publisher { subscriber ->
        subscriber.onSubscribe(object : Subscription {
          override fun cancel() {
            routingContext.response().setStatusCode(500).setStatusMessage("stream cancelled").end()
          }
 
          override fun request(n: Long) {
            subscriber.onNext(0)
            subscriber.onComplete()
          }
        })
      }
      // case: when we've passed in a source ...
      else -> Publisher { subscriber ->
        subscriber.onSubscribe(object : Subscription {
          override fun cancel() {
            routingContext.response().setStatusCode(500).setStatusMessage("stream cancelled").end()
          }
 
          override fun request(n: Long) {
            // *******************************************
            // *** Jeff: The src buffer is on-heap here ***
            // *******************************************
            val size = src.remaining()
            try {
              val wrapped = Buffer.buffer(Unpooled.wrappedBuffer(src).slice(0, size))
              routingContext.response().write(wrapped)
              subscriber.onNext(size)
              subscriber.onComplete()
            } catch (err: Throwable) {
              log.error("failed to wrap and send buffer for response", err)
              subscriber.onError(err)
            }
          }
        })
      }
    }
  }
 
  override fun close(): Publisher<Success> {
    /****************************************************
     *** Jeff: the mongo reactive streams driver doesn't respect the reactive streams protocol
     *** and never calls the close method on the output stream
     ****************************************************/
    return Publisher { subscriber ->
      subscriber.onSubscribe(object : Subscription {
        override fun cancel() {
          routingContext.response().setStatusCode(500).setStatusMessage("stream cancelled").end()
        }
 
        override fun request(n: Long) {
          routingContext.response().end()
          subscriber.onNext(Success.SUCCESS)
          subscriber.onComplete()
        }
      })
    }
  }
}

Two points:
1. The src buffer passed to the adapter by the driver is on-heap.
2. As an aside, the close method is never called by the driver (so it doesn't comply with the reactive-streams spec?). Shall I open a new issue?
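Until such an option exists, a possible workaround for point 1 is to copy the heap buffer the driver hands back into a direct buffer before writing it out, trading one copy for off-heap I/O. A hedged sketch using only java.nio (the class and method names are illustrative; in production one would draw the target buffer from a pooled allocator such as Netty's PooledByteBufAllocator rather than allocating per call):

```java
import java.nio.ByteBuffer;

public class DirectCopy {
    // Copy the remaining bytes of a (possibly heap-backed) source buffer
    // into a freshly allocated direct buffer, ready for reading.
    static ByteBuffer toDirect(ByteBuffer src) {
        ByteBuffer direct = ByteBuffer.allocateDirect(src.remaining());
        direct.put(src.duplicate());  // duplicate() leaves src's position untouched
        direct.flip();                // prepare the copy for reading
        return direct;
    }

    public static void main(String[] args) {
        ByteBuffer heap = ByteBuffer.wrap("hello".getBytes());
        ByteBuffer direct = toDirect(heap);
        System.out.println(direct.isDirect());   // true
        System.out.println(direct.remaining());  // 5
    }
}
```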

cheers
Fuzz

Comment by Jeffrey Yemin [ 10/Dec/18 ]

Hi dazraf, there are a number of places where the driver uses byte buffers, some of them internal to the driver. Can you tell me specifically where you think they are needed for your use case?

Comment by Farzad Pezeshkpour [ 09/Dec/18 ]

Thanks Jeff, much appreciated. If it helps with prioritisation, the reason we use the async/reactive drivers is performance under concurrent load, something that I hope would be of value to many.

Comment by Jeffrey Yemin [ 09/Dec/18 ]

Moved to JAVA, since that's where this would have to start. If we do this, we'll make sure to expose it in the Reactive/Scala drivers.

Generated at Thu Feb 08 08:58:50 UTC 2024 using Jira 9.7.1#970001-sha1:2222b88b221c4928ef0de3161136cc90c8356a66.