-
Type:
Task
-
Resolution: Cannot Reproduce
-
Priority:
Blocker - P1
-
None
-
Affects Version/s: None
-
Component/s: None
-
None
-
None
-
None
-
None
-
None
-
None
-
None
-
None
I have a process that tails a MongoDB operations log (oplog). Intermittently the process hangs and becomes unresponsive to kill signals and keyboard interrupts. I believe the problem occurs while tailing the oplog. My code is below; I am using Python 3.4 and the latest version of PyMongo. Any ideas would be appreciated:
from pymongo import MongoClient, ReadPreference, MongoReplicaSetClient
import logging


class MongoHelper:
    """Build an authenticated pymongo client and expose one database.

    Keyword arguments:
        host: MongoDB host handed to the client.
        port: optional port. For replica sets a non-int (or missing) value falls
            back to ``DEFAULT_PORT``; for a standalone server a missing value
            falls back to ``DEFAULT_PORT`` and anything else is passed through.
        replicaSet: optional replica-set name. When truthy, a
            ``MongoReplicaSetClient`` with ``SECONDARY_PREFERRED`` reads is used;
            otherwise a plain ``MongoClient``.
        db: name of the database exposed as ``self.db``.
        user, password, auth_source: credentials for ``Database.authenticate``.
        logger: optional logger; defaults to this module's logger.

    Raises whatever ``authenticate`` raises; the client is closed first.
    """

    DEFAULT_PORT = 27017

    def __init__(self, **kwargs):
        replica_set = kwargs.get("replicaSet")
        if replica_set:
            port = kwargs.get("port")
            # bool is an int subclass; it is never a meaningful port.
            if not isinstance(port, int) or isinstance(port, bool):
                port = self.DEFAULT_PORT
            self.client = MongoReplicaSetClient(
                host=kwargs["host"],
                port=port,
                replicaSet=replica_set,
                read_preference=ReadPreference.SECONDARY_PREFERRED,
                tz_aware=True,
            )
        else:
            # Default the port like the replica-set branch does instead of
            # raising KeyError when it is omitted.
            self.client = MongoClient(
                host=kwargs["host"],
                port=kwargs.get("port", self.DEFAULT_PORT),
                tz_aware=True,
            )
        self.db = self.client[kwargs["db"]]
        try:
            self.db.authenticate(kwargs["user"], kwargs["password"], source=kwargs["auth_source"])
        except Exception:
            # Release the client's connections before propagating, otherwise a
            # failed login leaks an open client.
            self.client.close()
            raise
        self.logger = kwargs.get("logger") or logging.getLogger(__name__)

    def collection(self, collection_name):
        """Return the collection *collection_name* from the configured database."""
        return self.db[collection_name]
from pymongo.errors import AutoReconnect
from bson import timestamp
from time import sleep
from collections import OrderedDict
import logging


