@ -19,7 +19,7 @@ from .connections import get_current_connection
from . job import Job , Status
from . utils import make_colorizer
from . logutils import setup_loghandlers
from . exceptions import NoQueueError , UnpickleError
from . exceptions import NoQueueError , UnpickleError , DequeueTimeout
from . timeouts import death_penalty_after
from . version import VERSION
@ -27,6 +27,7 @@ green = make_colorizer('darkgreen')
yellow = make_colorizer ( ' darkyellow ' )
blue = make_colorizer ( ' darkblue ' )
DEFAULT_WORKER_TTL = 420
DEFAULT_RESULT_TTL = 500
logger = logging . getLogger ( __name__ )
@ -85,6 +86,7 @@ class Worker(object):
if connection is None :
connection = get_current_connection ( )
if not connection . exists ( worker_key ) :
connection . srem ( cls . redis_workers_keys , worker_key )
return None
name = worker_key [ len ( prefix ) : ]
@ -97,8 +99,9 @@ class Worker(object):
return worker
def __init__ ( self , queues , name = None , default_result_ttl = DEFAULT_RESULT_TTL ,
connection = None , exc_handler = None ) : # noqa
def __init__ ( self , queues , name = None ,
default_result_ttl = DEFAULT_RESULT_TTL , connection = None ,
exc_handler = None , default_worker_ttl = DEFAULT_WORKER_TTL ) : # noqa
if connection is None :
connection = get_current_connection ( )
self . connection = connection
@ -109,6 +112,7 @@ class Worker(object):
self . validate_queues ( )
self . _exc_handlers = [ ]
self . default_result_ttl = default_result_ttl
self . default_worker_ttl = default_worker_ttl
self . _state = ' starting '
self . _is_horse = False
self . _horse_pid = 0
@ -200,6 +204,7 @@ class Worker(object):
p . hset ( key , ' birth ' , now )
p . hset ( key , ' queues ' , queues )
p . sadd ( self . redis_workers_keys , key )
p . expire ( key , self . default_worker_ttl )
p . execute ( )
def register_death ( self ) :
@ -301,10 +306,9 @@ class Worker(object):
self . log . info ( ' ' )
self . log . info ( ' *** Listening on %s ... ' % \
green ( ' , ' . join ( qnames ) ) )
timeout = None if burst else 0
timeout = None if burst else self . default_worker_ttl - 6 0
try :
result = Queue . dequeue_any ( self . queues , timeout , \
connection = self . connection )
result = self . dequeue_job_and_maintain_ttl ( timeout )
if result is None :
break
except StopRequested :
@ -322,7 +326,9 @@ class Worker(object):
self . log . info ( ' %s : %s ( %s ) ' % ( green ( queue . name ) ,
blue ( job . description ) , job . id ) )
self . connection . expire ( self . key , ( job . timeout or 180 ) + 60 )
self . fork_and_perform_job ( job )
self . connection . expire ( self . key , self . default_worker_ttl )
did_perform_work = True
finally :
@ -330,6 +336,16 @@ class Worker(object):
self . register_death ( )
return did_perform_work
def dequeue_job_and_maintain_ttl ( self , timeout ) :
while True :
try :
return Queue . dequeue_any ( self . queues , timeout ,
connection = self . connection )
except DequeueTimeout :
self . connection . expire ( self . key , self . default_worker_ttl )
def fork_and_perform_job ( self , job ) :
""" Spawns a work horse to perform the actual work and passes it a job.
The worker will wait for the work horse and make sure it executes