diff --git a/rq/job.py b/rq/job.py index 7b17492..9afe578 100644 --- a/rq/job.py +++ b/rq/job.py @@ -83,7 +83,7 @@ class Job(object): # Job construction @classmethod def create(cls, func, args=None, kwargs=None, connection=None, - result_ttl=None, status=None, description=None, depends_on=None, timeout=None, + result_ttl=None, ttl=None, status=None, description=None, depends_on=None, timeout=None, id=None): """Creates a new Job instance for the given function, arguments, and keyword arguments. @@ -122,6 +122,7 @@ class Job(object): # Extra meta data job.description = description or job.get_call_string() job.result_ttl = result_ttl + job.ttl = ttl job.timeout = timeout job._status = status @@ -302,6 +303,7 @@ class Job(object): self.exc_info = None self.timeout = None self.result_ttl = None + self.ttl = None self._status = None self._dependency_id = None self.meta = {} @@ -446,6 +448,7 @@ class Job(object): connection = pipeline if pipeline is not None else self.connection connection.hmset(key, self.to_dict()) + self.cleanup(self.ttl) def cancel(self): """Cancels the given job, which will prevent the job from ever being @@ -482,8 +485,15 @@ class Job(object): return self._result def get_ttl(self, default_ttl=None): - """Returns ttl for a job that determines how long a job and its result - will be persisted. In the future, this method will also be responsible + """Returns ttl for a job that determines how long a job will be + persisted. In the future, this method will also be responsible + for determining ttl for repeated jobs. + """ + return default_ttl if self.ttl is None else self.ttl + + def get_result_ttl(self, default_ttl=None): + """Returns ttl for a job that determines how long a jobs result will + be persisted. In the future, this method will also be responsible for determining ttl for repeated jobs. """ return default_ttl if self.result_ttl is None else self.result_ttl @@ -504,14 +514,16 @@ class Job(object): def cleanup(self, ttl=None, pipeline=None): """Prepare job for eventual deletion (if needed). This method is usually called after successful execution. How long we persist the job and its - result depends on the value of result_ttl: - - If result_ttl is 0, cleanup the job immediately. + result depends on the value of ttl: + - If ttl is 0, cleanup the job immediately. - If it's a positive number, set the job to expire in X seconds. - - If result_ttl is negative, don't set an expiry to it (persist + - If ttl is negative, don't set an expiry to it (persist forever) """ if ttl == 0: self.cancel() + elif not ttl: + return elif ttl > 0: connection = pipeline if pipeline is not None else self.connection connection.expire(self.key, ttl) diff --git a/rq/queue.py b/rq/queue.py index 61f5575..d822968 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -167,8 +167,8 @@ class Queue(object): connection.rpush(self.key, job_id) def enqueue_call(self, func, args=None, kwargs=None, timeout=None, - result_ttl=None, description=None, depends_on=None, - job_id=None): + result_ttl=None, ttl=None, description=None, + depends_on=None, job_id=None): """Creates a job to represent the delayed function call and enqueues it. @@ -190,7 +190,7 @@ class Queue(object): # modifying the dependency. In this case we simply retry if depends_on is not None: if not isinstance(depends_on, self.job_class): - depends_on = Job.fetch(id=depends_on, connection=self.connection) + depends_on = Job(id=depends_on, connection=self.connection) with self.connection.pipeline() as pipe: while True: try: @@ -229,6 +229,7 @@ class Queue(object): timeout = kwargs.pop('timeout', None) description = kwargs.pop('description', None) result_ttl = kwargs.pop('result_ttl', None) + ttl = kwargs.pop('ttl', None) depends_on = kwargs.pop('depends_on', None) job_id = kwargs.pop('job_id', None) @@ -238,7 +239,7 @@ class Queue(object): kwargs = kwargs.pop('kwargs', None) return self.enqueue_call(func=f, args=args, kwargs=kwargs, - timeout=timeout, result_ttl=result_ttl, + timeout=timeout, result_ttl=result_ttl, ttl=ttl, description=description, depends_on=depends_on, job_id=job_id) diff --git a/rq/registry.py b/rq/registry.py index c59bf88..b4cf43f 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -8,9 +8,6 @@ class BaseRegistry(object): """ Base implementation of job registry, implemented in Redis sorted set. Each job is stored as a key in the registry, scored by expiration time (unix timestamp). - - Jobs with scores are lower than current time is considered "expired" and - should be cleaned up. """ def __init__(self, name='default', connection=None): @@ -27,9 +24,9 @@ class BaseRegistry(object): self.cleanup() return self.connection.zcard(self.key) - def add(self, job, timeout, pipeline=None): - """Adds a job to StartedJobRegistry with expiry time of now + timeout.""" - score = current_timestamp() + timeout + def add(self, job, ttl, pipeline=None): + """Adds a job to a registry with expiry time of now + ttl.""" + score = ttl if ttl < 0 else current_timestamp() + ttl if pipeline is not None: return pipeline.zadd(self.key, score, job.id) @@ -39,10 +36,16 @@ class BaseRegistry(object): connection = pipeline if pipeline is not None else self.connection return connection.zrem(self.key, job.id) - def get_expired_job_ids(self): - """Returns job ids whose score are less than current timestamp.""" + def get_expired_job_ids(self, timestamp=None): + """Returns job ids whose score are less than current timestamp. + + Returns ids for jobs with an expiry time earlier than timestamp, + specified as seconds since the Unix epoch. timestamp defaults to call + time if unspecified. + """ + score = timestamp if timestamp is not None else current_timestamp() return [as_text(job_id) for job_id in - self.connection.zrangebyscore(self.key, 0, current_timestamp())] + self.connection.zrangebyscore(self.key, 0, score)] def get_job_ids(self, start=0, end=-1): """Returns list of all job ids.""" @@ -59,24 +62,28 @@ class StartedJobRegistry(BaseRegistry): Jobs are added to registry right before they are executed and removed right after completion (success or failure). - - Jobs whose score are lower than current time is considered "expired". """ def __init__(self, name='default', connection=None): super(StartedJobRegistry, self).__init__(name, connection) self.key = 'rq:wip:%s' % name - def cleanup(self): - """Remove expired jobs from registry and add them to FailedQueue.""" - job_ids = self.get_expired_job_ids() + def cleanup(self, timestamp=None): + """Remove expired jobs from registry and add them to FailedQueue. + + Removes jobs with an expiry time earlier than timestamp, specified as + seconds since the Unix epoch. timestamp defaults to call time if + unspecified. Removed jobs are added to the global failed job queue. + """ + score = timestamp if timestamp is not None else current_timestamp() + job_ids = self.get_expired_job_ids(score) if job_ids: failed_queue = FailedQueue(connection=self.connection) with self.connection.pipeline() as pipeline: for job_id in job_ids: failed_queue.push_job_id(job_id, pipeline=pipeline) - pipeline.zremrangebyscore(self.key, 0, current_timestamp()) + pipeline.zremrangebyscore(self.key, 0, score) pipeline.execute() return job_ids @@ -92,6 +99,12 @@ class FinishedJobRegistry(BaseRegistry): super(FinishedJobRegistry, self).__init__(name, connection) self.key = 'rq:finished:%s' % name - def cleanup(self): - """Remove expired jobs from registry.""" - self.connection.zremrangebyscore(self.key, 0, current_timestamp()) + def cleanup(self, timestamp=None): + """Remove expired jobs from registry. + + Removes jobs with an expiry time earlier than timestamp, specified as + seconds since the Unix epoch. timestamp defaults to call time if + unspecified. + """ + score = timestamp if timestamp is not None else current_timestamp() + self.connection.zremrangebyscore(self.key, 0, score) diff --git a/rq/worker.py b/rq/worker.py index 72f1aa4..f099037 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -546,7 +546,7 @@ class Worker(object): self.set_current_job_id(None, pipeline=pipeline) - result_ttl = job.get_ttl(self.default_result_ttl) + result_ttl = job.get_result_ttl(self.default_result_ttl) if result_ttl != 0: job.ended_at = utcnow() job._status = JobStatus.FINISHED @@ -618,6 +618,16 @@ class Worker(object): """Pops the latest exception handler off of the exc handler stack.""" return self._exc_handlers.pop() + def __eq__(self, other): + """Equality does not take the database/connection into account""" + if not isinstance(other, self.__class__): + raise TypeError('Cannot compare workers to other types (of workers)') + return self.name == other.name + + def __hash__(self): + """The hash does not take the database/connection into account""" + return hash(self.name) + class SimpleWorker(Worker): def _install_signal_handlers(self, *args, **kwargs): diff --git a/tests/test_job.py b/tests/test_job.py index 28fad40..34859a7 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -290,17 +290,27 @@ class TestJob(RQTestCase): self.assertEqual(job.id, id) self.assertEqual(job.func, access_self) - def test_get_ttl(self): - """Getting job TTL.""" - job_ttl = 1 + def test_get_result_ttl(self): + """Getting job result TTL.""" + job_result_ttl = 1 default_ttl = 2 - job = Job.create(func=say_hello, result_ttl=job_ttl) + job = Job.create(func=say_hello, result_ttl=job_result_ttl) + job.save() + self.assertEqual(job.get_result_ttl(default_ttl=default_ttl), job_result_ttl) + self.assertEqual(job.get_result_ttl(), job_result_ttl) + job = Job.create(func=say_hello) + job.save() + self.assertEqual(job.get_result_ttl(default_ttl=default_ttl), default_ttl) + self.assertEqual(job.get_result_ttl(), None) + + def test_get_job_ttl(self): + """Getting job TTL.""" + ttl = 1 + job = Job.create(func=say_hello, ttl=ttl) job.save() - self.assertEqual(job.get_ttl(default_ttl=default_ttl), job_ttl) - self.assertEqual(job.get_ttl(), job_ttl) + self.assertEqual(job.get_ttl(), ttl) job = Job.create(func=say_hello) job.save() - self.assertEqual(job.get_ttl(default_ttl=default_ttl), default_ttl) self.assertEqual(job.get_ttl(), None) def test_cleanup(self): diff --git a/tests/test_queue.py b/tests/test_queue.py index c905f13..347753b 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -355,9 +355,7 @@ class TestQueue(RQTestCase): def test_enqueue_job_with_dependency_by_id(self): """Enqueueing jobs should work as expected by id as well as job-objects.""" parent_job = Job.create(func=say_hello) - # We need to save the job for the ID to exist in redis - parent_job.save() - + q = Queue() q.enqueue_call(say_hello, depends_on=parent_job.id) self.assertEqual(q.job_ids, []) diff --git a/tests/test_registry.py b/tests/test_registry.py index ce4a345..26470e3 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -27,6 +27,10 @@ class TestRegistry(RQTestCase): self.assertLess(self.testconn.zscore(self.registry.key, job.id), timestamp + 1002) + # Ensure that a timeout of -1 results in a score of -1 + self.registry.add(job, -1) + self.assertEqual(self.testconn.zscore(self.registry.key, job.id), -1) + # Ensure that job is properly removed from sorted set self.registry.remove(job) self.assertIsNone(self.testconn.zscore(self.registry.key, job.id)) @@ -44,14 +48,22 @@ class TestRegistry(RQTestCase): self.testconn.zadd(self.registry.key, 1, 'foo') self.testconn.zadd(self.registry.key, timestamp + 10, 'bar') + self.testconn.zadd(self.registry.key, timestamp + 30, 'baz') self.assertEqual(self.registry.get_expired_job_ids(), ['foo']) + self.assertEqual(self.registry.get_expired_job_ids(timestamp + 20), + ['foo', 'bar']) def test_cleanup(self): """Moving expired jobs to FailedQueue.""" failed_queue = FailedQueue(connection=self.testconn) self.assertTrue(failed_queue.is_empty()) - self.testconn.zadd(self.registry.key, 1, 'foo') + self.testconn.zadd(self.registry.key, 2, 'foo') + + self.registry.cleanup(1) + self.assertNotIn('foo', failed_queue.job_ids) + self.assertEqual(self.testconn.zscore(self.registry.key, 'foo'), 2) + self.registry.cleanup() self.assertIn('foo', failed_queue.job_ids) self.assertEqual(self.testconn.zscore(self.registry.key, 'foo'), None) @@ -99,9 +111,14 @@ class TestFinishedJobRegistry(RQTestCase): timestamp = current_timestamp() self.testconn.zadd(self.registry.key, 1, 'foo') self.testconn.zadd(self.registry.key, timestamp + 10, 'bar') + self.testconn.zadd(self.registry.key, timestamp + 30, 'baz') self.registry.cleanup() - self.assertEqual(self.registry.get_job_ids(), ['bar']) + self.assertEqual(self.registry.get_job_ids(), ['bar', 'baz']) + + self.registry.cleanup(timestamp + 20) + self.assertEqual(self.registry.get_job_ids(), ['baz']) + def test_jobs_are_put_in_registry(self): """Completed jobs are added to FinishedJobRegistry.""" diff --git a/tests/test_worker.py b/tests/test_worker.py index cebfdc4..fd9d0bd 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -370,3 +370,12 @@ class TestWorker(RQTestCase): # The suspension should be expired now, and a burst of work should now clear the queue w.work(burst=True) assert q.count == 0 + + def test_worker_hash_(self): + """Workers are hashed by their .name attribute""" + q = Queue('foo') + w1 = Worker([q], name="worker1") + w2 = Worker([q], name="worker2") + w3 = Worker([q], name="worker1") + worker_set = set([w1, w2, w3]) + self.assertEquals(len(worker_set), 2)