diff --git a/.gitignore b/.gitignore index b218e08..bf5269f 100644 --- a/.gitignore +++ b/.gitignore @@ -10,16 +10,13 @@ /dist /build .tox +.pytest_cache/ .vagrant Vagrantfile .idea/ .coverage* /.cache -<<<<<<< HEAD -.pytest_cache/ -======= Gemfile Gemfile.lock _site/ ->>>>>>> 600be0a... Added anchor.js to docs diff --git a/.travis.yml b/.travis.yml index 80722b9..53b33d2 100644 --- a/.travis.yml +++ b/.travis.yml @@ -11,9 +11,7 @@ python: - "pypy" install: - pip install -e . - - pip install pytest-cov - # - pip install coveralls - - pip install codecov + - pip install pytest-cov sentry-sdk codecov #- pip install pytest # installed by Travis by default already script: - RUN_SLOW_TESTS_TOO=1 py.test --cov rq --durations=5 diff --git a/CHANGES.md b/CHANGES.md index 1488693..21c27c3 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,31 @@ +### 1.0 (Not Yet Released) +Backward incompatible changes: + +- `job.status` has been removed. Use `job.get_status()` and `job.set_status()` instead. Thanks @selwin! + +- `FailedQueue` has been replaced with `FailedJobRegistry`: + * `get_failed_queue()` function has been removed. Please use `FailedJobRegistry(queue=queue)` instead. + * `move_to_failed_queue()` has been removed. + * RQ now provides a mechanism to automatically cleanup failed jobs. By default, failed jobs are kept for 1 year. + * Thanks @selwin! + +- RQ's custom job exception handling mechanism has also changed slightly: + * RQ's default exception handling mechanism (moving jobs to `FailedJobRegistry`) can be disabled by doing `Worker(disable_default_exception_handler=True)`. + * Custom exception handlers are no longer executed in reverse order. + * Thanks @selwin! + +- `Worker` names are now randomized. Thanks @selwin! + +- `timeout` argument on `queue.enqueue()` has been deprecated in favor of `job_timeout`. Thanks @selwin! + +- Sentry integration has been reworked: + * RQ now uses the new [sentry-sdk](https://pypi.org/project/sentry-sdk/) in place of the deprecated [Raven](https://pypi.org/project/raven/) library + * RQ will look for the more explicit `RQ_SENTRY_DSN` environment variable instead of `SENTRY_DSN` before instantiating Sentry integration + * Thanks @selwin! + +- Fixed `Worker.total_working_time` accounting bug. Thanks @selwin! + + ### 0.13.0 (2018-12-11) - Compatibility with Redis 3.0. Thanks @dash-rai! - Added `job_timeout` argument to `queue.enqueue()`. This argument will eventually replace `timeout` argument. Thanks @selwin! diff --git a/docs/docs/exceptions.md b/docs/docs/exceptions.md index 913da70..d9b1827 100644 --- a/docs/docs/exceptions.md +++ b/docs/docs/exceptions.md @@ -6,27 +6,25 @@ layout: docs Jobs can fail due to exceptions occurring. When your RQ workers run in the background, how do you get notified of these exceptions? -## Default: the `failed` queue +## Default: the `FailedJobRegistry` -The default safety net for RQ is the `failed` queue. Every job that fails -execution is stored in here, along with its exception information (type, +The default safety net for RQ is the `FailedJobRegistry`. Every job that doesn't +execute successfully is stored here, along with its exception information (type, value, traceback). While this makes sure no failing jobs "get lost", this is of no use to get notified pro-actively about job failure. -## Custom exception handlers +## Custom Exception Handlers -Starting from version 0.3.1, RQ supports registering custom exception -handlers. 
This makes it possible to replace the default behaviour (sending -the job to the `failed` queue) altogether, or to take additional steps when an -exception occurs. +RQ supports registering custom exception handlers. This makes it possible to +inject your own error handling logic to your workers. This is how you register custom exception handler(s) to an RQ worker: ```python -from rq.handlers import move_to_failed_queue # RQ's default exception handler +from exception_handlers import foo_handler, bar_handler -w = Worker([q], exception_handlers=[my_handler, move_to_failed_queue]) +w = Worker([q], exception_handlers=[foo_handler, bar_handler]) ``` The handler itself is a function that takes the following parameters: `job`, @@ -46,11 +44,19 @@ def my_handler(job, *exc_info): # do custom things here ``` -## Chaining exception handlers +{% highlight python %} +from exception_handlers import foo_handler + +w = Worker([q], exception_handlers=[foo_handler], + disable_default_exception_handler=True) +{% endhighlight %} + + +## Chaining Exception Handlers The handler itself is responsible for deciding whether or not the exception handling is done, or should fall through to the next handler on the stack. -The handler can indicate this by returning a boolean. `False` means stop +The handler can indicate this by returning a boolean. `False` means stop processing exceptions, `True` means continue and fall through to the next exception handler on the stack. @@ -58,7 +64,7 @@ It's important to know for implementors that, by default, when the handler doesn't have an explicit return value (thus `None`), this will be interpreted as `True` (i.e. continue with the next handler). -To replace the default behaviour (i.e. moving the job to the `failed` queue), +To prevent the next exception handler in the handler chain from executing, use a custom exception handler that doesn't fall through, for example: ```python diff --git a/docs/docs/index.md b/docs/docs/index.md index 28b900a..d8c166a 100644 --- a/docs/docs/index.md +++ b/docs/docs/index.md @@ -61,7 +61,7 @@ In addition, you can add a few options to modify the behaviour of the queued job. By default, these are popped out of the kwargs that will be passed to the job function. -* `timeout` specifies the maximum runtime of the job before it's interrupted +* `job_timeout` specifies the maximum runtime of the job before it's interrupted and marked as `failed`. Its default unit is second and it can be an integer or a string representing an integer(e.g. `2`, `'2'`). Furthermore, it can be a string with specify unit including hour, minute, second(e.g. `'1h'`, `'3m'`, `'5s'`). * `result_ttl` specifies the expiry time of the key where the job result will be stored @@ -72,8 +72,9 @@ job function. * `job_id` allows you to manually specify this job's `job_id` * `at_front` will place the job at the *front* of the queue, instead of the back +* `description` to add additional description to enqueued jobs. * `kwargs` and `args` lets you bypass the auto-pop of these arguments, ie: - specify a `timeout` argument for the underlying job function. + specify a `description` argument for the underlying job function. 
In the last case, it may be advantageous to instead use the explicit version of `.enqueue()`, `.enqueue_call()`: @@ -82,7 +83,7 @@ In the last case, it may be advantageous to instead use the explicit version of q = Queue('low', connection=redis_conn) q.enqueue_call(func=count_words_at_url, args=('http://nvie.com',), - timeout=30) + job_timeout=30) ``` For cases where the web process doesn't have access to the source code running diff --git a/docs/docs/jobs.md b/docs/docs/jobs.md index 234b4a7..46b8546 100644 --- a/docs/docs/jobs.md +++ b/docs/docs/jobs.md @@ -88,31 +88,52 @@ job = q.enqueue(count_words_at_url, 'http://nvie.com', ttl=43) ## Failed Jobs -If a job fails and raises an exception, the worker will put the job in a failed job queue. -On the Job instance, the `is_failed` property will be true. To fetch all failed jobs, scan -through the `get_failed_queue()` queue. +If a job fails during execution, the worker will put the job in a FailedJobRegistry. +On the Job instance, the `is_failed` property will be true. FailedJobRegistry +can be accessed through `queue.failed_job_registry`. ```python from redis import StrictRedis -from rq import push_connection, get_failed_queue, Queue +from rq import Queue from rq.job import Job -con = StrictRedis() -push_connection(con) - def div_by_zero(x): return x / 0 -job = Job.create(func=div_by_zero, args=(1, 2, 3)) -job.origin = 'fake' -job.save() -fq = get_failed_queue() -fq.quarantine(job, Exception('Some fake error')) -assert fq.count == 1 -fq.requeue(job.id) +connection = StrictRedis() +queue = Queue(connection=connection) +job = queue.enqueue(div_by_zero, 1) +registry = queue.failed_job_registry + +worker = Worker([queue]) +worker.work(burst=True) + +assert len(registry) == 1 # Failed jobs are kept in FailedJobRegistry + +registry.requeue(job) # Puts job back in its original queue + +assert len(registry) == 0 + +assert queue.count == 1 +``` + +By default, failed jobs are kept for 1 year. You can change this by specifying +`failure_ttl` (in seconds) when enqueueing jobs. + +```python +job = queue.enqueue(foo_job, failure_ttl=300) # 5 minutes in seconds +``` + +## Requeueing Failed Jobs + +RQ also provides a CLI tool that makes requeueing failed jobs easy. + +```console +# This will requeue foo_job_id and bar_job_id from myqueue's failed job registry +rq requeue --queue myqueue -u redis://localhost:6379 foo_job_id bar_job_id -assert fq.count == 0 -assert Queue('fake').count == 1 +# This command will requeue all jobs in myqueue's failed job registry +rq requeue --queue myqueue -u redis://localhost:6379 --all ``` diff --git a/docs/docs/results.md b/docs/docs/results.md index 26087d6..5228104 100644 --- a/docs/docs/results.md +++ b/docs/docs/results.md @@ -68,7 +68,7 @@ This makes it possible to inspect and interpret the problem manually and possibly resubmit the job. -## Dealing with interruption +## Dealing With Interruptions When workers get killed in the polite way (Ctrl+C or `kill`), RQ tries hard not to lose any work. The current work is finished after which the worker will @@ -83,7 +83,7 @@ damage. Just sayin'. -## Dealing with job timeouts +## Dealing With Job Timeouts By default, jobs should execute within 180 seconds. 
After that, the worker kills the work horse and puts the job onto the `failed` queue, indicating the @@ -95,7 +95,7 @@ can be loosened (or tightened), by specifying it as a keyword argument to the ```python q = Queue() -q.enqueue(mytask, args=(foo,), kwargs={'bar': qux}, timeout=600) # 10 mins +q.enqueue(mytask, args=(foo,), kwargs={'bar': qux}, job_timeout=600) # 10 mins ``` You can also change the default timeout for jobs that are enqueued via specific @@ -108,7 +108,7 @@ high = Queue('high', default_timeout=8) # 8 secs low = Queue('low', default_timeout=600) # 10 mins # Individual jobs can still override these defaults -low.enqueue(really_really_slow, timeout=3600) # 1 hr +low.enqueue(really_really_slow, job_timeout=3600) # 1 hr ``` Individual jobs can still specify an alternative timeout, as workers will diff --git a/docs/docs/workers.md b/docs/docs/workers.md index 64888ff..877788b 100644 --- a/docs/docs/workers.md +++ b/docs/docs/workers.md @@ -8,7 +8,7 @@ solely as a work horse to perform lengthy or blocking tasks that you don't want to perform inside web processes. -## Starting workers +## Starting Workers To start crunching work, simply start a worker from the root of your project directory: @@ -30,7 +30,7 @@ concurrent processing going on. If you want to perform jobs concurrently, simply start more workers. -### Burst mode +### Burst Mode By default, workers will start working immediately and will block and wait for new work when they run out of work. Workers can also be started in _burst @@ -50,7 +50,7 @@ This can be useful for batch work that needs to be processed periodically, or just to scale up your workers temporarily during peak periods. -### Worker arguments +### Worker Arguments In addition to `--burst`, `rq worker` also accepts these arguments: @@ -67,7 +67,7 @@ In addition to `--burst`, `rq worker` also accepts these arguments: ## Inside the worker -### The worker life-cycle +### The Worker Lifecycle The life-cycle of a worker consists of a few phases: @@ -86,11 +86,11 @@ The life-cycle of a worker consists of a few phases: 7. _Cleanup job execution_. The worker sets its status to `idle` and sets both the job and its result to expire based on `result_ttl`. Job is also removed from `StartedJobRegistry` and added to to `FinishedJobRegistry` in the case - of successful execution, or `FailedQueue` in the case of failure. + of successful execution, or `FailedJobRegistry` in the case of failure. 8. _Loop_. Repeat from step 3. -## Performance notes +## Performance Notes Basically the `rq worker` shell script is a simple fetch-fork-execute loop. When a lot of your jobs do lengthy setups, or they all depend on the same set @@ -124,17 +124,29 @@ with Connection(): ``` -### Worker names +### Worker Names -Workers are registered to the system under their names, see [monitoring][m]. -By default, the name of a worker is equal to the concatenation of the current -hostname and the current PID. To override this default, specify the name when -starting the worker, using the `--name` option. +Workers are registered to the system under their names, which are generated +randomly during instantiation (see [monitoring][m]). To override this default, +specify the name when starting the worker, or use the `--name` cli option. 
+ +{% highlight python %} +from redis import Redis +from rq import Queue, Worker + +redis = Redis() +queue = Queue('queue_name') + +# Start a worker with a custom name +worker = Worker([queue], connection=redis, name='foo') +{% endhighlight %} [m]: /docs/monitoring/ -### Retrieving worker information +### Retrieving Worker Information + +_Updated in version 0.10.0._ `Worker` instances store their runtime information in Redis. Here's how to retrieve them: @@ -150,11 +162,25 @@ workers = Worker.all(connection=redis) # Returns all workers in this queue (new in version 0.10.0) queue = Queue('queue_name') workers = Worker.all(queue=queue) +worker = workers[0] +print(worker.name) ``` +Aside from `worker.name`, worker also have the following properties: +* `hostname` - the host where this worker is run +* `pid` - worker's process ID +* `queues` - queues on which this worker is listening for jobs +* `state` - possible states are `suspended`, `started`, `busy` and `idle` +* `current_job` - the job it's currently executing (if any) +* `last_heartbeat` - the last time this worker was seen +* `birth_date` - time of worker's instantiation +* `successful_job_count` - number of jobs finished successfully +* `failed_job_count` - number of failed jobs processed +* `total_working_time` - amount of time spent executing jobs, in seconds + _New in version 0.10.0._ -If you only want to know the number of workers for monitoring purposes, using +If you only want to know the number of workers for monitoring purposes, `Worker.count()` is much more performant. ```python @@ -172,7 +198,7 @@ workers = Worker.all(queue=queue) ``` -### Worker statistics +### Worker Statistics _New in version 0.9.0._ @@ -184,12 +210,12 @@ from rq.worker import Worker worker = Worker.find_by_key('rq:worker:name') worker.successful_job_count # Number of jobs finished successfully -worker.failed_job_count. # Number of failed jobs processed by this worker -worker.total_working_time # Number of time spent executing jobs +worker.failed_job_count # Number of failed jobs processed by this worker +worker.total_working_time # Amount of time spent executing jobs (in seconds) ``` -## Taking down workers +## Taking Down Workers If, at any time, the worker receives `SIGINT` (via Ctrl+C) or `SIGTERM` (via `kill`), the worker wait until the currently running task is finished, stop @@ -200,9 +226,7 @@ worker will forcefully terminate the child process (sending it `SIGKILL`), but will still try to register its own death. -## Using a config file - -_New in version 0.3.2._ +## Using a Config File If you'd like to configure `rq worker` via a configuration file instead of through command line arguments, you can do this by creating a Python file like @@ -240,9 +264,7 @@ $ rq worker -c settings ``` -## Custom worker classes - -_New in version 0.4.0._ +## Custom Worker Classes There are times when you want to customize the worker's behavior. Some of the more common requests so far are: @@ -259,9 +281,7 @@ $ rq worker -w 'path.to.GeventWorker' ``` -## Custom Job and Queue classes - -_Will be available in next release._ +## Custom Job and Queue Classes You can tell the worker to use a custom class for jobs and queues using `--job-class` and/or `--queue-class`. @@ -289,7 +309,7 @@ queue.enqueue(some_func) ``` -## Custom DeathPenalty classes +## Custom DeathPenalty Classes When a Job times-out, the worker will try to kill it using the supplied `death_penalty_class` (default: `UnixSignalDeathPenalty`). 
This can be overridden @@ -299,9 +319,7 @@ DeathPenalty classes are constructed with the following arguments `BaseDeathPenalty(timeout, JobTimeoutException, job_id=job.id)` -## Custom exception handlers - -_New in version 0.5.5._ +## Custom Exception Handlers If you need to handle errors differently for different types of jobs, or simply want to customize RQ's default error handling behavior, run `rq worker` using the `--exception-handler` option: @@ -312,3 +330,9 @@ $ rq worker --exception-handler 'path.to.my.ErrorHandler' # Multiple exception handlers is also supported $ rq worker --exception-handler 'path.to.my.ErrorHandler' --exception-handler 'another.ErrorHandler' ``` + +If you want to disable RQ's default exception handler, use the `--disable-default-exception-handler` option: + +```console +$ rq worker --exception-handler 'path.to.my.ErrorHandler' --disable-default-exception-handler +``` diff --git a/docs/patterns/sentry.md b/docs/patterns/sentry.md index ecce264..7dab4dc 100644 --- a/docs/patterns/sentry.md +++ b/docs/patterns/sentry.md @@ -3,45 +3,47 @@ title: "RQ: Sending exceptions to Sentry" layout: patterns --- -## Sending exceptions to Sentry - -[Sentry](https://www.getsentry.com/) is a popular exception gathering service -that RQ supports integrating with since version 0.3.1, through its custom -exception handlers. - -RQ includes a convenience function that registers your existing Sentry client -to send all exceptions to. - -An example: - -{% highlight python %} -from raven import Client -from raven.transport.http import HTTPTransport -from rq.contrib.sentry import register_sentry - -client = Client('', transport=HTTPTransport) -register_sentry(client, worker) -{% endhighlight %} - -Where `worker` is your RQ worker instance. After that, call `worker.work(...)` -to start the worker. All exceptions that occur are reported to Sentry -automatically. - -
-<div class="warning">
-    <strong>Note:</strong>
-    <p>
-        Error delivery to Sentry is known to be unreliable with RQ when using
-        async transports (the default is). So you are encouraged to use the
-        <code>HTTPTransport</code> or <code>RequestsHTTPTransport</code> when
-        creating your client. See the code sample above, or the Raven
-        documentation.
-    </p>
-    <p>
-        For more info, see the Raven docs.
-    </p>
-</div>
- -Read more on RQ's [custom exception handling](/docs/exceptions/) capabilities. +## Sending Exceptions to Sentry + +[Sentry](https://www.getsentry.com/) is a popular exception gathering service. +RQ allows you to very easily send job exceptions to Sentry. To do this, you'll +need to have [sentry-sdk](https://pypi.org/project/sentry-sdk/) installed. + +There are a few ways to start sending job exceptions to Sentry. + + +### Configuring Sentry Through CLI + +Simply invoke the `rqworker` script using the ``--sentry-dsn`` argument. + +```console +rq worker --sentry-dsn https://my-dsn@sentry.io/123 +``` + + +### Configuring Sentry Through a Config File + +Declare `SENTRY_DSN` in RQ's config file like this: + +```python +SENTRY_DSN = 'https://my-dsn@sentry.io/123' +``` + +And run RQ's worker with your config file: + +```console +rq worker -c my_settings +``` + +Visit [this page](https://python-rq.org/docs/workers/#using-a-config-file) +to read more about running RQ using a config file. + + +### Configuring Sentry Through Environment Variable + +Simple set `RQ_SENTRY_DSN` in your environment variable and RQ will +automatically start Sentry integration for you. + +```console +RQ_SENTRY_DSN="https://my-dsn@sentry.io/123" rq worker +``` diff --git a/rq/__init__.py b/rq/__init__.py index 0e55f1b..9ad8be1 100644 --- a/rq/__init__.py +++ b/rq/__init__.py @@ -6,7 +6,7 @@ from __future__ import (absolute_import, division, print_function, from .connections import (Connection, get_current_connection, pop_connection, push_connection, use_connection) from .job import cancel_job, get_current_job, requeue_job -from .queue import get_failed_queue, Queue +from .queue import Queue from .version import VERSION from .worker import SimpleWorker, Worker diff --git a/rq/cli/cli.py b/rq/cli/cli.py index 2b75a16..dbc4eb2 100755 --- a/rq/cli/cli.py +++ b/rq/cli/cli.py @@ -12,7 +12,7 @@ import sys import click from redis.exceptions import ConnectionError -from rq import Connection, get_failed_queue, __version__ as version +from rq import Connection, __version__ as version from rq.cli.helpers import (read_config_file, refresh, setup_loghandlers_from_args, show_both, show_queues, show_workers, CliConfig) @@ -23,6 +23,7 @@ from rq.defaults import (DEFAULT_CONNECTION_CLASS, DEFAULT_JOB_CLASS, DEFAULT_JOB_MONITORING_INTERVAL, DEFAULT_LOGGING_FORMAT, DEFAULT_LOGGING_DATE_FORMAT) from rq.exceptions import InvalidJobOperationError +from rq.registry import FailedJobRegistry from rq.utils import import_attribute from rq.suspension import (suspend as connection_suspend, resume as connection_resume, is_suspended) @@ -112,16 +113,16 @@ def empty(cli_config, all, queues, **options): @main.command() @click.option('--all', '-a', is_flag=True, help='Requeue all failed jobs') +@click.option('--queue', required=True, type=str) @click.argument('job_ids', nargs=-1) @pass_cli_config -def requeue(cli_config, all, job_class, job_ids, **options): +def requeue(cli_config, queue, all, job_class, job_ids, **options): """Requeue failed jobs.""" - failed_queue = get_failed_queue(connection=cli_config.connection, - job_class=cli_config.job_class) - + failed_job_registry = FailedJobRegistry(queue, + connection=cli_config.connection) if all: - job_ids = failed_queue.job_ids + job_ids = failed_job_registry.get_job_ids() if not job_ids: click.echo('Nothing to do') @@ -132,12 +133,12 @@ def requeue(cli_config, all, job_class, job_ids, **options): with click.progressbar(job_ids) as job_ids: for job_id in job_ids: try: - failed_queue.requeue(job_id) + 
failed_job_registry.requeue(job_id) except InvalidJobOperationError: fail_count += 1 if fail_count > 0: - click.secho('Unable to requeue {0} jobs from failed queue'.format(fail_count), fg='red') + click.secho('Unable to requeue {0} jobs from failed job registry'.format(fail_count), fg='red') @main.command() @@ -183,16 +184,17 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues, @click.option('--disable-job-desc-logging', is_flag=True, help='Turn off description logging.') @click.option('--verbose', '-v', is_flag=True, help='Show more output') @click.option('--quiet', '-q', is_flag=True, help='Show less output') -@click.option('--sentry-dsn', envvar='SENTRY_DSN', help='Report exceptions to this Sentry DSN') +@click.option('--sentry-dsn', envvar='RQ_SENTRY_DSN', help='Report exceptions to this Sentry DSN') @click.option('--exception-handler', help='Exception handler(s) to use', multiple=True) @click.option('--pid', help='Write the process ID number to a file at the specified path') +@click.option('--disable-default-exception-handler', '-d', is_flag=True, help='Disable RQ\'s default exception handler') @click.argument('queues', nargs=-1) @pass_cli_config def worker(cli_config, burst, logging_level, name, results_ttl, worker_ttl, job_monitoring_interval, verbose, quiet, sentry_dsn, - exception_handler, pid, queues, log_format, date_format, **options): + exception_handler, pid, disable_default_exception_handler, queues, + log_format, date_format, **options): """Starts an RQ worker.""" - settings = read_config_file(cli_config.config) if cli_config.config else {} # Worker specific default arguments queues = queues or settings.get('QUEUES', ['default']) @@ -220,23 +222,19 @@ def worker(cli_config, burst, logging_level, name, results_ttl, connection=cli_config.connection, job_class=cli_config.job_class) for queue in queues] - worker = cli_config.worker_class(queues, - name=name, - connection=cli_config.connection, - default_worker_ttl=worker_ttl, - default_result_ttl=results_ttl, - job_monitoring_interval=job_monitoring_interval, - job_class=cli_config.job_class, - queue_class=cli_config.queue_class, - exception_handlers=exception_handlers or None) + worker = cli_config.worker_class( + queues, name=name, connection=cli_config.connection, + default_worker_ttl=worker_ttl, default_result_ttl=results_ttl, + job_monitoring_interval=job_monitoring_interval, + job_class=cli_config.job_class, queue_class=cli_config.queue_class, + exception_handlers=exception_handlers or None, + disable_default_exception_handler=disable_default_exception_handler + ) # Should we configure Sentry? 
if sentry_dsn: - from raven import Client - from raven.transport.http import HTTPTransport from rq.contrib.sentry import register_sentry - client = Client(sentry_dsn, transport=HTTPTransport) - register_sentry(client, worker) + register_sentry(sentry_dsn) worker.work(burst=burst, logging_level=logging_level, date_format=date_format, log_format=log_format) except ConnectionError as e: diff --git a/rq/cli/helpers.py b/rq/cli/helpers.py index 0e964f9..49ba255 100644 --- a/rq/cli/helpers.py +++ b/rq/cli/helpers.py @@ -53,22 +53,9 @@ def get_redis_from_config(settings, connection_class=Redis): 'port': settings.get('REDIS_PORT', 6379), 'db': settings.get('REDIS_DB', 0), 'password': settings.get('REDIS_PASSWORD', None), + 'ssl': settings.get('REDIS_SSL', False), } - use_ssl = settings.get('REDIS_SSL', False) - if use_ssl: - # If SSL is required, we need to depend on redis-py being 2.10 at - # least - def safeint(x): - try: - return int(x) - except ValueError: - return 0 - - version_info = tuple(safeint(x) for x in redis.__version__.split('.')) - if not version_info >= (2, 10): - raise RuntimeError('Using SSL requires a redis-py version >= 2.10') - kwargs['ssl'] = use_ssl return connection_class(**kwargs) @@ -137,48 +124,49 @@ def show_workers(queues, raw, by_queue, queue_class, worker_class): if queues: qs = list(map(queue_class, queues)) - def any_matching_queue(worker): - def queue_matches(q): - return q in qs - return any(map(queue_matches, worker.queues)) - - # Filter out workers that don't match the queue filter - ws = [w for w in worker_class.all() if any_matching_queue(w)] - - def filter_queues(queue_names): - return [qname for qname in queue_names if queue_class(qname) in qs] + workers = set() + for queue in qs: + for worker in worker_class.all(queue=queue): + workers.add(worker) else: qs = queue_class.all() - ws = worker_class.all() - filter_queues = (lambda x: x) + workers = worker_class.all() if not by_queue: - for w in ws: - worker_queues = filter_queues(w.queue_names()) + + for worker in workers: + queue_names = ', '.join(worker.queue_names()) + name = '%s (%s %s)' % (worker.name, worker.hostname, worker.pid) if not raw: - click.echo('%s %s: %s' % (w.name, state_symbol(w.get_state()), ', '.join(worker_queues))) + click.echo('%s: %s %s' % (name, state_symbol(worker.get_state()), queue_names)) else: - click.echo('worker %s %s %s' % (w.name, w.get_state(), ','.join(worker_queues))) + click.echo('worker %s %s %s' % (name, worker.get_state(), queue_names)) + else: - # Create reverse lookup table - queues = dict([(q, []) for q in qs]) - for w in ws: - for q in w.queues: - if q not in queues: - continue - queues[q].append(w) - - max_qname = max(map(lambda q: len(q.name), queues.keys())) if queues else 0 - for q in queues: - if queues[q]: - queues_str = ", ".join(sorted(map(lambda w: '%s (%s)' % (w.name, state_symbol(w.get_state())), queues[q]))) # noqa + # Display workers by queue + queue_dict = {} + for queue in qs: + queue_dict[queue] = worker_class.all(queue=queue) + + if queue_dict: + max_length = max([len(q.name) for q, in queue_dict.keys()]) + else: + max_length = 0 + + for queue in queue_dict: + if queue_dict[queue]: + queues_str = ", ".join( + sorted( + map(lambda w: '%s (%s)' % (w.name, state_symbol(w.get_state())), queue_dict[queue]) + ) + ) else: queues_str = '–' - click.echo('%s %s' % (pad(q.name + ':', max_qname + 1), queues_str)) + click.echo('%s %s' % (pad(queue.name + ':', max_length + 1), queues_str)) if not raw: - click.echo('%d workers, %d queues' % (len(ws), len(qs))) + 
click.echo('%d workers, %d queues' % (len(workers), len(qs))) def show_both(queues, raw, by_queue, queue_class, worker_class): diff --git a/rq/contrib/sentry.py b/rq/contrib/sentry.py index 5608e63..d76c4ae 100644 --- a/rq/contrib/sentry.py +++ b/rq/contrib/sentry.py @@ -3,19 +3,10 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) -def register_sentry(client, worker): +def register_sentry(sentry_dsn): """Given a Raven client and an RQ worker, registers exception handlers with the worker so exceptions are logged to Sentry. """ - def send_to_sentry(job, *exc_info): - client.captureException( - exc_info=exc_info, - extra={ - 'job_id': job.id, - 'func': job.func_name, - 'args': job.args, - 'kwargs': job.kwargs, - 'description': job.description, - }) - - worker.push_exc_handler(send_to_sentry) + import sentry_sdk + from sentry_sdk.integrations.rq import RqIntegration + sentry_sdk.init(sentry_dsn, integrations=[RqIntegration()]) diff --git a/rq/defaults.py b/rq/defaults.py index 89792c0..701e356 100644 --- a/rq/defaults.py +++ b/rq/defaults.py @@ -5,5 +5,6 @@ DEFAULT_CONNECTION_CLASS = 'redis.Redis' DEFAULT_WORKER_TTL = 420 DEFAULT_JOB_MONITORING_INTERVAL = 30 DEFAULT_RESULT_TTL = 500 +DEFAULT_FAILURE_TTL = 31536000 # 1 year in seconds DEFAULT_LOGGING_DATE_FORMAT = '%H:%M:%S' DEFAULT_LOGGING_FORMAT = '%(asctime)s %(message)s' diff --git a/rq/exceptions.py b/rq/exceptions.py index e9f58e0..684bfb0 100644 --- a/rq/exceptions.py +++ b/rq/exceptions.py @@ -15,6 +15,10 @@ class InvalidJobOperationError(Exception): pass +class InvalidJobOperation(Exception): + pass + + class UnpickleError(Exception): def __init__(self, message, raw_data, inner_exception=None): super(UnpickleError, self).__init__(message, inner_exception) diff --git a/rq/handlers.py b/rq/handlers.py deleted file mode 100644 index 33ac03d..0000000 --- a/rq/handlers.py +++ /dev/null @@ -1,12 +0,0 @@ -import traceback - -from .connections import get_current_connection -from .queue import get_failed_queue -from .worker import Worker - - -def move_to_failed_queue(job, *exc_info): - """Default exception handler: move the job to the failed queue.""" - exc_string = Worker._get_safe_exception_string(traceback.format_exception(*exc_info)) - failed_queue = get_failed_queue(get_current_connection(), job.__class__) - failed_queue.quarantine(job, exc_info=exc_string) diff --git a/rq/job.py b/rq/job.py index 79101a7..c411fb2 100644 --- a/rq/job.py +++ b/rq/job.py @@ -20,6 +20,7 @@ try: except ImportError: # noqa # pragma: no cover import pickle + # Serialize pickle dumps using the highest pickle protocol (binary, default # uses ascii) dumps = partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL) @@ -61,16 +62,6 @@ def cancel_job(job_id, connection=None): Job.fetch(job_id, connection=connection).cancel() -def requeue_job(job_id, connection=None, job_class=None): - """Requeues the job with the given job ID. If no such job exists, just - remove the job ID from the failed queue, otherwise the job ID should refer - to a failed job (i.e. it should be on the failed queue). - """ - from .queue import get_failed_queue - failed_queue = get_failed_queue(connection=connection, job_class=job_class) - return failed_queue.requeue(job_id) - - def get_current_job(connection=None, job_class=None): """Returns the Job instance that is currently being executed. If this function is invoked from outside a job context, None is returned. 
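The `rq/job.py` hunks that follow swap the old `FailedQueue`-based `requeue_job()` for a thin wrapper around `Job.requeue()` and the new `FailedJobRegistry`. Below is a minimal usage sketch of that flow, modeled on the `docs/docs/jobs.md` example earlier in this patch; it assumes a Redis server on localhost and a throwaway `div_by_zero` task defined inline, and is illustrative rather than part of the diff itself.

```python
from redis import Redis
from rq import Queue, Worker
from rq.job import requeue_job


def div_by_zero(x):
    return x / 0


connection = Redis()
queue = Queue(connection=connection)
job = queue.enqueue(div_by_zero, 1)

# Process the queue once; the job raises and lands in the queue's FailedJobRegistry
Worker([queue], connection=connection).work(burst=True)

assert job in queue.failed_job_registry

# Puts the job back on its original queue; job.requeue() does the same
requeue_job(job.id, connection=connection)
```

The same operation is exposed on the command line via `rq requeue --queue <name> --all`, as documented in `docs/docs/jobs.md` above.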
@@ -81,6 +72,11 @@ def get_current_job(connection=None, job_class=None): return _job_stack.top +def requeue_job(job_id, connection): + job = Job.fetch(job_id, connection=connection) + return job.requeue() + + class Job(object): """A Job is just a convenient datastructure to pass around job (meta) data. """ @@ -90,7 +86,8 @@ class Job(object): @classmethod def create(cls, func, args=None, kwargs=None, connection=None, result_ttl=None, ttl=None, status=None, description=None, - depends_on=None, timeout=None, id=None, origin=None, meta=None): + depends_on=None, timeout=None, id=None, origin=None, meta=None, + failure_ttl=None): """Creates a new Job instance for the given function, arguments, and keyword arguments. """ @@ -131,6 +128,7 @@ class Job(object): # Extra meta data job.description = description or job.get_call_string() job.result_ttl = result_ttl + job.failure_ttl = failure_ttl job.ttl = ttl job.timeout = parse_timeout(timeout) job._status = status @@ -145,27 +143,11 @@ class Job(object): self._status = as_text(self.connection.hget(self.key, 'status')) return self._status - def _get_status(self): - warnings.warn( - "job.status is deprecated. Use job.get_status() instead", - DeprecationWarning - ) - return self.get_status() - def set_status(self, status, pipeline=None): self._status = status connection = pipeline or self.connection connection.hset(self.key, 'status', self._status) - def _set_status(self, status): - warnings.warn( - "job.status is deprecated. Use job.set_status() instead", - DeprecationWarning - ) - self.set_status(status) - - status = property(_get_status, _set_status) - @property def is_finished(self): return self.get_status() == JobStatus.FINISHED @@ -323,6 +305,7 @@ class Job(object): self.exc_info = None self.timeout = None self.result_ttl = None + self.failure_ttl = None self.ttl = None self._status = None self._dependency_id = None @@ -447,6 +430,7 @@ class Job(object): self._result = unpickle(obj.get('result')) if obj.get('result') else None # noqa self.timeout = parse_timeout(as_text(obj.get('timeout'))) if obj.get('timeout') else None self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa + self.failure_ttl = int(obj.get('failure_ttl')) if obj.get('failure_ttl') else None # noqa self._status = as_text(obj.get('status') if obj.get('status') else None) self._dependency_id = as_text(obj.get('dependency_id', None)) self.ttl = int(obj.get('ttl')) if obj.get('ttl') else None @@ -492,6 +476,8 @@ class Job(object): obj['timeout'] = self.timeout if self.result_ttl is not None: obj['result_ttl'] = self.result_ttl + if self.failure_ttl is not None: + obj['failure_ttl'] = self.failure_ttl if self._status is not None: obj['status'] = self._status if self._dependency_id is not None: @@ -538,11 +524,14 @@ class Job(object): q.remove(self, pipeline=pipeline) pipeline.execute() + def requeue(self): + """Requeues job.""" + self.failed_job_registry.requeue(self) + def delete(self, pipeline=None, remove_from_queue=True, delete_dependents=False): """Cancels the job and deletes the job hash from Redis. 
Jobs depending on this job can optionally be deleted as well.""" - if remove_from_queue: self.cancel(pipeline=pipeline) connection = pipeline if pipeline is not None else self.connection @@ -569,10 +558,7 @@ class Job(object): registry.remove(self, pipeline=pipeline) elif self.is_failed: - from .queue import get_failed_queue - failed_queue = get_failed_queue(connection=self.connection, - job_class=self.__class__) - failed_queue.remove(self, pipeline=pipeline) + self.failed_job_registry.remove(self, pipeline=pipeline) if delete_dependents: self.delete_dependents(pipeline=pipeline) @@ -655,6 +641,12 @@ class Job(object): connection = pipeline if pipeline is not None else self.connection connection.expire(self.key, ttl) + @property + def failed_job_registry(self): + from .registry import FailedJobRegistry + return FailedJobRegistry(self.origin, connection=self.connection, + job_class=self.__class__) + def register_dependency(self, pipeline=None): """Jobs may have dependencies. Jobs are enqueued only if the job they depend on is successfully performed. We record this relation as diff --git a/rq/logutils.py b/rq/logutils.py index 5844e2b..2ae417a 100644 --- a/rq/logutils.py +++ b/rq/logutils.py @@ -9,16 +9,19 @@ from rq.defaults import (DEFAULT_LOGGING_FORMAT, DEFAULT_LOGGING_DATE_FORMAT) -def setup_loghandlers(level, date_format=DEFAULT_LOGGING_DATE_FORMAT, +def setup_loghandlers(level=None, date_format=DEFAULT_LOGGING_DATE_FORMAT, log_format=DEFAULT_LOGGING_FORMAT): logger = logging.getLogger('rq.worker') + if not _has_effective_handler(logger): - logger.setLevel(level) formatter = logging.Formatter(fmt=log_format, datefmt=date_format) handler = ColorizingStreamHandler() handler.setFormatter(formatter) logger.addHandler(handler) + if level is not None: + logger.setLevel(level) + def _has_effective_handler(logger): """ diff --git a/rq/queue.py b/rq/queue.py index 090576d..d58c27b 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -10,17 +10,12 @@ from redis import WatchError from .compat import as_text, string_types, total_ordering from .connections import resolve_connection from .defaults import DEFAULT_RESULT_TTL -from .exceptions import (DequeueTimeout, InvalidJobDependency, - InvalidJobOperationError, NoSuchJobError, UnpickleError) +from .exceptions import (DequeueTimeout, InvalidJobDependency, NoSuchJobError, + UnpickleError) from .job import Job, JobStatus from .utils import backend_class, import_attribute, utcnow, parse_timeout -def get_failed_queue(connection=None, job_class=None): - """Returns a handle to the special failed queue.""" - return FailedQueue(connection=connection, job_class=job_class) - - def compact(lst): return [item for item in lst if item is not None] @@ -94,6 +89,17 @@ class Queue(object): """Returns the Redis key for this Queue.""" return self._key + @property + def registry_cleaning_key(self): + """Redis key used to indicate this queue has been cleaned.""" + return 'rq:clean_registries:%s' % self.name + + def acquire_cleaning_lock(self): + """Returns a boolean indicating whether a lock to clean this queue + is acquired. 
A lock expires in 899 seconds (15 minutes - 1 second) + """ + return self.connection.set(self.registry_cleaning_key, 1, nx=1, ex=899) + def empty(self): """Removes all messages on the queue.""" script = """ @@ -141,8 +147,7 @@ class Queue(object): except NoSuchJobError: self.remove(job_id) else: - if job.origin == self.name or \ - (job.is_failed and self == get_failed_queue(connection=self.connection, job_class=self.job_class)): + if job.origin == self.name: return job def get_job_ids(self, offset=0, length=-1): @@ -175,6 +180,12 @@ class Queue(object): """Returns a count of all messages in the queue.""" return self.connection.llen(self.key) + @property + def failed_job_registry(self): + """Returns this queue's FailedJobRegistry.""" + from rq.registry import FailedJobRegistry + return FailedJobRegistry(queue=self) + def remove(self, job_or_id, pipeline=None): """Removes Job from queue, accepts either a Job instance or ID.""" job_id = job_or_id.id if isinstance(job_or_id, self.job_class) else job_or_id @@ -210,8 +221,9 @@ class Queue(object): connection.rpush(self.key, job_id) def enqueue_call(self, func, args=None, kwargs=None, timeout=None, - result_ttl=None, ttl=None, description=None, - depends_on=None, job_id=None, at_front=False, meta=None): + result_ttl=None, ttl=None, failure_ttl=None, + description=None, depends_on=None, job_id=None, + at_front=False, meta=None): """Creates a job to represent the delayed function call and enqueues it. @@ -221,13 +233,15 @@ class Queue(object): """ timeout = parse_timeout(timeout) or self._default_timeout result_ttl = parse_timeout(result_ttl) + failure_ttl = parse_timeout(failure_ttl) ttl = parse_timeout(ttl) job = self.job_class.create( func, args=args, kwargs=kwargs, connection=self.connection, - result_ttl=result_ttl, ttl=ttl, status=JobStatus.QUEUED, - description=description, depends_on=depends_on, - timeout=timeout, id=job_id, origin=self.name, meta=meta) + result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl, + status=JobStatus.QUEUED, description=description, + depends_on=depends_on, timeout=timeout, id=job_id, + origin=self.name, meta=meta) # If job depends on an unfinished job, register itself on it's # parent's dependents instead of enqueueing it. @@ -290,16 +304,12 @@ class Queue(object): 'by workers') # Detect explicit invocations, i.e. of the form: - # q.enqueue(foo, args=(1, 2), kwargs={'a': 1}, timeout=30) + # q.enqueue(foo, args=(1, 2), kwargs={'a': 1}, job_timeout=30) timeout = kwargs.pop('job_timeout', None) - if timeout is None: - timeout = kwargs.pop('timeout', None) - if timeout: - warnings.warn('The `timeout` keyword is deprecated. 
Use `job_timeout` instead', DeprecationWarning) - description = kwargs.pop('description', None) result_ttl = kwargs.pop('result_ttl', None) ttl = kwargs.pop('ttl', None) + failure_ttl = kwargs.pop('failure_ttl', None) depends_on = kwargs.pop('depends_on', None) job_id = kwargs.pop('job_id', None) at_front = kwargs.pop('at_front', False) @@ -310,10 +320,12 @@ class Queue(object): args = kwargs.pop('args', None) kwargs = kwargs.pop('kwargs', None) - return self.enqueue_call(func=f, args=args, kwargs=kwargs, - timeout=timeout, result_ttl=result_ttl, ttl=ttl, - description=description, depends_on=depends_on, - job_id=job_id, at_front=at_front, meta=meta) + return self.enqueue_call( + func=f, args=args, kwargs=kwargs, timeout=timeout, + result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl, + description=description, depends_on=depends_on, job_id=job_id, + at_front=at_front, meta=meta + ) def enqueue_job(self, job, pipeline=None, at_front=False): """Enqueues a job for delayed execution. @@ -429,28 +441,6 @@ class Queue(object): return queue_key, blob return None - def dequeue(self): - """Dequeues the front-most job from this queue. - - Returns a job_class instance, which can be executed or inspected. - """ - while True: - job_id = self.pop_job_id() - if job_id is None: - return None - try: - job = self.job_class.fetch(job_id, connection=self.connection) - except NoSuchJobError as e: - # Silently pass on jobs that don't exist (anymore), - continue - except UnpickleError as e: - # Attach queue information on the exception for improved error - # reporting - e.job_id = job_id - e.queue = self - raise e - return job - @classmethod def dequeue_any(cls, queues, timeout, connection=None, job_class=None): """Class method returning the job_class instance at the front of the given @@ -509,48 +499,3 @@ class Queue(object): def __str__(self): return '<{0} {1}>'.format(self.__class__.__name__, self.name) - - -class FailedQueue(Queue): - def __init__(self, connection=None, job_class=None): - super(FailedQueue, self).__init__(JobStatus.FAILED, - connection=connection, - job_class=job_class) - - def quarantine(self, job, exc_info): - """Puts the given Job in quarantine (i.e. put it on the failed - queue). - """ - - with self.connection.pipeline() as pipeline: - # Add Queue key set - self.connection.sadd(self.redis_queues_keys, self.key) - - job.exc_info = exc_info - job.save(pipeline=pipeline, include_meta=False) - job.cleanup(ttl=-1, pipeline=pipeline) # failed job won't expire - - self.push_job_id(str(job.id), pipeline=pipeline) - pipeline.execute() - - return job - - def requeue(self, job_id): - """Requeues the job with the given job ID.""" - try: - job = self.job_class.fetch(job_id, connection=self.connection) - except NoSuchJobError: - # Silently ignore/remove this job and return (i.e. 
do nothing) - self.remove(job_id) - return - - # Delete it from the failed queue (raise an error if that failed) - if self.remove(job) == 0: - raise InvalidJobOperationError('Cannot requeue non-failed jobs') - - job.set_status(JobStatus.QUEUED) - job.exc_info = None - queue = Queue(job.origin, - connection=self.connection, - job_class=self.job_class) - return queue.enqueue_job(job) diff --git a/rq/registry.py b/rq/registry.py index b482c30..85fefc2 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -1,8 +1,9 @@ from .compat import as_text from .connections import resolve_connection -from .exceptions import NoSuchJobError +from .defaults import DEFAULT_FAILURE_TTL +from .exceptions import InvalidJobOperation, NoSuchJobError from .job import Job, JobStatus -from .queue import FailedQueue +from .queue import Queue from .utils import backend_class, current_timestamp @@ -27,11 +28,20 @@ class BaseRegistry(object): self.key = self.key_template.format(self.name) self.job_class = backend_class(self, 'job_class', override=job_class) - def __len__(self): """Returns the number of jobs in this registry""" return self.count + def __contains__(self, item): + """ + Returns a boolean indicating registry contains the given + job instance or job id. + """ + job_id = item + if isinstance(item, self.job_class): + job_id = item.id + return self.connection.zscore(self.key, job_id) is not None + @property def count(self): """Returns the number of jobs in this registry""" @@ -69,6 +79,10 @@ class BaseRegistry(object): return [as_text(job_id) for job_id in self.connection.zrange(self.key, start, end)] + def get_queue(self): + """Returns Queue object associated with this registry.""" + return Queue(self.name, connection=self.connection) + class StartedJobRegistry(BaseRegistry): """ @@ -82,7 +96,7 @@ class StartedJobRegistry(BaseRegistry): key_template = 'rq:wip:{0}' def cleanup(self, timestamp=None): - """Remove expired jobs from registry and add them to FailedQueue. + """Remove expired jobs from registry and add them to FailedJobRegistry. Removes jobs with an expiry time earlier than timestamp, specified as seconds since the Unix epoch. timestamp defaults to call time if @@ -92,8 +106,7 @@ class StartedJobRegistry(BaseRegistry): job_ids = self.get_expired_job_ids(score) if job_ids: - failed_queue = FailedQueue(connection=self.connection, - job_class=self.job_class) + failed_job_registry = FailedJobRegistry(self.name, self.connection) with self.connection.pipeline() as pipeline: for job_id in job_ids: @@ -103,7 +116,7 @@ class StartedJobRegistry(BaseRegistry): job.set_status(JobStatus.FAILED) job.save(pipeline=pipeline, include_meta=False) job.cleanup(ttl=-1, pipeline=pipeline) - failed_queue.push_job_id(job_id, pipeline=pipeline) + failed_job_registry.add(job, job.failure_ttl) except NoSuchJobError: pass @@ -131,6 +144,61 @@ class FinishedJobRegistry(BaseRegistry): self.connection.zremrangebyscore(self.key, 0, score) +class FailedJobRegistry(BaseRegistry): + """ + Registry of containing failed jobs. + """ + key_template = 'rq:failed:{0}' + + def cleanup(self, timestamp=None): + """Remove expired jobs from registry. + + Removes jobs with an expiry time earlier than timestamp, specified as + seconds since the Unix epoch. timestamp defaults to call time if + unspecified. 
+ """ + score = timestamp if timestamp is not None else current_timestamp() + self.connection.zremrangebyscore(self.key, 0, score) + + def add(self, job, ttl=None, exc_string='', pipeline=None): + """ + Adds a job to a registry with expiry time of now + ttl. + `ttl` defaults to DEFAULT_FAILURE_TTL if not specified. + """ + if ttl is None: + ttl = DEFAULT_FAILURE_TTL + score = ttl if ttl < 0 else current_timestamp() + ttl + + if pipeline: + p = pipeline + else: + p = self.connection.pipeline() + + job.exc_info = exc_string + job.save(pipeline=p, include_meta=False) + job.cleanup(ttl=-1, pipeline=p) # failed job won't expire + p.zadd(self.key, {job.id: score}) + + if not pipeline: + p.execute() + + def requeue(self, job_or_id): + """Requeues the job with the given job ID.""" + if isinstance(job_or_id, self.job_class): + job = job_or_id + else: + job = self.job_class.fetch(job_or_id, connection=self.connection) + + result = self.connection.zrem(self.key, job.id) + if not result: + raise InvalidJobOperation + + queue = Queue(job.origin, connection=self.connection, + job_class=self.job_class) + + return queue.enqueue_job(job) + + class DeferredJobRegistry(BaseRegistry): """ Registry of deferred jobs (waiting for another job to finish). @@ -154,3 +222,8 @@ def clean_registries(queue): connection=queue.connection, job_class=queue.job_class) registry.cleanup() + + registry = FailedJobRegistry(name=queue.name, + connection=queue.connection, + job_class=queue.job_class) + registry.cleanup() diff --git a/rq/version.py b/rq/version.py index 98b8f38..ba59ecb 100644 --- a/rq/version.py +++ b/rq/version.py @@ -2,4 +2,4 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) -VERSION = '0.13.0' +VERSION = '1.0' diff --git a/rq/worker.py b/rq/worker.py index 506c8ab..2dbb27f 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -13,6 +13,7 @@ import time import traceback import warnings from datetime import timedelta +from uuid import uuid4 try: from signal import SIGKILL @@ -24,19 +25,22 @@ from redis import WatchError from . 
import worker_registration from .compat import PY2, as_text, string_types, text_type from .connections import get_current_connection, push_connection, pop_connection -from .defaults import (DEFAULT_RESULT_TTL, DEFAULT_WORKER_TTL, DEFAULT_JOB_MONITORING_INTERVAL, + +from .defaults import (DEFAULT_RESULT_TTL, + DEFAULT_WORKER_TTL, DEFAULT_JOB_MONITORING_INTERVAL, DEFAULT_LOGGING_FORMAT, DEFAULT_LOGGING_DATE_FORMAT) from .exceptions import DequeueTimeout, ShutDownImminentException from .job import Job, JobStatus from .logutils import setup_loghandlers -from .queue import Queue, get_failed_queue -from .registry import FinishedJobRegistry, StartedJobRegistry, clean_registries +from .queue import Queue +from .registry import (FailedJobRegistry, FinishedJobRegistry, + StartedJobRegistry, clean_registries) from .suspension import is_suspended from .timeouts import JobTimeoutException, HorseMonitorTimeoutException, UnixSignalDeathPenalty from .utils import (backend_class, ensure_list, enum, make_colorizer, utcformat, utcnow, utcparse) from .version import VERSION -from .worker_registration import get_keys +from .worker_registration import clean_worker_registry, get_keys try: from procname import setprocname @@ -56,10 +60,6 @@ class StopRequested(Exception): pass -def iterable(x): - return hasattr(x, '__iter__') - - def compact(l): return [x for x in l if x is not None] @@ -148,11 +148,8 @@ class Worker(object): return None name = worker_key[len(prefix):] - worker = cls([], - name, - connection=connection, - job_class=job_class, - queue_class=queue_class) + worker = cls([], name, connection=connection, job_class=job_class, + queue_class=queue_class, prepare_for_work=False) worker.refresh() @@ -161,13 +158,21 @@ class Worker(object): def __init__(self, queues, name=None, default_result_ttl=DEFAULT_RESULT_TTL, connection=None, exc_handler=None, exception_handlers=None, default_worker_ttl=DEFAULT_WORKER_TTL, job_class=None, - queue_class=None, + queue_class=None, log_job_description=True, job_monitoring_interval=DEFAULT_JOB_MONITORING_INTERVAL, - log_job_description=True): # noqa + disable_default_exception_handler=False, + prepare_for_work=True): # noqa if connection is None: connection = get_current_connection() self.connection = connection + if prepare_for_work: + self.hostname = socket.gethostname() + self.pid = os.getpid() + else: + self.hostname = None + self.pid = None + self.job_class = backend_class(self, 'job_class', override=job_class) self.queue_class = backend_class(self, 'queue_class', override=queue_class) @@ -176,7 +181,8 @@ class Worker(object): job_class=self.job_class) if isinstance(q, string_types) else q for q in ensure_list(queues)] - self._name = name + + self.name = name or uuid4().hex self.queues = queues self.validate_queues() self._exc_handlers = [] @@ -191,27 +197,17 @@ class Worker(object): self._stop_requested = False self.log = logger self.log_job_description = log_job_description - self.failed_queue = get_failed_queue(connection=self.connection, - job_class=self.job_class) self.last_cleaned_at = None self.successful_job_count = 0 self.failed_job_count = 0 self.total_working_time = 0 self.birth_date = None - # By default, push the "move-to-failed-queue" exception handler onto - # the stack - if exception_handlers is None: - self.push_exc_handler(self.move_to_failed_queue) - if exc_handler is not None: - self.push_exc_handler(exc_handler) - warnings.warn( - "exc_handler is deprecated, pass a list to exception_handlers instead.", - DeprecationWarning - ) - elif 
isinstance(exception_handlers, list): - for h in exception_handlers: - self.push_exc_handler(h) + self.disable_default_exception_handler = disable_default_exception_handler + + if isinstance(exception_handlers, list): + for handler in exception_handlers: + self.push_exc_handler(handler) elif exception_handlers is not None: self.push_exc_handler(exception_handlers) @@ -229,30 +225,11 @@ class Worker(object): """Returns the Redis keys representing this worker's queues.""" return list(map(lambda q: q.key, self.queues)) - @property - def name(self): - """Returns the name of the worker, under which it is registered to the - monitoring system. - - By default, the name of the worker is constructed from the current - (short) host name and the current PID. - """ - if self._name is None: - hostname = socket.gethostname() - shortname, _, _ = hostname.partition('.') - self._name = '{0}.{1}'.format(shortname, self.pid) - return self._name - @property def key(self): """Returns the worker's Redis hash key.""" return self.redis_worker_namespace_prefix + self.name - @property - def pid(self): - """The current process ID.""" - return os.getpid() - @property def horse_pid(self): """The horse's process ID. Only available in the worker. Will return @@ -289,6 +266,8 @@ class Worker(object): p.hset(key, 'birth', now_in_string) p.hset(key, 'last_heartbeat', now_in_string) p.hset(key, 'queues', queues) + p.hset(key, 'pid', self.pid) + p.hset(key, 'hostname', self.hostname) worker_registration.register(self, p) p.expire(key, self.default_worker_ttl) p.execute() @@ -502,6 +481,17 @@ class Worker(object): except StopRequested: break + + except SystemExit: + # Cold shutdown detected + raise + + except: # noqa + self.log.error( + 'Worker %s: found an unhandled exception, quitting...', + self.name, exc_info=True + ) + break finally: if not self.is_horse: self.register_death() @@ -526,12 +516,11 @@ class Worker(object): job, queue = result if self.log_job_description: - self.log.info('%s: %s (%s)', green(queue.name), - blue(job.description), - job.id) + self.log.info( + '%s: %s (%s)', green(queue.name), + blue(job.description), job.id) else: - self.log.info('%s:%s', green(queue.name), - job.id) + self.log.info('%s:%s', green(queue.name), job.id) break except DequeueTimeout: @@ -561,10 +550,14 @@ class Worker(object): def refresh(self): data = self.connection.hmget( self.key, 'queues', 'state', 'current_job', 'last_heartbeat', - 'birth', 'failed_job_count', 'successful_job_count', 'total_working_time' + 'birth', 'failed_job_count', 'successful_job_count', + 'total_working_time', 'hostname', 'pid' ) - queues, state, job_id, last_heartbeat, birth, failed_job_count, successful_job_count, total_working_time = data + (queues, state, job_id, last_heartbeat, birth, failed_job_count, + successful_job_count, total_working_time, hostname, pid) = data queues = as_text(queues) + self.hostname = hostname + self.pid = int(pid) if pid else None self._state = as_text(state or '?') self._job_id = job_id or None if last_heartbeat: @@ -598,7 +591,7 @@ class Worker(object): def increment_total_working_time(self, job_execution_time, pipeline): pipeline.hincrbyfloat(self.key, 'total_working_time', - job_execution_time.microseconds) + job_execution_time.total_seconds()) def fork_work_horse(self, job, queue): """Spawns a work horse to perform the actual work and passes it a job. 
@@ -648,19 +641,17 @@ class Worker(object): if not job.ended_at: job.ended_at = utcnow() - self.handle_job_failure(job=job) - # Unhandled failure: move the job to the failed queue self.log.warning(( - 'Moving job to %r queue ' - '(work-horse terminated unexpectedly; waitpid returned %s)' - ), self.failed_queue.name, ret_val) - self.failed_queue.quarantine( + 'Moving job to FailedJobRegistry ' + '(work-horse terminated unexpectedly; waitpid returned {})' + ).format(ret_val)) + + exc_string = "Work-horse process was terminated unexpectedly " + "(waitpid returned %s)" % ret_val + self.handle_job_failure( job, - exc_info=( - "Work-horse process was terminated unexpectedly " - "(waitpid returned {0})" - ).format(ret_val) + exc_string="Work-horse process was terminated unexpectedly " + "(waitpid returned %s)" % ret_val ) def execute_job(self, job, queue): @@ -727,24 +718,37 @@ class Worker(object): msg = 'Processing {0} from {1} since {2}' self.procline(msg.format(job.func_name, job.origin, time.time())) - def handle_job_failure(self, job, started_job_registry=None): + def handle_job_failure(self, job, started_job_registry=None, + exc_string=''): """Handles the failure or an executing job by: 1. Setting the job status to failed - 2. Removing the job from the started_job_registry + 2. Removing the job from StartedJobRegistry 3. Setting the workers current job to None + 4. Add the job to FailedJobRegistry """ with self.connection.pipeline() as pipeline: if started_job_registry is None: - started_job_registry = StartedJobRegistry(job.origin, - self.connection, - job_class=self.job_class) + started_job_registry = StartedJobRegistry( + job.origin, + self.connection, + job_class=self.job_class + ) job.set_status(JobStatus.FAILED, pipeline=pipeline) started_job_registry.remove(job, pipeline=pipeline) + + if not self.disable_default_exception_handler: + failed_job_registry = FailedJobRegistry(job.origin, job.connection, + job_class=self.job_class) + failed_job_registry.add(job, ttl=job.failure_ttl, + exc_string=exc_string, pipeline=pipeline) + self.set_current_job_id(None, pipeline=pipeline) self.increment_failed_job_count(pipeline) if job.started_at and job.ended_at: - self.increment_total_working_time(job.ended_at - job.started_at, - pipeline) + self.increment_total_working_time( + job.ended_at - job.started_at, + pipeline + ) try: pipeline.execute() @@ -795,7 +799,6 @@ class Worker(object): inside the work horse's process. 
""" self.prepare_job_execution(job, heartbeat_ttl) - push_connection(self.connection) started_job_registry = StartedJobRegistry(job.origin, @@ -813,15 +816,18 @@ class Worker(object): # Pickle the result in the same try-except block since we need # to use the same exc handling when pickling fails job._result = rv - self.handle_job_success(job=job, queue=queue, started_job_registry=started_job_registry) except: job.ended_at = utcnow() - self.handle_job_failure(job=job, + exc_info = sys.exc_info() + exc_string = self._get_safe_exception_string( + traceback.format_exception(*exc_info) + ) + self.handle_job_failure(job=job, exc_string=exc_string, started_job_registry=started_job_registry) - self.handle_exception(job, *sys.exc_info()) + self.handle_exception(job, *exc_info) return False finally: @@ -855,7 +861,7 @@ class Worker(object): 'queue': job.origin, }) - for handler in reversed(self._exc_handlers): + for handler in self._exc_handlers: self.log.debug('Invoking exception handler %s', handler) fallthrough = handler(job, *exc_info) @@ -867,12 +873,6 @@ class Worker(object): if not fallthrough: break - def move_to_failed_queue(self, job, *exc_info): - """Default exception handler: move the job to the failed queue.""" - self.log.warning('Moving job to %r queue', self.failed_queue.name) - from .handlers import move_to_failed_queue - move_to_failed_queue(job, *exc_info) - @staticmethod def _get_safe_exception_string(exc_strings): """Ensure list of exception strings is decoded on Python 2 and joined as one string safely.""" @@ -904,16 +904,20 @@ class Worker(object): def clean_registries(self): """Runs maintenance jobs on each Queue's registries.""" for queue in self.queues: - self.log.info('Cleaning registries for queue: %s', queue.name) - clean_registries(queue) + # If there are multiple workers running, we only want 1 worker + # to run clean_registries(). 
+ if queue.acquire_cleaning_lock(): + self.log.info('Cleaning registries for queue: %s', queue.name) + clean_registries(queue) + clean_worker_registry(queue) self.last_cleaned_at = utcnow() @property def should_run_maintenance_tasks(self): - """Maintenance tasks should run on first startup or every hour.""" + """Maintenance tasks should run on first startup or 15 minutes.""" if self.last_cleaned_at is None: return True - if (utcnow() - self.last_cleaned_at) > timedelta(hours=1): + if (utcnow() - self.last_cleaned_at) > timedelta(minutes=15): return True return False diff --git a/rq/worker_registration.py b/rq/worker_registration.py index d24fac3..3944bc7 100644 --- a/rq/worker_registration.py +++ b/rq/worker_registration.py @@ -43,3 +43,25 @@ def get_keys(queue=None, connection=None): redis_key = REDIS_WORKER_KEYS return {as_text(key) for key in redis.smembers(redis_key)} + + +def clean_worker_registry(queue): + """Delete invalid worker keys in registry""" + keys = list(get_keys(queue)) + + with queue.connection.pipeline() as pipeline: + + for key in keys: + pipeline.exists(key) + results = pipeline.execute() + + invalid_keys = [] + + for i, key_exists in enumerate(results): + if not key_exists: + invalid_keys.append(keys[i]) + + if invalid_keys: + pipeline.srem(WORKERS_BY_QUEUE_KEY % queue.name, *invalid_keys) + pipeline.srem(REDIS_WORKER_KEYS, *invalid_keys) + pipeline.execute() diff --git a/tests/config_files/__init__.py b/tests/config_files/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/dummy_settings.py b/tests/config_files/dummy.py similarity index 100% rename from tests/dummy_settings.py rename to tests/config_files/dummy.py diff --git a/tests/dummy_settings_override.py b/tests/config_files/dummy_override.py similarity index 100% rename from tests/dummy_settings_override.py rename to tests/config_files/dummy_override.py diff --git a/tests/config_files/sentry.py b/tests/config_files/sentry.py new file mode 100644 index 0000000..5ed25b1 --- /dev/null +++ b/tests/config_files/sentry.py @@ -0,0 +1,6 @@ +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + +REDIS_HOST = "testhost.example.com" +SENTRY_DSN = 'https://123@sentry.io/123' diff --git a/tests/fixtures.py b/tests/fixtures.py index 16c2134..882bdad 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -119,6 +119,12 @@ def black_hole(job, *exc_info): return False +def add_meta(job, *exc_info): + job.meta = {'foo': 1} + job.save() + return True + + def save_key_ttl(key): # Stores key ttl in meta job = get_current_job() diff --git a/tests/test_cli.py b/tests/test_cli.py index c7f56af..060558c 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -3,19 +3,19 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) from click.testing import CliRunner -from rq import get_failed_queue, Queue -from rq.job import Job +from redis import Redis + +from rq import Queue from rq.cli import main from rq.cli.helpers import read_config_file, CliConfig +from rq.job import Job +from rq.registry import FailedJobRegistry +from rq.worker import Worker, WorkerStatus + import pytest from tests import RQTestCase -from tests.fixtures import div_by_zero - -try: - from unittest import TestCase -except ImportError: - from unittest2 import TestCase # noqa +from tests.fixtures import div_by_zero, say_hello class TestRQCli(RQTestCase): @@ -39,20 +39,20 @@ class TestRQCli(RQTestCase): super(TestRQCli, self).setUp() db_num = 
self.testconn.connection_pool.connection_kwargs['db'] self.redis_url = 'redis://127.0.0.1:6379/%d' % db_num + self.connection = Redis.from_url(self.redis_url) job = Job.create(func=div_by_zero, args=(1, 2, 3)) job.origin = 'fake' job.save() - get_failed_queue().quarantine(job, Exception('Some fake error')) # noqa def test_config_file(self): - settings = read_config_file('tests.dummy_settings') + settings = read_config_file('tests.config_files.dummy') self.assertIn('REDIS_HOST', settings) self.assertEqual(settings['REDIS_HOST'], 'testhost.example.com') def test_config_file_option(self): """""" - cli_config = CliConfig(config='tests.dummy_settings') + cli_config = CliConfig(config='tests.config_files.dummy') self.assertEqual( cli_config.connection.connection_pool.connection_kwargs['host'], 'testhost.example.com', @@ -63,7 +63,7 @@ class TestRQCli(RQTestCase): def test_config_file_default_options(self): """""" - cli_config = CliConfig(config='tests.dummy_settings') + cli_config = CliConfig(config='tests.config_files.dummy') self.assertEqual( cli_config.connection.connection_pool.connection_kwargs['host'], @@ -84,7 +84,7 @@ class TestRQCli(RQTestCase): def test_config_file_default_options_override(self): """""" - cli_config = CliConfig(config='tests.dummy_settings_override') + cli_config = CliConfig(config='tests.config_files.dummy_override') self.assertEqual( cli_config.connection.connection_pool.connection_kwargs['host'], @@ -110,34 +110,55 @@ class TestRQCli(RQTestCase): self.assert_normal_execution(result) self.assertEqual(result.output.strip(), 'Nothing to do') - def test_empty_failed(self): - """rq empty -u failed""" - runner = CliRunner() - result = runner.invoke(main, ['empty', '-u', self.redis_url, 'failed']) - self.assert_normal_execution(result) - self.assertEqual(result.output.strip(), '1 jobs removed from failed queue') - - def test_empty_all(self): - """rq empty -u failed --all""" - runner = CliRunner() - result = runner.invoke(main, ['empty', '-u', self.redis_url, '--all']) - self.assert_normal_execution(result) - self.assertEqual(result.output.strip(), '1 jobs removed from failed queue') - def test_requeue(self): """rq requeue -u --all""" + connection = Redis.from_url(self.redis_url) + queue = Queue('requeue', connection=connection) + registry = queue.failed_job_registry + runner = CliRunner() - result = runner.invoke(main, ['requeue', '-u', self.redis_url, '--all']) + + job = queue.enqueue(div_by_zero) + job2 = queue.enqueue(div_by_zero) + job3 = queue.enqueue(div_by_zero) + + worker = Worker([queue]) + worker.work(burst=True) + + self.assertIn(job, registry) + self.assertIn(job2, registry) + self.assertIn(job3, registry) + + result = runner.invoke( + main, + ['requeue', '-u', self.redis_url, '--queue', 'requeue', job.id] + ) self.assert_normal_execution(result) - self.assertEqual(result.output.strip(), 'Requeueing 1 jobs from failed queue') - result = runner.invoke(main, ['requeue', '-u', self.redis_url, '--all']) + # Only the first specified job is requeued + self.assertNotIn(job, registry) + self.assertIn(job2, registry) + self.assertIn(job3, registry) + + result = runner.invoke( + main, + ['requeue', '-u', self.redis_url, '--queue', 'requeue', '--all'] + ) self.assert_normal_execution(result) - self.assertEqual(result.output.strip(), 'Nothing to do') + # With --all flag, all failed jobs are requeued + self.assertNotIn(job2, registry) + self.assertNotIn(job3, registry) def test_info(self): """rq info -u """ runner = CliRunner() + result = runner.invoke(main, ['info', '-u', 
self.redis_url]) + self.assert_normal_execution(result) + self.assertIn('0 queues, 0 jobs total', result.output) + + queue = Queue(connection=self.connection) + queue.enqueue(say_hello) + result = runner.invoke(main, ['info', '-u', self.redis_url]) self.assert_normal_execution(result) self.assertIn('1 queues, 1 jobs total', result.output) @@ -147,6 +168,13 @@ class TestRQCli(RQTestCase): runner = CliRunner() result = runner.invoke(main, ['info', '-u', self.redis_url, '--only-queues']) self.assert_normal_execution(result) + self.assertIn('0 queues, 0 jobs total', result.output) + + queue = Queue(connection=self.connection) + queue.enqueue(say_hello) + + result = runner.invoke(main, ['info', '-u', self.redis_url]) + self.assert_normal_execution(result) self.assertIn('1 queues, 1 jobs total', result.output) def test_info_only_workers(self): @@ -154,8 +182,42 @@ class TestRQCli(RQTestCase): runner = CliRunner() result = runner.invoke(main, ['info', '-u', self.redis_url, '--only-workers']) self.assert_normal_execution(result) + self.assertIn('0 workers, 0 queue', result.output) + + queue = Queue(connection=self.connection) + queue.enqueue(say_hello) + result = runner.invoke(main, ['info', '-u', self.redis_url, '--only-workers']) + self.assert_normal_execution(result) self.assertIn('0 workers, 1 queues', result.output) + foo_queue = Queue(name='foo', connection=self.connection) + foo_queue.enqueue(say_hello) + + bar_queue = Queue(name='bar', connection=self.connection) + bar_queue.enqueue(say_hello) + + worker = Worker([foo_queue, bar_queue], connection=self.connection) + worker.register_birth() + + worker_2 = Worker([foo_queue, bar_queue], connection=self.connection) + worker_2.register_birth() + worker_2.set_state(WorkerStatus.BUSY) + + result = runner.invoke(main, ['info', 'foo', 'bar', + '-u', self.redis_url, '--only-workers']) + + self.assert_normal_execution(result) + self.assertIn('2 workers, 2 queues', result.output) + + result = runner.invoke(main, ['info', 'foo', 'bar', '--by-queue', + '-u', self.redis_url, '--only-workers']) + + self.assert_normal_execution(result) + # Ensure both queues' workers are shown + self.assertIn('foo:', result.output) + self.assertIn('bar:', result.output) + self.assertIn('2 workers, 2 queues', result.output) + def test_worker(self): """rq worker -u -b""" runner = CliRunner() @@ -172,22 +234,41 @@ class TestRQCli(RQTestCase): def test_exception_handlers(self): """rq worker -u -b --exception-handler """ - q = Queue() - failed_q = get_failed_queue() - failed_q.empty() - + connection = Redis.from_url(self.redis_url) + q = Queue('default', connection=connection) runner = CliRunner() - # If exception handler is not given, failed job goes to FailedQueue - q.enqueue(div_by_zero) + # If exception handler is not given, no custom exception handler is run + job = q.enqueue(div_by_zero) runner.invoke(main, ['worker', '-u', self.redis_url, '-b']) - self.assertEqual(failed_q.count, 1) + registry = FailedJobRegistry(queue=q) + self.assertTrue(job in registry) - # Black hole exception handler doesn't add failed jobs to FailedQueue - q.enqueue(div_by_zero) + # If disable-default-exception-handler is given, job is not moved to FailedJobRegistry + job = q.enqueue(div_by_zero) + runner.invoke(main, ['worker', '-u', self.redis_url, '-b', + '--disable-default-exception-handler']) + registry = FailedJobRegistry(queue=q) + self.assertFalse(job in registry) + + # Both default and custom exception handler is run + job = q.enqueue(div_by_zero) + runner.invoke(main, ['worker', '-u', 
self.redis_url, '-b', + '--exception-handler', 'tests.fixtures.add_meta']) + registry = FailedJobRegistry(queue=q) + self.assertTrue(job in registry) + job.refresh() + self.assertEqual(job.meta, {'foo': 1}) + + # Only custom exception handler is run + job = q.enqueue(div_by_zero) runner.invoke(main, ['worker', '-u', self.redis_url, '-b', - '--exception-handler', 'tests.fixtures.black_hole']) - self.assertEqual(failed_q.count, 1) + '--exception-handler', 'tests.fixtures.add_meta', + '--disable-default-exception-handler']) + registry = FailedJobRegistry(queue=q) + self.assertFalse(job in registry) + job.refresh() + self.assertEqual(job.meta, {'foo': 1}) def test_suspend_and_resume(self): """rq suspend -u diff --git a/tests/test_job.py b/tests/test_job.py index d940f6d..4e34574 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -18,8 +18,10 @@ from tests import fixtures, RQTestCase from rq.compat import PY2, as_text from rq.exceptions import NoSuchJobError, UnpickleError -from rq.job import Job, get_current_job, JobStatus, cancel_job, requeue_job -from rq.queue import Queue, get_failed_queue +from rq.job import Job, get_current_job, JobStatus, cancel_job +from rq.queue import Queue +from rq.registry import (DeferredJobRegistry, FailedJobRegistry, + FinishedJobRegistry, StartedJobRegistry) from rq.utils import utcformat from rq.worker import Worker @@ -360,6 +362,18 @@ class TestJob(RQTestCase): Job.fetch(job.id, connection=self.testconn) self.assertEqual(job.result_ttl, None) + def test_failure_ttl_is_persisted(self): + """Ensure job.failure_ttl is set and restored properly""" + job = Job.create(func=fixtures.say_hello, args=('Lionel',), failure_ttl=15) + job.save() + Job.fetch(job.id, connection=self.testconn) + self.assertEqual(job.failure_ttl, 15) + + job = Job.create(func=fixtures.say_hello, args=('Lionel',)) + job.save() + Job.fetch(job.id, connection=self.testconn) + self.assertEqual(job.failure_ttl, None) + def test_description_is_persisted(self): """Ensure that job's custom description is set properly""" job = Job.create(func=fixtures.say_hello, args=('Lionel',), description='Say hello!') @@ -383,10 +397,11 @@ class TestJob(RQTestCase): def test_job_access_within_job_function(self): """The current job is accessible within the job function.""" q = Queue() - q.enqueue(fixtures.access_self) # access_self calls get_current_job() and asserts + job = q.enqueue(fixtures.access_self) w = Worker([q]) w.work(burst=True) - assert get_failed_queue(self.testconn).count == 0 + # access_self calls get_current_job() and executes successfully + self.assertEqual(job.get_status(), JobStatus.FINISHED) def test_job_access_within_synchronous_job_function(self): queue = Queue(is_async=False) @@ -483,6 +498,48 @@ class TestJob(RQTestCase): self.assertNotIn(job.id, queue.get_job_ids()) + def test_job_delete_removes_itself_from_registries(self): + """job.delete() should remove itself from job registries""" + connection = self.testconn + job = Job.create(func=fixtures.say_hello, status=JobStatus.FAILED, + connection=self.testconn, origin='default') + job.save() + registry = FailedJobRegistry(connection=self.testconn) + registry.add(job, 500) + + job.delete() + self.assertFalse(job in registry) + + job = Job.create(func=fixtures.say_hello, status=JobStatus.FINISHED, + connection=self.testconn, origin='default') + job.save() + + registry = FinishedJobRegistry(connection=self.testconn) + registry.add(job, 500) + + job.delete() + self.assertFalse(job in registry) + + job = Job.create(func=fixtures.say_hello, 
status=JobStatus.STARTED, + connection=self.testconn, origin='default') + job.save() + + registry = StartedJobRegistry(connection=self.testconn) + registry.add(job, 500) + + job.delete() + self.assertFalse(job in registry) + + job = Job.create(func=fixtures.say_hello, status=JobStatus.DEFERRED, + connection=self.testconn, origin='default') + job.save() + + registry = DeferredJobRegistry(connection=self.testconn) + registry.add(job, 500) + + job.delete() + self.assertFalse(job in registry) + def test_job_with_dependents_delete_parent_with_saved(self): """job.delete() deletes itself from Redis but not dependents. If the dependent job was saved, it will remain in redis.""" @@ -574,31 +631,6 @@ class TestJob(RQTestCase): cancel_job(job.id) self.assertEqual(0, len(queue.get_jobs())) - def test_create_failed_and_cancel_job(self): - """test creating and using cancel_job deletes job properly""" - failed_queue = get_failed_queue(connection=self.testconn) - job = failed_queue.enqueue(fixtures.say_hello) - job.set_status(JobStatus.FAILED) - self.assertEqual(1, len(failed_queue.get_jobs())) - cancel_job(job.id) - self.assertEqual(0, len(failed_queue.get_jobs())) - - def test_create_and_requeue_job(self): - """Requeueing existing jobs.""" - job = Job.create(func=fixtures.div_by_zero, args=(1, 2, 3)) - job.origin = 'fake' - job.save() - get_failed_queue().quarantine(job, Exception('Some fake error')) # noqa - - self.assertEqual(Queue.all(), [get_failed_queue()]) # noqa - self.assertEqual(get_failed_queue().count, 1) - - requeued_job = requeue_job(job.id) - - self.assertEqual(get_failed_queue().count, 0) - self.assertEqual(Queue('fake').count, 1) - self.assertEqual(requeued_job.origin, job.origin) - def test_dependents_key_for_should_return_prefixed_job_id(self): """test redis key to store job dependents hash under""" job_id = 'random' diff --git a/tests/test_queue.py b/tests/test_queue.py index 2a6c46e..463396e 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -3,11 +3,10 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) from tests import RQTestCase -from tests.fixtures import (div_by_zero, echo, Number, say_hello, - some_calculation) +from tests.fixtures import echo, Number, say_hello -from rq import get_failed_queue, Queue -from rq.exceptions import InvalidJobDependency, InvalidJobOperationError +from rq import Queue +from rq.exceptions import InvalidJobDependency from rq.job import Job, JobStatus from rq.registry import DeferredJobRegistry from rq.worker import Worker @@ -189,73 +188,6 @@ class TestQueue(RQTestCase): # ...and assert the queue count when down self.assertEqual(q.count, 0) - def test_dequeue(self): - """Dequeueing jobs from queues.""" - # Set up - q = Queue() - result = q.enqueue(say_hello, 'Rick', foo='bar') - - # Dequeue a job (not a job ID) off the queue - self.assertEqual(q.count, 1) - job = q.dequeue() - self.assertEqual(job.id, result.id) - self.assertEqual(job.func, say_hello) - self.assertEqual(job.origin, q.name) - self.assertEqual(job.args[0], 'Rick') - self.assertEqual(job.kwargs['foo'], 'bar') - - # ...and assert the queue count when down - self.assertEqual(q.count, 0) - - def test_dequeue_deleted_jobs(self): - """Dequeueing deleted jobs from queues don't blow the stack.""" - q = Queue() - for _ in range(1, 1000): - job = q.enqueue(say_hello) - job.delete() - q.dequeue() - - def test_dequeue_instance_method(self): - """Dequeueing instance method jobs from queues.""" - q = Queue() - n = Number(2) - q.enqueue(n.div, 4) - - job 
= q.dequeue() - - # The instance has been pickled and unpickled, so it is now a separate - # object. Test for equality using each object's __dict__ instead. - self.assertEqual(job.instance.__dict__, n.__dict__) - self.assertEqual(job.func.__name__, 'div') - self.assertEqual(job.args, (4,)) - - def test_dequeue_class_method(self): - """Dequeueing class method jobs from queues.""" - q = Queue() - q.enqueue(Number.divide, 3, 4) - - job = q.dequeue() - - self.assertEqual(job.instance.__dict__, Number.__dict__) - self.assertEqual(job.func.__name__, 'divide') - self.assertEqual(job.args, (3, 4)) - - def test_dequeue_ignores_nonexisting_jobs(self): - """Dequeuing silently ignores non-existing jobs.""" - - q = Queue() - uuid = '49f205ab-8ea3-47dd-a1b5-bfa186870fc8' - q.push_job_id(uuid) - q.push_job_id(uuid) - result = q.enqueue(say_hello, 'Nick', foo='bar') - q.push_job_id(uuid) - - # Dequeue simply ignores the missing job and returns None - self.assertEqual(q.count, 4) - self.assertEqual(q.dequeue().id, result.id) - self.assertIsNone(q.dequeue()) - self.assertEqual(q.count, 0) - def test_dequeue_any(self): """Fetching work from any given queue.""" fooq = Queue('foo') @@ -319,22 +251,26 @@ class TestQueue(RQTestCase): self.assertEqual(job.meta['foo'], 'bar') self.assertEqual(job.meta['baz'], 42) + def test_enqueue_with_failure_ttl(self): + """enqueue() properly sets job.failure_ttl""" + q = Queue() + job = q.enqueue(say_hello, failure_ttl=10) + job.refresh() + self.assertEqual(job.failure_ttl, 10) + def test_job_timeout(self): """Timeout can be passed via job_timeout argument""" queue = Queue() - job = queue.enqueue(echo, 1, timeout=15) - self.assertEqual(job.timeout, 15) - job = queue.enqueue(echo, 1, job_timeout=15) self.assertEqual(job.timeout, 15) - + def test_default_timeout(self): """Timeout can be passed via job_timeout argument""" queue = Queue() job = queue.enqueue(echo, 1) self.assertEqual(job.timeout, queue.DEFAULT_TIMEOUT) - - job = Job.create(func=echo) + + job = Job.create(func=echo) job = queue.enqueue_job(job) self.assertEqual(job.timeout, queue.DEFAULT_TIMEOUT) @@ -342,7 +278,7 @@ class TestQueue(RQTestCase): job = queue.enqueue(echo, 1) self.assertEqual(job.timeout, 15) - job = Job.create(func=echo) + job = Job.create(func=echo) job = queue.enqueue_job(job) self.assertEqual(job.timeout, 15) @@ -572,160 +508,3 @@ class TestQueue(RQTestCase): job_fetch = q1.fetch_job(job_orig.id) self.assertIsNotNone(job_fetch) - - -class TestFailedQueue(RQTestCase): - def test_get_failed_queue(self): - """Use custom job class""" - class CustomJob(Job): - pass - failed_queue = get_failed_queue(job_class=CustomJob) - self.assertIs(failed_queue.job_class, CustomJob) - - failed_queue = get_failed_queue(job_class='rq.job.Job') - self.assertIsNot(failed_queue.job_class, CustomJob) - - def test_requeue_job(self): - """Requeueing existing jobs.""" - job = Job.create(func=div_by_zero, args=(1, 2, 3)) - job.origin = 'fake' - job.save() - get_failed_queue().quarantine(job, Exception('Some fake error')) # noqa - - self.assertEqual(Queue.all(), [get_failed_queue()]) # noqa - self.assertEqual(get_failed_queue().count, 1) - - requeued_job = get_failed_queue().requeue(job.id) - - self.assertEqual(get_failed_queue().count, 0) - self.assertEqual(Queue('fake').count, 1) - self.assertEqual(requeued_job.origin, job.origin) - - def test_get_job_on_failed_queue(self): - default_queue = Queue() - failed_queue = get_failed_queue() - - job = default_queue.enqueue(div_by_zero, args=(1, 2, 3)) - - job_on_default_queue = 
default_queue.fetch_job(job.id) - job_on_failed_queue = failed_queue.fetch_job(job.id) - - self.assertIsNotNone(job_on_default_queue) - self.assertIsNone(job_on_failed_queue) - - job.set_status(JobStatus.FAILED) - - job_on_default_queue = default_queue.fetch_job(job.id) - job_on_failed_queue = failed_queue.fetch_job(job.id) - - self.assertIsNotNone(job_on_default_queue) - self.assertIsNotNone(job_on_failed_queue) - self.assertTrue(job_on_default_queue.is_failed) - - def test_requeue_nonfailed_job_fails(self): - """Requeueing non-failed jobs raises error.""" - q = Queue() - job = q.enqueue(say_hello, 'Nick', foo='bar') - - # Assert that we cannot requeue a job that's not on the failed queue - with self.assertRaises(InvalidJobOperationError): - get_failed_queue().requeue(job.id) - - def test_quarantine_preserves_timeout(self): - """Quarantine preserves job timeout.""" - job = Job.create(func=div_by_zero, args=(1, 2, 3)) - job.origin = 'fake' - job.timeout = 200 - job.save() - get_failed_queue().quarantine(job, Exception('Some fake error')) - - self.assertEqual(job.timeout, 200) - - def test_requeueing_preserves_timeout(self): - """Requeueing preserves job timeout.""" - job = Job.create(func=div_by_zero, args=(1, 2, 3)) - job.origin = 'fake' - job.timeout = 200 - job.save() - get_failed_queue().quarantine(job, Exception('Some fake error')) - get_failed_queue().requeue(job.id) - - job = Job.fetch(job.id) - self.assertEqual(job.timeout, 200) - - def test_requeue_sets_status_to_queued(self): - """Requeueing a job should set its status back to QUEUED.""" - job = Job.create(func=div_by_zero, args=(1, 2, 3)) - job.save() - get_failed_queue().quarantine(job, Exception('Some fake error')) - get_failed_queue().requeue(job.id) - - job = Job.fetch(job.id) - self.assertEqual(job.get_status(), JobStatus.QUEUED) - - def test_enqueue_preserves_result_ttl(self): - """Enqueueing persists result_ttl.""" - q = Queue() - job = q.enqueue(div_by_zero, args=(1, 2, 3), result_ttl=10) - self.assertEqual(job.result_ttl, 10) - job_from_queue = Job.fetch(job.id, connection=self.testconn) - self.assertEqual(int(job_from_queue.result_ttl), 10) - - def test_async_false(self): - """Job executes and cleaned up immediately if is_async=False.""" - q = Queue(is_async=False) - job = q.enqueue(some_calculation, args=(2, 3)) - self.assertEqual(job.return_value, 6) - self.assertNotEqual(self.testconn.ttl(job.key), -1) - - def test_is_async(self): - """Queue exposes is_async as a property.""" - inline_queue = Queue(is_async=False) - self.assertFalse(inline_queue.is_async) - async_queue = Queue(is_async=True) - self.assertTrue(async_queue.is_async) - - def test_custom_job_class(self): - """Ensure custom job class assignment works as expected.""" - q = Queue(job_class=CustomJob) - self.assertEqual(q.job_class, CustomJob) - - def test_skip_queue(self): - """Ensure the skip_queue option functions""" - q = Queue('foo') - job1 = q.enqueue(say_hello) - job2 = q.enqueue(say_hello) - assert q.dequeue() == job1 - skip_job = q.enqueue(say_hello, at_front=True) - assert q.dequeue() == skip_job - assert q.dequeue() == job2 - - def test_job_deletion(self): - """Ensure job.delete() removes itself from FailedQueue.""" - job = Job.create(func=div_by_zero, args=(1, 2, 3)) - job.origin = 'fake' - job.timeout = 200 - job.save() - - job.set_status(JobStatus.FAILED) - - failed_queue = get_failed_queue() - failed_queue.quarantine(job, Exception('Some fake error')) - - self.assertTrue(job.id in failed_queue.get_job_ids()) - - job.delete() - 
self.assertFalse(job.id in failed_queue.get_job_ids()) - - def test_job_in_failed_queue_persists(self): - """Make sure failed job key does not expire""" - q = Queue('foo') - job = q.enqueue(div_by_zero, args=(1,), ttl=5) - self.assertEqual(self.testconn.ttl(job.key), 5) - - self.assertRaises(ZeroDivisionError, job.perform) - job.set_status(JobStatus.FAILED) - failed_queue = get_failed_queue() - failed_queue.quarantine(job, Exception('Some fake error')) - - self.assertEqual(self.testconn.ttl(job.key), -1) diff --git a/tests/test_registry.py b/tests/test_registry.py index ed2dbd2..eb651dc 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -2,12 +2,15 @@ from __future__ import absolute_import from rq.compat import as_text -from rq.job import Job, JobStatus -from rq.queue import FailedQueue, Queue +from rq.defaults import DEFAULT_FAILURE_TTL +from rq.exceptions import InvalidJobOperation +from rq.job import Job, JobStatus, requeue_job +from rq.queue import Queue from rq.utils import current_timestamp from rq.worker import Worker from rq.registry import (clean_registries, DeferredJobRegistry, - FinishedJobRegistry, StartedJobRegistry) + FailedJobRegistry, FinishedJobRegistry, + StartedJobRegistry) from tests import RQTestCase from tests.fixtures import div_by_zero, say_hello @@ -41,6 +44,19 @@ class TestRegistry(RQTestCase): registry = StartedJobRegistry(job_class=CustomJob) self.assertFalse(registry.job_class == self.registry.job_class) + def test_contains(self): + registry = StartedJobRegistry(connection=self.testconn) + queue = Queue(connection=self.testconn) + job = queue.enqueue(say_hello) + + self.assertFalse(job in registry) + self.assertFalse(job.id in registry) + + registry.add(job, 5) + + self.assertTrue(job in registry) + self.assertTrue(job.id in registry) + def test_add_and_remove(self): """Adding and removing job to StartedJobRegistry.""" timestamp = current_timestamp() @@ -78,23 +94,22 @@ class TestRegistry(RQTestCase): self.assertEqual(self.registry.get_expired_job_ids(timestamp + 20), ['foo', 'bar']) - def test_cleanup(self): - """Moving expired jobs to FailedQueue.""" - failed_queue = FailedQueue(connection=self.testconn) - self.assertTrue(failed_queue.is_empty()) - + def test_cleanup_moves_jobs_to_failed_job_registry(self): + """Moving expired jobs to FailedJobRegistry.""" queue = Queue(connection=self.testconn) + failed_job_registry = FailedJobRegistry(connection=self.testconn) job = queue.enqueue(say_hello) self.testconn.zadd(self.registry.key, {job.id: 2}) + # Job has not been moved to FailedJobRegistry self.registry.cleanup(1) - self.assertNotIn(job.id, failed_queue.job_ids) - self.assertEqual(self.testconn.zscore(self.registry.key, job.id), 2) + self.assertNotIn(job, failed_job_registry) + self.assertIn(job, self.registry) self.registry.cleanup() - self.assertIn(job.id, failed_queue.job_ids) - self.assertEqual(self.testconn.zscore(self.registry.key, job.id), None) + self.assertIn(job.id, failed_job_registry) + self.assertNotIn(job, self.registry) job.refresh() self.assertEqual(job.get_status(), JobStatus.FAILED) @@ -158,9 +173,22 @@ class TestRegistry(RQTestCase): started_job_registry = StartedJobRegistry(connection=self.testconn) self.testconn.zadd(started_job_registry.key, {'foo': 1}) + failed_job_registry = FailedJobRegistry(connection=self.testconn) + self.testconn.zadd(failed_job_registry.key, {'foo': 1}) + clean_registries(queue) self.assertEqual(self.testconn.zcard(finished_job_registry.key), 0) 
self.assertEqual(self.testconn.zcard(started_job_registry.key), 0) + self.assertEqual(self.testconn.zcard(failed_job_registry.key), 0) + + def test_get_queue(self): + """registry.get_queue() returns the right Queue object.""" + registry = StartedJobRegistry(connection=self.testconn) + self.assertEqual(registry.get_queue(), Queue(connection=self.testconn)) + + registry = StartedJobRegistry('foo', connection=self.testconn) + self.assertEqual(registry.get_queue(), + Queue('foo', connection=self.testconn)) class TestFinishedJobRegistry(RQTestCase): @@ -225,7 +253,7 @@ class TestDeferredRegistry(RQTestCase): self.assertEqual(job_ids, [job.id]) def test_register_dependency(self): - """Ensure job creation and deletion works properly with DeferredJobRegistry.""" + """Ensure job creation and deletion works with DeferredJobRegistry.""" queue = Queue(connection=self.testconn) job = queue.enqueue(say_hello) job2 = queue.enqueue(say_hello, depends_on=job) @@ -236,3 +264,119 @@ class TestDeferredRegistry(RQTestCase): # When deleted, job removes itself from DeferredJobRegistry job2.delete() self.assertEqual(registry.get_job_ids(), []) + + +class TestFailedJobRegistry(RQTestCase): + + def test_default_failure_ttl(self): + """Job TTL defaults to DEFAULT_FAILURE_TTL""" + queue = Queue(connection=self.testconn) + job = queue.enqueue(say_hello) + + registry = FailedJobRegistry(connection=self.testconn) + key = registry.key + + timestamp = current_timestamp() + registry.add(job) + self.assertLess( + self.testconn.zscore(key, job.id), + timestamp + DEFAULT_FAILURE_TTL + 2 + ) + self.assertGreater( + self.testconn.zscore(key, job.id), + timestamp + DEFAULT_FAILURE_TTL - 2 + ) + + timestamp = current_timestamp() + ttl = 5 + registry.add(job, ttl=5) + self.assertLess( + self.testconn.zscore(key, job.id), + timestamp + ttl + 2 + ) + self.assertGreater( + self.testconn.zscore(key, job.id), + timestamp + ttl - 2 + ) + + def test_requeue(self): + """FailedJobRegistry.requeue works properly""" + queue = Queue(connection=self.testconn) + job = queue.enqueue(div_by_zero, failure_ttl=5) + + worker = Worker([queue]) + worker.work(burst=True) + + registry = FailedJobRegistry(connection=worker.connection) + self.assertTrue(job in registry) + + registry.requeue(job.id) + self.assertFalse(job in registry) + self.assertIn(job.id, queue.get_job_ids()) + + job.refresh() + self.assertEqual(job.get_status(), JobStatus.QUEUED) + + worker.work(burst=True) + self.assertTrue(job in registry) + + # Should also work with job instance + registry.requeue(job) + self.assertFalse(job in registry) + self.assertIn(job.id, queue.get_job_ids()) + + job.refresh() + self.assertEqual(job.get_status(), JobStatus.QUEUED) + + worker.work(burst=True) + self.assertTrue(job in registry) + + # requeue_job should work the same way + requeue_job(job.id, connection=self.testconn) + self.assertFalse(job in registry) + self.assertIn(job.id, queue.get_job_ids()) + + job.refresh() + self.assertEqual(job.get_status(), JobStatus.QUEUED) + + worker.work(burst=True) + self.assertTrue(job in registry) + + # And so does job.requeue() + job.requeue() + self.assertFalse(job in registry) + self.assertIn(job.id, queue.get_job_ids()) + + job.refresh() + self.assertEqual(job.get_status(), JobStatus.QUEUED) + + def test_invalid_job(self): + """Requeuing a job that's not in FailedJobRegistry raises an error.""" + queue = Queue(connection=self.testconn) + job = queue.enqueue(say_hello) + + registry = FailedJobRegistry(connection=self.testconn) + with 
self.assertRaises(InvalidJobOperation): + registry.requeue(job) + + def test_worker_handle_job_failure(self): + """Failed jobs are added to FailedJobRegistry""" + q = Queue(connection=self.testconn) + + w = Worker([q]) + registry = FailedJobRegistry(connection=w.connection) + + timestamp = current_timestamp() + + job = q.enqueue(div_by_zero, failure_ttl=5) + w.handle_job_failure(job) + # job is added to FailedJobRegistry with default failure ttl + self.assertIn(job.id, registry.get_job_ids()) + self.assertLess(self.testconn.zscore(registry.key, job.id), + timestamp + DEFAULT_FAILURE_TTL + 5) + + # job is added to FailedJobRegistry with specified ttl + job = q.enqueue(div_by_zero, failure_ttl=5) + w.handle_job_failure(job) + self.assertLess(self.testconn.zscore(registry.key, job.id), + timestamp + 7) diff --git a/tests/test_sentry.py b/tests/test_sentry.py index 7fa7bcb..085b086 100644 --- a/tests/test_sentry.py +++ b/tests/test_sentry.py @@ -2,10 +2,17 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) -from rq import get_failed_queue, Queue, Worker +from rq import Queue +from rq.cli import main +from rq.cli.helpers import read_config_file from rq.contrib.sentry import register_sentry +from rq.worker import SimpleWorker from tests import RQTestCase +from tests.fixtures import div_by_zero + +import mock +from click.testing import CliRunner class FakeSentry(object): @@ -17,20 +24,33 @@ class FakeSentry(object): class TestSentry(RQTestCase): - def test_work_fails(self): - """Non importable jobs should be put on the failed queue event with sentry""" - q = Queue() - failed_q = get_failed_queue() - - # Action - q.enqueue('_non.importable.job') - self.assertEqual(q.count, 1) - - w = Worker([q]) - register_sentry(FakeSentry(), w) - - w.work(burst=True) - - # Postconditions - self.assertEqual(failed_q.count, 1) - self.assertEqual(q.count, 0) + def setUp(self): + super(TestSentry, self).setUp() + db_num = self.testconn.connection_pool.connection_kwargs['db'] + self.redis_url = 'redis://127.0.0.1:6379/%d' % db_num + + def test_reading_dsn_from_file(self): + settings = read_config_file('tests.config_files.sentry') + self.assertIn('SENTRY_DSN', settings) + self.assertEqual(settings['SENTRY_DSN'], 'https://123@sentry.io/123') + + @mock.patch('rq.contrib.sentry.register_sentry') + def test_cli_flag(self, mocked): + """rq worker -u -b --exception-handler """ + # connection = Redis.from_url(self.redis_url) + runner = CliRunner() + runner.invoke(main, ['worker', '-u', self.redis_url, '-b', + '--sentry-dsn', 'https://1@sentry.io/1']) + self.assertEqual(mocked.call_count, 1) + + def test_failure_capture(self): + """Test failure is captured by Sentry SDK""" + from sentry_sdk import Hub + hub = Hub.current + self.assertIsNone(hub.last_event_id()) + queue = Queue(connection=self.testconn) + queue.enqueue(div_by_zero) + worker = SimpleWorker(queues=[queue], connection=self.testconn) + register_sentry('https://123@sentry.io/123') + worker.work(burst=True) + self.assertIsNotNone(hub.last_event_id()) diff --git a/tests/test_worker.py b/tests/test_worker.py index acd3ffa..007c653 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -27,11 +27,10 @@ from tests.fixtures import ( modify_self_and_error, long_running_job, save_key_ttl ) -from rq import (get_failed_queue, Queue, SimpleWorker, Worker, - get_current_connection) +from rq import Queue, SimpleWorker, Worker, get_current_connection from rq.compat import as_text, PY2 from rq.job import Job, JobStatus -from 
rq.registry import StartedJobRegistry +from rq.registry import StartedJobRegistry, FailedJobRegistry, FinishedJobRegistry from rq.suspension import resume, suspend from rq.utils import utcnow from rq.worker import HerokuWorker, WorkerStatus @@ -147,6 +146,8 @@ class TestWorker(RQTestCase): Worker.find_by_key(worker.key) self.assertFalse(worker.key in Worker.all_keys(worker.connection)) + self.assertRaises(ValueError, Worker.find_by_key, 'foo') + def test_worker_ttl(self): """Worker ttl.""" w = Worker([]) @@ -197,17 +198,14 @@ class TestWorker(RQTestCase): ) def test_work_is_unreadable(self): - """Unreadable jobs are put on the failed queue.""" + """Unreadable jobs are put on the failed job registry.""" q = Queue() - failed_q = get_failed_queue() - - self.assertEqual(failed_q.count, 0) self.assertEqual(q.count, 0) # NOTE: We have to fake this enqueueing for this test case. # What we're simulating here is a call to a function that is not # importable from the worker process. - job = Job.create(func=div_by_zero, args=(3,)) + job = Job.create(func=div_by_zero, args=(3,), origin=q.name) job.save() job_data = job.data @@ -225,16 +223,21 @@ class TestWorker(RQTestCase): w = Worker([q]) w.work(burst=True) # should silently pass self.assertEqual(q.count, 0) - self.assertEqual(failed_q.count, 1) + + failed_job_registry = FailedJobRegistry(queue=q) + self.assertTrue(job in failed_job_registry) def test_heartbeat(self): """Heartbeat saves last_heartbeat""" q = Queue() w = Worker([q]) w.register_birth() - birth = self.testconn.hget(w.key, 'birth') + + self.assertEqual(str(w.pid), as_text(self.testconn.hget(w.key, 'pid'))) + self.assertEqual(w.hostname, + as_text(self.testconn.hget(w.key, 'hostname'))) last_heartbeat = self.testconn.hget(w.key, 'last_heartbeat') - self.assertTrue(birth is not None) + self.assertIsNotNone(self.testconn.hget(w.key, 'birth')) self.assertTrue(last_heartbeat is not None) w = Worker.find_by_key(w.key) self.assertIsInstance(w.last_heartbeat, datetime) @@ -268,10 +271,6 @@ class TestWorker(RQTestCase): def test_work_fails(self): """Failing jobs are put on the failed queue.""" q = Queue() - failed_q = get_failed_queue() - - # Preconditions - self.assertEqual(failed_q.count, 0) self.assertEqual(q.count, 0) # Action @@ -286,7 +285,8 @@ class TestWorker(RQTestCase): # Postconditions self.assertEqual(q.count, 0) - self.assertEqual(failed_q.count, 1) + failed_job_registry = FailedJobRegistry(queue=q) + self.assertTrue(job in failed_job_registry) self.assertEqual(w.get_current_job_id(), None) # Check the job @@ -296,65 +296,119 @@ class TestWorker(RQTestCase): # Should be the original enqueued_at date, not the date of enqueueing # to the failed queue self.assertEqual(str(job.enqueued_at), enqueued_at_date) - self.assertIsNotNone(job.exc_info) # should contain exc_info + self.assertTrue(job.exc_info) # should contain exc_info def test_statistics(self): """Successful and failed job counts are saved properly""" - q = Queue() - job = q.enqueue(div_by_zero) - w = Worker([q]) - w.register_birth() + queue = Queue() + job = queue.enqueue(div_by_zero) + worker = Worker([queue]) + worker.register_birth() - self.assertEqual(w.failed_job_count, 0) - self.assertEqual(w.successful_job_count, 0) - self.assertEqual(w.total_working_time, 0) + self.assertEqual(worker.failed_job_count, 0) + self.assertEqual(worker.successful_job_count, 0) + self.assertEqual(worker.total_working_time, 0) - registry = StartedJobRegistry(connection=w.connection) + registry = StartedJobRegistry(connection=worker.connection) 
job.started_at = utcnow() job.ended_at = job.started_at + timedelta(seconds=0.75) - w.handle_job_failure(job) - w.handle_job_success(job, q, registry) + worker.handle_job_failure(job) + worker.handle_job_success(job, queue, registry) + + worker.refresh() + self.assertEqual(worker.failed_job_count, 1) + self.assertEqual(worker.successful_job_count, 1) + self.assertEqual(worker.total_working_time, 1.5) # 1.5 seconds + + worker.handle_job_failure(job) + worker.handle_job_success(job, queue, registry) + + worker.refresh() + self.assertEqual(worker.failed_job_count, 2) + self.assertEqual(worker.successful_job_count, 2) + self.assertEqual(worker.total_working_time, 3.0) + + def test_total_working_time(self): + """worker.total_working_time is stored properly""" + queue = Queue() + job = queue.enqueue(long_running_job, 0.05) + worker = Worker([queue]) + worker.register_birth() - w.refresh() - self.assertEqual(w.failed_job_count, 1) - self.assertEqual(w.successful_job_count, 1) - self.assertEqual(w.total_working_time, 1500000) # 1.5 seconds in microseconds + worker.perform_job(job, queue) + worker.refresh() + # total_working_time should be around 0.05 seconds + self.assertTrue(0.05 <= worker.total_working_time < 0.06) - w.handle_job_failure(job) - w.handle_job_success(job, q, registry) + def test_disable_default_exception_handler(self): + """ + Job is not moved to FailedJobRegistry when default custom exception + handler is disabled. + """ + queue = Queue(name='default', connection=self.testconn) - w.refresh() - self.assertEqual(w.failed_job_count, 2) - self.assertEqual(w.successful_job_count, 2) - self.assertEqual(w.total_working_time, 3000000) + job = queue.enqueue(div_by_zero) + worker = Worker([queue], disable_default_exception_handler=False) + worker.work(burst=True) + + registry = FailedJobRegistry(queue=queue) + self.assertTrue(job in registry) + + # Job is not added to FailedJobRegistry if + # disable_default_exception_handler is True + job = queue.enqueue(div_by_zero) + worker = Worker([queue], disable_default_exception_handler=True) + worker.work(burst=True) + self.assertFalse(job in registry) def test_custom_exc_handling(self): """Custom exception handling.""" + + def first_handler(job, *exc_info): + job.meta = {'first_handler': True} + job.save_meta() + return True + + def second_handler(job, *exc_info): + job.meta.update({'second_handler': True}) + job.save_meta() + def black_hole(job, *exc_info): # Don't fall through to default behaviour (moving to failed queue) return False q = Queue() - failed_q = get_failed_queue() - - # Preconditions - self.assertEqual(failed_q.count, 0) self.assertEqual(q.count, 0) + job = q.enqueue(div_by_zero) + + w = Worker([q], exception_handlers=first_handler) + w.work(burst=True) + + # Check the job + job.refresh() + self.assertEqual(job.is_failed, True) + self.assertTrue(job.meta['first_handler']) - # Action job = q.enqueue(div_by_zero) - self.assertEqual(q.count, 1) + w = Worker([q], exception_handlers=[first_handler, second_handler]) + w.work(burst=True) - w = Worker([q], exception_handlers=black_hole) - w.work(burst=True) # should silently pass + # Both custom exception handlers are run + job.refresh() + self.assertEqual(job.is_failed, True) + self.assertTrue(job.meta['first_handler']) + self.assertTrue(job.meta['second_handler']) - # Postconditions - self.assertEqual(q.count, 0) - self.assertEqual(failed_q.count, 0) + job = q.enqueue(div_by_zero) + w = Worker([q], exception_handlers=[first_handler, black_hole, + second_handler]) + w.work(burst=True) - 
# Check the job - job = Job.fetch(job.id) + # second_handler is not run since it's interrupted by black_hole + job.refresh() self.assertEqual(job.is_failed, True) + self.assertTrue(job.meta['first_handler']) + self.assertEqual(job.meta.get('second_handler'), None) def test_cancelled_jobs_arent_executed(self): """Cancelling jobs.""" @@ -691,6 +745,12 @@ class TestWorker(RQTestCase): self.assertEqual(self.testconn.zcard(foo_registry.key), 0) self.assertEqual(self.testconn.zcard(bar_registry.key), 0) + # worker.clean_registries() only runs once every 15 minutes + # If we add another key, calling clean_registries() should do nothing + self.testconn.zadd(bar_registry.key, {'bar': 1}) + worker.clean_registries() + self.assertEqual(self.testconn.zcard(bar_registry.key), 1) + def test_should_run_maintenance_tasks(self): """Workers should run maintenance tasks on startup and every hour.""" queue = Queue(connection=self.testconn) @@ -771,7 +831,6 @@ class TestWorker(RQTestCase): the job itself persists completely through the queue/worker/job stack -- even if the job errored""" q = Queue() - failed_q = get_failed_queue() # Also make sure that previously existing metadata # persists properly job = q.enqueue(modify_self_and_error, meta={'foo': 'bar', 'baz': 42}, @@ -782,7 +841,8 @@ class TestWorker(RQTestCase): # Postconditions self.assertEqual(q.count, 0) - self.assertEqual(failed_q.count, 1) + failed_job_registry = FailedJobRegistry(queue=q) + self.assertTrue(job in failed_job_registry) self.assertEqual(w.get_current_job_id(), None) job_check = Job.fetch(job.id) @@ -927,8 +987,6 @@ class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase): completing the job) should set the job's status to FAILED """ fooq = Queue('foo') - failed_q = get_failed_queue() - self.assertEqual(failed_q.count, 0) self.assertEqual(fooq.count, 0) w = Worker(fooq) sentinel_file = '/tmp/.rq_sentinel_work_horse_death' @@ -943,7 +1001,8 @@ class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase): job_status = job.get_status() p.join(1) self.assertEqual(job_status, JobStatus.FAILED) - self.assertEqual(failed_q.count, 1) + failed_job_registry = FailedJobRegistry(queue=fooq) + self.assertTrue(job in failed_job_registry) self.assertEqual(fooq.count, 0) @@ -966,18 +1025,20 @@ class TestWorkerSubprocess(RQTestCase): def test_run_access_self(self): """Schedule a job, then run the worker as subprocess""" q = Queue() - q.enqueue(access_self) + job = q.enqueue(access_self) subprocess.check_call(['rqworker', '-u', self.redis_url, '-b']) - assert get_failed_queue().count == 0 + registry = FinishedJobRegistry(queue=q) + self.assertTrue(job in registry) assert q.count == 0 @skipIf('pypy' in sys.version.lower(), 'often times out with pypy') def test_run_scheduled_access_self(self): """Schedule a job that schedules a job, then run the worker as subprocess""" q = Queue() - q.enqueue(schedule_access_self) + job = q.enqueue(schedule_access_self) subprocess.check_call(['rqworker', '-u', self.redis_url, '-b']) - assert get_failed_queue().count == 0 + registry = FinishedJobRegistry(queue=q) + self.assertTrue(job in registry) assert q.count == 0 @@ -1082,7 +1143,6 @@ class TestExceptionHandlerMessageEncoding(RQTestCase): super(TestExceptionHandlerMessageEncoding, self).setUp() self.worker = Worker("foo") self.worker._exc_handlers = [] - self.worker.failed_queue = Mock() # Mimic how exception info is actually passed forwards try: raise Exception(u"💪") @@ -1092,7 +1152,3 @@ class TestExceptionHandlerMessageEncoding(RQTestCase): def 
test_handle_exception_handles_non_ascii_in_exception_message(self): """worker.handle_exception doesn't crash on non-ascii in exception message.""" self.worker.handle_exception(Mock(), *self.exc_info) - - def test_move_to_failed_queue_handles_non_ascii_in_exception_message(self): - """Test that move_to_failed_queue doesn't crash on non-ascii in exception message.""" - self.worker.move_to_failed_queue(Mock(), *self.exc_info) diff --git a/tests/test_worker_registration.py b/tests/test_worker_registration.py index 1fe6612..177dc7b 100644 --- a/tests/test_worker_registration.py +++ b/tests/test_worker_registration.py @@ -1,7 +1,8 @@ from tests import RQTestCase from rq import Queue, Worker -from rq.worker_registration import (get_keys, register, unregister, +from rq.worker_registration import (clean_worker_registry, get_keys, register, + unregister, REDIS_WORKER_KEYS, WORKERS_BY_QUEUE_KEY) @@ -17,12 +18,15 @@ class TestWorkerRegistry(RQTestCase): redis = worker.connection self.assertTrue(redis.sismember(worker.redis_workers_keys, worker.key)) + self.assertEqual(Worker.count(connection=redis), 1) self.assertTrue( redis.sismember(WORKERS_BY_QUEUE_KEY % foo_queue.name, worker.key) ) + self.assertEqual(Worker.count(queue=foo_queue), 1) self.assertTrue( redis.sismember(WORKERS_BY_QUEUE_KEY % bar_queue.name, worker.key) ) + self.assertEqual(Worker.count(queue=bar_queue), 1) unregister(worker) self.assertFalse(redis.sismember(worker.redis_workers_keys, worker.key)) @@ -68,3 +72,18 @@ class TestWorkerRegistry(RQTestCase): unregister(worker1) unregister(worker2) unregister(worker3) + + def test_clean_registry(self): + """clean_registry removes worker keys that don't exist in Redis""" + queue = Queue(name='foo') + worker = Worker([queue]) + + register(worker) + redis = worker.connection + + self.assertTrue(redis.sismember(worker.redis_workers_keys, worker.key)) + self.assertTrue(redis.sismember(REDIS_WORKER_KEYS, worker.key)) + + clean_worker_registry(queue) + self.assertFalse(redis.sismember(worker.redis_workers_keys, worker.key)) + self.assertFalse(redis.sismember(REDIS_WORKER_KEYS, worker.key))
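
For context on the `clean_worker_registry` behaviour exercised in the test above: worker keys are tracked in plain Redis sets, so a worker that dies without deregistering (or, as in the test, is registered without ever writing its worker hash) leaves a dangling set member. The helper pipelines an `EXISTS` check per key and `SREM`s the ones with no backing hash. A rough usage sketch, assuming a local Redis on the default port and this branch of rq installed; the queue name `example` is only illustrative:

```python
from redis import Redis
from rq import Queue, Worker
from rq.worker_registration import clean_worker_registry, get_keys, register

connection = Redis()
queue = Queue('example', connection=connection)

worker = Worker([queue], connection=connection)
register(worker)              # set membership only; no worker hash has been written yet

print(get_keys(queue))        # the worker key is listed...
clean_worker_registry(queue)  # ...but EXISTS finds no hash behind it, so it is pruned
print(get_keys(queue))        # empty set
```

In normal operation workers call `register_birth()`, which writes the hash and registers the key, so only genuinely dead workers are pruned when `clean_registries()` runs its periodic maintenance.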