Add job to WorkingQueue before execution and remove from WorkingQueue after.

main
Selwin Ong 11 years ago
parent b0c0a84ab0
commit 4d90cc062e

@ -475,9 +475,12 @@ class Worker(object):
job execution. job execution.
""" """
with self.connection._pipeline() as pipeline: with self.connection._pipeline() as pipeline:
timeout = (job.timeout or 180) + 60
self.set_state('busy', pipeline=pipeline) self.set_state('busy', pipeline=pipeline)
self.set_current_job_id(job.id, pipeline=pipeline) self.set_current_job_id(job.id, pipeline=pipeline)
self.heartbeat((job.timeout or 180) + 60, pipeline=pipeline) self.heartbeat(timeout, pipeline=pipeline)
working_queue = WorkingQueue(job.origin, self.connection)
working_queue.add(job, timeout, pipeline=pipeline)
pipeline.execute() pipeline.execute()
self.procline('Processing %s from %s since %s' % ( self.procline('Processing %s from %s since %s' % (
@ -491,13 +494,15 @@ class Worker(object):
self.prepare_job_execution(job) self.prepare_job_execution(job)
with self.connection._pipeline() as pipeline: with self.connection._pipeline() as pipeline:
working_queue = WorkingQueue(job.origin, self.connection)
try: try:
job.set_status(Status.STARTED) job.set_status(Status.STARTED)
with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT): with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT):
rv = job.perform() rv = job.perform()
# Pickle the result in the same try-except block since we need to # Pickle the result in the same try-except block since we need
# use the same exc handling when pickling fails # to use the same exc handling when pickling fails
job._result = rv job._result = rv
self.set_current_job_id(None, pipeline=pipeline) self.set_current_job_id(None, pipeline=pipeline)
@ -508,12 +513,14 @@ class Worker(object):
job._status = Status.FINISHED job._status = Status.FINISHED
job.save(pipeline=pipeline) job.save(pipeline=pipeline)
job.cleanup(result_ttl, pipeline=pipeline) job.cleanup(result_ttl, pipeline=pipeline)
working_queue.remove(job, pipeline=pipeline)
pipeline.execute() pipeline.execute()
except Exception: except Exception:
# Use the public setter here, to immediately update Redis # Use the public setter here, to immediately update Redis
job.set_status(Status.FAILED) job.set_status(Status.FAILED)
working_queue.remove(job)
self.handle_exception(job, *sys.exc_info()) self.handle_exception(job, *sys.exc_info())
return False return False

@ -15,18 +15,22 @@ class WorkingQueue:
Jobs whose score are lower than current time is considered "expired". Jobs whose score are lower than current time is considered "expired".
""" """
def __init__(self, name, connection=None): def __init__(self, name='default', connection=None):
self.name = name self.name = name
self.key = 'rq:wip:%s' % name self.key = 'rq:wip:%s' % name
self.connection = resolve_connection(connection) self.connection = resolve_connection(connection)
def add(self, job, timeout): def add(self, job, timeout, pipeline=None):
"""Adds a job to WorkingQueue with expiry time of now + timeout.""" """Adds a job to WorkingQueue with expiry time of now + timeout."""
return self.connection._zadd(self.key, current_timestamp() + timeout, score = current_timestamp() + timeout
job.id) if pipeline is not None:
return pipeline.zadd(self.key, score, job.id)
def remove(self, job): return self.connection._zadd(self.key, score, job.id)
return self.connection.zrem(self.key, job.id)
def remove(self, job, pipeline=None):
connection = pipeline if pipeline is not None else self.connection
return connection.zrem(self.key, job.id)
def get_expired_job_ids(self): def get_expired_job_ids(self):
"""Returns job ids whose score are less than current timestamp.""" """Returns job ids whose score are less than current timestamp."""

@ -7,10 +7,11 @@ import os
from rq import get_failed_queue, Queue, Worker, SimpleWorker from rq import get_failed_queue, Queue, Worker, SimpleWorker
from rq.compat import as_text from rq.compat import as_text
from rq.job import Job, Status from rq.job import Job, Status
from rq.working_queue import WorkingQueue
from tests import RQTestCase, slow from tests import RQTestCase, slow
from tests.fixtures import (create_file, create_file_after_timeout, div_by_zero, from tests.fixtures import (create_file, create_file_after_timeout,
say_hello, say_pid) div_by_zero, say_hello, say_pid)
from tests.helpers import strip_microseconds from tests.helpers import strip_microseconds
@ -291,3 +292,18 @@ class TestWorker(RQTestCase):
'Expected at least some work done.') 'Expected at least some work done.')
self.assertEquals(job.result, os.getpid(), self.assertEquals(job.result, os.getpid(),
'PID mismatch, fork() is not supposed to happen here') 'PID mismatch, fork() is not supposed to happen here')
def test_prepare_job_execution(self):
"""Prepare job execution does the necessary bookkeeping."""
queue = Queue(connection=self.testconn)
job = queue.enqueue(say_hello)
worker = Worker([queue])
worker.prepare_job_execution(job)
# Updates working queue
working_queue = WorkingQueue(connection=self.testconn)
self.assertEqual(working_queue.get_job_ids(), [job.id])
# Updates worker statuses
self.assertEqual(worker.state, 'busy')
self.assertEqual(worker.get_current_job_id(), job.id)

@ -2,18 +2,20 @@
from __future__ import absolute_import from __future__ import absolute_import
from rq.job import Job from rq.job import Job
from rq.queue import FailedQueue from rq.queue import FailedQueue, Queue
from rq.utils import current_timestamp from rq.utils import current_timestamp
from rq.worker import Worker
from rq.working_queue import WorkingQueue from rq.working_queue import WorkingQueue
from tests import RQTestCase from tests import RQTestCase
from tests.fixtures import div_by_zero, say_hello
class TestQueue(RQTestCase): class TestQueue(RQTestCase):
def setUp(self): def setUp(self):
super(TestQueue, self).setUp() super(TestQueue, self).setUp()
self.working_queue = WorkingQueue('default', connection=self.testconn) self.working_queue = WorkingQueue(connection=self.testconn)
def test_add_and_remove(self): def test_add_and_remove(self):
"""Adding and removing job to WorkingQueue.""" """Adding and removing job to WorkingQueue."""
@ -52,4 +54,25 @@ class TestQueue(RQTestCase):
self.working_queue.cleanup() self.working_queue.cleanup()
self.assertIn('foo', failed_queue.job_ids) self.assertIn('foo', failed_queue.job_ids)
def test_job_execution(self):
"""Job is removed from WorkingQueue after execution."""
working_queue = WorkingQueue(connection=self.testconn)
queue = Queue(connection=self.testconn)
worker = Worker([queue])
job = queue.enqueue(say_hello)
worker.prepare_job_execution(job)
self.assertIn(job.id, working_queue.get_job_ids())
worker.perform_job(job)
self.assertNotIn(job.id, working_queue.get_job_ids())
# Job that fails
job = queue.enqueue(div_by_zero)
worker.prepare_job_execution(job)
self.assertIn(job.id, working_queue.get_job_ids())
worker.perform_job(job)
self.assertNotIn(job.id, working_queue.get_job_ids())

Loading…
Cancel
Save