Job scheduling (#1163)

* First RQScheduler prototype

* WIP job scheduling

* Fixed Python 2.7 tests

* Added ScheduledJobRegistry.get_scheduled_time(job)

* WIP on scheduler's threading mechanism

* Fixed test errors

* Changed scheduler.acquire_locks() to instance method

* Added scheduler.prepare_registries()

* Somewhat working implementation of RQ scheduler

* Only call stop_scheduler if there's a scheduler present

* Use OSError rather than ProcessLookupError for PyPy compatibility

* Added `auto_start` argument to scheduler.acquire_locks()

* Make RQScheduler play better with timezone

* Fixed test error

* Added --with-scheduler flag to rq worker CLI

* Fix tests on Python 2.x

* More Python 2 fixes

* Only call `scheduler.start` if worker is run in non burst mode

* Fixed an issue where running worker with scheduler would fail sometimes

* Make `worker.stop_scheduler()` more resilient to errors

* worker.dequeue_job_and_maintain_ttl() should also periodically run maintenance tasks

* Scheduler can now work with worker in both burst and non burst mode

* Fixed scheduler logging message

* Always log scheduler errors when running

* Improve scheduler error logging message

* Removed testing code

* Scheduler should periodically try to acquire locks for other queues it doesn't have

* Added tests for scheduler.should_reacquire_locks

* Added queue.enqueue_in()

* Fixes queue.enqueue_in() in Python 2.7

* First stab at documenting job scheduling

* Remove unused methods

* Remove Python 2.6 logging compatibility code

* Remove more unused imports

* Added convenience methods to access job registries from queue

* Added test for worker.run_maintenance_tasks()

* Simplify worker.queue_names() and worker.queue_keys()

* Updated changelog to mention RQ's new job scheduling mechanism.

@ -1,4 +1,5 @@
### RQ 1.2.0 (Unreleased)
### RQ 1.2.0 (2020-01-04)
* This release also contains an alpha version of RQ's builtin job scheduling mechanism. Thanks @selwin!
* Various internal API changes in preparation to support multiple job dependencies. Thanks @thomasmatecki!
* `--verbose` or `--quiet` CLI arguments should override `--logging-level`. Thanks @zyt312074545!
* Fixes a bug in `rq info` where it doesn't show workers for empty queues. Thanks @zyt312074545!

@ -10,6 +10,7 @@ executed and removed right after completion (success or failure).
* `FailedJobRegistry` Holds jobs that have been executed, but didn't finish successfully.
* `DeferredJobRegistry` Holds deferred jobs (jobs that depend on another job and are waiting for that
job to finish).
* `ScheduledJobRegistry` Holds scheduled jobs.
You can get the number of jobs in a registry, the ids of the jobs in the registry, and more.
Below is an example using a `StartedJobRegistry`.
@ -44,6 +45,24 @@ print('Job in registry %s' % (job in registry))
print('Job in registry %s' % (job.id in registry))
```
_New in version 1.2.0_
You can quickly access job registries from `Queue` objects.
```python
from redis import Redis
from rq import Queue
redis = Redis()
queue = Queue(connection=redis)
queue.started_job_registry # Returns StartedJobRegistry
queue.deferred_job_registry # Returns DeferredJobRegistry
queue.finished_job_registry # Returns FinishedJobRegistry
queue.failed_job_registry # Returns FailedJobRegistry
queue.scheduled_job_registry # Returns ScheduledJobRegistry
```
## Removing Jobs
_New in version 1.2.0_
@ -69,5 +88,4 @@ for job_id in registry.get_job_ids():
# use `delete_job=True`
for job_id in registry.get_job_ids():
registry.remove(job_id, delete_job=True)
```

@ -0,0 +1,114 @@
---
title: "RQ: Scheduling Jobs"
layout: docs
---
_New in version 1.2.0._
This built-in version of `RQScheduler` is still in alpha; use at your own risk!
If you need a battle-tested version of RQ job scheduling, please take a look at
https://github.com/rq/rq-scheduler instead.
New in RQ 1.2.0 is `RQScheduler`, a built-in component that allows you to schedule jobs
for future execution.
This component was developed based on prior experience with the external
`rq-scheduler` library. The goal of bringing this component in-house is to give
RQ job scheduling capabilities without:
1. Running a separate `rqscheduler` CLI command.
2. Worrying about a separate `Scheduler` class.
# Scheduling Jobs for Execution
There are two main APIs to schedule jobs for execution, `enqueue_at()` and `enqueue_in()`.
`queue.enqueue_at()` works almost like `queue.enqueue()`, except that it expects a datetime
for its first argument.
```python
from datetime import datetime
from rq import Queue
from redis import Redis
from somewhere import say_hello
queue = Queue(name='default', connection=Redis())
# Schedules job to be run at 9:15, October 8th in the local timezone
job = queue.enqueue_at(datetime(2019, 10, 8, 9, 15), say_hello)
```
Note that if you pass in a naive datetime object, RQ will assume it is in the
local timezone.
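To avoid the local-timezone assumption, you can pass a timezone-aware datetime
instead. A minimal sketch, assuming Python 3 (where `datetime.timezone` is
available) and the `queue` and `say_hello` from the example above:

```python
from datetime import datetime, timezone

# An explicit tzinfo removes any dependence on the server's local timezone
job = queue.enqueue_at(datetime(2019, 10, 8, 9, 15, tzinfo=timezone.utc), say_hello)
```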
`queue.enqueue_in()` accepts a `timedelta` as its first argument.
```python
from datetime import timedelta
from rq import Queue
from redis import Redis
from somewhere import say_hello
queue = Queue(name='default', connection=Redis())
# Schedules job to be run in 10 seconds
job = queue.enqueue_in(timedelta(seconds=10), say_hello)
```
Jobs that are scheduled for execution are not placed in the queue; instead, they
are stored in `ScheduledJobRegistry`.
```python
from datetime import timedelta
from redis import Redis
from rq import Queue
from rq.registry import ScheduledJobRegistry
redis = Redis()
queue = Queue(name='default', connection=redis)
job = queue.enqueue_in(timedelta(seconds=10), say_nothing)
print(job in queue) # Outputs False as job is not enqueued
registry = ScheduledJobRegistry(queue=queue)
print(job in registry) # Outputs True as job is placed in ScheduledJobRegistry
```
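You can also ask the registry when a job is due to be enqueued, using the
`get_scheduled_time()` method added in this change. Continuing the example above:

```python
# Returns the datetime (in UTC) at which the job is scheduled to be enqueued;
# raises NoSuchJobError if the job is not in the registry
print(registry.get_scheduled_time(job))
```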
# Running the Scheduler
If you use RQ's scheduling features, you need to run RQ workers with the
scheduler component enabled.
```console
$ rq worker --with-scheduler
```
You can also start a worker with the scheduler enabled programmatically.
```python
from rq import Worker, Queue
from redis import Redis
redis = Redis()
queue = Queue(connection=redis)
worker = Worker(queues=[queue], connection=redis)
worker.work(with_scheduler=True)
```
Only a single scheduler can run for a specific queue at any one time. If you run multiple
workers with the scheduler enabled, only one scheduler will be actively working for a given
queue. Active schedulers are responsible for enqueueing scheduled jobs and check for
scheduled jobs once every second.
Idle schedulers will periodically (every 15 minutes) check whether the queues they're
responsible for have active schedulers. If they don't, one of the idle schedulers will start
working. This way, if a worker with an active scheduler dies, the scheduling work is picked
up by other workers that have the scheduling component enabled.
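Under the hood, each active scheduler holds a per-queue lock key in Redis (using the
`rq:scheduler-lock:<queue name>` template, as defined in `rq/scheduler.py` below) whose
value is the scheduler process's PID, and the TTL is refreshed on every heartbeat. A
minimal inspection sketch, assuming a queue named `default`:

```python
from redis import Redis

redis = Redis()
# Returns the active scheduler's PID (as bytes), or None if no scheduler holds the lock
print(redis.get('rq:scheduler-lock:default'))
```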

@ -200,12 +200,13 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues,
@click.option('--pid', help='Write the process ID number to a file at the specified path')
@click.option('--disable-default-exception-handler', '-d', is_flag=True, help='Disable RQ\'s default exception handler')
@click.option('--max-jobs', type=int, default=None, help='Maximum number of jobs to execute')
@click.option('--with-scheduler', '-s', is_flag=True, help='Run worker with scheduler')
@click.argument('queues', nargs=-1)
@pass_cli_config
def worker(cli_config, burst, logging_level, name, results_ttl,
worker_ttl, job_monitoring_interval, disable_job_desc_logging, verbose, quiet, sentry_dsn,
exception_handler, pid, disable_default_exception_handler, max_jobs, queues,
log_format, date_format, **options):
exception_handler, pid, disable_default_exception_handler, max_jobs, with_scheduler,
queues, log_format, date_format, **options):
"""Starts an RQ worker."""
settings = read_config_file(cli_config.config) if cli_config.config else {}
# Worker specific default arguments
@ -253,7 +254,9 @@ def worker(cli_config, burst, logging_level, name, results_ttl,
if verbose or quiet:
logging_level = None
worker.work(burst=burst, logging_level=logging_level, date_format=date_format, log_format=log_format, max_jobs=max_jobs)
worker.work(burst=burst, logging_level=logging_level,
date_format=date_format, log_format=log_format,
max_jobs=max_jobs, with_scheduler=with_scheduler)
except ConnectionError as e:
print(e)
sys.exit(1)

@ -84,3 +84,23 @@ else:
def decode_redis_hash(h):
return h
try:
from datetime import timezone
utc = timezone.utc
except ImportError:
# Python 2.x workaround
from datetime import timedelta, tzinfo
class UTC(tzinfo):
def utcoffset(self, dt):
return timedelta(0)
def tzname(self, dt):
return "UTC"
def dst(self, dt):
return timedelta(0)
utc = UTC()

@ -33,7 +33,8 @@ JobStatus = enum(
FINISHED='finished',
FAILED='failed',
STARTED='started',
DEFERRED='deferred'
DEFERRED='deferred',
SCHEDULED='scheduled',
)
# Sentinel value to mark that some of our lazily evaluated properties have not
@ -128,9 +129,9 @@ class Job(object):
# Extra meta data
job.description = description or job.get_call_string()
job.result_ttl = result_ttl
job.failure_ttl = failure_ttl
job.ttl = ttl
job.result_ttl = parse_timeout(result_ttl)
job.failure_ttl = parse_timeout(failure_ttl)
job.ttl = parse_timeout(ttl)
job.timeout = parse_timeout(timeout)
job._status = status
job.meta = meta or {}

@ -5,13 +5,14 @@ from __future__ import (absolute_import, division, print_function,
import uuid
import warnings
from datetime import datetime
from redis import WatchError
from .compat import as_text, string_types, total_ordering
from .compat import as_text, string_types, total_ordering, utc
from .connections import resolve_connection
from .defaults import DEFAULT_RESULT_TTL
from .exceptions import (DequeueTimeout, InvalidJobDependency, NoSuchJobError,
UnpickleError)
from .exceptions import DequeueTimeout, NoSuchJobError, UnpickleError
from .job import Job, JobStatus
from .utils import backend_class, import_attribute, parse_timeout, utcnow
@ -184,7 +185,31 @@ class Queue(object):
def failed_job_registry(self):
"""Returns this queue's FailedJobRegistry."""
from rq.registry import FailedJobRegistry
return FailedJobRegistry(queue=self)
return FailedJobRegistry(queue=self, job_class=self.job_class)
@property
def started_job_registry(self):
"""Returns this queue's FailedJobRegistry."""
from rq.registry import StartedJobRegistry
return StartedJobRegistry(queue=self, job_class=self.job_class)
@property
def finished_job_registry(self):
"""Returns this queue's FailedJobRegistry."""
from rq.registry import FinishedJobRegistry
return FinishedJobRegistry(queue=self, job_class=self.job_class)
@property
def deferred_job_registry(self):
"""Returns this queue's FailedJobRegistry."""
from rq.registry import DeferredJobRegistry
return DeferredJobRegistry(queue=self, job_class=self.job_class)
@property
def scheduled_job_registry(self):
"""Returns this queue's FailedJobRegistry."""
from rq.registry import ScheduledJobRegistry
return ScheduledJobRegistry(queue=self, job_class=self.job_class)
def remove(self, job_or_id, pipeline=None):
"""Removes Job from queue, accepts either a Job instance or ID."""
@ -220,6 +245,23 @@ class Queue(object):
else:
connection.rpush(self.key, job_id)
def create_job(self, func, args=None, kwargs=None, timeout=None,
result_ttl=None, ttl=None, failure_ttl=None,
description=None, depends_on=None, job_id=None,
meta=None):
"""Creates a job based on parameters given."""
timeout = parse_timeout(timeout) or self._default_timeout
job = self.job_class.create(
func, args=args, kwargs=kwargs, connection=self.connection,
result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl,
status=JobStatus.QUEUED, description=description,
depends_on=depends_on, timeout=timeout, id=job_id,
origin=self.name, meta=meta
)
return job
def enqueue_call(self, func, args=None, kwargs=None, timeout=None,
result_ttl=None, ttl=None, failure_ttl=None,
description=None, depends_on=None, job_id=None,
@ -248,12 +290,12 @@ class Queue(object):
job = self.job_class.create(
func, args=args, kwargs=kwargs, connection=self.connection,
result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl,
status=JobStatus.QUEUED, description=description,
depends_on=depends_on, timeout=timeout, id=job_id,
origin=self.name, meta=meta)
description=description, depends_on=depends_on, origin=self.name,
id=job_id, meta=meta, status=JobStatus.QUEUED, timeout=timeout,
)
# If a _dependent_ job depends on any unfinished job, register all the
#_dependent_ job's dependencies instead of enqueueing it.
# _dependent_ job's dependencies instead of enqueueing it.
#
# `Job#fetch_dependencies` sets WATCH on all dependencies. If
# WatchError is raised when the pipeline is executed, which means
@ -338,6 +380,24 @@ class Queue(object):
at_front=at_front, meta=meta
)
def enqueue_at(self, datetime, func, *args, **kwargs):
"""Schedules a job to be enqueued at specified time"""
from .registry import ScheduledJobRegistry
job = self.create_job(func, *args, **kwargs)
registry = ScheduledJobRegistry(queue=self)
with self.connection.pipeline() as pipeline:
job.save(pipeline=pipeline)
registry.schedule(job, datetime, pipeline=pipeline)
pipeline.execute()
return job
def enqueue_in(self, time_delta, func, *args, **kwargs):
"""Schedules a job to be executed in a given `timedelta` object"""
return self.enqueue_at(datetime.now(utc) + time_delta,
func, *args, **kwargs)
def enqueue_job(self, job, pipeline=None, at_front=False):
"""Enqueues a job for delayed execution.

@ -1,4 +1,8 @@
from .compat import as_text
import calendar
import time
from datetime import datetime, timedelta
from .compat import as_text, utc
from .connections import resolve_connection
from .defaults import DEFAULT_FAILURE_TTL
from .exceptions import InvalidJobOperation, NoSuchJobError
@ -32,6 +36,9 @@ class BaseRegistry(object):
"""Returns the number of jobs in this registry"""
return self.count
def __eq__(self, other):
return (self.name == other.name and self.connection == other.connection)
def __contains__(self, item):
"""
Returns a boolean indicating registry contains the given
@ -92,6 +99,11 @@ class BaseRegistry(object):
"""Returns Queue object associated with this registry."""
return Queue(self.name, connection=self.connection)
def get_expiration_time(self, job):
"""Returns job's expiration time."""
score = self.connection.zscore(self.key, job.id)
return datetime.utcfromtimestamp(score)
class StartedJobRegistry(BaseRegistry):
"""
@ -221,6 +233,70 @@ class DeferredJobRegistry(BaseRegistry):
pass
class ScheduledJobRegistry(BaseRegistry):
"""
Registry of scheduled jobs.
"""
key_template = 'rq:scheduled:{0}'
def __init__(self, *args, **kwargs):
super(ScheduledJobRegistry, self).__init__(*args, **kwargs)
# The underlying implementation of get_jobs_to_enqueue() is
# the same as get_expired_job_ids, but get_expired_job_ids() doesn't
# make sense in this context
self.get_jobs_to_enqueue = self.get_expired_job_ids
def schedule(self, job, scheduled_datetime, pipeline=None):
"""
Adds job to registry, scored by its execution time (in UTC).
If the datetime has no tzinfo, the local timezone is assumed.
"""
# If datetime has no timezone, assume server's local timezone
# if we're on Python 3. If we're on Python 2.7, raise an
# exception since Python < 3.2 has no builtin `timezone` class
if not scheduled_datetime.tzinfo:
try:
from datetime import timezone
except ImportError:
raise ValueError('datetime object with no timezone')
tz = timezone(timedelta(seconds=-time.timezone))
scheduled_datetime = scheduled_datetime.replace(tzinfo=tz)
timestamp = calendar.timegm(scheduled_datetime.utctimetuple())
return self.connection.zadd(self.key, {job.id: timestamp})
def cleanup(self):
"""This method is only here to prevent errors because this method is
automatically called by `count()` and `get_job_ids()` methods
implemented in BaseRegistry."""
pass
def remove_jobs(self, timestamp=None, pipeline=None):
"""Remove jobs whose timestamp is in the past from registry."""
connection = pipeline if pipeline is not None else self.connection
score = timestamp if timestamp is not None else current_timestamp()
return connection.zremrangebyscore(self.key, 0, score)
def get_jobs_to_schedule(self, timestamp=None):
"""Remove jobs whose timestamp is in the past from registry."""
score = timestamp if timestamp is not None else current_timestamp()
return [as_text(job_id) for job_id in
self.connection.zrangebyscore(self.key, 0, score)]
def get_scheduled_time(self, job_or_id):
"""Returns datetime (UTC) at which job is scheduled to be enqueued"""
if isinstance(job_or_id, self.job_class):
job_id = job_or_id.id
else:
job_id = job_or_id
score = self.connection.zscore(self.key, job_id)
if not score:
raise NoSuchJobError
return datetime.fromtimestamp(score, tz=utc)
def clean_registries(queue):
"""Cleans StartedJobRegistry and FinishedJobRegistry of a queue."""
registry = FinishedJobRegistry(name=queue.name,

@ -0,0 +1,197 @@
import logging
import os
import signal
import time
import traceback
from datetime import datetime
from multiprocessing import Process
from .job import Job
from .queue import Queue
from .registry import ScheduledJobRegistry
from .utils import current_timestamp, enum
SCHEDULER_KEY_TEMPLATE = 'rq:scheduler:%s'
SCHEDULER_LOCKING_KEY_TEMPLATE = 'rq:scheduler-lock:%s'
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")
class RQScheduler(object):
# STARTED: scheduler has been started but sleeping
# WORKING: scheduler is in the midst of scheduling jobs
# STOPPED: scheduler has been stopped
Status = enum(
'SchedulerStatus',
STARTED='started',
WORKING='working',
STOPPED='stopped'
)
def __init__(self, queues, connection, interval=1):
self._queue_names = set(parse_names(queues))
self._acquired_locks = set([])
self._scheduled_job_registries = []
self.lock_acquisition_time = None
self.connection = connection
self.interval = interval
self._stop_requested = False
self._status = self.Status.STOPPED
self._process = None
@property
def acquired_locks(self):
return self._acquired_locks
@property
def status(self):
return self._status
@property
def should_reacquire_locks(self):
"""Returns True if lock_acquisition_time is longer than 15 minutes ago"""
if self._queue_names == self.acquired_locks:
return False
if not self.lock_acquisition_time:
return True
return (datetime.now() - self.lock_acquisition_time).total_seconds() > 900
def acquire_locks(self, auto_start=False):
"""Returns names of queue it successfully acquires lock on"""
successful_locks = set([])
pid = os.getpid()
logging.info("Trying to acquire locks for %s", ", ".join(self._queue_names))
for name in self._queue_names:
if self.connection.set(self.get_locking_key(name), pid, nx=True, ex=5):
successful_locks.add(name)
self._acquired_locks = self._acquired_locks.union(successful_locks)
if self._acquired_locks:
self.prepare_registries(self._acquired_locks)
self.lock_acquisition_time = datetime.now()
# If auto_start is requested and scheduler is not started,
# run self.start()
if self._acquired_locks and auto_start:
if not self._process:
self.start()
return successful_locks
def prepare_registries(self, queue_names):
"""Prepare scheduled job registries for use"""
self._scheduled_job_registries = []
for name in queue_names:
self._scheduled_job_registries.append(
ScheduledJobRegistry(name, connection=self.connection)
)
@classmethod
def get_locking_key(cls, name):
"""Returns scheduler key for a given queue name"""
return SCHEDULER_LOCKING_KEY_TEMPLATE % name
def enqueue_scheduled_jobs(self):
"""Enqueue jobs whose timestamp is in the past"""
self._status = self.Status.WORKING
for registry in self._scheduled_job_registries:
timestamp = current_timestamp()
# TODO: try to use Lua script to make get_jobs_to_schedule()
# and remove_jobs() atomic
job_ids = registry.get_jobs_to_schedule(timestamp)
if not job_ids:
continue
queue = Queue(registry.name, connection=self.connection)
with self.connection.pipeline() as pipeline:
# This should be done in bulk
for job_id in job_ids:
job = Job.fetch(job_id, connection=self.connection)
queue.enqueue_job(job, pipeline=pipeline)
registry.remove_jobs(timestamp)
pipeline.execute()
self._status = self.Status.STARTED
def _install_signal_handlers(self):
"""Installs signal handlers for handling SIGINT and SIGTERM
gracefully.
"""
signal.signal(signal.SIGINT, self.request_stop)
signal.signal(signal.SIGTERM, self.request_stop)
def request_stop(self, signum=None, frame=None):
"""Toggle self._stop_requested that's checked on every loop"""
self._stop_requested = True
def heartbeat(self):
"""Updates the TTL on scheduler keys and the locks"""
logging.info("Scheduler sending heartbeat to %s", ", ".join(self.acquired_locks))
if len(self._queue_names) > 1:
with self.connection.pipeline() as pipeline:
for name in self._queue_names:
key = self.get_locking_key(name)
pipeline.expire(key, self.interval + 5)
pipeline.execute()
else:
key = self.get_locking_key(next(iter(self._queue_names)))
self.connection.expire(key, self.interval + 5)
def stop(self):
logging.info("Scheduler stopping, releasing locks for %s...",
','.join(self._queue_names))
keys = [self.get_locking_key(name) for name in self._queue_names]
self.connection.delete(*keys)
self._status = self.Status.STOPPED
def start(self):
self._status = self.Status.STARTED
self._process = Process(target=run, args=(self,), name='Scheduler')
self._process.start()
return self._process
def work(self):
self._install_signal_handlers()
while True:
if self._stop_requested:
self.stop()
break
if self.should_reacquire_locks:
self.acquire_locks()
self.enqueue_scheduled_jobs()
self.heartbeat()
time.sleep(self.interval)
def run(scheduler):
logging.info("Scheduler for %s started with PID %s",
','.join(scheduler._queue_names), os.getpid())
try:
scheduler.work()
except: # noqa
logging.error(
'Scheduler [PID %s] raised an exception.\n%s',
os.getpid(), traceback.format_exc()
)
raise
logging.info("Scheduler with PID %s has stopped", os.getpid())
def parse_names(queues_or_names):
"""Given a list of strings or queues, returns queue names"""
names = []
for queue_or_name in queues_or_names:
if isinstance(queue_or_name, Queue):
names.append(queue_or_name.name)
else:
names.append(str(queue_or_name))
return names

@ -125,10 +125,7 @@ class ColorizingStreamHandler(logging.StreamHandler):
def __init__(self, exclude=None, *args, **kwargs):
self.exclude = exclude
if is_python_version((2, 6)):
logging.StreamHandler.__init__(self, *args, **kwargs)
else:
super(ColorizingStreamHandler, self).__init__(*args, **kwargs)
super(ColorizingStreamHandler, self).__init__(*args, **kwargs)
@property
def is_tty(self):

@ -33,8 +33,8 @@ from .exceptions import DequeueTimeout, ShutDownImminentException
from .job import Job, JobStatus
from .logutils import setup_loghandlers
from .queue import Queue
from .registry import (FailedJobRegistry, FinishedJobRegistry,
StartedJobRegistry, clean_registries)
from .registry import FailedJobRegistry, StartedJobRegistry, clean_registries
from .scheduler import RQScheduler
from .suspension import is_suspended
from .timeouts import JobTimeoutException, HorseMonitorTimeoutException, UnixSignalDeathPenalty
from .utils import (backend_class, ensure_list, enum,
@ -204,6 +204,7 @@ class Worker(object):
self.failed_job_count = 0
self.total_working_time = 0
self.birth_date = None
self.scheduler = None
self.disable_default_exception_handler = disable_default_exception_handler
@ -221,11 +222,11 @@ class Worker(object):
def queue_names(self):
"""Returns the queue names of this worker's queues."""
return list(map(lambda q: q.name, self.queues))
return [queue.name for queue in self.queues]
def queue_keys(self):
"""Returns the Redis keys representing this worker's queues."""
return list(map(lambda q: q.key, self.queues))
return [queue.key for queue in self.queues]
@property
def key(self):
@ -411,7 +412,11 @@ class Worker(object):
self.set_shutdown_requested_date()
self.log.debug('Stopping after current horse is finished. '
'Press Ctrl+C again for a cold shutdown.')
if self.scheduler:
self.stop_scheduler()
else:
if self.scheduler:
self.stop_scheduler()
raise StopRequested()
def handle_warm_shutdown_request(self):
@ -440,8 +445,22 @@ class Worker(object):
if before_state:
self.set_state(before_state)
def run_maintenance_tasks(self):
"""
Runs periodic maintenance tasks. These include:
1. Check if scheduler should be started. This check should not be run
on first run since worker.work() already calls
`scheduler.enqueue_scheduled_jobs()` on startup.
2. Cleaning registries
"""
# No need to try to start scheduler on first run
if self.last_cleaned_at:
if self.scheduler and not self.scheduler._process:
self.scheduler.acquire_locks(auto_start=True)
self.clean_registries()
def work(self, burst=False, logging_level="INFO", date_format=DEFAULT_LOGGING_DATE_FORMAT,
log_format=DEFAULT_LOGGING_FORMAT, max_jobs=None):
log_format=DEFAULT_LOGGING_FORMAT, max_jobs=None, with_scheduler=False):
"""Starts the work loop.
Pops and performs all jobs on the current list of queues. When all
@ -451,7 +470,6 @@ class Worker(object):
The return value indicates whether any jobs were processed.
"""
setup_loghandlers(logging_level, date_format, log_format)
self._install_signal_handlers()
completed_jobs = 0
self.register_birth()
self.log.info("Worker %s: started, version %s", self.key, VERSION)
@ -459,13 +477,27 @@ class Worker(object):
qnames = self.queue_names()
self.log.info('*** Listening on %s...', green(', '.join(qnames)))
if with_scheduler:
self.scheduler = RQScheduler(self.queues, connection=self.connection)
self.scheduler.acquire_locks()
# If lock is acquired, start scheduler
if self.scheduler.acquired_locks:
# If worker is run in burst mode, call enqueue_scheduled_jobs()
# before working. Otherwise, start the scheduler in a separate process
if burst:
self.scheduler.enqueue_scheduled_jobs()
else:
self.scheduler.start()
self._install_signal_handlers()
try:
while True:
try:
self.check_for_suspension(burst)
if self.should_run_maintenance_tasks:
self.clean_registries()
self.run_maintenance_tasks()
if self._stop_requested:
self.log.info('Worker %s: stopping on request', self.key)
@ -507,9 +539,23 @@ class Worker(object):
break
finally:
if not self.is_horse:
if self.scheduler:
self.stop_scheduler()
self.register_death()
return bool(completed_jobs)
def stop_scheduler(self):
"""Ensure scheduler process is stopped"""
if self.scheduler._process and self.scheduler._process.pid:
# Send the kill signal to scheduler process
try:
os.kill(self.scheduler._process.pid, signal.SIGTERM)
except OSError:
pass
self.scheduler._process.join()
def dequeue_job_and_maintain_ttl(self, timeout):
result = None
qnames = ','.join(self.queue_names())
@ -521,6 +567,9 @@ class Worker(object):
while True:
self.heartbeat()
if self.should_run_maintenance_tasks:
self.run_maintenance_tasks()
try:
result = self.queue_class.dequeue_any(self.queues, timeout,
connection=self.connection,
@ -798,9 +847,7 @@ class Worker(object):
# Don't clobber the user's meta dictionary!
job.save(pipeline=pipeline, include_meta=False)
finished_job_registry = FinishedJobRegistry(job.origin,
self.connection,
job_class=self.job_class)
finished_job_registry = queue.finished_job_registry
finished_job_registry.add(job, result_ttl, pipeline)
job.cleanup(result_ttl, pipeline=pipeline,
@ -819,9 +866,7 @@ class Worker(object):
self.prepare_job_execution(job, heartbeat_ttl)
push_connection(self.connection)
started_job_registry = StartedJobRegistry(job.origin,
self.connection,
job_class=self.job_class)
started_job_registry = queue.started_job_registry
try:
job.started_at = utcnow()
@ -837,7 +882,7 @@ class Worker(object):
self.handle_job_success(job=job,
queue=queue,
started_job_registry=started_job_registry)
except:
except: # NOQA
job.ended_at = utcnow()
exc_info = sys.exc_info()
exc_string = self._get_safe_exception_string(

@ -8,6 +8,7 @@ from __future__ import (absolute_import, division, print_function,
import os
import time
import signal
import sys
from rq import Connection, get_current_job, get_current_connection, Queue
@ -164,3 +165,13 @@ def run_dummy_heroku_worker(sandbox, _imminent_shutdown_delay):
class DummyQueue(object):
pass
def kill_worker(pid, double_kill, interval=0.5):
# wait for the worker to be started over on the main process
time.sleep(interval)
os.kill(pid, signal.SIGTERM)
if double_kill:
# give the worker time to switch signal handler
time.sleep(interval)
os.kill(pid, signal.SIGTERM)

@ -2,14 +2,17 @@
from __future__ import (absolute_import, division, print_function,
unicode_literals)
from datetime import datetime
from click.testing import CliRunner
from redis import Redis
from rq import Queue
from rq.compat import utc
from rq.cli import main
from rq.cli.helpers import read_config_file, CliConfig
from rq.job import Job
from rq.registry import FailedJobRegistry
from rq.registry import FailedJobRegistry, ScheduledJobRegistry
from rq.worker import Worker, WorkerStatus
import pytest
@ -244,6 +247,21 @@ class TestRQCli(RQTestCase):
self.assertTrue(len(pid.read()) > 0)
self.assert_normal_execution(result)
def test_worker_with_scheduler(self):
"""rq worker -u <url> --with-scheduler"""
queue = Queue(connection=self.connection)
queue.enqueue_at(datetime(2019, 1, 1, tzinfo=utc), say_hello)
registry = ScheduledJobRegistry(queue=queue)
runner = CliRunner()
result = runner.invoke(main, ['worker', '-u', self.redis_url, '-b'])
self.assert_normal_execution(result)
self.assertEqual(len(registry), 1) # 1 job still scheduled
result = runner.invoke(main, ['worker', '-u', self.redis_url, '-b', '--with-scheduler'])
self.assert_normal_execution(result)
self.assertEqual(len(registry), 0) # Job has been enqueued
def test_worker_logging_options(self):
"""--quiet and --verbose logging options are supported"""
runner = CliRunner()

@ -2,11 +2,18 @@
from __future__ import (absolute_import, division, print_function,
unicode_literals)
from datetime import datetime, timedelta
from rq import Queue
from rq.exceptions import InvalidJobDependency, NoSuchJobError
from rq.compat import utc
from rq.exceptions import NoSuchJobError
from rq.job import Job, JobStatus
from rq.registry import DeferredJobRegistry
from rq.registry import (DeferredJobRegistry, FailedJobRegistry,
FinishedJobRegistry, ScheduledJobRegistry,
StartedJobRegistry)
from rq.worker import Worker
from tests import RQTestCase
from tests.fixtures import echo, say_hello
@ -520,3 +527,23 @@ class TestQueue(RQTestCase):
job_fetch = q1.fetch_job(job_orig.id)
self.assertIsNotNone(job_fetch)
def test_getting_registries(self):
"""Getting job registries from queue object"""
queue = Queue('example')
self.assertEqual(queue.scheduled_job_registry, ScheduledJobRegistry(queue=queue))
self.assertEqual(queue.started_job_registry, StartedJobRegistry(queue=queue))
self.assertEqual(queue.failed_job_registry, FailedJobRegistry(queue=queue))
self.assertEqual(queue.deferred_job_registry, DeferredJobRegistry(queue=queue))
self.assertEqual(queue.finished_job_registry, FinishedJobRegistry(queue=queue))
class TestJobScheduling(RQTestCase):
def test_enqueue_at(self):
"""enqueue_at() creates a job in ScheduledJobRegistry"""
queue = Queue(connection=self.testconn)
scheduled_time = datetime.now(utc) + timedelta(seconds=10)
job = queue.enqueue_at(scheduled_time, say_hello)
registry = ScheduledJobRegistry(queue=queue)
self.assertIn(job, registry)
self.assertEqual(registry.get_scheduled_time(job), scheduled_time.replace(microsecond=0))

@ -1,6 +1,8 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from datetime import datetime, timedelta
from rq.compat import as_text
from rq.defaults import DEFAULT_FAILURE_TTL
from rq.exceptions import InvalidJobOperation
@ -57,6 +59,18 @@ class TestRegistry(RQTestCase):
self.assertTrue(job in registry)
self.assertTrue(job.id in registry)
def test_get_expiration_time(self):
"""registry.get_expiration_time() returns correct datetime objects"""
registry = StartedJobRegistry(connection=self.testconn)
queue = Queue(connection=self.testconn)
job = queue.enqueue(say_hello)
registry.add(job, 5)
self.assertEqual(
registry.get_expiration_time(job),
(datetime.utcnow() + timedelta(seconds=5)).replace(microsecond=0)
)
def test_add_and_remove(self):
"""Adding and removing job to StartedJobRegistry."""
timestamp = current_timestamp()

@ -0,0 +1,291 @@
import os
import time
from datetime import datetime, timedelta
from multiprocessing import Process
from rq import Queue
from rq.compat import utc, PY2
from rq.exceptions import NoSuchJobError
from rq.job import Job
from rq.registry import FinishedJobRegistry, ScheduledJobRegistry
from rq.scheduler import RQScheduler
from rq.utils import current_timestamp
from rq.worker import Worker
from .fixtures import kill_worker, say_hello
from tests import RQTestCase
import mock
class TestScheduledJobRegistry(RQTestCase):
def test_get_jobs_to_enqueue(self):
"""Getting job ids to enqueue from ScheduledJobRegistry."""
queue = Queue(connection=self.testconn)
registry = ScheduledJobRegistry(queue=queue)
timestamp = current_timestamp()
self.testconn.zadd(registry.key, {'foo': 1})
self.testconn.zadd(registry.key, {'bar': timestamp + 10})
self.testconn.zadd(registry.key, {'baz': timestamp + 30})
self.assertEqual(registry.get_jobs_to_enqueue(), ['foo'])
self.assertEqual(registry.get_jobs_to_enqueue(timestamp + 20),
['foo', 'bar'])
def test_get_scheduled_time(self):
"""get_scheduled_time() returns job's scheduled datetime"""
queue = Queue(connection=self.testconn)
registry = ScheduledJobRegistry(queue=queue)
job = Job.create('myfunc', connection=self.testconn)
job.save()
dt = datetime(2019, 1, 1, tzinfo=utc)
registry.schedule(job, datetime(2019, 1, 1, tzinfo=utc))
self.assertEqual(registry.get_scheduled_time(job), dt)
# get_scheduled_time() should also work with job ID
self.assertEqual(registry.get_scheduled_time(job.id), dt)
# registry.get_scheduled_time() raises NoSuchJobError if
# job.id is not found
self.assertRaises(NoSuchJobError, registry.get_scheduled_time, '123')
def test_schedule(self):
"""Adding job with the correct score to ScheduledJobRegistry"""
queue = Queue(connection=self.testconn)
job = Job.create('myfunc', connection=self.testconn)
job.save()
registry = ScheduledJobRegistry(queue=queue)
if PY2:
# On Python 2, the datetime needs to have a timezone
self.assertRaises(ValueError, registry.schedule, job, datetime(2019, 1, 1))
registry.schedule(job, datetime(2019, 1, 1, tzinfo=utc))
self.assertEqual(self.testconn.zscore(registry.key, job.id),
1546300800) # 2019-01-01 UTC in Unix timestamp
else:
from datetime import timezone
# If we pass in a datetime with no timezone, `schedule()`
# assumes local timezone so depending on your local timezone,
# the timestamp may be different
registry.schedule(job, datetime(2019, 1, 1))
self.assertEqual(self.testconn.zscore(registry.key, job.id),
1546300800 + time.timezone)  # 2019-01-01 00:00 local time as a Unix timestamp
# Score is always stored in UTC even if datetime is in a different tz
tz = timezone(timedelta(hours=7))
job = Job.create('myfunc', connection=self.testconn)
job.save()
registry.schedule(job, datetime(2019, 1, 1, 7, tzinfo=tz))
self.assertEqual(self.testconn.zscore(registry.key, job.id),
1546300800) # 2019-01-01 UTC in Unix timestamp
class TestScheduler(RQTestCase):
def test_init(self):
"""Scheduler can be instantiated with queues or queue names"""
foo_queue = Queue('foo', connection=self.testconn)
scheduler = RQScheduler([foo_queue, 'bar'], connection=self.testconn)
self.assertEqual(scheduler._queue_names, {'foo', 'bar'})
self.assertEqual(scheduler.status, RQScheduler.Status.STOPPED)
def test_should_reacquire_locks(self):
"""scheduler.should_reacquire_locks works properly"""
queue = Queue(connection=self.testconn)
scheduler = RQScheduler([queue], connection=self.testconn)
self.assertTrue(scheduler.should_reacquire_locks)
scheduler.acquire_locks()
self.assertIsNotNone(scheduler.lock_acquisition_time)
# scheduler.should_reacquire_locks always returns False if
# scheduler.acquired_locks and scheduler._queue_names are the same
self.assertFalse(scheduler.should_reacquire_locks)
scheduler.lock_acquisition_time = datetime.now() - timedelta(minutes=16)
self.assertFalse(scheduler.should_reacquire_locks)
scheduler._queue_names = set(['default', 'foo'])
self.assertTrue(scheduler.should_reacquire_locks)
scheduler.acquire_locks()
self.assertFalse(scheduler.should_reacquire_locks)
def test_lock_acquisition(self):
"""Test lock acquisition"""
name_1 = 'lock-test-1'
name_2 = 'lock-test-2'
name_3 = 'lock-test-3'
scheduler = RQScheduler([name_1], self.testconn)
self.assertEqual(scheduler.acquire_locks(), {name_1})
self.assertEqual(scheduler._acquired_locks, {name_1})
self.assertEqual(scheduler.acquire_locks(), set([]))
# Only name_2 is returned since name_1 is already locked
scheduler = RQScheduler([name_1, name_2], self.testconn)
self.assertEqual(scheduler.acquire_locks(), {name_2})
self.assertEqual(scheduler._acquired_locks, {name_2})
# When a new lock is successfully acquired, it is added to _acquired_locks
scheduler._queue_names.add(name_3)
self.assertEqual(scheduler.acquire_locks(), {name_3})
self.assertEqual(scheduler._acquired_locks, {name_2, name_3})
def test_lock_acquisition_with_auto_start(self):
"""Test lock acquisition with auto_start=True"""
scheduler = RQScheduler(['auto-start'], self.testconn)
with mock.patch.object(scheduler, 'start') as mocked:
scheduler.acquire_locks(auto_start=True)
self.assertEqual(mocked.call_count, 1)
# If process has started, scheduler.start() won't be called
scheduler = RQScheduler(['auto-start2'], self.testconn)
scheduler._process = 1
with mock.patch.object(scheduler, 'start') as mocked:
scheduler.acquire_locks(auto_start=True)
self.assertEqual(mocked.call_count, 0)
def test_heartbeat(self):
"""Test that heartbeat updates locking keys TTL"""
name_1 = 'lock-test-1'
name_2 = 'lock-test-2'
scheduler = RQScheduler([name_1, name_2], self.testconn)
scheduler.acquire_locks()
locking_key_1 = RQScheduler.get_locking_key(name_1)
locking_key_2 = RQScheduler.get_locking_key(name_2)
with self.testconn.pipeline() as pipeline:
pipeline.expire(locking_key_1, 1000)
pipeline.expire(locking_key_2, 1000)
scheduler.heartbeat()
self.assertEqual(self.testconn.ttl(locking_key_1), 6)
self.assertEqual(self.testconn.ttl(locking_key_2), 6)
# scheduler.stop() releases locks and sets status to STOPPED
scheduler._status = scheduler.Status.WORKING
scheduler.stop()
self.assertFalse(self.testconn.exists(locking_key_1))
self.assertFalse(self.testconn.exists(locking_key_2))
self.assertEqual(scheduler.status, scheduler.Status.STOPPED)
# Heartbeat also works properly for schedulers with a single queue
scheduler = RQScheduler([name_1], self.testconn)
scheduler.acquire_locks()
self.testconn.expire(locking_key_1, 1000)
scheduler.heartbeat()
self.assertEqual(self.testconn.ttl(locking_key_1), 6)
def test_enqueue_scheduled_jobs(self):
"""Scheduler can enqueue scheduled jobs"""
queue = Queue(connection=self.testconn)
registry = ScheduledJobRegistry(queue=queue)
job = Job.create('myfunc', connection=self.testconn)
job.save()
registry.schedule(job, datetime(2019, 1, 1, tzinfo=utc))
scheduler = RQScheduler([queue], connection=self.testconn)
scheduler.acquire_locks()
scheduler.enqueue_scheduled_jobs()
self.assertEqual(len(queue), 1)
# After the scheduled job is enqueued, the registry should be empty
self.assertEqual(len(registry), 0)
# Jobs scheduled in the far future should not be affected
registry.schedule(job, datetime(2100, 1, 1, tzinfo=utc))
scheduler.enqueue_scheduled_jobs()
self.assertEqual(len(queue), 1)
def test_prepare_registries(self):
"""prepare_registries() creates self._scheduled_job_registries"""
foo_queue = Queue('foo', connection=self.testconn)
bar_queue = Queue('bar', connection=self.testconn)
scheduler = RQScheduler([foo_queue, bar_queue], connection=self.testconn)
self.assertEqual(scheduler._scheduled_job_registries, [])
scheduler.prepare_registries([foo_queue.name])
self.assertEqual(scheduler._scheduled_job_registries, [ScheduledJobRegistry(queue=foo_queue)])
scheduler.prepare_registries([foo_queue.name, bar_queue.name])
self.assertEqual(
scheduler._scheduled_job_registries,
[ScheduledJobRegistry(queue=foo_queue), ScheduledJobRegistry(queue=bar_queue)]
)
class TestWorker(RQTestCase):
def test_work_burst(self):
"""worker.work() with scheduler enabled works properly"""
queue = Queue(connection=self.testconn)
worker = Worker(queues=[queue], connection=self.testconn)
worker.work(burst=True, with_scheduler=False)
self.assertIsNone(worker.scheduler)
worker = Worker(queues=[queue], connection=self.testconn)
worker.work(burst=True, with_scheduler=True)
self.assertIsNotNone(worker.scheduler)
@mock.patch.object(RQScheduler, 'acquire_locks')
def test_run_maintenance_tasks(self, mocked):
"""scheduler.acquire_locks() is called only when scheduled is enabled"""
queue = Queue(connection=self.testconn)
worker = Worker(queues=[queue], connection=self.testconn)
worker.run_maintenance_tasks()
self.assertEqual(mocked.call_count, 0)
worker.last_cleaned_at = None
worker.scheduler = RQScheduler([queue], connection=self.testconn)
worker.run_maintenance_tasks()
self.assertEqual(mocked.call_count, 0)
worker.last_cleaned_at = datetime.now()
worker.run_maintenance_tasks()
self.assertEqual(mocked.call_count, 1)
def test_work(self):
queue = Queue(connection=self.testconn)
worker = Worker(queues=[queue], connection=self.testconn)
p = Process(target=kill_worker, args=(os.getpid(), False, 5))
p.start()
queue.enqueue_at(datetime(2019, 1, 1, tzinfo=utc), say_hello)
worker.work(burst=False, with_scheduler=True)
p.join(1)
self.assertIsNotNone(worker.scheduler)
registry = FinishedJobRegistry(queue=queue)
self.assertEqual(len(registry), 1)
class TestQueue(RQTestCase):
def test_enqueue_at(self):
"""queue.enqueue_at() puts job in the scheduled"""
queue = Queue(connection=self.testconn)
registry = ScheduledJobRegistry(queue=queue)
scheduler = RQScheduler([queue], connection=self.testconn)
scheduler.acquire_locks()
# Jobs created using enqueue_at are put in the ScheduledJobRegistry
queue.enqueue_at(datetime(2019, 1, 1, tzinfo=utc), say_hello)
self.assertEqual(len(queue), 0)
self.assertEqual(len(registry), 1)
# After enqueue_scheduled_jobs() is called, the registry is empty
# and job is enqueued
scheduler.enqueue_scheduled_jobs()
self.assertEqual(len(queue), 1)
self.assertEqual(len(registry), 0)
def test_enqueue_in(self):
"""queue.enqueue_in() schedules job correctly"""
queue = Queue(connection=self.testconn)
registry = ScheduledJobRegistry(queue=queue)
job = queue.enqueue_in(timedelta(seconds=30), say_hello)
now = datetime.now(utc)
scheduled_time = registry.get_scheduled_time(job)
# Ensure that job is scheduled roughly 30 seconds from now
self.assertTrue(
now + timedelta(seconds=28) < scheduled_time < now + timedelta(seconds=32)
)

@ -22,9 +22,9 @@ from mock import Mock
from tests import RQTestCase, slow
from tests.fixtures import (
create_file, create_file_after_timeout, div_by_zero, do_nothing, say_hello,
say_pid, run_dummy_heroku_worker, access_self, modify_self,
modify_self_and_error, long_running_job, save_key_ttl
access_self, create_file, create_file_after_timeout, div_by_zero, do_nothing,
kill_worker, long_running_job, modify_self, modify_self_and_error,
run_dummy_heroku_worker, save_key_ttl, say_hello, say_pid,
)
from rq import Queue, SimpleWorker, Worker, get_current_connection
@ -59,6 +59,9 @@ class TestWorker(RQTestCase):
self.assertEqual(w.queues[0].name, 'foo')
self.assertEqual(w.queues[1].name, 'bar')
self.assertEqual(w.queue_keys(), [w.queues[0].key, w.queues[1].key])
self.assertEqual(w.queue_names(), ['foo', 'bar'])
# With iterable of strings
w = Worker(iter(['foo', 'bar']))
self.assertEqual(w.queues[0].name, 'foo')
@ -952,16 +955,6 @@ class TestWorker(RQTestCase):
self.assertEqual(worker.python_version, python_version)
def kill_worker(pid, double_kill):
# wait for the worker to be started over on the main process
time.sleep(0.5)
os.kill(pid, signal.SIGTERM)
if double_kill:
# give the worker time to switch signal handler
time.sleep(0.5)
os.kill(pid, signal.SIGTERM)
def wait_and_kill_work_horse(pid, time_to_wait=0.0):
time.sleep(time_to_wait)
os.kill(pid, signal.SIGKILL)
@ -1203,7 +1196,7 @@ class TestExceptionHandlerMessageEncoding(RQTestCase):
# Mimic how exception info is actually passed forwards
try:
raise Exception(u"💪")
except:
except Exception:
self.exc_info = sys.exc_info()
def test_handle_exception_handles_non_ascii_in_exception_message(self):
