Uploaded image for project: 'Core Server'
  1. Core Server
  2. SERVER-41164

Change Stream pipeline regex matches wrong oplog docs when using '|' pipe in db name

    XMLWordPrintable

    Details

    • Type: Bug
    • Status: In Code Review
    • Priority: Minor - P4
    • Resolution: Unresolved
    • Affects Version/s: 4.0.9, 4.1.11
    • Fix Version/s: 4.1 Desired
    • Component/s: None
    • Labels:
      None
    • Operating System:
      ALL
    • Backport Requested:
      v4.0
    • Sprint:
      Query 2019-07-01
    • Case:

      Description

      Our documentation states Linux database names allow (do not disallow) use of vertical pipe 0x7C characters.

      When a pipe is used in a db name, the change stream aggregation on the oplog can return documents for the wrong database or invalidate the stream when creating a collection in the same database.

      Repro 1 in python 3.7 pymongo 3.8 MongoDB 4.0, 4.1.10 causes an invalidate event to occur on the watched collection when creating a new collection on the same db. It appears to process the implicit create collection oplog entry shown below here,

       operationType = DocumentSourceChangeStream::kInvalidateOpType;
      

      { "ts" : Timestamp(1557884892, 6), "t" : NumberLong(1), "h" : NumberLong(0), "v" : 2, "op" : "c", "ns" : "fails_because_of_pipe_char_in_db_name|.$cmd", "ui" : UUID("9f2275fb-c64f-4ed1-9719-a47cfad8888d"), "wall" : ISODate("2019-05-15T01:48:12.445Z"), "o" : { "create" : "alt", "idIndex" : { "v" : 2, "key" : { "_id" : 1 }, "name" : "_id_", "ns" : "fails_because_of_pipe_char_in_db_name|.alt" } } }
       
      { "ts" : Timestamp(1557884892, 7), "t" : NumberLong(1), "h" : NumberLong(0), "v" : 2, "op" : "i", "ns" : "fails_because_of_pipe_char_in_db_name|.alt", "ui" : UUID("9f2275fb-c64f-4ed1-9719-a47cfad8888d"), "wall" : ISODate("2019-05-15T01:48:12.445Z"), "o" : { "_id" : 1 } }
      

      import pymongo
      from pprint import pprint
      from pymongo import MongoClient
      client = MongoClient('localhost',28021)
       
      flag_fix_reproduction = False
      db_name = 'invalidates_wrong_stream_because_of_pipe_char_in_db_name|'
       
      def dump_queued_watch_events( stream ):
          last = {}
          last["operationType"] = "(none seen!)";
          while stream.alive:
              change = stream.try_next()
              if change is not None:
                  print("Change event = ")
                  pprint(change)
                  last["operationType"] = change["operationType"]
              elif stream.alive:
                  return last["operationType"]
          return "(stream closed!)"
       
      # assure cleanup between test runs
      client.drop_database( db_name )
      print("cleaning up test dbs...")
       
      db = client[db_name]
      documents = db["documents"]
      documents.insert_one({"_id": 1})
       
      # implicit create collection here outside watch avoids issue
       
      if ( flag_fix_reproduction ):
          alt = db["alt"]
          alt.insert_one({"_id": 1}) # IMPLICIT CREATE BEFORE STREAM
       
      with documents.watch() as stream:
          documents.insert_one({"_id": 2})
          assert dump_queued_watch_events( stream ) == "insert"
       
          documents.insert_one({"_id": 2.1})
          assert dump_queued_watch_events( stream ) == "insert"
       
          documents.insert_one({"_id": 2.2})
          assert dump_queued_watch_events( stream ) == "insert"
       
          documents.insert_one({"_id": 2.3})
          assert dump_queued_watch_events( stream ) == "insert"
          
          # implicit create collection here repros issue
          
          if ( flag_fix_reproduction ):
              alt.insert_one({"_id": 3.5}) # no DDL event IN-STREAM
          else:
              alt = db["alt"]
              alt.insert_one({"_id": 3}) # IMPLICIT CREATE IN-STREAM
              
          # repro fails here:
       
          assert dump_queued_watch_events( stream ) != "invalidate"
       
          documents.insert_one({"_id": 3})
          assert dump_queued_watch_events( stream ) == "insert"
       
          print(db_name + "OK to here as expected")
       
      print( "done!" )
      

      Repro 2 in the 4.0, 4.1.10 shell on MongoDB 4.0, 4.1.10, the final output document shows a Change Stream on namespace has_a_|pipe.documents.documents match and return an insert from a different db, has_no_pipe.documents :

      From changestream on has_a_|pipe.documents
      {
      	"_id" : {
      		"_data" : "825CDC58340000000D2B022C0100296E5A1004EDED18BF4358459FB3468AD4B8630D7E461E5F6964002B040004",
      		"_typeBits" : BinData(0,"QA==")
      	},
      	"operationType" : "insert",
      	"clusterTime" : Timestamp(1557944372, 13),
      	"fullDocument" : {
      		"_id" : 2
      	},
      	"ns" : {
      		"db" : "has_no_pipe",
      		"coll" : "documents"
      	},
      	"documentKey" : {
      		"_id" : 2
      	}
      }
      

      var db1 = db.getSiblingDB("has_no_pipe")
      var db2 = db.getSiblingDB("has_a_|pipe")
      var db3 = db.getSiblingDB("has_a_database")
       
      print("Drop test databases")
      db1.dropDatabase()
      db2.dropDatabase()
      db3.dropDatabase()
       
      var coll1 = db1.documents
      var coll2 = db2.documents
      var coll3 = db3.collection
       
      coll1.insertOne({_id:1})
      coll2.insertOne({_id:1})
       
      var cs1 = coll1.watch()
      var cs2 = coll2.watch()
       
      function waitFor(stream) {
          var i=10
          while (!stream.hasNext() && (i > 0)) {
              i--
          }
      }
       
      function printNext() {
          while (cs1.hasNext()) {
              print("From changestream on " + coll1.getFullName())
              printjson(cs1.next())
          }
          while (cs2.hasNext()) {
              print("From changestream on " + coll2.getFullName())
              printjson(cs2.next())
          }
      }
       
      print("Insert into " + coll1.getFullName())
      coll1.insertOne({_id:2})
      waitFor(cs1)
      printNext()
       
      print("Insert into " + coll2.getFullName())
      coll2.insertOne({_id:3})
      waitFor(cs2)
      printNext()
       
      print("Insert into (create) " + coll3.getFullName())
      coll3.insertOne({_id:4})
      waitFor(cs2)
      printNext()
      

      Both of these seem to be regex failures from unescaped pipes in the regex called here,

       return _nsRegex->PartialMatch(nsField.getString());
      

      Using pipes in db names seems like a poor idea yet customers have tried it, and the risk of documents leaking across databases seems fix-worthy.

        Attachments

          Activity

            People

            • Votes:
              0 Vote for this issue
              Watchers:
              12 Start watching this issue

              Dates

              • Created:
                Updated: