diff --git a/rq/job.py b/rq/job.py index 133a748..b820f79 100644 --- a/rq/job.py +++ b/rq/job.py @@ -543,7 +543,7 @@ class Job(object): forever) """ if ttl == 0: - self.delete(remove_from_queue=remove_from_queue) + self.delete(pipeline=pipeline, remove_from_queue=remove_from_queue) elif not ttl: return elif ttl > 0: diff --git a/rq/queue.py b/rq/queue.py index 5027fa6..8231b08 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -299,24 +299,52 @@ class Queue(object): return job def enqueue_dependents(self, job, pipeline=None): - """Enqueues all jobs in the given job's dependents set and clears it.""" - # TODO: can probably be pipelined + """Enqueues all jobs in the given job's dependents set and clears it. + + When called without a pipeline, this method uses WATCH/MULTI/EXEC. + If you pass a pipeline, only MULTI is called. The rest is up to the + caller. + """ from .registry import DeferredJobRegistry + pipe = pipeline if pipeline is not None else self.connection._pipeline() + dependents_key = job.dependents_key + while True: - job_id = as_text(self.connection.spop(job.dependents_key)) - if job_id is None: + try: + # if a pipeline is passed, the caller is responsible for calling WATCH + # to ensure all jobs are enqueued + if pipeline is None: + pipe.watch(dependents_key) + + dependent_jobs = [self.job_class.fetch(as_text(job_id), connection=self.connection) + for job_id in pipe.smembers(dependents_key)] + + pipe.multi() + + for dependent in dependent_jobs: + registry = DeferredJobRegistry(dependent.origin, self.connection) + registry.remove(dependent, pipeline=pipe) + if dependent.origin == self.name: + self.enqueue_job(dependent, pipeline=pipe) + else: + queue = Queue(name=dependent.origin, connection=self.connection) + queue.enqueue_job(dependent, pipeline=pipe) + + pipe.delete(dependents_key) + + if pipeline is None: + pipe.execute() + break - dependent = self.job_class.fetch(job_id, connection=self.connection) - registry = DeferredJobRegistry(dependent.origin, 
self.connection) - with self.connection._pipeline() as pipeline: - registry.remove(dependent, pipeline=pipeline) - if dependent.origin == self.name: - self.enqueue_job(dependent, pipeline=pipeline) + except WatchError: + if pipeline is None: + continue else: - queue = Queue(name=dependent.origin, connection=self.connection) - queue.enqueue_job(dependent, pipeline=pipeline) - pipeline.execute() + # if the pipeline comes from the caller, we re-raise the + # exception as it is the responsibility of the caller to + # handle it + raise def pop_job_id(self): """Pops a given job ID from this Redis queue.""" diff --git a/rq/worker.py b/rq/worker.py index 3067f68..7d745c4 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -14,6 +14,8 @@ import traceback import warnings from datetime import timedelta +from redis import WatchError + from rq.compat import as_text, string_types, text_type from .compat import PY2 @@ -535,29 +537,23 @@ class Worker(object): # Job completed and its ttl has expired break if job_status not in [JobStatus.FINISHED, JobStatus.FAILED]: - with self.connection._pipeline() as pipeline: - self.handle_job_failure( - job=job, - pipeline=pipeline - ) - try: - pipeline.execute() - except Exception: - pass - - # Unhandled failure: move the job to the failed queue - self.log.warning( - 'Moving job to {0!r} queue'.format( - self.failed_queue.name - ) + self.handle_job_failure( + job=job + ) + + # Unhandled failure: move the job to the failed queue + self.log.warning( + 'Moving job to {0!r} queue'.format( + self.failed_queue.name ) - self.failed_queue.quarantine( - job, - exc_info=( - "Work-horse proccess " - "was terminated unexpectedly" - ) + ) + self.failed_queue.quarantine( + job, + exc_info=( + "Work-horse proccess " + "was terminated unexpectedly" ) + ) break except OSError as e: # In case we encountered an OSError due to EINTR (which is @@ -630,8 +626,7 @@ class Worker(object): def handle_job_failure( self, job, - started_job_registry=None, - pipeline=None + 
started_job_registry=None ): """Handles the failure or an executing job by: 1. Setting the job status to failed @@ -639,77 +634,99 @@ class Worker(object): 3. Setting the workers current job to None """ - if started_job_registry is None: - started_job_registry = StartedJobRegistry( - job.origin, - self.connection - ) - job.set_status(JobStatus.FAILED, pipeline=pipeline) - started_job_registry.remove(job, pipeline=pipeline) - self.set_current_job_id(None, pipeline=pipeline) - - def perform_job(self, job, queue): - """Performs the actual work of a job. Will/should only be called - inside the work horse's process. - """ - self.prepare_job_execution(job) + with self.connection._pipeline() as pipeline: + if started_job_registry is None: + started_job_registry = StartedJobRegistry( + job.origin, + self.connection + ) + job.set_status(JobStatus.FAILED, pipeline=pipeline) + started_job_registry.remove(job, pipeline=pipeline) + self.set_current_job_id(None, pipeline=pipeline) + try: + pipeline.execute() + except Exception: + # Ensure that custom exception handlers are called + # even if Redis is down + pass + def handle_job_success( + self, + job, + queue, + started_job_registry + ): with self.connection._pipeline() as pipeline: + while True: + try: + # if dependencies are inserted after enqueue_dependents + # a WatchError is thrown by execute() + pipeline.watch(job.dependents_key) + # enqueue_dependents calls multi() on the pipeline! 
+ queue.enqueue_dependents(job, pipeline=pipeline) - push_connection(self.connection) + self.set_current_job_id(None, pipeline=pipeline) - started_job_registry = StartedJobRegistry(job.origin, self.connection) + result_ttl = job.get_result_ttl(self.default_result_ttl) + if result_ttl != 0: + job.set_status(JobStatus.FINISHED, pipeline=pipeline) + job.save(pipeline=pipeline) - try: - with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT): - rv = job.perform() + finished_job_registry = FinishedJobRegistry(job.origin, + self.connection) + finished_job_registry.add(job, result_ttl, pipeline) - # Pickle the result in the same try-except block since we need - # to use the same exc handling when pickling fails - job._result = rv + job.cleanup(result_ttl, pipeline=pipeline, + remove_from_queue=False) + started_job_registry.remove(job, pipeline=pipeline) - self.set_current_job_id(None, pipeline=pipeline) + pipeline.execute() + break + except WatchError: + continue - result_ttl = job.get_result_ttl(self.default_result_ttl) - if result_ttl != 0: - job.ended_at = utcnow() - job.set_status(JobStatus.FINISHED, pipeline=pipeline) - job.save(pipeline=pipeline) + def perform_job(self, job, queue): + """Performs the actual work of a job. Will/should only be called + inside the work horse's process. 
+ """ + self.prepare_job_execution(job) - finished_job_registry = FinishedJobRegistry(job.origin, - self.connection) - finished_job_registry.add(job, result_ttl, pipeline) + push_connection(self.connection) - queue.enqueue_dependents(job, pipeline=pipeline) - job.cleanup(result_ttl, pipeline=pipeline, - remove_from_queue=False) - started_job_registry.remove(job, pipeline=pipeline) + started_job_registry = StartedJobRegistry(job.origin, self.connection) - pipeline.execute() + try: + with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT): + rv = job.perform() - except Exception: - self.handle_job_failure( - job=job, - started_job_registry=started_job_registry, - pipeline=pipeline - ) - try: - pipeline.execute() - except Exception: - # Ensure that custom exception handlers are called - # even if Redis is down - pass - self.handle_exception(job, *sys.exc_info()) - return False + job.ended_at = utcnow() + + # Pickle the result in the same try-except block since we need + # to use the same exc handling when pickling fails + job._result = rv - finally: - pop_connection() + self.handle_job_success( + job=job, + queue=queue, + started_job_registry=started_job_registry + ) + except Exception: + self.handle_job_failure( + job=job, + started_job_registry=started_job_registry + ) + self.handle_exception(job, *sys.exc_info()) + return False + + finally: + pop_connection() self.log.info('{0}: {1} ({2})'.format(green(job.origin), blue('Job OK'), job.id)) if rv is not None: log_result = "{0!r}".format(as_text(text_type(rv))) self.log.debug('Result: {0}'.format(yellow(log_result))) + result_ttl = job.get_result_ttl(self.default_result_ttl) if result_ttl == 0: self.log.info('Result discarded immediately') elif result_ttl > 0: diff --git a/tests/test_worker.py b/tests/test_worker.py index 4923379..8c1d2d2 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -11,6 +11,8 @@ import time from multiprocessing import Process import subprocess +import mock 
+ from tests import RQTestCase, slow from tests.fixtures import (create_file, create_file_after_timeout, div_by_zero, do_nothing, say_hello, say_pid, @@ -567,6 +569,40 @@ class TestWorker(RQTestCase): worker.work(burst=True) self.assertEqual(self.testconn.zcard(registry.key), 0) + def test_job_dependency_race_condition(self): + """Dependencies added while the job gets finished shouldn't get lost.""" + + # This patches the enqueue_dependents to enqueue a new dependency AFTER + # the original code was executed. + orig_enqueue_dependents = Queue.enqueue_dependents + + def new_enqueue_dependents(self, job, *args, **kwargs): + orig_enqueue_dependents(self, job, *args, **kwargs) + if hasattr(Queue, '_add_enqueue') and Queue._add_enqueue is not None and Queue._add_enqueue.id == job.id: + Queue._add_enqueue = None + Queue().enqueue_call(say_hello, depends_on=job) + + Queue.enqueue_dependents = new_enqueue_dependents + + q = Queue() + w = Worker([q]) + with mock.patch.object(Worker, 'execute_job', wraps=w.execute_job) as mocked: + parent_job = q.enqueue(say_hello, result_ttl=0) + Queue._add_enqueue = parent_job + job = q.enqueue_call(say_hello, depends_on=parent_job) + w.work(burst=True) + job = Job.fetch(job.id) + self.assertEqual(job.get_status(), JobStatus.FINISHED) + + # The created spy checks two issues: + # * before the fix of #739, 2 of the 3 jobs were executed due + # to the race condition + # * during the development another issue was fixed: + # due to a missing pipeline usage in Queue.enqueue_job, the job + # which was enqueued before the "rollback" was executed twice. + # So before that fix the call count was 4 instead of 3 + self.assertEqual(mocked.call_count, 3) + def kill_worker(pid, double_kill): # wait for the worker to be started over on the main process