[GODRIVER-2959] Mongodb watch change stream will panic, when no data be found by query condition Created: 28/Aug/23 Updated: 27/Oct/23 Resolved: 28/Sep/23 |
|
| Status: | Closed |
| Project: | Go Driver |
| Component/s: | None |
| Affects Version/s: | 1.12.1 |
| Fix Version/s: | None |
| Type: | Bug | Priority: | Major - P3 |
| Reporter: | xkey N/A | Assignee: | Preston Vasquez |
| Resolution: | Works as Designed | Votes: | 0 |
| Labels: | changestreams, watch | ||
| Remaining Estimate: | Not Specified | ||
| Time Spent: | Not Specified | ||
| Original Estimate: | Not Specified | ||
| Issue Links: |
|
||||||||
| Documentation Changes Summary: | 1. What would you like to communicate to the user about this feature? |
||||||||
| Description |
| Comments |
| Comment by Preston Vasquez [ 28/Sep/23 ] | ||||||||||||||||||||||||||||||
|
myselfxukai@163.comThe Go Driver documents the correct usage of closing the change stream:
Similar to how we should only close a channel in a sender goroutine. I've opened GODRIVER-2998 to investigate this potential improvement further and will close this ticket as works as designed. Thank you for your feedback! | ||||||||||||||||||||||||||||||
| Comment by xkey N/A [ 28/Sep/23 ] | ||||||||||||||||||||||||||||||
|
preston.vasquez@mongodb.com Thanks, it works. But I also think the Close() is called first, then the Next and TryNext will be panic. The design should be changed. Other frameworks, such as etcd, consul will not happen it. | ||||||||||||||||||||||||||||||
| Comment by Preston Vasquez [ 27/Sep/23 ] | ||||||||||||||||||||||||||||||
|
myselfxukai@163.com , the code you provide illustrates the same issue given in the example, it closes a change stream in the middle of calling "Next". From the documentation on the ChangeStream.Close method:
You should close the change stream in the same thread that you are running it in:
| ||||||||||||||||||||||||||||||
| Comment by xkey N/A [ 27/Sep/23 ] | ||||||||||||||||||||||||||||||
|
preston.vasquez@mongodb.com ctx, cancel := context.WithTimeout(context.Background(), constant.MongodbOpTimeout) ctx2, cancel2 := context.WithTimeout(context.Background(), constant.MongodbOpTimeout) defercancel2() err = client.Ping(ctx2, readpref.PrimaryPreferred()) if err !=nil { panic(err) } collection := client.Database("test").Collection("test_coll") typerunnerChangeEventstruct { OperationType string `bson:"operationType"` ClusterTime primitive.Timestamp `bson:"clusterTime"` FullDocument any `bson:"fullDocument"` }watchFn := func(ctx context.Context, ch chan runnerChangeEvent) (*mongo.ChangeStream, error) { , option := options.ChangeStream().SetFullDocument(options.UpdateLookup).SetMaxAwaitTime(5 * time.Second) changeStream, err := collection.Watch(ctx, pipeline, option) gofunc() { ce := runnerChangeEvent{} // https://www.mongodb.com/docs/manual/reference/change-events/ ch <- ce return changeStream, nil watchctx := context.Background() gofunc() { for i :=0; i <10; i++ { doc := bson.D{ {Key: "env", Value: "dev"}, {Key: "num", Value: i + 1}, } _, err = collection.InsertOne(context.Background(), doc) if err !=nil { panic(err) } time.Sleep(2 * time.Second) cnt := 0 } stream.Close(watchctx) signals := make(chan os.Signal, 1) log.Println(<-signals) | ||||||||||||||||||||||||||||||
| Comment by xkey N/A [ 27/Sep/23 ] | ||||||||||||||||||||||||||||||
|
preston.vasquez@mongodb.com Thanks. I will test it by the https://gist.github.com/prestonvasquez/30f15e9f84c76a63e6bf818663a18309 codes | ||||||||||||||||||||||||||||||
| Comment by PM Bot [ 25/Sep/23 ] | ||||||||||||||||||||||||||||||
|
Hi myselfxukai@163.com! If this is still an issue for you, please open Jira to review the latest status and provide your feedback. Thanks! | ||||||||||||||||||||||||||||||
| Comment by Preston Vasquez [ 15/Sep/23 ] | ||||||||||||||||||||||||||||||
|
myselfxukai@163.com It appears that the cursor is being closed before the Next loop completes. Is your change stream being used concurrently by multiple Go routines? Note that this is not supported by the Go Driver and is documented here: https://pkg.go.dev/go.mongodb.org/mongo-driver/mongo#ChangeStream Here is a gist illustrating this point: https://gist.github.com/prestonvasquez/30f15e9f84c76a63e6bf818663a18309 | ||||||||||||||||||||||||||||||
| Comment by xkey N/A [ 29/Aug/23 ] | ||||||||||||||||||||||||||||||
|
func (store *StorageCollection) WatchRunnerResponse(requestUUID string, ch chan RunnerResponse) (*mongo.ChangeStream, error) { , {Key: "fullDocument.requestUUID", Value: requestUUID}, opts := options.ChangeStream().SetMaxAwaitTime(5 * time.Second) changeStream, err := store.runnerCollection.Watch( if err !=nil { returnnil, err }gofunc() { if!next { continue }ce := runnerChangeEvent{} changeStream.Decode(&ce) // https://www.mongodb.com/docs/manual/reference/change-events/ if ce.OperationType !="insert"&& ce.OperationType !="update" { continue } ch <- ce.FullDocument return changeStream, nil The comments editor should support markdown. Now the editor paste code is so bad. | ||||||||||||||||||||||||||||||
| Comment by xkey N/A [ 28/Aug/23 ] | ||||||||||||||||||||||||||||||
|
for { if cs.cursor.Next(ctx) { // non-empty batch returned cs.batch, cs.err = cs.cursor.Batch().Documents() return } cs.err = replaceErrors(cs.cursor.Err()) if cs.err ==nil { // Check if cursor is alive if cs.ID() ==0 { return } // If a getMore was done but the batch was empty, the batch cursor will return false with no error. continue// loop getMore until a non-empty batch is returned or an error occurs if!cs.isResumableError() { return }// ignore error from cursor close because if the cursor is deleted or errors we tried to close it and will remake and try to get next batch _ = cs.cursor.Close(ctx) if cs.err = cs.executeOperation(ctx, true); cs.err !=nil { return } } | ||||||||||||||||||||||||||||||
| Comment by xkey N/A [ 28/Aug/23 ] | ||||||||||||||||||||||||||||||
|
I call `changeStream.Close()` function, then watch goroutine panic
```go // Print out all change stream events in the order they're received. ```
so I guess that the driver maybe provide a method. Don't use `changeStream.Next(context.TODO())` as death loop for get next data. | ||||||||||||||||||||||||||||||
| Comment by PM Bot [ 28/Aug/23 ] | ||||||||||||||||||||||||||||||
|
Hi myselfxukai@163.com, thank you for reporting this issue! The team will look into it and get back to you soon. |