@ -1,40 +1,37 @@
import inspect
import json
import pickle
import warnings
import zlib
import typing as t
import asyncio
from collections . abc import Iterable
from datetime import datetime , timedelta , timezone
from enum import Enum
from functools import partial
from redis import WatchError
from typing import Any , List , Optional
from typing import ( TYPE_CHECKING , Any , Callable , Dict , Iterable , List ,
Optional , Tuple , Union )
from uuid import uuid4
if t . TYPE_CHECKING :
if TYPE_CHECKING :
from . results import Result
from . queue import Queue
from redis import Redis
from redis . client import Pipeline
from . connections import resolve_connection
from . exceptions import DeserializationError , InvalidJobOperation , NoSuchJobError
from . exceptions import ( DeserializationError , InvalidJobOperation ,
NoSuchJobError )
from . local import LocalStack
from . serializers import resolve_serializer
from . utils import ( get_version , import_attribute , parse_timeout , str_to_date ,
utcformat , utcnow , ensure_list , get_call_string , as_text ,
decode_redis_hash )
# Serialize pickle dumps using the highest pickle protocol (binary, default
# uses ascii)
dumps = partial ( pickle . dumps , protocol = pickle . HIGHEST_PROTOCOL )
loads = pickle . loads
from . types import FunctionReferenceType , JobDependencyType
from . utils import ( as_text , decode_redis_hash , ensure_list , get_call_string ,
get_version , import_attribute , parse_timeout , str_to_date ,
utcformat , utcnow )
class JobStatus ( str , Enum ) :
""" The Status of Job within its lifecycle at any given time. """
QUEUED = ' queued '
FINISHED = ' finished '
FAILED = ' failed '
@ -46,7 +43,21 @@ class JobStatus(str, Enum):
class Dependency :
def __init__ ( self , jobs : t . List [ t . Union [ ' Job ' , str ] ] , allow_failure : bool = False , enqueue_at_front : bool = False ) :
def __init__ ( self , jobs : List [ Union [ ' Job ' , str ] ] , allow_failure : bool = False , enqueue_at_front : bool = False ) :
""" The definition of a Dependency.
Args :
jobs ( List [ Union [ Job , str ] ] ) : A list of Job instances or Job IDs .
Anything different will raise a ValueError
allow_failure ( bool , optional ) : Whether to allow for failure when running the depency ,
meaning , the dependencies should continue running even after one of them failed .
Defaults to False .
enqueue_at_front ( bool , optional ) : Whether this dependecy should be enqueued at the front of the queue .
Defaults to False .
Raises :
ValueError : If the ` jobs ` param has anything different than ` str ` or ` Job ` class or the job list is empty
"""
dependent_jobs = ensure_list ( jobs )
if not all (
isinstance ( job , Job ) or isinstance ( job , str )
@ -62,46 +73,118 @@ class Dependency:
self . enqueue_at_front = enqueue_at_front
# Sentinel value to mark that some of our lazily evaluated properties have not
# yet been evaluated.
UNEVALUATED = object ( )
""" Sentinel value to mark that some of our lazily evaluated properties have not
yet been evaluated .
"""
def cancel_job ( job_id : str , connection : Optional [ ' Redis ' ] = None , serializer = None , enqueue_dependents : bool = False ) :
""" Cancels the job with the given job ID, preventing execution. Discards
any job info ( i . e . it can ' t be requeued later).
def cancel_job ( job_id : str , connection : Optional [ ' Redis ' ] = None ,
serializer = None , enqueue_dependents : bool = False ) :
""" Cancels the job with the given job ID, preventing execution.
Use with caution . This will discard any job info ( i . e . it can ' t be requeued later).
Args :
job_id ( str ) : The Job ID
connection ( Optional [ Redis ] , optional ) : The Redis Connection . Defaults to None .
serializer ( str , optional ) : The string of the path to the serializer to use . Defaults to None .
enqueue_dependents ( bool , optional ) : Whether dependents should still be enqueued . Defaults to False .
"""
Job . fetch ( job_id , connection = connection , serializer = serializer ) . cancel ( enqueue_dependents = enqueue_dependents )
def get_current_job ( connection : Optional [ ' Redis ' ] = None , job_class : Optional [ ' Job ' ] = None ) :
""" Returns the Job instance that is currently being executed. If this
function is invoked from outside a job context , None is returned .
def get_current_job ( connection : Optional [ ' Redis ' ] = None , job_class : Optional [ ' Job ' ] = None ) - > Optional [ ' Job ' ] :
""" Returns the Job instance that is currently being executed.
If this function is invoked from outside a job context , None is returned .
Args :
connection ( Optional [ Redis ] , optional ) : The connection to use . Defaults to None .
job_class ( Optional [ Job ] , optional ) : The job class ( DEPRECATED ) . Defaults to None .
Returns :
job ( Optional [ Job ] ) : The current Job running
"""
if connection :
warnings . warn ( " connection argument for get_current_job is deprecated. " ,
DeprecationWarning )
if job_class :
warnings . warn ( " job_class argument for get_current_job is deprecated. " ,
DeprecationWarning )
return _job_stack . top
def requeue_job ( job_id : str , connection : ' Redis ' , serializer = None ) :
def requeue_job ( job_id : str , connection : ' Redis ' , serializer = None ) - > ' Job ' :
""" Fetches a Job by ID and requeues it using the `requeue()` method.
Args :
job_id ( str ) : The Job ID that should be requeued .
connection ( Redis ) : The Redis Connection to use
serializer ( Optional [ str ] , optional ) : The serializer . Defaults to None .
Returns :
Job : The requeued Job object .
"""
job = Job . fetch ( job_id , connection = connection , serializer = serializer )
return job . requeue ( )
class Job :
""" A Job is just a convenient datastructure to pass around job (meta) data.
"""
""" A Job is just a convenient datastructure to pass around job (meta) data. """
redis_job_namespace_prefix = ' rq:job: '
# Job construction
@classmethod
def create ( cls , func : t . Callable [ . . . , t . Any ] , args = None , kwargs = None , connection : Optional [ ' Redis ' ] = None ,
result_ttl = None , ttl = None , status : JobStatus = None , description = None ,
depends_on = None , timeout = None , id = None , origin = None , meta = None ,
failure_ttl = None , serializer = None , * , on_success = None , on_failure = None ) - > ' Job ' :
def create ( cls , func : FunctionReferenceType , args : Union [ List [ Any ] , Optional [ Tuple ] ] = None ,
kwargs : Optional [ Dict [ str , Any ] ] = None , connection : Optional [ ' Redis ' ] = None ,
result_ttl : Optional [ int ] = None , ttl : Optional [ int ] = None ,
status : Optional [ JobStatus ] = None , description : Optional [ str ] = None ,
depends_on : Optional [ JobDependencyType ] = None ,
timeout : Optional [ int ] = None , id : Optional [ str ] = None ,
origin = None , meta : Optional [ Dict [ str , Any ] ] = None ,
failure_ttl : Optional [ int ] = None , serializer = None , * ,
on_success : Optional [ Callable [ . . . , Any ] ] = None ,
on_failure : Optional [ Callable [ . . . , Any ] ] = None ) - > ' Job ' :
""" Creates a new Job instance for the given function, arguments, and
keyword arguments .
Args :
func ( FunctionReference ) : The function / method / callable for the Job . This can be
a reference to a concrete callable or a string representing the path of function / method to be
imported . Effectively this is the only required attribute when creating a new Job .
args ( Union [ List [ Any ] , Optional [ Tuple ] ] , optional ) : A Tuple / List of positional arguments to pass the callable .
Defaults to None , meaning no args being passed .
kwargs ( Optional [ Dict ] , optional ) : A Dictionary of keyword arguments to pass the callable .
Defaults to None , meaning no kwargs being passed .
connection ( Optional [ Redis ] , optional ) : The Redis connection to use . Defaults to None .
This will be " resolved " using the ` resolve_connection ` function when initialzing the Job Class .
result_ttl ( Optional [ int ] , optional ) : The amount of time in seconds the results should live .
Defaults to None .
ttl ( Optional [ int ] , optional ) : The Time To Live ( TTL ) for the job itself . Defaults to None .
status ( JobStatus , optional ) : The Job Status . Defaults to None .
description ( Optional [ str ] , optional ) : The Job Description . Defaults to None .
depends_on ( Union [ ' Dependency ' , List [ Union [ ' Dependency ' , ' Job ' ] ] ] , optional ) : What the jobs depends on .
This accepts a variaty of different arguments including a ` Dependency ` , a list of ` Dependency ` or a ` Job `
list of ` Job ` . Defaults to None .
timeout ( Optional [ int ] , optional ) : The amount of time in seconds that should be a hardlimit for a job execution . Defaults to None .
id ( Optional [ str ] , optional ) : An Optional ID ( str ) for the Job . Defaults to None .
origin ( Optional [ str ] , optional ) : The queue of origin . Defaults to None .
meta ( Optional [ Dict [ str , Any ] ] , optional ) : Custom metadata about the job , takes a dictioanry . Defaults to None .
failure_ttl ( Optional [ int ] , optional ) : THe time to live in seconds for failed - jobs information . Defaults to None .
serializer ( Optional [ str ] , optional ) : The serializer class path to use . Should be a string with the import
path for the serializer to use . eg . ` mymodule . myfile . MySerializer ` Defaults to None .
on_success ( Optional [ Callable [ . . . , Any ] ] , optional ) : A callback function , should be a callable to run
when / if the Job finishes sucessfully . Defaults to None .
on_failure ( Optional [ Callable [ . . . , Any ] ] , optional ) : A callback function , should be a callable to run
when / if the Job fails . Defaults to None .
Raises :
TypeError : If ` args ` is not a tuple / list
TypeError : If ` kwargs ` is not a dict
TypeError : If the ` func ` is something other than a string or a Callable reference
ValueError : If ` on_failure ` is not a function
ValueError : If ` on_success ` is not a function
Returns :
Job : A job instance .
"""
if args is None :
args = ( )
@ -171,25 +254,51 @@ class Job:
return job
def get_position ( self ) :
def get_position ( self ) - > Optional [ int ] :
""" Get ' s the job ' s position on the queue
Returns :
position ( Optional [ int ] ) : The position
"""
from . queue import Queue
if self . origin :
q = Queue ( name = self . origin , connection = self . connection )
return q . get_job_position ( self . _id )
return None
def get_status ( self , refresh : bool = True ) - > str :
def get_status ( self , refresh : bool = True ) - > JobStatus :
""" Gets the Job Status
Args :
refresh ( bool , optional ) : Whether to refresh the Job . Defaults to True .
Returns :
status ( JobStatus ) : The Job Status
"""
if refresh :
self . _status = as_text ( self . connection . hget ( self . key , ' status ' ) )
return self . _status
def set_status ( self , status : str , pipeline : Optional [ ' Pipeline ' ] = None ) :
def set_status ( self , status : JobStatus , pipeline : Optional [ ' Pipeline ' ] = None ) - > None :
""" Set ' s the Job Status
Args :
status ( JobStatus ) : The Job Status to be set
pipeline ( Optional [ Pipeline ] , optional ) : Optional Redis Pipeline to use . Defaults to None .
"""
self . _status = status
connection : ' Redis ' = pipeline if pipeline is not None else self . connection
connection . hset ( self . key , ' status ' , self . _status )
def get_meta ( self , refresh : bool = True ) :
def get_meta ( self , refresh : bool = True ) - > Dict :
""" Get ' s the metadata for a Job, an arbitrary dictionary.
Args :
refresh ( bool , optional ) : Whether to refresh . Defaults to True .
Returns :
meta ( Dict ) : The dictionary of metadata
"""
if refresh :
meta = self . connection . hget ( self . key , ' meta ' )
self . meta = self . serializer . loads ( meta ) if meta else { }
@ -231,7 +340,7 @@ class Job:
@property
def _dependency_id ( self ) :
""" Returns the first item in self._dependency_ids. Present to
preserve compatibility with third party packages . .
preserve compatibility with third party packages .
"""
if self . _dependency_ids :
return self . _dependency_ids [ 0 ]
@ -250,7 +359,7 @@ class Job:
return job
@property
def dependent_ids ( self ) - > t. List[ str ] :
def dependent_ids ( self ) - > List[ str ] :
""" Returns a list of ids of jobs whose execution depends on this
job ' s successful execution. " " "
return list ( map ( as_text , self . connection . smembers ( self . dependents_key ) ) )
@ -287,10 +396,15 @@ class Job:
return self . _failure_callback
def _deserialize_data ( self ) :
""" Deserializes the Job `data` into a tuple.
This includes the ` _func_name ` , ` _instance ` , ` _args ` and ` _kwargs `
Raises :
DeserializationError : Cathes any deserialization error ( since serializers are generic )
"""
try :
self . _func_name , self . _instance , self . _args , self . _kwargs = self . serializer . loads ( self . data )
except Exception as e :
# catch anything because serializers are generic
raise DeserializationError ( ) from e
@property
@ -365,45 +479,71 @@ class Job:
self . _data = UNEVALUATED
@classmethod
def exists ( cls , job_id : str , connection : Optional [ ' Redis ' ] = None ) - > int :
""" Returns whether a job hash exists for the given job ID. """
def exists ( cls , job_id : str , connection : Optional [ ' Redis ' ] = None ) - > bool :
""" Checks whether a Job Hash exists for the given Job ID
Args :
job_id ( str ) : The Job ID
connection ( Optional [ Redis ] , optional ) : Optional connection to use . Defaults to None .
Returns :
job_exists ( bool ) : Whether the Job exists
"""
conn = resolve_connection ( connection )
return conn . exists ( cls . key_for ( job_id ) )
job_key = cls . key_for ( job_id )
job_exists = conn . exists ( job_key )
return bool ( job_exists )
@classmethod
def fetch ( cls , id : str , connection : Optional [ ' Redis ' ] = None , serializer = None ) - > ' Job ' :
""" Fetches a persisted job from its corresponding Redis key and
instantiates it .
""" Fetches a persisted Job from its corresponding Redis key and instantiates it
Args :
id ( str ) : The Job to fetch
connection ( Optional [ & #39;Redis'], optional): An optional Redis connection. Defaults to None.
serializer ( _type_ , optional ) : The serializer to use . Defaults to None .
Returns :
Job : The Job instance
"""
job = cls ( id , connection = connection , serializer = serializer )
job . refresh ( )
return job
@classmethod
def fetch_many ( cls , job_ids : t . Iterable [ str ] , connection : ' Redis ' , serializer = None ) :
def fetch_many ( cls , job_ids : Iterable[ str ] , connection : ' Redis ' , serializer = None ) - > List [ ' Job ' ] :
"""
Bulk version of Job . fetch
For any job_ids which a job does not exist , the corresponding item in
the returned list will be None .
Args :
job_ids ( Iterable [ str ] ) : A list of job ids .
connection ( Redis ) : Redis connection
serializer ( Callable ) : A serializer
Returns :
jobs ( list [ Job ] ) : A list of Jobs instances .
"""
with connection . pipeline ( ) as pipeline :
for job_id in job_ids :
pipeline . hgetall ( cls . key_for ( job_id ) )
results = pipeline . execute ( )
jobs : t . List [ Optional [ ' Job ' ] ] = [ ]
jobs : List[ Optional [ ' Job ' ] ] = [ ]
for i , job_id in enumerate ( job_ids ) :
if results [ i ] :
if not results [ i ] :
jobs . append ( None )
continue
job = cls ( job_id , connection = connection , serializer = serializer )
job . restore ( results [ i ] )
jobs . append ( job )
else :
jobs . append ( None )
return jobs
def __init__ ( self , id : str = None , connection : Optional [ ' Redis ' ] = None , serializer = None ) :
def __init__ ( self , id : Optional [ str ] = None , connection : Optional [ ' Redis ' ] = None , serializer = None ) :
self . connection = resolve_connection ( connection )
self . _id = id
self . created_at = utcnow ( )
@ -429,12 +569,12 @@ class Job:
self . ttl : Optional [ int ] = None
self . worker_name : Optional [ str ] = None
self . _status = None
self . _dependency_ids : t. List[ str ] = [ ]
self . _dependency_ids : List[ str ] = [ ]
self . meta = { }
self . serializer = resolve_serializer ( serializer )
self . retries_left = None
self . retry_intervals : Optional [ t. List[ int ] ] = None
self . redis_server_version = None
self . retry_intervals : Optional [ List[ int ] ] = None
self . redis_server_version : Optional [ Tuple [ int , int , int ] ] = None
self . last_heartbeat : Optional [ datetime ] = None
self . allow_dependency_failures : Optional [ bool ] = None
self . enqueue_at_front : Optional [ bool ] = None
@ -459,21 +599,38 @@ class Job:
return hash ( self . id )
# Data access
def get_id ( self ) : # noqa
def get_id ( self ) - > str : # noqa
""" The job ID for this job instance. Generates an ID lazily the
first time the ID is requested .
Returns :
job_id ( str ) : The Job ID
"""
if self . _id is None :
self . _id = str ( uuid4 ( ) )
return self . _id
def set_id ( self , value : str ) :
""" Sets a job ID for the given job. """
def set_id ( self , value : str ) - > None :
""" Sets a job ID for the given job
Args :
value ( str ) : The value to set as Job ID
"""
if not isinstance ( value , str ) :
raise TypeError ( ' id must be a string, not {0} ' . format ( type ( value ) ) )
self . _id = value
def heartbeat ( self , timestamp : datetime , ttl : int , pipeline : Optional [ ' Pipeline ' ] = None , xx : bool = False ) :
""" Sets the heartbeat for a job.
It will set a hash in Redis with the ` last_heartbeat ` key and datetime value .
If a Redis ' pipeline is passed, it will use that, else, it will use the job ' s own connection .
Args :
timestamp ( datetime ) : The timestamp to use
ttl ( int ) : The time to live
pipeline ( Optional [ Pipeline ] , optional ) : Can receive a Redis ' pipeline to use. Defaults to None.
xx ( bool , optional ) : Only sets the key if already exists . Defaults to False .
"""
self . last_heartbeat = timestamp
connection = pipeline if pipeline is not None else self . connection
connection . hset ( self . key , ' last_heartbeat ' , utcformat ( self . last_heartbeat ) )
@ -482,13 +639,27 @@ class Job:
id = property ( get_id , set_id )
@classmethod
def key_for ( cls , job_id : str ) :
""" The Redis key that is used to store job hash under. """
def key_for ( cls , job_id : str ) - > bytes :
""" The Redis key that is used to store job hash under.
Args :
job_id ( str ) : The Job ID
Returns :
redis_job_key ( bytes ) : The Redis fully qualified key for the job
"""
return ( cls . redis_job_namespace_prefix + job_id ) . encode ( ' utf-8 ' )
@classmethod
def dependents_key_for ( cls , job_id : str ) :
""" The Redis key that is used to store job dependents hash under. """
def dependents_key_for ( cls , job_id : str ) - > str :
""" The Redis key that is used to store job dependents hash under.
Args :
job_id ( str ) : The " parent " job id
Returns :
dependents_key ( str ) : The dependents key
"""
return ' {0} {1} :dependents ' . format ( cls . redis_job_namespace_prefix , job_id )
@property
@ -505,14 +676,20 @@ class Job:
def dependencies_key ( self ) :
return ' {0} : {1} :dependencies ' . format ( self . redis_job_namespace_prefix , self . id )
def fetch_dependencies ( self , watch : bool = False , pipeline : Optional [ ' Pipeline ' ] = None ) :
"""
Fetch all of a job ' s dependencies. If a pipeline is supplied, and
def fetch_dependencies ( self , watch : bool = False , pipeline : Optional [ ' Pipeline ' ] = None ) - > List [ ' Job ' ] :
""" Fetch all of a job ' s dependencies. If a pipeline is supplied, and
watch is true , then set WATCH on all the keys of all dependencies .
Returned jobs will use self ' s connection, not the pipeline supplied.
If a job has been deleted from redis , it is not returned .
Args :
watch ( bool , optional ) : Wether to WATCH the keys . Defaults to False .
pipeline ( Optional [ Pipeline ] ) : The Redis ' pipeline to use. Defaults to None.
Returns :
jobs ( list [ Job ] ) : A list of Jobs
"""
connection = pipeline if pipeline is not None else self . connection
@ -520,10 +697,8 @@ class Job:
connection . watch ( * [ self . key_for ( dependency_id )
for dependency_id in self . _dependency_ids ] )
jobs = [ job
for job in self . fetch_many ( self . _dependency_ids , connection = self . connection , serializer = self . serializer )
if job ]
dependencies_list = self . fetch_many ( self . _dependency_ids , connection = self . connection , serializer = self . serializer )
jobs = [ job for job in dependencies_list if job ]
return jobs
@property
@ -545,19 +720,30 @@ class Job:
return self . _exc_info
def return_value ( self , refresh = False ) - > Any :
""" Returns the return value of the latest execution, if it was successful """
def return_value ( self , refresh : bool = False ) - > Optional [ Any ] :
""" Returns the return value of the latest execution, if it was successful
Args :
refresh ( bool , optional ) : Whether to refresh the current status . Defaults to False .
Returns :
result ( Optional [ Any ] ) : The job return value .
"""
from . results import Result
if refresh :
self . _cached_result = None
if self . supports_redis_streams :
if not self . supports_redis_streams :
return None
if not self . _cached_result :
self . _cached_result = self . latest_result ( )
if self . _cached_result and self . _cached_result . type == Result . Type . SUCCESSFUL :
return self . _cached_result . return_value
return None
@property
def result ( self ) - > Any :
""" Returns the return value of the job.
@ -597,17 +783,33 @@ class Job:
return self . _result
def results ( self ) - > List [ ' Result ' ] :
""" Returns all Result objects """
""" Returns all Result objects
Returns :
all_results ( List [ Result ] ) : A list of ' Result ' objects
"""
from . results import Result
return Result . all ( self , serializer = self . serializer )
def latest_result ( self ) - > Optional [ ' Result ' ] :
""" Get the latest job result.
Returns :
result ( Result ) : The Result object
"""
""" Returns the latest Result object """
from . results import Result
return Result . fetch_latest ( self , serializer = self . serializer )
def restore ( self , raw_data ) :
""" Overwrite properties with the provided values stored in Redis """
def restore ( self , raw_data ) - > Any :
""" Overwrite properties with the provided values stored in Redis.
Args :
raw_data ( _type_ ) : The raw data to load the job data from
Raises :
NoSuchJobError : If there way an error getting the job data
"""
obj = decode_redis_hash ( raw_data )
try :
raw_data = obj [ ' data ' ]
@ -680,11 +882,17 @@ class Job:
self . restore ( data )
def to_dict ( self , include_meta : bool = True , include_result : bool = True ) - > dict :
"""
Returns a serialization of the current job instance
""" Returns a serialization of the current job instance
You can exclude serializing the ` meta ` dictionary by setting
` include_meta = False ` .
Args :
include_meta ( bool , optional ) : Whether to include the Job ' s metadata. Defaults to True.
include_result ( bool , optional ) : Whether to include the Job ' s result. Defaults to True.
Returns :
dict : The Job serialized as a dictionary
"""
obj = {
' created_at ' : utcformat ( self . created_at or utcnow ( ) ) ,
@ -741,15 +949,19 @@ class Job:
return obj
def save ( self , pipeline : Optional [ ' Pipeline ' ] = None , include_meta : bool = True ,
include_result = True ) :
"""
Dumps the current job instance to its corresponding Redis key .
include_result : bool = True ) :
""" Dumps the current job instance to its corresponding Redis key.
Exclude saving the ` meta ` dictionary by setting
` include_meta = False ` . This is useful to prevent clobbering
user metadata without an expensive ` refresh ( ) ` call first .
Redis key persistence may be altered by ` cleanup ( ) ` method .
Args :
pipeline ( Optional [ Pipeline ] , optional ) : The Redis ' pipeline to use. Defaults to None.
include_meta ( bool , optional ) : Whether to include the job ' s metadata. Defaults to True.
include_result ( bool , optional ) : Whether to include the job ' s result. Defaults to True.
"""
key = self . key
connection = pipeline if pipeline is not None else self . connection
@ -766,9 +978,13 @@ class Job:
""" Only supported by Redis server >= 5.0 is required. """
return self . get_redis_server_version ( ) > = ( 5 , 0 , 0 )
def get_redis_server_version ( self ) :
""" Return Redis server version of connection """
if not self . redis_server_version :
def get_redis_server_version ( self ) - > Tuple [ int , int , int ] :
""" Return Redis server version of connection
Returns :
redis_server_version ( Tuple [ int , int , int ] ) : The Redis version within a Tuple of integers , eg ( 5 , 0 , 9 )
"""
if self . redis_server_version is None :
self . redis_server_version = get_version ( self . connection )
return self . redis_server_version
@ -788,6 +1004,13 @@ class Job:
You can enqueue the jobs dependents optionally ,
Same pipelining behavior as Queue . enqueue_dependents on whether or not a pipeline is passed in .
Args :
pipeline ( Optional [ Pipeline ] , optional ) : The Redis ' pipeline to use. Defaults to None.
enqueue_dependents ( bool , optional ) : Whether to enqueue dependents jobs . Defaults to False .
Raises :
InvalidJobOperation : If the job has already been cancelled .
"""
if self . is_canceled :
raise InvalidJobOperation ( " Cannot cancel already canceled job: {} " . format ( self . get_id ( ) ) )
@ -834,8 +1057,15 @@ class Job:
# handle it
raise
def requeue ( self , at_front : bool = False ) :
""" Requeues job. """
def requeue ( self , at_front : bool = False ) - > ' Job ' :
""" Requeues job
Args :
at_front ( bool , optional ) : Whether the job should be requeued at the front of the queue . Defaults to False .
Returns :
job ( Job ) : The requeued Job instance
"""
return self . failed_job_registry . requeue ( self , at_front = at_front )
def _remove_from_registries ( self , pipeline : Optional [ ' Pipeline ' ] = None , remove_from_queue : bool = True ) :
@ -888,10 +1118,15 @@ class Job:
registry . remove ( self , pipeline = pipeline )
def delete ( self , pipeline : Optional [ ' Pipeline ' ] = None , remove_from_queue : bool = True ,
delete_dependents = False ) :
delete_dependents : bool = False ) :
""" Cancels the job and deletes the job hash from Redis. Jobs depending
on this job can optionally be deleted as well . """
on this job can optionally be deleted as well .
Args :
pipeline ( Optional [ Pipeline ] , optional ) : Redis ' piepline. Defaults to None.
remove_from_queue ( bool , optional ) : Whether the job should be removed from the queue . Defaults to True .
delete_dependents ( bool , optional ) : Whether job dependents should also be deleted . Defaults to False .
"""
connection = pipeline if pipeline is not None else self . connection
self . _remove_from_registries ( pipeline = pipeline , remove_from_queue = remove_from_queue )
@ -902,21 +1137,29 @@ class Job:
connection . delete ( self . key , self . dependents_key , self . dependencies_key )
def delete_dependents ( self , pipeline : Optional [ ' Pipeline ' ] = None ) :
""" Delete jobs depending on this job. """
""" Delete jobs depending on this job.
Args :
pipeline ( Optional [ Pipeline ] , optional ) : Redis ' piepline. Defaults to None.
"""
connection = pipeline if pipeline is not None else self . connection
for dependent_id in self . dependent_ids :
try :
job = Job . fetch ( dependent_id , connection = self . connection , serializer = self . serializer )
job . delete ( pipeline = pipeline ,
remove_from_queue = False )
job . delete ( pipeline = pipeline , remove_from_queue = False )
except NoSuchJobError :
# It could be that the dependent job was never saved to redis
pass
connection . delete ( self . dependents_key )
# Job execution
def perform ( self ) : # noqa
""" Invokes the job function with the job arguments. """
def perform ( self ) - > Any : # noqa
""" The main execution method. Invokes the job function with the job arguments.
This is the method that actually performs the job - it ' s what its called by the worker.
Returns :
result ( Any ) : The job result
"""
self . connection . persist ( self . key )
_job_stack . push ( self )
try :
@ -926,13 +1169,19 @@ class Job:
return self . _result
def prepare_for_execution ( self , worker_name : str , pipeline : ' Pipeline ' ) :
""" Set job metadata before execution begins """
""" Prepares the job for execution, setting the worker name,
heartbeat information , status and other metadata before execution begins .
Args :
worker_name ( str ) : The worker that will perform the job
pipeline ( Pipeline ) : The Redis ' piipeline to use
"""
self . worker_name = worker_name
self . last_heartbeat = utcnow ( )
self . started_at = self . last_heartbeat
self . _status = JobStatus . STARTED
mapping = {
' last_heartbeat ' : utcformat ( self . last_heartbeat ) , # type: ignore
' last_heartbeat ' : utcformat ( self . last_heartbeat ) ,
' status ' : self . _status ,
' started_at ' : utcformat ( self . started_at ) , # type: ignore
' worker_name ' : worker_name
@ -940,9 +1189,17 @@ class Job:
if self . get_redis_server_version ( ) > = ( 4 , 0 , 0 ) :
pipeline . hset ( self . key , mapping = mapping )
else :
pipeline . hmset ( self . key , mapping )
pipeline . hmset ( self . key , mapping = mapping )
def _execute ( self ) :
def _execute ( self ) - > Any :
""" Actually runs the function with it ' s *args and **kwargs.
It will use the ` func ` property , which was already resolved and ready to run at this point .
If the function is a coroutine ( it ' s an async function/method), then the `result`
will have to be awaited within an event loop .
Returns :
result ( Any ) : The function result
"""
result = self . func ( * self . args , * * self . kwargs )
if asyncio . iscoroutine ( result ) :
loop = asyncio . new_event_loop ( )
@ -950,36 +1207,56 @@ class Job:
return coro_result
return result
def get_ttl ( self , default_ttl : Optional [ int ] = None ) :
def get_ttl ( self , default_ttl : Optional [ int ] = None ) - > Optional [ int ] :
""" Returns ttl for a job that determines how long a job will be
persisted . In the future , this method will also be responsible
for determining ttl for repeated jobs .
Args :
default_ttl ( Optional [ int ] ) : The default time to live for the job
Returns :
ttl ( int ) : The time to live
"""
return default_ttl if self . ttl is None else self . ttl
def get_result_ttl ( self , default_ttl : Optional [ int ] = None ) :
def get_result_ttl ( self , default_ttl : Optional [ int ] = None ) - > Optional [ int ] :
""" Returns ttl for a job that determines how long a jobs result will
be persisted . In the future , this method will also be responsible
for determining ttl for repeated jobs .
Args :
default_ttl ( Optional [ int ] ) : The default time to live for the job result
Returns :
ttl ( int ) : The time to live for the result
"""
return default_ttl if self . result_ttl is None else self . result_ttl
# Representation
def get_call_string ( self ) : # noqa
def get_call_string ( self ) - > Optional [ str ] : # noqa
""" Returns a string representation of the call, formatted as a regular
Python function invocation statement .
Returns :
call_repr ( str ) : The string representation
"""
return get_call_string ( self . func_name , self . args , self . kwargs , max_length = 75 )
call_repr = get_call_string ( self . func_name , self . args , self . kwargs , max_length = 75 )
return call_repr
def cleanup ( self , ttl : Optional [ int ] = None , pipeline : Optional [ ' Pipeline ' ] = None ,
remove_from_queue : bool = True ) :
""" Prepare job for eventual deletion (if needed). This method is usually
called after successful execution . How long we persist the job and its
result depends on the value of ttl :
""" Prepare job for eventual deletion (if needed).
This method is usually called after successful execution .
How long we persist the job and its result depends on the value of ttl :
- If ttl is 0 , cleanup the job immediately .
- If it ' s a positive number, set the job to expire in X seconds.
- If ttl is negative , don ' t set an expiry to it (persist
forever )
- If ttl is negative , don ' t set an expiry to it (persist forever)
Args :
ttl ( Optional [ int ] , optional ) : Time to live . Defaults to None .
pipeline ( Optional [ Pipeline ] , optional ) : Redis ' pipeline. Defaults to None.
remove_from_queue ( bool , optional ) : Whether the job should be removed from the queue . Defaults to True .
"""
if ttl == 0 :
self . delete ( pipeline = pipeline , remove_from_queue = remove_from_queue )
@ -1005,10 +1282,13 @@ class Job:
job_class = self . __class__ ,
serializer = self . serializer )
def get_retry_interval ( self ) :
def get_retry_interval ( self ) - > int :
""" Returns the desired retry interval.
If number of retries is bigger than length of intervals , the first
value in the list will be used multiple times .
Returns :
retry_interval ( int ) : The desired retry interval
"""
if self . retry_intervals is None :
return 0
@ -1017,7 +1297,15 @@ class Job:
return self . retry_intervals [ index ]
def retry ( self , queue : ' Queue ' , pipeline : ' Pipeline ' ) :
""" Requeue or schedule this job for execution """
""" Requeue or schedule this job for execution.
If the the ` retry_interval ` was set on the job itself ,
it will calculate a scheduled time for the job to run , and instead
of just regularly ` enqueing ` the job , it will ` schedule ` it .
Args :
queue ( Queue ) : The queue to retry the job on
pipeline ( Pipeline ) : The Redis ' pipeline to use
"""
retry_interval = self . get_retry_interval ( )
self . retries_left = self . retries_left - 1
if retry_interval :
@ -1032,11 +1320,15 @@ class Job:
depend on are successfully performed . We record this relation as
a reverse dependency ( a Redis set ) , with a key that looks something
like :
. . codeblock : python : :
rq : job : job_id : dependents = { ' job_id_1 ' , ' job_id_2 ' }
This method adds the job in its dependencies ' dependents sets,
and adds the job to DeferredJobRegistry .
Args :
pipeline ( Optional [ Pipeline ] ) : The Redis ' pipeline. Defaults to None
"""
from . registry import DeferredJobRegistry
@ -1054,14 +1346,14 @@ class Job:
connection . sadd ( self . dependencies_key , dependency_id )
@property
def dependency_ids ( self ) :
def dependency_ids ( self ) - > List [ bytes ] :
dependencies = self . connection . smembers ( self . dependencies_key )
return [ Job . key_for ( _id . decode ( ) )
for _id in dependencies ]
def dependencies_are_met ( self , parent_job : Optional [ ' Job ' ] = None ,
pipeline: Optional [ ' Pipeline ' ] = None , exclude_job_id : str = None ) :
""" Returns a boolean indicating if all of this job ' s dependencies are _FINISHED_
def dependencies_are_met ( self , parent_job : Optional [ ' Job ' ] = None , pipeline : Optional [ ' Pipeline ' ] = None ,
exclude_job_id: Optional [ str ] = None ) - > bool :
""" Returns a boolean indicating if all of this job ' s dependencies are `FINISHED`
If a pipeline is passed , all dependencies are WATCHed .
@ -1069,6 +1361,14 @@ class Job:
This is useful when enqueueing the dependents of a _successful_ job - - that status of
` FINISHED ` may not be yet set in redis , but said job is indeed _done_ and this
method is _called_ in the _stack_ of its dependents are being enqueued .
Args :
parent_job ( Optional [ Job ] , optional ) : The parent Job . Defaults to None .
pipeline ( Optional [ Pipeline ] , optional ) : The Redis ' pipeline. Defaults to None.
exclude_job_id ( Optional [ str ] , optional ) : Whether to exclude the job id . . Defaults to None .
Returns :
are_met ( bool ) : Whether the dependencies were met .
"""
connection = pipeline if pipeline is not None else self . connection
@ -1076,8 +1376,7 @@ class Job:
connection . watch ( * [ self . key_for ( dependency_id )
for dependency_id in self . _dependency_ids ] )
dependencies_ids = { _id . decode ( )
for _id in connection . smembers ( self . dependencies_key ) }
dependencies_ids = { _id . decode ( ) for _id in connection . smembers ( self . dependencies_key ) }
if exclude_job_id :
dependencies_ids . discard ( exclude_job_id )
@ -1104,10 +1403,9 @@ class Job:
dependencies_statuses = pipeline . execute ( )
if self . allow_dependency_failures :
allowed_statuses = [ JobStatus . FINISHED , JobStatus . FAILED ]
else :
allowed_statuses = [ JobStatus . FINISHED ]
if self . allow_dependency_failures :
allowed_statuses . append ( JobStatus . FAILED )
return all (
status . decode ( ) in allowed_statuses
@ -1121,8 +1419,18 @@ _job_stack = LocalStack()
class Retry :
def __init__ ( self , max , interval : int = 0 ) :
""" `interval` can be a positive number or a list of ints """
def __init__ ( self , max : int , interval : Union [ int , List [ int ] ] = 0 ) :
""" The main object to defined Retry logics for jobs.
Args :
max ( int ) : The max number of times a job should be retried
interval ( Union [ int , List [ int ] ] , optional ) : The interval between retries .
Can be a positive number ( int ) or a list of ints . Defaults to 0 ( meaning no interval between retries ) .
Raises :
ValueError : If the ` max ` argument is lower than 1
ValueError : If the interval param is negative or the list contains negative numbers
"""
super ( ) . __init__ ( )
if max < 1 :
raise ValueError ( ' max: please enter a value greater than 0 ' )