/** * Copyright (C) 2011 10gen Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.mongodb.perf; import com.mongodb.*; import org.bson.types.ObjectId; import java.lang.management.ManagementFactory; import java.lang.management.ThreadInfo; import java.lang.management.ThreadMXBean; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; /** * Test for reading from a large connection pool */ public class MongoUpsertContentionTest { public static void testPoolsOfDifferentSizeWithDifferentThreadCounts() throws UnknownHostException, InterruptedException { ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); threadMXBean.setThreadContentionMonitoringEnabled(true); threadMXBean.setThreadCpuTimeEnabled(true); // PLAY AROUND WITH THESE NUMBERS TO GET THE EFFECT YOU WANT int numUpsertsPerJob = 10000; boolean reserve = false; // int numThreads = 40; // int poolSize = 10; System.out.println("Mongo version: " + Mongo.MAJOR_VERSION + "." + Mongo.MINOR_VERSION); System.out.println("upserts per job: " + numUpsertsPerJob); System.out.println(); System.out.println(); for (int numThreads = 5; numThreads <= 45; numThreads += 10) { for (int poolSize = 10; poolSize <= 200; poolSize += 20) { doRun(numThreads, poolSize, numUpsertsPerJob, reserve); } } } private static void doRun(int numThreads, int poolSize, int numUpsertsPerJob, boolean reserve) throws UnknownHostException, InterruptedException { MongoOptions options = new MongoOptions(); options.connectionsPerHost = poolSize; Mongo mongo = new Mongo(Arrays.asList(new ServerAddress("127.0.0.1", 27100)), options); DB db = mongo.getDB("largeConnectionPoolDB"); db.dropDatabase(); // System.out.println("Warming up..."); executeParallelOperation(mongo, poolSize, 10, true, WriteConcern.SAFE, false); // System.out.println(); System.out.println("threads: " + numThreads); System.out.println("pool size: " + poolSize); System.out.println("----------"); executeParallelOperation(mongo, numThreads, numUpsertsPerJob, reserve, WriteConcern.NONE, true); mongo.getDB("largeConnectionPoolDB").dropDatabase(); mongo.close(); } private static void executeParallelOperation(Mongo mongo, int numThreads, int numUpserts, boolean reserveConnection, WriteConcern writeConcern, boolean printResults) throws UnknownHostException, InterruptedException { DBCollection coll = mongo.getDB("largeConnectionPoolDB").getCollection("largeConnectionPoolDB"); coll.drop(); doOne(numThreads, numUpserts, coll, reserveConnection, writeConcern, printResults); } private static void doOne(int numThreads, final int numUpserts, final DBCollection coll, final boolean reserveConnection, WriteConcern writeConcern, boolean printResults) throws InterruptedException { long start = System.currentTimeMillis(); ExecutorService es = Executors.newFixedThreadPool(numThreads); List jobs = new ArrayList(); for (int x = 0; x < numThreads; x++) { ContentionJob job = new ContentionJob(coll, reserveConnection, numUpserts, writeConcern); jobs.add(job); es.submit(job); } for (ContentionJob job : jobs) { job.awaitCompletion(); } if (printResults) { long end = System.currentTimeMillis(); System.out.println("time elapsed: " + (end - start) + " ms"); printPoolThreadInfo(); System.out.println(); } es.shutdown(); es.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); } public static void printPoolThreadInfo() throws InterruptedException { ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(false, false); long totalBlockedCount = 0; long totalBlockedTime = 0; long totalCpuTime = 0; long totalThreadUserTime = 0; for (ThreadInfo cur : threadInfos) { if (!cur.getThreadName().startsWith("pool")) { continue; } totalBlockedCount += cur.getBlockedCount(); totalBlockedTime += cur.getBlockedTime(); totalCpuTime += threadMXBean.getThreadCpuTime(cur.getThreadId()) / 1000000; totalThreadUserTime = threadMXBean.getThreadUserTime(cur.getThreadId()) / 1000000; // System.out.print(cur.getThreadName() + ": "); // System.out.print("blocked count: " + cur.getBlockedCount()); // System.out.print(", blocked time: " + cur.getBlockedTime() + " ms"); // System.out.print(", cpu time: " + (threadMXBean.getThreadCpuTime(cur.getThreadId()) / 1000000) + " ms"); // System.out.print(", user time: " + (threadMXBean.getThreadUserTime(cur.getThreadId()) / 1000000) + " ms"); // System.out.println(); } // System.out.println(); // System.out.println("Totals: "); System.out.println("blocked count: " + totalBlockedCount); System.out.println("blocked time: " + totalBlockedTime + " ms"); System.out.println("cpu time: " + totalCpuTime + " ms"); // System.out.println("user time: " + totalThreadUserTime + " ms"); } public static void main(String args[]) throws UnknownHostException, InterruptedException { testPoolsOfDifferentSizeWithDifferentThreadCounts(); } static class ContentionJob implements Runnable { protected final DBCollection coll; private final boolean reserveConnection; private final int num; private final WriteConcern writeConcern; private volatile boolean done; ContentionJob(DBCollection coll, boolean reserveConnection, int num, WriteConcern writeConcern) { this.coll = coll; this.reserveConnection = reserveConnection; this.num = num; this.writeConcern = writeConcern; } public void run() { if (reserveConnection) coll.getDB().requestStart(); try { DBObject obj = new BasicDBObject(); obj.put("_id", new ObjectId()); DBObject upsert = new BasicDBObject("$inc", new BasicDBObject("x", 1)); for (int i = 0; i < num; i++) { coll.update(obj, upsert, true, false, writeConcern); } } catch (RuntimeException e) { Logger.getAnonymousLogger().log(Level.INFO, "Exception", e); } finally { if (reserveConnection) coll.getDB().requestDone(); } synchronized (this) { done = true; notify(); } } public synchronized void awaitCompletion() throws InterruptedException { while (!done) { wait(); } } } }