@ -1,26 +1,33 @@
import sys
# -*- coding: utf-8 -*-
import os
from __future__ import ( absolute_import , division , print_function ,
unicode_literals )
import errno
import errno
import logging
import os
import random
import random
import time
try :
from procname import setprocname
except ImportError :
def setprocname ( * args , * * kwargs ) : # noqa
pass
import socket
import signal
import signal
import socket
import sys
import time
import traceback
import traceback
import logging
from . queue import Queue , get_failed_queue
from rq . compat import as_text , text_type
from . connections import get_current_connection
from . connections import get_current_connection
from . exceptions import DequeueTimeout , NoQueueError
from . job import Job , Status
from . job import Job , Status
from . utils import make_colorizer , utcnow , utcformat
from . logutils import setup_loghandlers
from . logutils import setup_loghandlers
from . exceptions import NoQueueError , DequeueTimeout
from . queue import get_failed_queue , Queue
from . timeouts import death_penalty_after
from . timeouts import UnixSignalDeathPenalty
from . utils import make_colorizer , utcformat , utcnow
from . version import VERSION
from . version import VERSION
from rq . compat import text_type , as_text
try :
from procname import setprocname
except ImportError :
def setprocname ( * args , * * kwargs ) : # noqa
pass
green = make_colorizer ( ' darkgreen ' )
green = make_colorizer ( ' darkgreen ' )
yellow = make_colorizer ( ' darkyellow ' )
yellow = make_colorizer ( ' darkyellow ' )
@ -59,6 +66,7 @@ def signal_name(signum):
class Worker ( object ) :
class Worker ( object ) :
redis_worker_namespace_prefix = ' rq:worker: '
redis_worker_namespace_prefix = ' rq:worker: '
redis_workers_keys = ' rq:workers '
redis_workers_keys = ' rq:workers '
death_penalty_class = UnixSignalDeathPenalty
@classmethod
@classmethod
def all ( cls , connection = None ) :
def all ( cls , connection = None ) :
@ -98,8 +106,8 @@ class Worker(object):
return worker
return worker
def __init__ ( self , queues , name = None ,
def __init__ ( self , queues , name = None ,
default_result_ttl = DEFAULT_RESULT_TTL , connection = None ,
default_result_ttl = None , connection = None ,
exc_handler = None , default_worker_ttl = DEFAULT_WORKER_TTL ) : # noqa
exc_handler = None , default_worker_ttl = None ) : # noqa
if connection is None :
if connection is None :
connection = get_current_connection ( )
connection = get_current_connection ( )
self . connection = connection
self . connection = connection
@ -109,8 +117,15 @@ class Worker(object):
self . queues = queues
self . queues = queues
self . validate_queues ( )
self . validate_queues ( )
self . _exc_handlers = [ ]
self . _exc_handlers = [ ]
if default_result_ttl is None :
default_result_ttl = DEFAULT_RESULT_TTL
self . default_result_ttl = default_result_ttl
self . default_result_ttl = default_result_ttl
if default_worker_ttl is None :
default_worker_ttl = DEFAULT_WORKER_TTL
self . default_worker_ttl = default_worker_ttl
self . default_worker_ttl = default_worker_ttl
self . _state = ' starting '
self . _state = ' starting '
self . _is_horse = False
self . _is_horse = False
self . _horse_pid = 0
self . _horse_pid = 0
@ -124,8 +139,7 @@ class Worker(object):
if exc_handler is not None :
if exc_handler is not None :
self . push_exc_handler ( exc_handler )
self . push_exc_handler ( exc_handler )
def validate_queues ( self ) :
def validate_queues ( self ) : # noqa
""" Sanity check for the given queues. """
""" Sanity check for the given queues. """
if not iterable ( self . queues ) :
if not iterable ( self . queues ) :
raise ValueError ( ' Argument queues not iterable. ' )
raise ValueError ( ' Argument queues not iterable. ' )
@ -141,8 +155,7 @@ class Worker(object):
""" Returns the Redis keys representing this worker ' s queues. """
""" Returns the Redis keys representing this worker ' s queues. """
return map ( lambda q : q . key , self . queues )
return map ( lambda q : q . key , self . queues )
@property
@property # noqa
def name ( self ) :
def name ( self ) :
""" Returns the name of the worker, under which it is registered to the
""" Returns the name of the worker, under which it is registered to the
monitoring system .
monitoring system .
@ -185,8 +198,7 @@ class Worker(object):
"""
"""
setprocname ( ' rq: %s ' % ( message , ) )
setprocname ( ' rq: %s ' % ( message , ) )
def register_birth ( self ) :
def register_birth ( self ) : # noqa
""" Registers its own birth. """
""" Registers its own birth. """
self . log . debug ( ' Registering birth of worker %s ' % ( self . name , ) )
self . log . debug ( ' Registering birth of worker %s ' % ( self . name , ) )
if self . connection . exists ( self . key ) and \
if self . connection . exists ( self . key ) and \
@ -310,8 +322,7 @@ class Worker(object):
signal . signal ( signal . SIGINT , request_stop )
signal . signal ( signal . SIGINT , request_stop )
signal . signal ( signal . SIGTERM , request_stop )
signal . signal ( signal . SIGTERM , request_stop )
def work ( self , burst = False ) :
def work ( self , burst = False ) : # noqa
""" Starts the work loop.
""" Starts the work loop.
Pops and performs all jobs on the current list of queues . When all
Pops and performs all jobs on the current list of queues . When all
@ -332,12 +343,7 @@ class Worker(object):
if self . stopped :
if self . stopped :
self . log . info ( ' Stopping on request. ' )
self . log . info ( ' Stopping on request. ' )
break
break
self . set_state ( ' idle ' )
qnames = self . queue_names ( )
self . procline ( ' Listening on %s ' % ' , ' . join ( qnames ) )
self . log . info ( ' ' )
self . log . info ( ' *** Listening on %s ... ' %
green ( ' , ' . join ( qnames ) ) )
timeout = None if burst else max ( 1 , self . default_worker_ttl - 60 )
timeout = None if burst else max ( 1 , self . default_worker_ttl - 60 )
try :
try :
result = self . dequeue_job_and_maintain_ttl ( timeout )
result = self . dequeue_job_and_maintain_ttl ( timeout )
@ -346,21 +352,10 @@ class Worker(object):
except StopRequested :
except StopRequested :
break
break
self . set_state ( ' busy ' )
job , queue = result
job , queue = result
self . set_current_job_id ( job . id )
# Use the public setter here, to immediately update Redis
job . set_status ( Status . STARTED )
self . log . info ( ' %s : %s ( %s ) ' % ( green ( queue . name ) ,
blue ( job . description ) , job . id ) )
self . heartbeat ( ( job . timeout or 180 ) + 60 )
self . execute_job ( job )
self . execute_job ( job )
self . heartbeat ( )
self . heartbeat ( )
self . set_current_job_id ( None )
if job . get_status ( ) == Status . FINISHED :
if job . get_status ( ) == Status . FINISHED :
queue . enqueue_dependents ( job )
queue . enqueue_dependents ( job )
@ -372,11 +367,25 @@ class Worker(object):
def dequeue_job_and_maintain_ttl ( self , timeout ) :
def dequeue_job_and_maintain_ttl ( self , timeout ) :
result = None
result = None
qnames = self . queue_names ( )
self . set_state ( ' idle ' )
self . procline ( ' Listening on %s ' % ' , ' . join ( qnames ) )
self . log . info ( ' ' )
self . log . info ( ' *** Listening on %s ... ' %
green ( ' , ' . join ( qnames ) ) )
while True :
while True :
self . heartbeat ( )
self . heartbeat ( )
try :
try :
result = Queue . dequeue_any ( self . queues , timeout ,
result = Queue . dequeue_any ( self . queues , timeout ,
connection = self . connection )
connection = self . connection )
if result is not None :
job , queue = result
self . log . info ( ' %s : %s ( %s ) ' % ( green ( queue . name ) ,
blue ( job . description ) , job . id ) )
break
break
except DequeueTimeout :
except DequeueTimeout :
pass
pass
@ -453,25 +462,31 @@ class Worker(object):
""" Performs the actual work of a job. Will/should only be called
""" Performs the actual work of a job. Will/should only be called
inside the work horse ' s process.
inside the work horse ' s process.
"""
"""
self . set_state ( ' busy ' )
self . set_current_job_id ( job . id )
self . heartbeat ( ( job . timeout or 180 ) + 60 )
self . procline ( ' Processing %s from %s since %s ' % (
self . procline ( ' Processing %s from %s since %s ' % (
job . func_name ,
job . func_name ,
job . origin , time . time ( ) ) )
job . origin , time . time ( ) ) )
with self . connection . _pipeline ( ) as pipeline :
with self . connection . _pipeline ( ) as pipeline :
try :
try :
with death_penalty_after ( job . timeout or Queue . DEFAULT_TIMEOUT ) :
with self . death_penalty_class ( job . timeout or Queue . DEFAULT_TIMEOUT ) :
rv = job . perform ( )
rv = job . perform ( )
# Pickle the result in the same try-except block since we need to
# Pickle the result in the same try-except block since we need to
# use the same exc handling when pickling fails
# use the same exc handling when pickling fails
job . _result = rv
job . _result = rv
job . _status = Status . FINISHED
job . ended_at = utcnow ( )
self . set_current_job_id ( None , pipeline = pipeline )
result_ttl = job . get_ttl ( self . default_result_ttl )
result_ttl = job . get_ttl ( self . default_result_ttl )
if result_ttl != 0 :
if result_ttl != 0 :
job . save ( pipeline = pipeline )
job . save ( pipeline = pipeline )
job . cleanup ( result_ttl , pipeline = pipeline )
job . cleanup ( result_ttl , pipeline = pipeline )
pipeline . execute ( )
pipeline . execute ( )
except Exception :
except Exception :