retools.queue

Queue worker and manager

Note

The queueing functionality is new and has gone through only preliminary testing. Please report any issues on the retools GitHub issue tracker.

Any function that takes keyword arguments can be a job that a worker runs. The QueueManager handles configuration and enqueuing jobs to be run.

Declaring jobs:

# mypackage/jobs.py

# jobs

def default_job():
    # do some basic thing
    pass

def important(somearg=None):
    # do an important thing
    pass

# event handlers

# Handlers are called with keyword arguments: job=..., plus result= or exc=

def my_event_handler(job, **kwargs):
    # do something
    pass

def save_error(job, **kwargs):
    # record error
    pass

Running Jobs:

from retools.queue import QueueManager

qm = QueueManager()
qm.subscriber('job_failure', handler='mypackage.jobs:save_error')
qm.subscriber('job_postrun', 'mypackage.jobs:important',
              handler='mypackage.jobs:my_event_handler')
qm.enqueue('mypackage.jobs:important', somearg='fred')

Note

The events for a job are registered with the QueueManager and are encoded in the job’s JSON blob. Updating events for a job will therefore only take effect for new jobs queued, and not existing ones on the queue.

Events

The retools queue has events available for additional functionality without having to subclass or directly extend retools. These functions will be run by the worker when the job is handled.

Available events to register for:

  • job_prerun: Runs immediately before the job is run.
  • job_wrapper: Wraps the execution of the job. Wrappers should be context managers.
  • job_postrun: Runs after the job completes successfully. It is not run if the job throws an exception.
  • job_failure: Runs when a job throws an exception.

Event Function Signatures

Event functions have different call semantics. Each event function is called as follows:

  • job_prerun: (job=job_instance)
  • job_wrapper: (job_function, job_instance, **job_keyword_arguments)
  • job_postrun: (job=job_instance, result=job_function_result)
  • job_failure: (job=job_instance, exc=job_exception)

Attributes of interest on the job instance are documented in the Job.__init__() method.
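The call signatures listed above can be tried out locally with a sketch like the following. The handler names, the events_seen list, and the fake_job object are hypothetical and exist only for illustration. The wrapper here assumes that it is responsible for calling the job function and returning its result; since wrappers are also described above as context managers, check the retools source for the exact contract before relying on this shape.

```python
from types import SimpleNamespace

events_seen = []  # collected here only so the example can be inspected


def log_prerun(job=None):
    # job_prerun is called as handler(job=job_instance)
    events_seen.append(('prerun', job.job_name))


def log_postrun(job=None, result=None):
    # job_postrun is called as handler(job=job_instance, result=...)
    events_seen.append(('postrun', job.job_name, result))


def log_failure(job=None, exc=None):
    # job_failure is called as handler(job=job_instance, exc=...)
    events_seen.append(('failure', job.job_name, repr(exc)))


def call_and_record(job_function, job_instance, **job_kwargs):
    # job_wrapper receives the function, the job, and the job's kwargs.
    # Assumption: the wrapper invokes the job function itself.
    events_seen.append(('wrapper', job_instance.job_name))
    return job_function(**job_kwargs)


def important(somearg=None):
    return somearg.upper()


# Hypothetical stand-in for a Job instance, for local experimentation only.
fake_job = SimpleNamespace(job_name='mypackage.jobs:important')

log_prerun(job=fake_job)
result = call_and_record(important, fake_job, somearg='fred')
log_postrun(job=fake_job, result=result)
```

Accepting the documented keyword names (job, result, exc) explicitly makes a mismatch between event and handler fail loudly instead of being swallowed by **kwargs.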

Running the Worker

After installing retools, a retools-worker command will be available that can spawn a worker. Queues to watch can be listed in order for priority queueing, in which case the worker will try each queue in order looking for jobs to process.

Example invocation:

$ retools-worker high,medium,main
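The priority behaviour described above (try each queue in the order given, take the first job found) can be illustrated without Redis. This is a minimal sketch using in-memory deques as stand-ins for Redis lists. The reserve function below is a hypothetical illustration, not the retools implementation, and the job names are made up.

```python
from collections import deque


def reserve(queues, order):
    """Return (queue_name, job) from the first non-empty queue, else None."""
    for name in order:
        pending = queues[name]
        if pending:
            return name, pending.popleft()
    return None


# In-memory stand-ins for the three Redis queues in the invocation above.
queues = {
    'high': deque(),
    'medium': deque(['resize-image']),
    'main': deque(['send-email']),
}
order = ['high', 'medium', 'main']

first = reserve(queues, order)       # 'high' is empty, so 'medium' is served
queues['high'].append('charge-card')
second = reserve(queues, order)      # 'high' now wins over 'main'
third = reserve(queues, order)
fourth = reserve(queues, order)      # nothing left
```

Because the scan always restarts at the first queue, a steady stream of high-priority jobs can starve the lower-priority queues.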

Public API Classes

class retools.queue.QueueManager(redis=None, default_queue_name='main', serializer=json.dumps, deserializer=json.loads)

Configures and enqueues jobs

__init__(redis=None, default_queue_name='main', serializer=json.dumps, deserializer=json.loads)

Initialize a QueueManager

Parameters:
  • redis – A Redis instance. Defaults to the redis instance on the global_connection.
  • default_queue_name – The default queue name. Defaults to ‘main’.
  • serializer – A callable to serialize json data, defaults to json.dumps().
  • deserializer – A callable to deserialize json data, defaults to json.loads().

enqueue(job, **kwargs)

Enqueue a job

Parameters:
  • job – The pkg_resource name of the function, e.g. retools.jobs:my_function
  • kwargs – Keyword arguments the job should be called with. These arguments must be JSON serializable.
Returns:

The job id that was queued.
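Because the keyword arguments must survive a JSON round trip, it can help to validate them before enqueuing. The helper below is a hypothetical sketch and not part of retools. It assumes the default json.dumps() serializer.

```python
import json


def check_job_kwargs(**kwargs):
    """Return the JSON text for kwargs, or raise TypeError if they cannot be encoded."""
    try:
        return json.dumps(kwargs)
    except (TypeError, ValueError) as exc:
        raise TypeError('job kwargs are not JSON serializable: %s' % exc)


encoded = check_job_kwargs(somearg='fred')

# A set has no JSON representation, so this call is rejected up front.
try:
    check_job_kwargs(tags={'a', 'b'})
    rejected = False
except TypeError:
    rejected = True
```

Values such as datetime objects or sets need to be converted to strings, numbers, lists, or dicts before being passed to enqueue().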

set_queue_for_job(job_name, queue_name)

Set the queue that a given job name will go to

Parameters:
  • job_name – The pkg_resource name of the job function, e.g. retools.jobs:my_function
  • queue_name – Name of the Redis queue that job payloads should go to.

subscriber(event, job=None, handler=None)

Set events for a specific job or for all jobs

Parameters:
  • event – The name of the event to subscribe to.
  • job – Optional, a specific job to bind to.
  • handler – The location of the handler to call.
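The three QueueManager methods documented above can be combined as in the following sketch. The configure_and_enqueue helper, the 'high' queue name, and the RecordingManager class are hypothetical. RecordingManager is only a stand-in that records calls so the sketch can be run without Redis. With retools installed and Redis available, a real QueueManager() would be passed instead.

```python
def configure_and_enqueue(qm):
    """Route one job to a priority queue, attach handlers, then enqueue it."""
    qm.set_queue_for_job('mypackage.jobs:important', 'high')
    qm.subscriber('job_failure', handler='mypackage.jobs:save_error')
    qm.subscriber('job_postrun', job='mypackage.jobs:important',
                  handler='mypackage.jobs:my_event_handler')
    return qm.enqueue('mypackage.jobs:important', somearg='fred')


class RecordingManager(object):
    """Hypothetical stand-in that records calls; not part of retools."""

    def __init__(self):
        self.calls = []

    def set_queue_for_job(self, job_name, queue_name):
        self.calls.append(('set_queue_for_job', job_name, queue_name))

    def subscriber(self, event, job=None, handler=None):
        self.calls.append(('subscriber', event, job, handler))

    def enqueue(self, job, **kwargs):
        self.calls.append(('enqueue', job, kwargs))
        return 'fake-job-id'


recorder = RecordingManager()
job_id = configure_and_enqueue(recorder)
```

Routing and subscriptions are configured before enqueue() is called, since event registrations only apply to jobs queued afterwards.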

Private API Classes

class retools.queue.Job(queue_name, job_payload, redis, serializer=json.dumps, deserializer=json.loads)

__init__(queue_name, job_payload, redis, serializer=json.dumps, deserializer=json.loads)

Create a job instance given a JSON job payload

Parameters:
  • job_payload – A JSON string representing a job.
  • queue_name – The queue this job was pulled off of.
  • redis – The redis instance used to pull this job.

A Job instance is created when the Worker pulls a job payload off the queue. The current_job global is set upon creation to indicate the current job being processed.

Attributes of interest for event functions:

  • job_id: The Job’s ID
  • job_name: The Job’s name (its package + function name)
  • queue_name: The queue this job came from
  • kwargs: The keyword arguments the job is called with
  • state: The state dict. Events can use this to retain additional arguments; for example, a retry extension can store retry information in the state dict.
  • func: A reference to the job function
  • redis: A redis.Redis instance.
  • serializer: A callable to serialize json data, defaults to json.dumps().
  • deserializer: A callable to deserialize json data, defaults to json.loads().

enqueue()

Queue this job in Redis
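The retry extension mentioned for the state dict could look roughly like this job_failure handler. This is a sketch under two assumptions: that the state dict is carried along when the job is queued again, and that calling enqueue() from a failure handler is acceptable. MAX_RETRIES, retry_on_failure, and FakeJob are hypothetical names. FakeJob exposes only the attributes the handler touches so the logic can be exercised without Redis.

```python
MAX_RETRIES = 3  # hypothetical limit chosen for this example


def retry_on_failure(job=None, exc=None):
    """job_failure handler: re-queue the job until MAX_RETRIES is reached."""
    attempts = job.state.get('retries', 0)
    if attempts >= MAX_RETRIES:
        return False
    job.state['retries'] = attempts + 1
    job.enqueue()
    return True


class FakeJob(object):
    """Hypothetical stand-in exposing only the attributes used above."""

    def __init__(self):
        self.state = {}
        self.times_enqueued = 0

    def enqueue(self):
        self.times_enqueued += 1


job = FakeJob()
outcomes = [retry_on_failure(job=job, exc=ValueError('boom')) for _ in range(5)]
```

Keeping the counter in the job's own state, rather than in a module-level variable, means the count belongs to the job and not to whichever worker process happens to run it.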

static load_events(event_dict)

Load all the events given the references

Parameters:
  • event_dict – A dictionary of events keyed by event name to a list of handlers for the event.
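As an illustration of the event_dict shape, the sketch below resolves 'module:function' references into callables with importlib. It assumes handlers are stored as reference strings in that form, as in the subscriber() examples. The resolve and load_handlers functions are hypothetical and are not the retools loader. Standard-library functions stand in for real handlers so the example runs anywhere.

```python
import json
import os.path
from importlib import import_module


def resolve(reference):
    """Turn 'package.module:function' into the function object."""
    module_name, _, attr = reference.partition(':')
    return getattr(import_module(module_name), attr)


def load_handlers(event_dict):
    """Map each event name to a list of resolved callables."""
    return {
        event: [resolve(ref) for ref in references]
        for event, references in event_dict.items()
    }


# Event name -> list of handler references (stdlib functions as placeholders).
event_dict = {
    'job_postrun': ['json:dumps'],
    'job_failure': ['json:loads', 'os.path:basename'],
}
handlers = load_handlers(event_dict)
```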
perform()

Runs the job calling all the job signals as appropriate

run_event(event, **kwargs)

Run all registered events for this job

class retools.queue.Worker(queues, redis=None, serializer=json.dumps, deserializer=json.loads)

A Worker works on jobs

__init__(queues, redis=None, serializer=json.dumps, deserializer=json.loads)

Create a worker

Parameters:
  • queues (list) – List of queues to process
  • redis – Redis instance to use, defaults to the global_connection.

If the list contains only a single queue, Redis list blocking will be used for lower-latency job processing.

done_working()

Called when we’re done working on a job

immediate_shutdown(*args)

Immediately shut down the worker, killing the child process if needed

kill_child(*args)

Kill the child process immediately

pause_processing(*args)

Cease pulling jobs off the queue for processing

perform()

Run the job and call the appropriate signal handlers

prune_dead_workers()

Prune dead workers from Redis

register_signal_handlers()

Set up all the signal handlers

register_worker()

Register this worker with Redis

reserve(interval, blocking)

Attempts to pull a job off the queue(s)

resume_processing(*args)

Resume pulling jobs for processing off the queue

set_proc_title(title)

Sets the active process title, retaining the retools prefix

startup()

Runs basic startup tasks

trigger_shutdown(*args)

Graceful shutdown of the worker

unregister_worker(worker_id=None)

Unregister this worker with Redis

work(interval=5, blocking=False)

Work on jobs

This is the main method of the Worker, and will register itself with Redis as a Worker, wait for jobs, then process them.

Parameters:
  • interval (int) – Time in seconds between polling.
  • blocking (bool) – Whether or not blocking pop should be used. If the blocking pop is used, then the worker will block for interval seconds at a time waiting for a new job. This affects how often the worker can respond to signals.
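The difference between the two modes can be sketched with the standard library's queue.Queue standing in for a Redis list. This is only an illustration of the trade-off described above, not the retools implementation, and wait_for_job is a hypothetical helper.

```python
import queue
import time


def wait_for_job(jobs, interval, blocking):
    """Return the next job, or None if none arrived within one interval."""
    if blocking:
        # Blocking pop: returns as soon as a job arrives, but the caller
        # is tied up for at most `interval` seconds when the queue is empty.
        try:
            return jobs.get(timeout=interval)
        except queue.Empty:
            return None
    # Polling: check once, then sleep for `interval` before the next check.
    try:
        return jobs.get_nowait()
    except queue.Empty:
        time.sleep(interval)
        return None


jobs = queue.Queue()
jobs.put('send-email')

got_blocking = wait_for_job(jobs, interval=0.01, blocking=True)
got_polling = wait_for_job(jobs, interval=0.01, blocking=False)
```

In both modes the interval bounds how long the loop goes between iterations, which is why a shorter interval lets the worker notice signals sooner.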

worker_id

Returns this worker’s ID based on hostname, pid, and queues

worker_pids()

Returns a list of all the worker processes

working_on()

Indicate with Redis what we’re working on