[GODRIVER-869] How to use collection.Watch without the slice/array error? Created: 11/Mar/19  Updated: 11/Sep/19  Resolved: 02/Apr/19

Status: Closed
Project: Go Driver
Component/s: Core API
Affects Version/s: 1.0.0-rc2
Fix Version/s: None

Type: Task Priority: Major - P3
Reporter: Melle Assignee: Kristofer Brandow (Inactive)
Resolution: Done Votes: 0
Labels: None
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified


 Description   

When using the following code always getting the same error for the 'pipeline' variable that is passed in:
 
can only transform slices and arrays into aggregation pipelines, but got
 
Example code:
 
client, err := mongo.NewClient(options.Client().ApplyURI("mongodb://mongo1:27017,mongo2:27018,mongo3:27019/?replicaSet=my-mongo-set"))

    

ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)     err = client.Connect(ctx) 
db := DocumentationDatabase(client) 
documentation_examples.InsertExamples(t, db) 
 
// we are going to monitor the inventory collection     
// for changes     
coll := db.Collection("inventory") 
 
var pipeline interface{} // set up pipeline - 
// in new version has to be a some bsontype.Array but does not work??
cur, err := coll.Watch(ctx, pipeline, options.ChangeStream()) 
// above always returns with an error?

 



 Comments   
Comment by Melle [ 28/Mar/19 ]

Thank you! This now works. The code awaits a change in a collection and then continues.

 

changed codeline from above:

var pipeline mongo.Pipeline // set up pipeline

Comment by Kristofer Brandow (Inactive) [ 20/Mar/19 ]

The Watch method has not changed, but you can no longer pass in nil as a parameter for any method that takes and empty interface. If you don't have any constraints and want all changes for a collection you can pass in an empty mongo.Pipeline.

Comment by Melle [ 20/Mar/19 ]

Ok, so that is different from the driver before where the watch just
triggered for any update on the given collection.

Now for the inventory collection that comes as an example with the driver
what would possibly be a properly defined pipeline to see updates on the
collection?
When the watch works differently can you please point me to an example?

Comment by Kristofer Brandow (Inactive) [ 20/Mar/19 ]

Hi mkoning,
You still have not provided the actual pipeline you are using. In this example you are passing in empty interface as the pipeline which is not valid. You must pass a slice of something to Watch.

--Kris

Comment by Melle [ 20/Mar/19 ]

In the below test, the line with 'coll.Watch' always returns an err.
Have verified the collection "inventory" does exists in the mongo replica set. I can also change (edit) a mongo document using a GUI tool in that collection. However, I guess if the coll.Watch is not setup properly the rest of the code will not properly await for any object change either?

Below is the test code used that puts the following in the console:
API server listening at: 127.0.0.1:46646
can only transform slices and arrays into aggregation pipelines, but got invalid 
PASS
The test passes only because the test-code just stops when the Watch returns an error.
 

The pipeline interface is just there as this has worked with a previous beta release, so probably the use case for a Watch is now different. Is there a good example for how to use the Watch?

package sandboxmongodb

import (
    "context"
    "fmt"
    "log"
    "testing"
    "time"

    "go.mongodb.org/mongo-driver/bson"
    // "github.com/mongodb/mongo-go-driver/bson"
    //documentation_examples "github.com/mongodb/mongo-go-driver/examples/documentation_examples"
    documentation_examples "go.mongodb.org/mongo-driver/examples/documentation_examples"
    //"github.com/mongodb/mongo-go-driver/mongo"

    "go.mongodb.org/mongo-driver/mongo"
    "go.mongodb.org/mongo-driver/mongo/options"
)
 
// TestWatchCursor should test for any changes that occur in the database so that we can 
// act upon those changes
func TestWatchCursor(t *testing.T) {

    client, err := mongo.NewClient(options.Client().ApplyURI("mongodb://mongo1:27017,mongo2:27018,mongo3:27019/?replicaSet=my-mongo-set"))

    ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
    err = client.Connect(ctx)

    // db := client.Database("documentation_examples")
    db := DocumentationDatabase(client)
    documentation_examples.InsertExamples(t, db)

    // we are going to monitor the inventory collection
    // for changes
    coll := db.Collection("inventory")

    var pipeline interface{} // set up pipeline

    cur, err := coll.Watch(ctx, pipeline) // Watch can only be done against a MongoDB REPLICA set
    if err != nil

{         // Handle err <-- when debugging we always end up here, why?           fmt.Println(err)         t.Log(err)         return     }

    defer cur.Close(ctx)

    for ever := false; !ever; { // forever... hmmm...
        for cur.Next(ctx) {
            t.Log("something in collection changed!")

            var result bson.M
            if err := cur.Decode(&result); err != nil

{                 log.Fatal(err)             }

            // do something with result
            t.Log(result)
        }
        time.Sleep(1 * time.Second) // should be put in async process
        t.Log("A change..")
    }

    if err := cur.Err(); err != nil

{         log.Fatal(err)     }

}

Comment by Ian Whalen (Inactive) [ 11/Mar/19 ]

Hi mkoning - can you please provide the full pipeline you're using that's causing the error?

Generated at Thu Feb 08 08:35:08 UTC 2024 using Jira 9.7.1#970001-sha1:2222b88b221c4928ef0de3161136cc90c8356a66.