From c5765b8e2bcfe4c26eaa1bc3ee34cec7258d940b Mon Sep 17 00:00:00 2001 From: mpilar Date: Fri, 15 Apr 2011 14:22:10 -0400 Subject: [PATCH 2/2] CS-200 Background replicaSet connections to slaves when slaveOk=true and return as soon as the master is found. --- Driver/Internal/ReplicaSetConnector.cs | 92 +++++++++++++++++++++++++------- 1 files changed, 73 insertions(+), 19 deletions(-) diff --git a/Driver/Internal/ReplicaSetConnector.cs b/Driver/Internal/ReplicaSetConnector.cs index 93a68cd..4e5c6e6 100644 --- a/Driver/Internal/ReplicaSetConnector.cs +++ b/Driver/Internal/ReplicaSetConnector.cs @@ -83,6 +83,28 @@ namespace MongoDB.Driver.Internal { var exceptions = new List(); var timeoutAt = DateTime.UtcNow + timeout; + + ProcessQueryResponses(timeoutAt, responsesQueue, exceptions, false); + + if (primaryEndPoint == null) { + var innerException = exceptions.FirstOrDefault(); + var exception = new MongoConnectionException("Unable to connect to server", innerException); + if (exceptions.Count > 1) { + exception.Data.Add("InnerExceptions", exceptions); + } + throw exception; + } + } + + #endregion + + #region private methods + private void ProcessQueryResponses( + DateTime timeoutAt, + BlockingQueue responsesQueue, + List exceptions, + bool backgrounded + ) { while (responses.Count < queries.Count) { var timeRemaining = timeoutAt - DateTime.UtcNow; var response = responsesQueue.Dequeue(timeRemaining); @@ -104,6 +126,21 @@ namespace MongoDB.Driver.Internal { maxMessageLength = response.MaxMessageLength; if (!server.Settings.SlaveOk) { break; // if we're not going to use the secondaries no need to wait for their replies + } else { + if (!backgrounded) { + //do the work for the secondaries, but return the primary ASAP. + ExploreSeedList(responsesQueue, response); + var args = new BackgroundResponseParameters() + { + Queries = queries, + Responses = responses, + TimeoutAt = timeoutAt, + ResponseQueue = responsesQueue, + Exceptions = exceptions + }; + ThreadPool.QueueUserWorkItem(QueryResponseWorkItem, args); + } + break; } } else { if (server.Settings.SlaveOk) { @@ -115,31 +152,28 @@ namespace MongoDB.Driver.Internal { } // look for additional members of the replica set that might not have been in the seed list and query them also - foreach (var address in GetHostAddresses(response)) { - if (!queries.Contains(address)) { - var args = new QueryNodeParameters { - Address = address, - EndPoint = address.ToIPEndPoint(server.Settings.AddressFamily), - ResponseQueue = responsesQueue - }; - ThreadPool.QueueUserWorkItem(QueryNodeWorkItem, args); - queries.Add(address); - } - } + ExploreSeedList(responsesQueue, response); } + } - if (primaryEndPoint == null) { - var innerException = exceptions.FirstOrDefault(); - var exception = new MongoConnectionException("Unable to connect to server", innerException); - if (exceptions.Count > 1) { - exception.Data.Add("InnerExceptions", exceptions); + private void ExploreSeedList( + BlockingQueue responsesQueue, + QueryNodeResponse response + ) { + foreach (var address in GetHostAddresses(response)) { + if (!queries.Contains(address)) { + var args = new QueryNodeParameters + { + Address = address, + EndPoint = address.ToIPEndPoint(server.Settings.AddressFamily), + ResponseQueue = responsesQueue + }; + ThreadPool.QueueUserWorkItem(QueryNodeWorkItem, args); + queries.Add(address); } - throw exception; } } - #endregion - #region private methods private List GetHostAddresses( QueryNodeResponse response ) { @@ -172,6 +206,14 @@ namespace MongoDB.Driver.Internal { return responseQueue; } + private void QueryResponseWorkItem( + object parameters + ) + { + var args = (BackgroundResponseParameters) parameters; + ProcessQueryResponses(args.TimeoutAt, args.ResponseQueue, args.Exceptions, true); + } + // note: this method will run on a thread from the ThreadPool private void QueryNodeWorkItem( object parameters @@ -216,6 +258,18 @@ namespace MongoDB.Driver.Internal { #region private nested classes // note: OK to use automatic properties on private helper class + private class BackgroundResponseParameters + { + public HashSet Queries { get; set; } + public Dictionary Responses { get; set; } + public List EndPoints { get; set; } + public List Connections { get; set; } + public BlockingQueue ResponseQueue { get; set; } + public List Exceptions { get; set; } + public DateTime TimeoutAt { get; set; } + } + + // note: OK to use automatic properties on private helper class private class QueryNodeParameters { public MongoServerAddress Address { get; set; } public IPEndPoint EndPoint { get; set; } -- 1.7.3.1.msysgit.0