Add job to WorkingQueue before execution and remove from WorkingQueue after.

main
Selwin Ong 11 years ago
parent f38d0dc791
commit 893fc5a6ae

@ -475,9 +475,12 @@ class Worker(object):
job execution. job execution.
""" """
with self.connection._pipeline() as pipeline: with self.connection._pipeline() as pipeline:
timeout = (job.timeout or 180) + 60
self.set_state('busy', pipeline=pipeline) self.set_state('busy', pipeline=pipeline)
self.set_current_job_id(job.id, pipeline=pipeline) self.set_current_job_id(job.id, pipeline=pipeline)
self.heartbeat((job.timeout or 180) + 60, pipeline=pipeline) self.heartbeat(timeout, pipeline=pipeline)
working_queue = WorkingQueue(job.origin, self.connection)
working_queue.add(job, timeout, pipeline=pipeline)
pipeline.execute() pipeline.execute()
self.procline('Processing %s from %s since %s' % ( self.procline('Processing %s from %s since %s' % (
@ -491,13 +494,15 @@ class Worker(object):
self.prepare_job_execution(job) self.prepare_job_execution(job)
with self.connection._pipeline() as pipeline: with self.connection._pipeline() as pipeline:
working_queue = WorkingQueue(job.origin, self.connection)
try: try:
job.set_status(Status.STARTED) job.set_status(Status.STARTED)
with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT): with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT):
rv = job.perform() rv = job.perform()
# Pickle the result in the same try-except block since we need to # Pickle the result in the same try-except block since we need
# use the same exc handling when pickling fails # to use the same exc handling when pickling fails
job._result = rv job._result = rv
self.set_current_job_id(None, pipeline=pipeline) self.set_current_job_id(None, pipeline=pipeline)
@ -508,12 +513,14 @@ class Worker(object):
job._status = Status.FINISHED job._status = Status.FINISHED
job.save(pipeline=pipeline) job.save(pipeline=pipeline)
job.cleanup(result_ttl, pipeline=pipeline) job.cleanup(result_ttl, pipeline=pipeline)
working_queue.remove(job, pipeline=pipeline)
pipeline.execute() pipeline.execute()
except Exception: except Exception:
# Use the public setter here, to immediately update Redis # Use the public setter here, to immediately update Redis
job.set_status(Status.FAILED) job.set_status(Status.FAILED)
working_queue.remove(job)
self.handle_exception(job, *sys.exc_info()) self.handle_exception(job, *sys.exc_info())
return False return False

@ -15,18 +15,22 @@ class WorkingQueue:
Jobs whose score are lower than current time is considered "expired". Jobs whose score are lower than current time is considered "expired".
""" """
def __init__(self, name, connection=None): def __init__(self, name='default', connection=None):
self.name = name self.name = name
self.key = 'rq:wip:%s' % name self.key = 'rq:wip:%s' % name
self.connection = resolve_connection(connection) self.connection = resolve_connection(connection)
def add(self, job, timeout): def add(self, job, timeout, pipeline=None):
"""Adds a job to WorkingQueue with expiry time of now + timeout.""" """Adds a job to WorkingQueue with expiry time of now + timeout."""
return self.connection._zadd(self.key, current_timestamp() + timeout, score = current_timestamp() + timeout
job.id) if pipeline is not None:
return pipeline.zadd(self.key, score, job.id)
def remove(self, job): return self.connection._zadd(self.key, score, job.id)
return self.connection.zrem(self.key, job.id)
def remove(self, job, pipeline=None):
connection = pipeline if pipeline is not None else self.connection
return connection.zrem(self.key, job.id)
def get_expired_job_ids(self): def get_expired_job_ids(self):
"""Returns job ids whose score are less than current timestamp.""" """Returns job ids whose score are less than current timestamp."""

@ -7,6 +7,7 @@ import os
from rq import get_failed_queue, Queue, Worker from rq import get_failed_queue, Queue, Worker
from rq.compat import as_text from rq.compat import as_text
from rq.job import Job, Status from rq.job import Job, Status
from rq.working_queue import WorkingQueue
from tests import RQTestCase, slow from tests import RQTestCase, slow
from tests.fixtures import (create_file, create_file_after_timeout, div_by_zero, from tests.fixtures import (create_file, create_file_after_timeout, div_by_zero,
@ -277,3 +278,18 @@ class TestWorker(RQTestCase):
q = Queue() q = Queue()
worker = Worker([q], job_class=CustomJob) worker = Worker([q], job_class=CustomJob)
self.assertEqual(worker.job_class, CustomJob) self.assertEqual(worker.job_class, CustomJob)
def test_prepare_job_execution(self):
"""Prepare job execution does the necessary bookkeeping."""
queue = Queue(connection=self.testconn)
job = queue.enqueue(say_hello)
worker = Worker([queue])
worker.prepare_job_execution(job)
# Updates working queue
working_queue = WorkingQueue(connection=self.testconn)
self.assertEqual(working_queue.get_job_ids(), [job.id])
# Updates worker statuses
self.assertEqual(worker.state, 'busy')
self.assertEqual(worker.get_current_job_id(), job.id)

@ -2,18 +2,20 @@
from __future__ import absolute_import from __future__ import absolute_import
from rq.job import Job from rq.job import Job
from rq.queue import FailedQueue from rq.queue import FailedQueue, Queue
from rq.utils import current_timestamp from rq.utils import current_timestamp
from rq.worker import Worker
from rq.working_queue import WorkingQueue from rq.working_queue import WorkingQueue
from tests import RQTestCase from tests import RQTestCase
from tests.fixtures import div_by_zero, say_hello
class TestQueue(RQTestCase): class TestQueue(RQTestCase):
def setUp(self): def setUp(self):
super(TestQueue, self).setUp() super(TestQueue, self).setUp()
self.working_queue = WorkingQueue('default', connection=self.testconn) self.working_queue = WorkingQueue(connection=self.testconn)
def test_add_and_remove(self): def test_add_and_remove(self):
"""Adding and removing job to WorkingQueue.""" """Adding and removing job to WorkingQueue."""
@ -52,4 +54,25 @@ class TestQueue(RQTestCase):
self.working_queue.cleanup() self.working_queue.cleanup()
self.assertIn('foo', failed_queue.job_ids) self.assertIn('foo', failed_queue.job_ids)
def test_job_execution(self):
"""Job is removed from WorkingQueue after execution."""
working_queue = WorkingQueue(connection=self.testconn)
queue = Queue(connection=self.testconn)
worker = Worker([queue])
job = queue.enqueue(say_hello)
worker.prepare_job_execution(job)
self.assertIn(job.id, working_queue.get_job_ids())
worker.perform_job(job)
self.assertNotIn(job.id, working_queue.get_job_ids())
# Job that fails
job = queue.enqueue(div_by_zero)
worker.prepare_job_execution(job)
self.assertIn(job.id, working_queue.get_job_ids())
worker.perform_job(job)
self.assertNotIn(job.id, working_queue.get_job_ids())

Loading…
Cancel
Save