[CSHARP-2372] Unwatch/stop a change stream Created: 30/Aug/18  Updated: 27/Oct/23  Resolved: 04/Jan/19

Status: Closed
Project: C# Driver
Component/s: Read Operations
Affects Version/s: 2.5
Fix Version/s: None

Type: Task
Priority: Major - P3
Reporter: daniel moqvist
Assignee: Robert Stam
Resolution: Works as Designed
Votes: 0
Labels: question
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified
Environment: Windows

 Description   

I cannot find a proper way to cancel a change stream watcher.
 
In the following example I can stop watching by disposing the enumerator, but the resulting exception is misleading in my opinion.
Instead of disposing the enumerator, I would like to call something like cursor.Cancel() to stop watching for changes and close the change stream. It could then throw an OperationCanceledException.
 

var cts = new CancellationTokenSource(5000);
var client = new MongoClient("mongodb://localhost");
var db = client.GetDatabase("test");
var collection = db.GetCollection<BsonDocument>("test");
 
Console.WriteLine($"{DateTime.Now.ToLongTimeString()} - Start watching...");
 
var cursor = collection.Watch();
var enumerator = cursor.ToEnumerable().GetEnumerator();
 
 
cts.Token.Register(() =>
{
	Console.WriteLine($"{DateTime.Now.ToLongTimeString()} - Cancellation token was cancelled.");
	enumerator.Dispose();
});
 
try
{
	while (enumerator.MoveNext())
	{
		Console.WriteLine($"{DateTime.Now.ToLongTimeString()} - Received a change from database.");
	}
}
catch (Exception e)
{
	Console.WriteLine($"{DateTime.Now.ToLongTimeString()} - Exception: {e.Message}");
}
 
Console.WriteLine("Done");

Console output:

10:50:55 - Start watching...
10:50:59 - Cancellation token was cancelled.
10:50:59 - Exception: Cannot access a disposed object.
Object name: 'MongoDB.Driver.Core.Bindings.CoreSessionHandle'.

In the following example I get the correct exception, but MoveNext returns true every second even when nothing has changed, so I have to check whether the cursor's current batch contains any documents.

var cts = new CancellationTokenSource(5000);
var client = new MongoClient("mongodb://localhost");
var db = client.GetDatabase("test");
var collection = db.GetCollection<BsonDocument>("test");
 
Console.WriteLine($"{DateTime.Now.ToLongTimeString()} - Start watching...");
 
var cursor = collection.Watch();
 
try
{
	while (cursor.MoveNext(cts.Token))
	{
		if(cursor.Current.Count()>0)
			Console.WriteLine($"{DateTime.Now.ToLongTimeString()} - Received {cursor.Current.Count()} changes from database.");
		else
			Console.WriteLine($"{DateTime.Now.ToLongTimeString()} - Nothing changed in database.");
	}
}
catch (Exception e)
{
	Console.WriteLine($"{DateTime.Now.ToLongTimeString()} - Exception: {e.Message}");
}
 
Console.WriteLine("Done");

Console output:

11:15:26 - Start watching...
11:15:26 - Nothing changed in database.
11:15:27 - Nothing changed in database.
11:15:28 - Nothing changed in database.
11:15:29 - Nothing changed in database.
11:15:30 - Exception: The operation was canceled.
Done

Here is an example of how to cancel a file download using the .NET WebClient. The same pattern could fit the cancellation of a change stream watcher.

var cts = new CancellationTokenSource(5000);
var wc = new WebClient();
IWebProxy defaultWebProxy = WebRequest.DefaultWebProxy;
defaultWebProxy.Credentials = CredentialCache.DefaultCredentials;
wc.Proxy = defaultWebProxy;
cts.Token.Register(() =>
{
	Console.WriteLine("Cancel!");
	wc.CancelAsync();
 
});
Console.WriteLine("Downloading...");
try
{
	wc.DownloadFile(new Uri("http://downloads.mongodb.org/win32/mongodb-win32-x86_64-2008plus-ssl-4.0.2.zip"), @"c:\temp\mongodb.zip");
}
catch (Exception e)
{
	Console.WriteLine("Downloading was cancelled");
}



 Comments   
Comment by Robert Stam [ 04/Jan/19 ]

You can abort an ongoing Watch by passing a cancellationToken to the ToEnumerable method. The following example aborts the Watch after 10 seconds:

using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)))
{
    foreach (var change in collection.Watch().ToEnumerable(cancellationToken: cts.Token))
    {
        Console.WriteLine(change.BackingDocument.ToJson());
    }
}

You don't have to use a timer (that's just for this example). You can call cts.Cancel() from any thread to request cancellation of the Watch.

Note: the original call to Watch also has an optional cancellationToken parameter. The cancellationToken passed to the Watch method only applies to the first call to the server to begin the Watch operation. You can of course pass the same cancellationToken to both the Watch and ToEnumerable methods.

When cancellation is requested via the cancellationToken an OperationCanceledException will be thrown.
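
The points above can be combined into one sketch: the same token is passed to both Watch and ToEnumerable, cancellation is requested from another thread, and the resulting OperationCanceledException is caught. This is only an illustration under assumptions, not a definitive implementation: the connection string, database name, and collection name are taken from the reporter's example, the key-press trigger is hypothetical, and change streams require the server to be running as a replica set.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Bson;
using MongoDB.Driver;

class Program
{
    static void Main()
    {
        // Assumed connection string and names, copied from the reporter's example.
        var client = new MongoClient("mongodb://localhost");
        var collection = client.GetDatabase("test").GetCollection<BsonDocument>("test");

        using (var cts = new CancellationTokenSource())
        {
            // Hypothetical trigger: request cancellation from another thread on a key press.
            Task.Run(() => { Console.ReadKey(true); cts.Cancel(); });

            try
            {
                // The token passed to Watch covers only the initial call to the server.
                using (var cursor = collection.Watch(cancellationToken: cts.Token))
                {
                    // The token passed to ToEnumerable covers the ongoing iteration.
                    foreach (var change in cursor.ToEnumerable(cancellationToken: cts.Token))
                    {
                        Console.WriteLine(change.BackingDocument.ToJson());
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Expected when cts.Cancel() is called while waiting for changes.
                Console.WriteLine("Watch was cancelled.");
            }
        }
    }
}
```

Catching OperationCanceledException specifically, rather than Exception, keeps a deliberate cancellation separate from genuine failures such as a lost connection.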

 

 

Generated at Wed Feb 07 21:42:22 UTC 2024 using Jira 9.7.1#970001-sha1:2222b88b221c4928ef0de3161136cc90c8356a66.