diff --git a/rq/job.py b/rq/job.py index 29562d6..474b660 100644 --- a/rq/job.py +++ b/rq/job.py @@ -447,9 +447,14 @@ class Job(object): cancellation. Technically, this call is (currently) the same as just deleting the job hash. """ + from .queue import Queue pipeline = self.connection._pipeline() self.delete(pipeline=pipeline) pipeline.delete(self.dependents_key) + + if self.origin: + queue = Queue(name=self.origin, connection=self.connection) + queue.remove(self, pipeline=pipeline) pipeline.execute() def delete(self, pipeline=None): diff --git a/rq/queue.py b/rq/queue.py index baac6ca..a5c69e0 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -135,9 +135,13 @@ class Queue(object): """Returns a count of all messages in the queue.""" return self.connection.llen(self.key) - def remove(self, job_or_id): + def remove(self, job_or_id, pipeline=None): """Removes Job from queue, accepts either a Job instance or ID.""" job_id = job_or_id.id if isinstance(job_or_id, self.job_class) else job_or_id + + if pipeline is not None: + pipeline.lrem(self.key, 0, job_id) + return self.connection._lrem(self.key, 0, job_id) def compact(self): diff --git a/tests/test_job.py b/tests/test_job.py index bbe5f7a..d13ab7a 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -321,9 +321,12 @@ class TestJob(RQTestCase): def test_cancel(self): """job.cancel() deletes itself & dependents mapping from Redis.""" - job = Job.create(func=say_hello) + queue = Queue(connection=self.testconn) + job = queue.enqueue(say_hello) job2 = Job.create(func=say_hello, depends_on=job) job2.register_dependency() job.cancel() self.assertFalse(self.testconn.exists(job.key)) self.assertFalse(self.testconn.exists(job.dependents_key)) + + self.assertNotIn(job.id, queue.get_job_ids()) diff --git a/tests/test_queue.py b/tests/test_queue.py index d368590..c60b510 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -94,16 +94,12 @@ class TestQueue(RQTestCase): self.assertEqual(q.job_ids, []) def test_compact(self): - """Compacting queueus.""" + """Queue.compact() removes non-existing jobs.""" q = Queue() q.enqueue(say_hello, 'Alice') - bob = q.enqueue(say_hello, 'Bob') q.enqueue(say_hello, 'Charlie') - debrah = q.enqueue(say_hello, 'Debrah') - - bob.cancel() - debrah.cancel() + self.testconn.lpush(q.key, '1', '2') self.assertEquals(q.count, 4) diff --git a/tests/test_worker.py b/tests/test_worker.py index 5609841..e02ee2a 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -159,9 +159,7 @@ class TestWorker(RQTestCase): job = q.enqueue(create_file, SENTINEL_FILE) # Here, we cancel the job, so the sentinel file may not be created - assert q.count == 1 - job.cancel() - assert q.count == 1 + self.testconn.delete(job.key) w = Worker([q]) w.work(burst=True)