diff --git a/rq/dummy.py b/rq/dummy.py
index b2ad055..93c2a61 100644
--- a/rq/dummy.py
+++ b/rq/dummy.py
@@ -3,12 +3,15 @@ Some dummy tasks that are well-suited for generating load for testing purposes.
 """
 import time
 
+
 def do_nothing():
     pass
 
+
 def sleep(secs):
     time.sleep(secs)
 
+
 def endless_loop():
     x = 7
     while True:
@@ -18,14 +21,17 @@ def endless_loop():
         if x == 0:
             x = 82
 
+
 def div_by_zero():
-    1/0
+    1 / 0
+
 
 def fib(n):
     if n <= 1:
         return 1
     else:
-        return fib(n-2) + fib(n-1)
+        return fib(n - 2) + fib(n - 1)
+
 
 def yield_stuff():
     yield 7
diff --git a/rq/exceptions.py b/rq/exceptions.py
index 813bcde..793bcc5 100644
--- a/rq/exceptions.py
+++ b/rq/exceptions.py
@@ -1,11 +1,12 @@
 class NoSuchJobError(Exception):
     pass
 
+
 class NoQueueError(Exception):
     pass
 
+
 class UnpickleError(Exception):
     def __init__(self, message, raw_data):
         super(UnpickleError, self).__init__(message)
         self.raw_data = raw_data
-
diff --git a/rq/proxy.py b/rq/proxy.py
index 6e6bdfc..f075767 100644
--- a/rq/proxy.py
+++ b/rq/proxy.py
@@ -1,5 +1,3 @@
-import redis
-
 class NoRedisConnectionException(Exception):
     pass
 
diff --git a/rq/queue.py b/rq/queue.py
index 3ab16d4..359aa1a 100644
--- a/rq/queue.py
+++ b/rq/queue.py
@@ -69,18 +69,21 @@ class Queue(object):
 
         return conn.llen(self.key)
 
-    def push_job_id(self, job_id):
+    def push_job_id(self, job_id):  # noqa
         """Pushes a job ID on the corresponding Redis queue."""
         conn.rpush(self.key, job_id)
 
     def enqueue(self, f, *args, **kwargs):
-        """Creates a job to represent the delayed function call and enqueues it.
+        """Creates a job to represent the delayed function call and enqueues
+        it.
 
         Expects the function to call, along with the arguments and keyword
         arguments.
         """
         if f.__module__ == '__main__':
-            raise ValueError('Functions from the __main__ module cannot be processed by workers.')
+            raise ValueError(
+                    'Functions from the __main__ module cannot be processed '
+                    'by workers.')
         job = Job.for_call(f, *args, **kwargs)
         return self.enqueue_job(job)
 
@@ -108,10 +111,10 @@ class Queue(object):
 
     @classmethod
     def lpop(cls, queue_keys, blocking):
-        """Helper method. Intermediate method to abstract away from some Redis
-        API details, where LPOP accepts only a single key, whereas BLPOP accepts
-        multiple. So if we want the non-blocking LPOP, we need to iterate over
-        all queues, do individual LPOPs, and return the result.
+        """Helper method. Intermediate method to abstract away from some
+        Redis API details, where LPOP accepts only a single key, whereas BLPOP
+        accepts multiple. So if we want the non-blocking LPOP, we need to
+        iterate over all queues, do individual LPOPs, and return the result.
 
         Until Redis receives a specific method for this, we'll have to wrap it
         this way.
@@ -149,8 +152,8 @@ class Queue(object):
 
     @classmethod
     def dequeue_any(cls, queues, blocking):
-        """Class method returning the Job instance at the front of the given set
-        of Queues, where the order of the queues is important.
+        """Class method returning the Job instance at the front of the given
+        set of Queues, where the order of the queues is important.
 
         When all of the Queues are empty, depending on the `blocking` argument,
         either blocks execution of this function until new messages arrive on
@@ -179,7 +182,7 @@ class Queue(object):
 
     # Total ordering defition (the rest of the required Python methods are
    # auto-generated by the @total_ordering decorator)
-    def __eq__(self, other):
+    def __eq__(self, other):  # noqa
         if not isinstance(other, Queue):
             raise TypeError('Cannot compare queues to other objects.')
         return self.name == other.name
@@ -193,7 +196,7 @@ class Queue(object):
 
         return hash(self.name)
 
-    def __repr__(self):
+    def __repr__(self):  # noqa
         return 'Queue(%r)' % (self.name,)
 
     def __str__(self):
diff --git a/rq/utils.py b/rq/utils.py
index 054bb5c..2eef8d2 100644
--- a/rq/utils.py
+++ b/rq/utils.py
@@ -2,15 +2,16 @@
 """
 Miscellaneous helper functions.
 
-The formatter for ANSI colored console output is heavily based on Pygments terminal
-colorizing code, originally by Georg Brandl.
+The formatter for ANSI colored console output is heavily based on Pygments
+terminal colorizing code, originally by Georg Brandl.
 """
 import os
 
+
 def gettermsize():
     def ioctl_GWINSZ(fd):
         try:
-            import fcntl, termios, struct
+            import fcntl, termios, struct  # noqa
             cr = struct.unpack('hh', fcntl.ioctl(fd, termios.TIOCGWINSZ,
                 '1234'))
         except:
@@ -37,17 +38,17 @@ class _Colorizer(object):
         esc = "\x1b["
 
         self.codes = {}
-        self.codes[""]          = ""
-        self.codes["reset"]     = esc + "39;49;00m"
+        self.codes[""] = ""
+        self.codes["reset"] = esc + "39;49;00m"
 
-        self.codes["bold"]      = esc + "01m"
-        self.codes["faint"]     = esc + "02m"
-        self.codes["standout"]  = esc + "03m"
+        self.codes["bold"] = esc + "01m"
+        self.codes["faint"] = esc + "02m"
+        self.codes["standout"] = esc + "03m"
         self.codes["underline"] = esc + "04m"
-        self.codes["blink"]     = esc + "05m"
-        self.codes["overline"]  = esc + "06m"
+        self.codes["blink"] = esc + "05m"
+        self.codes["overline"] = esc + "06m"
 
-        dark_colors  = ["black", "darkred", "darkgreen", "brown", "darkblue",
+        dark_colors = ["black", "darkred", "darkgreen", "brown", "darkblue",
                         "purple", "teal", "lightgray"]
         light_colors = ["darkgray", "red", "green", "yellow", "blue",
                         "fuchsia", "turquoise", "white"]
@@ -60,10 +61,10 @@ class _Colorizer(object):
 
         del d, l, x
 
-        self.codes["darkteal"]   = self.codes["turquoise"]
+        self.codes["darkteal"] = self.codes["turquoise"]
         self.codes["darkyellow"] = self.codes["brown"]
-        self.codes["fuscia"]     = self.codes["fuchsia"]
-        self.codes["white"]      = self.codes["bold"]
+        self.codes["fuscia"] = self.codes["fuchsia"]
+        self.codes["white"] = self.codes["bold"]
 
     def reset_color(self):
         return self.codes["reset"]
@@ -98,6 +99,7 @@ class _Colorizer(object):
 
 colorizer = _Colorizer()
 
+
 def make_colorizer(color):
     """Creates a function that colorizes text with the given color.
 
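The `Queue.lpop` docstring rewrapped in the rq/queue.py hunk above describes the asymmetry the helper papers over: Redis' LPOP takes a single key, while BLPOP accepts several and blocks. A minimal sketch of that behaviour, assuming `conn` is a redis-py connection object; the function name and signature are illustrative, not rq's actual implementation:

```python
def lpop(conn, queue_keys, blocking):
    """Pop one job ID from the first non-empty queue key.

    Sketch of the behaviour described in Queue.lpop's docstring;
    `conn` is assumed to be a redis-py connection (illustrative only).
    """
    if blocking:
        # BLPOP accepts multiple keys and blocks until one of them has an
        # element; it returns a (queue_key, value) pair.
        queue_key, job_id = conn.blpop(queue_keys)
        return queue_key, job_id
    # Non-blocking LPOP only takes a single key, so iterate over all
    # queues, do individual LPOPs, and return the first result found.
    for queue_key in queue_keys:
        job_id = conn.lpop(queue_key)
        if job_id is not None:
            return queue_key, job_id
    return None
```

BLPOP reports which key produced the value precisely because several keys can be watched at once, which is why the blocking branch can also return the originating queue.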
diff --git a/rq/worker.py b/rq/worker.py
index e3863ed..94420fe 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -11,7 +11,7 @@ import traceback
 from cPickle import dumps
 try:
     from logbook import Logger
-    Logger = Logger # Does nothing except it shuts up pyflakes annoying error
+    Logger = Logger  # Does nothing except it shuts up pyflakes annoying error
 except ImportError:
     from logging import Logger
 from .queue import Queue
@@ -23,9 +23,11 @@ green = make_colorizer('darkgreen')
 yellow = make_colorizer('darkyellow')
 blue = make_colorizer('darkblue')
 
+
 def iterable(x):
     return hasattr(x, '__iter__')
 
+
 def compact(l):
     return [x for x in l if x is not None]
 
@@ -33,6 +35,7 @@ _signames = dict((getattr(signal, signame), signame) \
                 for signame in dir(signal) \
                 if signame.startswith('SIG') and '_' not in signame)
 
+
 def signal_name(signum):
     # Hackety-hack-hack: is there really no better way to reverse lookup the
     # signal name? If you read this and know a way: please provide a patch :)
@@ -55,9 +58,9 @@ class Worker(object):
 
     @classmethod
     def find_by_key(cls, worker_key):
-        """Returns a Worker instance, based on the naming conventions for naming
-        the internal Redis keys. Can be used to reverse-lookup Workers by their
-        Redis keys.
+        """Returns a Worker instance, based on the naming conventions for
+        naming the internal Redis keys. Can be used to reverse-lookup Workers
+        by their Redis keys.
         """
         prefix = cls.redis_worker_namespace_prefix
         name = worker_key[len(prefix):]
@@ -76,7 +79,7 @@ class Worker(object):
 
         return worker
 
-    def __init__(self, queues, name=None, rv_ttl=500):
+    def __init__(self, queues, name=None, rv_ttl=500):  # noqa
         if isinstance(queues, Queue):
             queues = [queues]
         self._name = name
@@ -91,7 +94,7 @@ class Worker(object):
 
         self.failed_queue = Queue('failed')
 
-    def validate_queues(self):
+    def validate_queues(self):  # noqa
         """Sanity check for the given queues."""
         if not iterable(self.queues):
             raise ValueError('Argument queues not iterable.')
@@ -108,7 +111,7 @@ class Worker(object):
 
         return map(lambda q: q.key, self.queues)
 
-    @property
+    @property  # noqa
     def name(self):
         """Returns the name of the worker, under which it is registered to the
         monitoring system.
@@ -152,11 +155,13 @@ class Worker(object):
 
         procname.setprocname('rq: %s' % (message,))
 
-    def register_birth(self):
+    def register_birth(self):  # noqa
         """Registers its own birth."""
         self.log.debug('Registering birth of worker %s' % (self.name,))
         if conn.exists(self.key) and not conn.hexists(self.key, 'death'):
-            raise ValueError('There exists an active worker named \'%s\' alread.' % (self.name,))
+            raise ValueError(
+                    'There exists an active worker named \'%s\' '
+                    'already.' % (self.name,))
         key = self.key
         now = time.time()
         queues = ','.join(self.queue_names())
@@ -203,7 +208,8 @@ class Worker(object):
 
             # Take down the horse with the worker
            if self.horse_pid:
-                self.log.debug('Taking down horse %d with me.' % self.horse_pid)
+                msg = 'Taking down horse %d with me.' % self.horse_pid
+                self.log.debug(msg)
                 try:
                     os.kill(self.horse_pid, signal.SIGKILL)
                 except OSError as e:
@@ -226,7 +232,8 @@ class Worker(object):
                 self.log.debug('Ignoring signal %s.' % signal_name(signum))
                 return
 
-            self.log.warning('Warm shut down. Press Ctrl+C again for a cold shutdown.')
+            msg = 'Warm shut down. Press Ctrl+C again for a cold shutdown.'
+            self.log.warning(msg)
             self._stopped = True
             self.log.debug('Stopping after current horse is finished.')
 
@@ -234,7 +241,7 @@ class Worker(object):
 
         signal.signal(signal.SIGTERM, request_stop)
 
-    def work(self, burst=False):
+    def work(self, burst=False):  # noqa
         """Starts the work loop.
 
         Pops and performs all jobs on the current list of queues. When all
@@ -257,14 +264,17 @@ class Worker(object):
                 qnames = self.queue_names()
                 self.procline('Listening on %s' % ','.join(qnames))
                 self.log.info('')
-                self.log.info('*** Listening on %s...' % (green(', '.join(qnames))))
+                self.log.info('*** Listening on %s...' % \
+                        green(', '.join(qnames)))
                 wait_for_job = not burst
             try:
                 result = Queue.dequeue_any(self.queues, wait_for_job)
                 if result is None:
                     break
             except UnpickleError as e:
-                self.log.warning('*** Ignoring unpickleable data on %s.' % (e.queue.name,))
+                msg = '*** Ignoring unpickleable data on %s.' % \
+                        green(e.queue.name)
+                self.log.warning(msg)
                 self.log.debug('Data follows:')
                 self.log.debug(e.raw_data)
                 self.log.debug('End of unreadable data.')
@@ -272,7 +282,8 @@ class Worker(object):
                 continue
 
             job, queue = result
-            self.log.info('%s: %s (%s)' % (green(queue.name), blue(job.description), job.id))
+            self.log.info('%s: %s (%s)' % (green(queue.name),
+                blue(job.description), job.id))
 
             self.state = 'busy'
             self.fork_and_perform_job(job)
@@ -301,11 +312,11 @@ class Worker(object):
                 break
             except OSError as e:
                 # In case we encountered an OSError due to EINTR (which is
-                # caused by a SIGINT or SIGTERM signal during os.waitpid()),
-                # we simply ignore it and enter the next iteration of the
-                # loop, waiting for the child to end. In any other case,
-                # this is some other unexpected OS error, which we don't
-                # want to catch, so we re-raise those ones.
+                # caused by a SIGINT or SIGTERM signal during
+                # os.waitpid()), we simply ignore it and enter the next
+                # iteration of the loop, waiting for the child to end. In
+                # any other case, this is some other unexpected OS error,
+                # which we don't want to catch, so we re-raise those ones.
                 if e.errno != errno.EINTR:
                     raise
 
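The comment rewrapped in the last hunk is about retrying `os.waitpid()` when it is interrupted by a signal. A rough, self-contained sketch of that wait loop, illustrative only; `wait_for_child` and `child_pid` are hypothetical names standing in for the forked work horse's PID handling:

```python
import errno
import os


def wait_for_child(child_pid):
    """Wait for the work horse, retrying when waitpid() is interrupted."""
    while True:
        try:
            os.waitpid(child_pid, 0)
            break
        except OSError as e:
            # EINTR means a SIGINT/SIGTERM arrived during os.waitpid();
            # simply retry. Any other OS error is unexpected, so re-raise.
            if e.errno != errno.EINTR:
                raise
```

On modern Python (3.5+, PEP 475) `os.waitpid()` retries EINTR automatically, but the explicit loop above matches what the code of this era needed.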