[GODRIVER-1648] calling cursor.Next() after changestream is invalidated enters spin loop
Created: 13/Jun/20  Updated: 28/Oct/23  Resolved: 24/Jun/20

Status: Closed
Project: Go Driver
Component/s: CRUD
Affects Version/s: None
Fix Version/s: 1.3.5

Type: Bug
Priority: Major - P3
Reporter: Michael O'Brien
Assignee: Kriti Ravindran (Inactive)
Resolution: Fixed
Votes: 0
Labels: None


 Description   

(Using v1.3.4)
When consuming a change stream in a for loop with successive calls to cursor.Next(), once the "invalidate" event has been received on the stream, the next call to Next() never returns and CPU usage hits 100%.

In practice the caller should probably avoid this by detecting the invalidate event and breaking out of the loop, but it seems the driver should be able to avoid spinning the CPU when the stream goes idle after being invalidated.

Here's a minimal test case to reproduce:

 
package main
 
import (
	"context"
	"fmt"
	"log"
	"testing"
	"time"
 
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/bson/primitive"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)
 
func TestChangestream(t *testing.T) {
	log.SetFlags(log.LstdFlags | log.Lshortfile)
 
	client, err := mongo.NewClient(options.Client().ApplyURI("mongodb://localhost:26000"))
	if err != nil {
		log.Fatal(err)
	}
 
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if err := client.Connect(ctx); err != nil {
		log.Fatal(err)
	}
 
	db1, coll1 := primitive.NewObjectID().Hex(), primitive.NewObjectID().Hex()
 
	_, err = client.Database(db1).Collection(coll1).InsertOne(context.Background(), bson.D{{"x", "1"}})
	if err != nil {
		log.Fatal(err)
	}
 
	cursor, err := client.Database(db1).Watch(
		context.Background(),
		[]interface{}{},
	)
 
	if err != nil {
		log.Fatal(err)
	}
 
	doneChan1 := make(chan struct{})
	go func() {
		defer close(doneChan1)
		log.Println("starting changestream loop")
		for { // intentionally never breaks; the Next() call after "invalidate" hangs
			log.Println("calling cursor next...")
			if cursor.Next(context.Background()) {
				event := bson.D{}
				if err := cursor.Decode(&event); err != nil {
					log.Fatal(err)
				}
				fmt.Println("got event", event)
			}
			log.Println("...cursor next returned.")
			if err := cursor.Err(); err != nil {
				log.Fatal(err)
			}
		}
	}()
 
	doneChan2 := make(chan struct{})
	go func() {
		defer close(doneChan2)
		for i := 0; i < 5; i++ {
			_, err := client.Database(db1).Collection(coll1).InsertOne(context.Background(), bson.D{{"test", i}})
			if err != nil {
				log.Fatal(err)
			}
			time.Sleep(time.Second)
		}
 
		err := client.Database(db1).Drop(context.Background())
		if err != nil {
			log.Fatal(err)
		}
	}()
 
	<-doneChan1
	<-doneChan2
}

A goroutine stack dump while the test is hung shows:

 
goroutine 7 [select]:
go.mongodb.org/mongo-driver/x/mongo/driver/topology.(*Server).update(0xc000139ae0)
	/Users/mikeo/go/src/go.mongodb.org/mongo-driver/x/mongo/driver/topology/server.go:391 +0x3ec
created by go.mongodb.org/mongo-driver/x/mongo/driver/topology.(*Server).Connect
	/Users/mikeo/go/src/go.mongodb.org/mongo-driver/x/mongo/driver/topology/server.go:169 +0x14d
 
goroutine 12 [runnable]:
go.mongodb.org/mongo-driver/mongo.(*ChangeStream).emptyBatch(0xc0002a0000, 0xc0000e23dd)
	/Users/mikeo/go/src/go.mongodb.org/mongo-driver/mongo/change_stream.go:602 +0x7d
go.mongodb.org/mongo-driver/mongo.(*ChangeStream).updatePbrtFromCommand(0xc0002a0000)
	/Users/mikeo/go/src/go.mongodb.org/mongo-driver/mongo/change_stream.go:294 +0x64
go.mongodb.org/mongo-driver/mongo.(*ChangeStream).loopNext(0xc0002a0000, 0x1956780, 0xc0000260b0, 0x0)
	/Users/mikeo/go/src/go.mongodb.org/mongo-driver/mongo/change_stream.go:564 +0x18c
go.mongodb.org/mongo-driver/mongo.(*ChangeStream).next(0xc0002a0000, 0x1956780, 0xc0000260b0, 0x0, 0x2)
	/Users/mikeo/go/src/go.mongodb.org/mongo-driver/mongo/change_stream.go:529 +0x170
go.mongodb.org/mongo-driver/mongo.(*ChangeStream).Next(...)
	/Users/mikeo/go/src/go.mongodb.org/mongo-driver/mongo/change_stream.go:500
_/tmp/godriver.TestChangestream.func1(0xc00002e900, 0xc0002a0000)
	/tmp/godriver/cursor_test.go:49 +0x111
created by _/tmp/godriver.TestChangestream
	/tmp/godriver/cursor_test.go:44 +0x4a5



 Comments   
Comment by Githook User [ 23/Jun/20 ]

Author: Kriti Ravindran (KritiRav) <kriti.ravindran@mongodb.com>

Message: GODRIVER-1648 Add check and test for closed cursor in Next (#428)
Branch: release/1.3
https://github.com/mongodb/mongo-go-driver/commit/80643959ed77a95c2dc1a187b23f603a9a7a41d1

Comment by Githook User [ 23/Jun/20 ]

Author: Kriti Ravindran (KritiRav) <kriti.ravindran@mongodb.com>

Message: GODRIVER-1648 Add check and test for closed cursor in Next (#428)
Branch: master
https://github.com/mongodb/mongo-go-driver/commit/954304dd54562ea5f9215b985b30ff713b78efc3
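
The commit message describes the fix as a check for a closed cursor in Next. The toy model here is a hypothetical illustration of why such a check stops the spin seen in the stack dump (loopNext repeatedly finding an empty batch); it is not the driver's code, and toyStream, getMore and the cursor ID 42 are invented for the sketch. In this model the server returns its final batch with cursor ID 0, as after an "invalidate" event, and Next then returns false instead of retrying forever.

```go
package main

import "fmt"

// toyStream is a simplified, hypothetical model of a change stream's
// client-side iteration; it is not the driver's actual implementation.
type toyStream struct {
	pending  [][]string // batches the "server" will return, one per getMore
	cursorID int64      // 0 once the server has closed the cursor
	batch    []string   // events buffered on the client
	current  string     // last event returned by Next
	getMores int        // number of getMore round trips issued
}

// getMore pulls the next server batch. With its final batch the server
// reports cursor ID 0, modeling the stream closing after "invalidate".
func (s *toyStream) getMore() {
	s.getMores++
	if len(s.pending) == 0 {
		s.batch, s.cursorID = nil, 0
		return
	}
	s.batch, s.pending = s.pending[0], s.pending[1:]
	if len(s.pending) == 0 {
		s.cursorID = 0
	}
}

// Next returns true while events remain. Without the cursorID check, an
// empty batch on a closed cursor would make this loop retry forever.
func (s *toyStream) Next() bool {
	for {
		if len(s.batch) > 0 {
			s.current, s.batch = s.batch[0], s.batch[1:]
			return true
		}
		if s.cursorID == 0 {
			return false // closed cursor: stop instead of spinning
		}
		s.getMore()
	}
}

func main() {
	s := &toyStream{
		pending:  [][]string{{"insert", "insert"}, {"invalidate"}},
		cursorID: 42, // hypothetical live cursor ID
	}
	for s.Next() {
		fmt.Println(s.current) // insert, insert, invalidate
	}
	fmt.Println("getMores:", s.getMores) // prints "getMores: 2"
	fmt.Println("next again:", s.Next()) // prints "next again: false"
}
```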

Generated at Thu Feb 08 08:36:50 UTC 2024 using Jira 9.7.1#970001-sha1:2222b88b221c4928ef0de3161136cc90c8356a66.