Merge branch 'master' into cancel_remove

glaslos 9 years ago
commit 02844034d7

.gitignore

@@ -10,6 +10,3 @@
 .tox
 .vagrant
 Vagrantfile
-# PyCharm
-.idea

@@ -0,0 +1,6 @@
+Cal Leeming <cal@iops.io> <cal.leeming@simplicitymedialtd.co.uk>
+Mark LaPerriere <marklap@gmail.com> <mark.a.laperriere@disney.com>
+Selwin Ong <selwin.ong@gmail.com> <selwin@ui.co.id>
+Vincent Driessen <me@nvie.com> <vincent@3rdcloud.com>
+Vincent Driessen <me@nvie.com> <vincent@datafox.nl>
+zhangliyong <lyzhang87@gmail.com> <zhangliyong@umeng.com>

@@ -1,3 +1,4 @@
+sudo: false
 language: python
 services:
 - redis
@@ -14,6 +15,6 @@ install:
 - pip install coveralls --use-mirrors
 #- pip install pytest  # installed by Travis by default already
 script:
-- py.test --cov rq
-- RUN_SLOW_TESTS_TOO=1 py.test --cov rq
 after_success:
 - coveralls

@@ -1,3 +1,81 @@
+### 0.5.6
+
+- Job results are now logged on `DEBUG` level. Thanks @tbaugis!
+- Modified `patch_connection` so Redis connection can be easily mocked
+- Custom exception handlers are now called if Redis connection is lost. Thanks @jlopex!
+- Jobs can now depend on jobs in a different queue. Thanks @jlopex!
+
+### 0.5.5
+(August 25th, 2015)
+
+- Add support for `--exception-handler` command line flag
+- Fix compatibility with click>=5.0
+- Fix maximum recursion depth problem for very large queues that contain jobs
+  that all fail
+
+### 0.5.4
+(July 8th, 2015)
+
+- Fix compatibility with raven>=5.4.0
+
+### 0.5.3
+(June 3rd, 2015)
+
+- Better API for instantiating Workers. Thanks @RyanMTB!
+- Better support for unicode kwargs. Thanks @nealtodd and @brownstein!
+- Workers now automatically clean up job registries every hour
+- Jobs in `FailedQueue` now have their statuses set properly
+- `enqueue_call()` no longer ignores `ttl`. Thanks @mbodock!
+- Improved logging. Thanks @trevorprater!
+
+### 0.5.2
+(April 14th, 2015)
+
+- Support SSL connection to Redis (requires redis-py>=2.10)
+- Fix to prevent deep call stacks with large queues
+
+### 0.5.1
+(March 9th, 2015)
+
+- Resolve performance issue when queues contain many jobs
+- Restore the ability to specify connection params in config
+- Record `birth_date` and `death_date` on Worker
+- Add support for SSL URLs in Redis (and `REDIS_SSL` config option)
+- Fix encoding issues with non-ASCII characters in function arguments
+- Fix Redis transaction management issue with job dependencies
+
+### 0.5.0
+(Jan 30th, 2015)
+
+- RQ workers can now be paused and resumed using `rq suspend` and
+  `rq resume` commands. Thanks Jonathan Tushman!
+- Jobs that are being performed are now stored in `StartedJobRegistry`
+  for monitoring purposes. This also prevents currently active jobs from
+  being orphaned/lost in the case of hard shutdowns.
+- You can now monitor finished jobs by checking `FinishedJobRegistry`.
+  Thanks Nic Cope for helping!
+- Jobs with unmet dependencies are now created with `deferred` as their
+  status. You can monitor deferred jobs by checking `DeferredJobRegistry`.
+- It is now possible to enqueue a job at the beginning of queue using
+  `queue.enqueue(func, at_front=True)`. Thanks Travis Johnson!
+- Command line scripts have all been refactored to use `click`. Thanks Lyon Zhang!
+- Added a new `SimpleWorker` that does not fork when executing jobs.
+  Useful for testing purposes. Thanks Cal Leeming!
+- Added `--queue-class` and `--job-class` arguments to `rqworker` script.
+  Thanks David Bonner!
+- Many other minor bug fixes and enhancements.
+
 ### 0.4.6
 (May 21st, 2014)
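
The `at_front` feature called out in the 0.5.0 notes is the easiest to see in code. A minimal sketch, not part of the diff itself; it assumes a Redis server on localhost and a hypothetical importable task `my_module.say_hello`:

```python
from redis import StrictRedis
from rq import Queue

q = Queue('default', connection=StrictRedis())
q.enqueue('my_module.say_hello', 'World')                 # appended at the tail
q.enqueue('my_module.say_hello', 'World', at_front=True)  # new in 0.5.0: pushed to the head
```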

@ -0,0 +1,18 @@
all:
@grep -Ee '^[a-z].*:' Makefile | cut -d: -f1 | grep -vF all
clean:
rm -rf build/ dist/
release: clean
# Check if latest tag is the current head we're releasing
echo "Latest tag = $$(git tag | sort -nr | head -n1)"
echo "HEAD SHA = $$(git sha head)"
echo "Latest tag SHA = $$(git tag | sort -nr | head -n1 | xargs git sha)"
@test "$$(git sha head)" = "$$(git tag | sort -nr | head -n1 | xargs git sha)"
make force_release
force_release: clean
git push --tags
python setup.py sdist bdist_wheel
twine upload dist/*

@@ -3,13 +3,15 @@ them in the background with workers. It is backed by Redis and it is designed
 to have a low barrier to entry. It should be integrated in your web stack
 easily.
 
-RQ requires Redis >= 2.6.0.
+RQ requires Redis >= 2.7.0.
 
 [![Build status](https://travis-ci.org/nvie/rq.svg?branch=master)](https://secure.travis-ci.org/nvie/rq)
-[![Downloads](https://pypip.in/d/rq/badge.svg)](https://pypi.python.org/pypi/rq)
+[![Downloads](https://img.shields.io/pypi/dm/rq.svg)](https://pypi.python.org/pypi/rq)
 [![Can I Use Python 3?](https://caniusepython3.com/project/rq.svg)](https://caniusepython3.com/project/rq)
 [![Coverage Status](https://img.shields.io/coveralls/nvie/rq.svg)](https://coveralls.io/r/nvie/rq)
 
+Full documentation can be found [here][d].
+
 ## Getting started
@@ -33,7 +35,7 @@ def count_words_at_url(url):
 
 You do use the excellent [requests][r] package, don't you?
 
-Then, create a RQ queue:
+Then, create an RQ queue:
 
 ```python
 from rq import Queue, use_connection
@@ -85,7 +87,7 @@ and [this snippet][3], and has been created as a lightweight alternative to the
 heaviness of Celery or other AMQP-based queueing implementations.
 
 [r]: http://python-requests.org
-[d]: http://nvie.github.com/rq/docs/
+[d]: http://python-rq.org/
 [m]: http://pypi.python.org/pypi/mailer
 [p]: http://docs.python.org/library/pickle.html
 [1]: http://www.celeryproject.org/

@@ -1,2 +1,2 @@
-redis
-click
+redis>=2.7
+click>=3.0.0

@@ -16,17 +16,33 @@ from rq import Connection, get_failed_queue, Queue
 from rq.contrib.legacy import cleanup_ghosts
 from rq.exceptions import InvalidJobOperationError
 from rq.utils import import_attribute
-from .helpers import (read_config_file, refresh, setup_loghandlers_from_args,
-                      show_both, show_queues, show_workers)
+from rq.suspension import (suspend as connection_suspend,
+                           resume as connection_resume, is_suspended)
+
+from .helpers import (get_redis_from_config, read_config_file, refresh,
+                      setup_loghandlers_from_args, show_both, show_queues,
+                      show_workers)
+
+# Disable the warning that Click displays (as of Click version 5.0) when users
+# use unicode_literals in Python 2.
+# See http://click.pocoo.org/dev/python3/#unicode-literals for more details.
+click.disable_unicode_literals_warning = True
 
 url_option = click.option('--url', '-u', envvar='RQ_REDIS_URL',
                           help='URL describing Redis connection details.')
 
+config_option = click.option('--config', '-c',
+                             help='Module containing RQ settings.')
+
 
-def connect(url):
-    return StrictRedis.from_url(url or 'redis://localhost:6379/0')
+def connect(url, config=None):
+    if url:
+        return StrictRedis.from_url(url)
+
+    settings = read_config_file(config) if config else {}
+    return get_redis_from_config(settings)
 
 
 @click.group()
@@ -120,7 +136,7 @@ def info(url, path, interval, raw, only_queues, only_workers, by_queue, queues):
 
 @main.command()
 @url_option
-@click.option('--config', '-c', help='Module containing RQ settings.')
+@config_option
 @click.option('--burst', '-b', is_flag=True, help='Run in burst mode (quit after all work is done)')
 @click.option('--name', '-n', help='Specify a different name')
 @click.option('--worker-class', '-w', default='rq.Worker', help='RQ Worker class to use')
@@ -132,10 +148,11 @@ def info(url, path, interval, raw, only_queues, only_workers, by_queue, queues):
 @click.option('--verbose', '-v', is_flag=True, help='Show more output')
 @click.option('--quiet', '-q', is_flag=True, help='Show less output')
 @click.option('--sentry-dsn', envvar='SENTRY_DSN', help='Report exceptions to this Sentry DSN')
+@click.option('--exception-handler', help='Exception handler(s) to use', multiple=True)
 @click.option('--pid', help='Write the process ID number to a file at the specified path')
 @click.argument('queues', nargs=-1)
 def worker(url, config, burst, name, worker_class, job_class, queue_class, path, results_ttl, worker_ttl,
-           verbose, quiet, sentry_dsn, pid, queues):
+           verbose, quiet, sentry_dsn, exception_handler, pid, queues):
     """Starts an RQ worker."""
 
     if path:
@@ -143,7 +160,6 @@ def worker(url, config, burst, name, worker_class, job_class, queue_class, path,
     settings = read_config_file(config) if config else {}
     # Worker specific default arguments
-    url = url or settings.get('REDIS_URL')
     queues = queues or settings.get('QUEUES', ['default'])
     sentry_dsn = sentry_dsn or settings.get('SENTRY_DSN')
@@ -153,19 +169,28 @@ def worker(url, config, burst, name, worker_class, job_class, queue_class, path,
     setup_loghandlers_from_args(verbose, quiet)
 
-    conn = connect(url)
+    conn = connect(url, config)
     cleanup_ghosts(conn)
     worker_class = import_attribute(worker_class)
     queue_class = import_attribute(queue_class)
+    exception_handlers = []
+    for h in exception_handler:
+        exception_handlers.append(import_attribute(h))
+
+    if is_suspended(conn):
+        click.secho('RQ is currently suspended, to resume job execution run "rq resume"', fg='red')
+        sys.exit(1)
 
     try:
         queues = [queue_class(queue, connection=conn) for queue in queues]
        w = worker_class(queues,
                          name=name,
                          connection=conn,
                          default_worker_ttl=worker_ttl,
                          default_result_ttl=results_ttl,
-                         job_class=job_class)
+                         job_class=job_class,
+                         exception_handlers=exception_handlers or None)
 
         # Should we configure Sentry?
         if sentry_dsn:
@@ -178,3 +203,34 @@ def worker(url, config, burst, name, worker_class, job_class, queue_class, path,
     except ConnectionError as e:
         print(e)
         sys.exit(1)
+
+
+@main.command()
+@url_option
+@config_option
+@click.option('--duration', help='Seconds you want the workers to be suspended. Default is forever.', type=int)
+def suspend(url, config, duration):
+    """Suspends all workers; to resume, run `rq resume`"""
+    if duration is not None and duration < 1:
+        click.echo("Duration must be an integer greater than 0")
+        sys.exit(1)
+
+    connection = connect(url, config)
+    connection_suspend(connection, duration)
+
+    if duration:
+        msg = """Suspending workers for {0} seconds. No new jobs will be started during that time, but workers will
+        then automatically resume""".format(duration)
+        click.echo(msg)
+    else:
+        click.echo("Suspending workers. No new jobs will be started, but current jobs will be completed")
+
+
+@main.command()
+@url_option
+@config_option
+def resume(url, config):
+    """Resumes processing of queues that were suspended with `rq suspend`"""
+    connection = connect(url, config)
    connection_resume(connection)
    click.echo("Resuming workers.")

@@ -7,8 +7,11 @@ import time
 from functools import partial
 
 import click
+import redis
+from redis import StrictRedis
 from rq import Queue, Worker
 from rq.logutils import setup_loghandlers
+from rq.worker import WorkerStatus
 
 red = partial(click.style, fg='red')
 green = partial(click.style, fg='green')
@@ -23,6 +26,35 @@ def read_config_file(module):
                 if k.upper() == k])
 
 
+def get_redis_from_config(settings):
+    """Returns a StrictRedis instance from a dictionary of settings."""
+    if settings.get('REDIS_URL') is not None:
+        return StrictRedis.from_url(settings['REDIS_URL'])
+
+    kwargs = {
+        'host': settings.get('REDIS_HOST', 'localhost'),
+        'port': settings.get('REDIS_PORT', 6379),
+        'db': settings.get('REDIS_DB', 0),
+        'password': settings.get('REDIS_PASSWORD', None),
+    }
+
+    use_ssl = settings.get('REDIS_SSL', False)
+    if use_ssl:
+        # If SSL is required, we need to depend on redis-py being 2.10 at
+        # least
+        def safeint(x):
+            try:
+                return int(x)
+            except ValueError:
+                return 0
+
+        version_info = tuple(safeint(x) for x in redis.__version__.split('.'))
+        if not version_info >= (2, 10):
+            raise RuntimeError('Using SSL requires a redis-py version >= 2.10')
+        kwargs['ssl'] = use_ssl
+    return StrictRedis(**kwargs)
+
+
 def pad(s, pad_to_length):
     """Pads the given string to the given length."""
     return ('%-' + '%ds' % pad_to_length) % (s,)
@@ -39,8 +71,9 @@ def get_scale(x):
 def state_symbol(state):
     symbols = {
-        'busy': red('busy'),
-        'idle': green('idle'),
+        WorkerStatus.BUSY: red('busy'),
+        WorkerStatus.IDLE: green('idle'),
+        WorkerStatus.SUSPENDED: yellow('suspended'),
     }
     try:
         return symbols[state]
@@ -101,7 +134,7 @@ def show_workers(queues, raw, by_queue):
     else:
         qs = Queue.all()
         ws = Worker.all()
-        filter_queues = lambda x: x
+        filter_queues = (lambda x: x)
 
     if not by_queue:
         for w in ws:
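
`get_redis_from_config()` reads plain uppercase names from a settings module (the value given to `-c/--config`). A hedged sketch of such a module, with hypothetical values; `REDIS_URL` takes precedence over the individual keys when present:

```python
# mysettings.py -- hypothetical config module, used as:  rq worker -c mysettings
# REDIS_URL = 'redis://localhost:6379/0'   # wins over the keys below if set
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_DB = 0
REDIS_PASSWORD = None
REDIS_SSL = False  # True requires redis-py >= 2.10
QUEUES = ['high', 'default', 'low']
SENTRY_DSN = None
```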

@@ -18,12 +18,14 @@ def fix_return_type(func):
     return _inner
 
+PATCHED_METHODS = ['_setex', '_lrem', '_zadd', '_pipeline', '_ttl']
+
 
 def patch_connection(connection):
     if not isinstance(connection, StrictRedis):
         raise ValueError('A StrictRedis or Redis connection is required.')
 
     # Don't patch already patched objects
-    PATCHED_METHODS = ['_setex', '_lrem', '_zadd', '_pipeline', '_ttl']
     if all([hasattr(connection, attr) for attr in PATCHED_METHODS]):
         return connection
 
@@ -35,6 +37,7 @@ def patch_connection(connection):
         connection._ttl = fix_return_type(partial(StrictRedis.ttl, connection))
         if hasattr(connection, 'pttl'):
             connection._pttl = fix_return_type(partial(StrictRedis.pttl, connection))
+
     elif isinstance(connection, StrictRedis):
         connection._setex = connection.setex
         connection._lrem = connection.lrem

@@ -43,7 +43,7 @@ def use_connection(redis=None):
     use of use_connection() and stacked connection contexts.
     """
     assert len(_connection_stack) <= 1, \
-        'You should not mix Connection contexts with use_connection().'
+        'You should not mix Connection contexts with use_connection()'
     release_local(_connection_stack)
 
     if redis is None:
@@ -67,7 +67,7 @@ def resolve_connection(connection=None):
         connection = get_current_connection()
 
     if connection is None:
-        raise NoRedisConnectionException('Could not resolve a Redis connection.')
+        raise NoRedisConnectionException('Could not resolve a Redis connection')
     return connection

@@ -1,24 +1,12 @@
 # -*- coding: utf-8 -*-
 from __future__ import (absolute_import, division, print_function,
                         unicode_literals)
 
-import warnings
-
 
 def register_sentry(client, worker):
     """Given a Raven client and an RQ worker, registers exception handlers
     with the worker so exceptions are logged to Sentry.
     """
-    def uses_supported_transport(url):
-        supported_transports = set(['sync+', 'requests+'])
-        return any(url.startswith(prefix) for prefix in supported_transports)
-
-    if not any(uses_supported_transport(s) for s in client.servers):
-        msg = ('Sentry error delivery is known to be unreliable when not '
-               'delivered synchronously from RQ workers. You are encouraged '
-               'to change your DSN to use the sync+ or requests+ transport '
-               'prefix.')
-        warnings.warn(msg, UserWarning, stacklevel=2)
-
     def send_to_sentry(job, *exc_info):
         client.captureException(
             exc_info=exc_info,

@@ -6,8 +6,8 @@ from functools import wraps
 from rq.compat import string_types
 
+from .defaults import DEFAULT_RESULT_TTL
 from .queue import Queue
-from .worker import DEFAULT_RESULT_TTL
 
 
 class job(object):

@@ -0,0 +1,2 @@
+DEFAULT_WORKER_TTL = 420
+DEFAULT_RESULT_TTL = 500

@@ -11,10 +11,6 @@ class InvalidJobOperationError(Exception):
     pass
 
 
-class NoQueueError(Exception):
-    pass
-
-
 class UnpickleError(Exception):
     def __init__(self, message, raw_data, inner_exception=None):
         super(UnpickleError, self).__init__(message, inner_exception)

@@ -12,7 +12,7 @@ from rq.compat import as_text, decode_redis_hash, string_types, text_type
 from .connections import resolve_connection
 from .exceptions import NoSuchJobError, UnpickleError
 from .local import LocalStack
-from .utils import import_attribute, utcformat, utcnow, utcparse
+from .utils import enum, import_attribute, utcformat, utcnow, utcparse
 
 try:
     import cPickle as pickle
@@ -25,18 +25,14 @@ dumps = partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
 loads = pickle.loads
 
-def enum(name, *sequential, **named):
-    values = dict(zip(sequential, range(len(sequential))), **named)
-
-    # NOTE: Yes, we *really* want to cast using str() here.
-    # On Python 2 type() requires a byte string (which is str() on Python 2).
-    # On Python 3 it does not matter, so we'll use str(), which acts as
-    # a no-op.
-    return type(str(name), (), values)
-
-Status = enum('Status',
-              QUEUED='queued', FINISHED='finished', FAILED='failed',
-              STARTED='started')
+JobStatus = enum(
+    'JobStatus',
+    QUEUED='queued',
+    FINISHED='finished',
+    FAILED='failed',
+    STARTED='started',
+    DEFERRED='deferred'
+)
 
 # Sentinel value to mark that some of our lazily evaluated properties have not
 # yet been evaluated.
@@ -54,7 +50,7 @@ def unpickle(pickled_string):
     try:
         obj = loads(pickled_string)
     except Exception as e:
-        raise UnpickleError('Could not unpickle.', pickled_string, e)
+        raise UnpickleError('Could not unpickle', pickled_string, e)
     return obj
 
@@ -92,8 +88,8 @@ class Job(object):
     # Job construction
     @classmethod
     def create(cls, func, args=None, kwargs=None, connection=None,
-               result_ttl=None, status=None, description=None, depends_on=None, timeout=None,
-               id=None):
+               result_ttl=None, ttl=None, status=None, description=None,
+               depends_on=None, timeout=None, id=None, origin=None):
         """Creates a new Job instance for the given function, arguments, and
         keyword arguments.
         """
@@ -103,21 +99,24 @@ class Job(object):
             kwargs = {}
 
         if not isinstance(args, (tuple, list)):
-            raise TypeError('{0!r} is not a valid args list.'.format(args))
+            raise TypeError('{0!r} is not a valid args list'.format(args))
         if not isinstance(kwargs, dict):
-            raise TypeError('{0!r} is not a valid kwargs dict.'.format(kwargs))
+            raise TypeError('{0!r} is not a valid kwargs dict'.format(kwargs))
         job = cls(connection=connection)
         if id is not None:
             job.set_id(id)
 
+        if origin is not None:
+            job.origin = origin
+
         # Set the core job tuple properties
         job._instance = None
         if inspect.ismethod(func):
             job._instance = func.__self__
             job._func_name = func.__name__
         elif inspect.isfunction(func) or inspect.isbuiltin(func):
-            job._func_name = '%s.%s' % (func.__module__, func.__name__)
+            job._func_name = '{0}.{1}'.format(func.__module__, func.__name__)
         elif isinstance(func, string_types):
             job._func_name = as_text(func)
         elif not inspect.isclass(func) and hasattr(func, '__call__'):  # a callable class instance
@@ -131,6 +130,7 @@ class Job(object):
         # Extra meta data
         job.description = description or job.get_call_string()
         job.result_ttl = result_ttl
+        job.ttl = ttl
         job.timeout = timeout
         job._status = status
@@ -166,19 +166,19 @@ class Job(object):
     @property
     def is_finished(self):
-        return self.get_status() == Status.FINISHED
+        return self.get_status() == JobStatus.FINISHED
 
     @property
     def is_queued(self):
-        return self.get_status() == Status.QUEUED
+        return self.get_status() == JobStatus.QUEUED
 
     @property
     def is_failed(self):
-        return self.get_status() == Status.FAILED
+        return self.get_status() == JobStatus.FAILED
 
     @property
     def is_started(self):
-        return self.get_status() == Status.STARTED
+        return self.get_status() == JobStatus.STARTED
 
     @property
     def dependency(self):
@@ -212,7 +212,7 @@ class Job(object):
     def data(self):
         if self._data is UNEVALUATED:
             if self._func_name is UNEVALUATED:
-                raise ValueError('Cannot build the job data.')
+                raise ValueError('Cannot build the job data')
 
             if self._instance is UNEVALUATED:
                 self._instance = None
@@ -311,12 +311,13 @@ class Job(object):
         self.exc_info = None
         self.timeout = None
         self.result_ttl = None
+        self.ttl = None
         self._status = None
         self._dependency_id = None
         self.meta = {}
 
     def __repr__(self):  # noqa
-        return 'Job(%r, enqueued_at=%r)' % (self._id, self.enqueued_at)
+        return 'Job({0!r}, enqueued_at={1!r})'.format(self._id, self.enqueued_at)
 
     # Data access
     def get_id(self):  # noqa
@@ -330,7 +331,7 @@ class Job(object):
     def set_id(self, value):
         """Sets a job ID for the given job."""
         if not isinstance(value, string_types):
-            raise TypeError('id must be a string, not {0}.'.format(type(value)))
+            raise TypeError('id must be a string, not {0}'.format(type(value)))
         self._id = value
 
     id = property(get_id, set_id)
@@ -343,7 +344,7 @@ class Job(object):
     @classmethod
     def dependents_key_for(cls, job_id):
         """The Redis key that is used to store job hash under."""
-        return 'rq:job:%s:dependents' % (job_id,)
+        return 'rq:job:{0}:dependents'.format(job_id)
 
     @property
     def key(self):
@@ -392,7 +393,7 @@ class Job(object):
         key = self.key
         obj = decode_redis_hash(self.connection.hgetall(key))
         if len(obj) == 0:
-            raise NoSuchJobError('No such job: %s' % (key,))
+            raise NoSuchJobError('No such job: {0}'.format(key))
 
         def to_date(date_str):
             if date_str is None:
@@ -416,6 +417,7 @@ class Job(object):
         self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None  # noqa
         self._status = as_text(obj.get('status') if obj.get('status') else None)
         self._dependency_id = as_text(obj.get('dependency_id', None))
+        self.ttl = int(obj.get('ttl')) if obj.get('ttl') else None
         self.meta = unpickle(obj.get('meta')) if obj.get('meta') else {}
 
     def to_dict(self):
@@ -446,6 +448,8 @@ class Job(object):
             obj['dependency_id'] = self._dependency_id
         if self.meta:
             obj['meta'] = dumps(self.meta)
+        if self.ttl:
+            obj['ttl'] = self.ttl
 
         return obj
 
@@ -455,6 +459,7 @@ class Job(object):
         connection = pipeline if pipeline is not None else self.connection
 
         connection.hmset(key, self.to_dict())
+        self.cleanup(self.ttl, pipeline=connection)
 
     def cancel(self):
         """Cancels the given job, which will prevent the job from ever being
@@ -480,6 +485,8 @@ class Job(object):
     # Job execution
     def perform(self):  # noqa
         """Invokes the job function with the job arguments."""
+        self.connection.persist(self.key)
+        self.ttl = -1
         _job_stack.push(self.id)
         try:
             self._result = self.func(*self.args, **self.kwargs)
@@ -488,8 +495,15 @@ class Job(object):
         return self._result
 
     def get_ttl(self, default_ttl=None):
-        """Returns ttl for a job that determines how long a job and its result
-        will be persisted. In the future, this method will also be responsible
+        """Returns ttl for a job that determines how long a job will be
+        persisted. In the future, this method will also be responsible
+        for determining ttl for repeated jobs.
+        """
+        return default_ttl if self.ttl is None else self.ttl
+
+    def get_result_ttl(self, default_ttl=None):
+        """Returns ttl for a job that determines how long a job's result will
+        be persisted. In the future, this method will also be responsible
         for determining ttl for repeated jobs.
         """
         return default_ttl if self.result_ttl is None else self.result_ttl
@@ -502,22 +516,28 @@ class Job(object):
         if self.func_name is None:
             return None
 
-        arg_list = [repr(arg) for arg in self.args]
-        arg_list += ['%s=%r' % (k, v) for k, v in self.kwargs.items()]
+        arg_list = [as_text(repr(arg)) for arg in self.args]
+
+        kwargs = ['{0}={1}'.format(k, as_text(repr(v))) for k, v in self.kwargs.items()]
+        # Sort here because Python 3.3 & 3.4 produce a different call_string
+        arg_list += sorted(kwargs)
         args = ', '.join(arg_list)
-        return '%s(%s)' % (self.func_name, args)
+
+        return '{0}({1})'.format(self.func_name, args)
 
     def cleanup(self, ttl=None, pipeline=None):
         """Prepare job for eventual deletion (if needed). This method is usually
         called after successful execution. How long we persist the job and its
-        result depends on the value of result_ttl:
-        - If result_ttl is 0, cleanup the job immediately.
+        result depends on the value of ttl:
+        - If ttl is 0, cleanup the job immediately.
         - If it's a positive number, set the job to expire in X seconds.
-        - If result_ttl is negative, don't set an expiry to it (persist
+        - If ttl is negative, don't set an expiry on it (persist
           forever)
         """
         if ttl == 0:
             self.cancel()
+        elif not ttl:
+            return
         elif ttl > 0:
             connection = pipeline if pipeline is not None else self.connection
             connection.expire(self.key, ttl)
@@ -530,17 +550,23 @@ class Job(object):
             rq:job:job_id:dependents = {'job_id_1', 'job_id_2'}
 
-        This method adds the current job to its dependency's dependents set.
+        This method adds the job to its dependency's dependents set
+        and adds the job to DeferredJobRegistry.
         """
+        from .registry import DeferredJobRegistry
+
+        registry = DeferredJobRegistry(self.origin, connection=self.connection)
+        registry.add(self, pipeline=pipeline)
+
         connection = pipeline if pipeline is not None else self.connection
         connection.sadd(Job.dependents_key_for(self._dependency_id), self.id)
 
     def __str__(self):
-        return '<Job %s: %s>' % (self.id, self.description)
+        return '<Job {0}: {1}>'.format(self.id, self.description)
 
     # Job equality
     def __eq__(self, other):  # noqa
-        return self.id == other.id
+        return isinstance(other, self.__class__) and self.id == other.id
 
     def __hash__(self):
         return hash(self.id)
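
The job now carries two distinct TTLs, as the docstrings above spell out: `ttl` bounds how long an unprocessed job may live, while `result_ttl` bounds how long the job hash is kept after it finishes. A hedged sketch (assumes a Redis server on localhost and the hypothetical `my_module.say_hello` from the earlier example):

```python
from redis import StrictRedis
from rq import Queue

q = Queue('default', connection=StrictRedis())
job = q.enqueue('my_module.say_hello', 'World',
                ttl=3600,        # drop the job if still unprocessed after an hour
                result_ttl=500)  # keep the finished job and its result for 500s
```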

@@ -4,15 +4,15 @@ from __future__ import (absolute_import, division, print_function,
 import uuid
 
-from .connections import resolve_connection
-from .job import Job, Status
-from .utils import import_attribute, utcnow
+from redis import WatchError
+
+from .compat import as_text, string_types, total_ordering
+from .connections import resolve_connection
+from .defaults import DEFAULT_RESULT_TTL
 from .exceptions import (DequeueTimeout, InvalidJobOperationError,
                          NoSuchJobError, UnpickleError)
-from .compat import total_ordering, string_types, as_text
-
-from redis import WatchError
+from .job import Job, JobStatus
+from .utils import import_attribute, utcnow
 
 
 def get_failed_queue(connection=None):
@@ -50,7 +50,7 @@ class Queue(object):
         """
         prefix = cls.redis_queue_namespace_prefix
         if not queue_key.startswith(prefix):
-            raise ValueError('Not a valid RQ queue key: %s' % (queue_key,))
+            raise ValueError('Not a valid RQ queue key: {0}'.format(queue_key))
         name = queue_key[len(prefix):]
         return cls(name, connection=connection)
@@ -59,7 +59,7 @@ class Queue(object):
         self.connection = resolve_connection(connection)
         prefix = self.redis_queue_namespace_prefix
         self.name = name
-        self._key = '%s%s' % (prefix, name)
+        self._key = '{0}{1}'.format(prefix, name)
         self._default_timeout = default_timeout
         self._async = async
@@ -71,6 +71,9 @@ class Queue(object):
     def __len__(self):
         return self.count
 
+    def __iter__(self):
+        yield self
+
     @property
     def key(self):
         """Returns the Redis key for this Queue."""
@@ -143,13 +146,13 @@ class Queue(object):
         job_id = job_or_id.id if isinstance(job_or_id, self.job_class) else job_or_id
 
         if pipeline is not None:
-            pipeline.lrem(self.key, 0, job_id)
+            pipeline.lrem(self.key, 1, job_id)
 
-        return self.connection._lrem(self.key, 0, job_id)
+        return self.connection._lrem(self.key, 1, job_id)
 
     def compact(self):
         """Removes all "dead" jobs from the queue by cycling through it, while
-        guarantueeing FIFO semantics.
+        guaranteeing FIFO semantics.
         """
         COMPACT_QUEUE = 'rq:queue:_compact:{0}'.format(uuid.uuid4())
@@ -161,14 +164,18 @@ class Queue(object):
             if self.job_class.exists(job_id, self.connection):
                 self.connection.rpush(self.key, job_id)
 
-    def push_job_id(self, job_id, pipeline=None):
-        """Pushes a job ID on the corresponding Redis queue."""
+    def push_job_id(self, job_id, pipeline=None, at_front=False):
+        """Pushes a job ID on the corresponding Redis queue.
+        'at_front' allows you to push the job onto the front instead of the back of the queue"""
         connection = pipeline if pipeline is not None else self.connection
-        connection.rpush(self.key, job_id)
+        if at_front:
+            connection.lpush(self.key, job_id)
+        else:
+            connection.rpush(self.key, job_id)
 
     def enqueue_call(self, func, args=None, kwargs=None, timeout=None,
-                     result_ttl=None, description=None, depends_on=None,
-                     job_id=None):
+                     result_ttl=None, ttl=None, description=None,
+                     depends_on=None, job_id=None, at_front=False):
         """Creates a job to represent the delayed function call and enqueues
         it.
@@ -178,11 +185,11 @@ class Queue(object):
         """
         timeout = timeout or self._default_timeout
 
-        # TODO: job with dependency shouldn't have "queued" as status
-        job = self.job_class.create(func, args, kwargs, connection=self.connection,
-                                    result_ttl=result_ttl, status=Status.QUEUED,
-                                    description=description, depends_on=depends_on, timeout=timeout,
-                                    id=job_id)
+        job = self.job_class.create(
+            func, args=args, kwargs=kwargs, connection=self.connection,
+            result_ttl=result_ttl, ttl=ttl, status=JobStatus.QUEUED,
+            description=description, depends_on=depends_on,
+            timeout=timeout, id=job_id, origin=self.name)
 
         # If job depends on an unfinished job, register itself on its
         # parent's dependents instead of enqueueing it.
@@ -190,12 +197,14 @@ class Queue(object):
         # modifying the dependency. In this case we simply retry
         if depends_on is not None:
             if not isinstance(depends_on, self.job_class):
-                depends_on = Job.fetch(id=depends_on, connection=self.connection)
-            with self.connection.pipeline() as pipe:
+                depends_on = Job(id=depends_on, connection=self.connection)
+            with self.connection._pipeline() as pipe:
                 while True:
                     try:
                         pipe.watch(depends_on.key)
-                        if depends_on.get_status() != Status.FINISHED:
+                        if depends_on.get_status() != JobStatus.FINISHED:
+                            pipe.multi()
+                            job.set_status(JobStatus.DEFERRED)
                             job.register_dependency(pipeline=pipe)
                             job.save(pipeline=pipe)
                             pipe.execute()
@@ -204,7 +213,14 @@ class Queue(object):
                     except WatchError:
                         continue
 
-        return self.enqueue_job(job)
+        job = self.enqueue_job(job, at_front=at_front)
+
+        if not self._async:
+            job.perform()
+            job.save()
+            job.cleanup(DEFAULT_RESULT_TTL)
+
+        return job
 
     def enqueue(self, f, *args, **kwargs):
         """Creates a job to represent the delayed function call and enqueues
@@ -222,61 +238,73 @@ class Queue(object):
         """
         if not isinstance(f, string_types) and f.__module__ == '__main__':
             raise ValueError('Functions from the __main__ module cannot be processed '
-                             'by workers.')
+                             'by workers')
 
         # Detect explicit invocations, i.e. of the form:
         # q.enqueue(foo, args=(1, 2), kwargs={'a': 1}, timeout=30)
         timeout = kwargs.pop('timeout', None)
         description = kwargs.pop('description', None)
         result_ttl = kwargs.pop('result_ttl', None)
+        ttl = kwargs.pop('ttl', None)
         depends_on = kwargs.pop('depends_on', None)
         job_id = kwargs.pop('job_id', None)
+        at_front = kwargs.pop('at_front', False)
 
         if 'args' in kwargs or 'kwargs' in kwargs:
-            assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs.'  # noqa
+            assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs'  # noqa
             args = kwargs.pop('args', None)
             kwargs = kwargs.pop('kwargs', None)
 
         return self.enqueue_call(func=f, args=args, kwargs=kwargs,
-                                 timeout=timeout, result_ttl=result_ttl,
+                                 timeout=timeout, result_ttl=result_ttl, ttl=ttl,
                                  description=description, depends_on=depends_on,
-                                 job_id=job_id)
+                                 job_id=job_id, at_front=at_front)
 
-    def enqueue_job(self, job, set_meta_data=True):
+    def enqueue_job(self, job, pipeline=None, at_front=False):
         """Enqueues a job for delayed execution.
 
-        If the `set_meta_data` argument is `True` (default), it will update
-        the properties `origin` and `enqueued_at`.
-
         If Queue is instantiated with async=False, job is executed immediately.
         """
+        pipe = pipeline if pipeline is not None else self.connection._pipeline()
+
         # Add Queue key set
-        self.connection.sadd(self.redis_queues_keys, self.key)
+        pipe.sadd(self.redis_queues_keys, self.key)
+        job.set_status(JobStatus.QUEUED, pipeline=pipe)
 
-        if set_meta_data:
-            job.origin = self.name
-            job.enqueued_at = utcnow()
+        job.origin = self.name
+        job.enqueued_at = utcnow()
 
         if job.timeout is None:
             job.timeout = self.DEFAULT_TIMEOUT
-        job.save()
+        job.save(pipeline=pipe)
 
+        if pipeline is None:
+            pipe.execute()
 
         if self._async:
-            self.push_job_id(job.id)
-        else:
-            job.perform()
-            job.save()
+            self.push_job_id(job.id, at_front=at_front)
 
         return job
 
     def enqueue_dependents(self, job):
         """Enqueues all jobs in the given job's dependents set and clears it."""
         # TODO: can probably be pipelined
+        from .registry import DeferredJobRegistry
+
         while True:
             job_id = as_text(self.connection.spop(job.dependents_key))
             if job_id is None:
                 break
             dependent = self.job_class.fetch(job_id, connection=self.connection)
-            self.enqueue_job(dependent)
+            registry = DeferredJobRegistry(dependent.origin, self.connection)
+            with self.connection._pipeline() as pipeline:
+                registry.remove(dependent, pipeline=pipeline)
+                if dependent.origin == self.name:
+                    self.enqueue_job(dependent, pipeline=pipeline)
+                else:
+                    queue = Queue(name=dependent.origin, connection=self.connection)
+                    queue.enqueue_job(dependent, pipeline=pipeline)
+                pipeline.execute()
 
     def pop_job_id(self):
         """Pops a given job ID from this Redis queue."""
@@ -299,7 +327,7 @@ class Queue(object):
         connection = resolve_connection(connection)
         if timeout is not None:  # blocking variant
             if timeout == 0:
-                raise ValueError('RQ does not support indefinite timeouts. Please pick a timeout value > 0.')
+                raise ValueError('RQ does not support indefinite timeouts. Please pick a timeout value > 0')
             result = connection.blpop(queue_keys, timeout)
             if result is None:
                 raise DequeueTimeout(timeout, queue_keys)
@@ -317,22 +345,22 @@ class Queue(object):
         Returns a job_class instance, which can be executed or inspected.
         """
-        job_id = self.pop_job_id()
-        if job_id is None:
-            return None
-        try:
-            job = self.job_class.fetch(job_id, connection=self.connection)
-        except NoSuchJobError as e:
-            # Silently pass on jobs that don't exist (anymore),
-            # and continue by reinvoking itself recursively
-            return self.dequeue()
-        except UnpickleError as e:
-            # Attach queue information on the exception for improved error
-            # reporting
-            e.job_id = job_id
-            e.queue = self
-            raise e
-        return job
+        while True:
+            job_id = self.pop_job_id()
+            if job_id is None:
+                return None
+            try:
+                job = self.job_class.fetch(job_id, connection=self.connection)
+            except NoSuchJobError as e:
+                # Silently pass on jobs that don't exist (anymore)
+                continue
+            except UnpickleError as e:
+                # Attach queue information on the exception for improved error
+                # reporting
+                e.job_id = job_id
+                e.queue = self
+                raise e
+            return job
 
     @classmethod
     def dequeue_any(cls, queues, timeout, connection=None):
@@ -346,63 +374,71 @@ class Queue(object):
         See the documentation of cls.lpop for the interpretation of timeout.
         """
-        queue_keys = [q.key for q in queues]
-        result = cls.lpop(queue_keys, timeout, connection=connection)
-        if result is None:
-            return None
-        queue_key, job_id = map(as_text, result)
-        queue = cls.from_queue_key(queue_key, connection=connection)
-        try:
-            job = cls.job_class.fetch(job_id, connection=connection)
-        except NoSuchJobError:
-            # Silently pass on jobs that don't exist (anymore),
-            # and continue by reinvoking the same function recursively
-            return cls.dequeue_any(queues, timeout, connection=connection)
-        except UnpickleError as e:
-            # Attach queue information on the exception for improved error
-            # reporting
-            e.job_id = job_id
-            e.queue = queue
-            raise e
-        return job, queue
+        while True:
+            queue_keys = [q.key for q in queues]
+            result = cls.lpop(queue_keys, timeout, connection=connection)
+            if result is None:
+                return None
+            queue_key, job_id = map(as_text, result)
+            queue = cls.from_queue_key(queue_key, connection=connection)
+            try:
+                job = cls.job_class.fetch(job_id, connection=connection)
+            except NoSuchJobError:
+                # Silently pass on jobs that don't exist (anymore),
+                # and continue in the loop
+                continue
+            except UnpickleError as e:
+                # Attach queue information on the exception for improved error
+                # reporting
+                e.job_id = job_id
+                e.queue = queue
+                raise e
+            return job, queue
+        return None, None
 
     # Total ordering definition (the rest of the required Python methods are
     # auto-generated by the @total_ordering decorator)
     def __eq__(self, other):  # noqa
         if not isinstance(other, Queue):
-            raise TypeError('Cannot compare queues to other objects.')
+            raise TypeError('Cannot compare queues to other objects')
         return self.name == other.name
 
     def __lt__(self, other):
         if not isinstance(other, Queue):
-            raise TypeError('Cannot compare queues to other objects.')
+            raise TypeError('Cannot compare queues to other objects')
         return self.name < other.name
 
     def __hash__(self):
         return hash(self.name)
 
     def __repr__(self):  # noqa
-        return 'Queue(%r)' % (self.name,)
+        return 'Queue({0!r})'.format(self.name)
 
     def __str__(self):
-        return '<Queue \'%s\'>' % (self.name,)
+        return '<Queue {0!r}>'.format(self.name)
 
 
 class FailedQueue(Queue):
     def __init__(self, connection=None):
-        super(FailedQueue, self).__init__(Status.FAILED, connection=connection)
+        super(FailedQueue, self).__init__(JobStatus.FAILED, connection=connection)
 
     def quarantine(self, job, exc_info):
         """Puts the given Job in quarantine (i.e. put it on the failed
         queue).
-
-        This is different from normal job enqueueing, since certain meta data
-        must not be overridden (e.g. `origin` or `enqueued_at`) and other meta
-        data must be inserted (`ended_at` and `exc_info`).
         """
-        job.ended_at = utcnow()
-        job.exc_info = exc_info
-        return self.enqueue_job(job, set_meta_data=False)
+
+        with self.connection._pipeline() as pipeline:
+            # Add Queue key set
+            self.connection.sadd(self.redis_queues_keys, self.key)
+
+            job.ended_at = utcnow()
+            job.exc_info = exc_info
+            job.save(pipeline=pipeline)
+
+            self.push_job_id(job.id, pipeline=pipeline)
+            pipeline.execute()
+
+        return job
 
     def requeue(self, job_id):
         """Requeues the job with the given job ID."""
@@ -415,9 +451,9 @@ class FailedQueue(Queue):
 
         # Delete it from the failed queue (raise an error if that failed)
         if self.remove(job) == 0:
-            raise InvalidJobOperationError('Cannot requeue non-failed jobs.')
+            raise InvalidJobOperationError('Cannot requeue non-failed jobs')
 
-        job.set_status(Status.QUEUED)
+        job.set_status(JobStatus.QUEUED)
         job.exc_info = None
         q = Queue(job.origin, connection=self.connection)
         q.enqueue_job(job)
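
The changed dependency path above is what makes cross-queue dependencies work: a child whose parent is unfinished is saved with `deferred` status and parked in `DeferredJobRegistry` instead of being pushed, and `enqueue_dependents()` later re-enqueues it on the queue it was created on. A hedged sketch (assumes a local Redis server, no worker running yet, and the hypothetical `my_module.say_hello`):

```python
from redis import StrictRedis
from rq import Queue

conn = StrictRedis()
q_default = Queue('default', connection=conn)
q_mail = Queue('mail', connection=conn)

parent = q_default.enqueue('my_module.say_hello', 'World')
child = q_mail.enqueue('my_module.say_hello', 'Again', depends_on=parent)
assert child.get_status() == 'deferred'  # becomes 'queued' once parent finishes
```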

@@ -1,16 +1,16 @@
 from .compat import as_text
 from .connections import resolve_connection
+from .exceptions import NoSuchJobError
+from .job import Job, JobStatus
 from .queue import FailedQueue
 from .utils import current_timestamp
 
 
 class BaseRegistry(object):
     """
-    Base implementation of job registry, implemented in Redis sorted set. Each job
-    is stored as a key in the registry, scored by expiration time (unix timestamp).
+    Base implementation of a job registry, implemented in Redis sorted set.
+    Each job is stored as a key in the registry, scored by expiration time
+    (unix timestamp).
+
+    Jobs whose scores are lower than the current time are considered "expired"
+    and should be cleaned up.
     """
 
     def __init__(self, name='default', connection=None):
@@ -27,9 +27,9 @@ class BaseRegistry(object):
         self.cleanup()
         return self.connection.zcard(self.key)
 
-    def add(self, job, timeout, pipeline=None):
-        """Adds a job to StartedJobRegistry with expiry time of now + timeout."""
-        score = current_timestamp() + timeout
+    def add(self, job, ttl=0, pipeline=None):
+        """Adds a job to a registry with expiry time of now + ttl."""
+        score = ttl if ttl < 0 else current_timestamp() + ttl
         if pipeline is not None:
             return pipeline.zadd(self.key, score, job.id)
 
@@ -39,10 +39,16 @@ class BaseRegistry(object):
         connection = pipeline if pipeline is not None else self.connection
         return connection.zrem(self.key, job.id)
 
-    def get_expired_job_ids(self):
-        """Returns job ids whose score are less than current timestamp."""
+    def get_expired_job_ids(self, timestamp=None):
+        """Returns ids for jobs with an expiry time earlier than timestamp,
+        specified as seconds since the Unix epoch. timestamp defaults to call
+        time if unspecified.
+        """
+        score = timestamp if timestamp is not None else current_timestamp()
         return [as_text(job_id) for job_id in
-                self.connection.zrangebyscore(self.key, 0, current_timestamp())]
+                self.connection.zrangebyscore(self.key, 0, score)]
 
     def get_job_ids(self, start=0, end=-1):
         """Returns list of all job ids."""
@@ -59,24 +65,36 @@ class StartedJobRegistry(BaseRegistry):
     Jobs are added to registry right before they are executed and removed
     right after completion (success or failure).
-
-    Jobs whose score are lower than current time is considered "expired".
     """
 
     def __init__(self, name='default', connection=None):
         super(StartedJobRegistry, self).__init__(name, connection)
-        self.key = 'rq:wip:%s' % name
+        self.key = 'rq:wip:{0}'.format(name)
 
-    def cleanup(self):
-        """Remove expired jobs from registry and add them to FailedQueue."""
-        job_ids = self.get_expired_job_ids()
+    def cleanup(self, timestamp=None):
+        """Remove expired jobs from registry and add them to FailedQueue.
+
+        Removes jobs with an expiry time earlier than timestamp, specified as
+        seconds since the Unix epoch. timestamp defaults to call time if
+        unspecified. Removed jobs are added to the global failed job queue.
+        """
+        score = timestamp if timestamp is not None else current_timestamp()
+        job_ids = self.get_expired_job_ids(score)
 
         if job_ids:
             failed_queue = FailedQueue(connection=self.connection)
             with self.connection.pipeline() as pipeline:
                 for job_id in job_ids:
-                    failed_queue.push_job_id(job_id, pipeline=pipeline)
-                pipeline.zremrangebyscore(self.key, 0, current_timestamp())
+                    try:
+                        job = Job.fetch(job_id, connection=self.connection)
+                        job.status = JobStatus.FAILED
+                        job.save(pipeline=pipeline)
+                        failed_queue.push_job_id(job_id, pipeline=pipeline)
+                    except NoSuchJobError:
+                        pass
+                pipeline.zremrangebyscore(self.key, 0, score)
                 pipeline.execute()
 
         return job_ids
@@ -90,8 +108,38 @@ class FinishedJobRegistry(BaseRegistry):
 
     def __init__(self, name='default', connection=None):
         super(FinishedJobRegistry, self).__init__(name, connection)
-        self.key = 'rq:finished:%s' % name
+        self.key = 'rq:finished:{0}'.format(name)
 
-    def cleanup(self):
-        """Remove expired jobs from registry."""
-        self.connection.zremrangebyscore(self.key, 0, current_timestamp())
+    def cleanup(self, timestamp=None):
+        """Remove expired jobs from registry.
+
+        Removes jobs with an expiry time earlier than timestamp, specified as
+        seconds since the Unix epoch. timestamp defaults to call time if
+        unspecified.
+        """
+        score = timestamp if timestamp is not None else current_timestamp()
+        self.connection.zremrangebyscore(self.key, 0, score)
+
+
+class DeferredJobRegistry(BaseRegistry):
+    """
+    Registry of deferred jobs (waiting for another job to finish).
+    """
+
+    def __init__(self, name='default', connection=None):
+        super(DeferredJobRegistry, self).__init__(name, connection)
+        self.key = 'rq:deferred:{0}'.format(name)
+
+    def cleanup(self):
+        """This method is only here to prevent errors because this method is
+        automatically called by `count()` and `get_job_ids()` methods
+        implemented in BaseRegistry."""
+        pass
+
+
+def clean_registries(queue):
+    """Cleans StartedJobRegistry and FinishedJobRegistry of a queue."""
+    registry = FinishedJobRegistry(name=queue.name, connection=queue.connection)
+    registry.cleanup()
+    registry = StartedJobRegistry(name=queue.name, connection=queue.connection)
+    registry.cleanup()
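
A hedged sketch of inspecting the three registries and running the cleanup that workers now perform periodically (assumes a local Redis server and the queue name 'default'):

```python
from redis import StrictRedis
from rq import Queue
from rq.registry import (DeferredJobRegistry, FinishedJobRegistry,
                         StartedJobRegistry, clean_registries)

conn = StrictRedis()
print(StartedJobRegistry('default', connection=conn).get_job_ids())   # running now
print(FinishedJobRegistry('default', connection=conn).get_job_ids())  # done, not yet expired
print(DeferredJobRegistry('default', connection=conn).get_job_ids())  # waiting on dependencies

clean_registries(Queue('default', connection=conn))  # expire stale entries
```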

@ -0,0 +1,18 @@
WORKERS_SUSPENDED = 'rq:suspended'
def is_suspended(connection):
return connection.exists(WORKERS_SUSPENDED)
def suspend(connection, ttl=None):
"""ttl = time to live in seconds. Default is no expiration
Note: If you pass in 0 it will invalidate right away
"""
connection.set(WORKERS_SUSPENDED, 1)
if ttl is not None:
connection.expire(WORKERS_SUSPENDED, ttl)
def resume(connection):
return connection.delete(WORKERS_SUSPENDED)
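
The `rq suspend` and `rq resume` commands added above boil down to these three functions on a shared Redis flag. A minimal programmatic sketch, assuming a StrictRedis connection:

```python
from redis import StrictRedis
from rq.suspension import is_suspended, resume, suspend

conn = StrictRedis()
suspend(conn)           # workers stop picking up new jobs
suspend(conn, ttl=300)  # alternatively: the flag auto-expires after 300 seconds
assert is_suspended(conn)
resume(conn)            # delete the flag; workers pick up work again
```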

@@ -48,7 +48,7 @@ class UnixSignalDeathPenalty(BaseDeathPenalty):
 
     def handle_death_penalty(self, signum, frame):
         raise JobTimeoutException('Job exceeded maximum timeout '
-                                  'value (%d seconds).' % self._timeout)
+                                  'value ({0} seconds)'.format(self._timeout))
 
     def setup_death_penalty(self):
         """Sets up an alarm signal and a signal handler that raises

@@ -9,12 +9,13 @@ from __future__ import (absolute_import, division, print_function,
                         unicode_literals)

 import calendar
-import importlib
 import datetime
+import importlib
 import logging
 import sys
+from collections import Iterable

-from .compat import is_python_version, as_text
+from .compat import as_text, is_python_version, string_types


 class _Colorizer(object):
@@ -59,7 +60,7 @@ class _Colorizer(object):
         return self.codes["reset"]

     def colorize(self, color_key, text):
-        if not sys.stdout.isatty():
+        if self.notty:
             return text
         else:
             return self.codes[color_key] + text + self.codes["reset"]
@@ -205,6 +206,29 @@ def first(iterable, default=None, key=None):
     return default


+def is_nonstring_iterable(obj):
+    """Returns whether the obj is an iterable, but not a string"""
+    return isinstance(obj, Iterable) and not isinstance(obj, string_types)
+
+
+def ensure_list(obj):
+    """
+    When passed an iterable of objects, does nothing, otherwise, it returns
+    a list with just that object in it.
+    """
+    return obj if is_nonstring_iterable(obj) else [obj]
+
+
 def current_timestamp():
     """Returns current UTC timestamp"""
     return calendar.timegm(datetime.datetime.utcnow().utctimetuple())
+
+
+def enum(name, *sequential, **named):
+    values = dict(zip(sequential, range(len(sequential))), **named)
+
+    # NOTE: Yes, we *really* want to cast using str() here.
+    # On Python 2 type() requires a byte string (which is str() on Python 2).
+    # On Python 3 it does not matter, so we'll use str(), which acts as
+    # a no-op.
+    return type(str(name), (), values)
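
A quick illustration of the two helpers (the names and values are examples only):

    Color = enum('Color', 'RED', 'GREEN', BLUE='blue')
    assert Color.RED == 0        # positional names count up from zero
    assert Color.BLUE == 'blue'  # keyword names keep their given values

    assert ensure_list('default') == ['default']  # a string gets wrapped
    assert ensure_list(['a', 'b']) == ['a', 'b']  # real iterables pass through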

@ -1,4 +1,5 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from __future__ import (absolute_import, division, print_function, from __future__ import (absolute_import, division, print_function,
unicode_literals) unicode_literals)
VERSION = '0.4.6'
VERSION = '0.5.6'

@@ -12,18 +12,22 @@ import sys
 import time
 import traceback
 import warnings
+from datetime import timedelta

 from rq.compat import as_text, string_types, text_type
 from .connections import get_current_connection
-from .exceptions import DequeueTimeout, NoQueueError
-from .job import Job, Status
+from .defaults import DEFAULT_RESULT_TTL, DEFAULT_WORKER_TTL
+from .exceptions import DequeueTimeout
+from .job import Job, JobStatus
 from .logutils import setup_loghandlers
-from .queue import get_failed_queue, Queue
+from .queue import Queue, get_failed_queue
+from .registry import FinishedJobRegistry, StartedJobRegistry, clean_registries
+from .suspension import is_suspended
 from .timeouts import UnixSignalDeathPenalty
-from .utils import import_attribute, make_colorizer, utcformat, utcnow
+from .utils import (ensure_list, enum, import_attribute, make_colorizer,
+                    utcformat, utcnow, utcparse)
 from .version import VERSION
-from .registry import FinishedJobRegistry, StartedJobRegistry

 try:
     from procname import setprocname
@@ -35,8 +39,7 @@ green = make_colorizer('darkgreen')
 yellow = make_colorizer('darkyellow')
 blue = make_colorizer('darkblue')

-DEFAULT_WORKER_TTL = 420
-DEFAULT_RESULT_TTL = 500

 logger = logging.getLogger(__name__)
@@ -65,6 +68,15 @@ def signal_name(signum):
         return 'SIG_UNKNOWN'


+WorkerStatus = enum(
+    'WorkerStatus',
+    STARTED='started',
+    SUSPENDED='suspended',
+    BUSY='busy',
+    IDLE='idle'
+)
+
+
 class Worker(object):
     redis_worker_namespace_prefix = 'rq:worker:'
     redis_workers_keys = 'rq:workers'
@@ -91,7 +103,7 @@ class Worker(object):
         """
         prefix = cls.redis_worker_namespace_prefix
         if not worker_key.startswith(prefix):
-            raise ValueError('Not a valid RQ worker key: %s' % (worker_key,))
+            raise ValueError('Not a valid RQ worker key: {0}'.format(worker_key))

         if connection is None:
             connection = get_current_connection()
@@ -110,13 +122,14 @@ class Worker(object):
         return worker

     def __init__(self, queues, name=None,
-                 default_result_ttl=None, connection=None,
-                 exc_handler=None, default_worker_ttl=None, job_class=None):  # noqa
+                 default_result_ttl=None, connection=None, exc_handler=None,
+                 exception_handlers=None, default_worker_ttl=None, job_class=None):  # noqa
         if connection is None:
             connection = get_current_connection()
         self.connection = connection
-        if isinstance(queues, self.queue_class):
-            queues = [queues]
+
+        queues = [self.queue_class(name=q) if isinstance(q, text_type) else q
+                  for q in ensure_list(queues)]
+
         self._name = name
         self.queues = queues
         self.validate_queues()
@ -133,15 +146,26 @@ class Worker(object):
self._state = 'starting' self._state = 'starting'
self._is_horse = False self._is_horse = False
self._horse_pid = 0 self._horse_pid = 0
self._stopped = False self._stop_requested = False
self.log = logger self.log = logger
self.failed_queue = get_failed_queue(connection=self.connection) self.failed_queue = get_failed_queue(connection=self.connection)
self.last_cleaned_at = None
# By default, push the "move-to-failed-queue" exception handler onto # By default, push the "move-to-failed-queue" exception handler onto
# the stack # the stack
self.push_exc_handler(self.move_to_failed_queue) if exception_handlers is None:
if exc_handler is not None: self.push_exc_handler(self.move_to_failed_queue)
self.push_exc_handler(exc_handler) if exc_handler is not None:
self.push_exc_handler(exc_handler)
warnings.warn(
"use of exc_handler is deprecated, pass a list to exception_handlers instead.",
DeprecationWarning
)
elif isinstance(exception_handlers, list):
for h in exception_handlers:
self.push_exc_handler(h)
elif exception_handlers is not None:
self.push_exc_handler(exception_handlers)
if job_class is not None: if job_class is not None:
if isinstance(job_class, string_types): if isinstance(job_class, string_types):
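
A sketch of the reworked constructor: queue names may now be given as plain strings, and handlers as a list via `exception_handlers` (the old `exc_handler` still works but warns). Assumes Python 3 and a connection pushed with `rq.Connection()`; the handler name here is made up for the example:

    from rq import Connection, Worker

    def notify(job, *exc_info):
        print('job {0} raised'.format(job.id))
        return True  # truthy: fall through to any further handlers

    with Connection():
        worker = Worker(['default', 'low'], exception_handlers=[notify])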
@@ -150,19 +174,17 @@ class Worker(object):

     def validate_queues(self):
         """Sanity check for the given queues."""
-        if not iterable(self.queues):
-            raise ValueError('Argument queues not iterable.')
         for queue in self.queues:
             if not isinstance(queue, self.queue_class):
-                raise NoQueueError('Give each worker at least one Queue.')
+                raise TypeError('{0} is not of type {1} or text type'.format(queue, self.queue_class))

     def queue_names(self):
         """Returns the queue names of this worker's queues."""
-        return map(lambda q: q.name, self.queues)
+        return list(map(lambda q: q.name, self.queues))

     def queue_keys(self):
         """Returns the Redis keys representing this worker's queues."""
-        return map(lambda q: q.key, self.queues)
+        return list(map(lambda q: q.key, self.queues))

     @property
     def name(self):
@@ -175,7 +197,7 @@ class Worker(object):
         if self._name is None:
             hostname = socket.gethostname()
             shortname, _, _ = hostname.partition('.')
-            self._name = '%s.%s' % (shortname, self.pid)
+            self._name = '{0}.{1}'.format(shortname, self.pid)
         return self._name

     @property
@@ -205,15 +227,15 @@ class Worker(object):

         This can be used to make `ps -ef` output more readable.
         """
-        setprocname('rq: %s' % (message,))
+        setprocname('rq: {0}'.format(message))

     def register_birth(self):
         """Registers its own birth."""
-        self.log.debug('Registering birth of worker %s' % (self.name,))
+        self.log.debug('Registering birth of worker {0}'.format(self.name))
         if self.connection.exists(self.key) and \
                 not self.connection.hexists(self.key, 'death'):
-            raise ValueError('There exists an active worker named \'%s\' '
-                             'already.' % (self.name,))
+            msg = 'There exists an active worker named {0!r} already'
+            raise ValueError(msg.format(self.name))
         key = self.key
         queues = ','.join(self.queue_names())
         with self.connection._pipeline() as p:
@@ -235,6 +257,20 @@ class Worker(object):
             p.expire(self.key, 60)
             p.execute()

+    @property
+    def birth_date(self):
+        """Fetches birth date from Redis."""
+        birth_timestamp = self.connection.hget(self.key, 'birth')
+        if birth_timestamp is not None:
+            return utcparse(as_text(birth_timestamp))
+
+    @property
+    def death_date(self):
+        """Fetches death date from Redis."""
+        death_timestamp = self.connection.hget(self.key, 'death')
+        if death_timestamp is not None:
+            return utcparse(as_text(death_timestamp))
+
     def set_state(self, state, pipeline=None):
         self._state = state
         connection = pipeline if pipeline is not None else self.connection
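
The new properties simply parse back what `register_birth()` and `register_death()` wrote into the worker's Redis hash; a sketch (connection setup as in the earlier example):

    with Connection():
        w = Worker(['default'])
        w.register_birth()
        print(w.birth_date)  # UTC datetime, or None before registration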
@@ -282,56 +318,75 @@ class Worker(object):
         return self.job_class.fetch(job_id, self.connection)

-    @property
-    def stopped(self):
-        return self._stopped
-
     def _install_signal_handlers(self):
         """Installs signal handlers for handling SIGINT and SIGTERM
         gracefully.
         """
-        def request_force_stop(signum, frame):
-            """Terminates the application (cold shutdown).
-            """
-            self.log.warning('Cold shut down.')
-
-            # Take down the horse with the worker
-            if self.horse_pid:
-                msg = 'Taking down horse %d with me.' % self.horse_pid
-                self.log.debug(msg)
-                try:
-                    os.kill(self.horse_pid, signal.SIGKILL)
-                except OSError as e:
-                    # ESRCH ("No such process") is fine with us
-                    if e.errno != errno.ESRCH:
-                        self.log.debug('Horse already down.')
-                        raise
-            raise SystemExit()
-
-        def request_stop(signum, frame):
-            """Stops the current worker loop but waits for child processes to
-            end gracefully (warm shutdown).
-            """
-            self.log.debug('Got signal %s.' % signal_name(signum))
-
-            signal.signal(signal.SIGINT, request_force_stop)
-            signal.signal(signal.SIGTERM, request_force_stop)
-
-            msg = 'Warm shut down requested.'
-            self.log.warning(msg)
-
-            # If shutdown is requested in the middle of a job, wait until
-            # finish before shutting down
-            if self.get_state() == 'busy':
-                self._stopped = True
-                self.log.debug('Stopping after current horse is finished. '
-                               'Press Ctrl+C again for a cold shutdown.')
-            else:
-                raise StopRequested()
-
-        signal.signal(signal.SIGINT, request_stop)
-        signal.signal(signal.SIGTERM, request_stop)
+        signal.signal(signal.SIGINT, self.request_stop)
+        signal.signal(signal.SIGTERM, self.request_stop)
+
+    def request_force_stop(self, signum, frame):
+        """Terminates the application (cold shutdown).
+        """
+        self.log.warning('Cold shut down')
+
+        # Take down the horse with the worker
+        if self.horse_pid:
+            msg = 'Taking down horse {0} with me'.format(self.horse_pid)
+            self.log.debug(msg)
+            try:
+                os.kill(self.horse_pid, signal.SIGKILL)
+            except OSError as e:
+                # ESRCH ("No such process") is fine with us
+                if e.errno != errno.ESRCH:
+                    self.log.debug('Horse already down')
+                    raise
+        raise SystemExit()
+
+    def request_stop(self, signum, frame):
+        """Stops the current worker loop but waits for child processes to
+        end gracefully (warm shutdown).
+        """
+        self.log.debug('Got signal {0}'.format(signal_name(signum)))
+
+        signal.signal(signal.SIGINT, self.request_force_stop)
+        signal.signal(signal.SIGTERM, self.request_force_stop)
+
+        msg = 'Warm shut down requested'
+        self.log.warning(msg)
+
+        # If shutdown is requested in the middle of a job, wait until
+        # finish before shutting down
+        if self.get_state() == 'busy':
+            self._stop_requested = True
+            self.log.debug('Stopping after current horse is finished. '
+                           'Press Ctrl+C again for a cold shutdown.')
+        else:
+            raise StopRequested()
+
+    def check_for_suspension(self, burst):
+        """Check to see if workers have been suspended by `rq suspend`"""
+
+        before_state = None
+        notified = False
+
+        while not self._stop_requested and is_suspended(self.connection):
+
+            if burst:
+                self.log.info('Suspended in burst mode, exiting')
+                self.log.info('Note: There could still be unfinished jobs on the queue')
+                raise StopRequested
+
+            if not notified:
+                self.log.info('Worker suspended, run `rq resume` to resume')
+                before_state = self.get_state()
+                self.set_state(WorkerStatus.SUSPENDED)
+                notified = True
+            time.sleep(1)
+
+        if before_state:
+            self.set_state(before_state)

     def work(self, burst=False):
         """Starts the work loop.
@@ -347,18 +402,27 @@ class Worker(object):
         did_perform_work = False
         self.register_birth()
-        self.log.info('RQ worker started, version %s' % VERSION)
-        self.set_state('starting')
+        self.log.info("RQ worker {0!r} started, version {1}".format(self.key, VERSION))
+        self.set_state(WorkerStatus.STARTED)
+
         try:
             while True:
-                if self.stopped:
-                    self.log.info('Stopping on request.')
-                    break
-
-                timeout = None if burst else max(1, self.default_worker_ttl - 60)
                 try:
+                    self.check_for_suspension(burst)
+
+                    if self.should_run_maintenance_tasks:
+                        self.clean_registries()
+
+                    if self._stop_requested:
+                        self.log.info('Stopping on request')
+                        break
+
+                    timeout = None if burst else max(1, self.default_worker_ttl - 60)
+
                     result = self.dequeue_job_and_maintain_ttl(timeout)
                     if result is None:
+                        if burst:
+                            self.log.info("RQ worker {0!r} done, quitting".format(self.key))
+
                         break
                 except StopRequested:
                     break
@@ -367,10 +431,11 @@ class Worker(object):
                 self.execute_job(job)
                 self.heartbeat()

-                if job.get_status() == Status.FINISHED:
+                if job.get_status() == JobStatus.FINISHED:
                     queue.enqueue_dependents(job)

                 did_perform_work = True
+
         finally:
             if not self.is_horse:
                 self.register_death()
@@ -380,11 +445,10 @@ class Worker(object):
         result = None
         qnames = self.queue_names()

-        self.set_state('idle')
-        self.procline('Listening on %s' % ','.join(qnames))
+        self.set_state(WorkerStatus.IDLE)
+        self.procline('Listening on {0}'.format(','.join(qnames)))
         self.log.info('')
-        self.log.info('*** Listening on %s...' %
-                      green(', '.join(qnames)))
+        self.log.info('*** Listening on {0}...'.format(green(', '.join(qnames))))

         while True:
             self.heartbeat()
@@ -394,8 +458,8 @@ class Worker(object):
                                                       connection=self.connection)
                 if result is not None:
                     job, queue = result
-                    self.log.info('%s: %s (%s)' % (green(queue.name),
-                                                   blue(job.description), job.id))
+                    self.log.info('{0}: {1} ({2})'.format(green(queue.name),
+                                                          blue(job.description), job.id))

                 break
             except DequeueTimeout:
@@ -427,15 +491,17 @@ class Worker(object):
         within the given timeout bounds, or will end the work horse with
         SIGALRM.
         """
-        self.set_state('busy')
         child_pid = os.fork()
         if child_pid == 0:
             self.main_work_horse(job)
         else:
             self._horse_pid = child_pid
-            self.procline('Forked %d at %d' % (child_pid, time.time()))
+            self.procline('Forked {0} at {1}'.format(child_pid, time.time()))
             while True:
                 try:
                     os.waitpid(child_pid, 0)
+                    self.set_state('idle')
                     break
                 except OSError as e:
                     # In case we encountered an OSError due to EINTR (which is
@@ -477,17 +543,16 @@ class Worker(object):
         timeout = (job.timeout or 180) + 60

         with self.connection._pipeline() as pipeline:
-            self.set_state('busy', pipeline=pipeline)
+            self.set_state(WorkerStatus.BUSY, pipeline=pipeline)
             self.set_current_job_id(job.id, pipeline=pipeline)
             self.heartbeat(timeout, pipeline=pipeline)
             registry = StartedJobRegistry(job.origin, self.connection)
             registry.add(job, timeout, pipeline=pipeline)
-            job.set_status(Status.STARTED, pipeline=pipeline)
+            job.set_status(JobStatus.STARTED, pipeline=pipeline)
             pipeline.execute()

-        self.procline('Processing %s from %s since %s' % (
-            job.func_name,
-            job.origin, time.time()))
+        msg = 'Processing {0} from {1} since {2}'
+        self.procline(msg.format(job.func_name, job.origin, time.time()))

     def perform_job(self, job):
         """Performs the actual work of a job.  Will/should only be called
@@ -508,10 +573,10 @@ class Worker(object):
                 self.set_current_job_id(None, pipeline=pipeline)

-                result_ttl = job.get_ttl(self.default_result_ttl)
+                result_ttl = job.get_result_ttl(self.default_result_ttl)
                 if result_ttl != 0:
                     job.ended_at = utcnow()
-                    job._status = Status.FINISHED
+                    job._status = JobStatus.FINISHED
                     job.save(pipeline=pipeline)

                     finished_job_registry = FinishedJobRegistry(job.origin, self.connection)
@@ -523,23 +588,28 @@ class Worker(object):
                 pipeline.execute()

         except Exception:
-            job.set_status(Status.FAILED, pipeline=pipeline)
+            job.set_status(JobStatus.FAILED, pipeline=pipeline)
             started_job_registry.remove(job, pipeline=pipeline)
-            pipeline.execute()
+            try:
+                pipeline.execute()
+            except Exception:
+                # Ensure that custom exception handlers are called
+                # even if Redis is down
+                pass
             self.handle_exception(job, *sys.exc_info())
             return False

-        if rv is None:
-            self.log.info('Job OK')
-        else:
-            self.log.info('Job OK, result = %s' % (yellow(text_type(rv)),))
+        self.log.info('{0}: {1} ({2})'.format(green(job.origin), blue('Job OK'), job.id))
+        if rv:
+            log_result = "{0!r}".format(as_text(text_type(rv)))
+            self.log.debug('Result: {0}'.format(yellow(log_result)))

         if result_ttl == 0:
-            self.log.info('Result discarded immediately.')
+            self.log.info('Result discarded immediately')
         elif result_ttl > 0:
-            self.log.info('Result is kept for %d seconds.' % result_ttl)
+            self.log.info('Result is kept for {0} seconds'.format(result_ttl))
         else:
-            self.log.warning('Result will never expire, clean up result key manually.')
+            self.log.warning('Result will never expire, clean up result key manually')

         return True
@@ -555,7 +625,7 @@ class Worker(object):
         })

         for handler in reversed(self._exc_handlers):
-            self.log.debug('Invoking exception handler %s' % (handler,))
+            self.log.debug('Invoking exception handler {0}'.format(handler))
             fallthrough = handler(job, *exc_info)

             # Only handlers with explicit return values should disable further
@@ -569,7 +639,7 @@ class Worker(object):
     def move_to_failed_queue(self, job, *exc_info):
         """Default exception handler: move the job to the failed queue."""
         exc_string = ''.join(traceback.format_exception(*exc_info))
-        self.log.warning('Moving job to %s queue.' % self.failed_queue.name)
+        self.log.warning('Moving job to {0!r} queue'.format(self.failed_queue.name))
         self.failed_queue.quarantine(job, exc_info=exc_string)

     def push_exc_handler(self, handler_func):
@@ -580,13 +650,33 @@ class Worker(object):
         """Pops the latest exception handler off of the exc handler stack."""
         return self._exc_handlers.pop()

+    def __eq__(self, other):
+        """Equality does not take the database/connection into account"""
+        if not isinstance(other, self.__class__):
+            raise TypeError('Cannot compare workers to other types (of workers)')
+        return self.name == other.name
+
+    def __hash__(self):
+        """The hash does not take the database/connection into account"""
+        return hash(self.name)
+
+    def clean_registries(self):
+        """Runs maintenance jobs on each Queue's registries."""
+        for queue in self.queues:
+            clean_registries(queue)
+        self.last_cleaned_at = utcnow()
+
+    @property
+    def should_run_maintenance_tasks(self):
+        """Maintenance tasks should run on first startup or every hour."""
+        if self.last_cleaned_at is None:
+            return True
+        if (utcnow() - self.last_cleaned_at) > timedelta(hours=1):
+            return True
+        return False
+

 class SimpleWorker(Worker):
-    def _install_signal_handlers(self, *args, **kwargs):
-        """Signal handlers are useless for test worker, as it
-        does not have fork() ability"""
-        pass
-
     def main_work_horse(self, *args, **kwargs):
         raise NotImplementedError("Test worker does not implement this method")
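
Equality and hashing now key on the worker name alone, so two handles to the same logical worker compare equal regardless of connection. A sketch (imports and connection as in the earlier examples):

    with Connection():
        w1 = Worker(['default'], name='worker-1')
        w2 = Worker(['default'], name='worker-1')
        assert w1 == w2
        assert len({w1, w2}) == 1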

@@ -17,9 +17,9 @@ else
     safe_rg=cat
 fi

-export ONLY_RUN_FAST_TESTS=1
+export RUN_SLOW_TESTS_TOO=1
 if [ "$1" = '-f' ]; then   # Poor man's argparse
-    unset ONLY_RUN_FAST_TESTS
+    unset RUN_SLOW_TESTS_TOO
     shift 1
 fi

@ -1,5 +1,6 @@
[bdist_rpm] [bdist_rpm]
requires = redis requires = redis >= 2.7.0
click >= 3.0
[wheel] [wheel]
universal = 1 universal = 1

@@ -51,13 +51,16 @@ setup(
             'rqworker = rq.cli:worker',
         ],
     },
+    extras_require={
+        ':python_version=="2.6"': ['argparse', 'importlib'],
+    },
     classifiers=[
         # As from http://pypi.python.org/pypi?%3Aaction=list_classifiers
         #'Development Status :: 1 - Planning',
         #'Development Status :: 2 - Pre-Alpha',
         #'Development Status :: 3 - Alpha',
-        'Development Status :: 4 - Beta',
-        #'Development Status :: 5 - Production/Stable',
+        #'Development Status :: 4 - Beta',
+        'Development Status :: 5 - Production/Stable',
         #'Development Status :: 6 - Mature',
         #'Development Status :: 7 - Inactive',
         'Intended Audience :: Developers',

@@ -32,7 +32,7 @@ def slow(f):
     @wraps(f)
     def _inner(*args, **kwargs):
-        if os.environ.get('ONLY_RUN_FAST_TESTS'):
+        if os.environ.get('RUN_SLOW_TESTS_TOO'):
             f(*args, **kwargs)
     return _inner

@@ -11,6 +11,7 @@ import time

 from rq import Connection, get_current_job
 from rq.decorators import job
+from rq.compat import PY2


 def say_pid():
@@ -54,8 +55,7 @@ def create_file_after_timeout(path, timeout):


 def access_self():
-    job = get_current_job()
-    return job.id
+    assert get_current_job() is not None


 def echo(*args, **kwargs):
@@ -79,11 +79,25 @@ class CallableObject(object):
         return u"I'm callable"


+class UnicodeStringObject(object):
+    def __repr__(self):
+        if PY2:
+            return u'é'.encode('utf-8')
+        else:
+            return u'é'
+
+
 with Connection():
     @job(queue='default')
     def decorated_job(x, y):
         return x + y


-def long_running_job():
-    time.sleep(10)
+def black_hole(job, *exc_info):
+    # Don't fall through to default behaviour (moving to failed queue)
+    return False
+
+
+def long_running_job(timeout=10):
+    time.sleep(timeout)
+    return 'Done sleeping...'
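
black_hole doubles as a minimal example of the handler contract: a handler receives (job, *exc_info), and returning False suppresses any further handling, so the failed job never reaches the failed queue. A sketch of wiring it up without the CLI:

    from rq import Connection, Queue, Worker
    from tests.fixtures import black_hole, div_by_zero

    with Connection():
        q = Queue()
        q.enqueue(div_by_zero)
        Worker([q], exception_handlers=[black_hole]).work(burst=True)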

@@ -3,7 +3,7 @@ from __future__ import (absolute_import, division, print_function,
                         unicode_literals)

 from click.testing import CliRunner
-from rq import get_failed_queue
+from rq import get_failed_queue, Queue
 from rq.compat import is_python_version
 from rq.job import Job
 from rq.cli import main
@@ -26,6 +26,17 @@ class TestCommandLine(TestCase):

 class TestRQCli(RQTestCase):
+
+    def assert_normal_execution(self, result):
+        if result.exit_code == 0:
+            return True
+        else:
+            print("Non normal execution")
+            print("Exit Code: {}".format(result.exit_code))
+            print("Output: {}".format(result.output))
+            print("Exception: {}".format(result.exception))
+            self.assertEqual(result.exit_code, 0)
+
     """Test rq_cli script"""

     def setUp(self):
         super(TestRQCli, self).setUp()
@@ -41,25 +52,71 @@ class TestRQCli(RQTestCase):
         """rq empty -u <url> failed"""
         runner = CliRunner()
         result = runner.invoke(main, ['empty', '-u', self.redis_url, 'failed'])
-        self.assertEqual(result.exit_code, 0)
+        self.assert_normal_execution(result)
         self.assertEqual(result.output.strip(), '1 jobs removed from failed queue')

     def test_requeue(self):
         """rq requeue -u <url> --all"""
         runner = CliRunner()
         result = runner.invoke(main, ['requeue', '-u', self.redis_url, '--all'])
-        self.assertEqual(result.exit_code, 0)
+        self.assert_normal_execution(result)
         self.assertEqual(result.output.strip(), 'Requeueing 1 jobs from failed queue')

     def test_info(self):
         """rq info -u <url>"""
         runner = CliRunner()
         result = runner.invoke(main, ['info', '-u', self.redis_url])
-        self.assertEqual(result.exit_code, 0)
+        self.assert_normal_execution(result)
         self.assertIn('1 queues, 1 jobs total', result.output)

     def test_worker(self):
         """rq worker -u <url> -b"""
         runner = CliRunner()
         result = runner.invoke(main, ['worker', '-u', self.redis_url, '-b'])
-        self.assertEqual(result.exit_code, 0)
+        self.assert_normal_execution(result)
+
+    def test_exception_handlers(self):
+        """rq worker -u <url> -b --exception-handler <handler>"""
+        q = Queue()
+        failed_q = get_failed_queue()
+        failed_q.empty()
+
+        runner = CliRunner()
+
+        # If exception handler is not given, failed job goes to FailedQueue
+        q.enqueue(div_by_zero)
+        runner.invoke(main, ['worker', '-u', self.redis_url, '-b'])
+        self.assertEquals(failed_q.count, 1)
+
+        # Black hole exception handler doesn't add failed jobs to FailedQueue
+        q.enqueue(div_by_zero)
+        runner.invoke(main, ['worker', '-u', self.redis_url, '-b',
+                             '--exception-handler', 'tests.fixtures.black_hole'])
+        self.assertEquals(failed_q.count, 1)
+
+    def test_suspend_and_resume(self):
+        """rq suspend -u <url>
+           rq resume -u <url>
+        """
+        runner = CliRunner()
+        result = runner.invoke(main, ['suspend', '-u', self.redis_url])
+        self.assert_normal_execution(result)
+
+        result = runner.invoke(main, ['resume', '-u', self.redis_url])
+        self.assert_normal_execution(result)
+
+    def test_suspend_with_ttl(self):
+        """rq suspend -u <url> --duration=1
+        """
+        runner = CliRunner()
+        result = runner.invoke(main, ['suspend', '-u', self.redis_url, '--duration', 1])
+        self.assert_normal_execution(result)
+
+    def test_suspend_with_invalid_ttl(self):
+        """rq suspend -u <url> --duration=0
+        """
+        runner = CliRunner()
+        result = runner.invoke(main, ['suspend', '-u', self.redis_url, '--duration', 0])
+
+        self.assertEqual(result.exit_code, 1)
+        self.assertIn("Duration must be an integer greater than 1", result.output)

@ -0,0 +1,41 @@
from rq.cli.helpers import get_redis_from_config
from tests import RQTestCase
class TestHelpers(RQTestCase):
def test_get_redis_from_config(self):
"""Ensure Redis connection params are properly parsed"""
settings = {
'REDIS_URL': 'redis://localhost:1/1'
}
# Ensure REDIS_URL is read
redis = get_redis_from_config(settings)
connection_kwargs = redis.connection_pool.connection_kwargs
self.assertEqual(connection_kwargs['db'], 1)
self.assertEqual(connection_kwargs['port'], 1)
settings = {
'REDIS_URL': 'redis://localhost:1/1',
'REDIS_HOST': 'foo',
'REDIS_DB': 2,
'REDIS_PORT': 2,
'REDIS_PASSWORD': 'bar'
}
# Ensure REDIS_URL is preferred
redis = get_redis_from_config(settings)
connection_kwargs = redis.connection_pool.connection_kwargs
self.assertEqual(connection_kwargs['db'], 1)
self.assertEqual(connection_kwargs['port'], 1)
# Ensure fall back to regular connection parameters
settings['REDIS_URL'] = None
redis = get_redis_from_config(settings)
connection_kwargs = redis.connection_pool.connection_kwargs
self.assertEqual(connection_kwargs['host'], 'foo')
self.assertEqual(connection_kwargs['db'], 2)
self.assertEqual(connection_kwargs['port'], 2)
self.assertEqual(connection_kwargs['password'], 'bar')
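
The precedence this test pins down, in short: REDIS_URL wins whenever it is set; only when it is absent or None do the individual REDIS_* settings apply. For example:

    from rq.cli.helpers import get_redis_from_config

    redis = get_redis_from_config({'REDIS_URL': 'redis://localhost:6379/0'})
    redis = get_redis_from_config({'REDIS_URL': None, 'REDIS_HOST': 'localhost',
                                   'REDIS_PORT': 6379, 'REDIS_DB': 0})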

@@ -3,17 +3,18 @@ from __future__ import (absolute_import, division, print_function,
                         unicode_literals)

 from datetime import datetime
+import time

-from rq.compat import as_text, PY2
+from tests import fixtures, RQTestCase
+from tests.helpers import strip_microseconds
+
+from rq.compat import PY2, as_text
 from rq.exceptions import NoSuchJobError, UnpickleError
-from rq.job import get_current_job, Job
+from rq.job import Job, get_current_job
 from rq.queue import Queue
+from rq.registry import DeferredJobRegistry
 from rq.utils import utcformat
-
-from tests import RQTestCase
-from tests.fixtures import (access_self, CallableObject, Number, say_hello,
-                            some_calculation)
-from tests.helpers import strip_microseconds
+from rq.worker import Worker

 try:
     from cPickle import loads, dumps
@@ -22,6 +23,26 @@ except ImportError:


 class TestJob(RQTestCase):
+    def test_unicode(self):
+        """Unicode in job description [issue405]"""
+        job = Job.create(
+            'myfunc',
+            args=[12, "☃"],
+            kwargs=dict(snowman="☃", null=None),
+        )
+
+        if not PY2:
+            # Python 3
+            expected_string = "myfunc(12, '☃', null=None, snowman='☃')"
+        else:
+            # Python 2
+            expected_string = u"myfunc(12, u'\\u2603', null=None, snowman=u'\\u2603')".decode('utf-8')
+
+        self.assertEquals(
+            job.description,
+            expected_string,
+        )
+
     def test_create_empty_job(self):
         """Creation of new empty jobs."""
         job = Job()
@@ -48,7 +69,7 @@ class TestJob(RQTestCase):

     def test_create_typical_job(self):
         """Creation of jobs for function calls."""
-        job = Job.create(func=some_calculation, args=(3, 4), kwargs=dict(z=2))
+        job = Job.create(func=fixtures.some_calculation, args=(3, 4), kwargs=dict(z=2))

         # Jobs have a random UUID
         self.assertIsNotNone(job.id)
@@ -57,7 +78,7 @@ class TestJob(RQTestCase):
         self.assertIsNone(job.instance)

         # Job data is set...
-        self.assertEquals(job.func, some_calculation)
+        self.assertEquals(job.func, fixtures.some_calculation)
         self.assertEquals(job.args, (3, 4))
         self.assertEquals(job.kwargs, {'z': 2})
@@ -68,7 +89,7 @@ class TestJob(RQTestCase):

     def test_create_instance_method_job(self):
         """Creation of jobs for instance methods."""
-        n = Number(2)
+        n = fixtures.Number(2)
         job = Job.create(func=n.div, args=(4,))

         # Job data is set
@@ -81,13 +102,13 @@ class TestJob(RQTestCase):
         job = Job.create(func='tests.fixtures.say_hello', args=('World',))

         # Job data is set
-        self.assertEquals(job.func, say_hello)
+        self.assertEquals(job.func, fixtures.say_hello)
         self.assertIsNone(job.instance)
         self.assertEquals(job.args, ('World',))

     def test_create_job_from_callable_class(self):
         """Creation of jobs using a callable class specifier."""
-        kallable = CallableObject()
+        kallable = fixtures.CallableObject()
         job = Job.create(func=kallable)

         self.assertEquals(job.func, kallable.__call__)
@@ -116,7 +137,7 @@ class TestJob(RQTestCase):

     def test_save(self):  # noqa
         """Storing jobs."""
-        job = Job.create(func=some_calculation, args=(3, 4), kwargs=dict(z=2))
+        job = Job.create(func=fixtures.some_calculation, args=(3, 4), kwargs=dict(z=2))

         # Saving creates a Redis hash
         self.assertEquals(self.testconn.exists(job.key), False)
@@ -152,7 +173,7 @@ class TestJob(RQTestCase):

     def test_persistence_of_typical_jobs(self):
         """Storing typical jobs."""
-        job = Job.create(func=some_calculation, args=(3, 4), kwargs=dict(z=2))
+        job = Job.create(func=fixtures.some_calculation, args=(3, 4), kwargs=dict(z=2))
         job.save()

         expected_date = strip_microseconds(job.created_at)
@@ -168,15 +189,15 @@ class TestJob(RQTestCase):

     def test_persistence_of_parent_job(self):
         """Storing jobs with parent job, either instance or key."""
-        parent_job = Job.create(func=some_calculation)
+        parent_job = Job.create(func=fixtures.some_calculation)
         parent_job.save()
-        job = Job.create(func=some_calculation, depends_on=parent_job)
+        job = Job.create(func=fixtures.some_calculation, depends_on=parent_job)
         job.save()
         stored_job = Job.fetch(job.id)
         self.assertEqual(stored_job._dependency_id, parent_job.id)
         self.assertEqual(stored_job.dependency, parent_job)

-        job = Job.create(func=some_calculation, depends_on=parent_job.id)
+        job = Job.create(func=fixtures.some_calculation, depends_on=parent_job.id)
         job.save()
         stored_job = Job.fetch(job.id)
         self.assertEqual(stored_job._dependency_id, parent_job.id)
@@ -184,7 +205,7 @@ class TestJob(RQTestCase):

     def test_store_then_fetch(self):
         """Store, then fetch."""
-        job = Job.create(func=some_calculation, args=(3, 4), kwargs=dict(z=2))
+        job = Job.create(func=fixtures.some_calculation, args=(3, 4), kwargs=dict(z=2))
         job.save()

         job2 = Job.fetch(job.id)
@@ -203,7 +224,7 @@ class TestJob(RQTestCase):
     def test_fetching_unreadable_data(self):
         """Fetching succeeds on unreadable data, but lazy props fail."""
         # Set up
-        job = Job.create(func=some_calculation, args=(3, 4), kwargs=dict(z=2))
+        job = Job.create(func=fixtures.some_calculation, args=(3, 4), kwargs=dict(z=2))
         job.save()

         # Just replace the data hkey with some random noise
@@ -216,7 +237,7 @@ class TestJob(RQTestCase):

     def test_job_is_unimportable(self):
         """Jobs that cannot be imported throw exception on access."""
-        job = Job.create(func=say_hello, args=('Lionel',))
+        job = Job.create(func=fixtures.say_hello, args=('Lionel',))
         job.save()

         # Now slightly modify the job to make it unimportable (this is
@@ -232,7 +253,7 @@ class TestJob(RQTestCase):

     def test_custom_meta_is_persisted(self):
         """Additional meta data on jobs are stored persisted correctly."""
-        job = Job.create(func=say_hello, args=('Lionel',))
+        job = Job.create(func=fixtures.say_hello, args=('Lionel',))
         job.meta['foo'] = 'bar'
         job.save()
@@ -244,25 +265,25 @@ class TestJob(RQTestCase):

     def test_result_ttl_is_persisted(self):
         """Ensure that job's result_ttl is set properly"""
-        job = Job.create(func=say_hello, args=('Lionel',), result_ttl=10)
+        job = Job.create(func=fixtures.say_hello, args=('Lionel',), result_ttl=10)
         job.save()
         Job.fetch(job.id, connection=self.testconn)
         self.assertEqual(job.result_ttl, 10)

-        job = Job.create(func=say_hello, args=('Lionel',))
+        job = Job.create(func=fixtures.say_hello, args=('Lionel',))
         job.save()
         Job.fetch(job.id, connection=self.testconn)
         self.assertEqual(job.result_ttl, None)

     def test_description_is_persisted(self):
         """Ensure that job's custom description is set properly"""
-        job = Job.create(func=say_hello, args=('Lionel',), description='Say hello!')
+        job = Job.create(func=fixtures.say_hello, args=('Lionel',), description='Say hello!')
         job.save()
         Job.fetch(job.id, connection=self.testconn)
         self.assertEqual(job.description, 'Say hello!')

         # Ensure job description is constructed from function call string
-        job = Job.create(func=say_hello, args=('Lionel',))
+        job = Job.create(func=fixtures.say_hello, args=('Lionel',))
         job.save()
         Job.fetch(job.id, connection=self.testconn)
         if PY2:
@@ -270,42 +291,65 @@ class TestJob(RQTestCase):
         else:
             self.assertEqual(job.description, "tests.fixtures.say_hello('Lionel')")

-    def test_job_access_within_job_function(self):
-        """The current job is accessible within the job function."""
-        # Executing the job function from outside of RQ throws an exception
+    def test_job_access_outside_job_fails(self):
+        """The current job is accessible only within a job context."""
         self.assertIsNone(get_current_job())

-        # Executing the job function from within the job works (and in
-        # this case leads to the job ID being returned)
-        job = Job.create(func=access_self)
-        job.save()
-        id = job.perform()
-        self.assertEqual(job.id, id)
-        self.assertEqual(job.func, access_self)
-
-        # Ensure that get_current_job also works from within synchronous jobs
-        queue = Queue(async=False)
-        job = queue.enqueue(access_self)
-        id = job.perform()
-        self.assertEqual(job.id, id)
-        self.assertEqual(job.func, access_self)
+    def test_job_access_within_job_function(self):
+        """The current job is accessible within the job function."""
+        q = Queue()
+        q.enqueue(fixtures.access_self)  # access_self calls get_current_job() and asserts
+        w = Worker([q])
+        w.work(burst=True)
+
+    def test_job_access_within_synchronous_job_function(self):
+        queue = Queue(async=False)
+        queue.enqueue(fixtures.access_self)

-    def test_get_ttl(self):
-        """Getting job TTL."""
-        job_ttl = 1
+    def test_get_result_ttl(self):
+        """Getting job result TTL."""
+        job_result_ttl = 1
         default_ttl = 2
-        job = Job.create(func=say_hello, result_ttl=job_ttl)
+        job = Job.create(func=fixtures.say_hello, result_ttl=job_result_ttl)
+        job.save()
+        self.assertEqual(job.get_result_ttl(default_ttl=default_ttl), job_result_ttl)
+        self.assertEqual(job.get_result_ttl(), job_result_ttl)
+        job = Job.create(func=fixtures.say_hello)
+        job.save()
+        self.assertEqual(job.get_result_ttl(default_ttl=default_ttl), default_ttl)
+        self.assertEqual(job.get_result_ttl(), None)
+
+    def test_get_job_ttl(self):
+        """Getting job TTL."""
+        ttl = 1
+        job = Job.create(func=fixtures.say_hello, ttl=ttl)
         job.save()
-        self.assertEqual(job.get_ttl(default_ttl=default_ttl), job_ttl)
-        self.assertEqual(job.get_ttl(), job_ttl)
-        job = Job.create(func=say_hello)
+        self.assertEqual(job.get_ttl(), ttl)
+        job = Job.create(func=fixtures.say_hello)
         job.save()
-        self.assertEqual(job.get_ttl(default_ttl=default_ttl), default_ttl)
         self.assertEqual(job.get_ttl(), None)

+    def test_ttl_via_enqueue(self):
+        ttl = 1
+        queue = Queue(connection=self.testconn)
+        job = queue.enqueue(fixtures.say_hello, ttl=ttl)
+        self.assertEqual(job.get_ttl(), ttl)
+
+    def test_never_expire_during_execution(self):
+        """Test what happens when job expires during execution"""
+        ttl = 1
+        queue = Queue(connection=self.testconn)
+        job = queue.enqueue(fixtures.long_running_job, args=(2,), ttl=ttl)
+        self.assertEqual(job.get_ttl(), ttl)
+        job.save()
+        job.perform()
+        self.assertEqual(job.get_ttl(), -1)
+        self.assertTrue(job.exists(job.id))
+        self.assertEqual(job.result, 'Done sleeping...')
+
     def test_cleanup(self):
         """Test that jobs and results are expired properly."""
-        job = Job.create(func=say_hello)
+        job = Job.create(func=fixtures.say_hello)
         job.save()

         # Jobs with negative TTLs don't expire
@@ -321,18 +365,24 @@ class TestJob(RQTestCase):
         self.assertRaises(NoSuchJobError, Job.fetch, job.id, self.testconn)

     def test_register_dependency(self):
-        """Test that jobs updates the correct job dependents."""
-        job = Job.create(func=say_hello)
+        """Ensure dependency registration works properly."""
+        origin = 'some_queue'
+        registry = DeferredJobRegistry(origin, self.testconn)
+
+        job = Job.create(func=fixtures.say_hello, origin=origin)
         job._dependency_id = 'id'
         job.save()
+
+        self.assertEqual(registry.get_job_ids(), [])
         job.register_dependency()
         self.assertEqual(as_text(self.testconn.spop('rq:job:id:dependents')), job.id)
+        self.assertEqual(registry.get_job_ids(), [job.id])

     def test_cancel(self):
         """job.cancel() deletes itself & dependents mapping from Redis."""
         queue = Queue(connection=self.testconn)
-        job = queue.enqueue(say_hello)
-        job2 = Job.create(func=say_hello, depends_on=job)
+        job = queue.enqueue(fixtures.say_hello)
+        job2 = Job.create(func=fixtures.say_hello, depends_on=job)
         job2.register_dependency()
         job.cancel()
         self.assertFalse(self.testconn.exists(job.key))
@@ -343,8 +393,30 @@ class TestJob(RQTestCase):
     def test_create_job_with_id(self):
         """test creating jobs with a custom ID"""
         queue = Queue(connection=self.testconn)
-        job = queue.enqueue(say_hello, job_id="1234")
+        job = queue.enqueue(fixtures.say_hello, job_id="1234")

         self.assertEqual(job.id, "1234")
         job.perform()

-        self.assertRaises(TypeError, queue.enqueue, say_hello, job_id=1234)
+        self.assertRaises(TypeError, queue.enqueue, fixtures.say_hello, job_id=1234)
+
+    def test_get_call_string_unicode(self):
+        """test call string with unicode keyword arguments"""
+        queue = Queue(connection=self.testconn)
+        job = queue.enqueue(fixtures.echo, arg_with_unicode=fixtures.UnicodeStringObject())
+
+        self.assertIsNotNone(job.get_call_string())
+        job.perform()
+
+    def test_create_job_with_ttl_should_have_ttl_after_enqueued(self):
+        """test creating jobs with ttl and checks if get_jobs returns it properly [issue502]"""
+        queue = Queue(connection=self.testconn)
+        queue.enqueue(fixtures.say_hello, job_id="1234", ttl=10)
+        job = queue.get_jobs()[0]
+        self.assertEqual(job.ttl, 10)
+
+    def test_create_job_with_ttl_should_expire(self):
+        """test if a job created with ttl expires [issue502]"""
+        queue = Queue(connection=self.testconn)
+        queue.enqueue(fixtures.say_hello, job_id="1234", ttl=1)
+        time.sleep(1)
+        self.assertEqual(0, len(queue.get_jobs()))
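
The two TTL notions these tests distinguish, side by side (a sketch; the connection details are illustrative):

    from redis import StrictRedis
    from rq import Queue
    from tests import fixtures

    q = Queue(connection=StrictRedis())
    job = q.enqueue(fixtures.say_hello, ttl=10, result_ttl=500)
    assert job.get_ttl() == 10          # how long the job itself may live
    assert job.get_result_ttl() == 500  # how long its result is kept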

@@ -2,15 +2,16 @@
 from __future__ import (absolute_import, division, print_function,
                         unicode_literals)

-from rq import get_failed_queue, Queue
-from rq.exceptions import InvalidJobOperationError
-from rq.job import Job, Status
-from rq.worker import Worker
-
 from tests import RQTestCase
 from tests.fixtures import (div_by_zero, echo, Number, say_hello,
                             some_calculation)

+from rq import get_failed_queue, Queue
+from rq.exceptions import InvalidJobOperationError
+from rq.job import Job, JobStatus
+from rq.registry import DeferredJobRegistry
+from rq.worker import Worker
+

 class CustomJob(Job):
     pass
@@ -117,6 +118,7 @@ class TestQueue(RQTestCase):
         # say_hello spec holds which queue this is sent to
         job = q.enqueue(say_hello, 'Nick', foo='bar')
         job_id = job.id
+        self.assertEqual(job.origin, q.name)

         # Inspect data inside Redis
         q_key = 'rq:queue:default'
@@ -131,14 +133,12 @@ class TestQueue(RQTestCase):
         job = Job.create(func=say_hello, args=('Nick',), kwargs=dict(foo='bar'))

         # Preconditions
-        self.assertIsNone(job.origin)
         self.assertIsNone(job.enqueued_at)

         # Action
         q.enqueue_job(job)

         # Postconditions
-        self.assertEquals(job.origin, q.name)
         self.assertIsNotNone(job.enqueued_at)

     def test_pop_job_id(self):
@@ -173,6 +173,14 @@ class TestQueue(RQTestCase):
         # ...and assert the queue count when down
         self.assertEquals(q.count, 0)

+    def test_dequeue_deleted_jobs(self):
+        """Dequeueing deleted jobs from queues don't blow the stack."""
+        q = Queue()
+        for _ in range(1, 1000):
+            job = q.enqueue(say_hello)
+            job.delete()
+        q.dequeue()
+
     def test_dequeue_instance_method(self):
         """Dequeueing instance method jobs from queues."""
         q = Queue()
@@ -262,7 +270,7 @@ class TestQueue(RQTestCase):
         """Enqueueing a job sets its status to "queued"."""
         q = Queue()
         job = q.enqueue(say_hello)
-        self.assertEqual(job.get_status(), Status.QUEUED)
+        self.assertEqual(job.get_status(), JobStatus.QUEUED)

     def test_enqueue_explicit_args(self):
         """enqueue() works for both implicit/explicit args."""
@@ -320,57 +328,101 @@ class TestQueue(RQTestCase):
         self.assertEquals(len(Queue.all()), 3)

     def test_enqueue_dependents(self):
-        """Enqueueing the dependent jobs pushes all jobs in the depends set to the queue."""
+        """Enqueueing dependent jobs pushes all jobs in the depends set to the queue
+        and removes them from DeferredJobRegistry."""
         q = Queue()
         parent_job = Job.create(func=say_hello)
         parent_job.save()
-        job_1 = Job.create(func=say_hello, depends_on=parent_job)
-        job_1.save()
-        job_1.register_dependency()
-        job_2 = Job.create(func=say_hello, depends_on=parent_job)
-        job_2.save()
-        job_2.register_dependency()
+        job_1 = q.enqueue(say_hello, depends_on=parent_job)
+        job_2 = q.enqueue(say_hello, depends_on=parent_job)
+
+        registry = DeferredJobRegistry(q.name, connection=self.testconn)
+        self.assertEqual(
+            set(registry.get_job_ids()),
+            set([job_1.id, job_2.id])
+        )

         # After dependents is enqueued, job_1 and job_2 should be in queue
         self.assertEqual(q.job_ids, [])
         q.enqueue_dependents(parent_job)
-        self.assertEqual(set(q.job_ids), set([job_1.id, job_2.id]))
+        self.assertEqual(set(q.job_ids), set([job_2.id, job_1.id]))
         self.assertFalse(self.testconn.exists(parent_job.dependents_key))

+        # DeferredJobRegistry should also be empty
+        self.assertEqual(registry.get_job_ids(), [])
+
+    def test_enqueue_dependents_on_multiple_queues(self):
+        """Enqueueing dependent jobs on multiple queues pushes jobs in the queues
+        and removes them from DeferredJobRegistry for each different queue."""
+        q_1 = Queue("queue_1")
+        q_2 = Queue("queue_2")
+        parent_job = Job.create(func=say_hello)
+        parent_job.save()
+        job_1 = q_1.enqueue(say_hello, depends_on=parent_job)
+        job_2 = q_2.enqueue(say_hello, depends_on=parent_job)
+
+        # Each queue has its own DeferredJobRegistry
+        registry_1 = DeferredJobRegistry(q_1.name, connection=self.testconn)
+        self.assertEqual(
+            set(registry_1.get_job_ids()),
+            set([job_1.id])
+        )
+        registry_2 = DeferredJobRegistry(q_2.name, connection=self.testconn)
+        self.assertEqual(
+            set(registry_2.get_job_ids()),
+            set([job_2.id])
+        )
+
+        # After dependents is enqueued, job_1 should be in queue_1 and
+        # job_2 should be in queue_2
+        self.assertEqual(q_1.job_ids, [])
+        self.assertEqual(q_2.job_ids, [])
+        q_1.enqueue_dependents(parent_job)
+        q_2.enqueue_dependents(parent_job)
+        self.assertEqual(set(q_1.job_ids), set([job_1.id]))
+        self.assertEqual(set(q_2.job_ids), set([job_2.id]))
+        self.assertFalse(self.testconn.exists(parent_job.dependents_key))
+
+        # DeferredJobRegistry should also be empty
+        self.assertEqual(registry_1.get_job_ids(), [])
+        self.assertEqual(registry_2.get_job_ids(), [])
+
     def test_enqueue_job_with_dependency(self):
         """Jobs are enqueued only when their dependencies are finished."""
         # Job with unfinished dependency is not immediately enqueued
         parent_job = Job.create(func=say_hello)
         q = Queue()
-        q.enqueue_call(say_hello, depends_on=parent_job)
+        job = q.enqueue_call(say_hello, depends_on=parent_job)
         self.assertEqual(q.job_ids, [])
+        self.assertEqual(job.get_status(), JobStatus.DEFERRED)

         # Jobs dependent on finished jobs are immediately enqueued
-        parent_job.set_status(Status.FINISHED)
+        parent_job.set_status(JobStatus.FINISHED)
         parent_job.save()
         job = q.enqueue_call(say_hello, depends_on=parent_job)
         self.assertEqual(q.job_ids, [job.id])
         self.assertEqual(job.timeout, Queue.DEFAULT_TIMEOUT)
+        self.assertEqual(job.get_status(), JobStatus.QUEUED)

     def test_enqueue_job_with_dependency_by_id(self):
-        """Enqueueing jobs should work as expected by id as well as job-objects."""
+        """Can specify job dependency with job object or job id."""
         parent_job = Job.create(func=say_hello)
+        # We need to save the job for the ID to exist in redis
+        parent_job.save()
         q = Queue()
         q.enqueue_call(say_hello, depends_on=parent_job.id)
         self.assertEqual(q.job_ids, [])

         # Jobs dependent on finished jobs are immediately enqueued
-        parent_job.set_status(Status.FINISHED)
+        parent_job.set_status(JobStatus.FINISHED)
         parent_job.save()
         job = q.enqueue_call(say_hello, depends_on=parent_job.id)
         self.assertEqual(q.job_ids, [job.id])
         self.assertEqual(job.timeout, Queue.DEFAULT_TIMEOUT)

     def test_enqueue_job_with_dependency_and_timeout(self):
-        """Jobs still know their specified timeout after being scheduled as a dependency."""
+        """Jobs remember their timeout when enqueued as a dependency."""
         # Job with unfinished dependency is not immediately enqueued
         parent_job = Job.create(func=say_hello)
         q = Queue()
@@ -379,7 +431,7 @@ class TestQueue(RQTestCase):
         self.assertEqual(job.timeout, 123)

         # Jobs dependent on finished jobs are immediately enqueued
-        parent_job.set_status(Status.FINISHED)
+        parent_job.set_status(JobStatus.FINISHED)
         parent_job.save()
         job = q.enqueue_call(say_hello, depends_on=parent_job, timeout=123)
         self.assertEqual(q.job_ids, [job.id])
@@ -441,7 +493,7 @@ class TestFailedQueue(RQTestCase):
         get_failed_queue().requeue(job.id)

         job = Job.fetch(job.id)
-        self.assertEqual(job.get_status(), Status.QUEUED)
+        self.assertEqual(job.get_status(), JobStatus.QUEUED)

     def test_enqueue_preserves_result_ttl(self):
         """Enqueueing persists result_ttl."""
@@ -452,12 +504,23 @@ class TestFailedQueue(RQTestCase):
         self.assertEqual(int(job_from_queue.result_ttl), 10)

     def test_async_false(self):
-        """Executes a job immediately if async=False."""
+        """Job executes and is cleaned up immediately if async=False."""
         q = Queue(async=False)
         job = q.enqueue(some_calculation, args=(2, 3))
         self.assertEqual(job.return_value, 6)
+        self.assertNotEqual(self.testconn.ttl(job.key), -1)

     def test_custom_job_class(self):
         """Ensure custom job class assignment works as expected."""
         q = Queue(job_class=CustomJob)
         self.assertEqual(q.job_class, CustomJob)
+
+    def test_skip_queue(self):
+        """Ensure the skip_queue option functions"""
+        q = Queue('foo')
+        job1 = q.enqueue(say_hello)
+        job2 = q.enqueue(say_hello)
+        assert q.dequeue() == job1
+        skip_job = q.enqueue(say_hello, at_front=True)
+        assert q.dequeue() == skip_job
+        assert q.dequeue() == job2
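
test_skip_queue shows what `at_front=True` buys: the job is pushed onto the head of the queue list rather than the tail, so it is dequeued next. A sketch:

    from redis import StrictRedis
    from rq import Queue
    from tests.fixtures import say_hello

    q = Queue('foo', connection=StrictRedis())
    q.enqueue(say_hello)
    urgent = q.enqueue(say_hello, at_front=True)
    assert q.job_ids[0] == urgent.id  # next in line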
@@ -1,11 +1,13 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import

from rq.compat import as_text
from rq.job import Job, JobStatus
from rq.queue import FailedQueue, Queue
from rq.utils import current_timestamp
from rq.worker import Worker
from rq.registry import (clean_registries, DeferredJobRegistry,
                         FinishedJobRegistry, StartedJobRegistry)

from tests import RQTestCase
from tests.fixtures import div_by_zero, say_hello
@@ -27,6 +29,10 @@ class TestRegistry(RQTestCase):
        self.assertLess(self.testconn.zscore(self.registry.key, job.id),
                        timestamp + 1002)

        # Ensure that a timeout of -1 results in a score of -1
        self.registry.add(job, -1)
        self.assertEqual(self.testconn.zscore(self.registry.key, job.id), -1)
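        # Hedged sketch: given the assertions above, registry.add(job, ttl)
        # plausibly scores members by absolute expiry time, treating a
        # negative ttl as "never expires", roughly:
        #
        #     score = ttl if ttl < 0 else current_timestamp() + ttl
        #     connection.zadd(registry.key, score, job.id)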
        # Ensure that job is properly removed from sorted set
        self.registry.remove(job)
        self.assertIsNone(self.testconn.zscore(self.registry.key, job.id))

@@ -44,17 +50,31 @@ class TestRegistry(RQTestCase):
        self.testconn.zadd(self.registry.key, 1, 'foo')
        self.testconn.zadd(self.registry.key, timestamp + 10, 'bar')
        self.testconn.zadd(self.registry.key, timestamp + 30, 'baz')

        self.assertEqual(self.registry.get_expired_job_ids(), ['foo'])
        self.assertEqual(self.registry.get_expired_job_ids(timestamp + 20),
                         ['foo', 'bar'])
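        # Hedged guess at the mechanics: get_expired_job_ids(timestamp=None)
        # appears to behave like a ZRANGEBYSCORE up to the cutoff, which
        # would also explain why a score of -1 (no expiry) can never match:
        #
        #     connection.zrangebyscore(registry.key, 0,
        #                              timestamp or current_timestamp())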
    def test_cleanup(self):
        """Moving expired jobs to FailedQueue."""
        failed_queue = FailedQueue(connection=self.testconn)
        self.assertTrue(failed_queue.is_empty())

        queue = Queue(connection=self.testconn)
        job = queue.enqueue(say_hello)

        self.testconn.zadd(self.registry.key, 2, job.id)

        self.registry.cleanup(1)
        self.assertNotIn(job.id, failed_queue.job_ids)
        self.assertEqual(self.testconn.zscore(self.registry.key, job.id), 2)

        self.registry.cleanup()
        self.assertIn(job.id, failed_queue.job_ids)
        self.assertEqual(self.testconn.zscore(self.registry.key, job.id), None)
        job.refresh()
        self.assertEqual(job.status, JobStatus.FAILED)
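        # In summary, cleanup(timestamp=None) only moves jobs whose score is
        # at or below the cutoff: they are pushed onto FailedQueue, marked
        # FAILED, and removed from the registry; later-scoring jobs survive.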
    def test_job_execution(self):
        """Job is removed from StartedJobRegistry after execution."""

@@ -87,6 +107,21 @@ class TestRegistry(RQTestCase):
        self.assertEqual(self.registry.count, 2)
        self.assertEqual(len(self.registry), 2)
    def test_clean_registries(self):
        """clean_registries() cleans Started and Finished job registries."""
        queue = Queue(connection=self.testconn)

        finished_job_registry = FinishedJobRegistry(connection=self.testconn)
        self.testconn.zadd(finished_job_registry.key, 1, 'foo')

        started_job_registry = StartedJobRegistry(connection=self.testconn)
        self.testconn.zadd(started_job_registry.key, 1, 'foo')

        clean_registries(queue)
        self.assertEqual(self.testconn.zcard(finished_job_registry.key), 0)
        self.assertEqual(self.testconn.zcard(started_job_registry.key), 0)
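    # Hedged sketch: clean_registries(queue) presumably just runs cleanup()
    # on each registry kept for that queue's name, something like:
    #
    #     def clean_registries(queue):
    #         for registry_class in (FinishedJobRegistry, StartedJobRegistry):
    #             registry_class(name=queue.name,
    #                            connection=queue.connection).cleanup()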
class TestFinishedJobRegistry(RQTestCase):

@@ -99,9 +134,13 @@ class TestFinishedJobRegistry(RQTestCase):
        timestamp = current_timestamp()

        self.testconn.zadd(self.registry.key, 1, 'foo')
        self.testconn.zadd(self.registry.key, timestamp + 10, 'bar')
        self.testconn.zadd(self.registry.key, timestamp + 30, 'baz')

        self.registry.cleanup()
        self.assertEqual(self.registry.get_job_ids(), ['bar', 'baz'])

        self.registry.cleanup(timestamp + 20)
        self.assertEqual(self.registry.get_job_ids(), ['baz'])

    def test_jobs_are_put_in_registry(self):
        """Completed jobs are added to FinishedJobRegistry."""

@@ -118,3 +157,18 @@ class TestFinishedJobRegistry(RQTestCase):
        failed_job = queue.enqueue(div_by_zero)
        worker.perform_job(failed_job)
        self.assertEqual(self.registry.get_job_ids(), [job.id])
class TestDeferredRegistry(RQTestCase):

    def setUp(self):
        super(TestDeferredRegistry, self).setUp()
        self.registry = DeferredJobRegistry(connection=self.testconn)

    def test_add(self):
        """Adding a job to DeferredJobRegistry."""
        job = Job()
        self.registry.add(job)
        job_ids = [as_text(job_id) for job_id in
                   self.testconn.zrange(self.registry.key, 0, -1)]
        self.assertEqual(job_ids, [job.id])
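    # Context for this registry: jobs enqueued with unmet dependencies are
    # not placed on a queue but parked with a deferred status; this registry
    # is where they wait, and test_add above checks that add() records the
    # job id in its backing sorted set.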
@@ -3,17 +3,24 @@ from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

import os
from datetime import timedelta
from time import sleep
import signal
import time
from multiprocessing import Process

from tests import RQTestCase, slow
from tests.fixtures import (create_file, create_file_after_timeout,
                            div_by_zero, do_nothing, say_hello, say_pid)
from tests.helpers import strip_microseconds

from rq import get_failed_queue, Queue, SimpleWorker, Worker
from rq.compat import as_text
from rq.job import Job, JobStatus
from rq.registry import StartedJobRegistry
from rq.suspension import resume, suspend
from rq.utils import utcnow
class CustomJob(Job):
    pass

@@ -21,10 +28,35 @@ class CustomJob(Job):
class TestWorker(RQTestCase):

    def test_create_worker(self):
        """Worker creation using various inputs."""
        # With single string argument
        w = Worker('foo')
        self.assertEquals(w.queues[0].name, 'foo')

        # With list of strings
        w = Worker(['foo', 'bar'])
        self.assertEquals(w.queues[0].name, 'foo')
        self.assertEquals(w.queues[1].name, 'bar')

        # With iterable of strings
        w = Worker(iter(['foo', 'bar']))
        self.assertEquals(w.queues[0].name, 'foo')
        self.assertEquals(w.queues[1].name, 'bar')

        # With single Queue
        w = Worker(Queue('foo'))
        self.assertEquals(w.queues[0].name, 'foo')

        # With iterable of Queues
        w = Worker(iter([Queue('foo'), Queue('bar')]))
        self.assertEquals(w.queues[0].name, 'foo')
        self.assertEquals(w.queues[1].name, 'bar')

        # With list of Queues
        w = Worker([Queue('foo'), Queue('bar')])
        self.assertEquals(w.queues[0].name, 'foo')
        self.assertEquals(w.queues[1].name, 'bar')
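    # Hedged sketch of the normalisation these assertions imply; the helper
    # name is illustrative, not necessarily what Worker.__init__ uses:
    #
    #     def normalize_queues(queues):
    #         if isinstance(queues, (str, Queue)):
    #             queues = [queues]
    #         return [Queue(q) if isinstance(q, str) else q for q in queues]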
    def test_work_and_quit(self):
        """Worker processes work, then quits."""

@@ -133,7 +165,7 @@ class TestWorker(RQTestCase):
        job = q.enqueue(div_by_zero)
        self.assertEquals(q.count, 1)

        w = Worker([q], exception_handlers=black_hole)
        w.work(burst=True)  # should silently pass

        # Postconditions
@@ -222,14 +254,14 @@ class TestWorker(RQTestCase):
        w = Worker([q])

        job = q.enqueue(say_hello)
        self.assertEqual(job.get_status(), JobStatus.QUEUED)
        self.assertEqual(job.is_queued, True)
        self.assertEqual(job.is_finished, False)
        self.assertEqual(job.is_failed, False)

        w.work(burst=True)
        job = Job.fetch(job.id)
        self.assertEqual(job.get_status(), JobStatus.FINISHED)
        self.assertEqual(job.is_queued, False)
        self.assertEqual(job.is_finished, True)
        self.assertEqual(job.is_failed, False)

@@ -238,7 +270,7 @@ class TestWorker(RQTestCase):
        job = q.enqueue(div_by_zero, args=(1,))
        w.work(burst=True)
        job = Job.fetch(job.id)
        self.assertEqual(job.get_status(), JobStatus.FAILED)
        self.assertEqual(job.is_queued, False)
        self.assertEqual(job.is_finished, False)
        self.assertEqual(job.is_failed, True)

@@ -251,13 +283,13 @@ class TestWorker(RQTestCase):
        job = q.enqueue_call(say_hello, depends_on=parent_job)
        w.work(burst=True)
        job = Job.fetch(job.id)
        self.assertEqual(job.get_status(), JobStatus.FINISHED)

        parent_job = q.enqueue(div_by_zero)
        job = q.enqueue_call(say_hello, depends_on=parent_job)
        w.work(burst=True)
        job = Job.fetch(job.id)
        self.assertNotEqual(job.get_status(), JobStatus.FINISHED)

    def test_get_current_job(self):
        """Ensure worker.get_current_job() works properly"""

@@ -318,3 +350,195 @@ class TestWorker(RQTestCase):
                          'Expected at least some work done.')
        self.assertEquals(job.result, 'Hi there, Adam!')
        self.assertEquals(job.description, '你好 世界!')
    def test_suspend_worker_execution(self):
        """Suspended worker does not pick up jobs until resumed."""
        SENTINEL_FILE = '/tmp/rq-tests.txt'

        try:
            # Remove the sentinel if it is leftover from a previous test run
            os.remove(SENTINEL_FILE)
        except OSError as e:
            if e.errno != 2:
                raise

        q = Queue()
        q.enqueue(create_file, SENTINEL_FILE)

        w = Worker([q])
        suspend(self.testconn)
        w.work(burst=True)
        assert q.count == 1

        # Should not have created evidence of execution
        self.assertEquals(os.path.exists(SENTINEL_FILE), False)

        resume(self.testconn)
        w.work(burst=True)
        assert q.count == 0
        self.assertEquals(os.path.exists(SENTINEL_FILE), True)
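    # The suspension API exercised above, in short: suspend() appears to set
    # a shared flag in Redis that every worker checks before dequeueing, and
    # resume() clears it, which is why the calls work across processes:
    #
    #     from rq.suspension import suspend, resume
    #     suspend(connection)   # workers stop picking up new jobs
    #     resume(connection)    # workers pick up jobs again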
    def test_suspend_with_duration(self):
        q = Queue()
        for _ in range(5):
            q.enqueue(do_nothing)

        w = Worker([q])

        # This suspends the workers for 2 seconds
        suspend(self.testconn, 2)

        # So this burst of work should leave the queue at 5
        w.work(burst=True)
        assert q.count == 5

        sleep(3)

        # The suspension should have expired by now, so a burst of work
        # should clear the queue
        w.work(burst=True)
        assert q.count == 0
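    # Hedged guess at the mechanics: suspend(connection, 2) presumably gives
    # the suspension flag a 2-second TTL in Redis (along the lines of
    # SET key 1 EX 2), so the suspension lapses on its own and no explicit
    # resume() is needed here.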
    def test_worker_hash_(self):
        """Workers are hashed by their .name attribute"""
        q = Queue('foo')
        w1 = Worker([q], name="worker1")
        w2 = Worker([q], name="worker2")
        w3 = Worker([q], name="worker1")
        worker_set = set([w1, w2, w3])
        self.assertEquals(len(worker_set), 2)

    def test_worker_sets_birth(self):
        """Ensure worker correctly sets worker birth date."""
        q = Queue()
        w = Worker([q])

        w.register_birth()
        birth_date = w.birth_date
        self.assertIsNotNone(birth_date)
        self.assertEquals(type(birth_date).__name__, 'datetime')

    def test_worker_sets_death(self):
        """Ensure worker correctly sets worker death date."""
        q = Queue()
        w = Worker([q])

        w.register_death()
        death_date = w.death_date
        self.assertIsNotNone(death_date)
        self.assertEquals(type(death_date).__name__, 'datetime')
    def test_clean_queue_registries(self):
        """worker.clean_registries sets last_cleaned_at and cleans registries."""
        foo_queue = Queue('foo', connection=self.testconn)
        foo_registry = StartedJobRegistry('foo', connection=self.testconn)
        self.testconn.zadd(foo_registry.key, 1, 'foo')
        self.assertEqual(self.testconn.zcard(foo_registry.key), 1)

        bar_queue = Queue('bar', connection=self.testconn)
        bar_registry = StartedJobRegistry('bar', connection=self.testconn)
        self.testconn.zadd(bar_registry.key, 1, 'bar')
        self.assertEqual(self.testconn.zcard(bar_registry.key), 1)

        worker = Worker([foo_queue, bar_queue])
        self.assertEqual(worker.last_cleaned_at, None)
        worker.clean_registries()
        self.assertNotEqual(worker.last_cleaned_at, None)
        self.assertEqual(self.testconn.zcard(foo_registry.key), 0)
        self.assertEqual(self.testconn.zcard(bar_registry.key), 0)
    def test_should_run_maintenance_tasks(self):
        """Workers should run maintenance tasks on startup and every hour."""
        queue = Queue(connection=self.testconn)
        worker = Worker(queue)
        self.assertTrue(worker.should_run_maintenance_tasks)

        worker.last_cleaned_at = utcnow()
        self.assertFalse(worker.should_run_maintenance_tasks)
        worker.last_cleaned_at = utcnow() - timedelta(seconds=3700)
        self.assertTrue(worker.should_run_maintenance_tasks)
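    # Hedged sketch of the property these assertions pin down (never cleaned
    # means "run now"; otherwise run once more than an hour has passed):
    #
    #     @property
    #     def should_run_maintenance_tasks(self):
    #         if self.last_cleaned_at is None:
    #             return True
    #         return (utcnow() - self.last_cleaned_at) > timedelta(hours=1)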
    def test_worker_calls_clean_registries(self):
        """Worker calls clean_registries when run."""
        queue = Queue(connection=self.testconn)
        registry = StartedJobRegistry(connection=self.testconn)
        self.testconn.zadd(registry.key, 1, 'foo')

        worker = Worker(queue, connection=self.testconn)
        worker.work(burst=True)
        self.assertEqual(self.testconn.zcard(registry.key), 0)


def kill_worker(pid, double_kill):
    # Wait for the worker to start up in the main process
    time.sleep(0.5)
    os.kill(pid, signal.SIGTERM)
    if double_kill:
        # Give the worker time to switch its signal handler
        time.sleep(0.5)
        os.kill(pid, signal.SIGTERM)
class TestWorkerShutdown(RQTestCase):
    def setUp(self):
        # We want the tests to fail if signals are ignored and the worker
        # keeps running, so set an alarm to kill it after 5 seconds
        signal.signal(signal.SIGALRM, self._timeout)
        signal.alarm(5)

    def _timeout(self, signal, frame):
        raise AssertionError("test still running after 5 seconds, "
                             "likely the worker wasn't shut down correctly")
    @slow
    def test_idle_worker_warm_shutdown(self):
        """Worker with no ongoing job receives a single SIGTERM and shuts down."""
        w = Worker('foo')
        self.assertFalse(w._stop_requested)
        p = Process(target=kill_worker, args=(os.getpid(), False))
        p.start()

        w.work()

        p.join(1)
        self.assertFalse(w._stop_requested)

    @slow
    def test_working_worker_warm_shutdown(self):
        """Worker with an ongoing job receives a single SIGTERM, lets the job finish, then shuts down."""
        fooq = Queue('foo')
        w = Worker(fooq)

        sentinel_file = '/tmp/.rq_sentinel_warm'
        fooq.enqueue(create_file_after_timeout, sentinel_file, 2)
        self.assertFalse(w._stop_requested)
        p = Process(target=kill_worker, args=(os.getpid(), False))
        p.start()

        w.work()

        p.join(2)
        self.assertTrue(w._stop_requested)
        self.assertTrue(os.path.exists(sentinel_file))

    @slow
    def test_working_worker_cold_shutdown(self):
        """Worker with an ongoing job receives a double SIGTERM and shuts down immediately."""
        fooq = Queue('foo')
        w = Worker(fooq)

        sentinel_file = '/tmp/.rq_sentinel_cold'
        fooq.enqueue(create_file_after_timeout, sentinel_file, 2)
        self.assertFalse(w._stop_requested)
        p = Process(target=kill_worker, args=(os.getpid(), True))
        p.start()

        self.assertRaises(SystemExit, w.work)

        p.join(1)
        self.assertTrue(w._stop_requested)
        self.assertFalse(os.path.exists(sentinel_file))
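    # Taken together, these three tests pin down the shutdown contract: a
    # first SIGTERM requests a warm shutdown (any job in flight is allowed
    # to finish before the worker exits), while a second SIGTERM during the
    # grace period forces a cold shutdown that raises SystemExit and
    # abandons the job, so its sentinel file is never created.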
@@ -1,5 +1,5 @@
[tox]
envlist=py26,py27,py33,py34,py35,pypy,flake8

[testenv]
commands=py.test --cov rq {posargs}