merge && updates to add multiple exception handlers.

main
Bradley Young 10 years ago
commit d3e4fb567b

@ -1,2 +1,2 @@
redis
click
redis==2.7.0
click>=3.0.0

@ -16,6 +16,7 @@ from rq import Connection, get_failed_queue, Queue
from rq.contrib.legacy import cleanup_ghosts
from rq.exceptions import InvalidJobOperationError
from rq.utils import import_attribute
from rq.suspension import suspend as connection_suspend, resume as connection_resume, is_suspended
from .helpers import (read_config_file, refresh, setup_loghandlers_from_args,
show_both, show_queues, show_workers)
@ -24,8 +25,12 @@ from .helpers import (read_config_file, refresh, setup_loghandlers_from_args,
url_option = click.option('--url', '-u', envvar='RQ_REDIS_URL',
help='URL describing Redis connection details.')
config_option = click.option('--config', '-c', help='Module containing RQ settings.')
def connect(url):
def connect(url, config=None):
settings = read_config_file(config) if config else {}
url = url or settings.get('REDIS_URL')
return StrictRedis.from_url(url or 'redis://localhost:6379/0')
@ -120,7 +125,7 @@ def info(url, path, interval, raw, only_queues, only_workers, by_queue, queues):
@main.command()
@url_option
@click.option('--config', '-c', help='Module containing RQ settings.')
@config_option
@click.option('--burst', '-b', is_flag=True, help='Run in burst mode (quit after all work is done)')
@click.option('--name', '-n', help='Specify a different name')
@click.option('--worker-class', '-w', default='rq.Worker', help='RQ Worker class to use')
@ -158,11 +163,16 @@ def worker(url, config, burst, name, worker_class, job_class, queue_class, path,
cleanup_ghosts(conn)
worker_class = import_attribute(worker_class)
queue_class = import_attribute(queue_class)
exc_handler = []
exception_handlers = []
for h in exception_handler:
exc_handler.append(import_attribute(h))
exception_handlers.append(import_attribute(h))
if is_suspended(conn):
click.secho('RQ is currently suspended, to resume job execution run "rq resume"', fg='red')
sys.exit(1)
try:
queues = [queue_class(queue, connection=conn) for queue in queues]
w = worker_class(queues,
name=name,
@ -170,7 +180,7 @@ def worker(url, config, burst, name, worker_class, job_class, queue_class, path,
default_worker_ttl=worker_ttl,
default_result_ttl=results_ttl,
job_class=job_class,
exc_handler=exc_handler)
exception_handlers=exception_handlers)
# Should we configure Sentry?
if sentry_dsn:
@ -183,3 +193,34 @@ def worker(url, config, burst, name, worker_class, job_class, queue_class, path,
except ConnectionError as e:
print(e)
sys.exit(1)
@main.command()
@url_option
@config_option
@click.option('--duration', help='Seconds you want the workers to be suspended. Default is forever.', type=int)
def suspend(url, config, duration):
"""Suspends all workers, to resume run `rq resume`"""
if duration is not None and duration < 1:
click.echo("Duration must be an integer greater than 1")
sys.exit(1)
connection = connect(url, config)
connection_suspend(connection, duration)
if duration:
msg = """Suspending workers for {0} seconds. No new jobs will be started during that time, but then will
automatically resume""".format(duration)
click.echo(msg)
else:
click.echo("Suspending workers. No new jobs will be started. But current jobs will be completed")
@main.command()
@url_option
@config_option
def resume(url, config):
"""Resumes processing of queues, that where suspended with `rq suspend`"""
connection = connect(url, config)
connection_resume(connection)
click.echo("Resuming workers.")

@ -8,7 +8,9 @@ from functools import partial
import click
from rq import Queue, Worker
from rq.worker import WorkerStatus
from rq.logutils import setup_loghandlers
from rq.suspension import is_suspended
red = partial(click.style, fg='red')
green = partial(click.style, fg='green')
@ -39,8 +41,9 @@ def get_scale(x):
def state_symbol(state):
symbols = {
'busy': red('busy'),
'idle': green('idle'),
WorkerStatus.BUSY: red('busy'),
WorkerStatus.IDLE: green('idle'),
WorkerStatus.SUSPENDED: yellow('suspended'),
}
try:
return symbols[state]

@ -12,7 +12,7 @@ from rq.compat import as_text, decode_redis_hash, string_types, text_type
from .connections import resolve_connection
from .exceptions import NoSuchJobError, UnpickleError
from .local import LocalStack
from .utils import import_attribute, utcformat, utcnow, utcparse
from .utils import import_attribute, utcformat, utcnow, utcparse, enum
try:
import cPickle as pickle
@ -25,16 +25,7 @@ dumps = partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
loads = pickle.loads
def enum(name, *sequential, **named):
values = dict(zip(sequential, range(len(sequential))), **named)
# NOTE: Yes, we *really* want to cast using str() here.
# On Python 2 type() requires a byte string (which is str() on Python 2).
# On Python 3 it does not matter, so we'll use str(), which acts as
# a no-op.
return type(str(name), (), values)
Status = enum('Status',
JobStatus = enum('JobStatus',
QUEUED='queued', FINISHED='finished', FAILED='failed',
STARTED='started')
@ -92,7 +83,7 @@ class Job(object):
# Job construction
@classmethod
def create(cls, func, args=None, kwargs=None, connection=None,
result_ttl=None, status=None, description=None, depends_on=None, timeout=None,
result_ttl=None, ttl=None, status=None, description=None, depends_on=None, timeout=None,
id=None):
"""Creates a new Job instance for the given function, arguments, and
keyword arguments.
@ -131,6 +122,7 @@ class Job(object):
# Extra meta data
job.description = description or job.get_call_string()
job.result_ttl = result_ttl
job.ttl = ttl
job.timeout = timeout
job._status = status
@ -166,19 +158,19 @@ class Job(object):
@property
def is_finished(self):
return self.get_status() == Status.FINISHED
return self.get_status() == JobStatus.FINISHED
@property
def is_queued(self):
return self.get_status() == Status.QUEUED
return self.get_status() == JobStatus.QUEUED
@property
def is_failed(self):
return self.get_status() == Status.FAILED
return self.get_status() == JobStatus.FAILED
@property
def is_started(self):
return self.get_status() == Status.STARTED
return self.get_status() == JobStatus.STARTED
@property
def dependency(self):
@ -311,6 +303,7 @@ class Job(object):
self.exc_info = None
self.timeout = None
self.result_ttl = None
self.ttl = None
self._status = None
self._dependency_id = None
self.meta = {}
@ -455,6 +448,7 @@ class Job(object):
connection = pipeline if pipeline is not None else self.connection
connection.hmset(key, self.to_dict())
self.cleanup(self.ttl)
def cancel(self):
"""Cancels the given job, which will prevent the job from ever being
@ -491,8 +485,15 @@ class Job(object):
return self._result
def get_ttl(self, default_ttl=None):
"""Returns ttl for a job that determines how long a job and its result
will be persisted. In the future, this method will also be responsible
"""Returns ttl for a job that determines how long a job will be
persisted. In the future, this method will also be responsible
for determining ttl for repeated jobs.
"""
return default_ttl if self.ttl is None else self.ttl
def get_result_ttl(self, default_ttl=None):
"""Returns ttl for a job that determines how long a jobs result will
be persisted. In the future, this method will also be responsible
for determining ttl for repeated jobs.
"""
return default_ttl if self.result_ttl is None else self.result_ttl
@ -513,14 +514,16 @@ class Job(object):
def cleanup(self, ttl=None, pipeline=None):
"""Prepare job for eventual deletion (if needed). This method is usually
called after successful execution. How long we persist the job and its
result depends on the value of result_ttl:
- If result_ttl is 0, cleanup the job immediately.
result depends on the value of ttl:
- If ttl is 0, cleanup the job immediately.
- If it's a positive number, set the job to expire in X seconds.
- If result_ttl is negative, don't set an expiry to it (persist
- If ttl is negative, don't set an expiry to it (persist
forever)
"""
if ttl == 0:
self.cancel()
elif not ttl:
return
elif ttl > 0:
connection = pipeline if pipeline is not None else self.connection
connection.expire(self.key, ttl)

@ -5,7 +5,7 @@ from __future__ import (absolute_import, division, print_function,
import uuid
from .connections import resolve_connection
from .job import Job, Status
from .job import Job, JobStatus
from .utils import import_attribute, utcnow
from .exceptions import (DequeueTimeout, InvalidJobOperationError,
@ -149,7 +149,7 @@ class Queue(object):
def compact(self):
"""Removes all "dead" jobs from the queue by cycling through it, while
guarantueeing FIFO semantics.
guaranteeing FIFO semantics.
"""
COMPACT_QUEUE = 'rq:queue:_compact:{0}'.format(uuid.uuid4())
@ -161,14 +161,18 @@ class Queue(object):
if self.job_class.exists(job_id, self.connection):
self.connection.rpush(self.key, job_id)
def push_job_id(self, job_id, pipeline=None):
"""Pushes a job ID on the corresponding Redis queue."""
def push_job_id(self, job_id, pipeline=None, at_front=False):
"""Pushes a job ID on the corresponding Redis queue.
'at_front' allows you to push the job onto the front instead of the back of the queue"""
connection = pipeline if pipeline is not None else self.connection
if at_front:
connection.lpush(self.key, job_id)
else:
connection.rpush(self.key, job_id)
def enqueue_call(self, func, args=None, kwargs=None, timeout=None,
result_ttl=None, description=None, depends_on=None,
job_id=None):
result_ttl=None, ttl=None, description=None,
depends_on=None, job_id=None, at_front=False):
"""Creates a job to represent the delayed function call and enqueues
it.
@ -180,7 +184,7 @@ class Queue(object):
# TODO: job with dependency shouldn't have "queued" as status
job = self.job_class.create(func, args, kwargs, connection=self.connection,
result_ttl=result_ttl, status=Status.QUEUED,
result_ttl=result_ttl, status=JobStatus.QUEUED,
description=description, depends_on=depends_on, timeout=timeout,
id=job_id)
@ -189,11 +193,13 @@ class Queue(object):
# If WatchError is raised in the process, that means something else is
# modifying the dependency. In this case we simply retry
if depends_on is not None:
if not isinstance(depends_on, self.job_class):
depends_on = Job(id=depends_on, connection=self.connection)
with self.connection.pipeline() as pipe:
while True:
try:
pipe.watch(depends_on.key)
if depends_on.get_status() != Status.FINISHED:
if depends_on.get_status() != JobStatus.FINISHED:
job.register_dependency(pipeline=pipe)
job.save(pipeline=pipe)
pipe.execute()
@ -202,7 +208,7 @@ class Queue(object):
except WatchError:
continue
return self.enqueue_job(job)
return self.enqueue_job(job, at_front=at_front)
def enqueue(self, f, *args, **kwargs):
"""Creates a job to represent the delayed function call and enqueues
@ -227,8 +233,10 @@ class Queue(object):
timeout = kwargs.pop('timeout', None)
description = kwargs.pop('description', None)
result_ttl = kwargs.pop('result_ttl', None)
ttl = kwargs.pop('ttl', None)
depends_on = kwargs.pop('depends_on', None)
job_id = kwargs.pop('job_id', None)
at_front = kwargs.pop('at_front', False)
if 'args' in kwargs or 'kwargs' in kwargs:
assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs.' # noqa
@ -236,11 +244,11 @@ class Queue(object):
kwargs = kwargs.pop('kwargs', None)
return self.enqueue_call(func=f, args=args, kwargs=kwargs,
timeout=timeout, result_ttl=result_ttl,
timeout=timeout, result_ttl=result_ttl, ttl=ttl,
description=description, depends_on=depends_on,
job_id=job_id)
job_id=job_id, at_front=at_front)
def enqueue_job(self, job, set_meta_data=True):
def enqueue_job(self, job, set_meta_data=True, at_front=False):
"""Enqueues a job for delayed execution.
If the `set_meta_data` argument is `True` (default), it will update
@ -260,7 +268,7 @@ class Queue(object):
job.save()
if self._async:
self.push_job_id(job.id)
self.push_job_id(job.id, at_front=at_front)
else:
job.perform()
job.save()
@ -388,7 +396,7 @@ class Queue(object):
class FailedQueue(Queue):
def __init__(self, connection=None):
super(FailedQueue, self).__init__(Status.FAILED, connection=connection)
super(FailedQueue, self).__init__(JobStatus.FAILED, connection=connection)
def quarantine(self, job, exc_info):
"""Puts the given Job in quarantine (i.e. put it on the failed
@ -415,7 +423,7 @@ class FailedQueue(Queue):
if self.remove(job) == 0:
raise InvalidJobOperationError('Cannot requeue non-failed jobs.')
job.set_status(Status.QUEUED)
job.set_status(JobStatus.QUEUED)
job.exc_info = None
q = Queue(job.origin, connection=self.connection)
q.enqueue_job(job)

@ -8,9 +8,6 @@ class BaseRegistry(object):
"""
Base implementation of job registry, implemented in Redis sorted set. Each job
is stored as a key in the registry, scored by expiration time (unix timestamp).
Jobs with scores are lower than current time is considered "expired" and
should be cleaned up.
"""
def __init__(self, name='default', connection=None):
@ -27,9 +24,9 @@ class BaseRegistry(object):
self.cleanup()
return self.connection.zcard(self.key)
def add(self, job, timeout, pipeline=None):
"""Adds a job to StartedJobRegistry with expiry time of now + timeout."""
score = current_timestamp() + timeout
def add(self, job, ttl, pipeline=None):
"""Adds a job to a registry with expiry time of now + ttl."""
score = ttl if ttl < 0 else current_timestamp() + ttl
if pipeline is not None:
return pipeline.zadd(self.key, score, job.id)
@ -39,10 +36,16 @@ class BaseRegistry(object):
connection = pipeline if pipeline is not None else self.connection
return connection.zrem(self.key, job.id)
def get_expired_job_ids(self):
"""Returns job ids whose score are less than current timestamp."""
def get_expired_job_ids(self, timestamp=None):
"""Returns job ids whose score are less than current timestamp.
Returns ids for jobs with an expiry time earlier than timestamp,
specified as seconds since the Unix epoch. timestamp defaults to call
time if unspecified.
"""
score = timestamp if timestamp is not None else current_timestamp()
return [as_text(job_id) for job_id in
self.connection.zrangebyscore(self.key, 0, current_timestamp())]
self.connection.zrangebyscore(self.key, 0, score)]
def get_job_ids(self, start=0, end=-1):
"""Returns list of all job ids."""
@ -59,24 +62,28 @@ class StartedJobRegistry(BaseRegistry):
Jobs are added to registry right before they are executed and removed
right after completion (success or failure).
Jobs whose score are lower than current time is considered "expired".
"""
def __init__(self, name='default', connection=None):
super(StartedJobRegistry, self).__init__(name, connection)
self.key = 'rq:wip:%s' % name
def cleanup(self):
"""Remove expired jobs from registry and add them to FailedQueue."""
job_ids = self.get_expired_job_ids()
def cleanup(self, timestamp=None):
"""Remove expired jobs from registry and add them to FailedQueue.
Removes jobs with an expiry time earlier than timestamp, specified as
seconds since the Unix epoch. timestamp defaults to call time if
unspecified. Removed jobs are added to the global failed job queue.
"""
score = timestamp if timestamp is not None else current_timestamp()
job_ids = self.get_expired_job_ids(score)
if job_ids:
failed_queue = FailedQueue(connection=self.connection)
with self.connection.pipeline() as pipeline:
for job_id in job_ids:
failed_queue.push_job_id(job_id, pipeline=pipeline)
pipeline.zremrangebyscore(self.key, 0, current_timestamp())
pipeline.zremrangebyscore(self.key, 0, score)
pipeline.execute()
return job_ids
@ -92,6 +99,12 @@ class FinishedJobRegistry(BaseRegistry):
super(FinishedJobRegistry, self).__init__(name, connection)
self.key = 'rq:finished:%s' % name
def cleanup(self):
"""Remove expired jobs from registry."""
self.connection.zremrangebyscore(self.key, 0, current_timestamp())
def cleanup(self, timestamp=None):
"""Remove expired jobs from registry.
Removes jobs with an expiry time earlier than timestamp, specified as
seconds since the Unix epoch. timestamp defaults to call time if
unspecified.
"""
score = timestamp if timestamp is not None else current_timestamp()
self.connection.zremrangebyscore(self.key, 0, score)

@ -0,0 +1,18 @@
WORKERS_SUSPENDED = 'rq:suspended'
def is_suspended(connection):
return connection.exists(WORKERS_SUSPENDED)
def suspend(connection, ttl=None):
"""ttl = time to live in seconds. Default is no expiration
Note: If you pass in 0 it will invalidate right away
"""
connection.set(WORKERS_SUSPENDED, 1)
if ttl is not None:
connection.expire(WORKERS_SUSPENDED, ttl)
def resume(connection):
return connection.delete(WORKERS_SUSPENDED)

@ -208,3 +208,13 @@ def first(iterable, default=None, key=None):
def current_timestamp():
"""Returns current UTC timestamp"""
return calendar.timegm(datetime.datetime.utcnow().utctimetuple())
def enum(name, *sequential, **named):
values = dict(zip(sequential, range(len(sequential))), **named)
# NOTE: Yes, we *really* want to cast using str() here.
# On Python 2 type() requires a byte string (which is str() on Python 2).
# On Python 3 it does not matter, so we'll use str(), which acts as
# a no-op.
return type(str(name), (), values)

@ -12,18 +12,20 @@ import sys
import time
import traceback
import warnings
from datetime import datetime
from rq.compat import as_text, string_types, text_type
from .connections import get_current_connection
from .exceptions import DequeueTimeout, NoQueueError
from .job import Job, Status
from .job import Job, JobStatus
from .logutils import setup_loghandlers
from .queue import get_failed_queue, Queue
from .timeouts import UnixSignalDeathPenalty
from .utils import import_attribute, make_colorizer, utcformat, utcnow
from .utils import import_attribute, make_colorizer, utcformat, utcnow, enum
from .version import VERSION
from .registry import FinishedJobRegistry, StartedJobRegistry
from .suspension import is_suspended
try:
from procname import setprocname
@ -65,6 +67,15 @@ def signal_name(signum):
return 'SIG_UNKNOWN'
WorkerStatus = enum(
'WorkerStatus',
STARTED='started',
SUSPENDED='suspended',
BUSY='busy',
IDLE='idle'
)
class Worker(object):
redis_worker_namespace_prefix = 'rq:worker:'
redis_workers_keys = 'rq:workers'
@ -162,11 +173,11 @@ class Worker(object):
def queue_names(self):
"""Returns the queue names of this worker's queues."""
return map(lambda q: q.name, self.queues)
return list(map(lambda q: q.name, self.queues))
def queue_keys(self):
"""Returns the Redis keys representing this worker's queues."""
return map(lambda q: q.key, self.queues)
return list(map(lambda q: q.key, self.queues))
@property
def name(self):
@ -337,6 +348,30 @@ class Worker(object):
signal.signal(signal.SIGINT, request_stop)
signal.signal(signal.SIGTERM, request_stop)
def check_for_suspension(self, burst):
"""Check to see if workers have been suspended by `rq suspend`"""
before_state = None
notified = False
while not self.stopped and is_suspended(self.connection):
if burst:
self.log.info('Suspended in burst mode -- exiting.'
'Note: There could still be unperformed jobs on the queue')
raise StopRequested
if not notified:
self.log.info('Worker suspended, use "rq resume" command to resume')
before_state = self.get_state()
self.set_state(WorkerStatus.SUSPENDED)
notified = True
time.sleep(1)
if before_state:
self.set_state(before_state)
def work(self, burst=False):
"""Starts the work loop.
@ -352,15 +387,19 @@ class Worker(object):
did_perform_work = False
self.register_birth()
self.log.info('RQ worker started, version %s' % VERSION)
self.set_state('starting')
self.set_state(WorkerStatus.STARTED)
try:
while True:
try:
self.check_for_suspension(burst)
if self.stopped:
self.log.info('Stopping on request.')
break
timeout = None if burst else max(1, self.default_worker_ttl - 60)
try:
result = self.dequeue_job_and_maintain_ttl(timeout)
if result is None:
break
@ -371,20 +410,22 @@ class Worker(object):
self.execute_job(job)
self.heartbeat()
if job.get_status() == Status.FINISHED:
if job.get_status() == JobStatus.FINISHED:
queue.enqueue_dependents(job)
did_perform_work = True
finally:
if not self.is_horse:
self.register_death()
return did_perform_work
def dequeue_job_and_maintain_ttl(self, timeout):
result = None
qnames = self.queue_names()
self.set_state('idle')
self.set_state(WorkerStatus.IDLE)
self.procline('Listening on %s' % ','.join(qnames))
self.log.info('')
self.log.info('*** Listening on %s...' %
@ -439,7 +480,9 @@ class Worker(object):
self.procline('Forked %d at %d' % (child_pid, time.time()))
while True:
try:
self.set_state('busy')
os.waitpid(child_pid, 0)
self.set_state('idle')
break
except OSError as e:
# In case we encountered an OSError due to EINTR (which is
@ -481,12 +524,12 @@ class Worker(object):
timeout = (job.timeout or 180) + 60
with self.connection._pipeline() as pipeline:
self.set_state('busy', pipeline=pipeline)
self.set_state(WorkerStatus.BUSY, pipeline=pipeline)
self.set_current_job_id(job.id, pipeline=pipeline)
self.heartbeat(timeout, pipeline=pipeline)
registry = StartedJobRegistry(job.origin, self.connection)
registry.add(job, timeout, pipeline=pipeline)
job.set_status(Status.STARTED, pipeline=pipeline)
job.set_status(JobStatus.STARTED, pipeline=pipeline)
pipeline.execute()
self.procline('Processing %s from %s since %s' % (
@ -512,10 +555,10 @@ class Worker(object):
self.set_current_job_id(None, pipeline=pipeline)
result_ttl = job.get_ttl(self.default_result_ttl)
result_ttl = job.get_result_ttl(self.default_result_ttl)
if result_ttl != 0:
job.ended_at = utcnow()
job._status = Status.FINISHED
job._status = JobStatus.FINISHED
job.save(pipeline=pipeline)
finished_job_registry = FinishedJobRegistry(job.origin, self.connection)
@ -527,7 +570,7 @@ class Worker(object):
pipeline.execute()
except Exception:
job.set_status(Status.FAILED, pipeline=pipeline)
job.set_status(JobStatus.FAILED, pipeline=pipeline)
started_job_registry.remove(job, pipeline=pipeline)
pipeline.execute()
self.handle_exception(job, *sys.exc_info())
@ -584,6 +627,16 @@ class Worker(object):
"""Pops the latest exception handler off of the exc handler stack."""
return self._exc_handlers.pop()
def __eq__(self, other):
"""Equality does not take the database/connection into account"""
if not isinstance(other, self.__class__):
raise TypeError('Cannot compare workers to other types (of workers)')
return self.name == other.name
def __hash__(self):
"""The hash does not take the database/connection into account"""
return hash(self.name)
class SimpleWorker(Worker):
def _install_signal_handlers(self, *args, **kwargs):

@ -26,6 +26,17 @@ class TestCommandLine(TestCase):
class TestRQCli(RQTestCase):
def assert_normal_execution(self, result):
if result.exit_code == 0:
return True
else:
print("Non normal execution")
print("Exit Code: {}".format(result.exit_code))
print("Output: {}".format(result.output))
print("Exception: {}".format(result.exception))
self.assertEqual(result.exit_code, 0)
"""Test rq_cli script"""
def setUp(self):
super(TestRQCli, self).setUp()
@ -41,30 +52,30 @@ class TestRQCli(RQTestCase):
"""rq empty -u <url> failed"""
runner = CliRunner()
result = runner.invoke(main, ['empty', '-u', self.redis_url, 'failed'])
self.assertEqual(result.exit_code, 0)
self.assert_normal_execution(result)
self.assertEqual(result.output.strip(), '1 jobs removed from failed queue')
def test_requeue(self):
"""rq requeue -u <url> --all"""
runner = CliRunner()
result = runner.invoke(main, ['requeue', '-u', self.redis_url, '--all'])
self.assertEqual(result.exit_code, 0)
self.assert_normal_execution(result)
self.assertEqual(result.output.strip(), 'Requeueing 1 jobs from failed queue')
def test_info(self):
"""rq info -u <url>"""
runner = CliRunner()
result = runner.invoke(main, ['info', '-u', self.redis_url])
self.assertEqual(result.exit_code, 0)
self.assert_normal_execution(result)
self.assertIn('1 queues, 1 jobs total', result.output)
def test_worker(self):
"""rq worker -u <url> -b"""
runner = CliRunner()
result = runner.invoke(main, ['worker', '-u', self.redis_url, '-b'])
self.assertEqual(result.exit_code, 0)
self.assert_normal_execution(result)
def test_exc_handler(self):
def test_exception_handlers(self):
"""rq worker -u <url> -b --exception-handler <handler>"""
q = Queue()
failed_q = get_failed_queue()
@ -87,3 +98,31 @@ class TestRQCli(RQTestCase):
# Check the job
job = Job.fetch(job.id)
self.assertEquals(job.is_failed, True)
def test_suspend_and_resume(self):
"""rq suspend -u <url>
rq resume -u <url>
"""
runner = CliRunner()
result = runner.invoke(main, ['suspend', '-u', self.redis_url])
self.assert_normal_execution(result)
result = runner.invoke(main, ['resume', '-u', self.redis_url])
self.assert_normal_execution(result)
def test_suspend_with_ttl(self):
"""rq suspend -u <url> --duration=2
"""
runner = CliRunner()
result = runner.invoke(main, ['suspend', '-u', self.redis_url, '--duration', 1])
self.assert_normal_execution(result)
def test_suspend_with_invalid_ttl(self):
"""rq suspend -u <url> --duration=0
"""
runner = CliRunner()
result = runner.invoke(main, ['suspend', '-u', self.redis_url, '--duration', 0])
self.assertEqual(result.exit_code, 1)
self.assertIn("Duration must be an integer greater than 1", result.output)

@ -290,17 +290,27 @@ class TestJob(RQTestCase):
self.assertEqual(job.id, id)
self.assertEqual(job.func, access_self)
def test_get_ttl(self):
"""Getting job TTL."""
job_ttl = 1
def test_get_result_ttl(self):
"""Getting job result TTL."""
job_result_ttl = 1
default_ttl = 2
job = Job.create(func=say_hello, result_ttl=job_ttl)
job = Job.create(func=say_hello, result_ttl=job_result_ttl)
job.save()
self.assertEqual(job.get_result_ttl(default_ttl=default_ttl), job_result_ttl)
self.assertEqual(job.get_result_ttl(), job_result_ttl)
job = Job.create(func=say_hello)
job.save()
self.assertEqual(job.get_result_ttl(default_ttl=default_ttl), default_ttl)
self.assertEqual(job.get_result_ttl(), None)
def test_get_job_ttl(self):
"""Getting job TTL."""
ttl = 1
job = Job.create(func=say_hello, ttl=ttl)
job.save()
self.assertEqual(job.get_ttl(default_ttl=default_ttl), job_ttl)
self.assertEqual(job.get_ttl(), job_ttl)
self.assertEqual(job.get_ttl(), ttl)
job = Job.create(func=say_hello)
job.save()
self.assertEqual(job.get_ttl(default_ttl=default_ttl), default_ttl)
self.assertEqual(job.get_ttl(), None)
def test_cleanup(self):

@ -4,7 +4,7 @@ from __future__ import (absolute_import, division, print_function,
from rq import get_failed_queue, Queue
from rq.exceptions import InvalidJobOperationError
from rq.job import Job, Status
from rq.job import Job, JobStatus
from rq.worker import Worker
from tests import RQTestCase
@ -262,7 +262,7 @@ class TestQueue(RQTestCase):
"""Enqueueing a job sets its status to "queued"."""
q = Queue()
job = q.enqueue(say_hello)
self.assertEqual(job.get_status(), Status.QUEUED)
self.assertEqual(job.get_status(), JobStatus.QUEUED)
def test_enqueue_explicit_args(self):
"""enqueue() works for both implicit/explicit args."""
@ -346,12 +346,27 @@ class TestQueue(RQTestCase):
self.assertEqual(q.job_ids, [])
# Jobs dependent on finished jobs are immediately enqueued
parent_job.set_status(Status.FINISHED)
parent_job.set_status(JobStatus.FINISHED)
parent_job.save()
job = q.enqueue_call(say_hello, depends_on=parent_job)
self.assertEqual(q.job_ids, [job.id])
self.assertEqual(job.timeout, Queue.DEFAULT_TIMEOUT)
def test_enqueue_job_with_dependency_by_id(self):
"""Enqueueing jobs should work as expected by id as well as job-objects."""
parent_job = Job.create(func=say_hello)
q = Queue()
q.enqueue_call(say_hello, depends_on=parent_job.id)
self.assertEqual(q.job_ids, [])
# Jobs dependent on finished jobs are immediately enqueued
parent_job.set_status(JobStatus.FINISHED)
parent_job.save()
job = q.enqueue_call(say_hello, depends_on=parent_job.id)
self.assertEqual(q.job_ids, [job.id])
self.assertEqual(job.timeout, Queue.DEFAULT_TIMEOUT)
def test_enqueue_job_with_dependency_and_timeout(self):
"""Jobs still know their specified timeout after being scheduled as a dependency."""
# Job with unfinished dependency is not immediately enqueued
@ -362,7 +377,7 @@ class TestQueue(RQTestCase):
self.assertEqual(job.timeout, 123)
# Jobs dependent on finished jobs are immediately enqueued
parent_job.set_status(Status.FINISHED)
parent_job.set_status(JobStatus.FINISHED)
parent_job.save()
job = q.enqueue_call(say_hello, depends_on=parent_job, timeout=123)
self.assertEqual(q.job_ids, [job.id])
@ -424,7 +439,7 @@ class TestFailedQueue(RQTestCase):
get_failed_queue().requeue(job.id)
job = Job.fetch(job.id)
self.assertEqual(job.get_status(), Status.QUEUED)
self.assertEqual(job.get_status(), JobStatus.QUEUED)
def test_enqueue_preserves_result_ttl(self):
"""Enqueueing persists result_ttl."""
@ -444,3 +459,13 @@ class TestFailedQueue(RQTestCase):
"""Ensure custom job class assignment works as expected."""
q = Queue(job_class=CustomJob)
self.assertEqual(q.job_class, CustomJob)
def test_skip_queue(self):
"""Ensure the skip_queue option functions"""
q = Queue('foo')
job1 = q.enqueue(say_hello)
job2 = q.enqueue(say_hello)
assert q.dequeue() == job1
skip_job = q.enqueue(say_hello, at_front=True)
assert q.dequeue() == skip_job
assert q.dequeue() == job2

@ -27,6 +27,10 @@ class TestRegistry(RQTestCase):
self.assertLess(self.testconn.zscore(self.registry.key, job.id),
timestamp + 1002)
# Ensure that a timeout of -1 results in a score of -1
self.registry.add(job, -1)
self.assertEqual(self.testconn.zscore(self.registry.key, job.id), -1)
# Ensure that job is properly removed from sorted set
self.registry.remove(job)
self.assertIsNone(self.testconn.zscore(self.registry.key, job.id))
@ -44,14 +48,22 @@ class TestRegistry(RQTestCase):
self.testconn.zadd(self.registry.key, 1, 'foo')
self.testconn.zadd(self.registry.key, timestamp + 10, 'bar')
self.testconn.zadd(self.registry.key, timestamp + 30, 'baz')
self.assertEqual(self.registry.get_expired_job_ids(), ['foo'])
self.assertEqual(self.registry.get_expired_job_ids(timestamp + 20),
['foo', 'bar'])
def test_cleanup(self):
"""Moving expired jobs to FailedQueue."""
failed_queue = FailedQueue(connection=self.testconn)
self.assertTrue(failed_queue.is_empty())
self.testconn.zadd(self.registry.key, 1, 'foo')
self.testconn.zadd(self.registry.key, 2, 'foo')
self.registry.cleanup(1)
self.assertNotIn('foo', failed_queue.job_ids)
self.assertEqual(self.testconn.zscore(self.registry.key, 'foo'), 2)
self.registry.cleanup()
self.assertIn('foo', failed_queue.job_ids)
self.assertEqual(self.testconn.zscore(self.registry.key, 'foo'), None)
@ -99,9 +111,14 @@ class TestFinishedJobRegistry(RQTestCase):
timestamp = current_timestamp()
self.testconn.zadd(self.registry.key, 1, 'foo')
self.testconn.zadd(self.registry.key, timestamp + 10, 'bar')
self.testconn.zadd(self.registry.key, timestamp + 30, 'baz')
self.registry.cleanup()
self.assertEqual(self.registry.get_job_ids(), ['bar'])
self.assertEqual(self.registry.get_job_ids(), ['bar', 'baz'])
self.registry.cleanup(timestamp + 20)
self.assertEqual(self.registry.get_job_ids(), ['baz'])
def test_jobs_are_put_in_registry(self):
"""Completed jobs are added to FinishedJobRegistry."""

@ -3,15 +3,17 @@ from __future__ import (absolute_import, division, print_function,
unicode_literals)
import os
from time import sleep
from rq import get_failed_queue, Queue, Worker, SimpleWorker
from rq.compat import as_text
from rq.job import Job, Status
from rq.job import Job, JobStatus
from rq.registry import StartedJobRegistry
from rq.suspension import suspend, resume
from tests import RQTestCase, slow
from tests.fixtures import (create_file, create_file_after_timeout,
div_by_zero, say_hello, say_pid)
div_by_zero, say_hello, say_pid, do_nothing)
from tests.helpers import strip_microseconds
@ -133,7 +135,7 @@ class TestWorker(RQTestCase):
job = q.enqueue(div_by_zero)
self.assertEquals(q.count, 1)
w = Worker([q], exc_handler=black_hole)
w = Worker([q], exception_handlers=black_hole)
w.work(burst=True) # should silently pass
# Postconditions
@ -222,14 +224,14 @@ class TestWorker(RQTestCase):
w = Worker([q])
job = q.enqueue(say_hello)
self.assertEqual(job.get_status(), Status.QUEUED)
self.assertEqual(job.get_status(), JobStatus.QUEUED)
self.assertEqual(job.is_queued, True)
self.assertEqual(job.is_finished, False)
self.assertEqual(job.is_failed, False)
w.work(burst=True)
job = Job.fetch(job.id)
self.assertEqual(job.get_status(), Status.FINISHED)
self.assertEqual(job.get_status(), JobStatus.FINISHED)
self.assertEqual(job.is_queued, False)
self.assertEqual(job.is_finished, True)
self.assertEqual(job.is_failed, False)
@ -238,7 +240,7 @@ class TestWorker(RQTestCase):
job = q.enqueue(div_by_zero, args=(1,))
w.work(burst=True)
job = Job.fetch(job.id)
self.assertEqual(job.get_status(), Status.FAILED)
self.assertEqual(job.get_status(), JobStatus.FAILED)
self.assertEqual(job.is_queued, False)
self.assertEqual(job.is_finished, False)
self.assertEqual(job.is_failed, True)
@ -251,13 +253,13 @@ class TestWorker(RQTestCase):
job = q.enqueue_call(say_hello, depends_on=parent_job)
w.work(burst=True)
job = Job.fetch(job.id)
self.assertEqual(job.get_status(), Status.FINISHED)
self.assertEqual(job.get_status(), JobStatus.FINISHED)
parent_job = q.enqueue(div_by_zero)
job = q.enqueue_call(say_hello, depends_on=parent_job)
w.work(burst=True)
job = Job.fetch(job.id)
self.assertNotEqual(job.get_status(), Status.FINISHED)
self.assertNotEqual(job.get_status(), JobStatus.FINISHED)
def test_get_current_job(self):
"""Ensure worker.get_current_job() works properly"""
@ -318,3 +320,62 @@ class TestWorker(RQTestCase):
'Expected at least some work done.')
self.assertEquals(job.result, 'Hi there, Adam!')
self.assertEquals(job.description, '你好 世界!')
def test_suspend_worker_execution(self):
"""Test Pause Worker Execution"""
SENTINEL_FILE = '/tmp/rq-tests.txt'
try:
# Remove the sentinel if it is leftover from a previous test run
os.remove(SENTINEL_FILE)
except OSError as e:
if e.errno != 2:
raise
q = Queue()
job = q.enqueue(create_file, SENTINEL_FILE)
w = Worker([q])
suspend(self.testconn)
w.work(burst=True)
assert q.count == 1
# Should not have created evidence of execution
self.assertEquals(os.path.exists(SENTINEL_FILE), False)
resume(self.testconn)
w.work(burst=True)
assert q.count == 0
self.assertEquals(os.path.exists(SENTINEL_FILE), True)
def test_suspend_with_duration(self):
q = Queue()
for _ in range(5):
q.enqueue(do_nothing)
w = Worker([q])
# This suspends workers for working for 2 second
suspend(self.testconn, 2)
# So when this burst of work happens the queue should remain at 5
w.work(burst=True)
assert q.count == 5
sleep(3)
# The suspension should be expired now, and a burst of work should now clear the queue
w.work(burst=True)
assert q.count == 0
def test_worker_hash_(self):
"""Workers are hashed by their .name attribute"""
q = Queue('foo')
w1 = Worker([q], name="worker1")
w2 = Worker([q], name="worker2")
w3 = Worker([q], name="worker1")
worker_set = set([w1, w2, w3])
self.assertEquals(len(worker_set), 2)

Loading…
Cancel
Save