Merge pull request #342 from friedcell/master

make job class dynamic
main
Vincent Driessen 11 years ago
commit b048e23875

@ -26,6 +26,7 @@ def compact(lst):
@total_ordering @total_ordering
class Queue(object): class Queue(object):
job_class = Job
DEFAULT_TIMEOUT = 180 # Default timeout seconds. DEFAULT_TIMEOUT = 180 # Default timeout seconds.
redis_queue_namespace_prefix = 'rq:queue:' redis_queue_namespace_prefix = 'rq:queue:'
redis_queues_keys = 'rq:queues' redis_queues_keys = 'rq:queues'
@ -95,7 +96,7 @@ class Queue(object):
def fetch_job(self, job_id): def fetch_job(self, job_id):
try: try:
return Job.fetch(job_id, connection=self.connection) return self.job_class.fetch(job_id, connection=self.connection)
except NoSuchJobError: except NoSuchJobError:
self.remove(job_id) self.remove(job_id)
@ -131,7 +132,7 @@ class Queue(object):
def remove(self, job_or_id): def remove(self, job_or_id):
"""Removes Job from queue, accepts either a Job instance or ID.""" """Removes Job from queue, accepts either a Job instance or ID."""
job_id = job_or_id.id if isinstance(job_or_id, Job) else job_or_id job_id = job_or_id.id if isinstance(job_or_id, self.job_class) else job_or_id
return self.connection._lrem(self.key, 0, job_id) return self.connection._lrem(self.key, 0, job_id)
def compact(self): def compact(self):
@ -145,7 +146,7 @@ class Queue(object):
job_id = as_text(self.connection.lpop(COMPACT_QUEUE)) job_id = as_text(self.connection.lpop(COMPACT_QUEUE))
if job_id is None: if job_id is None:
break break
if Job.exists(job_id, self.connection): if self.job_class.exists(job_id, self.connection):
self.connection.rpush(self.key, job_id) self.connection.rpush(self.key, job_id)
def push_job_id(self, job_id): def push_job_id(self, job_id):
@ -164,7 +165,7 @@ class Queue(object):
timeout = timeout or self._default_timeout timeout = timeout or self._default_timeout
# TODO: job with dependency shouldn't have "queued" as status # TODO: job with dependency shouldn't have "queued" as status
job = Job.create(func, args, kwargs, connection=self.connection, job = self.job_class.create(func, args, kwargs, connection=self.connection,
result_ttl=result_ttl, status=Status.QUEUED, result_ttl=result_ttl, status=Status.QUEUED,
description=description, depends_on=depends_on, timeout=timeout) description=description, depends_on=depends_on, timeout=timeout)
@ -254,7 +255,7 @@ class Queue(object):
job_id = as_text(self.connection.spop(job.dependents_key)) job_id = as_text(self.connection.spop(job.dependents_key))
if job_id is None: if job_id is None:
break break
dependent = Job.fetch(job_id, connection=self.connection) dependent = self.job_class.fetch(job_id, connection=self.connection)
self.enqueue_job(dependent) self.enqueue_job(dependent)
def pop_job_id(self): def pop_job_id(self):
@ -294,13 +295,13 @@ class Queue(object):
def dequeue(self): def dequeue(self):
"""Dequeues the front-most job from this queue. """Dequeues the front-most job from this queue.
Returns a Job instance, which can be executed or inspected. Returns a job_class instance, which can be executed or inspected.
""" """
job_id = self.pop_job_id() job_id = self.pop_job_id()
if job_id is None: if job_id is None:
return None return None
try: try:
job = Job.fetch(job_id, connection=self.connection) job = self.job_class.fetch(job_id, connection=self.connection)
except NoSuchJobError as e: except NoSuchJobError as e:
# Silently pass on jobs that don't exist (anymore), # Silently pass on jobs that don't exist (anymore),
# and continue by reinvoking itself recursively # and continue by reinvoking itself recursively
@ -315,7 +316,7 @@ class Queue(object):
@classmethod @classmethod
def dequeue_any(cls, queues, timeout, connection=None): def dequeue_any(cls, queues, timeout, connection=None):
"""Class method returning the Job instance at the front of the given """Class method returning the job_class instance at the front of the given
set of Queues, where the order of the queues is important. set of Queues, where the order of the queues is important.
When all of the Queues are empty, depending on the `timeout` argument, When all of the Queues are empty, depending on the `timeout` argument,
@ -332,7 +333,7 @@ class Queue(object):
queue_key, job_id = map(as_text, result) queue_key, job_id = map(as_text, result)
queue = cls.from_queue_key(queue_key, connection=connection) queue = cls.from_queue_key(queue_key, connection=connection)
try: try:
job = Job.fetch(job_id, connection=connection) job = cls.job_class.fetch(job_id, connection=connection)
except NoSuchJobError: except NoSuchJobError:
# Silently pass on jobs that don't exist (anymore), # Silently pass on jobs that don't exist (anymore),
# and continue by reinvoking the same function recursively # and continue by reinvoking the same function recursively
@ -386,7 +387,7 @@ class FailedQueue(Queue):
def requeue(self, job_id): def requeue(self, job_id):
"""Requeues the job with the given job ID.""" """Requeues the job with the given job ID."""
try: try:
job = Job.fetch(job_id, connection=self.connection) job = self.job_class.fetch(job_id, connection=self.connection)
except NoSuchJobError: except NoSuchJobError:
# Silently ignore/remove this job and return (i.e. do nothing) # Silently ignore/remove this job and return (i.e. do nothing)
self.remove(job_id) self.remove(job_id)

