Job and Queue classes as variables (compatible with existing naming)

main
Marko Mrdjenovic 11 years ago
parent 67dd1cbf34
commit 5cba260cca

@ -22,7 +22,7 @@ def compact(lst):
@total_ordering @total_ordering
class Queue(object): class Queue(object):
jobcls = Job job_class = Job
DEFAULT_TIMEOUT = 180 # Default timeout seconds. DEFAULT_TIMEOUT = 180 # Default timeout seconds.
redis_queue_namespace_prefix = 'rq:queue:' redis_queue_namespace_prefix = 'rq:queue:'
redis_queues_keys = 'rq:queues' redis_queues_keys = 'rq:queues'
@ -92,7 +92,7 @@ class Queue(object):
def fetch_job(self, job_id): def fetch_job(self, job_id):
try: try:
return self.jobcls.fetch(job_id, connection=self.connection) return self.job_class.fetch(job_id, connection=self.connection)
except NoSuchJobError: except NoSuchJobError:
self.remove(job_id) self.remove(job_id)
@ -128,7 +128,7 @@ class Queue(object):
def remove(self, job_or_id): def remove(self, job_or_id):
"""Removes Job from queue, accepts either a Job instance or ID.""" """Removes Job from queue, accepts either a Job instance or ID."""
job_id = job_or_id.id if isinstance(job_or_id, self.jobcls) else job_or_id job_id = job_or_id.id if isinstance(job_or_id, self.job_class) else job_or_id
return self.connection._lrem(self.key, 0, job_id) return self.connection._lrem(self.key, 0, job_id)
def compact(self): def compact(self):
@ -142,7 +142,7 @@ class Queue(object):
job_id = as_text(self.connection.lpop(COMPACT_QUEUE)) job_id = as_text(self.connection.lpop(COMPACT_QUEUE))
if job_id is None: if job_id is None:
break break
if self.jobcls.exists(job_id, self.connection): if self.job_class.exists(job_id, self.connection):
self.connection.rpush(self.key, job_id) self.connection.rpush(self.key, job_id)
@ -163,7 +163,7 @@ class Queue(object):
timeout = timeout or self._default_timeout timeout = timeout or self._default_timeout
# TODO: job with dependency shouldn't have "queued" as status # TODO: job with dependency shouldn't have "queued" as status
job = self.jobcls.create(func, args, kwargs, connection=self.connection, job = self.job_class.create(func, args, kwargs, connection=self.connection,
result_ttl=result_ttl, status=Status.QUEUED, result_ttl=result_ttl, status=Status.QUEUED,
description=description, depends_on=depends_on, timeout=timeout) description=description, depends_on=depends_on, timeout=timeout)
@ -253,7 +253,7 @@ class Queue(object):
job_id = as_text(self.connection.spop(job.dependents_key)) job_id = as_text(self.connection.spop(job.dependents_key))
if job_id is None: if job_id is None:
break break
dependent = self.jobcls.fetch(job_id, connection=self.connection) dependent = self.job_class.fetch(job_id, connection=self.connection)
self.enqueue_job(dependent) self.enqueue_job(dependent)
def pop_job_id(self): def pop_job_id(self):
@ -293,13 +293,13 @@ class Queue(object):
def dequeue(self): def dequeue(self):
"""Dequeues the front-most job from this queue. """Dequeues the front-most job from this queue.
Returns a jobcls instance, which can be executed or inspected. Returns a job_class instance, which can be executed or inspected.
""" """
job_id = self.pop_job_id() job_id = self.pop_job_id()
if job_id is None: if job_id is None:
return None return None
try: try:
job = self.jobcls.fetch(job_id, connection=self.connection) job = self.job_class.fetch(job_id, connection=self.connection)
except NoSuchJobError as e: except NoSuchJobError as e:
# Silently pass on jobs that don't exist (anymore), # Silently pass on jobs that don't exist (anymore),
# and continue by reinvoking itself recursively # and continue by reinvoking itself recursively
@ -314,7 +314,7 @@ class Queue(object):
@classmethod @classmethod
def dequeue_any(cls, queues, timeout, connection=None): def dequeue_any(cls, queues, timeout, connection=None):
"""Class method returning the jobcls instance at the front of the given """Class method returning the job_class instance at the front of the given
set of Queues, where the order of the queues is important. set of Queues, where the order of the queues is important.
When all of the Queues are empty, depending on the `timeout` argument, When all of the Queues are empty, depending on the `timeout` argument,
@ -331,7 +331,7 @@ class Queue(object):
queue_key, job_id = map(as_text, result) queue_key, job_id = map(as_text, result)
queue = cls.from_queue_key(queue_key, connection=connection) queue = cls.from_queue_key(queue_key, connection=connection)
try: try:
job = cls.jobcls.fetch(job_id, connection=connection) job = cls.job_class.fetch(job_id, connection=connection)
except NoSuchJobError: except NoSuchJobError:
# Silently pass on jobs that don't exist (anymore), # Silently pass on jobs that don't exist (anymore),
# and continue by reinvoking the same function recursively # and continue by reinvoking the same function recursively
@ -386,7 +386,7 @@ class FailedQueue(Queue):
def requeue(self, job_id): def requeue(self, job_id):
"""Requeues the job with the given job ID.""" """Requeues the job with the given job ID."""
try: try:
job = self.jobcls.fetch(job_id, connection=self.connection) job = self.job_class.fetch(job_id, connection=self.connection)
except NoSuchJobError: except NoSuchJobError:
# Silently ignore/remove this job and return (i.e. do nothing) # Silently ignore/remove this job and return (i.e. do nothing)
self.remove(job_id) self.remove(job_id)

