[GODRIVER-2959] Mongodb watch change stream will panic, when no data be found by query condition Created: 28/Aug/23  Updated: 27/Oct/23  Resolved: 28/Sep/23

Status: Closed
Project: Go Driver
Component/s: None
Affects Version/s: 1.12.1
Fix Version/s: None

Type: Bug Priority: Major - P3
Reporter: xkey N/A Assignee: Preston Vasquez
Resolution: Works as Designed Votes: 0
Labels: changestreams, watch
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified

Issue Links:
Related
is related to GODRIVER-2998 Make ChangeStream.Close thread safe Backlog

 Description   

Summary

With the MongoDB Go driver (versions 1.11.1 to 1.12.1), watching a change stream panics.

go.mongodb.org/mongo-driver/mongo.(*ChangeStream).loopNext(0xc000002140, {0x124ca00, 0x18e8040}, 0x0)
	/go/pkg/mod/go.mongodb.org/mongo-driver@v1.12.1/mongo/change_stream.go:658 +0x6b
go.mongodb.org/mongo-driver/mongo.(*ChangeStream).next(0xc000002140, {0x124ca00?, 0x18e8040?}, 0xd0?)
	/go/pkg/mod/go.mongodb.org/mongo-driver@v1.12.1/mongo/change_stream.go:627 +0x5d
go.mongodb.org/mongo-driver/mongo.(*ChangeStream).Next(...)
	/go/pkg/mod/go.mongodb.org/mongo-driver@v1.12.1/mongo/change_stream.go:598
draw-platform/storage/mongodb.(*StorageCollection).WatchRunnerResponse.func1()

How to Reproduce

Create an empty collection and call Watch with mongo.Pipeline{matchStage}, like this:

matchStage := bson.D{{
	Key: "$match", Value: bson.D{
		{Key: "fullDocument.env", Value: store.env},
		{Key: "fullDocument.requestUUID", Value: requestUUID},
	},
}}

At this point no documents match the query condition, because I want to watch for matching data starting from the very first matching document.

 



 Comments   
Comment by Preston Vasquez [ 28/Sep/23 ]

myselfxukai@163.com The Go Driver documents the correct usage of closing the change stream:

Next and TryNext must not be called after Close has been called.

This is similar to how a channel should only be closed by the sender goroutine. I've opened GODRIVER-2998 to investigate this potential improvement further and will close this ticket as "Works as Designed".

Thank you for your feedback!

Comment by xkey N/A [ 28/Sep/23 ]

preston.vasquez@mongodb.com Thanks, it works. But I still think it is a problem that Next and TryNext panic when Close() has been called first. The design should be changed.

Other frameworks, such as etcd and Consul, do not have this problem.

Comment by Preston Vasquez [ 27/Sep/23 ]

myselfxukai@163.com, the code you provided illustrates the same issue shown in the example: it closes a change stream in the middle of a call to "Next". From the documentation on the ChangeStream.Close method:

Close closes this change stream and the underlying cursor. Next and TryNext must not be called after Close has been called. Close is idempotent. After the first call, any subsequent calls will not change the state.

You should close the change stream in the same goroutine that you are running it in:

		go func() {
			for {
				select {
				case <-stop: // chan struct{}; named stop so the builtin close below is not shadowed
					changeStream.Close(ctx)
					return
				default:
					if changeStream.Next(context.Background()) {
						if changeStream.Err() != nil || changeStream.ID() == 0 {
							log.Println("change stream closed")
							close(closed)
							return
						}

						ce := runnerChangeEvent{}
						changeStream.Decode(&ce)

						// Only insert and update events are forwarded.
						if ce.OperationType != "insert" && ce.OperationType != "update" {
							continue
						}

						ch <- ce
					} else {
						log.Println("change stream closed")
						close(closed)
						return
					}
				}
			}
		}()
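
As a rough sketch of the single-goroutine ownership described above: the goroutine that calls Next is also the only one that calls Close, and the caller stops it by canceling the context passed to Next instead of calling Close itself. To keep the example runnable without a MongoDB server, eventStream and fakeStream are hypothetical stand-ins for the Next/Close subset of *mongo.ChangeStream, and watch and runDemo are illustrative names, not driver APIs. The sketch assumes that Next returns false once its context is canceled.

```go
package main

import "context"

// eventStream is a hypothetical stand-in for the Next/Close subset of
// *mongo.ChangeStream; it is not part of the driver.
type eventStream interface {
	Next(ctx context.Context) bool
	Close(ctx context.Context) error
}

// fakeStream is a hypothetical in-memory stream used only for this sketch.
// Like the real change stream, it must not be used after Close.
type fakeStream struct {
	events chan int
	closed bool
}

// Next blocks until an event is available or ctx is canceled.
func (s *fakeStream) Next(ctx context.Context) bool {
	if s.closed {
		panic("Next called after Close")
	}
	select {
	case <-s.events:
		return true
	case <-ctx.Done():
		return false
	}
}

func (s *fakeStream) Close(context.Context) error {
	s.closed = true
	return nil
}

// watch starts the single goroutine that owns the stream. It is the only
// place where Next and Close are called, so Close can never overlap a Next.
// The returned channel is closed after the stream has been closed.
func watch(ctx context.Context, s eventStream, handle func()) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		defer s.Close(context.Background())
		for s.Next(ctx) {
			handle()
		}
	}()
	return done
}

// runDemo handles three queued events, then stops the watcher by canceling
// the context and waits for the owning goroutine to close the stream.
func runDemo() (handled int, closed bool) {
	s := &fakeStream{events: make(chan int, 3)}
	for i := 0; i < 3; i++ {
		s.events <- i
	}
	ctx, cancel := context.WithCancel(context.Background())
	seen := make(chan struct{})
	done := watch(ctx, s, func() { handled++; seen <- struct{}{} })
	for i := 0; i < 3; i++ {
		<-seen
	}
	cancel() // signal the owner goroutine; do not call Close from here
	<-done
	return handled, s.closed
}
```

With a real change stream, handle would decode the current event (for example with changeStream.Decode) and forward it on a channel. The design choice is that the caller only ever cancels and waits, so there is no window in which Close runs while Next is in progress.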

Comment by xkey N/A [ 27/Sep/23 ]

preston.vasquez@mongodb.com 
func Test_MongoWatch(t *testing.T) {
	clientOptions := options.Client().ApplyURI("mongodb://localhost:27017")
	clientOptions.ReadPreference = readpref.PrimaryPreferred()
	clientOptions.SetMaxPoolSize(12)
	clientOptions.SetMaxConnIdleTime(10 * time.Second)
	clientOptions.SetConnectTimeout(10 * time.Second)

	ctx, cancel := context.WithTimeout(context.Background(), constant.MongodbOpTimeout)
	defer cancel()
	client, err := mongo.Connect(ctx, clientOptions)
	if err != nil {
		panic(err)
	}

	ctx2, cancel2 := context.WithTimeout(context.Background(), constant.MongodbOpTimeout)
	defer cancel2()
	err = client.Ping(ctx2, readpref.PrimaryPreferred())
	if err != nil {
		panic(err)
	}

	collection := client.Database("test").Collection("test_coll")

	type runnerChangeEvent struct {
		OperationType string              `bson:"operationType"`
		ClusterTime   primitive.Timestamp `bson:"clusterTime"`
		FullDocument  any                 `bson:"fullDocument"`
	}

	watchFn := func(ctx context.Context, ch chan runnerChangeEvent) (*mongo.ChangeStream, error) {
		pipeline := mongo.Pipeline{bson.D{{
			Key: "$match",
			Value: bson.D{
				{Key: "fullDocument.env", Value: "dev"},
				//{Key: "fullDocument.requestUUID", Value: "y"},
			},
		}}}

		option := options.ChangeStream().SetFullDocument(options.UpdateLookup).SetMaxAwaitTime(5 * time.Second)

		changeStream, err := collection.Watch(ctx, pipeline, option)
		if err != nil {
			return nil, err
		}

		go func() {
			for changeStream.Next(context.Background()) {
				if changeStream.Err() != nil || changeStream.ID() == 0 {
					log.Println("change stream closed")
					break
				}

				ce := runnerChangeEvent{}
				changeStream.Decode(&ce)

				// https://www.mongodb.com/docs/manual/reference/change-events/
				// Messages reported by the SDK can only be insert or update.
				if ce.OperationType != "insert" && ce.OperationType != "update" {
					continue
				}

				ch <- ce
			}
		}()

		return changeStream, nil
	}

	watchctx := context.Background()
	ch := make(chan runnerChangeEvent, 10)
	stream, err := watchFn(watchctx, ch)
	if err != nil {
		panic(err)
	}

	go func() {
		for i := 0; i < 10; i++ {
			doc := bson.D{{Key: "env", Value: "dev"}, {Key: "num", Value: i + 1}}
			_, err = collection.InsertOne(context.Background(), doc)
			if err != nil {
				panic(err)
			}

			time.Sleep(2 * time.Second)
		}
	}()

	cnt := 0
loop:
	for {
		select {
		case resp := <-ch:
			log.Println(resp.FullDocument)
			cnt++
			if cnt >= 6 {
				break loop
			}
		}
	}

	// Close is called here while the goroutine above may still be inside Next.
	stream.Close(watchctx)

	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM, syscall.SIGKILL)

	log.Println(<-signals)
}
 
This is my test code for breaking out of the watch loop at any time, but it panics. What I mean is that `changeStream.Next(context.Background())` should not have to be called in a loop; the driver should give the user a channel signal for exiting the for loop.

Comment by xkey N/A [ 27/Sep/23 ]

preston.vasquez@mongodb.com Thanks. I will test it with the code from https://gist.github.com/prestonvasquez/30f15e9f84c76a63e6bf818663a18309

Comment by PM Bot [ 25/Sep/23 ]

Hi myselfxukai@163.com! GODRIVER-2959 is awaiting your response.

If this is still an issue for you, please open Jira to review the latest status and provide your feedback. Thanks!

Comment by Preston Vasquez [ 15/Sep/23 ]

myselfxukai@163.com It appears that the cursor is being closed before the Next loop completes. Is your change stream being used concurrently by multiple goroutines? Note that this is not supported by the Go Driver and is documented here: https://pkg.go.dev/go.mongodb.org/mongo-driver/mongo#ChangeStream

Here is a gist illustrating this point: https://gist.github.com/prestonvasquez/30f15e9f84c76a63e6bf818663a18309

Comment by xkey N/A [ 29/Aug/23 ]

 
// https://www.mongodb.com/docs/manual/changeStreams/#resume-tokens-from-change-events
type runnerChangeEvent struct {
	OperationType string              `bson:"operationType"`
	ClusterTime   primitive.Timestamp `bson:"clusterTime"`
	FullDocument  RunnerResponse      `bson:"fullDocument"`
}

func (store *StorageCollection) WatchRunnerResponse(requestUUID string, ch chan RunnerResponse) (*mongo.ChangeStream, error) {
	matchStage := bson.D{{
		Key: "$match", Value: bson.D{
			{Key: "fullDocument.env", Value: store.env},
			{Key: "fullDocument.requestUUID", Value: requestUUID},
		},
	}}

	opts := options.ChangeStream().SetMaxAwaitTime(5 * time.Second)

	changeStream, err := store.runnerCollection.Watch(
		context.Background(),
		mongo.Pipeline{matchStage},
		opts,
	)
	if err != nil {
		return nil, err
	}

	go func() {
		for {
			next := changeStream.TryNext(context.Background())
			if changeStream.Err() != nil || changeStream.ID() == 0 {
				break
			}

			if !next {
				continue
			}

			ce := runnerChangeEvent{}
			changeStream.Decode(&ce)

			// https://www.mongodb.com/docs/manual/reference/change-events/
			if ce.OperationType != "insert" && ce.OperationType != "update" {
				continue
			}

			ch <- ce.FullDocument
		}
	}()

	return changeStream, nil
}
 

The comment editor should support Markdown. Pasting code into the current editor works very poorly.

Comment by xkey N/A [ 28/Aug/23 ]

for {
	if cs.cursor == nil {
		return
	}

	if cs.cursor.Next(ctx) {
		// non-empty batch returned
		cs.batch, cs.err = cs.cursor.Batch().Documents()
		return
	}

	cs.err = replaceErrors(cs.cursor.Err())
	if cs.err == nil {
		// Check if cursor is alive
		if cs.ID() == 0 {
			return
		}

		// If a getMore was done but the batch was empty, the batch cursor will return false with no error.
		// Update the tracked resume token to catch the post batch resume token from the server response.
		cs.updatePbrtFromCommand()
		if nonBlocking {
			// stop after a successful getMore, even though the batch was empty
			return
		}

		continue // loop getMore until a non-empty batch is returned or an error occurs
	}

	if !cs.isResumableError() {
		return
	}

	// ignore error from cursor close because if the cursor is deleted or errors we tried to close it and will remake and try to get next batch
	_ = cs.cursor.Close(ctx)
	if cs.err = cs.executeOperation(ctx, true); cs.err != nil {
		return
	}
}
 
So I guess the code around `replaceErrors` should check cs.cursor != nil again.
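
A second nil check on its own would still be an unsynchronized check-then-use if another goroutine can call Close in between, so a caller who wants to close from a different goroutine needs some synchronization of their own. The following is a minimal caller-side sketch of one way to do that, not a driver feature: guardedStream, stream, blockingStream and closeFromAnotherGoroutine are hypothetical names, and blockingStream stands in for *mongo.ChangeStream so the example runs without a server. It assumes that canceling the context passed to Next makes a blocked Next return false.

```go
package main

import (
	"context"
	"sync"
)

// stream is a hypothetical stand-in for the Next/Close subset of
// *mongo.ChangeStream; it is not part of the driver.
type stream interface {
	Next(ctx context.Context) bool
	Close(ctx context.Context) error
}

// blockingStream is a hypothetical stream with no events: Next blocks until
// its context is canceled, and panics if it is called after Close.
type blockingStream struct{ closed bool }

func (s *blockingStream) Next(ctx context.Context) bool {
	if s.closed {
		panic("Next called after Close")
	}
	<-ctx.Done()
	return false
}

func (s *blockingStream) Close(context.Context) error {
	s.closed = true
	return nil
}

// guardedStream serializes Next and Close with a mutex so that Close may be
// called from any goroutine without overlapping a Next on the inner stream.
type guardedStream struct {
	mu     sync.Mutex
	inner  stream
	ctx    context.Context
	cancel context.CancelFunc
	closed bool
}

func newGuardedStream(parent context.Context, inner stream) *guardedStream {
	ctx, cancel := context.WithCancel(parent)
	return &guardedStream{inner: inner, ctx: ctx, cancel: cancel}
}

// Next returns false once the stream is closed and never touches the inner
// stream after that point.
func (g *guardedStream) Next() bool {
	g.mu.Lock()
	defer g.mu.Unlock()
	if g.closed {
		return false
	}
	return g.inner.Next(g.ctx)
}

// Close cancels first so that a Next blocked on the inner stream returns and
// releases the mutex, then closes the inner stream exactly once.
func (g *guardedStream) Close() error {
	g.cancel()
	g.mu.Lock()
	defer g.mu.Unlock()
	if g.closed {
		return nil
	}
	g.closed = true
	return g.inner.Close(context.Background())
}

// closeFromAnotherGoroutine runs a Next loop in one goroutine, closes the
// stream from the calling goroutine, and then calls Next once more.
func closeFromAnotherGoroutine() (innerClosed, nextAfterClose bool) {
	inner := &blockingStream{}
	g := newGuardedStream(context.Background(), inner)
	finished := make(chan struct{})
	go func() {
		defer close(finished)
		for g.Next() {
		}
	}()
	_ = g.Close()
	<-finished
	return inner.closed, g.Next()
}
```

The trade-off in this sketch is that every Next call takes a lock, and Close waits until any in-flight Next has returned.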

Comment by xkey N/A [ 28/Aug/23 ]

When I call the `changeStream.Close()` function, the watch goroutine panics.

 

```go
// Print out all change stream events in the order they're received.
// See the mongo.ChangeStream documentation for more examples of using
// change streams.
for changeStream.Next(context.TODO()) {
	fmt.Println(changeStream.Current)
}
```

 

So I think the driver should provide a method for this, rather than requiring `changeStream.Next(context.TODO())` to be called in an endless loop to get the next data.

Comment by PM Bot [ 28/Aug/23 ]

Hi myselfxukai@163.com, thank you for reporting this issue! The team will look into it and get back to you soon.

Generated at Thu Feb 08 08:39:44 UTC 2024 using Jira 9.7.1#970001-sha1:2222b88b221c4928ef0de3161136cc90c8356a66.