|
|
|
@ -12,18 +12,20 @@ import sys
|
|
|
|
|
import time
|
|
|
|
|
import traceback
|
|
|
|
|
import warnings
|
|
|
|
|
from datetime import datetime
|
|
|
|
|
|
|
|
|
|
from rq.compat import as_text, string_types, text_type
|
|
|
|
|
|
|
|
|
|
from .connections import get_current_connection
|
|
|
|
|
from .exceptions import DequeueTimeout, NoQueueError
|
|
|
|
|
from .job import Job, Status
|
|
|
|
|
from .job import Job, JobStatus
|
|
|
|
|
from .logutils import setup_loghandlers
|
|
|
|
|
from .queue import get_failed_queue, Queue
|
|
|
|
|
from .timeouts import UnixSignalDeathPenalty
|
|
|
|
|
from .utils import import_attribute, make_colorizer, utcformat, utcnow
|
|
|
|
|
from .utils import import_attribute, make_colorizer, utcformat, utcnow, enum
|
|
|
|
|
from .version import VERSION
|
|
|
|
|
from .registry import FinishedJobRegistry, StartedJobRegistry
|
|
|
|
|
from .suspension import is_suspended
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
from procname import setprocname
|
|
|
|
@ -52,8 +54,8 @@ def compact(l):
|
|
|
|
|
return [x for x in l if x is not None]
|
|
|
|
|
|
|
|
|
|
_signames = dict((getattr(signal, signame), signame)
|
|
|
|
|
for signame in dir(signal)
|
|
|
|
|
if signame.startswith('SIG') and '_' not in signame)
|
|
|
|
|
for signame in dir(signal)
|
|
|
|
|
if signame.startswith('SIG') and '_' not in signame)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def signal_name(signum):
|
|
|
|
@ -65,6 +67,12 @@ def signal_name(signum):
|
|
|
|
|
return 'SIG_UNKNOWN'
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
WorkerStatus = enum('WorkerStatus',
|
|
|
|
|
STARTED='started', SUSPENDED='suspended', BUSY='busy',
|
|
|
|
|
IDLE='idle'
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Worker(object):
|
|
|
|
|
redis_worker_namespace_prefix = 'rq:worker:'
|
|
|
|
|
redis_workers_keys = 'rq:workers'
|
|
|
|
@ -333,6 +341,30 @@ class Worker(object):
|
|
|
|
|
signal.signal(signal.SIGINT, request_stop)
|
|
|
|
|
signal.signal(signal.SIGTERM, request_stop)
|
|
|
|
|
|
|
|
|
|
def check_for_suspension(self, burst):
|
|
|
|
|
"""Check to see if the workers have been suspended by something like `rq suspend`"""
|
|
|
|
|
|
|
|
|
|
before_state = None
|
|
|
|
|
notified = False
|
|
|
|
|
|
|
|
|
|
while not self.stopped and is_suspended(self.connection):
|
|
|
|
|
|
|
|
|
|
if burst:
|
|
|
|
|
self.log.info('Suspended in burst mode -- exiting.')
|
|
|
|
|
self.log.info('Note: There could still be unperformed jobs on the queue')
|
|
|
|
|
raise StopRequested
|
|
|
|
|
|
|
|
|
|
if not notified:
|
|
|
|
|
self.log.info('Worker suspended, use "rq resume" command to resume')
|
|
|
|
|
before_state = self.get_state()
|
|
|
|
|
self.set_state(WorkerStatus.SUSPENDED)
|
|
|
|
|
notified = True
|
|
|
|
|
time.sleep(1)
|
|
|
|
|
|
|
|
|
|
if before_state:
|
|
|
|
|
self.set_state(before_state)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def work(self, burst=False):
|
|
|
|
|
"""Starts the work loop.
|
|
|
|
|
|
|
|
|
@ -348,15 +380,19 @@ class Worker(object):
|
|
|
|
|
did_perform_work = False
|
|
|
|
|
self.register_birth()
|
|
|
|
|
self.log.info('RQ worker started, version %s' % VERSION)
|
|
|
|
|
self.set_state('starting')
|
|
|
|
|
self.set_state(WorkerStatus.STARTED)
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
while True:
|
|
|
|
|
if self.stopped:
|
|
|
|
|
self.log.info('Stopping on request.')
|
|
|
|
|
break
|
|
|
|
|
|
|
|
|
|
timeout = None if burst else max(1, self.default_worker_ttl - 60)
|
|
|
|
|
try:
|
|
|
|
|
self.check_for_suspension(burst)
|
|
|
|
|
|
|
|
|
|
if self.stopped:
|
|
|
|
|
self.log.info('Stopping on request.')
|
|
|
|
|
break
|
|
|
|
|
|
|
|
|
|
timeout = None if burst else max(1, self.default_worker_ttl - 60)
|
|
|
|
|
|
|
|
|
|
result = self.dequeue_job_and_maintain_ttl(timeout)
|
|
|
|
|
if result is None:
|
|
|
|
|
break
|
|
|
|
@ -367,20 +403,22 @@ class Worker(object):
|
|
|
|
|
self.execute_job(job)
|
|
|
|
|
self.heartbeat()
|
|
|
|
|
|
|
|
|
|
if job.get_status() == Status.FINISHED:
|
|
|
|
|
if job.get_status() == JobStatus.FINISHED:
|
|
|
|
|
queue.enqueue_dependents(job)
|
|
|
|
|
|
|
|
|
|
did_perform_work = True
|
|
|
|
|
|
|
|
|
|
finally:
|
|
|
|
|
if not self.is_horse:
|
|
|
|
|
self.register_death()
|
|
|
|
|
return did_perform_work
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def dequeue_job_and_maintain_ttl(self, timeout):
|
|
|
|
|
result = None
|
|
|
|
|
qnames = self.queue_names()
|
|
|
|
|
|
|
|
|
|
self.set_state('idle')
|
|
|
|
|
self.set_state(WorkerStatus.IDLE)
|
|
|
|
|
self.procline('Listening on %s' % ','.join(qnames))
|
|
|
|
|
self.log.info('')
|
|
|
|
|
self.log.info('*** Listening on %s...' %
|
|
|
|
@ -395,7 +433,7 @@ class Worker(object):
|
|
|
|
|
if result is not None:
|
|
|
|
|
job, queue = result
|
|
|
|
|
self.log.info('%s: %s (%s)' % (green(queue.name),
|
|
|
|
|
blue(job.description), job.id))
|
|
|
|
|
blue(job.description), job.id))
|
|
|
|
|
|
|
|
|
|
break
|
|
|
|
|
except DequeueTimeout:
|
|
|
|
@ -477,12 +515,12 @@ class Worker(object):
|
|
|
|
|
timeout = (job.timeout or 180) + 60
|
|
|
|
|
|
|
|
|
|
with self.connection._pipeline() as pipeline:
|
|
|
|
|
self.set_state('busy', pipeline=pipeline)
|
|
|
|
|
self.set_state(WorkerStatus.BUSY, pipeline=pipeline)
|
|
|
|
|
self.set_current_job_id(job.id, pipeline=pipeline)
|
|
|
|
|
self.heartbeat(timeout, pipeline=pipeline)
|
|
|
|
|
registry = StartedJobRegistry(job.origin, self.connection)
|
|
|
|
|
registry.add(job, timeout, pipeline=pipeline)
|
|
|
|
|
job.set_status(Status.STARTED, pipeline=pipeline)
|
|
|
|
|
job.set_status(JobStatus.STARTED, pipeline=pipeline)
|
|
|
|
|
pipeline.execute()
|
|
|
|
|
|
|
|
|
|
self.procline('Processing %s from %s since %s' % (
|
|
|
|
@ -511,7 +549,7 @@ class Worker(object):
|
|
|
|
|
result_ttl = job.get_ttl(self.default_result_ttl)
|
|
|
|
|
if result_ttl != 0:
|
|
|
|
|
job.ended_at = utcnow()
|
|
|
|
|
job._status = Status.FINISHED
|
|
|
|
|
job._status = JobStatus.FINISHED
|
|
|
|
|
job.save(pipeline=pipeline)
|
|
|
|
|
|
|
|
|
|
finished_job_registry = FinishedJobRegistry(job.origin, self.connection)
|
|
|
|
@ -523,7 +561,7 @@ class Worker(object):
|
|
|
|
|
pipeline.execute()
|
|
|
|
|
|
|
|
|
|
except Exception:
|
|
|
|
|
job.set_status(Status.FAILED, pipeline=pipeline)
|
|
|
|
|
job.set_status(JobStatus.FAILED, pipeline=pipeline)
|
|
|
|
|
started_job_registry.remove(job, pipeline=pipeline)
|
|
|
|
|
pipeline.execute()
|
|
|
|
|
self.handle_exception(job, *sys.exc_info())
|
|
|
|
@ -552,7 +590,7 @@ class Worker(object):
|
|
|
|
|
'arguments': job.args,
|
|
|
|
|
'kwargs': job.kwargs,
|
|
|
|
|
'queue': job.origin,
|
|
|
|
|
})
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
for handler in reversed(self._exc_handlers):
|
|
|
|
|
self.log.debug('Invoking exception handler %s' % (handler,))
|
|
|
|
|