diff --git a/rq/job.py b/rq/job.py index c68af23..0ceccd2 100644 --- a/rq/job.py +++ b/rq/job.py @@ -120,8 +120,8 @@ class Job(object): if pickled_data is None: raise NoSuchJobError('No such job: %s' % (key,)) + self.origin = conn.hget(key, 'origin') self.func, self.args, self.kwargs = unpickle(pickled_data) - self.created_at = times.to_universal(conn.hget(key, 'created_at')) def save(self): @@ -130,6 +130,7 @@ class Job(object): key = self.key conn.hset(key, 'data', pickled_data) + conn.hset(key, 'origin', self.origin) conn.hset(key, 'created_at', times.format(self.created_at, 'UTC')) diff --git a/rq/queue.py b/rq/queue.py index 734d4f5..80138a2 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -5,6 +5,10 @@ from .job import Job from .exceptions import NoSuchJobError, UnpickleError +def compact(lst): + return [item for item in lst if item is not None] + + @total_ordering class Queue(object): redis_queue_namespace_prefix = 'rq:queue:' @@ -43,14 +47,21 @@ class Queue(object): return self.count == 0 @property - def messages(self): - """Returns a list of all messages (pickled job data) in the queue.""" + def job_ids(self): + """Returns a list of all job IDS in the queue.""" return conn.lrange(self.key, 0, -1) @property def jobs(self): - """Returns a list of all jobs in the queue.""" - return map(Job.unpickle, self.messages) + """Returns a list of all (valid) jobs in the queue.""" + def safe_fetch(job_id): + try: + job = Job.fetch(job_id) + except UnpickleError: + return None + return job + + return compact([safe_fetch(job_id) for job_id in self.job_ids]) @property def count(self): @@ -98,6 +109,7 @@ class Queue(object): """ if blocking: queue_key, job_id = conn.blpop(queue_keys) + return queue_key, job_id else: for queue_key in queue_keys: blob = conn.lpop(queue_key) @@ -124,7 +136,6 @@ class Queue(object): # reporting e.queue = self raise e - job.origin = self return job @classmethod @@ -154,8 +165,7 @@ class Queue(object): e.job_id = job_id e.queue = queue raise e - job.origin = queue - return job + return job, queue # Total ordering defition (the rest of the required Python methods are diff --git a/rq/worker.py b/rq/worker.py index bf3671f..9268205 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -15,8 +15,11 @@ except ImportError: from logging import Logger from .queue import Queue from .proxy import conn +from .utils import make_colorizer from .exceptions import NoQueueError, UnpickleError +green = make_colorizer('darkgreen') + def iterable(x): return hasattr(x, '__iter__') @@ -239,7 +242,7 @@ class Worker(object): """ self._install_signal_handlers() - did_work = False + did_perform_work = False self.register_birth() self.state = 'starting' try: @@ -253,7 +256,11 @@ class Worker(object): self.log.info('*** Listening for work on %s...' % (', '.join(qnames))) wait_for_job = not burst try: - job = Queue.dequeue_any(self.queues, wait_for_job) + result = Queue.dequeue_any(self.queues, wait_for_job) + if result is None: + break + job, queue = result + self.log.info('%s: %s' % (green(queue.name), job)) except UnpickleError as e: self.log.warning('*** Ignoring unpickleable data on %s.' % (e.queue.name,)) self.log.debug('Data follows:') @@ -262,17 +269,14 @@ class Worker(object): self.failure_queue.push_job_id(e.job_id) continue - if job is None: - break self.state = 'busy' - self.fork_and_perform_job(job) - did_work = True + did_perform_work = True finally: if not self.is_horse: self.register_death() - return did_work + return did_perform_work def fork_and_perform_job(self, job): child_pid = os.fork() diff --git a/tests/test_queue.py b/tests/test_queue.py index 0fb0b6e..0cfe613 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -76,7 +76,7 @@ class TestQueue(RQTestCase): job = q.dequeue() self.assertEquals(job.id, result.id) self.assertEquals(job.func, testjob) - self.assertEquals(job.origin, q) + self.assertEquals(job.origin, q.name) self.assertEquals(job.args[0], 'Rick') self.assertEquals(job.kwargs['foo'], 'bar') @@ -108,21 +108,24 @@ class TestQueue(RQTestCase): # Enqueue a single item barq.enqueue(testjob) - job = Queue.dequeue_any([fooq, barq], False) + job, queue = Queue.dequeue_any([fooq, barq], False) self.assertEquals(job.func, testjob) + self.assertEquals(queue, barq) # Enqueue items on both queues barq.enqueue(testjob, 'for Bar') fooq.enqueue(testjob, 'for Foo') - job = Queue.dequeue_any([fooq, barq], False) + job, queue = Queue.dequeue_any([fooq, barq], False) + self.assertEquals(queue, fooq) self.assertEquals(job.func, testjob) - self.assertEquals(job.origin, fooq) + self.assertEquals(job.origin, fooq.name) self.assertEquals(job.args[0], 'for Foo', 'Foo should be dequeued first.') - job = Queue.dequeue_any([fooq, barq], False) + job, queue = Queue.dequeue_any([fooq, barq], False) + self.assertEquals(queue, barq) self.assertEquals(job.func, testjob) - self.assertEquals(job.origin, barq) + self.assertEquals(job.origin, barq.name) self.assertEquals(job.args[0], 'for Bar', 'Bar should be dequeued second.') def test_dequeue_any_ignores_nonexisting_jobs(self):