diff --git a/rq/queue.py b/rq/queue.py index 11e6553..a537536 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -510,7 +510,6 @@ class FailedQueue(Queue): # Add Queue key set self.connection.sadd(self.redis_queues_keys, self.key) - job.ended_at = utcnow() job.exc_info = exc_info job.save(pipeline=pipeline, include_meta=False) job.cleanup(ttl=-1, pipeline=pipeline) # failed job won't expire diff --git a/rq/worker.py b/rq/worker.py index 8dd855a..7602c2e 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -132,15 +132,9 @@ class Worker(object): connection=connection, job_class=job_class, queue_class=queue_class) - queues, state, job_id = connection.hmget(worker.key, 'queues', 'state', 'current_job') - queues = as_text(queues) - worker._state = as_text(state or '?') - worker._job_id = job_id or None - if queues: - worker.queues = [worker.queue_class(queue, - connection=connection, - job_class=job_class) - for queue in queues.split(',')] + + worker.refresh() + return worker def __init__(self, queues, name=None, default_result_ttl=None, connection=None, @@ -179,6 +173,10 @@ class Worker(object): self.failed_queue = get_failed_queue(connection=self.connection, job_class=self.job_class) self.last_cleaned_at = None + self.successful_job_count = 0 + self.failed_job_count = 0 + self.total_working_time = 0 + self.birth_date = None # By default, push the "move-to-failed-queue" exception handler onto # the stack @@ -264,7 +262,11 @@ class Worker(object): queues = ','.join(self.queue_names()) with self.connection._pipeline() as p: p.delete(key) - p.hset(key, 'birth', utcformat(utcnow())) + now = utcnow() + now_in_string = utcformat(utcnow()) + self.birth_date = now + p.hset(key, 'birth', now_in_string) + p.hset(key, 'last_heartbeat', now_in_string) p.hset(key, 'queues', queues) p.sadd(self.redis_workers_keys, key) p.expire(key, self.default_worker_ttl) @@ -285,12 +287,12 @@ class Worker(object): """Sets the date on which the worker received a (warm) shutdown request""" self.connection.hset(self.key, 'shutdown_requested_date', utcformat(utcnow())) - @property - def birth_date(self): - """Fetches birth date from Redis.""" - birth_timestamp = self.connection.hget(self.key, 'birth') - if birth_timestamp is not None: - return utcparse(as_text(birth_timestamp)) + # @property + # def birth_date(self): + # """Fetches birth date from Redis.""" + # birth_timestamp = self.connection.hget(self.key, 'birth') + # if birth_timestamp is not None: + # return utcparse(as_text(birth_timestamp)) @property def shutdown_requested_date(self): @@ -525,9 +527,46 @@ class Worker(object): timeout = max(timeout, self.default_worker_ttl) connection = pipeline if pipeline is not None else self.connection connection.expire(self.key, timeout) + connection.hset(self.key, 'last_heartbeat', utcformat(utcnow())) self.log.debug('Sent heartbeat to prevent worker timeout. ' 'Next one should arrive within {0} seconds.'.format(timeout)) + def refresh(self): + data = self.connection.hmget( + self.key, 'queues', 'state', 'current_job', 'last_heartbeat', + 'birth', 'failed_job_count', 'successful_job_count', 'total_working_time' + ) + queues, state, job_id, last_heartbeat, birth, failed_job_count, successful_job_count, total_working_time = data + queues = as_text(queues) + self._state = as_text(state or '?') + self._job_id = job_id or None + self.last_heartbeat = utcparse(as_text(last_heartbeat)) + self.birth_date = utcparse(as_text(birth)) + if failed_job_count: + self.failed_job_count = int(as_text(failed_job_count)) + if successful_job_count: + self.successful_job_count = int(as_text(successful_job_count)) + if total_working_time: + self.total_working_time = float(as_text(total_working_time)) + + if queues: + self.queues = [self.queue_class(queue, + connection=self.connection, + job_class=self.job_class) + for queue in queues.split(',')] + + def increment_failed_job_count(self, pipeline=None): + connection = pipeline if pipeline is not None else self.connection + connection.hincrby(self.key, 'failed_job_count', 1) + + def increment_successful_job_count(self, pipeline=None): + connection = pipeline if pipeline is not None else self.connection + connection.hincrby(self.key, 'successful_job_count', 1) + + def increment_total_working_time(self, job_execution_time, pipeline): + pipeline.hincrbyfloat(self.key, 'total_working_time', + job_execution_time.microseconds) + def fork_work_horse(self, job, queue): """Spawns a work horse to perform the actual work and passes it a job. """ @@ -567,6 +606,10 @@ class Worker(object): if job_status is None: # Job completed and its ttl has expired return if job_status not in [JobStatus.FINISHED, JobStatus.FAILED]: + + if not job.ended_at: + job.ended_at = utcnow() + self.handle_job_failure(job=job) # Unhandled failure: move the job to the failed queue @@ -635,8 +678,7 @@ class Worker(object): job_class=self.job_class) registry.add(job, timeout, pipeline=pipeline) job.set_status(JobStatus.STARTED, pipeline=pipeline) - self.connection._hset(job.key, 'started_at', - utcformat(utcnow()), pipeline) + pipeline.hset(job.key, 'started_at', utcformat(utcnow())) pipeline.execute() msg = 'Processing {0} from {1} since {2}' @@ -648,7 +690,6 @@ class Worker(object): 2. Removing the job from the started_job_registry 3. Setting the workers current job to None """ - with self.connection._pipeline() as pipeline: if started_job_registry is None: started_job_registry = StartedJobRegistry(job.origin, @@ -657,6 +698,11 @@ class Worker(object): job.set_status(JobStatus.FAILED, pipeline=pipeline) started_job_registry.remove(job, pipeline=pipeline) self.set_current_job_id(None, pipeline=pipeline) + self.increment_failed_job_count(pipeline) + if job.started_at and job.ended_at: + self.increment_total_working_time(job.ended_at - job.started_at, + pipeline) + try: pipeline.execute() except Exception: @@ -665,6 +711,7 @@ class Worker(object): pass def handle_job_success(self, job, queue, started_job_registry): + with self.connection._pipeline() as pipeline: while True: try: @@ -675,6 +722,10 @@ class Worker(object): queue.enqueue_dependents(job, pipeline=pipeline) self.set_current_job_id(None, pipeline=pipeline) + self.increment_successful_job_count(pipeline=pipeline) + self.increment_total_working_time( + job.ended_at - job.started_at, pipeline + ) result_ttl = job.get_result_ttl(self.default_result_ttl) if result_ttl != 0: @@ -708,7 +759,8 @@ class Worker(object): self.connection, job_class=self.job_class) - try: + try: + job.started_at = utcnow() with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT): rv = job.perform() @@ -722,6 +774,8 @@ class Worker(object): queue=queue, started_job_registry=started_job_registry) except Exception: + + job.ended_at = utcnow() self.handle_job_failure(job=job, started_job_registry=started_job_registry) self.handle_exception(job, *sys.exc_info()) diff --git a/tests/test_worker.py b/tests/test_worker.py index 85435e7..3f76e4d 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -4,7 +4,7 @@ from __future__ import (absolute_import, division, print_function, import os import shutil -from datetime import timedelta +from datetime import datetime, timedelta from time import sleep import signal import time @@ -197,6 +197,18 @@ class TestWorker(RQTestCase): self.assertEqual(q.count, 0) self.assertEqual(failed_q.count, 1) + def test_heartbeat(self): + """Heartbeat saves last_heartbeat""" + q = Queue() + w = Worker([q]) + w.register_birth() + w.heartbeat() + last_heartbeat = self.testconn.hget(w.key, 'last_heartbeat') + + self.assertTrue(last_heartbeat is not None) + w = Worker.find_by_key(w.key) + self.assertIsInstance(w.last_heartbeat, datetime) + def test_work_fails(self): """Failing jobs are put on the failed queue.""" q = Queue() @@ -230,6 +242,36 @@ class TestWorker(RQTestCase): self.assertEqual(str(job.enqueued_at), enqueued_at_date) self.assertIsNotNone(job.exc_info) # should contain exc_info + def test_statistics(self): + """Successful and failed job counts are saved properly""" + q = Queue() + job = q.enqueue(div_by_zero) + w = Worker([q]) + w.register_birth() + + self.assertEqual(w.failed_job_count, 0) + self.assertEqual(w.successful_job_count, 0) + self.assertEqual(w.total_working_time, 0) + + registry = StartedJobRegistry(connection=w.connection) + job.started_at = utcnow() + job.ended_at = job.started_at + timedelta(seconds=0.75) + w.handle_job_failure(job) + w.handle_job_success(job, q, registry) + + w.refresh() + self.assertEqual(w.failed_job_count, 1) + self.assertEqual(w.successful_job_count, 1) + self.assertEqual(w.total_working_time, 1500000) # 1.5 seconds in microseconds + + w.handle_job_failure(job) + w.handle_job_success(job, q, registry) + + w.refresh() + self.assertEqual(w.failed_job_count, 2) + self.assertEqual(w.successful_job_count, 2) + self.assertEqual(w.total_working_time, 3000000) + def test_custom_exc_handling(self): """Custom exception handling.""" def black_hole(job, *exc_info): @@ -559,7 +601,7 @@ class TestWorker(RQTestCase): death_date = w.death_date self.assertIsNotNone(death_date) - self.assertEqual(type(death_date).__name__, 'datetime') + self.assertIsInstance(death_date, datetime) def test_clean_queue_registries(self): """worker.clean_registries sets last_cleaned_at and cleans registries.""" @@ -799,6 +841,7 @@ def schedule_access_self(): q.enqueue(access_self) +@pytest.mark.skipif(sys.platform == 'darwin', reason='Fails on OS X') class TestWorkerSubprocess(RQTestCase): def setUp(self): super(TestWorkerSubprocess, self).setUp()