diff --git a/rq/worker.py b/rq/worker.py index 2731c98..41f17d1 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -475,9 +475,12 @@ class Worker(object): job execution. """ with self.connection._pipeline() as pipeline: + timeout = (job.timeout or 180) + 60 self.set_state('busy', pipeline=pipeline) self.set_current_job_id(job.id, pipeline=pipeline) - self.heartbeat((job.timeout or 180) + 60, pipeline=pipeline) + self.heartbeat(timeout, pipeline=pipeline) + working_queue = WorkingQueue(job.origin, self.connection) + working_queue.add(job, timeout, pipeline=pipeline) pipeline.execute() self.procline('Processing %s from %s since %s' % ( @@ -491,13 +494,15 @@ class Worker(object): self.prepare_job_execution(job) with self.connection._pipeline() as pipeline: + working_queue = WorkingQueue(job.origin, self.connection) + try: job.set_status(Status.STARTED) with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT): rv = job.perform() - # Pickle the result in the same try-except block since we need to - # use the same exc handling when pickling fails + # Pickle the result in the same try-except block since we need + # to use the same exc handling when pickling fails job._result = rv self.set_current_job_id(None, pipeline=pipeline) @@ -508,12 +513,14 @@ class Worker(object): job._status = Status.FINISHED job.save(pipeline=pipeline) job.cleanup(result_ttl, pipeline=pipeline) + working_queue.remove(job, pipeline=pipeline) pipeline.execute() except Exception: # Use the public setter here, to immediately update Redis job.set_status(Status.FAILED) + working_queue.remove(job) self.handle_exception(job, *sys.exc_info()) return False diff --git a/rq/working_queue.py b/rq/working_queue.py index 64cf1e9..8959384 100644 --- a/rq/working_queue.py +++ b/rq/working_queue.py @@ -15,18 +15,22 @@ class WorkingQueue: Jobs whose score are lower than current time is considered "expired". """ - def __init__(self, name, connection=None): + def __init__(self, name='default', connection=None): self.name = name self.key = 'rq:wip:%s' % name self.connection = resolve_connection(connection) - def add(self, job, timeout): + def add(self, job, timeout, pipeline=None): """Adds a job to WorkingQueue with expiry time of now + timeout.""" - return self.connection._zadd(self.key, current_timestamp() + timeout, - job.id) + score = current_timestamp() + timeout + if pipeline is not None: + return pipeline.zadd(self.key, score, job.id) - def remove(self, job): - return self.connection.zrem(self.key, job.id) + return self.connection._zadd(self.key, score, job.id) + + def remove(self, job, pipeline=None): + connection = pipeline if pipeline is not None else self.connection + return connection.zrem(self.key, job.id) def get_expired_job_ids(self): """Returns job ids whose score are less than current timestamp.""" diff --git a/tests/test_worker.py b/tests/test_worker.py index e02ee2a..91f95ea 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -7,6 +7,7 @@ import os from rq import get_failed_queue, Queue, Worker from rq.compat import as_text from rq.job import Job, Status +from rq.working_queue import WorkingQueue from tests import RQTestCase, slow from tests.fixtures import (create_file, create_file_after_timeout, div_by_zero, @@ -277,3 +278,18 @@ class TestWorker(RQTestCase): q = Queue() worker = Worker([q], job_class=CustomJob) self.assertEqual(worker.job_class, CustomJob) + + def test_prepare_job_execution(self): + """Prepare job execution does the necessary bookkeeping.""" + queue = Queue(connection=self.testconn) + job = queue.enqueue(say_hello) + worker = Worker([queue]) + worker.prepare_job_execution(job) + + # Updates working queue + working_queue = WorkingQueue(connection=self.testconn) + self.assertEqual(working_queue.get_job_ids(), [job.id]) + + # Updates worker statuses + self.assertEqual(worker.state, 'busy') + self.assertEqual(worker.get_current_job_id(), job.id) diff --git a/tests/test_working_queue.py b/tests/test_working_queue.py index af6eeb4..249c035 100644 --- a/tests/test_working_queue.py +++ b/tests/test_working_queue.py @@ -2,18 +2,20 @@ from __future__ import absolute_import from rq.job import Job -from rq.queue import FailedQueue +from rq.queue import FailedQueue, Queue from rq.utils import current_timestamp +from rq.worker import Worker from rq.working_queue import WorkingQueue from tests import RQTestCase +from tests.fixtures import div_by_zero, say_hello class TestQueue(RQTestCase): def setUp(self): super(TestQueue, self).setUp() - self.working_queue = WorkingQueue('default', connection=self.testconn) + self.working_queue = WorkingQueue(connection=self.testconn) def test_add_and_remove(self): """Adding and removing job to WorkingQueue.""" @@ -52,4 +54,25 @@ class TestQueue(RQTestCase): self.working_queue.cleanup() self.assertIn('foo', failed_queue.job_ids) - \ No newline at end of file + def test_job_execution(self): + """Job is removed from WorkingQueue after execution.""" + working_queue = WorkingQueue(connection=self.testconn) + queue = Queue(connection=self.testconn) + worker = Worker([queue]) + + job = queue.enqueue(say_hello) + + worker.prepare_job_execution(job) + self.assertIn(job.id, working_queue.get_job_ids()) + + worker.perform_job(job) + self.assertNotIn(job.id, working_queue.get_job_ids()) + + # Job that fails + job = queue.enqueue(div_by_zero) + + worker.prepare_job_execution(job) + self.assertIn(job.id, working_queue.get_job_ids()) + + worker.perform_job(job) + self.assertNotIn(job.id, working_queue.get_job_ids())