Python Tailing Hangs

XMLWordPrintableJSON

    • Type: Task
    • Resolution: Cannot Reproduce
    • Priority: Blocker - P1
    • None
    • Affects Version/s: None
    • Component/s: None
    • None
    • None
    • None
    • None
    • None
    • None
    • None
    • None

      I have a process that tails the MongoDB operations log (oplog). Intermittently the process hangs and becomes unresponsive to kill signals and keyboard interrupts. I believe the hang occurs while tailing the oplog. My code is below; I am using Python 3.4 and the latest version of PyMongo. Any ideas would be appreciated:

      from pymongo import MongoClient,ReadPreference,MongoReplicaSetClient
      import logging
      
      class MongoHelper:
        """Open an authenticated MongoDB connection and expose one database.

        Keyword arguments:
          host: MongoDB host passed to the client.
          port: server port. Defaults to 27017 when omitted or None; on the
            replica-set path a non-int value also falls back to 27017.
          replicaSet: when truthy, connect with MongoReplicaSetClient using
            SECONDARY_PREFERRED reads; otherwise use MongoClient.
          db: name of the database to select and authenticate against.
          user, password, auth_source: credentials for ``db.authenticate``.
          logger: optional logger; defaults to ``logging.getLogger(__name__)``.
        """
        DEFAULT_PORT = 27017

        def __init__(self, **kwargs):
          port = kwargs.get("port")
          if kwargs.get("replicaSet"):
            # The replica-set path has always tolerated a non-int port by
            # falling back to the default.
            self.client = MongoReplicaSetClient(
              host=kwargs["host"],
              port=port if isinstance(port, int) else self.DEFAULT_PORT,
              replicaSet=kwargs["replicaSet"],
              read_preference=ReadPreference.SECONDARY_PREFERRED,
              tz_aware=True)
          else:
            # Previously ``kwargs["port"]`` raised KeyError here when no port
            # was given; default it the same way as the replica-set path.
            self.client = MongoClient(
              host=kwargs["host"],
              port=self.DEFAULT_PORT if port is None else port,
              tz_aware=True)
          self.db = self.client[kwargs["db"]]
          # NOTE(review): Database.authenticate is deprecated in PyMongo 3 and
          # removed in PyMongo 4; newer drivers take credentials on the client.
          self.db.authenticate(kwargs["user"], kwargs["password"], source=kwargs["auth_source"])
          self.logger = kwargs.get("logger", logging.getLogger(__name__))

        def collection(self, collection_name):
          """Return the collection ``collection_name`` from the selected database."""
          return self.db[collection_name]
      
      from pymongo.errors import AutoReconnect
      from bson import timestamp
      from time import sleep
      from collections import OrderedDict
      import logging
      
      
      class MongoTailer:
        """Tail ``local.oplog.rs`` and replay events into per-namespace handlers.

        The position of the last processed oplog entry (``ts.time`` and
        ``ts.inc``) is persisted in the single-row SQL table
        ``etl_mongo_oplog`` and mirrored in ``current_ts`` /
        ``current_ordinal``, so a later ``tail()`` call resumes after it.
        """
        schema = OrderedDict([
          ("name", {
            "python_type" : "string",
            "sql_type" : "varchar",
            "length" : 10
          }),
          ("time", {
            "python_type" : "integer",
            "sql_type" : "integer"
          }),
          ("ordinal", {
            "python_type" : "integer",
            "sql_type" : "integer"
          })
        ])

        def __init__(self, **kwargs):
          """Prepare the tracking table and choose the starting oplog position.

          Keyword arguments:
            mongo: object with a ``client`` attribute (e.g. ``MongoHelper``).
            sql: SQL helper used via ``fetch_records``, ``execute`` and
              ``generate_create_table``.
            reimport: if truthy, start from the newest oplog entry instead of
              the position stored in ``etl_mongo_oplog``.
            logger: optional logger; defaults to ``logging.getLogger(__name__)``.
          """
          self.logger = kwargs["logger"] if "logger" in kwargs else logging.getLogger(__name__)
          self.client = kwargs["mongo"].client
          self.sql = kwargs["sql"]
          self._create_table()
          if kwargs.get("reimport"):
            self.current_ts, self.current_ordinal = self._get_current_oplog_time()
          else:
            self.current_ts, self.current_ordinal = self._get_stored_oplog_time()

        def _get_current_oplog_time(self):
          """Return ``(time, inc)`` of the newest oplog entry.

          Raises:
            NotImplementedError: if the oplog has no entries.
          """
          cursor = self.client["local"].oplog.rs.find(limit=1).sort("$natural", -1)
          # ``cursor[0]`` raised IndexError on an empty oplog, and the old
          # ``raise NotImplemented(...)`` was itself a TypeError; fetch safely.
          newest = next(iter(cursor), None)
          if newest is None:
            raise NotImplementedError("No oplog detected")
          return newest["ts"].time, newest["ts"].inc

        def _get_stored_oplog_time(self):
          """Return the stored ``(time, ordinal)``, or the newest oplog position if unset."""
          records = self.sql.fetch_records("SELECT time, ordinal FROM etl_mongo_oplog")
          if records:
            stored_time, stored_ordinal = records[0][0], records[0][1]
            if stored_time is not None and stored_ordinal is not None:
              return stored_time, stored_ordinal
          return self._get_current_oplog_time()

        def _create_table(self):
          """Create ``etl_mongo_oplog`` with an empty "tracker" row if it does not exist."""
          query = "SELECT *  FROM information_schema.tables WHERE table_name = 'etl_mongo_oplog'"
          exists = self.sql.fetch_records(query)
          if not exists:
            self.sql.execute(self.sql.generate_create_table(schema=self.schema, name="etl_mongo_oplog", if_exists=True))
            self.sql.execute("INSERT INTO etl_mongo_oplog (name,time,ordinal) VALUES (%s, %s, %s)", ["tracker", None, None])

        def _update_oplog_time(self, ts):
          """Persist ``ts`` as the last processed position and mirror it in memory."""
          query = "UPDATE etl_mongo_oplog SET time = %s, ordinal = %s"
          self.sql.execute(query, [ts.time, ts.inc])
          # Keep the in-memory position in step with the table so a later
          # tail() call resumes here instead of replaying from the start.
          self.current_ts, self.current_ordinal = ts.time, ts.inc

        def tail(self, instances, idle_sleep=1.0):
          """Replay oplog entries newer than the current position.

          Args:
            instances: mapping of namespace ("db.collection") to a handler
              object with ``insert``, ``update`` and ``delete`` methods.
            idle_sleep: seconds to pause when the cursor has no new data,
              so an idle oplog does not trigger a tight getMore loop.

          Returns when the cursor dies or after an AutoReconnect (following a
          10 s pause). Callers wanting continuous tailing should call tail()
          again; it resumes from the last processed entry.
          """
          self.instances = instances
          query = {'ts': {'$gt': timestamp.Timestamp(self.current_ts, self.current_ordinal)}}
          # await_data makes the server block briefly for new entries instead
          # of returning empty batches immediately, which caused a busy spin.
          cursor = self.client["local"].oplog.rs.find(
            query, tailable=True, await_data=True, timeout=False).sort("$natural", 1)
          try:
            while cursor.alive:
              for doc in cursor:
                self._handle_oplog(doc)
              if cursor.alive and idle_sleep:
                sleep(idle_sleep)
          except AutoReconnect:
            self.logger.warning("Lost connection to mongo...", exc_info=True)
            sleep(10)
          finally:
            # timeout=False cursors never expire server-side; always release it.
            cursor.close()

        def _handle_oplog(self, doc):
          """Dispatch one oplog entry and record its ``ts`` as processed."""
          if doc["op"] == "n":
            self.logger.info("System message in replica set: %s", doc["o"])
          elif doc["ns"] in self.instances:
            self.logger.info("Handling an oplog event %s for %s", doc["op"], doc["ns"])
            self._handle_known_oplog(doc)
          else:
            args = {
              "ns" : str(doc["ns"]),
              "doc" : str(doc)
            }
            self.logger.warning("Unknown namespace for oplog event", extra=args)
          self._update_oplog_time(doc["ts"])

        def _handle_known_oplog(self, doc):
          """Route insert/update/delete entries to the matching handler; ignore other ops."""
          handler = {
            "i": self._handle_insert,
            "u": self._handle_update,
            "d": self._handle_delete,
          }.get(doc["op"])
          if handler is not None:
            handler(doc)

        def _handle_insert(self, doc):
          """Forward the inserted document (``doc["o"]``) to the namespace handler."""
          self.logger.debug("Handling an insert document: %s", doc)
          self.instances[doc["ns"]].insert(doc["o"])

        def _handle_update(self, doc):
          """Forward the full update oplog entry to the namespace handler."""
          self.logger.debug("Handling an update on document: %s", doc)
          self.instances[doc["ns"]].update(doc)

        def _handle_delete(self, doc):
          """Forward the delete entry and its ``_id`` (as str) to the namespace handler."""
          self.logger.debug("Handling a delete on document: %s", doc)
          self.instances[doc["ns"]].delete(doc, str(doc["o"]["_id"]))
      

        1. MongoHelper.py
          1 kB
          brad
        2. MongoTailer.py
          4 kB
          brad

            Assignee:
            Bernie Hackett
            Reporter:
            brad
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

              Created:
              Updated:
              Resolved: