package com.thistech.spotlink.util; import com.mongodb.*; import java.net.UnknownHostException; public class MapReduceTest { private String name = this.getClass().getSimpleName(); private String host = "some.host"; private String database = "database"; private String username = "username"; private String password = "password"; private String source = "collection_name"; public static final String MAP_FUNCTION = "function(){" + " var ts = this.ts;" + " ts.setMinutes(0); ts.setSeconds(0); ts.setMilliseconds(0);" + " emit({'ts': ts, 'ads': this.payload.adsResponses[0].adsName}, 1);" + "}"; public static final String REDUCE_FUNCTION = "function (key, val) {" + " var total = 0;" + " for (var i in val){" + " total += val[i];" + " }" + " return total" + ";}"; public void executeAggregation() { System.out.println("Running Aggregator for " + name); try { DBCollection collection = getCollection(); System.out.println("Setting ReadPreference.secondary() on the DBCollection"); collection.setReadPreference(ReadPreference.secondary()); MapReduceCommand cmd = new MapReduceCommand( collection, MAP_FUNCTION, REDUCE_FUNCTION, null, MapReduceCommand.OutputType.INLINE, null ); System.out.println("Setting ReadPreference.secondary() on MapReduceCommand"); cmd.setReadPreference(ReadPreference.secondary()); System.out.println(cmd.toString()); collection.mapReduce(cmd); System.out.println("Finished Aggregator for " + name); } catch (Exception e) { System.err.println(String.format("Failed to execute Aggregator for %s: from %s", name, source)); e.printStackTrace(); } } public DBCollection getCollection() throws UnknownHostException { System.out.println("Setting ReadPreference.secondary() on MongoClientOptions"); MongoClientOptions options = new MongoClientOptions.Builder() .readPreference(ReadPreference.secondary()) .build(); MongoClient client = new MongoClient(host, options); System.out.println("Setting ReadPreference.secondary() on MongoClient itself"); client.setReadPreference(ReadPreference.secondary()); DB db = client.getDB(database); db.authenticate(username, password.toCharArray()); System.out.println("Setting ReadPreference.secondary() on the DB object"); db.setReadPreference(ReadPreference.secondary()); return db.getCollection(source); } public static void main(String[] args) { new MapReduceTest().executeAggregation(); } }