Store worker's RQ and Python versions (#1125)

* Store worker version to Redis

* Store worker's Python version to Redis

* Store worker version in __init__ body as suggested in review
main
Vladimir Protasov 5 years ago committed by Selwin Ong
parent 68276e7252
commit 8c34e2b353

@ -175,6 +175,8 @@ class Worker(object):
self.job_class = backend_class(self, 'job_class', override=job_class) self.job_class = backend_class(self, 'job_class', override=job_class)
self.queue_class = backend_class(self, 'queue_class', override=queue_class) self.queue_class = backend_class(self, 'queue_class', override=queue_class)
self.version = VERSION
self.python_version = sys.version
queues = [self.queue_class(name=q, queues = [self.queue_class(name=q,
connection=connection, connection=connection,
@ -268,6 +270,8 @@ class Worker(object):
p.hset(key, 'queues', queues) p.hset(key, 'queues', queues)
p.hset(key, 'pid', self.pid) p.hset(key, 'pid', self.pid)
p.hset(key, 'hostname', self.hostname) p.hset(key, 'hostname', self.hostname)
p.hset(key, 'version', self.version)
p.hset(key, 'python_version', self.python_version)
worker_registration.register(self, p) worker_registration.register(self, p)
p.expire(key, self.default_worker_ttl) p.expire(key, self.default_worker_ttl)
p.execute() p.execute()
@ -558,13 +562,15 @@ class Worker(object):
data = self.connection.hmget( data = self.connection.hmget(
self.key, 'queues', 'state', 'current_job', 'last_heartbeat', self.key, 'queues', 'state', 'current_job', 'last_heartbeat',
'birth', 'failed_job_count', 'successful_job_count', 'birth', 'failed_job_count', 'successful_job_count',
'total_working_time', 'hostname', 'pid' 'total_working_time', 'hostname', 'pid', 'version', 'python_version',
) )
(queues, state, job_id, last_heartbeat, birth, failed_job_count, (queues, state, job_id, last_heartbeat, birth, failed_job_count,
successful_job_count, total_working_time, hostname, pid) = data successful_job_count, total_working_time, hostname, pid, version, python_version) = data
queues = as_text(queues) queues = as_text(queues)
self.hostname = hostname self.hostname = hostname
self.pid = int(pid) if pid else None self.pid = int(pid) if pid else None
self.version = as_text(version)
self.python_version = as_text(python_version)
self._state = as_text(state or '?') self._state = as_text(state or '?')
self._job_id = job_id or None self._job_id = job_id or None
if last_heartbeat: if last_heartbeat:

@ -33,6 +33,7 @@ from rq.job import Job, JobStatus
from rq.registry import StartedJobRegistry, FailedJobRegistry, FinishedJobRegistry from rq.registry import StartedJobRegistry, FailedJobRegistry, FinishedJobRegistry
from rq.suspension import resume, suspend from rq.suspension import resume, suspend
from rq.utils import utcnow from rq.utils import utcnow
from rq.version import VERSION
from rq.worker import HerokuWorker, WorkerStatus from rq.worker import HerokuWorker, WorkerStatus
@ -140,6 +141,7 @@ class TestWorker(RQTestCase):
self.assertEqual(worker.get_state(), WorkerStatus.STARTED) self.assertEqual(worker.get_state(), WorkerStatus.STARTED)
self.assertEqual(worker._job_id, None) self.assertEqual(worker._job_id, None)
self.assertTrue(worker.key in Worker.all_keys(worker.connection)) self.assertTrue(worker.key in Worker.all_keys(worker.connection))
self.assertEqual(worker.version, VERSION)
# If worker is gone, its keys should also be removed # If worker is gone, its keys should also be removed
worker.connection.delete(worker.key) worker.connection.delete(worker.key)
@ -920,6 +922,35 @@ class TestWorker(RQTestCase):
w.dequeue_job_and_maintain_ttl(10) w.dequeue_job_and_maintain_ttl(10)
self.assertNotIn("Frank", mock_logger_info.call_args[0][2]) self.assertNotIn("Frank", mock_logger_info.call_args[0][2])
def test_worker_version(self):
q = Queue()
w = Worker([q])
w.version = '0.0.0'
w.register_birth()
self.assertEqual(w.version, '0.0.0')
w.refresh()
self.assertEqual(w.version, '0.0.0')
# making sure that version is preserved when worker is retrieved by key
worker = Worker.find_by_key(w.key)
self.assertEqual(worker.version, '0.0.0')
def test_python_version(self):
python_version = sys.version
q = Queue()
w = Worker([q])
w.register_birth()
self.assertEqual(w.python_version, python_version)
# now patching version
python_version = 'X.Y.Z.final' # dummy version
self.assertNotEqual(python_version, sys.version) # otherwise tests are pointless
w2 = Worker([q])
w2.python_version = python_version
w2.register_birth()
self.assertEqual(w2.python_version, python_version)
# making sure that version is preserved when worker is retrieved by key
worker = Worker.find_by_key(w2.key)
self.assertEqual(worker.python_version, python_version)
def kill_worker(pid, double_kill): def kill_worker(pid, double_kill):
# wait for the worker to be started over on the main process # wait for the worker to be started over on the main process

Loading…
Cancel
Save