Source code for mongoelector.elector

# -*- coding: utf-8 -*-
import threading
from time import sleep
import logging
from datetime import datetime
from pymongo.errors import OperationFailure


from mongoelector.locker import MongoLocker, LockExists, AcquireTimeout


[docs]class MongoElector(object): """ This object will do lots of awesome distributed master election coolness """ def __init__(self, key, db, ttl=15, onmaster=None, onmasterloss=None, onloop=None, app_version=None, report_status=True): """ Create a MongoElector instance :param key: Name of the distributed lock that is used for master election. should be unique to this type of daemon i.e. any instance for which you want to run exactly one master should all share this same name. :type key: str :param db: Connection to a MongoDB database :param ttl: Time-to-live for the distributed lock. If the master node fails silently, this timeout must be hit before another node will take over. :type ttl: int :param onmaster: Function that will be run every time this instance is elected as the new master :type onmaster: Function or Method :param onmasterloss: Function that will be run every time when this instance loses it's master status :type onmasterloss: Function or Method :param onloop: Function that will be run on every loop :type onloop: Function or Method :param app_version: Parent app version, if provided, will be included in node_status for monitoring :type app_version: str """ self._poll_lock = threading.Lock() self._ts_poll = None self._shutdown = False self._wasmaster = False self._app_version = app_version self._report_status = report_status self.elector_thread = None self.key = key self.db = db self._status_collection = getattr(self.db, 'elector.status') try: self._status_collection.create_index('timestamp', expireAfterSeconds=int(ttl)) except OperationFailure: # Handle TTL Changes self._status_collection.drop_indexes() self._status_collection.create_index('timestamp', expireAfterSeconds=int(ttl)) self._status_collection.create_index('key') self.ttl = ttl self.callback_onmaster = onmaster self.callback_onmasterloss = onmasterloss self.callback_onloop = onloop self.mlock = MongoLocker(self.key, self.db, dbcollection='elector.locks', ttl=self.ttl, timeparanoid=True)
[docs] def start(self, blocking=False): """ Starts mongo elector polling on a background thread then returns. If blocking is set to True, this will never return until stop() is :param blocking: If False, returns as soon as the elector thread is started. If True, will only return after stop() is called i.e. by another thread. :type blocking: bool """ self.elector_thread = ElectorThread(self) # give elector thread reference to mongolocker self.elector_thread.start() if blocking: self.elector_thread.join()
[docs] def stop(self): """Cleanly stop the elector. Surrender master if owned""" with self._poll_lock: self._shutdown = True if self.elector_thread: self.elector_thread.join() self.release()
@property def running(self): """Returns true if the elector logic is running""" return self.elector_thread.isAlive() @property def ismaster(self): """Returns True if this instance is master""" return self.mlock.owned() @property def master_exists(self): """Returns true if an instance (not necessarily this one) has master""" return self.mlock.locked()
[docs] def poll(self): """ Main polling logic, will refresh lock if it's owned, or tries to obtain the lock if it's available. Runs onloop callback after lock maintenance logic In general, this should only be called by the elector thread """ with self._poll_lock: self._ts_poll = datetime.utcnow() if self.mlock.owned(): self._wasmaster = True self.mlock.touch() else: if self._wasmaster: self._wasmaster = False if self.callback_onmasterloss: self.callback_onmasterloss() if not self.master_exists and not self._shutdown: try: self.mlock.acquire(blocking=False) except (LockExists, AcquireTimeout): pass else: if self.mlock.owned(): self._wasmaster = True if self.callback_onmaster: self.callback_onmaster() if self._report_status: self.report_status() if self.callback_onloop: self.callback_onloop()
[docs] def report_status(self): status = self.node_status self._status_collection.update({'_id': status['_id']}, status, upsert=True)
@property def cluster_detail(self): data = [x for x in self._status_collection.find({'key': self.key}, {'_id': 0}).sort('timestamp', -1)] return {'member_detail': data, 'master': parse_master(data), 'timestamp': datetime.utcnow()} @property def node_status(self): """Status info for current object""" status = self.mlock.status status['_id'] = status['uuid'] status['ismaster'] = self.ismaster status['elector_running'] = self.running status['last_poll'] = self._ts_poll if self._app_version: status['app_version'] = self._app_version return status
[docs] def release(self): """ Releases master lock if owned and calls onmasterloss if provided. """ with self._poll_lock: self.mlock.release() if self._wasmaster and self.callback_onmaster: self.callback_onmasterloss()
@property def pollwait(self): """An appropriate sleep time to wait before next poll""" if self._wasmaster: return self.ttl / 2.0 else: return self.ttl
[docs]class ElectorThread(threading.Thread): """Calls the election polling logic""" def __init__(self, elector): """Custom Thread object for the Elector""" super(ElectorThread, self).__init__() self.elector = elector
[docs] def run(self): """starts the elector polling logic, should not be called directly""" # noinspection PyProtectedMember while self.elector._shutdown is False: try: self.elector.poll() except Exception as e: logging.warning('Elector Poll Error: {}'.format(e)) finally: sleep(self.elector.pollwait)
[docs]def parse_master(data): allmasters = [x for x in data if x['ismaster']] # grab most recent master (prevents race) if allmasters: master = allmasters[0] else: master = None if master: return {'host': master['host'], 'process_id': master['pid'], 'uuid': master['uuid']} else: return None