class MongoTailer:
    """Follow a replica-set oplog and forward entries to per-namespace handlers.

    The position of the last processed entry (``ts.time`` and ``ts.inc``) is
    stored in the SQL table ``etl_mongo_oplog``. On first use the table is
    created with a single ``'tracker'`` row. A new instance therefore resumes
    from the stored checkpoint unless ``reimport=True`` is given.
    """

    # Column definitions passed to ``sql.generate_create_table`` for the
    # checkpoint table.
    schema = OrderedDict([
        ("name", {"python_type": "string", "sql_type": "varchar", "length": 10}),
        ("time", {"python_type": "integer", "sql_type": "integer"}),
        ("ordinal", {"python_type": "integer", "sql_type": "integer"}),
    ])

    def __init__(self, **kwargs):
        """Keyword arguments:
            mongo: object with a pymongo ``client`` attribute.
            sql: helper providing ``fetch_records``, ``execute`` and
                ``generate_create_table``.
            reimport: when True, start from the newest oplog entry instead of
                the stored checkpoint.
            logger: optional logger; defaults to this module's logger.
        """
        # Logger and handler map first so every later step (and any early
        # _handle_oplog call) finds them.
        self.logger = kwargs.get("logger") or logging.getLogger(__name__)
        self.instances = {}
        self.client = kwargs["mongo"].client
        self.sql = kwargs["sql"]
        self._create_table()
        # Explicit comparison kept: only True (or 1) requests a reimport.
        if kwargs.get("reimport") == True:  # noqa: E712
            self.current_ts, self.current_ordinal = self._get_current_oplog_time()
        else:
            self.current_ts, self.current_ordinal = self._get_stored_oplog_time()

    def _get_current_oplog_time(self):
        """Return ``(time, inc)`` of the newest entry in ``local.oplog.rs``.

        Raises:
            NotImplementedError: if the oplog collection has no entries.
        """
        # Materialise at most one document: indexing an empty cursor with [0]
        # raises IndexError before any "no oplog" check could run.
        newest = list(self.client["local"].oplog.rs.find(limit=1).sort("$natural", -1))
        if not newest:
            raise NotImplementedError("No oplog detected")
        ts = newest[0]["ts"]
        return ts.time, ts.inc

    def _get_stored_oplog_time(self):
        """Return the checkpointed ``(time, ordinal)``; fall back to the newest oplog entry."""
        rows = self.sql.fetch_records("SELECT time, ordinal FROM etl_mongo_oplog")
        if rows and rows[0][0] is not None and rows[0][1] is not None:
            return rows[0][0], rows[0][1]
        return self._get_current_oplog_time()

    def _create_table(self):
        """Create the checkpoint table and its empty 'tracker' row if the table is missing."""
        query = "SELECT * FROM information_schema.tables WHERE table_name = 'etl_mongo_oplog'"
        if self.sql.fetch_records(query):
            return
        self.sql.execute(
            self.sql.generate_create_table(schema=self.schema, name="etl_mongo_oplog", if_exists=True)
        )
        self.sql.execute(
            "INSERT INTO etl_mongo_oplog (name,time,ordinal) VALUES (%s, %s, %s)",
            ["tracker", None, None],
        )

    def _update_oplog_time(self, ts):
        """Persist *ts* as the checkpoint and advance the in-memory resume position."""
        query = "UPDATE etl_mongo_oplog SET time = %s, ordinal = %s"
        self.sql.execute(query, [ts.time, ts.inc])
        # Keep the in-memory position in step with the stored one so a later
        # tail() call (e.g. after AutoReconnect) resumes instead of replaying.
        self.current_ts, self.current_ordinal = ts.time, ts.inc

    def tail(self, instances, poll_interval=1, reconnect_delay=10):
        """Follow the oplog from the current position and dispatch each entry.

        Args:
            instances: mapping from an oplog namespace (the entry's ``"ns"``) to
                a handler exposing ``insert(o)``, ``update(doc)`` and
                ``delete(doc, id_str)``.
            poll_interval: seconds to pause when a pass over the tailable cursor
                yields no new entries.
            reconnect_delay: seconds to pause after ``AutoReconnect`` before
                returning.

        Returns when the cursor dies, or after logging an ``AutoReconnect``.
        Progress is checkpointed per entry, so calling ``tail`` again resumes.
        The cursor is always closed.
        """
        self.instances = instances
        query = {"ts": {"$gt": timestamp.Timestamp(self.current_ts, self.current_ordinal)}}
        cursor = self.client["local"].oplog.rs.find(
            query, tailable=True, timeout=False
        ).sort("$natural", 1)
        try:
            while cursor.alive:
                handled = 0
                for doc in cursor:
                    self._handle_oplog(doc)
                    handled += 1
                if not handled and cursor.alive:
                    # A tailable cursor opened without await_data returns
                    # immediately when nothing is new; without a pause this loop
                    # spins a CPU core and floods the server with getMore calls.
                    sleep(poll_interval)
        except AutoReconnect:
            self.logger.warning("Lost connection to mongo...")
            sleep(reconnect_delay)
        finally:
            # timeout=False disables the server-side cursor timeout, so the
            # cursor must be closed explicitly.
            cursor.close()

    def _handle_oplog(self, doc):
        """Dispatch one oplog entry by namespace, then checkpoint its timestamp."""
        op = doc["op"]
        if op == "n":
            self.logger.info("System message in replica set: %s", doc["o"])
        elif doc["ns"] in self.instances:
            self.logger.info("Handling an oplog event %s for %s", op, doc["ns"])
            self._handle_known_oplog(doc)
        else:
            args = {"ns": str(doc["ns"]), "doc": str(doc)}
            self.logger.warning("Unknown namespace for oplog event", extra=args)
        self._update_oplog_time(doc["ts"])

    def _handle_known_oplog(self, doc):
        """Route insert ("i"), update ("u") and delete ("d") entries; ignore other ops."""
        handler = {
            "i": self._handle_insert,
            "u": self._handle_update,
            "d": self._handle_delete,
        }.get(doc["op"])
        if handler is None:
            self.logger.debug("Ignoring oplog op %s for %s", doc["op"], doc["ns"])
            return
        handler(doc)

    def _handle_insert(self, doc):
        """Forward the inserted document (``doc["o"]``) to the namespace handler."""
        # Lazy %-formatting: str(doc) is only built when DEBUG is enabled.
        self.logger.debug("Handling an insert document: %s", doc)
        self.instances[doc["ns"]].insert(doc["o"])

    def _handle_update(self, doc):
        """Forward the full update entry to the namespace handler."""
        self.logger.debug("Handling an update on document: %s", doc)
        self.instances[doc["ns"]].update(doc)

    def _handle_delete(self, doc):
        """Forward the delete entry and the stringified ``_id`` to the namespace handler."""
        self.logger.debug("Handling a delete on document: %s", doc)
        self.instances[doc["ns"]].delete(doc, str(doc["o"]["_id"]))