import json
import unittest
from datetime import datetime, timedelta, timezone
from unittest.mock import patch
from redis import Redis
from rq import Queue, Retry
from rq.job import Job, JobStatus
from rq.registry import (
    CanceledJobRegistry,
    DeferredJobRegistry,
    FailedJobRegistry,
    FinishedJobRegistry,
    ScheduledJobRegistry,
    StartedJobRegistry,
)
from rq.serializers import JSONSerializer
from rq.utils import get_version
from rq.worker import Worker
from tests import RQTestCase
from tests.fixtures import echo, say_hello


class MultipleDependencyJob(Job):
    """
    Allows for the patching of `_dependency_ids` to simulate multi-dependency
    support without modifying the public interface of `Job`
    """
    create_job = Job.create

    @classmethod
    def create(cls, *args, **kwargs):
        dependency_ids = kwargs.pop('kwargs').pop('_dependency_ids')
        _job = cls.create_job(*args, **kwargs)
        _job._dependency_ids = dependency_ids
        return _job


class TestQueue(RQTestCase):
    def test_create_queue(self):
        """Creating queues."""
        q = Queue('my-queue')
        self.assertEqual(q.name, 'my-queue')
        self.assertEqual(str(q), '<Queue my-queue>')

    def test_create_queue_with_serializer(self):
        """Creating queues with serializer."""
        # Test using json serializer
        q = Queue('queue-with-serializer', serializer=json)
        self.assertEqual(q.name, 'queue-with-serializer')
        self.assertEqual(str(q), '<Queue queue-with-serializer>')
        self.assertIsNotNone(q.serializer)

    def test_create_default_queue(self):
        """Instantiating the default queue."""
        q = Queue()
        self.assertEqual(q.name, 'default')

    def test_equality(self):
        """Mathematical equality of queues."""
        q1 = Queue('foo')
        q2 = Queue('foo')
        q3 = Queue('bar')
        self.assertEqual(q1, q2)
        self.assertEqual(q2, q1)
        self.assertNotEqual(q1, q3)
        self.assertNotEqual(q2, q3)
        self.assertGreater(q1, q3)
        self.assertRaises(TypeError, lambda: q1 == 'some string')
        self.assertRaises(TypeError, lambda: q1 < 'some string')

    def test_empty_queue(self):
        """Emptying queues."""
        q = Queue('example')
        self.testconn.rpush('rq:queue:example', 'foo')
        self.testconn.rpush('rq:queue:example', 'bar')
        self.assertEqual(q.is_empty(), False)
        q.empty()
        self.assertEqual(q.is_empty(), True)
        self.assertIsNone(self.testconn.lpop('rq:queue:example'))

    def test_empty_removes_jobs(self):
        """Emptying a queue deletes the associated job objects"""
        q = Queue('example')
        job = q.enqueue(say_hello)
        self.assertTrue(Job.exists(job.id))
        q.empty()
        self.assertFalse(Job.exists(job.id))

    def test_queue_is_empty(self):
        """Detecting empty queues."""
        q = Queue('example')
        self.assertEqual(q.is_empty(), True)
        self.testconn.rpush('rq:queue:example', 'sentinel message')
        self.assertEqual(q.is_empty(), False)

    def test_queue_delete(self):
        """Test queue.delete properly removes queue"""
        q = Queue('example')
        job = q.enqueue(say_hello)
        job2 = q.enqueue(say_hello)
        self.assertEqual(2, len(q.get_job_ids()))
        q.delete()
        self.assertEqual(0, len(q.get_job_ids()))
        self.assertEqual(False, self.testconn.exists(job.key))
        self.assertEqual(False, self.testconn.exists(job2.key))
        self.assertEqual(0, len(self.testconn.smembers(Queue.redis_queues_keys)))
        self.assertEqual(False, self.testconn.exists(q.key))

    def test_queue_delete_but_keep_jobs(self):
        """Test queue.delete properly removes queue but keeps the job keys in the redis store"""
        q = Queue('example')
        job = q.enqueue(say_hello)
        job2 = q.enqueue(say_hello)
        self.assertEqual(2, len(q.get_job_ids()))
        q.delete(delete_jobs=False)
        self.assertEqual(0, len(q.get_job_ids()))
        self.assertEqual(True, self.testconn.exists(job.key))
        self.assertEqual(True, self.testconn.exists(job2.key))
        self.assertEqual(0, len(self.testconn.smembers(Queue.redis_queues_keys)))
        self.assertEqual(False, self.testconn.exists(q.key))

    def test_position(self):
        """Test queue.get_job_position returns the position of each job in the queue"""
        q = Queue('example')
        job = q.enqueue(say_hello)
        job2 = q.enqueue(say_hello)
        job3 = q.enqueue(say_hello)
        self.assertEqual(0, q.get_job_position(job.id))
        self.assertEqual(1, q.get_job_position(job2.id))
        self.assertEqual(2, q.get_job_position(job3))
        self.assertEqual(None, q.get_job_position("no_real_job"))

    def test_remove(self):
        """Ensure queue.remove properly removes Job from queue."""
        q = Queue('example', serializer=JSONSerializer)
        job = q.enqueue(say_hello)
        self.assertIn(job.id, q.job_ids)
        q.remove(job)
        self.assertNotIn(job.id, q.job_ids)
        job = q.enqueue(say_hello)
        self.assertIn(job.id, q.job_ids)
        q.remove(job.id)
        self.assertNotIn(job.id, q.job_ids)

    def test_jobs(self):
        """Getting jobs out of a queue."""
        q = Queue('example')
        self.assertEqual(q.jobs, [])
        job = q.enqueue(say_hello)
        self.assertEqual(q.jobs, [job])
        # Deleting job removes it from queue
        job.delete()
        self.assertEqual(q.job_ids, [])

    def test_compact(self):
        """Queue.compact() removes non-existing jobs."""
        q = Queue()
        q.enqueue(say_hello, 'Alice')
        q.enqueue(say_hello, 'Charlie')
        self.testconn.lpush(q.key, '1', '2')
        self.assertEqual(q.count, 4)
        self.assertEqual(len(q), 4)
        q.compact()
        self.assertEqual(q.count, 2)
        self.assertEqual(len(q), 2)

    def test_enqueue(self):
        """Enqueueing job onto queues."""
        q = Queue()
        self.assertEqual(q.is_empty(), True)
        # say_hello spec holds which queue this is sent to
        job = q.enqueue(say_hello, 'Nick', foo='bar')
        job_id = job.id
        self.assertEqual(job.origin, q.name)
        # Inspect data inside Redis
        q_key = 'rq:queue:default'
        self.assertEqual(self.testconn.llen(q_key), 1)
        self.assertEqual(self.testconn.lrange(q_key, 0, -1)[0].decode('ascii'), job_id)

    def test_enqueue_sets_metadata(self):
        """Enqueueing job onto queues modifies metadata."""
        q = Queue()
        job = Job.create(func=say_hello, args=('Nick',), kwargs=dict(foo='bar'))
        # Preconditions
        self.assertIsNone(job.enqueued_at)
        # Action
        q.enqueue_job(job)
        # Postconditions
        self.assertIsNotNone(job.enqueued_at)

    def test_pop_job_id(self):
        """Popping job IDs from queues."""
        # Set up
        q = Queue()
        uuid = '112188ae-4e9d-4a5b-a5b3-f26f2cb054da'
        q.push_job_id(uuid)
        # Pop it off the queue...
        self.assertEqual(q.count, 1)
        self.assertEqual(q.pop_job_id(), uuid)
        # ...and assert the queue count went down
        self.assertEqual(q.count, 0)

    def test_dequeue_any(self):
        """Fetching work from any given queue."""
        fooq = Queue('foo', connection=self.testconn)
        barq = Queue('bar', connection=self.testconn)
        self.assertRaises(ValueError, Queue.dequeue_any, [fooq, barq], timeout=0, connection=self.testconn)
        self.assertEqual(Queue.dequeue_any([fooq, barq], None), None)
        # Enqueue a single item
        barq.enqueue(say_hello)
        job, queue = Queue.dequeue_any([fooq, barq], None)
        self.assertEqual(job.func, say_hello)
        self.assertEqual(queue, barq)
        # Enqueue items on both queues
        barq.enqueue(say_hello, 'for Bar')
        fooq.enqueue(say_hello, 'for Foo')
        job, queue = Queue.dequeue_any([fooq, barq], None)
        self.assertEqual(queue, fooq)
        self.assertEqual(job.func, say_hello)
        self.assertEqual(job.origin, fooq.name)
        self.assertEqual(job.args[0], 'for Foo', 'Foo should be dequeued first.')
        job, queue = Queue.dequeue_any([fooq, barq], None)
        self.assertEqual(queue, barq)
        self.assertEqual(job.func, say_hello)
        self.assertEqual(job.origin, barq.name)
        self.assertEqual(job.args[0], 'for Bar', 'Bar should be dequeued second.')

    @unittest.skipIf(get_version(Redis()) < (6, 2, 0), 'Skip if Redis server < 6.2.0')
    def test_dequeue_any_reliable(self):
        """Dequeueing job from a single queue moves job to intermediate queue."""
        foo_queue = Queue('foo', connection=self.testconn)
        job_1 = foo_queue.enqueue(say_hello)
        self.assertRaises(ValueError, Queue.dequeue_any, [foo_queue], timeout=0, connection=self.testconn)
        # Job ID is not in intermediate queue
        self.assertIsNone(self.testconn.lpos(foo_queue.intermediate_queue_key, job_1.id))
        job, queue = Queue.dequeue_any([foo_queue], timeout=None, connection=self.testconn)
        self.assertEqual(queue, foo_queue)
        self.assertEqual(job.func, say_hello)
        # After job is dequeued, the job ID is in the intermediate queue
        self.assertEqual(self.testconn.lpos(foo_queue.intermediate_queue_key, job.id), 0)
        # Test the blocking version
        foo_queue.enqueue(say_hello)
        job, queue = Queue.dequeue_any([foo_queue], timeout=1, connection=self.testconn)
        self.assertEqual(queue, foo_queue)
        self.assertEqual(job.func, say_hello)
        # After job is dequeued, the job ID is in the intermediate queue
        self.assertEqual(self.testconn.lpos(foo_queue.intermediate_queue_key, job.id), 1)

    @unittest.skipIf(get_version(Redis()) < (6, 2, 0), 'Skip if Redis server < 6.2.0')
    def test_intermediate_queue(self):
        """Job should be stuck in intermediate queue if execution fails after it is dequeued."""
        queue = Queue('foo', connection=self.testconn)
        job = queue.enqueue(say_hello)
        # If job execution fails after it's dequeued, job should be in the intermediate queue
        # and its status is still QUEUED
        with patch.object(Worker, 'execute_job'):
            # mocked.execute_job.side_effect = Exception()
            worker = Worker(queue, connection=self.testconn)
            worker.work(burst=True)
            # Job status is still QUEUED even though it's already dequeued
            self.assertEqual(job.get_status(refresh=True), JobStatus.QUEUED)
            self.assertFalse(job.id in queue.get_job_ids())
            self.assertIsNotNone(self.testconn.lpos(queue.intermediate_queue_key, job.id))

    def test_dequeue_any_ignores_nonexisting_jobs(self):
        """Dequeuing (from any queue) silently ignores non-existing jobs."""
        q = Queue('low')
        uuid = '49f205ab-8ea3-47dd-a1b5-bfa186870fc8'
        q.push_job_id(uuid)
        # Dequeue simply ignores the missing job and returns None
        self.assertEqual(q.count, 1)
        self.assertEqual(Queue.dequeue_any([Queue(), Queue('low')], None), None)  # noqa
        self.assertEqual(q.count, 0)

    def test_enqueue_with_ttl(self):
        """Zero or negative TTL values are not allowed"""
        queue = Queue()
        self.assertRaises(ValueError, queue.enqueue, echo, 1, ttl=0)
        self.assertRaises(ValueError, queue.enqueue, echo, 1, ttl=-1)

    def test_enqueue_sets_status(self):
        """Enqueueing a job sets its status to "queued"."""
        q = Queue()
        job = q.enqueue(say_hello)
        self.assertEqual(job.get_status(), JobStatus.QUEUED)

    def test_enqueue_meta_arg(self):
        """enqueue() can set the job.meta contents."""
        q = Queue()
        job = q.enqueue(say_hello, meta={'foo': 'bar', 'baz': 42})
        self.assertEqual(job.meta['foo'], 'bar')
        self.assertEqual(job.meta['baz'], 42)

    def test_enqueue_with_failure_ttl(self):
        """enqueue() properly sets job.failure_ttl"""
        q = Queue()
        job = q.enqueue(say_hello, failure_ttl=10)
        job.refresh()
        self.assertEqual(job.failure_ttl, 10)

    def test_job_timeout(self):
        """Timeout can be passed via job_timeout argument"""
        queue = Queue()
        job = queue.enqueue(echo, 1, job_timeout=15)
        self.assertEqual(job.timeout, 15)
        # Not passing job_timeout will use queue._default_timeout
        job = queue.enqueue(echo, 1)
        self.assertEqual(job.timeout, queue._default_timeout)
        # job_timeout = 0 is not allowed
        self.assertRaises(ValueError, queue.enqueue, echo, 1, job_timeout=0)

    def test_default_timeout(self):
        """The queue's default timeout is used when job_timeout is not passed"""
        queue = Queue()
        job = queue.enqueue(echo, 1)
        self.assertEqual(job.timeout, queue.DEFAULT_TIMEOUT)
        job = Job.create(func=echo)
        job = queue.enqueue_job(job)
        self.assertEqual(job.timeout, queue.DEFAULT_TIMEOUT)
        queue = Queue(default_timeout=15)
        job = queue.enqueue(echo, 1)
        self.assertEqual(job.timeout, 15)
        job = Job.create(func=echo)
        job = queue.enqueue_job(job)
        self.assertEqual(job.timeout, 15)

    def test_synchronous_timeout(self):
        queue = Queue(is_async=False)
        self.assertFalse(queue.is_async)
        no_expire_job = queue.enqueue(echo, result_ttl=-1)
        self.assertEqual(queue.connection.ttl(no_expire_job.key), -1)
        delete_job = queue.enqueue(echo, result_ttl=0)
        self.assertEqual(queue.connection.ttl(delete_job.key), -2)
        keep_job = queue.enqueue(echo, result_ttl=100)
        self.assertLessEqual(queue.connection.ttl(keep_job.key), 100)

    def test_enqueue_explicit_args(self):
        """enqueue() works for both implicit/explicit args."""
        q = Queue()
        # Implicit args/kwargs mode
        job = q.enqueue(echo, 1, job_timeout=1, result_ttl=1, bar='baz')
        self.assertEqual(job.timeout, 1)
        self.assertEqual(job.result_ttl, 1)
        self.assertEqual(job.perform(), ((1,), {'bar': 'baz'}))
        # Explicit kwargs mode
        kwargs = {
            'timeout': 1,
            'result_ttl': 1,
        }
        job = q.enqueue(echo, job_timeout=2, result_ttl=2, args=[1], kwargs=kwargs)
        self.assertEqual(job.timeout, 2)
        self.assertEqual(job.result_ttl, 2)
        self.assertEqual(job.perform(), ((1,), {'timeout': 1, 'result_ttl': 1}))
        # Explicit args and kwargs should also work with enqueue_at
        time = datetime.now(timezone.utc) + timedelta(seconds=10)
        job = q.enqueue_at(time, echo, job_timeout=2, result_ttl=2, args=[1], kwargs=kwargs)
        self.assertEqual(job.timeout, 2)
        self.assertEqual(job.result_ttl, 2)
        self.assertEqual(job.perform(), ((1,), {'timeout': 1, 'result_ttl': 1}))
        # Positional arguments are not allowed if explicit args and kwargs are used
        self.assertRaises(Exception, q.enqueue, echo, 1, kwargs=kwargs)

    def test_all_queues(self):
        """All queues"""
        q1 = Queue('first-queue')
        q2 = Queue('second-queue')
        q3 = Queue('third-queue')
        # Ensure a queue is added only once a job is enqueued
        self.assertEqual(len(Queue.all()), 0)
        q1.enqueue(say_hello)
        self.assertEqual(len(Queue.all()), 1)
        # Ensure this holds true for multiple queues
        q2.enqueue(say_hello)
        q3.enqueue(say_hello)
        names = [q.name for q in Queue.all()]
        self.assertEqual(len(Queue.all()), 3)
        # Verify names
        self.assertTrue('first-queue' in names)
        self.assertTrue('second-queue' in names)
        self.assertTrue('third-queue' in names)
        # Now empty two queues
        w = Worker([q2, q3])
        w.work(burst=True)
        # Queue.all() should still report the empty queues
        self.assertEqual(len(Queue.all()), 3)

    def test_all_custom_job(self):
        class CustomJob(Job):
            pass

        q = Queue('all-queue')
        q.enqueue(say_hello)
        queues = Queue.all(job_class=CustomJob)
        self.assertEqual(len(queues), 1)
        self.assertIs(queues[0].job_class, CustomJob)

    def test_from_queue_key(self):
        """Ensure being able to get a Queue instance manually from Redis"""
        q = Queue()
        key = Queue.redis_queue_namespace_prefix + 'default'
        reverse_q = Queue.from_queue_key(key)
        self.assertEqual(q, reverse_q)

    def test_from_queue_key_error(self):
        """Ensure that an exception is raised if the queue prefix is wrong"""
        key = 'some:weird:prefix:' + 'default'
        self.assertRaises(ValueError, Queue.from_queue_key, key)

    def test_enqueue_dependents(self):
        """Enqueueing dependent jobs pushes all jobs in the depends set to the queue
        and removes them from DeferredJobRegistry."""
        q = Queue()
        parent_job = Job.create(func=say_hello)
        parent_job.save()
        job_1 = q.enqueue(say_hello, depends_on=parent_job)
        job_2 = q.enqueue(say_hello, depends_on=parent_job)
        registry = DeferredJobRegistry(q.name, connection=self.testconn)
        parent_job.set_status(JobStatus.FINISHED)
        self.assertEqual(set(registry.get_job_ids()), set([job_1.id, job_2.id]))
        # After dependents are enqueued, job_1 and job_2 should be in the queue
        self.assertEqual(q.job_ids, [])
        q.enqueue_dependents(parent_job)
        self.assertEqual(set(q.job_ids), set([job_2.id, job_1.id]))
        self.assertFalse(self.testconn.exists(parent_job.dependents_key))
        # DeferredJobRegistry should also be empty
        self.assertEqual(registry.get_job_ids(), [])

    def test_enqueue_dependents_on_multiple_queues(self):
        """Enqueueing dependent jobs on multiple queues pushes jobs in the queues
        and removes them from DeferredJobRegistry for each different queue."""
        q_1 = Queue("queue_1")
        q_2 = Queue("queue_2")
        parent_job = Job.create(func=say_hello)
        parent_job.save()
        job_1 = q_1.enqueue(say_hello, depends_on=parent_job)
        job_2 = q_2.enqueue(say_hello, depends_on=parent_job)
        # Each queue has its own DeferredJobRegistry
        registry_1 = DeferredJobRegistry(q_1.name, connection=self.testconn)
        self.assertEqual(set(registry_1.get_job_ids()), set([job_1.id]))
        registry_2 = DeferredJobRegistry(q_2.name, connection=self.testconn)
        parent_job.set_status(JobStatus.FINISHED)
        self.assertEqual(set(registry_2.get_job_ids()), set([job_2.id]))
        # After dependents are enqueued, job_1 should be in queue_1 and
        # job_2 should be in queue_2
        self.assertEqual(q_1.job_ids, [])
        self.assertEqual(q_2.job_ids, [])
        q_1.enqueue_dependents(parent_job)
        q_2.enqueue_dependents(parent_job)
        self.assertEqual(set(q_1.job_ids), set([job_1.id]))
        self.assertEqual(set(q_2.job_ids), set([job_2.id]))
        self.assertFalse(self.testconn.exists(parent_job.dependents_key))
        # DeferredJobRegistry should also be empty
        self.assertEqual(registry_1.get_job_ids(), [])
        self.assertEqual(registry_2.get_job_ids(), [])

    def test_enqueue_job_with_dependency(self):
        """Jobs are enqueued only when their dependencies are finished."""
        # Job with unfinished dependency is not immediately enqueued
        parent_job = Job.create(func=say_hello)
        parent_job.save()
        q = Queue()
        job = q.enqueue_call(say_hello, depends_on=parent_job)
        self.assertEqual(q.job_ids, [])
        self.assertEqual(job.get_status(), JobStatus.DEFERRED)
        # Jobs dependent on finished jobs are immediately enqueued
        parent_job.set_status(JobStatus.FINISHED)
        parent_job.save()
        job = q.enqueue_call(say_hello, depends_on=parent_job)
        self.assertEqual(q.job_ids, [job.id])
        self.assertEqual(job.timeout, Queue.DEFAULT_TIMEOUT)
        self.assertEqual(job.get_status(), JobStatus.QUEUED)

    def test_enqueue_job_with_dependency_and_pipeline(self):
        """Jobs are enqueued only when their dependencies are finished, and by the caller when passing a pipeline."""
        # Job with unfinished dependency is not immediately enqueued
        parent_job = Job.create(func=say_hello)
        parent_job.save()
        q = Queue()
        with q.connection.pipeline() as pipe:
            job = q.enqueue_call(say_hello, depends_on=parent_job, pipeline=pipe)
            self.assertEqual(q.job_ids, [])
            self.assertEqual(job.get_status(refresh=False), JobStatus.DEFERRED)
            # Not in registry before execute, since passed in pipeline
            self.assertEqual(len(q.deferred_job_registry), 0)
            pipe.execute()
        # Only in registry after execute, since passed in pipeline
        self.assertEqual(len(q.deferred_job_registry), 1)
        # Jobs dependent on finished jobs are immediately enqueued
        parent_job.set_status(JobStatus.FINISHED)
        parent_job.save()
        with q.connection.pipeline() as pipe:
            job = q.enqueue_call(say_hello, depends_on=parent_job, pipeline=pipe)
            # Pre execute conditions
            self.assertEqual(q.job_ids, [])
            self.assertEqual(job.timeout, Queue.DEFAULT_TIMEOUT)
            self.assertEqual(job.get_status(refresh=False), JobStatus.QUEUED)
            pipe.execute()
        # Post execute conditions
        self.assertEqual(q.job_ids, [job.id])
        self.assertEqual(job.timeout, Queue.DEFAULT_TIMEOUT)
        self.assertEqual(job.get_status(refresh=False), JobStatus.QUEUED)

    def test_enqueue_job_with_no_dependency_prior_watch_and_pipeline(self):
        """Jobs without dependencies are enqueued by the caller when passing a pipeline, even after a prior watch()."""
        q = Queue()
        with q.connection.pipeline() as pipe:
            pipe.watch(b'fake_key')  # Test watch then enqueue
            job = q.enqueue_call(say_hello, pipeline=pipe)
            self.assertEqual(q.job_ids, [])
            self.assertEqual(job.get_status(refresh=False), JobStatus.QUEUED)
            # Not in queue before execute, since passed in pipeline
            self.assertEqual(len(q), 0)
            # Modifying the watched key should not cause issues; in multi mode this won't fail
            pipe.set(b'fake_key', b'fake_value')
            pipe.execute()
        # Only in queue after execute, since passed in pipeline
        self.assertEqual(len(q), 1)

    def test_enqueue_many_internal_pipeline(self):
        """Jobs should be enqueued in bulk with an internal pipeline, enqueued in order provided
        (but at_front still applies)"""
        # No pipeline is passed, so enqueue_many() uses its own internal pipeline
        q = Queue()
        job_1_data = Queue.prepare_data(say_hello, job_id='fake_job_id_1', at_front=False)
        job_2_data = Queue.prepare_data(say_hello, job_id='fake_job_id_2', at_front=False)
        job_3_data = Queue.prepare_data(say_hello, job_id='fake_job_id_3', at_front=True)
        jobs = q.enqueue_many(
            [job_1_data, job_2_data, job_3_data],
        )
        for job in jobs:
            self.assertEqual(job.get_status(refresh=False), JobStatus.QUEUED)
        # All jobs are in the queue once enqueue_many() returns
        self.assertEqual(len(q), 3)
        self.assertEqual(q.job_ids, ['fake_job_id_3', 'fake_job_id_1', 'fake_job_id_2'])

    def test_enqueue_many_with_passed_pipeline(self):
        """Jobs should be enqueued in bulk with a passed pipeline, enqueued in order provided
        (but at_front still applies)"""
        # The caller passes a pipeline, so nothing is enqueued until pipe.execute()
        q = Queue()
        with q.connection.pipeline() as pipe:
            job_1_data = Queue.prepare_data(say_hello, job_id='fake_job_id_1', at_front=False)
            job_2_data = Queue.prepare_data(say_hello, job_id='fake_job_id_2', at_front=False)
            job_3_data = Queue.prepare_data(say_hello, job_id='fake_job_id_3', at_front=True)
            jobs = q.enqueue_many([job_1_data, job_2_data, job_3_data], pipeline=pipe)
            self.assertEqual(q.job_ids, [])
            for job in jobs:
                self.assertEqual(job.get_status(refresh=False), JobStatus.QUEUED)
            pipe.execute()
        # Only in queue after execute, since passed in pipeline
        self.assertEqual(len(q), 3)
        self.assertEqual(q.job_ids, ['fake_job_id_3', 'fake_job_id_1', 'fake_job_id_2'])

    def test_enqueue_job_with_dependency_by_id(self):
        """Can specify job dependency with job object or job id."""
        parent_job = Job.create(func=say_hello)
        parent_job.save()
        q = Queue()
        q.enqueue_call(say_hello, depends_on=parent_job.id)
        self.assertEqual(q.job_ids, [])
        # Jobs dependent on finished jobs are immediately enqueued
        parent_job.set_status(JobStatus.FINISHED)
        parent_job.save()
        job = q.enqueue_call(say_hello, depends_on=parent_job.id)
        self.assertEqual(q.job_ids, [job.id])
        self.assertEqual(job.timeout, Queue.DEFAULT_TIMEOUT)

    def test_enqueue_job_with_dependency_and_timeout(self):
        """Jobs remember their timeout when enqueued as a dependency."""
        # Job with unfinished dependency is not immediately enqueued
        parent_job = Job.create(func=say_hello)
        parent_job.save()
        q = Queue()
        job = q.enqueue_call(say_hello, depends_on=parent_job, timeout=123)
        self.assertEqual(q.job_ids, [])
        self.assertEqual(job.timeout, 123)
        # Jobs dependent on finished jobs are immediately enqueued
        parent_job.set_status(JobStatus.FINISHED)
        parent_job.save()
        job = q.enqueue_call(say_hello, depends_on=parent_job, timeout=123)
        self.assertEqual(q.job_ids, [job.id])
        self.assertEqual(job.timeout, 123)

    def test_enqueue_job_with_multiple_queued_dependencies(self):
        parent_jobs = [Job.create(func=say_hello) for _ in range(2)]
        for job in parent_jobs:
            job._status = JobStatus.QUEUED
            job.save()
        q = Queue()
        with patch('rq.queue.Job.create', new=MultipleDependencyJob.create):
            job = q.enqueue(say_hello, depends_on=parent_jobs[0], _dependency_ids=[job.id for job in parent_jobs])
            self.assertEqual(job.get_status(), JobStatus.DEFERRED)
            self.assertEqual(q.job_ids, [])
            self.assertEqual(job.fetch_dependencies(), parent_jobs)

    def test_enqueue_job_with_multiple_finished_dependencies(self):
        parent_jobs = [Job.create(func=say_hello) for _ in range(2)]
        for job in parent_jobs:
            job._status = JobStatus.FINISHED
            job.save()
        q = Queue()
        with patch('rq.queue.Job.create', new=MultipleDependencyJob.create):
            job = q.enqueue(say_hello, depends_on=parent_jobs[0], _dependency_ids=[job.id for job in parent_jobs])
            self.assertEqual(job.get_status(), JobStatus.QUEUED)
            self.assertEqual(q.job_ids, [job.id])
            self.assertEqual(job.fetch_dependencies(), parent_jobs)

    def test_enqueues_dependent_if_other_dependencies_finished(self):
        parent_jobs = [Job.create(func=say_hello) for _ in range(3)]
        parent_jobs[0]._status = JobStatus.STARTED
        parent_jobs[0].save()
        parent_jobs[1]._status = JobStatus.FINISHED
        parent_jobs[1].save()
        parent_jobs[2]._status = JobStatus.FINISHED
        parent_jobs[2].save()
        q = Queue()
        with patch('rq.queue.Job.create', new=MultipleDependencyJob.create):
            # dependent job deferred, b/c parent_job 0 is still 'started'
            dependent_job = q.enqueue(
                say_hello, depends_on=parent_jobs[0], _dependency_ids=[job.id for job in parent_jobs]
            )
            self.assertEqual(dependent_job.get_status(), JobStatus.DEFERRED)

        # now set parent job 0 to 'finished'
        parent_jobs[0].set_status(JobStatus.FINISHED)
        q.enqueue_dependents(parent_jobs[0])
        self.assertEqual(dependent_job.get_status(), JobStatus.QUEUED)
        self.assertEqual(q.job_ids, [dependent_job.id])

    def test_does_not_enqueue_dependent_if_other_dependencies_not_finished(self):
        started_dependency = Job.create(func=say_hello, status=JobStatus.STARTED)
        started_dependency.save()
        queued_dependency = Job.create(func=say_hello, status=JobStatus.QUEUED)
        queued_dependency.save()
        q = Queue()
        with patch('rq.queue.Job.create', new=MultipleDependencyJob.create):
            dependent_job = q.enqueue(
                say_hello,
                depends_on=[started_dependency],
                _dependency_ids=[started_dependency.id, queued_dependency.id],
            )
            self.assertEqual(dependent_job.get_status(), JobStatus.DEFERRED)

        q.enqueue_dependents(started_dependency)
        self.assertEqual(dependent_job.get_status(), JobStatus.DEFERRED)
        self.assertEqual(q.job_ids, [])

    def test_fetch_job_successful(self):
        """Fetch a job from a queue."""
        q = Queue('example')
        job_orig = q.enqueue(say_hello)
        job_fetch: Job = q.fetch_job(job_orig.id)  # type: ignore
        self.assertIsNotNone(job_fetch)
        self.assertEqual(job_orig.id, job_fetch.id)
        self.assertEqual(job_orig.description, job_fetch.description)

    def test_fetch_job_missing(self):
        """Fetching a job that doesn't exist returns None."""
        q = Queue('example')
        job = q.fetch_job('123')
        self.assertIsNone(job)

    def test_fetch_job_different_queue(self):
        """Fetching a job through a queue it doesn't belong to returns None."""
        q1 = Queue('example1')
        q2 = Queue('example2')
        job_orig = q1.enqueue(say_hello)
        job_fetch = q2.fetch_job(job_orig.id)
        self.assertIsNone(job_fetch)
        job_fetch = q1.fetch_job(job_orig.id)
        self.assertIsNotNone(job_fetch)

    def test_getting_registries(self):
        """Getting job registries from queue object"""
        queue = Queue('example')
        self.assertEqual(queue.scheduled_job_registry, ScheduledJobRegistry(queue=queue))
        self.assertEqual(queue.started_job_registry, StartedJobRegistry(queue=queue))
        self.assertEqual(queue.failed_job_registry, FailedJobRegistry(queue=queue))
        self.assertEqual(queue.deferred_job_registry, DeferredJobRegistry(queue=queue))
        self.assertEqual(queue.finished_job_registry, FinishedJobRegistry(queue=queue))
        self.assertEqual(queue.canceled_job_registry, CanceledJobRegistry(queue=queue))

    def test_getting_registries_with_serializer(self):
        """Getting job registries from queue object (with custom serializer)"""
        queue = Queue('example', serializer=JSONSerializer)
        self.assertEqual(queue.scheduled_job_registry, ScheduledJobRegistry(queue=queue))
        self.assertEqual(queue.started_job_registry, StartedJobRegistry(queue=queue))
        self.assertEqual(queue.failed_job_registry, FailedJobRegistry(queue=queue))
        self.assertEqual(queue.deferred_job_registry, DeferredJobRegistry(queue=queue))
        self.assertEqual(queue.finished_job_registry, FinishedJobRegistry(queue=queue))
        self.assertEqual(queue.canceled_job_registry, CanceledJobRegistry(queue=queue))

        # Make sure we don't use default when queue has custom
        self.assertEqual(queue.scheduled_job_registry.serializer, JSONSerializer)
        self.assertEqual(queue.started_job_registry.serializer, JSONSerializer)
        self.assertEqual(queue.failed_job_registry.serializer, JSONSerializer)
        self.assertEqual(queue.deferred_job_registry.serializer, JSONSerializer)
        self.assertEqual(queue.finished_job_registry.serializer, JSONSerializer)
        self.assertEqual(queue.canceled_job_registry.serializer, JSONSerializer)

    def test_enqueue_with_retry(self):
        """Enqueueing with retry_strategy works"""
        queue = Queue('example', connection=self.testconn)
        job = queue.enqueue(say_hello, retry=Retry(max=3, interval=5))
        job = Job.fetch(job.id, connection=self.testconn)
        self.assertEqual(job.retries_left, 3)
        self.assertEqual(job.retry_intervals, [5])


class TestJobScheduling(RQTestCase):
    def test_enqueue_at(self):
        """enqueue_at() creates a job in ScheduledJobRegistry"""
        queue = Queue(connection=self.testconn)
        scheduled_time = datetime.now(timezone.utc) + timedelta(seconds=10)
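        job = queue.enqueue_at(scheduled_time, say_hello)
        registry = ScheduledJobRegistry(queue=queue)
        self.assertIn(job, registry)
        # NOTE: assertTrue() treats its second argument as the failure message,
        # so this only checks that an expiration time is set; it does not
        # compare that time against scheduled_time.
        self.assertTrue(registry.get_expiration_time(job), scheduled_time)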