|
|
@ -14,10 +14,10 @@ import traceback
|
|
|
|
import logging
|
|
|
|
import logging
|
|
|
|
from .queue import Queue, get_failed_queue
|
|
|
|
from .queue import Queue, get_failed_queue
|
|
|
|
from .connections import get_current_connection
|
|
|
|
from .connections import get_current_connection
|
|
|
|
from .job import Job, Status
|
|
|
|
from .job import Status
|
|
|
|
from .utils import make_colorizer, utcnow, utcformat
|
|
|
|
from .utils import make_colorizer, utcnow, utcformat
|
|
|
|
from .logutils import setup_loghandlers
|
|
|
|
from .logutils import setup_loghandlers
|
|
|
|
from .exceptions import NoQueueError, UnpickleError, DequeueTimeout
|
|
|
|
from .exceptions import NoQueueError, DequeueTimeout
|
|
|
|
from .timeouts import death_penalty_after
|
|
|
|
from .timeouts import death_penalty_after
|
|
|
|
from .version import VERSION
|
|
|
|
from .version import VERSION
|
|
|
|
from rq.compat import text_type, as_text
|
|
|
|
from rq.compat import text_type, as_text
|
|
|
@ -213,14 +213,24 @@ class Worker(object):
|
|
|
|
p.expire(self.key, 60)
|
|
|
|
p.expire(self.key, 60)
|
|
|
|
p.execute()
|
|
|
|
p.execute()
|
|
|
|
|
|
|
|
|
|
|
|
def set_state(self, new_state):
|
|
|
|
def set_state(self, state):
|
|
|
|
self._state = new_state
|
|
|
|
self._state = state
|
|
|
|
self.connection.hset(self.key, 'state', new_state)
|
|
|
|
self.connection.hset(self.key, 'state', state)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _set_state(self, state):
|
|
|
|
|
|
|
|
self.set_state(state)
|
|
|
|
|
|
|
|
|
|
|
|
def get_state(self):
|
|
|
|
def get_state(self):
|
|
|
|
return self._state
|
|
|
|
return self._state
|
|
|
|
|
|
|
|
|
|
|
|
state = property(get_state, set_state)
|
|
|
|
def _get_state(self):
|
|
|
|
|
|
|
|
"""Raise a DeprecationWarning if ``worker.state == X`` is used"""
|
|
|
|
|
|
|
|
raise DeprecationWarning(
|
|
|
|
|
|
|
|
"worker.state is deprecated, use worker.get_state() instead."
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
return self.get_state()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
state = property(_get_state, _set_state)
|
|
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
@property
|
|
|
|
def stopped(self):
|
|
|
|
def stopped(self):
|
|
|
@ -263,7 +273,7 @@ class Worker(object):
|
|
|
|
|
|
|
|
|
|
|
|
# If shutdown is requested in the middle of a job, wait until
|
|
|
|
# If shutdown is requested in the middle of a job, wait until
|
|
|
|
# finish before shutting down
|
|
|
|
# finish before shutting down
|
|
|
|
if self.state == 'busy':
|
|
|
|
if self.get_state() == 'busy':
|
|
|
|
self._stopped = True
|
|
|
|
self._stopped = True
|
|
|
|
self.log.debug('Stopping after current horse is finished. '
|
|
|
|
self.log.debug('Stopping after current horse is finished. '
|
|
|
|
'Press Ctrl+C again for a cold shutdown.')
|
|
|
|
'Press Ctrl+C again for a cold shutdown.')
|
|
|
@ -289,13 +299,13 @@ class Worker(object):
|
|
|
|
did_perform_work = False
|
|
|
|
did_perform_work = False
|
|
|
|
self.register_birth()
|
|
|
|
self.register_birth()
|
|
|
|
self.log.info('RQ worker started, version %s' % VERSION)
|
|
|
|
self.log.info('RQ worker started, version %s' % VERSION)
|
|
|
|
self.state = 'starting'
|
|
|
|
self.set_state('starting')
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
while True:
|
|
|
|
while True:
|
|
|
|
if self.stopped:
|
|
|
|
if self.stopped:
|
|
|
|
self.log.info('Stopping on request.')
|
|
|
|
self.log.info('Stopping on request.')
|
|
|
|
break
|
|
|
|
break
|
|
|
|
self.state = 'idle'
|
|
|
|
self.set_state('idle')
|
|
|
|
qnames = self.queue_names()
|
|
|
|
qnames = self.queue_names()
|
|
|
|
self.procline('Listening on %s' % ','.join(qnames))
|
|
|
|
self.procline('Listening on %s' % ','.join(qnames))
|
|
|
|
self.log.info('')
|
|
|
|
self.log.info('')
|
|
|
@ -309,7 +319,7 @@ class Worker(object):
|
|
|
|
except StopRequested:
|
|
|
|
except StopRequested:
|
|
|
|
break
|
|
|
|
break
|
|
|
|
|
|
|
|
|
|
|
|
self.state = 'busy'
|
|
|
|
self.set_state('busy')
|
|
|
|
|
|
|
|
|
|
|
|
job, queue = result
|
|
|
|
job, queue = result
|
|
|
|
# Use the public setter here, to immediately update Redis
|
|
|
|
# Use the public setter here, to immediately update Redis
|
|
|
|