[JAVA-2306] NettyStream does not handle clean socket closures from the server Created: 12/Sep/16  Updated: 19/Oct/16  Resolved: 23/Sep/16

Status: Closed
Project: Java Driver
Component/s: Async, Connection Management
Affects Version/s: 3.0.0
Fix Version/s: 3.4.0-rc1, 3.4.0

Type: Bug Priority: Major - P3
Reporter: daimin Assignee: Ross Lawley
Resolution: Done Votes: 0
Labels: None
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified
Environment:

Linux jdk1.6



 Description   

When the server is restarted, the NettyStream channel does not capture socket-closed events, and any pending readers are left hanging.

Was:

After restarting the server, the MongoDB Async Java Client cannot recover all of its connections

Hey guys.
I am testing MongoDB like this:

  1. start one mongodb instance (3.2.9) on a Linux server as the server side
  2. start 40 mongodb java async client (3.3.0) processes on another two servers, also Linux; each process uses 100 connections
  3. mongostat shows 4040 connections in total, since each process also opens a monitor connection. So far so good.
  4. restart the mongo server. However, mongostat now shows far fewer than 4040 connections.

Some information that might be helpful:

  1. I was using 3.0.2 at the very beginning, and at that time the problem was much worse than with the current 3.3.0. In fact, I saw some issues that fixed connection-pool bugs, so I upgraded.
  2. I enabled trace logging with log4j. It looks like some connections take a code path, perhaps on an exception, that never closes the connection, never releases it back into com.mongodb.internal.connection.ConcurrentPool.available, and never notifies the semaphore permits. One potential place I found is com.mongodb.connection.InternalStreamConnection.readAsync(int, SingleResultCallback<ByteBuf>):

try {
    stream.readAsync(numBytes, new AsyncCompletionHandler<ByteBuf>() {
        @Override
        public void completed(final ByteBuf buffer) {
            callback.onResult(buffer, null);
        }

        @Override
        public void failed(final Throwable t) {
            close();
            callback.onResult(null, translateReadException(t));
        }
    });
} catch (Exception e) {
    callback.onResult(null, translateReadException(e));
}

Note that the catch block has no close() call.
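For comparison, a catch block with the cleanup the reporter suggests might look like this (a sketch of the suggestion only; as the comments below show, the actual fix landed in NettyStream instead):

} catch (Exception e) {
    // also close the connection on the synchronous failure path,
    // so it is not left checked out of the pool forever
    close();
    callback.onResult(null, translateReadException(e));
}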

Thanks for paying attention, guys.



 Comments   
Comment by daimin [ 23/Sep/16 ]

Hi, Jeff Yemin and Ross Lawley:

I made a copy of these files:

  • src/main/java/com/mongodb/connection/netty/NettyStream0.java
  • src/main/java/com/mongodb/connection/netty/NettyStreamFactory0.java
  • src/main/java/com/mongodb/async/client/MongoClients0.java

and then applied the patch https://github.com/mongodb/mongo-java-driver/commit/47ab401ada0664fabdfae91b1392e2cb02cc6a83 to NettyStream0.java.
The load test shows the problem is fixed; the number of connections looks fine after the restart.

Thanks a lot!

Comment by Ross Lawley [ 23/Sep/16 ]

Hi daimin,

Many thanks for your sample code; it helped identify the cause of the issue. Initially I thought it was a race condition, but that turned out not to be the case. There was no handling for closed connections in NettyStream, so pending readers were left waiting for a closed socket to return data, and their callbacks were never called to signal that the socket had closed. This is also why I couldn't reproduce the issue with the NIO2 stream. With the fix, you will see that once the internal maintenance task runs, the minimum pool size is restored.

Ross
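For illustration, here is a minimal hypothetical sketch of the approach described above: a Netty inbound handler that reacts to channelInactive and fails any pending reader, so its callback runs when the server closes the socket cleanly. PendingReader and the error message are illustrative assumptions, not the driver's actual types.

import java.io.IOException;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

// Illustrative holder for a reader waiting on the stream; in the driver the
// pending reader is tracked inside NettyStream itself.
interface PendingReader {
    void fail(Throwable t);
}

class InactiveAwareHandler extends ChannelInboundHandlerAdapter {
    private final PendingReader pendingReader;

    InactiveAwareHandler(final PendingReader pendingReader) {
        this.pendingReader = pendingReader;
    }

    @Override
    public void channelInactive(final ChannelHandlerContext ctx) {
        // A clean close from the server fires channelInactive rather than
        // exceptionCaught, so fail the pending read explicitly; otherwise its
        // callback is never invoked and the connection is never recycled.
        pendingReader.fail(new IOException("The connection to the server was closed"));
        ctx.fireChannelInactive();
    }
}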

Comment by Githook User [ 23/Sep/16 ]

Author: Ross Lawley (rozza) <ross.lawley@gmail.com>

Message: NettyStream fix

Ensure any cleanly closed connections from the server notify any
pending readers that the connection has closed.

JAVA-2306
Branch: master
https://github.com/mongodb/mongo-java-driver/commit/47ab401ada0664fabdfae91b1392e2cb02cc6a83

Comment by daimin [ 17/Sep/16 ]

Hi. Here is the code:

MongoBenchMark.java

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
 
import org.bson.Document;
 
import com.mongodb.ConnectionString;
import com.mongodb.ReadPreference;
import com.mongodb.async.SingleResultCallback;
import com.mongodb.async.client.MongoClient;
import com.mongodb.async.client.MongoClientSettings;
import com.mongodb.async.client.MongoClients;
import com.mongodb.async.client.MongoCollection;
import com.mongodb.async.client.MongoDatabase;
import com.mongodb.connection.ClusterSettings;
import com.mongodb.connection.ConnectionPoolSettings;
import com.mongodb.connection.SocketSettings;
import com.mongodb.connection.netty.NettyStreamFactoryFactory;
 
/**
 * MongoDB Demo
 * 
 * params: ip connections batch
 * 
 * @author daimin
 * @version $Id: MongoBenchMark.java, v 0.1 Aug 30, 2016 2:12:54 PM daimin Exp $
 */
public class MongoBenchMark {
 
    static String               server      = "10.189.224.12";
 
    static int                  port        = 9018;
 
    static int                  connections = 100;
 
    static AtomicInteger        good        = new AtomicInteger(0);
    static AtomicInteger        bad         = new AtomicInteger(0);
 
    static int                  batch       = 500;
 
    static int                  queue       = 5000;
 
    static Map<String, Integer> map         = new HashMap<String, Integer>();
 
    static AtomicInteger        scan        = new AtomicInteger(0);
 
    public static void main(String[] args) {
        if (args != null && args.length >= 1) {
            server = args[0];
            System.err.println("mongo: " + server);
        }
        if (args != null && args.length >= 2) {
            connections = Integer.parseInt(args[1]);
            System.err.println("connections: " + connections);
        }
        if (args != null && args.length >= 3) {
            batch = Integer.parseInt(args[2]);
            System.err.println("batch: " + batch);
        }
        MongoClient client = buildMongoClient(server, port);
        MongoDatabase db = client.getDatabase("bench");
        startRoutine(db);
    }
 
    static void startRoutine(MongoDatabase db) {
        List<MongoCollection<Document>> cs = new ArrayList<MongoCollection<Document>>(10);
        for (int i = 0; i < 10; i++) {
            cs.add(db.getCollection("raw" + i));
        }
        for (int i = 0;; i++) {
            int all = i * batch * 10;
            int _good = good.get();
            int _bad = bad.get();
            int loss = all - _good - _bad;
            System.err.println("start round " + i + ", expect: " + all + ", count: " + _good + "/"
                               + _bad + " queue: " + loss);
            try {
                String mapstr = null;
                synchronized (map) {
                    mapstr = map.toString();
                    map.clear();
                }
                long pre = System.currentTimeMillis();
                for (int j = 0; j < 10; j++) {
                    doRoundSave(i, cs.get(j));
                    doRoundScan(i, cs.get(j));
                }
                System.err.println("cost: " + (System.currentTimeMillis() - pre) + ", scan size: "
                                   + scan.getAndSet(0));
                System.err.println("errors: " + mapstr);
            } catch (Throwable t) {
                t.printStackTrace();
            }
            try {
                Thread.sleep(5000);
            } catch (Throwable t) {
                t.printStackTrace();
            }
        }
    }
 
    static void doRoundScan(final int round, MongoCollection<Document> c) {
        int b = getRandom(batch - 200);
        for (int i = b; i < b + 100; i++) {
            int hoursBack = getRandom(10); // renamed: the original 'int hour' shadowed the static 'hour' field
            long end = System.currentTimeMillis() - hoursBack * hour;
            long start = end - 5 * minute;
            final String startRow = i + "_" + start;
            final String endRow = i + "_" + end;
            c.find(
                new Document("_id", new Document().append("$gt", startRow).append("$lt", endRow)))
                .into(new ArrayList<Document>(), new SingleResultCallback<ArrayList<Document>>() {
                    @Override
                    public void onResult(ArrayList<Document> result, Throwable t) {
                        if (result != null) { // result is null when the read fails
                            scan.addAndGet(result.size());
                        }
                    }
                });
        }
    }
 
    static List<Double> vs = Arrays.asList(new Double[] { 1.1, 2.2, 3.3, 4.4 });
 
    static void doRoundSave(final int round, MongoCollection<Document> c) {
        final long time = System.currentTimeMillis();
        for (int i = 0; i < batch; i++) {
            final String k = i + "";
            final String key = k + "_" + time;
            c.insertOne(new Document("_id", key).append("k", k).append("v", vs).append("t", time),
                new SingleResultCallback<Void>() {
                    @Override
                    public void onResult(Void result, Throwable t) {
                        if (t == null) {
                            good.incrementAndGet();
                        } else {
                            bad.incrementAndGet();
                            String key = t.getClass().getName() + "@" + round;
                            synchronized (map) {
                                Integer old = map.get(key);
                                if (old == null) {
                                    old = 0;
                                }
                                map.put(key, old + 1);
                            }
                        }
                    }
                });
        }
    }
 
    static MongoClient buildMongoClient(String server, int port) {
        String mongoString = "mongodb://" + server + ":" + port;
        ConnectionString connectionString = new ConnectionString(mongoString);
        ClusterSettings clusterSettings = ClusterSettings.builder()
            .applyConnectionString(connectionString).maxWaitQueueSize(queue).build();
        ConnectionPoolSettings poolSettings = ConnectionPoolSettings.builder().maxSize(connections)
            .minSize(connections).maxWaitQueueSize(queue).maxWaitTime(minute, TimeUnit.MILLISECONDS)
            .maxConnectionLifeTime(0, TimeUnit.MILLISECONDS)
            .maxConnectionIdleTime(0, TimeUnit.MILLISECONDS).build();
        SocketSettings socketSettings = SocketSettings.builder().keepAlive(true).build();
        MongoClientSettings settings = MongoClientSettings.builder()
            .clusterSettings(clusterSettings).readPreference(ReadPreference.secondaryPreferred())
            .connectionPoolSettings(poolSettings).socketSettings(socketSettings)
            .streamFactoryFactory(NettyStreamFactoryFactory.builder().build()).build();
        return MongoClients.create(settings);
    }
 
    private static Random seed = new Random();
 
    private static int getRandom(int max) {
        // normalize nextInt(), which may be negative, into [0, max)
        return (seed.nextInt() % max + max) % max;
    }
 
    static long minute = 60 * 1000;
    static long hour   = minute * 60;
}

It's quite simple: it does some reads and writes every 5 seconds. In total I started 40 instances against one mongo server running in standalone mode. In this setup it's not difficult to reproduce.
Hope it helps.
Thanks a lot.
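For reference, the benchmark takes the server address, connection count, and batch size as arguments (with the driver and Netty jars on the classpath), for example:

java MongoBenchMark 10.189.224.12 100 500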

Comment by Jeffrey Yemin [ 16/Sep/16 ]

Hi Daimin,

Can you share the sample application? It's possible that the problem is related to the specific read or write operation you're performing (e.g. count vs. find vs. aggregate), or perhaps it's related to cursor management.

Regards,
Jeff

Comment by daimin [ 16/Sep/16 ]

Hi Jeff Yemin,

  • My MongoClient is built with this method:

    Test.java

        static int queue = 5000;
        static int connections = 100;
        private static MongoClient buildMongoClient(String server, int port) {
            String mongoString = "mongodb://" + server + ":" + port;
            ConnectionString connectionString = new ConnectionString(mongoString);
            ClusterSettings clusterSettings = ClusterSettings.builder()
                .applyConnectionString(connectionString).maxWaitQueueSize(queue).build();
            ConnectionPoolSettings poolSettings = ConnectionPoolSettings.builder().maxSize(connections)
                .minSize(connections).maxWaitQueueSize(queue)
                .maxWaitTime(60000, TimeUnit.MILLISECONDS)
                .maxConnectionLifeTime(0, TimeUnit.MILLISECONDS)
                .maxConnectionIdleTime(0, TimeUnit.MILLISECONDS).build();
            SocketSettings socketSettings = SocketSettings.builder().keepAlive(true).build();
            MongoClientSettings settings = MongoClientSettings.builder()
                .clusterSettings(clusterSettings).readPreference(ReadPreference.secondaryPreferred())
                .connectionPoolSettings(poolSettings).socketSettings(socketSettings)
                .streamFactoryFactory(NettyStreamFactoryFactory.builder()
                    .eventLoopGroup(ShareEventLoopGroup.getNioEventLoopGroup()).build())
                .build();
            return MongoClients.create(settings);
        }
    

  • The application is a sample written to reproduce the bug we hit in our production environment. The symptoms are the same: mongo was killed by the OOM killer and our monitor program restarted it quickly, but afterwards some clients could no longer read or write (some recovered while others could not). It looks as if the client holds some connections 'in use' that are never released back to the available pool even though they are not actually in use; perhaps an error occurs and the code forgets to release them, as sketched below. Call the number of such connections x. In our case, if x < 100, I/O performance degrades, and if x = 100, it is impossible to reach mongo at all. The effect is that we have to restart all clients whenever mongo restarts.
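A hypothetical illustration of that leak pattern (illustrative code, not the driver's ConcurrentPool):

import java.util.concurrent.Semaphore;

class LeakyPool {
    private final Semaphore permits = new Semaphore(100); // pool maxSize

    void checkOutAndUse() throws InterruptedException {
        permits.acquire(); // connection is now counted as 'in use'
        try {
            doWork();
        } catch (RuntimeException e) {
            // Bug pattern: returning without release() leaves this connection
            // 'in use' forever. Each leak lowers throughput, and once all 100
            // permits leak, no caller can check out a connection at all.
            return;
        }
        permits.release(); // only the happy path releases the permit
    }

    void doWork() {
    }
}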

Thanks a lot.

Comment by Jeffrey Yemin [ 16/Sep/16 ]

Hi Daimin,

Can you tell us a little more about the application? It would be useful to know

  • how you are constructing your MongoClient instance (connection string, settings, etc)
  • what the application is doing (a general description, or if possible a small sample application)

Thanks,
Jeff
