@@ -132,15 +132,9 @@ class Worker(object):
                      connection=connection,
                      job_class=job_class,
                      queue_class=queue_class)
-        queues, state, job_id = connection.hmget(worker.key, 'queues', 'state', 'current_job')
-        queues = as_text(queues)
-        worker._state = as_text(state or '?')
-        worker._job_id = job_id or None
-        if queues:
-            worker.queues = [worker.queue_class(queue,
-                                                connection=connection,
-                                                job_class=job_class)
-                             for queue in queues.split(',')]
+        worker.refresh()

         return worker

     def __init__(self, queues, name=None, default_result_ttl=None, connection=None,
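With the per-field HMGET gone, find_by_key delegates all state loading to the new refresh() method added further down. A minimal sketch of looking a worker up by key, assuming a default local Redis and a hypothetical worker key 'rq:worker:example':

    from redis import StrictRedis
    from rq import Worker

    conn = StrictRedis()
    # find_by_key returns None (and prunes the stale registry entry)
    # when no worker with this key is alive.
    worker = Worker.find_by_key('rq:worker:example', connection=conn)
    if worker is not None:
        print(worker.get_state(), worker.queue_names())
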
@@ -179,6 +173,10 @@ class Worker(object):
         self.failed_queue = get_failed_queue(connection=self.connection,
                                              job_class=self.job_class)
         self.last_cleaned_at = None
+        self.successful_job_count = 0
+        self.failed_job_count = 0
+        self.total_working_time = 0
+        self.birth_date = None

         # By default, push the "move-to-failed-queue" exception handler onto
         # the stack
@@ -264,7 +262,11 @@ class Worker(object):
         queues = ','.join(self.queue_names())
         with self.connection._pipeline() as p:
             p.delete(key)
-            p.hset(key, 'birth', utcformat(utcnow()))
+            now = utcnow()
+            now_in_string = utcformat(utcnow())
+            self.birth_date = now
+            p.hset(key, 'birth', now_in_string)
+            p.hset(key, 'last_heartbeat', now_in_string)
             p.hset(key, 'queues', queues)
             p.sadd(self.redis_workers_keys, key)
             p.expire(key, self.default_worker_ttl)
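register_birth now writes the same formatted timestamp under both the 'birth' and 'last_heartbeat' hash fields and caches the datetime on self.birth_date (note that now_in_string calls utcnow() a second time rather than reusing now, so the cached and stored values can differ by microseconds). A sketch of reading those fields back, assuming RQ's as_text/utcparse helpers and the same hypothetical worker key:

    from redis import StrictRedis
    from rq.compat import as_text
    from rq.utils import utcparse

    conn = StrictRedis()
    # Both fields hold utcformat() strings, so utcparse() round-trips them.
    birth, beat = conn.hmget('rq:worker:example', 'birth', 'last_heartbeat')
    if birth is not None:
        print('born:', utcparse(as_text(birth)))
    if beat is not None:
        print('last heartbeat:', utcparse(as_text(beat)))
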
@@ -285,12 +287,12 @@ class Worker(object):
         """Sets the date on which the worker received a (warm) shutdown request"""
         self.connection.hset(self.key, 'shutdown_requested_date', utcformat(utcnow()))

-    @property
-    def birth_date(self):
-        """Fetches birth date from Redis."""
-        birth_timestamp = self.connection.hget(self.key, 'birth')
-        if birth_timestamp is not None:
-            return utcparse(as_text(birth_timestamp))
+    # @property
+    # def birth_date(self):
+    #     """Fetches birth date from Redis."""
+    #     birth_timestamp = self.connection.hget(self.key, 'birth')
+    #     if birth_timestamp is not None:
+    #         return utcparse(as_text(birth_timestamp))

     @property
     def shutdown_requested_date(self):
@@ -525,9 +527,46 @@ class Worker(object):
         timeout = max(timeout, self.default_worker_ttl)
         connection = pipeline if pipeline is not None else self.connection
         connection.expire(self.key, timeout)
+        connection.hset(self.key, 'last_heartbeat', utcformat(utcnow()))
         self.log.debug('Sent heartbeat to prevent worker timeout. '
                        'Next one should arrive within {0} seconds.'.format(timeout))

+    def refresh(self):
+        data = self.connection.hmget(
+            self.key, 'queues', 'state', 'current_job', 'last_heartbeat',
+            'birth', 'failed_job_count', 'successful_job_count', 'total_working_time'
+        )
+        queues, state, job_id, last_heartbeat, birth, failed_job_count, successful_job_count, total_working_time = data
+        queues = as_text(queues)
+        self._state = as_text(state or '?')
+        self._job_id = job_id or None
+        self.last_heartbeat = utcparse(as_text(last_heartbeat))
+        self.birth_date = utcparse(as_text(birth))
+        if failed_job_count:
+            self.failed_job_count = int(as_text(failed_job_count))
+        if successful_job_count:
+            self.successful_job_count = int(as_text(successful_job_count))
+        if total_working_time:
+            self.total_working_time = float(as_text(total_working_time))
+
+        if queues:
+            self.queues = [self.queue_class(queue,
+                                            connection=self.connection,
+                                            job_class=self.job_class)
+                           for queue in queues.split(',')]
+
+    def increment_failed_job_count(self, pipeline=None):
+        connection = pipeline if pipeline is not None else self.connection
+        connection.hincrby(self.key, 'failed_job_count', 1)
+
+    def increment_successful_job_count(self, pipeline=None):
+        connection = pipeline if pipeline is not None else self.connection
+        connection.hincrby(self.key, 'successful_job_count', 1)
+
+    def increment_total_working_time(self, job_execution_time, pipeline):
+        pipeline.hincrbyfloat(self.key, 'total_working_time',
+                              job_execution_time.microseconds)
+
     def fork_work_horse(self, job, queue):
         """Spawns a work horse to perform the actual work and passes it a job.
         """
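Taken together, refresh() and the three increment helpers make each worker's Redis hash the single source of truth for its statistics. A sketch of polling them from a monitoring process, assuming the attributes this diff introduces:

    from redis import StrictRedis
    from rq import Worker

    conn = StrictRedis()
    for worker in Worker.all(connection=conn):
        # find_by_key (used by Worker.all) already calls refresh(), so
        # the counters below reflect the hash at lookup time.
        print('{0}: {1} ok, {2} failed, {3} working time'.format(
            worker.name, worker.successful_job_count,
            worker.failed_job_count, worker.total_working_time))
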
@@ -567,6 +606,10 @@ class Worker(object):
             if job_status is None:  # Job completed and its ttl has expired
                 return
             if job_status not in [JobStatus.FINISHED, JobStatus.FAILED]:
+
+                if not job.ended_at:
+                    job.ended_at = utcnow()
+
                 self.handle_job_failure(job=job)

                 # Unhandled failure: move the job to the failed queue
@@ -635,8 +678,7 @@ class Worker(object):
                                                       job_class=self.job_class)
             registry.add(job, timeout, pipeline=pipeline)
             job.set_status(JobStatus.STARTED, pipeline=pipeline)
-            self.connection._hset(job.key, 'started_at',
-                                  utcformat(utcnow()), pipeline)
+            pipeline.hset(job.key, 'started_at', utcformat(utcnow()))
             pipeline.execute()

         msg = 'Processing {0} from {1} since {2}'
@@ -648,7 +690,6 @@ class Worker(object):
             2. Removing the job from the started_job_registry
             3. Setting the workers current job to None
         """
-
         with self.connection._pipeline() as pipeline:
             if started_job_registry is None:
                 started_job_registry = StartedJobRegistry(job.origin,
@@ -657,6 +698,11 @@ class Worker(object):
             job.set_status(JobStatus.FAILED, pipeline=pipeline)
             started_job_registry.remove(job, pipeline=pipeline)
             self.set_current_job_id(None, pipeline=pipeline)
+            self.increment_failed_job_count(pipeline)
+            if job.started_at and job.ended_at:
+                self.increment_total_working_time(job.ended_at - job.started_at,
+                                                  pipeline)
+
             try:
                 pipeline.execute()
             except Exception:
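The started_at/ended_at guard matters here because a work horse can die before the job ever records those timestamps, and subtracting None would raise a TypeError. One detail worth noting: the new increment_total_working_time feeds HINCRBYFLOAT with timedelta.microseconds, which is only the sub-second component of the elapsed time, not the full duration (total_seconds() would capture the whole span):

    from datetime import timedelta

    elapsed = timedelta(seconds=2, microseconds=500)
    print(elapsed.microseconds)     # 500 -> sub-second component only
    print(elapsed.total_seconds())  # 2.0005 -> full duration in seconds
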
@@ -665,6 +711,7 @@ class Worker(object):
             pass

     def handle_job_success(self, job, queue, started_job_registry):
+
         with self.connection._pipeline() as pipeline:
             while True:
                 try:
@@ -675,6 +722,10 @@ class Worker(object):
                     queue.enqueue_dependents(job, pipeline=pipeline)

                     self.set_current_job_id(None, pipeline=pipeline)
+                    self.increment_successful_job_count(pipeline=pipeline)
+                    self.increment_total_working_time(
+                        job.ended_at - job.started_at, pipeline
+                    )

                     result_ttl = job.get_result_ttl(self.default_result_ttl)
                     if result_ttl != 0:
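Because both increment helpers accept the surrounding pipeline, the counter updates commit atomically with the rest of the success bookkeeping and are retried along with it when the pipeline's watch fails. A minimal sketch of that buffering behaviour, assuming a plain redis-py pipeline and the same hypothetical worker key:

    from redis import StrictRedis

    conn = StrictRedis()
    with conn.pipeline() as pipe:
        pipe.hincrby('rq:worker:example', 'successful_job_count', 1)  # buffered
        pipe.hincrbyfloat('rq:worker:example', 'total_working_time', 0.25)
        pipe.execute()  # both commands hit Redis in one round trip
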
@@ -708,7 +759,8 @@ class Worker(object):
                                                       self.connection,
                                                       job_class=self.job_class)

         try:
             try:
+                job.started_at = utcnow()
                 with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT):
                     rv = job.perform()
@@ -722,6 +774,8 @@ class Worker(object):
                                          queue=queue,
                                          started_job_registry=started_job_registry)
         except Exception:
+            job.ended_at = utcnow()
+
             self.handle_job_failure(job=job,
                                     started_job_registry=started_job_registry)
             self.handle_exception(job, *sys.exc_info())
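With ended_at now set on the failure path as well, successful and failed runs feed the same statistics. A hypothetical helper that turns the raw counters into an average, assuming the attribute names from this diff:

    def average_working_time(worker):
        """Mean working time per finished job, in whatever unit
        increment_total_working_time stored (see the note above)."""
        worker.refresh()
        finished = worker.successful_job_count + worker.failed_job_count
        return worker.total_working_time / finished if finished else 0.0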