From 442b389b970e9cd6843a58d82c7b9da9d1c6305b Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Sat, 25 Aug 2012 17:58:15 +0700 Subject: [PATCH] Job returning None as result are now persisted correctly. Job status can now be checked via ``status`` property which should return either "queued", "finished" or "failed". --- rq/job.py | 19 ++++++++++++++++--- rq/queue.py | 3 ++- rq/worker.py | 30 ++++++++++++++--------------- tests/test_queue.py | 6 ++++++ tests/test_worker.py | 45 ++++++++++++++++++++++---------------------- 5 files changed, 61 insertions(+), 42 deletions(-) diff --git a/rq/job.py b/rq/job.py index 7170f7a..bf55661 100644 --- a/rq/job.py +++ b/rq/job.py @@ -1,6 +1,7 @@ import importlib import inspect import times +from collections import namedtuple from uuid import uuid4 from cPickle import loads, dumps, UnpicklingError from .connections import get_current_connection @@ -8,9 +9,11 @@ from .exceptions import UnpickleError, NoSuchJobError JOB_ATTRS = set(['origin', '_func_name', 'ended_at', 'description', '_args', - 'created_at', 'enqueued_at', 'connection', '_result', + 'created_at', 'enqueued_at', 'connection', '_result', 'result', 'timeout', '_kwargs', 'exc_info', '_id', 'data', '_instance', - 'result_ttl']) + 'result_ttl', '_status', 'status']) +Status = namedtuple('Status', ('queued', 'finished', 'failed')) +STATUS = Status(queued='queued', finished='finished', failed='failed') def unpickle(pickled_string): @@ -48,11 +51,12 @@ def requeue_job(job_id, connection=None): class Job(object): """A Job is just a convenient datastructure to pass around job (meta) data. """ + STATUS = STATUS # Job construction @classmethod def create(cls, func, args=None, kwargs=None, connection=None, - result_ttl=None): + result_ttl=None, status=None): """Creates a new Job instance for the given function, arguments, and keyword arguments. """ @@ -74,12 +78,17 @@ class Job(object): job._kwargs = kwargs job.description = job.get_call_string() job.result_ttl = result_ttl + job._status = status return job @property def func_name(self): return self._func_name + @property + def status(self): + return self._status + @property def func(self): func_name = self.func_name @@ -138,6 +147,7 @@ class Job(object): self.exc_info = None self.timeout = None self.result_ttl = None + self._status = None # Data access @@ -227,6 +237,7 @@ class Job(object): self.exc_info = obj.get('exc_info') self.timeout = int(obj.get('timeout')) if obj.get('timeout') else None self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa + self._status = obj.get('status') if obj.get('status') else None # noqa # Overwrite job's additional attrs (those not in JOB_ATTRS), if any additional_attrs = set(obj.keys()).difference(JOB_ATTRS) @@ -258,6 +269,8 @@ class Job(object): obj['timeout'] = self.timeout if self.result_ttl is not None: obj['result_ttl'] = self.result_ttl + if self._status is not None: + obj['status'] = self._status """ Store additional attributes from job instance into Redis. This is done so that third party libraries using RQ can store additional data diff --git a/rq/queue.py b/rq/queue.py index 0ff5861..f9cfff0 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -115,7 +115,8 @@ class Queue(object): contain options for RQ itself. """ timeout = timeout or self._default_timeout - job = Job.create(func, args, kwargs, connection=self.connection, result_ttl=result_ttl) + job = Job.create(func, args, kwargs, connection=self.connection, + result_ttl=result_ttl, status=Job.STATUS.queued) return self.enqueue_job(job, timeout=timeout) def enqueue(self, f, *args, **kwargs): diff --git a/rq/worker.py b/rq/worker.py index 389ca77..4ff7888 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -384,10 +384,12 @@ class Worker(object): # Pickle the result in the same try-except block since we need to # use the same exc handling when pickling fails pickled_rv = dumps(rv) + job._status = job.STATUS.finished except Exception as e: fq = self.failed_queue self.log.exception(red(str(e))) self.log.warning('Moving job to %s queue.' % fq.name) + job._status = job.STATUS.failed fq.quarantine(job, exc_info=traceback.format_exc()) return False @@ -397,23 +399,21 @@ class Worker(object): else: self.log.info('Job OK, result = %s' % (yellow(unicode(rv)),)) - # Expire results - has_result = rv is not None - explicit_ttl_requested = job.result_ttl is not None - should_expire = has_result or explicit_ttl_requested - if should_expire: + """ + How long we persist the job result depends on the value of result_ttl: + - If result_ttl is 0, cleanup the job immediately. + - If it's a positive number, set the job to expire in X seconds. + - If result_ttl is negative, don't set an expiry to it (persist forever) + """ + result_ttl = self.default_result_ttl if job.result_ttl is None else job.result_ttl + if result_ttl == 0: + job.delete() + else: p = self.connection.pipeline() p.hset(job.key, 'result', pickled_rv) - - if explicit_ttl_requested: - ttl = job.result_ttl - else: - ttl = self.default_result_ttl - if ttl >= 0: - p.expire(job.key, ttl) + p.hset(job.key, 'status', job._status) + if result_ttl > 0: + p.expire(job.key, result_ttl) p.execute() - else: - # Cleanup immediately - job.delete() return True diff --git a/tests/test_queue.py b/tests/test_queue.py index fe50318..3d455c2 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -205,6 +205,12 @@ class TestQueue(RQTestCase): None) self.assertEquals(q.count, 0) + def test_enqueue_sets_status(self): + """Enqueueing a job sets its status to "queued".""" + q = Queue() + job = q.enqueue(say_hello) + self.assertEqual(job.status, Job.STATUS.queued) + class TestFailedQueue(RQTestCase): def test_requeue_job(self): diff --git a/tests/test_worker.py b/tests/test_worker.py index 5e46a13..91d1589 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -123,29 +123,6 @@ class TestWorker(RQTestCase): # Should not have created evidence of execution self.assertEquals(os.path.exists(SENTINEL_FILE), False) - def test_cleaning_up_of_jobs(self): - """Jobs get cleaned up after successful execution.""" - q = Queue() - job_with_rv = q.enqueue(say_hello, 'Franklin') - job_without_rv = q.enqueue(do_nothing) - - # Job hashes exists - self.assertEquals(self.testconn.type(job_with_rv.key), 'hash') - self.assertEquals(self.testconn.type(job_without_rv.key), 'hash') - - # Execute the job - w = Worker([q]) - w.work(burst=True) - - # First, assert that the job executed successfully - assert self.testconn.hget(job_with_rv.key, 'exc_info') is None - assert self.testconn.hget(job_without_rv.key, 'exc_info') is None - - # Jobs with results expire after a certain TTL, while jobs without - # results are immediately removed - assert self.testconn.ttl(job_with_rv.key) > 0 - assert not self.testconn.exists(job_without_rv.key) - @slow # noqa def test_timeouts(self): """Worker kills jobs after timeout.""" @@ -187,3 +164,25 @@ class TestWorker(RQTestCase): w = Worker([q]) w.work(burst=True) self.assertEqual(self.testconn.ttl(job.key), None) + + # Job with result_ttl = 0 gets deleted immediately + job = q.enqueue(say_hello, args=('Frank',), result_ttl=0) + w = Worker([q]) + w.work(burst=True) + self.assertEqual(self.testconn.get(job.key), None) + + def test_worker_sets_job_status(self): + """Ensure that worker correctly sets job status.""" + q = Queue() + w = Worker([q]) + + job = q.enqueue(say_hello) + w.work(burst=True) + job = Job.fetch(job.id) + self.assertEqual(job.status, job.STATUS.finished) + + # Failed jobs should set status to "failed" + job = q.enqueue(div_by_zero, args=(1,)) + w.work(burst=True) + job = Job.fetch(job.id) + self.assertEqual(job.status, job.STATUS.failed) \ No newline at end of file