Revert move of status update in `Worker#handle_job_success`

When a job with dependents finishes _successfully_, its dependents are enqueued. Only if the FINISHing job's `result_ttl` is non-zero is the change in status persisted to Redis - that is, when each dependent job is enqueued, the _FINISHing_ job (the one triggering the enqueueing) still has an _outdated_ status in Redis. This avoids a redundant call, because if `result_ttl=0` the job is deleted in `Job#cleanup` anyway.

In order to enqueue the dependents, we therefore _exclude_ the FINISHing job from the check that each dependent's dependencies have been met.
Authored by thomas 5 years ago, committed by Thomas Matecki
parent 9f15df2d55
commit 83fa6b2386
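
To make the check concrete before the diff: a minimal, self-contained sketch of the exclusion logic. This is a hypothetical standalone version - `statuses` is a plain dict standing in for the Redis-backed lookup the real `Job.dependencies_are_met` does via `SORT`.

```python
# Minimal sketch of the exclusion logic, using a plain dict of
# job_id -> status in place of the real Redis-backed lookup.
FINISHED = 'finished'

def dependencies_are_met(statuses, exclude=None):
    """True if every dependency whose id is not excluded is FINISHED."""
    exclude = exclude or set()
    return all(status == FINISHED
               for job_id, status in statuses.items()
               if job_id not in exclude)

# The FINISHing job 'a' may still read as 'started' in the store, but we
# know it is done, so we exclude it when checking its dependent:
assert dependencies_are_met({'a': 'started', 'b': FINISHED}, exclude={'a'})
assert not dependencies_are_met({'a': 'started', 'b': FINISHED})
```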

@@ -5,10 +5,10 @@ from __future__ import (absolute_import, division, print_function,
import inspect
import warnings
import zlib
from functools import partial
from uuid import uuid4
from rq.compat import as_text, decode_redis_hash, string_types, text_type
from .connections import resolve_connection
from .exceptions import NoSuchJobError
from .local import LocalStack
@@ -16,6 +16,15 @@ from .utils import (enum, import_attribute, parse_timeout, str_to_date,
utcformat, utcnow)
from .serializers import resolve_serializer
try:
import cPickle as pickle
except ImportError: # noqa # pragma: no cover
import pickle
# Serialize pickle dumps using the highest pickle protocol (binary, default
# uses ascii)
dumps = partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
loads = pickle.loads
JobStatus = enum(
'JobStatus',
@@ -94,11 +103,13 @@ class Job(object):
job._func_name = '{0}.{1}'.format(func.__module__, func.__name__)
elif isinstance(func, string_types):
job._func_name = as_text(func)
elif not inspect.isclass(func) and hasattr(func, '__call__'): # a callable class instance
elif not inspect.isclass(func) and hasattr(func,
'__call__'): # a callable class instance
job._instance = func
job._func_name = '__call__'
else:
raise TypeError('Expected a callable or a string, but got: {0}'.format(func))
raise TypeError(
'Expected a callable or a string, but got: {0}'.format(func))
job._args = args
job._kwargs = kwargs
@@ -113,7 +124,8 @@ class Job(object):
# dependency could be job instance or id
if depends_on is not None:
job._dependency_ids = [depends_on.id if isinstance(depends_on, Job) else depends_on]
job._dependency_ids = [
depends_on.id if isinstance(depends_on, Job) else depends_on]
return job
def get_status(self, refresh=True):
@@ -401,7 +413,8 @@ class Job(object):
for i, job in enumerate(jobs):
if not job:
raise NoSuchJobError('Dependency {0} does not exist'.format(self._dependency_ids[i]))
raise NoSuchJobError(
'Dependency {0} does not exist'.format(self._dependency_ids[i]))
return jobs
@@ -459,8 +472,10 @@ class Job(object):
except Exception as e:
self._result = "Unserializable return value"
self.timeout = parse_timeout(obj.get('timeout')) if obj.get('timeout') else None
self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa
self.failure_ttl = int(obj.get('failure_ttl')) if obj.get('failure_ttl') else None # noqa
self.result_ttl = int(obj.get('result_ttl')) if obj.get(
'result_ttl') else None # noqa
self.failure_ttl = int(obj.get('failure_ttl')) if obj.get(
'failure_ttl') else None # noqa
self._status = as_text(obj.get('status')) if obj.get('status') else None
dependency_id = obj.get('dependency_id', None)
@@ -725,19 +740,29 @@ class Job(object):
connection.sadd(self.dependencies_key, dependency_id)
def dependencies_are_met(
self,
pipeline=None
self,
pipeline=None,
exclude=None
):
"""Returns a boolean indicating if all of this jobs dependencies are _FINISHED_
If a pipeline is passed, all dependencies are WATCHed.
`exclude` allows us to exclude some job id from the status check. This is useful
when enqueueing the dependents of a _successful_ job -- that status of
`FINISHED` may not be yet set in redis, but said job is indeed _done_ and this
method is _called_ in the _stack_ of it's dependents are being enqueued.
"""
exclude = exclude or []
pipe = pipeline if pipeline is not None else self.connection
dependencies = self.connection.smembers(self.dependencies_key)
if pipeline is not None:
pipe.watch(*[Job.key_for(as_text(_id))
for _id in self.connection.smembers(self.dependencies_key)])
for _id in dependencies])
sort_by = self.redis_job_namespace_prefix + '*->ended_at'
get_field = self.redis_job_namespace_prefix + '*->status'
@@ -751,11 +776,14 @@ class Job(object):
dependencies_statuses = [
(as_text(_id), as_text(status))
for _id, status in pipe.sort(name=self.dependencies_key, by=sort_by,
get=['#', get_field], alpha=True, groups=True, )
get=['#', get_field], alpha=True,
groups=True, )
]
return all(status == JobStatus.FINISHED
for job_id, status
in dependencies_statuses)
in dependencies_statuses
if job_id not in exclude)
_job_stack = LocalStack()
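
For usage context, a hedged sketch of calling the new `exclude` parameter directly. This assumes a running local Redis server and the rq API as of this commit; `parent` and `child` are illustrative names, not part of the change.

```python
# Illustrative only: exercising Job.dependencies_are_met with `exclude`.
from redis import Redis
from rq import Queue

queue = Queue(connection=Redis())
parent = queue.enqueue(print, 'parent job')
child = queue.enqueue(print, 'child job', depends_on=parent)

# Before `parent` finishes, the plain check fails...
assert not child.dependencies_are_met()
# ...but excluding the in-flight parent, as enqueue_dependents does,
# treats it as already done:
assert child.dependencies_are_met(exclude={parent.id})
```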

@@ -4,7 +4,6 @@ from __future__ import (absolute_import, division, print_function,
import uuid
import warnings
from datetime import datetime
from redis import WatchError
@@ -66,7 +65,8 @@ class Queue(object):
if 'async' in kwargs:
self._is_async = kwargs['async']
warnings.warn('The `async` keyword is deprecated. Use `is_async` instead', DeprecationWarning)
warnings.warn('The `async` keyword is deprecated. Use `is_async` instead',
DeprecationWarning)
# override class attribute job_class if one was passed
if job_class is not None:
@@ -317,7 +317,8 @@ class Queue(object):
pipe.multi()
for dependency in dependencies:
if dependency.get_status(refresh=False) != JobStatus.FINISHED:
if dependency.get_status(
refresh=False) != JobStatus.FINISHED:
job.set_status(JobStatus.DEFERRED, pipeline=pipe)
job.register_dependency(pipeline=pipe)
job.save(pipeline=pipe)
@@ -379,7 +380,8 @@ class Queue(object):
"""Creates a job to represent the delayed function call and enqueues it."""
(f, timeout, description, result_ttl, ttl, failure_ttl,
depends_on, job_id, at_front, meta, args, kwargs) = Queue.parse_args(f, *args, **kwargs)
depends_on, job_id, at_front, meta, args, kwargs) = Queue.parse_args(f, *args,
**kwargs)
return self.enqueue_call(
func=f, args=args, kwargs=kwargs, timeout=timeout,
@@ -393,7 +395,8 @@ class Queue(object):
from .registry import ScheduledJobRegistry
(f, timeout, description, result_ttl, ttl, failure_ttl,
depends_on, job_id, at_front, meta, args, kwargs) = Queue.parse_args(f, *args, **kwargs)
depends_on, job_id, at_front, meta, args, kwargs) = Queue.parse_args(f, *args,
**kwargs)
job = self.create_job(f, status=JobStatus.SCHEDULED, args=args, kwargs=kwargs,
timeout=timeout, result_ttl=result_ttl, ttl=ttl,
failure_ttl=failure_ttl, description=description,
@@ -465,9 +468,14 @@ class Queue(object):
for _id in pipe.smembers(dependents_key)]
dependent_jobs = [
job for job in self.job_class.fetch_many(dependent_job_ids,
connection=self.connection)
if job.dependencies_are_met(pipeline=pipe)
dependent_job for dependent_job
in self.job_class.fetch_many(
dependent_job_ids,
connection=self.connection
) if dependent_job.dependencies_are_met(
pipeline=pipe,
exclude={job.id}
)
]
pipe.multi()
@@ -480,7 +488,8 @@ class Queue(object):
if dependent.origin == self.name:
self.enqueue_job(dependent, pipeline=pipe)
else:
queue = self.__class__(name=dependent.origin, connection=self.connection)
queue = self.__class__(name=dependent.origin,
connection=self.connection)
queue.enqueue_job(dependent, pipeline=pipe)
pipe.delete(dependents_key)
@@ -519,7 +528,8 @@ class Queue(object):
connection = resolve_connection(connection)
if timeout is not None: # blocking variant
if timeout == 0:
raise ValueError('RQ does not support indefinite timeouts. Please pick a timeout value > 0')
raise ValueError(
'RQ does not support indefinite timeouts. Please pick a timeout value > 0')
result = connection.blpop(queue_keys, timeout)
if result is None:
raise DequeueTimeout(timeout, queue_keys)

@@ -845,8 +845,6 @@ class Worker(object):
# if dependencies are inserted after enqueue_dependents
# a WatchError is thrown by execute()
pipeline.watch(job.dependents_key)
# TODO: This was moved
job.set_status(JobStatus.FINISHED, pipeline=pipeline)
# enqueue_dependents calls multi() on the pipeline!
queue.enqueue_dependents(job, pipeline=pipeline)
@@ -858,6 +856,7 @@ class Worker(object):
result_ttl = job.get_result_ttl(self.default_result_ttl)
if result_ttl != 0:
job.set_status(JobStatus.FINISHED, pipeline=pipeline)
# Don't clobber the user's meta dictionary!
job.save(pipeline=pipeline, include_meta=False)
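
Putting the revert in context: a condensed paraphrase of the resulting `Worker#handle_job_success` flow. Names mirror the rq source, but this is a sketch written as a free function over `worker`, `job`, and `queue`, not the verbatim method.

```python
# Condensed paraphrase of the post-revert handle_job_success flow.
from redis import WatchError
from rq.job import JobStatus

def handle_job_success(worker, job, queue):
    with worker.connection.pipeline() as pipeline:
        while True:
            try:
                # If dependents are inserted after enqueue_dependents,
                # execute() below raises WatchError and the loop retries.
                pipeline.watch(job.dependents_key)
                # enqueue_dependents calls multi() on the pipeline. The
                # FINISHED status is not persisted yet, which is why its
                # dependency checks run with exclude={job.id}.
                queue.enqueue_dependents(job, pipeline=pipeline)
                result_ttl = job.get_result_ttl(worker.default_result_ttl)
                if result_ttl != 0:
                    # Persist FINISHED only when the job outlives cleanup.
                    job.set_status(JobStatus.FINISHED, pipeline=pipeline)
                    # Don't clobber the user's meta dictionary!
                    job.save(pipeline=pipeline, include_meta=False)
                job.cleanup(result_ttl, pipeline=pipeline,
                            remove_from_queue=False)
                pipeline.execute()
                break
            except WatchError:
                continue
```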
