- Type: Task
- Resolution: Works as Designed
- Priority: Major - P3
- Affects Version/s: 3.3.0
- Component/s: None
I have an application that pulls data from one source and then upserts it into MongoDB. Sometimes the pull takes a long time, and when the app then tries to push a row to MongoDB, the following exception is raised, probably because of a timeout:
com.mongodb.MongoSocketReadException: Prematurely reached end of stream
    at com.mongodb.connection.SocketStream.read(SocketStream.java:88)
    at com.mongodb.connection.InternalStreamConnection.receiveResponseBuffers(InternalStreamConnection.java:492)
    at com.mongodb.connection.InternalStreamConnection.receiveMessage(InternalStreamConnection.java:222)
    at com.mongodb.connection.UsageTrackingInternalConnection.receiveMessage(UsageTrackingInternalConnection.java:105)
    at com.mongodb.connection.DefaultConnectionPool$PooledConnection.receiveMessage(DefaultConnectionPool.java:438)
    at com.mongodb.connection.WriteCommandProtocol.receiveMessage(WriteCommandProtocol.java:262)
    at com.mongodb.connection.WriteCommandProtocol.execute(WriteCommandProtocol.java:104)
    at com.mongodb.connection.UpdateCommandProtocol.execute(UpdateCommandProtocol.java:64)
    at com.mongodb.connection.UpdateCommandProtocol.execute(UpdateCommandProtocol.java:37)
    at com.mongodb.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:168)
    at com.mongodb.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:289)
    at com.mongodb.connection.DefaultServerConnection.updateCommand(DefaultServerConnection.java:143)
    at com.mongodb.operation.MixedBulkWriteOperation$Run$3.executeWriteCommandProtocol(MixedBulkWriteOperation.java:481)
    at com.mongodb.operation.MixedBulkWriteOperation$Run$RunExecutor.execute(MixedBulkWriteOperation.java:647)
    at com.mongodb.operation.MixedBulkWriteOperation$Run.execute(MixedBulkWriteOperation.java:400)
    at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:180)
    at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:169)
    at com.mongodb.operation.OperationHelper.withConnectionSource(OperationHelper.java:232)
    at com.mongodb.operation.OperationHelper.withConnection(OperationHelper.java:223)
    at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:169)
    at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:75)
    at com.mongodb.Mongo.execute(Mongo.java:827)
    at com.mongodb.Mongo$2.execute(Mongo.java:810)
    at com.mongodb.MongoCollectionImpl.executeSingleWriteRequest(MongoCollectionImpl.java:515)
    at com.mongodb.MongoCollectionImpl.update(MongoCollectionImpl.java:508)
    at com.mongodb.MongoCollectionImpl.updateOne(MongoCollectionImpl.java:355)
    at com.test.db.mongoDB.RetryingMongoCollection.updateOne(RetryingMongoCollection.java:909)
There is a wrapper around the MongoCollection class that, whenever a MongoException (and now also a java.lang.IllegalStateException) is thrown, retries the operation after reloading the MongoClient:

    @Override
    public UpdateResult updateOne(Bson filter, Bson update, UpdateOptions updateOptions) {
        UpdateResult retVal = null;
        final ExceptionHelper ex = new ExceptionHelper(CAConstants.RETRIES, ErrorType.ERROR, true);
        boolean isDone = false;
        while (!isDone) {
            try {
                retVal = proxied.updateOne(filter, update, updateOptions);
                isDone = true;
            } catch (final MongoException | IllegalStateException e) {
                ex.logIfZero("Failed to launch MongoDB operation", e);
                if (ex.lastTry()) {
                    // Out of retries: give up and return null.
                    isDone = true;
                    Log.error("Failed to launch MongoDB operation.", e);
                } else {
                    // Replace the MongoClient and re-fetch the proxied collection.
                    Log.error("[MDB] Going to update mongo instance due to exception", e);
                    db = MongoDBFactory.getUpdatedMongoDBInstance();
                    setCollection();
                }
            }
        }
        return retVal;
    }

The MongoClient is reloaded as follows:
    if ( mongoClient != null ) {
        try {
            mongoClient.close();
        } catch (Exception e) {
            Log.error("Mongo client throws exception while closing connection", e);
        }
    ...
    mongoClient = new MongoClient(servers, Arrays.asList(credential),
            new MongoClientOptions.Builder()
                    .requiredReplicaSetName(REPLICASET_NAME)
                    .connectTimeout(0)
                    .connectionsPerHost(CONNECTION_PER_HOST)
                    .cursorFinalizerEnabled(false)
                    .build());
    db = mongoClient.getDatabase(database);
However, when the mongoClient is updated after "com.mongodb.MongoSocketReadException: Prematurely reached end of stream" it fails with:
java.lang.IllegalStateException: state should be: open
    at com.mongodb.assertions.Assertions.isTrue(Assertions.java:70)
    at com.mongodb.connection.BaseCluster.selectServer(BaseCluster.java:82)
    at com.mongodb.binding.ClusterBinding$ClusterBindingConnectionSource.<init>(ClusterBinding.java:75)
    at com.mongodb.binding.ClusterBinding$ClusterBindingConnectionSource.<init>(ClusterBinding.java:71)
    at com.mongodb.binding.ClusterBinding.getWriteConnectionSource(ClusterBinding.java:68)
    at com.mongodb.operation.OperationHelper.withConnection(OperationHelper.java:221)
    at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:169)
    at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:75)
    at com.mongodb.Mongo.execute(Mongo.java:827)
    at com.mongodb.Mongo$2.execute(Mongo.java:810)
    at com.mongodb.MongoCollectionImpl.executeSingleWriteRequest(MongoCollectionImpl.java:515)
    at com.mongodb.MongoCollectionImpl.update(MongoCollectionImpl.java:508)
    at com.mongodb.MongoCollectionImpl.updateOne(MongoCollectionImpl.java:355)
    at com.test.db.mongoDB.RetryingMongoCollection.updateOne(RetryingMongoCollection.java:909)
and it keeps failing on every subsequent attempt.
I use MongoDB server version 3.2.9 and MongoDB Java driver version 3.3.0.
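One possible explanation for the persistent "state should be: open" error is that some code path still holds a reference to a client that the reload has already closed. The sketch below only simulates that situation in plain Java; FakeClient, CURRENT, reload and tryUpdate are hypothetical stand-ins and are not part of the MongoDB driver or of the application code shown here.

```java
import java.util.concurrent.atomic.AtomicReference;

public class StaleClientSketch {

    /** Hypothetical stand-in for a client that rejects every call once it is closed. */
    static final class FakeClient {
        private volatile boolean open = true;

        void close() {
            open = false;
        }

        String update(String row) {
            if (!open) {
                throw new IllegalStateException("state should be: open");
            }
            return "ok:" + row;
        }
    }

    /** Hypothetical factory state: the client that is currently considered live. */
    static final AtomicReference<FakeClient> CURRENT =
            new AtomicReference<>(new FakeClient());

    /** Closes the old client and installs a new one, like the reload in the report. */
    static FakeClient reload() {
        FakeClient old = CURRENT.getAndSet(new FakeClient());
        old.close();
        return CURRENT.get();
    }

    /** Returns the result, or the exception message, so the outcome is visible. */
    static String tryUpdate(FakeClient client, String row) {
        try {
            return client.update(row);
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        FakeClient heldByWrapperA = CURRENT.get();
        FakeClient heldByWrapperB = CURRENT.get();

        // Wrapper A hits an error and reloads; wrapper B is never told about it.
        heldByWrapperA = reload();

        System.out.println(tryUpdate(heldByWrapperA, "r1")); // ok:r1
        System.out.println(tryUpdate(heldByWrapperB, "r1")); // state should be: open
    }
}
```

In this simulation the holder that reloaded recovers, while the holder with the stale reference fails on every call until it also picks up the new client.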
Per request, the RetryingMongoCollection.java starts with:
public class RetryingMongoCollection<T> implements MongoCollection<T> { private MongoCollection<T> proxied; private MongoDatabase db; private final String collectionName; public RetryingMongoCollection(MongoDatabase db, final String collectionName) { this.collectionName = collectionName; this.db = db; setCollection(); } @SuppressWarnings("unchecked") private void setCollection() { this.proxied = (MongoCollection<T>) db.getCollection(collectionName); } ...
We then override every com.mongodb.client.MongoCollection<T> method in the same way as updateOne.
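For comparison, here is a minimal sketch of a bounded retry that reuses the same client instead of closing and recreating it. It assumes the client can recover from a dropped connection on its own, which a pooled client such as MongoClient is generally designed to do. BoundedRetry, withRetries and the backoff values are hypothetical names chosen for illustration. Unlike the wrapper above, the sketch rethrows the last exception rather than returning null, and it catches RuntimeException broadly where a real wrapper would likely narrow this to MongoException.

```java
import java.util.function.Supplier;

public class BoundedRetry {

    /**
     * Runs the operation up to maxAttempts times, sleeping a little longer
     * after each failure, and rethrows the last failure if all attempts fail.
     */
    static <T> T withRetries(int maxAttempts, long backoffMillis, Supplier<T> operation) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return operation.get();
            } catch (RuntimeException e) {
                last = e;
                if (attempt < maxAttempts) {
                    try {
                        // Linear backoff: wait longer after each failed attempt.
                        Thread.sleep(backoffMillis * attempt);
                    } catch (InterruptedException ie) {
                        // Preserve the interrupt flag and stop retrying.
                        Thread.currentThread().interrupt();
                        throw e;
                    }
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated operation: fails twice, then succeeds on the third call.
        String result = withRetries(3, 10, () -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("transient failure " + calls[0]);
            }
            return "updated";
        });
        System.out.println(result + " after " + calls[0] + " attempts");
        // prints "updated after 3 attempts"
    }
}
```

In the wrapper, the supplier would be something like `() -> proxied.updateOne(filter, update, updateOptions)`, so no other holder of the client is affected by a retry.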