Merge branch 'rewrite-data-model'

Branch: main
Vincent Driessen · 13 years ago
commit c4553f2a22

examples/run_example.py

@@ -18,10 +18,7 @@ def main():
     use_redis()
-    #funcs = filter(lambda s: not s.startswith('_'), dir(rq.dummy))
-    #print(funcs)
-    queues = ('default', 'high', 'normal', 'low')
+    queues = ('default', 'high', 'low')
     sample_calls = [
         (dummy.do_nothing, [], {}),
@@ -30,6 +27,10 @@ def main():
         (dummy.do_nothing, [], {}),
         (dummy.do_nothing, [], {}),
         (dummy.sleep, [1], {}),
+        (dummy.fib, [8], {}),          # normal result
+        (dummy.fib, [24], {}),         # takes pretty long
+        (dummy.div_by_zero, [], {}),   # 5 / 0 => div by zero exc
+        (dummy.fib, [30], {}),         # takes long, then crashes
     ]

     for i in range(opts.count):

rq/dummy.py

@@ -21,6 +21,12 @@ def endless_loop():
 def div_by_zero():
     1/0

+def fib(n):
+    if n <= 1:
+        return 1
+    else:
+        return fib(n-2) + fib(n-1)
+
 def yield_stuff():
     yield 7
     yield 'foo'
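
The doubly recursive `fib` above is deliberately naive: its running time grows exponentially with `n`, which makes it a convenient CPU-bound dummy workload — the sample calls in the example script range from near-instant (`fib(8)`) to long-running (`fib(30)`). A rough, machine-dependent illustration (not part of the commit):

    import time
    from rq import dummy

    # Each +1 on n multiplies the work by roughly the golden ratio,
    # so these three calls span several orders of magnitude in runtime.
    for n in (8, 20, 24):
        start = time.time()
        dummy.fib(n)
        print 'fib(%d) took %.3fs' % (n, time.time() - start)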

rq/exceptions.py

@@ -1,3 +1,6 @@
+class NoSuchJobError(Exception):
+    pass
+
 class NoQueueError(Exception):
     pass

rq/job.py

@@ -1,63 +1,212 @@
-from datetime import datetime
+import times
 from uuid import uuid4
 from pickle import loads, dumps
-from .exceptions import UnpickleError
+from .proxy import conn
+from .exceptions import UnpickleError, NoSuchJobError
+
+
+def unpickle(pickled_string):
+    """Unpickles a string, but raises a unified UnpickleError in case anything
+    fails.
+
+    This is a helper method to not have to deal with the fact that `loads()`
+    potentially raises many types of exceptions (e.g. AttributeError,
+    IndexError, TypeError, KeyError, etc.)
+    """
+    try:
+        obj = loads(pickled_string)
+    except StandardError:
+        raise UnpickleError('Could not unpickle.', pickled_string)
+    return obj


 class Job(object):
     """A Job is just a convenient datastructure to pass around job (meta) data.
     """

+    # Job construction
     @classmethod
-    def unpickle(cls, pickle_data):
-        """Constructs a Job instance form the given pickle'd job tuple data."""
-        try:
-            unpickled_obj = loads(pickle_data)
-            assert isinstance(unpickled_obj, Job)
-            return unpickled_obj
-        except (AssertionError, AttributeError, IndexError, TypeError, KeyError):
-            raise UnpickleError('Could not unpickle Job.', pickle_data)
-
-    def __init__(self, func, *args, **kwargs):
-        self._id = unicode(uuid4())
-        self.func = func
-        self.args = args
-        self.kwargs = kwargs
+    def for_call(cls, func, *args, **kwargs):
+        """Creates a new Job instance for the given function, arguments, and
+        keyword arguments.
+        """
+        job = Job()
+        job._func = func
+        job._args = args
+        job._kwargs = kwargs
+        job.description = job.get_call_string()
+        return job
+
+    @property
+    def func(self):
+        return self._func
+
+    @property
+    def args(self):
+        return self._args
+
+    @property
+    def kwargs(self):
+        return self._kwargs
+
+    @classmethod
+    def fetch(cls, id):
+        """Fetches a persisted job from its corresponding Redis key and
+        instantiates it.
+        """
+        job = Job(id)
+        job.refresh()
+        return job
+
+    def __init__(self, id=None):
+        self._id = id
+        self.created_at = times.now()
+        self._func = None
+        self._args = None
+        self._kwargs = None
+        self.description = None
         self.origin = None
-        self.created_at = datetime.utcnow()
         self.enqueued_at = None
+        self.ended_at = None
+        self.result = None
         self.exc_info = None

-    def pickle(self):
-        """Returns the pickle'd string represenation of a Job.  Suitable for writing to Redis."""
-        return dumps(self)
+    # Data access
+    def get_id(self):
+        """The job ID for this job instance. Generates an ID lazily the
+        first time the ID is requested.
+        """
+        if self._id is None:
+            self._id = unicode(uuid4())
+        return self._id
+
+    def set_id(self, value):
+        """Sets a job ID for the given job."""
+        self._id = value
+
+    id = property(get_id, set_id)

     @property
-    def rv_key(self):
-        """Returns the Redis key under which the Job's result will be stored, if applicable."""
-        return 'rq:result:%s' % (self._id,)
+    def key(self):
+        """The Redis key that is used to store job data under."""
+        return 'rq:job:%s' % (self.id,)

     @property
-    def id(self):
-        """Returns the Job's internal ID."""
-        return self._id
+    def job_tuple(self):
+        """Returns the job tuple that encodes the actual function call that
+        this job represents."""
+        return (self.func, self.args, self.kwargs)
+
+    @property
+    def return_value(self):
+        """Returns the return value of the job.
+
+        Initially, right after enqueueing a job, the return value will be
+        None.  But when the job has been executed, and had a return value or
+        exception, this will return that value or exception.
+
+        Note that, when the job has no return value (i.e. returns None), the
+        ReadOnlyJob object is useless, as the result won't be written back to
+        Redis.
+
+        Also note that you cannot draw the conclusion that a job has _not_
+        been executed when its return value is None, since return values
+        written back to Redis will expire after a given amount of time (500
+        seconds by default).
+        """
+        if self._cached_result is None:
+            rv = conn.hget(self.key, 'result')
+            if rv is not None:
+                # cache the result
+                self._cached_result = loads(rv)
+        return self._cached_result
+
+    # Persistence
+    def refresh(self):
+        """Overwrite the current instance's properties with the values in the
+        corresponding Redis key.
+
+        Will raise a NoSuchJobError if no corresponding Redis key exists.
+        """
+        key = self.key
+        properties = ['data', 'created_at', 'origin', 'description',
+                      'enqueued_at', 'ended_at', 'result', 'exc_info']
+        data, created_at, origin, description, \
+                enqueued_at, ended_at, result, \
+                exc_info = conn.hmget(key, properties)
+        if data is None:
+            raise NoSuchJobError('No such job: %s' % (key,))
+
+        def to_date(date_str):
+            if date_str is None:
+                return None
+            else:
+                return times.to_universal(date_str)
+
+        self._func, self._args, self._kwargs = unpickle(data)
+        self.created_at = to_date(created_at)
+        self.origin = origin
+        self.description = description
+        self.enqueued_at = to_date(enqueued_at)
+        self.ended_at = to_date(ended_at)
+        self.result = result
+        self.exc_info = exc_info
+
+    def save(self):
+        """Persists the current job instance to its corresponding Redis key."""
+        key = self.key
+
+        obj = {}
+        obj['created_at'] = times.format(self.created_at, 'UTC')
+
+        if self.func is not None:
+            obj['data'] = dumps(self.job_tuple)
+        if self.origin is not None:
+            obj['origin'] = self.origin
+        if self.description is not None:
+            obj['description'] = self.description
+        if self.enqueued_at is not None:
+            obj['enqueued_at'] = times.format(self.enqueued_at, 'UTC')
+        if self.ended_at is not None:
+            obj['ended_at'] = times.format(self.ended_at, 'UTC')
+        if self.result is not None:
+            obj['result'] = self.result
+        if self.exc_info is not None:
+            obj['exc_info'] = self.exc_info
+
+        conn.hmset(key, obj)

+    # Job execution
     def perform(self):
         """Invokes the job function with the job arguments.
         """
         return self.func(*self.args, **self.kwargs)

-    @property
-    def call_string(self):
+    # Representation
+    def get_call_string(self):
         """Returns a string representation of the call, formatted as a regular
         Python function invocation statement.
         """
-        arg_list = map(repr, self.args)
-        arg_list += map(lambda tup: '%s=%r' % (tup[0], tup[1]),
-                self.kwargs.items())
-        return '%s(%s)' % (self.func.__name__, ', '.join(arg_list))
+        if self.func is None:
+            return None
+
+        arg_list = [repr(arg) for arg in self.args]
+        arg_list += ['%s=%r' % (k, v) for k, v in self.kwargs.items()]
+        args = ', '.join(arg_list)
+        return '%s(%s)' % (self.func.__name__, args)

     def __str__(self):
-        return '<Job %s: %s>' % (self.id, self.call_string)
+        return '<Job %s: %s>' % (self.id, self.description)

+    # Job equality
     def __eq__(self, other):
-        return cmp(self.id, other.id)
+        return self.id == other.id
+
+    def __hash__(self):
+        return hash(self.id)
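
For orientation, a minimal usage sketch (not part of the commit) of the rewritten Job type: jobs now round-trip through a Redis hash instead of a pickled blob. It assumes a Redis connection has already been pushed onto the rq.proxy connection stack, as the test suite does:

    from rq.job import Job
    from rq import dummy

    # Build a job locally; no Redis I/O happens yet, and the UUID is
    # generated lazily on first access of job.id.
    job = Job.for_call(dummy.fib, 8)
    print job.description            # 'fib(8)'

    # save() writes a hash under rq:job:<uuid> with the fields
    # 'created_at', 'data' (the pickled job tuple) and 'description'.
    job.save()

    # Any process can now rehydrate the job from its ID alone.
    same_job = Job.fetch(job.id)
    assert same_job == job           # equality is defined on the job ID only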

rq/queue.py

@@ -1,42 +1,12 @@
-from datetime import datetime
+import times
 from functools import total_ordering
-from pickle import loads
 from .proxy import conn
 from .job import Job
-from .exceptions import UnpickleError
+from .exceptions import NoSuchJobError, UnpickleError


-class DelayedResult(object):
-    """Proxy object that is returned as a result of `Queue.enqueue()` calls.
-    Instances of DelayedResult can be polled for their return values.
-    """
-    def __init__(self, key):
-        self.key = key
-        self._rv = None
-
-    @property
-    def return_value(self):
-        """Returns the return value of the job.
-
-        Initially, right after enqueueing a job, the return value will be
-        None.  But when the job has been executed, and had a return value or
-        exception, this will return that value or exception.
-
-        Note that, when the job has no return value (i.e. returns None), the
-        DelayedResult object is useless, as the result won't be written back
-        to Redis.
-
-        Also note that you cannot draw the conclusion that a job has _not_
-        been executed when its return value is None, since return values
-        written back to Redis will expire after a given amount of time (500
-        seconds by default).
-        """
-        if self._rv is None:
-            rv = conn.get(self.key)
-            if rv is not None:
-                # cache the result
-                self._rv = loads(rv)
-        return self._rv
+def compact(lst):
+    return [item for item in lst if item is not None]


 @total_ordering
@@ -72,20 +42,26 @@ class Queue(object):
         """Returns the Redis key for this Queue."""
         return self._key

-    @property
-    def empty(self):
+    def is_empty(self):
         """Returns whether the current queue is empty."""
         return self.count == 0

     @property
-    def messages(self):
-        """Returns a list of all messages (pickled job data) in the queue."""
+    def job_ids(self):
+        """Returns a list of all job IDS in the queue."""
         return conn.lrange(self.key, 0, -1)

     @property
     def jobs(self):
-        """Returns a list of all jobs in the queue."""
-        return map(Job.unpickle, self.messages)
+        """Returns a list of all (valid) jobs in the queue."""
+        def safe_fetch(job_id):
+            try:
+                job = Job.fetch(job_id)
+            except UnpickleError:
+                return None
+            return job
+
+        return compact([safe_fetch(job_id) for job_id in self.job_ids])

     @property
     def count(self):
@@ -93,68 +69,79 @@ class Queue(object):
         return conn.llen(self.key)

-    def _create_job(self, f, *args, **kwargs):
-        """Creates a Job instance for the given function call and attaches
-        queue meta data to it.
-        """
-        if f.__module__ == '__main__':
-            raise ValueError('Functions from the __main__ module cannot be processed by workers.')
-
-        job = Job(f, *args, **kwargs)
-        job.origin = self.name
-        return job
-
-    def _push(self, pickled_job):
-        """Enqueues a pickled_job on the corresponding Redis queue."""
-        conn.rpush(self.key, pickled_job)
+    def push_job_id(self, job_id):
+        """Pushes a job ID on the corresponding Redis queue."""
+        conn.rpush(self.key, job_id)

     def enqueue(self, f, *args, **kwargs):
-        """Enqueues a function call for delayed execution.
+        """Creates a job to represent the delayed function call and enqueues it.

         Expects the function to call, along with the arguments and keyword
         arguments.
         """
-        job = self._create_job(f, *args, **kwargs)
-        job.enqueued_at = datetime.utcnow()
-        self._push(job.pickle())
-        return DelayedResult(job.rv_key)
+        if f.__module__ == '__main__':
+            raise ValueError('Functions from the __main__ module cannot be processed by workers.')
+
+        job = Job.for_call(f, *args, **kwargs)
+        return self.enqueue_job(job)
+
+    def enqueue_job(self, job):
+        """Enqueues a job for delayed execution."""
+        job.origin = self.name
+        job.enqueued_at = times.now()
+        job.save()
+        self.push_job_id(job.id)
+        return job

     def requeue(self, job):
         """Requeues an existing (typically a failed job) onto the queue."""
         raise NotImplementedError('Implement this')

+    def pop_job_id(self):
+        """Pops a given job ID from this Redis queue."""
+        return conn.lpop(self.key)
+
+    @classmethod
+    def lpop(cls, queue_keys, blocking):
+        """Helper method.  Intermediate method to abstract away from some
+        Redis API details, where LPOP accepts only a single key, whereas BLPOP
+        accepts multiple.  So if we want the non-blocking LPOP, we need to
+        iterate over all queues, do individual LPOPs, and return the result.
+
+        Until Redis receives a specific method for this, we'll have to wrap it
+        this way.
+        """
+        if blocking:
+            queue_key, job_id = conn.blpop(queue_keys)
+            return queue_key, job_id
+        else:
+            for queue_key in queue_keys:
+                blob = conn.lpop(queue_key)
+                if blob is not None:
+                    return queue_key, blob
+            return None
+
     def dequeue(self):
-        """Dequeues the function call at the front of this Queue.
+        """Dequeues the front-most job from this queue.

         Returns a Job instance, which can be executed or inspected.
         """
-        blob = conn.lpop(self.key)
-        if blob is None:
+        job_id = self.pop_job_id()
+        if job_id is None:
             return None
         try:
-            job = Job.unpickle(blob)
+            job = Job.fetch(job_id)
+        except NoSuchJobError as e:
+            # Silently pass on jobs that don't exist (anymore),
+            # and continue by reinvoking itself recursively
+            return self.dequeue()
         except UnpickleError as e:
             # Attach queue information on the exception for improved error
             # reporting
             e.queue = self
             raise e
-        job.origin = self
         return job

-    @classmethod
-    def _lpop_any(cls, queue_keys):
-        """Helper method.  You should not call this directly.
-
-        Redis' BLPOP command takes multiple queue arguments, but LPOP can
-        only take a single queue.  Therefore, we need to loop over all queues
-        manually, in order, and return None if no more work is available.
-        """
-        for queue_key in queue_keys:
-            blob = conn.lpop(queue_key)
-            if blob is not None:
-                return (queue_key, blob)
-        return None
-
     @classmethod
     def dequeue_any(cls, queues, blocking):
         """Class method returning the Job instance at the front of the given
@@ -164,25 +151,25 @@ class Queue(object):
         either blocks execution of this function until new messages arrive on
         any of the queues, or returns None.
         """
-        queue_keys = map(lambda q: q.key, queues)
-        if blocking:
-            queue_key, blob = conn.blpop(queue_keys)
-        else:
-            redis_result = cls._lpop_any(queue_keys)
-            if redis_result is None:
-                return None
-            queue_key, blob = redis_result
-
+        queue_keys = [q.key for q in queues]
+        result = cls.lpop(queue_keys, blocking)
+        if result is None:
+            return None
+        queue_key, job_id = result
         queue = Queue.from_queue_key(queue_key)
         try:
-            job = Job.unpickle(blob)
+            job = Job.fetch(job_id)
+        except NoSuchJobError:
+            # Silently pass on jobs that don't exist (anymore),
+            # and continue by reinvoking the same function recursively
+            return cls.dequeue_any(queues, blocking)
         except UnpickleError as e:
             # Attach queue information on the exception for improved error
             # reporting
+            e.job_id = job_id
             e.queue = queue
             raise e
-        job.origin = queue
-        return job
+        return job, queue

     # Total ordering defition (the rest of the required Python methods are
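
Two caller-facing changes fall out of these queue hunks. First, enqueue() no longer hands back a DelayedResult: it persists the job and returns the Job itself, with origin and enqueued_at filled in by enqueue_job(). A minimal sketch (not part of the commit), assuming a connected rq.proxy stack:

    from rq import Queue
    from rq import dummy

    q = Queue()                      # the 'default' queue
    job = q.enqueue(dummy.fib, 8)    # saves the job hash, then RPUSHes its ID

    print job.id                     # the UUID sitting on rq:queue:default
    print job.origin                 # 'default', set by enqueue_job()
    print job.enqueued_at            # timestamp, also set by enqueue_job()

Second, dequeue_any() now returns a (job, queue) pair so the caller learns which queue the job came from, or None when non-blocking and all queues are empty. A worker-side sketch with illustrative queue names:

    from rq import Queue

    high, low = Queue('high'), Queue('low')

    result = Queue.dequeue_any([high, low], False)   # non-blocking
    if result is None:
        print 'no work'
    else:
        job, queue = result
        print 'got %s from %s' % (job.description, queue.name)
        job.perform()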

rq/worker.py

@@ -1,12 +1,13 @@
 import sys
 import os
 import errno
-import datetime
 import random
 import time
+import times
 import procname
 import socket
 import signal
+import traceback
 from pickle import dumps
 try:
     from logbook import Logger
@@ -15,8 +16,13 @@ except ImportError:
     from logging import Logger
 from .queue import Queue
 from .proxy import conn
+from .utils import make_colorizer
 from .exceptions import NoQueueError, UnpickleError

+green = make_colorizer('darkgreen')
+yellow = make_colorizer('darkyellow')
+blue = make_colorizer('darkblue')
+

 def iterable(x):
     return hasattr(x, '__iter__')
@@ -233,13 +239,13 @@ class Worker(object):
         Pops and performs all jobs on the current list of queues.  When all
         queues are empty, block and wait for new jobs to arrive on any of the
-        queues, unless `burst` is True.
+        queues, unless `burst` mode is enabled.

         The return value indicates whether any jobs were processed.
         """
         self._install_signal_handlers()

-        did_work = False
+        did_perform_work = False
         self.register_birth()
         self.state = 'starting'
         try:
@@ -249,32 +255,33 @@ class Worker(object):
                     break
                 self.state = 'idle'
                 qnames = self.queue_names()
-                self.procline('Listening on %s' % (','.join(qnames)))
-                self.log.info('*** Listening for work on %s...' % (', '.join(qnames)))
+                self.procline('Listening on %s' % ','.join(qnames))
+                self.log.info('')
+                self.log.info('*** Listening on %s...' % (green(', '.join(qnames))))
                 wait_for_job = not burst
                 try:
-                    job = Queue.dequeue_any(self.queues, wait_for_job)
+                    result = Queue.dequeue_any(self.queues, wait_for_job)
+                    if result is None:
+                        break
                 except UnpickleError as e:
                     self.log.warning('*** Ignoring unpickleable data on %s.' % (e.queue.name,))
                     self.log.debug('Data follows:')
                     self.log.debug(e.raw_data)
                     self.log.debug('End of unreadable data.')
-
-                    fq = self.failure_queue
-                    fq._push(e.raw_data)
+                    self.failure_queue.push_job_id(e.job_id)
                     continue

-                if job is None:
-                    break
-                self.state = 'busy'
-
+                job, queue = result
+                self.log.info('%s: %s (%s)' % (green(queue.name), blue(job.description), job.id))
+
+                self.state = 'busy'
                 self.fork_and_perform_job(job)
-                did_work = True
+                did_perform_work = True
         finally:
             if not self.is_horse:
                 self.register_death()
-        return did_work
+        return did_perform_work

     def fork_and_perform_job(self, job):
         child_pid = os.fork()
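
For reference, the loop is driven exactly as the test suite drives it: in burst mode the worker drains all queues once instead of blocking on BLPOP, and work() reports whether anything was performed.

    from rq import Queue, Worker

    w = Worker([Queue('high'), Queue('default')])
    did_perform_work = w.work(burst=True)   # True if at least one job ran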
@@ -282,12 +289,9 @@ class Worker(object):
             self._is_horse = True
             random.seed()
             self.log = Logger('horse')
-            try:
-                self.perform_job(job)
-            except Exception as e:
-                self.log.exception(e)
-                sys.exit(1)
-            sys.exit(0)
+
+            success = self.perform_job(job)
+            sys.exit(int(not success))
         else:
             self._horse_pid = child_pid
             self.procline('Forked %d at %d' % (child_pid, time.time()))
@@ -308,29 +312,34 @@ class Worker(object):
     def perform_job(self, job):
         self.procline('Processing %s from %s since %s' % (
             job.func.__name__,
-            job.origin.name, time.time()))
-        msg = 'Got job %s from %s' % (
-            job.call_string,
-            job.origin.name)
-        self.log.info(msg)
+            job.origin, time.time()))
+
         try:
             rv = job.perform()
         except Exception as e:
-            rv = e
-            self.log.exception(e)
-
             fq = self.failure_queue
+            self.log.exception(e)
             self.log.warning('Moving job to %s queue.' % (fq.name,))
-            job.ended_at = datetime.datetime.utcnow()
-            job.exc_info = e
-            fq._push(job.pickle())
+
+            # Store the exception information...
+            job.ended_at = times.now()
+            job.exc_info = traceback.format_exc()
+
+            # ------ REFACTOR THIS -------------------------
+            job.save()
+
+            # ...and put the job on the failure queue
+            fq.push_job_id(job.id)
+            # ------ UNTIL HERE ----------------------------
+            # (should be as easy as fq.enqueue(job) or so)
+            return False
         else:
-            if rv is not None:
-                self.log.info('Job result = %s' % (rv,))
+            if rv is None:
+                self.log.info('Job OK')
             else:
-                self.log.info('Job ended normally without result')
+                self.log.info('Job OK, result = %s' % (yellow(rv),))
+
             if rv is not None:
                 p = conn.pipeline()
-                p.set(job.rv_key, dumps(rv))
-                p.expire(job.rv_key, self.rv_ttl)
+                p.set(job.result, dumps(rv))
+                p.expire(job.result, self.rv_ttl)
                 p.execute()
+            return True
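
The horse child now communicates success purely through its exit status: `sys.exit(int(not success))` maps True to exit code 0 and False to 1. A hedged sketch of decoding that on the parent side (the actual waitpid handling lives elsewhere in worker.py and is not shown in this commit; `horse_succeeded` is a hypothetical helper):

    import os

    def horse_succeeded(child_pid):
        # Wait for the forked horse and decode its exit status.
        _, status = os.waitpid(child_pid, 0)
        return os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0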

setup.py

@@ -15,7 +15,7 @@ def get_version():
     raise RuntimeError('No version info found.')

 def get_dependencies():
-    deps = ['redis', 'procname']
+    deps = ['redis', 'procname', 'times']
     deps += ['logbook']  # should be soft dependency?
     if sys.version_info < (2, 7) or \
             sys.version_info >= (3, 0) and sys.version_info < (3, 2):

tests/__init__.py

@@ -14,6 +14,18 @@ def failing_job(x):
     return x / 0


+def find_empty_redis_database():
+    """Tries to connect to a random Redis database (starting from 4), and
+    will use/connect it when no keys are in there.
+    """
+    for dbnum in range(4, 17):
+        testconn = Redis(db=dbnum)
+        empty = len(testconn.keys('*')) == 0
+        if empty:
+            return testconn
+    assert False, 'No empty Redis database found to run tests in.'
+
+
 class RQTestCase(unittest.TestCase):
     """Base class to inherit test cases from for RQ.
@@ -27,7 +39,7 @@ class RQTestCase(unittest.TestCase):
     @classmethod
     def setUpClass(cls):
         # Set up connection to Redis
-        testconn = Redis()
+        testconn = find_empty_redis_database()
         conn.push(testconn)

         # Store the connection (for sanity checking)
@@ -53,12 +65,3 @@ class RQTestCase(unittest.TestCase):
         testconn = conn.pop()
         assert testconn == cls.testconn, 'Wow, something really nasty happened to the Redis connection stack. Check your setup.'
-
-    def assertQueueContains(self, queue, that_func):
-        # Do a queue scan (this is O(n), but we're in a test, so hey)
-        for job in queue.jobs:
-            if job.func == that_func:
-                return
-        self.fail('Queue %s does not contain message for function %s' %
-                (queue.key, that_func))

tests/helpers.py (new file)

@@ -0,0 +1,6 @@
+import times
+
+
+def strip_milliseconds(date):
+    return times.to_universal(times.format(date, 'UTC'))
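
This helper exists because job dates round-trip through times.format(..., 'UTC'), which serializes to whole seconds (e.g. '2012-02-07 22:13:24+0000'); comparing a freshly created datetime against one re-read from Redis would otherwise fail on the microseconds. A small sketch of the intended use:

    import times
    from tests.helpers import strip_milliseconds

    now = times.now()                   # has microsecond precision
    stored = times.format(now, 'UTC')   # what job.save() writes to Redis
    assert times.to_universal(stored) == strip_milliseconds(now)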

tests/test_job.py

@@ -1,8 +1,10 @@
+import times
+from datetime import datetime
 from tests import RQTestCase
-from pickle import dumps, loads
+from tests.helpers import strip_milliseconds
+from pickle import loads
 from rq.job import Job
-#from rq import Queue, Worker
-from rq.exceptions import UnpickleError
+from rq.exceptions import NoSuchJobError, UnpickleError


 def arbitrary_function(x, y, z=1):
@@ -10,35 +12,141 @@ def arbitrary_function(x, y, z=1):
 class TestJob(RQTestCase):
-    def test_create_job(self):
-        """Creation of jobs."""
-        job = Job(arbitrary_function, 3, 4, z=2)
+    def test_create_empty_job(self):
+        """Creation of new empty jobs."""
+        job = Job()
+
+        # Jobs have a random UUID and a creation date
+        self.assertIsNotNone(job.id)
+        self.assertIsNotNone(job.created_at)
+
+        # ...and nothing else
+        self.assertIsNone(job.func, None)
+        self.assertIsNone(job.args, None)
+        self.assertIsNone(job.kwargs, None)
+        self.assertIsNone(job.origin, None)
+        self.assertIsNone(job.enqueued_at, None)
+        self.assertIsNone(job.ended_at, None)
+        self.assertIsNone(job.result, None)
+        self.assertIsNone(job.exc_info, None)
+
+    def test_create_typical_job(self):
+        """Creation of jobs for function calls."""
+        job = Job.for_call(arbitrary_function, 3, 4, z=2)
+
+        # Jobs have a random UUID
+        self.assertIsNotNone(job.id)
+        self.assertIsNotNone(job.created_at)
+        self.assertIsNotNone(job.description)
+
+        # Job data is set...
         self.assertEquals(job.func, arbitrary_function)
         self.assertEquals(job.args, (3, 4))
         self.assertEquals(job.kwargs, {'z': 2})
+
+        # ...but metadata is not
         self.assertIsNone(job.origin)
-        self.assertIsNotNone(job.created_at)
         self.assertIsNone(job.enqueued_at)
-        self.assertIsNotNone(job.rv_key)
+        self.assertIsNone(job.result)
+
+    def test_save(self):
+        """Storing jobs."""
+        job = Job.for_call(arbitrary_function, 3, 4, z=2)
+
+        # Saving creates a Redis hash
+        self.assertEquals(self.testconn.exists(job.key), False)
+        job.save()
+        self.assertEquals(self.testconn.type(job.key), 'hash')
+
+        # Saving writes pickled job data
+        unpickled_data = loads(self.testconn.hget(job.key, 'data'))
+        self.assertEquals(unpickled_data[0], arbitrary_function)
+
+    def test_fetch(self):
+        """Fetching jobs."""
+        # Prepare test
+        self.testconn.hset('rq:job:some_id', 'data', "(ctest_job\narbitrary_function\np0\n(I3\nI4\ntp1\n(dp2\nS'z'\np3\nI2\nstp4\n.")
+        self.testconn.hset('rq:job:some_id', 'created_at', "2012-02-07 22:13:24+0000")
+
+        # Fetch returns a job
+        job = Job.fetch('some_id')
+        self.assertEquals(job.id, 'some_id')
+        self.assertEquals(job.func, arbitrary_function)
+        self.assertEquals(job.args, (3, 4))
+        self.assertEquals(job.kwargs, dict(z=2))
+        self.assertEquals(job.created_at, datetime(2012, 2, 7, 22, 13, 24))
+
+    def test_persistence_of_empty_jobs(self):
+        """Storing empty jobs."""
+        job = Job()
+        job.save()
+
+        expected_date = strip_milliseconds(job.created_at)
+        stored_date = self.testconn.hget(job.key, 'created_at')
+        self.assertEquals(
+                times.to_universal(stored_date),
+                expected_date)
+
+        # ... and no other keys are stored
+        self.assertItemsEqual(
+                self.testconn.hkeys(job.key),
+                ['created_at'])
+
+    def test_persistence_of_typical_jobs(self):
+        """Storing typical jobs."""
+        job = Job.for_call(arbitrary_function, 3, 4, z=2)
+        job.save()
+
+        expected_date = strip_milliseconds(job.created_at)
+        stored_date = self.testconn.hget(job.key, 'created_at')
+        self.assertEquals(
+                times.to_universal(stored_date),
+                expected_date)

-    def test_pickle_job(self):
-        """Pickling of jobs."""
-        job = Job(arbitrary_function, 3, 4, z=2)
-        job2 = loads(dumps(job))
+        # ... and no other keys are stored
+        self.assertItemsEqual(
+                self.testconn.hkeys(job.key),
+                ['created_at', 'data', 'description'])
+
+    def test_store_then_fetch(self):
+        job = Job.for_call(arbitrary_function, 3, 4, z=2)
+        job.save()
+
+        job2 = Job.fetch(job.id)
         self.assertEquals(job.func, job2.func)
         self.assertEquals(job.args, job2.args)
         self.assertEquals(job.kwargs, job2.kwargs)

-    def test_unpickle_errors(self):
-        """Handling of unpickl'ing errors."""
-        with self.assertRaises(UnpickleError):
-            Job.unpickle('this is no pickle data')
-        with self.assertRaises(UnpickleError):
-            Job.unpickle(13)
+        # Mathematical equation
+        self.assertEquals(job, job2)
+
+    def test_fetching_can_fail(self):
+        """Fetching fails for non-existing jobs."""
+        with self.assertRaises(NoSuchJobError):
+            Job.fetch('b4a44d44-da16-4620-90a6-798e8cd72ca0')

-        pickle_data = dumps(Job(arbitrary_function, 2, 3))
-        corrupt_data = pickle_data.replace('arbitrary', 'b0rken')
+    def test_fetching_unreadable_data(self):
+        """Fetching fails on unreadable data."""
+        # Set up
+        job = Job.for_call(arbitrary_function, 3, 4, z=2)
+        job.save()
+
+        # Just replace the data hkey with some random noise
+        self.testconn.hset(job.key, 'data', 'this is no pickle string')
         with self.assertRaises(UnpickleError):
-            Job.unpickle(corrupt_data)
+            job.refresh()
+
+        # Set up (part B)
+        job = Job.for_call(arbitrary_function, 3, 4, z=2)
+        job.save()
+
+        # Now slightly modify the job to make it unpickl'able (this is
+        # equivalent to a worker not having the most up-to-date source code
+        # and unable to import the function)
+        data = self.testconn.hget(job.key, 'data')
+        unimportable_data = data.replace('arbitrary_function', 'broken')
+        self.testconn.hset(job.key, 'data', unimportable_data)
+        with self.assertRaises(UnpickleError):
+            job.refresh()

tests/test_queue.py

@@ -1,8 +1,7 @@
 from tests import RQTestCase
 from tests import testjob
-from pickle import dumps
 from rq import Queue
-from rq.exceptions import UnpickleError
+from rq.job import Job


 class TestQueue(RQTestCase):
@@ -31,36 +30,92 @@ class TestQueue(RQTestCase):
     def test_queue_empty(self):
         """Detecting empty queues."""
-        q = Queue('my-queue')
-        self.assertEquals(q.empty, True)
+        q = Queue('example')
+        self.assertEquals(q.is_empty(), True)

-        self.testconn.rpush('rq:queue:my-queue', 'some val')
-        self.assertEquals(q.empty, False)
+        self.testconn.rpush('rq:queue:example', 'sentinel message')
+        self.assertEquals(q.is_empty(), False)

     def test_enqueue(self):
-        """Putting work on queues."""
-        q = Queue('my-queue')
-        self.assertEquals(q.empty, True)
+        """Enqueueing job onto queues."""
+        q = Queue()
+        self.assertEquals(q.is_empty(), True)

         # testjob spec holds which queue this is sent to
-        q.enqueue(testjob, 'Nick', foo='bar')
-        self.assertEquals(q.empty, False)
-        self.assertQueueContains(q, testjob)
+        job = q.enqueue(testjob, 'Nick', foo='bar')
+        job_id = job.id
+
+        # Inspect data inside Redis
+        q_key = 'rq:queue:default'
+        self.assertEquals(self.testconn.llen(q_key), 1)
+        self.assertEquals(self.testconn.lrange(q_key, 0, -1)[0], job_id)
+
+    def test_enqueue_sets_metadata(self):
+        """Enqueueing job onto queues modifies meta data."""
+        q = Queue()
+        job = Job.for_call(testjob, 'Nick', foo='bar')
+
+        # Preconditions
+        self.assertIsNone(job.origin)
+        self.assertIsNone(job.enqueued_at)
+
+        # Action
+        q.enqueue_job(job)
+
+        # Postconditions
+        self.assertEquals(job.origin, q.name)
+        self.assertIsNotNone(job.enqueued_at)
+
+    def test_pop_job_id(self):
+        """Popping job IDs from queues."""
+        # Set up
+        q = Queue()
+        uuid = '112188ae-4e9d-4a5b-a5b3-f26f2cb054da'
+        q.push_job_id(uuid)
+
+        # Pop it off the queue...
+        self.assertEquals(q.count, 1)
+        self.assertEquals(q.pop_job_id(), uuid)
+
+        # ...and assert the queue count when down
+        self.assertEquals(q.count, 0)

     def test_dequeue(self):
-        """Fetching work from specific queue."""
-        q = Queue('foo')
-        q.enqueue(testjob, 'Rick', foo='bar')
+        """Dequeueing jobs from queues."""
+        # Set up
+        q = Queue()
+        result = q.enqueue(testjob, 'Rick', foo='bar')

-        # Pull it off the queue (normally, a worker would do this)
+        # Dequeue a job (not a job ID) off the queue
+        self.assertEquals(q.count, 1)
         job = q.dequeue()
+        self.assertEquals(job.id, result.id)
         self.assertEquals(job.func, testjob)
-        self.assertEquals(job.origin, q)
+        self.assertEquals(job.origin, q.name)
         self.assertEquals(job.args[0], 'Rick')
         self.assertEquals(job.kwargs['foo'], 'bar')

+        # ...and assert the queue count when down
+        self.assertEquals(q.count, 0)
+
+    def test_dequeue_ignores_nonexisting_jobs(self):
+        """Dequeuing silently ignores non-existing jobs."""
+
+        q = Queue()
+        uuid = '49f205ab-8ea3-47dd-a1b5-bfa186870fc8'
+        q.push_job_id(uuid)
+        q.push_job_id(uuid)
+        result = q.enqueue(testjob, 'Nick', foo='bar')
+        q.push_job_id(uuid)
+
+        # Dequeue simply ignores the missing job and returns None
+        self.assertEquals(q.count, 4)
+        self.assertEquals(q.dequeue().id, result.id)
+        self.assertIsNone(q.dequeue())
+        self.assertEquals(q.count, 0)
+
     def test_dequeue_any(self):
         """Fetching work from any given queue."""
         fooq = Queue('foo')
@@ -70,50 +125,35 @@ class TestQueue(RQTestCase):

         # Enqueue a single item
         barq.enqueue(testjob)
-        job = Queue.dequeue_any([fooq, barq], False)
+        job, queue = Queue.dequeue_any([fooq, barq], False)
         self.assertEquals(job.func, testjob)
+        self.assertEquals(queue, barq)

         # Enqueue items on both queues
         barq.enqueue(testjob, 'for Bar')
         fooq.enqueue(testjob, 'for Foo')

-        job = Queue.dequeue_any([fooq, barq], False)
+        job, queue = Queue.dequeue_any([fooq, barq], False)
+        self.assertEquals(queue, fooq)
         self.assertEquals(job.func, testjob)
-        self.assertEquals(job.origin, fooq)
+        self.assertEquals(job.origin, fooq.name)
         self.assertEquals(job.args[0], 'for Foo', 'Foo should be dequeued first.')

-        job = Queue.dequeue_any([fooq, barq], False)
+        job, queue = Queue.dequeue_any([fooq, barq], False)
+        self.assertEquals(queue, barq)
         self.assertEquals(job.func, testjob)
-        self.assertEquals(job.origin, barq)
+        self.assertEquals(job.origin, barq.name)
         self.assertEquals(job.args[0], 'for Bar', 'Bar should be dequeued second.')

-    def test_dequeue_unpicklable_data(self):
-        """Error handling of invalid pickle data."""
-
-        # Push non-pickle data on the queue
-        q = Queue('foo')
-        blob = 'this is nothing like pickled data'
-        self.testconn.rpush(q._key, blob)
-
-        with self.assertRaises(UnpickleError):
-            q.dequeue()  # error occurs when perform()'ing
-
-        # Push value pickle data, but not representing a job tuple
-        q = Queue('foo')
-        blob = dumps('this is pickled, but not a job tuple')
-        self.testconn.rpush(q._key, blob)
-
-        with self.assertRaises(UnpickleError):
-            q.dequeue()  # error occurs when perform()'ing
-
-        # Push slightly incorrect pickled data onto the queue (simulate
-        # a function that can't be imported from the worker)
-        q = Queue('foo')
-        job_tuple = dumps((testjob, [], dict(name='Frank'), 'unused'))
-        blob = job_tuple.replace('testjob', 'fooobar')
-        self.testconn.rpush(q._key, blob)
-
-        with self.assertRaises(UnpickleError):
-            q.dequeue()  # error occurs when dequeue()'ing
+    def test_dequeue_any_ignores_nonexisting_jobs(self):
+        """Dequeuing (from any queue) silently ignores non-existing jobs."""
+
+        q = Queue('low')
+        uuid = '49f205ab-8ea3-47dd-a1b5-bfa186870fc8'
+        q.push_job_id(uuid)
+
+        # Dequeue simply ignores the missing job and returns None
+        self.assertEquals(q.count, 1)
+        self.assertEquals(Queue.dequeue_any([Queue(), Queue('low')], False), None)
+        self.assertEquals(q.count, 0)

tests/test_worker.py

@@ -1,5 +1,6 @@
 from tests import RQTestCase
 from tests import testjob, failing_job
+from tests.helpers import strip_milliseconds
 from rq import Queue, Worker
 from rq.job import Job
@@ -21,7 +22,7 @@ class TestWorker(RQTestCase):
         self.assertEquals(w.work(burst=True), True, 'Expected at least some work done.')

     def test_work_is_unreadable(self):
-        """Worker processes unreadable job."""
+        """Unreadable jobs are put on the failure queue."""
         q = Queue()
         failure_q = Queue('failure')
@@ -31,14 +32,15 @@ class TestWorker(RQTestCase):
         # NOTE: We have to fake this enqueueing for this test case.
         # What we're simulating here is a call to a function that is not
         # importable from the worker process.
-        job = Job(failing_job, 3)
-        pickled_job = job.pickle()
-        invalid_data = pickled_job.replace(
-            'failing_job', 'nonexisting_job')
+        job = Job.for_call(failing_job, 3)
+        job.save()
+        data = self.testconn.hget(job.key, 'data')
+        invalid_data = data.replace('failing_job', 'nonexisting_job')
+        self.testconn.hset(job.key, 'data', invalid_data)

         # We use the low-level internal function to enqueue any data
         # (bypassing validity checks)
-        q._push(invalid_data)
+        q.push_job_id(job.id)

         self.assertEquals(q.count, 1)
@@ -50,20 +52,33 @@ class TestWorker(RQTestCase):
         self.assertEquals(failure_q.count, 1)

     def test_work_fails(self):
-        """Worker processes failing job."""
+        """Failing jobs are put on the failure queue."""
         q = Queue()
         failure_q = Queue('failure')

+        # Preconditions
         self.assertEquals(failure_q.count, 0)
         self.assertEquals(q.count, 0)

-        q.enqueue(failing_job)
+        # Action
+        job = q.enqueue(failing_job)
         self.assertEquals(q.count, 1)

+        # keep for later
+        enqueued_at_date = strip_milliseconds(job.enqueued_at)
+
         w = Worker([q])
         w.work(burst=True)  # should silently pass
-        self.assertEquals(q.count, 0)

+        # Postconditions
+        self.assertEquals(q.count, 0)
         self.assertEquals(failure_q.count, 1)
+
+        # Check the job
+        job = Job.fetch(job.id)
+        self.assertEquals(job.origin, q.name)
+
+        # Should be the original enqueued_at date, not the date of
+        # enqueueing to the failure queue
+        self.assertEquals(job.enqueued_at, enqueued_at_date)
+        self.assertIsNotNone(job.exc_info)  # should contain exc_info
