Queue worker and manager
The queueing functionality is new and has undergone some preliminary testing. Please report any issues on the retools GitHub issue tracker.
Any function that takes keyword arguments can be a job that a worker runs. The QueueManager handles configuration and enqueues jobs to be run.
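```python
# mypackage/jobs.py

# jobs
def default_job():
    # do some basic thing
    pass

def important(somearg=None):
    # do an important thing
    pass

# event handlers
def my_event_handler(job, **kwargs):
    # do something; a job_postrun handler is called with job=... and result=...
    pass

def save_error(job, exc=None, **kwargs):
    # record error; a job_failure handler is called with job=... and exc=...
    pass
```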
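```python
from retools.queue import QueueManager

qm = QueueManager()

# Call save_error whenever any job fails
qm.subscriber('job_failure', handler='mypackage.jobs:save_error')

# Call my_event_handler after mypackage.jobs:important completes successfully
qm.subscriber('job_postrun', 'mypackage.jobs:important',
              handler='mypackage.jobs:my_event_handler')

# Queue the job; its keyword arguments must be JSON-serializable
qm.enqueue('mypackage.jobs:important', somearg='fred')
```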
Event subscriptions are registered with the QueueManager and encoded in each job’s JSON blob. Changing the events for a job therefore affects only jobs queued afterward, not jobs already on the queue.
The retools queue provides events so that additional functionality can be added without subclassing or otherwise extending retools. The worker runs the registered event functions while it handles the job.
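The reason changed subscriptions do not reach jobs already on the queue can be shown with a minimal sketch. It assumes a hypothetical payload layout (`job`, `kwargs` and `events` keys) and uses a plain Python list in place of the Redis queue; `SketchQueueManager` is an illustrative stand-in, not the retools class, and the real payload format may differ.

```python
import json
from collections import defaultdict

class SketchQueueManager:
    """Hypothetical stand-in for QueueManager that copies subscriptions into each payload."""

    def __init__(self):
        self.global_events = defaultdict(list)  # event name -> handler references
        self.queue = []  # stands in for a Redis list of JSON payloads

    def subscriber(self, event, handler):
        self.global_events[event].append(handler)

    def enqueue(self, job, **kwargs):
        payload = {
            "job": job,
            "kwargs": kwargs,
            # Snapshot the subscriptions as they exist at enqueue time
            "events": {name: list(refs) for name, refs in self.global_events.items()},
        }
        self.queue.append(json.dumps(payload))

qm = SketchQueueManager()
qm.subscriber("job_failure", "mypackage.jobs:save_error")
qm.enqueue("mypackage.jobs:important", somearg="fred")

# Subscribing later does not alter the payload that is already queued
qm.subscriber("job_postrun", "mypackage.jobs:my_event_handler")
qm.enqueue("mypackage.jobs:important", somearg="wilma")

first, second = (json.loads(p) for p in qm.queue)
print(first["events"])          # {'job_failure': ['mypackage.jobs:save_error']}
print(sorted(second["events"]))  # ['job_failure', 'job_postrun']
```

Because each payload carries its own copy of the event references, a worker only ever sees the subscriptions that were in place when that particular job was enqueued.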
Available events to register for:
- job_prerun: Runs immediately before the job is run.
- job_wrapper: Wraps the execution of the job; these handlers should be context managers.
- job_postrun: Runs after the job completes successfully; it is not run if the job raises an exception.
- job_failure: Runs when a job raises an exception.
Event Function Signatures
Event functions are called with different arguments depending on the event:
- job_prerun: (job=job_instance)
- job_wrapper: (job_function, job_instance, **job_keyword_arguments)
- job_postrun: (job=job_instance, result=job_function_result)
- job_failure: (job=job_instance, exc=job_exception)
Attributes of interest on the job instance are documented in the Job.__init__() method.
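A sketch of handlers written against these signatures follows. `run_with_events` is a hypothetical driver that only mimics the documented call semantics for job_prerun, job_postrun and job_failure; it is not the retools worker, and job_wrapper is left out because its exact invocation is not spelled out here. A `SimpleNamespace` stands in for a Job instance.

```python
from types import SimpleNamespace

log = []  # records handler calls so the sequence is visible

def on_prerun(job):
    log.append(("prerun", job.job_name))

def on_postrun(job, result):
    log.append(("postrun", job.job_name, result))

def on_failure(job, exc):
    log.append(("failure", job.job_name, repr(exc)))

def run_with_events(job, events):
    """Hypothetical driver mimicking the documented call semantics (not retools code)."""
    for handler in events.get("job_prerun", []):
        handler(job=job)
    try:
        result = job.func(**job.kwargs)
    except Exception as exc:
        for handler in events.get("job_failure", []):
            handler(job=job, exc=exc)
        return None
    for handler in events.get("job_postrun", []):
        handler(job=job, result=result)
    return result

def important(somearg=None):
    return somearg.upper()  # raises AttributeError when somearg is None

events = {"job_prerun": [on_prerun], "job_postrun": [on_postrun],
          "job_failure": [on_failure]}

# SimpleNamespace objects stand in for retools Job instances
ok_job = SimpleNamespace(job_name="mypackage.jobs:important", func=important,
                         kwargs={"somearg": "fred"})
bad_job = SimpleNamespace(job_name="mypackage.jobs:important", func=important,
                          kwargs={})
run_with_events(ok_job, events)   # returns 'FRED'; on_postrun sees result='FRED'
run_with_events(bad_job, events)  # returns None; on_failure receives the AttributeError
```

Since handlers are called with keyword arguments, accepting `job` by name (and `**kwargs` for anything extra) keeps a single handler usable across several events.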
Running the Worker
Installing retools provides a retools-worker command that spawns a worker. The queues to watch can be listed in priority order, separated by commas; the worker then checks each queue in turn when looking for a job to process.
$ retools-worker high,medium,main
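The priority ordering can be sketched as "take from the first non-empty queue". In this sketch a dict of `deque`s stands in for the Redis lists, and `pick_next_job` is a hypothetical helper, not the retools Worker implementation.

```python
from collections import deque

def pick_next_job(queues, store):
    """Return (queue_name, payload) from the first non-empty queue, or None.

    `store` is a hypothetical dict of deques standing in for Redis lists.
    """
    for name in queues:  # queues are checked in the order given
        pending = store.get(name)
        if pending:
            return name, pending.popleft()
    return None

store = {"high": deque(), "medium": deque(["m1"]), "main": deque(["a1", "a2"])}
order = "high,medium,main".split(",")

first = pick_next_job(order, store)
second = pick_next_job(order, store)
print(first)   # ('medium', 'm1')
print(second)  # ('main', 'a1')
```

With this ordering, a steady stream of jobs on `high` would keep `medium` and `main` waiting, which is the intended trade-off of priority queueing.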
Public API Classes
- class retools.queue.QueueManager(redis=None, default_queue_name='main', serializer=json.dumps, deserializer=json.loads)
Configures and enqueues jobs
- __init__(redis=None, default_queue_name='main', serializer=json.dumps, deserializer=json.loads)
Initialize a QueueManager
- redis – A Redis instance. Defaults to the redis instance on the global_connection.
- default_queue_name – The default queue name. Defaults to ‘main’.
- serializer – A callable to serialize JSON data, defaults to json.dumps().
- deserializer – A callable to deserialize JSON data, defaults to json.loads().
- enqueue(job, **kwargs)
Enqueue a job
- job – The pkg_resources name of the function, e.g. retools.jobs:my_function
- kwargs – Keyword arguments the job should be called with. These arguments must be JSON-serializable.
Returns: The ID of the queued job.
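Because job arguments travel as JSON, it can help to check them before enqueueing. `check_job_kwargs` below is a hypothetical helper, not part of retools, and it assumes the QueueManager uses the default serializer, json.dumps; a custom serializer would accept a different set of types.

```python
import json

def check_job_kwargs(**kwargs):
    """Hypothetical pre-flight check: raises TypeError if kwargs are not JSON-serializable."""
    json.dumps(kwargs)  # sets, datetimes and arbitrary objects raise TypeError
    return kwargs

check_job_kwargs(somearg="fred", count=3, tags=["a", "b"])  # passes

try:
    check_job_kwargs(somearg={"a", "b"})
except TypeError as exc:
    print("not JSON-serializable:", exc)

# Tuples pass the check but come back as lists after a JSON round trip
print(json.loads(json.dumps({"pair": (1, 2)})))  # {'pair': [1, 2]}
```

The tuple case is worth noting: it serializes without error, but the job function receives a list.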
- set_queue_for_job(job_name, queue_name)
Set the queue that a given job name will go to
- job_name – The pkg_resources name of the job function, e.g. retools.jobs:my_function
- queue_name – The name of the Redis queue that payloads for this job should go to
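The routing rule can be pictured as a lookup table with a fallback. `QueueRouter` is a hypothetical sketch; in particular, the assumption that jobs without an explicit mapping go to `default_queue_name` is inferred from that parameter's name rather than stated above.

```python
class QueueRouter:
    """Hypothetical routing table mirroring set_queue_for_job semantics."""

    def __init__(self, default_queue_name="main"):
        self.default_queue_name = default_queue_name
        self.job_queues = {}  # job name -> queue name

    def set_queue_for_job(self, job_name, queue_name):
        self.job_queues[job_name] = queue_name

    def queue_for(self, job_name):
        # Assumed: jobs without an explicit mapping fall back to the default queue
        return self.job_queues.get(job_name, self.default_queue_name)

router = QueueRouter()
router.set_queue_for_job("mypackage.jobs:important", "high")
print(router.queue_for("mypackage.jobs:important"))    # high
print(router.queue_for("mypackage.jobs:default_job"))  # main
```

With the real API, the equivalent would be calling `qm.set_queue_for_job('mypackage.jobs:important', 'high')` and starting a worker with `retools-worker high,main` so that the `high` queue is checked first.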
- subscriber(event, job=None, handler=None)
Set events for a specific job or for all jobs
- event – The name of the event to subscribe to.
- job – Optional, a specific job to bind to.
- handler – The pkg_resources name of the handler to call, e.g. mypackage.jobs:save_error
Private API Classes
- class retools.queue.Job(queue_name, job_payload, redis, serializer=json.dumps, deserializer=json.loads)
- __init__(queue_name, job_payload, redis, serializer=json.dumps, deserializer=json.loads)
Create a job instance given a JSON job payload
- queue_name – The queue this job was pulled off of.
- job_payload – A JSON string representing a job.
- redis – The redis instance used to pull this job.
A Job instance is created when the Worker pulls a job payload off the queue. The current_job global is set upon creation to indicate the current job being processed.
Attributes of interest for event functions:
- job_id: The Job’s ID
- job_name: The Job’s name (its package and function name)
- queue_name: The queue this job came from
- kwargs: The keyword arguments the job is called with
- state: The state dict, which events can use to retain additional arguments. For example, a retry extension can store retry information in the state dict.
- func: A reference to the job function
- redis: A redis.Redis instance.
- serializer: A callable to serialize JSON data, defaults to json.dumps().
- deserializer: A callable to deserialize JSON data, defaults to json.loads().
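As an illustration of the state dict, here is a hypothetical job_failure handler for a retry extension that counts attempts in `job.state`. `retry_on_failure` and `MAX_RETRIES` are invented for this sketch, a `SimpleNamespace` stands in for a Job, and actually re-enqueueing the job (and carrying state across attempts) is assumed rather than shown.

```python
from types import SimpleNamespace

MAX_RETRIES = 3  # hypothetical limit

def retry_on_failure(job, exc):
    """Hypothetical job_failure handler that records retry information in job.state."""
    attempts = job.state.get("retry_attempts", 0) + 1
    job.state["retry_attempts"] = attempts
    job.state["last_error"] = repr(exc)
    # A real extension would re-enqueue the job here while attempts < MAX_RETRIES
    return attempts < MAX_RETRIES

job = SimpleNamespace(job_id="abc123", job_name="mypackage.jobs:important",
                      kwargs={"somearg": "fred"}, state={})
for _ in range(3):
    should_retry = retry_on_failure(job=job, exc=ValueError("boom"))
print(job.state["retry_attempts"], should_retry)  # 3 False
```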
Queue this job in Redis
- static load_events(event_dict)
Load all the events given the references
Parameters: event_dict – A dictionary of events keyed by event name to a list of handlers for the event.
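Loading events amounts to turning each `package.module:function` reference into a callable. This sketch swaps in `importlib` for the resolution step; retools may resolve references differently (for example through pkg_resources), and `resolve` and this `load_events` are illustrative, not the library's code. Standard-library functions stand in for real handlers.

```python
import importlib
import json
import os.path

def resolve(reference):
    """Import a 'package.module:function' reference and return the object."""
    module_name, _, attr_path = reference.partition(":")
    obj = importlib.import_module(module_name)
    for attr in attr_path.split("."):  # also allows 'module:Class.method'
        obj = getattr(obj, attr)
    return obj

def load_events(event_dict):
    """Sketch: map each event name to its list of resolved handler callables."""
    return {event: [resolve(ref) for ref in refs]
            for event, refs in event_dict.items()}

events = load_events({"job_failure": ["json:dumps"],
                      "job_postrun": ["os.path:join", "json:loads"]})
print(events["job_failure"][0] is json.dumps)  # True
```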
Runs the job calling all the job signals as appropriate
- run_event(event, **kwargs)
Run all registered events for this job
- class retools.queue.Worker(queues, redis=None, serializer=json.dumps, deserializer=json.loads)
A Worker works on jobs
- __init__(queues, redis=None, serializer=json.dumps, deserializer=json.loads)
Create a worker
- queues (list) – List of queues to process
- redis – Redis instance to use, defaults to the global_connection.
If the list contains only a single queue, a blocking Redis list pop is used for lower-latency job processing.
Called when we’re done working on a job
Immediately shut down the worker, killing the child process if needed
Kill the child process immediately
Cease pulling jobs off the queue for processing
Run the job and call the appropriate signal handlers
Prune dead workers from Redis
Setup all the signal handlers
Register this worker with Redis
- reserve(interval, blocking)
Attempts to pull a job off the queue(s)
Resume pulling jobs for processing off the queue
Sets the active process title, retaining the retools prefix
Runs basic startup tasks
Graceful shutdown of the worker
Unregister this worker with Redis
- work(interval=5, blocking=False)
Work on jobs
This is the main method of the Worker: it registers the worker with Redis, waits for jobs, and then processes them.
- interval (int) – Time in seconds between polling.
- blocking (bool) – Whether to use a blocking pop. With a blocking pop, the worker blocks for up to interval seconds at a time while waiting for a new job, which limits how quickly it can respond to signals.
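The trade-off between `interval` and signal responsiveness can be seen in a small simulation. Here a `queue.Queue` stands in for the Redis list (Redis' BLPOP likewise takes a timeout in seconds) and a `threading.Event` stands in for the flag a signal handler would set; `worker_loop` is a hypothetical sketch, not `Worker.work`.

```python
import queue
import threading

def worker_loop(jobs, shutdown, interval=0.1):
    """Sketch of a polling loop built on a blocking pop with a timeout.

    `jobs` (a queue.Queue) stands in for a Redis list and `shutdown`
    (a threading.Event) for the flag a signal handler would set.
    """
    processed = []
    while not shutdown.is_set():
        try:
            # Block for up to `interval` seconds waiting for a job. The shutdown
            # flag is only re-checked after this returns, so `interval` bounds
            # how long a shutdown request can go unnoticed while idle.
            payload = jobs.get(timeout=interval)
        except queue.Empty:
            continue
        processed.append(payload)  # a real worker would run the job here
    return processed

jobs = queue.Queue()
for payload in ("job-1", "job-2"):
    jobs.put(payload)

shutdown = threading.Event()
threading.Timer(0.3, shutdown.set).start()  # simulate a shutdown request after 0.3 s
result = worker_loop(jobs, shutdown)
print(result)  # ['job-1', 'job-2']
```

A larger interval means fewer wake-ups while idle but a slower reaction to a shutdown request; a smaller one reverses that balance.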
Returns this worker’s ID, based on its hostname, PID, and queues
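An ID built from hostname, PID and queues might look like the sketch below. The colon-separated layout is a hypothetical format chosen for illustration; the actual string retools produces may be arranged differently.

```python
import os
import socket

def worker_id(queues):
    """Hypothetical ID format: hostname, PID and queue names joined by colons."""
    return f"{socket.gethostname()}:{os.getpid()}:{','.join(queues)}"

print(worker_id(["high", "medium", "main"]))
```

Including the PID distinguishes several workers on the same host, and including the queues lets an operator see at a glance what each worker is watching.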
Returns a list of all the worker processes
Indicate with Redis what we’re working on