diff --git a/docs/docs/jobs.md b/docs/docs/jobs.md index b91ab21..1d39e9e 100644 --- a/docs/docs/jobs.md +++ b/docs/docs/jobs.md @@ -191,6 +191,32 @@ job = q.enqueue(count_words_at_url, ) ``` +## Job position in queue + +For user feedback or debuging it is possible to get the position of a job +within the work queue. This allows to track the job processing through the +queue. + +This function iterates over all jobs within the queue and therefore does +perform poorly on very large job queues. + +```python +from rq import Queue +from redis import Redis +from hello import say_hello + +redis_conn = Redis() +q = Queue(connection=redis_conn) + +job = q.enqueue(say_hello) +job2 = q.enqueue(say_hello) + +job2.get_position() +# returns 1 + +q.get_job_position(job) +# return 0 +``` ## Failed Jobs diff --git a/rq/job.py b/rq/job.py index 3d129ef..ce82784 100644 --- a/rq/job.py +++ b/rq/job.py @@ -128,6 +128,13 @@ class Job(object): job._dependency_ids = [depends_on.id if isinstance(depends_on, Job) else depends_on] return job + def get_position(self): + from .queue import Queue + if self.origin: + q = Queue(name=self.origin, connection=self.connection) + return q.get_job_position(self._id) + return None + def get_status(self, refresh=True): if refresh: self._status = as_text(self.connection.hget(self.key, 'status')) diff --git a/rq/queue.py b/rq/queue.py index 0b7afd7..5f0b38e 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -153,6 +153,19 @@ class Queue(object): if job.origin == self.name: return job + def get_job_position(self, job_or_id): + """Returns the position of a job within the queue + + WARNING: The current implementation has a complexity of worse than O(N) + and should not be used for very long job queues. Future implementation + may use Redis LPOS command to improve the complexity to O(N) and + running natively in Redis C implementation. + """ + job_id = job_or_id.id if isinstance(job_or_id, self.job_class) else job_or_id + if job_id in self.job_ids: + return self.job_ids.index(job_id) + return None + def get_job_ids(self, offset=0, length=-1): """Returns a slice of job IDs in the queue.""" start = offset diff --git a/tests/test_job.py b/tests/test_job.py index 670a6a4..a7cb7ce 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -528,6 +528,16 @@ class TestJob(RQTestCase): self.assertEqual(self.testconn.ttl(dependent_job.dependencies_key), 100) self.assertEqual(self.testconn.ttl(dependency_job.dependents_key), 100) + def test_job_get_position(self): + queue = Queue(connection=self.testconn) + job = queue.enqueue(fixtures.say_hello) + job2 = queue.enqueue(fixtures.say_hello) + job3 = Job(fixtures.say_hello) + + self.assertEqual(0, job.get_position()) + self.assertEqual(1, job2.get_position()) + self.assertEqual(None, job3.get_position()) + def test_job_with_dependents_delete_parent(self): """job.delete() deletes itself from Redis but not dependents. Wthout a save, the dependent job is never saved into redis. The delete diff --git a/tests/test_queue.py b/tests/test_queue.py index 79dd646..f438854 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -133,6 +133,18 @@ class TestQueue(RQTestCase): self.assertEqual(0, len(self.testconn.smembers(Queue.redis_queues_keys))) self.assertEqual(False, self.testconn.exists(q.key)) + def test_position(self): + """Test queue.delete properly removes queue but keeps the job keys in the redis store""" + q = Queue('example') + job = q.enqueue(say_hello) + job2 = q.enqueue(say_hello) + job3 = q.enqueue(say_hello) + + self.assertEqual(0, q.get_job_position(job.id)) + self.assertEqual(1, q.get_job_position(job2.id)) + self.assertEqual(2, q.get_job_position(job3)) + self.assertEqual(None, q.get_job_position("no_real_job")) + def test_remove(self): """Ensure queue.remove properly removes Job from queue.""" q = Queue('example')