
"""Queue worker and manager

.. note::

    The queueing functionality is new, and has gone through some preliminary
    testing. Please report any issues found on `the retools Github issue
    tracker <https://github.com/bbangert/retools/issues>`_.

Any function that takes keyword arguments can be a ``job`` that a worker runs.
The :class:`~retools.queue.QueueManager` handles configuration and enqueuing
jobs to be run.

Declaring jobs:

.. code-block:: python

    # mypackage/jobs.py

    # jobs

    def default_job():
        # do some basic thing
        pass

    def important(somearg=None):
        # do an important thing
        pass

    # event handlers

    def my_event_handler(job=None, **kwargs):
        # do something
        pass

    def save_error(job=None, exc=None, **kwargs):
        # record error
        pass


Running Jobs::

    from retools.queue import QueueManager

    qm = QueueManager()
    qm.subscriber('job_failure', handler='mypackage.jobs:save_error')
    qm.subscriber('job_postrun', 'mypackage.jobs:important',
                  handler='mypackage.jobs:my_event_handler')
    qm.enqueue('mypackage.jobs:important', somearg='fred')
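
Jobs can also be routed to a named queue, either per job name or per
``enqueue`` call. A brief sketch, reusing the example ``mypackage.jobs``
module from above::

    # send all 'important' jobs to the 'high' queue by default
    qm.set_queue_for_job('mypackage.jobs:important', 'high')

    # or override the queue for a single call
    qm.enqueue('mypackage.jobs:important', queue_name='high', somearg='fred')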


.. note::

    The events for a job are registered with the :class:`QueueManager` and are
    encoded in the job's JSON blob. Updating events for a job will therefore
    only take effect for newly queued jobs, not for jobs already on the queue.

.. _queue_events:

Events
======

The retools queue provides events so that additional functionality can be
added without subclassing or directly extending retools. These event
functions are run by the worker when the job is handled.

Available events to register for:

* **job_prerun**: Runs immediately before the job is run.
* **job_wrapper**: Wraps the execution of the job; these handlers should be
  context managers.
* **job_postrun**: Runs after the job completes *successfully*; it will not
  be run if the job throws an exception.
* **job_failure**: Runs when a job throws an exception.

Event Function Signatures
-------------------------

Event functions have different call semantics; the following lists how each
event function will be called:

* **job_prerun**: (job=job_instance)
* **job_wrapper**: (job_function, job_instance, \*\*job_keyword_arguments)
* **job_postrun**: (job=job_instance, result=job_function_result)
* **job_failure**: (job=job_instance, exc=job_exception)

Attributes of interest on the job instance are documented in the
:meth:`Job.__init__` method.
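
As an illustration, below is a minimal sketch of handlers matching these
signatures. The module path, handler names, and timing logic are illustrative
only and not part of retools:

.. code-block:: python

    # mypackage/events.py (hypothetical module)
    import time
    from contextlib import contextmanager


    def log_start(job=None):
        # job_prerun: receives the job instance as the ``job`` keyword
        print('starting %s (%s)' % (job.job_name, job.job_id))


    @contextmanager
    def time_job(func, job, **kwargs):
        # job_wrapper: called with (job_function, job_instance, **job_kwargs)
        # and must act as a context manager wrapping job execution
        start = time.time()
        try:
            yield
        finally:
            # stash timing in the job's state dict for later handlers
            job.state['duration'] = time.time() - start


    def record_failure(job=None, exc=None):
        # job_failure: receives the job instance and the raised exception
        print('job %s failed: %r' % (job.job_id, exc))

Handlers like these would be registered with :meth:`QueueManager.subscriber`
as shown in the "Running Jobs" example above.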

.. _queue_worker:

Running the Worker
==================

After installing ``retools``, a ``retools-worker`` command will be available
that can spawn a worker. Queues to watch can be listed in priority order, in
which case the worker checks each queue in that order when looking for jobs
to process.

Example invocation:

.. code-block:: bash

    $ retools-worker high,medium,main
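
The worker can also be constructed and run in code; a minimal sketch using
the :class:`Worker` class defined in this module:

.. code-block:: python

    from retools.queue import Worker

    # watch the 'high', 'medium' and 'main' queues, polling every 5 seconds
    worker = Worker(queues=['high', 'medium', 'main'])
    worker.work(interval=5, blocking=False)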

"""
import os
import signal
import socket
import subprocess
import sys
import time
import uuid
from datetime import datetime
from optparse import OptionParser

import pkg_resources

try:
    import json
except ImportError:  # pragma: nocover
    import simplejson as json

from setproctitle import setproctitle

from retools import global_connection
from retools.exc import ConfigurationError
from retools.util import with_nested_contexts


class QueueManager(object):
    """Configures and enqueues jobs"""

    def __init__(self, redis=None, default_queue_name='main',
                 serializer=json.dumps, deserializer=json.loads):
        """Initialize a QueueManager

        :param redis: A Redis instance. Defaults to the redis instance
                      on the global_connection.
        :param default_queue_name: The default queue name. Defaults to
                                   'main'.
        :param serializer: A callable to serialize json data, defaults
                           to json.dumps().
        :param deserializer: A callable to deserialize json data,
                             defaults to json.loads().

        """
        self.default_queue_name = default_queue_name
        self.redis = redis or global_connection.redis
        self.names = {}  # cache name lookups
        self.job_config = {}
        self.job_events = {}
        self.global_events = {}
        self.serializer = serializer
        self.deserializer = deserializer

    def set_queue_for_job(self, job_name, queue_name):
        """Set the queue that a given job name will go to

        :param job_name: The pkg_resource name of the job function. I.e.
                         retools.jobs:my_function
        :param queue_name: Name of the queue on Redis job payloads should
                           go to

        """
        self.job_config[job_name] = queue_name

    def get_job(self, job_id, queue_name=None, full_job=True):
        if queue_name is None:
            queue_name = self.default_queue_name
        full_queue_name = 'retools:queue:' + queue_name
        current_len = self.redis.llen(full_queue_name)

        # that's O(n), we should do better
        for i in range(current_len):
            # the list can change while doing this
            # so we need to catch any index error
            job = self.redis.lindex(full_queue_name, i)
            job_data = self.deserializer(job)
            if job_data['job_id'] == job_id:
                if not full_job:
                    return job_data['job_id']
                return Job(full_queue_name, job, self.redis,
                           serializer=self.serializer,
                           deserializer=self.deserializer)
        raise IndexError(job_id)

    def get_jobs(self, queue_name=None, full_job=True):
        if queue_name is None:
            queue_name = self.default_queue_name
        full_queue_name = 'retools:queue:' + queue_name
        current_len = self.redis.llen(full_queue_name)

        for i in range(current_len):
            # the list can change while doing this
            # so we need to catch any index error
            job = self.redis.lindex(full_queue_name, i)
            if not full_job:
                job_dict = self.deserializer(job)
                yield job_dict['job_id']
            else:
                yield Job(full_queue_name, job, self.redis,
                          serializer=self.serializer,
                          deserializer=self.deserializer)

    def subscriber(self, event, job=None, handler=None):
        """Set events for a specific job or for all jobs

        :param event: The name of the event to subscribe to.
        :param job: Optional, a specific job to bind to.
        :param handler: The location of the handler to call.

        """
        if job:
            job_events = self.job_events.setdefault(job, {})
            job_events.setdefault(event, []).append(handler)
        else:
            self.global_events.setdefault(event, []).append(handler)

    def enqueue(self, job, **kwargs):
        """Enqueue a job

        :param job: The pkg_resources name of the function. I.e.
                    retools.jobs:my_function
        :param kwargs: Keyword arguments the job should be called with.
                       These arguments must be serializable by JSON.
        :returns: The job id that was queued.

        """
        if job not in self.names:
            job_func = pkg_resources.EntryPoint.parse('x=%s' % job).load(False)
            self.names[job] = job_func

        queue_name = kwargs.pop('queue_name', None)
        if not queue_name:
            queue_name = self.job_config.get(job, self.default_queue_name)

        metadata = kwargs.pop('metadata', None)
        if metadata is None:
            metadata = {}

        full_queue_name = 'retools:queue:' + queue_name
        job_id = uuid.uuid4().hex
        events = self.global_events.copy()
        if job in self.job_events:
            for k, v in self.job_events[job].items():
                events.setdefault(k, []).extend(v)

        job_dct = {
            'job_id': job_id,
            'job': job,
            'kwargs': kwargs,
            'events': events,
            'metadata': metadata,
            'state': {}
        }
        pipeline = self.redis.pipeline()
        pipeline.rpush(full_queue_name, self.serializer(job_dct))
        pipeline.sadd('retools:queues', queue_name)
        pipeline.execute()
        return job_id


class Job(object):
    def __init__(self, queue_name, job_payload, redis,
                 serializer=json.dumps, deserializer=json.loads):
        """Create a job instance given a JSON job payload

        :param job_payload: A JSON string representing a job.
        :param queue_name: The queue this job was pulled off of.
        :param redis: The redis instance used to pull this job.

        A :class:`Job` instance is created when the Worker pulls a job
        payload off the queue. The ``current_job`` global is set upon
        creation to indicate the current job being processed.

        Attributes of interest for event functions:

        * **job_id**: The Job's ID
        * **job_name**: The Job's name (its package + function name)
        * **queue_name**: The queue this job came from
        * **kwargs**: The keyword arguments the job is called with
        * **state**: The state dict, this can be used by events to retain
          additional arguments. I.e. for a retry extension, retry
          information can be stored in the ``state`` dict.
        * **func**: A reference to the job function
        * **redis**: A :class:`redis.Redis` instance.
        * **serializer**: A callable to serialize json data, defaults to
          :func:`json.dumps`.
        * **deserializer**: A callable to deserialize json data, defaults
          to :func:`json.loads`.

        """
        global current_job
        current_job = self

        self.deserializer = deserializer
        self.serializer = serializer
        self.payload = payload = deserializer(job_payload)
        self.job_id = payload['job_id']
        self.job_name = payload['job']
        self.queue_name = queue_name
        self.kwargs = payload['kwargs']
        self.state = payload['state']
        self.metadata = payload.get('metadata', {})
        self.events = {}
        self.redis = redis
        self.func = None
        self.events = self.load_events(event_dict=payload['events'])

    def __repr__(self):
        """Display representation of self"""
        res = '<%s object at %s: ' % (self.__class__.__name__, hex(id(self)))
        res += 'Events: %s, ' % self.events
        res += 'State: %s, ' % self.state
        res += 'Job ID: %s, ' % self.job_id
        res += 'Job Name: %s, ' % self.job_name
        res += 'Queue: %s' % self.queue_name
        res += '>'
        return res

    @staticmethod
    def load_events(event_dict):
        """Load all the events given the references

        :param event_dict: A dictionary of events keyed by event name to
                           a list of handlers for the event.

        """
        events = {}
        for k, v in event_dict.items():
            funcs = []
            for name in v:
                mod_name, func_name = name.split(':')
                try:
                    mod = sys.modules[mod_name]
                except KeyError:
                    __import__(mod_name)
                    mod = sys.modules[mod_name]
                funcs.append(getattr(mod, func_name))
            events[k] = funcs
        return events

    def perform(self):
        """Runs the job calling all the job signals as appropriate"""
        self.run_event('job_prerun')
        try:
            if 'job_wrapper' in self.events:
                result = with_nested_contexts(self.events['job_wrapper'],
                                              self.func, [self], self.kwargs)
            else:
                result = self.func(**self.kwargs)
            self.run_event('job_postrun', result=result)
            return True
        except Exception, exc:
            self.run_event('job_failure', exc=exc)
            return False

    def to_dict(self):
        return {
            'job_id': self.job_id,
            'job': self.job_name,
            'kwargs': self.kwargs,
            'events': self.payload['events'],
            'state': self.state,
            'metadata': self.metadata}

    def to_json(self):
        return self.serializer(self.to_dict())

    def enqueue(self):
        """Queue this job in Redis"""
        full_queue_name = self.queue_name
        queue_name = full_queue_name[len('retools:queue:'):]

        pipeline = self.redis.pipeline()
        pipeline.rpush(full_queue_name, self.to_json())
        pipeline.sadd('retools:queues', queue_name)
        pipeline.execute()
        return self.job_id

    def run_event(self, event, **kwargs):
        """Run all registered events for this job"""
        for event_func in self.events.get(event, []):
            event_func(job=self, **kwargs)


class Worker(object):
    """A Worker works on jobs"""

    def __init__(self, queues, redis=None,
                 serializer=json.dumps, deserializer=json.loads):
        """Create a worker

        :param queues: List of queues to process
        :type queues: list
        :param redis: Redis instance to use, defaults to the
                      global_connection.

        In the event that there is only a single queue in the list,
        Redis list blocking will be used for lower latency job
        processing.

        """
        self.redis = redis or global_connection.redis
        self.serializer = serializer
        self.deserializer = deserializer
        if not queues:
            raise ConfigurationError(
                "No queues were configured for this worker")
        self.queues = ['retools:queue:%s' % x for x in queues]
        self.paused = self.shutdown = False
        self.job = None
        self.child_id = None
        self.jobs = {}  # job function import cache

    @classmethod
    def get_workers(cls, redis=None):
        redis = redis or global_connection.redis
        for worker_id in redis.smembers('retools:workers'):
            yield cls.from_id(worker_id)

    @classmethod
    def get_worker_ids(cls, redis=None):
        redis = redis or global_connection.redis
        return redis.smembers('retools:workers')

    @classmethod
    def from_id(cls, worker_id, redis=None):
        redis = redis or global_connection.redis
        if not redis.sismember("retools:workers", worker_id):
            raise IndexError(worker_id)
        queues = redis.get("retools:worker:%s:queues" % worker_id)
        queues = queues.split(',')
        return Worker(queues, redis)

    @property
    def worker_id(self):
        """Returns this worker's id based on hostname, pid, queues"""
        return '%s:%s:%s' % (socket.gethostname(), os.getpid(),
                             self.queue_names)

    @property
    def queue_names(self):
        names = [x[len('retools:queue:'):] for x in self.queues]
        return ','.join(names)

    def work(self, interval=5, blocking=False):
        """Work on jobs

        This is the main method of the Worker, and will register itself
        with Redis as a Worker, wait for jobs, then process them.

        :param interval: Time in seconds between polling.
        :type interval: int
        :param blocking: Whether or not blocking pop should be used. If
                         the blocking pop is used, then the worker will
                         block for ``interval`` seconds at a time waiting
                         for a new job. This affects how often the worker
                         can respond to signals.
        :type blocking: bool

        """
        self.set_proc_title('Starting')
        self.startup()
        try:
            while 1:
                if self.shutdown:
                    break

                # Set this first since reserve may block for awhile
                self.set_proc_title("Waiting for %s" % self.queue_names)
                if not self.paused and self.reserve(interval, blocking):
                    self.working_on()
                    self.child_id = os.fork()
                    if self.child_id:
                        self.set_proc_title("Forked %s at %s" % (
                            self.child_id, datetime.now()))
                        try:
                            os.wait()
                        except OSError:
                            # got killed
                            pass
                    else:
                        self.set_proc_title("Processing %s since %s" % (
                            self.job.queue_name, datetime.now()))
                        self.perform()
                        sys.exit()
                    self.done_working()
                    self.child_id = None
                    self.job = None
                else:
                    if self.paused:
                        self.set_proc_title("Paused")
                    elif not blocking:
                        self.set_proc_title(
                            "Waiting for %s" % self.queue_names)
                        time.sleep(interval)
        finally:
            self.unregister_worker()

    def reserve(self, interval, blocking):
        """Attempts to pull a job off the queue(s)"""
        queue_name = None
        if blocking:
            result = self.redis.blpop(self.queues, timeout=interval)
            if result:
                queue_name, job_payload = result
        else:
            for queue in self.queues:
                job_payload = self.redis.lpop(queue)
                if job_payload:
                    queue_name = queue
                    break
        if not queue_name:
            return False

        self.job = job = Job(queue_name=queue_name, job_payload=job_payload,
                             redis=self.redis,
                             serializer=self.serializer,
                             deserializer=self.deserializer)
        try:
            job.func = self.jobs[job.job_name]
        except KeyError:
            mod_name, func_name = job.job_name.split(':')
            __import__(mod_name)
            mod = sys.modules[mod_name]
            job.func = self.jobs[job.job_name] = getattr(mod, func_name)
        return True

    def set_proc_title(self, title):
        """Sets the active process title, retains the retools prefix"""
        setproctitle('retools: ' + title)

    def register_worker(self):
        """Register this worker with Redis"""
        pipeline = self.redis.pipeline()
        pipeline.sadd("retools:workers", self.worker_id)
        pipeline.set("retools:worker:%s:started" % self.worker_id,
                     time.time())
        pipeline.set("retools:worker:%s:queues" % self.worker_id,
                     self.queue_names)
        pipeline.execute()

    def unregister_worker(self, worker_id=None):
        """Unregister this worker with Redis"""
        worker_id = worker_id or self.worker_id
        pipeline = self.redis.pipeline()
        pipeline.srem("retools:workers", worker_id)
        pipeline.delete("retools:worker:%s" % worker_id)
        pipeline.delete("retools:worker:%s:started" % worker_id)
        pipeline.delete("retools:worker:%s:queues" % worker_id)
        pipeline.execute()

    def startup(self):
        """Runs basic startup tasks"""
        self.register_signal_handlers()
        self.prune_dead_workers()
        self.register_worker()

    def trigger_shutdown(self, *args):
        """Graceful shutdown of the worker"""
        self.shutdown = True

    def immediate_shutdown(self, *args):
        """Immediately shutdown the worker, kill child process if needed"""
        self.shutdown = True
        self.kill_child()

    def kill_child(self, *args):
        """Kill the child process immediately"""
        if self.child_id:
            os.kill(self.child_id, signal.SIGTERM)

    def pause_processing(self, *args):
        """Cease pulling jobs off the queue for processing"""
        self.paused = True

    def resume_processing(self, *args):
        """Resume pulling jobs for processing off the queue"""
        self.paused = False

    def prune_dead_workers(self):
        """Prune dead workers from Redis"""
        all_workers = self.redis.smembers("retools:workers")
        known_workers = self.worker_pids()
        hostname = socket.gethostname()
        for worker in all_workers:
            host, pid, queues = worker.split(':')
            if host != hostname or pid in known_workers:
                continue
            self.unregister_worker(worker)

    def register_signal_handlers(self):
        """Setup all the signal handlers"""
        signal.signal(signal.SIGTERM, self.immediate_shutdown)
        signal.signal(signal.SIGINT, self.immediate_shutdown)
        signal.signal(signal.SIGQUIT, self.trigger_shutdown)
        signal.signal(signal.SIGUSR1, self.kill_child)
        signal.signal(signal.SIGUSR2, self.pause_processing)
        signal.signal(signal.SIGCONT, self.resume_processing)

    def working_on(self):
        """Indicate with Redis what we're working on"""
        data = {
            'queue': self.job.queue_name,
            'run_at': time.time(),
            'payload': self.job.payload
        }
        self.redis.set("retools:worker:%s" % self.worker_id,
                       self.serializer(data))

    def done_working(self):
        """Called when we're done working on a job"""
        self.redis.delete("retools:worker:%s" % self.worker_id)

    def worker_pids(self):
        """Returns a list of all the worker processes"""
        ps = subprocess.Popen("ps -U 0 -A | grep 'retools:'", shell=True,
                              stdout=subprocess.PIPE)
        data = ps.stdout.read()
        ps.stdout.close()
        ps.wait()
        return [x.split()[0] for x in data.split('\n') if x]

    def perform(self):
        """Run the job and call the appropriate signal handlers"""
        self.job.perform()


def run_worker():
    usage = "usage: %prog queues"
    parser = OptionParser(usage=usage)
    parser.add_option("--interval", dest="interval", type="int", default=5,
                      help="Polling interval")
    parser.add_option("-b", dest="blocking", action="store_true",
                      default=False,
                      help="Whether to use blocking queue semantics")
    (options, args) = parser.parse_args()

    if len(args) < 1:
        sys.exit("Error: Failed to provide queues or packages_to_scan args")

    worker = Worker(queues=args[0].split(','))
    worker.work(interval=options.interval, blocking=options.blocking)
    sys.exit()