From 1bc0c3d223714960b436577616cf127134ac2161 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Wed, 1 Aug 2012 13:51:01 +0200 Subject: [PATCH 01/10] Fix bug where pickling the return value caused an uncaught exception. --- rq/worker.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/rq/worker.py b/rq/worker.py index 8f3d665..9aba75e 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -359,6 +359,10 @@ class Worker(object): try: with death_penalty_after(job.timeout or 180): rv = job.perform() + + # Pickle the result in the same try-except block since we need to + # use the same exc handling when pickling fails + pickled_rv = dumps(rv) except Exception as e: fq = self.failed_queue self.log.exception(red(str(e))) @@ -374,7 +378,7 @@ class Worker(object): if rv is not None: p = self.connection.pipeline() - p.hset(job.key, 'result', dumps(rv)) + p.hset(job.key, 'result', pickled_rv) p.expire(job.key, self.rv_ttl) p.execute() else: From 740d32ebabb28be47fce16742eb6d595b75427be Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Wed, 1 Aug 2012 13:54:05 +0200 Subject: [PATCH 02/10] Update the changelog. --- CHANGES.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/CHANGES.md b/CHANGES.md index 2c6296d..88f358e 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,9 @@ +### 0.2.2 +(August 1st, 2012) + +- Fix bug where return values that couldn't be pickled crashed the worker + + ### 0.2.1 (July 20th, 2012) From 080431702d533b3468d50af0f32c1b4fa1d41c96 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Wed, 1 Aug 2012 13:54:21 +0200 Subject: [PATCH 03/10] Bump the version. 
--- rq/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rq/version.py b/rq/version.py index 887788d..ace1dd7 100644 --- a/rq/version.py +++ b/rq/version.py @@ -1 +1 @@ -VERSION = '0.2.1' +VERSION = '0.2.2' From 4d9881eef224ee9fdd058123235e3e048f7c3fe2 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Wed, 1 Aug 2012 14:03:45 +0200 Subject: [PATCH 04/10] Print version number when running the server. --- rq/worker.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/rq/worker.py b/rq/worker.py index 9fb3401..82f4d83 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -21,6 +21,7 @@ from .connections import get_current_connection from .utils import make_colorizer from .exceptions import NoQueueError, UnpickleError from .timeouts import death_penalty_after +from .version import VERSION green = make_colorizer('darkgreen') yellow = make_colorizer('darkyellow') @@ -268,6 +269,7 @@ class Worker(object): did_perform_work = False self.register_birth() + self.log.info('RQ worker started, version %s' % VERSION) self.state = 'starting' try: while True: From 530930bc7b7bb4b9aa028b682d32aec1157f3119 Mon Sep 17 00:00:00 2001 From: Alexander Artemenko Date: Thu, 2 Aug 2012 16:52:12 +0400 Subject: [PATCH 05/10] Fixed code snippets' styling. 
--- CHANGES.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/CHANGES.md b/CHANGES.md index f4c4f4b..7e1af3d 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -5,11 +5,14 @@ RQ a timeout value while enqueueing a function, use the explicit invocation instead: + ```python q.enqueue(do_something, args=(1, 2), kwargs={'a': 1}, timeout=30) + ``` - Add a `@job` decorator, which can be used to do Celery-style delayed invocations: + ```python from redis import Redis from rq.decorators import job @@ -19,12 +22,15 @@ @job('high', timeout=10, connection=redis) def some_work(x, y): return x + y + ``` Then, in another module, you can call `some_work`: + ```python from foo.bar import some_work some_work.delay(2, 3) + ``` ### 0.2.2 From 4d2157cdb5431d302fcfb7a8e29377c58f284345 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Fri, 3 Aug 2012 13:20:19 +0200 Subject: [PATCH 06/10] Add the same treatment to the README file. --- README.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/README.md b/README.md index b2075c2..5d4818b 100644 --- a/README.md +++ b/README.md @@ -15,24 +15,30 @@ First, run a Redis server, of course: To put jobs on queues, you don't have to do anything special, just define your typically lengthy or blocking function: + ```python import requests def count_words_at_url(url): resp = requests.get(url) return len(resp.text.split()) + ``` You do use the excellent [requests][r] package, don't you? Then, create a RQ queue: + ```python from rq import Queue, use_connection use_connection() q = Queue() + ``` And enqueue the function call: + ```python from my_module import count_words_at_url result = q.enqueue(count_words_at_url, 'http://nvie.com') + ``` For a more complete example, refer to the [docs][d]. But this is the essence. From f3890c85a06d633ad3aca6ba5058aaf1543694f7 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Fri, 3 Aug 2012 15:04:18 +0200 Subject: [PATCH 07/10] Reformat Markdown for Python highlighting. 
--- README.md | 46 +++++++++++++++++++++++++--------------------- 1 file changed, 25 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index 5d4818b..a8c72b2 100644 --- a/README.md +++ b/README.md @@ -10,35 +10,37 @@ easily. First, run a Redis server, of course: - $ redis-server +```console +$ redis-server +``` To put jobs on queues, you don't have to do anything special, just define your typically lengthy or blocking function: - ```python - import requests +```python +import requests - def count_words_at_url(url): - resp = requests.get(url) - return len(resp.text.split()) - ``` +def count_words_at_url(url): + resp = requests.get(url) + return len(resp.text.split()) +``` You do use the excellent [requests][r] package, don't you? Then, create a RQ queue: - ```python - from rq import Queue, use_connection - use_connection() - q = Queue() - ``` +```python +from rq import Queue, use_connection +use_connection() +q = Queue() +``` And enqueue the function call: - ```python - from my_module import count_words_at_url - result = q.enqueue(count_words_at_url, 'http://nvie.com') - ``` +```python +from my_module import count_words_at_url +result = q.enqueue(count_words_at_url, 'http://nvie.com') +``` For a more complete example, refer to the [docs][d]. But this is the essence. @@ -48,11 +50,13 @@ For a more complete example, refer to the [docs][d]. But this is the essence. To start executing enqueued function calls in the background, start a worker from your project's directory: - $ rqworker - *** Listening for work on default - Got count_words_at_url('http://nvie.com') from default - Job result = 818 - *** Listening for work on default +```console +$ rqworker +*** Listening for work on default +Got count_words_at_url('http://nvie.com') from default +Job result = 818 +*** Listening for work on default +``` That's about it. 
From 6b075f6cb7ed9b976701918be8eb1c2e04c0b9fd Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Sat, 4 Aug 2012 09:18:46 +0200 Subject: [PATCH 08/10] Add comment to the README. --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index a8c72b2..4dbd6a3 100644 --- a/README.md +++ b/README.md @@ -21,6 +21,7 @@ your typically lengthy or blocking function: import requests def count_words_at_url(url): + """Just an example function that's called async.""" resp = requests.get(url) return len(resp.text.split()) ``` From a6bb526773543d0304109fd13b31ba09333e5451 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Sun, 5 Aug 2012 09:13:49 +0200 Subject: [PATCH 09/10] Fix that doesn't abort the currently running job on Ctrl+C. --- rq/worker.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/rq/worker.py b/rq/worker.py index b00d732..9c666e5 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -342,6 +342,14 @@ class Worker(object): # After fork()'ing, always assure we are generating random sequences # that are different from the worker. random.seed() + + # Always ignore Ctrl+C in the work horse, as it might abort the + # currently running job. + # The main worker catches the Ctrl+C and requests graceful shutdown + # after the current work is done. When cold shutdown is requested, it + # kills the current job anyway. + signal.signal(signal.SIGINT, signal.SIG_IGN) + self._is_horse = True self.log = Logger('horse') From 1536546613128bf299ad28a66fc272143d8fc370 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Sun, 5 Aug 2012 09:21:23 +0200 Subject: [PATCH 10/10] Worker/horse distinction in signal handler is obsolete. 
--- rq/worker.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index 9c666e5..56d5aad 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -243,10 +243,6 @@ class Worker(object): signal.signal(signal.SIGINT, request_force_stop) signal.signal(signal.SIGTERM, request_force_stop) - if self.is_horse: - self.log.debug('Ignoring signal %s.' % signal_name(signum)) - return - msg = 'Warm shut down. Press Ctrl+C again for a cold shutdown.' self.log.warning(msg) self._stopped = True @@ -349,6 +345,7 @@ class Worker(object): # after the current work is done. When cold shutdown is requested, it # kills the current job anyway. signal.signal(signal.SIGINT, signal.SIG_IGN) + signal.signal(signal.SIGTERM, signal.SIG_DFL) self._is_horse = True self.log = Logger('horse')