make job class dynamic for more hackability

main
Marko Mrdjenovic 11 years ago
parent 415662d42c
commit 67dd1cbf34

@ -22,6 +22,7 @@ def compact(lst):
@total_ordering @total_ordering
class Queue(object): class Queue(object):
jobcls = Job
DEFAULT_TIMEOUT = 180 # Default timeout seconds. DEFAULT_TIMEOUT = 180 # Default timeout seconds.
redis_queue_namespace_prefix = 'rq:queue:' redis_queue_namespace_prefix = 'rq:queue:'
redis_queues_keys = 'rq:queues' redis_queues_keys = 'rq:queues'
@ -91,7 +92,7 @@ class Queue(object):
def fetch_job(self, job_id): def fetch_job(self, job_id):
try: try:
return Job.fetch(job_id, connection=self.connection) return self.jobcls.fetch(job_id, connection=self.connection)
except NoSuchJobError: except NoSuchJobError:
self.remove(job_id) self.remove(job_id)
@ -127,7 +128,7 @@ class Queue(object):
def remove(self, job_or_id): def remove(self, job_or_id):
"""Removes Job from queue, accepts either a Job instance or ID.""" """Removes Job from queue, accepts either a Job instance or ID."""
job_id = job_or_id.id if isinstance(job_or_id, Job) else job_or_id job_id = job_or_id.id if isinstance(job_or_id, self.jobcls) else job_or_id
return self.connection._lrem(self.key, 0, job_id) return self.connection._lrem(self.key, 0, job_id)
def compact(self): def compact(self):
@ -141,7 +142,7 @@ class Queue(object):
job_id = as_text(self.connection.lpop(COMPACT_QUEUE)) job_id = as_text(self.connection.lpop(COMPACT_QUEUE))
if job_id is None: if job_id is None:
break break
if Job.exists(job_id, self.connection): if self.jobcls.exists(job_id, self.connection):
self.connection.rpush(self.key, job_id) self.connection.rpush(self.key, job_id)
@ -162,7 +163,7 @@ class Queue(object):
timeout = timeout or self._default_timeout timeout = timeout or self._default_timeout
# TODO: job with dependency shouldn't have "queued" as status # TODO: job with dependency shouldn't have "queued" as status
job = Job.create(func, args, kwargs, connection=self.connection, job = self.jobcls.create(func, args, kwargs, connection=self.connection,
result_ttl=result_ttl, status=Status.QUEUED, result_ttl=result_ttl, status=Status.QUEUED,
description=description, depends_on=depends_on, timeout=timeout) description=description, depends_on=depends_on, timeout=timeout)
@ -252,7 +253,7 @@ class Queue(object):
job_id = as_text(self.connection.spop(job.dependents_key)) job_id = as_text(self.connection.spop(job.dependents_key))
if job_id is None: if job_id is None:
break break
dependent = Job.fetch(job_id, connection=self.connection) dependent = self.jobcls.fetch(job_id, connection=self.connection)
self.enqueue_job(dependent) self.enqueue_job(dependent)
def pop_job_id(self): def pop_job_id(self):
@ -292,13 +293,13 @@ class Queue(object):
def dequeue(self): def dequeue(self):
"""Dequeues the front-most job from this queue. """Dequeues the front-most job from this queue.
Returns a Job instance, which can be executed or inspected. Returns a jobcls instance, which can be executed or inspected.
""" """
job_id = self.pop_job_id() job_id = self.pop_job_id()
if job_id is None: if job_id is None:
return None return None
try: try:
job = Job.fetch(job_id, connection=self.connection) job = self.jobcls.fetch(job_id, connection=self.connection)
except NoSuchJobError as e: except NoSuchJobError as e:
# Silently pass on jobs that don't exist (anymore), # Silently pass on jobs that don't exist (anymore),
# and continue by reinvoking itself recursively # and continue by reinvoking itself recursively
@ -313,7 +314,7 @@ class Queue(object):
@classmethod @classmethod
def dequeue_any(cls, queues, timeout, connection=None): def dequeue_any(cls, queues, timeout, connection=None):
"""Class method returning the Job instance at the front of the given """Class method returning the jobcls instance at the front of the given
set of Queues, where the order of the queues is important. set of Queues, where the order of the queues is important.
When all of the Queues are empty, depending on the `timeout` argument, When all of the Queues are empty, depending on the `timeout` argument,
@ -330,7 +331,7 @@ class Queue(object):
queue_key, job_id = map(as_text, result) queue_key, job_id = map(as_text, result)
queue = cls.from_queue_key(queue_key, connection=connection) queue = cls.from_queue_key(queue_key, connection=connection)
try: try:
job = Job.fetch(job_id, connection=connection) job = cls.jobcls.fetch(job_id, connection=connection)
except NoSuchJobError: except NoSuchJobError:
# Silently pass on jobs that don't exist (anymore), # Silently pass on jobs that don't exist (anymore),
# and continue by reinvoking the same function recursively # and continue by reinvoking the same function recursively
@ -385,7 +386,7 @@ class FailedQueue(Queue):
def requeue(self, job_id): def requeue(self, job_id):
"""Requeues the job with the given job ID.""" """Requeues the job with the given job ID."""
try: try:
job = Job.fetch(job_id, connection=self.connection) job = self.jobcls.fetch(job_id, connection=self.connection)
except NoSuchJobError: except NoSuchJobError:
# Silently ignore/remove this job and return (i.e. do nothing) # Silently ignore/remove this job and return (i.e. do nothing)
self.remove(job_id) self.remove(job_id)

Loading…
Cancel
Save