diff --git a/rq/connections.py b/rq/connections.py index ae4d036..03b8c05 100644 --- a/rq/connections.py +++ b/rq/connections.py @@ -43,7 +43,7 @@ def use_connection(redis=None): use of use_connection() and stacked connection contexts. """ assert len(_connection_stack) <= 1, \ - 'You should not mix Connection contexts with use_connection().' + 'You should not mix Connection contexts with use_connection()' release_local(_connection_stack) if redis is None: @@ -67,7 +67,7 @@ def resolve_connection(connection=None): connection = get_current_connection() if connection is None: - raise NoRedisConnectionException('Could not resolve a Redis connection.') + raise NoRedisConnectionException('Could not resolve a Redis connection') return connection diff --git a/rq/job.py b/rq/job.py index ebccb13..1828fe2 100644 --- a/rq/job.py +++ b/rq/job.py @@ -50,7 +50,7 @@ def unpickle(pickled_string): try: obj = loads(pickled_string) except Exception as e: - raise UnpickleError('Could not unpickle.', pickled_string, e) + raise UnpickleError('Could not unpickle', pickled_string, e) return obj @@ -99,9 +99,9 @@ class Job(object): kwargs = {} if not isinstance(args, (tuple, list)): - raise TypeError('{0!r} is not a valid args list.'.format(args)) + raise TypeError('{0!r} is not a valid args list'.format(args)) if not isinstance(kwargs, dict): - raise TypeError('{0!r} is not a valid kwargs dict.'.format(kwargs)) + raise TypeError('{0!r} is not a valid kwargs dict'.format(kwargs)) job = cls(connection=connection) if id is not None: @@ -116,7 +116,7 @@ class Job(object): job._instance = func.__self__ job._func_name = func.__name__ elif inspect.isfunction(func) or inspect.isbuiltin(func): - job._func_name = '%s.%s' % (func.__module__, func.__name__) + job._func_name = '{0}.{1}'.format(func.__module__, func.__name__) elif isinstance(func, string_types): job._func_name = as_text(func) elif not inspect.isclass(func) and hasattr(func, '__call__'): # a callable class instance @@ -212,7 +212,7 @@ class Job(object): def data(self): if self._data is UNEVALUATED: if self._func_name is UNEVALUATED: - raise ValueError('Cannot build the job data.') + raise ValueError('Cannot build the job data') if self._instance is UNEVALUATED: self._instance = None @@ -317,7 +317,7 @@ class Job(object): self.meta = {} def __repr__(self): # noqa - return 'Job(%r, enqueued_at=%r)' % (self._id, self.enqueued_at) + return 'Job({0!r}, enqueued_at={1!r})'.format(self._id, self.enqueued_at) # Data access def get_id(self): # noqa @@ -331,7 +331,7 @@ class Job(object): def set_id(self, value): """Sets a job ID for the given job.""" if not isinstance(value, string_types): - raise TypeError('id must be a string, not {0}.'.format(type(value))) + raise TypeError('id must be a string, not {0}'.format(type(value))) self._id = value id = property(get_id, set_id) @@ -344,7 +344,7 @@ class Job(object): @classmethod def dependents_key_for(cls, job_id): """The Redis key that is used to store job hash under.""" - return 'rq:job:%s:dependents' % (job_id,) + return 'rq:job:{0}:dependents'.format(job_id) @property def key(self): @@ -393,7 +393,7 @@ class Job(object): key = self.key obj = decode_redis_hash(self.connection.hgetall(key)) if len(obj) == 0: - raise NoSuchJobError('No such job: %s' % (key,)) + raise NoSuchJobError('No such job: {0}'.format(key)) def to_date(date_str): if date_str is None: @@ -526,7 +526,7 @@ class Job(object): arg_list += sorted(kwargs) args = ', '.join(arg_list) - return '%s(%s)' % (self.func_name, args) + return '{0}({1})'.format(self.func_name, args) def cleanup(self, ttl=None, pipeline=None): """Prepare job for eventual deletion (if needed). This method is usually @@ -565,7 +565,7 @@ class Job(object): connection.sadd(Job.dependents_key_for(self._dependency_id), self.id) def __str__(self): - return '' % (self.id, self.description) + return ''.format(self.id, self.description) # Job equality def __eq__(self, other): # noqa diff --git a/rq/queue.py b/rq/queue.py index bf5a186..16f5b0a 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -49,7 +49,7 @@ class Queue(object): """ prefix = cls.redis_queue_namespace_prefix if not queue_key.startswith(prefix): - raise ValueError('Not a valid RQ queue key: %s' % (queue_key,)) + raise ValueError('Not a valid RQ queue key: {0}'.format(queue_key)) name = queue_key[len(prefix):] return cls(name, connection=connection) @@ -58,7 +58,7 @@ class Queue(object): self.connection = resolve_connection(connection) prefix = self.redis_queue_namespace_prefix self.name = name - self._key = '%s%s' % (prefix, name) + self._key = '{0}{1}'.format(prefix, name) self._default_timeout = default_timeout self._async = async @@ -71,7 +71,7 @@ class Queue(object): return self.count def __iter__(self): - yield self + yield self @property def key(self): @@ -230,7 +230,7 @@ class Queue(object): """ if not isinstance(f, string_types) and f.__module__ == '__main__': raise ValueError('Functions from the __main__ module cannot be processed ' - 'by workers.') + 'by workers') # Detect explicit invocations, i.e. of the form: # q.enqueue(foo, args=(1, 2), kwargs={'a': 1}, timeout=30) @@ -243,7 +243,7 @@ class Queue(object): at_front = kwargs.pop('at_front', False) if 'args' in kwargs or 'kwargs' in kwargs: - assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs.' # noqa + assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs' # noqa args = kwargs.pop('args', None) kwargs = kwargs.pop('kwargs', None) @@ -314,7 +314,7 @@ class Queue(object): connection = resolve_connection(connection) if timeout is not None: # blocking variant if timeout == 0: - raise ValueError('RQ does not support indefinite timeouts. Please pick a timeout value > 0.') + raise ValueError('RQ does not support indefinite timeouts. Please pick a timeout value > 0') result = connection.blpop(queue_keys, timeout) if result is None: raise DequeueTimeout(timeout, queue_keys) @@ -385,22 +385,22 @@ class Queue(object): # auto-generated by the @total_ordering decorator) def __eq__(self, other): # noqa if not isinstance(other, Queue): - raise TypeError('Cannot compare queues to other objects.') + raise TypeError('Cannot compare queues to other objects') return self.name == other.name def __lt__(self, other): if not isinstance(other, Queue): - raise TypeError('Cannot compare queues to other objects.') + raise TypeError('Cannot compare queues to other objects') return self.name < other.name def __hash__(self): return hash(self.name) def __repr__(self): # noqa - return 'Queue(%r)' % (self.name,) + return 'Queue({0!r})'.format(self.name) def __str__(self): - return '' % (self.name,) + return ''.format(self.name) class FailedQueue(Queue): @@ -436,7 +436,7 @@ class FailedQueue(Queue): # Delete it from the failed queue (raise an error if that failed) if self.remove(job) == 0: - raise InvalidJobOperationError('Cannot requeue non-failed jobs.') + raise InvalidJobOperationError('Cannot requeue non-failed jobs') job.set_status(JobStatus.QUEUED) job.exc_info = None diff --git a/rq/registry.py b/rq/registry.py index ded86da..8d66c0d 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -69,7 +69,7 @@ class StartedJobRegistry(BaseRegistry): def __init__(self, name='default', connection=None): super(StartedJobRegistry, self).__init__(name, connection) - self.key = 'rq:wip:%s' % name + self.key = 'rq:wip:{0}'.format(name) def cleanup(self, timestamp=None): """Remove expired jobs from registry and add them to FailedQueue. @@ -108,7 +108,7 @@ class FinishedJobRegistry(BaseRegistry): def __init__(self, name='default', connection=None): super(FinishedJobRegistry, self).__init__(name, connection) - self.key = 'rq:finished:%s' % name + self.key = 'rq:finished:{0}'.format(name) def cleanup(self, timestamp=None): """Remove expired jobs from registry. @@ -128,7 +128,7 @@ class DeferredJobRegistry(BaseRegistry): def __init__(self, name='default', connection=None): super(DeferredJobRegistry, self).__init__(name, connection) - self.key = 'rq:deferred:%s' % name + self.key = 'rq:deferred:{0}'.format(name) def cleanup(self): """This method is only here to prevent errors because this method is diff --git a/rq/timeouts.py b/rq/timeouts.py index a7b1242..a6afdf2 100644 --- a/rq/timeouts.py +++ b/rq/timeouts.py @@ -48,7 +48,7 @@ class UnixSignalDeathPenalty(BaseDeathPenalty): def handle_death_penalty(self, signum, frame): raise JobTimeoutException('Job exceeded maximum timeout ' - 'value (%d seconds).' % self._timeout) + 'value ({0} seconds)'.format(self._timeout)) def setup_death_penalty(self): """Sets up an alarm signal and a signal handler that raises diff --git a/rq/worker.py b/rq/worker.py index e53aff4..27df2dd 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -102,7 +102,7 @@ class Worker(object): """ prefix = cls.redis_worker_namespace_prefix if not worker_key.startswith(prefix): - raise ValueError('Not a valid RQ worker key: %s' % (worker_key,)) + raise ValueError('Not a valid RQ worker key: {0}'.format(worker_key)) if connection is None: connection = get_current_connection() @@ -166,13 +166,12 @@ class Worker(object): raise NoQueueError('{0} is not a queue'.format(queue)) def process_queue_args(self, queue_args): - """ allow for a string, a queue an iterable of strings - or an iterable of queues""" + """Allow for a string, a queue an iterable of strings or an iterable of queues""" if isinstance(queue_args, text_type): - return self.queue_class(name = queue_args) + return self.queue_class(name=queue_args) else: - return [self.queue_class(name=queue_arg) if isinstance(queue_arg, text_type) - else queue_arg for queue_arg in queue_args] + return [self.queue_class(name=queue_arg) if isinstance(queue_arg, text_type) else queue_arg + for queue_arg in queue_args] def queue_names(self): """Returns the queue names of this worker's queues.""" @@ -193,7 +192,7 @@ class Worker(object): if self._name is None: hostname = socket.gethostname() shortname, _, _ = hostname.partition('.') - self._name = '%s.%s' % (shortname, self.pid) + self._name = '{0}.{1}'.format(shortname, self.pid) return self._name @property @@ -223,15 +222,15 @@ class Worker(object): This can be used to make `ps -ef` output more readable. """ - setprocname('rq: %s' % (message,)) + setprocname('rq: {0}'.format(message)) def register_birth(self): """Registers its own birth.""" - self.log.debug('Registering birth of worker %s' % (self.name,)) + self.log.debug('Registering birth of worker {0}'.format(self.name)) if self.connection.exists(self.key) and \ not self.connection.hexists(self.key, 'death'): - raise ValueError('There exists an active worker named \'%s\' ' - 'already.' % (self.name,)) + msg = 'There exists an active worker named {0!r} already' + raise ValueError(msg.format(self.name)) key = self.key queues = ','.join(self.queue_names()) with self.connection._pipeline() as p: @@ -326,18 +325,18 @@ class Worker(object): def request_force_stop(signum, frame): """Terminates the application (cold shutdown). """ - self.log.warning('Cold shut down.') + self.log.warning('Cold shut down') # Take down the horse with the worker if self.horse_pid: - msg = 'Taking down horse %d with me.' % self.horse_pid + msg = 'Taking down horse {0} with me'.format(self.horse_pid) self.log.debug(msg) try: os.kill(self.horse_pid, signal.SIGKILL) except OSError as e: # ESRCH ("No such process") is fine with us if e.errno != errno.ESRCH: - self.log.debug('Horse already down.') + self.log.debug('Horse already down') raise raise SystemExit() @@ -345,12 +344,12 @@ class Worker(object): """Stops the current worker loop but waits for child processes to end gracefully (warm shutdown). """ - self.log.debug('Got signal %s.' % signal_name(signum)) + self.log.debug('Got signal {0}'.format(signal_name(signum))) signal.signal(signal.SIGINT, request_force_stop) signal.signal(signal.SIGTERM, request_force_stop) - msg = 'Warm shut down requested.' + msg = 'Warm shut down requested' self.log.warning(msg) # If shutdown is requested in the middle of a job, wait until @@ -374,12 +373,12 @@ class Worker(object): while not self.stopped and is_suspended(self.connection): if burst: - self.log.info('Suspended in burst mode -- exiting.' - 'Note: There could still be unperformed jobs on the queue') + self.log.info('Suspended in burst mode, exiting') + self.log.info('Note: There could still be unfinished jobs on the queue') raise StopRequested if not notified: - self.log.info('Worker suspended, use "rq resume" command to resume') + self.log.info('Worker suspended, run `rq resume` to resume') before_state = self.get_state() self.set_state(WorkerStatus.SUSPENDED) notified = True @@ -402,7 +401,7 @@ class Worker(object): did_perform_work = False self.register_birth() - self.log.info("RQ worker, '%s', started, version %s" % (self.key, VERSION)) + self.log.info("RQ worker {0!r} started, version %s".format(self.key, VERSION)) self.set_state(WorkerStatus.STARTED) try: @@ -414,7 +413,7 @@ class Worker(object): self.clean_registries() if self.stopped: - self.log.info('Stopping on request.') + self.log.info('Stopping on request') break timeout = None if burst else max(1, self.default_worker_ttl - 60) @@ -422,7 +421,7 @@ class Worker(object): result = self.dequeue_job_and_maintain_ttl(timeout) if result is None: if burst: - self.log.info("RQ worker, '%s', done, quitting." % self.key) + self.log.info("RQ worker {0!r} done, quitting".format(self.key)) break except StopRequested: break @@ -446,10 +445,9 @@ class Worker(object): qnames = self.queue_names() self.set_state(WorkerStatus.IDLE) - self.procline('Listening on %s' % ','.join(qnames)) + self.procline('Listening on {0}'.format(','.join(qnames))) self.log.info('') - self.log.info('*** Listening on %s...' % - green(', '.join(qnames))) + self.log.info('*** Listening on {0}...'.format(green(', '.join(qnames)))) while True: self.heartbeat() @@ -459,8 +457,8 @@ class Worker(object): connection=self.connection) if result is not None: job, queue = result - self.log.info('%s: %s (%s)' % (green(queue.name), - blue(job.description), job.id)) + self.log.info('{0}: {1} ({2})'.format(green(queue.name), + blue(job.description), job.id)) break except DequeueTimeout: @@ -497,7 +495,7 @@ class Worker(object): self.main_work_horse(job) else: self._horse_pid = child_pid - self.procline('Forked %d at %d' % (child_pid, time.time())) + self.procline('Forked {0} at {0}'.format(child_pid, time.time())) while True: try: self.set_state('busy') @@ -552,9 +550,8 @@ class Worker(object): job.set_status(JobStatus.STARTED, pipeline=pipeline) pipeline.execute() - self.procline('Processing %s from %s since %s' % ( - job.func_name, - job.origin, time.time())) + msg = 'Processing {0} from {1} since {2}' + self.procline(msg.format(job.func_name, job.origin, time.time())) def perform_job(self, job): """Performs the actual work of a job. Will/should only be called @@ -599,14 +596,14 @@ class Worker(object): if rv is None: self.log.info('Job OK') else: - self.log.info('Job OK, result = %s' % (yellow(text_type(rv)),)) + self.log.info('Job OK, result = {0!r}'.format(yellow(text_type(rv)))) if result_ttl == 0: - self.log.info('Result discarded immediately.') + self.log.info('Result discarded immediately') elif result_ttl > 0: - self.log.info('Result is kept for %d seconds.' % result_ttl) + self.log.info('Result is kept for {0} seconds'.format(result_ttl)) else: - self.log.warning('Result will never expire, clean up result key manually.') + self.log.warning('Result will never expire, clean up result key manually') return True @@ -622,7 +619,7 @@ class Worker(object): }) for handler in reversed(self._exc_handlers): - self.log.debug('Invoking exception handler %s' % (handler,)) + self.log.debug('Invoking exception handler {0}'.format(handler)) fallthrough = handler(job, *exc_info) # Only handlers with explicit return values should disable further @@ -636,7 +633,7 @@ class Worker(object): def move_to_failed_queue(self, job, *exc_info): """Default exception handler: move the job to the failed queue.""" exc_string = ''.join(traceback.format_exception(*exc_info)) - self.log.warning('Moving job to %s queue.' % self.failed_queue.name) + self.log.warning('Moving job to {0!r} queue'.format(self.failed_queue.name)) self.failed_queue.quarantine(job, exc_info=exc_string) def push_exc_handler(self, handler_func):