Merge branch 'master' into exception_handling

Conflicts:
	tests/fixtures.py
commit 3c3646cf91 · Selwin Ong · 10 years ago

@ -1,3 +1,4 @@
sudo: false
language: python
services:
- redis

@ -1,3 +1,35 @@
### 0.5.3
(June 3rd, 2015)
- Better API for instantiating Workers. Thanks @RyanMTB!
- Better support for unicode kwargs. Thanks @nealtodd and @brownstein!
- Workers now automatically clean up job registries every hour
- Jobs in `FailedQueue` now have their statuses set properly
- `enqueue_call()` no longer ignores `ttl`. Thanks @mbodock!
- Improved logging. Thanks @trevorprater!
### 0.5.2
(April 14th, 2015)
- Support SSL connection to Redis (requires redis-py>=2.10)
- Fix to prevent deep call stacks with large queues
### 0.5.1
(March 9th, 2015)
- Resolve performance issue when queues contain many jobs
- Restore the ability to specify connection params in config
- Record `birth_date` and `death_date` on Worker
- Add support for SSL URLs in Redis (and `REDIS_SSL` config option)
- Fix encoding issues with non-ASCII characters in function arguments
- Fix Redis transaction management issue with job dependencies
### 0.5.0
(Jan 30th, 2015)

@ -0,0 +1,18 @@
all:
	@grep -Ee '^[a-z].*:' Makefile | cut -d: -f1 | grep -vF all

clean:
	rm -rf build/ dist/

release: clean
	# Check if latest tag is the current head we're releasing
	echo "Latest tag = $$(git tag | sort -nr | head -n1)"
	echo "HEAD SHA = $$(git sha head)"
	echo "Latest tag SHA = $$(git tag | sort -nr | head -n1 | xargs git sha)"
	@test "$$(git sha head)" = "$$(git tag | sort -nr | head -n1 | xargs git sha)"
	make force_release

force_release: clean
	git push --tags
	python setup.py sdist bdist_wheel
	twine upload dist/*

@ -3,7 +3,7 @@ them in the background with workers. It is backed by Redis and it is designed
to have a low barrier to entry. It should be integrated in your web stack
easily.
RQ requires Redis >= 2.6.0.
RQ requires Redis >= 2.7.0.
[![Build status](https://travis-ci.org/nvie/rq.svg?branch=master)](https://secure.travis-ci.org/nvie/rq)
[![Downloads](https://pypip.in/d/rq/badge.svg)](https://pypi.python.org/pypi/rq)

@ -1,2 +1,2 @@
redis==2.7.0
redis>=2.7
click>=3.0.0

@ -7,8 +7,8 @@ import time
from functools import partial
import click
import redis
from redis import StrictRedis
from rq import Queue, Worker
from rq.logutils import setup_loghandlers
from rq.worker import WorkerStatus
@ -31,12 +31,28 @@ def get_redis_from_config(settings):
if settings.get('REDIS_URL') is not None:
return StrictRedis.from_url(settings['REDIS_URL'])
return StrictRedis(
host=settings.get('REDIS_HOST', 'localhost'),
port=settings.get('REDIS_PORT', 6379),
db=settings.get('REDIS_DB', 0),
password=settings.get('REDIS_PASSWORD', None),
)
kwargs = {
'host': settings.get('REDIS_HOST', 'localhost'),
'port': settings.get('REDIS_PORT', 6379),
'db': settings.get('REDIS_DB', 0),
'password': settings.get('REDIS_PASSWORD', None),
}
use_ssl = settings.get('REDIS_SSL', False)
if use_ssl:
# SSL support requires redis-py >= 2.10
def safeint(x):
try:
return int(x)
except ValueError:
return 0
version_info = tuple(safeint(x) for x in redis.__version__.split('.'))
if not version_info >= (2, 10):
raise RuntimeError('Using SSL requires a redis-py version >= 2.10')
kwargs['ssl'] = use_ssl
return StrictRedis(**kwargs)
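The version gate above needs a tolerant parse, since redis-py version strings may contain non-numeric components. A standalone sketch of the same check, assuming only that redis-py is importable:

import redis

def safeint(x):
    # Non-numeric version components (e.g. the 'b1' in '2.10.0b1') count as 0
    try:
        return int(x)
    except ValueError:
        return 0

version_info = tuple(safeint(x) for x in redis.__version__.split('.'))
if version_info < (2, 10):
    raise RuntimeError('Using SSL requires a redis-py version >= 2.10')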
def pad(s, pad_to_length):
@ -118,7 +134,7 @@ def show_workers(queues, raw, by_queue):
else:
qs = Queue.all()
ws = Worker.all()
filter_queues = lambda x: x
filter_queues = (lambda x: x)
if not by_queue:
for w in ws:

@ -43,7 +43,7 @@ def use_connection(redis=None):
use of use_connection() and stacked connection contexts.
"""
assert len(_connection_stack) <= 1, \
'You should not mix Connection contexts with use_connection().'
'You should not mix Connection contexts with use_connection()'
release_local(_connection_stack)
if redis is None:
@ -67,7 +67,7 @@ def resolve_connection(connection=None):
connection = get_current_connection()
if connection is None:
raise NoRedisConnectionException('Could not resolve a Redis connection.')
raise NoRedisConnectionException('Could not resolve a Redis connection')
return connection

@ -11,10 +11,6 @@ class InvalidJobOperationError(Exception):
pass
class NoQueueError(Exception):
pass
class UnpickleError(Exception):
def __init__(self, message, raw_data, inner_exception=None):
super(UnpickleError, self).__init__(message, inner_exception)

@ -50,7 +50,7 @@ def unpickle(pickled_string):
try:
obj = loads(pickled_string)
except Exception as e:
raise UnpickleError('Could not unpickle.', pickled_string, e)
raise UnpickleError('Could not unpickle', pickled_string, e)
return obj
@ -99,9 +99,9 @@ class Job(object):
kwargs = {}
if not isinstance(args, (tuple, list)):
raise TypeError('{0!r} is not a valid args list.'.format(args))
raise TypeError('{0!r} is not a valid args list'.format(args))
if not isinstance(kwargs, dict):
raise TypeError('{0!r} is not a valid kwargs dict.'.format(kwargs))
raise TypeError('{0!r} is not a valid kwargs dict'.format(kwargs))
job = cls(connection=connection)
if id is not None:
@ -116,7 +116,7 @@ class Job(object):
job._instance = func.__self__
job._func_name = func.__name__
elif inspect.isfunction(func) or inspect.isbuiltin(func):
job._func_name = '%s.%s' % (func.__module__, func.__name__)
job._func_name = '{0}.{1}'.format(func.__module__, func.__name__)
elif isinstance(func, string_types):
job._func_name = as_text(func)
elif not inspect.isclass(func) and hasattr(func, '__call__'): # a callable class instance
@ -212,7 +212,7 @@ class Job(object):
def data(self):
if self._data is UNEVALUATED:
if self._func_name is UNEVALUATED:
raise ValueError('Cannot build the job data.')
raise ValueError('Cannot build the job data')
if self._instance is UNEVALUATED:
self._instance = None
@ -317,7 +317,7 @@ class Job(object):
self.meta = {}
def __repr__(self): # noqa
return 'Job(%r, enqueued_at=%r)' % (self._id, self.enqueued_at)
return 'Job({0!r}, enqueued_at={1!r})'.format(self._id, self.enqueued_at)
# Data access
def get_id(self): # noqa
@ -331,7 +331,7 @@ class Job(object):
def set_id(self, value):
"""Sets a job ID for the given job."""
if not isinstance(value, string_types):
raise TypeError('id must be a string, not {0}.'.format(type(value)))
raise TypeError('id must be a string, not {0}'.format(type(value)))
self._id = value
id = property(get_id, set_id)
@ -344,7 +344,7 @@ class Job(object):
@classmethod
def dependents_key_for(cls, job_id):
"""The Redis key that is used to store job hash under."""
return 'rq:job:%s:dependents' % (job_id,)
return 'rq:job:{0}:dependents'.format(job_id)
@property
def key(self):
@ -393,7 +393,7 @@ class Job(object):
key = self.key
obj = decode_redis_hash(self.connection.hgetall(key))
if len(obj) == 0:
raise NoSuchJobError('No such job: %s' % (key,))
raise NoSuchJobError('No such job: {0}'.format(key))
def to_date(date_str):
if date_str is None:
@ -417,6 +417,7 @@ class Job(object):
self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa
self._status = as_text(obj.get('status') if obj.get('status') else None)
self._dependency_id = as_text(obj.get('dependency_id', None))
self.ttl = int(obj.get('ttl')) if obj.get('ttl') else None
self.meta = unpickle(obj.get('meta')) if obj.get('meta') else {}
def to_dict(self):
@ -447,6 +448,8 @@ class Job(object):
obj['dependency_id'] = self._dependency_id
if self.meta:
obj['meta'] = dumps(self.meta)
if self.ttl:
obj['ttl'] = self.ttl
return obj
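Together with the refresh() change above, a job's ttl now survives the save/fetch round trip. A sketch of the effect, assuming a reachable local Redis (the fixture path is illustrative; any importable function works):

from redis import StrictRedis
from rq import Queue

queue = Queue(connection=StrictRedis())
queue.enqueue('tests.fixtures.say_hello', ttl=10)  # ttl written by to_dict()
job = queue.get_jobs()[0]                          # ttl read back by refresh()
print(job.ttl)                                     # 10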
@ -456,7 +459,7 @@ class Job(object):
connection = pipeline if pipeline is not None else self.connection
connection.hmset(key, self.to_dict())
self.cleanup(self.ttl)
self.cleanup(self.ttl, pipeline=connection)
def cancel(self):
"""Cancels the given job, which will prevent the job from ever being
@ -485,6 +488,8 @@ class Job(object):
# Job execution
def perform(self): # noqa
"""Invokes the job function with the job arguments."""
self.connection.persist(self.key)
self.ttl = -1
_job_stack.push(self.id)
try:
self._result = self.func(*self.args, **self.kwargs)
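The two lines added at the top of perform() are what keeps a short-lived job from vanishing mid-run: its Redis key is persisted and ttl is forced to -1 (never expire) for the duration. A sketch mirroring test_never_expire_during_execution further down (assumes a local Redis):

from redis import StrictRedis
from rq import Queue

queue = Queue(connection=StrictRedis())
job = queue.enqueue('tests.fixtures.long_running_job', args=(2,), ttl=1)
job.perform()          # persist() + ttl = -1 happen before the function runs
print(job.get_ttl())   # -1: the key outlives its original 1-second ttl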
@ -514,17 +519,14 @@ class Job(object):
if self.func_name is None:
return None
# Python 2/3 compatibility
try:
arg_list = [repr(arg).decode('utf-8') for arg in self.args]
except AttributeError:
arg_list = [repr(arg) for arg in self.args]
arg_list = [as_text(repr(arg)) for arg in self.args]
kwargs = ['{0}={1!r}'.format(k, v) for k, v in self.kwargs.items()]
kwargs = ['{0}={1}'.format(k, as_text(repr(v))) for k, v in self.kwargs.items()]
# Sort, because Python 3.3 and 3.4 order kwargs differently in the call string
arg_list += sorted(kwargs)
args = ', '.join(arg_list)
return '%s(%s)' % (self.func_name, args)
return '{0}({1})'.format(self.func_name, args)
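The as_text(repr(...)) combination is what makes the call string text on both interpreters: on Python 2 it decodes the bytes repr, on Python 3 it passes str straight through. A small illustration:

from rq.compat import as_text

args = (12, u'\u2603')
arg_list = [as_text(repr(arg)) for arg in args]
# A list of text strings on Python 2 and 3 alike, never bytes, so joining
# and logging them cannot raise UnicodeDecodeError.
print(', '.join(arg_list))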
def cleanup(self, ttl=None, pipeline=None):
"""Prepare job for eventual deletion (if needed). This method is usually
@ -563,7 +565,7 @@ class Job(object):
connection.sadd(Job.dependents_key_for(self._dependency_id), self.id)
def __str__(self):
return '<Job %s: %s>' % (self.id, self.description)
return '<Job {0}: {1}>'.format(self.id, self.description)
# Job equality
def __eq__(self, other): # noqa

@ -49,7 +49,7 @@ class Queue(object):
"""
prefix = cls.redis_queue_namespace_prefix
if not queue_key.startswith(prefix):
raise ValueError('Not a valid RQ queue key: %s' % (queue_key,))
raise ValueError('Not a valid RQ queue key: {0}'.format(queue_key))
name = queue_key[len(prefix):]
return cls(name, connection=connection)
@ -58,7 +58,7 @@ class Queue(object):
self.connection = resolve_connection(connection)
prefix = self.redis_queue_namespace_prefix
self.name = name
self._key = '%s%s' % (prefix, name)
self._key = '{0}{1}'.format(prefix, name)
self._default_timeout = default_timeout
self._async = async
@ -70,6 +70,9 @@ class Queue(object):
def __len__(self):
return self.count
def __iter__(self):
yield self
@property
def key(self):
"""Returns the Redis key for this Queue."""
@ -183,7 +186,7 @@ class Queue(object):
job = self.job_class.create(
func, args, kwargs, connection=self.connection,
result_ttl=result_ttl, status=JobStatus.QUEUED,
result_ttl=result_ttl, ttl=ttl, status=JobStatus.QUEUED,
description=description, depends_on=depends_on,
timeout=timeout, id=job_id, origin=self.name)
@ -194,11 +197,12 @@ class Queue(object):
if depends_on is not None:
if not isinstance(depends_on, self.job_class):
depends_on = Job(id=depends_on, connection=self.connection)
with self.connection.pipeline() as pipe:
with self.connection._pipeline() as pipe:
while True:
try:
pipe.watch(depends_on.key)
if depends_on.get_status() != JobStatus.FINISHED:
pipe.multi()
job.set_status(JobStatus.DEFERRED)
job.register_dependency(pipeline=pipe)
job.save(pipeline=pipe)
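The added pipe.multi() matters because after watch() a redis-py pipeline runs in immediate mode: reads execute right away, and buffered transactional writes only start queuing again after an explicit multi(). The generic redis-py pattern the hunk follows, in isolation (key names are illustrative):

from redis import StrictRedis, WatchError

conn = StrictRedis()
with conn.pipeline() as pipe:
    while True:
        try:
            pipe.watch('dependency-key')         # immediate mode from here on
            status = pipe.get('dependency-key')  # executes right away
            pipe.multi()                         # back to buffered mode
            pipe.set('dependent-key', status or b'deferred')  # queued
            pipe.execute()   # atomic; aborts if the watched key changed
            break
        except WatchError:
            continue         # lost the race -- retry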
@ -226,7 +230,7 @@ class Queue(object):
"""
if not isinstance(f, string_types) and f.__module__ == '__main__':
raise ValueError('Functions from the __main__ module cannot be processed '
'by workers.')
'by workers')
# Detect explicit invocations, i.e. of the form:
# q.enqueue(foo, args=(1, 2), kwargs={'a': 1}, timeout=30)
@ -239,7 +243,7 @@ class Queue(object):
at_front = kwargs.pop('at_front', False)
if 'args' in kwargs or 'kwargs' in kwargs:
assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs.' # noqa
assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs' # noqa
args = kwargs.pop('args', None)
kwargs = kwargs.pop('kwargs', None)
@ -310,7 +314,7 @@ class Queue(object):
connection = resolve_connection(connection)
if timeout is not None: # blocking variant
if timeout == 0:
raise ValueError('RQ does not support indefinite timeouts. Please pick a timeout value > 0.')
raise ValueError('RQ does not support indefinite timeouts. Please pick a timeout value > 0')
result = connection.blpop(queue_keys, timeout)
if result is None:
raise DequeueTimeout(timeout, queue_keys)
@ -328,6 +332,7 @@ class Queue(object):
Returns a job_class instance, which can be executed or inspected.
"""
while True:
job_id = self.pop_job_id()
if job_id is None:
return None
@ -335,8 +340,7 @@ class Queue(object):
job = self.job_class.fetch(job_id, connection=self.connection)
except NoSuchJobError as e:
# Silently pass on jobs that don't exist (anymore),
# and continue by reinvoking itself recursively
return self.dequeue()
continue
except UnpickleError as e:
# Attach queue information on the exception for improved error
# reporting
@ -381,22 +385,22 @@ class Queue(object):
# auto-generated by the @total_ordering decorator)
def __eq__(self, other): # noqa
if not isinstance(other, Queue):
raise TypeError('Cannot compare queues to other objects.')
raise TypeError('Cannot compare queues to other objects')
return self.name == other.name
def __lt__(self, other):
if not isinstance(other, Queue):
raise TypeError('Cannot compare queues to other objects.')
raise TypeError('Cannot compare queues to other objects')
return self.name < other.name
def __hash__(self):
return hash(self.name)
def __repr__(self): # noqa
return 'Queue(%r)' % (self.name,)
return 'Queue({0!r})'.format(self.name)
def __str__(self):
return '<Queue \'%s\'>' % (self.name,)
return '<Queue {0!r}>'.format(self.name)
class FailedQueue(Queue):
@ -432,7 +436,7 @@ class FailedQueue(Queue):
# Delete it from the failed queue (raise an error if that failed)
if self.remove(job) == 0:
raise InvalidJobOperationError('Cannot requeue non-failed jobs.')
raise InvalidJobOperationError('Cannot requeue non-failed jobs')
job.set_status(JobStatus.QUEUED)
job.exc_info = None

@ -1,13 +1,16 @@
from .compat import as_text
from .connections import resolve_connection
from .exceptions import NoSuchJobError
from .job import Job, JobStatus
from .queue import FailedQueue
from .utils import current_timestamp
class BaseRegistry(object):
"""
Base implementation of job registry, implemented in Redis sorted set. Each job
is stored as a key in the registry, scored by expiration time (unix timestamp).
Base implementation of a job registry, implemented as a Redis sorted set.
Each job is stored as a key in the registry, scored by expiration time
(unix timestamp).
"""
def __init__(self, name='default', connection=None):
@ -66,7 +69,7 @@ class StartedJobRegistry(BaseRegistry):
def __init__(self, name='default', connection=None):
super(StartedJobRegistry, self).__init__(name, connection)
self.key = 'rq:wip:%s' % name
self.key = 'rq:wip:{0}'.format(name)
def cleanup(self, timestamp=None):
"""Remove expired jobs from registry and add them to FailedQueue.
@ -80,9 +83,17 @@ class StartedJobRegistry(BaseRegistry):
if job_ids:
failed_queue = FailedQueue(connection=self.connection)
with self.connection.pipeline() as pipeline:
for job_id in job_ids:
try:
job = Job.fetch(job_id, connection=self.connection)
job.status = JobStatus.FAILED
job.save(pipeline=pipeline)
failed_queue.push_job_id(job_id, pipeline=pipeline)
except NoSuchJobError:
pass
pipeline.zremrangebyscore(self.key, 0, score)
pipeline.execute()
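With this change an expired started job is not silently dropped: it is marked FAILED, saved, and pushed onto the FailedQueue, all in one pipeline. Hedged usage sketch (assumes a reachable local Redis):

from redis import StrictRedis
from rq.queue import get_failed_queue
from rq.registry import StartedJobRegistry

conn = StrictRedis()
registry = StartedJobRegistry('default', connection=conn)
registry.cleanup()                       # no timestamp: expire everything scored before now
print(get_failed_queue(conn).job_ids)    # expired jobs land here with status FAILED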
@ -97,7 +108,7 @@ class FinishedJobRegistry(BaseRegistry):
def __init__(self, name='default', connection=None):
super(FinishedJobRegistry, self).__init__(name, connection)
self.key = 'rq:finished:%s' % name
self.key = 'rq:finished:{0}'.format(name)
def cleanup(self, timestamp=None):
"""Remove expired jobs from registry.
@ -117,10 +128,18 @@ class DeferredJobRegistry(BaseRegistry):
def __init__(self, name='default', connection=None):
super(DeferredJobRegistry, self).__init__(name, connection)
self.key = 'rq:deferred:%s' % name
self.key = 'rq:deferred:{0}'.format(name)
def cleanup(self):
"""This method is only here to prevent errors because this method is
automatically called by `count()` and `get_job_ids()` methods
implemented in BaseRegistry."""
pass
def clean_registries(queue):
"""Cleans StartedJobRegistry and FinishedJobRegistry of a queue."""
registry = FinishedJobRegistry(name=queue.name, connection=queue.connection)
registry.cleanup()
registry = StartedJobRegistry(name=queue.name, connection=queue.connection)
registry.cleanup()
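A short usage sketch of the new helper, which is exactly what Worker.clean_registries() further down calls once per queue (assumes a local Redis):

from redis import StrictRedis
from rq import Queue
from rq.registry import clean_registries

queue = Queue('default', connection=StrictRedis())
clean_registries(queue)   # runs cleanup() on the Finished and Started registries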

@ -48,7 +48,7 @@ class UnixSignalDeathPenalty(BaseDeathPenalty):
def handle_death_penalty(self, signum, frame):
raise JobTimeoutException('Job exceeded maximum timeout '
'value (%d seconds).' % self._timeout)
'value ({0} seconds)'.format(self._timeout))
def setup_death_penalty(self):
"""Sets up an alarm signal and a signal handler that raises

@ -13,8 +13,9 @@ import datetime
import importlib
import logging
import sys
from collections import Iterable
from .compat import as_text, is_python_version
from .compat import as_text, is_python_version, string_types
class _Colorizer(object):
@ -205,6 +206,19 @@ def first(iterable, default=None, key=None):
return default
def is_nonstring_iterable(obj):
"""Returns whether the obj is an iterable, but not a string"""
return isinstance(obj, Iterable) and not isinstance(obj, string_types)
def ensure_list(obj):
"""
When passed an iterable of objects, returns it unchanged; otherwise,
returns a list containing just that object.
"""
return obj if is_nonstring_iterable(obj) else [obj]
def current_timestamp():
"""Returns current UTC timestamp"""
return calendar.timegm(datetime.datetime.utcnow().utctimetuple())
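These two helpers are what the reworked Worker constructor leans on; their behavior in brief:

from rq.utils import ensure_list

print(ensure_list('high'))           # ['high'] -- a string is wrapped, not iterated
print(ensure_list(['high', 'low']))  # ['high', 'low'] -- other iterables pass through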

@ -2,4 +2,4 @@
from __future__ import (absolute_import, division, print_function,
unicode_literals)
VERSION = '0.5.0'
VERSION = '0.5.3'

@ -12,18 +12,20 @@ import sys
import time
import traceback
import warnings
from datetime import timedelta
from rq.compat import as_text, string_types, text_type
from .connections import get_current_connection
from .exceptions import DequeueTimeout, NoQueueError
from .exceptions import DequeueTimeout
from .job import Job, JobStatus
from .logutils import setup_loghandlers
from .queue import get_failed_queue, Queue
from .registry import FinishedJobRegistry, StartedJobRegistry
from .queue import Queue, get_failed_queue
from .registry import FinishedJobRegistry, StartedJobRegistry, clean_registries
from .suspension import is_suspended
from .timeouts import UnixSignalDeathPenalty
from .utils import enum, import_attribute, make_colorizer, utcformat, utcnow
from .utils import (ensure_list, enum, import_attribute, make_colorizer,
utcformat, utcnow, utcparse)
from .version import VERSION
try:
@ -101,7 +103,7 @@ class Worker(object):
"""
prefix = cls.redis_worker_namespace_prefix
if not worker_key.startswith(prefix):
raise ValueError('Not a valid RQ worker key: %s' % (worker_key,))
raise ValueError('Not a valid RQ worker key: {0}'.format(worker_key))
if connection is None:
connection = get_current_connection()
@ -125,8 +127,9 @@ class Worker(object):
if connection is None:
connection = get_current_connection()
self.connection = connection
if isinstance(queues, self.queue_class):
queues = [queues]
queues = [self.queue_class(name=q) if isinstance(q, text_type) else q
for q in ensure_list(queues)]
self._name = name
self.queues = queues
self.validate_queues()
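All of the following are now equivalent ways to construct a worker, mirroring test_create_worker further down (the bare Connection() context assumes a local Redis):

from rq import Connection, Queue, Worker

with Connection():
    Worker('high')                          # a single queue name
    Worker(['high', 'low'])                 # a list of names
    Worker(iter(['high', 'low']))           # any iterable of names
    Worker(Queue('high'))                   # a single Queue instance
    Worker([Queue('high'), Queue('low')])   # a list of Queues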
@ -143,9 +146,10 @@ class Worker(object):
self._state = 'starting'
self._is_horse = False
self._horse_pid = 0
self._stopped = False
self._stop_requested = False
self.log = logger
self.failed_queue = get_failed_queue(connection=self.connection)
self.last_cleaned_at = None
# By default, push the "move-to-failed-queue" exception handler onto
# the stack
@ -170,11 +174,9 @@ class Worker(object):
def validate_queues(self):
"""Sanity check for the given queues."""
if not iterable(self.queues):
raise ValueError('Argument queues not iterable.')
for queue in self.queues:
if not isinstance(queue, self.queue_class):
raise NoQueueError('Give each worker at least one Queue.')
raise TypeError('{0} is not of type {1} or text type'.format(queue, self.queue_class))
def queue_names(self):
"""Returns the queue names of this worker's queues."""
@ -195,7 +197,7 @@ class Worker(object):
if self._name is None:
hostname = socket.gethostname()
shortname, _, _ = hostname.partition('.')
self._name = '%s.%s' % (shortname, self.pid)
self._name = '{0}.{1}'.format(shortname, self.pid)
return self._name
@property
@ -225,15 +227,15 @@ class Worker(object):
This can be used to make `ps -ef` output more readable.
"""
setprocname('rq: %s' % (message,))
setprocname('rq: {0}'.format(message))
def register_birth(self):
"""Registers its own birth."""
self.log.debug('Registering birth of worker %s' % (self.name,))
self.log.debug('Registering birth of worker {0}'.format(self.name))
if self.connection.exists(self.key) and \
not self.connection.hexists(self.key, 'death'):
raise ValueError('There exists an active worker named \'%s\' '
'already.' % (self.name,))
msg = 'There exists an active worker named {0!r} already'
raise ValueError(msg.format(self.name))
key = self.key
queues = ','.join(self.queue_names())
with self.connection._pipeline() as p:
@ -255,6 +257,20 @@ class Worker(object):
p.expire(self.key, 60)
p.execute()
@property
def birth_date(self):
"""Fetches birth date from Redis."""
birth_timestamp = self.connection.hget(self.key, 'birth')
if birth_timestamp is not None:
return utcparse(as_text(birth_timestamp))
@property
def death_date(self):
"""Fetches death date from Redis."""
death_timestamp = self.connection.hget(self.key, 'death')
if death_timestamp is not None:
return utcparse(as_text(death_timestamp))
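These properties implement the 0.5.1 changelog entry "Record `birth_date` and `death_date` on Worker": the timestamps are read back out of the worker's Redis hash on demand. Sketch (register_birth() writes to Redis, so a reachable server is assumed):

from rq import Connection, Queue, Worker

with Connection():
    w = Worker([Queue('default')])
    w.register_birth()
    print(w.birth_date)   # a datetime parsed via utcparse(); None before birth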
def set_state(self, state, pipeline=None):
self._state = state
connection = pipeline if pipeline is not None else self.connection
@ -302,10 +318,6 @@ class Worker(object):
return self.job_class.fetch(job_id, self.connection)
@property
def stopped(self):
return self._stopped
def _install_signal_handlers(self):
"""Installs signal handlers for handling SIGINT and SIGTERM
gracefully.
@ -314,18 +326,18 @@ class Worker(object):
def request_force_stop(signum, frame):
"""Terminates the application (cold shutdown).
"""
self.log.warning('Cold shut down.')
self.log.warning('Cold shut down')
# Take down the horse with the worker
if self.horse_pid:
msg = 'Taking down horse %d with me.' % self.horse_pid
msg = 'Taking down horse {0} with me'.format(self.horse_pid)
self.log.debug(msg)
try:
os.kill(self.horse_pid, signal.SIGKILL)
except OSError as e:
# ESRCH ("No such process") is fine with us
if e.errno != errno.ESRCH:
self.log.debug('Horse already down.')
self.log.debug('Horse already down')
raise
raise SystemExit()
@ -333,18 +345,18 @@ class Worker(object):
"""Stops the current worker loop but waits for child processes to
end gracefully (warm shutdown).
"""
self.log.debug('Got signal %s.' % signal_name(signum))
self.log.debug('Got signal {0}'.format(signal_name(signum)))
signal.signal(signal.SIGINT, request_force_stop)
signal.signal(signal.SIGTERM, request_force_stop)
msg = 'Warm shut down requested.'
msg = 'Warm shut down requested'
self.log.warning(msg)
# If shutdown is requested in the middle of a job, wait until the
# job finishes before shutting down
if self.get_state() == 'busy':
self._stopped = True
self._stop_requested = True
self.log.debug('Stopping after current horse is finished. '
'Press Ctrl+C again for a cold shutdown.')
else:
@ -359,15 +371,15 @@ class Worker(object):
before_state = None
notified = False
while not self.stopped and is_suspended(self.connection):
while not self._stop_requested and is_suspended(self.connection):
if burst:
self.log.info('Suspended in burst mode -- exiting.'
'Note: There could still be unperformed jobs on the queue')
self.log.info('Suspended in burst mode, exiting')
self.log.info('Note: There could still be unfinished jobs on the queue')
raise StopRequested
if not notified:
self.log.info('Worker suspended, use "rq resume" command to resume')
self.log.info('Worker suspended, run `rq resume` to resume')
before_state = self.get_state()
self.set_state(WorkerStatus.SUSPENDED)
notified = True
@ -390,7 +402,7 @@ class Worker(object):
did_perform_work = False
self.register_birth()
self.log.info('RQ worker started, version %s' % VERSION)
self.log.info("RQ worker {0!r} started, version %s".format(self.key, VERSION))
self.set_state(WorkerStatus.STARTED)
try:
@ -398,14 +410,19 @@ class Worker(object):
try:
self.check_for_suspension(burst)
if self.stopped:
self.log.info('Stopping on request.')
if self.should_run_maintenance_tasks:
self.clean_registries()
if self._stop_requested:
self.log.info('Stopping on request')
break
timeout = None if burst else max(1, self.default_worker_ttl - 60)
result = self.dequeue_job_and_maintain_ttl(timeout)
if result is None:
if burst:
self.log.info("RQ worker {0!r} done, quitting".format(self.key))
break
except StopRequested:
break
@ -429,10 +446,9 @@ class Worker(object):
qnames = self.queue_names()
self.set_state(WorkerStatus.IDLE)
self.procline('Listening on %s' % ','.join(qnames))
self.procline('Listening on {0}'.format(','.join(qnames)))
self.log.info('')
self.log.info('*** Listening on %s...' %
green(', '.join(qnames)))
self.log.info('*** Listening on {0}...'.format(green(', '.join(qnames))))
while True:
self.heartbeat()
@ -442,7 +458,7 @@ class Worker(object):
connection=self.connection)
if result is not None:
job, queue = result
self.log.info('%s: %s (%s)' % (green(queue.name),
self.log.info('{0}: {1} ({2})'.format(green(queue.name),
blue(job.description), job.id))
break
@ -480,7 +496,7 @@ class Worker(object):
self.main_work_horse(job)
else:
self._horse_pid = child_pid
self.procline('Forked %d at %d' % (child_pid, time.time()))
self.procline('Forked {0} at {1}'.format(child_pid, time.time()))
while True:
try:
self.set_state('busy')
@ -535,9 +551,8 @@ class Worker(object):
job.set_status(JobStatus.STARTED, pipeline=pipeline)
pipeline.execute()
self.procline('Processing %s from %s since %s' % (
job.func_name,
job.origin, time.time()))
msg = 'Processing {0} from {1} since {2}'
self.procline(msg.format(job.func_name, job.origin, time.time()))
def perform_job(self, job):
"""Performs the actual work of a job. Will/should only be called
@ -582,14 +597,14 @@ class Worker(object):
if rv is None:
self.log.info('Job OK')
else:
self.log.info('Job OK, result = %s' % (yellow(text_type(rv)),))
self.log.info('Job OK, result = {0!r}'.format(yellow(text_type(rv))))
if result_ttl == 0:
self.log.info('Result discarded immediately.')
self.log.info('Result discarded immediately')
elif result_ttl > 0:
self.log.info('Result is kept for %d seconds.' % result_ttl)
self.log.info('Result is kept for {0} seconds'.format(result_ttl))
else:
self.log.warning('Result will never expire, clean up result key manually.')
self.log.warning('Result will never expire, clean up result key manually')
return True
@ -605,7 +620,7 @@ class Worker(object):
})
for handler in reversed(self._exc_handlers):
self.log.debug('Invoking exception handler %s' % (handler,))
self.log.debug('Invoking exception handler {0}'.format(handler))
fallthrough = handler(job, *exc_info)
# Only handlers with explicit return values should disable further
@ -619,7 +634,7 @@ class Worker(object):
def move_to_failed_queue(self, job, *exc_info):
"""Default exception handler: move the job to the failed queue."""
exc_string = ''.join(traceback.format_exception(*exc_info))
self.log.warning('Moving job to %s queue.' % self.failed_queue.name)
self.log.warning('Moving job to {0!r} queue'.format(self.failed_queue.name))
self.failed_queue.quarantine(job, exc_info=exc_string)
def push_exc_handler(self, handler_func):
@ -640,13 +655,23 @@ class Worker(object):
"""The hash does not take the database/connection into account"""
return hash(self.name)
def clean_registries(self):
"""Runs maintenance jobs on each Queue's registries."""
for queue in self.queues:
clean_registries(queue)
self.last_cleaned_at = utcnow()
class SimpleWorker(Worker):
def _install_signal_handlers(self, *args, **kwargs):
"""Signal handlers are useless for test worker, as it
does not have fork() ability"""
pass
@property
def should_run_maintenance_tasks(self):
"""Maintenance tasks should run on first startup or every hour."""
if self.last_cleaned_at is None:
return True
if (utcnow() - self.last_cleaned_at) > timedelta(hours=1):
return True
return False
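The gate in action, mirroring test_should_run_maintenance_tasks in the test suite below; nothing here issues a Redis command, so the sketch runs without a live server:

from datetime import timedelta
from redis import StrictRedis
from rq import Queue, Worker
from rq.utils import utcnow

conn = StrictRedis()
worker = Worker([Queue('default', connection=conn)], connection=conn)
assert worker.should_run_maintenance_tasks             # never cleaned: run now
worker.last_cleaned_at = utcnow()
assert not worker.should_run_maintenance_tasks         # cleaned just now
worker.last_cleaned_at = utcnow() - timedelta(hours=2)
assert worker.should_run_maintenance_tasks             # over an hour stale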
class SimpleWorker(Worker):
def main_work_horse(self, *args, **kwargs):
raise NotImplementedError("Test worker does not implement this method")

@ -51,13 +51,16 @@ setup(
'rqworker = rq.cli:worker',
],
},
extras_require={
':python_version=="2.6"': ['argparse', 'importlib'],
},
classifiers=[
# As from http://pypi.python.org/pypi?%3Aaction=list_classifiers
#'Development Status :: 1 - Planning',
#'Development Status :: 2 - Pre-Alpha',
#'Development Status :: 3 - Alpha',
'Development Status :: 4 - Beta',
#'Development Status :: 5 - Production/Stable',
#'Development Status :: 4 - Beta',
'Development Status :: 5 - Production/Stable',
#'Development Status :: 6 - Mature',
#'Development Status :: 7 - Inactive',
'Intended Audience :: Developers',

@ -11,6 +11,7 @@ import time
from rq import Connection, get_current_job
from rq.decorators import job
from rq.compat import PY2
def say_pid():
@ -54,8 +55,7 @@ def create_file_after_timeout(path, timeout):
def access_self():
job = get_current_job()
return job.id
assert get_current_job() is not None
def echo(*args, **kwargs):
@ -79,15 +79,25 @@ class CallableObject(object):
return u"I'm callable"
class UnicodeStringObject(object):
def __repr__(self):
if PY2:
return u'é'.encode('utf-8')
else:
return u'é'
with Connection():
@job(queue='default')
def decorated_job(x, y):
return x + y
def long_running_job():
time.sleep(10)
def black_hole(job, *exc_info):
# Don't fall through to default behaviour (moving to failed queue)
return False
def long_running_job(timeout=10):
time.sleep(timeout)
return 'Done sleeping...'

@ -3,18 +3,20 @@ from __future__ import (absolute_import, division, print_function,
unicode_literals)
from datetime import datetime
import time
from tests import RQTestCase
from tests.fixtures import (access_self, CallableObject, Number, say_hello,
some_calculation)
from tests.helpers import strip_microseconds
from rq.compat import as_text, PY2
from rq.compat import PY2, as_text
from rq.exceptions import NoSuchJobError, UnpickleError
from rq.job import get_current_job, Job
from rq.job import Job, get_current_job
from rq.queue import Queue
from rq.registry import DeferredJobRegistry
from rq.utils import utcformat
from rq.worker import Worker
from . import fixtures
try:
from cPickle import loads, dumps
@ -31,16 +33,16 @@ class TestJob(RQTestCase):
kwargs=dict(snowman="☃", null=None),
)
try:
# Python 2
test_string = u"myfunc(12, u'\\u2603', null=None, snowman=u'\\u2603')".decode('utf-8')
except AttributeError:
if not PY2:
# Python 3
test_string = "myfunc(12, '', null=None, snowman='')"
expected_string = "myfunc(12, '', null=None, snowman='')"
else:
# Python 2
expected_string = u"myfunc(12, u'\\u2603', null=None, snowman=u'\\u2603')".decode('utf-8')
self.assertEquals(
job.description,
test_string,
expected_string,
)
def test_create_empty_job(self):
@ -69,7 +71,7 @@ class TestJob(RQTestCase):
def test_create_typical_job(self):
"""Creation of jobs for function calls."""
job = Job.create(func=some_calculation, args=(3, 4), kwargs=dict(z=2))
job = Job.create(func=fixtures.some_calculation, args=(3, 4), kwargs=dict(z=2))
# Jobs have a random UUID
self.assertIsNotNone(job.id)
@ -78,7 +80,7 @@ class TestJob(RQTestCase):
self.assertIsNone(job.instance)
# Job data is set...
self.assertEquals(job.func, some_calculation)
self.assertEquals(job.func, fixtures.some_calculation)
self.assertEquals(job.args, (3, 4))
self.assertEquals(job.kwargs, {'z': 2})
@ -89,7 +91,7 @@ class TestJob(RQTestCase):
def test_create_instance_method_job(self):
"""Creation of jobs for instance methods."""
n = Number(2)
n = fixtures.Number(2)
job = Job.create(func=n.div, args=(4,))
# Job data is set
@ -102,13 +104,13 @@ class TestJob(RQTestCase):
job = Job.create(func='tests.fixtures.say_hello', args=('World',))
# Job data is set
self.assertEquals(job.func, say_hello)
self.assertEquals(job.func, fixtures.say_hello)
self.assertIsNone(job.instance)
self.assertEquals(job.args, ('World',))
def test_create_job_from_callable_class(self):
"""Creation of jobs using a callable class specifier."""
kallable = CallableObject()
kallable = fixtures.CallableObject()
job = Job.create(func=kallable)
self.assertEquals(job.func, kallable.__call__)
@ -137,7 +139,7 @@ class TestJob(RQTestCase):
def test_save(self): # noqa
"""Storing jobs."""
job = Job.create(func=some_calculation, args=(3, 4), kwargs=dict(z=2))
job = Job.create(func=fixtures.some_calculation, args=(3, 4), kwargs=dict(z=2))
# Saving creates a Redis hash
self.assertEquals(self.testconn.exists(job.key), False)
@ -173,7 +175,7 @@ class TestJob(RQTestCase):
def test_persistence_of_typical_jobs(self):
"""Storing typical jobs."""
job = Job.create(func=some_calculation, args=(3, 4), kwargs=dict(z=2))
job = Job.create(func=fixtures.some_calculation, args=(3, 4), kwargs=dict(z=2))
job.save()
expected_date = strip_microseconds(job.created_at)
@ -189,15 +191,15 @@ class TestJob(RQTestCase):
def test_persistence_of_parent_job(self):
"""Storing jobs with parent job, either instance or key."""
parent_job = Job.create(func=some_calculation)
parent_job = Job.create(func=fixtures.some_calculation)
parent_job.save()
job = Job.create(func=some_calculation, depends_on=parent_job)
job = Job.create(func=fixtures.some_calculation, depends_on=parent_job)
job.save()
stored_job = Job.fetch(job.id)
self.assertEqual(stored_job._dependency_id, parent_job.id)
self.assertEqual(stored_job.dependency, parent_job)
job = Job.create(func=some_calculation, depends_on=parent_job.id)
job = Job.create(func=fixtures.some_calculation, depends_on=parent_job.id)
job.save()
stored_job = Job.fetch(job.id)
self.assertEqual(stored_job._dependency_id, parent_job.id)
@ -205,7 +207,7 @@ class TestJob(RQTestCase):
def test_store_then_fetch(self):
"""Store, then fetch."""
job = Job.create(func=some_calculation, args=(3, 4), kwargs=dict(z=2))
job = Job.create(func=fixtures.some_calculation, args=(3, 4), kwargs=dict(z=2))
job.save()
job2 = Job.fetch(job.id)
@ -224,7 +226,7 @@ class TestJob(RQTestCase):
def test_fetching_unreadable_data(self):
"""Fetching succeeds on unreadable data, but lazy props fail."""
# Set up
job = Job.create(func=some_calculation, args=(3, 4), kwargs=dict(z=2))
job = Job.create(func=fixtures.some_calculation, args=(3, 4), kwargs=dict(z=2))
job.save()
# Just replace the data hkey with some random noise
@ -237,7 +239,7 @@ class TestJob(RQTestCase):
def test_job_is_unimportable(self):
"""Jobs that cannot be imported throw exception on access."""
job = Job.create(func=say_hello, args=('Lionel',))
job = Job.create(func=fixtures.say_hello, args=('Lionel',))
job.save()
# Now slightly modify the job to make it unimportable (this is
@ -253,7 +255,7 @@ class TestJob(RQTestCase):
def test_custom_meta_is_persisted(self):
"""Additional meta data on jobs are stored persisted correctly."""
job = Job.create(func=say_hello, args=('Lionel',))
job = Job.create(func=fixtures.say_hello, args=('Lionel',))
job.meta['foo'] = 'bar'
job.save()
@ -265,25 +267,25 @@ class TestJob(RQTestCase):
def test_result_ttl_is_persisted(self):
"""Ensure that job's result_ttl is set properly"""
job = Job.create(func=say_hello, args=('Lionel',), result_ttl=10)
job = Job.create(func=fixtures.say_hello, args=('Lionel',), result_ttl=10)
job.save()
Job.fetch(job.id, connection=self.testconn)
self.assertEqual(job.result_ttl, 10)
job = Job.create(func=say_hello, args=('Lionel',))
job = Job.create(func=fixtures.say_hello, args=('Lionel',))
job.save()
Job.fetch(job.id, connection=self.testconn)
self.assertEqual(job.result_ttl, None)
def test_description_is_persisted(self):
"""Ensure that job's custom description is set properly"""
job = Job.create(func=say_hello, args=('Lionel',), description='Say hello!')
job = Job.create(func=fixtures.say_hello, args=('Lionel',), description='Say hello!')
job.save()
Job.fetch(job.id, connection=self.testconn)
self.assertEqual(job.description, 'Say hello!')
# Ensure job description is constructed from function call string
job = Job.create(func=say_hello, args=('Lionel',))
job = Job.create(func=fixtures.say_hello, args=('Lionel',))
job.save()
Job.fetch(job.id, connection=self.testconn)
if PY2:
@ -291,35 +293,30 @@ class TestJob(RQTestCase):
else:
self.assertEqual(job.description, "tests.fixtures.say_hello('Lionel')")
def test_job_access_within_job_function(self):
"""The current job is accessible within the job function."""
# Executing the job function from outside of RQ throws an exception
def test_job_access_outside_job_fails(self):
"""The current job is accessible only within a job context."""
self.assertIsNone(get_current_job())
# Executing the job function from within the job works (and in
# this case leads to the job ID being returned)
job = Job.create(func=access_self)
job.save()
id = job.perform()
self.assertEqual(job.id, id)
self.assertEqual(job.func, access_self)
def test_job_access_within_job_function(self):
"""The current job is accessible within the job function."""
q = Queue()
q.enqueue(fixtures.access_self) # access_self calls get_current_job() and asserts
w = Worker([q])
w.work(burst=True)
# Ensure that get_current_job also works from within synchronous jobs
def test_job_access_within_synchronous_job_function(self):
queue = Queue(async=False)
job = queue.enqueue(access_self)
id = job.perform()
self.assertEqual(job.id, id)
self.assertEqual(job.func, access_self)
queue.enqueue(fixtures.access_self)
def test_get_result_ttl(self):
"""Getting job result TTL."""
job_result_ttl = 1
default_ttl = 2
job = Job.create(func=say_hello, result_ttl=job_result_ttl)
job = Job.create(func=fixtures.say_hello, result_ttl=job_result_ttl)
job.save()
self.assertEqual(job.get_result_ttl(default_ttl=default_ttl), job_result_ttl)
self.assertEqual(job.get_result_ttl(), job_result_ttl)
job = Job.create(func=say_hello)
job = Job.create(func=fixtures.say_hello)
job.save()
self.assertEqual(job.get_result_ttl(default_ttl=default_ttl), default_ttl)
self.assertEqual(job.get_result_ttl(), None)
@ -327,16 +324,34 @@ class TestJob(RQTestCase):
def test_get_job_ttl(self):
"""Getting job TTL."""
ttl = 1
job = Job.create(func=say_hello, ttl=ttl)
job = Job.create(func=fixtures.say_hello, ttl=ttl)
job.save()
self.assertEqual(job.get_ttl(), ttl)
job = Job.create(func=say_hello)
job = Job.create(func=fixtures.say_hello)
job.save()
self.assertEqual(job.get_ttl(), None)
def test_ttl_via_enqueue(self):
ttl = 1
queue = Queue(connection=self.testconn)
job = queue.enqueue(fixtures.say_hello, ttl=ttl)
self.assertEqual(job.get_ttl(), ttl)
def test_never_expire_during_execution(self):
"""Test what happens when job expires during execution"""
ttl = 1
queue = Queue(connection=self.testconn)
job = queue.enqueue(fixtures.long_running_job, args=(2,), ttl=ttl)
self.assertEqual(job.get_ttl(), ttl)
job.save()
job.perform()
self.assertEqual(job.get_ttl(), -1)
self.assertTrue(job.exists(job.id))
self.assertEqual(job.result, 'Done sleeping...')
def test_cleanup(self):
"""Test that jobs and results are expired properly."""
job = Job.create(func=say_hello)
job = Job.create(func=fixtures.say_hello)
job.save()
# Jobs with negative TTLs don't expire
@ -356,7 +371,7 @@ class TestJob(RQTestCase):
origin = 'some_queue'
registry = DeferredJobRegistry(origin, self.testconn)
job = Job.create(func=say_hello, origin=origin)
job = Job.create(func=fixtures.say_hello, origin=origin)
job._dependency_id = 'id'
job.save()
@ -368,8 +383,8 @@ class TestJob(RQTestCase):
def test_cancel(self):
"""job.cancel() deletes itself & dependents mapping from Redis."""
queue = Queue(connection=self.testconn)
job = queue.enqueue(say_hello)
job2 = Job.create(func=say_hello, depends_on=job)
job = queue.enqueue(fixtures.say_hello)
job2 = Job.create(func=fixtures.say_hello, depends_on=job)
job2.register_dependency()
job.cancel()
self.assertFalse(self.testconn.exists(job.key))
@ -380,8 +395,30 @@ class TestJob(RQTestCase):
def test_create_job_with_id(self):
"""test creating jobs with a custom ID"""
queue = Queue(connection=self.testconn)
job = queue.enqueue(say_hello, job_id="1234")
job = queue.enqueue(fixtures.say_hello, job_id="1234")
self.assertEqual(job.id, "1234")
job.perform()
self.assertRaises(TypeError, queue.enqueue, say_hello, job_id=1234)
self.assertRaises(TypeError, queue.enqueue, fixtures.say_hello, job_id=1234)
def test_get_call_string_unicode(self):
"""test call string with unicode keyword arguments"""
queue = Queue(connection=self.testconn)
job = queue.enqueue(fixtures.echo, arg_with_unicode=fixtures.UnicodeStringObject())
self.assertIsNotNone(job.get_call_string())
job.perform()
def test_create_job_with_ttl_should_have_ttl_after_enqueued(self):
"""test creating jobs with ttl and checks if get_jobs returns it properly [issue502]"""
queue = Queue(connection=self.testconn)
queue.enqueue(fixtures.say_hello, job_id="1234", ttl=10)
job = queue.get_jobs()[0]
self.assertEqual(job.ttl, 10)
def test_create_job_with_ttl_should_expire(self):
"""test if a job created with ttl expires [issue502]"""
queue = Queue(connection=self.testconn)
queue.enqueue(fixtures.say_hello, job_id="1234", ttl=1)
time.sleep(1)
self.assertEqual(0, len(queue.get_jobs()))

@ -173,6 +173,14 @@ class TestQueue(RQTestCase):
# ...and assert the queue count when down
self.assertEquals(q.count, 0)
def test_dequeue_deleted_jobs(self):
"""Dequeueing deleted jobs from queues don't blow the stack."""
q = Queue()
for _ in range(1, 1000):
job = q.enqueue(say_hello)
job.delete()
q.dequeue()
def test_dequeue_instance_method(self):
"""Dequeueing instance method jobs from queues."""
q = Queue()

@ -2,12 +2,12 @@
from __future__ import absolute_import
from rq.compat import as_text
from rq.job import Job
from rq.job import Job, JobStatus
from rq.queue import FailedQueue, Queue
from rq.utils import current_timestamp
from rq.worker import Worker
from rq.registry import (DeferredJobRegistry, FinishedJobRegistry,
StartedJobRegistry)
from rq.registry import (clean_registries, DeferredJobRegistry,
FinishedJobRegistry, StartedJobRegistry)
from tests import RQTestCase
from tests.fixtures import div_by_zero, say_hello
@ -60,15 +60,21 @@ class TestRegistry(RQTestCase):
"""Moving expired jobs to FailedQueue."""
failed_queue = FailedQueue(connection=self.testconn)
self.assertTrue(failed_queue.is_empty())
self.testconn.zadd(self.registry.key, 2, 'foo')
queue = Queue(connection=self.testconn)
job = queue.enqueue(say_hello)
self.testconn.zadd(self.registry.key, 2, job.id)
self.registry.cleanup(1)
self.assertNotIn('foo', failed_queue.job_ids)
self.assertEqual(self.testconn.zscore(self.registry.key, 'foo'), 2)
self.assertNotIn(job.id, failed_queue.job_ids)
self.assertEqual(self.testconn.zscore(self.registry.key, job.id), 2)
self.registry.cleanup()
self.assertIn('foo', failed_queue.job_ids)
self.assertEqual(self.testconn.zscore(self.registry.key, 'foo'), None)
self.assertIn(job.id, failed_queue.job_ids)
self.assertEqual(self.testconn.zscore(self.registry.key, job.id), None)
job.refresh()
self.assertEqual(job.status, JobStatus.FAILED)
def test_job_execution(self):
"""Job is removed from StartedJobRegistry after execution."""
@ -101,6 +107,21 @@ class TestRegistry(RQTestCase):
self.assertEqual(self.registry.count, 2)
self.assertEqual(len(self.registry), 2)
def test_clean_registries(self):
"""clean_registries() cleans Started and Finished job registries."""
queue = Queue(connection=self.testconn)
finished_job_registry = FinishedJobRegistry(connection=self.testconn)
self.testconn.zadd(finished_job_registry.key, 1, 'foo')
started_job_registry = StartedJobRegistry(connection=self.testconn)
self.testconn.zadd(started_job_registry.key, 1, 'foo')
clean_registries(queue)
self.assertEqual(self.testconn.zcard(finished_job_registry.key), 0)
self.assertEqual(self.testconn.zcard(started_job_registry.key), 0)
class TestFinishedJobRegistry(RQTestCase):

@ -3,6 +3,7 @@ from __future__ import (absolute_import, division, print_function,
unicode_literals)
import os
from datetime import timedelta
from time import sleep
from tests import RQTestCase, slow
@ -15,6 +16,7 @@ from rq.compat import as_text
from rq.job import Job, JobStatus
from rq.registry import StartedJobRegistry
from rq.suspension import resume, suspend
from rq.utils import utcnow
class CustomJob(Job):
@ -23,10 +25,35 @@ class CustomJob(Job):
class TestWorker(RQTestCase):
def test_create_worker(self):
"""Worker creation."""
fooq, barq = Queue('foo'), Queue('bar')
w = Worker([fooq, barq])
self.assertEquals(w.queues, [fooq, barq])
"""Worker creation using various inputs."""
# With single string argument
w = Worker('foo')
self.assertEquals(w.queues[0].name, 'foo')
# With list of strings
w = Worker(['foo', 'bar'])
self.assertEquals(w.queues[0].name, 'foo')
self.assertEquals(w.queues[1].name, 'bar')
# With iterable of strings
w = Worker(iter(['foo', 'bar']))
self.assertEquals(w.queues[0].name, 'foo')
self.assertEquals(w.queues[1].name, 'bar')
# With single Queue
w = Worker(Queue('foo'))
self.assertEquals(w.queues[0].name, 'foo')
# With iterable of Queues
w = Worker(iter([Queue('foo'), Queue('bar')]))
self.assertEquals(w.queues[0].name, 'foo')
self.assertEquals(w.queues[1].name, 'bar')
# With list of Queues
w = Worker([Queue('foo'), Queue('bar')])
self.assertEquals(w.queues[0].name, 'foo')
self.assertEquals(w.queues[1].name, 'bar')
def test_work_and_quit(self):
"""Worker processes work, then quits."""
@ -379,3 +406,65 @@ class TestWorker(RQTestCase):
w3 = Worker([q], name="worker1")
worker_set = set([w1, w2, w3])
self.assertEquals(len(worker_set), 2)
def test_worker_sets_birth(self):
"""Ensure worker correctly sets worker birth date."""
q = Queue()
w = Worker([q])
w.register_birth()
birth_date = w.birth_date
self.assertIsNotNone(birth_date)
self.assertEquals(type(birth_date).__name__, 'datetime')
def test_worker_sets_death(self):
"""Ensure worker correctly sets worker death date."""
q = Queue()
w = Worker([q])
w.register_death()
death_date = w.death_date
self.assertIsNotNone(death_date)
self.assertEquals(type(death_date).__name__, 'datetime')
def test_clean_queue_registries(self):
"""worker.clean_registries sets last_cleaned_at and cleans registries."""
foo_queue = Queue('foo', connection=self.testconn)
foo_registry = StartedJobRegistry('foo', connection=self.testconn)
self.testconn.zadd(foo_registry.key, 1, 'foo')
self.assertEqual(self.testconn.zcard(foo_registry.key), 1)
bar_queue = Queue('bar', connection=self.testconn)
bar_registry = StartedJobRegistry('bar', connection=self.testconn)
self.testconn.zadd(bar_registry.key, 1, 'bar')
self.assertEqual(self.testconn.zcard(bar_registry.key), 1)
worker = Worker([foo_queue, bar_queue])
self.assertEqual(worker.last_cleaned_at, None)
worker.clean_registries()
self.assertNotEqual(worker.last_cleaned_at, None)
self.assertEqual(self.testconn.zcard(foo_registry.key), 0)
self.assertEqual(self.testconn.zcard(bar_registry.key), 0)
def test_should_run_maintenance_tasks(self):
"""Workers should run maintenance tasks on startup and every hour."""
queue = Queue(connection=self.testconn)
worker = Worker(queue)
self.assertTrue(worker.should_run_maintenance_tasks)
worker.last_cleaned_at = utcnow()
self.assertFalse(worker.should_run_maintenance_tasks)
worker.last_cleaned_at = utcnow() - timedelta(seconds=3700)
self.assertTrue(worker.should_run_maintenance_tasks)
def test_worker_calls_clean_registries(self):
"""Worker calls clean_registries when run."""
queue = Queue(connection=self.testconn)
registry = StartedJobRegistry(connection=self.testconn)
self.testconn.zadd(registry.key, 1, 'foo')
worker = Worker(queue, connection=self.testconn)
worker.work(burst=True)
self.assertEqual(self.testconn.zcard(registry.key), 0)
