diff --git a/rq/job.py b/rq/job.py index 4981f12..daf08a7 100644 --- a/rq/job.py +++ b/rq/job.py @@ -11,7 +11,7 @@ from uuid import uuid4 from rq.compat import as_text, decode_redis_hash, string_types, text_type from .connections import resolve_connection -from .exceptions import NoSuchJobError, UnpickleError +from .exceptions import InvalidJobDependency, NoSuchJobError, UnpickleError from .local import LocalStack from .utils import (enum, import_attribute, parse_timeout, str_to_date, utcformat, utcnow) @@ -140,8 +140,10 @@ class Job(object): job._dependency_ids = [depends_on.id if isinstance(depends_on, Job) else depends_on] return job - def get_status(self): - self._status = as_text(self.connection.hget(self.key, 'status')) + def get_status(self, refresh=True): + if refresh: + self._status = as_text(self.connection.hget(self.key, 'status')) + return self._status def set_status(self, status, pipeline=None): @@ -297,8 +299,13 @@ class Job(object): return job @classmethod - def fetch_many(cls, job_ids, connection=None): - """Bulk version of Job.fetch""" + def fetch_many(cls, job_ids, connection): + """ + Bulk version of Job.fetch + + For any job_ids which a job does not exist, the corresponding item in + the returned list will be None. + """ with connection.pipeline() as pipeline: for job_id in job_ids: pipeline.hgetall(cls.key_for(job_id)) @@ -393,6 +400,31 @@ class Job(object): """The Redis key that is used to store job dependents hash under.""" return self.dependents_key_for(self.id) + @property + def dependencies_key(self): + return '{0}:{1}:dependencies'.format(self.redis_job_namespace_prefix, self.id) + + def fetch_dependencies(self, watch=False, pipeline=None): + """ + Fetch all of a job's dependencies. If a pipeline is supplied, and + watch is true, then set WATCH on all the keys of all dependencies. + + Returned jobs will use self's connection, not the pipeline supplied. + """ + connection = pipeline if pipeline is not None else self.connection + + if watch and self._dependency_ids: + connection.watch(*self._dependency_ids) + + jobs = self.fetch_many(self._dependency_ids, connection=self.connection) + + for i, job in enumerate(jobs): + if not job: + raise NoSuchJobError('Dependency {0} does not exist'.format(self._dependency_ids[i])) + + return jobs + + @property def result(self): """Returns the return value of the job. @@ -444,7 +476,7 @@ class Job(object): self.timeout = parse_timeout(obj.get('timeout')) if obj.get('timeout') else None self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa self.failure_ttl = int(obj.get('failure_ttl')) if obj.get('failure_ttl') else None # noqa - self._status = obj.get('status') if obj.get('status') else None + self._status = as_text(obj.get('status')) if obj.get('status') else None dependency_id = obj.get('dependency_id', None) self._dependency_ids = [as_text(dependency_id)] if dependency_id else [] @@ -591,8 +623,7 @@ class Job(object): if delete_dependents: self.delete_dependents(pipeline=pipeline) - connection.delete(self.key) - connection.delete(self.dependents_key) + connection.delete(self.key, self.dependents_key, self.dependencies_key) def delete_dependents(self, pipeline=None): """Delete jobs depending on this job.""" @@ -668,6 +699,8 @@ class Job(object): elif ttl > 0: connection = pipeline if pipeline is not None else self.connection connection.expire(self.key, ttl) + connection.expire(self.dependents_key, ttl) + connection.expire(self.dependencies_key, ttl) @property def failed_job_registry(self): @@ -698,5 +731,6 @@ class Job(object): for dependency_id in self._dependency_ids: dependents_key = self.dependents_key_for(dependency_id) connection.sadd(dependents_key, self.id) + connection.sadd(self.dependencies_key, dependency_id) _job_stack = LocalStack() diff --git a/rq/queue.py b/rq/queue.py index e68dae0..135a99a 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -13,7 +13,7 @@ from .defaults import DEFAULT_RESULT_TTL from .exceptions import (DequeueTimeout, InvalidJobDependency, NoSuchJobError, UnpickleError) from .job import Job, JobStatus -from .utils import backend_class, import_attribute, utcnow, parse_timeout +from .utils import backend_class, import_attribute, parse_timeout, utcnow def compact(lst): @@ -252,39 +252,41 @@ class Queue(object): depends_on=depends_on, timeout=timeout, id=job_id, origin=self.name, meta=meta) - # If job depends on an unfinished job, register itself on it's - # parent's dependents instead of enqueueing it. - # If WatchError is raised in the process, that means something else is - # modifying the dependency. In this case we simply retry + # If a _dependent_ job depends on any unfinished job, register all the + #_dependent_ job's dependencies instead of enqueueing it. + # + # `Job#fetch_dependencies` sets WATCH on all dependencies. If + # WatchError is raised in the when the pipeline is executed, that means + # something else has modified either the set of dependencies or the + # status of one of them. In this case, we simply retry. if depends_on is not None: - if not isinstance(depends_on, self.job_class): - depends_on = self.job_class(id=depends_on, - connection=self.connection) with self.connection.pipeline() as pipe: while True: try: - pipe.watch(depends_on.key) - - # If the dependency does not exist, raise an - # exception to avoid creating an orphaned job. - if not self.job_class.exists(depends_on.id, - self.connection): - raise InvalidJobDependency('Job {0} does not exist'.format(depends_on.id)) - - if depends_on.get_status() != JobStatus.FINISHED: - pipe.multi() - job.set_status(JobStatus.DEFERRED) - job.register_dependency(pipeline=pipe) - job.save(pipeline=pipe) - job.cleanup(ttl=job.ttl, pipeline=pipe) - pipe.execute() - return job + + pipe.watch(job.dependencies_key) + + dependencies = job.fetch_dependencies( + watch=True, + pipeline=pipe + ) + + pipe.multi() + + for dependency in dependencies: + if dependency.get_status(refresh=False) != JobStatus.FINISHED: + job.set_status(JobStatus.DEFERRED, pipeline=pipe) + job.register_dependency(pipeline=pipe) + job.save(pipeline=pipe) + job.cleanup(ttl=job.ttl, pipeline=pipe) + pipe.execute() + return job + break except WatchError: continue job = self.enqueue_job(job, at_front=at_front) - return job def run_job(self, job): diff --git a/tests/test_job.py b/tests/test_job.py index 95a2358..8f168d2 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -7,6 +7,8 @@ import time import zlib from datetime import datetime +from redis import WatchError + from rq.compat import PY2, as_text from rq.exceptions import NoSuchJobError, UnpickleError from rq.job import Job, JobStatus, cancel_job, get_current_job @@ -23,8 +25,6 @@ if is_py2: else: import queue as queue - - try: from cPickle import loads, dumps except ImportError: @@ -45,7 +45,8 @@ class TestJob(RQTestCase): expected_string = "myfunc(12, '☃', null=None, snowman='☃')" else: # Python 2 - expected_string = u"myfunc(12, u'\\u2603', null=None, snowman=u'\\u2603')".decode('utf-8') + expected_string = u"myfunc(12, u'\\u2603', null=None, snowman=u'\\u2603')".decode( + 'utf-8') self.assertEqual( job.description, @@ -241,7 +242,8 @@ class TestJob(RQTestCase): def test_store_then_fetch(self): """Store, then fetch.""" - job = Job.create(func=fixtures.some_calculation, timeout='1h', args=(3, 4), kwargs=dict(z=2)) + job = Job.create(func=fixtures.some_calculation, timeout='1h', args=(3, 4), + kwargs=dict(z=2)) job.save() job2 = Job.fetch(job.id) @@ -331,7 +333,6 @@ class TestJob(RQTestCase): job.refresh() self.assertEqual(job.data, job_data) - def test_custom_meta_is_persisted(self): """Additional meta data on jobs are stored persisted correctly.""" job = Job.create(func=fixtures.say_hello, args=('Lionel',)) @@ -397,7 +398,8 @@ class TestJob(RQTestCase): def test_description_is_persisted(self): """Ensure that job's custom description is set properly""" - job = Job.create(func=fixtures.say_hello, args=('Lionel',), description='Say hello!') + job = Job.create(func=fixtures.say_hello, args=('Lionel',), + description='Say hello!') job.save() Job.fetch(job.id, connection=self.testconn) self.assertEqual(job.description, 'Say hello!') @@ -499,6 +501,22 @@ class TestJob(RQTestCase): job.cleanup(ttl=0) self.assertRaises(NoSuchJobError, Job.fetch, job.id, self.testconn) + def test_cleanup_expires_dependency_keys(self): + + dependency_job = Job.create(func=fixtures.say_hello) + dependency_job.save() + + dependent_job = Job.create(func=fixtures.say_hello, depends_on=dependency_job) + + dependent_job.register_dependency() + dependent_job.save() + + dependent_job.cleanup(ttl=100) + dependency_job.cleanup(ttl=100) + + self.assertEqual(self.testconn.ttl(dependent_job.dependencies_key), 100) + self.assertEqual(self.testconn.ttl(dependency_job.dependents_key), 100) + def test_job_with_dependents_delete_parent(self): """job.delete() deletes itself from Redis but not dependents. Wthout a save, the dependent job is never saved into redis. The delete @@ -613,6 +631,33 @@ class TestJob(RQTestCase): self.assertNotIn(job.id, queue.get_job_ids()) + def test_dependent_job_creates_dependencies_key(self): + + queue = Queue(connection=self.testconn) + dependency_job = queue.enqueue(fixtures.say_hello) + dependent_job = Job.create(func=fixtures.say_hello, depends_on=dependency_job) + + dependent_job.register_dependency() + dependent_job.save() + + self.assertTrue(self.testconn.exists(dependent_job.dependencies_key)) + + def test_dependent_job_deletes_dependencies_key(self): + """ + job.delete() deletes itself from Redis. + """ + queue = Queue(connection=self.testconn) + dependency_job = queue.enqueue(fixtures.say_hello) + dependent_job = Job.create(func=fixtures.say_hello, depends_on=dependency_job) + + dependent_job.register_dependency() + dependent_job.save() + dependent_job.delete() + + self.assertTrue(self.testconn.exists(dependency_job.key)) + self.assertFalse(self.testconn.exists(dependent_job.dependencies_key)) + self.assertFalse(self.testconn.exists(dependent_job.key)) + def test_create_job_with_id(self): """test creating jobs with a custom ID""" queue = Queue(connection=self.testconn) @@ -626,7 +671,8 @@ class TestJob(RQTestCase): """test call string with unicode keyword arguments""" queue = Queue(connection=self.testconn) - job = queue.enqueue(fixtures.echo, arg_with_unicode=fixtures.UnicodeStringObject()) + job = queue.enqueue(fixtures.echo, + arg_with_unicode=fixtures.UnicodeStringObject()) self.assertIsNotNone(job.get_call_string()) job.perform() @@ -665,3 +711,67 @@ class TestJob(RQTestCase): key = Job.key_for(job_id=job_id) assert key == (Job.redis_job_namespace_prefix + job_id).encode('utf-8') + + def test_dependencies_key_should_have_prefixed_job_id(self): + job_id = 'random' + job = Job(id=job_id) + expected_key = Job.redis_job_namespace_prefix + ":" + job_id + ':dependencies' + + assert job.dependencies_key == expected_key + + def test_fetch_dependencies_returns_dependency_jobs(self): + queue = Queue(connection=self.testconn) + dependency_job = queue.enqueue(fixtures.say_hello) + dependent_job = Job.create(func=fixtures.say_hello, depends_on=dependency_job) + + dependent_job.register_dependency() + dependent_job.save() + + dependencies = dependent_job.fetch_dependencies(pipeline=self.testconn) + + self.assertListEqual(dependencies, [dependency_job]) + + def test_fetch_dependencies_returns_empty_if_not_dependent_job(self): + queue = Queue(connection=self.testconn) + dependent_job = Job.create(func=fixtures.say_hello) + + dependent_job.register_dependency() + dependent_job.save() + + dependencies = dependent_job.fetch_dependencies(pipeline=self.testconn) + + self.assertListEqual(dependencies, []) + + def test_fetch_dependencies_raises_if_dependency_deleted(self): + queue = Queue(connection=self.testconn) + dependency_job = queue.enqueue(fixtures.say_hello) + dependent_job = Job.create(func=fixtures.say_hello, depends_on=dependency_job) + + dependent_job.register_dependency() + dependent_job.save() + + dependency_job.delete() + + with self.assertRaises(NoSuchJobError): + dependent_job.fetch_dependencies(pipeline=self.testconn) + + def test_fetch_dependencies_watches(self): + queue = Queue(connection=self.testconn) + dependency_job = queue.enqueue(fixtures.say_hello) + dependent_job = Job.create(func=fixtures.say_hello, depends_on=dependency_job) + + dependent_job.register_dependency() + dependent_job.save() + + with self.testconn.pipeline() as pipeline: + dependent_job.fetch_dependencies( + watch=True, + pipeline=pipeline + ) + + pipeline.multi() + + with self.assertRaises(WatchError): + self.testconn.set(dependency_job.id, 'somethingelsehappened') + pipeline.touch(dependency_job.id) + pipeline.execute() diff --git a/tests/test_queue.py b/tests/test_queue.py index 20fb058..22b6b11 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -2,14 +2,13 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) -from tests import RQTestCase -from tests.fixtures import echo, say_hello - from rq import Queue -from rq.exceptions import InvalidJobDependency +from rq.exceptions import InvalidJobDependency, NoSuchJobError from rq.job import Job, JobStatus from rq.registry import DeferredJobRegistry from rq.worker import Worker +from tests import RQTestCase +from tests.fixtures import echo, say_hello class CustomJob(Job): @@ -488,10 +487,10 @@ class TestQueue(RQTestCase): # without save() the job is not visible to others q = Queue() - with self.assertRaises(InvalidJobDependency): + with self.assertRaises(NoSuchJobError): q.enqueue_call(say_hello, depends_on=parent_job) - with self.assertRaises(InvalidJobDependency): + with self.assertRaises(NoSuchJobError): q.enqueue_call(say_hello, depends_on=parent_job.id) self.assertEqual(q.job_ids, [])