|
(Using v1.3.4)
When consuming a change stream in a for loop with successive calls to cursor.Next(), the call to Next() that follows receipt of the "invalidate" event on the stream never returns, and CPU usage hits 100%.
In practice, the caller should probably avoid this by detecting the invalidate event and breaking out of the loop, but it seems like the driver should be able to avoid spinning the CPU when the stream goes idle after being invalidated.
Here's a minimal test case to reproduce:
|
package main
|
|
import (
|
"context"
|
"fmt"
|
"log"
|
"testing"
|
"time"
|
|
"go.mongodb.org/mongo-driver/bson"
|
"go.mongodb.org/mongo-driver/bson/primitive"
|
"go.mongodb.org/mongo-driver/mongo"
|
"go.mongodb.org/mongo-driver/mongo/options"
|
)
|
|
func TestChangestream(t *testing.T) {
|
log.SetFlags(log.LstdFlags | log.Lshortfile)
|
|
client, err := mongo.NewClient(options.Client().ApplyURI("mongodb://localhost:26000"))
|
if err != nil {
|
log.Fatal(err)
|
}
|
|
ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
|
err = client.Connect(ctx)
|
|
db1, coll1 := primitive.NewObjectID().Hex(), primitive.NewObjectID().Hex()
|
|
_, err = client.Database(db1).Collection(coll1).InsertOne(context.Background(), bson.D{{"x", "1"}})
|
if err != nil {
|
log.Fatal(err)
|
}
|
|
cursor, err := client.Database(db1).Watch(
|
context.Background(),
|
[]interface{}{},
|
)
|
|
if err != nil {
|
log.Fatal(err)
|
}
|
|
doneChan1 := make(chan struct{})
|
go func() {
|
defer close(doneChan1)
|
log.Println("starting changestream loop")
|
for {
|
log.Println("calling cursor next...")
|
if cursor.Next(context.Background()) {
|
event := bson.D{}
|
cursor.Decode(&event)
|
fmt.Println("got event", event)
|
}
|
log.Println("...cursor next returned.")
|
if err := cursor.Err(); err != nil {
|
log.Fatal(err)
|
}
|
}
|
}()
|
|
doneChan2 := make(chan struct{})
|
go func() {
|
defer close(doneChan2)
|
for i := 0; i < 5; i++ {
|
_, err := client.Database(db1).Collection(coll1).InsertOne(context.Background(), bson.D{{"test", i}})
|
if err != nil {
|
log.Fatal(err)
|
}
|
time.Sleep(time.Second)
|
}
|
|
err := client.Database(db1).Drop(context.Background())
|
if err != nil {
|
log.Fatal(err)
|
}
|
}()
|
|
<-doneChan1
|
<-doneChan2
|
}
|
|
A goroutine stack dump while the test is hung shows:
|
goroutine 7 [select]:
|
go.mongodb.org/mongo-driver/x/mongo/driver/topology.(*Server).update(0xc000139ae0)
|
/Users/mikeo/go/src/go.mongodb.org/mongo-driver/x/mongo/driver/topology/server.go:391 +0x3ec
|
created by go.mongodb.org/mongo-driver/x/mongo/driver/topology.(*Server).Connect
|
/Users/mikeo/go/src/go.mongodb.org/mongo-driver/x/mongo/driver/topology/server.go:169 +0x14d
|
|
goroutine 12 [runnable]:
|
go.mongodb.org/mongo-driver/mongo.(*ChangeStream).emptyBatch(0xc0002a0000, 0xc0000e23dd)
|
/Users/mikeo/go/src/go.mongodb.org/mongo-driver/mongo/change_stream.go:602 +0x7d
|
go.mongodb.org/mongo-driver/mongo.(*ChangeStream).updatePbrtFromCommand(0xc0002a0000)
|
/Users/mikeo/go/src/go.mongodb.org/mongo-driver/mongo/change_stream.go:294 +0x64
|
go.mongodb.org/mongo-driver/mongo.(*ChangeStream).loopNext(0xc0002a0000, 0x1956780, 0xc0000260b0, 0x0)
|
/Users/mikeo/go/src/go.mongodb.org/mongo-driver/mongo/change_stream.go:564 +0x18c
|
go.mongodb.org/mongo-driver/mongo.(*ChangeStream).next(0xc0002a0000, 0x1956780, 0xc0000260b0, 0x0, 0x2)
|
/Users/mikeo/go/src/go.mongodb.org/mongo-driver/mongo/change_stream.go:529 +0x170
|
go.mongodb.org/mongo-driver/mongo.(*ChangeStream).Next(...)
|
/Users/mikeo/go/src/go.mongodb.org/mongo-driver/mongo/change_stream.go:500
|
_/tmp/godriver.TestChangestream.func1(0xc00002e900, 0xc0002a0000)
|
/tmp/godriver/cursor_test.go:49 +0x111
|
created by _/tmp/godriver.TestChangestream
|
/tmp/godriver/cursor_test.go:44 +0x4a5
|
|
|