diff --git a/rq/job.py b/rq/job.py
index cf8948e..8eaa7a3 100644
--- a/rq/job.py
+++ b/rq/job.py
@@ -5,10 +5,10 @@ from __future__ import (absolute_import, division, print_function,
 import inspect
 import warnings
 import zlib
+from functools import partial
 from uuid import uuid4
 
 from rq.compat import as_text, decode_redis_hash, string_types, text_type
-
 from .connections import resolve_connection
 from .exceptions import NoSuchJobError
 from .local import LocalStack
@@ -16,6 +16,15 @@ from .utils import (enum, import_attribute, parse_timeout, str_to_date,
                     utcformat, utcnow)
 from .serializers import resolve_serializer
 
+try:
+    import cPickle as pickle
+except ImportError:  # noqa  # pragma: no cover
+    import pickle
+
+# Serialize pickle dumps using the highest pickle protocol (binary, default
+# uses ascii)
+dumps = partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
+loads = pickle.loads
 
 JobStatus = enum(
     'JobStatus',
@@ -94,11 +103,13 @@ class Job(object):
             job._func_name = '{0}.{1}'.format(func.__module__, func.__name__)
         elif isinstance(func, string_types):
             job._func_name = as_text(func)
-        elif not inspect.isclass(func) and hasattr(func, '__call__'):  # a callable class instance
+        elif not inspect.isclass(func) and hasattr(func,
+                                                   '__call__'):  # a callable class instance
             job._instance = func
             job._func_name = '__call__'
         else:
-            raise TypeError('Expected a callable or a string, but got: {0}'.format(func))
+            raise TypeError(
+                'Expected a callable or a string, but got: {0}'.format(func))
         job._args = args
         job._kwargs = kwargs
 
@@ -113,7 +124,8 @@ class Job(object):
 
         # dependency could be job instance or id
         if depends_on is not None:
-            job._dependency_ids = [depends_on.id if isinstance(depends_on, Job) else depends_on]
+            job._dependency_ids = [
+                depends_on.id if isinstance(depends_on, Job) else depends_on]
         return job
 
     def get_status(self, refresh=True):
@@ -401,7 +413,8 @@ class Job(object):
 
         for i, job in enumerate(jobs):
             if not job:
-                raise NoSuchJobError('Dependency {0} does not exist'.format(self._dependency_ids[i]))
+                raise NoSuchJobError(
+                    'Dependency {0} does not exist'.format(self._dependency_ids[i]))
 
         return jobs
 
@@ -459,8 +472,10 @@ class Job(object):
             except Exception as e:
                 self._result = "Unserializable return value"
         self.timeout = parse_timeout(obj.get('timeout')) if obj.get('timeout') else None
-        self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None  # noqa
-        self.failure_ttl = int(obj.get('failure_ttl')) if obj.get('failure_ttl') else None  # noqa
+        self.result_ttl = int(obj.get('result_ttl')) if obj.get(
+            'result_ttl') else None  # noqa
+        self.failure_ttl = int(obj.get('failure_ttl')) if obj.get(
+            'failure_ttl') else None  # noqa
         self._status = as_text(obj.get('status')) if obj.get('status') else None
 
         dependency_id = obj.get('dependency_id', None)
@@ -725,19 +740,29 @@ class Job(object):
             connection.sadd(self.dependencies_key, dependency_id)
 
     def dependencies_are_met(
-            self,
-            pipeline=None):
+        self,
+        pipeline=None,
+        exclude=None
+    ):
         """Returns a boolean indicating if all of this jobs dependencies are _FINISHED_
 
         If a pipeline is passed, all dependencies are WATCHed.
+
+        `exclude` allows us to exclude some job ids from the status check. This is
+        useful when enqueueing the dependents of a _successful_ job -- the status
+        of `FINISHED` may not yet be set in Redis, but said job is indeed _done_,
+        and this method is called while its dependents are being enqueued.
         """
+        exclude = exclude or []
         pipe = pipeline if pipeline is not None else self.connection
+
+        dependencies = self.connection.smembers(self.dependencies_key)
+
         if pipeline is not None:
             pipe.watch(*[Job.key_for(as_text(_id))
-                         for _id in self.connection.smembers(self.dependencies_key)])
+                         for _id in dependencies])
 
         sort_by = self.redis_job_namespace_prefix + '*->ended_at'
         get_field = self.redis_job_namespace_prefix + '*->status'
@@ -751,11 +776,14 @@ class Job(object):
         dependencies_statuses = [
             (as_text(_id), as_text(status))
             for _id, status in pipe.sort(name=self.dependencies_key, by=sort_by,
-                                         get=['#', get_field], alpha=True, groups=True, )
+                                         get=['#', get_field], alpha=True,
+                                         groups=True, )
         ]
 
         return all(status == JobStatus.FINISHED
                    for job_id, status
-                   in dependencies_statuses)
+                   in dependencies_statuses
+                   if job_id not in exclude)
+
 
 _job_stack = LocalStack()
diff --git a/rq/queue.py b/rq/queue.py
index 4fe1b44..6dba72f 100644
--- a/rq/queue.py
+++ b/rq/queue.py
@@ -4,7 +4,6 @@ from __future__ import (absolute_import, division, print_function,
 
 import uuid
 import warnings
-
 from datetime import datetime
 
 from redis import WatchError
@@ -66,7 +65,8 @@ class Queue(object):
 
         if 'async' in kwargs:
             self._is_async = kwargs['async']
-            warnings.warn('The `async` keyword is deprecated. Use `is_async` instead', DeprecationWarning)
+            warnings.warn('The `async` keyword is deprecated. Use `is_async` instead',
+                          DeprecationWarning)
 
         # override class attribute job_class if one was passed
         if job_class is not None:
@@ -317,7 +317,8 @@ class Queue(object):
                 pipe.multi()
 
                 for dependency in dependencies:
-                    if dependency.get_status(refresh=False) != JobStatus.FINISHED:
+                    if dependency.get_status(
+                            refresh=False) != JobStatus.FINISHED:
                         job.set_status(JobStatus.DEFERRED, pipeline=pipe)
                         job.register_dependency(pipeline=pipe)
                         job.save(pipeline=pipe)
@@ -379,8 +380,9 @@ class Queue(object):
         """Creates a job to represent the delayed function call and enqueues it."""
 
         (f, timeout, description, result_ttl, ttl, failure_ttl,
-         depends_on, job_id, at_front, meta, args, kwargs) = Queue.parse_args(f, *args, **kwargs)
-
+         depends_on, job_id, at_front, meta, args, kwargs) = Queue.parse_args(f, *args,
+                                                                              **kwargs)
+
         return self.enqueue_call(
             func=f, args=args, kwargs=kwargs, timeout=timeout,
             result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl,
@@ -393,7 +395,8 @@ class Queue(object):
         from .registry import ScheduledJobRegistry
 
         (f, timeout, description, result_ttl, ttl, failure_ttl,
-         depends_on, job_id, at_front, meta, args, kwargs) = Queue.parse_args(f, *args, **kwargs)
+         depends_on, job_id, at_front, meta, args, kwargs) = Queue.parse_args(f, *args,
+                                                                              **kwargs)
         job = self.create_job(f, status=JobStatus.SCHEDULED, args=args, kwargs=kwargs,
                               timeout=timeout, result_ttl=result_ttl, ttl=ttl,
                               failure_ttl=failure_ttl, description=description,
@@ -465,9 +468,14 @@ class Queue(object):
                                      for _id in pipe.smembers(dependents_key)]
 
                 dependent_jobs = [
-                    job for job in self.job_class.fetch_many(dependent_job_ids,
-                                                             connection=self.connection)
-                    if job.dependencies_are_met(pipeline=pipe)
+                    dependent_job for dependent_job
+                    in self.job_class.fetch_many(
+                        dependent_job_ids,
+                        connection=self.connection
+                    ) if dependent_job.dependencies_are_met(
+                        pipeline=pipe,
+                        exclude={job.id}
+                    )
                 ]
 
                 pipe.multi()
@@ -480,7 +488,8 @@ class Queue(object):
                     if dependent.origin == self.name:
                         self.enqueue_job(dependent, pipeline=pipe)
                     else:
-                        queue = self.__class__(name=dependent.origin, connection=self.connection)
+                        queue = self.__class__(name=dependent.origin,
+                                               connection=self.connection)
                         queue.enqueue_job(dependent, pipeline=pipe)
 
                 pipe.delete(dependents_key)
@@ -519,7 +528,8 @@ class Queue(object):
         connection = resolve_connection(connection)
         if timeout is not None:  # blocking variant
             if timeout == 0:
-                raise ValueError('RQ does not support indefinite timeouts. Please pick a timeout value > 0')
+                raise ValueError(
+                    'RQ does not support indefinite timeouts. Please pick a timeout value > 0')
             result = connection.blpop(queue_keys, timeout)
             if result is None:
                 raise DequeueTimeout(timeout, queue_keys)
diff --git a/rq/worker.py b/rq/worker.py
index 5161a24..7e090f5 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -845,8 +845,6 @@ class Worker(object):
                     # if dependencies are inserted after enqueue_dependents
                     # a WatchError is thrown by execute()
                     pipeline.watch(job.dependents_key)
-                    # TODO: This was moved
-                    job.set_status(JobStatus.FINISHED, pipeline=pipeline)
                     # enqueue_dependents calls multi() on the pipeline!
                    queue.enqueue_dependents(job, pipeline=pipeline)
 
@@ -858,6 +856,7 @@ class Worker(object):
 
                 result_ttl = job.get_result_ttl(self.default_result_ttl)
                 if result_ttl != 0:
+                    job.set_status(JobStatus.FINISHED, pipeline=pipeline)
                     # Don't clobber the user's meta dictionary!
                     job.save(pipeline=pipeline, include_meta=False)
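
Note for reviewers: the crux of this change is an ordering problem. After the worker.py hunks above, `job.set_status(JobStatus.FINISHED, pipeline=pipeline)` is queued on the same pipeline that `enqueue_dependents()` has already switched into MULTI mode, so the FINISHED write stays buffered while each dependent's `dependencies_are_met()` runs; that is why `enqueue_dependents()` passes `exclude={job.id}`. The sketch below only demonstrates the redis-py buffering window this relies on; the key names are hypothetical and it assumes a local Redis server:

```python
# Hypothetical keys ('rq:job:parent', ...) chosen for illustration; the point
# is redis-py's pipeline semantics, which the worker relies on.
from redis import Redis

redis = Redis()
redis.hset('rq:job:parent', 'status', 'started')

with redis.pipeline() as pipe:
    pipe.watch('rq:job:parent:dependents')  # optimistic lock, as the worker does
    pipe.multi()                            # from here on, commands are buffered
    pipe.hset('rq:job:parent', 'status', 'finished')

    # A read issued outside the transaction still sees the old status: this is
    # the window in which dependencies_are_met() inspects the dependencies,
    # hence the need to exclude the parent job explicitly.
    assert redis.hget('rq:job:parent', 'status') == b'started'

    pipe.execute()  # the buffered HSET is applied atomically here

assert redis.hget('rq:job:parent', 'status') == b'finished'
```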
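The observable semantics of the new `exclude` argument reduce to the final generator expression in `dependencies_are_met()`: every dependency's status must be FINISHED, except for the excluded ids. A small illustration, with made-up job ids and in-memory tuples standing in for what `pipe.sort(..., get=['#', get_field])` returns in the real method:

```python
# Illustration only: 'job-a'/'job-b' are made-up ids.
from rq.job import JobStatus

dependencies_statuses = [
    ('job-a', JobStatus.FINISHED),
    ('job-b', JobStatus.STARTED),  # the parent job being finalized right now
]
exclude = {'job-b'}

met = all(status == JobStatus.FINISHED
          for job_id, status in dependencies_statuses
          if job_id not in exclude)
assert met  # 'job-b' is skipped, so the dependent may be enqueued
```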
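Separately from the dependency logic, `rq/job.py` also gains module-level `dumps`/`loads` helpers that pin pickle to its highest protocol (binary framing) instead of the backwards-compatible default, which on Python 2 is the ASCII protocol 0. A standalone sketch of the same idiom:

```python
# partial() pins the pickle protocol once so every call site
# serializes the same way.
from functools import partial
import pickle  # cPickle on Python 2, per the try/except in the diff

dumps = partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
loads = pickle.loads

payload = dumps({'args': (1, 2), 'kwargs': {'x': 3}})
assert loads(payload) == {'args': (1, 2), 'kwargs': {'x': 3}}
```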