-
Type:
Task
-
Resolution: Cannot Reproduce
-
Priority:
Blocker - P1
-
None
-
Affects Version/s: None
-
Component/s: None
-
None
-
None
-
None
-
None
-
None
-
None
-
None
-
None
I currently have a process that tails the MongoDB operations log (oplog). Intermittently the process hangs and becomes unresponsive to kill signals and keyboard interrupts. I believe the hang occurs while tailing the oplog. My code is below; I am using Python 3.4 and the latest version of PyMongo. Any ideas would be appreciated:
from pymongo import MongoClient,ReadPreference,MongoReplicaSetClient
import logging
class MongoHelper:
    """Open an authenticated MongoDB connection and expose one database.

    Keyword arguments read by the constructor:

    * ``host`` -- passed straight to the client constructor (required).
    * ``port`` -- optional; anything that is not an ``int`` (or is missing)
      falls back to ``DEFAULT_PORT``.
    * ``replicaSet`` -- when present and truthy a ``MongoReplicaSetClient``
      with a secondary-preferred read preference is used, otherwise a plain
      ``MongoClient``.
    * ``db``, ``user``, ``password``, ``auth_source`` -- database to select
      and the credentials used to authenticate against it (all required).
    * ``logger`` -- optional logger; defaults to this module's logger.

    Raises:
        KeyError: if one of the required keyword arguments is missing.
    """

    # Port used when the caller supplies no usable ``port``.
    DEFAULT_PORT = 27017

    def __init__(self, **kwargs):
        port = self._resolve_port(kwargs)
        if kwargs.get("replicaSet"):
            self.client = MongoReplicaSetClient(
                host=kwargs["host"],
                port=port,
                replicaSet=kwargs["replicaSet"],
                read_preference=ReadPreference.SECONDARY_PREFERRED,
                tz_aware=True,
            )
        else:
            # Same port fallback as the replica-set branch; previously a
            # missing "port" raised KeyError only on this path.
            self.client = MongoClient(
                host=kwargs["host"],
                port=port,
                tz_aware=True,
            )
        self.db = self.client[kwargs["db"]]
        self.db.authenticate(
            kwargs["user"], kwargs["password"], source=kwargs["auth_source"]
        )
        self.logger = (
            kwargs["logger"] if "logger" in kwargs else logging.getLogger(__name__)
        )

    @staticmethod
    def _resolve_port(kwargs):
        """Return ``kwargs["port"]`` if it is a real int, else the default port."""
        port = kwargs.get("port")
        # bool is an int subclass; the original ``type(...) is int`` check
        # rejected it, so keep rejecting it here.
        if isinstance(port, int) and not isinstance(port, bool):
            return port
        return MongoHelper.DEFAULT_PORT

    def collection(self, collection_name):
        """Return the collection named ``collection_name`` from the selected database."""
        return self.db[collection_name]
