diff --git a/rq/connections.py b/rq/connections.py index f4a72e4..ee07070 100644 --- a/rq/connections.py +++ b/rq/connections.py @@ -18,8 +18,8 @@ def Connection(connection=None): finally: popped = pop_connection() assert popped == connection, \ - 'Unexpected Redis connection was popped off the stack. ' \ - 'Check your Redis connection setup.' + 'Unexpected Redis connection was popped off the stack. ' \ + 'Check your Redis connection setup.' def push_connection(redis): @@ -37,7 +37,7 @@ def use_connection(redis=None): use of use_connection() and stacked connection contexts. """ assert len(_connection_stack) <= 1, \ - 'You should not mix Connection contexts with use_connection().' + 'You should not mix Connection contexts with use_connection().' release_local(_connection_stack) if redis is None: @@ -61,13 +61,11 @@ def resolve_connection(connection=None): connection = get_current_connection() if connection is None: - raise NoRedisConnectionException( - 'Could not resolve a Redis connection.') + raise NoRedisConnectionException('Could not resolve a Redis connection.') return connection _connection_stack = LocalStack() -__all__ = ['Connection', - 'get_current_connection', 'push_connection', 'pop_connection', - 'use_connection'] +__all__ = ['Connection', 'get_current_connection', 'push_connection', + 'pop_connection', 'use_connection'] diff --git a/rq/decorators.py b/rq/decorators.py index d57b6cc..b433904 100644 --- a/rq/decorators.py +++ b/rq/decorators.py @@ -4,10 +4,10 @@ from .connections import resolve_connection from .worker import DEFAULT_RESULT_TTL from rq.compat import string_types -class job(object): +class job(object): def __init__(self, queue, connection=None, timeout=None, - result_ttl=DEFAULT_RESULT_TTL): + result_ttl=DEFAULT_RESULT_TTL): """A decorator that adds a ``delay`` method to the decorated function, which in turn creates a RQ job when called. Accepts a required ``queue`` argument that can be either a ``Queue`` instance or a string @@ -32,6 +32,6 @@ class job(object): else: queue = self.queue return queue.enqueue_call(f, args=args, kwargs=kwargs, - timeout=self.timeout, result_ttl=self.result_ttl) + timeout=self.timeout, result_ttl=self.result_ttl) f.delay = delay return f diff --git a/rq/exceptions.py b/rq/exceptions.py index 982a580..25e4f0e 100644 --- a/rq/exceptions.py +++ b/rq/exceptions.py @@ -15,5 +15,6 @@ class UnpickleError(Exception): super(UnpickleError, self).__init__(message, inner_exception) self.raw_data = raw_data + class DequeueTimeout(Exception): pass diff --git a/rq/job.py b/rq/job.py index fdc4b57..a168738 100644 --- a/rq/job.py +++ b/rq/job.py @@ -4,7 +4,7 @@ import times from uuid import uuid4 try: from cPickle import loads, dumps, UnpicklingError -except ImportError: # noqa +except ImportError: # noqa from pickle import loads, dumps, UnpicklingError # noqa from .local import LocalStack from .connections import resolve_connection @@ -16,8 +16,9 @@ def enum(name, *sequential, **named): values = dict(zip(sequential, range(len(sequential))), **named) return type(name, (), values) -Status = enum('Status', QUEUED='queued', FINISHED='finished', FAILED='failed', - STARTED='started') +Status = enum('Status', + QUEUED='queued', FINISHED='finished', FAILED='failed', + STARTED='started') def unpickle(pickled_string): @@ -287,15 +288,12 @@ class Job(object): self._result = unpickle(obj.get('result')) if obj.get('result') else None # noqa self.exc_info = obj.get('exc_info') self.timeout = int(obj.get('timeout')) if obj.get('timeout') else None - self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa + self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa self._status = as_text(obj.get('status') if obj.get('status') else None) self.meta = unpickle(obj.get('meta')) if obj.get('meta') else {} - def save(self, pipeline=None): - """Persists the current job instance to its corresponding Redis key.""" - key = self.key - connection = pipeline if pipeline is not None else self.connection - + def dump(self): + """Returns a serialization of the current job instance""" obj = {} obj['created_at'] = times.format(self.created_at or times.now(), 'UTC') @@ -322,7 +320,14 @@ class Job(object): if self.meta: obj['meta'] = dumps(self.meta) - connection.hmset(key, obj) + return obj + + def save(self, pipeline=None): + """Persists the current job instance to its corresponding Redis key.""" + key = self.key + connection = pipeline if pipeline is not None else self.connection + + connection.hmset(key, self.dump()) def cancel(self): """Cancels the given job, which will prevent the job from ever being @@ -350,7 +355,6 @@ class Job(object): assert self.id == _job_stack.pop() return self._result - def get_ttl(self, default_ttl=None): """Returns ttl for a job that determines how long a job and its result will be persisted. In the future, this method will also be responsible @@ -379,13 +383,12 @@ class Job(object): - If it's a positive number, set the job to expire in X seconds. - If result_ttl is negative, don't set an expiry to it (persist forever) - """ + """ if ttl == 0: self.cancel() elif ttl > 0: connection = pipeline if pipeline is not None else self.connection connection.expire(self.key, ttl) - def __str__(self): return '' % (self.id, self.description) diff --git a/rq/local.py b/rq/local.py index 555a6d1..61f896f 100644 --- a/rq/local.py +++ b/rq/local.py @@ -13,13 +13,13 @@ # current thread ident. try: from greenlet import getcurrent as get_ident -except ImportError: # noqa +except ImportError: # noqa try: from thread import get_ident # noqa - except ImportError: # noqa + except ImportError: # noqa try: from _thread import get_ident # noqa - except ImportError: # noqa + except ImportError: # noqa from dummy_thread import get_ident # noqa @@ -119,6 +119,7 @@ class LocalStack(object): def _get__ident_func__(self): return self._local.__ident_func__ + def _set__ident_func__(self, value): # noqa object.__setattr__(self._local, '__ident_func__', value) __ident_func__ = property(_get__ident_func__, _set__ident_func__) diff --git a/rq/timeouts.py b/rq/timeouts.py index d26528a..f1e1848 100644 --- a/rq/timeouts.py +++ b/rq/timeouts.py @@ -33,7 +33,7 @@ class death_penalty_after(object): def handle_death_penalty(self, signum, frame): raise JobTimeoutException('Job exceeded maximum timeout ' - 'value (%d seconds).' % self._timeout) + 'value (%d seconds).' % self._timeout) def setup_death_penalty(self): """Sets up an alarm signal and a signal handler that raises diff --git a/rq/utils.py b/rq/utils.py index 44bbe65..8219c9f 100644 --- a/rq/utils.py +++ b/rq/utils.py @@ -16,8 +16,7 @@ def gettermsize(): def ioctl_GWINSZ(fd): try: import fcntl, termios, struct # noqa - cr = struct.unpack('hh', fcntl.ioctl(fd, termios.TIOCGWINSZ, - '1234')) + cr = struct.unpack('hh', fcntl.ioctl(fd, termios.TIOCGWINSZ, '1234')) except: return None return cr @@ -53,7 +52,7 @@ class _Colorizer(object): self.codes["overline"] = esc + "06m" dark_colors = ["black", "darkred", "darkgreen", "brown", "darkblue", - "purple", "teal", "lightgray"] + "purple", "teal", "lightgray"] light_colors = ["darkgray", "red", "green", "yellow", "blue", "fuchsia", "turquoise", "white"] @@ -139,7 +138,7 @@ class ColorizingStreamHandler(logging.StreamHandler): def __init__(self, exclude=None, *args, **kwargs): self.exclude = exclude - if is_python_version((2,6)): + if is_python_version((2, 6)): logging.StreamHandler.__init__(self, *args, **kwargs) else: super(ColorizingStreamHandler, self).__init__(*args, **kwargs) diff --git a/rq/worker.py b/rq/worker.py index 3ba4250..64f6501 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -43,9 +43,9 @@ def iterable(x): def compact(l): return [x for x in l if x is not None] -_signames = dict((getattr(signal, signame), signame) \ - for signame in dir(signal) \ - if signame.startswith('SIG') and '_' not in signame) +_signames = dict((getattr(signal, signame), signame) + for signame in dir(signal) + if signame.startswith('SIG') and '_' not in signame) def signal_name(signum): @@ -68,8 +68,8 @@ class Worker(object): if connection is None: connection = get_current_connection() reported_working = connection.smembers(cls.redis_workers_keys) - workers = [cls.find_by_key(as_text(key), connection) for key in - reported_working] + workers = [cls.find_by_key(as_text(key), connection) + for key in reported_working] return compact(workers) @classmethod @@ -95,13 +95,12 @@ class Worker(object): worker._state = connection.hget(worker.key, 'state') or '?' if queues: worker.queues = [Queue(queue, connection=connection) - for queue in queues.split(',')] + for queue in queues.split(',')] return worker - def __init__(self, queues, name=None, - default_result_ttl=DEFAULT_RESULT_TTL, connection=None, - exc_handler=None, default_worker_ttl=DEFAULT_WORKER_TTL): # noqa + default_result_ttl=DEFAULT_RESULT_TTL, connection=None, + exc_handler=None, default_worker_ttl=DEFAULT_WORKER_TTL): # noqa if connection is None: connection = get_current_connection() self.connection = connection @@ -193,9 +192,8 @@ class Worker(object): self.log.debug('Registering birth of worker %s' % (self.name,)) if self.connection.exists(self.key) and \ not self.connection.hexists(self.key, 'death'): - raise ValueError( - 'There exists an active worker named \'%s\' ' - 'already.' % (self.name,)) + raise ValueError('There exists an active worker named \'%s\' ' + 'already.' % (self.name,)) key = self.key now = time.time() queues = ','.join(self.queue_names()) @@ -304,8 +302,8 @@ class Worker(object): qnames = self.queue_names() self.procline('Listening on %s' % ','.join(qnames)) self.log.info('') - self.log.info('*** Listening on %s...' % \ - green(', '.join(qnames))) + self.log.info('*** Listening on %s...' % + green(', '.join(qnames))) timeout = None if burst else max(1, self.default_worker_ttl - 60) try: result = self.dequeue_job_and_maintain_ttl(timeout) @@ -324,7 +322,7 @@ class Worker(object): # Use the public setter here, to immediately update Redis job.status = Status.STARTED self.log.info('%s: %s (%s)' % (green(queue.name), - blue(job.description), job.id)) + blue(job.description), job.id)) self.connection.expire(self.key, (job.timeout or 180) + 60) self.fork_and_perform_job(job) @@ -336,19 +334,17 @@ class Worker(object): self.register_death() return did_perform_work - def dequeue_job_and_maintain_ttl(self, timeout): while True: try: return Queue.dequeue_any(self.queues, timeout, - connection=self.connection) + connection=self.connection) except DequeueTimeout: pass self.log.debug('Sending heartbeat to prevent worker timeout.') self.connection.expire(self.key, self.default_worker_ttl) - def fork_and_perform_job(self, job): """Spawns a work horse to perform the actual work and passes it a job. The worker will wait for the work horse and make sure it executes @@ -443,12 +439,10 @@ class Worker(object): return True - def handle_exception(self, job, *exc_info): """Walks the exception handler stack to delegate exception handling.""" - exc_string = ''.join( - traceback.format_exception_only(*exc_info[:2]) + - traceback.format_exception(*exc_info)) + exc_string = ''.join(traceback.format_exception_only(*exc_info[:2]) + + traceback.format_exception(*exc_info)) self.log.error(exc_string) for handler in reversed(self._exc_handlers):