- 
    Type:Task 
- 
    Resolution: Cannot Reproduce
- 
    Priority:Blocker - P1 
- 
    None
- 
    Affects Version/s: None
- 
    Component/s: None
- 
    None
- 
        None
- 
        None
- 
        None
- 
        None
- 
        None
- 
        None
- 
        None
I have a process that tails the MongoDB operations log (oplog). Intermittently the process hangs and stops responding to kill signals and keyboard interrupts. I believe the hang occurs while tailing the oplog. My code is below; I am using Python 3.4 and the latest version of pymongo. Any ideas would be appreciated:
from pymongo import MongoClient,ReadPreference,MongoReplicaSetClient
import logging
class MongoHelper:
  """Open an authenticated connection to a single MongoDB database.

  Keyword arguments:
    host, db, user, password, auth_source -- required; ``auth_source`` is
      passed as the ``source`` database for ``authenticate``.
    port -- optional. With ``replicaSet`` it must be an ``int`` or 27017 is
      used; without ``replicaSet`` it is passed through as given and
      defaults to 27017 when omitted.
    replicaSet -- when truthy, connect with ``MongoReplicaSetClient`` using
      the SECONDARY_PREFERRED read preference.
    socketTimeoutMS -- optional; forwarded to the client only when given.
      pymongo applies no socket timeout by default, so a silently dropped
      connection can block a read (such as an oplog tail) indefinitely.
    logger -- optional logger; defaults to this module's logger.
  """

  def __init__(self, **kwargs):
    # Options shared by both client types.
    client_options = {"tz_aware": True}
    if kwargs.get("socketTimeoutMS") is not None:
      client_options["socketTimeoutMS"] = kwargs["socketTimeoutMS"]

    if kwargs.get("replicaSet"):
      port = kwargs.get("port")
      # Exact type check (not isinstance) so bools and numeric strings
      # also fall back to the default port, as before.
      if type(port) is not int:
        port = 27017
      self.client = MongoReplicaSetClient(host=kwargs["host"],
                                          port=port,
                                          replicaSet=kwargs["replicaSet"],
                                          read_preference=ReadPreference.SECONDARY_PREFERRED,
                                          **client_options)
    else:
      # Default the port here too instead of raising KeyError when omitted.
      self.client = MongoClient(host=kwargs["host"],
                                port=kwargs.get("port", 27017),
                                **client_options)
    self.db = self.client[kwargs["db"]]
    # NOTE: Database.authenticate is deprecated in later pymongo 3 releases
    # and removed in 4.0; credentials would then go to the client instead.
    self.db.authenticate(kwargs["user"], kwargs["password"], source=kwargs["auth_source"])
    self.logger = kwargs.get("logger") or logging.getLogger(__name__)

  def collection(self, collection_name):
    """Return the collection named ``collection_name`` from the connected database."""
    return self.db[collection_name]
