Details
-
Task
-
Resolution: Works as Designed
-
Major - P3
-
None
-
3.3.0
-
None
-
None
Description
I have an application that pulls data from one source and then upserts it to MongoDB. Sometimes the pulling process takes a lot of time and when app tries to push a row to MongoDB the following exception is raised probably to the timeout issue:
com.mongodb.MongoSocketReadException: Prematurely reached end of stream
|
at com.mongodb.connection.SocketStream.read(SocketStream.java:88)
|
at com.mongodb.connection.InternalStreamConnection.receiveResponseBuffers(InternalStreamConnection.java:492)
|
at com.mongodb.connection.InternalStreamConnection.receiveMessage(InternalStreamConnection.java:222)
|
at com.mongodb.connection.UsageTrackingInternalConnection.receiveMessage(UsageTrackingInternalConnection.java:105)
|
at com.mongodb.connection.DefaultConnectionPool$PooledConnection.receiveMessage(DefaultConnectionPool.java:438)
|
at com.mongodb.connection.WriteCommandProtocol.receiveMessage(WriteCommandProtocol.java:262)
|
at com.mongodb.connection.WriteCommandProtocol.execute(WriteCommandProtocol.java:104)
|
at com.mongodb.connection.UpdateCommandProtocol.execute(UpdateCommandProtocol.java:64)
|
at com.mongodb.connection.UpdateCommandProtocol.execute(UpdateCommandProtocol.java:37)
|
at com.mongodb.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:168)
|
at com.mongodb.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:289)
|
at com.mongodb.connection.DefaultServerConnection.updateCommand(DefaultServerConnection.java:143)
|
at com.mongodb.operation.MixedBulkWriteOperation$Run$3.executeWriteCommandProtocol(MixedBulkWriteOperation.java:481)
|
at com.mongodb.operation.MixedBulkWriteOperation$Run$RunExecutor.execute(MixedBulkWriteOperation.java:647)
|
at com.mongodb.operation.MixedBulkWriteOperation$Run.execute(MixedBulkWriteOperation.java:400)
|
at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:180)
|
at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:169)
|
at com.mongodb.operation.OperationHelper.withConnectionSource(OperationHelper.java:232)
|
at com.mongodb.operation.OperationHelper.withConnection(OperationHelper.java:223)
|
at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:169)
|
at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:75)
|
at com.mongodb.Mongo.execute(Mongo.java:827)
|
at com.mongodb.Mongo$2.execute(Mongo.java:810)
|
at com.mongodb.MongoCollectionImpl.executeSingleWriteRequest(MongoCollectionImpl.java:515)
|
at com.mongodb.MongoCollectionImpl.update(MongoCollectionImpl.java:508)
|
at com.mongodb.MongoCollectionImpl.updateOne(MongoCollectionImpl.java:355)
|
at com.test.db.mongoDB.RetryingMongoCollection.updateOne(RetryingMongoCollection.java:909)
|
There is a wrapper for MongoCollection class that for each MongoException and now for java.lang.IllegalStateException thrown
@Override |
public UpdateResult updateOne(Bson filter, Bson update, |
UpdateOptions updateOptions) {
|
UpdateResult retVal = null; |
final ExceptionHelper ex = new ExceptionHelper(CAConstants.RETRIES, ErrorType.ERROR, true); |
boolean isDone = false; |
while (!isDone) { |
try { |
retVal = proxied.updateOne(filter, update, updateOptions);
|
isDone = true; |
} catch (final MongoException | IllegalStateException e) { |
ex.logIfZero("Failed to launch MongoDB operation", e); |
if (ex.lastTry()) { |
isDone = true; |
Log.error("Failed to launch MongoDB operation.", e); |
} else { |
Log.error("[MDB] Going to update mongo instance due to exception", e); |
db = MongoDBFactory.getUpdatedMongoDBInstance();
|
setCollection();
|
}
|
}
|
}
|
return retVal; |
}
|
tries to reload MongoClient:
if ( mongoClient != null ) { |
try { |
mongoClient.close();
|
} catch (Exception e) { |
Log.error("Mongo client throws exception while closing connection", e); |
}
|
...
|
mongoClient = new MongoClient(servers, Arrays.asList(credential), |
new MongoClientOptions.Builder() |
.requiredReplicaSetName(REPLICASET_NAME)
|
.connectTimeout(0) |
.connectionsPerHost(CONNECTION_PER_HOST)
|
.cursorFinalizerEnabled(false) |
.build());
|
|
db = mongoClient.getDatabase(database);
|
However, when the mongoClient is updated after "com.mongodb.MongoSocketReadException: Prematurely reached end of stream" it fails with:
java.lang.IllegalStateException: state should be: open
|
at com.mongodb.assertions.Assertions.isTrue(Assertions.java:70)
|
at com.mongodb.connection.BaseCluster.selectServer(BaseCluster.java:82)
|
at com.mongodb.binding.ClusterBinding$ClusterBindingConnectionSource.<init>(ClusterBinding.java:75)
|
at com.mongodb.binding.ClusterBinding$ClusterBindingConnectionSource.<init>(ClusterBinding.java:71)
|
at com.mongodb.binding.ClusterBinding.getWriteConnectionSource(ClusterBinding.java:68)
|
at com.mongodb.operation.OperationHelper.withConnection(OperationHelper.java:221)
|
at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:169)
|
at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:75)
|
at com.mongodb.Mongo.execute(Mongo.java:827)
|
at com.mongodb.Mongo$2.execute(Mongo.java:810)
|
at com.mongodb.MongoCollectionImpl.executeSingleWriteRequest(MongoCollectionImpl.java:515)
|
at com.mongodb.MongoCollectionImpl.update(MongoCollectionImpl.java:508)
|
at com.mongodb.MongoCollectionImpl.updateOne(MongoCollectionImpl.java:355)
|
at com.test.db.mongoDB.RetryingMongoCollection.updateOne(RetryingMongoCollection.java:909)
|
and continues failing all the time...
I use mongoDB ver 3.2.9
MongoDB Java client 3.3.0
Per request, the RetryingMongoCollection.java starts with:
public class RetryingMongoCollection<T> implements MongoCollection<T> { |
private MongoCollection<T> proxied; |
private MongoDatabase db; |
private final String collectionName; |
|
public RetryingMongoCollection(MongoDatabase db, final String collectionName) { |
this.collectionName = collectionName; |
this.db = db; |
setCollection();
|
}
|
|
@SuppressWarnings("unchecked") |
private void setCollection() { |
this.proxied = (MongoCollection<T>) db.getCollection(collectionName); |
}
|
...
|
Then we override each and every com.mongodb.client.MongoCollection<T> method the same way as we do with updateOne method.