[JAVA-4167] Add MongoCursor#available method Created: 18/May/21  Updated: 28/Oct/23  Resolved: 01/Nov/21

Status: Closed
Project: Java Driver
Component/s: Change Streams, Query Operations
Affects Version/s: None
Fix Version/s: 4.4.0

Type: New Feature Priority: Unknown
Reporter: Matthew Shaylor Assignee: Jeffrey Yemin
Resolution: Fixed Votes: 0
Labels: external-user
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified

Issue Links:
Documented
Related
is related to JAVA-4422 Add #available method to legacy curso... Closed
Backwards Compatibility: Fully Compatible
Documentation Changes: Needed

 Description   

I'm trying to write code that uses MongoChangeStream but also minimises the amount of blocking.

Currently, if you have a MongoChangeStreamCursor and call tryNext(), it will either return the next item in the current batch immediately or, if the batch is exhausted, contact the server to see if it has more items. When nothing is available, this blocks for up to maxAwaitTimeMS.

I propose adding a getNextInBatch() method to MongoChangeStreamCursor. It would return the next item in the batch if one is available and, if not, return immediately without contacting the server.

 

-------

 

My use case is that I'm trying to write listener code that handles batches of work. It's tempting to call tryNext() on the ChangeStreamCursor repeatedly until it returns null, then handle all the objects it returned. However, if the queue is continually being appended to, tryNext() will keep returning objects forever. Whilst I can batch this myself, there's no way to align the batches I handle with the batches fetched from the DB, which causes extra chatter and small inefficiencies in the process.

The code change looks relatively simple, so if you'd be open to user-submitted patches, please say!

 

Thanks!! 

 

 

 



 Comments   
Comment by Githook User [ 01/Nov/21 ]

Author:

{'name': 'Jeff Yemin', 'email': 'jeff.yemin@mongodb.com', 'username': 'jyemin'}

Message: Add MongoCursor#available (#810)

JAVA-4167
Branch: master
https://github.com/mongodb/mongo-java-driver/commit/7357bd5d428b9954bb2d5124d3c26574e0e5a4ff

Comment by Matthew Shaylor [ 25/Oct/21 ]

Sounds great. Thanks for fixing!

Comment by Jeffrey Yemin [ 21/Oct/21 ]

We decided to go with MongoCursor#available as the API.  Usage would look like this:

    public static List<ChangeStreamDocument<Document>> getThingsToDo(
            MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor) {

        List<ChangeStreamDocument<Document>> thingsToDo = new ArrayList<>();

        var work = cursor.tryNext();
        if (work == null) {
            return Collections.emptyList();
        }

        thingsToDo.add(work);
        while (cursor.available() > 0) {
            // drain everything we've fetched, but don't fetch any more
            thingsToDo.add(cursor.next());
        }

        return thingsToDo;
    }
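To see why this aligns the handled batches with the fetched batches, here is a self-contained simulation. It does not use the driver: `SimulatedCursor` is a hypothetical in-memory class whose tryNext(), available() and next() are only modelled on the MongoCursor methods of the same names, and `roundTrips` is an illustrative counter standing in for getMore calls to the server.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.NoSuchElementException;

public class AvailableDrainDemo {

    // Hypothetical in-memory cursor: "server" batches are queued up front and
    // every refill of the local buffer counts as one round trip.
    static final class SimulatedCursor {
        private final Deque<List<String>> serverBatches;
        private final Deque<String> buffer = new ArrayDeque<>();
        int roundTrips = 0;

        SimulatedCursor(List<List<String>> batches) {
            this.serverBatches = new ArrayDeque<>(batches);
        }

        private void fetch() {
            roundTrips++;
            List<String> batch = serverBatches.poll();
            if (batch != null) {
                buffer.addAll(batch);
            }
        }

        // Serve from the local batch, fetching only when it is empty.
        String tryNext() {
            if (buffer.isEmpty()) {
                fetch();
            }
            return buffer.poll();
        }

        // Number of elements that can be returned without another round trip.
        int available() {
            return buffer.size();
        }

        String next() {
            if (buffer.isEmpty()) {
                fetch();
            }
            if (buffer.isEmpty()) {
                throw new NoSuchElementException();
            }
            return buffer.poll();
        }
    }

    // Same shape as getThingsToDo above, against the simulated cursor.
    static List<String> getThingsToDo(SimulatedCursor cursor) {
        String work = cursor.tryNext();
        if (work == null) {
            return Collections.emptyList();
        }
        List<String> thingsToDo = new ArrayList<>();
        thingsToDo.add(work);
        // available() > 0 guarantees next() is served from the local buffer
        while (cursor.available() > 0) {
            thingsToDo.add(cursor.next());
        }
        return thingsToDo;
    }

    public static void main(String[] args) {
        SimulatedCursor cursor = new SimulatedCursor(List.of(List.of("a", "b"), List.of("c")));
        System.out.println(getThingsToDo(cursor)); // [a, b]
        System.out.println(getThingsToDo(cursor)); // [c]
        System.out.println(getThingsToDo(cursor)); // []
        System.out.println(cursor.roundTrips);     // 3
    }
}
```

Each call returns exactly one fetched batch and makes at most one round trip; only a call that finds the buffer empty goes back to the "server".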

Comment by Jeffrey Yemin [ 14/Jul/21 ]

Maybe instead of a new method to get documents from the cursor, we can have a new method that reports how many are currently available. This would be similar to the java.io.InputStream#available method in the JDK.
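For comparison, the JDK contract being referred to: InputStream#available() returns an estimate of how many bytes can be read without blocking, so a caller can consume only what is already buffered. A minimal sketch using only JDK classes; `readAvailable` is a hypothetical helper name, and it wraps IOException in UncheckedIOException purely to keep call sites short.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class InputStreamAvailableDemo {

    // Read only the bytes the stream reports as readable without blocking.
    // For an in-memory stream that is everything left; for a socket stream it
    // is roughly what has already arrived locally.
    static byte[] readAvailable(InputStream in) {
        try {
            int n = in.available();
            return in.readNBytes(n); // Java 11+
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        ByteArrayInputStream in =
                new ByteArrayInputStream("hello".getBytes(StandardCharsets.US_ASCII));
        byte[] chunk = readAvailable(in);
        System.out.println(new String(chunk, StandardCharsets.US_ASCII)); // hello
        System.out.println(in.available());                              // 0
    }
}
```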

We'd be happy to check out a pull request if you're inclined to work on it.

Comment by Matthew Shaylor [ 21/May/21 ]

This is roughly my code:

 

public List<ChangeStreamDocument<Document>> getWork() {
    // assumes collection is a MongoCollection<Document> field
    ChangeStreamIterable<Document> colit = collection
            .watch()
            .batchSize(100)
            // If there's nothing in the DB we want to wake up occasionally so we can check we're still alive!
            .maxAwaitTime(5000, TimeUnit.MILLISECONDS);

    List<ChangeStreamDocument<Document>> thingsToDo = new ArrayList<>();
    try (MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = colit.cursor()) {
        while (true) {
            ChangeStreamDocument<Document> nextThing = cursor.tryNext();
            if (nextThing == null) {
                break;
            }
            thingsToDo.add(nextThing);
        }

        return thingsToDo;
    }
}

I want to batch the work I fetch from the DB, since in my setup it's helpful to reduce DB chatter. But as a result, I'll ALWAYS have to wait for the timeout.
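The cost of the "call tryNext() until null" loop can be illustrated with a small simulation. This is a sketch, not driver code: `SimulatedCursor` is a hypothetical in-memory stand-in whose tryNext() is only modelled on the driver method, and `roundTrips` counts simulated getMore calls. The final, empty round trip is the one that would block for up to maxAwaitTimeMS against a real server.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class TryNextLoopDemo {

    // Hypothetical stand-in for a change stream cursor: "server" batches are
    // queued up front, and every refill of the local buffer counts as a round trip.
    static final class SimulatedCursor {
        private final Deque<List<String>> serverBatches;
        private final Deque<String> buffer = new ArrayDeque<>();
        int roundTrips = 0;

        SimulatedCursor(List<List<String>> batches) {
            this.serverBatches = new ArrayDeque<>(batches);
        }

        // Serve from the local batch if possible, otherwise ask the "server".
        // An empty reply is where the real driver would have waited up to
        // maxAwaitTimeMS before returning null.
        String tryNext() {
            if (buffer.isEmpty()) {
                roundTrips++;
                List<String> batch = serverBatches.poll();
                if (batch != null) {
                    buffer.addAll(batch);
                }
            }
            return buffer.poll();
        }
    }

    // The "call tryNext() until it returns null" pattern from the snippet above.
    static List<String> drainWithTryNext(SimulatedCursor cursor) {
        List<String> work = new ArrayList<>();
        String next;
        while ((next = cursor.tryNext()) != null) {
            work.add(next);
        }
        return work;
    }

    public static void main(String[] args) {
        SimulatedCursor cursor = new SimulatedCursor(List.of(List.of("a", "b"), List.of("c")));
        List<String> work = drainWithTryNext(cursor);
        // Both fetched batches are merged, and the loop only ends after an empty round trip.
        System.out.println(work + " after " + cursor.roundTrips + " round trips");
    }
}
```

Running it prints `[a, b, c] after 3 round trips`: the loop crosses the batch boundary and can only stop once the server has nothing left to return.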

With my suggestion the inner loop would look like this:

try (MongoChangeStreamCursor cursor = colit.cursor()) {
    Object work = cursor.tryNext();
    if (work == null) return Collections.emptyList();
    thingsToDo.add(work);
    while (true) {
        work = cursor.getNextInBatch(); // drain everything we've fetched, but don't fetch any more
        if (work == null)
            break;
        thingsToDo.add(work);
    }

    return thingsToDo;
}

I hope that explains things. Let me know if you have any questions.
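The proposed getNextInBatch() semantics can be expressed with a count of buffered elements, which is the direction the ticket eventually took. A minimal sketch: `BatchCursor`, `nextInBatch` and `OneBatchCursor` are hypothetical names for illustration. A helper written directly against MongoCursor<T> would make the same two calls, since MongoCursor has next() and, from driver 4.4 per this ticket, available().

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class NextInBatchDemo {

    // Hypothetical minimal interface holding only the two calls the helper needs.
    interface BatchCursor<T> {
        int available();
        T next();
    }

    // getNextInBatch() expressed via available(): return the next locally
    // buffered element, or null without triggering any fetch.
    static <T> T nextInBatch(BatchCursor<T> cursor) {
        return cursor.available() > 0 ? cursor.next() : null;
    }

    // Hypothetical in-memory cursor holding a single already-fetched batch.
    static final class OneBatchCursor<T> implements BatchCursor<T> {
        private final Deque<T> buffer;

        OneBatchCursor(List<T> batch) {
            this.buffer = new ArrayDeque<>(batch);
        }

        public int available() {
            return buffer.size();
        }

        public T next() {
            return buffer.remove();
        }
    }

    public static void main(String[] args) {
        BatchCursor<String> cursor = new OneBatchCursor<>(List.of("x", "y"));
        List<String> drained = new ArrayList<>();
        String work;
        while ((work = nextInBatch(cursor)) != null) {
            drained.add(work);
        }
        System.out.println(drained); // [x, y]
    }
}
```

Returning a count rather than a nullable element also avoids overloading null, which tryNext() already uses to mean "nothing arrived within the await time".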

Comment by Jeffrey Yemin [ 20/May/21 ]

Hi mshaylor@mercuria.com,

Thanks for sharing your feedback. We'll take a look at this feature request and get back to you soon. In the meantime, could you post a code snippet showing exactly how you're attempting to work around the limitations of the current API?

Generated at Thu Feb 08 09:01:24 UTC 2024 using Jira 9.7.1#970001-sha1:2222b88b221c4928ef0de3161136cc90c8356a66.