diff --git a/rq/queue.py b/rq/queue.py index 37a8c32..c5bed7a 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -22,7 +22,7 @@ def compact(lst): @total_ordering class Queue(object): - jobcls = Job + job_class = Job DEFAULT_TIMEOUT = 180 # Default timeout seconds. redis_queue_namespace_prefix = 'rq:queue:' redis_queues_keys = 'rq:queues' @@ -92,7 +92,7 @@ class Queue(object): def fetch_job(self, job_id): try: - return self.jobcls.fetch(job_id, connection=self.connection) + return self.job_class.fetch(job_id, connection=self.connection) except NoSuchJobError: self.remove(job_id) @@ -128,7 +128,7 @@ class Queue(object): def remove(self, job_or_id): """Removes Job from queue, accepts either a Job instance or ID.""" - job_id = job_or_id.id if isinstance(job_or_id, self.jobcls) else job_or_id + job_id = job_or_id.id if isinstance(job_or_id, self.job_class) else job_or_id return self.connection._lrem(self.key, 0, job_id) def compact(self): @@ -142,7 +142,7 @@ class Queue(object): job_id = as_text(self.connection.lpop(COMPACT_QUEUE)) if job_id is None: break - if self.jobcls.exists(job_id, self.connection): + if self.job_class.exists(job_id, self.connection): self.connection.rpush(self.key, job_id) @@ -163,7 +163,7 @@ class Queue(object): timeout = timeout or self._default_timeout # TODO: job with dependency shouldn't have "queued" as status - job = self.jobcls.create(func, args, kwargs, connection=self.connection, + job = self.job_class.create(func, args, kwargs, connection=self.connection, result_ttl=result_ttl, status=Status.QUEUED, description=description, depends_on=depends_on, timeout=timeout) @@ -253,7 +253,7 @@ class Queue(object): job_id = as_text(self.connection.spop(job.dependents_key)) if job_id is None: break - dependent = self.jobcls.fetch(job_id, connection=self.connection) + dependent = self.job_class.fetch(job_id, connection=self.connection) self.enqueue_job(dependent) def pop_job_id(self): @@ -293,13 +293,13 @@ class Queue(object): def dequeue(self): """Dequeues the front-most job from this queue. - Returns a jobcls instance, which can be executed or inspected. + Returns a job_class instance, which can be executed or inspected. """ job_id = self.pop_job_id() if job_id is None: return None try: - job = self.jobcls.fetch(job_id, connection=self.connection) + job = self.job_class.fetch(job_id, connection=self.connection) except NoSuchJobError as e: # Silently pass on jobs that don't exist (anymore), # and continue by reinvoking itself recursively @@ -314,7 +314,7 @@ class Queue(object): @classmethod def dequeue_any(cls, queues, timeout, connection=None): - """Class method returning the jobcls instance at the front of the given + """Class method returning the job_class instance at the front of the given set of Queues, where the order of the queues is important. When all of the Queues are empty, depending on the `timeout` argument, @@ -331,7 +331,7 @@ class Queue(object): queue_key, job_id = map(as_text, result) queue = cls.from_queue_key(queue_key, connection=connection) try: - job = cls.jobcls.fetch(job_id, connection=connection) + job = cls.job_class.fetch(job_id, connection=connection) except NoSuchJobError: # Silently pass on jobs that don't exist (anymore), # and continue by reinvoking the same function recursively @@ -386,7 +386,7 @@ class FailedQueue(Queue): def requeue(self, job_id): """Requeues the job with the given job ID.""" try: - job = self.jobcls.fetch(job_id, connection=self.connection) + job = self.job_class.fetch(job_id, connection=self.connection) except NoSuchJobError: # Silently ignore/remove this job and return (i.e. do nothing) self.remove(job_id) diff --git a/rq/worker.py b/rq/worker.py index 11df407..aae3b01 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -60,6 +60,8 @@ class Worker(object): redis_worker_namespace_prefix = 'rq:worker:' redis_workers_keys = 'rq:workers' death_penalty_class = UnixSignalDeathPenalty + queue_class = Queue + job_class = Job @classmethod @@ -95,7 +97,7 @@ class Worker(object): worker._state = connection.hget(worker.key, 'state') or '?' worker._job_id = connection.hget(worker.key, 'current_job') or None if queues: - worker.queues = [Queue(queue, connection=connection) + worker.queues = [self.queue_class(queue, connection=connection) for queue in queues.split(',')] return worker @@ -105,7 +107,7 @@ class Worker(object): if connection is None: connection = get_current_connection() self.connection = connection - if isinstance(queues, Queue): + if isinstance(queues, self.queue_class): queues = [queues] self._name = name self.queues = queues @@ -139,7 +141,7 @@ class Worker(object): if not iterable(self.queues): raise ValueError('Argument queues not iterable.') for queue in self.queues: - if not isinstance(queue, Queue): + if not isinstance(queue, self.queue_class): raise NoQueueError('Give each worker at least one Queue.') def queue_names(self): @@ -266,7 +268,7 @@ class Worker(object): if job_id is None: return None - return Job.fetch(job_id, self.connection) + return self.job_class.fetch(job_id, self.connection) @property def stopped(self): @@ -377,7 +379,7 @@ class Worker(object): self.heartbeat() try: - result = Queue.dequeue_any(self.queues, timeout, + result = self.queue_class.dequeue_any(self.queues, timeout, connection=self.connection) if result is not None: job, queue = result @@ -471,7 +473,7 @@ class Worker(object): with self.connection._pipeline() as pipeline: try: - with self.death_penalty_class(job.timeout or Queue.DEFAULT_TIMEOUT): + with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT): rv = job.perform() # Pickle the result in the same try-except block since we need to