[CXX-2435] Why does change stream end when more logs exist Created: 31/Jan/22  Updated: 27/Oct/23  Resolved: 03/Feb/22

Status: Closed
Project: C++ Driver
Component/s: API
Affects Version/s: None
Fix Version/s: None

Type: Task Priority: Unknown
Reporter: Oded Raiches Assignee: Kevin Albertson
Resolution: Works as Designed Votes: 0
Labels: changestreams
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified

Issue Links:
Related
related to CXX-2441 Add example of iterating change strea... Closed

 Description   

Hi,

I'm using change streams to gather statistics about my operations (I need to extract some information from each insert/update), so the change stream has to act as a listener that never closes.

I inserted 1,000,000 documents into a collection, and on the change stream side I use the start_at_operation_time() option to gather all the required information.

I noticed that the change stream closes at seemingly random points before all 1M documents have been seen (I count them in the loop).

I also noticed that when I set max_await_time() to 5000 ms, it works fine and all documents are captured before the stream closes.

The behavior I need is for the change stream to act as a listener that waits for changes indefinitely.

Is there a way to make this loop blocking, meaning that it always waits for the next incoming change without exiting the loop?

I saw that someone once filed a "bug" about this, but this is exactly the behavior I want to implement:

https://jira.mongodb.org/browse/CXX-2278?jql=project%20%3D%20CXX%20AND%20component%20%3D%20Implementation

From some POCs I wrote with the Go and Python drivers, their change streams appear to always block and wait for the next change.

 

mongocxx driver version: 3.6.6

mongoc driver version: 1.20.1

 



 Comments   
Comment by Kevin Albertson [ 03/Feb/22 ]

> Works! thanks a lot.

Great to hear.

> On the documentation for operator++() and begin() it says that they are blocking. Does this mean that there is still a bug that should be fixed here? please answer whether this is a problem in mongocxx or the mongoc driver, or just the documentation:

operator++() and begin() are blocking operations: each call may block for up to max_await_time waiting for a possible notification. I think the documentation could clarify what "blocking" means here. I created CXX-2441 to propose three documentation improvements.

Comment by Oded Raiches [ 02/Feb/22 ]

Works! Thanks a lot.

The documentation for operator++() and begin() says that they are blocking.

Does this mean there is still a bug to be fixed here? Please tell me whether this is a problem in mongocxx, in the mongoc driver, or just in the documentation:

http://mongocxx.org/api/current/classmongocxx_1_1change__stream_1_1iterator.html#a9d7c8a486adc076d3d99a6792c143df3

Comment by Kevin Albertson [ 02/Feb/22 ]

Incrementing the iterator with cs_iter++ invalidates the bsoncxx::document::view returned by *cs_iter, but your loop reads event after the increment. Moving every access to event before cs_iter++ should fix that segfault:

    int count = 0;
    change_stream stream = db.watch(pipeline, options);
    auto cs_iter = stream.begin();
    while (true) {
        if (cs_iter == stream.end()) {
            std::cout << "No event yet." << std::endl;
            cs_iter = stream.begin(); // sends at most one getMore command.
            continue;
        }

        std::cout << count << std::endl;
        // `event` is a non-owning view into the current batch; use it only
        // before cs_iter is incremented.
        bsoncxx::document::view event = *cs_iter;
        auto eventLen = event.length();
        std::cout << "eventLen " << eventLen << std::endl;
        if (count == 11197) {
            std::cout << bsoncxx::to_json(event) << std::endl;
        }
        auto f2 = event.find("ns");
        std::cout << "f2" << std::endl;
        count++;
        // do more stuff with event...

        cs_iter++; // invalidates `event`; sends at most one getMore command.
    }
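The lifetime rule behind this fix can be sketched without a server. The model below is hypothetical and uses only the standard library: BatchedCursor, current(), advance() and drain_with_copies() are illustrative names, not mongocxx API. It assumes, as a simplification, that each view points into a per-batch buffer that is replaced when the next batch is fetched, and it copies each event into owning storage before advancing. In mongocxx itself, the owning counterpart of bsoncxx::document::view is bsoncxx::document::value, which can be constructed from a view when an event has to outlive the next cs_iter++.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <string_view>
#include <utility>
#include <vector>

// Hypothetical stand-in for a change stream cursor (not mongocxx API).
// Documents arrive in batches; the cursor keeps only the current batch, and
// current() returns a non-owning view into it, much like *cs_iter.
class BatchedCursor {
public:
    explicit BatchedCursor(std::vector<std::vector<std::string>> batches)
        : batches_(std::move(batches)) {
        load_batch();
    }

    bool done() const { return batch_idx_ >= batches_.size(); }

    // Non-owning view; callers must treat it as invalid after advance().
    std::string_view current() const {
        assert(!done());
        return buffer_[doc_idx_];
    }

    // Moves to the next document. In this toy model only a batch refill
    // destroys the old buffer, but callers should assume every advance() does.
    void advance() {
        if (++doc_idx_ >= buffer_.size()) {
            ++batch_idx_;
            load_batch();
        }
    }

private:
    // Replaces the buffer with the next non-empty batch (or empties it at the end).
    void load_batch() {
        while (batch_idx_ < batches_.size() && batches_[batch_idx_].empty()) {
            ++batch_idx_;
        }
        buffer_ = done() ? std::vector<std::string>{} : batches_[batch_idx_];
        doc_idx_ = 0;
    }

    std::vector<std::vector<std::string>> batches_;
    std::vector<std::string> buffer_;
    std::size_t batch_idx_ = 0;
    std::size_t doc_idx_ = 0;
};

// Copies each event into owning storage *before* advancing, the analogue of
// building a bsoncxx::document::value from the view returned by *cs_iter.
std::vector<std::string> drain_with_copies(BatchedCursor& cursor) {
    std::vector<std::string> events;
    while (!cursor.done()) {
        std::string owned{cursor.current()};  // copy while the view is valid
        cursor.advance();                     // may invalidate the view, not the copy
        events.push_back(std::move(owned));
    }
    return events;
}
```

Reading the view after advance() (as in the loop that crashed) would be undefined behavior in this model too; the copy is what makes the late access safe.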

Comment by Oded Raiches [ 02/Feb/22 ]

Hi Kevin,

With this approach, I get a segfault after 11197 iterations on every attempt (when accessing the event).

Here is roughly what I am doing:

int count = 0;     
change_stream stream = db.watch(pipeline, options);     
auto cs_iter = stream.begin();     
bsoncxx::document::view event;     
while (true) {         
  if (cs_iter == stream.end()) {             
    std::cout << "No event yet." << std::endl;             
    cs_iter = stream.begin(); // sends at most one getMore command.  
    continue;
  } else {             
    std::cout << count << std::endl;             
    event = *cs_iter;                     
    cs_iter++; // sends at most one getMore command.         
  } 
 
  auto eventLen = event.length(); 
  std::cout << "eventLen " << eventLen << std::endl; 
  if (count == 11197) {             
    std::cout << bsoncxx::to_json(event) << std::endl;         
  }         
  auto f2 = event.find("ns");         
  std::cout << "f2" << std::endl;         
  count++;         
  // do more stuff with event...      
}

It appears to segfault in the call to to_json(event):

...

...

11195
eventLen 355
f2
11196
eventLen 352
f2
11197
eventLen 355
Segmentation fault (core dumped)

 

Comment by Kevin Albertson [ 01/Feb/22 ]

Hello oraiches@zadarastorage.com,

> Is there a way to make this loop blocking, meaning that it always waits for the next incoming change without exiting the loop?

The iterator returned by mongocxx::change_stream::begin() is exhausted as soon as one attempt to poll the server for new events (with the getMore command) returns no results.

To keep iterating until the next event or an error, the current API requires calling mongocxx::change_stream::begin() again.

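#include <iostream>
#include <string>

#include <bsoncxx/json.hpp>
#include <mongocxx/client.hpp>
#include <mongocxx/instance.hpp>
#include <mongocxx/uri.hpp>

int main() {
    mongocxx::instance inst{}; // must exist before any client is created.
    // Assumed URI: point it at a replica set or sharded cluster, since
    // change streams are not available on a standalone server.
    std::string uristr = "mongodb://localhost:27017";
    auto client = mongocxx::client(mongocxx::uri(uristr));
    auto db = client.database("db");
    auto cs = db.collection("coll").watch();
    auto cs_iter = cs.begin(); // sends at most one aggregate command.
    while (true) {
        if (cs_iter == cs.end()) {
            std::cout << "No event yet." << std::endl;
            cs_iter = cs.begin(); // sends at most one getMore command.
        } else {
            bsoncxx::document::view event = *cs_iter;
            std::cout << "Got event: " << bsoncxx::to_json(event) << std::endl;
            cs_iter++; // sends at most one getMore command.
        }
    }
}
// Note: if a resumable error occurs while iterating a change stream, the
// aggregate command may be retried in any of the begin() or increment calls.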
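To see why the loop has to call begin() again, here is a standard-library-only model of the iteration contract described in this comment. FakeChangeStream, its scripted polls, script_finished() and run_listener() are hypothetical names, not mongocxx API: each scripted entry stands in for one aggregate/getMore round trip, and an empty entry stands in for a getMore that returned no events. Because the script is finite, the model stops when it runs out, whereas a real change stream keeps waiting.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <utility>
#include <vector>

// Hypothetical model of the iteration contract (not the mongocxx API).
// Each scripted entry stands in for one aggregate/getMore round trip; an
// empty entry is a round trip that returned no events.
class FakeChangeStream {
public:
    explicit FakeChangeStream(std::deque<std::vector<int>> scripted_polls)
        : polls_(std::move(scripted_polls)) {}

    class iterator {
    public:
        // Only "is this the end iterator?" matters for the loops below.
        bool operator==(const iterator& other) const { return at_end_ == other.at_end_; }
        bool operator!=(const iterator& other) const { return !(*this == other); }

        int operator*() const { return stream_->batch_[stream_->pos_]; }

        // Moves within the batch; when it runs out, polls once. An empty
        // poll exhausts this iterator.
        iterator& operator++() {
            if (++stream_->pos_ >= stream_->batch_.size()) {
                at_end_ = !stream_->poll();
            }
            return *this;
        }

    private:
        friend class FakeChangeStream;
        iterator(FakeChangeStream* stream, bool at_end) : stream_(stream), at_end_(at_end) {}

        FakeChangeStream* stream_;
        bool at_end_;
    };

    // Like begin() in the loop above: polls at most once.
    iterator begin() { return iterator(this, !poll()); }
    iterator end() { return iterator(this, true); }

    // Model-only: a real change stream never runs out of future polls.
    bool script_finished() const { return polls_.empty(); }

private:
    // Simulates one round trip; returns true if it produced any events.
    bool poll() {
        if (polls_.empty()) {
            return false;
        }
        batch_ = std::move(polls_.front());
        polls_.pop_front();
        pos_ = 0;
        return !batch_.empty();
    }

    std::deque<std::vector<int>> polls_;
    std::vector<int> batch_;
    std::size_t pos_ = 0;
};

// Same shape as the loop above: on end(), call begin() again instead of
// leaving the loop.
std::vector<int> run_listener(FakeChangeStream& cs) {
    std::vector<int> received;
    auto cs_iter = cs.begin();
    while (true) {
        if (cs_iter == cs.end()) {
            if (cs.script_finished()) {
                break;  // model-only exit; the real loop keeps waiting
            }
            cs_iter = cs.begin();
        } else {
            received.push_back(*cs_iter);
            ++cs_iter;
        }
    }
    return received;
}
```

In this model, a plain loop from begin() to end() over the same script stops at the first empty poll, which is consistent with the early exit reported in the description; re-calling begin() picks up events that arrive later.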

Comment by Kevin Albertson [ 31/Jan/22 ]

Hello oraiches@zadarastorage.com, thank you for the report. We will look into this soon.

Generated at Wed Feb 07 22:05:55 UTC 2024 using Jira 9.7.1#970001-sha1:2222b88b221c4928ef0de3161136cc90c8356a66.