|
|
|
@ -2,7 +2,6 @@
|
|
|
|
|
from __future__ import (absolute_import, division, print_function,
|
|
|
|
|
unicode_literals)
|
|
|
|
|
|
|
|
|
|
from datetime import timedelta
|
|
|
|
|
import errno
|
|
|
|
|
import logging
|
|
|
|
|
import os
|
|
|
|
@ -13,6 +12,7 @@ import sys
|
|
|
|
|
import time
|
|
|
|
|
import traceback
|
|
|
|
|
import warnings
|
|
|
|
|
from datetime import timedelta
|
|
|
|
|
|
|
|
|
|
from rq.compat import as_text, string_types, text_type
|
|
|
|
|
|
|
|
|
@ -20,11 +20,12 @@ from .connections import get_current_connection
|
|
|
|
|
from .exceptions import DequeueTimeout
|
|
|
|
|
from .job import Job, JobStatus
|
|
|
|
|
from .logutils import setup_loghandlers
|
|
|
|
|
from .queue import get_failed_queue, Queue
|
|
|
|
|
from .registry import clean_registries, FinishedJobRegistry, StartedJobRegistry
|
|
|
|
|
from .queue import Queue, get_failed_queue
|
|
|
|
|
from .registry import FinishedJobRegistry, StartedJobRegistry, clean_registries
|
|
|
|
|
from .suspension import is_suspended
|
|
|
|
|
from .timeouts import UnixSignalDeathPenalty
|
|
|
|
|
from .utils import enum, import_attribute, make_colorizer, utcformat, utcnow, utcparse
|
|
|
|
|
from .utils import (ensure_list, enum, import_attribute, make_colorizer,
|
|
|
|
|
utcformat, utcnow, utcparse)
|
|
|
|
|
from .version import VERSION
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
@ -126,7 +127,9 @@ class Worker(object):
|
|
|
|
|
if connection is None:
|
|
|
|
|
connection = get_current_connection()
|
|
|
|
|
self.connection = connection
|
|
|
|
|
queues = self.process_queue_args(queues)
|
|
|
|
|
|
|
|
|
|
queues = [self.queue_class(name=q) if isinstance(q, text_type) else q
|
|
|
|
|
for q in ensure_list(queues)]
|
|
|
|
|
self._name = name
|
|
|
|
|
self.queues = queues
|
|
|
|
|
self.validate_queues()
|
|
|
|
@ -163,15 +166,7 @@ class Worker(object):
|
|
|
|
|
"""Sanity check for the given queues."""
|
|
|
|
|
for queue in self.queues:
|
|
|
|
|
if not isinstance(queue, self.queue_class):
|
|
|
|
|
raise TypeError('{0} is not a Queue or a string'.format(queue))
|
|
|
|
|
|
|
|
|
|
def process_queue_args(self, queue_args):
|
|
|
|
|
"""Allow for a string, a queue an iterable of strings or an iterable of queues"""
|
|
|
|
|
if isinstance(queue_args, text_type):
|
|
|
|
|
return self.queue_class(name=queue_args)
|
|
|
|
|
else:
|
|
|
|
|
return [self.queue_class(name=queue_arg) if isinstance(queue_arg, text_type) else queue_arg
|
|
|
|
|
for queue_arg in queue_args]
|
|
|
|
|
raise TypeError('{0} is not of type {1} or text type'.format(queue, self.queue_class))
|
|
|
|
|
|
|
|
|
|
def queue_names(self):
|
|
|
|
|
"""Returns the queue names of this worker's queues."""
|
|
|
|
|