/*
 * Author: Joseph Wang
 */
package com.leadpoint.syndication.db.mongo.mortgage;

import java.util.ArrayList;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.leadpoint.config.Meta;
import com.leadpoint.db.Tuple;
import com.leadpoint.syndication.common.Constants;
import com.leadpoint.syndication.common.mortgage.BaseTableRow;
import com.leadpoint.syndication.db.mongo.mortgage.MongoConnnection;
import com.mongodb.BasicDBObject;
import com.mongodb.Bytes;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.Mongo;

/**
 * Base class for query engines that run {@link #dbQuery} against the
 * collection {@link #currentCollection} of the database {@link #currentDb}
 * and convert every valid result document into a tuple.
 *
 * <p>Subclasses are expected to populate {@code dbQuery}, {@code currentDb}
 * and {@code currentCollection} before calling {@link #execute(int)}; this
 * class itself leaves them at their defaults.
 *
 * <p>The {@code success} flag is only ever cleared by {@link #fetch(int)};
 * the single place it is set back to {@code true} is the retry inside
 * {@link #execute(int)}. A failure recorded before {@code execute} is entered
 * therefore stays visible until the caller invokes
 * {@link #setSuccess(boolean) setSuccess(true)}.
 */
public class BaseTableQueryEngine {

    private static final Log log = LogFactory.getLog(BaseTableQueryEngine.class);

    private static final Meta META = Meta.getMeta(BaseTableQueryEngine.class);

    /** Value of the {@code enable_debug} setting (default {@code false}); gates the debug logging. */
    private static final boolean DEBUG_ENABLED = META.getBoolean("enable_debug", false);

    /** Query passed to {@code DBCollection.find}. */
    protected BasicDBObject dbQuery;

    /** Name of the database to query. */
    protected String currentDb = Constants.BLANK_STRING;

    /** Name of the collection to query. */
    protected String currentCollection = Constants.BLANK_STRING;

    /** {@code false} once a fetch has reported a problem; see the class comment. */
    protected boolean success = true;

    /**
     * Creates an engine with default (blank) database and collection names.
     *
     * @param args not used by this base class
     */
    protected BaseTableQueryEngine(Map args) {
    }

    /**
     * @return the current value of the success flag
     */
    public boolean isSuccess() {
        return success;
    }

    /**
     * @param success new value of the success flag
     */
    public void setSuccess(boolean success) {
        this.success = success;
    }

    /**
     * Runs {@link #dbQuery} once and collects the tuples of all valid rows.
     *
     * <p>The time budget is checked after each row, so at least one row is
     * processed (if available) even when {@code timeout} is already used up.
     * Any exception is logged and recorded through {@code setSuccess(false)};
     * the rows collected up to that point are still returned.
     *
     * <p>When no Mongo instance is available a reconnect is triggered and the
     * attempt is recorded as failed, so that {@link #execute(int)} retries it.
     *
     * @param timeout time budget in milliseconds, measured from the start of this call
     * @return the collected tuples, never {@code null}; empty if nothing was read
     */
    protected ArrayList fetch(int timeout) {
        ArrayList tuples = new ArrayList(200);
        try {
            Mongo mongo = MongoConnnection.getInstance().getMongo();
            long fStart = System.currentTimeMillis();

            if (mongo == null) {
                // Without a connection nothing was queried: flag the attempt as
                // failed so the caller does not mistake it for an empty result.
                setSuccess(false);
                log.info("BaseTableQueryEngine: reconnect MongoDB");
                MongoConnnection.getInstance().reconnect();
                return tuples;
            }

            DB db = mongo.getDB(currentDb);
            if (DEBUG_ENABLED) {
                log.debug("BaseTableQueryEngine: Connected to db " + currentDb
                        + " collection " + currentCollection);
            }
            if (db == null) {
                return tuples;
            }

            DBCollection coll = db.getCollection(currentCollection);
            DBCursor cur = null;
            try {
                cur = coll.find(dbQuery).addOption(Bytes.QUERYOPTION_SLAVEOK);

                // NOTE(review): getLastError() is issued right after find();
                // presumably meant to detect a failed query - confirm that it
                // can actually report errors of a read on this driver version.
                DBObject lastError = db.getLastError();
                if (lastError != null && lastError.get("err") != null) {
                    log.warn("BaseTableQueryEngine: Encounter error for query " + dbQuery);
                    setSuccess(false);
                }

                if (DEBUG_ENABLED) {
                    log.debug("BaseTableQueryEngine: Run query " + dbQuery);
                    log.debug("BaseTableQueryEngine: Found " + cur.count() + " in "
                            + (System.currentTimeMillis() - fStart));
                }

                while (cur.hasNext()) {
                    BasicDBObject dbo = (BasicDBObject) cur.next();
                    BaseTableRow row = new BaseTableRow(dbo);
                    if (row.isValid()) {
                        tuples.add(row.getTuple());
                    }
                    long remaining = timeout - (System.currentTimeMillis() - fStart);
                    if (remaining < 0) {
                        break; // time budget exhausted: return what we have
                    }
                }

                if (DEBUG_ENABLED) {
                    log.debug("BaseTableQueryEngine: tuples " + tuples.size() + " in time "
                            + (System.currentTimeMillis() - fStart));
                }
            } finally {
                // Always release the cursor, also on timeout or exception.
                closeQuietly(cur);
            }
        } catch (Exception ex) {
            // Pass the throwable to the logger so the stack trace ends up in
            // the log instead of on stderr.
            log.error("BaseTableQueryEngine: exception " + ex, ex);
            setSuccess(false);
        }
        return tuples;
    }

    /**
     * Runs the query and retries it once if the first attempt failed and time
     * is left in the budget.
     *
     * <p>If the success flag was {@code true} on entry it is reset before the
     * retry, so afterwards {@link #isSuccess()} reflects the retry rather than
     * the failed first attempt. A flag that was already {@code false} on entry
     * is left untouched.
     *
     * @param timeout overall time budget in milliseconds for both attempts
     * @return the result of the first attempt, or the result of the retry if
     *         that one contains more tuples (or the first attempt returned
     *         {@code null})
     */
    public ArrayList execute(int timeout) {
        long fStart = System.currentTimeMillis();
        boolean successOnEntry = isSuccess();

        ArrayList tuples = fetch(timeout);
        long remaining = timeout - (System.currentTimeMillis() - fStart);

        if ((tuples == null || !isSuccess()) && remaining > 0) {
            if (successOnEntry) {
                // The failure came from the attempt above; give the retry a
                // clean flag so a successful retry is reported as success.
                setSuccess(true);
            }
            ArrayList retried = fetch((int) remaining);
            if (retried != null && (tuples == null || tuples.size() < retried.size())) {
                return retried;
            }
        }
        return tuples;
    }

    /** Closes the cursor if there is one; a failure to close is logged, not propagated. */
    private static void closeQuietly(DBCursor cursor) {
        if (cursor == null) {
            return;
        }
        try {
            cursor.close();
        } catch (RuntimeException ex) {
            log.warn("BaseTableQueryEngine: failed to close cursor", ex);
        }
    }
}