Details
Description
The fix for this ticket also resolves issues with the other regex special characters: *, +, |, (, ), ^, ?, [, ], ., /, \, $.
When a pipe is used in a db name, the change stream aggregation on the oplog can return documents for the wrong database or invalidate the stream when creating a collection in the same database.
Repro 1 (Python 3.7, PyMongo 3.8, MongoDB 4.0 and 4.1.10): an invalidate event occurs on the watched collection when a new collection is created in the same database. The server appears to treat the implicit create-collection oplog entry shown below as an invalidate:
operationType = DocumentSourceChangeStream::kInvalidateOpType;

{ "ts" : Timestamp(1557884892, 6), "t" : NumberLong(1), "h" : NumberLong(0), "v" : 2, "op" : "c", "ns" : "fails_because_of_pipe_char_in_db_name|.$cmd", "ui" : UUID("9f2275fb-c64f-4ed1-9719-a47cfad8888d"), "wall" : ISODate("2019-05-15T01:48:12.445Z"), "o" : { "create" : "alt", "idIndex" : { "v" : 2, "key" : { "_id" : 1 }, "name" : "_id_", "ns" : "fails_because_of_pipe_char_in_db_name|.alt" } } }

{ "ts" : Timestamp(1557884892, 7), "t" : NumberLong(1), "h" : NumberLong(0), "v" : 2, "op" : "i", "ns" : "fails_because_of_pipe_char_in_db_name|.alt", "ui" : UUID("9f2275fb-c64f-4ed1-9719-a47cfad8888d"), "wall" : ISODate("2019-05-15T01:48:12.445Z"), "o" : { "_id" : 1 } }
# Repro 1: a change stream is invalidated by an implicit create-collection
# when the database name contains a regex metacharacter ('|').
# Requires a running mongod on localhost:28021.
import pymongo
from pprint import pprint
from pymongo import MongoClient

client = MongoClient('localhost', 28021)

# Set True to apply the workaround (create the second collection before
# the stream is opened).
flag_fix_reproduction = False
# The trailing '|' inside the name is the point of the repro — keep it.
db_name = 'invalidates_wrong_stream_because_of_pipe_char_in_db_name|'
def dump_queued_watch_events(stream):
    """Drain and pretty-print every queued change event from *stream*.

    Returns the operationType of the last event seen, "(none seen!)" if
    nothing was queued, or "(stream closed!)" if the stream died while
    draining (e.g. after an invalidate event closed it).
    """
    last = {}
    last["operationType"] = "(none seen!)"
    while stream.alive:
        change = stream.try_next()
        if change is not None:
            print("Change event = ")
            pprint(change)

            last["operationType"] = change["operationType"]
        elif stream.alive:
            # Queue drained and the stream is still healthy — report the
            # last operation type observed.
            return last["operationType"]
    return "(stream closed!)"
# Assure cleanup between test runs.
client.drop_database(db_name)

print("cleaning up test dbs...")

db = client[db_name]
documents = db["documents"]
documents.insert_one({"_id": 1})

# An implicit create-collection issued here, before the watch is opened,
# avoids the issue.
if flag_fix_reproduction:
    alt = db["alt"]
    alt.insert_one({"_id": 1})  # IMPLICIT CREATE BEFORE STREAM

with documents.watch() as stream:

    documents.insert_one({"_id": 2})
    assert dump_queued_watch_events(stream) == "insert"

    documents.insert_one({"_id": 2.1})
    assert dump_queued_watch_events(stream) == "insert"

    documents.insert_one({"_id": 2.2})
    assert dump_queued_watch_events(stream) == "insert"

    documents.insert_one({"_id": 2.3})
    assert dump_queued_watch_events(stream) == "insert"

    # An implicit create-collection here, while the stream is open,
    # reproduces the issue.
    if flag_fix_reproduction:
        alt.insert_one({"_id": 3.5})  # no DDL event IN-STREAM
    else:
        alt = db["alt"]
        alt.insert_one({"_id": 3})  # IMPLICIT CREATE IN-STREAM

    # Repro fails here: the in-stream create of "alt" is misread as an
    # invalidate for the watched collection.
    assert dump_queued_watch_events(stream) != "invalidate"

    documents.insert_one({"_id": 3})
    assert dump_queued_watch_events(stream) == "insert"

    print(db_name + "OK to here as expected")

print("done!")
Repro 2, run in the 4.0 / 4.1.10 shell against MongoDB 4.0 / 4.1.10: the final output document shows a change stream on namespace has_a_|pipe.documents matching and returning an insert from a different database, has_no_pipe.documents:
From changestream on has_a_|pipe.documents
{
    "_id" : {
        "_data" : "825CDC58340000000D2B022C0100296E5A1004EDED18BF4358459FB3468AD4B8630D7E461E5F6964002B040004",
        "_typeBits" : BinData(0,"QA==")
    },
    "operationType" : "insert",
    "clusterTime" : Timestamp(1557944372, 13),
    "fullDocument" : {
        "_id" : 2
    },
    "ns" : {
        "db" : "has_no_pipe",
        "coll" : "documents"
    },
    "documentKey" : {
        "_id" : 2
    }
}
// Repro 2 (mongo shell): a change stream on a database whose name contains
// a pipe character can match events from a different database.
var db1 = db.getSiblingDB("has_no_pipe")
var db2 = db.getSiblingDB("has_a_|pipe")
var db3 = db.getSiblingDB("has_a_database")

print("Drop test databases")
db1.dropDatabase()
db2.dropDatabase()
db3.dropDatabase()

var coll1 = db1.documents
var coll2 = db2.documents
var coll3 = db3.collection

coll1.insertOne({_id:1})
coll2.insertOne({_id:1})

var cs1 = coll1.watch()
var cs2 = coll2.watch()
// Poll the stream until an event is queued, giving up after 10 checks.
// NOTE(review): this is a busy-spin with no sleep — intentionally crude
// for repro purposes; hasNext() itself blocks briefly in the shell.
function waitFor(stream) {
    var remaining = 10
    while (!stream.hasNext() && (remaining > 0)) {
        remaining--
    }
}
// Drain and print every queued event from both change streams (cs1, cs2),
// labeling each event with the namespace it was watched on. An event from
// cs2 labeled with a foreign "ns" demonstrates the cross-database leak.
function printNext() {
    while (cs1.hasNext()) {
        print("From changestream on " + coll1.getFullName())
        printjson(cs1.next())
    }
    while (cs2.hasNext()) {
        print("From changestream on " + coll2.getFullName())
        printjson(cs2.next())
    }
}
print("Insert into " + coll1.getFullName())
coll1.insertOne({_id:2})
waitFor(cs1)
printNext()

print("Insert into " + coll2.getFullName())
coll2.insertOne({_id:3})
waitFor(cs2)
printNext()

// Inserting into a third, fresh database (implicit create) makes the
// stream on has_a_|pipe match and return events outside its namespace.
print("Insert into (create) " + coll3.getFullName())
coll3.insertOne({_id:4})
waitFor(cs2)
printNext()
Both failures appear to stem from unescaped pipe characters in the regex used to match oplog namespaces, applied here:

return _nsRegex->PartialMatch(nsField.getString());

Using pipe characters in database names is inadvisable, but customers have done it, and the risk of documents leaking across databases makes this worth fixing.