Merge pull request #561 from selwin/async-bug

Sync jobs should be cleaned up after execution.
main
Selwin Ong 10 years ago
commit e6a499ada8

@ -6,8 +6,8 @@ from functools import wraps
from rq.compat import string_types from rq.compat import string_types
from .defaults import DEFAULT_RESULT_TTL
from .queue import Queue from .queue import Queue
from .worker import DEFAULT_RESULT_TTL
class job(object): class job(object):

@ -0,0 +1,2 @@
DEFAULT_WORKER_TTL = 420
DEFAULT_RESULT_TTL = 500

@ -8,6 +8,7 @@ from redis import WatchError
from .compat import as_text, string_types, total_ordering from .compat import as_text, string_types, total_ordering
from .connections import resolve_connection from .connections import resolve_connection
from .defaults import DEFAULT_RESULT_TTL
from .exceptions import (DequeueTimeout, InvalidJobOperationError, from .exceptions import (DequeueTimeout, InvalidJobOperationError,
NoSuchJobError, UnpickleError) NoSuchJobError, UnpickleError)
from .job import Job, JobStatus from .job import Job, JobStatus
@ -212,7 +213,14 @@ class Queue(object):
except WatchError: except WatchError:
continue continue
return self.enqueue_job(job, at_front=at_front) job = self.enqueue_job(job, at_front=at_front)
if not self._async:
job.perform()
job.save()
job.cleanup(DEFAULT_RESULT_TTL)
return job
def enqueue(self, f, *args, **kwargs): def enqueue(self, f, *args, **kwargs):
"""Creates a job to represent the delayed function call and enqueues """Creates a job to represent the delayed function call and enqueues
@ -273,9 +281,7 @@ class Queue(object):
if self._async: if self._async:
self.push_job_id(job.id, at_front=at_front) self.push_job_id(job.id, at_front=at_front)
else:
job.perform()
job.save()
return job return job
def enqueue_dependents(self, job): def enqueue_dependents(self, job):

@ -17,6 +17,7 @@ from datetime import timedelta
from rq.compat import as_text, string_types, text_type from rq.compat import as_text, string_types, text_type
from .connections import get_current_connection from .connections import get_current_connection
from .defaults import DEFAULT_RESULT_TTL, DEFAULT_WORKER_TTL
from .exceptions import DequeueTimeout from .exceptions import DequeueTimeout
from .job import Job, JobStatus from .job import Job, JobStatus
from .logutils import setup_loghandlers from .logutils import setup_loghandlers
@ -38,8 +39,7 @@ green = make_colorizer('darkgreen')
yellow = make_colorizer('darkyellow') yellow = make_colorizer('darkyellow')
blue = make_colorizer('darkblue') blue = make_colorizer('darkblue')
DEFAULT_WORKER_TTL = 420
DEFAULT_RESULT_TTL = 500
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)

@ -368,7 +368,7 @@ class TestQueue(RQTestCase):
self.assertEqual(job.get_status(), JobStatus.QUEUED) self.assertEqual(job.get_status(), JobStatus.QUEUED)
def test_enqueue_job_with_dependency_by_id(self): def test_enqueue_job_with_dependency_by_id(self):
"""Enqueueing jobs should work as expected by id as well as job-objects.""" """"Can specify job dependency with job object or job id."""
parent_job = Job.create(func=say_hello) parent_job = Job.create(func=say_hello)
q = Queue() q = Queue()
@ -465,10 +465,11 @@ class TestFailedQueue(RQTestCase):
self.assertEqual(int(job_from_queue.result_ttl), 10) self.assertEqual(int(job_from_queue.result_ttl), 10)
def test_async_false(self): def test_async_false(self):
"""Executes a job immediately if async=False.""" """Job executes and cleaned up immediately if async=False."""
q = Queue(async=False) q = Queue(async=False)
job = q.enqueue(some_calculation, args=(2, 3)) job = q.enqueue(some_calculation, args=(2, 3))
self.assertEqual(job.return_value, 6) self.assertEqual(job.return_value, 6)
self.assertNotEqual(self.testconn.ttl(job.key), -1)
def test_custom_job_class(self): def test_custom_job_class(self):
"""Ensure custom job class assignment works as expected.""" """Ensure custom job class assignment works as expected."""

Loading…
Cancel
Save