|
|
@ -4,12 +4,30 @@ from pickle import loads, dumps
|
|
|
|
from .proxy import conn
|
|
|
|
from .proxy import conn
|
|
|
|
|
|
|
|
|
|
|
|
class DelayedResult(object):
|
|
|
|
class DelayedResult(object):
|
|
|
|
|
|
|
|
"""Proxy object that is returned as a result of `Queue.enqueue()` calls.
|
|
|
|
|
|
|
|
Instances of DelayedResult can be polled for their return values.
|
|
|
|
|
|
|
|
"""
|
|
|
|
def __init__(self, key):
|
|
|
|
def __init__(self, key):
|
|
|
|
self.key = key
|
|
|
|
self.key = key
|
|
|
|
self._rv = None
|
|
|
|
self._rv = None
|
|
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
@property
|
|
|
|
def return_value(self):
|
|
|
|
def return_value(self):
|
|
|
|
|
|
|
|
"""Returns the return value of the job.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Initially, right after enqueueing a job, the return value will be None.
|
|
|
|
|
|
|
|
But when the job has been executed, and had a return value or exception,
|
|
|
|
|
|
|
|
this will return that value or exception.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Note that, when the job has no return value (i.e. returns None), the
|
|
|
|
|
|
|
|
DelayedResult object is useless, as the result won't be written back to
|
|
|
|
|
|
|
|
Redis.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Also note that you cannot draw the conclusion that a job has _not_ been
|
|
|
|
|
|
|
|
executed when its return value is None, since return values written back
|
|
|
|
|
|
|
|
to Redis will expire after a given amount of time (500 seconds by
|
|
|
|
|
|
|
|
default).
|
|
|
|
|
|
|
|
"""
|
|
|
|
if self._rv is None:
|
|
|
|
if self._rv is None:
|
|
|
|
rv = conn.get(self.key)
|
|
|
|
rv = conn.get(self.key)
|
|
|
|
if rv is not None:
|
|
|
|
if rv is not None:
|
|
|
@ -25,6 +43,7 @@ class Job(object):
|
|
|
|
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
@classmethod
|
|
|
|
def unpickle(cls, pickle_data):
|
|
|
|
def unpickle(cls, pickle_data):
|
|
|
|
|
|
|
|
"""Constructs a Job instance form the given pickle'd job tuple data."""
|
|
|
|
job_tuple = loads(pickle_data)
|
|
|
|
job_tuple = loads(pickle_data)
|
|
|
|
return Job(job_tuple)
|
|
|
|
return Job(job_tuple)
|
|
|
|
|
|
|
|
|
|
|
@ -68,21 +87,30 @@ class Queue(object):
|
|
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
@property
|
|
|
|
def key(self):
|
|
|
|
def key(self):
|
|
|
|
|
|
|
|
"""Returns the Redis key for this Queue."""
|
|
|
|
return self._key
|
|
|
|
return self._key
|
|
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
@property
|
|
|
|
def empty(self):
|
|
|
|
def empty(self):
|
|
|
|
|
|
|
|
"""Returns whether the current queue is empty."""
|
|
|
|
return self.count == 0
|
|
|
|
return self.count == 0
|
|
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
@property
|
|
|
|
def messages(self):
|
|
|
|
def messages(self):
|
|
|
|
|
|
|
|
"""Returns a list of all messages in the queue."""
|
|
|
|
return conn.lrange(self.key, 0, -1)
|
|
|
|
return conn.lrange(self.key, 0, -1)
|
|
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
@property
|
|
|
|
def count(self):
|
|
|
|
def count(self):
|
|
|
|
|
|
|
|
"""Returns a count of all messages in the queue."""
|
|
|
|
return conn.llen(self.key)
|
|
|
|
return conn.llen(self.key)
|
|
|
|
|
|
|
|
|
|
|
|
def enqueue(self, f, *args, **kwargs):
|
|
|
|
def enqueue(self, f, *args, **kwargs):
|
|
|
|
|
|
|
|
"""Enqueues a function call for delayed execution.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Expects the function to call, along with the arguments and keyword
|
|
|
|
|
|
|
|
arguments.
|
|
|
|
|
|
|
|
"""
|
|
|
|
rv_key = 'rq:result:%s:%s' % (self.name, str(uuid.uuid4()))
|
|
|
|
rv_key = 'rq:result:%s:%s' % (self.name, str(uuid.uuid4()))
|
|
|
|
if f.__module__ == '__main__':
|
|
|
|
if f.__module__ == '__main__':
|
|
|
|
raise ValueError('Functions from the __main__ module cannot be processed by workers.')
|
|
|
|
raise ValueError('Functions from the __main__ module cannot be processed by workers.')
|
|
|
@ -91,6 +119,10 @@ class Queue(object):
|
|
|
|
return DelayedResult(rv_key)
|
|
|
|
return DelayedResult(rv_key)
|
|
|
|
|
|
|
|
|
|
|
|
def dequeue(self):
|
|
|
|
def dequeue(self):
|
|
|
|
|
|
|
|
"""Dequeues the function call at the front of this Queue.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Returns a Job instance, which can be executed or inspected.
|
|
|
|
|
|
|
|
"""
|
|
|
|
blob = conn.lpop(self.key)
|
|
|
|
blob = conn.lpop(self.key)
|
|
|
|
if blob is None:
|
|
|
|
if blob is None:
|
|
|
|
return None
|
|
|
|
return None
|
|
|
@ -114,6 +146,13 @@ class Queue(object):
|
|
|
|
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
@classmethod
|
|
|
|
def dequeue_any(cls, queues, blocking):
|
|
|
|
def dequeue_any(cls, queues, blocking):
|
|
|
|
|
|
|
|
"""Class method returning the Job instance at the front of the given set
|
|
|
|
|
|
|
|
of Queues, where the order of the queues is important.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
When all of the Queues are empty, depending on the `blocking` argument,
|
|
|
|
|
|
|
|
either blocks execution of this function until new messages arrive on
|
|
|
|
|
|
|
|
any of the queues, or returns None.
|
|
|
|
|
|
|
|
"""
|
|
|
|
queue_keys = map(lambda q: q.key, queues)
|
|
|
|
queue_keys = map(lambda q: q.key, queues)
|
|
|
|
if blocking:
|
|
|
|
if blocking:
|
|
|
|
queue_key, blob = conn.blpop(queue_keys)
|
|
|
|
queue_key, blob = conn.blpop(queue_keys)
|
|
|
|