from pymongo.errors import AutoReconnect
from bson import timestamp
from time import sleep
from collections import OrderedDict
import logging
class MongoTailer:
    """Tail ``local.oplog.rs`` and forward entries to per-namespace handlers.

    The position of the last processed oplog entry is persisted in the SQL
    table ``etl_mongo_oplog`` (a single row holding ``time`` and ``ordinal``)
    and mirrored on the instance as ``current_ts`` / ``current_ordinal``.

    Keyword arguments read by the constructor:

    * ``mongo`` -- object exposing a ``client`` attribute (required).
    * ``sql`` -- object providing ``fetch_records``, ``execute`` and
      ``generate_create_table`` (required).
    * ``reimport`` -- when equal to ``True`` the start position is taken from
      the newest oplog entry instead of the stored one.
    * ``logger`` -- optional logger; defaults to this module's logger.
    """

    # Column layout handed to ``sql.generate_create_table`` for the
    # ``etl_mongo_oplog`` bookkeeping table.
    schema = OrderedDict([
        ("name", {
            "python_type": "string",
            "sql_type": "varchar",
            "length": 10,
        }),
        ("time", {
            "python_type": "integer",
            "sql_type": "integer",
        }),
        ("ordinal", {
            "python_type": "integer",
            "sql_type": "integer",
        }),
    ])

    # Seconds to pause once the tailable cursor has been drained, so the
    # polling loop does not spin while no new oplog entries exist.
    poll_interval = 1

    def __init__(self, **kwargs):
        self.logger = (
            kwargs["logger"] if "logger" in kwargs else logging.getLogger(__name__)
        )
        self.client = kwargs["mongo"].client
        self.sql = kwargs["sql"]
        self._create_table()
        if "reimport" in kwargs and kwargs["reimport"] == True:  # noqa: E712
            self.current_ts, self.current_ordinal = self._get_current_oplog_time()
        else:
            self.current_ts, self.current_ordinal = self._get_stored_oplog_time()

    def _get_current_oplog_time(self):
        """Return ``(time, inc)`` of the newest entry in ``local.oplog.rs``.

        Raises:
            NotImplementedError: if the newest entry comes back falsy.
                NOTE(review): indexing an empty cursor presumably raises
                before this check is reached -- confirm against the driver.
        """
        newest = self.client["local"].oplog.rs.find(limit=1).sort("$natural", -1)[0]
        if not newest:
            # ``NotImplemented`` is a non-callable constant; the exception
            # class is ``NotImplementedError``.
            raise NotImplementedError("No oplog detected")
        return newest["ts"].time, newest["ts"].inc

    def _get_stored_oplog_time(self):
        """Return the persisted ``(time, ordinal)``, or the newest oplog position.

        The newest oplog position is used when the table has no rows or the
        first row still holds ``None`` in either column.
        """
        rows = self.sql.fetch_records("SELECT time, ordinal FROM etl_mongo_oplog")
        if len(rows) > 0:
            stored_time, stored_ordinal = rows[0][0], rows[0][1]
            if stored_time is not None and stored_ordinal is not None:
                return stored_time, stored_ordinal
        return self._get_current_oplog_time()

    def _create_table(self):
        """Create ``etl_mongo_oplog`` and seed its tracker row if the table is absent."""
        query = "SELECT * FROM information_schema.tables WHERE table_name = 'etl_mongo_oplog'"
        exists = self.sql.fetch_records(query)
        if len(exists) == 0:
            self.sql.execute(
                self.sql.generate_create_table(
                    schema=self.schema, name="etl_mongo_oplog", if_exists=True
                )
            )
            self.sql.execute(
                "INSERT INTO etl_mongo_oplog (name,time,ordinal) VALUES (%s, %s, %s)",
                ["tracker", None, None],
            )

    def _update_oplog_time(self, ts):
        """Persist ``ts`` as the last processed position and mirror it in memory."""
        query = "UPDATE etl_mongo_oplog SET time = %s, ordinal = %s"
        self.sql.execute(query, [ts.time, ts.inc])
        # Keep the in-memory position in step with the stored one so that a
        # later tail() call resumes after this entry instead of replaying
        # everything since construction.
        self.current_ts, self.current_ordinal = ts.time, ts.inc

    def tail(self, instances):
        """Process oplog entries newer than the current position until the cursor dies.

        Args:
            instances: mapping from namespace string to a handler object
                offering ``insert``, ``update`` and ``delete``.

        On ``AutoReconnect`` a warning is logged, the method sleeps ten
        seconds and then returns; the cursor is closed in every case.
        """
        self.instances = instances
        start = timestamp.Timestamp(self.current_ts, self.current_ordinal)
        query = {'ts': {'$gt': start}}
        cursor = self.client["local"].oplog.rs.find(
            query, tailable=True, timeout=False
        ).sort("$natural", 1)
        try:
            while cursor.alive:
                for doc in cursor:
                    self._handle_oplog(doc)
                # The cursor is drained for now; pause before polling again
                # rather than re-querying in a tight loop.
                sleep(self.poll_interval)
        except AutoReconnect:
            self.logger.warning("Lost connection to mongo...")
            sleep(10)
        finally:
            cursor.close()

    def _handle_oplog(self, doc):
        """Route one oplog entry by operation/namespace, then record its timestamp."""
        if doc["op"] == "n":
            self.logger.info("System message in replica set: %s", str(doc["o"]))
        elif doc["ns"] in self.instances:
            self.logger.info(
                "Handling an oplog event %s for %s", doc["op"], doc["ns"]
            )
            self._handle_known_oplog(doc)
        else:
            args = {
                "ns": str(doc["ns"]),
                "doc": str(doc),
            }
            self.logger.warning("Unknown namespace for oplog event", extra=args)
        # NOTE(review): the original paste lost its indentation; this call is
        # taken to run for every entry, not only for unknown namespaces.
        self._update_oplog_time(doc["ts"])

    def _handle_known_oplog(self, doc):
        """Dispatch an insert/update/delete entry; other operations are ignored."""
        op = doc["op"]
        if op == "i":
            self._handle_insert(doc)
        elif op == "u":
            self._handle_update(doc)
        elif op == "d":
            self._handle_delete(doc)

    def _handle_insert(self, doc):
        """Pass the inserted document (``doc["o"]``) to the namespace handler."""
        self.logger.debug("Handling an insert document: %s", doc)
        self.instances[doc["ns"]].insert(doc["o"])

    def _handle_update(self, doc):
        """Pass the whole oplog entry to the namespace handler's ``update``."""
        self.logger.debug("Handling an update on document: %s", doc)
        self.instances[doc["ns"]].update(doc)

    def _handle_delete(self, doc):
        """Pass the oplog entry and the stringified ``_id`` to the handler's ``delete``."""
        self.logger.debug("Handling a delete on document: %s", doc)
        self.instances[doc["ns"]].delete(doc, str(doc["o"]["_id"]))