diff --git a/CHANGES.md b/CHANGES.md index 75f76be..7e1af3d 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -5,11 +5,14 @@ RQ a timeout value while enqueueing a function, use the explicit invocation instead: + ```python q.enqueue(do_something, args=(1, 2), kwargs={'a': 1}, timeout=30) + ``` - Add a `@job` decorator, which can be used to do Celery-style delayed invocations: + ```python from redis import Redis from rq.decorators import job @@ -19,12 +22,21 @@ @job('high', timeout=10, connection=redis) def some_work(x, y): return x + y + ``` Then, in another module, you can call `some_work`: + ```python from foo.bar import some_work some_work.delay(2, 3) + ``` + + +### 0.2.2 +(August 1st, 2012) + +- Fix bug where return values that couldn't be pickled crashed the worker ### 0.2.1 diff --git a/README.md b/README.md index b2075c2..4dbd6a3 100644 --- a/README.md +++ b/README.md @@ -10,29 +10,38 @@ easily. First, run a Redis server, of course: - $ redis-server +```console +$ redis-server +``` To put jobs on queues, you don't have to do anything special, just define your typically lengthy or blocking function: - import requests +```python +import requests - def count_words_at_url(url): - resp = requests.get(url) - return len(resp.text.split()) +def count_words_at_url(url): + """Just an example function that's called async.""" + resp = requests.get(url) + return len(resp.text.split()) +``` You do use the excellent [requests][r] package, don't you? Then, create a RQ queue: - from rq import Queue, use_connection - use_connection() - q = Queue() +```python +from rq import Queue, use_connection +use_connection() +q = Queue() +``` And enqueue the function call: - from my_module import count_words_at_url - result = q.enqueue(count_words_at_url, 'http://nvie.com') +```python +from my_module import count_words_at_url +result = q.enqueue(count_words_at_url, 'http://nvie.com') +``` For a more complete example, refer to the [docs][d]. But this is the essence. @@ -42,11 +51,13 @@ For a more complete example, refer to the [docs][d]. But this is the essence. To start executing enqueued function calls in the background, start a worker from your project's directory: - $ rqworker - *** Listening for work on default - Got count_words_at_url('http://nvie.com') from default - Job result = 818 - *** Listening for work on default +```console +$ rqworker +*** Listening for work on default +Got count_words_at_url('http://nvie.com') from default +Job result = 818 +*** Listening for work on default +``` That's about it. diff --git a/rq/version.py b/rq/version.py index 887788d..ace1dd7 100644 --- a/rq/version.py +++ b/rq/version.py @@ -1 +1 @@ -VERSION = '0.2.1' +VERSION = '0.2.2' diff --git a/rq/worker.py b/rq/worker.py index cb0c466..1e7ea0d 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -21,6 +21,7 @@ from .connections import get_current_connection from .utils import make_colorizer from .exceptions import NoQueueError, UnpickleError from .timeouts import death_penalty_after +from .version import VERSION green = make_colorizer('darkgreen') yellow = make_colorizer('darkyellow') @@ -246,10 +247,6 @@ class Worker(object): signal.signal(signal.SIGINT, request_force_stop) signal.signal(signal.SIGTERM, request_force_stop) - if self.is_horse: - self.log.debug('Ignoring signal %s.' % signal_name(signum)) - return - msg = 'Warm shut down requested.' self.log.warning(msg) # If shutdown is requested in the middle of a job, wait until finish @@ -278,6 +275,7 @@ class Worker(object): did_perform_work = False self.register_birth() + self.log.info('RQ worker started, version %s' % VERSION) self.state = 'starting' try: while True: @@ -352,6 +350,15 @@ class Worker(object): # After fork()'ing, always assure we are generating random sequences # that are different from the worker. random.seed() + + # Always ignore Ctrl+C in the work horse, as it might abort the + # currently running job. + # The main worker catches the Ctrl+C and requests graceful shutdown + # after the current work is done. When cold shutdown is requested, it + # kills the current job anyway. + signal.signal(signal.SIGINT, signal.SIG_IGN) + signal.signal(signal.SIGTERM, signal.SIG_DFL) + self._is_horse = True self.log = Logger('horse') @@ -372,6 +379,10 @@ class Worker(object): try: with death_penalty_after(job.timeout or 180): rv = job.perform() + + # Pickle the result in the same try-except block since we need to + # use the same exc handling when pickling fails + pickled_rv = dumps(rv) except Exception as e: fq = self.failed_queue self.log.exception(red(str(e))) @@ -387,7 +398,7 @@ class Worker(object): if rv is not None: p = self.connection.pipeline() - p.hset(job.key, 'result', dumps(rv)) + p.hset(job.key, 'result', pickled_rv) p.expire(job.key, self.rv_ttl) p.execute() else: