[JAVA-1282] Create new async GridFS API Created: 17/Jun/14  Updated: 19/Oct/16  Resolved: 28/Apr/16

Status: Closed
Project: Java Driver
Component/s: Async, GridFS
Affects Version/s: None
Fix Version/s: 3.3.0

Type: New Feature Priority: Major - P3
Reporter: Jeffrey Yemin Assignee: Ross Lawley
Resolution: Done Votes: 2
Labels: None
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified

Issue Links:
Related
is related to JAVA-2175 GridFSFile and users codecs can cause... Closed

 Description   

Currently there is no async support for GridFS. We need to design a new GridFS API that works properly with async.



 Comments   
Comment by Githook User [ 03/May/16 ]

Author:

{u'username': u'rozza', u'name': u'Ross Lawley', u'email': u'ross.lawley@gmail.com'}

Message: Increased the timeouts for the slow GridFS tests

Added runSlow method to Async GridFS test helper

JAVA-1282
Branch: master
https://github.com/mongodb/mongo-java-driver/commit/93b1142c4d4aae3f2053227592d452cbc8fa828a

Comment by Steve Hummingbird [ 28/Apr/16 ]

Awesome. Thanks Ross. The GridFSFile codec now seems to work great with my Instant Codec.

Comment by Ross Lawley [ 28/Apr/16 ]

Thanks again Steve Hummingbird for testing and reporting the issues back.

The codec and the small chunk issues have now been fixed and a new snapshot has just been released.

Comment by Githook User [ 28/Apr/16 ]

Author:

{u'username': u'rozza', u'name': u'Ross Lawley', u'email': u'ross.lawley@gmail.com'}

Message: Improved how small chunksizes are handled in Async GridFS

Added minimum bufferSize when downloading to AsyncOutputStream
Reduced recursiveness of GridFSDownloadStreamImpl when consuming chunks.

JAVA-1282
Branch: master
https://github.com/mongodb/mongo-java-driver/commit/a159ad7bc57f161f822cdc72f47b3baef4759cf6

Comment by Githook User [ 27/Apr/16 ]

Author:

{u'username': u'rozza', u'name': u'Ross Lawley', u'email': u'ross.lawley@gmail.com'}

Message: Added Codec for GridFSFile

GridFSFile requires certain types, e.g. Date, but takes a user's Codec which may
have custom type mappings. Metadata and extra elements are decoded using the
user's codec; all other fields are decoded using the type-safe BsonDocumentCodec.

JAVA-2175 (refs JAVA-1282)
Branch: master
https://github.com/mongodb/mongo-java-driver/commit/ff0c5d189cf4684517e9ee4df8b94c0224f66e60

Comment by Ross Lawley [ 20/Apr/16 ]

Reopening this ticket, pending fixes for the above issues.

Comment by Jeffrey Yemin [ 20/Apr/16 ]

Given what I'm seeing, a larger file size won't cause a problem, only a small chunk size. The issue is triggered when a lot of small chunks fit in a single response to a getMore command. With the way the code is implemented now, it ends up triggering a whole series of callbacks back and forth, all on the same thread. Eventually the stack overflows.
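
The failure mode can be reproduced outside the driver. The sketch below (plain Java with illustrative names only, not driver code) shows how consuming many small chunks via per-chunk recursion on a single thread overflows the stack, while an iterative drain keeps the stack depth constant:

```java
import java.util.function.IntConsumer;

public final class CallbackRecursionDemo {

    /** Recursive per-chunk consumption: each chunk completion synchronously
     *  invokes the handler for the next chunk on the same thread, so each
     *  chunk adds a stack frame. */
    public static void consumeRecursively(int remainingChunks, IntConsumer onChunk) {
        if (remainingChunks == 0) {
            return;
        }
        onChunk.accept(remainingChunks);
        consumeRecursively(remainingChunks - 1, onChunk);
    }

    /** Iterative drain: constant stack depth regardless of chunk count. */
    public static void consumeIteratively(int remainingChunks, IntConsumer onChunk) {
        while (remainingChunks > 0) {
            onChunk.accept(remainingChunks);
            remainingChunks--;
        }
    }

    /** Returns true if recursively consuming the given number of chunks
     *  overflows the stack (the JVM does no tail-call elimination). */
    public static boolean overflowsRecursively(int chunks) {
        try {
            consumeRecursively(chunks, c -> { });
            return false;
        } catch (StackOverflowError e) {
            return true;
        }
    }
}
```

With a 500 KB file and a 1024-byte chunkSizeBytes there are 500 chunks, and many of them fit in one getMore batch, so the recursive pattern drives the per-batch callback depth up in exactly this way.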

Comment by Steve Hummingbird [ 20/Apr/16 ]

Hi Jeff,
Yes, I was using higher values for the chunk size to work around it for now, but the issue could resurface if the file size suddenly grows. I also couldn't reproduce it with fewer chunks in GridFS. However, there was no particular reason to use such a small chunk size: when I started with the new driver, I just took the code from the GridFSTour, which uses 1024 as the chunk size, without tuning that value. When I first added a file of a few hundred KB, I suddenly wasn't able to download it any more.

Comment by Jeffrey Yemin [ 19/Apr/16 ]

Hi Steve,

I'm able to reproduce the second problem you're seeing. It looks like a bug, but you'll likely only encounter it with a low value for chunkSizeBytes, like the 1024 in your example. You can work around the issue by using a larger value: for instance, I can't reproduce it when I leave chunkSizeBytes at its default of 255 * 1024. Is there a particular reason you were testing with such a small chunk size?

Comment by Jeffrey Yemin [ 19/Apr/16 ]

Hi Steve,

First of all, we really appreciate the advanced testing you are doing for async GridFS.

The custom codec issue should be considered a bug. While we investigate a solution, you can work around it by passing a MongoDatabase instance to GridFSBuckets.create that is configured with the default codec registry, e.g.

GridFSBuckets.create(database.withCodecRegistry(MongoClients.getDefaultCodecRegistry()))

This will be more resource-efficient than using a different MongoClient just for GridFS.

As for the modified GridFSTour throwing an error, that also looks to be a bug, so we'll investigate that as well.

Comment by Steve Hummingbird [ 19/Apr/16 ]

There is another issue I have been running into. When a huge number of chunks is created (in this case 500), downloading them leads to an exception which also gets swallowed by the same error handler. Right now I am not sure whether I am running into another (or even an expected) limitation.

This is a modified version of the GridFSTour which leads to the error on my machine:

/*
 * Copyright 2015 MongoDB, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
 
package gridfs;
 
import com.mongodb.async.SingleResultCallback;
import com.mongodb.async.client.MongoClient;
import com.mongodb.async.client.MongoClients;
import com.mongodb.async.client.MongoDatabase;
import com.mongodb.async.client.gridfs.AsyncInputStream;
import com.mongodb.async.client.gridfs.AsyncOutputStream;
import com.mongodb.async.client.gridfs.GridFSBucket;
import com.mongodb.async.client.gridfs.GridFSBuckets;
import com.mongodb.async.client.gridfs.helpers.AsyncStreamHelper;
import com.mongodb.client.gridfs.model.GridFSDownloadByNameOptions;
import com.mongodb.client.gridfs.model.GridFSUploadOptions;
import org.bson.Document;
import org.bson.types.ObjectId;
 
import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
 
import static com.mongodb.async.client.gridfs.helpers.AsyncStreamHelper.toAsyncInputStream;
 
/**
 * The GridFS code example see: https://mongodb.github.io/mongo-java-driver/3.1/driver/reference/gridfs
 */
public final class GridFSTour {
 
    /**
     * Run this main method to see the output of this quick example.
     *
     * @param args takes an optional single argument for the connection string
     * @throws FileNotFoundException if the sample file cannot be found
     * @throws IOException if there was an exception closing an input stream
     * @throws InterruptedException if a latch was interrupted
     */
    public static void main(final String[] args) throws FileNotFoundException, InterruptedException, IOException {
        MongoClient mongoClient;
 
        if (args.length == 0) {
            // connect to the local database server
            mongoClient = MongoClients.create();
        } else {
            mongoClient = MongoClients.create(args[0]);
        }
 
        // get handle to "mydb" database
        MongoDatabase database = mongoClient.getDatabase("mydb");
        final CountDownLatch dropLatch = new CountDownLatch(1);
        database.drop(new SingleResultCallback<Void>() {
            @Override
            public void onResult(final Void result, final Throwable t) {
                dropLatch.countDown();
            }
        });
        dropLatch.await();
 
        GridFSBucket gridFSBucket = GridFSBuckets.create(database);
 
        /*
         * UploadFromStream Example
         */
        // Get the input stream
        byte[] b = new byte[1024 * 500];
        new Random().nextBytes(b);
 
        final AsyncInputStream streamToUploadFrom = toAsyncInputStream(b);
 
        // Create some custom options
        GridFSUploadOptions options = new GridFSUploadOptions()
                .chunkSizeBytes(1024)
                .metadata(new Document("type", "presentation"));
 
        final AtomicReference<ObjectId> fileIdRef = new AtomicReference<ObjectId>();
        final CountDownLatch uploadLatch = new CountDownLatch(1);
        gridFSBucket.uploadFromStream("mongodb-tutorial", streamToUploadFrom, options, new SingleResultCallback<ObjectId>() {
            @Override
            public void onResult(final ObjectId result, final Throwable t) {
                fileIdRef.set(result);
                System.out.println("The fileId of the uploaded file is: " + result.toHexString());
                streamToUploadFrom.close(new SingleResultCallback<Void>() {
                    @Override
                    public void onResult(final Void result, final Throwable t) {
                        uploadLatch.countDown();
                    }
                });
            }
        });
        uploadLatch.await();
 
        /*
         * DownloadToStreamByName
         */
        final CountDownLatch downloadLatch2 = new CountDownLatch(1);
        ByteArrayOutputStream outStream = new ByteArrayOutputStream();
        AsyncOutputStream asyncOutputStream = AsyncStreamHelper.toAsyncOutputStream(outStream);
 
        GridFSDownloadByNameOptions downloadOptions = new GridFSDownloadByNameOptions().revision(0);
        gridFSBucket.downloadToStreamByName("mongodb-tutorial", asyncOutputStream, downloadOptions,
                new SingleResultCallback<Long>() {
                    @Override
                    public void onResult(final Long result, final Throwable t) {
                        downloadLatch2.countDown();
                        System.out.println("downloaded file sized: " + result);
                    }
                });
        downloadLatch2.await();
 
        asyncOutputStream.close(new SingleResultCallback<Void>() {
            @Override
            public void onResult(final Void aVoid, final Throwable t) {
                System.out.println("stream closed");
            }
        });
 
        // Final cleanup
        final CountDownLatch dropLatch2 = new CountDownLatch(1);
        database.drop(new SingleResultCallback<Void>() {
            @Override
            public void onResult(final Void result, final Throwable t) {
                dropLatch2.countDown();
            }
        });
        dropLatch2.await();
        System.out.println("Finished");
    }
 
    private GridFSTour() {
    }
}

And this is the stack trace that gets swallowed:

java.lang.IllegalStateException: Attempted to decrement the reference count below 0
	at com.mongodb.connection.AbstractReferenceCounted.release(AbstractReferenceCounted.java:41)
	at com.mongodb.connection.DefaultServerConnection.release(DefaultServerConnection.java:58)
	at com.mongodb.operation.AsyncQueryBatchCursor$CommandResultSingleResultCallback.onResult(AsyncQueryBatchCursor.java:249)
	at com.mongodb.operation.AsyncQueryBatchCursor$CommandResultSingleResultCallback.onResult(AsyncQueryBatchCursor.java:234)
	at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:53)
	at com.mongodb.connection.DefaultServer$DefaultServerProtocolExecutor$1.onResult(DefaultServer.java:176)
	at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:53)
	at com.mongodb.connection.CommandProtocol$CommandResultCallback.callCallback(CommandProtocol.java:275)
	at com.mongodb.connection.ResponseCallback.onResult(ResponseCallback.java:48)
	at com.mongodb.connection.ResponseCallback.onResult(ResponseCallback.java:23)
	at com.mongodb.connection.DefaultConnectionPool$PooledConnection$2.onResult(DefaultConnectionPool.java:464)
	at com.mongodb.connection.DefaultConnectionPool$PooledConnection$2.onResult(DefaultConnectionPool.java:458)
	at com.mongodb.connection.UsageTrackingInternalConnection$3.onResult(UsageTrackingInternalConnection.java:127)
	at com.mongodb.connection.UsageTrackingInternalConnection$3.onResult(UsageTrackingInternalConnection.java:123)
	at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:53)
	at com.mongodb.connection.InternalStreamConnection.executeCallbackAndReceiveResponse(InternalStreamConnection.java:375)
	at com.mongodb.connection.InternalStreamConnection.access$1700(InternalStreamConnection.java:65)
	at com.mongodb.connection.InternalStreamConnection$ResponseBuffersCallback.onResult(InternalStreamConnection.java:417)
	at com.mongodb.connection.InternalStreamConnection$ResponseBuffersCallback.onResult(InternalStreamConnection.java:386)
	at com.mongodb.connection.InternalStreamConnection$ResponseHeaderCallback.onSuccess(InternalStreamConnection.java:559)
	at com.mongodb.connection.InternalStreamConnection$ResponseHeaderCallback.access$2200(InternalStreamConnection.java:514)
	at com.mongodb.connection.InternalStreamConnection$ResponseHeaderCallback$ResponseBodyCallback.onResult(InternalStreamConnection.java:581)
	at com.mongodb.connection.InternalStreamConnection$ResponseHeaderCallback$ResponseBodyCallback.onResult(InternalStreamConnection.java:565)
	at com.mongodb.connection.InternalStreamConnection$3.completed(InternalStreamConnection.java:444)
	at com.mongodb.connection.InternalStreamConnection$3.completed(InternalStreamConnection.java:441)
	at com.mongodb.connection.AsynchronousSocketChannelStream$BasicCompletionHandler.completed(AsynchronousSocketChannelStream.java:218)
	at com.mongodb.connection.AsynchronousSocketChannelStream$BasicCompletionHandler.completed(AsynchronousSocketChannelStream.java:201)
	at sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:126)
	at sun.nio.ch.Invoker.invokeDirect(Invoker.java:157)
	at sun.nio.ch.UnixAsynchronousSocketChannelImpl.implRead(UnixAsynchronousSocketChannelImpl.java:553)
	at sun.nio.ch.AsynchronousSocketChannelImpl.read(AsynchronousSocketChannelImpl.java:276)
	at sun.nio.ch.AsynchronousSocketChannelImpl.read(AsynchronousSocketChannelImpl.java:297)
	at com.mongodb.connection.AsynchronousSocketChannelStream.readAsync(AsynchronousSocketChannelStream.java:125)
	at com.mongodb.connection.InternalStreamConnection.readAsync(InternalStreamConnection.java:441)
	at com.mongodb.connection.InternalStreamConnection.access$2000(InternalStreamConnection.java:65)
	at com.mongodb.connection.InternalStreamConnection$ResponseHeaderCallback.onResult(InternalStreamConnection.java:538)
	at com.mongodb.connection.InternalStreamConnection$ResponseHeaderCallback.onResult(InternalStreamConnection.java:514)
	at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:53)
	at com.mongodb.connection.InternalStreamConnection$3.completed(InternalStreamConnection.java:444)
	at com.mongodb.connection.InternalStreamConnection$3.completed(InternalStreamConnection.java:441)
	at com.mongodb.connection.AsynchronousSocketChannelStream$BasicCompletionHandler.completed(AsynchronousSocketChannelStream.java:218)
	at com.mongodb.connection.AsynchronousSocketChannelStream$BasicCompletionHandler.completed(AsynchronousSocketChannelStream.java:201)
	at sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:126)
	at sun.nio.ch.UnixAsynchronousSocketChannelImpl.finishRead(UnixAsynchronousSocketChannelImpl.java:430)
	at sun.nio.ch.UnixAsynchronousSocketChannelImpl.finish(UnixAsynchronousSocketChannelImpl.java:191)
	at sun.nio.ch.UnixAsynchronousSocketChannelImpl.onEvent(UnixAsynchronousSocketChannelImpl.java:213)
	at sun.nio.ch.KQueuePort$EventHandlerTask.run(KQueuePort.java:301)
	at sun.nio.ch.AsynchronousChannelGroupImpl$1.run(AsynchronousChannelGroupImpl.java:112)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Comment by Steve Hummingbird [ 19/Apr/16 ]

I have been running into issues with the new GridFS implementation when using custom codecs. I have implemented a Codec and a Provider which use the Java 8 Instant instead of the old java.util.Date class. The GridFSFileHelper, however, expects the uploadDate to always be returned as a java.util.Date:

Date uploadDate = document.getDate("uploadDate");

In the end this leads to an exception, as an Instant cannot be cast to a Date, and the exception does not seem to be handed back to the client. The only behaviour visible to the client is that the callback never gets called and the driver appears to be stuck.

As the GridFSFile is composed programmatically, I guess there isn't any way to tell the driver not to use my custom codec for the GridFSFile, is there?
Secondly, as the GridFSFileHelper is wired in via a static import, I guess there is currently no easy way to swap in my own implementation which knows that the date is an Instant? So how do I deal with that? (As a workaround for GridFS, I am temporarily using a dedicated client that does not know about my custom codecs.)

The exception gets caught in this block in ErrorHandlingResultCallback.java:

    @Override
    public void onResult(final T result, final Throwable t) {
        try {
            wrapped.onResult(result, t);
        } catch (Exception e) {
            if (logger != null) {
                logger.warn("Callback onResult call produced an error", e);
            }
        }
    }

where the logger is null, so the exception just gets swallowed. I don't know the driver's implementation well enough to judge whether there might be side effects, but it seems to make sense to report any occurring exception back to the client instead of just swallowing and/or logging it.
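
Steve's suggestion can be sketched independently of the driver. The wrapper below (illustrative interface and method names, not driver code) forwards an exception thrown inside onResult to a client-facing error callback instead of dropping it:

```java
public final class PropagatingCallback {

    /** Minimal stand-in for the driver's SingleResultCallback. */
    public interface Callback<T> {
        void onResult(T result, Throwable t);
    }

    /**
     * Wraps a callback so that any exception thrown by its onResult is
     * reported through errorSink rather than being swallowed.
     */
    public static <T> Callback<T> propagating(final Callback<T> wrapped,
                                              final Callback<T> errorSink) {
        return (result, t) -> {
            try {
                wrapped.onResult(result, t);
            } catch (Exception e) {
                // Hand the failure to the client instead of (at best) logging it,
                // so a broken codec surfaces as an error rather than a hang.
                errorSink.onResult(null, e);
            }
        };
    }

    private PropagatingCallback() {
    }
}
```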

Comment by Steve Hummingbird [ 18/Apr/16 ]

Hi Jeff,

Thank you, that made things clearer.

Comment by Jeffrey Yemin [ 10/Apr/16 ]

Hi Steve,

You're describing a producer-consumer problem, where fileUpload is the producer and uploadStream is the consumer. The GridFSUploadStream API, like the Java AsynchronousSocketChannel API on which it is modelled, does not include any support for buffering, so your application will have to provide it. To avoid that buffer growing unbounded, you may have to find a way to apply back pressure to the file uploader.
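
One way to structure that buffering is sketched below in plain Java. AsyncWriter is a hypothetical stand-in for GridFSUploadStream.write, not the driver API: incoming buffers are queued and at most one write is kept in flight, with the completion callback starting the next queued write. This avoids the "does not support concurrent writing" error while only buffering data that has arrived but not yet been written:

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.BiConsumer;

public final class SerializingWriter {

    /** Hypothetical stand-in for GridFSUploadStream.write(buffer, callback). */
    public interface AsyncWriter {
        void write(ByteBuffer buffer, BiConsumer<Integer, Throwable> callback);
    }

    private final AsyncWriter stream;
    private final Queue<ByteBuffer> pending = new ArrayDeque<>();
    private boolean writeInFlight = false;

    public SerializingWriter(AsyncWriter stream) {
        this.stream = stream;
    }

    /** Called from the data handler; safe to call while a write is in flight. */
    public synchronized void onData(ByteBuffer buffer) {
        pending.add(buffer);
        drain();
    }

    private synchronized void drain() {
        if (writeInFlight || pending.isEmpty()) {
            return;
        }
        writeInFlight = true;
        ByteBuffer next = pending.poll();
        stream.write(next, (bytesWritten, t) -> {
            synchronized (this) {
                writeInFlight = false;
            }
            drain();  // start the next queued write, if any
        });
    }
}
```

To bound the queue, the data handler can additionally pause the upload source (e.g. Vert.x's pause/resume on the read stream) when `pending` grows past a threshold, which is the back pressure mentioned above.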

Comment by Steve Hummingbird [ 10/Apr/16 ]

Great

I was trying to use the new API, but I ran into some trouble working with data that arrives asynchronously; I might just be missing something obvious. I am using Vert.x 2, where I have to register a handler that is invoked when new data is available.

Apparently I cannot do something like the following, where I would just call the write() method whenever data is available, as this leads to an exception: com.mongodb.MongoGridFSException: The AsyncOutputStream does not support concurrent writing.

GridFSUploadStream uploadStream = gridFS.openUploadStream('test', options)
fileUpload.dataHandler { Buffer buffer ->
    uploadStream.write(ByteBuffer.wrap(buffer.bytes), { Integer bytesWritten, Throwable t ->
        System.out.println('wrote: ' + bytesWritten + ' bytes. t: ' + t)
    } as SingleResultCallback<Integer>)
}

This leads me to conclude that I have to buffer the data programmatically until the GridFS driver calls the callback. Only then can I write the next slice, attaching another callback to handle whatever slice has become available in the meantime; if no slice is available yet, things get even more complicated.

If I implement my own AsyncInputStream, the same thing applies: I would need to buffer the data until the driver calls the read method, where I can then provide a slice of data, which seems quite cumbersome to me. And just in case: if the next slice of data is not yet available when read is called and I provide 0 bytes, does the driver periodically poll until the data becomes available?

So the only option I see is to buffer all the data until everything is available and then call the GridFS driver, but that does not seem very efficient.

I guess there must be a better way to do this that I am just missing. Could you please add some detail on how the driver should be used in such a case?

Comment by Githook User [ 06/Apr/16 ]

Author:

{u'username': u'rozza', u'name': u'Ross Lawley', u'email': u'ross.lawley@gmail.com'}

Message: Asynchronous GridFS Implementation

Uses a custom AsyncInputStream and AsyncOutputStream for easy adaptability to custom async byte I/O.

JAVA-1282
Branch: master
https://github.com/mongodb/mongo-java-driver/commit/7283dc1674fc9d8b4144a3dac44957022b7aba4e

Comment by Ross Lawley [ 30/Mar/16 ]

PR: https://github.com/rozza/mongo-java-driver/pull/148

Generated at Thu Feb 08 08:54:13 UTC 2024 using Jira 9.7.1#970001-sha1:2222b88b221c4928ef0de3161136cc90c8356a66.