[SERVER-81321] Streams: Fix json deserializer to handle UTF16 escape sequences Created: 21/Sep/23  Updated: 29/Oct/23  Resolved: 27/Sep/23

Status: Closed
Project: Core Server
Component/s: None
Affects Version/s: None
Fix Version/s: 7.2.0-rc0

Type: Bug
Priority: Major - P3
Reporter: Matthew Normyle
Assignee: Matthew Normyle
Resolution: Fixed
Votes: 0
Labels: init-337-m3, prioritize
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified

Assigned Teams: Atlas Streams
Backwards Compatibility: Fully Compatible
Operating System: ALL
Sprint: Sprint 32
Participants:

 Description   

kafkacat shows the string as:

\ud83c\udf78

This is the UTF-16 surrogate pair for 🍸 (U+1F378).
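
As a quick check of the arithmetic, the two escapes form a UTF-16 surrogate pair that combines into a single code point. The Python sketch below is only an illustration: the helper name combine_surrogates is made up here and is not part of the server code.

```python
def combine_surrogates(high: int, low: int) -> int:
    """Combine a UTF-16 high/low surrogate pair into one Unicode code point."""
    if not 0xD800 <= high <= 0xDBFF:
        raise ValueError(f"not a high surrogate: {high:#06x}")
    if not 0xDC00 <= low <= 0xDFFF:
        raise ValueError(f"not a low surrogate: {low:#06x}")
    # Each surrogate carries 10 payload bits; the result is offset by 0x10000.
    return 0x10000 + ((high - 0xD800) << 10) + (low - 0xDC00)


code_point = combine_surrogates(0xD83C, 0xDF78)
print(hex(code_point))                           # 0x1f378
print(chr(code_point).encode("utf-8").hex(" "))  # f0 9f 8d b8
```

A deserializer that handles the pair correctly would therefore be expected to emit the four bytes f0 9f 8d b8 for this character.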

After parsing it with fromjson and serializing it with tojson, the result is:

\355\240\274\355\275\270

bsoncxx rejects this as invalid UTF-8, which is correct.
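
The octal bytes above are exactly what results from encoding each surrogate on its own with the 3-byte UTF-8 pattern, instead of combining the pair first. The Python sketch below illustrates that observation; it does not reproduce the fromjson code path, and encode_3byte is a hypothetical helper written for this example.

```python
def encode_3byte(unit: int) -> bytes:
    """Apply the 3-byte UTF-8 bit pattern to a 16-bit value, ignoring the ban on surrogates."""
    return bytes([
        0xE0 | (unit >> 12),
        0x80 | ((unit >> 6) & 0x3F),
        0x80 | (unit & 0x3F),
    ])


# Encode the two surrogates separately, as if they were ordinary code points.
bad = encode_3byte(0xD83C) + encode_3byte(0xDF78)
print(" ".join(f"\\{b:03o}" for b in bad))  # \355 \240 \274 \355 \275 \270

# A strict UTF-8 decoder refuses encoded surrogates.
try:
    bad.decode("utf-8")
    is_valid = True
except UnicodeDecodeError:
    is_valid = False
print(is_valid)  # False
```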

===========

 

The following string triggers the error in bsoncxx. The internal mongod BSON classes handle it fine.

$6 = 0x7f5edaf49c20 "{\"$set\":{\"$schema\":\"/mediawiki/recentchange/1.0.0\",\"meta\":{\"uri\":\"https://it.wikipedia.org/wiki/Discussioni_utente:Valepert\",\"request_id\":\"c6d88f7f-9a3c-4b7e-a7cd-421b90ed4a46\",\"id\":\"48472b65-6635-4b65-990c-fcc5e66310e6\",\"dt\":\"2023-09-21T03:05:04Z\",\"domain\":\"it.wikipedia.org\",\"stream\":\"mediawiki.recentchange\",\"topic\":\"codfw.mediawiki.recentchange\",\"partition\":{\"$numberInt\":\"0\"},\"offset\":{\"$numberInt\":\"628263911\"}},\"id\":{\"$numberInt\":\"326192030\"},\"type\":\"edit\",\"namespace\":{\"$numberInt\":\"3\"},\"title\":\"Discussioni utente:Valepert\",\"title_url\":\"https://it.wikipedia.org/wiki/Discussioni_utente:Valepert\",\"comment\":\"\355\240\274\355\275\270\",\"timestamp\":{\"$numberInt\":\"1695265504\"},\"user\":\"Valepert\",\"bot\":false,\"notify_url\":\"https://it.wikipedia.org/w/index.php?diff=135583987&oldid=135530929&rcid=326192030\",\"minor\":false,\"patrolled\":true,\"length\":{\"old\":{\"$numberInt\":\"48232\"},\"new\":{\"$numberInt\":\"48253\"}},\"revision\":{\"old\":{\"$numberInt\":\"135530929\"},\"new\":{\"$numberInt\":\"135583987\"}},\"server_url\":\"https://it.wikipedia.org\",\"server_name\":\"it.wikipedia.org\",\"server_script_path\":\"/w\",\"wiki\":\"itwiki\",\"parsedcomment\":\"\355\240\274\355\275\270\",\"_ts\":{\"$date\":{\"$numberLong\":\"1695272056821\"}},\"_stream_meta\":{\"sourceType\":\"kafka\",\"sourcePartition\":{\"$numberInt\":\"0\"},\"sourceOffset\":{\"$numberLong\":\"10882591\"},\"timestamp\":{\"$date\":{\"$numberLong\":\"1695272056821\"}}},\"_id\":{\"$oid\":\"650c82f98c47efea135f73a3\"}}}"
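
To find where the invalid bytes sit in a payload like the one above, a strict decode can be retried from each failure point. This is a triage sketch in Python, not anything the streams code does; find_invalid_utf8 is a hypothetical helper and the sample is a shortened stand-in for the real message.

```python
def find_invalid_utf8(data: bytes) -> list[tuple[int, bytes]]:
    """Return (offset, offending bytes) for each undecodable run in data."""
    problems = []
    pos = 0
    while pos < len(data):
        try:
            data[pos:].decode("utf-8")
            break  # the remainder decodes cleanly
        except UnicodeDecodeError as exc:
            # exc.start/exc.end are relative to the slice, so shift by pos.
            start, end = pos + exc.start, pos + exc.end
            problems.append((start, data[start:end]))
            pos = end
    return problems


# Shortened stand-in for the message: only the "comment" field carries the bad bytes.
sample = b'{"comment":"\xed\xa0\xbc\xed\xbd\xb8","user":"Valepert"}'
for offset, chunk in find_invalid_utf8(sample):
    print(offset, chunk.hex(" "))
```

The first reported offset points at the start of the "comment" value, which is where the encoded surrogates begin.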

 

 

 

 

mstreams: Invalid bytes in UTF8 string

 

Caused by the same message as the string shown above.

 

Kafka source, read with kafkacat:

kafkacat -C -b kafka.0x2f8.io:9093 -X security.protocol=SASL_PLAINTEXT -X sasl.mechanisms=PLAIN -X sasl.username=mongo -X sasl.password=mongodata_123 -t "topic name"

 

This streamProcessor:

  {
    id: '650bb29b49d37c600ee82da1',
    name: 'kafkaToCollection',
    lastModified: ISODate("2023-09-21T03:03:55.814Z"),
    state: 'FAILED',
    errorMsg: 'resource has no heartbeat',
    pipeline: [
      { '$source': { connectionName: 'TestKafka1', topic: 'wiki1' } },
      {
        '$merge': {
          into: { connectionName: 'TestAtlas1', db: 'test', coll: 'wiki1' }
        }
      }
    ]
  }

 

Happened after { partitions: [ { partition: 0, offset: 10806944 } ] }

Happened again after offset 10821979.

 

Splunk

{ [-]
   _p: F
   attr: { [-]
     context: { [-]
       streamProcessorId: 650bb29b49d37c600ee82da1
       streamProcessorName: kafkaToCollection
       tenantId: 650761da7df3a953fbe8390c
     }
     error: invalid bytes in UTF8 string: could not parse JSON document
   }
   c: STREAMS
   ctx: thread373
   id: 75897
   kube: { [+] }
   msg: encountered exception, exiting runLoop(): {error}
   s: E
   stream: stdout
   t: { [+] }
   time: 2023-09-21T03:11:07.151782139Z
}

 



 Comments   
Comment by Matthew Normyle [ 21/Sep/23 ]

Call stack:

 
__cxa_throw (@__cxa_throw:3)
bsoncxx::v_noabi::from_json(boost::basic_string_view<char, std::char_traits<char>>) (/home/ubuntu/mongo/src/mongo/db/modules/enterprise/src/streams/third_party/mongocxx/dist/bsoncxx/json.cpp:79)
streams::toBsoncxxDocument(mongo::BSONObj const&) (/home/ubuntu/mongo/src/mongo/db/modules/enterprise/src/streams/exec/mongocxx_utils.cpp:34)
streams::MongoDBProcessInterface::update(boost::intrusive_ptr<mongo::ExpressionContext> const&, mongo::NamespaceString const&, std::unique_ptr<mongo::write_ops::UpdateCommandRequest, std::default_delete<mongo::write_ops::UpdateCommandRequest>>, mongo::WriteConcernOptions const&, mongo::MongoProcessInterface::UpsertType, bool, boost::optional<mongo::OID>) (/home/ubuntu/mongo/src/mongo/db/modules/enterprise/src/streams/exec/mongodb_process_interface.cpp:114)
auto mongo::(anonymous namespace)::makeUpdateStrategy()::$_3::operator()<boost::intrusive_ptr<mongo::ExpressionContext>, mongo::NamespaceString, mongo::WriteConcernOptions, boost::optional<mongo::OID>, std::vector<std::tuple<mongo::BSONObj, mongo::write_ops::UpdateModification, boost::optional<mongo::BSONObj>>, std::allocator<std::tuple<mongo::BSONObj, mongo::write_ops::UpdateModification, boost::optional<mongo::BSONObj>>>>, mongo::BatchedCommandRequest>(boost::intrusive_ptr<mongo::ExpressionContext> const&, mongo::NamespaceString const&, mongo::WriteConcernOptions const&, boost::optional<mongo::OID>, std::vector<std::tuple<mongo::BSONObj, mongo::write_ops::UpdateModification, boost::optional<mongo::BSONObj>>, std::allocator<std::tuple<mongo::BSONObj, mongo::write_ops::UpdateModification, boost::optional<mongo::BSONObj>>>>&&, mongo::BatchedCommandRequest&&, mongo::MongoProcessInterface::UpsertType) const (/home/ubuntu/mongo/src/mongo/db/pipeline/merge_processor.cpp:113)
void std::__invoke_impl<void, mongo::(anonymous namespace)::makeUpdateStrategy()::$_3&, boost::intrusive_ptr<mongo::ExpressionContext> const&, mongo::NamespaceString const&, mongo::WriteConcernOptions const&, boost::optional<mongo::OID>, std::vector<std::tuple<mongo::BSONObj, mongo::write_ops::UpdateModification, boost::optional<mongo::BSONObj>>, std::allocator<std::tuple<mongo::BSONObj, mongo::write_ops::UpdateModification, boost::optional<mongo::BSONObj>>>>, mongo::BatchedCommandRequest, mongo::MongoProcessInterface::UpsertType>(std::__invoke_other, mongo::(anonymous namespace)::makeUpdateStrategy()::$_3&, boost::intrusive_ptr<mongo::ExpressionContext> const&, mongo::NamespaceString const&, mongo::WriteConcernOptions const&, boost::optional<mongo::OID>&&, std::vector<std::tuple<mongo::BSONObj, mongo::write_ops::UpdateModification, boost::optional<mongo::BSONObj>>, std::allocator<std::tuple<mongo::BSONObj, mongo::write_ops::UpdateModification, boost::optional<mongo::BSONObj>>>>&&, mongo::BatchedCommandRequest&&, mongo::MongoProcessInterface::UpsertType&&) (/opt/mongodbtoolchain/revisions/69f4f0673ffcb290ce2307560a4883ecf2ad138c/stow/gcc-v4.u7o/include/c++/11.3.0/bits/invoke.h:61)
std::enable_if<is_invocable_r_v<void, mongo::(anonymous namespace)::makeUpdateStrategy()::$_3&, boost::intrusive_ptr<mongo::ExpressionContext> const&, mongo::NamespaceString const&, mongo::WriteConcernOptions const&, boost::optional<mongo::OID>, std::vector<std::tuple<mongo::BSONObj, mongo::write_ops::UpdateModification, boost::optional<mongo::BSONObj>>, std::allocator<std::tuple<mongo::BSONObj, mongo::write_ops::UpdateModification, boost::optional<mongo::BSONObj>>>>, mongo::BatchedCommandRequest, mongo::MongoProcessInterface::UpsertType>, void>::type std::__invoke_r<void, mongo::(anonymous namespace)::makeUpdateStrategy()::$_3&, boost::intrusive_ptr<mongo::ExpressionContext> const&, mongo::NamespaceString const&, mongo::WriteConcernOptions const&, boost::optional<mongo::OID>, std::vector<std::tuple<mongo::BSONObj, mongo::write_ops::UpdateModification, boost::optional<mongo::BSONObj>>, std::allocator<std::tuple<mongo::BSONObj, mongo::write_ops::UpdateModification, boost::optional<mongo::BSONObj>>>>, mongo::BatchedCommandRequest, mongo::MongoProcessInterface::UpsertType>(mongo::(anonymous namespace)::makeUpdateStrategy()::$_3&, boost::intrusive_ptr<mongo::ExpressionContext> const&, mongo::NamespaceString const&, mongo::WriteConcernOptions const&, boost::optional<mongo::OID>&&, std::vector<std::tuple<mongo::BSONObj, mongo::write_ops::UpdateModification, boost::optional<mongo::BSONObj>>, std::allocator<std::tuple<mongo::BSONObj, mongo::write_ops::UpdateModification, boost::optional<mongo::BSONObj>>>>&&, mongo::BatchedCommandRequest&&, mongo::MongoProcessInterface::UpsertType&&) (/opt/mongodbtoolchain/revisions/69f4f0673ffcb290ce2307560a4883ecf2ad138c/stow/gcc-v4.u7o/include/c++/11.3.0/bits/invoke.h:111)
std::_Function_handler<void (boost::intrusive_ptr<mongo::ExpressionContext> const&, mongo::NamespaceString const&, mongo::WriteConcernOptions const&, boost::optional<mongo::OID>, std::vector<std::tuple<mongo::BSONObj, mongo::write_ops::UpdateModification, boost::optional<mongo::BSONObj>>, std::allocator<std::tuple<mongo::BSONObj, mongo::write_ops::UpdateModification, boost::optional<mongo::BSONObj>>>>&&, mongo::BatchedCommandRequest&&, mongo::MongoProcessInterface::UpsertType), mongo::(anonymous namespace)::makeUpdateStrategy()::$_3>::_M_invoke(std::_Any_data const&, boost::intrusive_ptr<mongo::ExpressionContext> const&, mongo::NamespaceString const&, mongo::WriteConcernOptions const&, boost::optional<mongo::OID>&&, std::vector<std::tuple<mongo::BSONObj, mongo::write_ops::UpdateModification, boost::optional<mongo::BSONObj>>, std::allocator<std::tuple<mongo::BSONObj, mongo::write_ops::UpdateModification, boost::optional<mongo::BSONObj>>>>&&, mongo::BatchedCommandRequest&&, mongo::MongoProcessInterface::UpsertType&&) (/opt/mongodbtoolchain/revisions/69f4f0673ffcb290ce2307560a4883ecf2ad138c/stow/gcc-v4.u7o/include/c++/11.3.0/bits/std_function.h:290)
std::function<void (boost::intrusive_ptr<mongo::ExpressionContext> const&, mongo::NamespaceString const&, mongo::WriteConcernOptions const&, boost::optional<mongo::OID>, std::vector<std::tuple<mongo::BSONObj, mongo::write_ops::UpdateModification, boost::optional<mongo::BSONObj>>, std::allocator<std::tuple<mongo::BSONObj, mongo::write_ops::UpdateModification, boost::optional<mongo::BSONObj>>>>&&, mongo::BatchedCommandRequest&&, mongo::MongoProcessInterface::UpsertType)>::operator()(boost::intrusive_ptr<mongo::ExpressionContext> const&, mongo::NamespaceString const&, mongo::WriteConcernOptions const&, boost::optional<mongo::OID>, std::vector<std::tuple<mongo::BSONObj, mongo::write_ops::UpdateModification, boost::optional<mongo::BSONObj>>, std::allocator<std::tuple<mongo::BSONObj, mongo::write_ops::UpdateModification, boost::optional<mongo::BSONObj>>>>&&, mongo::BatchedCommandRequest&&, mongo::MongoProcessInterface::UpsertType) const (/opt/mongodbtoolchain/revisions/69f4f0673ffcb290ce2307560a4883ecf2ad138c/stow/gcc-v4.u7o/include/c++/11.3.0/bits/std_function.h:590)
mongo::MergeProcessor::flush(mongo::BatchedCommandRequest, std::vector<std::tuple<mongo::BSONObj, mongo::write_ops::UpdateModification, boost::optional<mongo::BSONObj>>, std::allocator<std::tuple<mongo::BSONObj, mongo::write_ops::UpdateModification, boost::optional<mongo::BSONObj>>>>) const (/home/ubuntu/mongo/src/mongo/db/pipeline/merge_processor.cpp:366)
streams::MergeOperator::processStreamDocs(streams::StreamDataMsg const&, unsigned long, unsigned long, unsigned long) (/home/ubuntu/mongo/src/mongo/db/modules/enterprise/src/streams/exec/merge_operator.cpp:136)
streams::MergeOperator::doSinkOnDataMsg(int, streams::StreamDataMsg, boost::optional<streams::StreamControlMsg>) (/home/ubuntu/mongo/src/mongo/db/modules/enterprise/src/streams/exec/merge_operator.cpp:88)

Generated at Thu Feb 08 06:46:10 UTC 2024 using Jira 9.7.1#970001-sha1:2222b88b221c4928ef0de3161136cc90c8356a66.