Added `delete_job` argument to registry.remove()` (#1161)

main
Selwin Ong 5 years ago committed by GitHub
parent 5bb03b9c2c
commit af678243e1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -7,7 +7,7 @@ Each queue maintains a set of Job Registries:
* `StartedJobRegistry` Holds currently executing jobs. Jobs are added right before they are * `StartedJobRegistry` Holds currently executing jobs. Jobs are added right before they are
executed and removed right after completion (success or failure). executed and removed right after completion (success or failure).
* `FinishedJobRegistry` Holds successfully completed jobs. * `FinishedJobRegistry` Holds successfully completed jobs.
* `FailedJobRegistry` Holds jobs that have failed. * `FailedJobRegistry` Holds jobs that have been executed, but didn't finish successfully.
* `DeferredJobRegistry` Holds deferred jobs (jobs that depend on another job and are waiting for that * `DeferredJobRegistry` Holds deferred jobs (jobs that depend on another job and are waiting for that
job to finish). job to finish).
@ -20,15 +20,15 @@ from rq import Queue
from rq.registry import StartedJobRegistry from rq.registry import StartedJobRegistry
from somewhere import count_words_at_url from somewhere import count_words_at_url
conn = Redis() redis = Redis()
queue = Queue(connection=conn) queue = Queue(connection=redis)
job = queue.enqueue(count_words_at_url, 'http://nvie.com') job = queue.enqueue(count_words_at_url, 'http://nvie.com')
# get StartedJobRegistry by queue # get StartedJobRegistry by queue
registry = StartedJobRegistry(queue=queue) registry = StartedJobRegistry(queue=queue)
# or get StartedJobRegistry by queue name and connection # or get StartedJobRegistry by queue name and connection
registry2 = StartedJobRegistry(name='my_queue', connection=conn) registry2 = StartedJobRegistry(name='my_queue', connection=redis)
# sleep for a moment while job is taken off the queue # sleep for a moment while job is taken off the queue
time.sleep(0.1) time.sleep(0.1)
@ -42,4 +42,30 @@ print('IDs in registry %s' % registry.get_job_ids())
# test if a job is in the registry using the job instance or job id # test if a job is in the registry using the job instance or job id
print('Job in registry %s' % (job in registry)) print('Job in registry %s' % (job in registry))
print('Job in registry %s' % (job.id in registry)) print('Job in registry %s' % (job.id in registry))
```
## Removing Jobs
To remove a job from a job registry, use `registry.remove()`. This is useful
when you want to manually remove jobs from a registry, such as deleting failed
jobs before they expire from `FailedJobRegistry`.
```python
from redis import Redis
from rq import Queue
from rq.registry import FailedJobRegistry
redis = Redis()
queue = Queue(connection=redis)
registry = FailedJobRegistry(queue=queue)
# This is how to remove a job from a registry
for job_id in registry.get_job_ids():
registry.remove(job_id)
# If you want to remove a job from a registry AND delete the job,
# use `delete_job=True`
for job_id in registry.get_job_ids():
registry.remove(job_id, delete_job=True)
``` ```

@ -58,9 +58,18 @@ class BaseRegistry(object):
return self.connection.zadd(self.key, {job.id: score}) return self.connection.zadd(self.key, {job.id: score})
def remove(self, job, pipeline=None): def remove(self, job, pipeline=None, delete_job=False):
"""Removes job from registry and deletes it if `delete_job == True`"""
connection = pipeline if pipeline is not None else self.connection connection = pipeline if pipeline is not None else self.connection
return connection.zrem(self.key, job.id) job_id = job.id if isinstance(job, self.job_class) else job
result = connection.zrem(self.key, job_id)
if delete_job:
if isinstance(job, self.job_class):
job_instance = job
else:
job_instance = Job.fetch(job_id, connection=connection)
job_instance.delete()
return result
def get_expired_job_ids(self, timestamp=None): def get_expired_job_ids(self, timestamp=None):
"""Returns job ids whose score are less than current timestamp. """Returns job ids whose score are less than current timestamp.

@ -60,7 +60,9 @@ class TestRegistry(RQTestCase):
def test_add_and_remove(self): def test_add_and_remove(self):
"""Adding and removing job to StartedJobRegistry.""" """Adding and removing job to StartedJobRegistry."""
timestamp = current_timestamp() timestamp = current_timestamp()
job = Job()
queue = Queue(connection=self.testconn)
job = queue.enqueue(say_hello)
# Test that job is added with the right score # Test that job is added with the right score
self.registry.add(job, 1000) self.registry.add(job, 1000)
@ -71,9 +73,33 @@ class TestRegistry(RQTestCase):
self.registry.add(job, -1) self.registry.add(job, -1)
self.assertEqual(self.testconn.zscore(self.registry.key, job.id), float('inf')) self.assertEqual(self.testconn.zscore(self.registry.key, job.id), float('inf'))
# Ensure that job is properly removed from sorted set # Ensure that job is removed from sorted set, but job key is not deleted
self.registry.remove(job) self.registry.remove(job)
self.assertIsNone(self.testconn.zscore(self.registry.key, job.id)) self.assertIsNone(self.testconn.zscore(self.registry.key, job.id))
self.assertTrue(self.testconn.exists(job.key))
self.registry.add(job, -1)
# registry.remove() also accepts job.id
self.registry.remove(job.id)
self.assertIsNone(self.testconn.zscore(self.registry.key, job.id))
self.registry.add(job, -1)
# delete_job = True deletes job key
self.registry.remove(job, delete_job=True)
self.assertIsNone(self.testconn.zscore(self.registry.key, job.id))
self.assertFalse(self.testconn.exists(job.key))
job = queue.enqueue(say_hello)
self.registry.add(job, -1)
# delete_job = True also works with job.id
self.registry.remove(job.id, delete_job=True)
self.assertIsNone(self.testconn.zscore(self.registry.key, job.id))
self.assertFalse(self.testconn.exists(job.key))
def test_get_job_ids(self): def test_get_job_ids(self):
"""Getting job ids from StartedJobRegistry.""" """Getting job ids from StartedJobRegistry."""

Loading…
Cancel
Save