From 7660fbdc18d25247e85a6cd6fb99662e43184ced Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B3hann=20=C3=9Eorvaldur=20Berg=C3=BE=C3=B3rsson?= Date: Sun, 31 Mar 2013 08:28:15 +0000 Subject: [PATCH 1/8] Made rqworker and rqinfo respect db parameters from --url When starting the rqworker and rqinfo scripts with an --url parameter containing a non default database, e.g. redis://localhost:6379/2, both scripts connected db 0 instead of the desired database. Fixed this behavior by ignoring the --host, --port and --db arguments if --url is there. Also fixed another issue with the rqinfo script, in which it defaulted to only the 'default' queue instead of finding all available queues using Queue.all(). --- rq/scripts/__init__.py | 8 +++----- rq/scripts/rqinfo.py | 3 ++- rq/scripts/rqworker.py | 29 ++++++++++++++++++----------- 3 files changed, 23 insertions(+), 17 deletions(-) diff --git a/rq/scripts/__init__.py b/rq/scripts/__init__.py index 119f4f6..c0ad226 100644 --- a/rq/scripts/__init__.py +++ b/rq/scripts/__init__.py @@ -7,7 +7,8 @@ def add_standard_arguments(parser): parser.add_argument('--config', '-c', default=None, help='Module containing RQ settings.') parser.add_argument('--url', '-u', default=None, - help='URL describing Redis connection details') + help='URL describing Redis connection details. ' + 'Overrides other connection arguments if supplied.') parser.add_argument('--host', '-H', default=None, help='The Redis hostname (default: localhost)') parser.add_argument('--port', '-p', default=None, @@ -42,13 +43,10 @@ def setup_default_arguments(args, settings): if args.password is None: args.password = settings.get('REDIS_PASSWORD', None) - if not args.queues: - args.queues = settings.get('QUEUES', ['default']) - def setup_redis(args): if args.url is not None: - redis_conn = redis.StrictRedis.from_url(args.url, db=args.db) + redis_conn = redis.StrictRedis.from_url(args.url) else: redis_conn = redis.StrictRedis(host=args.host, port=args.port, db=args.db, password=args.password) diff --git a/rq/scripts/rqinfo.py b/rq/scripts/rqinfo.py index 7e03647..df16eb2 100755 --- a/rq/scripts/rqinfo.py +++ b/rq/scripts/rqinfo.py @@ -21,6 +21,7 @@ def pad(s, pad_to_length): """Pads the given string to the given length.""" return ('%-' + '%ds' % pad_to_length) % (s,) + def get_scale(x): """Finds the lowest scale where x <= scale.""" scales = [20, 50, 100, 200, 400, 600, 800, 1000] @@ -29,6 +30,7 @@ def get_scale(x): return scale return x + def state_symbol(state): symbols = { 'busy': red('busy'), @@ -186,4 +188,3 @@ def main(): except KeyboardInterrupt: print sys.exit(0) - diff --git a/rq/scripts/rqworker.py b/rq/scripts/rqworker.py index 2e00494..f307be1 100755 --- a/rq/scripts/rqworker.py +++ b/rq/scripts/rqworker.py @@ -31,6 +31,19 @@ def parse_args(): return parser.parse_args() +def setup_loghandlers_from_args(args): + if args.verbose and args.quiet: + raise RuntimeError("Flags --verbose and --quiet are mutually exclusive.") + + if args.verbose: + level = 'DEBUG' + elif args.quiet: + level = 'WARNING' + else: + level = 'INFO' + setup_loghandlers(level) + + def main(): args = parse_args() @@ -43,20 +56,14 @@ def main(): setup_default_arguments(args, settings) - # Other default arguments + # Worker specific default arguments + if not args.queues: + args.queues = settings.get('QUEUES', ['default']) + if args.sentry_dsn is None: args.sentry_dsn = settings.get('SENTRY_DSN', None) - if args.verbose and args.quiet: - raise RuntimeError("Flags --verbose and --quiet are mutually exclusive.") - - if args.verbose: - level = 'DEBUG' - elif args.quiet: - level = 'WARNING' - else: - level = 'INFO' - setup_loghandlers(level) + setup_loghandlers_from_args(args) setup_redis(args) cleanup_ghosts() From ef0f04bff691bc44f86898ee7f054cfb40484758 Mon Sep 17 00:00:00 2001 From: Alex Morega Date: Wed, 13 Feb 2013 10:16:48 +0200 Subject: [PATCH 2/8] extract `safe_fetch_job` method --- rq/queue.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/rq/queue.py b/rq/queue.py index 3bf5d00..05c8394 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -64,6 +64,16 @@ class Queue(object): """Returns whether the current queue is empty.""" return self.count == 0 + def safe_fetch_job(self, job_id): + try: + job = Job.safe_fetch(job_id, connection=self.connection) + except NoSuchJobError: + self.remove(job_id) + return None + except UnpickleError: + return None + return job + @property def job_ids(self): """Returns a list of all job IDS in the queue.""" @@ -72,17 +82,7 @@ class Queue(object): @property def jobs(self): """Returns a list of all (valid) jobs in the queue.""" - def safe_fetch(job_id): - try: - job = Job.safe_fetch(job_id, connection=self.connection) - except NoSuchJobError: - self.remove(job_id) - return None - except UnpickleError: - return None - return job - - return compact([safe_fetch(job_id) for job_id in self.job_ids]) + return compact([self.safe_fetch_job(job_id) for job_id in self.job_ids]) @property def count(self): From e3075ea6be3cfb88c71a1a0d36202d4c51473afb Mon Sep 17 00:00:00 2001 From: Alex Morega Date: Wed, 13 Feb 2013 10:17:36 +0200 Subject: [PATCH 3/8] get a page of jobs --- rq/queue.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/rq/queue.py b/rq/queue.py index 05c8394..739b659 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -74,6 +74,11 @@ class Queue(object): return None return job + def get_jobs_page(self, offset, limit): + """Returns a paginated list of jobs in the queue.""" + job_ids = self.connection.lrange(self.key, offset, offset+limit) + return compact([self.safe_fetch_job(job_id) for job_id in job_ids]) + @property def job_ids(self): """Returns a list of all job IDS in the queue.""" From 2fb6e5ca1ad8713c9b71ee72b894e2414a64c3ec Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Fri, 19 Apr 2013 10:24:38 +0200 Subject: [PATCH 4/8] Minor refactoring of the paging logic. --- rq/queue.py | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/rq/queue.py b/rq/queue.py index 739b659..7d51105 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -74,20 +74,28 @@ class Queue(object): return None return job - def get_jobs_page(self, offset, limit): - """Returns a paginated list of jobs in the queue.""" - job_ids = self.connection.lrange(self.key, offset, offset+limit) + def get_job_ids(self, start=0, limit=-1): + """Returns a slice of job IDs in the queue.""" + if limit >= 0: + end = start + limit + else: + end = limit + return self.connection.lrange(self.key, start, end) + + def get_jobs(self, start=0, limit=-1): + """Returns a slice of jobs in the queue.""" + job_ids = self.get_job_ids(start, limit) return compact([self.safe_fetch_job(job_id) for job_id in job_ids]) @property def job_ids(self): """Returns a list of all job IDS in the queue.""" - return self.connection.lrange(self.key, 0, -1) + return self.get_jobs_ids() @property def jobs(self): """Returns a list of all (valid) jobs in the queue.""" - return compact([self.safe_fetch_job(job_id) for job_id in self.job_ids]) + return self.get_jobs() @property def count(self): @@ -118,7 +126,7 @@ class Queue(object): """Pushes a job ID on the corresponding Redis queue.""" self.connection.rpush(self.key, job_id) - def enqueue_call(self, func, args=None, kwargs=None, timeout=None, result_ttl=None): #noqa + def enqueue_call(self, func, args=None, kwargs=None, timeout=None, result_ttl=None): # noqa """Creates a job to represent the delayed function call and enqueues it. From 73b7453e40257e81d9e3ecf2b2e839764c27ec1b Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Fri, 19 Apr 2013 10:28:09 +0200 Subject: [PATCH 5/8] Fix typo. --- rq/queue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rq/queue.py b/rq/queue.py index 7d51105..a7a82bc 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -90,7 +90,7 @@ class Queue(object): @property def job_ids(self): """Returns a list of all job IDS in the queue.""" - return self.get_jobs_ids() + return self.get_job_ids() @property def jobs(self): From 89293353d64a63ecb724338dae5a0a77bea0e7a4 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Fri, 19 Apr 2013 10:55:17 +0200 Subject: [PATCH 6/8] Update changelog. --- CHANGES.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/CHANGES.md b/CHANGES.md index 5366f96..93cc7bc 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,12 +1,17 @@ ### 0.3.8 (not yet released) +- `rqworker` and `rqinfo` have a `--url` argument to connect to a Redis url. + - `rqworker` and `rqinfo` have a `--socket` option to connect to a Redis server through a Unix socket. - `rqworker` reads `SENTRY_DSN` from the environment, unless specifically provided on the command line. +- `Queue` has a new API that supports paging `get_jobs(3, 7)`, which will + return at most 7 jobs, starting from the 3rd. + ### 0.3.7 (February 26th, 2013) From a5dff6659ce02047fa8ca65dc41d7ef1c2895081 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Fri, 19 Apr 2013 21:18:23 +0200 Subject: [PATCH 7/8] Replace the Calculator fixture by a Number fixture. This makes the tests a little more realistic, since I want to add a test for class methods. --- tests/fixtures.py | 15 +++++++++------ tests/test_job.py | 12 ++++++------ tests/test_queue.py | 10 +++++----- 3 files changed, 20 insertions(+), 17 deletions(-) diff --git a/tests/fixtures.py b/tests/fixtures.py index 77774d9..337fc70 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -49,13 +49,16 @@ def access_self(): return job.id -class Calculator(object): - """Test instance methods.""" - def __init__(self, denominator): - self.denominator = denominator +class Number(object): + def __init__(self, value): + self.value = value - def calculate(self, x, y): - return x * y / self.denominator + @classmethod + def divide(cls, x, y): + return x * y + + def div(self, y): + return self.value / y with Connection(): diff --git a/tests/test_job.py b/tests/test_job.py index 8b1d137..8f59472 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -1,7 +1,7 @@ import times from datetime import datetime from tests import RQTestCase -from tests.fixtures import Calculator, some_calculation, say_hello, access_self +from tests.fixtures import Number, some_calculation, say_hello, access_self from tests.helpers import strip_milliseconds from cPickle import loads from rq.job import Job, get_current_job @@ -51,13 +51,13 @@ class TestJob(RQTestCase): def test_create_instance_method_job(self): """Creation of jobs for instance methods.""" - c = Calculator(2) - job = Job.create(func=c.calculate, args=(3, 4)) + n = Number(2) + job = Job.create(func=n.div, args=(4,)) # Job data is set - self.assertEquals(job.func, c.calculate) - self.assertEquals(job.instance, c) - self.assertEquals(job.args, (3, 4)) + self.assertEquals(job.func, n.div) + self.assertEquals(job.instance, n) + self.assertEquals(job.args, (4,)) def test_create_job_from_string_function(self): """Creation of jobs using string specifier.""" diff --git a/tests/test_queue.py b/tests/test_queue.py index 061c984..e5bd7fd 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -1,5 +1,5 @@ from tests import RQTestCase -from tests.fixtures import Calculator, div_by_zero, say_hello, some_calculation +from tests.fixtures import Number, div_by_zero, say_hello, some_calculation from rq import Queue, get_failed_queue from rq.job import Job, Status from rq.exceptions import InvalidJobOperationError @@ -161,14 +161,14 @@ class TestQueue(RQTestCase): def test_dequeue_instance_method(self): """Dequeueing instance method jobs from queues.""" q = Queue() - c = Calculator(2) - result = q.enqueue(c.calculate, 3, 4) + n = Number(2) + q.enqueue(n.div, 4) job = q.dequeue() # The instance has been pickled and unpickled, so it is now a separate # object. Test for equality using each object's __dict__ instead. - self.assertEquals(job.instance.__dict__, c.__dict__) - self.assertEquals(job.func.__name__, 'calculate') + self.assertEquals(job.instance.__dict__, Number.__dict__) + self.assertEquals(job.func.__name__, 'divide') self.assertEquals(job.args, (3, 4)) def test_dequeue_ignores_nonexisting_jobs(self): From 7ea02d4029ecab6afb696f400dd303d6ea80b05b Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Fri, 19 Apr 2013 21:21:53 +0200 Subject: [PATCH 8/8] Add explicit classmethod test. --- tests/test_queue.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/tests/test_queue.py b/tests/test_queue.py index e5bd7fd..0385e0a 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -165,8 +165,20 @@ class TestQueue(RQTestCase): q.enqueue(n.div, 4) job = q.dequeue() + # The instance has been pickled and unpickled, so it is now a separate # object. Test for equality using each object's __dict__ instead. + self.assertEquals(job.instance.__dict__, n.__dict__) + self.assertEquals(job.func.__name__, 'div') + self.assertEquals(job.args, (4,)) + + def test_dequeue_class_method(self): + """Dequeueing class method jobs from queues.""" + q = Queue() + q.enqueue(Number.divide, 3, 4) + + job = q.dequeue() + self.assertEquals(job.instance.__dict__, Number.__dict__) self.assertEquals(job.func.__name__, 'divide') self.assertEquals(job.args, (3, 4))