Merge branch 'master' into selwin-warm-shutdown-2

Conflicts:
	rq/worker.py
main
Vincent Driessen 13 years ago
commit f2d5ebf2fe

@@ -5,11 +5,14 @@
 RQ a timeout value while enqueueing a function, use the explicit invocation
 instead:

+```python
 q.enqueue(do_something, args=(1, 2), kwargs={'a': 1}, timeout=30)
+```

 - Add a `@job` decorator, which can be used to do Celery-style delayed
   invocations:

+```python
 from redis import Redis
 from rq.decorators import job
@@ -19,12 +22,21 @@
 @job('high', timeout=10, connection=redis)
 def some_work(x, y):
     return x + y
+```

 Then, in another module, you can call `some_work`:

+```python
 from foo.bar import some_work

 some_work.delay(2, 3)
+```
+
+### 0.2.2
+(August 1st, 2012)
+
+- Fix bug where return values that couldn't be pickled crashed the worker
+
 ### 0.2.1
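
As a side note on the 0.2.2 entry above: objects such as lambdas, open files, or sockets cannot be serialized by `pickle`, which is what used to crash the worker when a job returned one. A minimal illustration of the failure mode (a hypothetical example, not RQ code):

```python
import pickle

def bad_job():
    # A lambda is a typical example of an unpicklable return value.
    return lambda x: x + 1

try:
    pickle.dumps(bad_job())
except Exception as e:
    # pickle raises PicklingError (or a similar error) for such objects
    print('Could not pickle result: %r' % e)
```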

@@ -10,29 +10,38 @@ easily.
 First, run a Redis server, of course:

+```console
 $ redis-server
+```

 To put jobs on queues, you don't have to do anything special, just define
 your typically lengthy or blocking function:

+```python
 import requests

 def count_words_at_url(url):
+    """Just an example function that's called async."""
     resp = requests.get(url)
     return len(resp.text.split())
+```

 You do use the excellent [requests][r] package, don't you?

 Then, create a RQ queue:

+```python
 from rq import Queue, use_connection

 use_connection()
 q = Queue()
+```

 And enqueue the function call:

+```python
 from my_module import count_words_at_url

 result = q.enqueue(count_words_at_url, 'http://nvie.com')
+```

 For a more complete example, refer to the [docs][d]. But this is the essence.
@@ -42,11 +51,13 @@ For a more complete example, refer to the [docs][d]. But this is the essence.
 To start executing enqueued function calls in the background, start a worker
 from your project's directory:

+```console
 $ rqworker
 *** Listening for work on default
 Got count_words_at_url('http://nvie.com') from default
 Job result = 818
 *** Listening for work on default
+```

 That's about it.

@@ -1 +1 @@
-VERSION = '0.2.1'
+VERSION = '0.2.2'

@@ -21,6 +21,7 @@ from .connections import get_current_connection
 from .utils import make_colorizer
 from .exceptions import NoQueueError, UnpickleError
 from .timeouts import death_penalty_after
+from .version import VERSION

 green = make_colorizer('darkgreen')
 yellow = make_colorizer('darkyellow')
@@ -246,10 +247,6 @@ class Worker(object):
             signal.signal(signal.SIGINT, request_force_stop)
             signal.signal(signal.SIGTERM, request_force_stop)

-            if self.is_horse:
-                self.log.debug('Ignoring signal %s.' % signal_name(signum))
-                return
-
             msg = 'Warm shut down requested.'
             self.log.warning(msg)

             # If shutdown is requested in the middle of a job, wait until finish
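
The handler above implements the "warm" half of the shutdown story: the first Ctrl+C only requests a stop, and the worker loop exits once the job in flight has finished. A condensed, runnable sketch of that pattern (hypothetical code, not RQ's actual implementation):

```python
import signal
import time

class Worker(object):
    """Simplified sketch of a warm shutdown; not RQ's real Worker."""

    def __init__(self):
        self._stopped = False

    def request_stop(self, signum, frame):
        # First Ctrl+C: let the current job finish, then stop the loop.
        # (RQ escalates a second Ctrl+C to a cold shutdown; not shown.)
        print('Warm shut down requested.')
        self._stopped = True

    def work(self):
        signal.signal(signal.SIGINT, self.request_stop)
        while not self._stopped:
            self.perform_job()
            # A SIGINT received mid-job only takes effect here, after
            # the job has been allowed to run to completion.

    def perform_job(self):
        time.sleep(1)  # stand-in for a real job

Worker().work()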
@@ -278,6 +275,7 @@ class Worker(object):
         did_perform_work = False
         self.register_birth()
+        self.log.info('RQ worker started, version %s' % VERSION)
         self.state = 'starting'
         try:
             while True:
@@ -352,6 +350,15 @@ class Worker(object):
         # After fork()'ing, always assure we are generating random sequences
         # that are different from the worker.
         random.seed()
+
+        # Always ignore Ctrl+C in the work horse, as it might abort the
+        # currently running job.
+        # The main worker catches the Ctrl+C and requests graceful shutdown
+        # after the current work is done. When cold shutdown is requested, it
+        # kills the current job anyway.
+        signal.signal(signal.SIGINT, signal.SIG_IGN)
+        signal.signal(signal.SIGTERM, signal.SIG_DFL)
+
         self._is_horse = True
         self.log = Logger('horse')
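
Why the horse must ignore SIGINT: a Ctrl+C in the terminal delivers SIGINT to every process in the foreground process group, i.e. to both the main worker and the forked work horse. A tiny standalone demo of the pattern (hypothetical code, not RQ's):

```python
import os
import signal
import time

child_pid = os.fork()
if child_pid == 0:
    # Child ("work horse"): ignore Ctrl+C so the running job survives;
    # restore default SIGTERM so the parent can still kill it cold.
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
    time.sleep(5)  # stand-in for the job
    os._exit(0)
else:
    # Parent: the same Ctrl+C lands here too, but only the parent
    # reacts; it simply notes the request and waits for the job.
    signal.signal(signal.SIGINT,
                  lambda s, f: print('Warm shut down requested.'))
    os.waitpid(child_pid, 0)
```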
@ -372,6 +379,10 @@ class Worker(object):
try: try:
with death_penalty_after(job.timeout or 180): with death_penalty_after(job.timeout or 180):
rv = job.perform() rv = job.perform()
# Pickle the result in the same try-except block since we need to
# use the same exc handling when pickling fails
pickled_rv = dumps(rv)
except Exception as e: except Exception as e:
fq = self.failed_queue fq = self.failed_queue
self.log.exception(red(str(e))) self.log.exception(red(str(e)))
@@ -387,7 +398,7 @@ class Worker(object):
         if rv is not None:
             p = self.connection.pipeline()
-            p.hset(job.key, 'result', dumps(rv))
+            p.hset(job.key, 'result', pickled_rv)
             p.expire(job.key, self.rv_ttl)
             p.execute()
         else:
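
The `pickled_rv` change above is the code behind the 0.2.2 changelog entry: by moving `dumps(rv)` into the same `try` block as `job.perform()`, an unpicklable return value is handled exactly like a job that raised, instead of crashing the worker. A condensed sketch of the idea (hypothetical names, not RQ's actual methods):

```python
from pickle import dumps

def run_job(job_func, on_failure):
    """Run a job and pickle its result under one exception handler."""
    try:
        rv = job_func()
        # Pickling inside the same try block means a pickling error is
        # routed to the failure path like any other job exception.
        pickled_rv = dumps(rv)
    except Exception as e:
        on_failure(e)  # e.g. move the job to the failed queue
        return None
    return pickled_rv

# Usage: a job returning a lambda cannot be pickled, so the failure
# handler fires instead of the worker crashing.
run_job(lambda: (lambda x: x), lambda e: print('job failed: %r' % e))
```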
