The docs for watch() incorrectly call ChangeStream.close():
try:
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
if change_stream is not None:
change_stream.close()
change_stream.close() is an async method so calling it without await leaks a coroutine. Really it should work via task cancellation, not through KeyboardInterrupt.