From 55c541bc5914cc09cb634c815337e533c37180db Mon Sep 17 00:00:00 2001 From: glaslos Date: Mon, 24 Nov 2014 14:12:51 +0100 Subject: [PATCH 01/10] added job ttl to queue.enqueue() --- rq/job.py | 17 ++++++++++++++--- rq/queue.py | 9 +++++---- rq/worker.py | 2 +- tests/test_job.py | 20 +++++++++++++++----- 4 files changed, 35 insertions(+), 13 deletions(-) diff --git a/rq/job.py b/rq/job.py index e8080f8..a026fda 100644 --- a/rq/job.py +++ b/rq/job.py @@ -92,7 +92,7 @@ class Job(object): # Job construction @classmethod def create(cls, func, args=None, kwargs=None, connection=None, - result_ttl=None, status=None, description=None, depends_on=None, timeout=None, + result_ttl=None, job_ttl=None, status=None, description=None, depends_on=None, timeout=None, id=None): """Creates a new Job instance for the given function, arguments, and keyword arguments. @@ -131,6 +131,7 @@ class Job(object): # Extra meta data job.description = description or job.get_call_string() job.result_ttl = result_ttl + job.job_ttl = job_ttl job.timeout = timeout job._status = status @@ -311,6 +312,7 @@ class Job(object): self.exc_info = None self.timeout = None self.result_ttl = None + self.job_ttl = None self._status = None self._dependency_id = None self.meta = {} @@ -455,6 +457,8 @@ class Job(object): connection = pipeline if pipeline is not None else self.connection connection.hmset(key, self.to_dict()) + if self.job_ttl: + connection.expire(key, self.job_ttl) def cancel(self): """Cancels the given job, which will prevent the job from ever being @@ -491,8 +495,15 @@ class Job(object): return self._result def get_ttl(self, default_ttl=None): - """Returns ttl for a job that determines how long a job and its result - will be persisted. In the future, this method will also be responsible + """Returns ttl for a job that determines how long a job will be + persisted. In the future, this method will also be responsible + for determining ttl for repeated jobs. + """ + return default_ttl if self.job_ttl is None else self.job_ttl + + def get_result_ttl(self, default_ttl=None): + """Returns ttl for a job that determines how long a jobs result will + be persisted. In the future, this method will also be responsible for determining ttl for repeated jobs. """ return default_ttl if self.result_ttl is None else self.result_ttl diff --git a/rq/queue.py b/rq/queue.py index ff5f860..9a2c9af 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -167,8 +167,8 @@ class Queue(object): connection.rpush(self.key, job_id) def enqueue_call(self, func, args=None, kwargs=None, timeout=None, - result_ttl=None, description=None, depends_on=None, - job_id=None): + result_ttl=None, job_ttl=None, description=None, + depends_on=None, job_id=None): """Creates a job to represent the delayed function call and enqueues it. @@ -180,7 +180,7 @@ class Queue(object): # TODO: job with dependency shouldn't have "queued" as status job = self.job_class.create(func, args, kwargs, connection=self.connection, - result_ttl=result_ttl, status=Status.QUEUED, + result_ttl=result_ttl, job_ttl=job_ttl, status=Status.QUEUED, description=description, depends_on=depends_on, timeout=timeout, id=job_id) @@ -229,6 +229,7 @@ class Queue(object): timeout = kwargs.pop('timeout', None) description = kwargs.pop('description', None) result_ttl = kwargs.pop('result_ttl', None) + job_ttl = kwargs.pop('job_ttl', None) depends_on = kwargs.pop('depends_on', None) job_id = kwargs.pop('job_id', None) @@ -238,7 +239,7 @@ class Queue(object): kwargs = kwargs.pop('kwargs', None) return self.enqueue_call(func=f, args=args, kwargs=kwargs, - timeout=timeout, result_ttl=result_ttl, + timeout=timeout, result_ttl=result_ttl, job_ttl=job_ttl, description=description, depends_on=depends_on, job_id=job_id) diff --git a/rq/worker.py b/rq/worker.py index bf40a65..20cd833 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -508,7 +508,7 @@ class Worker(object): self.set_current_job_id(None, pipeline=pipeline) - result_ttl = job.get_ttl(self.default_result_ttl) + result_ttl = job.get_result_ttl(self.default_result_ttl) if result_ttl != 0: job.ended_at = utcnow() job._status = Status.FINISHED diff --git a/tests/test_job.py b/tests/test_job.py index 28fad40..27cebb0 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -290,17 +290,27 @@ class TestJob(RQTestCase): self.assertEqual(job.id, id) self.assertEqual(job.func, access_self) - def test_get_ttl(self): + def test_get_result_ttl(self): + """Getting job result TTL.""" + job_result_ttl = 1 + default_ttl = 2 + job = Job.create(func=say_hello, result_ttl=job_result_ttl) + job.save() + self.assertEqual(job.get_result_ttl(default_ttl=default_ttl), job_result_ttl) + self.assertEqual(job.get_result_ttl(), job_result_ttl) + job = Job.create(func=say_hello) + job.save() + self.assertEqual(job.get_result_ttl(default_ttl=default_ttl), default_ttl) + self.assertEqual(job.get_result_ttl(), None) + + def test_get_job_ttl(self): """Getting job TTL.""" job_ttl = 1 - default_ttl = 2 - job = Job.create(func=say_hello, result_ttl=job_ttl) + job = Job.create(func=say_hello, job_ttl=job_ttl) job.save() - self.assertEqual(job.get_ttl(default_ttl=default_ttl), job_ttl) self.assertEqual(job.get_ttl(), job_ttl) job = Job.create(func=say_hello) job.save() - self.assertEqual(job.get_ttl(default_ttl=default_ttl), default_ttl) self.assertEqual(job.get_ttl(), None) def test_cleanup(self): From 8a3fd91e2ec2bc09af30652f3ed9b8ce846d8922 Mon Sep 17 00:00:00 2001 From: glaslos Date: Wed, 26 Nov 2014 10:12:52 +0100 Subject: [PATCH 02/10] renaming job.job_ttl to job.ttl --- rq/job.py | 12 ++++++------ rq/queue.py | 8 ++++---- tests/test_job.py | 6 +++--- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/rq/job.py b/rq/job.py index a026fda..865ca30 100644 --- a/rq/job.py +++ b/rq/job.py @@ -92,7 +92,7 @@ class Job(object): # Job construction @classmethod def create(cls, func, args=None, kwargs=None, connection=None, - result_ttl=None, job_ttl=None, status=None, description=None, depends_on=None, timeout=None, + result_ttl=None, ttl=None, status=None, description=None, depends_on=None, timeout=None, id=None): """Creates a new Job instance for the given function, arguments, and keyword arguments. @@ -131,7 +131,7 @@ class Job(object): # Extra meta data job.description = description or job.get_call_string() job.result_ttl = result_ttl - job.job_ttl = job_ttl + job.ttl = ttl job.timeout = timeout job._status = status @@ -312,7 +312,7 @@ class Job(object): self.exc_info = None self.timeout = None self.result_ttl = None - self.job_ttl = None + self.ttl = None self._status = None self._dependency_id = None self.meta = {} @@ -457,8 +457,8 @@ class Job(object): connection = pipeline if pipeline is not None else self.connection connection.hmset(key, self.to_dict()) - if self.job_ttl: - connection.expire(key, self.job_ttl) + if self.ttl: + connection.expire(key, self.ttl) def cancel(self): """Cancels the given job, which will prevent the job from ever being @@ -499,7 +499,7 @@ class Job(object): persisted. In the future, this method will also be responsible for determining ttl for repeated jobs. """ - return default_ttl if self.job_ttl is None else self.job_ttl + return default_ttl if self.ttl is None else self.ttl def get_result_ttl(self, default_ttl=None): """Returns ttl for a job that determines how long a jobs result will diff --git a/rq/queue.py b/rq/queue.py index 9a2c9af..ccd4e6b 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -167,7 +167,7 @@ class Queue(object): connection.rpush(self.key, job_id) def enqueue_call(self, func, args=None, kwargs=None, timeout=None, - result_ttl=None, job_ttl=None, description=None, + result_ttl=None, ttl=None, description=None, depends_on=None, job_id=None): """Creates a job to represent the delayed function call and enqueues it. @@ -180,7 +180,7 @@ class Queue(object): # TODO: job with dependency shouldn't have "queued" as status job = self.job_class.create(func, args, kwargs, connection=self.connection, - result_ttl=result_ttl, job_ttl=job_ttl, status=Status.QUEUED, + result_ttl=result_ttl, ttl=ttl, status=Status.QUEUED, description=description, depends_on=depends_on, timeout=timeout, id=job_id) @@ -229,7 +229,7 @@ class Queue(object): timeout = kwargs.pop('timeout', None) description = kwargs.pop('description', None) result_ttl = kwargs.pop('result_ttl', None) - job_ttl = kwargs.pop('job_ttl', None) + ttl = kwargs.pop('ttl', None) depends_on = kwargs.pop('depends_on', None) job_id = kwargs.pop('job_id', None) @@ -239,7 +239,7 @@ class Queue(object): kwargs = kwargs.pop('kwargs', None) return self.enqueue_call(func=f, args=args, kwargs=kwargs, - timeout=timeout, result_ttl=result_ttl, job_ttl=job_ttl, + timeout=timeout, result_ttl=result_ttl, ttl=ttl, description=description, depends_on=depends_on, job_id=job_id) diff --git a/tests/test_job.py b/tests/test_job.py index 27cebb0..34859a7 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -305,10 +305,10 @@ class TestJob(RQTestCase): def test_get_job_ttl(self): """Getting job TTL.""" - job_ttl = 1 - job = Job.create(func=say_hello, job_ttl=job_ttl) + ttl = 1 + job = Job.create(func=say_hello, ttl=ttl) job.save() - self.assertEqual(job.get_ttl(), job_ttl) + self.assertEqual(job.get_ttl(), ttl) job = Job.create(func=say_hello) job.save() self.assertEqual(job.get_ttl(), None) From c6a83eaa824bc8cd4459356afd1569b2ef7f95f7 Mon Sep 17 00:00:00 2001 From: glaslos Date: Mon, 1 Dec 2014 11:37:54 +0100 Subject: [PATCH 03/10] calling self.cleanup to set job expiration --- rq/job.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/rq/job.py b/rq/job.py index 865ca30..8067a01 100644 --- a/rq/job.py +++ b/rq/job.py @@ -457,8 +457,7 @@ class Job(object): connection = pipeline if pipeline is not None else self.connection connection.hmset(key, self.to_dict()) - if self.ttl: - connection.expire(key, self.ttl) + self.cleanup(self.ttl) def cancel(self): """Cancels the given job, which will prevent the job from ever being @@ -524,14 +523,16 @@ class Job(object): def cleanup(self, ttl=None, pipeline=None): """Prepare job for eventual deletion (if needed). This method is usually called after successful execution. How long we persist the job and its - result depends on the value of result_ttl: - - If result_ttl is 0, cleanup the job immediately. + result depends on the value of ttl: + - If ttl is 0, cleanup the job immediately. - If it's a positive number, set the job to expire in X seconds. - - If result_ttl is negative, don't set an expiry to it (persist + - If ttl is negative, don't set an expiry to it (persist forever) """ if ttl == 0: self.cancel() + elif not ttl: + return elif ttl > 0: connection = pipeline if pipeline is not None else self.connection connection.expire(self.key, ttl) From 9f699d758d125a2e8b0b3f962039643ac6793e27 Mon Sep 17 00:00:00 2001 From: Nic Cope Date: Mon, 1 Dec 2014 17:19:37 -0500 Subject: [PATCH 04/10] Any negative number should cause a job to live forever in a registry. While the documentation explicitly mentions ttl=-1, this matches better the behaviour of the job.cleanup() method. --- rq/registry.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rq/registry.py b/rq/registry.py index a1b89b2..c620eb9 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -29,7 +29,7 @@ class BaseRegistry(object): def add(self, job, timeout, pipeline=None): """Adds a job to a registry with expiry time of now + timeout.""" - score = timeout if timeout == -1 else current_timestamp() + timeout + score = timeout if timeout < 0 else current_timestamp() + timeout if pipeline is not None: return pipeline.zadd(self.key, score, job.id) From 6ab7070a923f6c54c6f253ee6814cdf4dfd0663c Mon Sep 17 00:00:00 2001 From: Nic Cope Date: Mon, 1 Dec 2014 17:23:38 -0500 Subject: [PATCH 05/10] Rename registry timeout to registry ttl This matches job.cleanup() better and (I think) makes the purpose slightly clearer. --- rq/registry.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rq/registry.py b/rq/registry.py index c620eb9..beabbad 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -27,9 +27,9 @@ class BaseRegistry(object): self.cleanup() return self.connection.zcard(self.key) - def add(self, job, timeout, pipeline=None): - """Adds a job to a registry with expiry time of now + timeout.""" - score = timeout if timeout < 0 else current_timestamp() + timeout + def add(self, job, ttl, pipeline=None): + """Adds a job to a registry with expiry time of now + ttl.""" + score = ttl if ttl < 0 else current_timestamp() + ttl if pipeline is not None: return pipeline.zadd(self.key, score, job.id) From 09cab7a90d5898779ca57a433ebcaca5c630ced2 Mon Sep 17 00:00:00 2001 From: Nic Cope Date: Mon, 1 Dec 2014 17:50:30 -0500 Subject: [PATCH 06/10] Allow maximum job age to be specified when cleaning up or getting expired jobs from a registry. This ensures that all jobs cleaned from a started registry end up in the failed queue. --- rq/registry.py | 43 +++++++++++++++++++++++++++--------------- tests/test_registry.py | 17 +++++++++++++++-- 2 files changed, 43 insertions(+), 17 deletions(-) diff --git a/rq/registry.py b/rq/registry.py index beabbad..b4cf43f 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -8,9 +8,6 @@ class BaseRegistry(object): """ Base implementation of job registry, implemented in Redis sorted set. Each job is stored as a key in the registry, scored by expiration time (unix timestamp). - - Jobs with scores are lower than current time is considered "expired" and - should be cleaned up. """ def __init__(self, name='default', connection=None): @@ -39,10 +36,16 @@ class BaseRegistry(object): connection = pipeline if pipeline is not None else self.connection return connection.zrem(self.key, job.id) - def get_expired_job_ids(self): - """Returns job ids whose score are less than current timestamp.""" + def get_expired_job_ids(self, timestamp=None): + """Returns job ids whose score are less than current timestamp. + + Returns ids for jobs with an expiry time earlier than timestamp, + specified as seconds since the Unix epoch. timestamp defaults to call + time if unspecified. + """ + score = timestamp if timestamp is not None else current_timestamp() return [as_text(job_id) for job_id in - self.connection.zrangebyscore(self.key, 0, current_timestamp())] + self.connection.zrangebyscore(self.key, 0, score)] def get_job_ids(self, start=0, end=-1): """Returns list of all job ids.""" @@ -59,24 +62,28 @@ class StartedJobRegistry(BaseRegistry): Jobs are added to registry right before they are executed and removed right after completion (success or failure). - - Jobs whose score are lower than current time is considered "expired". """ def __init__(self, name='default', connection=None): super(StartedJobRegistry, self).__init__(name, connection) self.key = 'rq:wip:%s' % name - def cleanup(self): - """Remove expired jobs from registry and add them to FailedQueue.""" - job_ids = self.get_expired_job_ids() + def cleanup(self, timestamp=None): + """Remove expired jobs from registry and add them to FailedQueue. + + Removes jobs with an expiry time earlier than timestamp, specified as + seconds since the Unix epoch. timestamp defaults to call time if + unspecified. Removed jobs are added to the global failed job queue. + """ + score = timestamp if timestamp is not None else current_timestamp() + job_ids = self.get_expired_job_ids(score) if job_ids: failed_queue = FailedQueue(connection=self.connection) with self.connection.pipeline() as pipeline: for job_id in job_ids: failed_queue.push_job_id(job_id, pipeline=pipeline) - pipeline.zremrangebyscore(self.key, 0, current_timestamp()) + pipeline.zremrangebyscore(self.key, 0, score) pipeline.execute() return job_ids @@ -92,6 +99,12 @@ class FinishedJobRegistry(BaseRegistry): super(FinishedJobRegistry, self).__init__(name, connection) self.key = 'rq:finished:%s' % name - def cleanup(self): - """Remove expired jobs from registry.""" - self.connection.zremrangebyscore(self.key, 0, current_timestamp()) + def cleanup(self, timestamp=None): + """Remove expired jobs from registry. + + Removes jobs with an expiry time earlier than timestamp, specified as + seconds since the Unix epoch. timestamp defaults to call time if + unspecified. + """ + score = timestamp if timestamp is not None else current_timestamp() + self.connection.zremrangebyscore(self.key, 0, score) diff --git a/tests/test_registry.py b/tests/test_registry.py index 36b9792..26470e3 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -48,14 +48,22 @@ class TestRegistry(RQTestCase): self.testconn.zadd(self.registry.key, 1, 'foo') self.testconn.zadd(self.registry.key, timestamp + 10, 'bar') + self.testconn.zadd(self.registry.key, timestamp + 30, 'baz') self.assertEqual(self.registry.get_expired_job_ids(), ['foo']) + self.assertEqual(self.registry.get_expired_job_ids(timestamp + 20), + ['foo', 'bar']) def test_cleanup(self): """Moving expired jobs to FailedQueue.""" failed_queue = FailedQueue(connection=self.testconn) self.assertTrue(failed_queue.is_empty()) - self.testconn.zadd(self.registry.key, 1, 'foo') + self.testconn.zadd(self.registry.key, 2, 'foo') + + self.registry.cleanup(1) + self.assertNotIn('foo', failed_queue.job_ids) + self.assertEqual(self.testconn.zscore(self.registry.key, 'foo'), 2) + self.registry.cleanup() self.assertIn('foo', failed_queue.job_ids) self.assertEqual(self.testconn.zscore(self.registry.key, 'foo'), None) @@ -103,9 +111,14 @@ class TestFinishedJobRegistry(RQTestCase): timestamp = current_timestamp() self.testconn.zadd(self.registry.key, 1, 'foo') self.testconn.zadd(self.registry.key, timestamp + 10, 'bar') + self.testconn.zadd(self.registry.key, timestamp + 30, 'baz') self.registry.cleanup() - self.assertEqual(self.registry.get_job_ids(), ['bar']) + self.assertEqual(self.registry.get_job_ids(), ['bar', 'baz']) + + self.registry.cleanup(timestamp + 20) + self.assertEqual(self.registry.get_job_ids(), ['baz']) + def test_jobs_are_put_in_registry(self): """Completed jobs are added to FinishedJobRegistry.""" From de72f98fbe01d571ac6eea4eca8ecc2dcefbfdc0 Mon Sep 17 00:00:00 2001 From: Travis Johnson Date: Mon, 8 Dec 2014 18:17:32 -0500 Subject: [PATCH 07/10] test for workers equality --- tests/test_worker.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tests/test_worker.py b/tests/test_worker.py index c6d85ff..59f0b26 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -318,3 +318,12 @@ class TestWorker(RQTestCase): 'Expected at least some work done.') self.assertEquals(job.result, 'Hi there, Adam!') self.assertEquals(job.description, '你好 世界!') + + def test_worker_hash_(self): + """Workers are hashed by their .name attribute""" + q = Queue('foo') + w1 = Worker([q], name="worker1") + w2 = Worker([q], name="worker2") + w3 = Worker([q], name="worker1") + worker_set = {w1, w2, w3} + self.assertEquals(len(worker_set), 2) From c301369c2eb45a0969aa32384ddc7115458f5da1 Mon Sep 17 00:00:00 2001 From: Travis Johnson Date: Mon, 8 Dec 2014 18:17:55 -0500 Subject: [PATCH 08/10] implement __eq__ and __hash__ for workers --- rq/worker.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/rq/worker.py b/rq/worker.py index bf40a65..d14bf88 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -580,6 +580,14 @@ class Worker(object): """Pops the latest exception handler off of the exc handler stack.""" return self._exc_handlers.pop() + def __eq__(self, other): + if not isinstance(other, self.__class__): + raise TypeError('Cannot compare workers to other types (of workers)') + return self.name == other.name + + def __hash__(self): + return hash(self.name) + class SimpleWorker(Worker): def _install_signal_handlers(self, *args, **kwargs): From 0ad4cb3410483dae10381a781d14e3d485a91a08 Mon Sep 17 00:00:00 2001 From: Travis Johnson Date: Mon, 8 Dec 2014 18:29:55 -0500 Subject: [PATCH 09/10] comments --- rq/worker.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/rq/worker.py b/rq/worker.py index d14bf88..a2bbb2a 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -581,11 +581,13 @@ class Worker(object): return self._exc_handlers.pop() def __eq__(self, other): + """Equality does not take the database/connection into account""" if not isinstance(other, self.__class__): raise TypeError('Cannot compare workers to other types (of workers)') return self.name == other.name def __hash__(self): + """The hash does not take the database/connection into account""" return hash(self.name) From 6ef9177a7d8a34f49483c80dfe3a8097832fe006 Mon Sep 17 00:00:00 2001 From: Travis Johnson Date: Tue, 9 Dec 2014 11:13:56 -0500 Subject: [PATCH 10/10] 2.6 compatible set syntax --- tests/test_worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_worker.py b/tests/test_worker.py index 59f0b26..6fa127f 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -325,5 +325,5 @@ class TestWorker(RQTestCase): w1 = Worker([q], name="worker1") w2 = Worker([q], name="worker2") w3 = Worker([q], name="worker1") - worker_set = {w1, w2, w3} + worker_set = set([w1, w2, w3]) self.assertEquals(len(worker_set), 2)