Hello, ezra.chung@mongodb.com.
Thank you for your time and input. I have looked at the code myself, and your advice seems to make sense.
Since applying your suggested changes, I haven't seen any crashes or memory errors. I will let you know if other issues come up, but for now the code seems to be working as intended.
Thanks again for your valuable advice on the topic.

Hello, maxime.guillem@systrangroup.com and alexis.jacob@systrangroup.com.
I believe this is due to a dangling view caused by this piece of code:
if (!_lastResumeToken.empty()) {
    auto resumeToken = bsoncxx::from_json(_lastResumeToken); // Scoped variable.
    changeStreamOptions.resume_after(resumeToken.view());     // View to local variable.
} // Scoped variable is destroyed. (!!)

while (true) {
    try {
        // Use of dangling view to destroyed local variable: undefined behavior.
        auto changeStream = collection.watch(/* ... */, changeStreamOptions);
        // ...
    }
    // ...
}

A solution to this issue would be to pass std::move(resumeToken) rather than resumeToken.view() to .resume_after(), so that the options object takes ownership of the token and its lifetime is properly preserved. Passing the result of from_json() directly, without an intermediate variable, would have the same effect.
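To make the ownership argument concrete without needing a MongoDB server, here is a minimal, standard-library-only sketch. `token_view_or_value`, `change_stream_options`, and `make_options` are hypothetical stand-ins, not bsoncxx/mongocxx API; they only mimic how a `view_or_value` either borrows data (a view) or owns it (a value), and why moving the parsed token into the options keeps it alive after the `if` block ends.

```cpp
#include <cassert>
#include <optional>
#include <string>
#include <string_view>
#include <utility>
#include <variant>

// Hypothetical, simplified stand-in for bsoncxx's view_or_value: it either
// borrows data it does not own (a view) or owns the data itself (a value).
class token_view_or_value {
public:
    explicit token_view_or_value(std::string_view borrowed) : _data(borrowed) {}
    explicit token_view_or_value(std::string owned) : _data(std::move(owned)) {}

    // A view into whichever representation is held. When borrowing, this is
    // only safe while the borrowed data is still alive.
    std::string_view view() const {
        if (const auto* owned = std::get_if<std::string>(&_data)) {
            return *owned;
        }
        return std::get<std::string_view>(_data);
    }

    bool owns_data() const { return std::holds_alternative<std::string>(_data); }

private:
    std::variant<std::string_view, std::string> _data;
};

// Hypothetical stand-in for mongocxx::options::change_stream, reduced to the
// resume-token slot relevant to this issue.
struct change_stream_options {
    std::optional<token_view_or_value> resume_after_token;

    void resume_after(token_view_or_value token) {
        resume_after_token = std::move(token);
    }
};

// Mirrors the corrected pattern: the token is moved into the options, so it
// outlives the `if` block in which it was created. Passing
// token_view_or_value{std::string_view{token}} instead would reproduce the
// bug: the options would keep a view into `token` after it is destroyed.
change_stream_options make_options(const std::string& last_resume_token) {
    change_stream_options opts;
    if (!last_resume_token.empty()) {
        std::string token = last_resume_token;  // Stands in for bsoncxx::from_json(...).
        opts.resume_after(token_view_or_value{std::move(token)});
        assert(opts.resume_after_token->owns_data());
    }
    return opts;
}
```

In the driver code itself, the equivalent fix would be `changeStreamOptions.resume_after(bsoncxx::from_json(_lastResumeToken));`, since `from_json()` returns an owning `bsoncxx::document::value`.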
Can you please confirm if this proposed solution resolves your issue?
As an aside, I noticed an unrelated peculiarity with the provided code that may also be worth addressing.
In the snippets posted above, the changeStream object is being constructed in every iteration of the while loop with the resumeAfter token set in changeStreamOptions.
This has the effect of resuming the change stream from the same event on each construction, which would "replay" the same series of events following the event corresponding to the last resume token on every iteration of the outer while loop:
#include <iostream>
#include <string>
#include <vector>

#include <bsoncxx/builder/basic/document.hpp>
#include <bsoncxx/builder/basic/kvp.hpp>
#include <bsoncxx/json.hpp>
#include <mongocxx/client.hpp>
#include <mongocxx/options/change_stream.hpp>
#include <mongocxx/uri.hpp>

using bsoncxx::builder::basic::kvp;
using bsoncxx::builder::basic::make_document;

// Assumes a mongocxx::instance already exists and a replica set is reachable at localhost:27017.
auto client = mongocxx::client(mongocxx::uri("mongodb://localhost:27017"));
auto coll = client["db"]["coll"];

std::string resume_token;

{
    auto stream = coll.watch();

    // Single event to generate resume token.
    coll.insert_one(make_document(kvp("x", 1)));

    if (stream.begin() != stream.end()) {
        auto token = stream.get_resume_token();
        auto token_as_str = bsoncxx::to_json(*token);
        resume_token = token_as_str;
    }
}

// Three events after to-be-resumed-from event.
coll.insert_many(std::vector<bsoncxx::document::value>({
    make_document(kvp("a", 1)),
    make_document(kvp("b", 2)),
    make_document(kvp("c", 3)),
}));

mongocxx::options::change_stream opts;
opts.resume_after(bsoncxx::from_json(resume_token));

// Iterate three times for demonstrative purposes.
for (int i = 0; i < 3; ++i) {
    auto stream = coll.watch(opts); // Resumes from the same `resume_token` every iteration.
    if (stream.begin() != stream.end()) {
        for (auto change : stream) {
            // Prints the same three inserted documents per iteration.
            std::cout << bsoncxx::to_json(change["fullDocument"].get_document()) << std::endl;

            // No effect during this series of iterations; `opts` is unchanged.
            resume_token = bsoncxx::to_json(*stream.get_resume_token());
        }
    }
}

Assuming an initially empty db.coll collection, this code generates the following repetitious output:
{ "_id" : { "$oid" : "62fa93152f661603390c6612" }, "a" : 1 }
{ "_id" : { "$oid" : "62fa93152f661603390c6613" }, "b" : 2 }
{ "_id" : { "$oid" : "62fa93152f661603390c6614" }, "c" : 3 }
{ "_id" : { "$oid" : "62fa93152f661603390c6612" }, "a" : 1 }
{ "_id" : { "$oid" : "62fa93152f661603390c6613" }, "b" : 2 }
{ "_id" : { "$oid" : "62fa93152f661603390c6614" }, "c" : 3 }
{ "_id" : { "$oid" : "62fa93152f661603390c6612" }, "a" : 1 }
{ "_id" : { "$oid" : "62fa93152f661603390c6613" }, "b" : 2 }
{ "_id" : { "$oid" : "62fa93152f661603390c6614" }, "c" : 3 }

I suspect this is not the intended behavior. Consider moving the construction of each changeStream object out of the while loop in which it is used:
auto changeStream = collection.watch(changeStreamPipeline);

while (true) {
    if (changeStream.begin() != changeStream.end()) { /* ... */ }
}

auto changeStream = collection.watch(changeStreamPipeline, changeStreamOptions);

while (true) {
    try {
        if (changeStream.begin() != changeStream.end()) { /* ... */ }
    }
    /* ... */
}
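The difference between re-creating the stream with a fixed resume token and keeping a single stream open can be checked with a small in-memory simulation, again without a server. `change_log`, `fake_change_stream`, and the three helper functions are hypothetical; resume tokens are modeled as event positions (real tokens are opaque documents), and the third strategy, refreshing the token before each re-watch, is an alternative assumed here for comparison rather than something proposed above.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <optional>
#include <string>
#include <vector>

// Hypothetical in-memory change log. A "resume token" is modeled as the index
// of an event in `events`.
struct change_log {
    std::vector<std::string> events;
};

// Hypothetical stand-in for a change stream cursor. It starts right after
// `resume_after`, or at the current end of the log when no token is given
// (like a fresh watch()), and remembers how far it has read.
class fake_change_stream {
public:
    fake_change_stream(const change_log& history, std::optional<std::size_t> resume_after)
        : _log(&history),
          _next(resume_after ? std::min(*resume_after + 1, history.events.size())
                             : history.events.size()) {}

    // Returns every event this stream has not yet delivered, then advances.
    std::vector<std::string> poll() {
        assert(_next <= _log->events.size());
        const auto first = _log->events.begin() + static_cast<std::ptrdiff_t>(_next);
        std::vector<std::string> out(first, _log->events.end());
        _next = _log->events.size();
        return out;
    }

    // Token of the last event delivered (or the one just before the start).
    std::optional<std::size_t> resume_token() const {
        if (_next == 0) {
            return std::nullopt;
        }
        return _next - 1;
    }

private:
    const change_log* _log;
    std::size_t _next;
};

// Original pattern: a new stream with the same fixed token on every iteration.
std::vector<std::string> rewatch_with_fixed_token(const change_log& history,
                                                  std::size_t token, int iterations) {
    std::vector<std::string> seen;
    for (int i = 0; i < iterations; ++i) {
        fake_change_stream stream(history, token);
        const auto batch = stream.poll();
        seen.insert(seen.end(), batch.begin(), batch.end());
    }
    return seen;
}

// Suggested pattern: construct the stream once, outside the loop.
std::vector<std::string> watch_once(const change_log& history, std::size_t token,
                                    int iterations) {
    std::vector<std::string> seen;
    fake_change_stream stream(history, token);
    for (int i = 0; i < iterations; ++i) {
        const auto batch = stream.poll();
        seen.insert(seen.end(), batch.begin(), batch.end());
    }
    return seen;
}

// Alternative: re-watch each iteration, but resume from the latest token.
std::vector<std::string> rewatch_with_updated_token(const change_log& history,
                                                    std::size_t token, int iterations) {
    std::vector<std::string> seen;
    std::optional<std::size_t> current = token;
    for (int i = 0; i < iterations; ++i) {
        fake_change_stream stream(history, current);
        const auto batch = stream.poll();
        seen.insert(seen.end(), batch.begin(), batch.end());
        current = stream.resume_token();
    }
    return seen;
}
```

With a log of `{"x", "a", "b", "c"}` and a token pointing at `"x"`, three iterations of `rewatch_with_fixed_token` deliver nine events (the same three, three times), while `watch_once` and `rewatch_with_updated_token` each deliver `"a"`, `"b"`, `"c"` exactly once, matching the replay seen in the output above.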
Hello,
I am a colleague of Maxime. We noticed that there has been some activity on this issue, and we would like an update.
Is there any additional information you need from us? So far, our code still uses the oplog, and we would like to switch to resume tokens. Thanks in advance.
Best,
Alexis

Here is a stack trace:
__memmove_avx_unaligned_erms 0x00007fe9bd41baa4
_bson_append_va 0x00007fe9bde78fca
_bson_append 0x00007fe9bde792b9
bson_append_document 0x00007fe9bde7a8b5
bsoncxx::v_noabi::builder::core::append 0x00007fe9bde67f48
bsoncxx::v_noabi::builder::core::append 0x00007fe9bde69a25
bsoncxx::v_noabi::builder::basic::impl::generic_append<bsoncxx::v_noabi::view_or_value<bsoncxx::v_noabi::document::view, bsoncxx::v_noabi::document::value> const&> impl.hpp:55
bsoncxx::v_noabi::builder::basic::impl::value_append<bsoncxx::v_noabi::view_or_value<bsoncxx::v_noabi::document::view, bsoncxx::v_noabi::document::value> const&> impl.hpp:60
bsoncxx::v_noabi::builder::basic::sub_document::append_<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, bsoncxx::v_noabi::view_or_value<bsoncxx::v_noabi::document::view, bsoncxx::v_noabi::document::value> const&> sub_document.hpp:77
bsoncxx::v_noabi::builder::basic::sub_document::append<std::tuple<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, bsoncxx::v_noabi::view_or_value<bsoncxx::v_noabi::document::view, bsoncxx::v_noabi::document::value> const&>>(std::tuple<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, bsoncxx::v_noabi::view_or_value<bsoncxx::v_noabi::document::view, bsoncxx::v_noabi::document::value> const&>&&) sub_document.hpp:47
mongocxx::v_noabi::options::(anonymous namespace)::append_if<bsoncxx::v_noabi::view_or_value<bsoncxx::v_noabi::document::view, bsoncxx::v_noabi::document::value> > 0x00007fe9be2ccd7f
mongocxx::v_noabi::options::change_stream::as_bson 0x00007fe9be2cc8de
mongocxx::v_noabi::collection::_watch 0x00007fe9be299713
mongocxx::v_noabi::collection::watch 0x00007fe9be299574
indexer::Indexable::syncEntriesLoop indexable.cc:137 <<== This is our code
boost::_mfi::mf0<void, indexer::Indexable>::operator() mem_fn_template.hpp:49
boost::_bi::list1<boost::_bi::value<indexer::Indexable*> >::operator()<boost::_mfi::mf0<void, indexer::Indexable>, boost::_bi::list0> bind.hpp:237
boost::_bi::bind_t<void, boost::_mfi::mf0<void, indexer::Indexable>, boost::_bi::list1<boost::_bi::value<indexer::Indexable*> > >::operator() bind.hpp:1272
boost::detail::thread_data<boost::_bi::bind_t<void, boost::_mfi::mf0<void, indexer::Indexable>, boost::_bi::list1<boost::_bi::value<indexer::Indexable*> > > >::run thread.hpp:120
boost::(anonymous namespace)::thread_proxy 0x00007fe9bccc7fa1
start_thread 0x00007fe9bca9a6db
clone 0x00007fe9bd3ae71f
The segfault does not seem to occur without resume_after.
Here is the full code:
/* in function syncEntriesLoop */
mongocxx::pipeline changeStreamPipeline;
changeStreamPipeline.match(bsoncxx::builder::basic::make_document(bsoncxx::builder::basic::kvp("operationType", "insert")));
mongocxx::options::change_stream changeStreamOptions;

if(!_lastResumeToken.empty()) //_lastResumeToken is a std::string
{
    auto resumeToken = bsoncxx::from_json(_lastResumeToken);
    changeStreamOptions.resume_after(resumeToken.view());
}

IndexerDb::ConnectionHandle connectionHandle;
auto collection = (*connectionHandle)[_database_name][_collection_name];
while (true)
{
    try
    {
        auto changeStream = collection.watch(changeStreamPipeline, changeStreamOptions); // CRASH here
        if(changeStream.begin() != changeStream.end())
        {
            for(const auto& change : changeStream)
            { //Compute change
                auto document = change["fullDocument"].get_document();
                std::string jsonChangeString = bsoncxx::to_json(document);
                _do_index(document.view()["_id"].get_oid().value, document);
            }
            /* compute all the new changes */
            if(!changeStream.get_resume_token()->empty())
                _lastResumeToken = bsoncxx::to_json(changeStream.get_resume_token().get());
        }
    }
    catch (boost::thread_interrupted&)
    {
        LOG_DEBUG("Stopping syncEntriesLoop() thread");
        break;
    }
    catch (mongocxx::exception& e)
    {
        LOG_ERROR("DBException while syncing with the db: " + std::string(e.what()) + ". Retrying...");
    }
    catch (std::exception& e)
    {
        LOG_ERROR("Exception while syncing with the db: " + std::string(e.what()) + ". Retrying...");
    }
}