From 57990fa052f316fd897cb0e499208e382a3fd0b4 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Thu, 6 Mar 2014 18:22:47 +0700 Subject: [PATCH 01/23] Simplify worker.work() by moving some functionalities to relevant methods. --- rq/worker.py | 42 ++++++++++++++++++++++++------------------ 1 file changed, 24 insertions(+), 18 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index df4812c..92d55bb 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -332,12 +332,7 @@ class Worker(object): if self.stopped: self.log.info('Stopping on request.') break - self.set_state('idle') - qnames = self.queue_names() - self.procline('Listening on %s' % ','.join(qnames)) - self.log.info('') - self.log.info('*** Listening on %s...' % - green(', '.join(qnames))) + timeout = None if burst else max(1, self.default_worker_ttl - 60) try: result = self.dequeue_job_and_maintain_ttl(timeout) @@ -346,21 +341,10 @@ class Worker(object): except StopRequested: break - self.set_state('busy') - job, queue = result - self.set_current_job_id(job.id) - - # Use the public setter here, to immediately update Redis - job.set_status(Status.STARTED) - self.log.info('%s: %s (%s)' % (green(queue.name), - blue(job.description), job.id)) - - self.heartbeat((job.timeout or 180) + 60) self.execute_job(job) self.heartbeat() - self.set_current_job_id(None) - + if job.get_status() == Status.FINISHED: queue.enqueue_dependents(job) @@ -372,11 +356,25 @@ class Worker(object): def dequeue_job_and_maintain_ttl(self, timeout): result = None + qnames = self.queue_names() + + self.set_state('idle') + self.procline('Listening on %s' % ','.join(qnames)) + self.log.info('') + self.log.info('*** Listening on %s...' % + green(', '.join(qnames))) + while True: self.heartbeat() + try: result = Queue.dequeue_any(self.queues, timeout, connection=self.connection) + if result is not None: + job, queue = result + self.log.info('%s: %s (%s)' % (green(queue.name), + blue(job.description), job.id)) + break except DequeueTimeout: pass @@ -453,6 +451,12 @@ class Worker(object): """Performs the actual work of a job. Will/should only be called inside the work horse's process. """ + + self.set_state('busy') + self.set_current_job_id(job.id) + job.set_status(Status.STARTED) + self.heartbeat((job.timeout or 180) + 60) + self.procline('Processing %s from %s since %s' % ( job.func_name, job.origin, time.time())) @@ -467,11 +471,13 @@ class Worker(object): job._result = rv job._status = Status.FINISHED job.ended_at = utcnow() + self.set_current_job_id(None, pipeline=pipeline) result_ttl = job.get_ttl(self.default_result_ttl) if result_ttl != 0: job.save(pipeline=pipeline) job.cleanup(result_ttl, pipeline=pipeline) + pipeline.execute() except Exception: From 835c353380b114cac9a4d9ab2978d233af5dcb29 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Thu, 6 Mar 2014 18:40:09 +0700 Subject: [PATCH 02/23] Job status should be updated during job.perform(). 
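The worker no longer flips the job status itself; perform() records it around
the actual function call, using the public setter so the new status is written
to Redis immediately. A job that raises an exception therefore never reaches
FINISHED. Roughly (error handling elided; Status, utcnow and _job_stack are
the existing helpers in rq/job.py):

    def perform(self):
        _job_stack.push(self.id)
        try:
            self.set_status(Status.STARTED)    # visible to monitors right away
            self._result = self.func(*self.args, **self.kwargs)
            self.set_status(Status.FINISHED)   # only reached on success
            self.ended_at = utcnow()
        finally:
            assert self.id == _job_stack.pop()
        return self._result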
--- rq/job.py | 6 +++++- rq/worker.py | 6 ++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/rq/job.py b/rq/job.py index 9e9f207..8409eee 100644 --- a/rq/job.py +++ b/rq/job.py @@ -448,9 +448,13 @@ class Job(object): """Invokes the job function with the job arguments.""" _job_stack.push(self.id) try: - self._result = self.func(*self.args, **self.kwargs) + self.set_status(Status.STARTED) + self._result = self.func(*self.args, **self.kwargs) + self.set_status(Status.FINISHED) + self.ended_at = utcnow() finally: assert self.id == _job_stack.pop() + return self._result def get_ttl(self, default_ttl=None): diff --git a/rq/worker.py b/rq/worker.py index 92d55bb..688b1a4 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -453,8 +453,7 @@ class Worker(object): """ self.set_state('busy') - self.set_current_job_id(job.id) - job.set_status(Status.STARTED) + self.set_current_job_id(job.id) self.heartbeat((job.timeout or 180) + 60) self.procline('Processing %s from %s since %s' % ( @@ -469,8 +468,7 @@ class Worker(object): # Pickle the result in the same try-except block since we need to # use the same exc handling when pickling fails job._result = rv - job._status = Status.FINISHED - job.ended_at = utcnow() + self.set_current_job_id(None, pipeline=pipeline) result_ttl = job.get_ttl(self.default_result_ttl) From e77b3e8a21bee68b2419855209d1a740ce1dcd9c Mon Sep 17 00:00:00 2001 From: Malthe Borch Date: Thu, 27 Mar 2014 13:19:04 +0100 Subject: [PATCH 03/23] Pull argument parser defaults from environment variables. --- rq/scripts/__init__.py | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/rq/scripts/__init__.py b/rq/scripts/__init__.py index 575a8cb..28a7eab 100644 --- a/rq/scripts/__init__.py +++ b/rq/scripts/__init__.py @@ -1,3 +1,4 @@ +import os import importlib import redis from warnings import warn @@ -7,18 +8,25 @@ from rq import use_connection def add_standard_arguments(parser): parser.add_argument('--config', '-c', default=None, help='Module containing RQ settings.') - parser.add_argument('--url', '-u', default=None, + parser.add_argument('--url', '-u', + default=os.environ.get('RQ_REDIS_URL'), help='URL describing Redis connection details. 
' - 'Overrides other connection arguments if supplied.') - parser.add_argument('--host', '-H', default=None, + 'Overrides other connection arguments if ' + 'supplied.') + parser.add_argument('--host', '-H', + default=os.environ.get('RQ_REDIS_HOST', 'localhost'), help='The Redis hostname (default: localhost)') - parser.add_argument('--port', '-p', default=None, + parser.add_argument('--port', '-p', + default=int(os.environ.get('RQ_REDIS_PORT', 6379)), help='The Redis portnumber (default: 6379)') - parser.add_argument('--db', '-d', type=int, default=None, + parser.add_argument('--db', '-d', type=int, + default=int(os.environ.get('RQ_REDIS_DB', 0)), help='The Redis database (default: 0)') - parser.add_argument('--password', '-a', default=None, + parser.add_argument('--password', '-a', + default=os.environ.get('RQ_REDIS_PASSWORD'), help='The Redis password (default: None)') - parser.add_argument('--socket', '-s', default=None, + parser.add_argument('--socket', '-s', + default=os.environ.get('RQ_REDIS_SOCKET'), help='The Redis Unix socket') From e16b89de3b357cbf26ba0b75904886645543bdd7 Mon Sep 17 00:00:00 2001 From: Christophe Olinger Date: Fri, 4 Apr 2014 08:51:18 +0200 Subject: [PATCH 04/23] Use get_state() instead of .state in rqinfo --- rq/scripts/rqinfo.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rq/scripts/rqinfo.py b/rq/scripts/rqinfo.py index 73c1a7c..08b0283 100755 --- a/rq/scripts/rqinfo.py +++ b/rq/scripts/rqinfo.py @@ -101,9 +101,9 @@ def show_workers(args): for w in ws: worker_queues = filter_queues(w.queue_names()) if not args.raw: - print('%s %s: %s' % (w.name, state_symbol(w.state), ', '.join(worker_queues))) + print('%s %s: %s' % (w.name, state_symbol(w.get_state()), ', '.join(worker_queues))) else: - print('worker %s %s %s' % (w.name, w.state, ','.join(worker_queues))) + print('worker %s %s %s' % (w.name, w.get_state(), ','.join(worker_queues))) else: # Create reverse lookup table queues = dict([(q, []) for q in qs]) @@ -116,7 +116,7 @@ def show_workers(args): max_qname = max(map(lambda q: len(q.name), queues.keys())) if queues else 0 for q in queues: if queues[q]: - queues_str = ", ".join(sorted(map(lambda w: '%s (%s)' % (w.name, state_symbol(w.state)), queues[q]))) + queues_str = ", ".join(sorted(map(lambda w: '%s (%s)' % (w.name, state_symbol(w.get_state())), queues[q]))) else: queues_str = '–' print('%s %s' % (pad(q.name + ':', max_qname + 1), queues_str)) From 3667e37ed3c84b4a13ba97c12518b3a63c9107aa Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Fri, 4 Apr 2014 09:00:56 +0200 Subject: [PATCH 05/23] Don't expose the envvar values through `rqinfo -h`. --- rq/scripts/__init__.py | 54 ++++++++++++++++-------------------------- setup.py | 2 +- 2 files changed, 21 insertions(+), 35 deletions(-) diff --git a/rq/scripts/__init__.py b/rq/scripts/__init__.py index 28a7eab..ec188ee 100644 --- a/rq/scripts/__init__.py +++ b/rq/scripts/__init__.py @@ -1,32 +1,29 @@ -import os import importlib -import redis +import os +from functools import partial from warnings import warn + +import redis +from first import first + from rq import use_connection def add_standard_arguments(parser): parser.add_argument('--config', '-c', default=None, help='Module containing RQ settings.') - parser.add_argument('--url', '-u', - default=os.environ.get('RQ_REDIS_URL'), + parser.add_argument('--url', '-u', default=None, help='URL describing Redis connection details. 
' - 'Overrides other connection arguments if ' - 'supplied.') - parser.add_argument('--host', '-H', - default=os.environ.get('RQ_REDIS_HOST', 'localhost'), + 'Overrides other connection arguments if supplied.') + parser.add_argument('--host', '-H', default=None, help='The Redis hostname (default: localhost)') - parser.add_argument('--port', '-p', - default=int(os.environ.get('RQ_REDIS_PORT', 6379)), + parser.add_argument('--port', '-p', default=None, help='The Redis portnumber (default: 6379)') - parser.add_argument('--db', '-d', type=int, - default=int(os.environ.get('RQ_REDIS_DB', 0)), + parser.add_argument('--db', '-d', type=int, default=None, help='The Redis database (default: 0)') - parser.add_argument('--password', '-a', - default=os.environ.get('RQ_REDIS_PASSWORD'), + parser.add_argument('--password', '-a', default=None, help='The Redis password (default: None)') - parser.add_argument('--socket', '-s', - default=os.environ.get('RQ_REDIS_SOCKET'), + parser.add_argument('--socket', '-s', default=None, help='The Redis Unix socket') @@ -40,31 +37,20 @@ def read_config_file(module): def setup_default_arguments(args, settings): """ Sets up args from settings or defaults """ - if args.url is None: - args.url = settings.get('REDIS_URL') + args.url = first([args.url, settings.get('REDIS_URL'), os.environ.get('RQ_REDIS_URL')]) if (args.host or args.port or args.socket or args.db or args.password): warn('Host, port, db, password options for Redis will not be ' 'supported in future versions of RQ. ' 'Please use `REDIS_URL` or `--url` instead.', DeprecationWarning) - if args.host is None: - args.host = settings.get('REDIS_HOST', 'localhost') - - if args.port is None: - args.port = int(settings.get('REDIS_PORT', 6379)) - else: - args.port = int(args.port) - - socket = settings.get('REDIS_SOCKET', False) - if args.socket is None and socket: - args.socket = socket - - if args.db is None: - args.db = settings.get('REDIS_DB', 0) + strict_first = partial(first, key=lambda obj: obj is not None) - if args.password is None: - args.password = settings.get('REDIS_PASSWORD', None) + args.host = strict_first([args.host, settings.get('REDIS_HOST'), os.environ.get('RQ_REDIS_HOST'), 'localhost']) + args.port = int(strict_first([args.port, settings.get('REDIS_PORT'), os.environ.get('RQ_REDIS_PORT'), 6379])) + args.socket = strict_first([args.socket, settings.get('REDIS_SOCKET'), os.environ.get('RQ_REDIS_SOCKET'), False]) + args.db = strict_first([args.db, settings.get('REDIS_DB'), os.environ.get('RQ_REDIS_DB'), 0]) + args.password = strict_first([args.password, settings.get('REDIS_PASSWORD'), os.environ.get('RQ_REDIS_PASSWORD')]) def setup_redis(args): diff --git a/setup.py b/setup.py index 500093a..91ab911 100644 --- a/setup.py +++ b/setup.py @@ -17,7 +17,7 @@ def get_version(): def get_dependencies(): - deps = ['redis >= 2.4.13'] + deps = ['redis >= 2.4.13', 'first >= 2.0'] if sys.version_info < (2, 7) or \ (sys.version_info >= (3, 0) and sys.version_info < (3, 1)): deps += ['importlib'] From 7eb6c2ab9f74bf209439a9228447daf3cfda19b4 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Fri, 4 Apr 2014 17:16:33 +0700 Subject: [PATCH 06/23] Job timeouts are now handled by "worker.death_penalty_class". 
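The point of the refactoring is to make the timeout strategy pluggable:
BaseDeathPenalty is a context manager with setup_death_penalty() and
cancel_death_penalty() hooks, UnixSignalDeathPenalty keeps the existing
SIGALRM behaviour, and the worker selects its strategy through the
death_penalty_class attribute. A minimal sketch of swapping in a different
strategy (NoAlarmDeathPenalty and CustomWorker are made-up names for the
example, e.g. for a platform without SIGALRM):

    from rq.timeouts import BaseDeathPenalty
    from rq.worker import Worker

    class NoAlarmDeathPenalty(BaseDeathPenalty):
        def setup_death_penalty(self):
            pass  # no hard timeout is installed

        def cancel_death_penalty(self):
            pass

    class CustomWorker(Worker):
        death_penalty_class = NoAlarmDeathPenalty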
--- rq/timeouts.py | 15 +++++++++++++-- rq/worker.py | 6 ++++-- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/rq/timeouts.py b/rq/timeouts.py index f1e1848..a96d891 100644 --- a/rq/timeouts.py +++ b/rq/timeouts.py @@ -8,7 +8,9 @@ class JobTimeoutException(Exception): pass -class death_penalty_after(object): +class BaseDeathPenalty(object): + """Base class to setup job timeouts.""" + def __init__(self, timeout): self._timeout = timeout @@ -31,6 +33,15 @@ class death_penalty_after(object): # invoking context. return False + def setup_death_penalty(self): + raise NotImplementedError() + + def cancel_death_penalty(self): + raise NotImplementedError() + + +class UnixSignalDeathPenalty(BaseDeathPenalty): + def handle_death_penalty(self, signum, frame): raise JobTimeoutException('Job exceeded maximum timeout ' 'value (%d seconds).' % self._timeout) @@ -48,4 +59,4 @@ class death_penalty_after(object): default signal handling. """ signal.alarm(0) - signal.signal(signal.SIGALRM, signal.SIG_DFL) + signal.signal(signal.SIGALRM, signal.SIG_DFL) \ No newline at end of file diff --git a/rq/worker.py b/rq/worker.py index 688b1a4..05b4fa0 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -18,7 +18,7 @@ from .job import Job, Status from .utils import make_colorizer, utcnow, utcformat from .logutils import setup_loghandlers from .exceptions import NoQueueError, DequeueTimeout -from .timeouts import death_penalty_after +from .timeouts import UnixSignalDeathPenalty from .version import VERSION from rq.compat import text_type, as_text @@ -59,6 +59,8 @@ def signal_name(signum): class Worker(object): redis_worker_namespace_prefix = 'rq:worker:' redis_workers_keys = 'rq:workers' + death_penalty_class = UnixSignalDeathPenalty + @classmethod def all(cls, connection=None): @@ -462,7 +464,7 @@ class Worker(object): with self.connection._pipeline() as pipeline: try: - with death_penalty_after(job.timeout or Queue.DEFAULT_TIMEOUT): + with self.death_penalty_class(job.timeout or Queue.DEFAULT_TIMEOUT): rv = job.perform() # Pickle the result in the same try-except block since we need to From 3649ff863bc09e56a6947eb5a0b958af3b6f81df Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Fri, 4 Apr 2014 13:01:30 +0200 Subject: [PATCH 07/23] Don't rely on external package `first`. --- rq/scripts/__init__.py | 2 +- rq/utils.py | 39 +++++++++++++++++++++++++++++++++++++++ setup.py | 2 +- 3 files changed, 41 insertions(+), 2 deletions(-) diff --git a/rq/scripts/__init__.py b/rq/scripts/__init__.py index ec188ee..bfbb3a1 100644 --- a/rq/scripts/__init__.py +++ b/rq/scripts/__init__.py @@ -4,9 +4,9 @@ from functools import partial from warnings import warn import redis -from first import first from rq import use_connection +from rq.utils import first def add_standard_arguments(parser): diff --git a/rq/utils.py b/rq/utils.py index f182210..7fd1b1e 100644 --- a/rq/utils.py +++ b/rq/utils.py @@ -181,3 +181,42 @@ def utcformat(dt): def utcparse(string): return datetime.datetime.strptime(string, "%Y-%m-%dT%H:%M:%SZ") + + +def first(iterable, default=None, key=None): + """ + Return first element of `iterable` that evaluates true, else return None + (or an optional default value). 
+ + >>> first([0, False, None, [], (), 42]) + 42 + + >>> first([0, False, None, [], ()]) is None + True + + >>> first([0, False, None, [], ()], default='ohai') + 'ohai' + + >>> import re + >>> m = first(re.match(regex, 'abc') for regex in ['b.*', 'a(.*)']) + >>> m.group(1) + 'bc' + + The optional `key` argument specifies a one-argument predicate function + like that used for `filter()`. The `key` argument, if supplied, must be + in keyword form. For example: + + >>> first([1, 1, 3, 4, 5], key=lambda x: x % 2 == 0) + 4 + + """ + if key is None: + for el in iterable: + if el: + return el + else: + for el in iterable: + if key(el): + return el + + return default diff --git a/setup.py b/setup.py index 91ab911..500093a 100644 --- a/setup.py +++ b/setup.py @@ -17,7 +17,7 @@ def get_version(): def get_dependencies(): - deps = ['redis >= 2.4.13', 'first >= 2.0'] + deps = ['redis >= 2.4.13'] if sys.version_info < (2, 7) or \ (sys.version_info >= (3, 0) and sys.version_info < (3, 1)): deps += ['importlib'] From 6494233058fa56e2314a179bbf9ac1df1feaeb2e Mon Sep 17 00:00:00 2001 From: Tomas Hanacek Date: Mon, 7 Apr 2014 19:10:29 +0200 Subject: [PATCH 08/23] rqworker default arguments of socket, worker_ttl, results_ttl bugfix --- rq/scripts/__init__.py | 2 +- rq/worker.py | 31 +++++++++++++++++++------------ 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/rq/scripts/__init__.py b/rq/scripts/__init__.py index bfbb3a1..cab1f97 100644 --- a/rq/scripts/__init__.py +++ b/rq/scripts/__init__.py @@ -48,7 +48,7 @@ def setup_default_arguments(args, settings): args.host = strict_first([args.host, settings.get('REDIS_HOST'), os.environ.get('RQ_REDIS_HOST'), 'localhost']) args.port = int(strict_first([args.port, settings.get('REDIS_PORT'), os.environ.get('RQ_REDIS_PORT'), 6379])) - args.socket = strict_first([args.socket, settings.get('REDIS_SOCKET'), os.environ.get('RQ_REDIS_SOCKET'), False]) + args.socket = strict_first([args.socket, settings.get('REDIS_SOCKET'), os.environ.get('RQ_REDIS_SOCKET'), None]) args.db = strict_first([args.db, settings.get('REDIS_DB'), os.environ.get('RQ_REDIS_DB'), 0]) args.password = strict_first([args.password, settings.get('REDIS_PASSWORD'), os.environ.get('RQ_REDIS_PASSWORD')]) diff --git a/rq/worker.py b/rq/worker.py index 05b4fa0..11df407 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -100,8 +100,8 @@ class Worker(object): return worker def __init__(self, queues, name=None, - default_result_ttl=DEFAULT_RESULT_TTL, connection=None, - exc_handler=None, default_worker_ttl=DEFAULT_WORKER_TTL): # noqa + default_result_ttl=None, connection=None, + exc_handler=None, default_worker_ttl=None): # noqa if connection is None: connection = get_current_connection() self.connection = connection @@ -111,8 +111,15 @@ class Worker(object): self.queues = queues self.validate_queues() self._exc_handlers = [] + + if default_result_ttl is None: + default_result_ttl = DEFAULT_RESULT_TTL self.default_result_ttl = default_result_ttl + + if default_worker_ttl is None: + default_worker_ttl = DEFAULT_WORKER_TTL self.default_worker_ttl = default_worker_ttl + self._state = 'starting' self._is_horse = False self._horse_pid = 0 @@ -334,7 +341,7 @@ class Worker(object): if self.stopped: self.log.info('Stopping on request.') break - + timeout = None if burst else max(1, self.default_worker_ttl - 60) try: result = self.dequeue_job_and_maintain_ttl(timeout) @@ -359,21 +366,21 @@ class Worker(object): def dequeue_job_and_maintain_ttl(self, timeout): result = None qnames = self.queue_names() - - 
self.set_state('idle') + + self.set_state('idle') self.procline('Listening on %s' % ','.join(qnames)) self.log.info('') self.log.info('*** Listening on %s...' % green(', '.join(qnames))) - + while True: self.heartbeat() - + try: result = Queue.dequeue_any(self.queues, timeout, connection=self.connection) if result is not None: - job, queue = result + job, queue = result self.log.info('%s: %s (%s)' % (green(queue.name), blue(job.description), job.id)) @@ -455,9 +462,9 @@ class Worker(object): """ self.set_state('busy') - self.set_current_job_id(job.id) + self.set_current_job_id(job.id) self.heartbeat((job.timeout or 180) + 60) - + self.procline('Processing %s from %s since %s' % ( job.func_name, job.origin, time.time())) @@ -470,14 +477,14 @@ class Worker(object): # Pickle the result in the same try-except block since we need to # use the same exc handling when pickling fails job._result = rv - + self.set_current_job_id(None, pipeline=pipeline) result_ttl = job.get_ttl(self.default_result_ttl) if result_ttl != 0: job.save(pipeline=pipeline) job.cleanup(result_ttl, pipeline=pipeline) - + pipeline.execute() except Exception: From 4f918041e3dcf247c371020c64f3659b6b134de5 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Fri, 11 Apr 2014 16:47:31 +0700 Subject: [PATCH 09/23] Rearranged how explicit kwargs are passed into queue.enqueue(). Fixes #322 --- rq/queue.py | 17 +++++++---------- tests/fixtures.py | 4 ++++ tests/test_queue.py | 29 ++++++++++++++++++++++++++++- 3 files changed, 39 insertions(+), 11 deletions(-) diff --git a/rq/queue.py b/rq/queue.py index 49d43d7..730022f 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -190,17 +190,14 @@ class Queue(object): # Detect explicit invocations, i.e. of the form: # q.enqueue(foo, args=(1, 2), kwargs={'a': 1}, timeout=30) - timeout = None - description = None - result_ttl = None - depends_on = None - if 'args' in kwargs or 'kwargs' in kwargs or 'depends_on' in kwargs: + timeout = kwargs.pop('timeout', None) + description = kwargs.pop('description', None) + result_ttl = kwargs.pop('result_ttl', None) + depends_on = kwargs.pop('depends_on', None) + + if 'args' in kwargs or 'kwargs' in kwargs: assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs.' 
# noqa - timeout = kwargs.pop('timeout', None) - description = kwargs.pop('description', None) - args = kwargs.pop('args', None) - result_ttl = kwargs.pop('result_ttl', None) - depends_on = kwargs.pop('depends_on', None) + args = kwargs.pop('args', None) kwargs = kwargs.pop('kwargs', None) return self.enqueue_call(func=f, args=args, kwargs=kwargs, diff --git a/tests/fixtures.py b/tests/fixtures.py index 337fc70..eaf426e 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -49,6 +49,10 @@ def access_self(): return job.id +def echo(*args, **kwargs): + return (args, kwargs) + + class Number(object): def __init__(self, value): self.value = value diff --git a/tests/test_queue.py b/tests/test_queue.py index 6dde3ea..8106b8d 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -1,5 +1,5 @@ from tests import RQTestCase -from tests.fixtures import Number, div_by_zero, say_hello, some_calculation +from tests.fixtures import Number, div_by_zero, echo, say_hello, some_calculation from rq import Queue, get_failed_queue from rq.job import Job, Status from rq.worker import Worker @@ -260,6 +260,33 @@ class TestQueue(RQTestCase): job = q.enqueue(say_hello) self.assertEqual(job.get_status(), Status.QUEUED) + def test_enqueue_explicit_args(self): + """enqueue() works for both implicit/explicit args.""" + q = Queue() + + # Implicit args/kwargs mode + job = q.enqueue(echo, 1, timeout=1, result_ttl=1, bar='baz') + self.assertEqual(job.timeout, 1) + self.assertEqual(job.result_ttl, 1) + self.assertEqual( + job.perform(), + ((1,), {'bar': 'baz'}) + ) + + # Explicit kwargs mode + kwargs = { + 'timeout': 1, + 'result_ttl': 1, + } + job = q.enqueue(echo, timeout=2, result_ttl=2, args=[1], kwargs=kwargs) + self.assertEqual(job.timeout, 2) + self.assertEqual(job.result_ttl, 2) + self.assertEqual( + job.perform(), + ((1,), {'timeout': 1, 'result_ttl': 1}) + ) + + def test_all_queues(self): """All queues""" q1 = Queue('first-queue') From fdf4abcf69b6cafc7dc2c23053c7c37cd0817b07 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Mon, 14 Apr 2014 11:24:55 +0200 Subject: [PATCH 10/23] Fix pipeline call. --- rq/job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rq/job.py b/rq/job.py index 8409eee..87bd83e 100644 --- a/rq/job.py +++ b/rq/job.py @@ -433,7 +433,7 @@ class Job(object): cancellation. Technically, this call is (currently) the same as just deleting the job hash. """ - pipeline = self.connection.pipeline() + pipeline = self.connection._pipeline() self.delete(pipeline=pipeline) pipeline.delete(self.dependents_key) pipeline.execute() From 057c4657efe5400ec4dc2df9ae509a0a2ac85e78 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Mon, 14 Apr 2014 11:25:24 +0200 Subject: [PATCH 11/23] Fix whitespace. --- rq/job.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rq/job.py b/rq/job.py index 87bd83e..b25aac0 100644 --- a/rq/job.py +++ b/rq/job.py @@ -449,12 +449,12 @@ class Job(object): _job_stack.push(self.id) try: self.set_status(Status.STARTED) - self._result = self.func(*self.args, **self.kwargs) + self._result = self.func(*self.args, **self.kwargs) self.set_status(Status.FINISHED) - self.ended_at = utcnow() + self.ended_at = utcnow() finally: assert self.id == _job_stack.pop() - + return self._result def get_ttl(self, default_ttl=None): From 4d9c20d5d9ea274929af55ed17f5523547f84367 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Mon, 14 Apr 2014 11:26:02 +0200 Subject: [PATCH 12/23] Reimplement Queue.empty() in a Lua script. 
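For reference, the change builds on redis-py's register_script(): the Lua
source is compiled into a callable that runs atomically against the keys it is
given. A standalone sketch of the same pattern (the queue key below is just an
example):

    from redis import StrictRedis

    connection = StrictRedis()
    empty_queue = connection.register_script("""
        local prefix = "rq:job:"
        local count = 0
        while true do
            local job_id = redis.call("lpop", KEYS[1])
            if job_id == false then
                break
            end
            redis.call("del", prefix..job_id)
            redis.call("del", prefix..job_id..":dependents")
            count = count + 1
        end
        return count
    """)

    print(empty_queue(keys=['rq:queue:default']))  # number of jobs removed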
This makes the .empty() function perform all the computing in Redis itself, rather than in Python. This is both atomic, and faster. --- rq/queue.py | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/rq/queue.py b/rq/queue.py index 49d43d7..2f725e3 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -65,10 +65,25 @@ class Queue(object): def empty(self): """Removes all messages on the queue.""" - job_list = self.get_jobs() - self.connection.delete(self.key) - for job in job_list: - job.cancel() + script = b""" + local prefix = "rq:job:" + local q = KEYS[1] + local count = 0 + while true do + local job_id = redis.call("lpop", q) + if job_id == false then + break + end + + -- Delete the relevant keys + redis.call("del", prefix..job_id) + redis.call("del", prefix..job_id..":dependents") + count = count + 1 + end + return count + """ + script = self.connection.register_script(script) + return script(keys=[self.key]) def is_empty(self): """Returns whether the current queue is empty.""" From cb34acc279fcefe733a10b75b965c5240aa3acf6 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Mon, 14 Apr 2014 11:32:01 +0200 Subject: [PATCH 13/23] Add rqinfo -X command. This can be used to empty the failed queue right from the command line. --- CHANGES.md | 3 +++ rq/scripts/rqinfo.py | 44 +++++++++++++++++++++++++------------------- 2 files changed, 28 insertions(+), 19 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index f61488c..e45f487 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,6 +1,9 @@ ### 0.4.0 (not released yet) +- Emptying the failed queue from the command line is now as simple as running + `rqinfo -X` or `rqinfo --empty-failed-queue`. + - Job data is unpickled lazily. Thanks, Malthe! - Removed dependency on the `times` library. Thanks, Malthe! 
diff --git a/rq/scripts/rqinfo.py b/rq/scripts/rqinfo.py index 08b0283..f9c179d 100755 --- a/rq/scripts/rqinfo.py +++ b/rq/scripts/rqinfo.py @@ -1,16 +1,16 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -import sys +import argparse import os +import sys import time -import argparse + from redis.exceptions import ConnectionError -from rq import Queue, Worker + +from rq import get_failed_queue, Queue, Worker +from rq.scripts import (add_standard_arguments, read_config_file, + setup_default_arguments, setup_redis) from rq.utils import gettermsize, make_colorizer -from rq.scripts import add_standard_arguments -from rq.scripts import setup_redis -from rq.scripts import read_config_file -from rq.scripts import setup_default_arguments red = make_colorizer('darkred') green = make_colorizer('darkgreen') @@ -116,7 +116,7 @@ def show_workers(args): max_qname = max(map(lambda q: len(q.name), queues.keys())) if queues else 0 for q in queues: if queues[q]: - queues_str = ", ".join(sorted(map(lambda w: '%s (%s)' % (w.name, state_symbol(w.get_state())), queues[q]))) + queues_str = ", ".join(sorted(map(lambda w: '%s (%s)' % (w.name, state_symbol(w.get_state())), queues[q]))) # noqa else: queues_str = '–' print('%s %s' % (pad(q.name + ':', max_qname + 1), queues_str)) @@ -140,11 +140,12 @@ def parse_args(): parser = argparse.ArgumentParser(description='RQ command-line monitor.') add_standard_arguments(parser) parser.add_argument('--path', '-P', default='.', help='Specify the import path.') - parser.add_argument('--interval', '-i', metavar='N', type=float, default=2.5, help='Updates stats every N seconds (default: don\'t poll)') - parser.add_argument('--raw', '-r', action='store_true', default=False, help='Print only the raw numbers, no bar charts') - parser.add_argument('--only-queues', '-Q', dest='only_queues', default=False, action='store_true', help='Show only queue info') - parser.add_argument('--only-workers', '-W', dest='only_workers', default=False, action='store_true', help='Show only worker info') - parser.add_argument('--by-queue', '-R', dest='by_queue', default=False, action='store_true', help='Shows workers by queue') + parser.add_argument('--interval', '-i', metavar='N', type=float, default=2.5, help='Updates stats every N seconds (default: don\'t poll)') # noqa + parser.add_argument('--raw', '-r', action='store_true', default=False, help='Print only the raw numbers, no bar charts') # noqa + parser.add_argument('--only-queues', '-Q', dest='only_queues', default=False, action='store_true', help='Show only queue info') # noqa + parser.add_argument('--only-workers', '-W', dest='only_workers', default=False, action='store_true', help='Show only worker info') # noqa + parser.add_argument('--by-queue', '-R', dest='by_queue', default=False, action='store_true', help='Shows workers by queue') # noqa + parser.add_argument('--empty-failed-queue', '-X', dest='empty_failed_queue', default=False, action='store_true', help='Empties the failed queue, then quits') # noqa parser.add_argument('queues', nargs='*', help='The queues to poll') return parser.parse_args() @@ -173,15 +174,20 @@ def main(): setup_default_arguments(args, settings) setup_redis(args) + try: - if args.only_queues: - func = show_queues - elif args.only_workers: - func = show_workers + if args.empty_failed_queue: + num_jobs = get_failed_queue().empty() + print('{} jobs removed from failed queue'.format(num_jobs)) else: - func = show_both + if args.only_queues: + func = show_queues + elif args.only_workers: + func = show_workers + else: + 
func = show_both - interval(args.interval, func, args) + interval(args.interval, func, args) except ConnectionError as e: print(e) sys.exit(1) From 39258e2c1547f8454fbb07582af217162ff22a9b Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Tue, 22 Apr 2014 16:10:19 +0200 Subject: [PATCH 14/23] Release 0.4.0. --- CHANGES.md | 2 +- rq/version.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index e45f487..dd91b44 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,5 @@ ### 0.4.0 -(not released yet) +(April 22nd, 2014) - Emptying the failed queue from the command line is now as simple as running `rqinfo -X` or `rqinfo --empty-failed-queue`. diff --git a/rq/version.py b/rq/version.py index 954b701..5052c1f 100644 --- a/rq/version.py +++ b/rq/version.py @@ -1 +1 @@ -VERSION = '0.3.13' +VERSION = '0.4.0' From dfd23d6fb8febc656be57702fb9f0865e4c7f2c9 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Tue, 22 Apr 2014 22:19:52 +0200 Subject: [PATCH 15/23] Fix bug where RQ 0.4 could not unpickle jobs from RQ 0.3.x. --- rq/utils.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/rq/utils.py b/rq/utils.py index 7fd1b1e..30e9905 100644 --- a/rq/utils.py +++ b/rq/utils.py @@ -180,7 +180,11 @@ def utcformat(dt): def utcparse(string): - return datetime.datetime.strptime(string, "%Y-%m-%dT%H:%M:%SZ") + try: + return datetime.datetime.strptime(string, '%Y-%m-%dT%H:%M:%SZ') + except ValueError: + # This catches RQ < 0.4 datetime format + return datetime.datetime.strptime(string, '%Y-%m-%dT%H:%M:%S.%f+00:00') def first(iterable, default=None, key=None): From e60584ef76dc5fcfcde23670c8a8a198ee45b96a Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Tue, 22 Apr 2014 22:20:07 +0200 Subject: [PATCH 16/23] Use single quotes for string literals. --- rq/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rq/utils.py b/rq/utils.py index 30e9905..a4260d9 100644 --- a/rq/utils.py +++ b/rq/utils.py @@ -176,7 +176,7 @@ def utcnow(): def utcformat(dt): - return dt.strftime(u"%Y-%m-%dT%H:%M:%SZ") + return dt.strftime(u'%Y-%m-%dT%H:%M:%SZ') def utcparse(string): From 415662d42c7c1ef75307b86e098084b3fb3bd704 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Tue, 22 Apr 2014 22:21:37 +0200 Subject: [PATCH 17/23] Bump version. --- CHANGES.md | 6 ++++++ rq/version.py | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index dd91b44..82bae81 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,9 @@ +### 0.4.1 +(April 22nd, 2014) + +- Fix bug where RQ 0.4 workers could not unpickle/process jobs from RQ < 0.4. + + ### 0.4.0 (April 22nd, 2014) diff --git a/rq/version.py b/rq/version.py index 5052c1f..52f089c 100644 --- a/rq/version.py +++ b/rq/version.py @@ -1 +1 @@ -VERSION = '0.4.0' +VERSION = '0.4.1' From ef9456ddbd64dab1dbd80ac770a775994aa68bbf Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Mon, 28 Apr 2014 08:31:09 +0200 Subject: [PATCH 18/23] Bump version to 0.4.2. --- CHANGES.md | 6 ++++++ rq/version.py | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 82bae81..9d33ceb 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,9 @@ +### 0.4.2 +(April 28th, 2014) + +- Add missing depends_on kwarg to @job decorator. Thanks, Sasha! 
+ + ### 0.4.1 (April 22nd, 2014) diff --git a/rq/version.py b/rq/version.py index 52f089c..1376913 100644 --- a/rq/version.py +++ b/rq/version.py @@ -1 +1 @@ -VERSION = '0.4.1' +VERSION = '0.4.2' From cb278a95490b53c770fa3fa95fd6a604cd38fc79 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Mon, 28 Apr 2014 08:34:52 +0200 Subject: [PATCH 19/23] Add Python 3.4 to targets. --- tox.ini | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tox.ini b/tox.ini index bb92511..e9d42df 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,5 @@ [tox] -envlist=py26,py27,py33,pypy +envlist=py26,py27,py33,py34,pypy [testenv] commands=py.test [] From 9def988a85d414e07e0ae42c9e294ccffad82b7d Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Mon, 5 May 2014 10:08:21 +0200 Subject: [PATCH 20/23] Flip conditional sides of helper definitions (no semantic change). --- rq/compat/__init__.py | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/rq/compat/__init__.py b/rq/compat/__init__.py index ac9b7a9..a7817b4 100644 --- a/rq/compat/__init__.py +++ b/rq/compat/__init__.py @@ -41,21 +41,11 @@ else: return cls -PY2 = sys.version_info[0] < 3 - -if PY2: - string_types = (str, unicode) - text_type = unicode - - def as_text(v): - return v - - def decode_redis_hash(h): - return h - -else: - string_types = (str,) +PY2 = sys.version_info[0] == 2 +if not PY2: + # Python 3.x and up text_type = str + string_types = (str,) def as_text(v): if v is None: @@ -69,3 +59,13 @@ else: def decode_redis_hash(h): return dict((as_text(k), h[k]) for k in h) +else: + # Python 2.x + text_type = unicode + string_types = (str, unicode) + + def as_text(v): + return v + + def decode_redis_hash(h): + return h From 4746602c57ebe258ede3bfef517d6719a59cb24b Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Mon, 5 May 2014 10:15:15 +0200 Subject: [PATCH 21/23] Explicit string check in Job constructor. --- rq/job.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/rq/job.py b/rq/job.py index b25aac0..b39abaf 100644 --- a/rq/job.py +++ b/rq/job.py @@ -8,7 +8,7 @@ from .local import LocalStack from .connections import resolve_connection from .exceptions import UnpickleError, NoSuchJobError from .utils import import_attribute, utcnow, utcformat, utcparse -from rq.compat import text_type, decode_redis_hash, as_text +from rq.compat import text_type, decode_redis_hash, as_text, string_types def enum(name, *sequential, **named): @@ -96,8 +96,10 @@ class Job(object): job._func_name = func.__name__ elif inspect.isfunction(func) or inspect.isbuiltin(func): job._func_name = '%s.%s' % (func.__module__, func.__name__) - else: # we expect a string - job._func_name = func + elif isinstance(func, string_types): + job._func_name = as_text(func) + else: + raise TypeError('Expected a function/method/string, but got: {}'.format(func)) job._args = args job._kwargs = kwargs From 38ec259b6e436d7868af8e99d55305340d5375ce Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Mon, 5 May 2014 10:49:29 +0200 Subject: [PATCH 22/23] Enable the most modern Python syntax. 
--- examples/fib.py | 5 +++++ examples/run_example.py | 8 +++++++- examples/run_worker.py | 5 ++++- rq/__init__.py | 17 +++++++++-------- rq/compat/__init__.py | 6 +++++- rq/compat/connections.py | 7 ++++++- rq/connections.py | 8 +++++++- rq/contrib/legacy.py | 5 +++++ rq/contrib/sentry.py | 5 +++++ rq/decorators.py | 10 ++++++++-- rq/dummy.py | 6 +++++- rq/exceptions.py | 5 +++++ rq/job.py | 11 ++++++++++- rq/logutils.py | 4 ++++ rq/queue.py | 4 ++++ rq/scripts/__init__.py | 5 ++++- rq/scripts/rqgenload.py | 8 ++++++-- rq/scripts/rqinfo.py | 4 +++- rq/scripts/rqworker.py | 11 ++++++++--- rq/timeouts.py | 4 ++++ rq/utils.py | 3 +++ rq/version.py | 3 +++ rq/worker.py | 35 +++++++++++++++++++++-------------- tests/__init__.py | 11 ++++++++--- tests/dummy_settings.py | 4 ++++ tests/fixtures.py | 8 ++++++-- tests/helpers.py | 4 ++++ tests/test_connection.py | 10 +++++++--- tests/test_decorator.py | 9 +++++++-- tests/test_job.py | 28 +++++++++++++++++++--------- tests/test_queue.py | 14 ++++++++++---- tests/test_scripts.py | 7 ++++++- tests/test_sentry.py | 9 +++++++-- tests/test_worker.py | 16 +++++++++++----- 34 files changed, 230 insertions(+), 69 deletions(-) diff --git a/examples/fib.py b/examples/fib.py index 2130b3c..91606cb 100644 --- a/examples/fib.py +++ b/examples/fib.py @@ -1,3 +1,8 @@ +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + + def slow_fib(n): if n <= 1: return 1 diff --git a/examples/run_example.py b/examples/run_example.py index cbcc6e9..d19a6dc 100644 --- a/examples/run_example.py +++ b/examples/run_example.py @@ -1,6 +1,12 @@ +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + import os import time -from rq import Queue, Connection + +from rq import Connection, Queue + from fib import slow_fib diff --git a/examples/run_worker.py b/examples/run_worker.py index 7c8adae..4feb217 100644 --- a/examples/run_worker.py +++ b/examples/run_worker.py @@ -1,5 +1,8 @@ -from rq import Queue, Worker, Connection +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) +from rq import Connection, Queue, Worker if __name__ == '__main__': # Tell rq what Redis connection to use diff --git a/rq/__init__.py b/rq/__init__.py index 94e1dd1..95050f3 100644 --- a/rq/__init__.py +++ b/rq/__init__.py @@ -1,12 +1,13 @@ -from .connections import get_current_connection -from .connections import use_connection, push_connection, pop_connection -from .connections import Connection -from .queue import Queue, get_failed_queue -from .job import cancel_job, requeue_job -from .job import get_current_job -from .worker import Worker -from .version import VERSION +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) +from .connections import (Connection, get_current_connection, pop_connection, + push_connection, use_connection) +from .job import cancel_job, get_current_job, requeue_job +from .queue import get_failed_queue, Queue +from .version import VERSION +from .worker import Worker __all__ = [ 'use_connection', 'get_current_connection', diff --git a/rq/compat/__init__.py b/rq/compat/__init__.py index a7817b4..1571dbb 100644 --- a/rq/compat/__init__.py +++ b/rq/compat/__init__.py @@ -1,3 +1,7 @@ +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + import sys @@ -35,7 +39,7 @@ else: root = max(roots) # prefer 
__lt__ to __le__ to __gt__ to __ge__ for opname, opfunc in convert[root]: if opname not in roots: - opfunc.__name__ = opname + opfunc.__name__ = str(opname) opfunc.__doc__ = getattr(int, opname).__doc__ setattr(cls, opname, opfunc) return cls diff --git a/rq/compat/connections.py b/rq/compat/connections.py index 0374b5b..ac22af3 100644 --- a/rq/compat/connections.py +++ b/rq/compat/connections.py @@ -1,6 +1,11 @@ -from redis import Redis, StrictRedis +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + from functools import partial +from redis import Redis, StrictRedis + def fix_return_type(func): # deliberately no functools.wraps() call here, since the function being diff --git a/rq/connections.py b/rq/connections.py index ee07070..ae4d036 100644 --- a/rq/connections.py +++ b/rq/connections.py @@ -1,7 +1,13 @@ +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + from contextlib import contextmanager + from redis import StrictRedis -from .local import LocalStack, release_local + from .compat.connections import patch_connection +from .local import LocalStack, release_local class NoRedisConnectionException(Exception): diff --git a/rq/contrib/legacy.py b/rq/contrib/legacy.py index 2a20b9f..447c002 100644 --- a/rq/contrib/legacy.py +++ b/rq/contrib/legacy.py @@ -1,3 +1,8 @@ +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + + import logging from rq import get_current_connection from rq import Worker diff --git a/rq/contrib/sentry.py b/rq/contrib/sentry.py index 4b776d1..84b9ef9 100644 --- a/rq/contrib/sentry.py +++ b/rq/contrib/sentry.py @@ -1,3 +1,8 @@ +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + + def register_sentry(client, worker): """Given a Raven client and an RQ worker, registers exception handlers with the worker so exceptions are logged to Sentry. diff --git a/rq/decorators.py b/rq/decorators.py index 90aebf7..b3dba8d 100644 --- a/rq/decorators.py +++ b/rq/decorators.py @@ -1,8 +1,14 @@ +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + from functools import wraps -from .queue import Queue + +from rq.compat import string_types + from .connections import resolve_connection +from .queue import Queue from .worker import DEFAULT_RESULT_TTL -from rq.compat import string_types class job(object): diff --git a/rq/dummy.py b/rq/dummy.py index ee9022b..cdd1afc 100644 --- a/rq/dummy.py +++ b/rq/dummy.py @@ -1,8 +1,12 @@ +# -*- coding: utf-8 -*- """ Some dummy tasks that are well-suited for generating load for testing purposes. 
""" -import time +from __future__ import (absolute_import, division, print_function, + unicode_literals) + import random +import time def do_nothing(): diff --git a/rq/exceptions.py b/rq/exceptions.py index 25e4f0e..94b22bf 100644 --- a/rq/exceptions.py +++ b/rq/exceptions.py @@ -1,3 +1,8 @@ +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + + class NoSuchJobError(Exception): pass diff --git a/rq/job.py b/rq/job.py index b39abaf..045b57a 100644 --- a/rq/job.py +++ b/rq/job.py @@ -1,3 +1,7 @@ +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + import inspect from uuid import uuid4 try: @@ -13,7 +17,12 @@ from rq.compat import text_type, decode_redis_hash, as_text, string_types def enum(name, *sequential, **named): values = dict(zip(sequential, range(len(sequential))), **named) - return type(name, (), values) + + # NOTE: Yes, we *really* want to cast using str() here. + # On Python 2 type() requires a byte string (which is str() on Python 2). + # On Python 3 it does not matter, so we'll use str(), which acts as + # a no-op. + return type(str(name), (), values) Status = enum('Status', QUEUED='queued', FINISHED='finished', FAILED='failed', diff --git a/rq/logutils.py b/rq/logutils.py index 25e9774..40c1db2 100644 --- a/rq/logutils.py +++ b/rq/logutils.py @@ -1,3 +1,7 @@ +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + import logging # Make sure that dictConfig is available diff --git a/rq/queue.py b/rq/queue.py index 317d0f2..3b7bec3 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -1,3 +1,7 @@ +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + import uuid from .connections import resolve_connection diff --git a/rq/scripts/__init__.py b/rq/scripts/__init__.py index cab1f97..a797286 100644 --- a/rq/scripts/__init__.py +++ b/rq/scripts/__init__.py @@ -1,10 +1,13 @@ +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + import importlib import os from functools import partial from warnings import warn import redis - from rq import use_connection from rq.utils import first diff --git a/rq/scripts/rqgenload.py b/rq/scripts/rqgenload.py index b643c49..97bb5de 100755 --- a/rq/scripts/rqgenload.py +++ b/rq/scripts/rqgenload.py @@ -1,7 +1,11 @@ #!/usr/bin/env python +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + import optparse -from rq import use_connection, Queue -from rq import dummy + +from rq import dummy, Queue, use_connection def parse_args(): diff --git a/rq/scripts/rqinfo.py b/rq/scripts/rqinfo.py index f9c179d..2f6af7a 100755 --- a/rq/scripts/rqinfo.py +++ b/rq/scripts/rqinfo.py @@ -1,12 +1,14 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + import argparse import os import sys import time from redis.exceptions import ConnectionError - from rq import get_failed_queue, Queue, Worker from rq.scripts import (add_standard_arguments, read_config_file, setup_default_arguments, setup_redis) diff --git a/rq/scripts/rqworker.py b/rq/scripts/rqworker.py index c7e3fce..84f68f7 100644 --- a/rq/scripts/rqworker.py +++ b/rq/scripts/rqworker.py @@ -1,15 +1,20 @@ #!/usr/bin/env python +# -*- coding: utf-8 -*- +from __future__ import 
(absolute_import, division, print_function, + unicode_literals) + import argparse import logging import logging.config import os import sys -from rq import Queue -from rq.logutils import setup_loghandlers from redis.exceptions import ConnectionError +from rq import Queue from rq.contrib.legacy import cleanup_ghosts -from rq.scripts import add_standard_arguments, read_config_file, setup_default_arguments, setup_redis +from rq.logutils import setup_loghandlers +from rq.scripts import (add_standard_arguments, read_config_file, + setup_default_arguments, setup_redis) from rq.utils import import_attribute logger = logging.getLogger(__name__) diff --git a/rq/timeouts.py b/rq/timeouts.py index a96d891..ae0fd48 100644 --- a/rq/timeouts.py +++ b/rq/timeouts.py @@ -1,3 +1,7 @@ + # -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + import signal diff --git a/rq/utils.py b/rq/utils.py index a4260d9..e1786cb 100644 --- a/rq/utils.py +++ b/rq/utils.py @@ -5,6 +5,9 @@ Miscellaneous helper functions. The formatter for ANSI colored console output is heavily based on Pygments terminal colorizing code, originally by Georg Brandl. """ +from __future__ import (absolute_import, division, print_function, + unicode_literals) + import importlib import datetime import logging diff --git a/rq/version.py b/rq/version.py index 1376913..1497719 100644 --- a/rq/version.py +++ b/rq/version.py @@ -1 +1,4 @@ +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) VERSION = '0.4.2' diff --git a/rq/worker.py b/rq/worker.py index 11df407..7bc7113 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -1,26 +1,33 @@ -import sys -import os +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + import errno +import logging +import os import random -import time -try: - from procname import setprocname -except ImportError: - def setprocname(*args, **kwargs): # noqa - pass -import socket import signal +import socket +import sys +import time import traceback -import logging -from .queue import Queue, get_failed_queue + +from rq.compat import as_text, text_type + from .connections import get_current_connection +from .exceptions import DequeueTimeout, NoQueueError from .job import Job, Status -from .utils import make_colorizer, utcnow, utcformat from .logutils import setup_loghandlers -from .exceptions import NoQueueError, DequeueTimeout +from .queue import get_failed_queue, Queue from .timeouts import UnixSignalDeathPenalty +from .utils import make_colorizer, utcformat, utcnow from .version import VERSION -from rq.compat import text_type, as_text + +try: + from procname import setprocname +except ImportError: + def setprocname(*args, **kwargs): # noqa + pass green = make_colorizer('darkgreen') yellow = make_colorizer('darkyellow') diff --git a/tests/__init__.py b/tests/__init__.py index ac66204..e704eaa 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,13 +1,18 @@ +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + import logging + +from redis import StrictRedis +from rq import pop_connection, push_connection from rq.compat import is_python_version + if is_python_version((2, 7), (3, 2)): import unittest else: import unittest2 as unittest # noqa -from redis import StrictRedis -from rq import push_connection, pop_connection - def find_empty_redis_database(): """Tries to connect to a random Redis 
database (starting from 4), and diff --git a/tests/dummy_settings.py b/tests/dummy_settings.py index 1404250..dbc935f 100644 --- a/tests/dummy_settings.py +++ b/tests/dummy_settings.py @@ -1 +1,5 @@ +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + REDIS_HOST = "testhost.example.com" diff --git a/tests/fixtures.py b/tests/fixtures.py index eaf426e..a375562 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -1,11 +1,15 @@ +# -*- coding: utf-8 -*- """ This file contains all jobs that are used in tests. Each of these test fixtures has a slighty different characteristics. """ +from __future__ import (absolute_import, division, print_function, + unicode_literals) + import time -from rq import Connection + +from rq import Connection, get_current_job from rq.decorators import job -from rq import get_current_job def say_hello(name=None): diff --git a/tests/helpers.py b/tests/helpers.py index 16f1717..3ecf0dd 100644 --- a/tests/helpers.py +++ b/tests/helpers.py @@ -1,3 +1,7 @@ +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + from datetime import timedelta diff --git a/tests/test_connection.py b/tests/test_connection.py index e0d149d..445e33f 100644 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -1,7 +1,11 @@ -from tests import RQTestCase, find_empty_redis_database +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + +from rq import Connection, Queue + +from tests import find_empty_redis_database, RQTestCase from tests.fixtures import do_nothing -from rq import Queue -from rq import Connection def new_connection(): diff --git a/tests/test_decorator.py b/tests/test_decorator.py index 2521e2e..91c53e9 100644 --- a/tests/test_decorator.py +++ b/tests/test_decorator.py @@ -1,10 +1,15 @@ -from tests import RQTestCase -from tests.fixtures import decorated_job +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) from rq.decorators import job from rq.job import Job from rq.worker import DEFAULT_RESULT_TTL +from tests import RQTestCase +from tests.fixtures import decorated_job + + class TestDecorator(RQTestCase): def setUp(self): diff --git a/tests/test_job.py b/tests/test_job.py index e93b8cb..bbe5f7a 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -1,16 +1,23 @@ +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + from datetime import datetime + +from rq.compat import as_text, PY2 +from rq.exceptions import NoSuchJobError, UnpickleError +from rq.job import get_current_job, Job +from rq.queue import Queue +from rq.utils import utcformat + from tests import RQTestCase -from tests.fixtures import Number, some_calculation, say_hello, access_self +from tests.fixtures import access_self, Number, say_hello, some_calculation from tests.helpers import strip_microseconds + try: from cPickle import loads, dumps except ImportError: from pickle import loads, dumps -from rq.compat import as_text -from rq.job import Job, get_current_job -from rq.exceptions import NoSuchJobError, UnpickleError -from rq.queue import Queue -from rq.utils import utcformat class TestJob(RQTestCase): @@ -240,16 +247,19 @@ class TestJob(RQTestCase): def test_description_is_persisted(self): """Ensure that job's custom description is set properly""" - job = Job.create(func=say_hello, args=('Lionel',), 
description=u'Say hello!') + job = Job.create(func=say_hello, args=('Lionel',), description='Say hello!') job.save() Job.fetch(job.id, connection=self.testconn) - self.assertEqual(job.description, u'Say hello!') + self.assertEqual(job.description, 'Say hello!') # Ensure job description is constructed from function call string job = Job.create(func=say_hello, args=('Lionel',)) job.save() Job.fetch(job.id, connection=self.testconn) - self.assertEqual(job.description, "tests.fixtures.say_hello('Lionel')") + if PY2: + self.assertEqual(job.description, "tests.fixtures.say_hello(u'Lionel')") + else: + self.assertEqual(job.description, "tests.fixtures.say_hello('Lionel')") def test_job_access_within_job_function(self): """The current job is accessible within the job function.""" diff --git a/tests/test_queue.py b/tests/test_queue.py index 8106b8d..a999b5e 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -1,9 +1,15 @@ -from tests import RQTestCase -from tests.fixtures import Number, div_by_zero, echo, say_hello, some_calculation -from rq import Queue, get_failed_queue +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + +from rq import get_failed_queue, Queue +from rq.exceptions import InvalidJobOperationError from rq.job import Job, Status from rq.worker import Worker -from rq.exceptions import InvalidJobOperationError + +from tests import RQTestCase +from tests.fixtures import (div_by_zero, echo, Number, say_hello, + some_calculation) class TestQueue(RQTestCase): diff --git a/tests/test_scripts.py b/tests/test_scripts.py index ae661c7..88689c3 100644 --- a/tests/test_scripts.py +++ b/tests/test_scripts.py @@ -1,9 +1,14 @@ +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + from rq.compat import is_python_version +from rq.scripts import read_config_file + if is_python_version((2, 7), (3, 2)): from unittest import TestCase else: from unittest2 import TestCase # noqa -from rq.scripts import read_config_file class TestScripts(TestCase): diff --git a/tests/test_sentry.py b/tests/test_sentry.py index 3fa4737..ba8d1ec 100644 --- a/tests/test_sentry.py +++ b/tests/test_sentry.py @@ -1,7 +1,12 @@ -from tests import RQTestCase -from rq import Queue, Worker, get_failed_queue +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + +from rq import get_failed_queue, Queue, Worker from rq.contrib.sentry import register_sentry +from tests import RQTestCase + class FakeSentry(object): def captureException(self, *args, **kwds): diff --git a/tests/test_worker.py b/tests/test_worker.py index b7147fe..1255442 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -1,12 +1,18 @@ +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + import os -from tests import RQTestCase, slow -from tests.fixtures import say_hello, div_by_zero, create_file, \ - create_file_after_timeout -from tests.helpers import strip_microseconds -from rq import Queue, Worker, get_failed_queue + +from rq import get_failed_queue, Queue, Worker from rq.compat import as_text from rq.job import Job, Status +from tests import RQTestCase, slow +from tests.fixtures import (create_file, create_file_after_timeout, div_by_zero, + say_hello) +from tests.helpers import strip_microseconds + class TestWorker(RQTestCase): def test_create_worker(self): From ab9e6b852e527482234aafcba857d7d85f90ca01 Mon 
Sep 17 00:00:00 2001 From: Vincent Driessen Date: Mon, 5 May 2014 10:49:43 +0200 Subject: [PATCH 23/23] Fix PEP8 complaints. --- rq/compat/__init__.py | 16 ++++++------ rq/contrib/sentry.py | 16 ++++++------ rq/job.py | 13 ++++++---- rq/logutils.py | 2 +- rq/queue.py | 9 +++---- rq/scripts/rqgenload.py | 56 ++++++++++++++++++++--------------------- rq/scripts/rqinfo.py | 2 +- rq/timeouts.py | 6 ++--- rq/worker.py | 13 +++------- tests/__init__.py | 4 +-- tests/helpers.py | 1 - tests/test_decorator.py | 2 +- tests/test_queue.py | 17 +++++-------- tests/test_worker.py | 15 ++++++----- 14 files changed, 80 insertions(+), 92 deletions(-) diff --git a/rq/compat/__init__.py b/rq/compat/__init__.py index 1571dbb..5e660b5 100644 --- a/rq/compat/__init__.py +++ b/rq/compat/__init__.py @@ -21,17 +21,17 @@ else: """Class decorator that fills in missing ordering methods""" convert = { '__lt__': [('__gt__', lambda self, other: other < self), - ('__le__', lambda self, other: not other < self), - ('__ge__', lambda self, other: not self < other)], + ('__le__', lambda self, other: not other < self), + ('__ge__', lambda self, other: not self < other)], '__le__': [('__ge__', lambda self, other: other <= self), - ('__lt__', lambda self, other: not other <= self), - ('__gt__', lambda self, other: not self <= other)], + ('__lt__', lambda self, other: not other <= self), + ('__gt__', lambda self, other: not self <= other)], '__gt__': [('__lt__', lambda self, other: other > self), - ('__ge__', lambda self, other: not other > self), - ('__le__', lambda self, other: not self > other)], + ('__ge__', lambda self, other: not other > self), + ('__le__', lambda self, other: not self > other)], '__ge__': [('__le__', lambda self, other: other >= self), - ('__gt__', lambda self, other: not other >= self), - ('__lt__', lambda self, other: not self >= other)] + ('__gt__', lambda self, other: not other >= self), + ('__lt__', lambda self, other: not self >= other)] } roots = set(dir(cls)) & set(convert) if not roots: diff --git a/rq/contrib/sentry.py b/rq/contrib/sentry.py index 84b9ef9..5608e63 100644 --- a/rq/contrib/sentry.py +++ b/rq/contrib/sentry.py @@ -9,13 +9,13 @@ def register_sentry(client, worker): """ def send_to_sentry(job, *exc_info): client.captureException( - exc_info=exc_info, - extra={ - 'job_id': job.id, - 'func': job.func_name, - 'args': job.args, - 'kwargs': job.kwargs, - 'description': job.description, - }) + exc_info=exc_info, + extra={ + 'job_id': job.id, + 'func': job.func_name, + 'args': job.args, + 'kwargs': job.kwargs, + 'description': job.description, + }) worker.push_exc_handler(send_to_sentry) diff --git a/rq/job.py b/rq/job.py index 045b57a..29562d6 100644 --- a/rq/job.py +++ b/rq/job.py @@ -4,15 +4,18 @@ from __future__ import (absolute_import, division, print_function, import inspect from uuid import uuid4 + +from rq.compat import as_text, decode_redis_hash, string_types, text_type + +from .connections import resolve_connection +from .exceptions import NoSuchJobError, UnpickleError +from .local import LocalStack +from .utils import import_attribute, utcformat, utcnow, utcparse + try: from cPickle import loads, dumps, UnpicklingError except ImportError: # noqa from pickle import loads, dumps, UnpicklingError # noqa -from .local import LocalStack -from .connections import resolve_connection -from .exceptions import UnpickleError, NoSuchJobError -from .utils import import_attribute, utcnow, utcformat, utcparse -from rq.compat import text_type, decode_redis_hash, as_text, string_types def 
enum(name, *sequential, **named): diff --git a/rq/logutils.py b/rq/logutils.py index 40c1db2..0da1d9a 100644 --- a/rq/logutils.py +++ b/rq/logutils.py @@ -28,7 +28,7 @@ def setup_loghandlers(level=None): "handlers": { "console": { "level": "DEBUG", - #"class": "logging.StreamHandler", + # "class": "logging.StreamHandler", "class": "rq.utils.ColorizingStreamHandler", "formatter": "console", "exclude": ["%(asctime)s"], diff --git a/rq/queue.py b/rq/queue.py index 3b7bec3..cc7fd13 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -148,12 +148,10 @@ class Queue(object): if Job.exists(job_id, self.connection): self.connection.rpush(self.key, job_id) - - def push_job_id(self, job_id): # noqa + def push_job_id(self, job_id): """Pushes a job ID on the corresponding Redis queue.""" self.connection.rpush(self.key, job_id) - def enqueue_call(self, func, args=None, kwargs=None, timeout=None, result_ttl=None, description=None, depends_on=None): """Creates a job to represent the delayed function call and enqueues @@ -213,10 +211,10 @@ class Queue(object): description = kwargs.pop('description', None) result_ttl = kwargs.pop('result_ttl', None) depends_on = kwargs.pop('depends_on', None) - + if 'args' in kwargs or 'kwargs' in kwargs: assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs.' # noqa - args = kwargs.pop('args', None) + args = kwargs.pop('args', None) kwargs = kwargs.pop('kwargs', None) return self.enqueue_call(func=f, args=args, kwargs=kwargs, @@ -347,7 +345,6 @@ class Queue(object): raise e return job, queue - # Total ordering defition (the rest of the required Python methods are # auto-generated by the @total_ordering decorator) def __eq__(self, other): # noqa diff --git a/rq/scripts/rqgenload.py b/rq/scripts/rqgenload.py index 97bb5de..5c87fbb 100755 --- a/rq/scripts/rqgenload.py +++ b/rq/scripts/rqgenload.py @@ -14,6 +14,7 @@ def parse_args(): opts, args = parser.parse_args() return (opts, args, parser) + def main(): import sys sys.path.insert(0, '.') @@ -25,12 +26,12 @@ def main(): queues = ('default', 'high', 'low') sample_calls = [ - (dummy.do_nothing, [], {}), - (dummy.sleep, [1], {}), - (dummy.fib, [8], {}), # normal result - (dummy.fib, [24], {}), # takes pretty long - (dummy.div_by_zero, [], {}), # 5 / 0 => div by zero exc - (dummy.random_failure, [], {}), # simulate random failure (handy for requeue testing) + (dummy.do_nothing, [], {}), + (dummy.sleep, [1], {}), + (dummy.fib, [8], {}), # normal result + (dummy.fib, [24], {}), # takes pretty long + (dummy.div_by_zero, [], {}), # 5 / 0 => div by zero exc + (dummy.random_failure, [], {}), # simulate random failure (handy for requeue testing) ] for i in range(opts.count): @@ -40,28 +41,27 @@ def main(): q = Queue(random.choice(queues)) q.enqueue(f, *args, **kwargs) - #q = Queue('foo') - #q.enqueue(do_nothing) - #q.enqueue(sleep, 3) - #q = Queue('bar') - #q.enqueue(yield_stuff) - #q.enqueue(do_nothing) - #q.enqueue(do_nothing) - #q.enqueue(do_nothing) - #q.enqueue(do_nothing) - #q.enqueue(do_nothing) - #q.enqueue(do_nothing) - #q.enqueue(do_nothing) - #q.enqueue(do_nothing) - #q.enqueue(do_nothing) - #q.enqueue(do_nothing) - #q.enqueue(do_nothing) - #q.enqueue(do_nothing) - #q.enqueue(do_nothing) - #q.enqueue(do_nothing) - #q.enqueue(do_nothing) - #q.enqueue(do_nothing) + # q = Queue('foo') + # q.enqueue(do_nothing) + # q.enqueue(sleep, 3) + # q = Queue('bar') + # q.enqueue(yield_stuff) + # q.enqueue(do_nothing) + # q.enqueue(do_nothing) + # q.enqueue(do_nothing) + # q.enqueue(do_nothing) + # 
q.enqueue(do_nothing) + # q.enqueue(do_nothing) + # q.enqueue(do_nothing) + # q.enqueue(do_nothing) + # q.enqueue(do_nothing) + # q.enqueue(do_nothing) + # q.enqueue(do_nothing) + # q.enqueue(do_nothing) + # q.enqueue(do_nothing) + # q.enqueue(do_nothing) + # q.enqueue(do_nothing) + # q.enqueue(do_nothing) if __name__ == '__main__': main() - diff --git a/rq/scripts/rqinfo.py b/rq/scripts/rqinfo.py index 2f6af7a..48d318a 100755 --- a/rq/scripts/rqinfo.py +++ b/rq/scripts/rqinfo.py @@ -111,7 +111,7 @@ def show_workers(args): queues = dict([(q, []) for q in qs]) for w in ws: for q in w.queues: - if not q in queues: + if q not in queues: continue queues[q].append(w) diff --git a/rq/timeouts.py b/rq/timeouts.py index ae0fd48..afe385d 100644 --- a/rq/timeouts.py +++ b/rq/timeouts.py @@ -14,7 +14,7 @@ class JobTimeoutException(Exception): class BaseDeathPenalty(object): """Base class to setup job timeouts.""" - + def __init__(self, timeout): self._timeout = timeout @@ -45,7 +45,7 @@ class BaseDeathPenalty(object): class UnixSignalDeathPenalty(BaseDeathPenalty): - + def handle_death_penalty(self, signum, frame): raise JobTimeoutException('Job exceeded maximum timeout ' 'value (%d seconds).' % self._timeout) @@ -63,4 +63,4 @@ class UnixSignalDeathPenalty(BaseDeathPenalty): default signal handling. """ signal.alarm(0) - signal.signal(signal.SIGALRM, signal.SIG_DFL) \ No newline at end of file + signal.signal(signal.SIGALRM, signal.SIG_DFL) diff --git a/rq/worker.py b/rq/worker.py index 7bc7113..a44f412 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -68,7 +68,6 @@ class Worker(object): redis_workers_keys = 'rq:workers' death_penalty_class = UnixSignalDeathPenalty - @classmethod def all(cls, connection=None): """Returns an iterable of all Workers. @@ -140,8 +139,7 @@ class Worker(object): if exc_handler is not None: self.push_exc_handler(exc_handler) - - def validate_queues(self): # noqa + def validate_queues(self): """Sanity check for the given queues.""" if not iterable(self.queues): raise ValueError('Argument queues not iterable.') @@ -157,8 +155,7 @@ class Worker(object): """Returns the Redis keys representing this worker's queues.""" return map(lambda q: q.key, self.queues) - - @property # noqa + @property def name(self): """Returns the name of the worker, under which it is registered to the monitoring system. @@ -201,8 +198,7 @@ class Worker(object): """ setprocname('rq: %s' % (message,)) - - def register_birth(self): # noqa + def register_birth(self): """Registers its own birth.""" self.log.debug('Registering birth of worker %s' % (self.name,)) if self.connection.exists(self.key) and \ @@ -326,8 +322,7 @@ class Worker(object): signal.signal(signal.SIGINT, request_stop) signal.signal(signal.SIGTERM, request_stop) - - def work(self, burst=False): # noqa + def work(self, burst=False): """Starts the work loop. Pops and performs all jobs on the current list of queues. When all diff --git a/tests/__init__.py b/tests/__init__.py index e704eaa..93f11b3 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -79,5 +79,5 @@ class RQTestCase(unittest.TestCase): # Pop the connection to Redis testconn = pop_connection() - assert testconn == cls.testconn, 'Wow, something really nasty ' \ - 'happened to the Redis connection stack. Check your setup.' + assert testconn == cls.testconn, \ + 'Wow, something really nasty happened to the Redis connection stack. Check your setup.' 
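The reformatted tearDownClass assertion above only holds if every connection pushed onto RQ's connection stack during a test is popped again. As a standalone aside (not part of the patch), a minimal sketch of that push/pop pattern, assuming a hypothetical test database number and a placeholder body:

    from redis import StrictRedis
    from rq import pop_connection, push_connection

    testconn = StrictRedis(db=15)   # hypothetical test database
    push_connection(testconn)       # RQ now resolves its connection to testconn
    try:
        pass                        # run the code under test here
    finally:
        assert pop_connection() is testconn, 'Redis connection stack is unbalanced'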
diff --git a/tests/helpers.py b/tests/helpers.py index 3ecf0dd..1475cf0 100644 --- a/tests/helpers.py +++ b/tests/helpers.py @@ -7,4 +7,3 @@ from datetime import timedelta def strip_microseconds(date): return date - timedelta(microseconds=date.microsecond) - diff --git a/tests/test_decorator.py b/tests/test_decorator.py index 91c53e9..afb008f 100644 --- a/tests/test_decorator.py +++ b/tests/test_decorator.py @@ -44,7 +44,7 @@ class TestDecorator(RQTestCase): """Ensure that passing in result_ttl to the decorator sets the result_ttl on the job """ - #Ensure default + # Ensure default result = decorated_job.delay(1, 2) self.assertEqual(result.result_ttl, DEFAULT_RESULT_TTL) diff --git a/tests/test_queue.py b/tests/test_queue.py index a999b5e..1d467a6 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -23,8 +23,7 @@ class TestQueue(RQTestCase): q = Queue() self.assertEquals(q.name, 'default') - - def test_equality(self): # noqa + def test_equality(self): """Mathematical equality of queues.""" q1 = Queue('foo') q2 = Queue('foo') @@ -35,8 +34,7 @@ class TestQueue(RQTestCase): self.assertNotEquals(q1, q3) self.assertNotEquals(q2, q3) - - def test_empty_queue(self): # noqa + def test_empty_queue(self): """Emptying queues.""" q = Queue('example') @@ -109,8 +107,7 @@ class TestQueue(RQTestCase): self.assertEquals(q.count, 2) - - def test_enqueue(self): # noqa + def test_enqueue(self): """Enqueueing job onto queues.""" q = Queue() self.assertEquals(q.is_empty(), True) @@ -142,8 +139,7 @@ class TestQueue(RQTestCase): self.assertEquals(job.origin, q.name) self.assertIsNotNone(job.enqueued_at) - - def test_pop_job_id(self): # noqa + def test_pop_job_id(self): """Popping job IDs from queues.""" # Set up q = Queue() @@ -269,8 +265,8 @@ class TestQueue(RQTestCase): def test_enqueue_explicit_args(self): """enqueue() works for both implicit/explicit args.""" q = Queue() - - # Implicit args/kwargs mode + + # Implicit args/kwargs mode job = q.enqueue(echo, 1, timeout=1, result_ttl=1, bar='baz') self.assertEqual(job.timeout, 1) self.assertEqual(job.result_ttl, 1) @@ -292,7 +288,6 @@ class TestQueue(RQTestCase): ((1,), {'timeout': 1, 'result_ttl': 1}) ) - def test_all_queues(self): """All queues""" q1 = Queue('first-queue') diff --git a/tests/test_worker.py b/tests/test_worker.py index 1255442..27e85bb 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -26,16 +26,16 @@ class TestWorker(RQTestCase): fooq, barq = Queue('foo'), Queue('bar') w = Worker([fooq, barq]) self.assertEquals(w.work(burst=True), False, - 'Did not expect any work on the queue.') + 'Did not expect any work on the queue.') fooq.enqueue(say_hello, name='Frank') self.assertEquals(w.work(burst=True), True, - 'Expected at least some work done.') + 'Expected at least some work done.') def test_worker_ttl(self): """Worker ttl.""" w = Worker([]) - w.register_birth() # ugly: our test should only call public APIs + w.register_birth() # ugly: our test should only call public APIs [worker_key] = self.testconn.smembers(Worker.redis_workers_keys) self.assertIsNotNone(self.testconn.ttl(worker_key)) w.register_death() @@ -46,7 +46,7 @@ class TestWorker(RQTestCase): w = Worker([q]) job = q.enqueue('tests.fixtures.say_hello', name='Frank') self.assertEquals(w.work(burst=True), True, - 'Expected at least some work done.') + 'Expected at least some work done.') self.assertEquals(job.result, 'Hi there, Frank!') def test_work_is_unreadable(self): @@ -175,10 +175,9 @@ class TestWorker(RQTestCase): w = Worker([q]) # Put it on the queue with a 
timeout value
-        res = q.enqueue(
-            create_file_after_timeout,
-            args=(sentinel_file, 4),
-            timeout=1)
+        res = q.enqueue(create_file_after_timeout,
+                        args=(sentinel_file, 4),
+                        timeout=1)
         try:
             os.unlink(sentinel_file)
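The reflowed enqueue call above changes only the continuation indentation, not the call itself. For reference, a minimal standalone sketch of the same enqueue-with-timeout pattern (the connection setup, sentinel path, and fixture import path are illustrative assumptions, not taken from the patch):

    from redis import StrictRedis
    from rq import Queue

    q = Queue(connection=StrictRedis())
    job = q.enqueue('tests.fixtures.create_file_after_timeout',
                    args=('/tmp/rq-sentinel', 4),  # hypothetical sentinel file
                    timeout=1)                     # job is killed if it runs longer than 1 second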