Worker statistics (#897)

* First stab at implementing worker statistics.

* Moved worker data restoration logic to worker.refresh().

* Failed and successful job counts are now properly incremented.

* Worker now keeps track of total_working_time

* Ensure job.ended_at is set in the case of unhandled job failure.

* handle_job_failure shouldn't crash if job.started_at is not present.
Selwin Ong authored 7 years ago, committed by GitHub
parent 32747b59fa
commit 1d7b5e834b
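For context, a minimal sketch of how the new statistics can be read back once workers have registered, assuming rq's public API as of this commit and a Redis server on localhost:

    from redis import StrictRedis
    from rq import Worker

    connection = StrictRedis()
    for worker in Worker.all(connection=connection):
        # find_by_key() now calls refresh(), which populates these
        # attributes from the worker's Redis hash.
        print(worker.name,
              worker.successful_job_count,
              worker.failed_job_count,
              worker.total_working_time,  # accumulated in microseconds
              worker.last_heartbeat)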

--- a/rq/queue.py
+++ b/rq/queue.py
@@ -510,7 +510,6 @@ class FailedQueue(Queue):
             # Add Queue key set
             self.connection.sadd(self.redis_queues_keys, self.key)
 
-            job.ended_at = utcnow()
             job.exc_info = exc_info
             job.save(pipeline=pipeline, include_meta=False)
             job.cleanup(ttl=-1, pipeline=pipeline)  # failed job won't expire

--- a/rq/worker.py
+++ b/rq/worker.py
@@ -132,15 +132,9 @@ class Worker(object):
                         connection=connection,
                         job_class=job_class,
                         queue_class=queue_class)
-        queues, state, job_id = connection.hmget(worker.key, 'queues', 'state', 'current_job')
-        queues = as_text(queues)
-        worker._state = as_text(state or '?')
-        worker._job_id = job_id or None
-        if queues:
-            worker.queues = [worker.queue_class(queue,
-                                                connection=connection,
-                                                job_class=job_class)
-                             for queue in queues.split(',')]
+        worker.refresh()
+
         return worker
 
     def __init__(self, queues, name=None, default_result_ttl=None, connection=None,
@@ -179,6 +173,10 @@ class Worker(object):
         self.failed_queue = get_failed_queue(connection=self.connection,
                                              job_class=self.job_class)
         self.last_cleaned_at = None
+        self.successful_job_count = 0
+        self.failed_job_count = 0
+        self.total_working_time = 0
+        self.birth_date = None
 
         # By default, push the "move-to-failed-queue" exception handler onto
         # the stack
@@ -264,7 +262,11 @@ class Worker(object):
         queues = ','.join(self.queue_names())
         with self.connection._pipeline() as p:
             p.delete(key)
-            p.hset(key, 'birth', utcformat(utcnow()))
+            now = utcnow()
+            now_in_string = utcformat(utcnow())
+            self.birth_date = now
+            p.hset(key, 'birth', now_in_string)
+            p.hset(key, 'last_heartbeat', now_in_string)
             p.hset(key, 'queues', queues)
             p.sadd(self.redis_workers_keys, key)
             p.expire(key, self.default_worker_ttl)
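With this change, birth_date becomes a plain attribute set by register_birth() and restored by refresh(), instead of the Redis-backed property that is commented out below. A minimal sketch, assuming a local Redis and an example queue name:

    from redis import StrictRedis
    from rq import Queue, Worker

    connection = StrictRedis()
    worker = Worker([Queue('default', connection=connection)],
                    connection=connection)
    worker.register_birth()   # writes 'birth' and 'last_heartbeat', sets worker.birth_date
    print(worker.birth_date)  # a datetime, with no extra round trip to Redis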
@@ -285,12 +287,12 @@ class Worker(object):
         """Sets the date on which the worker received a (warm) shutdown request"""
         self.connection.hset(self.key, 'shutdown_requested_date', utcformat(utcnow()))
 
-    @property
-    def birth_date(self):
-        """Fetches birth date from Redis."""
-        birth_timestamp = self.connection.hget(self.key, 'birth')
-        if birth_timestamp is not None:
-            return utcparse(as_text(birth_timestamp))
+    # @property
+    # def birth_date(self):
+    #     """Fetches birth date from Redis."""
+    #     birth_timestamp = self.connection.hget(self.key, 'birth')
+    #     if birth_timestamp is not None:
+    #         return utcparse(as_text(birth_timestamp))
 
     @property
     def shutdown_requested_date(self):
@@ -525,9 +527,46 @@ class Worker(object):
         timeout = max(timeout, self.default_worker_ttl)
         connection = pipeline if pipeline is not None else self.connection
         connection.expire(self.key, timeout)
+        connection.hset(self.key, 'last_heartbeat', utcformat(utcnow()))
         self.log.debug('Sent heartbeat to prevent worker timeout. '
                        'Next one should arrive within {0} seconds.'.format(timeout))
 
+    def refresh(self):
+        data = self.connection.hmget(
+            self.key, 'queues', 'state', 'current_job', 'last_heartbeat',
+            'birth', 'failed_job_count', 'successful_job_count', 'total_working_time'
+        )
+        (queues, state, job_id, last_heartbeat, birth, failed_job_count,
+         successful_job_count, total_working_time) = data
+        queues = as_text(queues)
+        self._state = as_text(state or '?')
+        self._job_id = job_id or None
+        self.last_heartbeat = utcparse(as_text(last_heartbeat))
+        self.birth_date = utcparse(as_text(birth))
+        if failed_job_count:
+            self.failed_job_count = int(as_text(failed_job_count))
+        if successful_job_count:
+            self.successful_job_count = int(as_text(successful_job_count))
+        if total_working_time:
+            self.total_working_time = float(as_text(total_working_time))
+        if queues:
+            self.queues = [self.queue_class(queue,
+                                            connection=self.connection,
+                                            job_class=self.job_class)
+                           for queue in queues.split(',')]
+
+    def increment_failed_job_count(self, pipeline=None):
+        connection = pipeline if pipeline is not None else self.connection
+        connection.hincrby(self.key, 'failed_job_count', 1)
+
+    def increment_successful_job_count(self, pipeline=None):
+        connection = pipeline if pipeline is not None else self.connection
+        connection.hincrby(self.key, 'successful_job_count', 1)
+
+    def increment_total_working_time(self, job_execution_time, pipeline):
+        pipeline.hincrbyfloat(self.key, 'total_working_time',
+                              job_execution_time.microseconds)
+
     def fork_work_horse(self, job, queue):
         """Spawns a work horse to perform the actual work and passes it a job.
         """
@@ -567,6 +606,10 @@ class Worker(object):
             if job_status is None:  # Job completed and its ttl has expired
                 return
             if job_status not in [JobStatus.FINISHED, JobStatus.FAILED]:
+
+                if not job.ended_at:
+                    job.ended_at = utcnow()
+
                 self.handle_job_failure(job=job)
 
                 # Unhandled failure: move the job to the failed queue
@@ -635,8 +678,7 @@ class Worker(object):
                                            job_class=self.job_class)
             registry.add(job, timeout, pipeline=pipeline)
             job.set_status(JobStatus.STARTED, pipeline=pipeline)
-            self.connection._hset(job.key, 'started_at',
-                                  utcformat(utcnow()), pipeline)
+            pipeline.hset(job.key, 'started_at', utcformat(utcnow()))
             pipeline.execute()
 
         msg = 'Processing {0} from {1} since {2}'
@@ -648,7 +690,6 @@ class Worker(object):
             2. Removing the job from the started_job_registry
             3. Setting the workers current job to None
         """
-
         with self.connection._pipeline() as pipeline:
             if started_job_registry is None:
                 started_job_registry = StartedJobRegistry(job.origin,
@@ -657,6 +698,11 @@ class Worker(object):
             job.set_status(JobStatus.FAILED, pipeline=pipeline)
             started_job_registry.remove(job, pipeline=pipeline)
             self.set_current_job_id(None, pipeline=pipeline)
+            self.increment_failed_job_count(pipeline)
+            if job.started_at and job.ended_at:
+                self.increment_total_working_time(job.ended_at - job.started_at,
+                                                  pipeline)
+
             try:
                 pipeline.execute()
             except Exception:
@@ -665,6 +711,7 @@ class Worker(object):
                 pass
 
     def handle_job_success(self, job, queue, started_job_registry):
+
         with self.connection._pipeline() as pipeline:
             while True:
                 try:
@@ -675,6 +722,10 @@ class Worker(object):
                         queue.enqueue_dependents(job, pipeline=pipeline)
 
                     self.set_current_job_id(None, pipeline=pipeline)
+                    self.increment_successful_job_count(pipeline=pipeline)
+                    self.increment_total_working_time(
+                        job.ended_at - job.started_at, pipeline
+                    )
 
                     result_ttl = job.get_result_ttl(self.default_result_ttl)
                     if result_ttl != 0:
@@ -709,6 +760,7 @@ class Worker(object):
                                                 job_class=self.job_class)
 
         try:
+            job.started_at = utcnow()
             with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT):
                 rv = job.perform()
@@ -722,6 +774,8 @@ class Worker(object):
                                     queue=queue,
                                     started_job_registry=started_job_registry)
         except Exception:
+            job.ended_at = utcnow()
+
             self.handle_job_failure(job=job,
                                     started_job_registry=started_job_registry)
             self.handle_exception(job, *sys.exc_info())

--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -4,7 +4,7 @@ from __future__ import (absolute_import, division, print_function,
 
 import os
 import shutil
-from datetime import timedelta
+from datetime import datetime, timedelta
 from time import sleep
 import signal
 import time
@@ -197,6 +197,18 @@ class TestWorker(RQTestCase):
         self.assertEqual(q.count, 0)
         self.assertEqual(failed_q.count, 1)
 
+    def test_heartbeat(self):
+        """Heartbeat saves last_heartbeat"""
+        q = Queue()
+        w = Worker([q])
+        w.register_birth()
+
+        w.heartbeat()
+        last_heartbeat = self.testconn.hget(w.key, 'last_heartbeat')
+        self.assertTrue(last_heartbeat is not None)
+        w = Worker.find_by_key(w.key)
+        self.assertIsInstance(w.last_heartbeat, datetime)
+
     def test_work_fails(self):
         """Failing jobs are put on the failed queue."""
         q = Queue()
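The new 'last_heartbeat' field also makes a simple staleness check possible. A sketch, assuming rq's helper functions and an arbitrary 60-second threshold:

    from datetime import timedelta
    from redis import StrictRedis
    from rq import Worker
    from rq.compat import as_text
    from rq.utils import utcnow, utcparse

    connection = StrictRedis()
    for worker in Worker.all(connection=connection):
        raw = connection.hget(worker.key, 'last_heartbeat')
        if raw is None:
            continue  # worker predates this change
        age = utcnow() - utcparse(as_text(raw))
        if age > timedelta(seconds=60):
            print('{0} may be stale (last heartbeat {1} ago)'.format(worker.name, age))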
@@ -230,6 +242,36 @@ class TestWorker(RQTestCase):
         self.assertEqual(str(job.enqueued_at), enqueued_at_date)
         self.assertIsNotNone(job.exc_info)  # should contain exc_info
 
+    def test_statistics(self):
+        """Successful and failed job counts are saved properly"""
+        q = Queue()
+        job = q.enqueue(div_by_zero)
+        w = Worker([q])
+        w.register_birth()
+
+        self.assertEqual(w.failed_job_count, 0)
+        self.assertEqual(w.successful_job_count, 0)
+        self.assertEqual(w.total_working_time, 0)
+
+        registry = StartedJobRegistry(connection=w.connection)
+        job.started_at = utcnow()
+        job.ended_at = job.started_at + timedelta(seconds=0.75)
+        w.handle_job_failure(job)
+        w.handle_job_success(job, q, registry)
+
+        w.refresh()
+        self.assertEqual(w.failed_job_count, 1)
+        self.assertEqual(w.successful_job_count, 1)
+        self.assertEqual(w.total_working_time, 1500000)  # 1.5 seconds in microseconds
+
+        w.handle_job_failure(job)
+        w.handle_job_success(job, q, registry)
+
+        w.refresh()
+        self.assertEqual(w.failed_job_count, 2)
+        self.assertEqual(w.successful_job_count, 2)
+        self.assertEqual(w.total_working_time, 3000000)
+
     def test_custom_exc_handling(self):
         """Custom exception handling."""
         def black_hole(job, *exc_info):
@@ -559,7 +601,7 @@ class TestWorker(RQTestCase):
 
         death_date = w.death_date
         self.assertIsNotNone(death_date)
-        self.assertEqual(type(death_date).__name__, 'datetime')
+        self.assertIsInstance(death_date, datetime)
 
     def test_clean_queue_registries(self):
         """worker.clean_registries sets last_cleaned_at and cleans registries."""
@@ -799,6 +841,7 @@ def schedule_access_self():
     q.enqueue(access_self)
 
 
+@pytest.mark.skipif(sys.platform == 'darwin', reason='Fails on OS X')
 class TestWorkerSubprocess(RQTestCase):
     def setUp(self):
         super(TestWorkerSubprocess, self).setUp()
