From ee215a1853113912d53aa14bd0ebca66ebe91843 Mon Sep 17 00:00:00 2001
From: Thomas Matecki
Date: Sun, 8 Dec 2019 20:45:18 -0500
Subject: [PATCH 01/16] Create get_dependencies_statuses method on Job

This method shall be used in Queue#enqueue_dependents to determine
whether all of a dependent's dependencies have _FINISHED_.
---
 rq/job.py         |  42 +++++++++++++-
 rq/queue.py       |   3 +-
 tests/test_job.py | 139 +++++++++++++++++++++++++++++++++++++++++++++-
 3 files changed, 179 insertions(+), 5 deletions(-)

diff --git a/rq/job.py b/rq/job.py
index 15e706b..0661f7a 100644
--- a/rq/job.py
+++ b/rq/job.py
@@ -124,7 +124,7 @@ class Job(object):

     def set_status(self, status, pipeline=None):
         self._status = status
-        connection = pipeline or self.connection
+        connection = pipeline if pipeline is not None else self.connection
         connection.hset(self.key, 'status', self._status)

     @property
@@ -405,7 +405,6 @@ class Job(object):

         return jobs

-
     @property
     def result(self):
         """Returns the return value of the job.
@@ -725,4 +724,43 @@ class Job(object):
             connection.sadd(dependents_key, self.id)
             connection.sadd(self.dependencies_key, dependency_id)

+    def get_dependencies_statuses(
+        self,
+        watch=False,
+        pipeline=None
+    ):
+        """Returns a list of tuples containing the job ids and status of all
+        dependencies; e.g:
+
+        [('14462606-09c4-41c2-8bf1-fbd109092318', 'started'),
+         ('e207328f-d5bc-4ea9-8d61-b449891e3230', 'finished'), ...]
+
+        As a minor optimization allowing callers to more quickly tell if all
+        dependencies are _FINISHED_, the returned list is sorted by the
+        `ended_at` timestamp, so those jobs which are not yet finished are at
+        the start of the list.
+        """
+
+        pipe = pipeline if pipeline is not None else self.connection
+
+        if watch:
+            pipe.watch(self.dependencies_key)
+            pipe.watch(*[self.redis_job_namespace_prefix + as_text(_id)
+                         for _id in pipe.smembers(self.dependencies_key)])
+
+        sort_by = self.redis_job_namespace_prefix + '*->ended_at'
+        get_field = self.redis_job_namespace_prefix + '*->status'
+
+        # Sorting here lexicographically works because these dates are stored
+        # in an ISO 8601 format, so lexicographic order is the same as
+        # chronological order.
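+        #
+        # The SORT below asks Redis to order the members of the dependencies
+        # set BY each job hash's `ended_at` field, and to GET both the set
+        # member itself ('#') and that job hash's `status` field.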
+ dependencies_statuses = [ + (as_text(_id), as_text(status)) + for _id, status in pipe.sort(name=self.dependencies_key, by=sort_by, + get=['#', get_field], alpha=True, groups=True, ) + ] + + return dependencies_statuses + + _job_stack = LocalStack() diff --git a/rq/queue.py b/rq/queue.py index 4316653..56ec31a 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -466,7 +466,8 @@ class Queue(object): pipe.multi() - for dependent in dependent_jobs: + for dependent, dependents_dependencies in dependent_jobs: + registry = DeferredJobRegistry(dependent.origin, self.connection, job_class=self.job_class) diff --git a/tests/test_job.py b/tests/test_job.py index 25c37e2..835dc6f 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -6,8 +6,9 @@ import json import time import queue import zlib -from datetime import datetime +from datetime import datetime, timedelta +import pytest from redis import WatchError from rq.compat import PY2, as_text @@ -17,7 +18,7 @@ from rq.queue import Queue from rq.registry import (DeferredJobRegistry, FailedJobRegistry, FinishedJobRegistry, StartedJobRegistry, ScheduledJobRegistry) -from rq.utils import utcformat +from rq.utils import utcformat, utcnow from rq.worker import Worker from tests import RQTestCase, fixtures @@ -796,3 +797,137 @@ class TestJob(RQTestCase): self.testconn.set(dependency_job.id, 'somethingelsehappened') pipeline.touch(dependency_job.id) pipeline.execute() + + def test_get_dependencies_statuses_returns_ids_and_statuses(self): + queue = Queue(connection=self.testconn) + + dependency_job_ids = [ + queue.enqueue(fixtures.say_hello).id + for _ in range(5) + ] + + dependent_job = Job.create(func=fixtures.say_hello) + dependent_job._dependency_ids = dependency_job_ids + dependent_job.register_dependency() + + dependencies_statuses = dependent_job.get_dependencies_statuses() + + self.assertSetEqual( + set(dependencies_statuses), + {(_id, JobStatus.QUEUED) for _id in dependency_job_ids} + ) + + def test_get_dependencies_statuses_returns_empty_list_if_no_dependencies(self): + queue = Queue(connection=self.testconn) + + dependent_job = Job.create(func=fixtures.say_hello) + dependent_job.register_dependency() + + dependencies_statuses = dependent_job.get_dependencies_statuses() + + self.assertListEqual( + dependencies_statuses, + [] + ) + + def test_get_dependencies_statuses_returns_ordered_by_end_time(self): + dependency_jobs = [ + Job.create(fixtures.say_hello) + for _ in range(5) + ] + + dependent_job = Job.create(func=fixtures.say_hello) + dependent_job._dependency_ids = [job.id for job in dependency_jobs] + dependent_job.register_dependency() + + now = utcnow() + + for i, job in enumerate(dependency_jobs): + job._status = JobStatus.FINISHED + job.ended_at = now - timedelta(seconds=i) + job.save() + + dependencies_statuses = dependent_job.get_dependencies_statuses() + + self.assertListEqual( + dependencies_statuses, + [(job.id, JobStatus.FINISHED) for job in reversed(dependency_jobs)] + ) + + def test_get_dependencies_statuses_returns_not_finished_job_ordered_first(self): + dependency_jobs = [Job.create(fixtures.say_hello) for _ in range(2)] + + dependency_jobs[0]._status = JobStatus.FINISHED + dependency_jobs[0].ended_at = utcnow() + dependency_jobs[0].save() + + dependency_jobs[1]._status = JobStatus.STARTED + dependency_jobs[1].ended_at = None + dependency_jobs[1].save() + + dependent_job = Job.create(func=fixtures.say_hello) + dependent_job._dependency_ids = [job.id for job in dependency_jobs] + dependent_job.register_dependency() + + now = utcnow() 
+
+        dependencies_statuses = dependent_job.get_dependencies_statuses()
+
+        self.assertEqual(
+            dependencies_statuses[0],
+            (dependency_jobs[1].id, JobStatus.STARTED)
+        )
+
+        self.assertEqual(
+            dependencies_statuses[1],
+            (dependency_jobs[0].id, JobStatus.FINISHED)
+        )
+
+    def test_get_dependencies_statuses_watches_job(self):
+        queue = Queue(connection=self.testconn)
+
+        dependency_job = queue.enqueue(fixtures.say_hello)
+
+        dependent_job = Job.create(func=fixtures.say_hello)
+        dependent_job._dependency_ids = [dependency_job.id]
+        dependent_job.register_dependency()
+
+        with self.testconn.pipeline() as pipeline:
+
+            dependent_job.get_dependencies_statuses(
+                pipeline=pipeline,
+                watch=True
+            )
+
+            dependency_job.set_status(JobStatus.FAILED, pipeline=self.testconn)
+            pipeline.multi()
+
+            with self.assertRaises(WatchError):
+                pipeline.touch(Job.key_for(dependent_job.id))
+                pipeline.execute()
+
+    def test_get_dependencies_statuses_watches_dependency_set(self):
+        queue = Queue(connection=self.testconn)
+
+        dependency_job = queue.enqueue(fixtures.say_hello)
+        dependent_job = Job.create(func=fixtures.say_hello)
+        dependent_job._dependency_ids = [dependency_job.id]
+        dependent_job.register_dependency()
+
+        with self.testconn.pipeline() as pipeline:
+
+            dependent_job.get_dependencies_statuses(
+                pipeline=pipeline,
+                watch=True
+            )
+
+            self.testconn.sadd(
+                dependent_job.dependencies_key,
+                queue.enqueue(fixtures.say_hello).id,
+            )
+
+            pipeline.multi()
+
+            with self.assertRaises(WatchError):
+                pipeline.touch(Job.key_for(dependent_job.id))
+                pipeline.execute()

From 4a64244f405348e534512c40894cba5d9355b126 Mon Sep 17 00:00:00 2001
From: thomas
Date: Sat, 14 Dec 2019 20:57:39 -0500
Subject: [PATCH 02/16] Only enqueue dependents if all dependencies are FINISHED

---
 rq/queue.py         | 18 ++++++++-
 tests/test_queue.py | 97 +++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 114 insertions(+), 1 deletion(-)

diff --git a/rq/queue.py b/rq/queue.py
index 56ec31a..a354d1c 100644
--- a/rq/queue.py
+++ b/rq/queue.py
@@ -464,9 +464,25 @@ class Queue(object):
         dependent_jobs = [self.job_class.fetch(as_text(job_id), connection=self.connection)
                           for job_id in pipe.smembers(dependents_key)]

+        dependencies_statuses = [
+            dependent.get_dependencies_statuses(watch=True, pipeline=pipe)
+            for dependent in dependent_jobs
+        ]
+
         pipe.multi()

-        for dependent, dependents_dependencies in dependent_jobs:
+        for dependent, dependents_dependencies in zip(dependent_jobs,
+                                                      dependencies_statuses):
+
+            # Enqueue this dependent job only if all of its _other_
+            # dependencies are FINISHED.
+ if not all( + status == JobStatus.FINISHED + for job_id, status + in dependents_dependencies + if job_id != job.id + ): + continue registry = DeferredJobRegistry(dependent.origin, self.connection, diff --git a/tests/test_queue.py b/tests/test_queue.py index 7004aad..4b43677 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -4,6 +4,7 @@ from __future__ import (absolute_import, division, print_function, import json from datetime import datetime, timedelta +from mock.mock import patch from rq import Queue from rq.compat import utc @@ -524,6 +525,102 @@ class TestQueue(RQTestCase): self.assertEqual(q.job_ids, []) + def test_enqueue_job_with_multiple_queued_dependencies(self): + + parent_jobs = [Job.create(func=say_hello) for _ in range(2)] + + for job in parent_jobs: + job._status = JobStatus.QUEUED + job.save() + + job_create = Job.create + + def create_job_patch(*args, **kwargs): + # patch Job#create to set parent jobs as dependencies. + job = job_create(*args, **kwargs) + job._dependency_ids = [job.id for job in parent_jobs] + return job + + q = Queue() + with patch.object(Job, 'create', create_job_patch) as patch_create_job: + job = q.enqueue(say_hello, depends_on=parent_jobs[0]) + self.assertEqual(job.get_status(), JobStatus.DEFERRED) + self.assertEqual(q.job_ids, []) + self.assertEqual(job.fetch_dependencies(), parent_jobs) + + def test_enqueue_job_with_multiple_finished_dependencies(self): + + parent_jobs = [Job.create(func=say_hello) for _ in range(2)] + + for job in parent_jobs: + job._status = JobStatus.FINISHED + job.save() + + job_create = Job.create + + def create_job_patch(*args, **kwargs): + # patch Job#create to set parent jobs as dependencies. + job = job_create(*args, **kwargs) + job._dependency_ids = [job.id for job in parent_jobs] + return job + + q = Queue() + with patch.object(Job, 'create', create_job_patch) as patch_create_job: + job = q.enqueue(say_hello, depends_on=parent_jobs[0]) + self.assertEqual(job.get_status(), JobStatus.QUEUED) + self.assertEqual(q.job_ids, [job.id]) + self.assertEqual(job.fetch_dependencies(), parent_jobs) + + def test_enqueues_dependent_if_other_dependencies_finished(self): + + started_dependency = Job.create(func=say_hello, status=JobStatus.STARTED) + started_dependency.save() + + finished_dependency = Job.create(func=say_hello, status=JobStatus.FINISHED) + finished_dependency.save() + + job_create = Job.create + + def create_job_patch(*args, **kwargs): + # patch Job#create to set parent jobs as dependencies. + job = job_create(*args, **kwargs) + job._dependency_ids = [job.id for job in [started_dependency, finished_dependency]] + return job + + q = Queue() + with patch.object(Job, 'create', create_job_patch) as patch_create_job: + dependent_job = q.enqueue(say_hello, depends_on=[started_dependency]) + self.assertEqual(dependent_job.get_status(), JobStatus.DEFERRED) + + q.enqueue_dependents(started_dependency) + self.assertEqual(dependent_job.get_status(), JobStatus.QUEUED) + self.assertEqual(q.job_ids, [dependent_job.id]) + + def test_does_not_enqueue_dependent_if_other_dependencies_not_finished(self): + + started_dependency = Job.create(func=say_hello, status=JobStatus.STARTED) + started_dependency.save() + + queued_dependency = Job.create(func=say_hello, status=JobStatus.QUEUED) + queued_dependency.save() + + job_create = Job.create + + def create_job_patch(*args, **kwargs): + # patch Job#create to set parent jobs as dependencies. 
job = job_create(*args, **kwargs)
+            job._dependency_ids = [job.id for job in [started_dependency, queued_dependency]]
+            return job
+
+        q = Queue()
+        with patch.object(Job, 'create', create_job_patch) as patch_create_job:
+            dependent_job = q.enqueue(say_hello, depends_on=[started_dependency])
+            self.assertEqual(dependent_job.get_status(), JobStatus.DEFERRED)
+
+        q.enqueue_dependents(started_dependency)
+        self.assertEqual(dependent_job.get_status(), JobStatus.DEFERRED)
+        self.assertEqual(q.job_ids, [])
+
     def test_fetch_job_successful(self):
         """Fetch a job from a queue."""
         q = Queue('example')

From 7ea5a32a55c59b05da849b3aa3d9e20a135df45a Mon Sep 17 00:00:00 2001
From: thomas
Date: Sat, 14 Dec 2019 23:06:22 -0500
Subject: [PATCH 03/16] Always set status 'FINISHED' when job is successful

Method Queue#enqueue_dependents checks the status of all dependencies
of all dependents, and enqueues those dependents for which all
dependencies are FINISHED.

The enqueue_dependents method WAS called from Worker#handle_job_success
BEFORE the status of the successful job was set in Redis, so
enqueue_dependents explicitly excluded the _successful_ job from the
interrogation of dependency statuses, since its status would never be
FINISHED in the existing code path; it was, however, assumed that this
would be the final status after the current pipeline was executed.

This commit changes Worker#handle_job_success so that it persists the
status of the successful job to Redis every time a job completes (not
only if it has a ttl), and does so before enqueue_dependents is called.
This allows enqueue_dependents to be less reliant on the out-of-band
state of the current _successful job being handled_.
---
 rq/queue.py         |  1 -
 rq/worker.py        | 32 ++++++++++++++++++--------------
 tests/test_queue.py | 27 ++++++++++++++++++++-------
 3 files changed, 38 insertions(+), 22 deletions(-)

diff --git a/rq/queue.py b/rq/queue.py
index a354d1c..57aa3a6 100644
--- a/rq/queue.py
+++ b/rq/queue.py
@@ -480,7 +480,6 @@ class Queue(object):
                 status == JobStatus.FINISHED
                 for job_id, status
                 in dependents_dependencies
-                if job_id != job.id
             ):
                 continue

diff --git a/rq/worker.py b/rq/worker.py
index 7e090f5..553b457 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -15,20 +15,15 @@ import warnings
 from datetime import timedelta
 from uuid import uuid4

-try:
-    from signal import SIGKILL
-except ImportError:
-    from signal import SIGTERM as SIGKILL
-
 from redis import WatchError

 from .
import worker_registration from .compat import PY2, as_text, string_types, text_type -from .connections import get_current_connection, push_connection, pop_connection - -from .defaults import (DEFAULT_RESULT_TTL, - DEFAULT_WORKER_TTL, DEFAULT_JOB_MONITORING_INTERVAL, - DEFAULT_LOGGING_FORMAT, DEFAULT_LOGGING_DATE_FORMAT) +from .connections import (get_current_connection, pop_connection, + push_connection) +from .defaults import (DEFAULT_JOB_MONITORING_INTERVAL, + DEFAULT_LOGGING_DATE_FORMAT, DEFAULT_LOGGING_FORMAT, + DEFAULT_RESULT_TTL, DEFAULT_WORKER_TTL) from .exceptions import DequeueTimeout, ShutDownImminentException from .job import Job, JobStatus from .logutils import setup_loghandlers @@ -36,13 +31,22 @@ from .queue import Queue from .registry import FailedJobRegistry, StartedJobRegistry, clean_registries from .scheduler import RQScheduler from .suspension import is_suspended -from .timeouts import JobTimeoutException, HorseMonitorTimeoutException, UnixSignalDeathPenalty -from .utils import (backend_class, ensure_list, enum, - make_colorizer, utcformat, utcnow, utcparse) +from .timeouts import (HorseMonitorTimeoutException, JobTimeoutException, + UnixSignalDeathPenalty) +from .utils import (backend_class, ensure_list, enum, make_colorizer, + utcformat, utcnow, utcparse) from .version import VERSION from .worker_registration import clean_worker_registry, get_keys from .serializers import resolve_serializer +try: + from signal import SIGKILL +except ImportError: + from signal import SIGTERM as SIGKILL + + + + try: from setproctitle import setproctitle as setprocname except ImportError: @@ -845,6 +849,7 @@ class Worker(object): # if dependencies are inserted after enqueue_dependents # a WatchError is thrown by execute() pipeline.watch(job.dependents_key) + job.set_status(JobStatus.FINISHED, pipeline=pipeline) # enqueue_dependents calls multi() on the pipeline! queue.enqueue_dependents(job, pipeline=pipeline) @@ -856,7 +861,6 @@ class Worker(object): result_ttl = job.get_result_ttl(self.default_result_ttl) if result_ttl != 0: - job.set_status(JobStatus.FINISHED, pipeline=pipeline) # Don't clobber the user's meta dictionary! job.save(pipeline=pipeline, include_meta=False) diff --git a/tests/test_queue.py b/tests/test_queue.py index 4b43677..f776747 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -411,6 +411,9 @@ class TestQueue(RQTestCase): job_2 = q.enqueue(say_hello, depends_on=parent_job) registry = DeferredJobRegistry(q.name, connection=self.testconn) + + parent_job.set_status(JobStatus.FINISHED) + self.assertEqual( set(registry.get_job_ids()), set([job_1.id, job_2.id]) @@ -441,6 +444,9 @@ class TestQueue(RQTestCase): set([job_1.id]) ) registry_2 = DeferredJobRegistry(q_2.name, connection=self.testconn) + + parent_job.set_status(JobStatus.FINISHED) + self.assertEqual( set(registry_2.get_job_ids()), set([job_2.id]) @@ -573,26 +579,33 @@ class TestQueue(RQTestCase): def test_enqueues_dependent_if_other_dependencies_finished(self): - started_dependency = Job.create(func=say_hello, status=JobStatus.STARTED) - started_dependency.save() + parent_jobs = [Job.create(func=say_hello) for _ in + range(2)] + + parent_jobs[0]._status = JobStatus.STARTED + parent_jobs[0].save() - finished_dependency = Job.create(func=say_hello, status=JobStatus.FINISHED) - finished_dependency.save() + parent_jobs[1]._status = JobStatus.FINISHED + parent_jobs[1].save() job_create = Job.create def create_job_patch(*args, **kwargs): # patch Job#create to set parent jobs as dependencies. 
job = job_create(*args, **kwargs) - job._dependency_ids = [job.id for job in [started_dependency, finished_dependency]] + job._dependency_ids = [job.id for job in parent_jobs] return job q = Queue() with patch.object(Job, 'create', create_job_patch) as patch_create_job: - dependent_job = q.enqueue(say_hello, depends_on=[started_dependency]) + # dependent job deferred, b/c parent_job 0 is still 'started' + dependent_job = q.enqueue(say_hello, depends_on=parent_jobs[0]) self.assertEqual(dependent_job.get_status(), JobStatus.DEFERRED) - q.enqueue_dependents(started_dependency) + # now set parent job 0 to 'finished' + parent_jobs[0].set_status(JobStatus.FINISHED) + + q.enqueue_dependents(parent_jobs[0]) self.assertEqual(dependent_job.get_status(), JobStatus.QUEUED) self.assertEqual(q.job_ids, [dependent_job.id]) From 4440669f3ca44e025aedcf7123c3602c60b1a763 Mon Sep 17 00:00:00 2001 From: thomas Date: Sat, 14 Dec 2019 23:52:05 -0500 Subject: [PATCH 04/16] Fix patches for python2 --- tests/test_queue.py | 70 +++++++++++++++++++-------------------------- 1 file changed, 30 insertions(+), 40 deletions(-) diff --git a/tests/test_queue.py b/tests/test_queue.py index f776747..1d14b3a 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -24,6 +24,20 @@ class CustomJob(Job): pass +class MultipleDependencyJob(Job): + """ + Allows for the patching of `_dependency_ids` to simulate multi-dependency + support without modifying the public interface of `Job` + """ + create_job = Job.create + + @classmethod + def create(cls, *args, **kwargs): + dependency_ids = kwargs.pop('kwargs').pop('_dependency_ids') + _job = cls.create_job(*args, **kwargs) + _job._dependency_ids = dependency_ids + return _job + class TestQueue(RQTestCase): def test_create_queue(self): """Creating queues.""" @@ -539,17 +553,10 @@ class TestQueue(RQTestCase): job._status = JobStatus.QUEUED job.save() - job_create = Job.create - - def create_job_patch(*args, **kwargs): - # patch Job#create to set parent jobs as dependencies. - job = job_create(*args, **kwargs) - job._dependency_ids = [job.id for job in parent_jobs] - return job - q = Queue() - with patch.object(Job, 'create', create_job_patch) as patch_create_job: - job = q.enqueue(say_hello, depends_on=parent_jobs[0]) + with patch('rq.queue.Job.create', new=MultipleDependencyJob.create): + job = q.enqueue(say_hello, depends_on=parent_jobs[0], + _dependency_ids = [job.id for job in parent_jobs]) self.assertEqual(job.get_status(), JobStatus.DEFERRED) self.assertEqual(q.job_ids, []) self.assertEqual(job.fetch_dependencies(), parent_jobs) @@ -562,17 +569,10 @@ class TestQueue(RQTestCase): job._status = JobStatus.FINISHED job.save() - job_create = Job.create - - def create_job_patch(*args, **kwargs): - # patch Job#create to set parent jobs as dependencies. 
- job = job_create(*args, **kwargs) - job._dependency_ids = [job.id for job in parent_jobs] - return job - q = Queue() - with patch.object(Job, 'create', create_job_patch) as patch_create_job: - job = q.enqueue(say_hello, depends_on=parent_jobs[0]) + with patch('rq.queue.Job.create', new=MultipleDependencyJob.create): + job = q.enqueue(say_hello, depends_on=parent_jobs[0], + _dependency_ids=[job.id for job in parent_jobs]) self.assertEqual(job.get_status(), JobStatus.QUEUED) self.assertEqual(q.job_ids, [job.id]) self.assertEqual(job.fetch_dependencies(), parent_jobs) @@ -580,7 +580,7 @@ class TestQueue(RQTestCase): def test_enqueues_dependent_if_other_dependencies_finished(self): parent_jobs = [Job.create(func=say_hello) for _ in - range(2)] + range(3)] parent_jobs[0]._status = JobStatus.STARTED parent_jobs[0].save() @@ -588,18 +588,15 @@ class TestQueue(RQTestCase): parent_jobs[1]._status = JobStatus.FINISHED parent_jobs[1].save() - job_create = Job.create - - def create_job_patch(*args, **kwargs): - # patch Job#create to set parent jobs as dependencies. - job = job_create(*args, **kwargs) - job._dependency_ids = [job.id for job in parent_jobs] - return job + parent_jobs[2]._status = JobStatus.FINISHED + parent_jobs[2].save() q = Queue() - with patch.object(Job, 'create', create_job_patch) as patch_create_job: + with patch('rq.queue.Job.create', + new=MultipleDependencyJob.create): # dependent job deferred, b/c parent_job 0 is still 'started' - dependent_job = q.enqueue(say_hello, depends_on=parent_jobs[0]) + dependent_job = q.enqueue(say_hello, depends_on=parent_jobs[0], + _dependency_ids=[job.id for job in parent_jobs]) self.assertEqual(dependent_job.get_status(), JobStatus.DEFERRED) # now set parent job 0 to 'finished' @@ -617,17 +614,10 @@ class TestQueue(RQTestCase): queued_dependency = Job.create(func=say_hello, status=JobStatus.QUEUED) queued_dependency.save() - job_create = Job.create - - def create_job_patch(*args, **kwargs): - # patch Job#create to set parent jobs as dependencies. - job = job_create(*args, **kwargs) - job._dependency_ids = [job.id for job in [started_dependency, queued_dependency]] - return job - q = Queue() - with patch.object(Job, 'create', create_job_patch) as patch_create_job: - dependent_job = q.enqueue(say_hello, depends_on=[started_dependency]) + with patch('rq.queue.Job.create', new=MultipleDependencyJob.create): + dependent_job = q.enqueue(say_hello, depends_on=[started_dependency], + _dependency_ids=[started_dependency.id, queued_dependency.id]) self.assertEqual(dependent_job.get_status(), JobStatus.DEFERRED) q.enqueue_dependents(started_dependency) From 540be93401bb8d4bbd02e0953e4ed242843c8c19 Mon Sep 17 00:00:00 2001 From: thomas Date: Sun, 15 Dec 2019 16:58:52 -0500 Subject: [PATCH 05/16] Undo extra formatting changes --- rq/worker.py | 30 +++++++++++++----------------- 1 file changed, 13 insertions(+), 17 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index 553b457..8678ad3 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -15,15 +15,20 @@ import warnings from datetime import timedelta from uuid import uuid4 +try: + from signal import SIGKILL +except ImportError: + from signal import SIGTERM as SIGKILL + from redis import WatchError from . 
import worker_registration from .compat import PY2, as_text, string_types, text_type -from .connections import (get_current_connection, pop_connection, - push_connection) -from .defaults import (DEFAULT_JOB_MONITORING_INTERVAL, - DEFAULT_LOGGING_DATE_FORMAT, DEFAULT_LOGGING_FORMAT, - DEFAULT_RESULT_TTL, DEFAULT_WORKER_TTL) +from .connections import get_current_connection, push_connection, pop_connection + +from .defaults import (DEFAULT_RESULT_TTL, + DEFAULT_WORKER_TTL, DEFAULT_JOB_MONITORING_INTERVAL, + DEFAULT_LOGGING_FORMAT, DEFAULT_LOGGING_DATE_FORMAT) from .exceptions import DequeueTimeout, ShutDownImminentException from .job import Job, JobStatus from .logutils import setup_loghandlers @@ -31,22 +36,13 @@ from .queue import Queue from .registry import FailedJobRegistry, StartedJobRegistry, clean_registries from .scheduler import RQScheduler from .suspension import is_suspended -from .timeouts import (HorseMonitorTimeoutException, JobTimeoutException, - UnixSignalDeathPenalty) -from .utils import (backend_class, ensure_list, enum, make_colorizer, - utcformat, utcnow, utcparse) +from .timeouts import JobTimeoutException, HorseMonitorTimeoutException, UnixSignalDeathPenalty +from .utils import (backend_class, ensure_list, enum, + make_colorizer, utcformat, utcnow, utcparse) from .version import VERSION from .worker_registration import clean_worker_registry, get_keys from .serializers import resolve_serializer -try: - from signal import SIGKILL -except ImportError: - from signal import SIGTERM as SIGKILL - - - - try: from setproctitle import setproctitle as setprocname except ImportError: From a69d91d2b2372a943b36288df9badfffbd86b22e Mon Sep 17 00:00:00 2001 From: Thomas Matecki Date: Thu, 19 Dec 2019 21:11:58 -0500 Subject: [PATCH 06/16] Do not watch dependency key set --- rq/job.py | 6 ++---- tests/test_job.py | 26 -------------------------- 2 files changed, 2 insertions(+), 30 deletions(-) diff --git a/rq/job.py b/rq/job.py index 0661f7a..c45bb07 100644 --- a/rq/job.py +++ b/rq/job.py @@ -744,9 +744,8 @@ class Job(object): pipe = pipeline if pipeline is not None else self.connection if watch: - pipe.watch(self.dependencies_key) - pipe.watch(*[self.redis_job_namespace_prefix + as_text(_id) - for _id in pipe.smembers(self.dependencies_key)]) + pipe.watch(*[Job.key_for(_id) + for _id in self.connection.smembers(self.dependencies_key)]) sort_by = self.redis_job_namespace_prefix + '*->ended_at' get_field = self.redis_job_namespace_prefix + '*->status' @@ -762,5 +761,4 @@ class Job(object): return dependencies_statuses - _job_stack = LocalStack() diff --git a/tests/test_job.py b/tests/test_job.py index 835dc6f..e8f965a 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -905,29 +905,3 @@ class TestJob(RQTestCase): with self.assertRaises(WatchError): pipeline.touch(Job.key_for(dependent_job.id)) pipeline.execute() - - def test_get_dependencies_statuses_watches_dependency_set(self): - queue = Queue(connection=self.testconn) - - dependency_job = queue.enqueue(fixtures.say_hello) - dependent_job = Job.create(func=fixtures.say_hello) - dependent_job._dependency_ids = [dependency_job.id] - dependent_job.register_dependency() - - with self.testconn.pipeline() as pipeline: - - dependent_job.get_dependencies_statuses( - pipeline=pipeline, - watch=True - ) - - self.testconn.sadd( - dependent_job.dependencies_key, - queue.enqueue(fixtures.say_hello).id, - ) - - pipeline.multi() - - with self.assertRaises(WatchError): - pipeline.touch(Job.key_for(dependent_job.id)) - pipeline.execute() From 
d5921814e4cb23fdb7a090b70e774639af9511e2 Mon Sep 17 00:00:00 2001
From: Thomas Matecki
Date: Thu, 19 Dec 2019 22:46:59 -0500
Subject: [PATCH 07/16] Change get_dependency_statuses to dependencies_finished

Convert method on Job to return a boolean and rename. Also use
fetch_many in Queue#enqueue_dependents.
---
 rq/job.py         | 29 +++++++++++++----------------
 rq/queue.py       | 24 +++++++-----------------
 tests/test_job.py | 47 +++++++++++++++--------------------------------
 3 files changed, 35 insertions(+), 65 deletions(-)

diff --git a/rq/job.py b/rq/job.py
index c45bb07..810e1f6 100644
--- a/rq/job.py
+++ b/rq/job.py
@@ -724,34 +724,29 @@ class Job(object):
             connection.sadd(dependents_key, self.id)
             connection.sadd(self.dependencies_key, dependency_id)

-    def get_dependencies_statuses(
+    def dependencies_finished(
         self,
-        watch=False,
         pipeline=None
     ):
-        """Returns a list of tuples containing the job ids and status of all
-        dependencies; e.g:
+        """Returns a boolean indicating if all of this job's dependencies are _FINISHED_

-        [('14462606-09c4-41c2-8bf1-fbd109092318', 'started'),
-         ('e207328f-d5bc-4ea9-8d61-b449891e3230', 'finished'), ...]
-
-        As a minor optimization allowing callers to more quickly tell if all
-        dependencies are _FINISHED_, the returned list is sorted by the
-        `ended_at` timestamp, so those jobs which are not yet finished are at
-        the start of the list.
+        If a pipeline is passed, all dependencies are WATCHed.
         """

         pipe = pipeline if pipeline is not None else self.connection

-        if watch:
-            pipe.watch(*[Job.key_for(_id)
+        if pipeline is not None:
+            pipe.watch(*[Job.key_for(as_text(_id))
                          for _id in self.connection.smembers(self.dependencies_key)])

         sort_by = self.redis_job_namespace_prefix + '*->ended_at'
         get_field = self.redis_job_namespace_prefix + '*->status'

-        # Sorting here lexicographically works because these dates are stored
-        # in an ISO 8601 format, so lexicographic order is the same as
+        # As a minor optimization to more quickly tell if all dependencies
+        # are _FINISHED_, sort dependencies by the `ended_at` timestamp so
+        # those jobs which are not yet finished are at the start of the
+        # list. Sorting here lexicographically works because these dates are
+        # stored in an ISO 8601 format, so lexicographic order is the same as
         # chronological order.
         dependencies_statuses = [
             (as_text(_id), as_text(status))
             for _id, status in pipe.sort(name=self.dependencies_key, by=sort_by,
                                          get=['#', get_field], alpha=True, groups=True, )
         ]

-        return dependencies_statuses
+        return all(status == JobStatus.FINISHED
+                   for job_id, status
+                   in dependencies_statuses)

 _job_stack = LocalStack()
diff --git a/rq/queue.py b/rq/queue.py
index afe7f48..4fe1b44 100644
--- a/rq/queue.py
+++ b/rq/queue.py
@@ -461,28 +461,18 @@ class Queue(object):
             if pipeline is None:
                 pipe.watch(dependents_key)

-        dependent_jobs = [self.job_class.fetch(as_text(job_id), connection=self.connection)
-                          for job_id in pipe.smembers(dependents_key)]
+        dependent_job_ids = [as_text(_id)
+                             for _id in pipe.smembers(dependents_key)]

-        dependencies_statuses = [
-            dependent.get_dependencies_statuses(watch=True, pipeline=pipe)
-            for dependent in dependent_jobs
+        dependent_jobs = [
+            job for job in self.job_class.fetch_many(dependent_job_ids,
+                                                     connection=self.connection)
+            if job.dependencies_finished(pipeline=pipe)
         ]

         pipe.multi()

-        for dependent, dependents_dependencies in zip(dependent_jobs,
-                                                      dependencies_statuses):
-
-            # Enqueue this dependent job only if all of its _other_
-            # dependencies are FINISHED.
- if not all( - status == JobStatus.FINISHED - for job_id, status - in dependents_dependencies - ): - continue - + for dependent in dependent_jobs: registry = DeferredJobRegistry(dependent.origin, self.connection, job_class=self.job_class) diff --git a/tests/test_job.py b/tests/test_job.py index e8f965a..394140a 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -798,7 +798,7 @@ class TestJob(RQTestCase): pipeline.touch(dependency_job.id) pipeline.execute() - def test_get_dependencies_statuses_returns_ids_and_statuses(self): + def test_dependencies_finished_returns_false_if_dependencies_queued(self): queue = Queue(connection=self.testconn) dependency_job_ids = [ @@ -810,27 +810,21 @@ class TestJob(RQTestCase): dependent_job._dependency_ids = dependency_job_ids dependent_job.register_dependency() - dependencies_statuses = dependent_job.get_dependencies_statuses() + dependencies_finished = dependent_job.dependencies_finished() - self.assertSetEqual( - set(dependencies_statuses), - {(_id, JobStatus.QUEUED) for _id in dependency_job_ids} - ) + self.assertFalse(dependencies_finished) - def test_get_dependencies_statuses_returns_empty_list_if_no_dependencies(self): + def test_dependencies_finished_returns_true_if_no_dependencies(self): queue = Queue(connection=self.testconn) dependent_job = Job.create(func=fixtures.say_hello) dependent_job.register_dependency() - dependencies_statuses = dependent_job.get_dependencies_statuses() + dependencies_finished = dependent_job.dependencies_finished() - self.assertListEqual( - dependencies_statuses, - [] - ) + self.assertTrue(dependencies_finished) - def test_get_dependencies_statuses_returns_ordered_by_end_time(self): + def test_dependencies_finished_returns_true_if_all_dependencies_finished(self): dependency_jobs = [ Job.create(fixtures.say_hello) for _ in range(5) @@ -842,19 +836,17 @@ class TestJob(RQTestCase): now = utcnow() + # Set ended_at timestamps for i, job in enumerate(dependency_jobs): job._status = JobStatus.FINISHED job.ended_at = now - timedelta(seconds=i) job.save() - dependencies_statuses = dependent_job.get_dependencies_statuses() + dependencies_finished = dependent_job.dependencies_finished() - self.assertListEqual( - dependencies_statuses, - [(job.id, JobStatus.FINISHED) for job in reversed(dependency_jobs)] - ) + self.assertTrue(dependencies_finished) - def test_get_dependencies_statuses_returns_not_finished_job_ordered_first(self): + def test_dependencies_finished_returns_false_if_unfinished_job(self): dependency_jobs = [Job.create(fixtures.say_hello) for _ in range(2)] dependency_jobs[0]._status = JobStatus.FINISHED @@ -871,19 +863,11 @@ class TestJob(RQTestCase): now = utcnow() - dependencies_statuses = dependent_job.get_dependencies_statuses() - - self.assertEqual( - dependencies_statuses[0], - (dependency_jobs[1].id, JobStatus.STARTED) - ) + dependencies_finished = dependent_job.dependencies_finished() - self.assertEqual( - dependencies_statuses[1], - (dependency_jobs[0].id, JobStatus.FINISHED) - ) + self.assertFalse(dependencies_finished) - def test_get_dependencies_statuses_watches_job(self): + def test_dependencies_finished_watches_job(self): queue = Queue(connection=self.testconn) dependency_job = queue.enqueue(fixtures.say_hello) @@ -894,9 +878,8 @@ class TestJob(RQTestCase): with self.testconn.pipeline() as pipeline: - dependent_job.get_dependencies_statuses( + dependent_job.dependencies_finished( pipeline=pipeline, - watch=True ) dependency_job.set_status(JobStatus.FAILED, pipeline=self.testconn) From 
9f15df2d5567d6697e4f7bddb68400f6f9d845c5 Mon Sep 17 00:00:00 2001
From: thomas
Date: Sun, 23 Feb 2020 00:00:03 -0500
Subject: [PATCH 08/16] rename dependencies_finished to dependencies_are_met

---
 rq/job.py         |  2 +-
 rq/queue.py       |  2 +-
 rq/worker.py      |  1 +
 tests/test_job.py | 10 +++++-----
 4 files changed, 8 insertions(+), 7 deletions(-)

diff --git a/rq/job.py b/rq/job.py
index 810e1f6..cf8948e 100644
--- a/rq/job.py
+++ b/rq/job.py
@@ -724,7 +724,7 @@ class Job(object):
             connection.sadd(dependents_key, self.id)
             connection.sadd(self.dependencies_key, dependency_id)

-    def dependencies_finished(
+    def dependencies_are_met(
         self,
         pipeline=None
     ):
diff --git a/rq/queue.py b/rq/queue.py
index afe7f48..4fe1b44 100644
--- a/rq/queue.py
+++ b/rq/queue.py
@@ -467,7 +467,7 @@ class Queue(object):
         dependent_jobs = [
             job for job in self.job_class.fetch_many(dependent_job_ids,
                                                      connection=self.connection)
-            if job.dependencies_finished(pipeline=pipe)
+            if job.dependencies_are_met(pipeline=pipe)
         ]

         pipe.multi()
diff --git a/rq/worker.py b/rq/worker.py
index 8678ad3..5161a24 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -845,6 +845,7 @@ class Worker(object):
                 # if dependencies are inserted after enqueue_dependents
                 # a WatchError is thrown by execute()
                 pipeline.watch(job.dependents_key)
+                # TODO: This was moved
                 job.set_status(JobStatus.FINISHED, pipeline=pipeline)
                 # enqueue_dependents calls multi() on the pipeline!
                 queue.enqueue_dependents(job, pipeline=pipeline)
diff --git a/tests/test_job.py b/tests/test_job.py
index 394140a..6ce717e 100644
--- a/tests/test_job.py
+++ b/tests/test_job.py
@@ -810,7 +810,7 @@ class TestJob(RQTestCase):
         dependent_job._dependency_ids = dependency_job_ids
         dependent_job.register_dependency()

-        dependencies_finished = dependent_job.dependencies_finished()
+        dependencies_finished = dependent_job.dependencies_are_met()

         self.assertFalse(dependencies_finished)

@@ -820,7 +820,7 @@ class TestJob(RQTestCase):
         dependent_job = Job.create(func=fixtures.say_hello)
         dependent_job.register_dependency()

-        dependencies_finished = dependent_job.dependencies_finished()
+        dependencies_finished = dependent_job.dependencies_are_met()

         self.assertTrue(dependencies_finished)

@@ -842,7 +842,7 @@ class TestJob(RQTestCase):
             job.ended_at = now - timedelta(seconds=i)
             job.save()

-        dependencies_finished = dependent_job.dependencies_finished()
+        dependencies_finished = dependent_job.dependencies_are_met()

         self.assertTrue(dependencies_finished)

@@ -863,7 +863,7 @@ class TestJob(RQTestCase):

         now = utcnow()

-        dependencies_finished = dependent_job.dependencies_finished()
+        dependencies_finished = dependent_job.dependencies_are_met()

         self.assertFalse(dependencies_finished)

@@ -878,7 +878,7 @@ class TestJob(RQTestCase):

         with self.testconn.pipeline() as pipeline:

-            dependent_job.dependencies_finished(
+            dependent_job.dependencies_are_met(
                 pipeline=pipeline,
             )

From 83fa6b23868e9e8dd7776768489f8521ab5fa021 Mon Sep 17 00:00:00 2001
From: thomas
Date: Thu, 12 Mar 2020 22:31:42 -0400
Subject: [PATCH 09/16] Revert move of status update in `Worker#handle_job_success`

When a job with dependents is _successful_, its dependents are enqueued.
Only if the FINISHing job's `result_ttl` is non-zero is the change in
status persisted in Redis - that is, when each dependent job is
enqueued, the _FINISHing_ job (triggering the enqueueing) still has an
_outdated_ status in redis. This avoids a redundant call, because if
`result_ttl=0` the job is deleted in `Job#cleanup` anyway.
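Condensed, the resulting order of operations in Worker#handle_job_success
looks like this (a sketch distilled from the worker.py hunks in this
series; the WatchError retry loop is elided):

    with self.connection.pipeline() as pipeline:
        pipeline.watch(job.dependents_key)
        # enqueue_dependents calls multi() on the pipeline!
        queue.enqueue_dependents(job, pipeline=pipeline)

        result_ttl = job.get_result_ttl(self.default_result_ttl)
        if result_ttl != 0:
            # FINISHED is persisted only here - after the dependents'
            # dependency statuses have already been interrogated - and
            # not at all when result_ttl == 0.
            job.set_status(JobStatus.FINISHED, pipeline=pipeline)
            job.save(pipeline=pipeline, include_meta=False)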
In order to enqueue the dependents, we therefore _exclude_ the FINISHing
job from the check of whether each dependent's dependencies have been
met.
---
 rq/job.py    | 52 ++++++++++++++++++++++++++++++++++++++++------------
 rq/queue.py  | 32 +++++++++++++++++++++-----------
 rq/worker.py |  3 +--
 3 files changed, 62 insertions(+), 25 deletions(-)

diff --git a/rq/job.py b/rq/job.py
index cf8948e..8eaa7a3 100644
--- a/rq/job.py
+++ b/rq/job.py
@@ -5,10 +5,10 @@ from __future__ import (absolute_import, division, print_function,
 import inspect
 import warnings
 import zlib
+from functools import partial
 from uuid import uuid4

 from rq.compat import as_text, decode_redis_hash, string_types, text_type
-
 from .connections import resolve_connection
 from .exceptions import NoSuchJobError
 from .local import LocalStack
@@ -16,6 +16,15 @@ from .utils import (enum, import_attribute, parse_timeout, str_to_date,
                     utcformat, utcnow)
 from .serializers import resolve_serializer

+try:
+    import cPickle as pickle
+except ImportError:  # noqa  # pragma: no cover
+    import pickle
+
+# Serialize pickle dumps using the highest pickle protocol (binary, default
+# uses ascii)
+dumps = partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
+loads = pickle.loads

 JobStatus = enum(
     'JobStatus',
@@ -94,11 +103,13 @@ class Job(object):
             job._func_name = '{0}.{1}'.format(func.__module__, func.__name__)
         elif isinstance(func, string_types):
             job._func_name = as_text(func)
-        elif not inspect.isclass(func) and hasattr(func, '__call__'):  # a callable class instance
+        elif not inspect.isclass(func) and hasattr(func,
+                                                   '__call__'):  # a callable class instance
             job._instance = func
             job._func_name = '__call__'
         else:
-            raise TypeError('Expected a callable or a string, but got: {0}'.format(func))
+            raise TypeError(
+                'Expected a callable or a string, but got: {0}'.format(func))
         job._args = args
         job._kwargs = kwargs
@@ -113,7 +124,8 @@ class Job(object):

         # dependency could be job instance or id
         if depends_on is not None:
-            job._dependency_ids = [depends_on.id if isinstance(depends_on, Job) else depends_on]
+            job._dependency_ids = [
+                depends_on.id if isinstance(depends_on, Job) else depends_on]
         return job

     def get_status(self, refresh=True):
@@ -401,7 +413,8 @@ class Job(object):

         for i, job in enumerate(jobs):
             if not job:
-                raise NoSuchJobError('Dependency {0} does not exist'.format(self._dependency_ids[i]))
+                raise NoSuchJobError(
+                    'Dependency {0} does not exist'.format(self._dependency_ids[i]))

         return jobs

@@ -459,8 +472,10 @@ class Job(object):
             except Exception as e:
                 self._result = "Unserializable return value"
         self.timeout = parse_timeout(obj.get('timeout')) if obj.get('timeout') else None
-        self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None  # noqa
-        self.failure_ttl = int(obj.get('failure_ttl')) if obj.get('failure_ttl') else None  # noqa
+        self.result_ttl = int(obj.get('result_ttl')) if obj.get(
+            'result_ttl') else None  # noqa
+        self.failure_ttl = int(obj.get('failure_ttl')) if obj.get(
+            'failure_ttl') else None  # noqa
         self._status = as_text(obj.get('status')) if obj.get('status') else None

         dependency_id = obj.get('dependency_id', None)
@@ -725,19 +740,29 @@ class Job(object):
             connection.sadd(self.dependencies_key, dependency_id)

     def dependencies_are_met(
         self,
-        pipeline=None
+        pipeline=None,
+        exclude=None
+
     ):
         """Returns a boolean indicating if all of this job's dependencies are _FINISHED_

         If a pipeline is passed, all dependencies are WATCHed.
+
+        `exclude` allows us to exclude some job id from the status check. This is useful
+        when enqueueing the dependents of a _successful_ job -- the status `FINISHED`
+        may not yet be set in redis, but said job is indeed _done_ and this method is
+        _called_ in the _stack_ where its dependents are being enqueued.
         """
+        exclude = exclude or []

         pipe = pipeline if pipeline is not None else self.connection

+        dependencies = self.connection.smembers(self.dependencies_key)
+
         if pipeline is not None:
-            pipe.watch(*[Job.key_for(as_text(_id))
-                         for _id in self.connection.smembers(self.dependencies_key)])
+            pipe.watch(*[Job.key_for(as_text(_id))
+                         for _id in dependencies])

         sort_by = self.redis_job_namespace_prefix + '*->ended_at'
         get_field = self.redis_job_namespace_prefix + '*->status'
@@ -751,11 +776,14 @@ class Job(object):
         dependencies_statuses = [
             (as_text(_id), as_text(status))
             for _id, status in pipe.sort(name=self.dependencies_key, by=sort_by,
-                                         get=['#', get_field], alpha=True, groups=True, )
+                                         get=['#', get_field], alpha=True,
+                                         groups=True, )
         ]

         return all(status == JobStatus.FINISHED
                    for job_id, status
-                   in dependencies_statuses)
+                   in dependencies_statuses
+                   if job_id not in exclude)
+

 _job_stack = LocalStack()
diff --git a/rq/queue.py b/rq/queue.py
index 4fe1b44..6dba72f 100644
--- a/rq/queue.py
+++ b/rq/queue.py
@@ -4,7 +4,6 @@ from __future__ import (absolute_import, division, print_function,

 import uuid
 import warnings
-
 from datetime import datetime

 from redis import WatchError
@@ -66,7 +65,8 @@ class Queue(object):

         if 'async' in kwargs:
             self._is_async = kwargs['async']
-            warnings.warn('The `async` keyword is deprecated. Use `is_async` instead', DeprecationWarning)
+            warnings.warn('The `async` keyword is deprecated. Use `is_async` instead',
+                          DeprecationWarning)

         # override class attribute job_class if one was passed
         if job_class is not None:
@@ -317,7 +317,8 @@ class Queue(object):
             pipe.multi()

             for dependency in dependencies:
-                if dependency.get_status(refresh=False) != JobStatus.FINISHED:
+                if dependency.get_status(
+                        refresh=False) != JobStatus.FINISHED:
                     job.set_status(JobStatus.DEFERRED, pipeline=pipe)
                     job.register_dependency(pipeline=pipe)
                     job.save(pipeline=pipe)
@@ -379,8 +380,9 @@ class Queue(object):
         """Creates a job to represent the delayed function call and enqueues it."""
         (f, timeout, description, result_ttl, ttl, failure_ttl,
-         depends_on, job_id, at_front, meta, args, kwargs) = Queue.parse_args(f, *args, **kwargs)
+         depends_on, job_id, at_front, meta, args, kwargs) = Queue.parse_args(f, *args,
+                                                                              **kwargs)

         return self.enqueue_call(
             func=f, args=args, kwargs=kwargs, timeout=timeout,
             result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl,
@@ -393,7 +395,8 @@ class Queue(object):
         from .registry import ScheduledJobRegistry

         (f, timeout, description, result_ttl, ttl, failure_ttl,
-         depends_on, job_id, at_front, meta, args, kwargs) = Queue.parse_args(f, *args, **kwargs)
+         depends_on, job_id, at_front, meta, args, kwargs) = Queue.parse_args(f, *args,
+                                                                              **kwargs)
         job = self.create_job(f, status=JobStatus.SCHEDULED, args=args, kwargs=kwargs, timeout=timeout,
                               result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl, description=description,
@@ -465,9 +468,14 @@ class Queue(object):
                              for _id in pipe.smembers(dependents_key)]

         dependent_jobs = [
-            job for job in self.job_class.fetch_many(dependent_job_ids,
-                                                     connection=self.connection)
-            if job.dependencies_are_met(pipeline=pipe)
+            dependent_job for dependent_job
+            in self.job_class.fetch_many(
+                dependent_job_ids,
+                connection=self.connection
+            ) if 
dependent_job.dependencies_are_met(
+                pipeline=pipe,
+                exclude={job.id}
+            )
         ]

         pipe.multi()

@@ -480,7 +488,8 @@ class Queue(object):
             if dependent.origin == self.name:
                 self.enqueue_job(dependent, pipeline=pipe)
             else:
-                queue = self.__class__(name=dependent.origin, connection=self.connection)
+                queue = self.__class__(name=dependent.origin,
+                                       connection=self.connection)
                 queue.enqueue_job(dependent, pipeline=pipe)

         pipe.delete(dependents_key)
@@ -519,7 +528,8 @@ class Queue(object):
         connection = resolve_connection(connection)
         if timeout is not None:  # blocking variant
             if timeout == 0:
-                raise ValueError('RQ does not support indefinite timeouts. Please pick a timeout value > 0')
+                raise ValueError(
+                    'RQ does not support indefinite timeouts. Please pick a timeout value > 0')
             result = connection.blpop(queue_keys, timeout)
             if result is None:
                 raise DequeueTimeout(timeout, queue_keys)
diff --git a/rq/worker.py b/rq/worker.py
index 5161a24..7e090f5 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -845,8 +845,6 @@ class Worker(object):
                 # if dependencies are inserted after enqueue_dependents
                 # a WatchError is thrown by execute()
                 pipeline.watch(job.dependents_key)
-                # TODO: This was moved
-                job.set_status(JobStatus.FINISHED, pipeline=pipeline)
                 # enqueue_dependents calls multi() on the pipeline!
                 queue.enqueue_dependents(job, pipeline=pipeline)
@@ -856,6 +854,7 @@ class Worker(object):

                 result_ttl = job.get_result_ttl(self.default_result_ttl)
                 if result_ttl != 0:
+                    job.set_status(JobStatus.FINISHED, pipeline=pipeline)
                     # Don't clobber the user's meta dictionary!
                     job.save(pipeline=pipeline, include_meta=False)

From 01ebe25f5626f2b433932683fefbc00afd81cf5e Mon Sep 17 00:00:00 2001
From: thomas
Date: Thu, 12 Mar 2020 23:49:49 -0400
Subject: [PATCH 10/16] Address Deleted Dependencies

1) Check `created_at` when checking if dependencies are met. If
   `created_at` is `None`, then the job has been deleted. This is sort
   of a hack - we just need one of the fields on the job's hash that is
   ALWAYS populated. You can persist a job to redis without setting
   status...

2) Job#fetch_dependencies no longer raises NoSuchJobError. If one of a
   job's dependencies has been deleted from Redis, it is not returned
   from `fetch_dependencies` and no exception is raised.
---
 rq/job.py           | 43 +++++++++++++++++++++++-------------------
 tests/test_job.py   | 32 ++++++++++++++++++++++++++++----
 tests/test_queue.py | 14 --------------
 3 files changed, 53 insertions(+), 36 deletions(-)

diff --git a/rq/job.py b/rq/job.py
index 8eaa7a3..3cad5ab 100644
--- a/rq/job.py
+++ b/rq/job.py
@@ -403,18 +403,17 @@ class Job(object):
         watch is true, then set WATCH on all the keys of all dependencies.

         Returned jobs will use self's connection, not the pipeline supplied.
+
+        If a job has been deleted from redis, it is not returned.
""" connection = pipeline if pipeline is not None else self.connection if watch and self._dependency_ids: connection.watch(*self._dependency_ids) - jobs = self.fetch_many(self._dependency_ids, connection=self.connection) - - for i, job in enumerate(jobs): - if not job: - raise NoSuchJobError( - 'Dependency {0} does not exist'.format(self._dependency_ids[i])) + jobs = [job for + job in self.fetch_many(self._dependency_ids, connection=self.connection) + if job] return jobs @@ -739,6 +738,12 @@ class Job(object): connection.sadd(dependents_key, self.id) connection.sadd(self.dependencies_key, dependency_id) + @property + def dependencies_job_ids(self): + dependencies = self.connection.smembers(self.dependencies_key) + return [Job.key_for(as_text(_id)) + for _id in dependencies] + def dependencies_are_met( self, pipeline=None, @@ -758,14 +763,15 @@ class Job(object): pipe = pipeline if pipeline is not None else self.connection - dependencies = self.connection.smembers(self.dependencies_key) - if pipeline is not None: - pipe.watch(*[Job.key_for(as_text(_id)) - for _id in dependencies]) + pipe.watch(*self.dependencies_job_ids) sort_by = self.redis_job_namespace_prefix + '*->ended_at' - get_field = self.redis_job_namespace_prefix + '*->status' + get_fields = ( + '#', + self.redis_job_namespace_prefix + '*->created_at', + self.redis_job_namespace_prefix + '*->status' + ) # As a minor optimization to more quickly tell if all dependencies # are _FINISHED_, sort dependencies by the `ended_at` timestamp so @@ -774,16 +780,17 @@ class Job(object): # stored in an ISO 8601 format, so lexographic order is the same as # chronological order. dependencies_statuses = [ - (as_text(_id), as_text(status)) - for _id, status in pipe.sort(name=self.dependencies_key, by=sort_by, - get=['#', get_field], alpha=True, - groups=True, ) + tuple(map(as_text, result)) + for result in pipe.sort(name=self.dependencies_key, by=sort_by, + get=get_fields, alpha=True, + groups=True, ) ] - return all(status == JobStatus.FINISHED - for job_id, status + # if `created_at` is None, then this has been deleted! 
+ return all(status == JobStatus.FINISHED or not created_at + for dependency_id, created_at, status in dependencies_statuses - if job_id not in exclude) + if dependency_id not in exclude) _job_stack = LocalStack() diff --git a/tests/test_job.py b/tests/test_job.py index 6ce717e..0a0f49d 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -8,7 +8,6 @@ import queue import zlib from datetime import datetime, timedelta -import pytest from redis import WatchError from rq.compat import PY2, as_text @@ -774,8 +773,12 @@ class TestJob(RQTestCase): dependency_job.delete() - with self.assertRaises(NoSuchJobError): - dependent_job.fetch_dependencies(pipeline=self.testconn) + self.assertNotIn( + dependent_job.id, + [job.id for job in dependent_job.fetch_dependencies( + pipeline=self.testconn + )] + ) def test_fetch_dependencies_watches(self): queue = Queue(connection=self.testconn) @@ -877,7 +880,6 @@ class TestJob(RQTestCase): dependent_job.register_dependency() with self.testconn.pipeline() as pipeline: - dependent_job.dependencies_are_met( pipeline=pipeline, ) @@ -888,3 +890,25 @@ class TestJob(RQTestCase): with self.assertRaises(WatchError): pipeline.touch(Job.key_for(dependent_job.id)) pipeline.execute() + + def test_can_enqueue_job_if_dependency_is_deleted(self): + queue = Queue(connection=self.testconn) + + dependency_job = queue.enqueue(fixtures.say_hello, result_ttl=0) + + w = Worker([queue]) + w.work(burst=True) + + assert queue.enqueue(fixtures.say_hello, depends_on=dependency_job) + + def test_dependents_are_met_if_dependency_is_deleted(self): + queue = Queue(connection=self.testconn) + + dependency_job = queue.enqueue(fixtures.say_hello, result_ttl=0) + dependent_job = queue.enqueue(fixtures.say_hello, depends_on=dependency_job) + + w = Worker([queue]) + w.work(burst=True, max_jobs=1) + + assert dependent_job.get_status() == JobStatus.QUEUED + assert dependent_job.dependencies_are_met() diff --git a/tests/test_queue.py b/tests/test_queue.py index 1d14b3a..79dd646 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -531,20 +531,6 @@ class TestQueue(RQTestCase): self.assertEqual(q.job_ids, [job.id]) self.assertEqual(job.timeout, 123) - def test_enqueue_job_with_invalid_dependency(self): - """Enqueuing a job fails, if the dependency does not exist at all.""" - parent_job = Job.create(func=say_hello) - # without save() the job is not visible to others - - q = Queue() - with self.assertRaises(NoSuchJobError): - q.enqueue_call(say_hello, depends_on=parent_job) - - with self.assertRaises(NoSuchJobError): - q.enqueue_call(say_hello, depends_on=parent_job.id) - - self.assertEqual(q.job_ids, []) - def test_enqueue_job_with_multiple_queued_dependencies(self): parent_jobs = [Job.create(func=say_hello) for _ in range(2)] From c0119a8a19c84add14a38bdfec9ed1f86aff1f9e Mon Sep 17 00:00:00 2001 From: thomas Date: Sat, 14 Mar 2020 12:22:46 -0400 Subject: [PATCH 11/16] Undo formatting for coverage stats --- rq/job.py | 21 ++++++++------------- rq/queue.py | 20 +++++++------------- 2 files changed, 15 insertions(+), 26 deletions(-) diff --git a/rq/job.py b/rq/job.py index 3cad5ab..6ae0633 100644 --- a/rq/job.py +++ b/rq/job.py @@ -21,6 +21,7 @@ try: except ImportError: # noqa # pragma: no cover import pickle + # Serialize pickle dumps using the highest pickle protocol (binary, default # uses ascii) dumps = partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL) @@ -103,13 +104,11 @@ class Job(object): job._func_name = '{0}.{1}'.format(func.__module__, func.__name__) elif isinstance(func, 
string_types): job._func_name = as_text(func) - elif not inspect.isclass(func) and hasattr(func, - '__call__'): # a callable class instance + elif not inspect.isclass(func) and hasattr(func, '__call__'): # a callable class instance job._instance = func job._func_name = '__call__' else: - raise TypeError( - 'Expected a callable or a string, but got: {0}'.format(func)) + raise TypeError('Expected a callable or a string, but got: {0}'.format(func)) job._args = args job._kwargs = kwargs @@ -124,8 +123,7 @@ class Job(object): # dependency could be job instance or id if depends_on is not None: - job._dependency_ids = [ - depends_on.id if isinstance(depends_on, Job) else depends_on] + job._dependency_ids = [depends_on.id if isinstance(depends_on, Job) else depends_on] return job def get_status(self, refresh=True): @@ -411,8 +409,8 @@ class Job(object): if watch and self._dependency_ids: connection.watch(*self._dependency_ids) - jobs = [job for - job in self.fetch_many(self._dependency_ids, connection=self.connection) + jobs = [job + for job in self.fetch_many(self._dependency_ids, connection=self.connection) if job] return jobs @@ -471,10 +469,8 @@ class Job(object): except Exception as e: self._result = "Unserializable return value" self.timeout = parse_timeout(obj.get('timeout')) if obj.get('timeout') else None - self.result_ttl = int(obj.get('result_ttl')) if obj.get( - 'result_ttl') else None # noqa - self.failure_ttl = int(obj.get('failure_ttl')) if obj.get( - 'failure_ttl') else None # noqa + self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa + self.failure_ttl = int(obj.get('failure_ttl')) if obj.get('failure_ttl') else None # noqa self._status = as_text(obj.get('status')) if obj.get('status') else None dependency_id = obj.get('dependency_id', None) @@ -792,5 +788,4 @@ class Job(object): in dependencies_statuses if dependency_id not in exclude) - _job_stack = LocalStack() diff --git a/rq/queue.py b/rq/queue.py index 6dba72f..6a0ed05 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -13,8 +13,8 @@ from .connections import resolve_connection from .defaults import DEFAULT_RESULT_TTL from .exceptions import DequeueTimeout, NoSuchJobError from .job import Job, JobStatus -from .utils import backend_class, import_attribute, parse_timeout, utcnow from .serializers import resolve_serializer +from .utils import backend_class, import_attribute, parse_timeout, utcnow def compact(lst): @@ -65,8 +65,7 @@ class Queue(object): if 'async' in kwargs: self._is_async = kwargs['async'] - warnings.warn('The `async` keyword is deprecated. Use `is_async` instead', - DeprecationWarning) + warnings.warn('The `async` keyword is deprecated. 
Use `is_async` instead', DeprecationWarning)

         # override class attribute job_class if one was passed
         if job_class is not None:
@@ -317,8 +316,7 @@ class Queue(object):
             pipe.multi()

             for dependency in dependencies:
-                if dependency.get_status(
-                        refresh=False) != JobStatus.FINISHED:
+                if dependency.get_status(refresh=False) != JobStatus.FINISHED:
                     job.set_status(JobStatus.DEFERRED, pipeline=pipe)
                     job.register_dependency(pipeline=pipe)
                     job.save(pipeline=pipe)
@@ -380,8 +378,7 @@ class Queue(object):
         """Creates a job to represent the delayed function call and enqueues it."""
         (f, timeout, description, result_ttl, ttl, failure_ttl,
-         depends_on, job_id, at_front, meta, args, kwargs) = Queue.parse_args(f, *args,
-                                                                              **kwargs)
+         depends_on, job_id, at_front, meta, args, kwargs) = Queue.parse_args(f, *args, **kwargs)

         return self.enqueue_call(
             func=f, args=args, kwargs=kwargs, timeout=timeout,
             result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl,
@@ -395,8 +392,7 @@ class Queue(object):
         from .registry import ScheduledJobRegistry

         (f, timeout, description, result_ttl, ttl, failure_ttl,
-         depends_on, job_id, at_front, meta, args, kwargs) = Queue.parse_args(f, *args,
-                                                                              **kwargs)
+         depends_on, job_id, at_front, meta, args, kwargs) = Queue.parse_args(f, *args, **kwargs)
         job = self.create_job(f, status=JobStatus.SCHEDULED, args=args, kwargs=kwargs, timeout=timeout,
                               result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl, description=description,
@@ -488,8 +484,7 @@ class Queue(object):
             if dependent.origin == self.name:
                 self.enqueue_job(dependent, pipeline=pipe)
             else:
-                queue = self.__class__(name=dependent.origin,
-                                       connection=self.connection)
+                queue = self.__class__(name=dependent.origin, connection=self.connection)
                 queue.enqueue_job(dependent, pipeline=pipe)

         pipe.delete(dependents_key)
@@ -528,8 +523,7 @@ class Queue(object):
         connection = resolve_connection(connection)
         if timeout is not None:  # blocking variant
             if timeout == 0:
-                raise ValueError(
-                    'RQ does not support indefinite timeouts. Please pick a timeout value > 0')
+                raise ValueError('RQ does not support indefinite timeouts. Please pick a timeout value > 0')
             result = connection.blpop(queue_keys, timeout)
             if result is None:
                 raise DequeueTimeout(timeout, queue_keys)

From c679c1af2f86adf6cf966582d743723fe2152e3d Mon Sep 17 00:00:00 2001
From: Thomas Matecki
Date: Tue, 14 Apr 2020 22:06:00 -0400
Subject: [PATCH 12/16] Change parameter name from `exclude` ...

...to `exclude_job_id`. Also make it a single id, not a set.
---
 rq/job.py   | 11 +++--------
 rq/queue.py |  4 ++--
 2 files changed, 5 insertions(+), 10 deletions(-)

diff --git a/rq/job.py b/rq/job.py
index 6ae0633..978532a 100644
--- a/rq/job.py
+++ b/rq/job.py
@@ -9,6 +9,7 @@ from functools import partial
 from uuid import uuid4

 from rq.compat import as_text, decode_redis_hash, string_types, text_type
+
 from .connections import resolve_connection
 from .exceptions import NoSuchJobError
 from .local import LocalStack
@@ -740,12 +741,7 @@ class Job(object):
         return [Job.key_for(as_text(_id))
                 for _id in dependencies]

-    def dependencies_are_met(
-        self,
-        pipeline=None,
-        exclude=None
-
-    ):
+    def dependencies_are_met(self, exclude_job_id=None, pipeline=None):
         """Returns a boolean indicating if all of this job's dependencies are _FINISHED_

         If a pipeline is passed, all dependencies are WATCHed.

         `exclude` allows us to exclude some job id from the status check. This is useful
         when enqueueing the dependents of a _successful_ job -- the status `FINISHED`
         may not yet be set in redis, but said job is indeed _done_ and this method is
         _called_ in the _stack_ where its dependents are being enqueued.
""" - exclude = exclude or [] pipe = pipeline if pipeline is not None else self.connection @@ -786,6 +781,6 @@ class Job(object): return all(status == JobStatus.FINISHED or not created_at for dependency_id, created_at, status in dependencies_statuses - if dependency_id not in exclude) + if not (exclude_job_id and dependency_id == exclude_job_id)) _job_stack = LocalStack() diff --git a/rq/queue.py b/rq/queue.py index 6a0ed05..c545f43 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -469,8 +469,8 @@ class Queue(object): dependent_job_ids, connection=self.connection ) if dependent_job.dependencies_are_met( - pipeline=pipe, - exclude={job.id} + exclude_job_id=job.id, + pipeline=pipe ) ] From 0672cd00c6ab5268f83887e66990726904cbe1f9 Mon Sep 17 00:00:00 2001 From: Thomas Matecki Date: Thu, 16 Apr 2020 23:13:06 -0400 Subject: [PATCH 13/16] Revisions * Rename `dependent_jobs` to `jobs_to_enqueue` in queue.py * Rename `dependencies_job_ids` to `dependency_ids`. * Remove `as_text` (no more python2 support). Use `bytes.decode` --- rq/job.py | 8 ++++---- rq/queue.py | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/rq/job.py b/rq/job.py index 978532a..51b76d8 100644 --- a/rq/job.py +++ b/rq/job.py @@ -13,9 +13,9 @@ from rq.compat import as_text, decode_redis_hash, string_types, text_type from .connections import resolve_connection from .exceptions import NoSuchJobError from .local import LocalStack +from .serializers import resolve_serializer from .utils import (enum, import_attribute, parse_timeout, str_to_date, utcformat, utcnow) -from .serializers import resolve_serializer try: import cPickle as pickle @@ -736,9 +736,9 @@ class Job(object): connection.sadd(self.dependencies_key, dependency_id) @property - def dependencies_job_ids(self): + def dependency_ids(self): dependencies = self.connection.smembers(self.dependencies_key) - return [Job.key_for(as_text(_id)) + return [Job.key_for(_id.decode()) for _id in dependencies] def dependencies_are_met(self, exclude_job_id=None, pipeline=None): @@ -755,7 +755,7 @@ class Job(object): pipe = pipeline if pipeline is not None else self.connection if pipeline is not None: - pipe.watch(*self.dependencies_job_ids) + pipe.watch(*self.dependency_ids) sort_by = self.redis_job_namespace_prefix + '*->ended_at' get_fields = ( diff --git a/rq/queue.py b/rq/queue.py index c545f43..297d5e2 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -463,7 +463,7 @@ class Queue(object): dependent_job_ids = [as_text(_id) for _id in pipe.smembers(dependents_key)] - dependent_jobs = [ + jobs_to_enqueue = [ dependent_job for dependent_job in self.job_class.fetch_many( dependent_job_ids, @@ -476,7 +476,7 @@ class Queue(object): pipe.multi() - for dependent in dependent_jobs: + for dependent in jobs_to_enqueue: registry = DeferredJobRegistry(dependent.origin, self.connection, job_class=self.job_class) From 0b528dae4b763ee8f1f7af9d3a5fd85b301fa6e6 Mon Sep 17 00:00:00 2001 From: thomas Date: Mon, 27 Apr 2020 14:53:23 -0400 Subject: [PATCH 14/16] Update Job#dependencies_are_met ... ... such that it fetch all dependency status using SMEMBERS and HGET rather than SORT. 
From 33e4beacf4118e6745543504b737d6db780e3e5d Mon Sep 17 00:00:00 2001
From: thomas
Date: Wed, 13 May 2020 23:12:45 -0400
Subject: [PATCH 15/16] Pipeline calls to get dependency statuses

---
 rq/job.py | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/rq/job.py b/rq/job.py
index a97e4ef..f0f28ec 100644
--- a/rq/job.py
+++ b/rq/job.py
@@ -763,11 +763,11 @@ class Job(object):
         if exclude_job_id:
             dependencies_ids.discard(exclude_job_id)
 
-        dependencies_statuses = [
-            connection.hget(self.key_for(key), 'status')
-            for key
-            in dependencies_ids
-        ]
+        with connection.pipeline() as pipeline:
+            for key in dependencies_ids:
+                pipeline.hget(self.key_for(key), 'status')
+
+            dependencies_statuses = pipeline.execute()
 
         return all(
             status.decode() == JobStatus.FINISHED
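The point of pipelining here is to collapse one network round trip per dependency
into a single round trip. A small sketch with plain redis-py, using illustrative
job ids and assuming a local Redis instance:

    from redis import Redis

    redis = Redis()
    dependency_ids = ['abc123', 'def456', 'ghi789']  # illustrative ids

    # Unpipelined: one network round trip per HGET.
    statuses = [redis.hget('rq:job:' + _id, 'status') for _id in dependency_ids]

    # Pipelined: the HGETs are buffered client-side and flushed in one round
    # trip; execute() returns the replies in the order they were queued.
    with redis.pipeline() as pipe:
        for _id in dependency_ids:
            pipe.hget('rq:job:' + _id, 'status')
        statuses = pipe.execute()

Since `execute()` preserves command order, the resulting list is a drop-in
replacement for the list comprehension it replaces.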
From ec2f8cb4ed3405437b4f78a387b29db26f2e59c3 Mon Sep 17 00:00:00 2001
From: Selwin Ong
Date: Sat, 23 May 2020 19:46:10 +0700
Subject: [PATCH 16/16] Don't try to import cPickle

---
 rq/job.py | 7 +------
 1 file changed, 1 insertion(+), 6 deletions(-)

diff --git a/rq/job.py b/rq/job.py
index c7a67d1..69422da 100644
--- a/rq/job.py
+++ b/rq/job.py
@@ -3,6 +3,7 @@ from __future__ import (absolute_import, division, print_function,
                         unicode_literals)
 
 import inspect
+import pickle
 import warnings
 import zlib
 
@@ -18,12 +19,6 @@ from .serializers import resolve_serializer
 from .utils import (enum, import_attribute, parse_timeout, str_to_date,
                     utcformat, utcnow)
 
-try:
-    import cPickle as pickle
-except ImportError:  # noqa  # pragma: no cover
-    import pickle
-
-
 # Serialize pickle dumps using the highest pickle protocol (binary, default
 # uses ascii)
 dumps = partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
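On Python 3 the C implementation behind `pickle` is used automatically, so the
`cPickle` fallback is dead code. As a quick, standard-library-only illustration
of what the pinned protocol buys (the payload dict is made up for the example):

    import pickle
    from functools import partial

    # Mirrors the module-level helpers after this patch.
    dumps = partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
    loads = pickle.loads

    payload = {'args': (1, 2), 'kwargs': {'retry': True}}

    # Protocol 0 is the old ASCII format; the highest protocol is binary and
    # typically round-trips the same object more compactly.
    print(len(pickle.dumps(payload, protocol=0)), len(dumps(payload)))
    print(loads(dumps(payload)) == payload)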