retools.queue
Queue worker and manager
Note
The queueing functionality is new, and has gone through some preliminary testing. Please report any issues found on the retools Github issue tracker.
Any function that takes keyword arguments can be a job that a worker runs. The QueueManager handles configuration and enqueuing jobs to be run.
Declaring jobs:
# mypackage/jobs.py
# jobs
def default_job():
    # do some basic thing
    pass

def important(somearg=None):
    # do an important thing
    pass

# event handlers
def my_event_handler(sender, **kwargs):
    # do something
    pass

def save_error(sender, **kwargs):
    # record error
    pass
Running Jobs:
from retools.queue import QueueManager
qm = QueueManager()
qm.subscriber('job_failure', handler='mypackage.jobs:save_error')
qm.subscriber('job_postrun', 'mypackage.jobs:important',
              handler='mypackage.jobs:my_event_handler')
qm.enqueue('mypackage.jobs:important', somearg='fred')
Note
The events for a job are registered with the QueueManager and are encoded in the job’s JSON blob. Updating the events for a job therefore only affects jobs queued afterwards, not jobs already on the queue.
Events
The retools queue has events available for additional functionality without having to subclass or directly extend retools. These functions will be run by the worker when the job is handled.
Available events to register for:
- job_prerun: Runs immediately before the job is run.
- job_wrapper: Wraps the execution of the job. These should be context managers.
- job_postrun: Runs after the job completes successfully. It is not run if the job throws an exception.
- job_failure: Runs when a job throws an exception.
Event Function Signatures
Event functions have different call semantics. Each event function is called as follows:
- job_prerun: (job=job_instance)
- job_wrapper: (job_function, job_instance, **job_keyword_arguments)
- job_postrun: (job=job_instance, result=job_function_result)
- job_failure: (job=job_instance, exc=job_exception)
Attributes of interest on the job instance are documented in the Job.__init__() method.
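As a rough illustration of the keyword signatures listed above, the sketch below defines one handler per event and calls them in the order a worker might. `FakeJob` and `run_with_events` are hypothetical stand-ins written for this example. They are not part of retools, and the real worker's dispatch logic may differ.

```python
from contextlib import contextmanager

calls = []  # records the order in which handlers fire


class FakeJob:
    """Hypothetical stand-in for retools.queue.Job (not the real class)."""

    def __init__(self, func, **kwargs):
        self.func = func
        self.kwargs = kwargs


def on_prerun(job=None):
    calls.append("prerun")


@contextmanager
def timing_wrapper(job_function, job_instance, **job_kwargs):
    # job_wrapper handlers are context managers around the job's execution.
    calls.append("wrapper-enter")
    try:
        yield
    finally:
        calls.append("wrapper-exit")


def on_postrun(job=None, result=None):
    calls.append(("postrun", result))


def on_failure(job=None, exc=None):
    calls.append(("failure", type(exc).__name__))


def run_with_events(job):
    """Assumed dispatch order; the real worker may behave differently."""
    on_prerun(job=job)
    try:
        with timing_wrapper(job.func, job, **job.kwargs):
            result = job.func(**job.kwargs)
    except Exception as exc:
        on_failure(job=job, exc=exc)
    else:
        on_postrun(job=job, result=result)


def important(somearg=None):
    return "hello " + somearg


run_with_events(FakeJob(important, somearg="fred"))
```

In this sketch the postrun handler only fires when the job returns normally, and the failure handler only fires when it raises, which mirrors the event descriptions above.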
Running the Worker
After installing retools, a retools-worker command will be available that can spawn a worker. Queues to watch can be listed in order for priority queueing, in which case the worker will try each queue in order looking for jobs to process.
Example invocation:
$ retools-worker high,medium,main
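The "try each queue in order" behaviour can be pictured with a small in-memory sketch. The deques below are stand-ins for Redis lists, and `next_job` is a hypothetical helper written for this example. It is not how retools itself is implemented.

```python
from collections import deque

# In-memory stand-ins for Redis lists; names mirror the example invocation.
queues = {
    "high": deque(),
    "medium": deque(["resize-image"]),
    "main": deque(["send-email", "cleanup"]),
}


def next_job(queue_order):
    """Return (queue_name, job) from the first non-empty queue, else None."""
    for name in queue_order:
        if queues[name]:
            return name, queues[name].popleft()
    return None


order = "high,medium,main".split(",")
processed = []
while (item := next_job(order)) is not None:
    processed.append(item)
```

Because the scan restarts from the first queue on every call, a job arriving on an earlier queue is picked up before anything still waiting on a later one.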
Public API Classes
- class retools.queue.QueueManager(redis=None, default_queue_name='main', serializer=json.dumps, deserializer=json.loads)
Configures and enqueues jobs
- __init__(redis=None, default_queue_name='main', serializer=json.dumps, deserializer=json.loads)
Initialize a QueueManager
Parameters: - redis – A Redis instance. Defaults to the redis instance on the global_connection.
- default_queue_name – The default queue name. Defaults to ‘main’.
- serializer – A callable to serialize json data, defaults to json.dumps().
- deserializer – A callable to deserialize json data, defaults to json.loads().
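As one possible use of the serializer and deserializer parameters, the sketch below builds a serializer that falls back to `str()` for values the standard json module cannot encode. The `default=str` choice and the sample payload are assumptions made for this example. The commented `QueueManager` call at the end is not executed here because it needs a reachable Redis instance.

```python
import json
from datetime import date
from functools import partial

# Fall back to str() for values json cannot encode natively (e.g. dates).
serializer = partial(json.dumps, default=str, sort_keys=True)
deserializer = json.loads

payload = serializer({"somearg": "fred", "due": date(2012, 1, 31)})
restored = deserializer(payload)

# Hypothetical usage, assuming a reachable Redis instance:
# qm = QueueManager(serializer=serializer, deserializer=deserializer)
```

Note that the round trip is lossy in this sketch: the date comes back as a plain string, so the job function would have to parse it again.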
- enqueue(job, **kwargs)
Enqueue a job
Parameters: - job – The pkg_resource name of the function, e.g. retools.jobs:my_function
- kwargs – Keyword arguments the job should be called with. These arguments must be serializable by JSON.
Returns: The job id that was queued.
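A minimal sketch of how a 'package.module:function' name could be resolved to a callable, and how keyword arguments can be checked for JSON compatibility before enqueuing. `resolve_job` and `check_kwargs` are hypothetical helpers written for this example. retools' own lookup may work differently.

```python
import importlib
import json


def resolve_job(name):
    """Resolve 'package.module:function' to the function object."""
    module_name, sep, func_name = name.partition(":")
    if not sep or not module_name or not func_name:
        raise ValueError("expected 'package.module:function', got %r" % name)
    module = importlib.import_module(module_name)
    return getattr(module, func_name)


def check_kwargs(**kwargs):
    """Raise TypeError early if kwargs cannot be encoded as JSON."""
    json.dumps(kwargs)
    return kwargs


func = resolve_job("os.path:basename")
print(func("/tmp/report.txt"))

check_kwargs(somearg="fred")       # fine: strings are JSON-serializable
try:
    check_kwargs(somearg={1, 2})   # sets are not JSON-serializable
except TypeError as exc:
    print("rejected:", exc)
```

Checking the arguments before enqueuing surfaces serialization problems in the caller rather than later in the worker.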
Private API Classes
- class retools.queue.Job(queue_name, job_payload, redis, serializer=json.dumps, deserializer=json.loads)
- __init__(queue_name, job_payload, redis, serializer=json.dumps, deserializer=json.loads)
Create a job instance given a JSON job payload
Parameters: - job_payload – A JSON string representing a job.
- queue_name – The queue this job was pulled off of.
- redis – The redis instance used to pull this job.
A Job instance is created when the Worker pulls a job payload off the queue. The current_job global is set upon creation to indicate the current job being processed.
Attributes of interest for event functions:
- job_id: The Job’s ID
- job_name: The Job’s name (its package + function name)
- queue_name: The queue this job came from
- kwargs: The keyword arguments the job is called with
- state: The state dict. Events can use it to retain additional arguments. For example, a retry extension can store its retry information in the state dict.
- func: A reference to the job function
- redis: A redis.Redis instance.
- serializer: A callable to serialize json data, defaults to json.dumps().
- deserializer: A callable to deserialize json data, defaults to json.loads().
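To illustrate the retry idea mentioned for the state dict, here is a sketch of a job_failure-style handler that counts failures in `job.state`. `FakeJob`, `MAX_RETRIES`, and the `retries` and `last_error` keys are all hypothetical names chosen for this example. Nothing here is part of retools, and a real extension would also need to re-enqueue the job.

```python
MAX_RETRIES = 3  # hypothetical limit chosen for this example


class FakeJob:
    """Hypothetical stand-in exposing only the attributes used below."""

    def __init__(self, job_id, job_name):
        self.job_id = job_id
        self.job_name = job_name
        self.state = {}


def track_retries(job=None, exc=None):
    """job_failure-style handler: count failures in job.state."""
    retries = job.state.get("retries", 0) + 1
    job.state["retries"] = retries
    job.state["last_error"] = repr(exc)
    # A real extension would re-enqueue here; this sketch only reports.
    return retries <= MAX_RETRIES


job = FakeJob("abc123", "mypackage.jobs:important")
should_retry = [track_retries(job=job, exc=ValueError("boom")) for _ in range(4)]
```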
- class retools.queue.Worker(queues, redis=None, serializer=json.dumps, deserializer=json.loads)
A Worker works on jobs
- __init__(queues, redis=None, serializer=json.dumps, deserializer=json.loads)
Create a worker
Parameters: - queues (list) – List of queues to process
- redis – Redis instance to use, defaults to the global_connection.
If there is only a single queue in the list, Redis list blocking will be used for lower-latency job processing.
- work(interval=5, blocking=False)
Work on jobs
This is the main method of the Worker, and will register itself with Redis as a Worker, wait for jobs, then process them.
Parameters: - interval (int) – Time in seconds between polling.
- blocking (bool) – Whether or not blocking pop should be used. If the blocking pop is used, then the worker will block for interval seconds at a time waiting for a new job. This affects how often the worker can respond to signals.
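The difference between polling and a blocking pop can be sketched with the standard library's `queue.Queue` standing in for a Redis list. This is an analogy under stated assumptions, not the retools implementation: the `work` function, the `should_stop` callback, and the tiny interval are all hypothetical and chosen so the example finishes quickly.

```python
import queue
import time


def work(jobs, should_stop, interval=5, blocking=False):
    """Process jobs until should_stop() is true; returns the handled jobs."""
    handled = []
    while not should_stop():
        try:
            if blocking:
                # Wait up to `interval` seconds for a job to arrive.
                job = jobs.get(timeout=interval)
            else:
                job = jobs.get_nowait()
        except queue.Empty:
            if not blocking:
                time.sleep(interval)  # poll again after `interval` seconds
            continue
        handled.append(job)
    return handled


jobs = queue.Queue()
for name in ("a", "b"):
    jobs.put(name)

checks = {"count": 0}


def stop_after_four_checks():
    checks["count"] += 1
    return checks["count"] > 4


result = work(jobs, stop_after_four_checks, interval=0.01, blocking=True)
```

In both modes the stop condition is only checked between waits, so a longer interval means the loop takes longer to notice a shutdown request.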