From 191cc2854348bc6799553413c35ed0d8c0ec0ab1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=BB=E6=99=93=E7=A3=8A?= Date: Fri, 12 Aug 2016 17:07:30 +0800 Subject: [PATCH 01/10] custom connection class --- rq/cli/cli.py | 12 +++++++----- rq/cli/helpers.py | 6 +++--- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/rq/cli/cli.py b/rq/cli/cli.py index bb3c5a6..0b3e611 100755 --- a/rq/cli/cli.py +++ b/rq/cli/cli.py @@ -37,12 +37,12 @@ config_option = click.option('--config', '-c', help='Module containing RQ settings.') -def connect(url, config=None): +def connect(url, config=None, connection_class='redis.StrictRedis'): if url: - return StrictRedis.from_url(url) + return connection_class.from_url(url) settings = read_config_file(config) if config else {} - return get_redis_from_config(settings) + return get_redis_from_config(settings, connection_class) @click.group() @@ -143,6 +143,7 @@ def info(url, config, path, interval, raw, only_queues, only_workers, by_queue, @click.option('--worker-class', '-w', default='rq.Worker', help='RQ Worker class to use') @click.option('--job-class', '-j', default='rq.job.Job', help='RQ Job class to use') @click.option('--queue-class', default='rq.Queue', help='RQ Queue class to use') +@click.option('--connection-class', default='redis.StrictRedis', help='Redis client class to use') @click.option('--path', '-P', default='.', help='Specify the import path.') @click.option('--results-ttl', type=int, help='Default results timeout to be used') @click.option('--worker-ttl', type=int, help='Default worker timeout to be used') @@ -152,7 +153,7 @@ def info(url, config, path, interval, raw, only_queues, only_workers, by_queue, @click.option('--exception-handler', help='Exception handler(s) to use', multiple=True) @click.option('--pid', help='Write the process ID number to a file at the specified path') @click.argument('queues', nargs=-1) -def worker(url, config, burst, name, worker_class, job_class, queue_class, path, results_ttl, worker_ttl, +def worker(url, config, burst, name, worker_class, job_class, queue_class, connection_class, path, results_ttl, worker_ttl, verbose, quiet, sentry_dsn, exception_handler, pid, queues): """Starts an RQ worker.""" @@ -170,7 +171,8 @@ def worker(url, config, burst, name, worker_class, job_class, queue_class, path, setup_loghandlers_from_args(verbose, quiet) - conn = connect(url, config) + connection_class = import_attribute(connection_class) + conn = connect(url, config, connection_class) cleanup_ghosts(conn) worker_class = import_attribute(worker_class) queue_class = import_attribute(queue_class) diff --git a/rq/cli/helpers.py b/rq/cli/helpers.py index 7bfc0cf..340233d 100644 --- a/rq/cli/helpers.py +++ b/rq/cli/helpers.py @@ -26,10 +26,10 @@ def read_config_file(module): if k.upper() == k]) -def get_redis_from_config(settings): +def get_redis_from_config(settings, connection_class): """Returns a StrictRedis instance from a dictionary of settings.""" if settings.get('REDIS_URL') is not None: - return StrictRedis.from_url(settings['REDIS_URL']) + return connection_class.from_url(settings['REDIS_URL']) kwargs = { 'host': settings.get('REDIS_HOST', 'localhost'), @@ -52,7 +52,7 @@ def get_redis_from_config(settings): if not version_info >= (2, 10): raise RuntimeError('Using SSL requires a redis-py version >= 2.10') kwargs['ssl'] = use_ssl - return StrictRedis(**kwargs) + return connection_class(**kwargs) def pad(s, pad_to_length): From 83f81b351d5a62b016111c1742d0fa7dd2c772c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=BB=E6=99=93=E7=A3=8A?= Date: Fri, 12 Aug 2016 17:20:19 +0800 Subject: [PATCH 02/10] fix default argument --- rq/cli/cli.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rq/cli/cli.py b/rq/cli/cli.py index 0b3e611..b3573e9 100755 --- a/rq/cli/cli.py +++ b/rq/cli/cli.py @@ -37,7 +37,7 @@ config_option = click.option('--config', '-c', help='Module containing RQ settings.') -def connect(url, config=None, connection_class='redis.StrictRedis'): +def connect(url, config=None, connection_class=StrictRedis): if url: return connection_class.from_url(url) From ee4cf6e3ee1af19e9364a8c104bfa262cdc65ac2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=BB=E6=99=93=E7=A3=8A?= Date: Fri, 12 Aug 2016 17:23:53 +0800 Subject: [PATCH 03/10] fix get_redis_from_config() --- rq/cli/helpers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rq/cli/helpers.py b/rq/cli/helpers.py index 340233d..da7ce7f 100644 --- a/rq/cli/helpers.py +++ b/rq/cli/helpers.py @@ -26,7 +26,7 @@ def read_config_file(module): if k.upper() == k]) -def get_redis_from_config(settings, connection_class): +def get_redis_from_config(settings, connection_class=StrictRedis): """Returns a StrictRedis instance from a dictionary of settings.""" if settings.get('REDIS_URL') is not None: return connection_class.from_url(settings['REDIS_URL']) From 301e5c927b90993528edf8281766fa89b5e4933b Mon Sep 17 00:00:00 2001 From: Stefan Hammer Date: Thu, 25 Aug 2016 20:06:56 +0200 Subject: [PATCH 04/10] Raise an exception if a given dependency does not exist Adapted some tests to the change: the dependency has to be saved first. --- rq/exceptions.py | 4 ++++ rq/queue.py | 11 +++++++++-- tests/test_queue.py | 19 ++++++++++++++++++- 3 files changed, 31 insertions(+), 3 deletions(-) diff --git a/rq/exceptions.py b/rq/exceptions.py index 530733d..ebb6b3b 100644 --- a/rq/exceptions.py +++ b/rq/exceptions.py @@ -7,6 +7,10 @@ class NoSuchJobError(Exception): pass +class InvalidJobDependency(Exception): + pass + + class InvalidJobOperationError(Exception): pass diff --git a/rq/queue.py b/rq/queue.py index e5d3df8..1b8aacc 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -9,8 +9,8 @@ from redis import WatchError from .compat import as_text, string_types, total_ordering from .connections import resolve_connection from .defaults import DEFAULT_RESULT_TTL -from .exceptions import (DequeueTimeout, InvalidJobOperationError, - NoSuchJobError, UnpickleError) +from .exceptions import (DequeueTimeout, InvalidJobDependency, + InvalidJobOperationError, NoSuchJobError, UnpickleError) from .job import Job, JobStatus from .utils import import_attribute, utcnow @@ -202,6 +202,13 @@ class Queue(object): while True: try: pipe.watch(depends_on.key) + + # If the dependency does not exist, we raise an + # exception. So the caller is able to avoid an orphaned + # job. + if not self.job_class.exists(depends_on.id): + raise InvalidJobDependency('Job {0} does not exist'.format(depends_on.id)) + if depends_on.get_status() != JobStatus.FINISHED: pipe.multi() job.set_status(JobStatus.DEFERRED) diff --git a/tests/test_queue.py b/tests/test_queue.py index b62fddc..2edb163 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -7,7 +7,7 @@ from tests.fixtures import (div_by_zero, echo, Number, say_hello, some_calculation) from rq import get_failed_queue, Queue -from rq.exceptions import InvalidJobOperationError +from rq.exceptions import InvalidJobDependency, InvalidJobOperationError from rq.job import Job, JobStatus from rq.registry import DeferredJobRegistry from rq.worker import Worker @@ -401,6 +401,7 @@ class TestQueue(RQTestCase): """Jobs are enqueued only when their dependencies are finished.""" # Job with unfinished dependency is not immediately enqueued parent_job = Job.create(func=say_hello) + parent_job.save() q = Queue() job = q.enqueue_call(say_hello, depends_on=parent_job) self.assertEqual(q.job_ids, []) @@ -417,6 +418,7 @@ class TestQueue(RQTestCase): def test_enqueue_job_with_dependency_by_id(self): """Can specify job dependency with job object or job id.""" parent_job = Job.create(func=say_hello) + parent_job.save() q = Queue() q.enqueue_call(say_hello, depends_on=parent_job.id) @@ -433,6 +435,7 @@ class TestQueue(RQTestCase): """Jobs remember their timeout when enqueued as a dependency.""" # Job with unfinished dependency is not immediately enqueued parent_job = Job.create(func=say_hello) + parent_job.save() q = Queue() job = q.enqueue_call(say_hello, depends_on=parent_job, timeout=123) self.assertEqual(q.job_ids, []) @@ -445,6 +448,20 @@ class TestQueue(RQTestCase): self.assertEqual(q.job_ids, [job.id]) self.assertEqual(job.timeout, 123) + def test_enqueue_job_with_invalid_dependency(self): + """Enqueuing a job fails, if the dependency does not exist at all.""" + parent_job = Job.create(func=say_hello) + # without save() the job is not visible to others + + q = Queue() + with self.assertRaises(InvalidJobDependency): + q.enqueue_call(say_hello, depends_on=parent_job) + + with self.assertRaises(InvalidJobDependency): + q.enqueue_call(say_hello, depends_on=parent_job.id) + + self.assertEqual(q.job_ids, []) + class TestFailedQueue(RQTestCase): def test_requeue_job(self): From afc7469c27dad82501e019a98845dbeef0f75150 Mon Sep 17 00:00:00 2001 From: Samuel Colvin Date: Thu, 28 Jul 2016 13:24:58 +0100 Subject: [PATCH 05/10] fetch_job - check correct queue, fix #728 --- rq/queue.py | 5 ++++- tests/test_queue.py | 26 ++++++++++++++++++++++++++ 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/rq/queue.py b/rq/queue.py index 1b8aacc..3fcb8a0 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -107,9 +107,12 @@ class Queue(object): def fetch_job(self, job_id): try: - return self.job_class.fetch(job_id, connection=self.connection) + job = self.job_class.fetch(job_id, connection=self.connection) except NoSuchJobError: self.remove(job_id) + else: + if job.origin == self.name: + return job def get_job_ids(self, offset=0, length=-1): """Returns a slice of job IDs in the queue.""" diff --git a/tests/test_queue.py b/tests/test_queue.py index 2edb163..015590a 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -462,6 +462,32 @@ class TestQueue(RQTestCase): self.assertEqual(q.job_ids, []) + def test_fetch_job_successful(self): + """Fetch a job from a queue.""" + q = Queue('example') + job_orig = q.enqueue(say_hello) + job_fetch = q.fetch_job(job_orig.id) + self.assertIsNotNone(job_fetch) + self.assertEqual(job_orig.id, job_fetch.id) + self.assertEqual(job_orig.description, job_fetch.description) + + def test_fetch_job_missing(self): + """Fetch a job from a queue which doesn't exist.""" + q = Queue('example') + job = q.fetch_job('123') + self.assertIsNone(job) + + def test_fetch_job_different_queue(self): + """Fetch a job from a queue which is in a different queue.""" + q1 = Queue('example1') + q2 = Queue('example2') + job_orig = q1.enqueue(say_hello) + job_fetch = q2.fetch_job(job_orig.id) + self.assertIsNone(job_fetch) + + job_fetch = q1.fetch_job(job_orig.id) + self.assertIsNotNone(job_fetch) + class TestFailedQueue(RQTestCase): def test_requeue_job(self): From 5945c28ce3031448c151fc02c4288db569a74322 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Fri, 18 Nov 2016 08:33:40 +0100 Subject: [PATCH 06/10] Remove legacy script --- rq/scripts/rqgenload.py | 67 ----------------------------------------- 1 file changed, 67 deletions(-) delete mode 100755 rq/scripts/rqgenload.py diff --git a/rq/scripts/rqgenload.py b/rq/scripts/rqgenload.py deleted file mode 100755 index 5c87fbb..0000000 --- a/rq/scripts/rqgenload.py +++ /dev/null @@ -1,67 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -from __future__ import (absolute_import, division, print_function, - unicode_literals) - -import optparse - -from rq import dummy, Queue, use_connection - - -def parse_args(): - parser = optparse.OptionParser() - parser.add_option('-n', '--count', type='int', dest='count', default=1) - opts, args = parser.parse_args() - return (opts, args, parser) - - -def main(): - import sys - sys.path.insert(0, '.') - - opts, args, parser = parse_args() - - use_connection() - - queues = ('default', 'high', 'low') - - sample_calls = [ - (dummy.do_nothing, [], {}), - (dummy.sleep, [1], {}), - (dummy.fib, [8], {}), # normal result - (dummy.fib, [24], {}), # takes pretty long - (dummy.div_by_zero, [], {}), # 5 / 0 => div by zero exc - (dummy.random_failure, [], {}), # simulate random failure (handy for requeue testing) - ] - - for i in range(opts.count): - import random - f, args, kwargs = random.choice(sample_calls) - - q = Queue(random.choice(queues)) - q.enqueue(f, *args, **kwargs) - - # q = Queue('foo') - # q.enqueue(do_nothing) - # q.enqueue(sleep, 3) - # q = Queue('bar') - # q.enqueue(yield_stuff) - # q.enqueue(do_nothing) - # q.enqueue(do_nothing) - # q.enqueue(do_nothing) - # q.enqueue(do_nothing) - # q.enqueue(do_nothing) - # q.enqueue(do_nothing) - # q.enqueue(do_nothing) - # q.enqueue(do_nothing) - # q.enqueue(do_nothing) - # q.enqueue(do_nothing) - # q.enqueue(do_nothing) - # q.enqueue(do_nothing) - # q.enqueue(do_nothing) - # q.enqueue(do_nothing) - # q.enqueue(do_nothing) - # q.enqueue(do_nothing) - -if __name__ == '__main__': - main() From a689cdb3ec45f16fc5c18e46d084229428c464af Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Fri, 18 Nov 2016 08:33:11 +0100 Subject: [PATCH 07/10] Ignore more files --- .gitignore | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.gitignore b/.gitignore index 03a7aee..f07314b 100644 --- a/.gitignore +++ b/.gitignore @@ -11,3 +11,5 @@ .vagrant Vagrantfile .idea/ +.coverage.* +/.cache From af6ce54ea4df2103d6744beb48f46d9cb5e8dc04 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Fri, 18 Nov 2016 08:32:39 +0100 Subject: [PATCH 08/10] Fix PEP8 complaints --- rq/cli/cli.py | 4 ++-- rq/job.py | 1 + rq/worker.py | 5 +++-- tests/test_worker.py | 2 ++ 4 files changed, 8 insertions(+), 4 deletions(-) diff --git a/rq/cli/cli.py b/rq/cli/cli.py index b3573e9..fb0dc8e 100755 --- a/rq/cli/cli.py +++ b/rq/cli/cli.py @@ -153,8 +153,8 @@ def info(url, config, path, interval, raw, only_queues, only_workers, by_queue, @click.option('--exception-handler', help='Exception handler(s) to use', multiple=True) @click.option('--pid', help='Write the process ID number to a file at the specified path') @click.argument('queues', nargs=-1) -def worker(url, config, burst, name, worker_class, job_class, queue_class, connection_class, path, results_ttl, worker_ttl, - verbose, quiet, sentry_dsn, exception_handler, pid, queues): +def worker(url, config, burst, name, worker_class, job_class, queue_class, connection_class, path, results_ttl, + worker_ttl, verbose, quiet, sentry_dsn, exception_handler, pid, queues): """Starts an RQ worker.""" if path: diff --git a/rq/job.py b/rq/job.py index ce1894b..133a748 100644 --- a/rq/job.py +++ b/rq/job.py @@ -579,4 +579,5 @@ class Job(object): def __hash__(self): return hash(self.id) + _job_stack = LocalStack() diff --git a/rq/worker.py b/rq/worker.py index 95bcded..3067f68 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -55,6 +55,7 @@ def iterable(x): def compact(l): return [x for x in l if x is not None] + _signames = dict((getattr(signal, signame), signame) for signame in dir(signal) if signame.startswith('SIG') and '_' not in signame) @@ -544,7 +545,7 @@ class Worker(object): except Exception: pass - #Unhandled failure: move the job to the failed queue + # Unhandled failure: move the job to the failed queue self.log.warning( 'Moving job to {0!r} queue'.format( self.failed_queue.name @@ -826,7 +827,7 @@ class HerokuWorker(Worker): self.request_force_stop_sigrtmin(signum, frame) else: self.log.warning('Imminent shutdown, raising ShutDownImminentException in %d seconds', - self.imminent_shutdown_delay) + self.imminent_shutdown_delay) signal.signal(signal.SIGRTMIN, self.request_force_stop_sigrtmin) signal.signal(signal.SIGALRM, self.request_force_stop_sigrtmin) signal.alarm(self.imminent_shutdown_delay) diff --git a/tests/test_worker.py b/tests/test_worker.py index b753274..4923379 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -577,6 +577,7 @@ def kill_worker(pid, double_kill): time.sleep(0.5) os.kill(pid, signal.SIGTERM) + def wait_and_kill_work_horse(pid, time_to_wait=0.0): time.sleep(time_to_wait) os.kill(pid, signal.SIGKILL) @@ -678,6 +679,7 @@ class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase): self.assertEqual(failed_q.count, 1) self.assertEqual(fooq.count, 0) + def schedule_access_self(): q = Queue('default', connection=get_current_connection()) q.enqueue(access_self) From 20e258f6104e196d25bd7acfed6532465f8f2752 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Fri, 18 Nov 2016 08:26:35 +0100 Subject: [PATCH 09/10] Bump to 0.7.0 --- CHANGES.md | 12 +++++++++++- rq/version.py | 2 +- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index d47d860..e010bd1 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,6 +1,16 @@ ### 0.7.0 -- ... +- Better support for Heroku workers (#584, #715) +- Support for connecting using a custom connection class (#741) +- Fix: connection stack in default worker (#479, #641) +- Fix: `fetch_job` now checks that a job requested actually comes from the + intended queue (#728, #733) +- Fix: Properly raise exception if a job dependency does not exist (#747) +- Fix: Job status not updated when horse dies unexpectedly (#710) +- Fix: `request_force_stop_sigrtmin` failing for Python 3 (#727) +- Fix `Job.cancel()` method on failed queue (#707) +- Python 3.5 compatibility improvements (#729) +- Improved signal name lookup (#722) ### 0.6.0 diff --git a/rq/version.py b/rq/version.py index b7fea81..eafddd9 100644 --- a/rq/version.py +++ b/rq/version.py @@ -2,4 +2,4 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) -VERSION = '0.7.0dev0' +VERSION = '0.7.0' From f3f61bbc15fd39478e1da7882631b2b64912affb Mon Sep 17 00:00:00 2001 From: Michael DeWulf Date: Fri, 18 Nov 2016 14:12:06 -0600 Subject: [PATCH 10/10] Pass pipeline to push_job_id from enqueue_job --- rq/queue.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rq/queue.py b/rq/queue.py index 3fcb8a0..5027fa6 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -290,12 +290,12 @@ class Queue(object): job.timeout = self.DEFAULT_TIMEOUT job.save(pipeline=pipe) + if self._async: + self.push_job_id(job.id, pipeline=pipe, at_front=at_front) + if pipeline is None: pipe.execute() - if self._async: - self.push_job_id(job.id, at_front=at_front) - return job def enqueue_dependents(self, job, pipeline=None):