@ -60,6 +60,8 @@ class Worker(object):
redis_worker_namespace_prefix = 'rq:worker:' redis_worker_namespace_prefix = 'rq:worker:'
redis_workers_keys = 'rq:workers' redis_workers_keys = 'rq:workers'
death_penalty_class = UnixSignalDeathPenalty death_penalty_class = UnixSignalDeathPenalty
queue_class = Queue
job_class = Job
@classmethod @classmethod
@ -95,7 +97,7 @@ class Worker(object):
worker._state = connection.hget(worker.key, 'state') or '?' worker._state = connection.hget(worker.key, 'state') or '?'
worker._job_id = connection.hget(worker.key, 'current_job') or None worker._job_id = connection.hget(worker.key, 'current_job') or None
if queues: if queues:
worker.queues = [Queue(queue, connection=connection) worker.queues = [self.queue_class(queue, connection=connection)
for queue in queues.split(',')] for queue in queues.split(',')]
return worker return worker
@ -105,7 +107,7 @@ class Worker(object):
if connection is None: if connection is None:
connection = get_current_connection() connection = get_current_connection()
self.connection = connection self.connection = connection
if isinstance(queues, Queue): if isinstance(queues, self.queue_class):
queues = [queues] queues = [queues]
self._name = name self._name = name
self.queues = queues self.queues = queues
@ -139,7 +141,7 @@ class Worker(object):
if not iterable(self.queues): if not iterable(self.queues):
raise ValueError('Argument queues not iterable.') raise ValueError('Argument queues not iterable.')
for queue in self.queues: for queue in self.queues:
if not isinstance(queue, Queue): if not isinstance(queue, self.queue_class):
raise NoQueueError('Give each worker at least one Queue.') raise NoQueueError('Give each worker at least one Queue.')
def queue_names(self): def queue_names(self):
@ -266,7 +268,7 @@ class Worker(object):
if job_id is None: if job_id is None:
return None return None
return Job.fetch(job_id, self.connection) return self.job_class.fetch(job_id, self.connection)
@property @property
def stopped(self): def stopped(self):
@ -377,7 +379,7 @@ class Worker(object):
self.heartbeat() self.heartbeat()
try: try:
result = Queue.dequeue_any(self.queues, timeout, result = self.queue_class.dequeue_any(self.queues, timeout,
connection=self.connection) connection=self.connection)
if result is not None: if result is not None:
job, queue = result job, queue = result
@ -471,7 +473,7 @@ class Worker(object):
with self.connection._pipeline() as pipeline: with self.connection._pipeline() as pipeline:
try: try:
with self.death_penalty_class(job.timeout or Queue.DEFAULT_TIMEOUT): with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT):
rv = job.perform() rv = job.perform()
# Pickle the result in the same try-except block since we need to # Pickle the result in the same try-except block since we need to

Loading…
Cancel
Save