(Using v1.3.4)
When consuming a change stream in a for loop with successive calls to cursor.Next(), after the "invalidate" event is received on the stream the subsequent call to Next() never returns and CPU usage hits 100%.
In practice the caller should probably avoid this by detecting the invalidate event and breaking the loop, but it seems like the driver should be able to avoid spinning the CPU when the stream goes idle after being invalidated.
Here's a minimal test case to reproduce:
package main import ( "context" "fmt" "log" "testing" "time" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" ) func TestChangestream(t *testing.T) { log.SetFlags(log.LstdFlags | log.Lshortfile) client, err := mongo.NewClient(options.Client().ApplyURI("mongodb://localhost:26000")) if err != nil { log.Fatal(err) } ctx, _ := context.WithTimeout(context.Background(), 10*time.Second) err = client.Connect(ctx) db1, coll1 := primitive.NewObjectID().Hex(), primitive.NewObjectID().Hex() _, err = client.Database(db1).Collection(coll1).InsertOne(context.Background(), bson.D{{"x", "1"}}) if err != nil { log.Fatal(err) } cursor, err := client.Database(db1).Watch( context.Background(), []interface{}{}, ) if err != nil { log.Fatal(err) } doneChan1 := make(chan struct{}) go func() { defer close(doneChan1) log.Println("starting changestream loop") for { log.Println("calling cursor next...") if cursor.Next(context.Background()) { event := bson.D{} cursor.Decode(&event) fmt.Println("got event", event) } log.Println("...cursor next returned.") if err := cursor.Err(); err != nil { log.Fatal(err) } } }() doneChan2 := make(chan struct{}) go func() { defer close(doneChan2) for i := 0; i < 5; i++ { _, err := client.Database(db1).Collection(coll1).InsertOne(context.Background(), bson.D{{"test", i}}) if err != nil { log.Fatal(err) } time.Sleep(time.Second) } err := client.Database(db1).Drop(context.Background()) if err != nil { log.Fatal(err) } }() <-doneChan1 <-doneChan2 }
A goroutine stack dump while the test is hung shows:
goroutine 7 [select]: go.mongodb.org/mongo-driver/x/mongo/driver/topology.(*Server).update(0xc000139ae0) /Users/mikeo/go/src/go.mongodb.org/mongo-driver/x/mongo/driver/topology/server.go:391 +0x3ec created by go.mongodb.org/mongo-driver/x/mongo/driver/topology.(*Server).Connect /Users/mikeo/go/src/go.mongodb.org/mongo-driver/x/mongo/driver/topology/server.go:169 +0x14d goroutine 12 [runnable]: go.mongodb.org/mongo-driver/mongo.(*ChangeStream).emptyBatch(0xc0002a0000, 0xc0000e23dd) /Users/mikeo/go/src/go.mongodb.org/mongo-driver/mongo/change_stream.go:602 +0x7d go.mongodb.org/mongo-driver/mongo.(*ChangeStream).updatePbrtFromCommand(0xc0002a0000) /Users/mikeo/go/src/go.mongodb.org/mongo-driver/mongo/change_stream.go:294 +0x64 go.mongodb.org/mongo-driver/mongo.(*ChangeStream).loopNext(0xc0002a0000, 0x1956780, 0xc0000260b0, 0x0) /Users/mikeo/go/src/go.mongodb.org/mongo-driver/mongo/change_stream.go:564 +0x18c go.mongodb.org/mongo-driver/mongo.(*ChangeStream).next(0xc0002a0000, 0x1956780, 0xc0000260b0, 0x0, 0x2) /Users/mikeo/go/src/go.mongodb.org/mongo-driver/mongo/change_stream.go:529 +0x170 go.mongodb.org/mongo-driver/mongo.(*ChangeStream).Next(...) /Users/mikeo/go/src/go.mongodb.org/mongo-driver/mongo/change_stream.go:500 _/tmp/godriver.TestChangestream.func1(0xc00002e900, 0xc0002a0000) /tmp/godriver/cursor_test.go:49 +0x111 created by _/tmp/godriver.TestChangestream /tmp/godriver/cursor_test.go:44 +0x4a5