From 7b8d4e075e30ca64fa8caa09867624cf2480c983 Mon Sep 17 00:00:00 2001
From: Stefan Hammer
Date: Thu, 11 Aug 2016 12:57:14 +0200
Subject: [PATCH 1/5] protecting dependency enqueuing with Redis WATCH

---
 rq/queue.py  |  4 +++-
 rq/worker.py | 36 ++++++++++++++++++++++++------------
 2 files changed, 27 insertions(+), 13 deletions(-)

diff --git a/rq/queue.py b/rq/queue.py
index e5d3df8..8f288ca 100644
--- a/rq/queue.py
+++ b/rq/queue.py
@@ -293,8 +293,10 @@ class Queue(object):
         # TODO: can probably be pipelined
         from .registry import DeferredJobRegistry
 
+        dependents_connection = pipeline if pipeline is not None else self.connection
+
         while True:
-            job_id = as_text(self.connection.spop(job.dependents_key))
+            job_id = as_text(dependents_connection.spop(job.dependents_key))
             if job_id is None:
                 break
             dependent = self.job_class.fetch(job_id, connection=self.connection)
diff --git a/rq/worker.py b/rq/worker.py
index 95bcded..39547cc 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -14,6 +14,8 @@ import traceback
 import warnings
 from datetime import timedelta
 
+from redis import WatchError
+
 from rq.compat import as_text, string_types, text_type
 
 from .compat import PY2
@@ -667,24 +669,34 @@ class Worker(object):
             # to use the same exc handling when pickling fails
             job._result = rv
 
-            self.set_current_job_id(None, pipeline=pipeline)
-
             result_ttl = job.get_result_ttl(self.default_result_ttl)
             if result_ttl != 0:
                 job.ended_at = utcnow()
-                job.set_status(JobStatus.FINISHED, pipeline=pipeline)
-                job.save(pipeline=pipeline)
 
-                finished_job_registry = FinishedJobRegistry(job.origin,
-                                                            self.connection)
-                finished_job_registry.add(job, result_ttl, pipeline)
+            while True:
+                try:
+                    self.set_current_job_id(None, pipeline=pipeline)
+
+                    if result_ttl != 0:
+                        job.set_status(JobStatus.FINISHED, pipeline=pipeline)
+                        job.save(pipeline=pipeline)
 
-            queue.enqueue_dependents(job, pipeline=pipeline)
-            job.cleanup(result_ttl, pipeline=pipeline,
-                        remove_from_queue=False)
-            started_job_registry.remove(job, pipeline=pipeline)
+                        finished_job_registry = FinishedJobRegistry(job.origin,
+                                                                    self.connection)
+                        finished_job_registry.add(job, result_ttl, pipeline)
 
-            pipeline.execute()
+                    # avoid missing dependents that were inserted after enqueue_dependents()
+                    pipeline.watch(job.dependents_key)
+                    queue.enqueue_dependents(job, pipeline=pipeline)
+
+                    job.cleanup(result_ttl, pipeline=pipeline,
+                                remove_from_queue=False)
+                    started_job_registry.remove(job, pipeline=pipeline)
+
+                    pipeline.execute()
+                    break
+                except WatchError:
+                    continue
 
         except Exception:
             self.handle_job_failure(

From 7d267cf7efa41587156d6a184db47358162773cb Mon Sep 17 00:00:00 2001
From: Stefan Hammer
Date: Fri, 12 Aug 2016 14:53:39 +0200
Subject: [PATCH 2/5] implemented correct watch/multi/exec

---
 rq/worker.py | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/rq/worker.py b/rq/worker.py
index 39547cc..b44ffe7 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -675,6 +675,13 @@ class Worker(object):
 
             while True:
                 try:
+                    # if dependencies are inserted after enqueue_dependents
+                    # a WatchError is thrown by execute()
+                    pipeline.watch(job.dependents_key)
+                    queue.enqueue_dependents(job, pipeline=pipeline)
+
+                    # pipeline all following commands (reads won't work!)
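+                    # (after multi(), redis-py buffers every command and only
+                    # sends them on execute(), so a command issued here returns
+                    # the pipeline object instead of a real value)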
+                    pipeline.multi()
                     self.set_current_job_id(None, pipeline=pipeline)
 
                     if result_ttl != 0:
@@ -685,10 +692,6 @@ class Worker(object):
                                                                     self.connection)
                         finished_job_registry.add(job, result_ttl, pipeline)
 
-                    # avoid missing dependents that were inserted after enqueue_dependents()
-                    pipeline.watch(job.dependents_key)
-                    queue.enqueue_dependents(job, pipeline=pipeline)
-
                     job.cleanup(result_ttl, pipeline=pipeline,
                                 remove_from_queue=False)
                     started_job_registry.remove(job, pipeline=pipeline)

From 67ae9277e5cc1aea5aa3c4aad9b0de254ab55ea2 Mon Sep 17 00:00:00 2001
From: Stefan Hammer
Date: Sat, 13 Aug 2016 20:22:20 +0200
Subject: [PATCH 3/5] completely pipelined enqueue_dependents

---
 rq/job.py    |  2 +-
 rq/queue.py  | 60 +++++++++++++++++++++++++++++++++++++---------------
 rq/worker.py |  3 +--
 3 files changed, 45 insertions(+), 20 deletions(-)

diff --git a/rq/job.py b/rq/job.py
index ce1894b..d037374 100644
--- a/rq/job.py
+++ b/rq/job.py
@@ -543,7 +543,7 @@ class Job(object):
            forever)
         """
         if ttl == 0:
-            self.delete(remove_from_queue=remove_from_queue)
+            self.delete(pipeline=pipeline, remove_from_queue=remove_from_queue)
         elif not ttl:
             return
         elif ttl > 0:
diff --git a/rq/queue.py b/rq/queue.py
index 8f288ca..1e5b7e4 100644
--- a/rq/queue.py
+++ b/rq/queue.py
@@ -280,35 +280,61 @@ class Queue(object):
                 job.timeout = self.DEFAULT_TIMEOUT
             job.save(pipeline=pipe)
 
+            if self._async:
+                self.push_job_id(job.id, pipeline=pipe, at_front=at_front)
+
             if pipeline is None:
                 pipe.execute()
 
-        if self._async:
-            self.push_job_id(job.id, at_front=at_front)
-
         return job
 
     def enqueue_dependents(self, job, pipeline=None):
-        """Enqueues all jobs in the given job's dependents set and clears it."""
-        # TODO: can probably be pipelined
+        """Enqueues all jobs in the given job's dependents set and clears it.
+
+        When called without a pipeline, this method uses WATCH/MULTI/EXEC.
+        If you pass a pipeline, only MULTI is called. The rest is up to the
+        caller.
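+
+        Concretely, a caller that passes a pipeline is expected to WATCH
+        job.dependents_key before calling this method, to call EXEC
+        afterwards, and to retry on WatchError (see Worker.perform_job).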
+ """ from .registry import DeferredJobRegistry - dependents_connection = pipeline if pipeline is not None else self.connection + pipe = pipeline if pipeline is not None else self.connection._pipeline() + dependents_key = job.dependents_key while True: - job_id = as_text(dependents_connection.spop(job.dependents_key)) - if job_id is None: + try: + # if a pipeline is passed, the caller is responsible for calling WATCH + # to ensure all jobs are enqueued + if pipeline is None: + pipe.watch(dependents_key) + + dependent_jobs = [self.job_class.fetch(as_text(job_id), connection=self.connection) + for job_id in pipe.smembers(dependents_key)] + + pipe.multi() + + for dependent in dependent_jobs: + registry = DeferredJobRegistry(dependent.origin, self.connection) + registry.remove(dependent, pipeline=pipe) + if dependent.origin == self.name: + self.enqueue_job(dependent, pipeline=pipe) + else: + queue = Queue(name=dependent.origin, connection=self.connection) + queue.enqueue_job(dependent, pipeline=pipe) + + pipe.delete(dependents_key) + + if pipeline is None: + pipe.execute() + break - dependent = self.job_class.fetch(job_id, connection=self.connection) - registry = DeferredJobRegistry(dependent.origin, self.connection) - with self.connection._pipeline() as pipeline: - registry.remove(dependent, pipeline=pipeline) - if dependent.origin == self.name: - self.enqueue_job(dependent, pipeline=pipeline) + except WatchError: + if pipeline is None: + continue else: - queue = Queue(name=dependent.origin, connection=self.connection) - queue.enqueue_job(dependent, pipeline=pipeline) - pipeline.execute() + # if the pipeline comes from the caller, we re-raise the + # exception as it it the responsibility of the caller to + # handle it + raise def pop_job_id(self): """Pops a given job ID from this Redis queue.""" diff --git a/rq/worker.py b/rq/worker.py index b44ffe7..20554ee 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -678,10 +678,9 @@ class Worker(object): # if dependencies are inserted after enqueue_dependents # a WatchError is thrown by execute() pipeline.watch(job.dependents_key) + # enqueue_dependents calls multi() on the pipeline! queue.enqueue_dependents(job, pipeline=pipeline) - # pipeline all following commands (reads won't work!) - pipeline.multi() self.set_current_job_id(None, pipeline=pipeline) if result_ttl != 0: From 44f98693c73a1c65f4baddc5fee6e2133b51f611 Mon Sep 17 00:00:00 2001 From: Stefan Hammer Date: Sat, 13 Aug 2016 20:23:56 +0200 Subject: [PATCH 4/5] added a test for the race condition --- tests/test_worker.py | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/tests/test_worker.py b/tests/test_worker.py index b753274..f09a226 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -11,6 +11,8 @@ import time from multiprocessing import Process import subprocess +import mock + from tests import RQTestCase, slow from tests.fixtures import (create_file, create_file_after_timeout, div_by_zero, do_nothing, say_hello, say_pid, @@ -567,6 +569,40 @@ class TestWorker(RQTestCase): worker.work(burst=True) self.assertEqual(self.testconn.zcard(registry.key), 0) + def test_job_dependency_race_condition(self): + """Dependencies added while the job gets finished shouldn't get lost.""" + + # This patches the enqueue_dependents to enqueue a new dependency AFTER + # the original code was executed. 
+        orig_enqueue_dependents = Queue.enqueue_dependents
+
+        def new_enqueue_dependents(self, job, *args, **kwargs):
+            orig_enqueue_dependents(self, job, *args, **kwargs)
+            if hasattr(Queue, '_add_enqueue') and Queue._add_enqueue.id == job.id:
+                Queue._add_enqueue = None
+                Queue().enqueue_call(say_hello, depends_on=job)
+
+        Queue.enqueue_dependents = new_enqueue_dependents
+
+        q = Queue()
+        w = Worker([q])
+        with mock.patch.object(Worker, 'execute_job', wraps=w.execute_job) as mocked:
+            parent_job = q.enqueue(say_hello, result_ttl=0)
+            Queue._add_enqueue = parent_job
+            job = q.enqueue_call(say_hello, depends_on=parent_job)
+            w.work(burst=True)
+            job = Job.fetch(job.id)
+            self.assertEqual(job.get_status(), JobStatus.FINISHED)
+
+        # The created spy checks two issues:
+        # * before the fix of #739, 2 of the 3 jobs were executed due
+        #   to the race condition
+        # * during development another issue was fixed:
+        #   due to a missing pipeline usage in Queue.enqueue_job, the job
+        #   which was enqueued before the "rollback" was executed twice.
+        #   So before that fix the call count was 4 instead of 3
+        self.assertEqual(mocked.call_count, 3)
+
 
 def kill_worker(pid, double_kill):
     # wait for the worker to be started over on the main process

From a0cee2d2a0ec7e94dc76d5259b8298d864ee5c35 Mon Sep 17 00:00:00 2001
From: Stefan Hammer
Date: Thu, 22 Sep 2016 21:36:37 +0200
Subject: [PATCH 5/5] refactored worker code

Moved code into a new handle_job_success() method and reduced the scope
of the pipelines used.
---
 rq/worker.py         | 173 ++++++++++++++++++++++---------------------
 tests/test_worker.py |   2 +-
 2 files changed, 89 insertions(+), 86 deletions(-)

diff --git a/rq/worker.py b/rq/worker.py
index 20554ee..8b47236 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -536,29 +536,23 @@ class Worker(object):
                     # Job completed and its ttl has expired
                     break
                 if job_status not in [JobStatus.FINISHED, JobStatus.FAILED]:
-                    with self.connection._pipeline() as pipeline:
-                        self.handle_job_failure(
-                            job=job,
-                            pipeline=pipeline
+                    self.handle_job_failure(
+                        job=job
+                    )
+
+                    # Unhandled failure: move the job to the failed queue
+                    self.log.warning(
+                        'Moving job to {0!r} queue'.format(
+                            self.failed_queue.name
                         )
-                        try:
-                            pipeline.execute()
-                        except Exception:
-                            pass
-
-                        #Unhandled failure: move the job to the failed queue
-                        self.log.warning(
-                            'Moving job to {0!r} queue'.format(
-                                self.failed_queue.name
-                            )
-                        )
-                        self.failed_queue.quarantine(
-                            job,
-                            exc_info=(
-                                "Work-horse proccess "
-                                "was terminated unexpectedly"
-                            )
+                    )
+                    self.failed_queue.quarantine(
+                        job,
+                        exc_info=(
+                            "Work-horse process "
+                            "was terminated unexpectedly"
                         )
+                    )
                 break
         except OSError as e:
             # In case we encountered an OSError due to EINTR (which is
@@ -631,8 +625,7 @@ class Worker(object):
     def handle_job_failure(
         self,
         job,
-        started_job_registry=None,
-        pipeline=None
+        started_job_registry=None
     ):
         """Handles the failure or an executing job by:
             1. Setting the job status to failed
             2. Removing the job from the started_job_registry
             3. Setting the workers current job to None
         """
-        if started_job_registry is None:
-            started_job_registry = StartedJobRegistry(
-                job.origin,
-                self.connection
-            )
-        job.set_status(JobStatus.FAILED, pipeline=pipeline)
-        started_job_registry.remove(job, pipeline=pipeline)
-        self.set_current_job_id(None, pipeline=pipeline)
-
-    def perform_job(self, job, queue):
-        """Performs the actual work of a job. Will/should only be called
-        inside the work horse's process.
- """ - self.prepare_job_execution(job) + with self.connection._pipeline() as pipeline: + if started_job_registry is None: + started_job_registry = StartedJobRegistry( + job.origin, + self.connection + ) + job.set_status(JobStatus.FAILED, pipeline=pipeline) + started_job_registry.remove(job, pipeline=pipeline) + self.set_current_job_id(None, pipeline=pipeline) + try: + pipeline.execute() + except Exception: + # Ensure that custom exception handlers are called + # even if Redis is down + pass + def handle_job_success( + self, + job, + queue, + started_job_registry + ): with self.connection._pipeline() as pipeline: + while True: + try: + # if dependencies are inserted after enqueue_dependents + # a WatchError is thrown by execute() + pipeline.watch(job.dependents_key) + # enqueue_dependents calls multi() on the pipeline! + queue.enqueue_dependents(job, pipeline=pipeline) - push_connection(self.connection) + self.set_current_job_id(None, pipeline=pipeline) - started_job_registry = StartedJobRegistry(job.origin, self.connection) + result_ttl = job.get_result_ttl(self.default_result_ttl) + if result_ttl != 0: + job.set_status(JobStatus.FINISHED, pipeline=pipeline) + job.save(pipeline=pipeline) - try: - with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT): - rv = job.perform() + finished_job_registry = FinishedJobRegistry(job.origin, + self.connection) + finished_job_registry.add(job, result_ttl, pipeline) - # Pickle the result in the same try-except block since we need - # to use the same exc handling when pickling fails - job._result = rv + job.cleanup(result_ttl, pipeline=pipeline, + remove_from_queue=False) + started_job_registry.remove(job, pipeline=pipeline) - result_ttl = job.get_result_ttl(self.default_result_ttl) - if result_ttl != 0: - job.ended_at = utcnow() + pipeline.execute() + break + except WatchError: + continue - while True: - try: - # if dependencies are inserted after enqueue_dependents - # a WatchError is thrown by execute() - pipeline.watch(job.dependents_key) - # enqueue_dependents calls multi() on the pipeline! - queue.enqueue_dependents(job, pipeline=pipeline) + def perform_job(self, job, queue): + """Performs the actual work of a job. Will/should only be called + inside the work horse's process. 
+ """ + self.prepare_job_execution(job) - self.set_current_job_id(None, pipeline=pipeline) + push_connection(self.connection) - if result_ttl != 0: - job.set_status(JobStatus.FINISHED, pipeline=pipeline) - job.save(pipeline=pipeline) + started_job_registry = StartedJobRegistry(job.origin, self.connection) - finished_job_registry = FinishedJobRegistry(job.origin, - self.connection) - finished_job_registry.add(job, result_ttl, pipeline) + try: + with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT): + rv = job.perform() - job.cleanup(result_ttl, pipeline=pipeline, - remove_from_queue=False) - started_job_registry.remove(job, pipeline=pipeline) + job.ended_at = utcnow() - pipeline.execute() - break - except WatchError: - continue + # Pickle the result in the same try-except block since we need + # to use the same exc handling when pickling fails + job._result = rv - except Exception: - self.handle_job_failure( - job=job, - started_job_registry=started_job_registry, - pipeline=pipeline - ) - try: - pipeline.execute() - except Exception: - # Ensure that custom exception handlers are called - # even if Redis is down - pass - self.handle_exception(job, *sys.exc_info()) - return False + self.handle_job_success( + job=job, + queue=queue, + started_job_registry=started_job_registry + ) + except Exception: + self.handle_job_failure( + job=job, + started_job_registry=started_job_registry + ) + self.handle_exception(job, *sys.exc_info()) + return False - finally: - pop_connection() + finally: + pop_connection() self.log.info('{0}: {1} ({2})'.format(green(job.origin), blue('Job OK'), job.id)) if rv is not None: log_result = "{0!r}".format(as_text(text_type(rv))) self.log.debug('Result: {0}'.format(yellow(log_result))) + result_ttl = job.get_result_ttl(self.default_result_ttl) if result_ttl == 0: self.log.info('Result discarded immediately') elif result_ttl > 0: diff --git a/tests/test_worker.py b/tests/test_worker.py index f09a226..dd4d0fe 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -578,7 +578,7 @@ class TestWorker(RQTestCase): def new_enqueue_dependents(self, job, *args, **kwargs): orig_enqueue_dependents(self, job, *args, **kwargs) - if hasattr(Queue, '_add_enqueue') and Queue._add_enqueue.id == job.id: + if hasattr(Queue, '_add_enqueue') and Queue._add_enqueue is not None and Queue._add_enqueue.id == job.id: Queue._add_enqueue = None Queue().enqueue_call(say_hello, depends_on=job)