MongoMicroBatchStream uses BsonTimestampOffset to identify partitions. Unlike ResumeTokenBasedOffset, BsonTimestampOffset is a poor tool for the job, but there is likely no alternative (see "Entertaining a potential fix" below).
MongoMicroBatchStream provides Spark with the partition boundaries by returning Offsets from MongoMicroBatchStream.initialOffset/latestOffset (see the sketch after this list):
- the first partition is [initialOffset, latestOffset1)
- the second partition is [latestOffset1, latestOffset2), where latestOffset2 = latestOffset1 + Δ1, and Δ1 is positive
- the third partition is [latestOffset2, latestOffset3), where latestOffset3 = latestOffset2 + Δ2, and Δ2 is positive
- ...1
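For concreteness, here is a sketch of a driver-side loop that produces exactly these half-open intervals, assuming only Spark's public org.apache.spark.sql.connector.read.streaming.MicroBatchStream API; it illustrates the contract and is not Spark's actual scheduling code.

```java
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
import org.apache.spark.sql.connector.read.streaming.Offset;

// Illustrative driver-side loop (not Spark's real scheduler): each micro-batch
// covers the half-open interval [previousEnd, latestOffset()).
final class MicroBatchLoopSketch {
    static void runBatches(MicroBatchStream stream, int batches) {
        Offset start = stream.initialOffset();      // lower bound of the first partition
        for (int i = 0; i < batches; i++) {
            Offset end = stream.latestOffset();     // upper bound of the current partition
            // [start, end) is what the list above calls a partition; planInputPartitions
            // may further split it into contiguous pieces (see footnote 1).
            InputPartition[] partitions = stream.planInputPartitions(start, end);
            // ... hand `partitions` to executors via stream.createReaderFactory() ...
            stream.commit(end);                     // acknowledge the processed interval
            start = end;                            // the next partition starts where this one ended
        }
    }
}
```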
The initialOffset is new BsonTimestampOffset(new BsonTimestamp(-1)) by default, which tells the MongoMicroBatchPartitionReader to create a change stream that reads only the latest events (those occurring after the stream is created).
The latestOffset is a BsonTimestampOffset whose BsonTimestamp is computed from Instant.now().getEpochSecond().
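A minimal sketch of what these two offsets effectively contain, assuming BsonTimestampOffset is a thin wrapper around org.bson.BsonTimestamp (BsonTimestampOffsetSketch below is a stand-in, not the connector's real class):

```java
import java.time.Instant;
import org.bson.BsonTimestamp;

// Stand-in for the connector's BsonTimestampOffset, used only to illustrate the
// two values discussed above.
final class BsonTimestampOffsetSketch {
    final BsonTimestamp timestamp;

    BsonTimestampOffsetSketch(BsonTimestamp timestamp) {
        this.timestamp = timestamp;
    }

    // Default initial offset: the -1 sentinel makes the partition reader open a
    // change stream that starts at the server's current time.
    static BsonTimestampOffsetSketch initialOffset() {
        return new BsonTimestampOffsetSketch(new BsonTimestamp(-1));
    }

    // Latest offset: client-side wall-clock seconds since the Epoch with ordinal 0,
    // which is why initialClientTimestamp.ordinal is always 0.
    static BsonTimestampOffsetSketch latestOffset() {
        return new BsonTimestampOffsetSketch(
                new BsonTimestamp((int) Instant.now().getEpochSecond(), 0));
    }
}
```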
Thus, the first partition is effectively [initialServerTimestamp, initialClientTimestamp), where initialServerTimestamp is taken after the server receives the client's request to create a change stream, and initialClientTimestamp is taken before the client sends that request. Note that while initialClientTimestamp is a BSON Timestamp, it is essentially just seconds since the Epoch, as initialClientTimestamp.ordinal is always 0.
When the server and client clocks are well synchronized, it is likely that initialServerTimestamp > initialClientTimestamp, i.e., the start of the first partition is greater than its end. When this happens, the first partition contains no change stream events, and the second partition, [initialClientTimestamp, initialClientTimestamp + Δ), starts before the first one. If there are oplog entries (we consider only those that may produce change stream events) whose timestamps have time_t equal to initialClientTimestamp.time_t, the second partition contains the corresponding change stream events even though their timestamps may be smaller than initialServerTimestamp. Such streaming of events from the past violates causal consistency (more specifically, monotonic reads, as far as I understand) and makes it possible for change stream events to spill over from a test (or its set-up/tear-down activities) to a succeeding test2.
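A worked example of this inversion with made-up timestamp values (the numbers and the class below are illustrative only):

```java
import org.bson.BsonTimestamp;

// Made-up numbers illustrating the clock scenario above (not from a real run).
final class ClockSkewExample {
    public static void main(String[] args) {
        // The client computes latestOffset (initialClientTimestamp) first ...
        BsonTimestamp initialClientTimestamp = new BsonTimestamp(1_700_000_000, 0);
        // ... then the partition reader opens the change stream, and the server
        // stamps it slightly later within the same second:
        BsonTimestamp initialServerTimestamp = new BsonTimestamp(1_700_000_000, 3);

        // The first partition [initialServerTimestamp, initialClientTimestamp) is inverted, hence empty:
        System.out.println(initialServerTimestamp.compareTo(initialClientTimestamp) > 0); // true

        // An oplog entry with the same time_t but an ordinal below initialServerTimestamp's
        // still falls into the second partition [initialClientTimestamp, initialClientTimestamp + Δ):
        BsonTimestamp oplogEntry = new BsonTimestamp(1_700_000_000, 1);
        boolean inSecondPartition = oplogEntry.compareTo(initialClientTimestamp) >= 0;
        boolean beforeServerStart = oplogEntry.compareTo(initialServerTimestamp) < 0;
        System.out.println(inSecondPartition && beforeServerStart); // true: an event "from the past"
    }
}
```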
Entertaining a potential fix.
In theory, this problem can be fixed by using ResumeTokenBasedOffset (which carries values from MongoChangeStreamCursor.getResumeToken) instead of BsonTimestampOffset. The main difficulty is that MongoChangeStreamCursor.getResumeToken is accessible only to MongoMicroBatchPartitionReader, while it is needed by MongoMicroBatchStream. There seems to be no communication channel from MongoMicroBatchPartitionReader to MongoMicroBatchStream: these objects may exist in different processes (I didn't know this, thank you, ross@mongodb.com), so if such a channel exists, it must be one provided by Spark. If we discover it, we will likely be able to fix the situation.
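For illustration, here is a sketch of the reader-side half of such a fix: obtaining the resume token where the cursor lives, i.e., in MongoMicroBatchPartitionReader, is straightforward (the URI, namespace, and class name below are placeholders); the unsolved part is the channel that would carry the token back to MongoMicroBatchStream on the driver.

```java
import com.mongodb.client.MongoChangeStreamCursor;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import org.bson.BsonDocument;
import org.bson.Document;

// Executor-side sketch with a placeholder URI and namespace.
final class ResumeTokenSketch {
    public static void main(String[] args) {
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017");
             MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor =
                     client.getDatabase("test").getCollection("coll").watch().cursor()) {
            cursor.tryNext(); // poll once so the cursor caches a resume token even without events
            BsonDocument resumeToken = cursor.getResumeToken();
            if (resumeToken != null) {
                // A ResumeTokenBasedOffset could carry this value instead of a wall-clock
                // BsonTimestamp, avoiding the clock comparison described above, provided
                // Spark offers a way to ship it from the reader back to the stream.
                System.out.println(resumeToken.toJson());
            }
        }
    }
}
```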
1 This is a simplification, as the listed intervals may be split into multiple contiguous partitions by MongoMicroBatchStream.planInputPartitions(Offset start, Offset end) (start and end are the interval boundaries) based on the value of the change.stream.micro.batch.max.partition.count configuration property, which is 1 by default. However, this splitting is irrelevant to us here.
2 See MongoMicroBatchStreamTest.testStreamInferSchemaWithDataPublishFullOnly failing with "AssertionFailedError: Expected to see 25 documents ==> expected: <25> but was: <26>". Here is a similar failure with a fresher codebase.
There are also failures when a test does not observe enough events, but I don't have an explanation for them.