[CXX-2289] Can't use resume_token after program restart Created: 18/Jun/21  Updated: 27/Oct/23  Resolved: 17/Aug/22

Status: Closed
Project: C++ Driver
Component/s: None
Affects Version/s: None
Fix Version/s: None

Type: Task Priority: Unknown
Reporter: Maxime Guillem Assignee: Ezra Chung
Resolution: Works as Designed Votes: 1
Labels: changestreams, needs-first-responder, post-5.0, size-small
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified

Issue Links:
Related
related to CXX-2337 "ChangeStream must continuously track... Closed
Quarter: FY23Q2

 Description   

Hello,

I have an issue with change_streams and resume_tokens.
Here is my use case: I have a server that reads a collection and uses a change_stream to stay in sync with it. A dedicated thread runs a while loop that constantly reads the change_stream.

while (true)
{
  auto changeStream = collection.watch(changeStreamPipeline);
  if(changeStream.begin() != changeStream.end())
  {
    for(const auto& change : changeStream)
    {
      auto document = change["fullDocument"].get_document();
      std::string jsonChangeString = bsoncxx::to_json(document);
      resumeToken = changeStream.get_resume_token();
      /* Compute index */
    }
  }
}

Here, resumeToken is a class member.

The resume token is then saved as a string in a struct called indexer_data:

 

boost::archive::binary_oarchive         archive(ofs);
IndexerData                             indexer_data;
 
std::string data = bsoncxx::to_json(_lastResumeToken.get());
indexer_data.lastResumeToken   = data;
 
archive << indexer_data;

 

 

Then, when the server restarts, the archive is read and we try to resume the stream from the saved resume token:

 

mongocxx::pipeline changeStreamPipeline;
changeStreamPipeline.match(bsoncxx::builder::basic::make_document(bsoncxx::builder::basic::kvp("operationType", "insert")));
mongocxx::options::change_stream changeStreamOptions;
if(!savedResumeToken.empty())
{
  auto resumeToken = bsoncxx::from_json(savedResumeToken);
  changeStreamOptions.resume_after(resumeToken);
}
 
 
auto changeStream = collection.watch(changeStreamPipeline, changeStreamOptions); // CRASH HERE
 
/* Loop to get the changes from the change stream */

When we try to watch the collection using options that carry the resume token, the program crashes with a SEGFAULT.
Is this a known bug, or am I using the resume_token the wrong way?
Note that it also happens if the resume token is passed with the "start_after" option.

Thanks in advance

 

 



 Comments   
Comment by Alexis Jacob [ 17/Aug/22 ]

Hello ezra.chung@mongodb.com.

Thank you for your time and input. I have looked at the code myself and your advice seems to make sense.

Since applying your suggested changes, I haven't noticed any crashes or memory errors. I will let you know if other issues come up, but for now the code seems to be working as intended.

Thanks again for your valuable advice on the topic.

Comment by Ezra Chung [ 15/Aug/22 ]

Hello, maxime.guillem@systrangroup.com and alexis.jacob@systrangroup.com.

I believe this is due to a dangling view caused by this piece of code:

if (!_lastResumeToken.empty()) {
  auto resumeToken = bsoncxx::from_json(_lastResumeToken); // Scoped variable.
  changeStreamOptions.resume_after(resumeToken.view()); // View to local variable.
} // Scoped variable is destroyed. (!!)
 
while (true) {
  try {
    // Use of dangling view to destroyed local variable: undefined behavior.
    auto changeStream = collection.watch(/* ... */, changeStreamOptions);
    // ...
  }
  // ...
}

A solution to this issue would be to provide std::move(resumeToken) rather than resumeToken.view() to .resume_after() so that ownership (and thus lifetime) is properly preserved. Directly passing the result of from_json() without an intermediate variable would also have the same effect.
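
To illustrate the lifetime difference without depending on the driver, here is a minimal standard-library sketch. ViewOrValue, Options, parse_token, and make_options_owning are hypothetical stand-ins (they are not mongocxx or bsoncxx APIs); they only mimic the idea of an option that can either borrow a document or own it. C++17 is assumed.

```cpp
#include <optional>
#include <string>
#include <string_view>
#include <utility>
#include <variant>

// Hypothetical stand-in for a "view or value" holder: it either borrows
// bytes owned elsewhere (string_view) or owns them itself (string).
class ViewOrValue {
public:
    ViewOrValue(std::string_view v) : data_(v) {}          // borrows
    ViewOrValue(std::string&& s) : data_(std::move(s)) {}  // takes ownership

    bool is_owning() const { return std::holds_alternative<std::string>(data_); }

    std::string_view view() const {
        if (const auto* owned = std::get_if<std::string>(&data_)) return *owned;
        return std::get<std::string_view>(data_);
    }

private:
    std::variant<std::string_view, std::string> data_;
};

// Hypothetical options object with a single resume_after setting.
struct Options {
    std::optional<ViewOrValue> resume_after;
};

// Hypothetical parse step standing in for a JSON-to-document conversion;
// like such a conversion, it returns an owning value.
std::string parse_token(const std::string& saved) { return saved; }

Options make_options_owning(const std::string& saved) {
    Options opts;
    if (!saved.empty()) {
        std::string token = parse_token(saved);
        // Unsafe alternative (do not do this):
        //   opts.resume_after.emplace(std::string_view(token));
        // would store a view of `token`, which is destroyed at the end of
        // this block, leaving `opts` with a dangling view.
        opts.resume_after.emplace(std::move(token));  // ownership moves into opts
    }
    return opts;  // the token's lifetime is now tied to `opts`
}
```

In the code from this ticket, the analogous change would be to pass std::move(resumeToken), or the result of from_json() directly, to resume_after().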

Can you please confirm if this proposed solution resolves your issue?

 

As an aside, I noticed an unrelated peculiarity with the provided code that may also be worth addressing.

In the snippets posted above, the changeStream object is being constructed in every iteration of the while loop with the resumeAfter token set in changeStreamOptions.
This has the effect of resuming the change stream from the same event on each construction, which would "replay" the same series of events following the event corresponding to the last resume token on every iteration of the outer while loop:

using bsoncxx::builder::basic::kvp;
using bsoncxx::builder::basic::make_document;
 
auto client = mongocxx::client(mongocxx::uri("mongodb://localhost:27017"));
auto coll = client["db"]["coll"];
 
std::string resume_token;
 
{
    auto stream = coll.watch();
 
    // Single event to generate resume token.
    coll.insert_one(make_document(kvp("x", 1)));
 
    if (stream.begin() != stream.end()) {
        auto token = stream.get_resume_token();
        auto token_as_str = bsoncxx::to_json(*token);
        resume_token = token_as_str;
    }
}
 
// Three events after to-be-resumed-from event.
coll.insert_many(std::vector<bsoncxx::document::value>({
    make_document(kvp("a", 1)),
    make_document(kvp("b", 2)),
    make_document(kvp("c", 3)),
}));
 
mongocxx::options::change_stream opts;
opts.resume_after(bsoncxx::from_json(resume_token));
 
// Iterate three times for demonstrative purposes.
for (int i = 0; i < 3; ++i) {
    auto stream = coll.watch(opts);  // Resumes from the same `resume_token` every iteration.
    if (stream.begin() != stream.end()) {
        for (auto change : stream) {
            // Prints the same three inserted documents per iteration.
            std::cout << bsoncxx::to_json(change["fullDocument"].get_document()) << std::endl;
 
            // No effect during this series of iterations; `opts` is unchanged.
            resume_token = bsoncxx::to_json(*stream.get_resume_token());
        }
    }
}

Assuming an initially empty db.coll collection, this code generates the following repetitious output:

{ "_id" : { "$oid" : "62fa93152f661603390c6612" }, "a" : 1 }
{ "_id" : { "$oid" : "62fa93152f661603390c6613" }, "b" : 2 }
{ "_id" : { "$oid" : "62fa93152f661603390c6614" }, "c" : 3 }
{ "_id" : { "$oid" : "62fa93152f661603390c6612" }, "a" : 1 }
{ "_id" : { "$oid" : "62fa93152f661603390c6613" }, "b" : 2 }
{ "_id" : { "$oid" : "62fa93152f661603390c6614" }, "c" : 3 }
{ "_id" : { "$oid" : "62fa93152f661603390c6612" }, "a" : 1 }
{ "_id" : { "$oid" : "62fa93152f661603390c6613" }, "b" : 2 }
{ "_id" : { "$oid" : "62fa93152f661603390c6614" }, "c" : 3 }

I suspect this is not the intended behavior. Consider moving the construction of each changeStream object out of the while loop in which it is used:

auto changeStream = collection.watch(changeStreamPipeline); 
while (true) {
  if (changeStream.begin() != changeStream.end()) { /* ... */ }
}

auto changeStream = collection.watch(changeStreamPipeline, changeStreamOptions);
while (true) {
  try {
    if (changeStream.begin() != changeStream.end()) { /* ... */ }
  }
  /* ... */
}
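
The replay effect can also be sketched with a toy model that uses no driver API. FakeCollection, watch_after, and consume are hypothetical names, and a "token" here is simply the count of events already consumed. The sketch also shows, as an assumption about what the application may want, that if a stream does have to be re-created (for example after an exception), re-opening it with a token that has been advanced avoids seeing the same events again. C++17 is assumed.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical in-memory stand-in for a watched collection.
struct FakeCollection {
    std::vector<std::string> events;

    // Returns every event after the first `token` events.
    std::vector<std::string> watch_after(std::size_t token) const {
        if (token >= events.size()) return {};
        return std::vector<std::string>(
            events.begin() + static_cast<std::ptrdiff_t>(token), events.end());
    }
};

// Re-opens the stream `rounds` times and collects everything it sees.
// With advance_token == false the same token is reused on every re-open,
// so events are replayed. With advance_token == true the token is moved
// forward past the consumed events, so each event is seen once.
std::vector<std::string> consume(const FakeCollection& coll, int rounds,
                                 bool advance_token) {
    std::vector<std::string> seen;
    std::size_t token = 0;
    for (int i = 0; i < rounds; ++i) {
        std::size_t cursor = token;
        for (const auto& event : coll.watch_after(token)) {
            seen.push_back(event);
            ++cursor;  // position just past the event consumed
        }
        if (advance_token) token = cursor;
    }
    return seen;
}
```

With three recorded events and three rounds, the fixed-token variant collects nine entries (each event three times), while the advancing variant collects each event once.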

 

Comment by Alexis Jacob [ 14/Dec/21 ]

Hello,

I am a colleague of Maxime. We noticed that there was activity on this issue but we wanted an update.

Is there any additional info that you would like to know? So far, our code still uses oplogs and we'd like to switch to resume_tokens. Let us know if you need any input from us. Thanks in advance.

Best,
Alexis

Comment by Maxime Guillem [ 07/Sep/21 ]

Here is a stack trace:
__memmove_avx_unaligned_erms 0x00007fe9bd41baa4
_bson_append_va 0x00007fe9bde78fca
_bson_append 0x00007fe9bde792b9
bson_append_document 0x00007fe9bde7a8b5
bsoncxx::v_noabi::builder::core::append 0x00007fe9bde67f48
bsoncxx::v_noabi::builder::core::append 0x00007fe9bde69a25
bsoncxx::v_noabi::builder::basic::impl::generic_append<bsoncxx::v_noabi::view_or_value<bsoncxx::v_noabi::document::view, bsoncxx::v_noabi::document::value> const&> impl.hpp:55
bsoncxx::v_noabi::builder::basic::impl::value_append<bsoncxx::v_noabi::view_or_value<bsoncxx::v_noabi::document::view, bsoncxx::v_noabi::document::value> const&> impl.hpp:60
bsoncxx::v_noabi::builder::basic::sub_document::append_<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, bsoncxx::v_noabi::view_or_value<bsoncxx::v_noabi::document::view, bsoncxx::v_noabi::document::value> const&> sub_document.hpp:77
bsoncxx::v_noabi::builder::basic::sub_document::append<std::tuple<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, bsoncxx::v_noabi::view_or_value<bsoncxx::v_noabi::document::view, bsoncxx::v_noabi::document::value> const&>>(std::tuple<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, bsoncxx::v_noabi::view_or_value<bsoncxx::v_noabi::document::view, bsoncxx::v_noabi::document::value> const&>&&) sub_document.hpp:47
mongocxx::v_noabi::options::(anonymous namespace)::append_if<bsoncxx::v_noabi::view_or_value<bsoncxx::v_noabi::document::view, bsoncxx::v_noabi::document::value> > 0x00007fe9be2ccd7f
mongocxx::v_noabi::options::change_stream::as_bson 0x00007fe9be2cc8de
mongocxx::v_noabi::collection::_watch 0x00007fe9be299713
mongocxx::v_noabi::collection::watch 0x00007fe9be299574
indexer::Indexable::syncEntriesLoop indexable.cc:137 <<== This is our code
boost::_mfi::mf0<void, indexer::Indexable>::operator() mem_fn_template.hpp:49
boost::_bi::list1<boost::_bi::value<indexer::Indexable*> >::operator()<boost::_mfi::mf0<void, indexer::Indexable>, boost::_bi::list0> bind.hpp:237
boost::_bi::bind_t<void, boost::_mfi::mf0<void, indexer::Indexable>, boost::_bi::list1<boost::_bi::value<indexer::Indexable*> > >::operator() bind.hpp:1272
boost::detail::thread_data<boost::_bi::bind_t<void, boost::_mfi::mf0<void, indexer::Indexable>, boost::_bi::list1<boost::_bi::value<indexer::Indexable*> > > >::run thread.hpp:120
boost::(anonymous namespace)::thread_proxy 0x00007fe9bccc7fa1
start_thread 0x00007fe9bca9a6db
clone 0x00007fe9bd3ae71f

The segfault doesn't seem to occur without resume_after.

Here is the complete code:

/* in function syncEntriesLoop */

mongocxx::pipeline changeStreamPipeline;
changeStreamPipeline.match(bsoncxx::builder::basic::make_document(bsoncxx::builder::basic::kvp("operationType", "insert")));
mongocxx::options::change_stream changeStreamOptions;
 
if(!_lastResumeToken.empty()) //_lastResumeToken is a  std::string
{
  auto resumeToken = bsoncxx::from_json(_lastResumeToken);
  changeStreamOptions.resume_after(resumeToken.view());
}
 
IndexerDb::ConnectionHandle connectionHandle;
auto collection = (*connectionHandle)[_database_name][_collection_name];
while (true)
{
  try
  {
    auto changeStream = collection.watch(changeStreamPipeline, changeStreamOptions); // CRASH here
    if(changeStream.begin() != changeStream.end())
    {
      for(const auto& change : changeStream)
      {        //Compute change
        auto document = change["fullDocument"].get_document();
        std::string jsonChangeString = bsoncxx::to_json(document);
        _do_index(document.view()["_id"].get_oid().value, document);
      }
      /* compute all the new changes */
      if(!changeStream.get_resume_token()->empty())
        _lastResumeToken = bsoncxx::to_json(changeStream.get_resume_token().get());
    }
  }
  catch (boost::thread_interrupted&)
  {
    LOG_DEBUG("Stopping syncEntriesLoop() thread");
    break;
  }
  catch (mongocxx::exception& e)
  {
    LOG_ERROR("DBException while syncing with the db: " + std::string(e.what()) + ". Retrying...");
  }
  catch (std::exception& e)
  {
    LOG_ERROR("Exception while syncing with the db: " + std::string(e.what()) + ". Retrying...");
  }
}

Comment by Jesse Williamson (Inactive) [ 01/Sep/21 ]

Hello maxime.guillem@systrangroup.com, I'm having a look at this issue-- thank you for your patience!

Do you happen to have a stack trace? Additionally, does the segfault occur regardless of whether resume_after has been specified?

Comment by Maxime Guillem [ 30/Aug/21 ]

Hello, would it be possible to have an update on this issue, please? We need more visibility on this subject. Thanks in advance.

Comment by Kevin Albertson [ 22/Jun/21 ]

Hello maxime.guillem@systrangroup.com, thank you for the report! We will further investigate this after we have completed high priority work for upcoming server support. Thank you for your patience.

Generated at Wed Feb 07 22:05:26 UTC 2024 using Jira 9.7.1#970001-sha1:2222b88b221c4928ef0de3161136cc90c8356a66.