diff --git a/CHANGES.md b/CHANGES.md index f61488c..9d33ceb 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,20 @@ +### 0.4.2 +(April 28th, 2014) + +- Add missing depends_on kwarg to @job decorator. Thanks, Sasha! + + +### 0.4.1 +(April 22nd, 2014) + +- Fix bug where RQ 0.4 workers could not unpickle/process jobs from RQ < 0.4. + + ### 0.4.0 -(not released yet) +(April 22nd, 2014) + +- Emptying the failed queue from the command line is now as simple as running + `rqinfo -X` or `rqinfo --empty-failed-queue`. - Job data is unpickled lazily. Thanks, Malthe! diff --git a/examples/fib.py b/examples/fib.py index 2130b3c..91606cb 100644 --- a/examples/fib.py +++ b/examples/fib.py @@ -1,3 +1,8 @@ +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + + def slow_fib(n): if n <= 1: return 1 diff --git a/examples/run_example.py b/examples/run_example.py index cbcc6e9..d19a6dc 100644 --- a/examples/run_example.py +++ b/examples/run_example.py @@ -1,6 +1,12 @@ +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + import os import time -from rq import Queue, Connection + +from rq import Connection, Queue + from fib import slow_fib diff --git a/examples/run_worker.py b/examples/run_worker.py index 7c8adae..4feb217 100644 --- a/examples/run_worker.py +++ b/examples/run_worker.py @@ -1,5 +1,8 @@ -from rq import Queue, Worker, Connection +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) +from rq import Connection, Queue, Worker if __name__ == '__main__': # Tell rq what Redis connection to use diff --git a/rq/__init__.py b/rq/__init__.py index 94e1dd1..95050f3 100644 --- a/rq/__init__.py +++ b/rq/__init__.py @@ -1,12 +1,13 @@ -from .connections import get_current_connection -from .connections import use_connection, push_connection, pop_connection -from .connections import Connection -from .queue import Queue, get_failed_queue -from .job import cancel_job, requeue_job -from .job import get_current_job -from .worker import Worker -from .version import VERSION +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) +from .connections import (Connection, get_current_connection, pop_connection, + push_connection, use_connection) +from .job import cancel_job, get_current_job, requeue_job +from .queue import get_failed_queue, Queue +from .version import VERSION +from .worker import Worker __all__ = [ 'use_connection', 'get_current_connection', diff --git a/rq/compat/__init__.py b/rq/compat/__init__.py index ac9b7a9..5e660b5 100644 --- a/rq/compat/__init__.py +++ b/rq/compat/__init__.py @@ -1,3 +1,7 @@ +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + import sys @@ -17,17 +21,17 @@ else: """Class decorator that fills in missing ordering methods""" convert = { '__lt__': [('__gt__', lambda self, other: other < self), - ('__le__', lambda self, other: not other < self), - ('__ge__', lambda self, other: not self < other)], + ('__le__', lambda self, other: not other < self), + ('__ge__', lambda self, other: not self < other)], '__le__': [('__ge__', lambda self, other: other <= self), - ('__lt__', lambda self, other: not other <= self), - ('__gt__', lambda self, other: not self <= other)], + ('__lt__', lambda self, other: not other <= self), + ('__gt__', lambda self, other: not self <= other)], '__gt__': [('__lt__', lambda self, other: other > self), - ('__ge__', lambda self, other: not other > self), - ('__le__', lambda self, other: not self > other)], + ('__ge__', lambda self, other: not other > self), + ('__le__', lambda self, other: not self > other)], '__ge__': [('__le__', lambda self, other: other >= self), - ('__gt__', lambda self, other: not other >= self), - ('__lt__', lambda self, other: not self >= other)] + ('__gt__', lambda self, other: not other >= self), + ('__lt__', lambda self, other: not self >= other)] } roots = set(dir(cls)) & set(convert) if not roots: @@ -35,27 +39,17 @@ else: root = max(roots) # prefer __lt__ to __le__ to __gt__ to __ge__ for opname, opfunc in convert[root]: if opname not in roots: - opfunc.__name__ = opname + opfunc.__name__ = str(opname) opfunc.__doc__ = getattr(int, opname).__doc__ setattr(cls, opname, opfunc) return cls -PY2 = sys.version_info[0] < 3 - -if PY2: - string_types = (str, unicode) - text_type = unicode - - def as_text(v): - return v - - def decode_redis_hash(h): - return h - -else: - string_types = (str,) +PY2 = sys.version_info[0] == 2 +if not PY2: + # Python 3.x and up text_type = str + string_types = (str,) def as_text(v): if v is None: @@ -69,3 +63,13 @@ else: def decode_redis_hash(h): return dict((as_text(k), h[k]) for k in h) +else: + # Python 2.x + text_type = unicode + string_types = (str, unicode) + + def as_text(v): + return v + + def decode_redis_hash(h): + return h diff --git a/rq/compat/connections.py b/rq/compat/connections.py index 0374b5b..ac22af3 100644 --- a/rq/compat/connections.py +++ b/rq/compat/connections.py @@ -1,6 +1,11 @@ -from redis import Redis, StrictRedis +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + from functools import partial +from redis import Redis, StrictRedis + def fix_return_type(func): # deliberately no functools.wraps() call here, since the function being diff --git a/rq/connections.py b/rq/connections.py index ee07070..ae4d036 100644 --- a/rq/connections.py +++ b/rq/connections.py @@ -1,7 +1,13 @@ +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + from contextlib import contextmanager + from redis import StrictRedis -from .local import LocalStack, release_local + from .compat.connections import patch_connection +from .local import LocalStack, release_local class NoRedisConnectionException(Exception): diff --git a/rq/contrib/legacy.py b/rq/contrib/legacy.py index 2a20b9f..447c002 100644 --- a/rq/contrib/legacy.py +++ b/rq/contrib/legacy.py @@ -1,3 +1,8 @@ +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + + import logging from rq import get_current_connection from rq import Worker diff --git a/rq/contrib/sentry.py b/rq/contrib/sentry.py index 4b776d1..5608e63 100644 --- a/rq/contrib/sentry.py +++ b/rq/contrib/sentry.py @@ -1,16 +1,21 @@ +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + + def register_sentry(client, worker): """Given a Raven client and an RQ worker, registers exception handlers with the worker so exceptions are logged to Sentry. """ def send_to_sentry(job, *exc_info): client.captureException( - exc_info=exc_info, - extra={ - 'job_id': job.id, - 'func': job.func_name, - 'args': job.args, - 'kwargs': job.kwargs, - 'description': job.description, - }) + exc_info=exc_info, + extra={ + 'job_id': job.id, + 'func': job.func_name, + 'args': job.args, + 'kwargs': job.kwargs, + 'description': job.description, + }) worker.push_exc_handler(send_to_sentry) diff --git a/rq/decorators.py b/rq/decorators.py index 07a7a83..3d940be 100644 --- a/rq/decorators.py +++ b/rq/decorators.py @@ -1,7 +1,13 @@ +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + from functools import wraps + +from rq.compat import string_types + from .queue import Queue from .worker import DEFAULT_RESULT_TTL -from rq.compat import string_types class job(object): diff --git a/rq/dummy.py b/rq/dummy.py index ee9022b..cdd1afc 100644 --- a/rq/dummy.py +++ b/rq/dummy.py @@ -1,8 +1,12 @@ +# -*- coding: utf-8 -*- """ Some dummy tasks that are well-suited for generating load for testing purposes. """ -import time +from __future__ import (absolute_import, division, print_function, + unicode_literals) + import random +import time def do_nothing(): diff --git a/rq/exceptions.py b/rq/exceptions.py index 25e4f0e..94b22bf 100644 --- a/rq/exceptions.py +++ b/rq/exceptions.py @@ -1,3 +1,8 @@ +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + + class NoSuchJobError(Exception): pass diff --git a/rq/job.py b/rq/job.py index 9e9f207..29562d6 100644 --- a/rq/job.py +++ b/rq/job.py @@ -1,19 +1,31 @@ +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + import inspect from uuid import uuid4 + +from rq.compat import as_text, decode_redis_hash, string_types, text_type + +from .connections import resolve_connection +from .exceptions import NoSuchJobError, UnpickleError +from .local import LocalStack +from .utils import import_attribute, utcformat, utcnow, utcparse + try: from cPickle import loads, dumps, UnpicklingError except ImportError: # noqa from pickle import loads, dumps, UnpicklingError # noqa -from .local import LocalStack -from .connections import resolve_connection -from .exceptions import UnpickleError, NoSuchJobError -from .utils import import_attribute, utcnow, utcformat, utcparse -from rq.compat import text_type, decode_redis_hash, as_text def enum(name, *sequential, **named): values = dict(zip(sequential, range(len(sequential))), **named) - return type(name, (), values) + + # NOTE: Yes, we *really* want to cast using str() here. + # On Python 2 type() requires a byte string (which is str() on Python 2). + # On Python 3 it does not matter, so we'll use str(), which acts as + # a no-op. + return type(str(name), (), values) Status = enum('Status', QUEUED='queued', FINISHED='finished', FAILED='failed', @@ -96,8 +108,10 @@ class Job(object): job._func_name = func.__name__ elif inspect.isfunction(func) or inspect.isbuiltin(func): job._func_name = '%s.%s' % (func.__module__, func.__name__) - else: # we expect a string - job._func_name = func + elif isinstance(func, string_types): + job._func_name = as_text(func) + else: + raise TypeError('Expected a function/method/string, but got: {}'.format(func)) job._args = args job._kwargs = kwargs @@ -433,7 +447,7 @@ class Job(object): cancellation. Technically, this call is (currently) the same as just deleting the job hash. """ - pipeline = self.connection.pipeline() + pipeline = self.connection._pipeline() self.delete(pipeline=pipeline) pipeline.delete(self.dependents_key) pipeline.execute() @@ -448,9 +462,13 @@ class Job(object): """Invokes the job function with the job arguments.""" _job_stack.push(self.id) try: + self.set_status(Status.STARTED) self._result = self.func(*self.args, **self.kwargs) + self.set_status(Status.FINISHED) + self.ended_at = utcnow() finally: assert self.id == _job_stack.pop() + return self._result def get_ttl(self, default_ttl=None): diff --git a/rq/logutils.py b/rq/logutils.py index 25e9774..0da1d9a 100644 --- a/rq/logutils.py +++ b/rq/logutils.py @@ -1,3 +1,7 @@ +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + import logging # Make sure that dictConfig is available @@ -24,7 +28,7 @@ def setup_loghandlers(level=None): "handlers": { "console": { "level": "DEBUG", - #"class": "logging.StreamHandler", + # "class": "logging.StreamHandler", "class": "rq.utils.ColorizingStreamHandler", "formatter": "console", "exclude": ["%(asctime)s"], diff --git a/rq/queue.py b/rq/queue.py index 49d43d7..cc7fd13 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -1,3 +1,7 @@ +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + import uuid from .connections import resolve_connection @@ -65,10 +69,25 @@ class Queue(object): def empty(self): """Removes all messages on the queue.""" - job_list = self.get_jobs() - self.connection.delete(self.key) - for job in job_list: - job.cancel() + script = b""" + local prefix = "rq:job:" + local q = KEYS[1] + local count = 0 + while true do + local job_id = redis.call("lpop", q) + if job_id == false then + break + end + + -- Delete the relevant keys + redis.call("del", prefix..job_id) + redis.call("del", prefix..job_id..":dependents") + count = count + 1 + end + return count + """ + script = self.connection.register_script(script) + return script(keys=[self.key]) def is_empty(self): """Returns whether the current queue is empty.""" @@ -129,12 +148,10 @@ class Queue(object): if Job.exists(job_id, self.connection): self.connection.rpush(self.key, job_id) - - def push_job_id(self, job_id): # noqa + def push_job_id(self, job_id): """Pushes a job ID on the corresponding Redis queue.""" self.connection.rpush(self.key, job_id) - def enqueue_call(self, func, args=None, kwargs=None, timeout=None, result_ttl=None, description=None, depends_on=None): """Creates a job to represent the delayed function call and enqueues @@ -190,17 +207,14 @@ class Queue(object): # Detect explicit invocations, i.e. of the form: # q.enqueue(foo, args=(1, 2), kwargs={'a': 1}, timeout=30) - timeout = None - description = None - result_ttl = None - depends_on = None - if 'args' in kwargs or 'kwargs' in kwargs or 'depends_on' in kwargs: + timeout = kwargs.pop('timeout', None) + description = kwargs.pop('description', None) + result_ttl = kwargs.pop('result_ttl', None) + depends_on = kwargs.pop('depends_on', None) + + if 'args' in kwargs or 'kwargs' in kwargs: assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs.' # noqa - timeout = kwargs.pop('timeout', None) - description = kwargs.pop('description', None) args = kwargs.pop('args', None) - result_ttl = kwargs.pop('result_ttl', None) - depends_on = kwargs.pop('depends_on', None) kwargs = kwargs.pop('kwargs', None) return self.enqueue_call(func=f, args=args, kwargs=kwargs, @@ -331,7 +345,6 @@ class Queue(object): raise e return job, queue - # Total ordering defition (the rest of the required Python methods are # auto-generated by the @total_ordering decorator) def __eq__(self, other): # noqa diff --git a/rq/scripts/__init__.py b/rq/scripts/__init__.py index 575a8cb..a797286 100644 --- a/rq/scripts/__init__.py +++ b/rq/scripts/__init__.py @@ -1,7 +1,15 @@ +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + import importlib -import redis +import os +from functools import partial from warnings import warn + +import redis from rq import use_connection +from rq.utils import first def add_standard_arguments(parser): @@ -32,31 +40,20 @@ def read_config_file(module): def setup_default_arguments(args, settings): """ Sets up args from settings or defaults """ - if args.url is None: - args.url = settings.get('REDIS_URL') + args.url = first([args.url, settings.get('REDIS_URL'), os.environ.get('RQ_REDIS_URL')]) if (args.host or args.port or args.socket or args.db or args.password): warn('Host, port, db, password options for Redis will not be ' 'supported in future versions of RQ. ' 'Please use `REDIS_URL` or `--url` instead.', DeprecationWarning) - if args.host is None: - args.host = settings.get('REDIS_HOST', 'localhost') - - if args.port is None: - args.port = int(settings.get('REDIS_PORT', 6379)) - else: - args.port = int(args.port) - - socket = settings.get('REDIS_SOCKET', False) - if args.socket is None and socket: - args.socket = socket - - if args.db is None: - args.db = settings.get('REDIS_DB', 0) + strict_first = partial(first, key=lambda obj: obj is not None) - if args.password is None: - args.password = settings.get('REDIS_PASSWORD', None) + args.host = strict_first([args.host, settings.get('REDIS_HOST'), os.environ.get('RQ_REDIS_HOST'), 'localhost']) + args.port = int(strict_first([args.port, settings.get('REDIS_PORT'), os.environ.get('RQ_REDIS_PORT'), 6379])) + args.socket = strict_first([args.socket, settings.get('REDIS_SOCKET'), os.environ.get('RQ_REDIS_SOCKET'), None]) + args.db = strict_first([args.db, settings.get('REDIS_DB'), os.environ.get('RQ_REDIS_DB'), 0]) + args.password = strict_first([args.password, settings.get('REDIS_PASSWORD'), os.environ.get('RQ_REDIS_PASSWORD')]) def setup_redis(args): diff --git a/rq/scripts/rqgenload.py b/rq/scripts/rqgenload.py index b643c49..5c87fbb 100755 --- a/rq/scripts/rqgenload.py +++ b/rq/scripts/rqgenload.py @@ -1,7 +1,11 @@ #!/usr/bin/env python +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + import optparse -from rq import use_connection, Queue -from rq import dummy + +from rq import dummy, Queue, use_connection def parse_args(): @@ -10,6 +14,7 @@ def parse_args(): opts, args = parser.parse_args() return (opts, args, parser) + def main(): import sys sys.path.insert(0, '.') @@ -21,12 +26,12 @@ def main(): queues = ('default', 'high', 'low') sample_calls = [ - (dummy.do_nothing, [], {}), - (dummy.sleep, [1], {}), - (dummy.fib, [8], {}), # normal result - (dummy.fib, [24], {}), # takes pretty long - (dummy.div_by_zero, [], {}), # 5 / 0 => div by zero exc - (dummy.random_failure, [], {}), # simulate random failure (handy for requeue testing) + (dummy.do_nothing, [], {}), + (dummy.sleep, [1], {}), + (dummy.fib, [8], {}), # normal result + (dummy.fib, [24], {}), # takes pretty long + (dummy.div_by_zero, [], {}), # 5 / 0 => div by zero exc + (dummy.random_failure, [], {}), # simulate random failure (handy for requeue testing) ] for i in range(opts.count): @@ -36,28 +41,27 @@ def main(): q = Queue(random.choice(queues)) q.enqueue(f, *args, **kwargs) - #q = Queue('foo') - #q.enqueue(do_nothing) - #q.enqueue(sleep, 3) - #q = Queue('bar') - #q.enqueue(yield_stuff) - #q.enqueue(do_nothing) - #q.enqueue(do_nothing) - #q.enqueue(do_nothing) - #q.enqueue(do_nothing) - #q.enqueue(do_nothing) - #q.enqueue(do_nothing) - #q.enqueue(do_nothing) - #q.enqueue(do_nothing) - #q.enqueue(do_nothing) - #q.enqueue(do_nothing) - #q.enqueue(do_nothing) - #q.enqueue(do_nothing) - #q.enqueue(do_nothing) - #q.enqueue(do_nothing) - #q.enqueue(do_nothing) - #q.enqueue(do_nothing) + # q = Queue('foo') + # q.enqueue(do_nothing) + # q.enqueue(sleep, 3) + # q = Queue('bar') + # q.enqueue(yield_stuff) + # q.enqueue(do_nothing) + # q.enqueue(do_nothing) + # q.enqueue(do_nothing) + # q.enqueue(do_nothing) + # q.enqueue(do_nothing) + # q.enqueue(do_nothing) + # q.enqueue(do_nothing) + # q.enqueue(do_nothing) + # q.enqueue(do_nothing) + # q.enqueue(do_nothing) + # q.enqueue(do_nothing) + # q.enqueue(do_nothing) + # q.enqueue(do_nothing) + # q.enqueue(do_nothing) + # q.enqueue(do_nothing) + # q.enqueue(do_nothing) if __name__ == '__main__': main() - diff --git a/rq/scripts/rqinfo.py b/rq/scripts/rqinfo.py index 73c1a7c..48d318a 100755 --- a/rq/scripts/rqinfo.py +++ b/rq/scripts/rqinfo.py @@ -1,16 +1,18 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -import sys +from __future__ import (absolute_import, division, print_function, + unicode_literals) + +import argparse import os +import sys import time -import argparse + from redis.exceptions import ConnectionError -from rq import Queue, Worker +from rq import get_failed_queue, Queue, Worker +from rq.scripts import (add_standard_arguments, read_config_file, + setup_default_arguments, setup_redis) from rq.utils import gettermsize, make_colorizer -from rq.scripts import add_standard_arguments -from rq.scripts import setup_redis -from rq.scripts import read_config_file -from rq.scripts import setup_default_arguments red = make_colorizer('darkred') green = make_colorizer('darkgreen') @@ -101,22 +103,22 @@ def show_workers(args): for w in ws: worker_queues = filter_queues(w.queue_names()) if not args.raw: - print('%s %s: %s' % (w.name, state_symbol(w.state), ', '.join(worker_queues))) + print('%s %s: %s' % (w.name, state_symbol(w.get_state()), ', '.join(worker_queues))) else: - print('worker %s %s %s' % (w.name, w.state, ','.join(worker_queues))) + print('worker %s %s %s' % (w.name, w.get_state(), ','.join(worker_queues))) else: # Create reverse lookup table queues = dict([(q, []) for q in qs]) for w in ws: for q in w.queues: - if not q in queues: + if q not in queues: continue queues[q].append(w) max_qname = max(map(lambda q: len(q.name), queues.keys())) if queues else 0 for q in queues: if queues[q]: - queues_str = ", ".join(sorted(map(lambda w: '%s (%s)' % (w.name, state_symbol(w.state)), queues[q]))) + queues_str = ", ".join(sorted(map(lambda w: '%s (%s)' % (w.name, state_symbol(w.get_state())), queues[q]))) # noqa else: queues_str = '–' print('%s %s' % (pad(q.name + ':', max_qname + 1), queues_str)) @@ -140,11 +142,12 @@ def parse_args(): parser = argparse.ArgumentParser(description='RQ command-line monitor.') add_standard_arguments(parser) parser.add_argument('--path', '-P', default='.', help='Specify the import path.') - parser.add_argument('--interval', '-i', metavar='N', type=float, default=2.5, help='Updates stats every N seconds (default: don\'t poll)') - parser.add_argument('--raw', '-r', action='store_true', default=False, help='Print only the raw numbers, no bar charts') - parser.add_argument('--only-queues', '-Q', dest='only_queues', default=False, action='store_true', help='Show only queue info') - parser.add_argument('--only-workers', '-W', dest='only_workers', default=False, action='store_true', help='Show only worker info') - parser.add_argument('--by-queue', '-R', dest='by_queue', default=False, action='store_true', help='Shows workers by queue') + parser.add_argument('--interval', '-i', metavar='N', type=float, default=2.5, help='Updates stats every N seconds (default: don\'t poll)') # noqa + parser.add_argument('--raw', '-r', action='store_true', default=False, help='Print only the raw numbers, no bar charts') # noqa + parser.add_argument('--only-queues', '-Q', dest='only_queues', default=False, action='store_true', help='Show only queue info') # noqa + parser.add_argument('--only-workers', '-W', dest='only_workers', default=False, action='store_true', help='Show only worker info') # noqa + parser.add_argument('--by-queue', '-R', dest='by_queue', default=False, action='store_true', help='Shows workers by queue') # noqa + parser.add_argument('--empty-failed-queue', '-X', dest='empty_failed_queue', default=False, action='store_true', help='Empties the failed queue, then quits') # noqa parser.add_argument('queues', nargs='*', help='The queues to poll') return parser.parse_args() @@ -173,15 +176,20 @@ def main(): setup_default_arguments(args, settings) setup_redis(args) + try: - if args.only_queues: - func = show_queues - elif args.only_workers: - func = show_workers + if args.empty_failed_queue: + num_jobs = get_failed_queue().empty() + print('{} jobs removed from failed queue'.format(num_jobs)) else: - func = show_both + if args.only_queues: + func = show_queues + elif args.only_workers: + func = show_workers + else: + func = show_both - interval(args.interval, func, args) + interval(args.interval, func, args) except ConnectionError as e: print(e) sys.exit(1) diff --git a/rq/scripts/rqworker.py b/rq/scripts/rqworker.py index c7e3fce..84f68f7 100644 --- a/rq/scripts/rqworker.py +++ b/rq/scripts/rqworker.py @@ -1,15 +1,20 @@ #!/usr/bin/env python +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + import argparse import logging import logging.config import os import sys -from rq import Queue -from rq.logutils import setup_loghandlers from redis.exceptions import ConnectionError +from rq import Queue from rq.contrib.legacy import cleanup_ghosts -from rq.scripts import add_standard_arguments, read_config_file, setup_default_arguments, setup_redis +from rq.logutils import setup_loghandlers +from rq.scripts import (add_standard_arguments, read_config_file, + setup_default_arguments, setup_redis) from rq.utils import import_attribute logger = logging.getLogger(__name__) diff --git a/rq/timeouts.py b/rq/timeouts.py index f1e1848..afe385d 100644 --- a/rq/timeouts.py +++ b/rq/timeouts.py @@ -1,3 +1,7 @@ + # -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + import signal @@ -8,7 +12,9 @@ class JobTimeoutException(Exception): pass -class death_penalty_after(object): +class BaseDeathPenalty(object): + """Base class to setup job timeouts.""" + def __init__(self, timeout): self._timeout = timeout @@ -31,6 +37,15 @@ class death_penalty_after(object): # invoking context. return False + def setup_death_penalty(self): + raise NotImplementedError() + + def cancel_death_penalty(self): + raise NotImplementedError() + + +class UnixSignalDeathPenalty(BaseDeathPenalty): + def handle_death_penalty(self, signum, frame): raise JobTimeoutException('Job exceeded maximum timeout ' 'value (%d seconds).' % self._timeout) diff --git a/rq/utils.py b/rq/utils.py index f182210..e1786cb 100644 --- a/rq/utils.py +++ b/rq/utils.py @@ -5,6 +5,9 @@ Miscellaneous helper functions. The formatter for ANSI colored console output is heavily based on Pygments terminal colorizing code, originally by Georg Brandl. """ +from __future__ import (absolute_import, division, print_function, + unicode_literals) + import importlib import datetime import logging @@ -176,8 +179,51 @@ def utcnow(): def utcformat(dt): - return dt.strftime(u"%Y-%m-%dT%H:%M:%SZ") + return dt.strftime(u'%Y-%m-%dT%H:%M:%SZ') def utcparse(string): - return datetime.datetime.strptime(string, "%Y-%m-%dT%H:%M:%SZ") + try: + return datetime.datetime.strptime(string, '%Y-%m-%dT%H:%M:%SZ') + except ValueError: + # This catches RQ < 0.4 datetime format + return datetime.datetime.strptime(string, '%Y-%m-%dT%H:%M:%S.%f+00:00') + + +def first(iterable, default=None, key=None): + """ + Return first element of `iterable` that evaluates true, else return None + (or an optional default value). + + >>> first([0, False, None, [], (), 42]) + 42 + + >>> first([0, False, None, [], ()]) is None + True + + >>> first([0, False, None, [], ()], default='ohai') + 'ohai' + + >>> import re + >>> m = first(re.match(regex, 'abc') for regex in ['b.*', 'a(.*)']) + >>> m.group(1) + 'bc' + + The optional `key` argument specifies a one-argument predicate function + like that used for `filter()`. The `key` argument, if supplied, must be + in keyword form. For example: + + >>> first([1, 1, 3, 4, 5], key=lambda x: x % 2 == 0) + 4 + + """ + if key is None: + for el in iterable: + if el: + return el + else: + for el in iterable: + if key(el): + return el + + return default diff --git a/rq/version.py b/rq/version.py index 954b701..1497719 100644 --- a/rq/version.py +++ b/rq/version.py @@ -1 +1,4 @@ -VERSION = '0.3.13' +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) +VERSION = '0.4.2' diff --git a/rq/worker.py b/rq/worker.py index df4812c..a44f412 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -1,26 +1,33 @@ -import sys -import os +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + import errno +import logging +import os import random -import time -try: - from procname import setprocname -except ImportError: - def setprocname(*args, **kwargs): # noqa - pass -import socket import signal +import socket +import sys +import time import traceback -import logging -from .queue import Queue, get_failed_queue + +from rq.compat import as_text, text_type + from .connections import get_current_connection +from .exceptions import DequeueTimeout, NoQueueError from .job import Job, Status -from .utils import make_colorizer, utcnow, utcformat from .logutils import setup_loghandlers -from .exceptions import NoQueueError, DequeueTimeout -from .timeouts import death_penalty_after +from .queue import get_failed_queue, Queue +from .timeouts import UnixSignalDeathPenalty +from .utils import make_colorizer, utcformat, utcnow from .version import VERSION -from rq.compat import text_type, as_text + +try: + from procname import setprocname +except ImportError: + def setprocname(*args, **kwargs): # noqa + pass green = make_colorizer('darkgreen') yellow = make_colorizer('darkyellow') @@ -59,6 +66,7 @@ def signal_name(signum): class Worker(object): redis_worker_namespace_prefix = 'rq:worker:' redis_workers_keys = 'rq:workers' + death_penalty_class = UnixSignalDeathPenalty @classmethod def all(cls, connection=None): @@ -98,8 +106,8 @@ class Worker(object): return worker def __init__(self, queues, name=None, - default_result_ttl=DEFAULT_RESULT_TTL, connection=None, - exc_handler=None, default_worker_ttl=DEFAULT_WORKER_TTL): # noqa + default_result_ttl=None, connection=None, + exc_handler=None, default_worker_ttl=None): # noqa if connection is None: connection = get_current_connection() self.connection = connection @@ -109,8 +117,15 @@ class Worker(object): self.queues = queues self.validate_queues() self._exc_handlers = [] + + if default_result_ttl is None: + default_result_ttl = DEFAULT_RESULT_TTL self.default_result_ttl = default_result_ttl + + if default_worker_ttl is None: + default_worker_ttl = DEFAULT_WORKER_TTL self.default_worker_ttl = default_worker_ttl + self._state = 'starting' self._is_horse = False self._horse_pid = 0 @@ -124,8 +139,7 @@ class Worker(object): if exc_handler is not None: self.push_exc_handler(exc_handler) - - def validate_queues(self): # noqa + def validate_queues(self): """Sanity check for the given queues.""" if not iterable(self.queues): raise ValueError('Argument queues not iterable.') @@ -141,8 +155,7 @@ class Worker(object): """Returns the Redis keys representing this worker's queues.""" return map(lambda q: q.key, self.queues) - - @property # noqa + @property def name(self): """Returns the name of the worker, under which it is registered to the monitoring system. @@ -185,8 +198,7 @@ class Worker(object): """ setprocname('rq: %s' % (message,)) - - def register_birth(self): # noqa + def register_birth(self): """Registers its own birth.""" self.log.debug('Registering birth of worker %s' % (self.name,)) if self.connection.exists(self.key) and \ @@ -310,8 +322,7 @@ class Worker(object): signal.signal(signal.SIGINT, request_stop) signal.signal(signal.SIGTERM, request_stop) - - def work(self, burst=False): # noqa + def work(self, burst=False): """Starts the work loop. Pops and performs all jobs on the current list of queues. When all @@ -332,12 +343,7 @@ class Worker(object): if self.stopped: self.log.info('Stopping on request.') break - self.set_state('idle') - qnames = self.queue_names() - self.procline('Listening on %s' % ','.join(qnames)) - self.log.info('') - self.log.info('*** Listening on %s...' % - green(', '.join(qnames))) + timeout = None if burst else max(1, self.default_worker_ttl - 60) try: result = self.dequeue_job_and_maintain_ttl(timeout) @@ -346,21 +352,10 @@ class Worker(object): except StopRequested: break - self.set_state('busy') - job, queue = result - self.set_current_job_id(job.id) - - # Use the public setter here, to immediately update Redis - job.set_status(Status.STARTED) - self.log.info('%s: %s (%s)' % (green(queue.name), - blue(job.description), job.id)) - - self.heartbeat((job.timeout or 180) + 60) self.execute_job(job) self.heartbeat() - self.set_current_job_id(None) - + if job.get_status() == Status.FINISHED: queue.enqueue_dependents(job) @@ -372,11 +367,25 @@ class Worker(object): def dequeue_job_and_maintain_ttl(self, timeout): result = None + qnames = self.queue_names() + + self.set_state('idle') + self.procline('Listening on %s' % ','.join(qnames)) + self.log.info('') + self.log.info('*** Listening on %s...' % + green(', '.join(qnames))) + while True: self.heartbeat() + try: result = Queue.dequeue_any(self.queues, timeout, connection=self.connection) + if result is not None: + job, queue = result + self.log.info('%s: %s (%s)' % (green(queue.name), + blue(job.description), job.id)) + break except DequeueTimeout: pass @@ -453,25 +462,31 @@ class Worker(object): """Performs the actual work of a job. Will/should only be called inside the work horse's process. """ + + self.set_state('busy') + self.set_current_job_id(job.id) + self.heartbeat((job.timeout or 180) + 60) + self.procline('Processing %s from %s since %s' % ( job.func_name, job.origin, time.time())) with self.connection._pipeline() as pipeline: try: - with death_penalty_after(job.timeout or Queue.DEFAULT_TIMEOUT): + with self.death_penalty_class(job.timeout or Queue.DEFAULT_TIMEOUT): rv = job.perform() # Pickle the result in the same try-except block since we need to # use the same exc handling when pickling fails job._result = rv - job._status = Status.FINISHED - job.ended_at = utcnow() + + self.set_current_job_id(None, pipeline=pipeline) result_ttl = job.get_ttl(self.default_result_ttl) if result_ttl != 0: job.save(pipeline=pipeline) job.cleanup(result_ttl, pipeline=pipeline) + pipeline.execute() except Exception: diff --git a/tests/__init__.py b/tests/__init__.py index 0a23efc..93f11b3 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,14 +1,18 @@ +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + import logging -import mock + +from redis import StrictRedis +from rq import pop_connection, push_connection from rq.compat import is_python_version + if is_python_version((2, 7), (3, 2)): import unittest else: import unittest2 as unittest # noqa -from redis import StrictRedis -from rq import push_connection, pop_connection - def find_empty_redis_database(): """Tries to connect to a random Redis database (starting from 4), and @@ -75,5 +79,5 @@ class RQTestCase(unittest.TestCase): # Pop the connection to Redis testconn = pop_connection() - assert testconn == cls.testconn, 'Wow, something really nasty ' \ - 'happened to the Redis connection stack. Check your setup.' + assert testconn == cls.testconn, \ + 'Wow, something really nasty happened to the Redis connection stack. Check your setup.' diff --git a/tests/dummy_settings.py b/tests/dummy_settings.py index 1404250..dbc935f 100644 --- a/tests/dummy_settings.py +++ b/tests/dummy_settings.py @@ -1 +1,5 @@ +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + REDIS_HOST = "testhost.example.com" diff --git a/tests/fixtures.py b/tests/fixtures.py index 337fc70..a375562 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -1,11 +1,15 @@ +# -*- coding: utf-8 -*- """ This file contains all jobs that are used in tests. Each of these test fixtures has a slighty different characteristics. """ +from __future__ import (absolute_import, division, print_function, + unicode_literals) + import time -from rq import Connection + +from rq import Connection, get_current_job from rq.decorators import job -from rq import get_current_job def say_hello(name=None): @@ -49,6 +53,10 @@ def access_self(): return job.id +def echo(*args, **kwargs): + return (args, kwargs) + + class Number(object): def __init__(self, value): self.value = value diff --git a/tests/helpers.py b/tests/helpers.py index 16f1717..1475cf0 100644 --- a/tests/helpers.py +++ b/tests/helpers.py @@ -1,6 +1,9 @@ +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + from datetime import timedelta def strip_microseconds(date): return date - timedelta(microseconds=date.microsecond) - diff --git a/tests/test_connection.py b/tests/test_connection.py index e0d149d..445e33f 100644 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -1,7 +1,11 @@ -from tests import RQTestCase, find_empty_redis_database +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + +from rq import Connection, Queue + +from tests import find_empty_redis_database, RQTestCase from tests.fixtures import do_nothing -from rq import Queue -from rq import Connection def new_connection(): diff --git a/tests/test_decorator.py b/tests/test_decorator.py index 8af464f..45b2a1d 100644 --- a/tests/test_decorator.py +++ b/tests/test_decorator.py @@ -1,12 +1,16 @@ -from redis import StrictRedis - -from tests import RQTestCase, mock -from tests.fixtures import decorated_job +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) +from redis import StrictRedis from rq.decorators import job from rq.job import Job from rq.worker import DEFAULT_RESULT_TTL +from tests import mock, RQTestCase +from tests.fixtures import decorated_job + + class TestDecorator(RQTestCase): def setUp(self): @@ -41,7 +45,7 @@ class TestDecorator(RQTestCase): """Ensure that passing in result_ttl to the decorator sets the result_ttl on the job """ - #Ensure default + # Ensure default result = decorated_job.delay(1, 2) self.assertEqual(result.result_ttl, DEFAULT_RESULT_TTL) diff --git a/tests/test_job.py b/tests/test_job.py index e93b8cb..bbe5f7a 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -1,16 +1,23 @@ +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + from datetime import datetime + +from rq.compat import as_text, PY2 +from rq.exceptions import NoSuchJobError, UnpickleError +from rq.job import get_current_job, Job +from rq.queue import Queue +from rq.utils import utcformat + from tests import RQTestCase -from tests.fixtures import Number, some_calculation, say_hello, access_self +from tests.fixtures import access_self, Number, say_hello, some_calculation from tests.helpers import strip_microseconds + try: from cPickle import loads, dumps except ImportError: from pickle import loads, dumps -from rq.compat import as_text -from rq.job import Job, get_current_job -from rq.exceptions import NoSuchJobError, UnpickleError -from rq.queue import Queue -from rq.utils import utcformat class TestJob(RQTestCase): @@ -240,16 +247,19 @@ class TestJob(RQTestCase): def test_description_is_persisted(self): """Ensure that job's custom description is set properly""" - job = Job.create(func=say_hello, args=('Lionel',), description=u'Say hello!') + job = Job.create(func=say_hello, args=('Lionel',), description='Say hello!') job.save() Job.fetch(job.id, connection=self.testconn) - self.assertEqual(job.description, u'Say hello!') + self.assertEqual(job.description, 'Say hello!') # Ensure job description is constructed from function call string job = Job.create(func=say_hello, args=('Lionel',)) job.save() Job.fetch(job.id, connection=self.testconn) - self.assertEqual(job.description, "tests.fixtures.say_hello('Lionel')") + if PY2: + self.assertEqual(job.description, "tests.fixtures.say_hello(u'Lionel')") + else: + self.assertEqual(job.description, "tests.fixtures.say_hello('Lionel')") def test_job_access_within_job_function(self): """The current job is accessible within the job function.""" diff --git a/tests/test_queue.py b/tests/test_queue.py index 6dde3ea..1d467a6 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -1,9 +1,15 @@ -from tests import RQTestCase -from tests.fixtures import Number, div_by_zero, say_hello, some_calculation -from rq import Queue, get_failed_queue +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + +from rq import get_failed_queue, Queue +from rq.exceptions import InvalidJobOperationError from rq.job import Job, Status from rq.worker import Worker -from rq.exceptions import InvalidJobOperationError + +from tests import RQTestCase +from tests.fixtures import (div_by_zero, echo, Number, say_hello, + some_calculation) class TestQueue(RQTestCase): @@ -17,8 +23,7 @@ class TestQueue(RQTestCase): q = Queue() self.assertEquals(q.name, 'default') - - def test_equality(self): # noqa + def test_equality(self): """Mathematical equality of queues.""" q1 = Queue('foo') q2 = Queue('foo') @@ -29,8 +34,7 @@ class TestQueue(RQTestCase): self.assertNotEquals(q1, q3) self.assertNotEquals(q2, q3) - - def test_empty_queue(self): # noqa + def test_empty_queue(self): """Emptying queues.""" q = Queue('example') @@ -103,8 +107,7 @@ class TestQueue(RQTestCase): self.assertEquals(q.count, 2) - - def test_enqueue(self): # noqa + def test_enqueue(self): """Enqueueing job onto queues.""" q = Queue() self.assertEquals(q.is_empty(), True) @@ -136,8 +139,7 @@ class TestQueue(RQTestCase): self.assertEquals(job.origin, q.name) self.assertIsNotNone(job.enqueued_at) - - def test_pop_job_id(self): # noqa + def test_pop_job_id(self): """Popping job IDs from queues.""" # Set up q = Queue() @@ -260,6 +262,32 @@ class TestQueue(RQTestCase): job = q.enqueue(say_hello) self.assertEqual(job.get_status(), Status.QUEUED) + def test_enqueue_explicit_args(self): + """enqueue() works for both implicit/explicit args.""" + q = Queue() + + # Implicit args/kwargs mode + job = q.enqueue(echo, 1, timeout=1, result_ttl=1, bar='baz') + self.assertEqual(job.timeout, 1) + self.assertEqual(job.result_ttl, 1) + self.assertEqual( + job.perform(), + ((1,), {'bar': 'baz'}) + ) + + # Explicit kwargs mode + kwargs = { + 'timeout': 1, + 'result_ttl': 1, + } + job = q.enqueue(echo, timeout=2, result_ttl=2, args=[1], kwargs=kwargs) + self.assertEqual(job.timeout, 2) + self.assertEqual(job.result_ttl, 2) + self.assertEqual( + job.perform(), + ((1,), {'timeout': 1, 'result_ttl': 1}) + ) + def test_all_queues(self): """All queues""" q1 = Queue('first-queue') diff --git a/tests/test_scripts.py b/tests/test_scripts.py index ae661c7..88689c3 100644 --- a/tests/test_scripts.py +++ b/tests/test_scripts.py @@ -1,9 +1,14 @@ +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + from rq.compat import is_python_version +from rq.scripts import read_config_file + if is_python_version((2, 7), (3, 2)): from unittest import TestCase else: from unittest2 import TestCase # noqa -from rq.scripts import read_config_file class TestScripts(TestCase): diff --git a/tests/test_sentry.py b/tests/test_sentry.py index 3fa4737..ba8d1ec 100644 --- a/tests/test_sentry.py +++ b/tests/test_sentry.py @@ -1,7 +1,12 @@ -from tests import RQTestCase -from rq import Queue, Worker, get_failed_queue +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + +from rq import get_failed_queue, Queue, Worker from rq.contrib.sentry import register_sentry +from tests import RQTestCase + class FakeSentry(object): def captureException(self, *args, **kwds): diff --git a/tests/test_worker.py b/tests/test_worker.py index b7147fe..27e85bb 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -1,12 +1,18 @@ +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + import os -from tests import RQTestCase, slow -from tests.fixtures import say_hello, div_by_zero, create_file, \ - create_file_after_timeout -from tests.helpers import strip_microseconds -from rq import Queue, Worker, get_failed_queue + +from rq import get_failed_queue, Queue, Worker from rq.compat import as_text from rq.job import Job, Status +from tests import RQTestCase, slow +from tests.fixtures import (create_file, create_file_after_timeout, div_by_zero, + say_hello) +from tests.helpers import strip_microseconds + class TestWorker(RQTestCase): def test_create_worker(self): @@ -20,16 +26,16 @@ class TestWorker(RQTestCase): fooq, barq = Queue('foo'), Queue('bar') w = Worker([fooq, barq]) self.assertEquals(w.work(burst=True), False, - 'Did not expect any work on the queue.') + 'Did not expect any work on the queue.') fooq.enqueue(say_hello, name='Frank') self.assertEquals(w.work(burst=True), True, - 'Expected at least some work done.') + 'Expected at least some work done.') def test_worker_ttl(self): """Worker ttl.""" w = Worker([]) - w.register_birth() # ugly: our test should only call public APIs + w.register_birth() # ugly: our test should only call public APIs [worker_key] = self.testconn.smembers(Worker.redis_workers_keys) self.assertIsNotNone(self.testconn.ttl(worker_key)) w.register_death() @@ -40,7 +46,7 @@ class TestWorker(RQTestCase): w = Worker([q]) job = q.enqueue('tests.fixtures.say_hello', name='Frank') self.assertEquals(w.work(burst=True), True, - 'Expected at least some work done.') + 'Expected at least some work done.') self.assertEquals(job.result, 'Hi there, Frank!') def test_work_is_unreadable(self): @@ -169,10 +175,9 @@ class TestWorker(RQTestCase): w = Worker([q]) # Put it on the queue with a timeout value - res = q.enqueue( - create_file_after_timeout, - args=(sentinel_file, 4), - timeout=1) + res = q.enqueue(create_file_after_timeout, + args=(sentinel_file, 4), + timeout=1) try: os.unlink(sentinel_file) diff --git a/tox.ini b/tox.ini index c167801..81bb844 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,5 @@ [tox] -envlist=py26,py27,py33,pypy +envlist=py26,py27,py33,py34,pypy [testenv] commands=py.test []