@ -67,6 +67,8 @@ class Worker(object):
redis_worker_namespace_prefix = 'rq:worker:' redis_worker_namespace_prefix = 'rq:worker:'
redis_workers_keys = 'rq:workers' redis_workers_keys = 'rq:workers'
death_penalty_class = UnixSignalDeathPenalty death_penalty_class = UnixSignalDeathPenalty
queue_class = Queue
job_class = Job
@classmethod @classmethod
def all(cls, connection=None): def all(cls, connection=None):
@ -101,7 +103,7 @@ class Worker(object):
worker._state = connection.hget(worker.key, 'state') or '?' worker._state = connection.hget(worker.key, 'state') or '?'
worker._job_id = connection.hget(worker.key, 'current_job') or None worker._job_id = connection.hget(worker.key, 'current_job') or None
if queues: if queues:
worker.queues = [Queue(queue, connection=connection) worker.queues = [self.queue_class(queue, connection=connection)
for queue in queues.split(',')] for queue in queues.split(',')]
return worker return worker
@ -111,7 +113,7 @@ class Worker(object):
if connection is None: if connection is None:
connection = get_current_connection() connection = get_current_connection()
self.connection = connection self.connection = connection
if isinstance(queues, Queue): if isinstance(queues, self.queue_class):
queues = [queues] queues = [queues]
self._name = name self._name = name
self.queues = queues self.queues = queues
@ -144,7 +146,7 @@ class Worker(object):
if not iterable(self.queues): if not iterable(self.queues):
raise ValueError('Argument queues not iterable.') raise ValueError('Argument queues not iterable.')
for queue in self.queues: for queue in self.queues:
if not isinstance(queue, Queue): if not isinstance(queue, self.queue_class):
raise NoQueueError('Give each worker at least one Queue.') raise NoQueueError('Give each worker at least one Queue.')
def queue_names(self): def queue_names(self):
@ -269,7 +271,7 @@ class Worker(object):
if job_id is None: if job_id is None:
return None return None
return Job.fetch(job_id, self.connection) return self.job_class.fetch(job_id, self.connection)
@property @property
def stopped(self): def stopped(self):
@ -379,7 +381,7 @@ class Worker(object):
self.heartbeat() self.heartbeat()
try: try:
result = Queue.dequeue_any(self.queues, timeout, result = self.queue_class.dequeue_any(self.queues, timeout,
connection=self.connection) connection=self.connection)
if result is not None: if result is not None:
job, queue = result job, queue = result
@ -473,7 +475,7 @@ class Worker(object):
with self.connection._pipeline() as pipeline: with self.connection._pipeline() as pipeline:
try: try:
with self.death_penalty_class(job.timeout or Queue.DEFAULT_TIMEOUT): with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT):
rv = job.perform() rv = job.perform()
# Pickle the result in the same try-except block since we need to # Pickle the result in the same try-except block since we need to

Loading…
Cancel
Save