diff --git a/rq/job.py b/rq/job.py
index 2c1251d..69422da 100644
--- a/rq/job.py
+++ b/rq/job.py
@@ -3,20 +3,26 @@ from __future__ import (absolute_import, division, print_function,
                         unicode_literals)
 
 import inspect
+import pickle
 import warnings
 import zlib
+
+from functools import partial
 from uuid import uuid4
 
 from rq.compat import (as_text, decode_redis_hash, hmset, string_types,
                        text_type)
-
 from .connections import resolve_connection
 from .exceptions import NoSuchJobError
 from .local import LocalStack
+from .serializers import resolve_serializer
 from .utils import (enum, import_attribute, parse_timeout, str_to_date,
                     utcformat, utcnow)
-from .serializers import resolve_serializer
 
+# Serialize pickle dumps using the highest pickle protocol (binary, default
+# uses ascii)
+dumps = partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
+loads = pickle.loads
 
 JobStatus = enum(
     'JobStatus',
@@ -125,7 +131,7 @@ class Job(object):
 
     def set_status(self, status, pipeline=None):
         self._status = status
-        connection = pipeline or self.connection
+        connection = pipeline if pipeline is not None else self.connection
        connection.hset(self.key, 'status', self._status)
 
     @property
@@ -392,21 +398,20 @@ class Job(object):
         watch is true, then set WATCH on all the keys of all dependencies.
 
         Returned jobs will use self's connection, not the pipeline supplied.
+
+        If a job has been deleted from redis, it is not returned.
         """
         connection = pipeline if pipeline is not None else self.connection
 
         if watch and self._dependency_ids:
             connection.watch(*self._dependency_ids)
 
-        jobs = self.fetch_many(self._dependency_ids, connection=self.connection)
-
-        for i, job in enumerate(jobs):
-            if not job:
-                raise NoSuchJobError('Dependency {0} does not exist'.format(self._dependency_ids[i]))
+        jobs = [job
+                for job in self.fetch_many(self._dependency_ids, connection=self.connection)
+                if job]
 
         return jobs
 
-
     @property
     def result(self):
         """Returns the return value of the job.
@@ -726,4 +731,45 @@ class Job(object):
             connection.sadd(dependents_key, self.id)
             connection.sadd(self.dependencies_key, dependency_id)
 
+    @property
+    def dependency_ids(self):
+        dependencies = self.connection.smembers(self.dependencies_key)
+        return [Job.key_for(_id.decode())
+                for _id in dependencies]
+
+    def dependencies_are_met(self, exclude_job_id=None, pipeline=None):
+        """Returns a boolean indicating if all of this job's dependencies are _FINISHED_.
+
+        If a pipeline is passed, all dependencies are WATCHed.
+
+        `exclude_job_id` allows us to exclude a job id from the status check. This is
+        useful when enqueueing the dependents of a _successful_ job -- its `FINISHED`
+        status may not yet be set in redis, but that job is indeed _done_ and this
+        method is called while its dependents are being enqueued.
+ """ + + connection = pipeline if pipeline is not None else self.connection + + if pipeline is not None: + connection.watch(*self.dependency_ids) + + dependencies_ids = {_id.decode() + for _id in connection.smembers(self.dependencies_key)} + + if exclude_job_id: + dependencies_ids.discard(exclude_job_id) + + with connection.pipeline() as pipeline: + for key in dependencies_ids: + pipeline.hget(self.key_for(key), 'status') + + dependencies_statuses = pipeline.execute() + + return all( + status.decode() == JobStatus.FINISHED + for status + in dependencies_statuses + if status + ) + _job_stack = LocalStack() diff --git a/rq/queue.py b/rq/queue.py index 0aebe8f..0b7afd7 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -4,7 +4,6 @@ from __future__ import (absolute_import, division, print_function, import uuid import warnings - from datetime import datetime from redis import WatchError @@ -14,8 +13,8 @@ from .connections import resolve_connection from .defaults import DEFAULT_RESULT_TTL from .exceptions import DequeueTimeout, NoSuchJobError from .job import Job, JobStatus -from .utils import backend_class, import_attribute, parse_timeout, utcnow from .serializers import resolve_serializer +from .utils import backend_class, import_attribute, parse_timeout, utcnow def compact(lst): @@ -283,7 +282,7 @@ class Queue(object): at_front=False, meta=None): """Creates a job to represent the delayed function call and enqueues it. - +nd It is much like `.enqueue()`, except that it takes the function's args and kwargs as explicit arguments. Any kwargs passed to this function contain options for RQ itself. @@ -463,12 +462,23 @@ class Queue(object): if pipeline is None: pipe.watch(dependents_key) - dependent_jobs = [self.job_class.fetch(as_text(job_id), connection=self.connection) - for job_id in pipe.smembers(dependents_key)] + dependent_job_ids = [as_text(_id) + for _id in pipe.smembers(dependents_key)] + + jobs_to_enqueue = [ + dependent_job for dependent_job + in self.job_class.fetch_many( + dependent_job_ids, + connection=self.connection + ) if dependent_job.dependencies_are_met( + exclude_job_id=job.id, + pipeline=pipe + ) + ] pipe.multi() - for dependent in dependent_jobs: + for dependent in jobs_to_enqueue: registry = DeferredJobRegistry(dependent.origin, self.connection, job_class=self.job_class) diff --git a/tests/test_job.py b/tests/test_job.py index 25c37e2..670a6a4 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -6,7 +6,7 @@ import json import time import queue import zlib -from datetime import datetime +from datetime import datetime, timedelta from redis import WatchError @@ -17,7 +17,7 @@ from rq.queue import Queue from rq.registry import (DeferredJobRegistry, FailedJobRegistry, FinishedJobRegistry, StartedJobRegistry, ScheduledJobRegistry) -from rq.utils import utcformat +from rq.utils import utcformat, utcnow from rq.worker import Worker from tests import RQTestCase, fixtures @@ -773,8 +773,12 @@ class TestJob(RQTestCase): dependency_job.delete() - with self.assertRaises(NoSuchJobError): - dependent_job.fetch_dependencies(pipeline=self.testconn) + self.assertNotIn( + dependent_job.id, + [job.id for job in dependent_job.fetch_dependencies( + pipeline=self.testconn + )] + ) def test_fetch_dependencies_watches(self): queue = Queue(connection=self.testconn) @@ -796,3 +800,115 @@ class TestJob(RQTestCase): self.testconn.set(dependency_job.id, 'somethingelsehappened') pipeline.touch(dependency_job.id) pipeline.execute() + + def 
+    def test_dependencies_finished_returns_false_if_dependencies_queued(self):
+        queue = Queue(connection=self.testconn)
+
+        dependency_job_ids = [
+            queue.enqueue(fixtures.say_hello).id
+            for _ in range(5)
+        ]
+
+        dependent_job = Job.create(func=fixtures.say_hello)
+        dependent_job._dependency_ids = dependency_job_ids
+        dependent_job.register_dependency()
+
+        dependencies_finished = dependent_job.dependencies_are_met()
+
+        self.assertFalse(dependencies_finished)
+
+    def test_dependencies_finished_returns_true_if_no_dependencies(self):
+        queue = Queue(connection=self.testconn)
+
+        dependent_job = Job.create(func=fixtures.say_hello)
+        dependent_job.register_dependency()
+
+        dependencies_finished = dependent_job.dependencies_are_met()
+
+        self.assertTrue(dependencies_finished)
+
+    def test_dependencies_finished_returns_true_if_all_dependencies_finished(self):
+        dependency_jobs = [
+            Job.create(fixtures.say_hello)
+            for _ in range(5)
+        ]
+
+        dependent_job = Job.create(func=fixtures.say_hello)
+        dependent_job._dependency_ids = [job.id for job in dependency_jobs]
+        dependent_job.register_dependency()
+
+        now = utcnow()
+
+        # Set ended_at timestamps
+        for i, job in enumerate(dependency_jobs):
+            job._status = JobStatus.FINISHED
+            job.ended_at = now - timedelta(seconds=i)
+            job.save()
+
+        dependencies_finished = dependent_job.dependencies_are_met()
+
+        self.assertTrue(dependencies_finished)
+
+    def test_dependencies_finished_returns_false_if_unfinished_job(self):
+        dependency_jobs = [Job.create(fixtures.say_hello) for _ in range(2)]
+
+        dependency_jobs[0]._status = JobStatus.FINISHED
+        dependency_jobs[0].ended_at = utcnow()
+        dependency_jobs[0].save()
+
+        dependency_jobs[1]._status = JobStatus.STARTED
+        dependency_jobs[1].ended_at = None
+        dependency_jobs[1].save()
+
+        dependent_job = Job.create(func=fixtures.say_hello)
+        dependent_job._dependency_ids = [job.id for job in dependency_jobs]
+        dependent_job.register_dependency()
+
+        now = utcnow()
+
+        dependencies_finished = dependent_job.dependencies_are_met()
+
+        self.assertFalse(dependencies_finished)
+
+    def test_dependencies_finished_watches_job(self):
+        queue = Queue(connection=self.testconn)
+
+        dependency_job = queue.enqueue(fixtures.say_hello)
+
+        dependent_job = Job.create(func=fixtures.say_hello)
+        dependent_job._dependency_ids = [dependency_job.id]
+        dependent_job.register_dependency()
+
+        with self.testconn.pipeline() as pipeline:
+            dependent_job.dependencies_are_met(
+                pipeline=pipeline,
+            )
+
+            dependency_job.set_status(JobStatus.FAILED, pipeline=self.testconn)
+            pipeline.multi()
+
+            with self.assertRaises(WatchError):
+                pipeline.touch(Job.key_for(dependent_job.id))
+                pipeline.execute()
+
+    def test_can_enqueue_job_if_dependency_is_deleted(self):
+        queue = Queue(connection=self.testconn)
+
+        dependency_job = queue.enqueue(fixtures.say_hello, result_ttl=0)
+
+        w = Worker([queue])
+        w.work(burst=True)
+
+        assert queue.enqueue(fixtures.say_hello, depends_on=dependency_job)
+
+    def test_dependents_are_met_if_dependency_is_deleted(self):
+        queue = Queue(connection=self.testconn)
+
+        dependency_job = queue.enqueue(fixtures.say_hello, result_ttl=0)
+        dependent_job = queue.enqueue(fixtures.say_hello, depends_on=dependency_job)
+
+        w = Worker([queue])
+        w.work(burst=True, max_jobs=1)
+
+        assert dependent_job.dependencies_are_met()
+        assert dependent_job.get_status() == JobStatus.QUEUED
diff --git a/tests/test_queue.py b/tests/test_queue.py
index 7004aad..79dd646 100644
--- a/tests/test_queue.py
+++ b/tests/test_queue.py
@@ -4,6 +4,7 @@ from __future__ import (absolute_import, division, print_function,
                         unicode_literals)
 import json
 from datetime import datetime, timedelta
 
+from mock.mock import patch
 from rq import Queue
 from rq.compat import utc
@@ -23,6 +24,20 @@ class CustomJob(Job):
     pass
 
 
+class MultipleDependencyJob(Job):
+    """
+    Allows for the patching of `_dependency_ids` to simulate multi-dependency
+    support without modifying the public interface of `Job`.
+    """
+    create_job = Job.create
+
+    @classmethod
+    def create(cls, *args, **kwargs):
+        dependency_ids = kwargs.pop('kwargs').pop('_dependency_ids')
+        _job = cls.create_job(*args, **kwargs)
+        _job._dependency_ids = dependency_ids
+        return _job
+
 class TestQueue(RQTestCase):
     def test_create_queue(self):
         """Creating queues."""
@@ -410,6 +425,9 @@ class TestQueue(RQTestCase):
         job_2 = q.enqueue(say_hello, depends_on=parent_job)
 
         registry = DeferredJobRegistry(q.name, connection=self.testconn)
+
+        parent_job.set_status(JobStatus.FINISHED)
+
         self.assertEqual(
             set(registry.get_job_ids()),
             set([job_1.id, job_2.id])
@@ -440,6 +458,9 @@ class TestQueue(RQTestCase):
             set([job_1.id])
         )
         registry_2 = DeferredJobRegistry(q_2.name, connection=self.testconn)
+
+        parent_job.set_status(JobStatus.FINISHED)
+
         self.assertEqual(
             set(registry_2.get_job_ids()),
             set([job_2.id])
@@ -510,18 +531,83 @@ class TestQueue(RQTestCase):
         self.assertEqual(q.job_ids, [job.id])
         self.assertEqual(job.timeout, 123)
 
-    def test_enqueue_job_with_invalid_dependency(self):
-        """Enqueuing a job fails, if the dependency does not exist at all."""
-        parent_job = Job.create(func=say_hello)
-        # without save() the job is not visible to others
+    def test_enqueue_job_with_multiple_queued_dependencies(self):
+
+        parent_jobs = [Job.create(func=say_hello) for _ in range(2)]
+
+        for job in parent_jobs:
+            job._status = JobStatus.QUEUED
+            job.save()
 
         q = Queue()
-        with self.assertRaises(NoSuchJobError):
-            q.enqueue_call(say_hello, depends_on=parent_job)
+        with patch('rq.queue.Job.create', new=MultipleDependencyJob.create):
+            job = q.enqueue(say_hello, depends_on=parent_jobs[0],
+                            _dependency_ids=[job.id for job in parent_jobs])
+            self.assertEqual(job.get_status(), JobStatus.DEFERRED)
+            self.assertEqual(q.job_ids, [])
+            self.assertEqual(job.fetch_dependencies(), parent_jobs)
+
+    def test_enqueue_job_with_multiple_finished_dependencies(self):
 
-        with self.assertRaises(NoSuchJobError):
-            q.enqueue_call(say_hello, depends_on=parent_job.id)
+        parent_jobs = [Job.create(func=say_hello) for _ in range(2)]
+
+        for job in parent_jobs:
+            job._status = JobStatus.FINISHED
+            job.save()
+
+        q = Queue()
+        with patch('rq.queue.Job.create', new=MultipleDependencyJob.create):
+            job = q.enqueue(say_hello, depends_on=parent_jobs[0],
+                            _dependency_ids=[job.id for job in parent_jobs])
+            self.assertEqual(job.get_status(), JobStatus.QUEUED)
+            self.assertEqual(q.job_ids, [job.id])
+            self.assertEqual(job.fetch_dependencies(), parent_jobs)
+
+    def test_enqueues_dependent_if_other_dependencies_finished(self):
+
+        parent_jobs = [Job.create(func=say_hello) for _ in range(3)]
+
+        parent_jobs[0]._status = JobStatus.STARTED
+        parent_jobs[0].save()
+
+        parent_jobs[1]._status = JobStatus.FINISHED
+        parent_jobs[1].save()
+
+        parent_jobs[2]._status = JobStatus.FINISHED
+        parent_jobs[2].save()
+
+        q = Queue()
+        with patch('rq.queue.Job.create',
+                   new=MultipleDependencyJob.create):
+            # dependent job deferred, b/c parent_job 0 is still 'started'
+            dependent_job = q.enqueue(say_hello, depends_on=parent_jobs[0],
+                                      _dependency_ids=[job.id for job in parent_jobs])
+            self.assertEqual(dependent_job.get_status(), JobStatus.DEFERRED)
+
+        # now set parent job 0 to 'finished'
+        parent_jobs[0].set_status(JobStatus.FINISHED)
+
+        q.enqueue_dependents(parent_jobs[0])
+        self.assertEqual(dependent_job.get_status(), JobStatus.QUEUED)
+        self.assertEqual(q.job_ids, [dependent_job.id])
+
+    def test_does_not_enqueue_dependent_if_other_dependencies_not_finished(self):
+
+        started_dependency = Job.create(func=say_hello, status=JobStatus.STARTED)
+        started_dependency.save()
+
+        queued_dependency = Job.create(func=say_hello, status=JobStatus.QUEUED)
+        queued_dependency.save()
+
+        q = Queue()
+        with patch('rq.queue.Job.create', new=MultipleDependencyJob.create):
+            dependent_job = q.enqueue(say_hello, depends_on=[started_dependency],
+                                      _dependency_ids=[started_dependency.id, queued_dependency.id])
+            self.assertEqual(dependent_job.get_status(), JobStatus.DEFERRED)
+
+        q.enqueue_dependents(started_dependency)
+        self.assertEqual(dependent_job.get_status(), JobStatus.DEFERRED)
         self.assertEqual(q.job_ids, [])
 
     def test_fetch_job_successful(self):
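
A minimal usage sketch of the new `Job.dependencies_are_met()` check (not part
of the patch), mirroring the tests above. It assumes a local Redis server;
`say_hello` is a stand-in function, and `_dependency_ids` is the same internal
attribute the tests patch in:

    from redis import Redis
    from rq.job import Job, JobStatus

    connection = Redis()  # assumes redis-server on localhost:6379

    def say_hello():
        return 'hello'

    # Two dependencies persisted as FINISHED, the same way the tests do it.
    dependency_jobs = [Job.create(say_hello, connection=connection)
                       for _ in range(2)]
    for job in dependency_jobs:
        job._status = JobStatus.FINISHED
        job.save()

    # A dependent wired up via the internal _dependency_ids attribute.
    dependent = Job.create(say_hello, connection=connection)
    dependent._dependency_ids = [job.id for job in dependency_jobs]
    dependent.register_dependency()

    # All dependencies are FINISHED, so the check passes.
    assert dependent.dependencies_are_met()

    # exclude_job_id mirrors the enqueue_dependents() call site: the job that
    # just succeeded may not have FINISHED persisted yet, so it is skipped
    # rather than counted as unfinished.
    dependency_jobs[0]._status = JobStatus.STARTED
    dependency_jobs[0].save()
    assert not dependent.dependencies_are_met()
    assert dependent.dependencies_are_met(exclude_job_id=dependency_jobs[0].id)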
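
A second sketch (also not part of the patch) of the relaxed deleted-dependency
contract: `fetch_dependencies()` now skips dependencies whose keys no longer
exist instead of raising `NoSuchJobError`, and `dependencies_are_met()` treats
a missing status hash as met, which is what lets a dependent still be enqueued
after a parent with `result_ttl=0` expires:

    from redis import Redis
    from rq import Queue

    def say_hello():
        return 'hello'

    queue = Queue(connection=Redis())  # assumes a local Redis server

    dependency = queue.enqueue(say_hello)
    dependent = queue.enqueue(say_hello, depends_on=dependency)

    dependency.delete()

    # Before this patch, fetch_dependencies() raised NoSuchJobError here;
    # now the deleted job is simply absent from the result.
    assert dependent.fetch_dependencies() == []
    assert dependent.dependencies_are_met()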