diff --git a/.gitignore b/.gitignore index 03a7aee..f07314b 100644 --- a/.gitignore +++ b/.gitignore @@ -11,3 +11,5 @@ .vagrant Vagrantfile .idea/ +.coverage.* +/.cache diff --git a/CHANGES.md b/CHANGES.md index d47d860..e010bd1 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,6 +1,16 @@ ### 0.7.0 -- ... +- Better support for Heroku workers (#584, #715) +- Support for connecting using a custom connection class (#741) +- Fix: connection stack in default worker (#479, #641) +- Fix: `fetch_job` now checks that a job requested actually comes from the + intended queue (#728, #733) +- Fix: Properly raise exception if a job dependency does not exist (#747) +- Fix: Job status not updated when horse dies unexpectedly (#710) +- Fix: `request_force_stop_sigrtmin` failing for Python 3 (#727) +- Fix `Job.cancel()` method on failed queue (#707) +- Python 3.5 compatibility improvements (#729) +- Improved signal name lookup (#722) ### 0.6.0 diff --git a/rq/cli/cli.py b/rq/cli/cli.py index bb3c5a6..fb0dc8e 100755 --- a/rq/cli/cli.py +++ b/rq/cli/cli.py @@ -37,12 +37,12 @@ config_option = click.option('--config', '-c', help='Module containing RQ settings.') -def connect(url, config=None): +def connect(url, config=None, connection_class=StrictRedis): if url: - return StrictRedis.from_url(url) + return connection_class.from_url(url) settings = read_config_file(config) if config else {} - return get_redis_from_config(settings) + return get_redis_from_config(settings, connection_class) @click.group() @@ -143,6 +143,7 @@ def info(url, config, path, interval, raw, only_queues, only_workers, by_queue, @click.option('--worker-class', '-w', default='rq.Worker', help='RQ Worker class to use') @click.option('--job-class', '-j', default='rq.job.Job', help='RQ Job class to use') @click.option('--queue-class', default='rq.Queue', help='RQ Queue class to use') +@click.option('--connection-class', default='redis.StrictRedis', help='Redis client class to use') @click.option('--path', '-P', default='.', help='Specify the import path.') @click.option('--results-ttl', type=int, help='Default results timeout to be used') @click.option('--worker-ttl', type=int, help='Default worker timeout to be used') @@ -152,8 +153,8 @@ def info(url, config, path, interval, raw, only_queues, only_workers, by_queue, @click.option('--exception-handler', help='Exception handler(s) to use', multiple=True) @click.option('--pid', help='Write the process ID number to a file at the specified path') @click.argument('queues', nargs=-1) -def worker(url, config, burst, name, worker_class, job_class, queue_class, path, results_ttl, worker_ttl, - verbose, quiet, sentry_dsn, exception_handler, pid, queues): +def worker(url, config, burst, name, worker_class, job_class, queue_class, connection_class, path, results_ttl, + worker_ttl, verbose, quiet, sentry_dsn, exception_handler, pid, queues): """Starts an RQ worker.""" if path: @@ -170,7 +171,8 @@ def worker(url, config, burst, name, worker_class, job_class, queue_class, path, setup_loghandlers_from_args(verbose, quiet) - conn = connect(url, config) + connection_class = import_attribute(connection_class) + conn = connect(url, config, connection_class) cleanup_ghosts(conn) worker_class = import_attribute(worker_class) queue_class = import_attribute(queue_class) diff --git a/rq/cli/helpers.py b/rq/cli/helpers.py index 7bfc0cf..da7ce7f 100644 --- a/rq/cli/helpers.py +++ b/rq/cli/helpers.py @@ -26,10 +26,10 @@ def read_config_file(module): if k.upper() == k]) -def get_redis_from_config(settings): +def get_redis_from_config(settings, connection_class=StrictRedis): """Returns a StrictRedis instance from a dictionary of settings.""" if settings.get('REDIS_URL') is not None: - return StrictRedis.from_url(settings['REDIS_URL']) + return connection_class.from_url(settings['REDIS_URL']) kwargs = { 'host': settings.get('REDIS_HOST', 'localhost'), @@ -52,7 +52,7 @@ def get_redis_from_config(settings): if not version_info >= (2, 10): raise RuntimeError('Using SSL requires a redis-py version >= 2.10') kwargs['ssl'] = use_ssl - return StrictRedis(**kwargs) + return connection_class(**kwargs) def pad(s, pad_to_length): diff --git a/rq/exceptions.py b/rq/exceptions.py index 530733d..ebb6b3b 100644 --- a/rq/exceptions.py +++ b/rq/exceptions.py @@ -7,6 +7,10 @@ class NoSuchJobError(Exception): pass +class InvalidJobDependency(Exception): + pass + + class InvalidJobOperationError(Exception): pass diff --git a/rq/job.py b/rq/job.py index d037374..b820f79 100644 --- a/rq/job.py +++ b/rq/job.py @@ -579,4 +579,5 @@ class Job(object): def __hash__(self): return hash(self.id) + _job_stack = LocalStack() diff --git a/rq/queue.py b/rq/queue.py index 1e5b7e4..8231b08 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -9,8 +9,8 @@ from redis import WatchError from .compat import as_text, string_types, total_ordering from .connections import resolve_connection from .defaults import DEFAULT_RESULT_TTL -from .exceptions import (DequeueTimeout, InvalidJobOperationError, - NoSuchJobError, UnpickleError) +from .exceptions import (DequeueTimeout, InvalidJobDependency, + InvalidJobOperationError, NoSuchJobError, UnpickleError) from .job import Job, JobStatus from .utils import import_attribute, utcnow @@ -107,9 +107,12 @@ class Queue(object): def fetch_job(self, job_id): try: - return self.job_class.fetch(job_id, connection=self.connection) + job = self.job_class.fetch(job_id, connection=self.connection) except NoSuchJobError: self.remove(job_id) + else: + if job.origin == self.name: + return job def get_job_ids(self, offset=0, length=-1): """Returns a slice of job IDs in the queue.""" @@ -202,6 +205,13 @@ class Queue(object): while True: try: pipe.watch(depends_on.key) + + # If the dependency does not exist, we raise an + # exception. So the caller is able to avoid an orphaned + # job. + if not self.job_class.exists(depends_on.id): + raise InvalidJobDependency('Job {0} does not exist'.format(depends_on.id)) + if depends_on.get_status() != JobStatus.FINISHED: pipe.multi() job.set_status(JobStatus.DEFERRED) diff --git a/rq/scripts/rqgenload.py b/rq/scripts/rqgenload.py deleted file mode 100755 index 5c87fbb..0000000 --- a/rq/scripts/rqgenload.py +++ /dev/null @@ -1,67 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -from __future__ import (absolute_import, division, print_function, - unicode_literals) - -import optparse - -from rq import dummy, Queue, use_connection - - -def parse_args(): - parser = optparse.OptionParser() - parser.add_option('-n', '--count', type='int', dest='count', default=1) - opts, args = parser.parse_args() - return (opts, args, parser) - - -def main(): - import sys - sys.path.insert(0, '.') - - opts, args, parser = parse_args() - - use_connection() - - queues = ('default', 'high', 'low') - - sample_calls = [ - (dummy.do_nothing, [], {}), - (dummy.sleep, [1], {}), - (dummy.fib, [8], {}), # normal result - (dummy.fib, [24], {}), # takes pretty long - (dummy.div_by_zero, [], {}), # 5 / 0 => div by zero exc - (dummy.random_failure, [], {}), # simulate random failure (handy for requeue testing) - ] - - for i in range(opts.count): - import random - f, args, kwargs = random.choice(sample_calls) - - q = Queue(random.choice(queues)) - q.enqueue(f, *args, **kwargs) - - # q = Queue('foo') - # q.enqueue(do_nothing) - # q.enqueue(sleep, 3) - # q = Queue('bar') - # q.enqueue(yield_stuff) - # q.enqueue(do_nothing) - # q.enqueue(do_nothing) - # q.enqueue(do_nothing) - # q.enqueue(do_nothing) - # q.enqueue(do_nothing) - # q.enqueue(do_nothing) - # q.enqueue(do_nothing) - # q.enqueue(do_nothing) - # q.enqueue(do_nothing) - # q.enqueue(do_nothing) - # q.enqueue(do_nothing) - # q.enqueue(do_nothing) - # q.enqueue(do_nothing) - # q.enqueue(do_nothing) - # q.enqueue(do_nothing) - # q.enqueue(do_nothing) - -if __name__ == '__main__': - main() diff --git a/rq/version.py b/rq/version.py index b7fea81..eafddd9 100644 --- a/rq/version.py +++ b/rq/version.py @@ -2,4 +2,4 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) -VERSION = '0.7.0dev0' +VERSION = '0.7.0' diff --git a/rq/worker.py b/rq/worker.py index 8b47236..7d745c4 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -57,6 +57,7 @@ def iterable(x): def compact(l): return [x for x in l if x is not None] + _signames = dict((getattr(signal, signame), signame) for signame in dir(signal) if signame.startswith('SIG') and '_' not in signame) @@ -540,7 +541,7 @@ class Worker(object): job=job ) - #Unhandled failure: move the job to the failed queue + # Unhandled failure: move the job to the failed queue self.log.warning( 'Moving job to {0!r} queue'.format( self.failed_queue.name @@ -843,7 +844,7 @@ class HerokuWorker(Worker): self.request_force_stop_sigrtmin(signum, frame) else: self.log.warning('Imminent shutdown, raising ShutDownImminentException in %d seconds', - self.imminent_shutdown_delay) + self.imminent_shutdown_delay) signal.signal(signal.SIGRTMIN, self.request_force_stop_sigrtmin) signal.signal(signal.SIGALRM, self.request_force_stop_sigrtmin) signal.alarm(self.imminent_shutdown_delay) diff --git a/tests/test_queue.py b/tests/test_queue.py index b62fddc..015590a 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -7,7 +7,7 @@ from tests.fixtures import (div_by_zero, echo, Number, say_hello, some_calculation) from rq import get_failed_queue, Queue -from rq.exceptions import InvalidJobOperationError +from rq.exceptions import InvalidJobDependency, InvalidJobOperationError from rq.job import Job, JobStatus from rq.registry import DeferredJobRegistry from rq.worker import Worker @@ -401,6 +401,7 @@ class TestQueue(RQTestCase): """Jobs are enqueued only when their dependencies are finished.""" # Job with unfinished dependency is not immediately enqueued parent_job = Job.create(func=say_hello) + parent_job.save() q = Queue() job = q.enqueue_call(say_hello, depends_on=parent_job) self.assertEqual(q.job_ids, []) @@ -417,6 +418,7 @@ class TestQueue(RQTestCase): def test_enqueue_job_with_dependency_by_id(self): """Can specify job dependency with job object or job id.""" parent_job = Job.create(func=say_hello) + parent_job.save() q = Queue() q.enqueue_call(say_hello, depends_on=parent_job.id) @@ -433,6 +435,7 @@ class TestQueue(RQTestCase): """Jobs remember their timeout when enqueued as a dependency.""" # Job with unfinished dependency is not immediately enqueued parent_job = Job.create(func=say_hello) + parent_job.save() q = Queue() job = q.enqueue_call(say_hello, depends_on=parent_job, timeout=123) self.assertEqual(q.job_ids, []) @@ -445,6 +448,46 @@ class TestQueue(RQTestCase): self.assertEqual(q.job_ids, [job.id]) self.assertEqual(job.timeout, 123) + def test_enqueue_job_with_invalid_dependency(self): + """Enqueuing a job fails, if the dependency does not exist at all.""" + parent_job = Job.create(func=say_hello) + # without save() the job is not visible to others + + q = Queue() + with self.assertRaises(InvalidJobDependency): + q.enqueue_call(say_hello, depends_on=parent_job) + + with self.assertRaises(InvalidJobDependency): + q.enqueue_call(say_hello, depends_on=parent_job.id) + + self.assertEqual(q.job_ids, []) + + def test_fetch_job_successful(self): + """Fetch a job from a queue.""" + q = Queue('example') + job_orig = q.enqueue(say_hello) + job_fetch = q.fetch_job(job_orig.id) + self.assertIsNotNone(job_fetch) + self.assertEqual(job_orig.id, job_fetch.id) + self.assertEqual(job_orig.description, job_fetch.description) + + def test_fetch_job_missing(self): + """Fetch a job from a queue which doesn't exist.""" + q = Queue('example') + job = q.fetch_job('123') + self.assertIsNone(job) + + def test_fetch_job_different_queue(self): + """Fetch a job from a queue which is in a different queue.""" + q1 = Queue('example1') + q2 = Queue('example2') + job_orig = q1.enqueue(say_hello) + job_fetch = q2.fetch_job(job_orig.id) + self.assertIsNone(job_fetch) + + job_fetch = q1.fetch_job(job_orig.id) + self.assertIsNotNone(job_fetch) + class TestFailedQueue(RQTestCase): def test_requeue_job(self): diff --git a/tests/test_worker.py b/tests/test_worker.py index dd4d0fe..8c1d2d2 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -613,6 +613,7 @@ def kill_worker(pid, double_kill): time.sleep(0.5) os.kill(pid, signal.SIGTERM) + def wait_and_kill_work_horse(pid, time_to_wait=0.0): time.sleep(time_to_wait) os.kill(pid, signal.SIGKILL) @@ -714,6 +715,7 @@ class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase): self.assertEqual(failed_q.count, 1) self.assertEqual(fooq.count, 0) + def schedule_access_self(): q = Queue('default', connection=get_current_connection()) q.enqueue(access_self)