diff --git a/docs/docs/job_registries.md b/docs/docs/job_registries.md index 729db10..8fd93ca 100644 --- a/docs/docs/job_registries.md +++ b/docs/docs/job_registries.md @@ -7,7 +7,7 @@ Each queue maintains a set of Job Registries: * `StartedJobRegistry` Holds currently executing jobs. Jobs are added right before they are executed and removed right after completion (success or failure). * `FinishedJobRegistry` Holds successfully completed jobs. -* `FailedJobRegistry` Holds jobs that have failed. +* `FailedJobRegistry` Holds jobs that have been executed, but didn't finish successfully. * `DeferredJobRegistry` Holds deferred jobs (jobs that depend on another job and are waiting for that job to finish). @@ -20,15 +20,15 @@ from rq import Queue from rq.registry import StartedJobRegistry from somewhere import count_words_at_url -conn = Redis() -queue = Queue(connection=conn) +redis = Redis() +queue = Queue(connection=redis) job = queue.enqueue(count_words_at_url, 'http://nvie.com') # get StartedJobRegistry by queue registry = StartedJobRegistry(queue=queue) # or get StartedJobRegistry by queue name and connection -registry2 = StartedJobRegistry(name='my_queue', connection=conn) +registry2 = StartedJobRegistry(name='my_queue', connection=redis) # sleep for a moment while job is taken off the queue time.sleep(0.1) @@ -42,4 +42,30 @@ print('IDs in registry %s' % registry.get_job_ids()) # test if a job is in the registry using the job instance or job id print('Job in registry %s' % (job in registry)) print('Job in registry %s' % (job.id in registry)) +``` + +## Removing Jobs + +To remove a job from a job registry, use `registry.remove()`. This is useful +when you want to manually remove jobs from a registry, such as deleting failed +jobs before they expire from `FailedJobRegistry`. + +```python +from redis import Redis +from rq import Queue +from rq.registry import FailedJobRegistry + +redis = Redis() +queue = Queue(connection=redis) +registry = FailedJobRegistry(queue=queue) + +# This is how to remove a job from a registry +for job_id in registry.get_job_ids(): + registry.remove(job_id) + +# If you want to remove a job from a registry AND delete the job, +# use `delete_job=True` +for job_id in registry.get_job_ids(): + registry.remove(job_id, delete_job=True) + ``` \ No newline at end of file diff --git a/rq/registry.py b/rq/registry.py index 85fefc2..104de06 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -58,9 +58,18 @@ class BaseRegistry(object): return self.connection.zadd(self.key, {job.id: score}) - def remove(self, job, pipeline=None): + def remove(self, job, pipeline=None, delete_job=False): + """Removes job from registry and deletes it if `delete_job == True`""" connection = pipeline if pipeline is not None else self.connection - return connection.zrem(self.key, job.id) + job_id = job.id if isinstance(job, self.job_class) else job + result = connection.zrem(self.key, job_id) + if delete_job: + if isinstance(job, self.job_class): + job_instance = job + else: + job_instance = Job.fetch(job_id, connection=connection) + job_instance.delete() + return result def get_expired_job_ids(self, timestamp=None): """Returns job ids whose score are less than current timestamp. diff --git a/tests/test_registry.py b/tests/test_registry.py index eb651dc..d0d7b09 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -60,7 +60,9 @@ class TestRegistry(RQTestCase): def test_add_and_remove(self): """Adding and removing job to StartedJobRegistry.""" timestamp = current_timestamp() - job = Job() + + queue = Queue(connection=self.testconn) + job = queue.enqueue(say_hello) # Test that job is added with the right score self.registry.add(job, 1000) @@ -71,9 +73,33 @@ class TestRegistry(RQTestCase): self.registry.add(job, -1) self.assertEqual(self.testconn.zscore(self.registry.key, job.id), float('inf')) - # Ensure that job is properly removed from sorted set + # Ensure that job is removed from sorted set, but job key is not deleted self.registry.remove(job) self.assertIsNone(self.testconn.zscore(self.registry.key, job.id)) + self.assertTrue(self.testconn.exists(job.key)) + + self.registry.add(job, -1) + + # registry.remove() also accepts job.id + self.registry.remove(job.id) + self.assertIsNone(self.testconn.zscore(self.registry.key, job.id)) + + self.registry.add(job, -1) + + # delete_job = True deletes job key + self.registry.remove(job, delete_job=True) + self.assertIsNone(self.testconn.zscore(self.registry.key, job.id)) + self.assertFalse(self.testconn.exists(job.key)) + + job = queue.enqueue(say_hello) + + self.registry.add(job, -1) + + # delete_job = True also works with job.id + self.registry.remove(job.id, delete_job=True) + self.assertIsNone(self.testconn.zscore(self.registry.key, job.id)) + self.assertFalse(self.testconn.exists(job.key)) + def test_get_job_ids(self): """Getting job ids from StartedJobRegistry."""