From edd139d86f9758a7c908cafcda484afa565e3caf Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Fri, 10 Jul 2015 08:58:38 +0700 Subject: [PATCH] Sync jobs should be cleaned up after execution. --- rq/decorators.py | 2 +- rq/defaults.py | 2 ++ rq/queue.py | 14 ++++++++++---- rq/worker.py | 4 ++-- tests/test_queue.py | 5 +++-- 5 files changed, 18 insertions(+), 9 deletions(-) create mode 100644 rq/defaults.py diff --git a/rq/decorators.py b/rq/decorators.py index 5c5e00b..01d5145 100644 --- a/rq/decorators.py +++ b/rq/decorators.py @@ -6,8 +6,8 @@ from functools import wraps from rq.compat import string_types +from .defaults import DEFAULT_RESULT_TTL from .queue import Queue -from .worker import DEFAULT_RESULT_TTL class job(object): diff --git a/rq/defaults.py b/rq/defaults.py new file mode 100644 index 0000000..6cf2fc3 --- /dev/null +++ b/rq/defaults.py @@ -0,0 +1,2 @@ +DEFAULT_WORKER_TTL = 420 +DEFAULT_RESULT_TTL = 500 diff --git a/rq/queue.py b/rq/queue.py index 16f5b0a..6125741 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -8,6 +8,7 @@ from redis import WatchError from .compat import as_text, string_types, total_ordering from .connections import resolve_connection +from .defaults import DEFAULT_RESULT_TTL from .exceptions import (DequeueTimeout, InvalidJobOperationError, NoSuchJobError, UnpickleError) from .job import Job, JobStatus @@ -212,7 +213,14 @@ class Queue(object): except WatchError: continue - return self.enqueue_job(job, at_front=at_front) + job = self.enqueue_job(job, at_front=at_front) + + if not self._async: + job.perform() + job.save() + job.cleanup(DEFAULT_RESULT_TTL) + + return job def enqueue(self, f, *args, **kwargs): """Creates a job to represent the delayed function call and enqueues @@ -273,9 +281,7 @@ class Queue(object): if self._async: self.push_job_id(job.id, at_front=at_front) - else: - job.perform() - job.save() + return job def enqueue_dependents(self, job): diff --git a/rq/worker.py b/rq/worker.py index e4ab4da..f86d39a 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -17,6 +17,7 @@ from datetime import timedelta from rq.compat import as_text, string_types, text_type from .connections import get_current_connection +from .defaults import DEFAULT_RESULT_TTL, DEFAULT_WORKER_TTL from .exceptions import DequeueTimeout from .job import Job, JobStatus from .logutils import setup_loghandlers @@ -38,8 +39,7 @@ green = make_colorizer('darkgreen') yellow = make_colorizer('darkyellow') blue = make_colorizer('darkblue') -DEFAULT_WORKER_TTL = 420 -DEFAULT_RESULT_TTL = 500 + logger = logging.getLogger(__name__) diff --git a/tests/test_queue.py b/tests/test_queue.py index 8bb0ce0..bebdac7 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -368,7 +368,7 @@ class TestQueue(RQTestCase): self.assertEqual(job.get_status(), JobStatus.QUEUED) def test_enqueue_job_with_dependency_by_id(self): - """Enqueueing jobs should work as expected by id as well as job-objects.""" + """"Can specify job dependency with job object or job id.""" parent_job = Job.create(func=say_hello) q = Queue() @@ -465,10 +465,11 @@ class TestFailedQueue(RQTestCase): self.assertEqual(int(job_from_queue.result_ttl), 10) def test_async_false(self): - """Executes a job immediately if async=False.""" + """Job executes and cleaned up immediately if async=False.""" q = Queue(async=False) job = q.enqueue(some_calculation, args=(2, 3)) self.assertEqual(job.return_value, 6) + self.assertNotEqual(self.testconn.ttl(job.key), -1) def test_custom_job_class(self): """Ensure custom job class assignment works as expected."""