* Added FailedJobRegistry.

* Added job.failure_ttl.

* queue.enqueue() now supports failure_ttl

* Added registry.get_queue().

* FailedJobRegistry.add() now assigns DEFAULT_FAILURE_TTL.

* StartedJobRegistry.cleanup() now moves expired jobs to FailedJobRegistry.

* Failed jobs are now added to FailedJobRegistry.

* Added FailedJobRegistry.requeue()

* Document the new `FailedJobRegistry` and changes in custom exception handler behavior.

* Added worker.disable_default_exception_handler.

* Document --disable-default-exception-handler option.

* Deleted worker.failed_queue.

* Deleted "move_to_failed_queue" exception handler.

* StartedJobRegistry should no longer move jobs to FailedQueue.

* Deleted requeue_job

* Fixed test error.

* Make requeue cli command work with FailedJobRegistry

* Added .pytest_cache to gitignore.

* Custom exception handlers are no longer run in reverse

* Restored requeue_job function

* Removed get_failed_queue

* Deleted FailedQueue

* Updated changelog.

* Document `failure_ttl`

* Updated docs.

* Remove job.status

* Fixed typo in test_registry.py

* Replaced _pipeline() with pipeline()

* FailedJobRegistry no longer fails on redis-py>=3

* Fixes test_clean_registries

* Worker names are now randomized

* Added a note about random worker names in CHANGES.md

* Worker will now stop working when encountering an unhandled exception.

* Worker should reraise SystemExit on cold shutdowns

* Added anchor.js to docs

* Support for Sentry-SDK (#1045)

* Updated RQ to support sentry-sdk

* Document Sentry integration

* Install sentry-sdk before running tests

* Improved rq info CLI command to be more efficient when displaying a large number of workers (#1046)

* Improved rq info CLI command to be more efficient when displaying a large number of workers

* Fixed an rq info --by-queue bug

* Fixed worker.total_working_time bug (#1047)

* queue.enqueue() no longer accepts `timeout` argument (#1055)

* Clean worker registry (#1056)

* queue.enqueue() no longer accepts `timeout` argument

* Added clean_worker_registry()

* Show worker hostname and PID on cli (#1058)

* Show worker hostname and PID on cli

* Improve test coverage

* Remove Redis version check when SSL is used

* Bump version to 1.0

* Removed pytest_cache/README.md

* Changed worker logging to use exc_info=True

* Removed unused queue.dequeue()

* Fixed typo in CHANGES.md

* setup_loghandlers() should always call logger.setLevel() if specified
Commit c4cbb3af2f (parent abf6881114) on main, by Selwin Ong, committed via GitHub

.gitignore

@ -10,16 +10,13 @@
/dist /dist
/build /build
.tox .tox
.pytest_cache/
.vagrant .vagrant
Vagrantfile Vagrantfile
.idea/ .idea/
.coverage* .coverage*
/.cache /.cache
<<<<<<< HEAD
.pytest_cache/
=======
Gemfile Gemfile
Gemfile.lock Gemfile.lock
_site/ _site/
>>>>>>> 600be0a... Added anchor.js to docs

@ -11,9 +11,7 @@ python:
- "pypy" - "pypy"
install: install:
- pip install -e . - pip install -e .
- pip install pytest-cov - pip install pytest-cov sentry-sdk codecov
# - pip install coveralls
- pip install codecov
#- pip install pytest # installed by Travis by default already #- pip install pytest # installed by Travis by default already
script: script:
- RUN_SLOW_TESTS_TOO=1 py.test --cov rq --durations=5 - RUN_SLOW_TESTS_TOO=1 py.test --cov rq --durations=5

@ -1,3 +1,31 @@
### 1.0 (Not Yet Released)
Backward incompatible changes:
- `job.status` has been removed. Use `job.get_status()` and `job.set_status()` instead. Thanks @selwin!
- `FailedQueue` has been replaced with `FailedJobRegistry`:
* `get_failed_queue()` function has been removed. Please use `FailedJobRegistry(queue=queue)` instead.
* `move_to_failed_queue()` has been removed.
* RQ now provides a mechanism to automatically clean up failed jobs. By default, failed jobs are kept for 1 year.
* Thanks @selwin!
- RQ's custom job exception handling mechanism has also changed slightly:
* RQ's default exception handling mechanism (moving jobs to `FailedJobRegistry`) can be disabled by doing `Worker(disable_default_exception_handler=True)`.
* Custom exception handlers are no longer executed in reverse order.
* Thanks @selwin!
- `Worker` names are now randomized. Thanks @selwin!
- `timeout` argument on `queue.enqueue()` has been removed in favor of `job_timeout`. Thanks @selwin!
- Sentry integration has been reworked:
* RQ now uses the new [sentry-sdk](https://pypi.org/project/sentry-sdk/) in place of the deprecated [Raven](https://pypi.org/project/raven/) library
* RQ will look for the more explicit `RQ_SENTRY_DSN` environment variable instead of `SENTRY_DSN` before instantiating Sentry integration
* Thanks @selwin!
- Fixed `Worker.total_working_time` accounting bug. Thanks @selwin!
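A rough migration sketch for the two most common breaking changes above (illustrative only, not part of the changelog; `myapp.tasks.send_report` is a hypothetical task path):

```python
from redis import Redis
from rq import Queue

queue = Queue(connection=Redis())

# Before 1.0:
#   from rq import get_failed_queue
#   failed_queue = get_failed_queue()
#   queue.enqueue('myapp.tasks.send_report', timeout=60)

# From 1.0 onwards:
registry = queue.failed_job_registry                            # replaces FailedQueue
job = queue.enqueue('myapp.tasks.send_report', job_timeout=60)  # `timeout` is gone
```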
### 0.13.0 (2018-12-11) ### 0.13.0 (2018-12-11)
- Compatibility with Redis 3.0. Thanks @dash-rai! - Compatibility with Redis 3.0. Thanks @dash-rai!
- Added `job_timeout` argument to `queue.enqueue()`. This argument will eventually replace `timeout` argument. Thanks @selwin! - Added `job_timeout` argument to `queue.enqueue()`. This argument will eventually replace `timeout` argument. Thanks @selwin!

@ -6,27 +6,25 @@ layout: docs
Jobs can fail due to exceptions occurring. When your RQ workers run in the Jobs can fail due to exceptions occurring. When your RQ workers run in the
background, how do you get notified of these exceptions? background, how do you get notified of these exceptions?
## Default: the `failed` queue ## Default: the `FailedJobRegistry`
The default safety net for RQ is the `failed` queue. Every job that fails The default safety net for RQ is the `FailedJobRegistry`. Every job that doesn't
execution is stored in here, along with its exception information (type, execute successfully is stored here, along with its exception information (type,
value, traceback). While this makes sure no failing jobs "get lost", it does value, traceback). While this makes sure no failing jobs "get lost", it does
not proactively notify you of job failures. not proactively notify you of job failures.
## Custom exception handlers ## Custom Exception Handlers
Starting from version 0.3.1, RQ supports registering custom exception RQ supports registering custom exception handlers. This makes it possible to
handlers. This makes it possible to replace the default behaviour (sending inject your own error handling logic to your workers.
the job to the `failed` queue) altogether, or to take additional steps when an
exception occurs.
This is how you register custom exception handler(s) to an RQ worker: This is how you register custom exception handler(s) to an RQ worker:
```python ```python
from rq.handlers import move_to_failed_queue # RQ's default exception handler from exception_handlers import foo_handler, bar_handler
w = Worker([q], exception_handlers=[my_handler, move_to_failed_queue]) w = Worker([q], exception_handlers=[foo_handler, bar_handler])
``` ```
The handler itself is a function that takes the following parameters: `job`, The handler itself is a function that takes the following parameters: `job`,
@ -46,11 +44,19 @@ def my_handler(job, *exc_info):
# do custom things here # do custom things here
``` ```
## Chaining exception handlers {% highlight python %}
from exception_handlers import foo_handler
w = Worker([q], exception_handlers=[foo_handler],
disable_default_exception_handler=True)
{% endhighlight %}
## Chaining Exception Handlers
The handler itself is responsible for deciding whether or not the exception The handler itself is responsible for deciding whether or not the exception
handling is done, or should fall through to the next handler on the stack. handling is done, or should fall through to the next handler on the stack.
The handler can indicate this by returning a boolean. `False` means stop The handler can indicate this by returning a boolean. `False` means stop
processing exceptions, `True` means continue and fall through to the next processing exceptions, `True` means continue and fall through to the next
exception handler on the stack. exception handler on the stack.
@ -58,7 +64,7 @@ It's important to know for implementors that, by default, when the handler
doesn't have an explicit return value (thus `None`), this will be interpreted doesn't have an explicit return value (thus `None`), this will be interpreted
as `True` (i.e. continue with the next handler). as `True` (i.e. continue with the next handler).
To replace the default behaviour (i.e. moving the job to the `failed` queue), To prevent the next exception handler in the handler chain from executing,
use a custom exception handler that doesn't fall through, for example: use a custom exception handler that doesn't fall through, for example:
```python ```python
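# Illustrative sketch only -- the diff's own example is cut off by the hunk
# boundary above. Per the chaining rules described earlier, returning False
# stops the chain, so no later handler (including RQ's default one) runs.
def black_hole(job, *exc_info):
    return False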

@ -61,7 +61,7 @@ In addition, you can add a few options to modify the behaviour of the queued
job. By default, these are popped out of the kwargs that will be passed to the job. By default, these are popped out of the kwargs that will be passed to the
job function. job function.
* `timeout` specifies the maximum runtime of the job before it's interrupted * `job_timeout` specifies the maximum runtime of the job before it's interrupted
and marked as `failed`. Its default unit is seconds and it can be an integer or a string representing an integer (e.g. `2`, `'2'`). It can also be a string with a unit suffix for hours, minutes or seconds (e.g. `'1h'`, `'3m'`, `'5s'`). and marked as `failed`. Its default unit is seconds and it can be an integer or a string representing an integer (e.g. `2`, `'2'`). It can also be a string with a unit suffix for hours, minutes or seconds (e.g. `'1h'`, `'3m'`, `'5s'`).
* `result_ttl` specifies the expiry time of the key where the job result will * `result_ttl` specifies the expiry time of the key where the job result will
be stored be stored
@ -72,8 +72,9 @@ job function.
* `job_id` allows you to manually specify this job's `job_id` * `job_id` allows you to manually specify this job's `job_id`
* `at_front` will place the job at the *front* of the queue, instead of the * `at_front` will place the job at the *front* of the queue, instead of the
back back
* `description` to add a description to the enqueued job.
* `kwargs` and `args` lets you bypass the auto-pop of these arguments, ie: * `kwargs` and `args` lets you bypass the auto-pop of these arguments, ie:
specify a `timeout` argument for the underlying job function. specify a `description` argument for the underlying job function.
In the last case, it may be advantageous to instead use the explicit version of In the last case, it may be advantageous to instead use the explicit version of
`.enqueue()`, `.enqueue_call()`: `.enqueue()`, `.enqueue_call()`:
@ -82,7 +83,7 @@ In the last case, it may be advantageous to instead use the explicit version of
q = Queue('low', connection=redis_conn) q = Queue('low', connection=redis_conn)
q.enqueue_call(func=count_words_at_url, q.enqueue_call(func=count_words_at_url,
args=('http://nvie.com',), args=('http://nvie.com',),
timeout=30) job_timeout=30)
``` ```
For cases where the web process doesn't have access to the source code running For cases where the web process doesn't have access to the source code running

@ -88,31 +88,52 @@ job = q.enqueue(count_words_at_url, 'http://nvie.com', ttl=43)
## Failed Jobs ## Failed Jobs
If a job fails and raises an exception, the worker will put the job in a failed job queue. If a job fails during execution, the worker will put the job in a FailedJobRegistry.
On the Job instance, the `is_failed` property will be true. To fetch all failed jobs, scan On the Job instance, the `is_failed` property will be true. FailedJobRegistry
through the `get_failed_queue()` queue. can be accessed through `queue.failed_job_registry`.
```python ```python
from redis import StrictRedis from redis import StrictRedis
from rq import push_connection, get_failed_queue, Queue from rq import Queue, Worker
from rq.job import Job from rq.job import Job
con = StrictRedis()
push_connection(con)
def div_by_zero(x): def div_by_zero(x):
return x / 0 return x / 0
job = Job.create(func=div_by_zero, args=(1, 2, 3))
job.origin = 'fake'
job.save()
fq = get_failed_queue()
fq.quarantine(job, Exception('Some fake error'))
assert fq.count == 1
fq.requeue(job.id) connection = StrictRedis()
queue = Queue(connection=connection)
job = queue.enqueue(div_by_zero, 1)
registry = queue.failed_job_registry
worker = Worker([queue])
worker.work(burst=True)
assert len(registry) == 1 # Failed jobs are kept in FailedJobRegistry
registry.requeue(job) # Puts job back in its original queue
assert len(registry) == 0
assert queue.count == 1
```
By default, failed jobs are kept for 1 year. You can change this by specifying
`failure_ttl` (in seconds) when enqueueing jobs.
```python
job = queue.enqueue(foo_job, failure_ttl=300) # 5 minutes in seconds
```
## Requeueing Failed Jobs
RQ also provides a CLI tool that makes requeueing failed jobs easy.
```console
# This will requeue foo_job_id and bar_job_id from myqueue's failed job registry
rq requeue --queue myqueue -u redis://localhost:6379 foo_job_id bar_job_id
assert fq.count == 0 # This command will requeue all jobs in myqueue's failed job registry
assert Queue('fake').count == 1 rq requeue --queue myqueue -u redis://localhost:6379 --all
``` ```
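The same can be done programmatically with the registry API shown above; a minimal sketch (assumes a reachable Redis and a queue named `myqueue`):

```python
from redis import Redis
from rq import Queue

queue = Queue('myqueue', connection=Redis())
registry = queue.failed_job_registry

# Requeue every job currently in the failed job registry
for job_id in registry.get_job_ids():
    registry.requeue(job_id)
```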

@ -68,7 +68,7 @@ This makes it possible to inspect and interpret the problem manually and
possibly resubmit the job. possibly resubmit the job.
## Dealing with interruption ## Dealing With Interruptions
When workers get killed in the polite way (Ctrl+C or `kill`), RQ tries hard not When workers get killed in the polite way (Ctrl+C or `kill`), RQ tries hard not
to lose any work. The current work is finished after which the worker will to lose any work. The current work is finished after which the worker will
@ -83,7 +83,7 @@ damage.
Just sayin'. Just sayin'.
## Dealing with job timeouts ## Dealing With Job Timeouts
By default, jobs should execute within 180 seconds. After that, the worker By default, jobs should execute within 180 seconds. After that, the worker
kills the work horse and puts the job onto the `failed` queue, indicating the kills the work horse and puts the job onto the `failed` queue, indicating the
@ -95,7 +95,7 @@ can be loosened (or tightened), by specifying it as a keyword argument to the
```python ```python
q = Queue() q = Queue()
q.enqueue(mytask, args=(foo,), kwargs={'bar': qux}, timeout=600) # 10 mins q.enqueue(mytask, args=(foo,), kwargs={'bar': qux}, job_timeout=600) # 10 mins
``` ```
You can also change the default timeout for jobs that are enqueued via specific You can also change the default timeout for jobs that are enqueued via specific
@ -108,7 +108,7 @@ high = Queue('high', default_timeout=8) # 8 secs
low = Queue('low', default_timeout=600) # 10 mins low = Queue('low', default_timeout=600) # 10 mins
# Individual jobs can still override these defaults # Individual jobs can still override these defaults
low.enqueue(really_really_slow, timeout=3600) # 1 hr low.enqueue(really_really_slow, job_timeout=3600) # 1 hr
``` ```
Individual jobs can still specify an alternative timeout, as workers will Individual jobs can still specify an alternative timeout, as workers will

@ -8,7 +8,7 @@ solely as a work horse to perform lengthy or blocking tasks that you don't want
to perform inside web processes. to perform inside web processes.
## Starting workers ## Starting Workers
To start crunching work, simply start a worker from the root of your project To start crunching work, simply start a worker from the root of your project
directory: directory:
@ -30,7 +30,7 @@ concurrent processing going on. If you want to perform jobs concurrently,
simply start more workers. simply start more workers.
### Burst mode ### Burst Mode
By default, workers will start working immediately and will block and wait for By default, workers will start working immediately and will block and wait for
new work when they run out of work. Workers can also be started in _burst new work when they run out of work. Workers can also be started in _burst
@ -50,7 +50,7 @@ This can be useful for batch work that needs to be processed periodically, or
just to scale up your workers temporarily during peak periods. just to scale up your workers temporarily during peak periods.
### Worker arguments ### Worker Arguments
In addition to `--burst`, `rq worker` also accepts these arguments: In addition to `--burst`, `rq worker` also accepts these arguments:
@ -67,7 +67,7 @@ In addition to `--burst`, `rq worker` also accepts these arguments:
## Inside the worker ## Inside the worker
### The worker life-cycle ### The Worker Lifecycle
The life-cycle of a worker consists of a few phases: The life-cycle of a worker consists of a few phases:
@ -86,11 +86,11 @@ The life-cycle of a worker consists of a few phases:
7. _Cleanup job execution_. The worker sets its status to `idle` and sets both 7. _Cleanup job execution_. The worker sets its status to `idle` and sets both
the job and its result to expire based on `result_ttl`. Job is also removed the job and its result to expire based on `result_ttl`. Job is also removed
from `StartedJobRegistry` and added to `FinishedJobRegistry` in the case from `StartedJobRegistry` and added to `FinishedJobRegistry` in the case
of successful execution, or `FailedQueue` in the case of failure. of successful execution, or `FailedJobRegistry` in the case of failure.
8. _Loop_. Repeat from step 3. 8. _Loop_. Repeat from step 3.
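To inspect those registries for a given queue, something along these lines should work (an illustrative sketch using the `name`/`connection` constructor form that appears elsewhere in this changeset):

```python
from redis import Redis
from rq import Queue
from rq.registry import StartedJobRegistry, FinishedJobRegistry

redis = Redis()
queue = Queue('default', connection=redis)

started = StartedJobRegistry(queue.name, connection=redis)
finished = FinishedJobRegistry(queue.name, connection=redis)
failed = queue.failed_job_registry  # the new FailedJobRegistry for this queue

print(started.get_job_ids(), finished.get_job_ids(), len(failed))
```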
## Performance notes ## Performance Notes
Basically the `rq worker` shell script is a simple fetch-fork-execute loop. Basically the `rq worker` shell script is a simple fetch-fork-execute loop.
When a lot of your jobs do lengthy setups, or they all depend on the same set When a lot of your jobs do lengthy setups, or they all depend on the same set
@ -124,17 +124,29 @@ with Connection():
``` ```
### Worker names ### Worker Names
Workers are registered to the system under their names, see [monitoring][m]. Workers are registered to the system under their names, which are generated
By default, the name of a worker is equal to the concatenation of the current randomly during instantiation (see [monitoring][m]). To override this default,
hostname and the current PID. To override this default, specify the name when specify the name when starting the worker, or use the `--name` cli option.
starting the worker, using the `--name` option.
{% highlight python %}
from redis import Redis
from rq import Queue, Worker
redis = Redis()
queue = Queue('queue_name')
# Start a worker with a custom name
worker = Worker([queue], connection=redis, name='foo')
{% endhighlight %}
[m]: /docs/monitoring/ [m]: /docs/monitoring/
### Retrieving worker information ### Retrieving Worker Information
_Updated in version 0.10.0._
`Worker` instances store their runtime information in Redis. Here's how to `Worker` instances store their runtime information in Redis. Here's how to
retrieve them: retrieve them:
@ -150,11 +162,25 @@ workers = Worker.all(connection=redis)
# Returns all workers in this queue (new in version 0.10.0) # Returns all workers in this queue (new in version 0.10.0)
queue = Queue('queue_name') queue = Queue('queue_name')
workers = Worker.all(queue=queue) workers = Worker.all(queue=queue)
worker = workers[0]
print(worker.name)
``` ```
Aside from `worker.name`, workers also have the following properties, illustrated in the sketch after this list:
* `hostname` - the host where this worker is run
* `pid` - worker's process ID
* `queues` - queues on which this worker is listening for jobs
* `state` - possible states are `suspended`, `started`, `busy` and `idle`
* `current_job` - the job it's currently executing (if any)
* `last_heartbeat` - the last time this worker was seen
* `birth_date` - time of worker's instantiation
* `successful_job_count` - number of jobs finished successfully
* `failed_job_count` - number of failed jobs processed
* `total_working_time` - amount of time spent executing jobs, in seconds
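A quick status dump over all workers using the properties above (an illustrative sketch):

```python
from redis import Redis
from rq import Worker

for worker in Worker.all(connection=Redis()):
    print(worker.name, worker.hostname, worker.pid, worker.get_state(),
          worker.successful_job_count, worker.failed_job_count,
          worker.total_working_time)
```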
_New in version 0.10.0._ _New in version 0.10.0._
If you only want to know the number of workers for monitoring purposes, using If you only want to know the number of workers for monitoring purposes,
`Worker.count()` is much more performant. `Worker.count()` is much more performant.
```python ```python
@ -172,7 +198,7 @@ workers = Worker.all(queue=queue)
``` ```
### Worker statistics ### Worker Statistics
_New in version 0.9.0._ _New in version 0.9.0._
@ -184,12 +210,12 @@ from rq.worker import Worker
worker = Worker.find_by_key('rq:worker:name') worker = Worker.find_by_key('rq:worker:name')
worker.successful_job_count # Number of jobs finished successfully worker.successful_job_count # Number of jobs finished successfully
worker.failed_job_count. # Number of failed jobs processed by this worker worker.failed_job_count # Number of failed jobs processed by this worker
worker.total_working_time # Number of time spent executing jobs worker.total_working_time # Amount of time spent executing jobs (in seconds)
``` ```
## Taking down workers ## Taking Down Workers
If, at any time, the worker receives `SIGINT` (via Ctrl+C) or `SIGTERM` (via If, at any time, the worker receives `SIGINT` (via Ctrl+C) or `SIGTERM` (via
`kill`), the worker will wait until the currently running task is finished, stop `kill`), the worker will wait until the currently running task is finished, stop
@ -200,9 +226,7 @@ worker will forcefully terminate the child process (sending it `SIGKILL`), but
will still try to register its own death. will still try to register its own death.
## Using a config file ## Using a Config File
_New in version 0.3.2._
If you'd like to configure `rq worker` via a configuration file instead of If you'd like to configure `rq worker` via a configuration file instead of
through command line arguments, you can do this by creating a Python file like through command line arguments, you can do this by creating a Python file like
@ -240,9 +264,7 @@ $ rq worker -c settings
``` ```
## Custom worker classes ## Custom Worker Classes
_New in version 0.4.0._
There are times when you want to customize the worker's behavior. Some of the There are times when you want to customize the worker's behavior. Some of the
more common requests so far are: more common requests so far are:
@ -259,9 +281,7 @@ $ rq worker -w 'path.to.GeventWorker'
``` ```
## Custom Job and Queue classes ## Custom Job and Queue Classes
_Will be available in next release._
You can tell the worker to use a custom class for jobs and queues using You can tell the worker to use a custom class for jobs and queues using
`--job-class` and/or `--queue-class`. `--job-class` and/or `--queue-class`.
@ -289,7 +309,7 @@ queue.enqueue(some_func)
``` ```
## Custom DeathPenalty classes ## Custom DeathPenalty Classes
When a Job times-out, the worker will try to kill it using the supplied When a Job times-out, the worker will try to kill it using the supplied
`death_penalty_class` (default: `UnixSignalDeathPenalty`). This can be overridden `death_penalty_class` (default: `UnixSignalDeathPenalty`). This can be overridden
@ -299,9 +319,7 @@ DeathPenalty classes are constructed with the following arguments
`BaseDeathPenalty(timeout, JobTimeoutException, job_id=job.id)` `BaseDeathPenalty(timeout, JobTimeoutException, job_id=job.id)`
## Custom exception handlers ## Custom Exception Handlers
_New in version 0.5.5._
If you need to handle errors differently for different types of jobs, or simply want to customize If you need to handle errors differently for different types of jobs, or simply want to customize
RQ's default error handling behavior, run `rq worker` using the `--exception-handler` option: RQ's default error handling behavior, run `rq worker` using the `--exception-handler` option:
@ -312,3 +330,9 @@ $ rq worker --exception-handler 'path.to.my.ErrorHandler'
# Multiple exception handlers are also supported # Multiple exception handlers are also supported
$ rq worker --exception-handler 'path.to.my.ErrorHandler' --exception-handler 'another.ErrorHandler' $ rq worker --exception-handler 'path.to.my.ErrorHandler' --exception-handler 'another.ErrorHandler'
``` ```
If you want to disable RQ's default exception handler, use the `--disable-default-exception-handler` option:
```console
$ rq worker --exception-handler 'path.to.my.ErrorHandler' --disable-default-exception-handler
```
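The same behaviour is available from Python, mirroring the example in the exceptions documentation:

```python
from redis import Redis
from rq import Queue, Worker

redis = Redis()
queue = Queue(connection=redis)

# Equivalent of passing --disable-default-exception-handler on the CLI
worker = Worker([queue], connection=redis,
                disable_default_exception_handler=True)
worker.work()
```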

@ -3,45 +3,47 @@ title: "RQ: Sending exceptions to Sentry"
layout: patterns layout: patterns
--- ---
## Sending exceptions to Sentry ## Sending Exceptions to Sentry
[Sentry](https://www.getsentry.com/) is a popular exception gathering service [Sentry](https://www.getsentry.com/) is a popular exception gathering service.
that RQ supports integrating with since version 0.3.1, through its custom RQ allows you to very easily send job exceptions to Sentry. To do this, you'll
exception handlers. need to have [sentry-sdk](https://pypi.org/project/sentry-sdk/) installed.
RQ includes a convenience function that registers your existing Sentry client There are a few ways to start sending job exceptions to Sentry.
to send all exceptions to.
An example: ### Configuring Sentry Through CLI
{% highlight python %} Simply invoke the `rq worker` command using the `--sentry-dsn` argument.
from raven import Client
from raven.transport.http import HTTPTransport ```console
from rq.contrib.sentry import register_sentry rq worker --sentry-dsn https://my-dsn@sentry.io/123
```
client = Client('<YOUR_DSN>', transport=HTTPTransport)
register_sentry(client, worker)
{% endhighlight %} ### Configuring Sentry Through a Config File
Where `worker` is your RQ worker instance. After that, call `worker.work(...)` Declare `SENTRY_DSN` in RQ's config file like this:
to start the worker. All exceptions that occur are reported to Sentry
automatically. ```python
SENTRY_DSN = 'https://my-dsn@sentry.io/123'
<div class="warning" style="margin-top: 20px"> ```
<img style="float: right; margin-right: -60px; margin-top: -38px" src="{{site.baseurl}}img/warning.png" />
<strong>Note:</strong> And run RQ's worker with your config file:
<p>
Error delivery to Sentry is known to be unreliable with RQ when using ```console
async transports (the default is). So you are encouraged to use the rq worker -c my_settings
<code>HTTPTransport</code> or <code>RequestsHTTPTransport</code> when ```
creating your client. See the code sample above, or the <a
href="http://raven.readthedocs.org/en/latest/transports/index.html">Raven Visit [this page](https://python-rq.org/docs/workers/#using-a-config-file)
documentation</a>. to read more about running RQ using a config file.
</p>
<p>
For more info, see the ### Configuring Sentry Through Environment Variable
<a href="http://raven.readthedocs.org/en/latest/transports/index.html#transports">Raven docs</a>.
</p> Simple set `RQ_SENTRY_DSN` in your environment variable and RQ will
</div> automatically start Sentry integration for you.
Read more on RQ's [custom exception handling](/docs/exceptions/) capabilities. ```console
RQ_SENTRY_DSN="https://my-dsn@sentry.io/123" rq worker
```
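The integration can also be initialized directly from Python; a minimal sketch using the reworked helper:

```python
from rq.contrib.sentry import register_sentry

# Initializes sentry-sdk with its RqIntegration under the hood
register_sentry('https://my-dsn@sentry.io/123')
```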

@ -6,7 +6,7 @@ from __future__ import (absolute_import, division, print_function,
from .connections import (Connection, get_current_connection, pop_connection, from .connections import (Connection, get_current_connection, pop_connection,
push_connection, use_connection) push_connection, use_connection)
from .job import cancel_job, get_current_job, requeue_job from .job import cancel_job, get_current_job, requeue_job
from .queue import get_failed_queue, Queue from .queue import Queue
from .version import VERSION from .version import VERSION
from .worker import SimpleWorker, Worker from .worker import SimpleWorker, Worker

@ -12,7 +12,7 @@ import sys
import click import click
from redis.exceptions import ConnectionError from redis.exceptions import ConnectionError
from rq import Connection, get_failed_queue, __version__ as version from rq import Connection, __version__ as version
from rq.cli.helpers import (read_config_file, refresh, from rq.cli.helpers import (read_config_file, refresh,
setup_loghandlers_from_args, setup_loghandlers_from_args,
show_both, show_queues, show_workers, CliConfig) show_both, show_queues, show_workers, CliConfig)
@ -23,6 +23,7 @@ from rq.defaults import (DEFAULT_CONNECTION_CLASS, DEFAULT_JOB_CLASS,
DEFAULT_JOB_MONITORING_INTERVAL, DEFAULT_JOB_MONITORING_INTERVAL,
DEFAULT_LOGGING_FORMAT, DEFAULT_LOGGING_DATE_FORMAT) DEFAULT_LOGGING_FORMAT, DEFAULT_LOGGING_DATE_FORMAT)
from rq.exceptions import InvalidJobOperationError from rq.exceptions import InvalidJobOperationError
from rq.registry import FailedJobRegistry
from rq.utils import import_attribute from rq.utils import import_attribute
from rq.suspension import (suspend as connection_suspend, from rq.suspension import (suspend as connection_suspend,
resume as connection_resume, is_suspended) resume as connection_resume, is_suspended)
@ -112,16 +113,16 @@ def empty(cli_config, all, queues, **options):
@main.command() @main.command()
@click.option('--all', '-a', is_flag=True, help='Requeue all failed jobs') @click.option('--all', '-a', is_flag=True, help='Requeue all failed jobs')
@click.option('--queue', required=True, type=str)
@click.argument('job_ids', nargs=-1) @click.argument('job_ids', nargs=-1)
@pass_cli_config @pass_cli_config
def requeue(cli_config, all, job_class, job_ids, **options): def requeue(cli_config, queue, all, job_class, job_ids, **options):
"""Requeue failed jobs.""" """Requeue failed jobs."""
failed_queue = get_failed_queue(connection=cli_config.connection, failed_job_registry = FailedJobRegistry(queue,
job_class=cli_config.job_class) connection=cli_config.connection)
if all: if all:
job_ids = failed_queue.job_ids job_ids = failed_job_registry.get_job_ids()
if not job_ids: if not job_ids:
click.echo('Nothing to do') click.echo('Nothing to do')
@ -132,12 +133,12 @@ def requeue(cli_config, all, job_class, job_ids, **options):
with click.progressbar(job_ids) as job_ids: with click.progressbar(job_ids) as job_ids:
for job_id in job_ids: for job_id in job_ids:
try: try:
failed_queue.requeue(job_id) failed_job_registry.requeue(job_id)
except InvalidJobOperationError: except InvalidJobOperationError:
fail_count += 1 fail_count += 1
if fail_count > 0: if fail_count > 0:
click.secho('Unable to requeue {0} jobs from failed queue'.format(fail_count), fg='red') click.secho('Unable to requeue {0} jobs from failed job registry'.format(fail_count), fg='red')
@main.command() @main.command()
@ -183,16 +184,17 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues,
@click.option('--disable-job-desc-logging', is_flag=True, help='Turn off description logging.') @click.option('--disable-job-desc-logging', is_flag=True, help='Turn off description logging.')
@click.option('--verbose', '-v', is_flag=True, help='Show more output') @click.option('--verbose', '-v', is_flag=True, help='Show more output')
@click.option('--quiet', '-q', is_flag=True, help='Show less output') @click.option('--quiet', '-q', is_flag=True, help='Show less output')
@click.option('--sentry-dsn', envvar='SENTRY_DSN', help='Report exceptions to this Sentry DSN') @click.option('--sentry-dsn', envvar='RQ_SENTRY_DSN', help='Report exceptions to this Sentry DSN')
@click.option('--exception-handler', help='Exception handler(s) to use', multiple=True) @click.option('--exception-handler', help='Exception handler(s) to use', multiple=True)
@click.option('--pid', help='Write the process ID number to a file at the specified path') @click.option('--pid', help='Write the process ID number to a file at the specified path')
@click.option('--disable-default-exception-handler', '-d', is_flag=True, help='Disable RQ\'s default exception handler')
@click.argument('queues', nargs=-1) @click.argument('queues', nargs=-1)
@pass_cli_config @pass_cli_config
def worker(cli_config, burst, logging_level, name, results_ttl, def worker(cli_config, burst, logging_level, name, results_ttl,
worker_ttl, job_monitoring_interval, verbose, quiet, sentry_dsn, worker_ttl, job_monitoring_interval, verbose, quiet, sentry_dsn,
exception_handler, pid, queues, log_format, date_format, **options): exception_handler, pid, disable_default_exception_handler, queues,
log_format, date_format, **options):
"""Starts an RQ worker.""" """Starts an RQ worker."""
settings = read_config_file(cli_config.config) if cli_config.config else {} settings = read_config_file(cli_config.config) if cli_config.config else {}
# Worker specific default arguments # Worker specific default arguments
queues = queues or settings.get('QUEUES', ['default']) queues = queues or settings.get('QUEUES', ['default'])
@ -220,23 +222,19 @@ def worker(cli_config, burst, logging_level, name, results_ttl,
connection=cli_config.connection, connection=cli_config.connection,
job_class=cli_config.job_class) job_class=cli_config.job_class)
for queue in queues] for queue in queues]
worker = cli_config.worker_class(queues, worker = cli_config.worker_class(
name=name, queues, name=name, connection=cli_config.connection,
connection=cli_config.connection, default_worker_ttl=worker_ttl, default_result_ttl=results_ttl,
default_worker_ttl=worker_ttl, job_monitoring_interval=job_monitoring_interval,
default_result_ttl=results_ttl, job_class=cli_config.job_class, queue_class=cli_config.queue_class,
job_monitoring_interval=job_monitoring_interval, exception_handlers=exception_handlers or None,
job_class=cli_config.job_class, disable_default_exception_handler=disable_default_exception_handler
queue_class=cli_config.queue_class, )
exception_handlers=exception_handlers or None)
# Should we configure Sentry? # Should we configure Sentry?
if sentry_dsn: if sentry_dsn:
from raven import Client
from raven.transport.http import HTTPTransport
from rq.contrib.sentry import register_sentry from rq.contrib.sentry import register_sentry
client = Client(sentry_dsn, transport=HTTPTransport) register_sentry(sentry_dsn)
register_sentry(client, worker)
worker.work(burst=burst, logging_level=logging_level, date_format=date_format, log_format=log_format) worker.work(burst=burst, logging_level=logging_level, date_format=date_format, log_format=log_format)
except ConnectionError as e: except ConnectionError as e:

@ -53,22 +53,9 @@ def get_redis_from_config(settings, connection_class=Redis):
'port': settings.get('REDIS_PORT', 6379), 'port': settings.get('REDIS_PORT', 6379),
'db': settings.get('REDIS_DB', 0), 'db': settings.get('REDIS_DB', 0),
'password': settings.get('REDIS_PASSWORD', None), 'password': settings.get('REDIS_PASSWORD', None),
'ssl': settings.get('REDIS_SSL', False),
} }
use_ssl = settings.get('REDIS_SSL', False)
if use_ssl:
# If SSL is required, we need to depend on redis-py being 2.10 at
# least
def safeint(x):
try:
return int(x)
except ValueError:
return 0
version_info = tuple(safeint(x) for x in redis.__version__.split('.'))
if not version_info >= (2, 10):
raise RuntimeError('Using SSL requires a redis-py version >= 2.10')
kwargs['ssl'] = use_ssl
return connection_class(**kwargs) return connection_class(**kwargs)
@ -137,48 +124,49 @@ def show_workers(queues, raw, by_queue, queue_class, worker_class):
if queues: if queues:
qs = list(map(queue_class, queues)) qs = list(map(queue_class, queues))
def any_matching_queue(worker): workers = set()
def queue_matches(q): for queue in qs:
return q in qs for worker in worker_class.all(queue=queue):
return any(map(queue_matches, worker.queues)) workers.add(worker)
# Filter out workers that don't match the queue filter
ws = [w for w in worker_class.all() if any_matching_queue(w)]
def filter_queues(queue_names):
return [qname for qname in queue_names if queue_class(qname) in qs]
else: else:
qs = queue_class.all() qs = queue_class.all()
ws = worker_class.all() workers = worker_class.all()
filter_queues = (lambda x: x)
if not by_queue: if not by_queue:
for w in ws:
worker_queues = filter_queues(w.queue_names()) for worker in workers:
queue_names = ', '.join(worker.queue_names())
name = '%s (%s %s)' % (worker.name, worker.hostname, worker.pid)
if not raw: if not raw:
click.echo('%s %s: %s' % (w.name, state_symbol(w.get_state()), ', '.join(worker_queues))) click.echo('%s: %s %s' % (name, state_symbol(worker.get_state()), queue_names))
else: else:
click.echo('worker %s %s %s' % (w.name, w.get_state(), ','.join(worker_queues))) click.echo('worker %s %s %s' % (name, worker.get_state(), queue_names))
else: else:
# Create reverse lookup table # Display workers by queue
queues = dict([(q, []) for q in qs]) queue_dict = {}
for w in ws: for queue in qs:
for q in w.queues: queue_dict[queue] = worker_class.all(queue=queue)
if q not in queues:
continue if queue_dict:
queues[q].append(w) max_length = max([len(q.name) for q in queue_dict.keys()])
else:
max_qname = max(map(lambda q: len(q.name), queues.keys())) if queues else 0 max_length = 0
for q in queues:
if queues[q]: for queue in queue_dict:
queues_str = ", ".join(sorted(map(lambda w: '%s (%s)' % (w.name, state_symbol(w.get_state())), queues[q]))) # noqa if queue_dict[queue]:
queues_str = ", ".join(
sorted(
map(lambda w: '%s (%s)' % (w.name, state_symbol(w.get_state())), queue_dict[queue])
)
)
else: else:
queues_str = '' queues_str = ''
click.echo('%s %s' % (pad(q.name + ':', max_qname + 1), queues_str)) click.echo('%s %s' % (pad(queue.name + ':', max_length + 1), queues_str))
if not raw: if not raw:
click.echo('%d workers, %d queues' % (len(ws), len(qs))) click.echo('%d workers, %d queues' % (len(workers), len(qs)))
def show_both(queues, raw, by_queue, queue_class, worker_class): def show_both(queues, raw, by_queue, queue_class, worker_class):

@ -3,19 +3,10 @@ from __future__ import (absolute_import, division, print_function,
unicode_literals) unicode_literals)
def register_sentry(client, worker): def register_sentry(sentry_dsn):
"""Given a Raven client and an RQ worker, registers exception handlers """Given a Raven client and an RQ worker, registers exception handlers
with the worker so exceptions are logged to Sentry. with the worker so exceptions are logged to Sentry.
""" """
def send_to_sentry(job, *exc_info): import sentry_sdk
client.captureException( from sentry_sdk.integrations.rq import RqIntegration
exc_info=exc_info, sentry_sdk.init(sentry_dsn, integrations=[RqIntegration()])
extra={
'job_id': job.id,
'func': job.func_name,
'args': job.args,
'kwargs': job.kwargs,
'description': job.description,
})
worker.push_exc_handler(send_to_sentry)

@ -5,5 +5,6 @@ DEFAULT_CONNECTION_CLASS = 'redis.Redis'
DEFAULT_WORKER_TTL = 420 DEFAULT_WORKER_TTL = 420
DEFAULT_JOB_MONITORING_INTERVAL = 30 DEFAULT_JOB_MONITORING_INTERVAL = 30
DEFAULT_RESULT_TTL = 500 DEFAULT_RESULT_TTL = 500
DEFAULT_FAILURE_TTL = 31536000 # 1 year in seconds
DEFAULT_LOGGING_DATE_FORMAT = '%H:%M:%S' DEFAULT_LOGGING_DATE_FORMAT = '%H:%M:%S'
DEFAULT_LOGGING_FORMAT = '%(asctime)s %(message)s' DEFAULT_LOGGING_FORMAT = '%(asctime)s %(message)s'

@ -15,6 +15,10 @@ class InvalidJobOperationError(Exception):
pass pass
class InvalidJobOperation(Exception):
pass
class UnpickleError(Exception): class UnpickleError(Exception):
def __init__(self, message, raw_data, inner_exception=None): def __init__(self, message, raw_data, inner_exception=None):
super(UnpickleError, self).__init__(message, inner_exception) super(UnpickleError, self).__init__(message, inner_exception)

@ -1,12 +0,0 @@
import traceback
from .connections import get_current_connection
from .queue import get_failed_queue
from .worker import Worker
def move_to_failed_queue(job, *exc_info):
"""Default exception handler: move the job to the failed queue."""
exc_string = Worker._get_safe_exception_string(traceback.format_exception(*exc_info))
failed_queue = get_failed_queue(get_current_connection(), job.__class__)
failed_queue.quarantine(job, exc_info=exc_string)

@ -20,6 +20,7 @@ try:
except ImportError: # noqa # pragma: no cover except ImportError: # noqa # pragma: no cover
import pickle import pickle
# Serialize pickle dumps using the highest pickle protocol (binary, default # Serialize pickle dumps using the highest pickle protocol (binary, default
# uses ascii) # uses ascii)
dumps = partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL) dumps = partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
@ -61,16 +62,6 @@ def cancel_job(job_id, connection=None):
Job.fetch(job_id, connection=connection).cancel() Job.fetch(job_id, connection=connection).cancel()
def requeue_job(job_id, connection=None, job_class=None):
"""Requeues the job with the given job ID. If no such job exists, just
remove the job ID from the failed queue, otherwise the job ID should refer
to a failed job (i.e. it should be on the failed queue).
"""
from .queue import get_failed_queue
failed_queue = get_failed_queue(connection=connection, job_class=job_class)
return failed_queue.requeue(job_id)
def get_current_job(connection=None, job_class=None): def get_current_job(connection=None, job_class=None):
"""Returns the Job instance that is currently being executed. If this """Returns the Job instance that is currently being executed. If this
function is invoked from outside a job context, None is returned. function is invoked from outside a job context, None is returned.
@ -81,6 +72,11 @@ def get_current_job(connection=None, job_class=None):
return _job_stack.top return _job_stack.top
def requeue_job(job_id, connection):
job = Job.fetch(job_id, connection=connection)
return job.requeue()
class Job(object): class Job(object):
"""A Job is just a convenient datastructure to pass around job (meta) data. """A Job is just a convenient datastructure to pass around job (meta) data.
""" """
@ -90,7 +86,8 @@ class Job(object):
@classmethod @classmethod
def create(cls, func, args=None, kwargs=None, connection=None, def create(cls, func, args=None, kwargs=None, connection=None,
result_ttl=None, ttl=None, status=None, description=None, result_ttl=None, ttl=None, status=None, description=None,
depends_on=None, timeout=None, id=None, origin=None, meta=None): depends_on=None, timeout=None, id=None, origin=None, meta=None,
failure_ttl=None):
"""Creates a new Job instance for the given function, arguments, and """Creates a new Job instance for the given function, arguments, and
keyword arguments. keyword arguments.
""" """
@ -131,6 +128,7 @@ class Job(object):
# Extra meta data # Extra meta data
job.description = description or job.get_call_string() job.description = description or job.get_call_string()
job.result_ttl = result_ttl job.result_ttl = result_ttl
job.failure_ttl = failure_ttl
job.ttl = ttl job.ttl = ttl
job.timeout = parse_timeout(timeout) job.timeout = parse_timeout(timeout)
job._status = status job._status = status
@ -145,27 +143,11 @@ class Job(object):
self._status = as_text(self.connection.hget(self.key, 'status')) self._status = as_text(self.connection.hget(self.key, 'status'))
return self._status return self._status
def _get_status(self):
warnings.warn(
"job.status is deprecated. Use job.get_status() instead",
DeprecationWarning
)
return self.get_status()
def set_status(self, status, pipeline=None): def set_status(self, status, pipeline=None):
self._status = status self._status = status
connection = pipeline or self.connection connection = pipeline or self.connection
connection.hset(self.key, 'status', self._status) connection.hset(self.key, 'status', self._status)
def _set_status(self, status):
warnings.warn(
"job.status is deprecated. Use job.set_status() instead",
DeprecationWarning
)
self.set_status(status)
status = property(_get_status, _set_status)
@property @property
def is_finished(self): def is_finished(self):
return self.get_status() == JobStatus.FINISHED return self.get_status() == JobStatus.FINISHED
@ -323,6 +305,7 @@ class Job(object):
self.exc_info = None self.exc_info = None
self.timeout = None self.timeout = None
self.result_ttl = None self.result_ttl = None
self.failure_ttl = None
self.ttl = None self.ttl = None
self._status = None self._status = None
self._dependency_id = None self._dependency_id = None
@ -447,6 +430,7 @@ class Job(object):
self._result = unpickle(obj.get('result')) if obj.get('result') else None # noqa self._result = unpickle(obj.get('result')) if obj.get('result') else None # noqa
self.timeout = parse_timeout(as_text(obj.get('timeout'))) if obj.get('timeout') else None self.timeout = parse_timeout(as_text(obj.get('timeout'))) if obj.get('timeout') else None
self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa
self.failure_ttl = int(obj.get('failure_ttl')) if obj.get('failure_ttl') else None # noqa
self._status = as_text(obj.get('status') if obj.get('status') else None) self._status = as_text(obj.get('status') if obj.get('status') else None)
self._dependency_id = as_text(obj.get('dependency_id', None)) self._dependency_id = as_text(obj.get('dependency_id', None))
self.ttl = int(obj.get('ttl')) if obj.get('ttl') else None self.ttl = int(obj.get('ttl')) if obj.get('ttl') else None
@ -492,6 +476,8 @@ class Job(object):
obj['timeout'] = self.timeout obj['timeout'] = self.timeout
if self.result_ttl is not None: if self.result_ttl is not None:
obj['result_ttl'] = self.result_ttl obj['result_ttl'] = self.result_ttl
if self.failure_ttl is not None:
obj['failure_ttl'] = self.failure_ttl
if self._status is not None: if self._status is not None:
obj['status'] = self._status obj['status'] = self._status
if self._dependency_id is not None: if self._dependency_id is not None:
@ -538,11 +524,14 @@ class Job(object):
q.remove(self, pipeline=pipeline) q.remove(self, pipeline=pipeline)
pipeline.execute() pipeline.execute()
def requeue(self):
"""Requeues job."""
self.failed_job_registry.requeue(self)
def delete(self, pipeline=None, remove_from_queue=True, def delete(self, pipeline=None, remove_from_queue=True,
delete_dependents=False): delete_dependents=False):
"""Cancels the job and deletes the job hash from Redis. Jobs depending """Cancels the job and deletes the job hash from Redis. Jobs depending
on this job can optionally be deleted as well.""" on this job can optionally be deleted as well."""
if remove_from_queue: if remove_from_queue:
self.cancel(pipeline=pipeline) self.cancel(pipeline=pipeline)
connection = pipeline if pipeline is not None else self.connection connection = pipeline if pipeline is not None else self.connection
@ -569,10 +558,7 @@ class Job(object):
registry.remove(self, pipeline=pipeline) registry.remove(self, pipeline=pipeline)
elif self.is_failed: elif self.is_failed:
from .queue import get_failed_queue self.failed_job_registry.remove(self, pipeline=pipeline)
failed_queue = get_failed_queue(connection=self.connection,
job_class=self.__class__)
failed_queue.remove(self, pipeline=pipeline)
if delete_dependents: if delete_dependents:
self.delete_dependents(pipeline=pipeline) self.delete_dependents(pipeline=pipeline)
@ -655,6 +641,12 @@ class Job(object):
connection = pipeline if pipeline is not None else self.connection connection = pipeline if pipeline is not None else self.connection
connection.expire(self.key, ttl) connection.expire(self.key, ttl)
@property
def failed_job_registry(self):
from .registry import FailedJobRegistry
return FailedJobRegistry(self.origin, connection=self.connection,
job_class=self.__class__)
def register_dependency(self, pipeline=None): def register_dependency(self, pipeline=None):
"""Jobs may have dependencies. Jobs are enqueued only if the job they """Jobs may have dependencies. Jobs are enqueued only if the job they
depend on is successfully performed. We record this relation as depend on is successfully performed. We record this relation as

@ -9,16 +9,19 @@ from rq.defaults import (DEFAULT_LOGGING_FORMAT,
DEFAULT_LOGGING_DATE_FORMAT) DEFAULT_LOGGING_DATE_FORMAT)
def setup_loghandlers(level, date_format=DEFAULT_LOGGING_DATE_FORMAT, def setup_loghandlers(level=None, date_format=DEFAULT_LOGGING_DATE_FORMAT,
log_format=DEFAULT_LOGGING_FORMAT): log_format=DEFAULT_LOGGING_FORMAT):
logger = logging.getLogger('rq.worker') logger = logging.getLogger('rq.worker')
if not _has_effective_handler(logger): if not _has_effective_handler(logger):
logger.setLevel(level)
formatter = logging.Formatter(fmt=log_format, datefmt=date_format) formatter = logging.Formatter(fmt=log_format, datefmt=date_format)
handler = ColorizingStreamHandler() handler = ColorizingStreamHandler()
handler.setFormatter(formatter) handler.setFormatter(formatter)
logger.addHandler(handler) logger.addHandler(handler)
if level is not None:
logger.setLevel(level)
def _has_effective_handler(logger): def _has_effective_handler(logger):
""" """

@ -10,17 +10,12 @@ from redis import WatchError
from .compat import as_text, string_types, total_ordering from .compat import as_text, string_types, total_ordering
from .connections import resolve_connection from .connections import resolve_connection
from .defaults import DEFAULT_RESULT_TTL from .defaults import DEFAULT_RESULT_TTL
from .exceptions import (DequeueTimeout, InvalidJobDependency, from .exceptions import (DequeueTimeout, InvalidJobDependency, NoSuchJobError,
InvalidJobOperationError, NoSuchJobError, UnpickleError) UnpickleError)
from .job import Job, JobStatus from .job import Job, JobStatus
from .utils import backend_class, import_attribute, utcnow, parse_timeout from .utils import backend_class, import_attribute, utcnow, parse_timeout
def get_failed_queue(connection=None, job_class=None):
"""Returns a handle to the special failed queue."""
return FailedQueue(connection=connection, job_class=job_class)
def compact(lst): def compact(lst):
return [item for item in lst if item is not None] return [item for item in lst if item is not None]
@ -94,6 +89,17 @@ class Queue(object):
"""Returns the Redis key for this Queue.""" """Returns the Redis key for this Queue."""
return self._key return self._key
@property
def registry_cleaning_key(self):
"""Redis key used to indicate this queue has been cleaned."""
return 'rq:clean_registries:%s' % self.name
def acquire_cleaning_lock(self):
"""Returns a boolean indicating whether a lock to clean this queue
is acquired. A lock expires in 899 seconds (15 minutes - 1 second)
"""
return self.connection.set(self.registry_cleaning_key, 1, nx=1, ex=899)
def empty(self): def empty(self):
"""Removes all messages on the queue.""" """Removes all messages on the queue."""
script = """ script = """
@ -141,8 +147,7 @@ class Queue(object):
except NoSuchJobError: except NoSuchJobError:
self.remove(job_id) self.remove(job_id)
else: else:
if job.origin == self.name or \ if job.origin == self.name:
(job.is_failed and self == get_failed_queue(connection=self.connection, job_class=self.job_class)):
return job return job
def get_job_ids(self, offset=0, length=-1): def get_job_ids(self, offset=0, length=-1):
@ -175,6 +180,12 @@ class Queue(object):
"""Returns a count of all messages in the queue.""" """Returns a count of all messages in the queue."""
return self.connection.llen(self.key) return self.connection.llen(self.key)
@property
def failed_job_registry(self):
"""Returns this queue's FailedJobRegistry."""
from rq.registry import FailedJobRegistry
return FailedJobRegistry(queue=self)
def remove(self, job_or_id, pipeline=None): def remove(self, job_or_id, pipeline=None):
"""Removes Job from queue, accepts either a Job instance or ID.""" """Removes Job from queue, accepts either a Job instance or ID."""
job_id = job_or_id.id if isinstance(job_or_id, self.job_class) else job_or_id job_id = job_or_id.id if isinstance(job_or_id, self.job_class) else job_or_id
@ -210,8 +221,9 @@ class Queue(object):
connection.rpush(self.key, job_id) connection.rpush(self.key, job_id)
def enqueue_call(self, func, args=None, kwargs=None, timeout=None, def enqueue_call(self, func, args=None, kwargs=None, timeout=None,
result_ttl=None, ttl=None, description=None, result_ttl=None, ttl=None, failure_ttl=None,
depends_on=None, job_id=None, at_front=False, meta=None): description=None, depends_on=None, job_id=None,
at_front=False, meta=None):
"""Creates a job to represent the delayed function call and enqueues """Creates a job to represent the delayed function call and enqueues
it. it.
@ -221,13 +233,15 @@ class Queue(object):
""" """
timeout = parse_timeout(timeout) or self._default_timeout timeout = parse_timeout(timeout) or self._default_timeout
result_ttl = parse_timeout(result_ttl) result_ttl = parse_timeout(result_ttl)
failure_ttl = parse_timeout(failure_ttl)
ttl = parse_timeout(ttl) ttl = parse_timeout(ttl)
job = self.job_class.create( job = self.job_class.create(
func, args=args, kwargs=kwargs, connection=self.connection, func, args=args, kwargs=kwargs, connection=self.connection,
result_ttl=result_ttl, ttl=ttl, status=JobStatus.QUEUED, result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl,
description=description, depends_on=depends_on, status=JobStatus.QUEUED, description=description,
timeout=timeout, id=job_id, origin=self.name, meta=meta) depends_on=depends_on, timeout=timeout, id=job_id,
origin=self.name, meta=meta)
# If job depends on an unfinished job, register itself on it's # If job depends on an unfinished job, register itself on it's
# parent's dependents instead of enqueueing it. # parent's dependents instead of enqueueing it.
@ -290,16 +304,12 @@ class Queue(object):
'by workers') 'by workers')
# Detect explicit invocations, i.e. of the form: # Detect explicit invocations, i.e. of the form:
# q.enqueue(foo, args=(1, 2), kwargs={'a': 1}, timeout=30) # q.enqueue(foo, args=(1, 2), kwargs={'a': 1}, job_timeout=30)
timeout = kwargs.pop('job_timeout', None) timeout = kwargs.pop('job_timeout', None)
if timeout is None:
timeout = kwargs.pop('timeout', None)
if timeout:
warnings.warn('The `timeout` keyword is deprecated. Use `job_timeout` instead', DeprecationWarning)
description = kwargs.pop('description', None) description = kwargs.pop('description', None)
result_ttl = kwargs.pop('result_ttl', None) result_ttl = kwargs.pop('result_ttl', None)
ttl = kwargs.pop('ttl', None) ttl = kwargs.pop('ttl', None)
failure_ttl = kwargs.pop('failure_ttl', None)
depends_on = kwargs.pop('depends_on', None) depends_on = kwargs.pop('depends_on', None)
job_id = kwargs.pop('job_id', None) job_id = kwargs.pop('job_id', None)
at_front = kwargs.pop('at_front', False) at_front = kwargs.pop('at_front', False)
@ -310,10 +320,12 @@ class Queue(object):
args = kwargs.pop('args', None) args = kwargs.pop('args', None)
kwargs = kwargs.pop('kwargs', None) kwargs = kwargs.pop('kwargs', None)
return self.enqueue_call(func=f, args=args, kwargs=kwargs, return self.enqueue_call(
timeout=timeout, result_ttl=result_ttl, ttl=ttl, func=f, args=args, kwargs=kwargs, timeout=timeout,
description=description, depends_on=depends_on, result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl,
job_id=job_id, at_front=at_front, meta=meta) description=description, depends_on=depends_on, job_id=job_id,
at_front=at_front, meta=meta
)
def enqueue_job(self, job, pipeline=None, at_front=False): def enqueue_job(self, job, pipeline=None, at_front=False):
"""Enqueues a job for delayed execution. """Enqueues a job for delayed execution.
@ -429,28 +441,6 @@ class Queue(object):
return queue_key, blob return queue_key, blob
return None return None
def dequeue(self):
"""Dequeues the front-most job from this queue.
Returns a job_class instance, which can be executed or inspected.
"""
while True:
job_id = self.pop_job_id()
if job_id is None:
return None
try:
job = self.job_class.fetch(job_id, connection=self.connection)
except NoSuchJobError as e:
# Silently pass on jobs that don't exist (anymore),
continue
except UnpickleError as e:
# Attach queue information on the exception for improved error
# reporting
e.job_id = job_id
e.queue = self
raise e
return job
@classmethod @classmethod
def dequeue_any(cls, queues, timeout, connection=None, job_class=None): def dequeue_any(cls, queues, timeout, connection=None, job_class=None):
"""Class method returning the job_class instance at the front of the given """Class method returning the job_class instance at the front of the given
@ -509,48 +499,3 @@ class Queue(object):
def __str__(self): def __str__(self):
return '<{0} {1}>'.format(self.__class__.__name__, self.name) return '<{0} {1}>'.format(self.__class__.__name__, self.name)
class FailedQueue(Queue):
def __init__(self, connection=None, job_class=None):
super(FailedQueue, self).__init__(JobStatus.FAILED,
connection=connection,
job_class=job_class)
def quarantine(self, job, exc_info):
"""Puts the given Job in quarantine (i.e. put it on the failed
queue).
"""
with self.connection.pipeline() as pipeline:
# Add Queue key set
self.connection.sadd(self.redis_queues_keys, self.key)
job.exc_info = exc_info
job.save(pipeline=pipeline, include_meta=False)
job.cleanup(ttl=-1, pipeline=pipeline) # failed job won't expire
self.push_job_id(str(job.id), pipeline=pipeline)
pipeline.execute()
return job
def requeue(self, job_id):
"""Requeues the job with the given job ID."""
try:
job = self.job_class.fetch(job_id, connection=self.connection)
except NoSuchJobError:
# Silently ignore/remove this job and return (i.e. do nothing)
self.remove(job_id)
return
# Delete it from the failed queue (raise an error if that failed)
if self.remove(job) == 0:
raise InvalidJobOperationError('Cannot requeue non-failed jobs')
job.set_status(JobStatus.QUEUED)
job.exc_info = None
queue = Queue(job.origin,
connection=self.connection,
job_class=self.job_class)
return queue.enqueue_job(job)
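With FailedQueue removed and `timeout` no longer accepted by `queue.enqueue()`, callers pass `job_timeout` and the new `failure_ttl` keyword directly. A minimal sketch of the new call, assuming a local Redis; the dotted task path `myapp.tasks.slow_task` is illustrative, not part of this change:

    from redis import Redis
    from rq import Queue

    queue = Queue('default', connection=Redis())

    # `timeout` is gone: the per-job time limit is now spelled `job_timeout`,
    # and `failure_ttl` controls how long a failed job is kept in
    # FailedJobRegistry before cleanup() removes it.
    job = queue.enqueue(
        'myapp.tasks.slow_task',   # hypothetical import path
        job_timeout=30,
        failure_ttl=3600,
    )
    print(job.failure_ttl)         # 3600, persisted with the job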

@ -1,8 +1,9 @@
from .compat import as_text from .compat import as_text
from .connections import resolve_connection from .connections import resolve_connection
from .exceptions import NoSuchJobError from .defaults import DEFAULT_FAILURE_TTL
from .exceptions import InvalidJobOperation, NoSuchJobError
from .job import Job, JobStatus from .job import Job, JobStatus
from .queue import FailedQueue from .queue import Queue
from .utils import backend_class, current_timestamp from .utils import backend_class, current_timestamp
@ -27,11 +28,20 @@ class BaseRegistry(object):
self.key = self.key_template.format(self.name) self.key = self.key_template.format(self.name)
self.job_class = backend_class(self, 'job_class', override=job_class) self.job_class = backend_class(self, 'job_class', override=job_class)
def __len__(self): def __len__(self):
"""Returns the number of jobs in this registry""" """Returns the number of jobs in this registry"""
return self.count return self.count
def __contains__(self, item):
"""
Returns a boolean indicating whether the registry contains the given
job instance or job id.
"""
job_id = item
if isinstance(item, self.job_class):
job_id = item.id
return self.connection.zscore(self.key, job_id) is not None
@property @property
def count(self): def count(self):
"""Returns the number of jobs in this registry""" """Returns the number of jobs in this registry"""
@ -69,6 +79,10 @@ class BaseRegistry(object):
return [as_text(job_id) for job_id in return [as_text(job_id) for job_id in
self.connection.zrange(self.key, start, end)] self.connection.zrange(self.key, start, end)]
def get_queue(self):
"""Returns Queue object associated with this registry."""
return Queue(self.name, connection=self.connection)
class StartedJobRegistry(BaseRegistry): class StartedJobRegistry(BaseRegistry):
""" """
@ -82,7 +96,7 @@ class StartedJobRegistry(BaseRegistry):
key_template = 'rq:wip:{0}' key_template = 'rq:wip:{0}'
def cleanup(self, timestamp=None): def cleanup(self, timestamp=None):
"""Remove expired jobs from registry and add them to FailedQueue. """Remove expired jobs from registry and add them to FailedJobRegistry.
Removes jobs with an expiry time earlier than timestamp, specified as Removes jobs with an expiry time earlier than timestamp, specified as
seconds since the Unix epoch. timestamp defaults to call time if seconds since the Unix epoch. timestamp defaults to call time if
@ -92,8 +106,7 @@ class StartedJobRegistry(BaseRegistry):
job_ids = self.get_expired_job_ids(score) job_ids = self.get_expired_job_ids(score)
if job_ids: if job_ids:
failed_queue = FailedQueue(connection=self.connection, failed_job_registry = FailedJobRegistry(self.name, self.connection)
job_class=self.job_class)
with self.connection.pipeline() as pipeline: with self.connection.pipeline() as pipeline:
for job_id in job_ids: for job_id in job_ids:
@ -103,7 +116,7 @@ class StartedJobRegistry(BaseRegistry):
job.set_status(JobStatus.FAILED) job.set_status(JobStatus.FAILED)
job.save(pipeline=pipeline, include_meta=False) job.save(pipeline=pipeline, include_meta=False)
job.cleanup(ttl=-1, pipeline=pipeline) job.cleanup(ttl=-1, pipeline=pipeline)
failed_queue.push_job_id(job_id, pipeline=pipeline) failed_job_registry.add(job, job.failure_ttl)
except NoSuchJobError: except NoSuchJobError:
pass pass
@ -131,6 +144,61 @@ class FinishedJobRegistry(BaseRegistry):
self.connection.zremrangebyscore(self.key, 0, score) self.connection.zremrangebyscore(self.key, 0, score)
class FailedJobRegistry(BaseRegistry):
"""
Registry containing failed jobs.
"""
key_template = 'rq:failed:{0}'
def cleanup(self, timestamp=None):
"""Remove expired jobs from registry.
Removes jobs with an expiry time earlier than timestamp, specified as
seconds since the Unix epoch. timestamp defaults to call time if
unspecified.
"""
score = timestamp if timestamp is not None else current_timestamp()
self.connection.zremrangebyscore(self.key, 0, score)
def add(self, job, ttl=None, exc_string='', pipeline=None):
"""
Adds a job to the registry with an expiry time of now + ttl.
`ttl` defaults to DEFAULT_FAILURE_TTL if not specified.
"""
if ttl is None:
ttl = DEFAULT_FAILURE_TTL
score = ttl if ttl < 0 else current_timestamp() + ttl
if pipeline:
p = pipeline
else:
p = self.connection.pipeline()
job.exc_info = exc_string
job.save(pipeline=p, include_meta=False)
job.cleanup(ttl=-1, pipeline=p) # failed job won't expire
p.zadd(self.key, {job.id: score})
if not pipeline:
p.execute()
def requeue(self, job_or_id):
"""Requeues the job with the given job ID."""
if isinstance(job_or_id, self.job_class):
job = job_or_id
else:
job = self.job_class.fetch(job_or_id, connection=self.connection)
result = self.connection.zrem(self.key, job.id)
if not result:
raise InvalidJobOperation
queue = Queue(job.origin, connection=self.connection,
job_class=self.job_class)
return queue.enqueue_job(job)
class DeferredJobRegistry(BaseRegistry): class DeferredJobRegistry(BaseRegistry):
""" """
Registry of deferred jobs (waiting for another job to finish). Registry of deferred jobs (waiting for another job to finish).
@ -154,3 +222,8 @@ def clean_registries(queue):
connection=queue.connection, connection=queue.connection,
job_class=queue.job_class) job_class=queue.job_class)
registry.cleanup() registry.cleanup()
registry = FailedJobRegistry(name=queue.name,
connection=queue.connection,
job_class=queue.job_class)
registry.cleanup()
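The registry additions above (`__contains__`, `get_queue()`, the per-queue FailedJobRegistry) can be exercised roughly as follows; the connection, queue name, and job id are placeholders:

    from redis import Redis
    from rq import Queue
    from rq.registry import FailedJobRegistry

    redis = Redis()
    queue = Queue('default', connection=redis)

    # One FailedJobRegistry per queue, keyed as rq:failed:<queue name>.
    registry = FailedJobRegistry(queue.name, connection=redis)

    print(len(registry))                  # number of failed jobs
    print('some-job-id' in registry)      # __contains__ takes a job id or a Job
    print(registry.get_job_ids())         # ids ordered by expiry score
    print(registry.get_queue() == queue)  # True: maps back to its Queue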

@ -2,4 +2,4 @@
from __future__ import (absolute_import, division, print_function, from __future__ import (absolute_import, division, print_function,
unicode_literals) unicode_literals)
VERSION = '0.13.0' VERSION = '1.0'

@ -13,6 +13,7 @@ import time
import traceback import traceback
import warnings import warnings
from datetime import timedelta from datetime import timedelta
from uuid import uuid4
try: try:
from signal import SIGKILL from signal import SIGKILL
@ -24,19 +25,22 @@ from redis import WatchError
from . import worker_registration from . import worker_registration
from .compat import PY2, as_text, string_types, text_type from .compat import PY2, as_text, string_types, text_type
from .connections import get_current_connection, push_connection, pop_connection from .connections import get_current_connection, push_connection, pop_connection
from .defaults import (DEFAULT_RESULT_TTL, DEFAULT_WORKER_TTL, DEFAULT_JOB_MONITORING_INTERVAL,
from .defaults import (DEFAULT_RESULT_TTL,
DEFAULT_WORKER_TTL, DEFAULT_JOB_MONITORING_INTERVAL,
DEFAULT_LOGGING_FORMAT, DEFAULT_LOGGING_DATE_FORMAT) DEFAULT_LOGGING_FORMAT, DEFAULT_LOGGING_DATE_FORMAT)
from .exceptions import DequeueTimeout, ShutDownImminentException from .exceptions import DequeueTimeout, ShutDownImminentException
from .job import Job, JobStatus from .job import Job, JobStatus
from .logutils import setup_loghandlers from .logutils import setup_loghandlers
from .queue import Queue, get_failed_queue from .queue import Queue
from .registry import FinishedJobRegistry, StartedJobRegistry, clean_registries from .registry import (FailedJobRegistry, FinishedJobRegistry,
StartedJobRegistry, clean_registries)
from .suspension import is_suspended from .suspension import is_suspended
from .timeouts import JobTimeoutException, HorseMonitorTimeoutException, UnixSignalDeathPenalty from .timeouts import JobTimeoutException, HorseMonitorTimeoutException, UnixSignalDeathPenalty
from .utils import (backend_class, ensure_list, enum, from .utils import (backend_class, ensure_list, enum,
make_colorizer, utcformat, utcnow, utcparse) make_colorizer, utcformat, utcnow, utcparse)
from .version import VERSION from .version import VERSION
from .worker_registration import get_keys from .worker_registration import clean_worker_registry, get_keys
try: try:
from procname import setprocname from procname import setprocname
@ -56,10 +60,6 @@ class StopRequested(Exception):
pass pass
def iterable(x):
return hasattr(x, '__iter__')
def compact(l): def compact(l):
return [x for x in l if x is not None] return [x for x in l if x is not None]
@ -148,11 +148,8 @@ class Worker(object):
return None return None
name = worker_key[len(prefix):] name = worker_key[len(prefix):]
worker = cls([], worker = cls([], name, connection=connection, job_class=job_class,
name, queue_class=queue_class, prepare_for_work=False)
connection=connection,
job_class=job_class,
queue_class=queue_class)
worker.refresh() worker.refresh()
@ -161,13 +158,21 @@ class Worker(object):
def __init__(self, queues, name=None, default_result_ttl=DEFAULT_RESULT_TTL, def __init__(self, queues, name=None, default_result_ttl=DEFAULT_RESULT_TTL,
connection=None, exc_handler=None, exception_handlers=None, connection=None, exc_handler=None, exception_handlers=None,
default_worker_ttl=DEFAULT_WORKER_TTL, job_class=None, default_worker_ttl=DEFAULT_WORKER_TTL, job_class=None,
queue_class=None, queue_class=None, log_job_description=True,
job_monitoring_interval=DEFAULT_JOB_MONITORING_INTERVAL, job_monitoring_interval=DEFAULT_JOB_MONITORING_INTERVAL,
log_job_description=True): # noqa disable_default_exception_handler=False,
prepare_for_work=True): # noqa
if connection is None: if connection is None:
connection = get_current_connection() connection = get_current_connection()
self.connection = connection self.connection = connection
if prepare_for_work:
self.hostname = socket.gethostname()
self.pid = os.getpid()
else:
self.hostname = None
self.pid = None
self.job_class = backend_class(self, 'job_class', override=job_class) self.job_class = backend_class(self, 'job_class', override=job_class)
self.queue_class = backend_class(self, 'queue_class', override=queue_class) self.queue_class = backend_class(self, 'queue_class', override=queue_class)
@ -176,7 +181,8 @@ class Worker(object):
job_class=self.job_class) job_class=self.job_class)
if isinstance(q, string_types) else q if isinstance(q, string_types) else q
for q in ensure_list(queues)] for q in ensure_list(queues)]
self._name = name
self.name = name or uuid4().hex
self.queues = queues self.queues = queues
self.validate_queues() self.validate_queues()
self._exc_handlers = [] self._exc_handlers = []
@ -191,27 +197,17 @@ class Worker(object):
self._stop_requested = False self._stop_requested = False
self.log = logger self.log = logger
self.log_job_description = log_job_description self.log_job_description = log_job_description
self.failed_queue = get_failed_queue(connection=self.connection,
job_class=self.job_class)
self.last_cleaned_at = None self.last_cleaned_at = None
self.successful_job_count = 0 self.successful_job_count = 0
self.failed_job_count = 0 self.failed_job_count = 0
self.total_working_time = 0 self.total_working_time = 0
self.birth_date = None self.birth_date = None
# By default, push the "move-to-failed-queue" exception handler onto self.disable_default_exception_handler = disable_default_exception_handler
# the stack
if exception_handlers is None: if isinstance(exception_handlers, list):
self.push_exc_handler(self.move_to_failed_queue) for handler in exception_handlers:
if exc_handler is not None: self.push_exc_handler(handler)
self.push_exc_handler(exc_handler)
warnings.warn(
"exc_handler is deprecated, pass a list to exception_handlers instead.",
DeprecationWarning
)
elif isinstance(exception_handlers, list):
for h in exception_handlers:
self.push_exc_handler(h)
elif exception_handlers is not None: elif exception_handlers is not None:
self.push_exc_handler(exception_handlers) self.push_exc_handler(exception_handlers)
@ -229,30 +225,11 @@ class Worker(object):
"""Returns the Redis keys representing this worker's queues.""" """Returns the Redis keys representing this worker's queues."""
return list(map(lambda q: q.key, self.queues)) return list(map(lambda q: q.key, self.queues))
@property
def name(self):
"""Returns the name of the worker, under which it is registered to the
monitoring system.
By default, the name of the worker is constructed from the current
(short) host name and the current PID.
"""
if self._name is None:
hostname = socket.gethostname()
shortname, _, _ = hostname.partition('.')
self._name = '{0}.{1}'.format(shortname, self.pid)
return self._name
@property @property
def key(self): def key(self):
"""Returns the worker's Redis hash key.""" """Returns the worker's Redis hash key."""
return self.redis_worker_namespace_prefix + self.name return self.redis_worker_namespace_prefix + self.name
@property
def pid(self):
"""The current process ID."""
return os.getpid()
@property @property
def horse_pid(self): def horse_pid(self):
"""The horse's process ID. Only available in the worker. Will return """The horse's process ID. Only available in the worker. Will return
@ -289,6 +266,8 @@ class Worker(object):
p.hset(key, 'birth', now_in_string) p.hset(key, 'birth', now_in_string)
p.hset(key, 'last_heartbeat', now_in_string) p.hset(key, 'last_heartbeat', now_in_string)
p.hset(key, 'queues', queues) p.hset(key, 'queues', queues)
p.hset(key, 'pid', self.pid)
p.hset(key, 'hostname', self.hostname)
worker_registration.register(self, p) worker_registration.register(self, p)
p.expire(key, self.default_worker_ttl) p.expire(key, self.default_worker_ttl)
p.execute() p.execute()
@ -502,6 +481,17 @@ class Worker(object):
except StopRequested: except StopRequested:
break break
except SystemExit:
# Cold shutdown detected
raise
except: # noqa
self.log.error(
'Worker %s: found an unhandled exception, quitting...',
self.name, exc_info=True
)
break
finally: finally:
if not self.is_horse: if not self.is_horse:
self.register_death() self.register_death()
@ -526,12 +516,11 @@ class Worker(object):
job, queue = result job, queue = result
if self.log_job_description: if self.log_job_description:
self.log.info('%s: %s (%s)', green(queue.name), self.log.info(
blue(job.description), '%s: %s (%s)', green(queue.name),
job.id) blue(job.description), job.id)
else: else:
self.log.info('%s:%s', green(queue.name), self.log.info('%s:%s', green(queue.name), job.id)
job.id)
break break
except DequeueTimeout: except DequeueTimeout:
@ -561,10 +550,14 @@ class Worker(object):
def refresh(self): def refresh(self):
data = self.connection.hmget( data = self.connection.hmget(
self.key, 'queues', 'state', 'current_job', 'last_heartbeat', self.key, 'queues', 'state', 'current_job', 'last_heartbeat',
'birth', 'failed_job_count', 'successful_job_count', 'total_working_time' 'birth', 'failed_job_count', 'successful_job_count',
'total_working_time', 'hostname', 'pid'
) )
queues, state, job_id, last_heartbeat, birth, failed_job_count, successful_job_count, total_working_time = data (queues, state, job_id, last_heartbeat, birth, failed_job_count,
successful_job_count, total_working_time, hostname, pid) = data
queues = as_text(queues) queues = as_text(queues)
self.hostname = hostname
self.pid = int(pid) if pid else None
self._state = as_text(state or '?') self._state = as_text(state or '?')
self._job_id = job_id or None self._job_id = job_id or None
if last_heartbeat: if last_heartbeat:
@ -598,7 +591,7 @@ class Worker(object):
def increment_total_working_time(self, job_execution_time, pipeline): def increment_total_working_time(self, job_execution_time, pipeline):
pipeline.hincrbyfloat(self.key, 'total_working_time', pipeline.hincrbyfloat(self.key, 'total_working_time',
job_execution_time.microseconds) job_execution_time.total_seconds())
def fork_work_horse(self, job, queue): def fork_work_horse(self, job, queue):
"""Spawns a work horse to perform the actual work and passes it a job. """Spawns a work horse to perform the actual work and passes it a job.
@ -648,19 +641,17 @@ class Worker(object):
if not job.ended_at: if not job.ended_at:
job.ended_at = utcnow() job.ended_at = utcnow()
self.handle_job_failure(job=job)
# Unhandled failure: move the job to the failed queue # Unhandled failure: move the job to the failed queue
self.log.warning(( self.log.warning((
'Moving job to %r queue ' 'Moving job to FailedJobRegistry '
'(work-horse terminated unexpectedly; waitpid returned %s)' '(work-horse terminated unexpectedly; waitpid returned {})'
), self.failed_queue.name, ret_val) ).format(ret_val))
self.failed_queue.quarantine(
exc_string = "Work-horse process was terminated unexpectedly " + "(waitpid returned %s)" % ret_val
self.handle_job_failure(
job, job,
exc_info=( exc_string="Work-horse process was terminated unexpectedly "
"Work-horse process was terminated unexpectedly " "(waitpid returned %s)" % ret_val
"(waitpid returned {0})"
).format(ret_val)
) )
def execute_job(self, job, queue): def execute_job(self, job, queue):
@ -727,24 +718,37 @@ class Worker(object):
msg = 'Processing {0} from {1} since {2}' msg = 'Processing {0} from {1} since {2}'
self.procline(msg.format(job.func_name, job.origin, time.time())) self.procline(msg.format(job.func_name, job.origin, time.time()))
def handle_job_failure(self, job, started_job_registry=None): def handle_job_failure(self, job, started_job_registry=None,
exc_string=''):
"""Handles the failure or an executing job by: """Handles the failure or an executing job by:
1. Setting the job status to failed 1. Setting the job status to failed
2. Removing the job from the started_job_registry 2. Removing the job from StartedJobRegistry
3. Setting the worker's current job to None 3. Setting the worker's current job to None
4. Adding the job to FailedJobRegistry
""" """
with self.connection.pipeline() as pipeline: with self.connection.pipeline() as pipeline:
if started_job_registry is None: if started_job_registry is None:
started_job_registry = StartedJobRegistry(job.origin, started_job_registry = StartedJobRegistry(
self.connection, job.origin,
job_class=self.job_class) self.connection,
job_class=self.job_class
)
job.set_status(JobStatus.FAILED, pipeline=pipeline) job.set_status(JobStatus.FAILED, pipeline=pipeline)
started_job_registry.remove(job, pipeline=pipeline) started_job_registry.remove(job, pipeline=pipeline)
if not self.disable_default_exception_handler:
failed_job_registry = FailedJobRegistry(job.origin, job.connection,
job_class=self.job_class)
failed_job_registry.add(job, ttl=job.failure_ttl,
exc_string=exc_string, pipeline=pipeline)
self.set_current_job_id(None, pipeline=pipeline) self.set_current_job_id(None, pipeline=pipeline)
self.increment_failed_job_count(pipeline) self.increment_failed_job_count(pipeline)
if job.started_at and job.ended_at: if job.started_at and job.ended_at:
self.increment_total_working_time(job.ended_at - job.started_at, self.increment_total_working_time(
pipeline) job.ended_at - job.started_at,
pipeline
)
try: try:
pipeline.execute() pipeline.execute()
@ -795,7 +799,6 @@ class Worker(object):
inside the work horse's process. inside the work horse's process.
""" """
self.prepare_job_execution(job, heartbeat_ttl) self.prepare_job_execution(job, heartbeat_ttl)
push_connection(self.connection) push_connection(self.connection)
started_job_registry = StartedJobRegistry(job.origin, started_job_registry = StartedJobRegistry(job.origin,
@ -813,15 +816,18 @@ class Worker(object):
# Pickle the result in the same try-except block since we need # Pickle the result in the same try-except block since we need
# to use the same exc handling when pickling fails # to use the same exc handling when pickling fails
job._result = rv job._result = rv
self.handle_job_success(job=job, self.handle_job_success(job=job,
queue=queue, queue=queue,
started_job_registry=started_job_registry) started_job_registry=started_job_registry)
except: except:
job.ended_at = utcnow() job.ended_at = utcnow()
self.handle_job_failure(job=job, exc_info = sys.exc_info()
exc_string = self._get_safe_exception_string(
traceback.format_exception(*exc_info)
)
self.handle_job_failure(job=job, exc_string=exc_string,
started_job_registry=started_job_registry) started_job_registry=started_job_registry)
self.handle_exception(job, *sys.exc_info()) self.handle_exception(job, *exc_info)
return False return False
finally: finally:
@ -855,7 +861,7 @@ class Worker(object):
'queue': job.origin, 'queue': job.origin,
}) })
for handler in reversed(self._exc_handlers): for handler in self._exc_handlers:
self.log.debug('Invoking exception handler %s', handler) self.log.debug('Invoking exception handler %s', handler)
fallthrough = handler(job, *exc_info) fallthrough = handler(job, *exc_info)
@ -867,12 +873,6 @@ class Worker(object):
if not fallthrough: if not fallthrough:
break break
def move_to_failed_queue(self, job, *exc_info):
"""Default exception handler: move the job to the failed queue."""
self.log.warning('Moving job to %r queue', self.failed_queue.name)
from .handlers import move_to_failed_queue
move_to_failed_queue(job, *exc_info)
@staticmethod @staticmethod
def _get_safe_exception_string(exc_strings): def _get_safe_exception_string(exc_strings):
"""Ensure list of exception strings is decoded on Python 2 and joined as one string safely.""" """Ensure list of exception strings is decoded on Python 2 and joined as one string safely."""
@ -904,16 +904,20 @@ class Worker(object):
def clean_registries(self): def clean_registries(self):
"""Runs maintenance jobs on each Queue's registries.""" """Runs maintenance jobs on each Queue's registries."""
for queue in self.queues: for queue in self.queues:
self.log.info('Cleaning registries for queue: %s', queue.name) # If there are multiple workers running, we only want 1 worker
clean_registries(queue) # to run clean_registries().
if queue.acquire_cleaning_lock():
self.log.info('Cleaning registries for queue: %s', queue.name)
clean_registries(queue)
clean_worker_registry(queue)
self.last_cleaned_at = utcnow() self.last_cleaned_at = utcnow()
@property @property
def should_run_maintenance_tasks(self): def should_run_maintenance_tasks(self):
"""Maintenance tasks should run on first startup or every hour.""" """Maintenance tasks should run on first startup or 15 minutes."""
if self.last_cleaned_at is None: if self.last_cleaned_at is None:
return True return True
if (utcnow() - self.last_cleaned_at) > timedelta(hours=1): if (utcnow() - self.last_cleaned_at) > timedelta(minutes=15):
return True return True
return False return False
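Putting the worker-side changes together (random default names, handlers running in the order they were registered, and the new `disable_default_exception_handler` flag), a rough usage sketch; `notify` is a made-up handler, not part of RQ:

    from redis import Redis
    from rq import Queue, Worker

    redis = Redis()
    queue = Queue('default', connection=redis)

    def notify(job, *exc_info):
        # Custom exception handler: returning False stops the chain,
        # True/None falls through to the next handler.
        print('job %s failed' % job.id)
        return True

    worker = Worker(
        [queue],
        connection=redis,
        exception_handlers=[notify],
        # When True, failed jobs are no longer written to FailedJobRegistry;
        # only the custom handlers above run.
        disable_default_exception_handler=False,
    )
    print(worker.name)   # a random uuid4 hex unless a name was passed in
    worker.work(burst=True)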

@ -43,3 +43,25 @@ def get_keys(queue=None, connection=None):
redis_key = REDIS_WORKER_KEYS redis_key = REDIS_WORKER_KEYS
return {as_text(key) for key in redis.smembers(redis_key)} return {as_text(key) for key in redis.smembers(redis_key)}
def clean_worker_registry(queue):
"""Delete invalid worker keys in registry"""
keys = list(get_keys(queue))
with queue.connection.pipeline() as pipeline:
for key in keys:
pipeline.exists(key)
results = pipeline.execute()
invalid_keys = []
for i, key_exists in enumerate(results):
if not key_exists:
invalid_keys.append(keys[i])
if invalid_keys:
pipeline.srem(WORKERS_BY_QUEUE_KEY % queue.name, *invalid_keys)
pipeline.srem(REDIS_WORKER_KEYS, *invalid_keys)
pipeline.execute()
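`clean_worker_registry()` is invoked from the worker's periodic maintenance (see `clean_registries()` above), but it can also be run by hand against a queue whose worker set may hold keys left behind by crashed workers:

    from redis import Redis
    from rq import Queue
    from rq.worker_registration import clean_worker_registry

    queue = Queue('default', connection=Redis())

    # EXISTS-checks every registered worker key and removes expired ones
    # from both the per-queue set and the global worker set.
    clean_worker_registry(queue)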

@ -0,0 +1,6 @@
# -*- coding: utf-8 -*-
from __future__ import (absolute_import, division, print_function,
unicode_literals)
REDIS_HOST = "testhost.example.com"
SENTRY_DSN = 'https://123@sentry.io/123'

@ -119,6 +119,12 @@ def black_hole(job, *exc_info):
return False return False
def add_meta(job, *exc_info):
job.meta = {'foo': 1}
job.save()
return True
def save_key_ttl(key): def save_key_ttl(key):
# Stores key ttl in meta # Stores key ttl in meta
job = get_current_job() job = get_current_job()

@ -3,19 +3,19 @@ from __future__ import (absolute_import, division, print_function,
unicode_literals) unicode_literals)
from click.testing import CliRunner from click.testing import CliRunner
from rq import get_failed_queue, Queue from redis import Redis
from rq.job import Job
from rq import Queue
from rq.cli import main from rq.cli import main
from rq.cli.helpers import read_config_file, CliConfig from rq.cli.helpers import read_config_file, CliConfig
from rq.job import Job
from rq.registry import FailedJobRegistry
from rq.worker import Worker, WorkerStatus
import pytest import pytest
from tests import RQTestCase from tests import RQTestCase
from tests.fixtures import div_by_zero from tests.fixtures import div_by_zero, say_hello
try:
from unittest import TestCase
except ImportError:
from unittest2 import TestCase # noqa
class TestRQCli(RQTestCase): class TestRQCli(RQTestCase):
@ -39,20 +39,20 @@ class TestRQCli(RQTestCase):
super(TestRQCli, self).setUp() super(TestRQCli, self).setUp()
db_num = self.testconn.connection_pool.connection_kwargs['db'] db_num = self.testconn.connection_pool.connection_kwargs['db']
self.redis_url = 'redis://127.0.0.1:6379/%d' % db_num self.redis_url = 'redis://127.0.0.1:6379/%d' % db_num
self.connection = Redis.from_url(self.redis_url)
job = Job.create(func=div_by_zero, args=(1, 2, 3)) job = Job.create(func=div_by_zero, args=(1, 2, 3))
job.origin = 'fake' job.origin = 'fake'
job.save() job.save()
get_failed_queue().quarantine(job, Exception('Some fake error')) # noqa
def test_config_file(self): def test_config_file(self):
settings = read_config_file('tests.dummy_settings') settings = read_config_file('tests.config_files.dummy')
self.assertIn('REDIS_HOST', settings) self.assertIn('REDIS_HOST', settings)
self.assertEqual(settings['REDIS_HOST'], 'testhost.example.com') self.assertEqual(settings['REDIS_HOST'], 'testhost.example.com')
def test_config_file_option(self): def test_config_file_option(self):
"""""" """"""
cli_config = CliConfig(config='tests.dummy_settings') cli_config = CliConfig(config='tests.config_files.dummy')
self.assertEqual( self.assertEqual(
cli_config.connection.connection_pool.connection_kwargs['host'], cli_config.connection.connection_pool.connection_kwargs['host'],
'testhost.example.com', 'testhost.example.com',
@ -63,7 +63,7 @@ class TestRQCli(RQTestCase):
def test_config_file_default_options(self): def test_config_file_default_options(self):
"""""" """"""
cli_config = CliConfig(config='tests.dummy_settings') cli_config = CliConfig(config='tests.config_files.dummy')
self.assertEqual( self.assertEqual(
cli_config.connection.connection_pool.connection_kwargs['host'], cli_config.connection.connection_pool.connection_kwargs['host'],
@ -84,7 +84,7 @@ class TestRQCli(RQTestCase):
def test_config_file_default_options_override(self): def test_config_file_default_options_override(self):
"""""" """"""
cli_config = CliConfig(config='tests.dummy_settings_override') cli_config = CliConfig(config='tests.config_files.dummy_override')
self.assertEqual( self.assertEqual(
cli_config.connection.connection_pool.connection_kwargs['host'], cli_config.connection.connection_pool.connection_kwargs['host'],
@ -110,34 +110,55 @@ class TestRQCli(RQTestCase):
self.assert_normal_execution(result) self.assert_normal_execution(result)
self.assertEqual(result.output.strip(), 'Nothing to do') self.assertEqual(result.output.strip(), 'Nothing to do')
def test_empty_failed(self):
"""rq empty -u <url> failed"""
runner = CliRunner()
result = runner.invoke(main, ['empty', '-u', self.redis_url, 'failed'])
self.assert_normal_execution(result)
self.assertEqual(result.output.strip(), '1 jobs removed from failed queue')
def test_empty_all(self):
"""rq empty -u <url> failed --all"""
runner = CliRunner()
result = runner.invoke(main, ['empty', '-u', self.redis_url, '--all'])
self.assert_normal_execution(result)
self.assertEqual(result.output.strip(), '1 jobs removed from failed queue')
def test_requeue(self): def test_requeue(self):
"""rq requeue -u <url> --all""" """rq requeue -u <url> --all"""
connection = Redis.from_url(self.redis_url)
queue = Queue('requeue', connection=connection)
registry = queue.failed_job_registry
runner = CliRunner() runner = CliRunner()
result = runner.invoke(main, ['requeue', '-u', self.redis_url, '--all'])
job = queue.enqueue(div_by_zero)
job2 = queue.enqueue(div_by_zero)
job3 = queue.enqueue(div_by_zero)
worker = Worker([queue])
worker.work(burst=True)
self.assertIn(job, registry)
self.assertIn(job2, registry)
self.assertIn(job3, registry)
result = runner.invoke(
main,
['requeue', '-u', self.redis_url, '--queue', 'requeue', job.id]
)
self.assert_normal_execution(result) self.assert_normal_execution(result)
self.assertEqual(result.output.strip(), 'Requeueing 1 jobs from failed queue')
result = runner.invoke(main, ['requeue', '-u', self.redis_url, '--all']) # Only the first specified job is requeued
self.assertNotIn(job, registry)
self.assertIn(job2, registry)
self.assertIn(job3, registry)
result = runner.invoke(
main,
['requeue', '-u', self.redis_url, '--queue', 'requeue', '--all']
)
self.assert_normal_execution(result) self.assert_normal_execution(result)
self.assertEqual(result.output.strip(), 'Nothing to do') # With --all flag, all failed jobs are requeued
self.assertNotIn(job2, registry)
self.assertNotIn(job3, registry)
def test_info(self): def test_info(self):
"""rq info -u <url>""" """rq info -u <url>"""
runner = CliRunner() runner = CliRunner()
result = runner.invoke(main, ['info', '-u', self.redis_url])
self.assert_normal_execution(result)
self.assertIn('0 queues, 0 jobs total', result.output)
queue = Queue(connection=self.connection)
queue.enqueue(say_hello)
result = runner.invoke(main, ['info', '-u', self.redis_url]) result = runner.invoke(main, ['info', '-u', self.redis_url])
self.assert_normal_execution(result) self.assert_normal_execution(result)
self.assertIn('1 queues, 1 jobs total', result.output) self.assertIn('1 queues, 1 jobs total', result.output)
@ -147,6 +168,13 @@ class TestRQCli(RQTestCase):
runner = CliRunner() runner = CliRunner()
result = runner.invoke(main, ['info', '-u', self.redis_url, '--only-queues']) result = runner.invoke(main, ['info', '-u', self.redis_url, '--only-queues'])
self.assert_normal_execution(result) self.assert_normal_execution(result)
self.assertIn('0 queues, 0 jobs total', result.output)
queue = Queue(connection=self.connection)
queue.enqueue(say_hello)
result = runner.invoke(main, ['info', '-u', self.redis_url])
self.assert_normal_execution(result)
self.assertIn('1 queues, 1 jobs total', result.output) self.assertIn('1 queues, 1 jobs total', result.output)
def test_info_only_workers(self): def test_info_only_workers(self):
@ -154,8 +182,42 @@ class TestRQCli(RQTestCase):
runner = CliRunner() runner = CliRunner()
result = runner.invoke(main, ['info', '-u', self.redis_url, '--only-workers']) result = runner.invoke(main, ['info', '-u', self.redis_url, '--only-workers'])
self.assert_normal_execution(result) self.assert_normal_execution(result)
self.assertIn('0 workers, 0 queue', result.output)
queue = Queue(connection=self.connection)
queue.enqueue(say_hello)
result = runner.invoke(main, ['info', '-u', self.redis_url, '--only-workers'])
self.assert_normal_execution(result)
self.assertIn('0 workers, 1 queues', result.output) self.assertIn('0 workers, 1 queues', result.output)
foo_queue = Queue(name='foo', connection=self.connection)
foo_queue.enqueue(say_hello)
bar_queue = Queue(name='bar', connection=self.connection)
bar_queue.enqueue(say_hello)
worker = Worker([foo_queue, bar_queue], connection=self.connection)
worker.register_birth()
worker_2 = Worker([foo_queue, bar_queue], connection=self.connection)
worker_2.register_birth()
worker_2.set_state(WorkerStatus.BUSY)
result = runner.invoke(main, ['info', 'foo', 'bar',
'-u', self.redis_url, '--only-workers'])
self.assert_normal_execution(result)
self.assertIn('2 workers, 2 queues', result.output)
result = runner.invoke(main, ['info', 'foo', 'bar', '--by-queue',
'-u', self.redis_url, '--only-workers'])
self.assert_normal_execution(result)
# Ensure both queues' workers are shown
self.assertIn('foo:', result.output)
self.assertIn('bar:', result.output)
self.assertIn('2 workers, 2 queues', result.output)
def test_worker(self): def test_worker(self):
"""rq worker -u <url> -b""" """rq worker -u <url> -b"""
runner = CliRunner() runner = CliRunner()
@ -172,22 +234,41 @@ class TestRQCli(RQTestCase):
def test_exception_handlers(self): def test_exception_handlers(self):
"""rq worker -u <url> -b --exception-handler <handler>""" """rq worker -u <url> -b --exception-handler <handler>"""
q = Queue() connection = Redis.from_url(self.redis_url)
failed_q = get_failed_queue() q = Queue('default', connection=connection)
failed_q.empty()
runner = CliRunner() runner = CliRunner()
# If exception handler is not given, failed job goes to FailedQueue # If exception handler is not given, no custom exception handler is run
q.enqueue(div_by_zero) job = q.enqueue(div_by_zero)
runner.invoke(main, ['worker', '-u', self.redis_url, '-b']) runner.invoke(main, ['worker', '-u', self.redis_url, '-b'])
self.assertEqual(failed_q.count, 1) registry = FailedJobRegistry(queue=q)
self.assertTrue(job in registry)
# Black hole exception handler doesn't add failed jobs to FailedQueue # If disable-default-exception-handler is given, job is not moved to FailedJobRegistry
q.enqueue(div_by_zero) job = q.enqueue(div_by_zero)
runner.invoke(main, ['worker', '-u', self.redis_url, '-b',
'--disable-default-exception-handler'])
registry = FailedJobRegistry(queue=q)
self.assertFalse(job in registry)
# Both the default and the custom exception handlers are run
job = q.enqueue(div_by_zero)
runner.invoke(main, ['worker', '-u', self.redis_url, '-b',
'--exception-handler', 'tests.fixtures.add_meta'])
registry = FailedJobRegistry(queue=q)
self.assertTrue(job in registry)
job.refresh()
self.assertEqual(job.meta, {'foo': 1})
# Only the custom exception handler is run
job = q.enqueue(div_by_zero)
runner.invoke(main, ['worker', '-u', self.redis_url, '-b', runner.invoke(main, ['worker', '-u', self.redis_url, '-b',
'--exception-handler', 'tests.fixtures.black_hole']) '--exception-handler', 'tests.fixtures.add_meta',
self.assertEqual(failed_q.count, 1) '--disable-default-exception-handler'])
registry = FailedJobRegistry(queue=q)
self.assertFalse(job in registry)
job.refresh()
self.assertEqual(job.meta, {'foo': 1})
def test_suspend_and_resume(self): def test_suspend_and_resume(self):
"""rq suspend -u <url> """rq suspend -u <url>

@ -18,8 +18,10 @@ from tests import fixtures, RQTestCase
from rq.compat import PY2, as_text from rq.compat import PY2, as_text
from rq.exceptions import NoSuchJobError, UnpickleError from rq.exceptions import NoSuchJobError, UnpickleError
from rq.job import Job, get_current_job, JobStatus, cancel_job, requeue_job from rq.job import Job, get_current_job, JobStatus, cancel_job
from rq.queue import Queue, get_failed_queue from rq.queue import Queue
from rq.registry import (DeferredJobRegistry, FailedJobRegistry,
FinishedJobRegistry, StartedJobRegistry)
from rq.utils import utcformat from rq.utils import utcformat
from rq.worker import Worker from rq.worker import Worker
@ -360,6 +362,18 @@ class TestJob(RQTestCase):
Job.fetch(job.id, connection=self.testconn) Job.fetch(job.id, connection=self.testconn)
self.assertEqual(job.result_ttl, None) self.assertEqual(job.result_ttl, None)
def test_failure_ttl_is_persisted(self):
"""Ensure job.failure_ttl is set and restored properly"""
job = Job.create(func=fixtures.say_hello, args=('Lionel',), failure_ttl=15)
job.save()
Job.fetch(job.id, connection=self.testconn)
self.assertEqual(job.failure_ttl, 15)
job = Job.create(func=fixtures.say_hello, args=('Lionel',))
job.save()
Job.fetch(job.id, connection=self.testconn)
self.assertEqual(job.failure_ttl, None)
def test_description_is_persisted(self): def test_description_is_persisted(self):
"""Ensure that job's custom description is set properly""" """Ensure that job's custom description is set properly"""
job = Job.create(func=fixtures.say_hello, args=('Lionel',), description='Say hello!') job = Job.create(func=fixtures.say_hello, args=('Lionel',), description='Say hello!')
@ -383,10 +397,11 @@ class TestJob(RQTestCase):
def test_job_access_within_job_function(self): def test_job_access_within_job_function(self):
"""The current job is accessible within the job function.""" """The current job is accessible within the job function."""
q = Queue() q = Queue()
q.enqueue(fixtures.access_self) # access_self calls get_current_job() and asserts job = q.enqueue(fixtures.access_self)
w = Worker([q]) w = Worker([q])
w.work(burst=True) w.work(burst=True)
assert get_failed_queue(self.testconn).count == 0 # access_self calls get_current_job() and executes successfully
self.assertEqual(job.get_status(), JobStatus.FINISHED)
def test_job_access_within_synchronous_job_function(self): def test_job_access_within_synchronous_job_function(self):
queue = Queue(is_async=False) queue = Queue(is_async=False)
@ -483,6 +498,48 @@ class TestJob(RQTestCase):
self.assertNotIn(job.id, queue.get_job_ids()) self.assertNotIn(job.id, queue.get_job_ids())
def test_job_delete_removes_itself_from_registries(self):
"""job.delete() should remove itself from job registries"""
connection = self.testconn
job = Job.create(func=fixtures.say_hello, status=JobStatus.FAILED,
connection=self.testconn, origin='default')
job.save()
registry = FailedJobRegistry(connection=self.testconn)
registry.add(job, 500)
job.delete()
self.assertFalse(job in registry)
job = Job.create(func=fixtures.say_hello, status=JobStatus.FINISHED,
connection=self.testconn, origin='default')
job.save()
registry = FinishedJobRegistry(connection=self.testconn)
registry.add(job, 500)
job.delete()
self.assertFalse(job in registry)
job = Job.create(func=fixtures.say_hello, status=JobStatus.STARTED,
connection=self.testconn, origin='default')
job.save()
registry = StartedJobRegistry(connection=self.testconn)
registry.add(job, 500)
job.delete()
self.assertFalse(job in registry)
job = Job.create(func=fixtures.say_hello, status=JobStatus.DEFERRED,
connection=self.testconn, origin='default')
job.save()
registry = DeferredJobRegistry(connection=self.testconn)
registry.add(job, 500)
job.delete()
self.assertFalse(job in registry)
def test_job_with_dependents_delete_parent_with_saved(self): def test_job_with_dependents_delete_parent_with_saved(self):
"""job.delete() deletes itself from Redis but not dependents. If the """job.delete() deletes itself from Redis but not dependents. If the
dependent job was saved, it will remain in redis.""" dependent job was saved, it will remain in redis."""
@ -574,31 +631,6 @@ class TestJob(RQTestCase):
cancel_job(job.id) cancel_job(job.id)
self.assertEqual(0, len(queue.get_jobs())) self.assertEqual(0, len(queue.get_jobs()))
def test_create_failed_and_cancel_job(self):
"""test creating and using cancel_job deletes job properly"""
failed_queue = get_failed_queue(connection=self.testconn)
job = failed_queue.enqueue(fixtures.say_hello)
job.set_status(JobStatus.FAILED)
self.assertEqual(1, len(failed_queue.get_jobs()))
cancel_job(job.id)
self.assertEqual(0, len(failed_queue.get_jobs()))
def test_create_and_requeue_job(self):
"""Requeueing existing jobs."""
job = Job.create(func=fixtures.div_by_zero, args=(1, 2, 3))
job.origin = 'fake'
job.save()
get_failed_queue().quarantine(job, Exception('Some fake error')) # noqa
self.assertEqual(Queue.all(), [get_failed_queue()]) # noqa
self.assertEqual(get_failed_queue().count, 1)
requeued_job = requeue_job(job.id)
self.assertEqual(get_failed_queue().count, 0)
self.assertEqual(Queue('fake').count, 1)
self.assertEqual(requeued_job.origin, job.origin)
def test_dependents_key_for_should_return_prefixed_job_id(self): def test_dependents_key_for_should_return_prefixed_job_id(self):
"""test redis key to store job dependents hash under""" """test redis key to store job dependents hash under"""
job_id = 'random' job_id = 'random'

@ -3,11 +3,10 @@ from __future__ import (absolute_import, division, print_function,
unicode_literals) unicode_literals)
from tests import RQTestCase from tests import RQTestCase
from tests.fixtures import (div_by_zero, echo, Number, say_hello, from tests.fixtures import echo, Number, say_hello
some_calculation)
from rq import get_failed_queue, Queue from rq import Queue
from rq.exceptions import InvalidJobDependency, InvalidJobOperationError from rq.exceptions import InvalidJobDependency
from rq.job import Job, JobStatus from rq.job import Job, JobStatus
from rq.registry import DeferredJobRegistry from rq.registry import DeferredJobRegistry
from rq.worker import Worker from rq.worker import Worker
@ -189,73 +188,6 @@ class TestQueue(RQTestCase):
# ...and assert the queue count when down # ...and assert the queue count when down
self.assertEqual(q.count, 0) self.assertEqual(q.count, 0)
def test_dequeue(self):
"""Dequeueing jobs from queues."""
# Set up
q = Queue()
result = q.enqueue(say_hello, 'Rick', foo='bar')
# Dequeue a job (not a job ID) off the queue
self.assertEqual(q.count, 1)
job = q.dequeue()
self.assertEqual(job.id, result.id)
self.assertEqual(job.func, say_hello)
self.assertEqual(job.origin, q.name)
self.assertEqual(job.args[0], 'Rick')
self.assertEqual(job.kwargs['foo'], 'bar')
# ...and assert the queue count when down
self.assertEqual(q.count, 0)
def test_dequeue_deleted_jobs(self):
"""Dequeueing deleted jobs from queues don't blow the stack."""
q = Queue()
for _ in range(1, 1000):
job = q.enqueue(say_hello)
job.delete()
q.dequeue()
def test_dequeue_instance_method(self):
"""Dequeueing instance method jobs from queues."""
q = Queue()
n = Number(2)
q.enqueue(n.div, 4)
job = q.dequeue()
# The instance has been pickled and unpickled, so it is now a separate
# object. Test for equality using each object's __dict__ instead.
self.assertEqual(job.instance.__dict__, n.__dict__)
self.assertEqual(job.func.__name__, 'div')
self.assertEqual(job.args, (4,))
def test_dequeue_class_method(self):
"""Dequeueing class method jobs from queues."""
q = Queue()
q.enqueue(Number.divide, 3, 4)
job = q.dequeue()
self.assertEqual(job.instance.__dict__, Number.__dict__)
self.assertEqual(job.func.__name__, 'divide')
self.assertEqual(job.args, (3, 4))
def test_dequeue_ignores_nonexisting_jobs(self):
"""Dequeuing silently ignores non-existing jobs."""
q = Queue()
uuid = '49f205ab-8ea3-47dd-a1b5-bfa186870fc8'
q.push_job_id(uuid)
q.push_job_id(uuid)
result = q.enqueue(say_hello, 'Nick', foo='bar')
q.push_job_id(uuid)
# Dequeue simply ignores the missing job and returns None
self.assertEqual(q.count, 4)
self.assertEqual(q.dequeue().id, result.id)
self.assertIsNone(q.dequeue())
self.assertEqual(q.count, 0)
def test_dequeue_any(self): def test_dequeue_any(self):
"""Fetching work from any given queue.""" """Fetching work from any given queue."""
fooq = Queue('foo') fooq = Queue('foo')
@ -319,12 +251,16 @@ class TestQueue(RQTestCase):
self.assertEqual(job.meta['foo'], 'bar') self.assertEqual(job.meta['foo'], 'bar')
self.assertEqual(job.meta['baz'], 42) self.assertEqual(job.meta['baz'], 42)
def test_enqueue_with_failure_ttl(self):
"""enqueue() properly sets job.failure_ttl"""
q = Queue()
job = q.enqueue(say_hello, failure_ttl=10)
job.refresh()
self.assertEqual(job.failure_ttl, 10)
def test_job_timeout(self): def test_job_timeout(self):
"""Timeout can be passed via job_timeout argument""" """Timeout can be passed via job_timeout argument"""
queue = Queue() queue = Queue()
job = queue.enqueue(echo, 1, timeout=15)
self.assertEqual(job.timeout, 15)
job = queue.enqueue(echo, 1, job_timeout=15) job = queue.enqueue(echo, 1, job_timeout=15)
self.assertEqual(job.timeout, 15) self.assertEqual(job.timeout, 15)
@ -572,160 +508,3 @@ class TestQueue(RQTestCase):
job_fetch = q1.fetch_job(job_orig.id) job_fetch = q1.fetch_job(job_orig.id)
self.assertIsNotNone(job_fetch) self.assertIsNotNone(job_fetch)
class TestFailedQueue(RQTestCase):
def test_get_failed_queue(self):
"""Use custom job class"""
class CustomJob(Job):
pass
failed_queue = get_failed_queue(job_class=CustomJob)
self.assertIs(failed_queue.job_class, CustomJob)
failed_queue = get_failed_queue(job_class='rq.job.Job')
self.assertIsNot(failed_queue.job_class, CustomJob)
def test_requeue_job(self):
"""Requeueing existing jobs."""
job = Job.create(func=div_by_zero, args=(1, 2, 3))
job.origin = 'fake'
job.save()
get_failed_queue().quarantine(job, Exception('Some fake error')) # noqa
self.assertEqual(Queue.all(), [get_failed_queue()]) # noqa
self.assertEqual(get_failed_queue().count, 1)
requeued_job = get_failed_queue().requeue(job.id)
self.assertEqual(get_failed_queue().count, 0)
self.assertEqual(Queue('fake').count, 1)
self.assertEqual(requeued_job.origin, job.origin)
def test_get_job_on_failed_queue(self):
default_queue = Queue()
failed_queue = get_failed_queue()
job = default_queue.enqueue(div_by_zero, args=(1, 2, 3))
job_on_default_queue = default_queue.fetch_job(job.id)
job_on_failed_queue = failed_queue.fetch_job(job.id)
self.assertIsNotNone(job_on_default_queue)
self.assertIsNone(job_on_failed_queue)
job.set_status(JobStatus.FAILED)
job_on_default_queue = default_queue.fetch_job(job.id)
job_on_failed_queue = failed_queue.fetch_job(job.id)
self.assertIsNotNone(job_on_default_queue)
self.assertIsNotNone(job_on_failed_queue)
self.assertTrue(job_on_default_queue.is_failed)
def test_requeue_nonfailed_job_fails(self):
"""Requeueing non-failed jobs raises error."""
q = Queue()
job = q.enqueue(say_hello, 'Nick', foo='bar')
# Assert that we cannot requeue a job that's not on the failed queue
with self.assertRaises(InvalidJobOperationError):
get_failed_queue().requeue(job.id)
def test_quarantine_preserves_timeout(self):
"""Quarantine preserves job timeout."""
job = Job.create(func=div_by_zero, args=(1, 2, 3))
job.origin = 'fake'
job.timeout = 200
job.save()
get_failed_queue().quarantine(job, Exception('Some fake error'))
self.assertEqual(job.timeout, 200)
def test_requeueing_preserves_timeout(self):
"""Requeueing preserves job timeout."""
job = Job.create(func=div_by_zero, args=(1, 2, 3))
job.origin = 'fake'
job.timeout = 200
job.save()
get_failed_queue().quarantine(job, Exception('Some fake error'))
get_failed_queue().requeue(job.id)
job = Job.fetch(job.id)
self.assertEqual(job.timeout, 200)
def test_requeue_sets_status_to_queued(self):
"""Requeueing a job should set its status back to QUEUED."""
job = Job.create(func=div_by_zero, args=(1, 2, 3))
job.save()
get_failed_queue().quarantine(job, Exception('Some fake error'))
get_failed_queue().requeue(job.id)
job = Job.fetch(job.id)
self.assertEqual(job.get_status(), JobStatus.QUEUED)
def test_enqueue_preserves_result_ttl(self):
"""Enqueueing persists result_ttl."""
q = Queue()
job = q.enqueue(div_by_zero, args=(1, 2, 3), result_ttl=10)
self.assertEqual(job.result_ttl, 10)
job_from_queue = Job.fetch(job.id, connection=self.testconn)
self.assertEqual(int(job_from_queue.result_ttl), 10)
def test_async_false(self):
"""Job executes and cleaned up immediately if is_async=False."""
q = Queue(is_async=False)
job = q.enqueue(some_calculation, args=(2, 3))
self.assertEqual(job.return_value, 6)
self.assertNotEqual(self.testconn.ttl(job.key), -1)
def test_is_async(self):
"""Queue exposes is_async as a property."""
inline_queue = Queue(is_async=False)
self.assertFalse(inline_queue.is_async)
async_queue = Queue(is_async=True)
self.assertTrue(async_queue.is_async)
def test_custom_job_class(self):
"""Ensure custom job class assignment works as expected."""
q = Queue(job_class=CustomJob)
self.assertEqual(q.job_class, CustomJob)
def test_skip_queue(self):
"""Ensure the skip_queue option functions"""
q = Queue('foo')
job1 = q.enqueue(say_hello)
job2 = q.enqueue(say_hello)
assert q.dequeue() == job1
skip_job = q.enqueue(say_hello, at_front=True)
assert q.dequeue() == skip_job
assert q.dequeue() == job2
def test_job_deletion(self):
"""Ensure job.delete() removes itself from FailedQueue."""
job = Job.create(func=div_by_zero, args=(1, 2, 3))
job.origin = 'fake'
job.timeout = 200
job.save()
job.set_status(JobStatus.FAILED)
failed_queue = get_failed_queue()
failed_queue.quarantine(job, Exception('Some fake error'))
self.assertTrue(job.id in failed_queue.get_job_ids())
job.delete()
self.assertFalse(job.id in failed_queue.get_job_ids())
def test_job_in_failed_queue_persists(self):
"""Make sure failed job key does not expire"""
q = Queue('foo')
job = q.enqueue(div_by_zero, args=(1,), ttl=5)
self.assertEqual(self.testconn.ttl(job.key), 5)
self.assertRaises(ZeroDivisionError, job.perform)
job.set_status(JobStatus.FAILED)
failed_queue = get_failed_queue()
failed_queue.quarantine(job, Exception('Some fake error'))
self.assertEqual(self.testconn.ttl(job.key), -1)

@ -2,12 +2,15 @@
from __future__ import absolute_import from __future__ import absolute_import
from rq.compat import as_text from rq.compat import as_text
from rq.job import Job, JobStatus from rq.defaults import DEFAULT_FAILURE_TTL
from rq.queue import FailedQueue, Queue from rq.exceptions import InvalidJobOperation
from rq.job import Job, JobStatus, requeue_job
from rq.queue import Queue
from rq.utils import current_timestamp from rq.utils import current_timestamp
from rq.worker import Worker from rq.worker import Worker
from rq.registry import (clean_registries, DeferredJobRegistry, from rq.registry import (clean_registries, DeferredJobRegistry,
FinishedJobRegistry, StartedJobRegistry) FailedJobRegistry, FinishedJobRegistry,
StartedJobRegistry)
from tests import RQTestCase from tests import RQTestCase
from tests.fixtures import div_by_zero, say_hello from tests.fixtures import div_by_zero, say_hello
@ -41,6 +44,19 @@ class TestRegistry(RQTestCase):
registry = StartedJobRegistry(job_class=CustomJob) registry = StartedJobRegistry(job_class=CustomJob)
self.assertFalse(registry.job_class == self.registry.job_class) self.assertFalse(registry.job_class == self.registry.job_class)
def test_contains(self):
registry = StartedJobRegistry(connection=self.testconn)
queue = Queue(connection=self.testconn)
job = queue.enqueue(say_hello)
self.assertFalse(job in registry)
self.assertFalse(job.id in registry)
registry.add(job, 5)
self.assertTrue(job in registry)
self.assertTrue(job.id in registry)
def test_add_and_remove(self): def test_add_and_remove(self):
"""Adding and removing job to StartedJobRegistry.""" """Adding and removing job to StartedJobRegistry."""
timestamp = current_timestamp() timestamp = current_timestamp()
@ -78,23 +94,22 @@ class TestRegistry(RQTestCase):
self.assertEqual(self.registry.get_expired_job_ids(timestamp + 20), self.assertEqual(self.registry.get_expired_job_ids(timestamp + 20),
['foo', 'bar']) ['foo', 'bar'])
def test_cleanup(self): def test_cleanup_moves_jobs_to_failed_job_registry(self):
"""Moving expired jobs to FailedQueue.""" """Moving expired jobs to FailedJobRegistry."""
failed_queue = FailedQueue(connection=self.testconn)
self.assertTrue(failed_queue.is_empty())
queue = Queue(connection=self.testconn) queue = Queue(connection=self.testconn)
failed_job_registry = FailedJobRegistry(connection=self.testconn)
job = queue.enqueue(say_hello) job = queue.enqueue(say_hello)
self.testconn.zadd(self.registry.key, {job.id: 2}) self.testconn.zadd(self.registry.key, {job.id: 2})
# Job has not been moved to FailedJobRegistry
self.registry.cleanup(1) self.registry.cleanup(1)
self.assertNotIn(job.id, failed_queue.job_ids) self.assertNotIn(job, failed_job_registry)
self.assertEqual(self.testconn.zscore(self.registry.key, job.id), 2) self.assertIn(job, self.registry)
self.registry.cleanup() self.registry.cleanup()
self.assertIn(job.id, failed_queue.job_ids) self.assertIn(job.id, failed_job_registry)
self.assertEqual(self.testconn.zscore(self.registry.key, job.id), None) self.assertNotIn(job, self.registry)
job.refresh() job.refresh()
self.assertEqual(job.get_status(), JobStatus.FAILED) self.assertEqual(job.get_status(), JobStatus.FAILED)
@ -158,9 +173,22 @@ class TestRegistry(RQTestCase):
started_job_registry = StartedJobRegistry(connection=self.testconn) started_job_registry = StartedJobRegistry(connection=self.testconn)
self.testconn.zadd(started_job_registry.key, {'foo': 1}) self.testconn.zadd(started_job_registry.key, {'foo': 1})
failed_job_registry = FailedJobRegistry(connection=self.testconn)
self.testconn.zadd(failed_job_registry.key, {'foo': 1})
clean_registries(queue) clean_registries(queue)
self.assertEqual(self.testconn.zcard(finished_job_registry.key), 0) self.assertEqual(self.testconn.zcard(finished_job_registry.key), 0)
self.assertEqual(self.testconn.zcard(started_job_registry.key), 0) self.assertEqual(self.testconn.zcard(started_job_registry.key), 0)
self.assertEqual(self.testconn.zcard(failed_job_registry.key), 0)
def test_get_queue(self):
"""registry.get_queue() returns the right Queue object."""
registry = StartedJobRegistry(connection=self.testconn)
self.assertEqual(registry.get_queue(), Queue(connection=self.testconn))
registry = StartedJobRegistry('foo', connection=self.testconn)
self.assertEqual(registry.get_queue(),
Queue('foo', connection=self.testconn))
class TestFinishedJobRegistry(RQTestCase): class TestFinishedJobRegistry(RQTestCase):
@ -225,7 +253,7 @@ class TestDeferredRegistry(RQTestCase):
self.assertEqual(job_ids, [job.id]) self.assertEqual(job_ids, [job.id])
def test_register_dependency(self): def test_register_dependency(self):
"""Ensure job creation and deletion works properly with DeferredJobRegistry.""" """Ensure job creation and deletion works with DeferredJobRegistry."""
queue = Queue(connection=self.testconn) queue = Queue(connection=self.testconn)
job = queue.enqueue(say_hello) job = queue.enqueue(say_hello)
job2 = queue.enqueue(say_hello, depends_on=job) job2 = queue.enqueue(say_hello, depends_on=job)
@ -236,3 +264,119 @@ class TestDeferredRegistry(RQTestCase):
# When deleted, job removes itself from DeferredJobRegistry # When deleted, job removes itself from DeferredJobRegistry
job2.delete() job2.delete()
self.assertEqual(registry.get_job_ids(), []) self.assertEqual(registry.get_job_ids(), [])
+
+
+class TestFailedJobRegistry(RQTestCase):
+
+    def test_default_failure_ttl(self):
+        """Job TTL defaults to DEFAULT_FAILURE_TTL"""
+        queue = Queue(connection=self.testconn)
+        job = queue.enqueue(say_hello)
+
+        registry = FailedJobRegistry(connection=self.testconn)
+        key = registry.key
+
+        timestamp = current_timestamp()
+        registry.add(job)
+        self.assertLess(
+            self.testconn.zscore(key, job.id),
+            timestamp + DEFAULT_FAILURE_TTL + 2
+        )
+        self.assertGreater(
+            self.testconn.zscore(key, job.id),
+            timestamp + DEFAULT_FAILURE_TTL - 2
+        )
+
+        timestamp = current_timestamp()
+        ttl = 5
+        registry.add(job, ttl=5)
+        self.assertLess(
+            self.testconn.zscore(key, job.id),
+            timestamp + ttl + 2
+        )
+        self.assertGreater(
+            self.testconn.zscore(key, job.id),
+            timestamp + ttl - 2
+        )
+
+    def test_requeue(self):
+        """FailedJobRegistry.requeue works properly"""
+        queue = Queue(connection=self.testconn)
+        job = queue.enqueue(div_by_zero, failure_ttl=5)
+
+        worker = Worker([queue])
+        worker.work(burst=True)
+
+        registry = FailedJobRegistry(connection=worker.connection)
+        self.assertTrue(job in registry)
+
+        registry.requeue(job.id)
+        self.assertFalse(job in registry)
+        self.assertIn(job.id, queue.get_job_ids())
+
+        job.refresh()
+        self.assertEqual(job.get_status(), JobStatus.QUEUED)
+
+        worker.work(burst=True)
+        self.assertTrue(job in registry)
+
+        # Should also work with job instance
+        registry.requeue(job)
+        self.assertFalse(job in registry)
+        self.assertIn(job.id, queue.get_job_ids())
+
+        job.refresh()
+        self.assertEqual(job.get_status(), JobStatus.QUEUED)
+
+        worker.work(burst=True)
+        self.assertTrue(job in registry)
+
+        # requeue_job should work the same way
+        requeue_job(job.id, connection=self.testconn)
+        self.assertFalse(job in registry)
+        self.assertIn(job.id, queue.get_job_ids())
+
+        job.refresh()
+        self.assertEqual(job.get_status(), JobStatus.QUEUED)
+
+        worker.work(burst=True)
+        self.assertTrue(job in registry)
+
+        # And so does job.requeue()
+        job.requeue()
+        self.assertFalse(job in registry)
+        self.assertIn(job.id, queue.get_job_ids())
+
+        job.refresh()
+        self.assertEqual(job.get_status(), JobStatus.QUEUED)
+
+    def test_invalid_job(self):
+        """Requeuing a job that's not in FailedJobRegistry raises an error."""
+        queue = Queue(connection=self.testconn)
+        job = queue.enqueue(say_hello)
+
+        registry = FailedJobRegistry(connection=self.testconn)
+        with self.assertRaises(InvalidJobOperation):
+            registry.requeue(job)
+
+    def test_worker_handle_job_failure(self):
+        """Failed jobs are added to FailedJobRegistry"""
+        q = Queue(connection=self.testconn)
+
+        w = Worker([q])
+        registry = FailedJobRegistry(connection=w.connection)
+
+        timestamp = current_timestamp()
+
+        job = q.enqueue(div_by_zero, failure_ttl=5)
+        w.handle_job_failure(job)
+        # job is added to FailedJobRegistry with default failure ttl
+        self.assertIn(job.id, registry.get_job_ids())
+        self.assertLess(self.testconn.zscore(registry.key, job.id),
+                        timestamp + DEFAULT_FAILURE_TTL + 5)
+
+        # job is added to FailedJobRegistry with specified ttl
+        job = q.enqueue(div_by_zero, failure_ttl=5)
+        w.handle_job_failure(job)
+        self.assertLess(self.testconn.zscore(registry.key, job.id),
+                        timestamp + 7)
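
A minimal usage sketch of the failed-job API these tests exercise, under the assumption of a local Redis and a placeholder task function (my_task is not part of RQ):

from redis import Redis
from rq import Queue
from rq.registry import FailedJobRegistry


def my_task():
    # Placeholder task used only for illustration.
    raise ValueError('boom')


redis = Redis()
queue = Queue('default', connection=redis)

# failure_ttl controls how long the job stays in FailedJobRegistry after failing.
job = queue.enqueue(my_task, failure_ttl=3600)

registry = FailedJobRegistry(queue=queue)
if job in registry:          # membership check, as used in the tests above
    registry.requeue(job)    # accepts a Job instance or a job id
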

tests/test_sentry.py
@@ -2,10 +2,17 @@
 from __future__ import (absolute_import, division, print_function,
                         unicode_literals)
 
-from rq import get_failed_queue, Queue, Worker
+from rq import Queue
+from rq.cli import main
+from rq.cli.helpers import read_config_file
 from rq.contrib.sentry import register_sentry
+from rq.worker import SimpleWorker
 
 from tests import RQTestCase
+from tests.fixtures import div_by_zero
+
+import mock
+from click.testing import CliRunner
 
 
 class FakeSentry(object):
@@ -17,20 +24,33 @@ class FakeSentry(object):
 
 class TestSentry(RQTestCase):
 
-    def test_work_fails(self):
-        """Non importable jobs should be put on the failed queue event with sentry"""
-        q = Queue()
-        failed_q = get_failed_queue()
-
-        # Action
-        q.enqueue('_non.importable.job')
-        self.assertEqual(q.count, 1)
-
-        w = Worker([q])
-        register_sentry(FakeSentry(), w)
-
-        w.work(burst=True)
-
-        # Postconditions
-        self.assertEqual(failed_q.count, 1)
-        self.assertEqual(q.count, 0)
+    def setUp(self):
+        super(TestSentry, self).setUp()
+        db_num = self.testconn.connection_pool.connection_kwargs['db']
+        self.redis_url = 'redis://127.0.0.1:6379/%d' % db_num
+
+    def test_reading_dsn_from_file(self):
+        settings = read_config_file('tests.config_files.sentry')
+        self.assertIn('SENTRY_DSN', settings)
+        self.assertEqual(settings['SENTRY_DSN'], 'https://123@sentry.io/123')
+
+    @mock.patch('rq.contrib.sentry.register_sentry')
+    def test_cli_flag(self, mocked):
+        """rq worker -u <url> -b --exception-handler <handler>"""
+        # connection = Redis.from_url(self.redis_url)
+        runner = CliRunner()
+        runner.invoke(main, ['worker', '-u', self.redis_url, '-b',
+                             '--sentry-dsn', 'https://1@sentry.io/1'])
+        self.assertEqual(mocked.call_count, 1)
+
+    def test_failure_capture(self):
+        """Test failure is captured by Sentry SDK"""
+        from sentry_sdk import Hub
+        hub = Hub.current
+        self.assertIsNone(hub.last_event_id())
+        queue = Queue(connection=self.testconn)
+        queue.enqueue(div_by_zero)
+        worker = SimpleWorker(queues=[queue], connection=self.testconn)
+        register_sentry('https://123@sentry.io/123')
+        worker.work(burst=True)
+        self.assertIsNotNone(hub.last_event_id())
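
The Sentry integration covered by these tests is driven by sentry-sdk and a plain DSN string. A minimal sketch, with a placeholder DSN:

from redis import Redis
from rq import Queue, Worker
from rq.contrib.sentry import register_sentry

# Placeholder DSN; this sets up sentry-sdk so job failures are captured.
register_sentry('https://<key>@sentry.io/<project>')

redis = Redis()
worker = Worker([Queue('default', connection=redis)], connection=redis)
worker.work(burst=True)

The same DSN can also be supplied on the command line, which is what test_cli_flag drives through CliRunner:

rq worker --sentry-dsn https://<key>@sentry.io/<project>
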

tests/test_worker.py
@@ -27,11 +27,10 @@ from tests.fixtures import (
     modify_self_and_error, long_running_job, save_key_ttl
 )
-from rq import (get_failed_queue, Queue, SimpleWorker, Worker,
-                get_current_connection)
+from rq import Queue, SimpleWorker, Worker, get_current_connection
 from rq.compat import as_text, PY2
 from rq.job import Job, JobStatus
-from rq.registry import StartedJobRegistry
+from rq.registry import StartedJobRegistry, FailedJobRegistry, FinishedJobRegistry
 from rq.suspension import resume, suspend
 from rq.utils import utcnow
 from rq.worker import HerokuWorker, WorkerStatus
@@ -147,6 +146,8 @@ class TestWorker(RQTestCase):
         Worker.find_by_key(worker.key)
         self.assertFalse(worker.key in Worker.all_keys(worker.connection))
+        self.assertRaises(ValueError, Worker.find_by_key, 'foo')
 
     def test_worker_ttl(self):
         """Worker ttl."""
         w = Worker([])
@@ -197,17 +198,14 @@ class TestWorker(RQTestCase):
         )
 
     def test_work_is_unreadable(self):
-        """Unreadable jobs are put on the failed queue."""
+        """Unreadable jobs are put on the failed job registry."""
         q = Queue()
-        failed_q = get_failed_queue()
-
-        self.assertEqual(failed_q.count, 0)
         self.assertEqual(q.count, 0)
 
         # NOTE: We have to fake this enqueueing for this test case.
         # What we're simulating here is a call to a function that is not
         # importable from the worker process.
-        job = Job.create(func=div_by_zero, args=(3,))
+        job = Job.create(func=div_by_zero, args=(3,), origin=q.name)
         job.save()
 
         job_data = job.data
@@ -225,16 +223,21 @@ class TestWorker(RQTestCase):
 
         w = Worker([q])
         w.work(burst=True)   # should silently pass
         self.assertEqual(q.count, 0)
-        self.assertEqual(failed_q.count, 1)
+
+        failed_job_registry = FailedJobRegistry(queue=q)
+        self.assertTrue(job in failed_job_registry)
 
     def test_heartbeat(self):
         """Heartbeat saves last_heartbeat"""
         q = Queue()
         w = Worker([q])
         w.register_birth()
 
-        birth = self.testconn.hget(w.key, 'birth')
+        self.assertEqual(str(w.pid), as_text(self.testconn.hget(w.key, 'pid')))
+        self.assertEqual(w.hostname,
+                         as_text(self.testconn.hget(w.key, 'hostname')))
         last_heartbeat = self.testconn.hget(w.key, 'last_heartbeat')
-        self.assertTrue(birth is not None)
+        self.assertIsNotNone(self.testconn.hget(w.key, 'birth'))
         self.assertTrue(last_heartbeat is not None)
 
         w = Worker.find_by_key(w.key)
         self.assertIsInstance(w.last_heartbeat, datetime)
@@ -268,10 +271,6 @@ class TestWorker(RQTestCase):
     def test_work_fails(self):
         """Failing jobs are put on the failed queue."""
         q = Queue()
-        failed_q = get_failed_queue()
-
-        # Preconditions
-        self.assertEqual(failed_q.count, 0)
         self.assertEqual(q.count, 0)
 
         # Action
@@ -286,7 +285,8 @@ class TestWorker(RQTestCase):
 
         # Postconditions
         self.assertEqual(q.count, 0)
-        self.assertEqual(failed_q.count, 1)
+        failed_job_registry = FailedJobRegistry(queue=q)
+        self.assertTrue(job in failed_job_registry)
         self.assertEqual(w.get_current_job_id(), None)
 
         # Check the job
@@ -296,65 +296,119 @@ class TestWorker(RQTestCase):
         # Should be the original enqueued_at date, not the date of enqueueing
         # to the failed queue
         self.assertEqual(str(job.enqueued_at), enqueued_at_date)
-        self.assertIsNotNone(job.exc_info)  # should contain exc_info
+        self.assertTrue(job.exc_info)  # should contain exc_info
 
     def test_statistics(self):
         """Successful and failed job counts are saved properly"""
-        q = Queue()
-        job = q.enqueue(div_by_zero)
-        w = Worker([q])
-        w.register_birth()
+        queue = Queue()
+        job = queue.enqueue(div_by_zero)
+        worker = Worker([queue])
+        worker.register_birth()
 
-        self.assertEqual(w.failed_job_count, 0)
-        self.assertEqual(w.successful_job_count, 0)
-        self.assertEqual(w.total_working_time, 0)
+        self.assertEqual(worker.failed_job_count, 0)
+        self.assertEqual(worker.successful_job_count, 0)
+        self.assertEqual(worker.total_working_time, 0)
 
-        registry = StartedJobRegistry(connection=w.connection)
+        registry = StartedJobRegistry(connection=worker.connection)
         job.started_at = utcnow()
         job.ended_at = job.started_at + timedelta(seconds=0.75)
-        w.handle_job_failure(job)
-        w.handle_job_success(job, q, registry)
+        worker.handle_job_failure(job)
+        worker.handle_job_success(job, queue, registry)
+
+        worker.refresh()
+        self.assertEqual(worker.failed_job_count, 1)
+        self.assertEqual(worker.successful_job_count, 1)
+        self.assertEqual(worker.total_working_time, 1.5)  # 1.5 seconds
+
+        worker.handle_job_failure(job)
+        worker.handle_job_success(job, queue, registry)
+
+        worker.refresh()
+        self.assertEqual(worker.failed_job_count, 2)
+        self.assertEqual(worker.successful_job_count, 2)
+        self.assertEqual(worker.total_working_time, 3.0)
+
+    def test_total_working_time(self):
+        """worker.total_working_time is stored properly"""
+        queue = Queue()
+        job = queue.enqueue(long_running_job, 0.05)
+        worker = Worker([queue])
+        worker.register_birth()
 
-        w.refresh()
-        self.assertEqual(w.failed_job_count, 1)
-        self.assertEqual(w.successful_job_count, 1)
-        self.assertEqual(w.total_working_time, 1500000)  # 1.5 seconds in microseconds
+        worker.perform_job(job, queue)
+        worker.refresh()
+        # total_working_time should be around 0.05 seconds
+        self.assertTrue(0.05 <= worker.total_working_time < 0.06)
 
-        w.handle_job_failure(job)
-        w.handle_job_success(job, q, registry)
+    def test_disable_default_exception_handler(self):
+        """
+        Job is not moved to FailedJobRegistry when default custom exception
+        handler is disabled.
+        """
+        queue = Queue(name='default', connection=self.testconn)
 
-        w.refresh()
-        self.assertEqual(w.failed_job_count, 2)
-        self.assertEqual(w.successful_job_count, 2)
-        self.assertEqual(w.total_working_time, 3000000)
+        job = queue.enqueue(div_by_zero)
+        worker = Worker([queue], disable_default_exception_handler=False)
+        worker.work(burst=True)
+
+        registry = FailedJobRegistry(queue=queue)
+        self.assertTrue(job in registry)
+
+        # Job is not added to FailedJobRegistry if
+        # disable_default_exception_handler is True
+        job = queue.enqueue(div_by_zero)
+        worker = Worker([queue], disable_default_exception_handler=True)
+        worker.work(burst=True)
+        self.assertFalse(job in registry)
 
     def test_custom_exc_handling(self):
         """Custom exception handling."""
+        def first_handler(job, *exc_info):
+            job.meta = {'first_handler': True}
+            job.save_meta()
+            return True
+
+        def second_handler(job, *exc_info):
+            job.meta.update({'second_handler': True})
+            job.save_meta()
+
         def black_hole(job, *exc_info):
             # Don't fall through to default behaviour (moving to failed queue)
             return False
 
         q = Queue()
-        failed_q = get_failed_queue()
-
-        # Preconditions
-        self.assertEqual(failed_q.count, 0)
         self.assertEqual(q.count, 0)
 
-        # Action
         job = q.enqueue(div_by_zero)
-        self.assertEqual(q.count, 1)
-
-        w = Worker([q], exception_handlers=black_hole)
-        w.work(burst=True)  # should silently pass
-
-        # Postconditions
-        self.assertEqual(q.count, 0)
-        self.assertEqual(failed_q.count, 0)
-
-        # Check the job
-        job = Job.fetch(job.id)
+        w = Worker([q], exception_handlers=first_handler)
+        w.work(burst=True)
+
+        # Check the job
+        job.refresh()
+        self.assertEqual(job.is_failed, True)
+        self.assertTrue(job.meta['first_handler'])
+
+        job = q.enqueue(div_by_zero)
+        w = Worker([q], exception_handlers=[first_handler, second_handler])
+        w.work(burst=True)
+
+        # Both custom exception handlers are run
+        job.refresh()
+        self.assertEqual(job.is_failed, True)
+        self.assertTrue(job.meta['first_handler'])
+        self.assertTrue(job.meta['second_handler'])
+
+        job = q.enqueue(div_by_zero)
+        w = Worker([q], exception_handlers=[first_handler, black_hole,
+                                            second_handler])
+        w.work(burst=True)
+
+        # second_handler is not run since it's interrupted by black_hole
+        job.refresh()
         self.assertEqual(job.is_failed, True)
+        self.assertTrue(job.meta['first_handler'])
+        self.assertEqual(job.meta.get('second_handler'), None)
 
     def test_cancelled_jobs_arent_executed(self):
         """Cancelling jobs."""
@@ -691,6 +745,12 @@ class TestWorker(RQTestCase):
         self.assertEqual(self.testconn.zcard(foo_registry.key), 0)
         self.assertEqual(self.testconn.zcard(bar_registry.key), 0)
 
+        # worker.clean_registries() only runs once every 15 minutes
+        # If we add another key, calling clean_registries() should do nothing
+        self.testconn.zadd(bar_registry.key, {'bar': 1})
+        worker.clean_registries()
+        self.assertEqual(self.testconn.zcard(bar_registry.key), 1)
+
     def test_should_run_maintenance_tasks(self):
         """Workers should run maintenance tasks on startup and every hour."""
         queue = Queue(connection=self.testconn)
@@ -771,7 +831,6 @@ class TestWorker(RQTestCase):
         the job itself persists completely through the
         queue/worker/job stack -- even if the job errored"""
         q = Queue()
-        failed_q = get_failed_queue()
         # Also make sure that previously existing metadata
         # persists properly
         job = q.enqueue(modify_self_and_error, meta={'foo': 'bar', 'baz': 42},
@@ -782,7 +841,8 @@ class TestWorker(RQTestCase):
 
         # Postconditions
         self.assertEqual(q.count, 0)
-        self.assertEqual(failed_q.count, 1)
+        failed_job_registry = FailedJobRegistry(queue=q)
+        self.assertTrue(job in failed_job_registry)
         self.assertEqual(w.get_current_job_id(), None)
 
         job_check = Job.fetch(job.id)
@@ -927,8 +987,6 @@ class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase):
         completing the job) should set the job's status to FAILED
         """
         fooq = Queue('foo')
-        failed_q = get_failed_queue()
-        self.assertEqual(failed_q.count, 0)
         self.assertEqual(fooq.count, 0)
         w = Worker(fooq)
         sentinel_file = '/tmp/.rq_sentinel_work_horse_death'
@@ -943,7 +1001,8 @@ class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase):
         job_status = job.get_status()
         p.join(1)
         self.assertEqual(job_status, JobStatus.FAILED)
-        self.assertEqual(failed_q.count, 1)
+        failed_job_registry = FailedJobRegistry(queue=fooq)
+        self.assertTrue(job in failed_job_registry)
         self.assertEqual(fooq.count, 0)
@@ -966,18 +1025,20 @@ class TestWorkerSubprocess(RQTestCase):
     def test_run_access_self(self):
         """Schedule a job, then run the worker as subprocess"""
         q = Queue()
-        q.enqueue(access_self)
+        job = q.enqueue(access_self)
         subprocess.check_call(['rqworker', '-u', self.redis_url, '-b'])
-        assert get_failed_queue().count == 0
+        registry = FinishedJobRegistry(queue=q)
+        self.assertTrue(job in registry)
         assert q.count == 0
 
     @skipIf('pypy' in sys.version.lower(), 'often times out with pypy')
     def test_run_scheduled_access_self(self):
         """Schedule a job that schedules a job, then run the worker as subprocess"""
         q = Queue()
-        q.enqueue(schedule_access_self)
+        job = q.enqueue(schedule_access_self)
         subprocess.check_call(['rqworker', '-u', self.redis_url, '-b'])
-        assert get_failed_queue().count == 0
+        registry = FinishedJobRegistry(queue=q)
+        self.assertTrue(job in registry)
         assert q.count == 0
@@ -1082,7 +1143,6 @@ class TestExceptionHandlerMessageEncoding(RQTestCase):
         super(TestExceptionHandlerMessageEncoding, self).setUp()
         self.worker = Worker("foo")
         self.worker._exc_handlers = []
-        self.worker.failed_queue = Mock()
         # Mimic how exception info is actually passed forwards
         try:
             raise Exception(u"💪")
@@ -1092,7 +1152,3 @@ class TestExceptionHandlerMessageEncoding(RQTestCase):
     def test_handle_exception_handles_non_ascii_in_exception_message(self):
         """worker.handle_exception doesn't crash on non-ascii in exception message."""
         self.worker.handle_exception(Mock(), *self.exc_info)
-
-    def test_move_to_failed_queue_handles_non_ascii_in_exception_message(self):
-        """Test that move_to_failed_queue doesn't crash on non-ascii in exception message."""
-        self.worker.move_to_failed_queue(Mock(), *self.exc_info)
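
The exception-handler behaviour tested above follows a simple chain contract: handlers run in the order given, and a handler that returns False stops the chain. A sketch under the same assumptions as the tests (handler names are illustrative, not part of RQ):

from redis import Redis
from rq import Queue, Worker


def record_failure(job, exc_type, exc_value, traceback):
    # Illustrative handler: inspect or log the failure, then fall through.
    print('job %s failed: %s' % (job.id, exc_value))
    return True    # returning True lets the next handler run


def swallow(job, *exc_info):
    return False   # returning False stops the chain here


redis = Redis()
queue = Queue('default', connection=redis)
worker = Worker([queue], connection=redis,
                exception_handlers=[record_failure, swallow])
# Passing disable_default_exception_handler=True (or using
# --disable-default-exception-handler on the CLI) keeps failed jobs out of
# FailedJobRegistry entirely.
worker.work(burst=True)
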

tests/test_worker_registration.py
@@ -1,7 +1,8 @@
 from tests import RQTestCase
 
 from rq import Queue, Worker
-from rq.worker_registration import (get_keys, register, unregister,
+from rq.worker_registration import (clean_worker_registry, get_keys, register,
+                                    unregister, REDIS_WORKER_KEYS,
                                     WORKERS_BY_QUEUE_KEY)
@@ -17,12 +18,15 @@ class TestWorkerRegistry(RQTestCase):
 
         redis = worker.connection
         self.assertTrue(redis.sismember(worker.redis_workers_keys, worker.key))
+        self.assertEqual(Worker.count(connection=redis), 1)
         self.assertTrue(
             redis.sismember(WORKERS_BY_QUEUE_KEY % foo_queue.name, worker.key)
         )
+        self.assertEqual(Worker.count(queue=foo_queue), 1)
         self.assertTrue(
             redis.sismember(WORKERS_BY_QUEUE_KEY % bar_queue.name, worker.key)
         )
+        self.assertEqual(Worker.count(queue=bar_queue), 1)
 
         unregister(worker)
         self.assertFalse(redis.sismember(worker.redis_workers_keys, worker.key))
@@ -68,3 +72,18 @@ class TestWorkerRegistry(RQTestCase):
 
         unregister(worker1)
         unregister(worker2)
         unregister(worker3)
+
+    def test_clean_registry(self):
+        """clean_registry removes worker keys that don't exist in Redis"""
+        queue = Queue(name='foo')
+        worker = Worker([queue])
+
+        register(worker)
+        redis = worker.connection
+
+        self.assertTrue(redis.sismember(worker.redis_workers_keys, worker.key))
+        self.assertTrue(redis.sismember(REDIS_WORKER_KEYS, worker.key))
+
+        clean_worker_registry(queue)
+        self.assertFalse(redis.sismember(worker.redis_workers_keys, worker.key))
+        self.assertFalse(redis.sismember(REDIS_WORKER_KEYS, worker.key))
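
A short usage sketch of the registry-cleanup helper exercised above (the queue name is illustrative):

from redis import Redis
from rq import Queue, Worker
from rq.worker_registration import clean_worker_registry

redis = Redis()
queue = Queue('default', connection=redis)

# Remove registration entries for workers whose own Redis keys no longer
# exist (e.g. after a hard crash), so they stop showing up in counts.
clean_worker_registry(queue)

print(Worker.count(queue=queue))       # workers registered for this queue
print(Worker.count(connection=redis))  # all workers on this connection
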