from pymongo.errors import AutoReconnect
from bson import timestamp
from time import sleep
from collections import OrderedDict
import logging
class MongoTailer:
  """Replay MongoDB oplog entries into per-namespace handler objects.

  The last processed oplog position (timestamp seconds + ordinal) is
  written to the SQL table ``etl_mongo_oplog``, which holds a single
  tracker row created by ``_create_table``. A new tailer can therefore
  resume where a previous run stopped.
  """

  # Column definitions handed to ``sql.generate_create_table`` for the
  # tracker table.
  schema = OrderedDict([
    ("name", {
      "python_type" : "string",
      "sql_type" : "varchar",
      "length" : 10
    }),
    ("time", {
      "python_type" : "integer",
      "sql_type" : "integer"
    }),
    ("ordinal", {
      "python_type" : "integer",
      "sql_type" : "integer"
    })
  ])

  def __init__(self, **kwargs):
    """Prepare the tracker table and pick the starting oplog position.

    :param mongo: object exposing a pymongo client as ``.client``
        (e.g. ``MongoHelper``).
    :param sql: SQL helper providing ``fetch_records``, ``execute`` and
        ``generate_create_table``.
    :param reimport: when equal to ``True``, start after the newest oplog
        entry instead of the stored position.
    :param logger: optional logger; defaults to this module's logger.
    """
    # Set the logger first so every helper called below can rely on it.
    self.logger = kwargs.get("logger") or logging.getLogger(__name__)
    self.client = kwargs["mongo"].client
    self.sql = kwargs["sql"]
    self._create_table()
    # Strict equality kept on purpose: only True (or 1) triggers a reimport.
    if kwargs.get("reimport") == True:  # noqa: E712
      self.current_ts, self.current_ordinal = self._get_current_oplog_time()
    else:
      self.current_ts, self.current_ordinal = self._get_stored_oplog_time()

  def _get_current_oplog_time(self):
    """Return ``(time, inc)`` of the newest entry in ``local.oplog.rs``.

    :raises NotImplementedError: if the oplog has no entries.
    """
    newest_first = self.client["local"].oplog.rs.find(limit=1).sort("$natural", -1)
    # Iterate instead of indexing with [0]: indexing raised IndexError on an
    # empty oplog, so the "no oplog" branch could never be reached.
    for entry in newest_first:
      return entry["ts"].time, entry["ts"].inc
    # The original ``raise NotImplemented(...)`` raised TypeError, because
    # NotImplemented is a constant, not an exception class.
    raise NotImplementedError("No oplog detected")

  def _get_stored_oplog_time(self):
    """Return the persisted ``(time, ordinal)``, or the newest oplog position
    when nothing usable has been stored yet."""
    q = "SELECT time, ordinal FROM etl_mongo_oplog"
    r = self.sql.fetch_records(q)
    if r and r[0][0] is not None and r[0][1] is not None:
      return r[0][0], r[0][1]
    return self._get_current_oplog_time()

  def _create_table(self):
    """Create the tracker table with one empty ``tracker`` row if it is missing."""
    query = "SELECT *  FROM information_schema.tables WHERE table_name = 'etl_mongo_oplog'"
    exists = self.sql.fetch_records(query)
    if not exists:
      self.sql.execute(self.sql.generate_create_table(schema=self.schema,name="etl_mongo_oplog",if_exists=True))
      self.sql.execute("INSERT INTO etl_mongo_oplog (name,time,ordinal) VALUES (%s, %s, %s)", ["tracker", None, None])

  def _update_oplog_time(self, ts):
    """Persist ``ts`` (an oplog timestamp with ``.time``/``.inc``) as the last position."""
    # No WHERE clause: the table only ever holds the single tracker row.
    query = "UPDATE etl_mongo_oplog SET time = %s, ordinal = %s"
    self.sql.execute(query, [ts.time, ts.inc])

  def tail(self, instances, poll_interval=1, reconnect_delay=10):
    """Replay oplog entries newer than the current position into ``instances``.

    Opens a tailable cursor on ``local.oplog.rs`` starting just after
    ``(current_ts, current_ordinal)`` and passes every entry to
    ``_handle_oplog``. Returns when the cursor dies, or after an
    ``AutoReconnect`` followed by a ``reconnect_delay``-second pause.
    Because ``_handle_oplog`` advances the in-memory position, calling
    ``tail`` again resumes after the last handled entry instead of
    replaying from the original start point.

    :param instances: mapping of ``"db.collection"`` namespaces to objects
        with ``insert``/``update``/``delete`` methods.
    :param poll_interval: seconds to sleep when no new entries are
        available, so an idle oplog does not become a busy getMore loop.
    :param reconnect_delay: seconds to sleep after losing the connection.
    """
    self.instances = instances
    start = timestamp.Timestamp(self.current_ts, self.current_ordinal)
    query = {'ts': {'$gt': start}}
    # await_data lets the server hold each getMore briefly waiting for new
    # entries instead of answering "nothing yet" immediately.
    cursor = self.client["local"].oplog.rs.find(
      query, tailable=True, await_data=True, timeout=False
    ).sort("$natural", 1)
    try:
      while cursor.alive:
        for doc in cursor:
          self._handle_oplog(doc)
        if cursor.alive:
          # Caught up with the oplog: back off instead of spinning.
          sleep(poll_interval)
    except AutoReconnect:
      self.logger.warning("Lost connection to mongo...", exc_info=True)
      sleep(reconnect_delay)
    finally:
      cursor.close()

  def _handle_oplog(self, doc):
    """Dispatch one oplog entry, then persist and remember its position."""
    if doc["op"] == "n":
      # "n" (no-op) entries carry a message but no data change.
      self.logger.info("System message in replica set: " + str(doc["o"]))
    elif doc["ns"] in self.instances:
      self.logger.info("Handling an oplog event {0} for {1}".format(doc["op"], doc["ns"]))
      self._handle_known_oplog(doc)
    else:
      args = {
        "ns" : str(doc["ns"]),
        "doc" : str(doc)
      }
      self.logger.warning("Unknown namespace for oplog event", extra = args)
    ts = doc["ts"]
    self._update_oplog_time(ts)
    # Keep the in-memory position in sync with the stored one so a later
    # tail() call does not replay entries that were already handled.
    self.current_ts, self.current_ordinal = ts.time, ts.inc

  def _handle_known_oplog(self,doc):
    """Route an insert/update/delete entry to its handler; other op codes are ignored."""
    if doc["op"] == "i":
      self._handle_insert(doc)
    elif doc["op"] == "u":
      self._handle_update(doc)
    elif doc["op"] == "d":
      self._handle_delete(doc)

  def _handle_insert(self, doc):
    """Pass the inserted document (``doc["o"]``) to the namespace handler."""
    self.logger.debug("Handling an insert document: {0}".format(str(doc)))
    self.instances[doc["ns"]].insert(doc["o"])

  def _handle_update(self, doc):
    """Pass the whole update entry to the namespace handler."""
    self.logger.debug("Handling an update on document: {0}".format(str(doc)))
    self.instances[doc["ns"]].update(doc)

  def _handle_delete(self, doc):
    """Pass the delete entry and the deleted ``_id`` (as a string) to the handler."""
    self.logger.debug("Handling a delete on document: {0}".format(str(doc)))
    self.instances[doc["ns"]].delete(doc, str(doc["o"]["_id"]))