diff --git a/bin/rqgenload b/bin/rqgenload
index 674889c..7eb6f49 100755
--- a/bin/rqgenload
+++ b/bin/rqgenload
@@ -18,10 +18,7 @@ def main():
     use_redis()
 
-    #funcs = filter(lambda s: not s.startswith('_'), dir(rq.dummy))
-    #print(funcs)
-
-    queues = ('default', 'high', 'normal', 'low')
+    queues = ('default', 'high', 'low')
 
     sample_calls = [
         (dummy.do_nothing, [], {}),
@@ -30,6 +27,10 @@ def main():
         (dummy.do_nothing, [], {}),
         (dummy.do_nothing, [], {}),
         (dummy.sleep, [1], {}),
+        (dummy.fib, [8], {}),         # normal result
+        (dummy.fib, [24], {}),        # takes pretty long
+        (dummy.div_by_zero, [], {}),  # 5 / 0 => div by zero exc
+        (dummy.fib, [30], {}),        # takes long, then crashes
     ]
 
     for i in range(opts.count):
diff --git a/rq/dummy.py b/rq/dummy.py
index f4b8dc2..b2ad055 100644
--- a/rq/dummy.py
+++ b/rq/dummy.py
@@ -21,6 +21,12 @@ def endless_loop():
 def div_by_zero():
     1/0
 
+def fib(n):
+    if n <= 1:
+        return 1
+    else:
+        return fib(n-2) + fib(n-1)
+
 def yield_stuff():
     yield 7
     yield 'foo'
diff --git a/rq/exceptions.py b/rq/exceptions.py
index 0b885f7..813bcde 100644
--- a/rq/exceptions.py
+++ b/rq/exceptions.py
@@ -1,3 +1,6 @@
+class NoSuchJobError(Exception):
+    pass
+
 class NoQueueError(Exception):
     pass
 
diff --git a/rq/job.py b/rq/job.py
index 0ae5bb1..386e594 100644
--- a/rq/job.py
+++ b/rq/job.py
@@ -1,63 +1,212 @@
-from datetime import datetime
+import times
 from uuid import uuid4
 from pickle import loads, dumps
-from .exceptions import UnpickleError
+from .proxy import conn
+from .exceptions import UnpickleError, NoSuchJobError
+
+
+def unpickle(pickled_string):
+    """Unpickles a string, but raises a unified UnpickleError in case anything
+    fails.
+
+    This is a helper method to not have to deal with the fact that `loads()`
+    potentially raises many types of exceptions (e.g. AttributeError,
+    IndexError, TypeError, KeyError, etc.)
+    """
+    try:
+        obj = loads(pickled_string)
+    except StandardError:
+        raise UnpickleError('Could not unpickle.', pickled_string)
+    return obj
 
 
 class Job(object):
     """A Job is just a convenient datastructure to pass around job (meta) data.
     """
+
+    # Job construction
     @classmethod
-    def unpickle(cls, pickle_data):
-        """Constructs a Job instance form the given pickle'd job tuple data."""
-        try:
-            unpickled_obj = loads(pickle_data)
-            assert isinstance(unpickled_obj, Job)
-            return unpickled_obj
-        except (AssertionError, AttributeError, IndexError, TypeError, KeyError):
-            raise UnpickleError('Could not unpickle Job.', pickle_data)
-
-    def __init__(self, func, *args, **kwargs):
-        self._id = unicode(uuid4())
-        self.func = func
-        self.args = args
-        self.kwargs = kwargs
+    def for_call(cls, func, *args, **kwargs):
+        """Creates a new Job instance for the given function, arguments, and
+        keyword arguments.
+        """
+        job = Job()
+        job._func = func
+        job._args = args
+        job._kwargs = kwargs
+        job.description = job.get_call_string()
+        return job
+
+    @property
+    def func(self):
+        return self._func
+
+    @property
+    def args(self):
+        return self._args
+
+    @property
+    def kwargs(self):
+        return self._kwargs
+
+    @classmethod
+    def fetch(cls, id):
+        """Fetches a persisted job from its corresponding Redis key and
+        instantiates it.
+        """
+        job = Job(id)
+        job.refresh()
+        return job
+
+    def __init__(self, id=None):
+        self._id = id
+        self.created_at = times.now()
+        self._func = None
+        self._args = None
+        self._kwargs = None
+        self.description = None
         self.origin = None
-        self.created_at = datetime.utcnow()
         self.enqueued_at = None
+        self.ended_at = None
+        self.result = None
+        self._cached_result = None  # must exist before return_value is read
         self.exc_info = None
 
-    def pickle(self):
-        """Returns the pickle'd string represenation of a Job.  Suitable for writing to Redis."""
-        return dumps(self)
+
+    # Data access
+    def get_id(self):
+        """The job ID for this job instance.  Generates an ID lazily the
+        first time the ID is requested.
+        """
+        if self._id is None:
+            self._id = unicode(uuid4())
+        return self._id
+
+    def set_id(self, value):
+        """Sets a job ID for the given job."""
+        self._id = value
+
+    id = property(get_id, set_id)
 
     @property
-    def rv_key(self):
-        """Returns the Redis key under which the Job's result will be stored, if applicable."""
-        return 'rq:result:%s' % (self._id,)
+    def key(self):
+        """The Redis key that is used to store job data under."""
+        return 'rq:job:%s' % (self.id,)
 
     @property
-    def id(self):
-        """Returns the Job's internal ID."""
-        return self._id
+    def job_tuple(self):
+        """Returns the job tuple that encodes the actual function call that this job represents."""
+        return (self.func, self.args, self.kwargs)
 
+    @property
+    def return_value(self):
+        """Returns the return value of the job.
+
+        Initially, right after enqueueing a job, the return value will be None.
+        But when the job has been executed, and had a return value or exception,
+        this will return that value or exception.
+
+        Note that, when the job has no return value (i.e. returns None), the
+        ReadOnlyJob object is useless, as the result won't be written back to
+        Redis.
+
+        Also note that you cannot draw the conclusion that a job has _not_ been
+        executed when its return value is None, since return values written back
+        to Redis will expire after a given amount of time (500 seconds by
+        default).
+        """
+        if self._cached_result is None:
+            rv = conn.hget(self.key, 'result')
+            if rv is not None:
+                # cache the result
+                self._cached_result = loads(rv)
+        return self._cached_result
+
+
+    # Persistence
+    def refresh(self):
+        """Overwrite the current instance's properties with the values in the
+        corresponding Redis key.
+
+        Will raise a NoSuchJobError if no corresponding Redis key exists.
+        """
+        key = self.key
+        properties = ['data', 'created_at', 'origin', 'description',
+                'enqueued_at', 'ended_at', 'result', 'exc_info']
+        data, created_at, origin, description, \
+                enqueued_at, ended_at, result, \
+                exc_info = conn.hmget(key, properties)
+        if data is None:
+            raise NoSuchJobError('No such job: %s' % (key,))
+
+        def to_date(date_str):
+            if date_str is None:
+                return None
+            else:
+                return times.to_universal(date_str)
+
+        self._func, self._args, self._kwargs = unpickle(data)
+        self.created_at = to_date(created_at)
+        self.origin = origin
+        self.description = description
+        self.enqueued_at = to_date(enqueued_at)
+        self.ended_at = to_date(ended_at)
+        self.result = result
+        self.exc_info = exc_info
+
+    def save(self):
+        """Persists the current job instance to its corresponding Redis key."""
+        key = self.key
+
+        obj = {}
+        obj['created_at'] = times.format(self.created_at, 'UTC')
+
+        if self.func is not None:
+            obj['data'] = dumps(self.job_tuple)
+        if self.origin is not None:
+            obj['origin'] = self.origin
+        if self.description is not None:
+            obj['description'] = self.description
+        if self.enqueued_at is not None:
+            obj['enqueued_at'] = times.format(self.enqueued_at, 'UTC')
+        if self.ended_at is not None:
+            obj['ended_at'] = times.format(self.ended_at, 'UTC')
+        if self.result is not None:
+            obj['result'] = self.result
+        if self.exc_info is not None:
+            obj['exc_info'] = self.exc_info
+
+        conn.hmset(key, obj)
+
+
+    # Job execution
     def perform(self):
         """Invokes the job function with the job arguments.
         """
         return self.func(*self.args, **self.kwargs)
 
-    @property
-    def call_string(self):
+
+    # Representation
+    def get_call_string(self):
         """Returns a string representation of the call, formatted as a regular
         Python function invocation statement.
         """
-        arg_list = map(repr, self.args)
-        arg_list += map(lambda tup: '%s=%r' % (tup[0], tup[1]),
-                self.kwargs.items())
-        return '%s(%s)' % (self.func.__name__, ', '.join(arg_list))
+        if self.func is None:
+            return None
+
+        arg_list = [repr(arg) for arg in self.args]
+        arg_list += ['%s=%r' % (k, v) for k, v in self.kwargs.items()]
+        args = ', '.join(arg_list)
+        return '%s(%s)' % (self.func.__name__, args)
 
     def __str__(self):
-        return '<Job %s: %s>' % (self.id, self.call_string)
+        return '<Job %s: %s>' % (self.id, self.description)
 
+
+    # Job equality
     def __eq__(self, other):
-        return cmp(self.id, other.id)
+        return self.id == other.id
+
+    def __hash__(self):
+        return hash(self.id)
+
diff --git a/rq/queue.py b/rq/queue.py
index dbef8c0..efccff8 100644
--- a/rq/queue.py
+++ b/rq/queue.py
@@ -1,42 +1,12 @@
-from datetime import datetime
+import times
 from functools import total_ordering
-from pickle import loads
 from .proxy import conn
 from .job import Job
-from .exceptions import UnpickleError
+from .exceptions import NoSuchJobError, UnpickleError
 
 
-class DelayedResult(object):
-    """Proxy object that is returned as a result of `Queue.enqueue()` calls.
-    Instances of DelayedResult can be polled for their return values.
-    """
-    def __init__(self, key):
-        self.key = key
-        self._rv = None
-
-    @property
-    def return_value(self):
-        """Returns the return value of the job.
-
-        Initially, right after enqueueing a job, the return value will be None.
-        But when the job has been executed, and had a return value or exception,
-        this will return that value or exception.
-
-        Note that, when the job has no return value (i.e. returns None), the
-        DelayedResult object is useless, as the result won't be written back to
-        Redis.
-
-        Also note that you cannot draw the conclusion that a job has _not_ been
-        executed when its return value is None, since return values written back
-        to Redis will expire after a given amount of time (500 seconds by
-        default).
-        """
-        if self._rv is None:
-            rv = conn.get(self.key)
-            if rv is not None:
-                # cache the result
-                self._rv = loads(rv)
-        return self._rv
+def compact(lst):
+    return [item for item in lst if item is not None]
 
 
 @total_ordering
@@ -72,20 +42,26 @@ class Queue(object):
         """Returns the Redis key for this Queue."""
         return self._key
 
-    @property
-    def empty(self):
+    def is_empty(self):
         """Returns whether the current queue is empty."""
         return self.count == 0
 
     @property
-    def messages(self):
-        """Returns a list of all messages (pickled job data) in the queue."""
+    def job_ids(self):
+        """Returns a list of all job IDS in the queue."""
        return conn.lrange(self.key, 0, -1)
 
     @property
     def jobs(self):
-        """Returns a list of all jobs in the queue."""
-        return map(Job.unpickle, self.messages)
+        """Returns a list of all (valid) jobs in the queue."""
+        def safe_fetch(job_id):
+            try:
+                job = Job.fetch(job_id)
+            except UnpickleError:
+                return None
+            return job
+
+        return compact([safe_fetch(job_id) for job_id in self.job_ids])
 
     @property
     def count(self):
@@ -93,68 +69,79 @@ class Queue(object):
         return conn.llen(self.key)
 
-    def _create_job(self, f, *args, **kwargs):
-        """Creates a Job instance for the given function call and attaches queue
-        meta data to it.
-        """
-        if f.__module__ == '__main__':
-            raise ValueError('Functions from the __main__ module cannot be processed by workers.')
-
-        job = Job(f, *args, **kwargs)
-        job.origin = self.name
-        return job
-
-    def _push(self, pickled_job):
-        """Enqueues a pickled_job on the corresponding Redis queue."""
-        conn.rpush(self.key, pickled_job)
+    def push_job_id(self, job_id):
+        """Pushes a job ID on the corresponding Redis queue."""
+        conn.rpush(self.key, job_id)
 
     def enqueue(self, f, *args, **kwargs):
-        """Enqueues a function call for delayed execution.
+        """Creates a job to represent the delayed function call and enqueues it.
 
         Expects the function to call, along with the arguments and keyword
         arguments.
         """
-        job = self._create_job(f, *args, **kwargs)
-        job.enqueued_at = datetime.utcnow()
-        self._push(job.pickle())
-        return DelayedResult(job.rv_key)
+        if f.__module__ == '__main__':
+            raise ValueError('Functions from the __main__ module cannot be processed by workers.')
+
+        job = Job.for_call(f, *args, **kwargs)
+        return self.enqueue_job(job)
+
+    def enqueue_job(self, job):
+        """Enqueues a job for delayed execution."""
+        job.origin = self.name
+        job.enqueued_at = times.now()
+        job.save()
+        self.push_job_id(job.id)
+        return job
 
     def requeue(self, job):
         """Requeues an existing (typically a failed job) onto the queue."""
         raise NotImplementedError('Implement this')
 
+    def pop_job_id(self):
+        """Pops a given job ID from this Redis queue."""
+        return conn.lpop(self.key)
+
+    @classmethod
+    def lpop(cls, queue_keys, blocking):
+        """Helper method.  Intermediate method to abstract away from some Redis
+        API details, where LPOP accepts only a single key, whereas BLPOP accepts
+        multiple.  So if we want the non-blocking LPOP, we need to iterate over
+        all queues, do individual LPOPs, and return the result.
+
+        Until Redis receives a specific method for this, we'll have to wrap it
+        this way.
+        """
+        if blocking:
+            queue_key, job_id = conn.blpop(queue_keys)
+            return queue_key, job_id
+        else:
+            for queue_key in queue_keys:
+                blob = conn.lpop(queue_key)
+                if blob is not None:
+                    return queue_key, blob
+            return None
+
     def dequeue(self):
-        """Dequeues the function call at the front of this Queue.
+        """Dequeues the front-most job from this queue.
 
         Returns a Job instance, which can be executed or inspected.
         """
-        blob = conn.lpop(self.key)
-        if blob is None:
+        job_id = self.pop_job_id()
+        if job_id is None:
             return None
         try:
-            job = Job.unpickle(blob)
+            job = Job.fetch(job_id)
+        except NoSuchJobError as e:
+            # Silently pass on jobs that don't exist (anymore),
+            # and continue by reinvoking itself recursively
+            return self.dequeue()
         except UnpickleError as e:
             # Attach queue information on the exception for improved error
             # reporting
             e.queue = self
             raise e
-        job.origin = self
         return job
 
-    @classmethod
-    def _lpop_any(cls, queue_keys):
-        """Helper method.  You should not call this directly.
-
-        Redis' BLPOP command takes multiple queue arguments, but LPOP can only
-        take a single queue.  Therefore, we need to loop over all queues
-        manually, in order, and return None if no more work is available.
-        """
-        for queue_key in queue_keys:
-            blob = conn.lpop(queue_key)
-            if blob is not None:
-                return (queue_key, blob)
-        return None
-
     @classmethod
     def dequeue_any(cls, queues, blocking):
         """Class method returning the Job instance at the front of the given set
@@ -164,25 +151,25 @@ class Queue(object):
         either blocks execution of this function until new messages arrive on
         any of the queues, or returns None.
         """
-        queue_keys = map(lambda q: q.key, queues)
-        if blocking:
-            queue_key, blob = conn.blpop(queue_keys)
-        else:
-            redis_result = cls._lpop_any(queue_keys)
-            if redis_result is None:
-                return None
-            queue_key, blob = redis_result
-
+        queue_keys = [q.key for q in queues]
+        result = cls.lpop(queue_keys, blocking)
+        if result is None:
+            return None
+        queue_key, job_id = result
         queue = Queue.from_queue_key(queue_key)
         try:
-            job = Job.unpickle(blob)
+            job = Job.fetch(job_id)
+        except NoSuchJobError:
+            # Silently pass on jobs that don't exist (anymore),
+            # and continue by reinvoking the same function recursively
+            return cls.dequeue_any(queues, blocking)
         except UnpickleError as e:
             # Attach queue information on the exception for improved error
             # reporting
+            e.job_id = job_id
             e.queue = queue
             raise e
-        job.origin = queue
-        return job
+        return job, queue
 
 
     # Total ordering defition (the rest of the required Python methods are
diff --git a/rq/worker.py b/rq/worker.py
index b2e6484..a6bbd51 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -1,12 +1,13 @@
 import sys
 import os
 import errno
-import datetime
 import random
 import time
+import times
 import procname
 import socket
 import signal
+import traceback
 from pickle import dumps
 try:
     from logbook import Logger
@@ -15,8 +16,13 @@ except ImportError:
     from logging import Logger
 from .queue import Queue
 from .proxy import conn
+from .utils import make_colorizer
 from .exceptions import NoQueueError, UnpickleError
 
+green = make_colorizer('darkgreen')
+yellow = make_colorizer('darkyellow')
+blue = make_colorizer('darkblue')
+
 
 def iterable(x):
     return hasattr(x, '__iter__')
@@ -233,13 +239,13 @@ class Worker(object):
 
         Pops and performs all jobs on the current list of queues.  When all
         queues are empty, block and wait for new jobs to arrive on any of the
-        queues, unless `burst` is True.
+        queues, unless `burst` mode is enabled.
 
         The return value indicates whether any jobs were processed.
         """
         self._install_signal_handlers()
 
-        did_work = False
+        did_perform_work = False
         self.register_birth()
         self.state = 'starting'
         try:
             while True:
                 if self.stopped:
                     break
                 self.state = 'idle'
                 qnames = self.queue_names()
-                self.procline('Listening on %s' % (','.join(qnames)))
-                self.log.info('*** Listening for work on %s...' % (', '.join(qnames)))
+                self.procline('Listening on %s' % ','.join(qnames))
+                self.log.info('')
+                self.log.info('*** Listening on %s...' % (green(', '.join(qnames))))
                 wait_for_job = not burst
                 try:
-                    job = Queue.dequeue_any(self.queues, wait_for_job)
+                    result = Queue.dequeue_any(self.queues, wait_for_job)
+                    if result is None:
+                        break
                 except UnpickleError as e:
                     self.log.warning('*** Ignoring unpickleable data on %s.' % (e.queue.name,))
                     self.log.debug('Data follows:')
                     self.log.debug(e.raw_data)
                     self.log.debug('End of unreadable data.')
-
-                    fq = self.failure_queue
-                    fq._push(e.raw_data)
+                    self.failure_queue.push_job_id(e.job_id)
                     continue
 
-                if job is None:
-                    break
-                self.state = 'busy'
+                job, queue = result
+                self.log.info('%s: %s (%s)' % (green(queue.name), blue(job.description), job.id))
 
+                self.state = 'busy'
                 self.fork_and_perform_job(job)
 
-                did_work = True
+                did_perform_work = True
         finally:
             if not self.is_horse:
                 self.register_death()
-        return did_work
+        return did_perform_work
 
     def fork_and_perform_job(self, job):
         child_pid = os.fork()
@@ -282,12 +289,9 @@ class Worker(object):
             self._is_horse = True
             random.seed()
             self.log = Logger('horse')
-            try:
-                self.perform_job(job)
-            except Exception as e:
-                self.log.exception(e)
-                sys.exit(1)
-            sys.exit(0)
+
+            success = self.perform_job(job)
+            sys.exit(int(not success))
         else:
             self._horse_pid = child_pid
             self.procline('Forked %d at %d' % (child_pid, time.time()))
@@ -308,29 +312,34 @@ class Worker(object):
     def perform_job(self, job):
         self.procline('Processing %s from %s since %s' % (
             job.func.__name__,
-            job.origin.name, time.time()))
-        msg = 'Got job %s from %s' % (
-            job.call_string,
-            job.origin.name)
-        self.log.info(msg)
+            job.origin, time.time()))
 
         try:
             rv = job.perform()
         except Exception as e:
-            rv = e
-            self.log.exception(e)
             fq = self.failure_queue
+            self.log.exception(e)
             self.log.warning('Moving job to %s queue.' % (fq.name,))
-            job.ended_at = datetime.datetime.utcnow()
-            job.exc_info = e
-            fq._push(job.pickle())
+
+            # Store the exception information...
+            job.ended_at = times.now()
+            job.exc_info = traceback.format_exc()
+
+            # ------ REFACTOR THIS -------------------------
+            job.save()
+            # ...and put the job on the failure queue
+            fq.push_job_id(job.id)
+            # ------ UNTIL HERE ----------------------------
+            # (should be as easy as fq.enqueue(job) or so)
+
+            return False
         else:
-            if rv is not None:
-                self.log.info('Job result = %s' % (rv,))
+            if rv is None:
+                self.log.info('Job OK')
             else:
-                self.log.info('Job ended normally without result')
+                self.log.info('Job OK, result = %s' % (yellow(rv),))
 
             if rv is not None:
                 p = conn.pipeline()
-                p.set(job.rv_key, dumps(rv))
-                p.expire(job.rv_key, self.rv_ttl)
+                p.hset(job.key, 'result', dumps(rv))  # the hash field return_value reads
+                p.expire(job.key, self.rv_ttl)
                 p.execute()
+            return True
diff --git a/setup.py b/setup.py
index 8dcee49..5263a04 100644
--- a/setup.py
+++ b/setup.py
@@ -15,7 +15,7 @@ def get_version():
     raise RuntimeError('No version info found.')
 
 def get_dependencies():
-    deps = ['redis', 'procname']
+    deps = ['redis', 'procname', 'times']
     deps += ['logbook']  # should be soft dependency?
     if sys.version_info < (2, 7) or \
             sys.version_info >= (3, 0) and sys.version_info < (3, 2):
diff --git a/tests/__init__.py b/tests/__init__.py
index 3109684..b833a91 100644
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -14,6 +14,18 @@ def failing_job(x):
     return x / 0
 
 
+def find_empty_redis_database():
+    """Tries to connect to a random Redis database (starting from 4), and
+    will use/connect it when no keys are in there.
+    """
+    for dbnum in range(4, 17):
+        testconn = Redis(db=dbnum)
+        empty = len(testconn.keys('*')) == 0
+        if empty:
+            return testconn
+    assert False, 'No empty Redis database found to run tests in.'
+
+
 class RQTestCase(unittest.TestCase):
     """Base class to inherit test cases from for RQ.
 
@@ -27,7 +39,7 @@ class RQTestCase(unittest.TestCase):
     @classmethod
     def setUpClass(cls):
         # Set up connection to Redis
-        testconn = Redis()
+        testconn = find_empty_redis_database()
         conn.push(testconn)
 
         # Store the connection (for sanity checking)
@@ -53,12 +65,3 @@ class RQTestCase(unittest.TestCase):
 
         testconn = conn.pop()
         assert testconn == cls.testconn, 'Wow, something really nasty happened to the Redis connection stack.  Check your setup.'
-
-    def assertQueueContains(self, queue, that_func):
-        # Do a queue scan (this is O(n), but we're in a test, so hey)
-        for job in queue.jobs:
-            if job.func == that_func:
-                return
-        self.fail('Queue %s does not contain message for function %s' %
-                (queue.key, that_func))
-
diff --git a/tests/helpers.py b/tests/helpers.py
new file mode 100644
index 0000000..32d47ed
--- /dev/null
+++ b/tests/helpers.py
@@ -0,0 +1,6 @@
+import times
+
+
+def strip_milliseconds(date):
+    return times.to_universal(times.format(date, 'UTC'))
+
diff --git a/tests/test_job.py b/tests/test_job.py
index de6c78e..c62c1d9 100644
--- a/tests/test_job.py
+++ b/tests/test_job.py
@@ -1,8 +1,10 @@
+import times
+from datetime import datetime
 from tests import RQTestCase
-from pickle import dumps, loads
+from tests.helpers import strip_milliseconds
+from pickle import loads
 from rq.job import Job
-#from rq import Queue, Worker
-from rq.exceptions import UnpickleError
+from rq.exceptions import NoSuchJobError, UnpickleError
 
 
 def arbitrary_function(x, y, z=1):
@@ -10,35 +12,141 @@
 
 
 class TestJob(RQTestCase):
-    def test_create_job(self):
-        """Creation of jobs."""
-        job = Job(arbitrary_function, 3, 4, z=2)
+    def test_create_empty_job(self):
+        """Creation of new empty jobs."""
+        job = Job()
+
+        # Jobs have a random UUID and a creation date
+        self.assertIsNotNone(job.id)
+        self.assertIsNotNone(job.created_at)
+
+        # ...and nothing else
+        self.assertIsNone(job.func, None)
+        self.assertIsNone(job.args, None)
+        self.assertIsNone(job.kwargs, None)
+        self.assertIsNone(job.origin, None)
+        self.assertIsNone(job.enqueued_at, None)
+        self.assertIsNone(job.ended_at, None)
+        self.assertIsNone(job.result, None)
+        self.assertIsNone(job.exc_info, None)
+
+    def test_create_typical_job(self):
+        """Creation of jobs for function calls."""
+        job = Job.for_call(arbitrary_function, 3, 4, z=2)
+
+        # Jobs have a random UUID
+        self.assertIsNotNone(job.id)
+        self.assertIsNotNone(job.created_at)
+        self.assertIsNotNone(job.description)
+
+        # Job data is set...
         self.assertEquals(job.func, arbitrary_function)
         self.assertEquals(job.args, (3, 4))
         self.assertEquals(job.kwargs, {'z': 2})
+
+        # ...but metadata is not
         self.assertIsNone(job.origin)
-        self.assertIsNotNone(job.created_at)
         self.assertIsNone(job.enqueued_at)
-        self.assertIsNotNone(job.rv_key)
+        self.assertIsNone(job.result)
+
+
+    def test_save(self):
+        """Storing jobs."""
+        job = Job.for_call(arbitrary_function, 3, 4, z=2)
+
+        # Saving creates a Redis hash
+        self.assertEquals(self.testconn.exists(job.key), False)
+        job.save()
+        self.assertEquals(self.testconn.type(job.key), 'hash')
+
+        # Saving writes pickled job data
+        unpickled_data = loads(self.testconn.hget(job.key, 'data'))
+        self.assertEquals(unpickled_data[0], arbitrary_function)
+
+    def test_fetch(self):
+        """Fetching jobs."""
+        # Prepare test
+        self.testconn.hset('rq:job:some_id', 'data', "(ctest_job\narbitrary_function\np0\n(I3\nI4\ntp1\n(dp2\nS'z'\np3\nI2\nstp4\n.")
+        self.testconn.hset('rq:job:some_id', 'created_at', "2012-02-07 22:13:24+0000")
+
+        # Fetch returns a job
+        job = Job.fetch('some_id')
+        self.assertEquals(job.id, 'some_id')
+        self.assertEquals(job.func, arbitrary_function)
+        self.assertEquals(job.args, (3, 4))
+        self.assertEquals(job.kwargs, dict(z=2))
+        self.assertEquals(job.created_at, datetime(2012, 2, 7, 22, 13, 24))
+
+
+    def test_persistence_of_empty_jobs(self):
+        """Storing empty jobs."""
+        job = Job()
+        job.save()
+
+        expected_date = strip_milliseconds(job.created_at)
+        stored_date = self.testconn.hget(job.key, 'created_at')
+        self.assertEquals(
+                times.to_universal(stored_date),
+                expected_date)
+
+        # ... and no other keys are stored
+        self.assertItemsEqual(
+                self.testconn.hkeys(job.key),
+                ['created_at'])
+
+    def test_persistence_of_typical_jobs(self):
+        """Storing typical jobs."""
+        job = Job.for_call(arbitrary_function, 3, 4, z=2)
+        job.save()
+
+        expected_date = strip_milliseconds(job.created_at)
+        stored_date = self.testconn.hget(job.key, 'created_at')
+        self.assertEquals(
+                times.to_universal(stored_date),
+                expected_date)
 
-    def test_pickle_job(self):
-        """Pickling of jobs."""
-        job = Job(arbitrary_function, 3, 4, z=2)
-        job2 = loads(dumps(job))
+        # ... and no other keys are stored
+        self.assertItemsEqual(
+                self.testconn.hkeys(job.key),
+                ['created_at', 'data', 'description'])
+
+    def test_store_then_fetch(self):
+        job = Job.for_call(arbitrary_function, 3, 4, z=2)
+        job.save()
+
+        job2 = Job.fetch(job.id)
         self.assertEquals(job.func, job2.func)
         self.assertEquals(job.args, job2.args)
         self.assertEquals(job.kwargs, job2.kwargs)
 
-    def test_unpickle_errors(self):
-        """Handling of unpickl'ing errors."""
-        with self.assertRaises(UnpickleError):
-            Job.unpickle('this is no pickle data')
+        # Mathematical equation
+        self.assertEquals(job, job2)
 
-        with self.assertRaises(UnpickleError):
-            Job.unpickle(13)
+    def test_fetching_can_fail(self):
+        """Fetching fails for non-existing jobs."""
+        with self.assertRaises(NoSuchJobError):
+            Job.fetch('b4a44d44-da16-4620-90a6-798e8cd72ca0')
 
-        pickle_data = dumps(Job(arbitrary_function, 2, 3))
-        corrupt_data = pickle_data.replace('arbitrary', 'b0rken')
+    def test_fetching_unreadable_data(self):
+        """Fetching fails on unreadable data."""
+        # Set up
+        job = Job.for_call(arbitrary_function, 3, 4, z=2)
+        job.save()
+
+        # Just replace the data hkey with some random noise
+        self.testconn.hset(job.key, 'data', 'this is no pickle string')
         with self.assertRaises(UnpickleError):
-            Job.unpickle(corrupt_data)
+            job.refresh()
+
+        # Set up (part B)
+        job = Job.for_call(arbitrary_function, 3, 4, z=2)
+        job.save()
+
+        # Now slightly modify the job to make it unpickl'able (this is
+        # equivalent to a worker not having the most up-to-date source code
+        # and unable to import the function)
+        data = self.testconn.hget(job.key, 'data')
+        unimportable_data = data.replace('arbitrary_function', 'broken')
+        self.testconn.hset(job.key, 'data', unimportable_data)
+        with self.assertRaises(UnpickleError):
+            job.refresh()
diff --git a/tests/test_queue.py b/tests/test_queue.py
index c7cbc2e..d1fa44e 100644
--- a/tests/test_queue.py
+++ b/tests/test_queue.py
@@ -1,8 +1,7 @@
 from tests import RQTestCase
 from tests import testjob
-from pickle import dumps
 from rq import Queue
-from rq.exceptions import UnpickleError
+from rq.job import Job
 
 
 class TestQueue(RQTestCase):
@@ -31,36 +30,92 @@ class TestQueue(RQTestCase):
 
     def test_queue_empty(self):
         """Detecting empty queues."""
-        q = Queue('my-queue')
-        self.assertEquals(q.empty, True)
+        q = Queue('example')
+        self.assertEquals(q.is_empty(), True)
 
-        self.testconn.rpush('rq:queue:my-queue', 'some val')
-        self.assertEquals(q.empty, False)
+        self.testconn.rpush('rq:queue:example', 'sentinel message')
+        self.assertEquals(q.is_empty(), False)
 
     def test_enqueue(self):
-        """Putting work on queues."""
-        q = Queue('my-queue')
-        self.assertEquals(q.empty, True)
+        """Enqueueing job onto queues."""
+        q = Queue()
+        self.assertEquals(q.is_empty(), True)
 
         # testjob spec holds which queue this is sent to
-        q.enqueue(testjob, 'Nick', foo='bar')
-        self.assertEquals(q.empty, False)
-        self.assertQueueContains(q, testjob)
+        job = q.enqueue(testjob, 'Nick', foo='bar')
+        job_id = job.id
+
+        # Inspect data inside Redis
+        q_key = 'rq:queue:default'
+        self.assertEquals(self.testconn.llen(q_key), 1)
+        self.assertEquals(self.testconn.lrange(q_key, 0, -1)[0], job_id)
+
+    def test_enqueue_sets_metadata(self):
+        """Enqueueing job onto queues modifies meta data."""
+        q = Queue()
+        job = Job.for_call(testjob, 'Nick', foo='bar')
+
+        # Preconditions
+        self.assertIsNone(job.origin)
+        self.assertIsNone(job.enqueued_at)
+
+        # Action
+        q.enqueue_job(job)
+
+        # Postconditions
+        self.assertEquals(job.origin, q.name)
+        self.assertIsNotNone(job.enqueued_at)
+
+
+    def test_pop_job_id(self):
+        """Popping job IDs from queues."""
+        # Set up
+        q = Queue()
+        uuid = '112188ae-4e9d-4a5b-a5b3-f26f2cb054da'
+        q.push_job_id(uuid)
+        # Pop it off the queue...
+        self.assertEquals(q.count, 1)
+        self.assertEquals(q.pop_job_id(), uuid)
+
+        # ...and assert the queue count when down
+        self.assertEquals(q.count, 0)
 
     def test_dequeue(self):
-        """Fetching work from specific queue."""
-        q = Queue('foo')
-        q.enqueue(testjob, 'Rick', foo='bar')
+        """Dequeueing jobs from queues."""
+        # Set up
+        q = Queue()
+        result = q.enqueue(testjob, 'Rick', foo='bar')
 
-        # Pull it off the queue (normally, a worker would do this)
+        # Dequeue a job (not a job ID) off the queue
+        self.assertEquals(q.count, 1)
         job = q.dequeue()
+        self.assertEquals(job.id, result.id)
         self.assertEquals(job.func, testjob)
-        self.assertEquals(job.origin, q)
+        self.assertEquals(job.origin, q.name)
         self.assertEquals(job.args[0], 'Rick')
         self.assertEquals(job.kwargs['foo'], 'bar')
 
+        # ...and assert the queue count when down
+        self.assertEquals(q.count, 0)
+
+    def test_dequeue_ignores_nonexisting_jobs(self):
+        """Dequeuing silently ignores non-existing jobs."""
+
+        q = Queue()
+        uuid = '49f205ab-8ea3-47dd-a1b5-bfa186870fc8'
+        q.push_job_id(uuid)
+        q.push_job_id(uuid)
+        result = q.enqueue(testjob, 'Nick', foo='bar')
+        q.push_job_id(uuid)
+
+        # Dequeue simply ignores the missing job and returns None
+        self.assertEquals(q.count, 4)
+        self.assertEquals(q.dequeue().id, result.id)
+        self.assertIsNone(q.dequeue())
+        self.assertEquals(q.count, 0)
+
     def test_dequeue_any(self):
         """Fetching work from any given queue."""
         fooq = Queue('foo')
@@ -70,50 +125,35 @@ class TestQueue(RQTestCase):
 
         # Enqueue a single item
         barq.enqueue(testjob)
-        job = Queue.dequeue_any([fooq, barq], False)
+        job, queue = Queue.dequeue_any([fooq, barq], False)
         self.assertEquals(job.func, testjob)
+        self.assertEquals(queue, barq)
 
         # Enqueue items on both queues
         barq.enqueue(testjob, 'for Bar')
         fooq.enqueue(testjob, 'for Foo')
 
-        job = Queue.dequeue_any([fooq, barq], False)
+        job, queue = Queue.dequeue_any([fooq, barq], False)
+        self.assertEquals(queue, fooq)
         self.assertEquals(job.func, testjob)
-        self.assertEquals(job.origin, fooq)
+        self.assertEquals(job.origin, fooq.name)
         self.assertEquals(job.args[0], 'for Foo', 'Foo should be dequeued first.')
 
-        job = Queue.dequeue_any([fooq, barq], False)
+        job, queue = Queue.dequeue_any([fooq, barq], False)
+        self.assertEquals(queue, barq)
         self.assertEquals(job.func, testjob)
-        self.assertEquals(job.origin, barq)
+        self.assertEquals(job.origin, barq.name)
         self.assertEquals(job.args[0], 'for Bar', 'Bar should be dequeued second.')
 
-    def test_dequeue_unpicklable_data(self):
-        """Error handling of invalid pickle data."""
-
-        # Push non-pickle data on the queue
-        q = Queue('foo')
-        blob = 'this is nothing like pickled data'
-        self.testconn.rpush(q._key, blob)
-
-        with self.assertRaises(UnpickleError):
-            q.dequeue()  # error occurs when perform()'ing
-
-        # Push value pickle data, but not representing a job tuple
-        q = Queue('foo')
-        blob = dumps('this is pickled, but not a job tuple')
-        self.testconn.rpush(q._key, blob)
-
-        with self.assertRaises(UnpickleError):
-            q.dequeue()  # error occurs when perform()'ing
-
-        # Push slightly incorrect pickled data onto the queue (simulate
-        # a function that can't be imported from the worker)
-        q = Queue('foo')
+    def test_dequeue_any_ignores_nonexisting_jobs(self):
+        """Dequeuing (from any queue) silently ignores non-existing jobs."""
-
-        job_tuple = dumps((testjob, [], dict(name='Frank'), 'unused'))
-        blob = job_tuple.replace('testjob', 'fooobar')
-        self.testconn.rpush(q._key, blob)
+        q = Queue('low')
+        uuid = '49f205ab-8ea3-47dd-a1b5-bfa186870fc8'
+        q.push_job_id(uuid)
 
-        with self.assertRaises(UnpickleError):
-            q.dequeue()  # error occurs when dequeue()'ing
+        # Dequeue simply ignores the missing job and returns None
+        self.assertEquals(q.count, 1)
+        self.assertEquals(Queue.dequeue_any([Queue(), Queue('low')], False), None)
+        self.assertEquals(q.count, 0)
diff --git a/tests/test_worker.py b/tests/test_worker.py
index b4cd0aa..8a0f2cf 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -1,5 +1,6 @@
 from tests import RQTestCase
 from tests import testjob, failing_job
+from tests.helpers import strip_milliseconds
 from rq import Queue, Worker
 from rq.job import Job
 
@@ -21,7 +22,7 @@ class TestWorker(RQTestCase):
         self.assertEquals(w.work(burst=True), True, 'Expected at least some work done.')
 
     def test_work_is_unreadable(self):
-        """Worker processes unreadable job."""
+        """Unreadable jobs are put on the failure queue."""
         q = Queue()
         failure_q = Queue('failure')
 
@@ -31,14 +32,15 @@ class TestWorker(RQTestCase):
         # NOTE: We have to fake this enqueueing for this test case.
         # What we're simulating here is a call to a function that is not
         # importable from the worker process.
-        job = Job(failing_job, 3)
-        pickled_job = job.pickle()
-        invalid_data = pickled_job.replace(
-            'failing_job', 'nonexisting_job')
+        job = Job.for_call(failing_job, 3)
+        job.save()
+        data = self.testconn.hget(job.key, 'data')
+        invalid_data = data.replace('failing_job', 'nonexisting_job')
+        self.testconn.hset(job.key, 'data', invalid_data)
 
         # We use the low-level internal function to enqueue any data (bypassing
         # validity checks)
-        q._push(invalid_data)
+        q.push_job_id(job.id)
 
         self.assertEquals(q.count, 1)
 
@@ -50,20 +52,33 @@ class TestWorker(RQTestCase):
         self.assertEquals(failure_q.count, 1)
 
     def test_work_fails(self):
-        """Worker processes failing job."""
+        """Failing jobs are put on the failure queue."""
         q = Queue()
         failure_q = Queue('failure')
 
+        # Preconditions
         self.assertEquals(failure_q.count, 0)
         self.assertEquals(q.count, 0)
 
-        q.enqueue(failing_job)
-
+        # Action
+        job = q.enqueue(failing_job)
         self.assertEquals(q.count, 1)
+
+        # keep for later
+        enqueued_at_date = strip_milliseconds(job.enqueued_at)
+
         w = Worker([q])
         w.work(burst=True)  # should silently pass
 
-        self.assertEquals(q.count, 0)
+        # Postconditions
+        self.assertEquals(q.count, 0)
         self.assertEquals(failure_q.count, 1)
+
+        # Check the job
+        job = Job.fetch(job.id)
+        self.assertEquals(job.origin, q.name)
+
+        # should be the original enqueued_at date, not the date of enqueueing
+        # to the failure queue
+        self.assertEquals(job.enqueued_at, enqueued_at_date)
+        self.assertIsNotNone(job.exc_info)  # should contain exc_info
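
A minimal usage sketch of the round-trip this patch introduces, assuming `use_redis` is importable from the top-level `rq` package (as `bin/rqgenload` above implies) and a Redis server is running locally:

    from rq import use_redis, Queue
    from rq import dummy
    from rq.job import Job

    use_redis()  # push the default Redis connection onto the conn proxy

    q = Queue()                    # the 'default' queue
    job = q.enqueue(dummy.fib, 8)  # enqueue() now returns the Job itself,
                                   # not a DelayedResult proxy

    # Only the job ID sits on the rq:queue:default list; the pickled call
    # tuple and its metadata live in a Redis hash under rq:job:<uuid>.
    fetched = Job.fetch(job.id)
    assert fetched == job          # Job equality is by ID
    print(fetched.description)     # "fib(8)"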
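Dequeueing changes shape as well: `Queue.dequeue_any()` now returns a `(job, queue)` pair rather than a job whose `origin` points at a Queue object, and returns None in non-blocking mode when all queues are empty. A sketch of the new contract, under the same assumptions as above:

    from rq import Queue

    result = Queue.dequeue_any([Queue('high'), Queue('default')], False)
    if result is not None:
        job, queue = result
        print('%s: %s (%s)' % (queue.name, job.description, job.id))
        rv = job.perform()  # invokes job.func(*job.args, **job.kwargs)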
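Result polling moves from the removed DelayedResult proxy onto the job itself: `Job.return_value` reads (and caches) the 'result' field of the job's Redis hash. Because the worker expires stored results after `rv_ttl` (500 seconds by default, per the docstring above), a None return value can mean the job has not finished, finished without a result, or the result has already expired. A sketch, assuming a worker is busy on the 'default' queue:

    job = q.enqueue(dummy.fib, 8)
    # ...some time later, after a worker has picked the job up:
    print(job.return_value)  # 34 with the fib() above; None if the job
                             # hasn't run yet or the result has expired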