Add get_job_position and get_position feature (#1271)

Fix #1197

Signed-off-by: Paul Spooren <mail@aparcar.org>
main
Paul Spooren 5 years ago committed by GitHub
parent e8795f941a
commit 73506b26fc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -191,6 +191,32 @@ job = q.enqueue(count_words_at_url,
) )
``` ```
## Job position in queue
For user feedback or debuging it is possible to get the position of a job
within the work queue. This allows to track the job processing through the
queue.
This function iterates over all jobs within the queue and therefore does
perform poorly on very large job queues.
```python
from rq import Queue
from redis import Redis
from hello import say_hello
redis_conn = Redis()
q = Queue(connection=redis_conn)
job = q.enqueue(say_hello)
job2 = q.enqueue(say_hello)
job2.get_position()
# returns 1
q.get_job_position(job)
# return 0
```
## Failed Jobs ## Failed Jobs

@ -128,6 +128,13 @@ class Job(object):
job._dependency_ids = [depends_on.id if isinstance(depends_on, Job) else depends_on] job._dependency_ids = [depends_on.id if isinstance(depends_on, Job) else depends_on]
return job return job
def get_position(self):
from .queue import Queue
if self.origin:
q = Queue(name=self.origin, connection=self.connection)
return q.get_job_position(self._id)
return None
def get_status(self, refresh=True): def get_status(self, refresh=True):
if refresh: if refresh:
self._status = as_text(self.connection.hget(self.key, 'status')) self._status = as_text(self.connection.hget(self.key, 'status'))

@ -153,6 +153,19 @@ class Queue(object):
if job.origin == self.name: if job.origin == self.name:
return job return job
def get_job_position(self, job_or_id):
"""Returns the position of a job within the queue
WARNING: The current implementation has a complexity of worse than O(N)
and should not be used for very long job queues. Future implementation
may use Redis LPOS command to improve the complexity to O(N) and
running natively in Redis C implementation.
"""
job_id = job_or_id.id if isinstance(job_or_id, self.job_class) else job_or_id
if job_id in self.job_ids:
return self.job_ids.index(job_id)
return None
def get_job_ids(self, offset=0, length=-1): def get_job_ids(self, offset=0, length=-1):
"""Returns a slice of job IDs in the queue.""" """Returns a slice of job IDs in the queue."""
start = offset start = offset

@ -528,6 +528,16 @@ class TestJob(RQTestCase):
self.assertEqual(self.testconn.ttl(dependent_job.dependencies_key), 100) self.assertEqual(self.testconn.ttl(dependent_job.dependencies_key), 100)
self.assertEqual(self.testconn.ttl(dependency_job.dependents_key), 100) self.assertEqual(self.testconn.ttl(dependency_job.dependents_key), 100)
def test_job_get_position(self):
queue = Queue(connection=self.testconn)
job = queue.enqueue(fixtures.say_hello)
job2 = queue.enqueue(fixtures.say_hello)
job3 = Job(fixtures.say_hello)
self.assertEqual(0, job.get_position())
self.assertEqual(1, job2.get_position())
self.assertEqual(None, job3.get_position())
def test_job_with_dependents_delete_parent(self): def test_job_with_dependents_delete_parent(self):
"""job.delete() deletes itself from Redis but not dependents. """job.delete() deletes itself from Redis but not dependents.
Wthout a save, the dependent job is never saved into redis. The delete Wthout a save, the dependent job is never saved into redis. The delete

@ -133,6 +133,18 @@ class TestQueue(RQTestCase):
self.assertEqual(0, len(self.testconn.smembers(Queue.redis_queues_keys))) self.assertEqual(0, len(self.testconn.smembers(Queue.redis_queues_keys)))
self.assertEqual(False, self.testconn.exists(q.key)) self.assertEqual(False, self.testconn.exists(q.key))
def test_position(self):
"""Test queue.delete properly removes queue but keeps the job keys in the redis store"""
q = Queue('example')
job = q.enqueue(say_hello)
job2 = q.enqueue(say_hello)
job3 = q.enqueue(say_hello)
self.assertEqual(0, q.get_job_position(job.id))
self.assertEqual(1, q.get_job_position(job2.id))
self.assertEqual(2, q.get_job_position(job3))
self.assertEqual(None, q.get_job_position("no_real_job"))
def test_remove(self): def test_remove(self):
"""Ensure queue.remove properly removes Job from queue.""" """Ensure queue.remove properly removes Job from queue."""
q = Queue('example') q = Queue('example')

Loading…
Cancel
Save