Main worker should use zadd(xx=True) to update heartbeat. (#1550)

main
Selwin Ong 3 years ago committed by GitHub
parent bac58f24ca
commit c556106a38
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -442,11 +442,11 @@ class Job:
raise TypeError('id must be a string, not {0}'.format(type(value))) raise TypeError('id must be a string, not {0}'.format(type(value)))
self._id = value self._id = value
def heartbeat(self, timestamp, ttl, pipeline=None): def heartbeat(self, timestamp, ttl, pipeline=None, xx=False):
self.last_heartbeat = timestamp self.last_heartbeat = timestamp
connection = pipeline if pipeline is not None else self.connection connection = pipeline if pipeline is not None else self.connection
connection.hset(self.key, 'last_heartbeat', utcformat(self.last_heartbeat)) connection.hset(self.key, 'last_heartbeat', utcformat(self.last_heartbeat))
self.started_job_registry.add(self, ttl, pipeline=pipeline) self.started_job_registry.add(self, ttl, pipeline=pipeline, xx=xx)
id = property(get_id, set_id) id = property(get_id, set_id)

@ -61,15 +61,15 @@ class BaseRegistry:
self.cleanup() self.cleanup()
return self.connection.zcard(self.key) return self.connection.zcard(self.key)
def add(self, job, ttl=0, pipeline=None): def add(self, job, ttl=0, pipeline=None, xx=False):
"""Adds a job to a registry with expiry time of now + ttl, unless it's -1 which is set to +inf""" """Adds a job to a registry with expiry time of now + ttl, unless it's -1 which is set to +inf"""
score = ttl if ttl < 0 else current_timestamp() + ttl score = ttl if ttl < 0 else current_timestamp() + ttl
if score == -1: if score == -1:
score = '+inf' score = '+inf'
if pipeline is not None: if pipeline is not None:
return pipeline.zadd(self.key, {job.id: score}) return pipeline.zadd(self.key, {job.id: score}, xx=xx)
return self.connection.zadd(self.key, {job.id: score}) return self.connection.zadd(self.key, {job.id: score}, xx=xx)
def remove(self, job, pipeline=None, delete_job=False): def remove(self, job, pipeline=None, delete_job=False):
"""Removes job from registry and deletes it if `delete_job == True`""" """Removes job from registry and deletes it if `delete_job == True`"""

@ -814,7 +814,7 @@ class Worker:
with self.connection.pipeline() as pipeline: with self.connection.pipeline() as pipeline:
self.heartbeat(self.job_monitoring_interval + 60, pipeline=pipeline) self.heartbeat(self.job_monitoring_interval + 60, pipeline=pipeline)
ttl = self.get_heartbeat_ttl(job) ttl = self.get_heartbeat_ttl(job)
job.heartbeat(utcnow(), ttl, pipeline=pipeline) job.heartbeat(utcnow(), ttl, pipeline=pipeline, xx=True)
pipeline.execute() pipeline.execute()
except OSError as e: except OSError as e:

@ -449,12 +449,10 @@ class TestRQCli(RQTestCase):
prefix = 'Enqueued tests.fixtures.say_hello() with job-id \'' prefix = 'Enqueued tests.fixtures.say_hello() with job-id \''
suffix = '\'.\n' suffix = '\'.\n'
print(result.stdout) self.assertTrue(result.output.startswith(prefix))
self.assertTrue(result.output.endswith(suffix))
self.assertTrue(result.stdout.startswith(prefix))
self.assertTrue(result.stdout.endswith(suffix))
job_id = result.stdout[len(prefix):-len(suffix)] job_id = result.output[len(prefix):-len(suffix)]
queue_key = 'rq:queue:default' queue_key = 'rq:queue:default'
self.assertEqual(self.connection.llen(queue_key), 1) self.assertEqual(self.connection.llen(queue_key), 1)
self.assertEqual(self.connection.lrange(queue_key, 0, -1)[0].decode('ascii'), job_id) self.assertEqual(self.connection.lrange(queue_key, 0, -1)[0].decode('ascii'), job_id)

Loading…
Cancel
Save