From e6c32edad9da044b66deb9d9a612f4acb02e2090 Mon Sep 17 00:00:00 2001 From: Mark LaPerriere Date: Fri, 19 Dec 2014 14:17:17 -0500 Subject: [PATCH 01/53] add birth_date and death_date properties to Worker --- rq/worker.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/rq/worker.py b/rq/worker.py index 7283d56..73347c7 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -21,7 +21,7 @@ from .job import Job, Status from .logutils import setup_loghandlers from .queue import get_failed_queue, Queue from .timeouts import UnixSignalDeathPenalty -from .utils import import_attribute, make_colorizer, utcformat, utcnow +from .utils import import_attribute, make_colorizer, utcformat, utcnow, utcparse from .version import VERSION from .registry import FinishedJobRegistry, StartedJobRegistry @@ -235,6 +235,21 @@ class Worker(object): p.expire(self.key, 60) p.execute() + @property + def birth_date(self): + """Fetches birth date from Redis.""" + birth_timestamp = self.connection.hget(self.key, 'birth') + if birth_timestamp: + return utcparse(birth_timestamp) + + @property + def death_date(self): + """Fetches death date from Redis.""" + death_timestamp = self.connection.hget(self.key, 'death') + if death_timestamp: + return utcparse(death_timestamp) + + def set_state(self, state, pipeline=None): self._state = state connection = pipeline if pipeline is not None else self.connection From 5b5ab4860f56c430280ba6e39e02361c06222622 Mon Sep 17 00:00:00 2001 From: glaslos Date: Thu, 29 Jan 2015 14:18:59 +0100 Subject: [PATCH 02/53] passing on the ttl --- rq/queue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rq/queue.py b/rq/queue.py index 7352459..bdcf1f3 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -184,7 +184,7 @@ class Queue(object): # TODO: job with dependency shouldn't have "queued" as status job = self.job_class.create(func, args, kwargs, connection=self.connection, - result_ttl=result_ttl, status=JobStatus.QUEUED, + result_ttl=result_ttl, ttl=ttl, status=JobStatus.QUEUED, description=description, depends_on=depends_on, timeout=timeout, id=job_id) From 3af0e95ce5a0314fa8d95b60785e6cf2979e9d6b Mon Sep 17 00:00:00 2001 From: glaslos Date: Thu, 29 Jan 2015 14:19:50 +0100 Subject: [PATCH 03/53] adding two tests to ensure ttl behaviour --- tests/fixtures.py | 5 +++-- tests/test_job.py | 18 +++++++++++++++++- 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/tests/fixtures.py b/tests/fixtures.py index a0e8eba..4271373 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -85,5 +85,6 @@ with Connection(): return x + y -def long_running_job(): - time.sleep(10) +def long_running_job(timeout=10): + time.sleep(timeout) + return 'Done sleeping...' 
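The new fixture above is used by the ttl tests that follow; the ttl pass-through added in PATCH 02 can be sketched like this (a minimal illustration only, assuming a local Redis instance and that the fixture module is importable):

```python
from redis import StrictRedis
from rq import Queue
from tests.fixtures import long_running_job

queue = Queue(connection=StrictRedis())

# ttl is forwarded from Queue.enqueue() through enqueue_call() into
# Job.create(), so the queued job carries its own expiry.
job = queue.enqueue(long_running_job, args=(1,), ttl=30)
print(job.get_ttl())  # -> 30
```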
diff --git a/tests/test_job.py b/tests/test_job.py index 34859a7..1684315 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -12,7 +12,7 @@ from rq.utils import utcformat from tests import RQTestCase from tests.fixtures import (access_self, CallableObject, Number, say_hello, - some_calculation) + some_calculation, long_running_job) from tests.helpers import strip_microseconds try: @@ -313,6 +313,22 @@ class TestJob(RQTestCase): job.save() self.assertEqual(job.get_ttl(), None) + def test_ttl_via_enqueue(self): + ttl = 1 + queue = Queue(connection=self.testconn) + job = queue.enqueue(say_hello, ttl=ttl) + self.assertEqual(job.get_ttl(), ttl) + + def test_expire_during_execution(self): + """Test what happens when job expires during execution""" + ttl = 2 + queue = Queue(connection=self.testconn) + job = queue.enqueue(long_running_job, args=(4,), ttl=ttl) + self.assertEqual(job.get_ttl(), ttl) + job.perform() + self.assertFalse(job.exists(job.id)) + self.assertEqual(job.result, 'Done sleeping...') + def test_cleanup(self): """Test that jobs and results are expired properly.""" job = Job.create(func=say_hello) From 703ab0e355b2d427da252693da1f865db8dd2de9 Mon Sep 17 00:00:00 2001 From: glaslos Date: Thu, 29 Jan 2015 14:26:32 +0100 Subject: [PATCH 04/53] removed merged comment --- rq/queue.py | 1 - 1 file changed, 1 deletion(-) diff --git a/rq/queue.py b/rq/queue.py index 6b0c3bd..2c175e5 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -182,7 +182,6 @@ class Queue(object): """ timeout = timeout or self._default_timeout - # TODO: job with dependency shouldn't have "queued" as status job = self.job_class.create( func, args, kwargs, connection=self.connection, result_ttl=result_ttl, ttl=ttl, status=JobStatus.QUEUED, From af5a8624a69fc3f528a0489984a272ff79a4ef0b Mon Sep 17 00:00:00 2001 From: Mark LaPerriere Date: Thu, 5 Feb 2015 23:05:10 -0500 Subject: [PATCH 05/53] add tests for birth and death worker methods --- tests/test_worker.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/tests/test_worker.py b/tests/test_worker.py index c47a62e..e89f39b 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -379,3 +379,23 @@ class TestWorker(RQTestCase): w3 = Worker([q], name="worker1") worker_set = set([w1, w2, w3]) self.assertEquals(len(worker_set), 2) + + def test_worker_sets_birth(self): + """Ensure worker correctly sets worker birth date.""" + q = Queue() + w = Worker([q]) + + birth_date = w.birth_date + self.assertIsNotNone(birth_date) + self.assertEquals(type(birth_date).__name__, 'datetime') + + def test_worker_sets_death(self): + """Ensure worker correctly sets worker birth date.""" + q = Queue() + w = Worker([q]) + + w.register_death() + + death_date = w.death_date + self.assertIsNotNone(death_date) + self.assertEquals(type(death_date).__name__, 'datetime') From 450c5969aa4afd0550c130e12c55213b111a1108 Mon Sep 17 00:00:00 2001 From: Mark LaPerriere Date: Thu, 5 Feb 2015 23:12:33 -0500 Subject: [PATCH 06/53] call register birth in the register birth test --- tests/test_worker.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/test_worker.py b/tests/test_worker.py index e89f39b..65edead 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -385,6 +385,8 @@ class TestWorker(RQTestCase): q = Queue() w = Worker([q]) + w.register_birth() + birth_date = w.birth_date self.assertIsNotNone(birth_date) self.assertEquals(type(birth_date).__name__, 'datetime') From a4ca4704ace1c1e027543ecaf1f154ad561016c6 Mon Sep 17 00:00:00 2001 From: Mark 
LaPerriere Date: Thu, 5 Feb 2015 23:17:41 -0500 Subject: [PATCH 07/53] convert redis returned bytes into text/string --- rq/worker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index c863b56..6c351c8 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -250,14 +250,14 @@ class Worker(object): """Fetches birth date from Redis.""" birth_timestamp = self.connection.hget(self.key, 'birth') if birth_timestamp: - return utcparse(birth_timestamp) + return utcparse(as_text(birth_timestamp)) @property def death_date(self): """Fetches death date from Redis.""" death_timestamp = self.connection.hget(self.key, 'death') if death_timestamp: - return utcparse(death_timestamp) + return utcparse(as_text(death_timestamp)) def set_state(self, state, pipeline=None): From 3dfd044767706ff6e2d0169a13d237d66e1154a6 Mon Sep 17 00:00:00 2001 From: Taras Semenenko Date: Wed, 25 Feb 2015 14:32:37 +0300 Subject: [PATCH 08/53] Add missed `multi` command after `watch` `watch` command should be used in conjunction with `multi` command which was missed in enqueuing of job with dependencies. Fix #487 --- rq/queue.py | 1 + 1 file changed, 1 insertion(+) diff --git a/rq/queue.py b/rq/queue.py index 988a54e..f5a8887 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -199,6 +199,7 @@ class Queue(object): try: pipe.watch(depends_on.key) if depends_on.get_status() != JobStatus.FINISHED: + pipe.multi() job.set_status(JobStatus.DEFERRED) job.register_dependency(pipeline=pipe) job.save(pipeline=pipe) From 636a537fa7bb4fe31e3c5e30d79e2dfe4a460e45 Mon Sep 17 00:00:00 2001 From: Mark LaPerriere Date: Thu, 26 Feb 2015 09:41:20 -0500 Subject: [PATCH 09/53] updates addressing @selwin comments for PR #465 --- rq/worker.py | 4 ++-- tests/test_worker.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index 6c351c8..a32c6d9 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -249,14 +249,14 @@ class Worker(object): def birth_date(self): """Fetches birth date from Redis.""" birth_timestamp = self.connection.hget(self.key, 'birth') - if birth_timestamp: + if birth_timestamp is not None: return utcparse(as_text(birth_timestamp)) @property def death_date(self): """Fetches death date from Redis.""" death_timestamp = self.connection.hget(self.key, 'death') - if death_timestamp: + if death_timestamp is not None: return utcparse(as_text(death_timestamp)) diff --git a/tests/test_worker.py b/tests/test_worker.py index 65edead..6f89f4a 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -392,7 +392,7 @@ class TestWorker(RQTestCase): self.assertEquals(type(birth_date).__name__, 'datetime') def test_worker_sets_death(self): - """Ensure worker correctly sets worker birth date.""" + """Ensure worker correctly sets worker death date.""" q = Queue() w = Worker([q]) From 3d4d6a86d5f73ccdedbb4d2ee596f957337a5fd3 Mon Sep 17 00:00:00 2001 From: glaslos Date: Fri, 27 Feb 2015 16:07:44 +0100 Subject: [PATCH 10/53] persist the job right before execution --- rq/job.py | 2 ++ tests/test_job.py | 10 ++++++---- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/rq/job.py b/rq/job.py index 5dbbcf5..7195ddb 100644 --- a/rq/job.py +++ b/rq/job.py @@ -485,6 +485,8 @@ class Job(object): # Job execution def perform(self): # noqa """Invokes the job function with the job arguments.""" + self.connection.persist(self.key) + self.ttl = self.connection.ttl(self.key) _job_stack.push(self.id) try: self._result = self.func(*self.args, **self.kwargs) diff --git 
a/tests/test_job.py b/tests/test_job.py index d715298..2e91751 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -345,14 +345,16 @@ class TestJob(RQTestCase): job = queue.enqueue(say_hello, ttl=ttl) self.assertEqual(job.get_ttl(), ttl) - def test_expire_during_execution(self): + def test_never_expire_during_execution(self): """Test what happens when job expires during execution""" - ttl = 2 + ttl = 1 queue = Queue(connection=self.testconn) - job = queue.enqueue(long_running_job, args=(4,), ttl=ttl) + job = queue.enqueue(long_running_job, args=(2,), ttl=ttl) self.assertEqual(job.get_ttl(), ttl) + job.save() job.perform() - self.assertFalse(job.exists(job.id)) + self.assertEqual(job.get_ttl(), -1) + self.assertTrue(job.exists(job.id)) self.assertEqual(job.result, 'Done sleeping...') def test_cleanup(self): From 933e6697cd0ac5d4037eed5e14d07daf312e4086 Mon Sep 17 00:00:00 2001 From: Vlad Pronsky Date: Thu, 5 Mar 2015 01:08:07 +0200 Subject: [PATCH 11/53] Fixed redis drivers bug --- rq/queue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rq/queue.py b/rq/queue.py index f5a8887..ea39d81 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -194,7 +194,7 @@ class Queue(object): if depends_on is not None: if not isinstance(depends_on, self.job_class): depends_on = Job(id=depends_on, connection=self.connection) - with self.connection.pipeline() as pipe: + with self.connection._pipeline() as pipe: while True: try: pipe.watch(depends_on.key) From 9fba806662cf58acdb7ab92512ea4ca5195fe529 Mon Sep 17 00:00:00 2001 From: Eric Bustarret Date: Fri, 6 Mar 2015 12:53:48 +0100 Subject: [PATCH 12/53] Add REDIS_SSL option Allow the worker to connect to a Redis instance through SSL (ex: Azure Redis Cache use SSL only by default) --- rq/cli/helpers.py | 1 + 1 file changed, 1 insertion(+) diff --git a/rq/cli/helpers.py b/rq/cli/helpers.py index d5c54a1..8a13218 100644 --- a/rq/cli/helpers.py +++ b/rq/cli/helpers.py @@ -36,6 +36,7 @@ def get_redis_from_config(settings): port=settings.get('REDIS_PORT', 6379), db=settings.get('REDIS_DB', 0), password=settings.get('REDIS_PASSWORD', None), + ssl=settings.get('REDIS_SSL',False) ) From 80009570090a50858b75bff5a9fa6344d9a0a5bf Mon Sep 17 00:00:00 2001 From: Eric Bustarret Date: Fri, 6 Mar 2015 15:34:07 +0100 Subject: [PATCH 13/53] Update requirements to redis-py 2.10.0 --- requirements.txt | 2 +- setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/requirements.txt b/requirements.txt index 8da152d..1be36a2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,2 @@ -redis==2.7.0 +redis==2.10.0 click>=3.0.0 diff --git a/setup.py b/setup.py index f2a8574..0ab149b 100644 --- a/setup.py +++ b/setup.py @@ -17,7 +17,7 @@ def get_version(): def get_dependencies(): - deps = ['redis >= 2.7.0', 'click >= 3.0'] + deps = ['redis >= 2.10.0', 'click >= 3.0'] if sys.version_info < (2, 7) or \ (sys.version_info >= (3, 0) and sys.version_info < (3, 1)): deps += ['importlib'] From 5e57e97b4ef31375ee502cbe64ec4545a0a26c81 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Mon, 9 Mar 2015 09:18:34 +0100 Subject: [PATCH 14/53] Add changelog for 0.5.1. 
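The `REDIS_SSL` option introduced in PATCH 12 (and noted in the 0.5.1 changelog below) is read by `get_redis_from_config()` from an RQ settings module; a sketch of such a module, with placeholder values:

```python
# settings.py -- an RQ config module (placeholder values), e.g. `rq worker -c settings`
REDIS_HOST = 'redis.example.com'
REDIS_PORT = 6380
REDIS_DB = 0
REDIS_PASSWORD = 'secret'
REDIS_SSL = True  # passed as ssl=True to the underlying StrictRedis connection
```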
--- CHANGES.md | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/CHANGES.md b/CHANGES.md index 12f273a..4b4d6d0 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,15 @@ +### 0.5.1 + +(March 9th, 2015) + +- Resolve performance issue when queues contain many jobs +- Restore the ability to specify connection params in config +- Record `birth_date` and `death_date` on Worker +- Add support for SSL URLs in Redis (and `REDIS_SSL` config option) +- Fix encoding issues with non-ASCII characters in function arguments +- Fix Redis transaction management issue with job dependencies + + ### 0.5.0 (Jan 30th, 2015) From 85051982980a07504c81aeb8525a406f03b22037 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Mon, 9 Mar 2015 09:18:48 +0100 Subject: [PATCH 15/53] Bump to 0.5.1. --- rq/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rq/version.py b/rq/version.py index 0d96ce8..7b8e658 100644 --- a/rq/version.py +++ b/rq/version.py @@ -2,4 +2,4 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) -VERSION = '0.5.0' +VERSION = '0.5.1' From f3740539289854a563da2356b99571bac84b9eab Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Mon, 9 Mar 2015 09:20:42 +0100 Subject: [PATCH 16/53] Don't require Redis 2.10. --- requirements.txt | 2 +- setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/requirements.txt b/requirements.txt index 1be36a2..291ee5b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,2 @@ -redis==2.10.0 +redis>=2.7 click>=3.0.0 diff --git a/setup.py b/setup.py index 0ab149b..f2a8574 100644 --- a/setup.py +++ b/setup.py @@ -17,7 +17,7 @@ def get_version(): def get_dependencies(): - deps = ['redis >= 2.10.0', 'click >= 3.0'] + deps = ['redis >= 2.7.0', 'click >= 3.0'] if sys.version_info < (2, 7) or \ (sys.version_info >= (3, 0) and sys.version_info < (3, 1)): deps += ['importlib'] From c6f79784640f95635b5e575e6691caa47350e948 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Mon, 9 Mar 2015 09:21:40 +0100 Subject: [PATCH 17/53] Fix non-truth in README. --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 2b1fa71..b317dd1 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ them in the background with workers. It is backed by Redis and it is designed to have a low barrier to entry. It should be integrated in your web stack easily. -RQ requires Redis >= 2.6.0. +RQ requires Redis >= 2.7.0. [![Build status](https://travis-ci.org/nvie/rq.svg?branch=master)](https://secure.travis-ci.org/nvie/rq) [![Downloads](https://pypip.in/d/rq/badge.svg)](https://pypi.python.org/pypi/rq) From 6c6e53542fa62dec263c66e963deba882f7fa9a5 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Mon, 9 Mar 2015 09:33:08 +0100 Subject: [PATCH 18/53] Don't require redis-py 2.10 just for the SSL option. 
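From the caller's side, the version gate in the diff below behaves roughly like this (a sketch; the RuntimeError text is taken from the patch and is only raised when redis-py is older than 2.10):

```python
from rq.cli.helpers import get_redis_from_config

settings = {'REDIS_SSL': True}  # SSL requested in the RQ config
try:
    connection = get_redis_from_config(settings)
except RuntimeError as exc:
    # Non-SSL configs keep working on older redis-py; only the SSL
    # path now requires redis-py >= 2.10.
    print(exc)  # Using SSL requires a redis-py version >= 2.10
```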
--- rq/cli/helpers.py | 31 +++++++++++++++++++++++-------- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/rq/cli/helpers.py b/rq/cli/helpers.py index 8a13218..621ba8b 100644 --- a/rq/cli/helpers.py +++ b/rq/cli/helpers.py @@ -7,8 +7,8 @@ import time from functools import partial import click +import redis from redis import StrictRedis - from rq import Queue, Worker from rq.logutils import setup_loghandlers from rq.worker import WorkerStatus @@ -31,13 +31,28 @@ def get_redis_from_config(settings): if settings.get('REDIS_URL') is not None: return StrictRedis.from_url(settings['REDIS_URL']) - return StrictRedis( - host=settings.get('REDIS_HOST', 'localhost'), - port=settings.get('REDIS_PORT', 6379), - db=settings.get('REDIS_DB', 0), - password=settings.get('REDIS_PASSWORD', None), - ssl=settings.get('REDIS_SSL',False) - ) + kwargs = { + 'host': settings.get('REDIS_HOST', 'localhost'), + 'port': settings.get('REDIS_PORT', 6379), + 'db': settings.get('REDIS_DB', 0), + 'password': settings.get('REDIS_PASSWORD', None), + } + + use_ssl = settings.get('REDIS_SSL', False) + if use_ssl: + # If SSL is required, we need to depend on redis-py being 2.10 at + # least + def safeint(x): + try: + return int(x) + except ValueError: + return 0 + + version_info = tuple(safeint(x) for x in redis.__version__.split('.')) + if not version_info >= (2, 10): + raise RuntimeError('Using SSL requires a redis-py version >= 2.10') + kwargs['ssl'] = use_ssl + return StrictRedis(**kwargs) def pad(s, pad_to_length): From cd155299a45f2e419d1609eb68a893d19d993089 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Mon, 9 Mar 2015 09:33:18 +0100 Subject: [PATCH 19/53] Fix PEP8 complaint. --- rq/cli/helpers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rq/cli/helpers.py b/rq/cli/helpers.py index 621ba8b..7bfc0cf 100644 --- a/rq/cli/helpers.py +++ b/rq/cli/helpers.py @@ -134,7 +134,7 @@ def show_workers(queues, raw, by_queue): else: qs = Queue.all() ws = Worker.all() - filter_queues = lambda x: x + filter_queues = (lambda x: x) if not by_queue: for w in ws: From 5cb873b438f1db009447e4feb17fb0f2075b272f Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Mon, 9 Mar 2015 09:34:22 +0100 Subject: [PATCH 20/53] Fix PEP8 complaint. --- rq/worker.py | 1 - 1 file changed, 1 deletion(-) diff --git a/rq/worker.py b/rq/worker.py index a32c6d9..41daa01 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -259,7 +259,6 @@ class Worker(object): if death_timestamp is not None: return utcparse(as_text(death_timestamp)) - def set_state(self, state, pipeline=None): self._state = state connection = pipeline if pipeline is not None else self.connection From 985a2664a4099b192de877c9dfc824d36b43f48e Mon Sep 17 00:00:00 2001 From: Tamir Duberstein Date: Mon, 16 Mar 2015 23:30:41 -0700 Subject: [PATCH 21/53] Prevent `Queue#dequeue` from blowing the stack In the case of many sequential jobs having been deleted, a recursive implementation of `Queue#dequeue` is prone to blowing the stack in the absence of tail-recursion support. Change the implementation from recursive to iterative to work around this issue in CPython. --- rq/queue.py | 32 ++++++++++++++++---------------- tests/test_queue.py | 8 ++++++++ 2 files changed, 24 insertions(+), 16 deletions(-) diff --git a/rq/queue.py b/rq/queue.py index ea39d81..baf16d6 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -329,22 +329,22 @@ class Queue(object): Returns a job_class instance, which can be executed or inspected. 
""" - job_id = self.pop_job_id() - if job_id is None: - return None - try: - job = self.job_class.fetch(job_id, connection=self.connection) - except NoSuchJobError as e: - # Silently pass on jobs that don't exist (anymore), - # and continue by reinvoking itself recursively - return self.dequeue() - except UnpickleError as e: - # Attach queue information on the exception for improved error - # reporting - e.job_id = job_id - e.queue = self - raise e - return job + while True: + job_id = self.pop_job_id() + if job_id is None: + return None + try: + job = self.job_class.fetch(job_id, connection=self.connection) + except NoSuchJobError as e: + # Silently pass on jobs that don't exist (anymore), + continue + except UnpickleError as e: + # Attach queue information on the exception for improved error + # reporting + e.job_id = job_id + e.queue = self + raise e + return job @classmethod def dequeue_any(cls, queues, timeout, connection=None): diff --git a/tests/test_queue.py b/tests/test_queue.py index e4e9253..6a631d7 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -173,6 +173,14 @@ class TestQueue(RQTestCase): # ...and assert the queue count when down self.assertEquals(q.count, 0) + def test_dequeue_deleted_jobs(self): + """Dequeueing deleted jobs from queues don't blow the stack.""" + q = Queue() + for _ in range(1,1000): + job = q.enqueue(say_hello) + job.delete() + q.dequeue() + def test_dequeue_instance_method(self): """Dequeueing instance method jobs from queues.""" q = Queue() From a37621a429f4598be6bfd1eb9332b761d351df46 Mon Sep 17 00:00:00 2001 From: Tamir Duberstein Date: Mon, 16 Mar 2015 23:38:56 -0700 Subject: [PATCH 22/53] Switch to Travis container-based infrastructure --- .travis.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.travis.yml b/.travis.yml index 9d3d110..0a9a69e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,3 +1,4 @@ +sudo: false language: python services: - redis From 91f263d8e051cc2d86f702606c1d1cf5d2f2d13d Mon Sep 17 00:00:00 2001 From: Serhii Maltsev Date: Thu, 19 Mar 2015 09:48:04 +0200 Subject: [PATCH 23/53] change try/except in python2/3 compatibility to to_text() --- rq/job.py | 7 ++----- tests/test_job.py | 8 ++++---- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/rq/job.py b/rq/job.py index 5dbbcf5..782c3c7 100644 --- a/rq/job.py +++ b/rq/job.py @@ -514,16 +514,13 @@ class Job(object): if self.func_name is None: return None - # Python 2/3 compatibility - try: - arg_list = [repr(arg).decode('utf-8') for arg in self.args] - except AttributeError: - arg_list = [repr(arg) for arg in self.args] + arg_list = [as_text(repr(arg)) for arg in self.args] kwargs = ['{0}={1!r}'.format(k, v) for k, v in self.kwargs.items()] # Sort here because python 3.3 & 3.4 makes different call_string arg_list += sorted(kwargs) args = ', '.join(arg_list) + return '%s(%s)' % (self.func_name, args) def cleanup(self, ttl=None, pipeline=None): diff --git a/tests/test_job.py b/tests/test_job.py index d2e7f44..1a7d231 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -31,12 +31,12 @@ class TestJob(RQTestCase): kwargs=dict(snowman="☃", null=None), ) - try: - # Python 2 - test_string = u"myfunc(12, u'\\u2603', null=None, snowman=u'\\u2603')".decode('utf-8') - except AttributeError: + if not PY2: # Python 3 test_string = "myfunc(12, '☃', null=None, snowman='☃')" + else: + # Python 2 + test_string = u"myfunc(12, u'\\u2603', null=None, snowman=u'\\u2603')".decode('utf-8') self.assertEquals( job.description, From 071c47dacba06f55de9d9a1569ac91592dc18eb2 
Mon Sep 17 00:00:00 2001 From: glaslos Date: Thu, 19 Mar 2015 15:32:01 +0100 Subject: [PATCH 24/53] hard-coded TTL to -1 on job.perform() Removed merge artifacts --- rq/job.py | 2 +- tests/test_job.py | 5 ----- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/rq/job.py b/rq/job.py index 7195ddb..ffa7983 100644 --- a/rq/job.py +++ b/rq/job.py @@ -486,7 +486,7 @@ class Job(object): def perform(self): # noqa """Invokes the job function with the job arguments.""" self.connection.persist(self.key) - self.ttl = self.connection.ttl(self.key) + self.ttl = -1 _job_stack.push(self.id) try: self._result = self.func(*self.args, **self.kwargs) diff --git a/tests/test_job.py b/tests/test_job.py index 2e91751..600cefd 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -4,11 +4,6 @@ from __future__ import (absolute_import, division, print_function, from datetime import datetime -from tests import RQTestCase -from tests.fixtures import (access_self, CallableObject, Number, say_hello, - some_calculation) -from tests.helpers import strip_microseconds - from rq.compat import as_text, PY2 from rq.exceptions import NoSuchJobError, UnpickleError from rq.job import get_current_job, Job From fd1dca40b954698680939803a232f5e1f4cecb67 Mon Sep 17 00:00:00 2001 From: Trevor Prater Date: Fri, 27 Mar 2015 17:45:33 -0400 Subject: [PATCH 25/53] Improving logging. - Include worker key in worker startup log statement. - Added a notification to make it more clear when a 'burst' worker dies. --- rq/worker.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/rq/worker.py b/rq/worker.py index 41daa01..e118c3f 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -394,7 +394,7 @@ class Worker(object): did_perform_work = False self.register_birth() - self.log.info('RQ worker started, version %s' % VERSION) + self.log.info("RQ worker, '%s', started, version %s" % (self.key, VERSION)) self.set_state(WorkerStatus.STARTED) try: @@ -410,6 +410,8 @@ class Worker(object): result = self.dequeue_job_and_maintain_ttl(timeout) if result is None: + if burst: + self.log.info("RQ worker, '%s', has died." % self.key) break except StopRequested: break From 5674edad61d85bbf4dd780439ce5cbb43c40cb13 Mon Sep 17 00:00:00 2001 From: Trevor Prater Date: Sat, 28 Mar 2015 12:12:04 -0400 Subject: [PATCH 26/53] Changed rqworker shutdown message. - As requested by @nvie and @selwin. --- rq/worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rq/worker.py b/rq/worker.py index e118c3f..a29b09b 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -411,7 +411,7 @@ class Worker(object): result = self.dequeue_job_and_maintain_ttl(timeout) if result is None: if burst: - self.log.info("RQ worker, '%s', has died." % self.key) + self.log.info("RQ worker, '%s', done, quitting." 
% self.key) break except StopRequested: break From 82df2ee6899df7d9f10fe83279044b11c613afbe Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Sun, 12 Apr 2015 10:29:20 +0200 Subject: [PATCH 27/53] Fix PEP8 issue --- tests/test_queue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_queue.py b/tests/test_queue.py index 6a631d7..8bb0ce0 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -176,7 +176,7 @@ class TestQueue(RQTestCase): def test_dequeue_deleted_jobs(self): """Dequeueing deleted jobs from queues don't blow the stack.""" q = Queue() - for _ in range(1,1000): + for _ in range(1, 1000): job = q.enqueue(say_hello) job.delete() q.dequeue() From df4d4c8d5dca9c70a82f8f5b22ddb5eb876e3959 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Sun, 12 Apr 2015 11:15:55 +0200 Subject: [PATCH 28/53] Make test cases more explicit --- tests/fixtures.py | 3 +-- tests/test_job.py | 40 ++++++++++++++++++---------------------- 2 files changed, 19 insertions(+), 24 deletions(-) diff --git a/tests/fixtures.py b/tests/fixtures.py index 4271373..a2bbefb 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -54,8 +54,7 @@ def create_file_after_timeout(path, timeout): def access_self(): - job = get_current_job() - return job.id + assert get_current_job() is not None def echo(*args, **kwargs): diff --git a/tests/test_job.py b/tests/test_job.py index 0f6ab30..0ce142e 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -4,17 +4,18 @@ from __future__ import (absolute_import, division, print_function, from datetime import datetime -from rq.compat import as_text, PY2 +from tests import RQTestCase +from tests.fixtures import (CallableObject, Number, access_self, + long_running_job, say_hello, some_calculation) +from tests.helpers import strip_microseconds + +from rq.compat import PY2, as_text from rq.exceptions import NoSuchJobError, UnpickleError -from rq.job import get_current_job, Job +from rq.job import Job, get_current_job from rq.queue import Queue from rq.registry import DeferredJobRegistry from rq.utils import utcformat - -from tests import RQTestCase -from tests.fixtures import (access_self, CallableObject, Number, say_hello, - some_calculation, long_running_job) -from tests.helpers import strip_microseconds +from rq.worker import Worker try: from cPickle import loads, dumps @@ -291,25 +292,20 @@ class TestJob(RQTestCase): else: self.assertEqual(job.description, "tests.fixtures.say_hello('Lionel')") - def test_job_access_within_job_function(self): - """The current job is accessible within the job function.""" - # Executing the job function from outside of RQ throws an exception + def test_job_access_outside_job_fails(self): + """The current job is accessible only within a job context.""" self.assertIsNone(get_current_job()) - # Executing the job function from within the job works (and in - # this case leads to the job ID being returned) - job = Job.create(func=access_self) - job.save() - id = job.perform() - self.assertEqual(job.id, id) - self.assertEqual(job.func, access_self) + def test_job_access_within_job_function(self): + """The current job is accessible within the job function.""" + q = Queue() + q.enqueue(access_self) # access_self calls get_current_job() and asserts + w = Worker([q]) + w.work(burst=True) - # Ensure that get_current_job also works from within synchronous jobs + def test_job_access_within_synchronous_job_function(self): queue = Queue(async=False) - job = queue.enqueue(access_self) - id = job.perform() - 
self.assertEqual(job.id, id) - self.assertEqual(job.func, access_self) + queue.enqueue(access_self) def test_get_result_ttl(self): """Getting job result TTL.""" From f651a72a50f1d9df1e6788347fdb60a7625c41fd Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Sun, 12 Apr 2015 11:27:58 +0200 Subject: [PATCH 29/53] Make clearer what are fixture functions --- tests/test_job.py | 68 +++++++++++++++++++++++------------------------ 1 file changed, 34 insertions(+), 34 deletions(-) diff --git a/tests/test_job.py b/tests/test_job.py index 0ce142e..f6766fe 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -5,8 +5,6 @@ from __future__ import (absolute_import, division, print_function, from datetime import datetime from tests import RQTestCase -from tests.fixtures import (CallableObject, Number, access_self, - long_running_job, say_hello, some_calculation) from tests.helpers import strip_microseconds from rq.compat import PY2, as_text @@ -17,6 +15,8 @@ from rq.registry import DeferredJobRegistry from rq.utils import utcformat from rq.worker import Worker +from . import fixtures + try: from cPickle import loads, dumps except ImportError: @@ -70,7 +70,7 @@ class TestJob(RQTestCase): def test_create_typical_job(self): """Creation of jobs for function calls.""" - job = Job.create(func=some_calculation, args=(3, 4), kwargs=dict(z=2)) + job = Job.create(func=fixtures.some_calculation, args=(3, 4), kwargs=dict(z=2)) # Jobs have a random UUID self.assertIsNotNone(job.id) @@ -79,7 +79,7 @@ class TestJob(RQTestCase): self.assertIsNone(job.instance) # Job data is set... - self.assertEquals(job.func, some_calculation) + self.assertEquals(job.func, fixtures.some_calculation) self.assertEquals(job.args, (3, 4)) self.assertEquals(job.kwargs, {'z': 2}) @@ -90,7 +90,7 @@ class TestJob(RQTestCase): def test_create_instance_method_job(self): """Creation of jobs for instance methods.""" - n = Number(2) + n = fixtures.Number(2) job = Job.create(func=n.div, args=(4,)) # Job data is set @@ -103,13 +103,13 @@ class TestJob(RQTestCase): job = Job.create(func='tests.fixtures.say_hello', args=('World',)) # Job data is set - self.assertEquals(job.func, say_hello) + self.assertEquals(job.func, fixtures.say_hello) self.assertIsNone(job.instance) self.assertEquals(job.args, ('World',)) def test_create_job_from_callable_class(self): """Creation of jobs using a callable class specifier.""" - kallable = CallableObject() + kallable = fixtures.CallableObject() job = Job.create(func=kallable) self.assertEquals(job.func, kallable.__call__) @@ -138,7 +138,7 @@ class TestJob(RQTestCase): def test_save(self): # noqa """Storing jobs.""" - job = Job.create(func=some_calculation, args=(3, 4), kwargs=dict(z=2)) + job = Job.create(func=fixtures.some_calculation, args=(3, 4), kwargs=dict(z=2)) # Saving creates a Redis hash self.assertEquals(self.testconn.exists(job.key), False) @@ -174,7 +174,7 @@ class TestJob(RQTestCase): def test_persistence_of_typical_jobs(self): """Storing typical jobs.""" - job = Job.create(func=some_calculation, args=(3, 4), kwargs=dict(z=2)) + job = Job.create(func=fixtures.some_calculation, args=(3, 4), kwargs=dict(z=2)) job.save() expected_date = strip_microseconds(job.created_at) @@ -190,15 +190,15 @@ class TestJob(RQTestCase): def test_persistence_of_parent_job(self): """Storing jobs with parent job, either instance or key.""" - parent_job = Job.create(func=some_calculation) + parent_job = Job.create(func=fixtures.some_calculation) parent_job.save() - job = Job.create(func=some_calculation, 
depends_on=parent_job) + job = Job.create(func=fixtures.some_calculation, depends_on=parent_job) job.save() stored_job = Job.fetch(job.id) self.assertEqual(stored_job._dependency_id, parent_job.id) self.assertEqual(stored_job.dependency, parent_job) - job = Job.create(func=some_calculation, depends_on=parent_job.id) + job = Job.create(func=fixtures.some_calculation, depends_on=parent_job.id) job.save() stored_job = Job.fetch(job.id) self.assertEqual(stored_job._dependency_id, parent_job.id) @@ -206,7 +206,7 @@ class TestJob(RQTestCase): def test_store_then_fetch(self): """Store, then fetch.""" - job = Job.create(func=some_calculation, args=(3, 4), kwargs=dict(z=2)) + job = Job.create(func=fixtures.some_calculation, args=(3, 4), kwargs=dict(z=2)) job.save() job2 = Job.fetch(job.id) @@ -225,7 +225,7 @@ class TestJob(RQTestCase): def test_fetching_unreadable_data(self): """Fetching succeeds on unreadable data, but lazy props fail.""" # Set up - job = Job.create(func=some_calculation, args=(3, 4), kwargs=dict(z=2)) + job = Job.create(func=fixtures.some_calculation, args=(3, 4), kwargs=dict(z=2)) job.save() # Just replace the data hkey with some random noise @@ -238,7 +238,7 @@ class TestJob(RQTestCase): def test_job_is_unimportable(self): """Jobs that cannot be imported throw exception on access.""" - job = Job.create(func=say_hello, args=('Lionel',)) + job = Job.create(func=fixtures.say_hello, args=('Lionel',)) job.save() # Now slightly modify the job to make it unimportable (this is @@ -254,7 +254,7 @@ class TestJob(RQTestCase): def test_custom_meta_is_persisted(self): """Additional meta data on jobs are stored persisted correctly.""" - job = Job.create(func=say_hello, args=('Lionel',)) + job = Job.create(func=fixtures.say_hello, args=('Lionel',)) job.meta['foo'] = 'bar' job.save() @@ -266,25 +266,25 @@ class TestJob(RQTestCase): def test_result_ttl_is_persisted(self): """Ensure that job's result_ttl is set properly""" - job = Job.create(func=say_hello, args=('Lionel',), result_ttl=10) + job = Job.create(func=fixtures.say_hello, args=('Lionel',), result_ttl=10) job.save() Job.fetch(job.id, connection=self.testconn) self.assertEqual(job.result_ttl, 10) - job = Job.create(func=say_hello, args=('Lionel',)) + job = Job.create(func=fixtures.say_hello, args=('Lionel',)) job.save() Job.fetch(job.id, connection=self.testconn) self.assertEqual(job.result_ttl, None) def test_description_is_persisted(self): """Ensure that job's custom description is set properly""" - job = Job.create(func=say_hello, args=('Lionel',), description='Say hello!') + job = Job.create(func=fixtures.say_hello, args=('Lionel',), description='Say hello!') job.save() Job.fetch(job.id, connection=self.testconn) self.assertEqual(job.description, 'Say hello!') # Ensure job description is constructed from function call string - job = Job.create(func=say_hello, args=('Lionel',)) + job = Job.create(func=fixtures.say_hello, args=('Lionel',)) job.save() Job.fetch(job.id, connection=self.testconn) if PY2: @@ -299,23 +299,23 @@ class TestJob(RQTestCase): def test_job_access_within_job_function(self): """The current job is accessible within the job function.""" q = Queue() - q.enqueue(access_self) # access_self calls get_current_job() and asserts + q.enqueue(fixtures.access_self) # access_self calls get_current_job() and asserts w = Worker([q]) w.work(burst=True) def test_job_access_within_synchronous_job_function(self): queue = Queue(async=False) - queue.enqueue(access_self) + queue.enqueue(fixtures.access_self) def 
test_get_result_ttl(self): """Getting job result TTL.""" job_result_ttl = 1 default_ttl = 2 - job = Job.create(func=say_hello, result_ttl=job_result_ttl) + job = Job.create(func=fixtures.say_hello, result_ttl=job_result_ttl) job.save() self.assertEqual(job.get_result_ttl(default_ttl=default_ttl), job_result_ttl) self.assertEqual(job.get_result_ttl(), job_result_ttl) - job = Job.create(func=say_hello) + job = Job.create(func=fixtures.say_hello) job.save() self.assertEqual(job.get_result_ttl(default_ttl=default_ttl), default_ttl) self.assertEqual(job.get_result_ttl(), None) @@ -323,24 +323,24 @@ class TestJob(RQTestCase): def test_get_job_ttl(self): """Getting job TTL.""" ttl = 1 - job = Job.create(func=say_hello, ttl=ttl) + job = Job.create(func=fixtures.say_hello, ttl=ttl) job.save() self.assertEqual(job.get_ttl(), ttl) - job = Job.create(func=say_hello) + job = Job.create(func=fixtures.say_hello) job.save() self.assertEqual(job.get_ttl(), None) def test_ttl_via_enqueue(self): ttl = 1 queue = Queue(connection=self.testconn) - job = queue.enqueue(say_hello, ttl=ttl) + job = queue.enqueue(fixtures.say_hello, ttl=ttl) self.assertEqual(job.get_ttl(), ttl) def test_never_expire_during_execution(self): """Test what happens when job expires during execution""" ttl = 1 queue = Queue(connection=self.testconn) - job = queue.enqueue(long_running_job, args=(2,), ttl=ttl) + job = queue.enqueue(fixtures.long_running_job, args=(2,), ttl=ttl) self.assertEqual(job.get_ttl(), ttl) job.save() job.perform() @@ -350,7 +350,7 @@ class TestJob(RQTestCase): def test_cleanup(self): """Test that jobs and results are expired properly.""" - job = Job.create(func=say_hello) + job = Job.create(func=fixtures.say_hello) job.save() # Jobs with negative TTLs don't expire @@ -370,7 +370,7 @@ class TestJob(RQTestCase): origin = 'some_queue' registry = DeferredJobRegistry(origin, self.testconn) - job = Job.create(func=say_hello, origin=origin) + job = Job.create(func=fixtures.say_hello, origin=origin) job._dependency_id = 'id' job.save() @@ -382,8 +382,8 @@ class TestJob(RQTestCase): def test_cancel(self): """job.cancel() deletes itself & dependents mapping from Redis.""" queue = Queue(connection=self.testconn) - job = queue.enqueue(say_hello) - job2 = Job.create(func=say_hello, depends_on=job) + job = queue.enqueue(fixtures.say_hello) + job2 = Job.create(func=fixtures.say_hello, depends_on=job) job2.register_dependency() job.cancel() self.assertFalse(self.testconn.exists(job.key)) @@ -394,8 +394,8 @@ class TestJob(RQTestCase): def test_create_job_with_id(self): """test creating jobs with a custom ID""" queue = Queue(connection=self.testconn) - job = queue.enqueue(say_hello, job_id="1234") + job = queue.enqueue(fixtures.say_hello, job_id="1234") self.assertEqual(job.id, "1234") job.perform() - self.assertRaises(TypeError, queue.enqueue, say_hello, job_id=1234) + self.assertRaises(TypeError, queue.enqueue, fixtures.say_hello, job_id=1234) From 8f7322ed10ae53c93c9fe3a20db1c24441b8b464 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Tue, 14 Apr 2015 09:10:35 +0200 Subject: [PATCH 30/53] This is 0.5.2 --- CHANGES.md | 8 ++++++++ rq/version.py | 2 +- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 4b4d6d0..5428c71 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,11 @@ +### 0.5.2 + +(April 14th, 2015) + +- Support SSL connection to Redis (requires redis-py>=2.10) +- Fix to prevent deep call stacks with large queues + + ### 0.5.1 (March 9th, 2015) diff --git a/rq/version.py 
b/rq/version.py index 7b8e658..4b0ec94 100644 --- a/rq/version.py +++ b/rq/version.py @@ -2,4 +2,4 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) -VERSION = '0.5.1' +VERSION = '0.5.2' From de1178ce3f366cb99a2c2682616ed21032829fc7 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Tue, 14 Apr 2015 09:20:24 +0200 Subject: [PATCH 31/53] Add Makefile for convenient releasing --- Makefile | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100644 Makefile diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..3149d8d --- /dev/null +++ b/Makefile @@ -0,0 +1,18 @@ +all: + @grep -Ee '^[a-z].*:' Makefile | cut -d: -f1 | grep -vF all + +clean: + rm -rf build/ dist/ + +release: + # Check if latest tag is the current head we're releasing + echo "Latest tag = $$(git tag | sort -nr | head -n1)" + echo "HEAD SHA = $$(git sha head)" + echo "Latest tag SHA = $$(git tag | sort -nr | head -n1 | xargs git sha)" + @test "$$(git sha head)" = "$$(git tag | sort -nr | head -n1 | xargs git sha)" + make force_release + +force_release: clean + git push --tags + python setup.py sdist bdist_wheel + twine upload dist/* From 260f7caf66bc151ba1c8320c6eb2593a18a1906b Mon Sep 17 00:00:00 2001 From: Cosmin Stefan Date: Thu, 30 Apr 2015 11:57:05 +0300 Subject: [PATCH 32/53] Enable proper setup of signale handlers for SimpleWorker as well. fixes #523 This allows a SIGTERM to make the worker perform a warm shutdown and cleanly break out of the loop and finish the current job, if any. --- rq/worker.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index 41daa01..750ebec 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -646,11 +646,6 @@ class Worker(object): class SimpleWorker(Worker): - def _install_signal_handlers(self, *args, **kwargs): - """Signal handlers are useless for test worker, as it - does not have fork() ability""" - pass - def main_work_horse(self, *args, **kwargs): raise NotImplementedError("Test worker does not implement this method") From 70d5f971bddface0075c54bcf98108f5737bfe71 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Tue, 12 May 2015 17:24:05 +0800 Subject: [PATCH 33/53] Jobs from FinishedJobRegistry that are moved to FailedQueue should have "failed" as status. 
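A sketch of the behaviour the diff below enforces, following the updated registry test (assumes a local Redis and the `say_hello` test fixture):

```python
from redis import StrictRedis
from rq import Queue
from rq.job import JobStatus
from rq.queue import FailedQueue
from rq.registry import StartedJobRegistry
from tests.fixtures import say_hello

conn = StrictRedis()
queue = Queue(connection=conn)
registry = StartedJobRegistry(connection=conn)
failed_queue = FailedQueue(connection=conn)

job = queue.enqueue(say_hello)
conn.zadd(registry.key, 2, job.id)  # simulate a started job whose lease expired long ago

registry.cleanup()                  # expired job ids are moved to the failed queue...
assert job.id in failed_queue.job_ids
job.refresh()
assert job.status == JobStatus.FAILED  # ...and the job itself is now marked "failed"
```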
--- rq/registry.py | 12 +++++++++++- tests/test_registry.py | 18 ++++++++++++------ 2 files changed, 23 insertions(+), 7 deletions(-) diff --git a/rq/registry.py b/rq/registry.py index 08798eb..1180d4f 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -1,5 +1,7 @@ from .compat import as_text from .connections import resolve_connection +from .exceptions import NoSuchJobError +from .job import Job, JobStatus from .queue import FailedQueue from .utils import current_timestamp @@ -80,9 +82,17 @@ class StartedJobRegistry(BaseRegistry): if job_ids: failed_queue = FailedQueue(connection=self.connection) + with self.connection.pipeline() as pipeline: for job_id in job_ids: - failed_queue.push_job_id(job_id, pipeline=pipeline) + try: + job = Job.fetch(job_id, connection=self.connection) + job.status = JobStatus.FAILED + job.save(pipeline=pipeline) + failed_queue.push_job_id(job_id, pipeline=pipeline) + except NoSuchJobError: + pass + pipeline.zremrangebyscore(self.key, 0, score) pipeline.execute() diff --git a/tests/test_registry.py b/tests/test_registry.py index f54b315..628636a 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -2,7 +2,7 @@ from __future__ import absolute_import from rq.compat import as_text -from rq.job import Job +from rq.job import Job, JobStatus from rq.queue import FailedQueue, Queue from rq.utils import current_timestamp from rq.worker import Worker @@ -60,15 +60,21 @@ class TestRegistry(RQTestCase): """Moving expired jobs to FailedQueue.""" failed_queue = FailedQueue(connection=self.testconn) self.assertTrue(failed_queue.is_empty()) - self.testconn.zadd(self.registry.key, 2, 'foo') + + queue = Queue(connection=self.testconn) + job = queue.enqueue(say_hello) + + self.testconn.zadd(self.registry.key, 2, job.id) self.registry.cleanup(1) - self.assertNotIn('foo', failed_queue.job_ids) - self.assertEqual(self.testconn.zscore(self.registry.key, 'foo'), 2) + self.assertNotIn(job.id, failed_queue.job_ids) + self.assertEqual(self.testconn.zscore(self.registry.key, job.id), 2) self.registry.cleanup() - self.assertIn('foo', failed_queue.job_ids) - self.assertEqual(self.testconn.zscore(self.registry.key, 'foo'), None) + self.assertIn(job.id, failed_queue.job_ids) + self.assertEqual(self.testconn.zscore(self.registry.key, job.id), None) + job.refresh() + self.assertEqual(job.status, JobStatus.FAILED) def test_job_execution(self): """Job is removed from StartedJobRegistry after execution.""" From 9a00b0eca6e79f0f24645cbf92f762e0d6bb8a6a Mon Sep 17 00:00:00 2001 From: RyanMTB Date: Wed, 20 May 2015 21:48:13 -0700 Subject: [PATCH 34/53] Updated Worker API --- rq/queue.py | 3 +++ rq/worker.py | 16 +++++++++++----- tests/test_worker.py | 35 +++++++++++++++++++++++++++++++++++ 3 files changed, 49 insertions(+), 5 deletions(-) diff --git a/rq/queue.py b/rq/queue.py index 2187c7e..bf5a186 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -70,6 +70,9 @@ class Queue(object): def __len__(self): return self.count + def __iter__(self): + yield self + @property def key(self): """Returns the Redis key for this Queue.""" diff --git a/rq/worker.py b/rq/worker.py index fa3c49b..39e8ce6 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -125,8 +125,7 @@ class Worker(object): if connection is None: connection = get_current_connection() self.connection = connection - if isinstance(queues, self.queue_class): - queues = [queues] + queues = self.process_queue_args(queues) self._name = name self.queues = queues self.validate_queues() @@ -160,11 +159,18 @@ class Worker(object): def 
validate_queues(self): """Sanity check for the given queues.""" - if not iterable(self.queues): - raise ValueError('Argument queues not iterable.') for queue in self.queues: if not isinstance(queue, self.queue_class): - raise NoQueueError('Give each worker at least one Queue.') + raise NoQueueError('{0} is not a queue'.format(queue)) + + def process_queue_args(self, queue_args): + """ allow for a string, a queue an iterable of strings + or an iterable of queues""" + if isinstance(queue_args, text_type): + return self.queue_class(name = queue_args) + else: + return [self.queue_class(name=queue_arg) if isinstance(queue_arg, text_type) + else queue_arg for queue_arg in queue_args] def queue_names(self): """Returns the queue names of this worker's queues.""" diff --git a/tests/test_worker.py b/tests/test_worker.py index 6f89f4a..c74d36a 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -12,6 +12,7 @@ from tests.helpers import strip_microseconds from rq import get_failed_queue, Queue, SimpleWorker, Worker from rq.compat import as_text +from rq.exceptions import NoQueueError from rq.job import Job, JobStatus from rq.registry import StartedJobRegistry from rq.suspension import resume, suspend @@ -28,6 +29,40 @@ class TestWorker(RQTestCase): w = Worker([fooq, barq]) self.assertEquals(w.queues, [fooq, barq]) + def test_create_worker_args_single_queue(self): + """Test Worker creation with single queue instance arg""" + fooq = Queue('foo') + w = Worker(fooq) + self.assertEquals(w.queue_keys(), ['rq:queue:foo']) + + def test_create_worker_args_single_string(self): + """ Test Worker creation with single string arg""" + w = Worker('foo') + self.assertEquals(w.queue_keys(),['rq:queue:foo']) + + def test_create_worker_args_iterable_strings(self): + """ Test Worker creation with iterable of strings""" + w = Worker(['foo', 'bar']) + self.assertEquals(w.queue_keys(),['rq:queue:foo', 'rq:queue:bar']) + + def test_create_worker_args_iterable_queues(self): + """ Test Worker test worker creation + with an iterable of queue instance args""" + w = Worker(map(Queue, ['foo', 'bar'])) + self.assertEquals(w.queue_keys(),['rq:queue:foo', 'rq:queue:bar']) + + def test_create_worker_args_list_map(self): + """ Test Worker test worker creation + with a list of queue from map""" + w = Worker(list(map(Queue, ['foo', 'bar']))) + self.assertEquals(w.queue_keys(),['rq:queue:foo', 'rq:queue:bar']) + + def test_create_worker_raises_noqueue_error(self): + """ make sure raises noqueue error if a + a non string or queue is passed""" + with self.assertRaises(NoQueueError): + w = Worker([1]) + def test_work_and_quit(self): """Worker processes work, then quits.""" fooq, barq = Queue('foo'), Queue('bar') From faf9d3e66838a71e5b2e28add0a503e3fdc4fa03 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Sat, 23 May 2015 08:46:00 +0700 Subject: [PATCH 35/53] Added clean_registries(queue) function to clean job registries related to that queue. --- rq/registry.py | 13 +++++++++++-- tests/test_registry.py | 19 +++++++++++++++++-- 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/rq/registry.py b/rq/registry.py index 1180d4f..ded86da 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -8,8 +8,9 @@ from .utils import current_timestamp class BaseRegistry(object): """ - Base implementation of job registry, implemented in Redis sorted set. Each job - is stored as a key in the registry, scored by expiration time (unix timestamp). + Base implementation of a job registry, implemented in Redis sorted set. 
+ Each job is stored as a key in the registry, scored by expiration time + (unix timestamp). """ def __init__(self, name='default', connection=None): @@ -134,3 +135,11 @@ class DeferredJobRegistry(BaseRegistry): automatically called by `count()` and `get_job_ids()` methods implemented in BaseRegistry.""" pass + + +def clean_registries(queue): + """Cleans StartedJobRegistry and FinishedJobRegistry of a queue.""" + registry = FinishedJobRegistry(name=queue.name, connection=queue.connection) + registry.cleanup() + registry = StartedJobRegistry(name=queue.name, connection=queue.connection) + registry.cleanup() diff --git a/tests/test_registry.py b/tests/test_registry.py index 628636a..9bb1856 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -6,8 +6,8 @@ from rq.job import Job, JobStatus from rq.queue import FailedQueue, Queue from rq.utils import current_timestamp from rq.worker import Worker -from rq.registry import (DeferredJobRegistry, FinishedJobRegistry, - StartedJobRegistry) +from rq.registry import (clean_registries, DeferredJobRegistry, + FinishedJobRegistry, StartedJobRegistry) from tests import RQTestCase from tests.fixtures import div_by_zero, say_hello @@ -107,6 +107,21 @@ class TestRegistry(RQTestCase): self.assertEqual(self.registry.count, 2) self.assertEqual(len(self.registry), 2) + def test_clean_registries(self): + """clean_registries() cleans Started and Finished job registries.""" + + queue = Queue(connection=self.testconn) + + finished_job_registry = FinishedJobRegistry(connection=self.testconn) + self.testconn.zadd(finished_job_registry.key, 1, 'foo') + + started_job_registry = StartedJobRegistry(connection=self.testconn) + self.testconn.zadd(started_job_registry.key, 1, 'foo') + + clean_registries(queue) + self.assertEqual(self.testconn.zcard(finished_job_registry.key), 0) + self.assertEqual(self.testconn.zcard(started_job_registry.key), 0) + class TestFinishedJobRegistry(RQTestCase): From 5782ac10c40c84c495ee1033e78652084cab1078 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Sat, 23 May 2015 09:01:25 +0700 Subject: [PATCH 36/53] Added worker.clean_registries(). 
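A usage sketch for the new method (assumes a local Redis; queue names are illustrative):

```python
from redis import StrictRedis
from rq import Queue, Worker

conn = StrictRedis()
worker = Worker([Queue('foo', connection=conn), Queue('bar', connection=conn)],
                connection=conn)

# Cleans the Started/Finished job registries of every queue this worker
# listens on and stamps worker.maintenance_date with the current UTC time.
worker.clean_registries()
print(worker.maintenance_date)
```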
--- rq/worker.py | 9 ++++++++- tests/test_worker.py | 19 +++++++++++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/rq/worker.py b/rq/worker.py index fa3c49b..a4cd529 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -20,7 +20,7 @@ from .exceptions import DequeueTimeout, NoQueueError from .job import Job, JobStatus from .logutils import setup_loghandlers from .queue import get_failed_queue, Queue -from .registry import FinishedJobRegistry, StartedJobRegistry +from .registry import clean_registries, FinishedJobRegistry, StartedJobRegistry from .suspension import is_suspended from .timeouts import UnixSignalDeathPenalty from .utils import enum, import_attribute, make_colorizer, utcformat, utcnow, utcparse @@ -146,6 +146,7 @@ class Worker(object): self._stopped = False self.log = logger self.failed_queue = get_failed_queue(connection=self.connection) + self.maintenance_date = None # By default, push the "move-to-failed-queue" exception handler onto # the stack @@ -646,6 +647,12 @@ class Worker(object): """The hash does not take the database/connection into account""" return hash(self.name) + def clean_registries(self): + """Runs maintenance jobs on each Queue's registries.""" + for queue in self.queues: + clean_registries(queue) + self.maintenance_date = utcnow() + class SimpleWorker(Worker): def main_work_horse(self, *args, **kwargs): diff --git a/tests/test_worker.py b/tests/test_worker.py index 6f89f4a..961e96b 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -401,3 +401,22 @@ class TestWorker(RQTestCase): death_date = w.death_date self.assertIsNotNone(death_date) self.assertEquals(type(death_date).__name__, 'datetime') + + def test_clean_queue_registries(self): + """worker.clean_registries sets maintenance_date and cleans registries.""" + foo_queue = Queue('foo', connection=self.testconn) + foo_registry = StartedJobRegistry('foo', connection=self.testconn) + self.testconn.zadd(foo_registry.key, 1, 'foo') + self.assertEqual(self.testconn.zcard(foo_registry.key), 1) + + bar_queue = Queue('bar', connection=self.testconn) + bar_registry = StartedJobRegistry('bar', connection=self.testconn) + self.testconn.zadd(bar_registry.key, 1, 'bar') + self.assertEqual(self.testconn.zcard(bar_registry.key), 1) + + worker = Worker([foo_queue, bar_queue]) + self.assertEqual(worker.maintenance_date, None) + worker.clean_registries() + self.assertNotEqual(worker.maintenance_date, None) + self.assertEqual(self.testconn.zcard(foo_registry.key), 0) + self.assertEqual(self.testconn.zcard(bar_registry.key), 0) From c3767e28e2ca43a6aad93e6947f00f205c431b5a Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Sat, 23 May 2015 10:08:04 +0700 Subject: [PATCH 37/53] Worker now runs maintenance tasks every hour and on startup. 
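The scheduling rule added below can be sketched directly from the accompanying test (assumes a local Redis):

```python
from datetime import timedelta
from redis import StrictRedis
from rq import Queue, Worker
from rq.utils import utcnow

conn = StrictRedis()
worker = Worker([Queue(connection=conn)], connection=conn)

assert worker.should_run_maintenance_tasks        # never ran: run on startup

worker.maintenance_date = utcnow()
assert not worker.should_run_maintenance_tasks    # ran just now: skip

worker.maintenance_date = utcnow() - timedelta(seconds=3700)
assert worker.should_run_maintenance_tasks        # more than an hour ago: run again
```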
--- rq/worker.py | 15 ++++++++++++++- tests/test_worker.py | 23 +++++++++++++++++++++++ 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/rq/worker.py b/rq/worker.py index a4cd529..adeb56e 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -2,6 +2,7 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) +from datetime import timedelta import errno import logging import os @@ -403,6 +404,9 @@ class Worker(object): try: self.check_for_suspension(burst) + if self.should_run_maintenance_tasks: + self.clean_registries() + if self.stopped: self.log.info('Stopping on request.') break @@ -609,7 +613,7 @@ class Worker(object): 'arguments': job.args, 'kwargs': job.kwargs, 'queue': job.origin, - }) + }) for handler in reversed(self._exc_handlers): self.log.debug('Invoking exception handler %s' % (handler,)) @@ -653,6 +657,15 @@ class Worker(object): clean_registries(queue) self.maintenance_date = utcnow() + @property + def should_run_maintenance_tasks(self): + """Maintenance tasks should run on first startup or every hour.""" + if self.maintenance_date is None: + return True + if (utcnow() - self.maintenance_date) > timedelta(seconds=3600): + return True + return False + class SimpleWorker(Worker): def main_work_horse(self, *args, **kwargs): diff --git a/tests/test_worker.py b/tests/test_worker.py index 961e96b..3fc44bb 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -3,6 +3,7 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) import os +from datetime import timedelta from time import sleep from tests import RQTestCase, slow @@ -15,6 +16,7 @@ from rq.compat import as_text from rq.job import Job, JobStatus from rq.registry import StartedJobRegistry from rq.suspension import resume, suspend +from rq.utils import utcnow class CustomJob(Job): @@ -420,3 +422,24 @@ class TestWorker(RQTestCase): self.assertNotEqual(worker.maintenance_date, None) self.assertEqual(self.testconn.zcard(foo_registry.key), 0) self.assertEqual(self.testconn.zcard(bar_registry.key), 0) + + def test_should_run_maintenance_tasks(self): + """Workers should run maintenance tasks on startup and every hour.""" + queue = Queue(connection=self.testconn) + worker = Worker(queue) + self.assertTrue(worker.should_run_maintenance_tasks) + + worker.maintenance_date = utcnow() + self.assertFalse(worker.should_run_maintenance_tasks) + worker.maintenance_date = utcnow() - timedelta(seconds=3700) + self.assertTrue(worker.should_run_maintenance_tasks) + + def test_worker_calls_clean_registries(self): + """Worker calls clean_registries when run.""" + queue = Queue(connection=self.testconn) + registry = StartedJobRegistry(connection=self.testconn) + self.testconn.zadd(registry.key, 1, 'foo') + + worker = Worker(queue, connection=self.testconn) + worker.work(burst=True) + self.assertEqual(self.testconn.zcard(registry.key), 0) From bac6699ea4f00b807fedbee185f54ba6a38abb24 Mon Sep 17 00:00:00 2001 From: Neal Todd Date: Tue, 26 May 2015 17:32:11 +0100 Subject: [PATCH 38/53] Allow non-ASCII characters in keyword arguments. 
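A Python 2 sketch of the failure mode this one-line change addresses; `NonAsciiRepr` is an illustrative stand-in for the `UnicodeStringObject` fixture added in the following patch:

```python
# -*- coding: utf-8 -*-
from redis import StrictRedis
from rq.job import Job


class NonAsciiRepr(object):
    def __repr__(self):
        return u'é'.encode('utf-8')  # a bytes repr containing non-ASCII (Python 2)


def echo(*args, **kwargs):
    return args, kwargs


job = Job.create(func=echo, kwargs={'arg_with_unicode': NonAsciiRepr()},
                 connection=StrictRedis())
# Keyword reprs now go through as_text() like positional args already did,
# so a non-ASCII repr no longer breaks building the job's call string.
print(job.get_call_string())
```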
--- rq/job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rq/job.py b/rq/job.py index 3c3016d..cbdf7f4 100644 --- a/rq/job.py +++ b/rq/job.py @@ -518,7 +518,7 @@ class Job(object): arg_list = [as_text(repr(arg)) for arg in self.args] - kwargs = ['{0}={1!r}'.format(k, v) for k, v in self.kwargs.items()] + kwargs = ['{0}={1}'.format(k, as_text(repr(v))) for k, v in self.kwargs.items()] # Sort here because python 3.3 & 3.4 makes different call_string arg_list += sorted(kwargs) args = ', '.join(arg_list) From 303f4ed47c10002b4ff797592598036de2e5a778 Mon Sep 17 00:00:00 2001 From: Robert Brownstein Date: Thu, 28 May 2015 17:15:18 -0400 Subject: [PATCH 39/53] Added test coverage for unicode keyword argument support in method signatures (#536) --- tests/fixtures.py | 5 +++++ tests/test_job.py | 8 ++++++++ 2 files changed, 13 insertions(+) diff --git a/tests/fixtures.py b/tests/fixtures.py index a2bbefb..09cf02e 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -78,6 +78,11 @@ class CallableObject(object): return u"I'm callable" +class UnicodeStringObject(object): + def __repr__(self): + return u'é'.encode('utf-8') + + with Connection(): @job(queue='default') def decorated_job(x, y): diff --git a/tests/test_job.py b/tests/test_job.py index f6766fe..36570cf 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -399,3 +399,11 @@ class TestJob(RQTestCase): job.perform() self.assertRaises(TypeError, queue.enqueue, fixtures.say_hello, job_id=1234) + + def test_get_call_string_unicode(self): + """test call string with unicode keyword arguments""" + queue = Queue(connection=self.testconn) + + job = queue.enqueue(fixtures.echo, arg_with_unicode=fixtures.UnicodeStringObject()) + self.assertIsNotNone(job.get_call_string()) + job.perform() \ No newline at end of file From 3d8faa0e5d3e7359e71927fee32f9ef3cfd9109d Mon Sep 17 00:00:00 2001 From: Robert Brownstein Date: Thu, 28 May 2015 17:23:04 -0400 Subject: [PATCH 40/53] Added proper conditional behavior to unicode fixture for python 3 --- tests/fixtures.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tests/fixtures.py b/tests/fixtures.py index 09cf02e..cdb1822 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -11,7 +11,7 @@ import time from rq import Connection, get_current_job from rq.decorators import job - +from rq.compat import PY2 def say_pid(): return os.getpid() @@ -80,7 +80,10 @@ class CallableObject(object): class UnicodeStringObject(object): def __repr__(self): - return u'é'.encode('utf-8') + if PY2: + return u'é'.encode('utf-8') + else: + return u'é' with Connection(): From 5b8726ad2d8bce4d7c947af2bb6c5bd8f060452b Mon Sep 17 00:00:00 2001 From: Marcus Martins Date: Tue, 26 May 2015 14:39:17 -0300 Subject: [PATCH 41/53] Fixes #502 Fixes some broken tests and misbehaviour with ttls. There was a temporal coupling between saving the job and setting its expires parameter. 
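Roughly, the behaviour these changes are after (a sketch mirroring the new tests below; assumes a local Redis and an otherwise empty default queue):

    import time

    from redis import Redis
    from rq import Queue

    q = Queue(connection=Redis())
    job = q.enqueue('tests.fixtures.say_hello', ttl=1)   # ttl applies to the job itself
    assert job.get_ttl() == 1        # the ttl now survives the save() round trip

    time.sleep(2)
    assert q.get_jobs() == []        # the expired job no longer comes back from the queue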
--- rq/job.py | 5 ++++- tests/test_job.py | 28 ++++++++++++++++++++++------ 2 files changed, 26 insertions(+), 7 deletions(-) diff --git a/rq/job.py b/rq/job.py index cbdf7f4..c1022cc 100644 --- a/rq/job.py +++ b/rq/job.py @@ -417,6 +417,7 @@ class Job(object): self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa self._status = as_text(obj.get('status') if obj.get('status') else None) self._dependency_id = as_text(obj.get('dependency_id', None)) + self.ttl = int(obj.get('ttl', -1)) self.meta = unpickle(obj.get('meta')) if obj.get('meta') else {} def to_dict(self): @@ -447,6 +448,8 @@ class Job(object): obj['dependency_id'] = self._dependency_id if self.meta: obj['meta'] = dumps(self.meta) + if self.ttl: + obj['ttl'] = self.ttl return obj @@ -456,7 +459,7 @@ class Job(object): connection = pipeline if pipeline is not None else self.connection connection.hmset(key, self.to_dict()) - self.cleanup(self.ttl) + self.cleanup(self.ttl, pipeline=connection) def cancel(self): """Cancels the given job, which will prevent the job from ever being diff --git a/tests/test_job.py b/tests/test_job.py index 36570cf..08d6385 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -2,6 +2,7 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) +import time from datetime import datetime from tests import RQTestCase @@ -15,7 +16,7 @@ from rq.registry import DeferredJobRegistry from rq.utils import utcformat from rq.worker import Worker -from . import fixtures +import fixtures try: from cPickle import loads, dumps @@ -103,7 +104,8 @@ class TestJob(RQTestCase): job = Job.create(func='tests.fixtures.say_hello', args=('World',)) # Job data is set - self.assertEquals(job.func, fixtures.say_hello) + self.assertTrue(job.func.func_code.co_filename in fixtures.say_hello.func_code.co_filename) + self.assertEquals(job.func.func_code.co_firstlineno, fixtures.say_hello.func_code.co_firstlineno) self.assertIsNone(job.instance) self.assertEquals(job.args, ('World',)) @@ -147,7 +149,7 @@ class TestJob(RQTestCase): # Saving writes pickled job data unpickled_data = loads(self.testconn.hget(job.key, 'data')) - self.assertEquals(unpickled_data[0], 'tests.fixtures.some_calculation') + self.assertEquals(unpickled_data[0], 'fixtures.some_calculation') def test_fetch(self): """Fetching jobs.""" @@ -288,9 +290,9 @@ class TestJob(RQTestCase): job.save() Job.fetch(job.id, connection=self.testconn) if PY2: - self.assertEqual(job.description, "tests.fixtures.say_hello(u'Lionel')") + self.assertEqual(job.description, "fixtures.say_hello(u'Lionel')") else: - self.assertEqual(job.description, "tests.fixtures.say_hello('Lionel')") + self.assertEqual(job.description, "fixtures.say_hello('Lionel')") def test_job_access_outside_job_fails(self): """The current job is accessible only within a job context.""" @@ -406,4 +408,18 @@ class TestJob(RQTestCase): job = queue.enqueue(fixtures.echo, arg_with_unicode=fixtures.UnicodeStringObject()) self.assertIsNotNone(job.get_call_string()) - job.perform() \ No newline at end of file + job.perform() + + def test_create_job_with_ttl_should_have_ttl_after_enqueued(self): + """test creating jobs with ttl and checks if get_jobs returns it properly [issue502]""" + queue = Queue(connection=self.testconn) + queue.enqueue(fixtures.say_hello, job_id="1234", ttl=10) + job = queue.get_jobs()[0] + self.assertEqual(job.ttl, 10) + + def test_create_job_with_ttl_should_expire(self): + """test if a job created with ttl expires [issue502]""" + queue = 
Queue(connection=self.testconn) + queue.enqueue(fixtures.say_hello, job_id="1234", ttl=1) + time.sleep(1) + self.assertEqual(0, len(queue.get_jobs())) From ab6c129833dcb013142beefede6d44da3c3f16be Mon Sep 17 00:00:00 2001 From: Marcus Martins Date: Tue, 26 May 2015 16:30:35 -0300 Subject: [PATCH 42/53] Fix broken tests --- tests/test_job.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/tests/test_job.py b/tests/test_job.py index 08d6385..a078ce6 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -2,8 +2,8 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) -import time from datetime import datetime +import time from tests import RQTestCase from tests.helpers import strip_microseconds @@ -16,7 +16,7 @@ from rq.registry import DeferredJobRegistry from rq.utils import utcformat from rq.worker import Worker -import fixtures +from . import fixtures try: from cPickle import loads, dumps @@ -104,8 +104,7 @@ class TestJob(RQTestCase): job = Job.create(func='tests.fixtures.say_hello', args=('World',)) # Job data is set - self.assertTrue(job.func.func_code.co_filename in fixtures.say_hello.func_code.co_filename) - self.assertEquals(job.func.func_code.co_firstlineno, fixtures.say_hello.func_code.co_firstlineno) + self.assertEquals(job.func, fixtures.say_hello) self.assertIsNone(job.instance) self.assertEquals(job.args, ('World',)) @@ -149,7 +148,7 @@ class TestJob(RQTestCase): # Saving writes pickled job data unpickled_data = loads(self.testconn.hget(job.key, 'data')) - self.assertEquals(unpickled_data[0], 'fixtures.some_calculation') + self.assertEquals(unpickled_data[0], 'tests.fixtures.some_calculation') def test_fetch(self): """Fetching jobs.""" @@ -290,9 +289,9 @@ class TestJob(RQTestCase): job.save() Job.fetch(job.id, connection=self.testconn) if PY2: - self.assertEqual(job.description, "fixtures.say_hello(u'Lionel')") + self.assertEqual(job.description, "tests.fixtures.say_hello(u'Lionel')") else: - self.assertEqual(job.description, "fixtures.say_hello('Lionel')") + self.assertEqual(job.description, "tests.fixtures.say_hello('Lionel')") def test_job_access_outside_job_fails(self): """The current job is accessible only within a job context.""" From 513f6310d2d6946907b34141f13d7637de6acb2b Mon Sep 17 00:00:00 2001 From: Marcus Martins Date: Thu, 28 May 2015 23:35:30 -0300 Subject: [PATCH 43/53] Change default TTL to None instead of -1 --- rq/job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rq/job.py b/rq/job.py index c1022cc..ebccb13 100644 --- a/rq/job.py +++ b/rq/job.py @@ -417,7 +417,7 @@ class Job(object): self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa self._status = as_text(obj.get('status') if obj.get('status') else None) self._dependency_id = as_text(obj.get('dependency_id', None)) - self.ttl = int(obj.get('ttl', -1)) + self.ttl = int(obj.get('ttl')) if obj.get('ttl') else None self.meta = unpickle(obj.get('meta')) if obj.get('meta') else {} def to_dict(self): From efcdd15902677cee4307d8120faace25d7e4e3b1 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Mon, 1 Jun 2015 11:00:26 +0700 Subject: [PATCH 44/53] Update setup.py to ensure Python 2.6 dependencies are installed by pip.
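For context, a sketch of the mechanism (only the extras_require entry below is taken from this change; the surrounding setup() call is illustrative):

    from setuptools import setup

    setup(
        name='rq',
        # ... remaining arguments unchanged ...
        extras_require={
            # Environment marker: pip evaluates the key and installs the
            # argparse and importlib backports only on Python 2.6, where
            # they are not part of the standard library.
            ':python_version=="2.6"': ['argparse', 'importlib'],
        },
    )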
--- setup.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index f2a8574..9fdd3ee 100644 --- a/setup.py +++ b/setup.py @@ -51,13 +51,16 @@ setup( 'rqworker = rq.cli:worker', ], }, + extras_require={ + ':python_version=="2.6"': ['argparse', 'importlib'], + }, classifiers=[ # As from http://pypi.python.org/pypi?%3Aaction=list_classifiers #'Development Status :: 1 - Planning', #'Development Status :: 2 - Pre-Alpha', #'Development Status :: 3 - Alpha', - 'Development Status :: 4 - Beta', - #'Development Status :: 5 - Production/Stable', + #'Development Status :: 4 - Beta', + 'Development Status :: 5 - Production/Stable', #'Development Status :: 6 - Mature', #'Development Status :: 7 - Inactive', 'Intended Audience :: Developers', From 8f9c507f12e4044dc7e065733204ea009fcdeae4 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Mon, 1 Jun 2015 11:00:36 +0700 Subject: [PATCH 45/53] Bump version to 0.5.3. --- CHANGES.md | 11 +++++++++++ rq/version.py | 2 +- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 5428c71..00f248a 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,14 @@ +### 0.5.3 + +(June 1st, 2015) + +- enqueue_call() no longer ignores `ttl`. Thanks @mbodock! +- Better support for unicode kwargs. Thanks @nealtodd and @brownstein! +- Better API for instantiating Workers. Thanks @RyanMTB! +- Jobs that are moved from `StartedJobRegistry` to `FailedQueue` now + have their statuses assigned properly. +- Workers now automatically cleans up job registries every hour. + ### 0.5.2 (April 14th, 2015) diff --git a/rq/version.py b/rq/version.py index 4b0ec94..e8c1e4b 100644 --- a/rq/version.py +++ b/rq/version.py @@ -2,4 +2,4 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) -VERSION = '0.5.2' +VERSION = '0.5.3' From 94258761aeb46ff0342b4f6658fbddad704ff1b6 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Wed, 3 Jun 2015 09:43:07 +0200 Subject: [PATCH 46/53] Make string formatting consistent --- rq/connections.py | 4 +-- rq/job.py | 22 +++++++-------- rq/queue.py | 22 +++++++-------- rq/registry.py | 6 ++-- rq/timeouts.py | 2 +- rq/worker.py | 71 +++++++++++++++++++++++------------------------ 6 files changed, 62 insertions(+), 65 deletions(-) diff --git a/rq/connections.py b/rq/connections.py index ae4d036..03b8c05 100644 --- a/rq/connections.py +++ b/rq/connections.py @@ -43,7 +43,7 @@ def use_connection(redis=None): use of use_connection() and stacked connection contexts. """ assert len(_connection_stack) <= 1, \ - 'You should not mix Connection contexts with use_connection().' 
+ 'You should not mix Connection contexts with use_connection()' release_local(_connection_stack) if redis is None: @@ -67,7 +67,7 @@ def resolve_connection(connection=None): connection = get_current_connection() if connection is None: - raise NoRedisConnectionException('Could not resolve a Redis connection.') + raise NoRedisConnectionException('Could not resolve a Redis connection') return connection diff --git a/rq/job.py b/rq/job.py index ebccb13..1828fe2 100644 --- a/rq/job.py +++ b/rq/job.py @@ -50,7 +50,7 @@ def unpickle(pickled_string): try: obj = loads(pickled_string) except Exception as e: - raise UnpickleError('Could not unpickle.', pickled_string, e) + raise UnpickleError('Could not unpickle', pickled_string, e) return obj @@ -99,9 +99,9 @@ class Job(object): kwargs = {} if not isinstance(args, (tuple, list)): - raise TypeError('{0!r} is not a valid args list.'.format(args)) + raise TypeError('{0!r} is not a valid args list'.format(args)) if not isinstance(kwargs, dict): - raise TypeError('{0!r} is not a valid kwargs dict.'.format(kwargs)) + raise TypeError('{0!r} is not a valid kwargs dict'.format(kwargs)) job = cls(connection=connection) if id is not None: @@ -116,7 +116,7 @@ class Job(object): job._instance = func.__self__ job._func_name = func.__name__ elif inspect.isfunction(func) or inspect.isbuiltin(func): - job._func_name = '%s.%s' % (func.__module__, func.__name__) + job._func_name = '{0}.{1}'.format(func.__module__, func.__name__) elif isinstance(func, string_types): job._func_name = as_text(func) elif not inspect.isclass(func) and hasattr(func, '__call__'): # a callable class instance @@ -212,7 +212,7 @@ class Job(object): def data(self): if self._data is UNEVALUATED: if self._func_name is UNEVALUATED: - raise ValueError('Cannot build the job data.') + raise ValueError('Cannot build the job data') if self._instance is UNEVALUATED: self._instance = None @@ -317,7 +317,7 @@ class Job(object): self.meta = {} def __repr__(self): # noqa - return 'Job(%r, enqueued_at=%r)' % (self._id, self.enqueued_at) + return 'Job({0!r}, enqueued_at={1!r})'.format(self._id, self.enqueued_at) # Data access def get_id(self): # noqa @@ -331,7 +331,7 @@ class Job(object): def set_id(self, value): """Sets a job ID for the given job.""" if not isinstance(value, string_types): - raise TypeError('id must be a string, not {0}.'.format(type(value))) + raise TypeError('id must be a string, not {0}'.format(type(value))) self._id = value id = property(get_id, set_id) @@ -344,7 +344,7 @@ class Job(object): @classmethod def dependents_key_for(cls, job_id): """The Redis key that is used to store job hash under.""" - return 'rq:job:%s:dependents' % (job_id,) + return 'rq:job:{0}:dependents'.format(job_id) @property def key(self): @@ -393,7 +393,7 @@ class Job(object): key = self.key obj = decode_redis_hash(self.connection.hgetall(key)) if len(obj) == 0: - raise NoSuchJobError('No such job: %s' % (key,)) + raise NoSuchJobError('No such job: {0}'.format(key)) def to_date(date_str): if date_str is None: @@ -526,7 +526,7 @@ class Job(object): arg_list += sorted(kwargs) args = ', '.join(arg_list) - return '%s(%s)' % (self.func_name, args) + return '{0}({1})'.format(self.func_name, args) def cleanup(self, ttl=None, pipeline=None): """Prepare job for eventual deletion (if needed). 
This method is usually @@ -565,7 +565,7 @@ class Job(object): connection.sadd(Job.dependents_key_for(self._dependency_id), self.id) def __str__(self): - return '<Job %s: %s>' % (self.id, self.description) + return '<Job {0}: {1}>'.format(self.id, self.description) # Job equality def __eq__(self, other): # noqa diff --git a/rq/queue.py b/rq/queue.py index bf5a186..16f5b0a 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -49,7 +49,7 @@ class Queue(object): """ prefix = cls.redis_queue_namespace_prefix if not queue_key.startswith(prefix): - raise ValueError('Not a valid RQ queue key: %s' % (queue_key,)) + raise ValueError('Not a valid RQ queue key: {0}'.format(queue_key)) name = queue_key[len(prefix):] return cls(name, connection=connection) @@ -58,7 +58,7 @@ class Queue(object): self.connection = resolve_connection(connection) prefix = self.redis_queue_namespace_prefix self.name = name - self._key = '%s%s' % (prefix, name) + self._key = '{0}{1}'.format(prefix, name) self._default_timeout = default_timeout self._async = async @@ -71,7 +71,7 @@ class Queue(object): return self.count def __iter__(self): - yield self + yield self @property def key(self): @@ -230,7 +230,7 @@ class Queue(object): """ if not isinstance(f, string_types) and f.__module__ == '__main__': raise ValueError('Functions from the __main__ module cannot be processed ' - 'by workers.') + 'by workers') # Detect explicit invocations, i.e. of the form: # q.enqueue(foo, args=(1, 2), kwargs={'a': 1}, timeout=30) @@ -243,7 +243,7 @@ class Queue(object): at_front = kwargs.pop('at_front', False) if 'args' in kwargs or 'kwargs' in kwargs: - assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs.' # noqa + assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs' # noqa args = kwargs.pop('args', None) kwargs = kwargs.pop('kwargs', None) @@ -314,7 +314,7 @@ class Queue(object): connection = resolve_connection(connection) if timeout is not None: # blocking variant if timeout == 0: - raise ValueError('RQ does not support indefinite timeouts. Please pick a timeout value > 0.') + raise ValueError('RQ does not support indefinite timeouts.
Please pick a timeout value > 0') result = connection.blpop(queue_keys, timeout) if result is None: raise DequeueTimeout(timeout, queue_keys) @@ -385,22 +385,22 @@ class Queue(object): # auto-generated by the @total_ordering decorator) def __eq__(self, other): # noqa if not isinstance(other, Queue): - raise TypeError('Cannot compare queues to other objects.') + raise TypeError('Cannot compare queues to other objects') return self.name == other.name def __lt__(self, other): if not isinstance(other, Queue): - raise TypeError('Cannot compare queues to other objects.') + raise TypeError('Cannot compare queues to other objects') return self.name < other.name def __hash__(self): return hash(self.name) def __repr__(self): # noqa - return 'Queue(%r)' % (self.name,) + return 'Queue({0!r})'.format(self.name) def __str__(self): - return '<Queue %s>' % (self.name,) + return '<Queue {0}>'.format(self.name) class FailedQueue(Queue): @@ -436,7 +436,7 @@ class FailedQueue(Queue): # Delete it from the failed queue (raise an error if that failed) if self.remove(job) == 0: - raise InvalidJobOperationError('Cannot requeue non-failed jobs.') + raise InvalidJobOperationError('Cannot requeue non-failed jobs') job.set_status(JobStatus.QUEUED) job.exc_info = None diff --git a/rq/registry.py b/rq/registry.py index ded86da..8d66c0d 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -69,7 +69,7 @@ class StartedJobRegistry(BaseRegistry): def __init__(self, name='default', connection=None): super(StartedJobRegistry, self).__init__(name, connection) - self.key = 'rq:wip:%s' % name + self.key = 'rq:wip:{0}'.format(name) def cleanup(self, timestamp=None): """Remove expired jobs from registry and add them to FailedQueue. @@ -108,7 +108,7 @@ class FinishedJobRegistry(BaseRegistry): def __init__(self, name='default', connection=None): super(FinishedJobRegistry, self).__init__(name, connection) - self.key = 'rq:finished:%s' % name + self.key = 'rq:finished:{0}'.format(name) def cleanup(self, timestamp=None): """Remove expired jobs from registry. @@ -128,7 +128,7 @@ class DeferredJobRegistry(BaseRegistry): def __init__(self, name='default', connection=None): super(DeferredJobRegistry, self).__init__(name, connection) - self.key = 'rq:deferred:%s' % name + self.key = 'rq:deferred:{0}'.format(name) def cleanup(self): """This method is only here to prevent errors because this method is diff --git a/rq/timeouts.py b/rq/timeouts.py index a7b1242..a6afdf2 100644 --- a/rq/timeouts.py +++ b/rq/timeouts.py @@ -48,7 +48,7 @@ class UnixSignalDeathPenalty(BaseDeathPenalty): def handle_death_penalty(self, signum, frame): raise JobTimeoutException('Job exceeded maximum timeout ' - 'value (%d seconds).'
% self._timeout) + 'value ({0} seconds)'.format(self._timeout)) def setup_death_penalty(self): """Sets up an alarm signal and a signal handler that raises diff --git a/rq/worker.py b/rq/worker.py index e53aff4..27df2dd 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -102,7 +102,7 @@ class Worker(object): """ prefix = cls.redis_worker_namespace_prefix if not worker_key.startswith(prefix): - raise ValueError('Not a valid RQ worker key: %s' % (worker_key,)) + raise ValueError('Not a valid RQ worker key: {0}'.format(worker_key)) if connection is None: connection = get_current_connection() @@ -166,13 +166,12 @@ class Worker(object): raise NoQueueError('{0} is not a queue'.format(queue)) def process_queue_args(self, queue_args): - """ allow for a string, a queue an iterable of strings - or an iterable of queues""" + """Allow for a string, a queue an iterable of strings or an iterable of queues""" if isinstance(queue_args, text_type): - return self.queue_class(name = queue_args) + return self.queue_class(name=queue_args) else: - return [self.queue_class(name=queue_arg) if isinstance(queue_arg, text_type) - else queue_arg for queue_arg in queue_args] + return [self.queue_class(name=queue_arg) if isinstance(queue_arg, text_type) else queue_arg + for queue_arg in queue_args] def queue_names(self): """Returns the queue names of this worker's queues.""" @@ -193,7 +192,7 @@ class Worker(object): if self._name is None: hostname = socket.gethostname() shortname, _, _ = hostname.partition('.') - self._name = '%s.%s' % (shortname, self.pid) + self._name = '{0}.{1}'.format(shortname, self.pid) return self._name @property @@ -223,15 +222,15 @@ class Worker(object): This can be used to make `ps -ef` output more readable. """ - setprocname('rq: %s' % (message,)) + setprocname('rq: {0}'.format(message)) def register_birth(self): """Registers its own birth.""" - self.log.debug('Registering birth of worker %s' % (self.name,)) + self.log.debug('Registering birth of worker {0}'.format(self.name)) if self.connection.exists(self.key) and \ not self.connection.hexists(self.key, 'death'): - raise ValueError('There exists an active worker named \'%s\' ' - 'already.' % (self.name,)) + msg = 'There exists an active worker named {0!r} already' + raise ValueError(msg.format(self.name)) key = self.key queues = ','.join(self.queue_names()) with self.connection._pipeline() as p: @@ -326,18 +325,18 @@ class Worker(object): def request_force_stop(signum, frame): """Terminates the application (cold shutdown). """ - self.log.warning('Cold shut down.') + self.log.warning('Cold shut down') # Take down the horse with the worker if self.horse_pid: - msg = 'Taking down horse %d with me.' % self.horse_pid + msg = 'Taking down horse {0} with me'.format(self.horse_pid) self.log.debug(msg) try: os.kill(self.horse_pid, signal.SIGKILL) except OSError as e: # ESRCH ("No such process") is fine with us if e.errno != errno.ESRCH: - self.log.debug('Horse already down.') + self.log.debug('Horse already down') raise raise SystemExit() @@ -345,12 +344,12 @@ class Worker(object): """Stops the current worker loop but waits for child processes to end gracefully (warm shutdown). """ - self.log.debug('Got signal %s.' % signal_name(signum)) + self.log.debug('Got signal {0}'.format(signal_name(signum))) signal.signal(signal.SIGINT, request_force_stop) signal.signal(signal.SIGTERM, request_force_stop) - msg = 'Warm shut down requested.' 
+ msg = 'Warm shut down requested' self.log.warning(msg) # If shutdown is requested in the middle of a job, wait until @@ -374,12 +373,12 @@ class Worker(object): while not self.stopped and is_suspended(self.connection): if burst: - self.log.info('Suspended in burst mode -- exiting.' - 'Note: There could still be unperformed jobs on the queue') + self.log.info('Suspended in burst mode, exiting') + self.log.info('Note: There could still be unfinished jobs on the queue') raise StopRequested if not notified: - self.log.info('Worker suspended, use "rq resume" command to resume') + self.log.info('Worker suspended, run `rq resume` to resume') before_state = self.get_state() self.set_state(WorkerStatus.SUSPENDED) notified = True @@ -402,7 +401,7 @@ class Worker(object): did_perform_work = False self.register_birth() - self.log.info("RQ worker, '%s', started, version %s" % (self.key, VERSION)) + self.log.info("RQ worker {0!r} started, version {1}".format(self.key, VERSION)) self.set_state(WorkerStatus.STARTED) try: @@ -414,7 +413,7 @@ class Worker(object): self.clean_registries() if self.stopped: - self.log.info('Stopping on request.') + self.log.info('Stopping on request') break timeout = None if burst else max(1, self.default_worker_ttl - 60) @@ -422,7 +421,7 @@ class Worker(object): result = self.dequeue_job_and_maintain_ttl(timeout) if result is None: if burst: - self.log.info("RQ worker, '%s', done, quitting." % self.key) + self.log.info("RQ worker {0!r} done, quitting".format(self.key)) break except StopRequested: break @@ -446,10 +445,9 @@ class Worker(object): qnames = self.queue_names() self.set_state(WorkerStatus.IDLE) - self.procline('Listening on %s' % ','.join(qnames)) + self.procline('Listening on {0}'.format(','.join(qnames))) self.log.info('') - self.log.info('*** Listening on %s...' % - green(', '.join(qnames))) + self.log.info('*** Listening on {0}...'.format(green(', '.join(qnames)))) while True: self.heartbeat() @@ -459,8 +457,8 @@ class Worker(object): connection=self.connection) if result is not None: job, queue = result - self.log.info('%s: %s (%s)' % (green(queue.name), - blue(job.description), job.id)) + self.log.info('{0}: {1} ({2})'.format(green(queue.name), + blue(job.description), job.id)) break except DequeueTimeout: @@ -497,7 +495,7 @@ class Worker(object): self.main_work_horse(job) else: self._horse_pid = child_pid - self.procline('Forked %d at %d' % (child_pid, time.time())) + self.procline('Forked {0} at {1}'.format(child_pid, time.time())) while True: try: self.set_state('busy') @@ -552,9 +550,8 @@ class Worker(object): job.set_status(JobStatus.STARTED, pipeline=pipeline) pipeline.execute() - self.procline('Processing %s from %s since %s' % ( - job.func_name, - job.origin, time.time())) + msg = 'Processing {0} from {1} since {2}' + self.procline(msg.format(job.func_name, job.origin, time.time())) def perform_job(self, job): """Performs the actual work of a job. Will/should only be called @@ -599,14 +596,14 @@ class Worker(object): if rv is None: self.log.info('Job OK') else: - self.log.info('Job OK, result = %s' % (yellow(text_type(rv)),)) + self.log.info('Job OK, result = {0!r}'.format(yellow(text_type(rv)))) if result_ttl == 0: - self.log.info('Result discarded immediately.') + self.log.info('Result discarded immediately') elif result_ttl > 0: - self.log.info('Result is kept for %d seconds.'
% result_ttl) + self.log.info('Result is kept for {0} seconds'.format(result_ttl)) else: - self.log.warning('Result will never expire, clean up result key manually.') + self.log.warning('Result will never expire, clean up result key manually') return True @@ -622,7 +619,7 @@ class Worker(object): }) for handler in reversed(self._exc_handlers): - self.log.debug('Invoking exception handler %s' % (handler,)) + self.log.debug('Invoking exception handler {0}'.format(handler)) fallthrough = handler(job, *exc_info) # Only handlers with explicit return values should disable further @@ -636,7 +633,7 @@ class Worker(object): def move_to_failed_queue(self, job, *exc_info): """Default exception handler: move the job to the failed queue.""" exc_string = ''.join(traceback.format_exception(*exc_info)) - self.log.warning('Moving job to %s queue.' % self.failed_queue.name) + self.log.warning('Moving job to {0!r} queue'.format(self.failed_queue.name)) self.failed_queue.quarantine(job, exc_info=exc_string) def push_exc_handler(self, handler_func): From 4d0ae5da981af5ccc40f655ff287077a847595e5 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Wed, 3 Jun 2015 09:52:26 +0200 Subject: [PATCH 47/53] Clean up type checking of Worker constructor --- rq/exceptions.py | 4 --- rq/worker.py | 4 +-- tests/test_worker.py | 60 ++++++++++++++++++-------------------------- 3 files changed, 27 insertions(+), 41 deletions(-) diff --git a/rq/exceptions.py b/rq/exceptions.py index 94b22bf..88bbbb0 100644 --- a/rq/exceptions.py +++ b/rq/exceptions.py @@ -11,10 +11,6 @@ class InvalidJobOperationError(Exception): pass -class NoQueueError(Exception): - pass - - class UnpickleError(Exception): def __init__(self, message, raw_data, inner_exception=None): super(UnpickleError, self).__init__(message, inner_exception) diff --git a/rq/worker.py b/rq/worker.py index 27df2dd..ee048c3 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -17,7 +17,7 @@ import warnings from rq.compat import as_text, string_types, text_type from .connections import get_current_connection -from .exceptions import DequeueTimeout, NoQueueError +from .exceptions import DequeueTimeout from .job import Job, JobStatus from .logutils import setup_loghandlers from .queue import get_failed_queue, Queue @@ -163,7 +163,7 @@ class Worker(object): """Sanity check for the given queues.""" for queue in self.queues: if not isinstance(queue, self.queue_class): - raise NoQueueError('{0} is not a queue'.format(queue)) + raise TypeError('{0} is not a Queue or a string'.format(queue)) def process_queue_args(self, queue_args): """Allow for a string, a queue an iterable of strings or an iterable of queues""" diff --git a/tests/test_worker.py b/tests/test_worker.py index 62d2876..016f83c 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -13,7 +13,6 @@ from tests.helpers import strip_microseconds from rq import get_failed_queue, Queue, SimpleWorker, Worker from rq.compat import as_text -from rq.exceptions import NoQueueError from rq.job import Job, JobStatus from rq.registry import StartedJobRegistry from rq.suspension import resume, suspend @@ -26,44 +25,35 @@ class CustomJob(Job): class TestWorker(RQTestCase): def test_create_worker(self): - """Worker creation.""" - fooq, barq = Queue('foo'), Queue('bar') - w = Worker([fooq, barq]) - self.assertEquals(w.queues, [fooq, barq]) - - def test_create_worker_args_single_queue(self): - """Test Worker creation with single queue instance arg""" - fooq = Queue('foo') - w = Worker(fooq) - self.assertEquals(w.queue_keys(), 
['rq:queue:foo']) + """Worker creation using various inputs.""" - def test_create_worker_args_single_string(self): - """ Test Worker creation with single string arg""" + # With single string argument w = Worker('foo') - self.assertEquals(w.queue_keys(),['rq:queue:foo']) + self.assertEquals(w.queues[0].name, 'foo') - def test_create_worker_args_iterable_strings(self): - """ Test Worker creation with iterable of strings""" + # With list of strings w = Worker(['foo', 'bar']) - self.assertEquals(w.queue_keys(),['rq:queue:foo', 'rq:queue:bar']) - - def test_create_worker_args_iterable_queues(self): - """ Test Worker test worker creation - with an iterable of queue instance args""" - w = Worker(map(Queue, ['foo', 'bar'])) - self.assertEquals(w.queue_keys(),['rq:queue:foo', 'rq:queue:bar']) - - def test_create_worker_args_list_map(self): - """ Test Worker test worker creation - with a list of queue from map""" - w = Worker(list(map(Queue, ['foo', 'bar']))) - self.assertEquals(w.queue_keys(),['rq:queue:foo', 'rq:queue:bar']) - - def test_create_worker_raises_noqueue_error(self): - """ make sure raises noqueue error if a - a non string or queue is passed""" - with self.assertRaises(NoQueueError): - w = Worker([1]) + self.assertEquals(w.queues[0].name, 'foo') + self.assertEquals(w.queues[1].name, 'bar') + + # With iterable of strings + w = Worker(iter(['foo', 'bar'])) + self.assertEquals(w.queues[0].name, 'foo') + self.assertEquals(w.queues[1].name, 'bar') + + # With single Queue + w = Worker(Queue('foo')) + self.assertEquals(w.queues[0].name, 'foo') + + # With iterable of Queues + w = Worker(iter([Queue('foo'), Queue('bar')])) + self.assertEquals(w.queues[0].name, 'foo') + self.assertEquals(w.queues[1].name, 'bar') + + # With list of Queues + w = Worker([Queue('foo'), Queue('bar')]) + self.assertEquals(w.queues[0].name, 'foo') + self.assertEquals(w.queues[1].name, 'bar') def test_work_and_quit(self): """Worker processes work, then quits.""" From 891be55b874cba8b827874095d66561e126e97e3 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Wed, 3 Jun 2015 10:05:28 +0200 Subject: [PATCH 48/53] Clean up Worker.__init__ logic a little --- rq/utils.py | 16 +++++++++++++++- rq/worker.py | 23 +++++++++-------------- 2 files changed, 24 insertions(+), 15 deletions(-) diff --git a/rq/utils.py b/rq/utils.py index 3e44a98..f2e1897 100644 --- a/rq/utils.py +++ b/rq/utils.py @@ -13,8 +13,9 @@ import datetime import importlib import logging import sys +from collections import Iterable -from .compat import as_text, is_python_version +from .compat import as_text, is_python_version, string_types class _Colorizer(object): @@ -205,6 +206,19 @@ def first(iterable, default=None, key=None): return default +def is_nonstring_iterable(obj): + """Returns whether the obj is an iterable, but not a string""" + return isinstance(obj, Iterable) and not isinstance(obj, string_types) + + +def ensure_list(obj): + """ + When passed an iterable of objects, does nothing, otherwise, it returns + a list with just that object in it. 
+ """ + return obj if is_nonstring_iterable(obj) else [obj] + + def current_timestamp(): """Returns current UTC timestamp""" return calendar.timegm(datetime.datetime.utcnow().utctimetuple()) diff --git a/rq/worker.py b/rq/worker.py index ee048c3..dc9ce00 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -2,7 +2,6 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) -from datetime import timedelta import errno import logging import os @@ -13,6 +12,7 @@ import sys import time import traceback import warnings +from datetime import timedelta from rq.compat import as_text, string_types, text_type @@ -20,11 +20,12 @@ from .connections import get_current_connection from .exceptions import DequeueTimeout from .job import Job, JobStatus from .logutils import setup_loghandlers -from .queue import get_failed_queue, Queue -from .registry import clean_registries, FinishedJobRegistry, StartedJobRegistry +from .queue import Queue, get_failed_queue +from .registry import FinishedJobRegistry, StartedJobRegistry, clean_registries from .suspension import is_suspended from .timeouts import UnixSignalDeathPenalty -from .utils import enum, import_attribute, make_colorizer, utcformat, utcnow, utcparse +from .utils import (ensure_list, enum, import_attribute, make_colorizer, + utcformat, utcnow, utcparse) from .version import VERSION try: @@ -126,7 +127,9 @@ class Worker(object): if connection is None: connection = get_current_connection() self.connection = connection - queues = self.process_queue_args(queues) + + queues = [self.queue_class(name=q) if isinstance(q, text_type) else q + for q in ensure_list(queues)] self._name = name self.queues = queues self.validate_queues() @@ -163,15 +166,7 @@ class Worker(object): """Sanity check for the given queues.""" for queue in self.queues: if not isinstance(queue, self.queue_class): - raise TypeError('{0} is not a Queue or a string'.format(queue)) - - def process_queue_args(self, queue_args): - """Allow for a string, a queue an iterable of strings or an iterable of queues""" - if isinstance(queue_args, text_type): - return self.queue_class(name=queue_args) - else: - return [self.queue_class(name=queue_arg) if isinstance(queue_arg, text_type) else queue_arg - for queue_arg in queue_args] + raise TypeError('{0} is not of type {1} or text type'.format(queue, self.queue_class)) def queue_names(self): """Returns the queue names of this worker's queues.""" From ec0e04727bb79ace3894ab084fa7293206fe98d7 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Wed, 3 Jun 2015 10:13:38 +0200 Subject: [PATCH 49/53] Rename `maintenance_date` -> `last_cleaned_at` --- rq/worker.py | 8 ++++---- tests/test_worker.py | 10 +++++----- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index dc9ce00..3221da2 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -149,7 +149,7 @@ class Worker(object): self._stopped = False self.log = logger self.failed_queue = get_failed_queue(connection=self.connection) - self.maintenance_date = None + self.last_cleaned_at = None # By default, push the "move-to-failed-queue" exception handler onto # the stack @@ -653,14 +653,14 @@ class Worker(object): """Runs maintenance jobs on each Queue's registries.""" for queue in self.queues: clean_registries(queue) - self.maintenance_date = utcnow() + self.last_cleaned_at = utcnow() @property def should_run_maintenance_tasks(self): """Maintenance tasks should run on first startup or every hour.""" - if self.maintenance_date is None: + if 
self.last_cleaned_at is None: return True - if (utcnow() - self.maintenance_date) > timedelta(seconds=3600): + if (utcnow() - self.last_cleaned_at) > timedelta(hours=1): return True return False diff --git a/tests/test_worker.py b/tests/test_worker.py index 016f83c..02dacb7 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -430,7 +430,7 @@ class TestWorker(RQTestCase): self.assertEquals(type(death_date).__name__, 'datetime') def test_clean_queue_registries(self): - """worker.clean_registries sets maintenance_date and cleans registries.""" + """worker.clean_registries sets last_cleaned_at and cleans registries.""" foo_queue = Queue('foo', connection=self.testconn) foo_registry = StartedJobRegistry('foo', connection=self.testconn) self.testconn.zadd(foo_registry.key, 1, 'foo') @@ -442,9 +442,9 @@ class TestWorker(RQTestCase): self.assertEqual(self.testconn.zcard(bar_registry.key), 1) worker = Worker([foo_queue, bar_queue]) - self.assertEqual(worker.maintenance_date, None) + self.assertEqual(worker.last_cleaned_at, None) worker.clean_registries() - self.assertNotEqual(worker.maintenance_date, None) + self.assertNotEqual(worker.last_cleaned_at, None) self.assertEqual(self.testconn.zcard(foo_registry.key), 0) self.assertEqual(self.testconn.zcard(bar_registry.key), 0) @@ -454,9 +454,9 @@ class TestWorker(RQTestCase): worker = Worker(queue) self.assertTrue(worker.should_run_maintenance_tasks) - worker.maintenance_date = utcnow() + worker.last_cleaned_at = utcnow() self.assertFalse(worker.should_run_maintenance_tasks) - worker.maintenance_date = utcnow() - timedelta(seconds=3700) + worker.last_cleaned_at = utcnow() - timedelta(seconds=3700) self.assertTrue(worker.should_run_maintenance_tasks) def test_worker_calls_clean_registries(self): From 6319128ebc83c94f57b854bef516759f6955599a Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Wed, 3 Jun 2015 10:43:00 +0200 Subject: [PATCH 50/53] Clean dist+build folders before releasing --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 3149d8d..d43c5c9 100644 --- a/Makefile +++ b/Makefile @@ -4,7 +4,7 @@ all: clean: rm -rf build/ dist/ -release: +release: clean # Check if latest tag is the current head we're releasing echo "Latest tag = $$(git tag | sort -nr | head -n1)" echo "HEAD SHA = $$(git sha head)" From 3b67894489ad1e46792632e026cd8c1b802abde7 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Wed, 3 Jun 2015 10:49:28 +0200 Subject: [PATCH 51/53] Rename variable in test --- tests/test_job.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_job.py b/tests/test_job.py index a078ce6..6971d7b 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -35,14 +35,14 @@ class TestJob(RQTestCase): if not PY2: # Python 3 - test_string = "myfunc(12, '☃', null=None, snowman='☃')" + expected_string = "myfunc(12, '☃', null=None, snowman='☃')" else: # Python 2 - test_string = u"myfunc(12, u'\\u2603', null=None, snowman=u'\\u2603')".decode('utf-8') + expected_string = u"myfunc(12, u'\\u2603', null=None, snowman=u'\\u2603')".decode('utf-8') self.assertEquals( job.description, - test_string, + expected_string, ) def test_create_empty_job(self): From 72fdbaf50970a6c48ad833179b4d168d3a54a2f8 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Wed, 3 Jun 2015 10:55:16 +0200 Subject: [PATCH 52/53] Update changelog --- CHANGES.md | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 00f248a..231059e 100644 
--- a/CHANGES.md +++ b/CHANGES.md @@ -1,13 +1,14 @@ ### 0.5.3 -(June 1st, 2015) +(June 3rd, 2015) -- enqueue_call() no longer ignores `ttl`. Thanks @mbodock! -- Better support for unicode kwargs. Thanks @nealtodd and @brownstein! - Better API for instantiating Workers. Thanks @RyanMTB! -- Jobs that are moved from `StartedJobRegistry` to `FailedQueue` now - have their statuses assigned properly. -- Workers now automatically cleans up job registries every hour. +- Better support for unicode kwargs. Thanks @nealtodd and @brownstein! +- Workers now automatically cleans up job registries every hour +- Jobs in `FailedQueue` now have their statuses set properly +- `enqueue_call()` no longer ignores `ttl`. Thanks @mbodock! +- Improved logging. Thanks @trevorprater! + ### 0.5.2 From 0de225ec62fffc1f008fdee10e32afe45f7067a9 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Wed, 3 Jun 2015 11:33:28 +0200 Subject: [PATCH 53/53] Emphasize private nature of `stopped` property --- rq/worker.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index 3221da2..e4ab4da 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -146,7 +146,7 @@ class Worker(object): self._state = 'starting' self._is_horse = False self._horse_pid = 0 - self._stopped = False + self._stop_requested = False self.log = logger self.failed_queue = get_failed_queue(connection=self.connection) self.last_cleaned_at = None @@ -308,10 +308,6 @@ class Worker(object): return self.job_class.fetch(job_id, self.connection) - @property - def stopped(self): - return self._stopped - def _install_signal_handlers(self): """Installs signal handlers for handling SIGINT and SIGTERM gracefully. @@ -350,7 +346,7 @@ class Worker(object): # If shutdown is requested in the middle of a job, wait until # finish before shutting down if self.get_state() == 'busy': - self._stopped = True + self._stop_requested = True self.log.debug('Stopping after current horse is finished. ' 'Press Ctrl+C again for a cold shutdown.') else: @@ -365,7 +361,7 @@ class Worker(object): before_state = None notified = False - while not self.stopped and is_suspended(self.connection): + while not self._stop_requested and is_suspended(self.connection): if burst: self.log.info('Suspended in burst mode, exiting') @@ -407,7 +403,7 @@ class Worker(object): if self.should_run_maintenance_tasks: self.clean_registries() - if self.stopped: + if self._stop_requested: self.log.info('Stopping on request') break