Address Deleted Dependencies

1) Check whether `created_at` is set when checking if dependencies are met.

   If `created_at` is `None` then the job has been deleted. This is sort of a hack — we just need one of the fields on the job's hash that is ALWAYS populated. You can persist a job to redis without setting status...

2) Job#fetch_dependencies no longer raises NoSuchJob.

   If one of a job's dependencies has been deleted from Redis, it is not returned from `fetch_dependencies` and no exception is raised.
main
thomas 5 years ago committed by Thomas Matecki
parent 83fa6b2386
commit 01ebe25f56

@ -403,18 +403,17 @@ class Job(object):
watch is true, then set WATCH on all the keys of all dependencies.
Returned jobs will use self's connection, not the pipeline supplied.
If a job has been deleted from redis, it is not returned.
"""
connection = pipeline if pipeline is not None else self.connection
if watch and self._dependency_ids:
connection.watch(*self._dependency_ids)
jobs = self.fetch_many(self._dependency_ids, connection=self.connection)
for i, job in enumerate(jobs):
if not job:
raise NoSuchJobError(
'Dependency {0} does not exist'.format(self._dependency_ids[i]))
jobs = [job for
job in self.fetch_many(self._dependency_ids, connection=self.connection)
if job]
return jobs
@ -739,6 +738,12 @@ class Job(object):
connection.sadd(dependents_key, self.id)
connection.sadd(self.dependencies_key, dependency_id)
@property
def dependencies_job_ids(self):
    """Redis keys of every job this job depends on.

    NOTE(review): despite the name, these are the full Redis keys
    (via ``Job.key_for``), not bare job ids — confirm callers expect keys.
    """
    member_ids = self.connection.smembers(self.dependencies_key)
    return [Job.key_for(as_text(member_id)) for member_id in member_ids]
def dependencies_are_met(
self,
pipeline=None,
@ -758,14 +763,15 @@ class Job(object):
pipe = pipeline if pipeline is not None else self.connection
dependencies = self.connection.smembers(self.dependencies_key)
if pipeline is not None:
pipe.watch(*[Job.key_for(as_text(_id))
for _id in dependencies])
pipe.watch(*self.dependencies_job_ids)
sort_by = self.redis_job_namespace_prefix + '*->ended_at'
get_field = self.redis_job_namespace_prefix + '*->status'
get_fields = (
'#',
self.redis_job_namespace_prefix + '*->created_at',
self.redis_job_namespace_prefix + '*->status'
)
# As a minor optimization to more quickly tell if all dependencies
# are _FINISHED_, sort dependencies by the `ended_at` timestamp so
@ -774,16 +780,17 @@ class Job(object):
stored in an ISO 8601 format, so lexicographic order is the same as
# chronological order.
dependencies_statuses = [
(as_text(_id), as_text(status))
for _id, status in pipe.sort(name=self.dependencies_key, by=sort_by,
get=['#', get_field], alpha=True,
groups=True, )
tuple(map(as_text, result))
for result in pipe.sort(name=self.dependencies_key, by=sort_by,
get=get_fields, alpha=True,
groups=True, )
]
return all(status == JobStatus.FINISHED
for job_id, status
# if `created_at` is None, then this has been deleted!
return all(status == JobStatus.FINISHED or not created_at
for dependency_id, created_at, status
in dependencies_statuses
if job_id not in exclude)
if dependency_id not in exclude)
_job_stack = LocalStack()

@ -8,7 +8,6 @@ import queue
import zlib
from datetime import datetime, timedelta
import pytest
from redis import WatchError
from rq.compat import PY2, as_text
@ -774,8 +773,12 @@ class TestJob(RQTestCase):
dependency_job.delete()
with self.assertRaises(NoSuchJobError):
dependent_job.fetch_dependencies(pipeline=self.testconn)
self.assertNotIn(
dependent_job.id,
[job.id for job in dependent_job.fetch_dependencies(
pipeline=self.testconn
)]
)
def test_fetch_dependencies_watches(self):
queue = Queue(connection=self.testconn)
@ -877,7 +880,6 @@ class TestJob(RQTestCase):
dependent_job.register_dependency()
with self.testconn.pipeline() as pipeline:
dependent_job.dependencies_are_met(
pipeline=pipeline,
)
@ -888,3 +890,25 @@ class TestJob(RQTestCase):
with self.assertRaises(WatchError):
pipeline.touch(Job.key_for(dependent_job.id))
pipeline.execute()
def test_can_enqueue_job_if_dependency_is_deleted(self):
    """A dependent job can still be enqueued after its dependency's
    hash has been deleted from Redis (result_ttl=0 removes it once run).
    """
    queue = Queue(connection=self.testconn)
    parent = queue.enqueue(fixtures.say_hello, result_ttl=0)

    worker = Worker([queue])
    worker.work(burst=True)

    # Parent finished and was immediately deleted; enqueueing a job
    # that depends on it must still succeed.
    assert queue.enqueue(fixtures.say_hello, depends_on=parent)
def test_dependents_are_met_if_dependency_is_deleted(self):
    """Dependencies count as met when the dependency job has been
    deleted from Redis (its `created_at` field is gone).
    """
    queue = Queue(connection=self.testconn)
    parent = queue.enqueue(fixtures.say_hello, result_ttl=0)
    child = queue.enqueue(fixtures.say_hello, depends_on=parent)

    worker = Worker([queue])
    # Run only the parent; result_ttl=0 deletes it right after it finishes.
    worker.work(burst=True, max_jobs=1)

    assert child.get_status() == JobStatus.QUEUED
    assert child.dependencies_are_met()

@ -531,20 +531,6 @@ class TestQueue(RQTestCase):
self.assertEqual(q.job_ids, [job.id])
self.assertEqual(job.timeout, 123)
def test_enqueue_job_with_invalid_dependency(self):
    """Enqueuing a job fails, if the dependency does not exist at all."""
    # Job.create without save() leaves nothing in Redis, so the
    # dependency is invisible to the queue.
    unsaved_parent = Job.create(func=say_hello)
    queue = Queue()

    for dependency in (unsaved_parent, unsaved_parent.id):
        with self.assertRaises(NoSuchJobError):
            queue.enqueue_call(say_hello, depends_on=dependency)

    # Nothing should have been enqueued.
    self.assertEqual(queue.job_ids, [])
def test_enqueue_job_with_multiple_queued_dependencies(self):
parent_jobs = [Job.create(func=say_hello) for _ in range(2)]

Loading…
Cancel
Save