200 Commits (2faba2cea9bb23e635bd067c85ecabd7fdd22cce)

Author SHA1 Message Date
Selwin Ong 37ddcb51cd
Reliable queue (#1911)
* Use lmove() when working on a single queue

* Skip reliable queue tests if Redis server doesn't support LMOVE

* Better test coverage

* job.origin should be string

* Added test for job that gets orphaned if worker.execute_job() fails

* Fix job tests

* worker.run_maintenance_tasks() now cleans intermediate queues

* Fixed import ordering

* No need to run slow tests and flake8 on SSL tests

* Minor typing fixes

* Fixed linting
2 years ago
Rob Hudson ea063edf0a
Update linting configuration (#1915)
* Update linting configuration

This removes flake8 in favor of ruff, which also provides isort support, and
updates all files to be black, isort, and ruff compliant. This also adds black
and ruff checks to the tox and Github linting workflow.

* Tweak the code coverage config and calls
2 years ago
gabriels1234 07fef85dd2
Catch serializer TypeError Exception (#1872)
* Catch serializer TypeError Exception

* Add test for unserializable job.meta
2 years ago
Selwin Ong 64cb1a27b9
Worker pool (#1874)
* First stab at implementating worker pool

* Use process.is_alive() to check whether a process is still live

* Handle shutdown signal

* Check worker loop done

* First working version of `WorkerPool`.

* Added test for check_workers()

* Added test for pool.start()

* Better shutdown process

* Comment out test_start() to see if it fixes CI

* Make tests pass

* Make CI pass

* Comment out some tests

* Comment out more tests

* Re-enable a test

* Re-enable another test

* Uncomment check_workers test

* Added run_worker test

* Minor modification to dead worker detection

* More test cases

* Better process name for workers

* Added back pool.stop_workers() when signal is received

* Cleaned up cli.py

* WIP on worker-pool command

* Fix test

* Test that worker pool ignores consecutive shutdown signals

* Added test for worker-pool CLI command.

* Added timeout to CI jobs

* Fix worker pool test

* Comment out test_scheduler.py

* Fixed worker-pool in burst mode

* Increase test coverage

* Exclude tests directory from coverage.py

* Improve test coverage

* Renamed `Pool(num_workers=2) to `Pool(size=2)`

* Revert "Renamed `Pool(num_workers=2) to `Pool(size=2)`"

This reverts commit a1306f89ad0d8686c6bde447bff75e2f71f0733b.

* Renamed Pool to WorkerPool

* Added a new TestCase that doesn't use LocalStack

* Added job_class, worker_class and serializer arguments to WorkerPool

* Use parse_connection() in WorkerPool.__init__

* Added CLI arguments for worker-pool

* Minor WorkerPool and test fixes

* Fixed failing CLI test

* Document WorkerPool
2 years ago
lowercase00 654649743c
New dequeue strategy (#1806)
* New dequeue strategy

This implements a new parameter `dequeue_strategy` that
should replace the `RoundRobinWorker` and `RandomWorker`.
Changes includes: feature, docs, tests, deprecation warning.

* Fix dequeue strategy name

* Black & Fix warning

* feat: tests, warnings, refactor naming

* feat: improve worker check

* fix: revert to str subclass

* fix: dequeue strategy into bootstrap

* org: move DequeueStrategy to worker

* refactor: round robin naming

* fix: naming

* fix: type annotation

* fix: typo

* refactor: remove kwarg from worker's init

* fix: typo

* move `dequeue_strategy` from `bootstrap()` into `work()`
2 years ago
Rony Lutsky aedc9b9e06
Worker - max_idle_time feature (#1795)
* fix accessing None when dequeued result is None (burst=True, or timeout=None)

* add a test

* implement + tests

* fix if

* adjust test

* merge

* test

* test

* merge master

* take max_idle_time into account for dequeue_timeout

* refactor a bit

* potential bug fix

* tests

* math.ceil

* buffer tests
2 years ago
Rony Lutsky 41406db3eb
Configurable maintenance task interval (#1823)
* Configurable maintenance task interval

* pass to worker

* rename parameter

* rename

* rename

* test
2 years ago
Rony Lutsky bba781d288
Enhance worker termination logic (#1729)
* enhance worker termination logic and allow passing custom exc_info in failure callback

* handle ret_val None

* fix unbound variable

* typing

* docs

* Update exceptions.md

* rename

* typing

* rename

* Update exceptions.md

* revert test change
2 years ago
Rony Lutsky b69ee10cbb
Fix - Use worker TTL for timeout (#1794)
* Use worker TTL for timeout

* add test

* renames

* test

* use dequeue_timeout
2 years ago
Rony Lutsky 54db2fa8d1
Fix - TypeError - accessing None when dequeued result is None (when timeout=None, e.g. in burst mode) (#1793)
* fix accessing None when dequeued result is None (burst=True, or timeout=None)

* add a test

* pr fix

* fix tests
2 years ago
lowercase00 c2e6d95338
Enhanced Redis Connection Reliability (#1753)
* Enhanced Redis Connection Reliability

The Redis connection may fail for several reasons. As the connection can be
(1) explicitly passed to the worker or (2) implicity set, this will improve the
Connection configuration by setting a timeout to the socket, and adding
an ExponentialBackoff Retry logic.

* Simpler Connection logic

* Add simple retry logic to Redis Connection Error

* Make retry exponential, add keepalive & socket_connect_timeout

* Handles configuration on Redis' connection pool

* Simplifies timeout exception logic

* Fix burst bug, add test

* Add docs related to `socket_timeout`, improve compatibility with older RedisPy versions

* Fixes

* New timeout private method

* Fix timeout
2 years ago
lowercase00 6813e5a2ba
Remove compatibility layer for < 3.5 (#1761)
* Remove unused code from compat module

* Remove unused dictconfig

* Remove total_ordering compat layer

* Remove compatibility layer

This completely removes the compat module. It moves utilities
functions (`as_text` and `decode_redis_hash`) to the `utils`
module, is eliminates the use of the proxies `text_type` and
`string_types`, using the `str` construct directly.

* Remove compat module

Finishes the cleaning of the compatibility module.
The last function being the `is_python_version` function
which was being used internally.

* Fix old import

* Fix Imports

* Remove Dummy (Force GH Actions)

* Fix Imports

* Organize Imports
2 years ago
eswolinsky3241 5119716911
Persist worker_name after job is finished (#1730)
* Persist worker_name after job is finished

Persisting the worker_name on the job object in Redis would allow for debugging and
analyzing logs from the worker

* Remove redundent job.save() method call

* Remove check for null worker

Now that worker name is persisted after job finishes or fails,
no need to assert that worker name is None

* Change github runner to Ubuntu 20.04

* Change github runner to Ubuntu 20.04
2 years ago
Selwin Ong 0691b4d46e
Multiple results using Redis Streams (#1725)
* WIP job results

* Result can now be saved

* Successfully saved and restored result

* result.save() should accept pipeline

* Successful results are saved

* Failures are now saved properly too.

* Added test for Result.get_latest()

* Checkpoint

* Got Result.all() to work

* Added Result.count(), Result.delete()

* Backward compatibility for job.result and job.exc_info

* Added some typing

* More typing stuff

* Fixed typing in job.py

* More typing updates

* Only keep the last 10 results

* Documented job.results()

* Got results test to pass

* Don't run test_results.py on Redis server < 5.0

* Fixed mock import on some Python versions

* Remove Redis 3 from test matrix

* Jobs should never use the new Result implementation if server is < 5.0

* Results should only be created is Redis stream is supported.

* Added back Redis 3 to test matrix

* Fixed job.supports_redis_streams

* Fixed worker test

* Updated docs.
2 years ago
lowercase00 375ace1747
Typing (#1698)
* Gitignore Venv + VScode

* Add Typings, Add Test to Makefile

* Fix, More typing, Redis Pipeline specific type

* More types

* Fix 3.7- Typing Compat, Add Tox Envs, Tests Dockerfile

* fix listindex error (#1700)

* More docstrings

* More Types

* Fix Typo on Dependency

* Last Types

Co-authored-by: Burak Yılmaz <46003469+yilmaz-burak@users.noreply.github.com>
2 years ago
Yang Yang 9db728921d
Improve the lint situation (#1688)
* Move common flake8 options into config file

Currently --max-line-length being specified in two places. Just use the
existing value in the config file as the source of truth.

Move --count and --statistics to config file as well.

* Fix some lints
2 years ago
Selwin Ong 5b95725dc4
Dependency with failures (#1681)
* added Dependency class with allow_failures

* Requested changes

* Check type before setting `job.dependency_allow_fail` within `Job.create`

* Set `job.dependency_allow_fail` within `Job.create`

* Added test to ensure persistence of `dependency_allow_fail`

* Removed typing and allow mixed list of ints and Job objects

* Convert dependency_allow_fail boolean to integer during serialization to avoid redis DataError

* Updated `test_multiple_dependencies_are_accepted_and_persisted` test to include `Dependency` cases

* Adding placeholder test to test actual behavior of new `Dependency` usage in `depends_on`

* Updated `test_job_dependency` to include cases using `Dependency`

* Added dependency_allow_fail logic to `Job.restore`

* Renamed `dependency_allow_fail` to a simpler `allow_failure`

* Update docs to add section about the new `Dependency` class and use-case

* Updated `Job.dependencies_are_met` logic to take `FAILED` and `STOPPED` jobs into account when `allow_failure=True`

* Updated `test_job_dependency` test. Still failing with `Dependency` case.

* Fix `allow_failure` type coercion in `Job.restore`

* Re-arrange tests, so default `Dependency.allow_failure` is before explicit `allow_failure=True`

* Fixed Dependency, so it works correctly when allow_failure=True

* Attempt to execute pipeline prior to queueing a failed job's dependents. test_create_and_cancel_job_enqueue_dependents_in_registry test now passes.

* Added `Depedency` test utilizing multiple dependencies

* Removed irrelevant on_success and on_failure keyword arguments in example

* Replaced use of long_running_job

* Add test to verify `Dependency.jobs` contraints

* Suppress connection error in handle_job_failure

* test_dependencies have passed

* All tests pass if enqueue_dependents called without pipeline.watch()

* All tests now pass

* Removed print statements

* Cleanup Dependency implementation

* Renamed job.allow_failure to job.allow_dependency_failures

Co-authored-by: mattchan <mattchan@tencent.com>
Co-authored-by: Mike Hill <mhilluniversal@gmail.com>
2 years ago
Hugo d5175c38da
Drop python2-specific syntax (#1674)
* Drop syntax required only for Python 2

* Drop python2-style super() calls

Co-authored-by: Selwin Ong <selwin.ong@gmail.com>
2 years ago
Hugo 61a4a1720b
Use unittest.mock instead of mock (#1673)
This module has been included in Python itself since 3.3.

Fixes: https://github.com/rq/rq/issues/1646
2 years ago
Xavier Fernandez cd17d17d71
rq.worker: remove useless set_state call in horse (#1618)
* rq.worker: remove useless set_state call in horse

The state should already have been set properly by the worker in
`execute_job`

`prepare_job_execution` is only called by `perform_job` which should only be
called by `main_work_horse`/`fork_work_horse` (themselves only called by `execute_job`).
Let `execute_job` do the bookkeeping.

* worker: update SimpleWorker's state in execute_job
3 years ago
Selwin Ong 0147b30f2b
Fixes a bug that causes leftover job keys when result_ttl=0 (#1591)
* Fixes a bug that causes leftover job keys when result_ttl=0

* Fixed a buggy worker.maintain_heartbeats() behavior

* Fixed a bug in worker.maintain_heartbeats().
3 years ago
Selwin Ong 246d52b977
job.cancel() puts job into CanceledJobRegistry. (#1546)
* job.cancel() puts job into CanceledJobRegistry.

* Improve test coverage
3 years ago
Selwin Ong e8ec07ed61
Minor changes (#1544)
* Added test for job timeout

* Added more debugging statements
3 years ago
Cyrille Lavigne 6fc9454675
Handle deserializing failures gracefully (#1428)
* adds unit test for a deserialization error

This tests that deserialization exceptions are properly logged, and fails in
the manner described in #1422 .

* Catch deserializing errors in Worker.handle_exception()

This fixes #1422 , and makes

tests/test_worker.py::TestWorker::test_deserializing_failure_is_handled

pass.

* made unit test less specific

This is required to get the test to pass under other serializers / other
python versions.

* Added generic DeserializationError

* switched ValueError to DeserializationError in a test

The changed test is creating an invalid job, which now raises
DeserializationError when data is accessed, as opposed to ValueError.
4 years ago
Selwin Ong 5b5cfdf9ab
Jobs that get cleaned up should also be retried (#1467) 4 years ago
Omer Lachish 76ac0afbcd
Cleanup zombie worker leftovers as part of StartedJobRegistry's cleanup() (#1372)
* cleanup jobs that are not really running due to zombie workers

* remove registry entries for zombie jobs

* return only the job ids on cleanup

* test zombie job cleanup

* format code

* rename variable to explain that second element in tuple is expiry, not score

* remove worker_key

* detect zombie jobs using old heartbeats

* reuse get_expired_job_ids

* set score using current_timestamp

* test idle jobs using stale heartbeats

* extract timeout into variable

* move heartbeats into StartedJobRegistry

* use registry.heartbeat in tests

* remove heartbeats when job removed from StartedJobRegistry

* remove idle and expired jobs from both wip and heartbeats set

* send heartbeat_ttl to registry.add

* typo

* revert everything 😶

* only keep job heartbeats as score (and get rid of job timeouts as scores

* calculate heartbeat_ttl in an overrideable function + override it in SimpleWorker + move storing StartedJobRegistry scores to job.heartbeat()

* set heartbeat to monitoring interval for infinite timeouts

* track elapsed_execution_time as part of worker

* reset current job working time when work on a job is done

* persisting the job working time as part of monitoring
4 years ago
Biel Cardona 08ef54dcf4
Workers dequeuing jobs from queues using both Round-Robin and Random strategies (#1425)
* implemented round-robin and random access to queues

* added tests for RoundRobinQueue

* reverted change in gitignore

* removed linebreak

* added tests for random queues

* added documentation for round robin and random queues

* moved round robin strategy to worker

* reverted changes to queue.py

* reverted changes to workers.md

* reverted changes to test_queue

* added tests for RoundRobinWorker and RandomWorker

* added doc for round robin and random workers

* removed f-strings for backward compatibility

* corrected a mistake

* minor changes (code style)

* now using _ordered_queues instead of queues for reordering queues
4 years ago
Adda Satya Ram 11c8631921
Add exception to catch redis connection failure to retry after wait time (#1387)
* add exception catch for redis connection failure

* Add test for connection recovery

* add exponential backoff

* limit worker max connection wait time to 60 seconds

* fix undefined class variable

* fix string formatting issue while printing error log

* cap max connection wait time:better code style

Co-authored-by: corynezin <cory.nezin@gmail.com>
4 years ago
JackBoreczky 016da14723
Fix custom serializer in job fetches (#1381)
* Ensure that the custom serializer defined is passed into the job fetch calls

* add serializer as argument to fetch_many and dequeue_any methods

* add worker test for custom serializer

* move json serializer to serializers.py
4 years ago
Selwin Ong f3e924cdd1
Added job.worker_name (#1375)
* Added job.worker_name

* Fix compatibility with Redis server 3.x

* Document job.worker_name

* Removed some Python 2 compatibility stuff.

* Remove unused codes
4 years ago
Ruslan Mullakhmetov 9adcd7e50c
feat: avoided "zombie" processes after killing work horse (#1348)
* feat: avoided "zombie" processes after killing work horse by setting work horse process group and killing this group

* fixed tests

* tests: added test to check that all workhorse subprocesses are killed

* tests: updated guthub run tests dependencies since they are not using (dev-)requirements.txt

Co-authored-by: Ruslan Mullakhmetov <ruslan@twentythree.net>
4 years ago
Selwin Ong 01d71c8984
Fixes an issue where retried jobs should not be put in FailedJobRegistry (#1336) 4 years ago
Ruslan Mullakhmetov c2931b45b6
handled unhandled exceptions in horse (#1303)
* handled unhandled exceptions in horse to prevent a job from being silently dropped without going into FailedRegistry

* changes after review

* made sure that work_horse always terminates in a proper way with tests

* minor refactoring

* fix for failing test

* fixes for the other tests

- removed exception handling (done in monitor_work_horse)
- adjusted some tests for the checks that are not relevant anymore

* review suggested changes

* cleanup

Co-authored-by: Ruslan Mullakhmetov <ruslan@twentythree.net>
4 years ago
Selwin Ong 49b156ecc7
Job retry feature. Docs WIP (#1299)
* Initial implementation of Retry class

* Fixes job.refresh() under Python 3.5

* Remove the use of text_type in job.py

* Retry can be scheduled

* monitor_work_horse() should call handle_job_failure() with queue argument.

* Flake8 fixes

* Added docs for job retries
4 years ago
wevsty 4e1eb97056
Split kill_house() fix issues #1234 (#1300)
* Split kill_house() fix issues #1234

Details View issues #1234

* Removing the catch finally

* rename wait_horse() to wait_for_horse()

* rename wait_horse() to wait_for_horse()

* update test_handle_shutdown_request()

Change test_handle_shutdown_request() exitcode assert

* Restore kill_horse() output

* optimization wait_for_horse()
5 years ago
Selwin Ong 1d8ea8e7a3
Worker key TTLs are set to be a bit longer to account for system hiccups (#1279)
* Worker key TTLs are set to be a bit longer to account for system hiccups

* Fix test_work_horse_force_death
5 years ago
Babatunde Olusola e1cbc3736c
Implement Customizable Serializer Support (#1219)
* Implement Customizable Serializer Support

* Refractor serializer instance methods

* Update tests with other serializers

* Edit function description

* Edit function description

* Raise appropriate exception

* Update tests for better code coverage

* Remove un-used imports and un-necessary code

* Refractor resolve_serializer

* Remove un-necessary alias from imports

* Add documentation

* Refractor tests, improve documentation
5 years ago
Samuel Colvin 4036471203
fixing HerokuWorkerShutdownTestCase after #1194 (#1213) 5 years ago
mr-trouble 5f949f4cef Add a hard kill from the parent process with a 10% increased timeout … (#1169)
* Add a hard kill from the parent process with a 10% increased timeout in case the forked process gets stuck and cannot stop itself.

* Added test for the force kill of the parent process.

* Changed 10% to +1 second, and other misc changes based on review comments.
5 years ago
Selwin Ong baa0cc268a
Job scheduling (#1163)
* First RQScheduler prototype

* WIP job scheduling

* Fixed Python 2.7 tests

* Added ScheduledJobRegistry.get_scheduled_time(job)

* WIP on scheduler's threading mechanism

* Fixed test errors

* Changed scheduler.acquire_locks() to instance method

* Added scheduler.prepare_registries()

* Somewhat working implementation of RQ scheduler

* Only call stop_scheduler if there's a scheduler present

* Use OSError rather than ProcessLookupError for PyPy compatibility

* Added `auto_start` argument to scheduler.acquire_locks()

* Make RQScheduler play better with timezone

* Fixed test error

* Added --with-scheduler flag to rq worker CLI

* Fix tests on Python 2.x

* More Python 2 fixes

* Only call `scheduler.start` if worker is run in non burst mode

* Fixed an issue where running worker with scheduler would fail sometimes

* Make `worker.stop_scheduler()` more resilient to errors

* worker.dequeue_job_and_maintain_ttl() should also periodically run maintenance tasks

* Scheduler can now work with worker in both burst and non burst mode

* Fixed scheduler logging message

* Always log scheduler errors when running

* Improve scheduler error logging message

* Removed testing code

* Scheduler should periodically try to acquire locks for other queues it doesn't have

* Added tests for scheduler.should_reacquire_locks

* Added queue.enqueue_in()

* Fixes queue.enqueue_in() in Python 2.7

* First stab at documenting job scheduling

* Remove unused methods

* Remove Python 2.6 logging compatibility code

* Remove more unused imports

* Added convenience methods to access job registries from queue

* Added test for worker.run_maintenance_tasks()

* Simplify worker.queue_names() and worker.queue_keys()

* Updated changelog to mention RQ's new job scheduling mechanism.
5 years ago
Vladimir Protasov 8c34e2b353 Store worker's RQ and Python versions (#1125)
* Store worker version to Redis

* Store worker's Python version to Redis

* Store worker version in __init__ body as suggested in review
5 years ago
Vladimir Protasov b62b9b0727 Fix unreliable test (#1126)
Also make error message more useful in case of future failures.
5 years ago
Selwin Ong d1813cdff9 Fixed test errors caused by _sentry_trace_headers 6 years ago
Selwin Ong f9d42e8a17
Added logging statements to handle_job_success and handle_job_failure (#1112) 6 years ago
Paul Robertson e1c135d4de add the ability to have the worker stop executing after a max amount of jobs (#1094)
* add the ability to have the worker stop executing after a max amount of jobs

* rename to max-jobs

* updated logging messages
6 years ago
Ted Summer 79a6fd7999 Fix timeout adding job to StartedJobRegistry (#1086)
* Fix timeout adding job to StartedJobRegistry

* Fix prepare_job_execution handling neg timeout

* Add test for inf job timeout in StartedJobRegistry

* refactor(worker): simplify checking neg timeout
6 years ago
Selwin Ong c4cbb3af2f
RQ v1.0! (#1059)
* Added FailedJobRegistry.

* Added job.failure_ttl.

* queue.enqueue() now supports failure_ttl

* Added registry.get_queue().

* FailedJobRegistry.add() now assigns DEFAULT_FAILURE_TTL.

* StartedJobRegistry.cleanup() now moves expired jobs to FailedJobRegistry.

* Failed jobs are now added to FailedJobRegistry.

* Added FailedJobRegistry.requeue()

* Document the new `FailedJobRegistry` and changes in custom exception handler behavior.

* Added worker.disable_default_exception_handler.

* Document --disable-default-exception-handler option.

* Deleted worker.failed_queue.

* Deleted "move_to_failed_queue" exception handler.

* StartedJobRegistry should no longer move jobs to FailedQueue.

* Deleted requeue_job

* Fixed test error.

* Make requeue cli command work with FailedJobRegistry

* Added .pytest_cache to gitignore.

* Custom exception handlers are no longer run in reverse

* Restored requeue_job function

* Removed get_failed_queue

* Deleted FailedQueue

* Updated changelog.

* Document `failure_ttl`

* Updated docs.

* Remove job.status

* Fixed typo in test_registry.py

* Replaced _pipeline() with pipeline()

* FailedJobRegistry no longer fails on redis-py>=3

* Fixes test_clean_registries

* Worker names are now randomized

* Added a note about random worker names in CHANGES.md

* Worker will now stop working when encountering an unhandled exception.

* Worker should reraise SystemExit on cold shutdowns

* Added anchor.js to docs

* Support for Sentry-SDK (#1045)

* Updated RQ to support sentry-sdk

* Document Sentry integration

* Install sentry-sdk before running tests

* Improved rq info CLI command to be more efficient when displaying lar… (#1046)

* Improved rq info CLI command to be more efficient when displaying large number of workers

* Fixed an rq info --by-queue bug

* Fixed worker.total_working_time bug (#1047)

* queue.enqueue() no longer accepts `timeout` argument (#1055)

* Clean worker registry (#1056)

* queue.enqueue() no longer accepts `timeout` argument

* Added clean_worker_registry()

* Show worker hostname and PID on cli (#1058)

* Show worker hostname and PID on cli

* Improve test coverage

* Remove Redis version check when SSL is used

* Bump version to 1.0

* Removed pytest_cache/README.md

* Changed worker logging to use exc_info=True

* Removed unused queue.dequeue()

* Fixed typo in CHANGES.md

* setup_loghandlers() should always call logger.setLevel() if specified
6 years ago
Wolfgang Langner 8fc987dc68 Make logging in worker consitent. (#1030)
Switch some messages from warn to info because it is normal requested bahavior.
6 years ago
Finnci 14db0ecd26 Update/add flag for description logging (#991)
* test workers

* indent

* add docs and add option to the cli

* rename flag for cli

* logging
6 years ago
Samuel Colvin 2f35222ddb skip test_1_sec_shutdown with pypy (#1020)
* skip test_1_sec_shutdown with pypy, fix #1019

* skip all HerokuWorkerShutdownTestCase with pypy
6 years ago