diff --git a/CHANGES.md b/CHANGES.md
index b0fe541..c103cbf 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,4 +1,5 @@
-### RQ 1.2.0 (Unreleased)
+### RQ 1.2.0 (2020-01-04)
+* This release also contains an alpha version of RQ's builtin job scheduling mechanism. Thanks @selwin!
 * Various internal API changes in preparation to support multiple job dependencies. Thanks @thomasmatecki!
 * `--verbose` or `--quiet` CLI arguments should override `--logging-level`. Thanks @zyt312074545!
 * Fixes a bug in `rq info` where it doesn't show workers for empty queues. Thanks @zyt312074545!
diff --git a/dev-requirements.txt b/dev-requirements.txt
index 93253de..615fac9 100644
--- a/dev-requirements.txt
+++ b/dev-requirements.txt
@@ -1,2 +1,2 @@
 mock
-pytest
+pytest
\ No newline at end of file
diff --git a/docs/docs/job_registries.md b/docs/docs/job_registries.md
index cbd739c..6e29c57 100644
--- a/docs/docs/job_registries.md
+++ b/docs/docs/job_registries.md
@@ -10,6 +10,7 @@ executed and removed right after completion (success or failure).
 * `FailedJobRegistry` Holds jobs that have been executed, but didn't finish successfully.
 * `DeferredJobRegistry` Holds deferred jobs (jobs that depend on another job and are waiting for that job
 to finish).
+* `ScheduledJobRegistry` Holds scheduled jobs.
 
 You can get the number of jobs in a registry, the ids of the jobs in the registry, and more.
 Below is an example using a `StartedJobRegistry`.
@@ -44,6 +45,24 @@ print('Job in registry %s' % (job in registry))
 print('Job in registry %s' % (job.id in registry))
 ```
 
+_New in version 1.2.0_
+
+You can quickly access job registries from `Queue` objects.
+
+```python
+from redis import Redis
+from rq import Queue
+
+redis = Redis()
+queue = Queue(connection=redis)
+
+queue.started_job_registry  # Returns StartedJobRegistry
+queue.deferred_job_registry  # Returns DeferredJobRegistry
+queue.finished_job_registry  # Returns FinishedJobRegistry
+queue.failed_job_registry  # Returns FailedJobRegistry
+queue.scheduled_job_registry  # Returns ScheduledJobRegistry
+```
+
 ## Removing Jobs
 
 _New in version 1.2.0_
@@ -69,5 +88,4 @@ for job_id in registry.get_job_ids():
 # use `delete_job=True`
 for job_id in registry.get_job_ids():
     registry.remove(job_id, delete_job=True)
-
 ```
\ No newline at end of file
diff --git a/docs/docs/scheduling.md b/docs/docs/scheduling.md
new file mode 100644
index 0000000..11fb7f1
--- /dev/null
+++ b/docs/docs/scheduling.md
@@ -0,0 +1,114 @@
+---
+title: "RQ: Scheduling Jobs"
+layout: docs
+---
+
+_New in version 1.2.0._
+
+This builtin version of `RQScheduler` is still in alpha; use it at your own risk!
+
+If you need a battle-tested version of RQ job scheduling, please take a look at
+https://github.com/rq/rq-scheduler instead.
+
+New in RQ 1.2.0 is `RQScheduler`, a built-in component that allows you to schedule jobs
+for future execution.
+
+This component builds on prior experience gained from developing the external
+`rq-scheduler` library. The goal of taking this component in-house is to allow
+RQ to have job scheduling capabilities without:
+1. Running a separate `rqscheduler` CLI command.
+2. Worrying about a separate `Scheduler` class.
+
+
+# Scheduling Jobs for Execution
+
+There are two main APIs to schedule jobs for execution, `enqueue_at()` and `enqueue_in()`.
+
+`queue.enqueue_at()` works almost like `queue.enqueue()`, except that it expects a `datetime`
+object as its first argument.
+
+```python
+from datetime import datetime
+from rq import Queue
+from redis import Redis
+from somewhere import say_hello
+
+queue = Queue(name='default', connection=Redis())
+
+# Schedules job to be run at 9:15, October 8th in the local timezone
+job = queue.enqueue_at(datetime(2019, 10, 8, 9, 15), say_hello)
+```
+
+Note that if you pass in a naive datetime object, RQ will assume it is in the
+local timezone.
+
+`queue.enqueue_in()` accepts a `timedelta` as its first argument.
+
+```python
+from datetime import timedelta
+from rq import Queue
+from redis import Redis
+from somewhere import say_hello
+
+queue = Queue(name='default', connection=Redis())
+
+# Schedules job to be run in 10 seconds
+job = queue.enqueue_in(timedelta(seconds=10), say_hello)
+```
+
+Jobs that are scheduled for execution are not placed in the queue, but are
+stored in `ScheduledJobRegistry`.
+
+```python
+from datetime import timedelta
+from redis import Redis
+
+from rq import Queue
+from rq.registry import ScheduledJobRegistry
+from somewhere import say_nothing
+
+redis = Redis()
+
+queue = Queue(name='default', connection=redis)
+job = queue.enqueue_in(timedelta(seconds=10), say_nothing)
+print(job in queue)  # Outputs False as job is not enqueued
+
+registry = ScheduledJobRegistry(queue=queue)
+print(job in registry)  # Outputs True as job is placed in ScheduledJobRegistry
+```
+
+# Running the Scheduler
+
+If you use RQ's scheduling features, you need to run RQ workers with the
+scheduler component enabled.
+
+```console
+$ rq worker --with-scheduler
+```
+
+You can also start a worker with the scheduler enabled programmatically.
+
+```python
+from rq import Worker, Queue
+from redis import Redis
+
+redis = Redis()
+
+queue = Queue(connection=redis)
+worker = Worker(queues=[queue], connection=redis)
+worker.work(with_scheduler=True)
+```
+
+Only a single scheduler can run for a specific queue at any one time. If you run multiple
+workers with the scheduler enabled, only one scheduler will be actively working for a given queue.
+
+Active schedulers are responsible for enqueueing scheduled jobs, and they check for
+scheduled jobs once every second.
+
+Idle schedulers will periodically (every 15 minutes) check whether the queues they're
+responsible for have active schedulers. If they don't, one of the idle schedulers will start
+working. This way, if a worker with an active scheduler dies, the scheduling work will be picked
+up by other workers with the scheduling component enabled.
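+
+If you want to check whether a queue currently has an active scheduler, you can inspect
+the scheduler lock directly in Redis. The snippet below is an illustrative sketch rather
+than a public API: it relies on the `rq:scheduler-lock:<queue name>` key used by
+`RQScheduler` in this release, and on the fact that the lock value is the PID of the
+process that acquired it, so these internals may change in a future version.
+
+```python
+from redis import Redis
+
+from rq.scheduler import RQScheduler
+
+redis = Redis()
+
+# The scheduler stores its lock under "rq:scheduler-lock:<queue name>" and
+# writes the PID of the acquiring process as the value.
+lock_key = RQScheduler.get_locking_key('default')
+holder = redis.get(lock_key)
+if holder:
+    print('Queue "default" has an active scheduler (PID %s)' % holder.decode())
+else:
+    print('Queue "default" has no active scheduler right now')
+```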
+ + + + diff --git a/rq/cli/cli.py b/rq/cli/cli.py index fb2ab0b..a5dad27 100755 --- a/rq/cli/cli.py +++ b/rq/cli/cli.py @@ -200,12 +200,13 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues, @click.option('--pid', help='Write the process ID number to a file at the specified path') @click.option('--disable-default-exception-handler', '-d', is_flag=True, help='Disable RQ\'s default exception handler') @click.option('--max-jobs', type=int, default=None, help='Maximum number of jobs to execute') +@click.option('--with-scheduler', '-s', is_flag=True, help='Run worker with scheduler') @click.argument('queues', nargs=-1) @pass_cli_config def worker(cli_config, burst, logging_level, name, results_ttl, worker_ttl, job_monitoring_interval, disable_job_desc_logging, verbose, quiet, sentry_dsn, - exception_handler, pid, disable_default_exception_handler, max_jobs, queues, - log_format, date_format, **options): + exception_handler, pid, disable_default_exception_handler, max_jobs, with_scheduler, + queues, log_format, date_format, **options): """Starts an RQ worker.""" settings = read_config_file(cli_config.config) if cli_config.config else {} # Worker specific default arguments @@ -253,7 +254,9 @@ def worker(cli_config, burst, logging_level, name, results_ttl, if verbose or quiet: logging_level = None - worker.work(burst=burst, logging_level=logging_level, date_format=date_format, log_format=log_format, max_jobs=max_jobs) + worker.work(burst=burst, logging_level=logging_level, + date_format=date_format, log_format=log_format, + max_jobs=max_jobs, with_scheduler=with_scheduler) except ConnectionError as e: print(e) sys.exit(1) diff --git a/rq/compat/__init__.py b/rq/compat/__init__.py index 81f4aff..3f8b3aa 100644 --- a/rq/compat/__init__.py +++ b/rq/compat/__init__.py @@ -84,3 +84,23 @@ else: def decode_redis_hash(h): return h + + +try: + from datetime import timezone + utc = timezone.utc +except ImportError: + # Python 2.x workaround + from datetime import timedelta, tzinfo + + class UTC(tzinfo): + def utcoffset(self, dt): + return timedelta(0) + + def tzname(self, dt): + return "UTC" + + def dst(self, dt): + return timedelta(0) + + utc = UTC() \ No newline at end of file diff --git a/rq/job.py b/rq/job.py index daf08a7..3a5c7c3 100644 --- a/rq/job.py +++ b/rq/job.py @@ -33,7 +33,8 @@ JobStatus = enum( FINISHED='finished', FAILED='failed', STARTED='started', - DEFERRED='deferred' + DEFERRED='deferred', + SCHEDULED='scheduled', ) # Sentinel value to mark that some of our lazily evaluated properties have not @@ -128,9 +129,9 @@ class Job(object): # Extra meta data job.description = description or job.get_call_string() - job.result_ttl = result_ttl - job.failure_ttl = failure_ttl - job.ttl = ttl + job.result_ttl = parse_timeout(result_ttl) + job.failure_ttl = parse_timeout(failure_ttl) + job.ttl = parse_timeout(ttl) job.timeout = parse_timeout(timeout) job._status = status job.meta = meta or {} diff --git a/rq/queue.py b/rq/queue.py index 135a99a..89bbea3 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -5,13 +5,14 @@ from __future__ import (absolute_import, division, print_function, import uuid import warnings +from datetime import datetime + from redis import WatchError -from .compat import as_text, string_types, total_ordering +from .compat import as_text, string_types, total_ordering, utc from .connections import resolve_connection from .defaults import DEFAULT_RESULT_TTL -from .exceptions import (DequeueTimeout, InvalidJobDependency, NoSuchJobError, - UnpickleError) 
+from .exceptions import DequeueTimeout, NoSuchJobError, UnpickleError
 from .job import Job, JobStatus
 from .utils import backend_class, import_attribute, parse_timeout, utcnow
 
@@ -184,7 +185,31 @@ class Queue(object):
     def failed_job_registry(self):
         """Returns this queue's FailedJobRegistry."""
         from rq.registry import FailedJobRegistry
-        return FailedJobRegistry(queue=self)
+        return FailedJobRegistry(queue=self, job_class=self.job_class)
+
+    @property
+    def started_job_registry(self):
+        """Returns this queue's StartedJobRegistry."""
+        from rq.registry import StartedJobRegistry
+        return StartedJobRegistry(queue=self, job_class=self.job_class)
+
+    @property
+    def finished_job_registry(self):
+        """Returns this queue's FinishedJobRegistry."""
+        from rq.registry import FinishedJobRegistry
+        return FinishedJobRegistry(queue=self, job_class=self.job_class)
+
+    @property
+    def deferred_job_registry(self):
+        """Returns this queue's DeferredJobRegistry."""
+        from rq.registry import DeferredJobRegistry
+        return DeferredJobRegistry(queue=self, job_class=self.job_class)
+
+    @property
+    def scheduled_job_registry(self):
+        """Returns this queue's ScheduledJobRegistry."""
+        from rq.registry import ScheduledJobRegistry
+        return ScheduledJobRegistry(queue=self, job_class=self.job_class)
 
     def remove(self, job_or_id, pipeline=None):
         """Removes Job from queue, accepts either a Job instance or ID."""
@@ -220,6 +245,23 @@ class Queue(object):
         else:
             connection.rpush(self.key, job_id)
 
+    def create_job(self, func, args=None, kwargs=None, timeout=None,
+                   result_ttl=None, ttl=None, failure_ttl=None,
+                   description=None, depends_on=None, job_id=None,
+                   meta=None):
+        """Creates a job based on the given parameters."""
+        timeout = parse_timeout(timeout) or self._default_timeout
+
+        job = self.job_class.create(
+            func, args=args, kwargs=kwargs, connection=self.connection,
+            result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl,
+            status=JobStatus.QUEUED, description=description,
+            depends_on=depends_on, timeout=timeout, id=job_id,
+            origin=self.name, meta=meta
+        )
+
+        return job
+
     def enqueue_call(self, func, args=None, kwargs=None, timeout=None,
                      result_ttl=None, ttl=None, failure_ttl=None,
                      description=None, depends_on=None, job_id=None,
@@ -248,12 +290,12 @@ class Queue(object):
         job = self.job_class.create(
             func, args=args, kwargs=kwargs, connection=self.connection,
             result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl,
-            status=JobStatus.QUEUED, description=description,
-            depends_on=depends_on, timeout=timeout, id=job_id,
-            origin=self.name, meta=meta)
+            description=description, depends_on=depends_on, origin=self.name,
+            id=job_id, meta=meta, status=JobStatus.QUEUED, timeout=timeout,
+        )
 
         # If a _dependent_ job depends on any unfinished job, register all the
-        #_dependent_ job's dependencies instead of enqueueing it.
+        # _dependent_ job's dependencies instead of enqueueing it.
         #
         # `Job#fetch_dependencies` sets WATCH on all dependencies. 
If # WatchError is raised in the when the pipeline is executed, that means @@ -338,6 +380,24 @@ class Queue(object): at_front=at_front, meta=meta ) + def enqueue_at(self, datetime, func, *args, **kwargs): + """Schedules a job to be enqueued at specified time""" + from .registry import ScheduledJobRegistry + + job = self.create_job(func, *args, **kwargs) + registry = ScheduledJobRegistry(queue=self) + with self.connection.pipeline() as pipeline: + job.save(pipeline=pipeline) + registry.schedule(job, datetime, pipeline=pipeline) + pipeline.execute() + + return job + + def enqueue_in(self, time_delta, func, *args, **kwargs): + """Schedules a job to be executed in a given `timedelta` object""" + return self.enqueue_at(datetime.now(utc) + time_delta, + func, *args, **kwargs) + def enqueue_job(self, job, pipeline=None, at_front=False): """Enqueues a job for delayed execution. diff --git a/rq/registry.py b/rq/registry.py index 104de06..9232ddf 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -1,4 +1,8 @@ -from .compat import as_text +import calendar +import time +from datetime import datetime, timedelta + +from .compat import as_text, utc from .connections import resolve_connection from .defaults import DEFAULT_FAILURE_TTL from .exceptions import InvalidJobOperation, NoSuchJobError @@ -32,6 +36,9 @@ class BaseRegistry(object): """Returns the number of jobs in this registry""" return self.count + def __eq__(self, other): + return (self.name == other.name and self.connection == other.connection) + def __contains__(self, item): """ Returns a boolean indicating registry contains the given @@ -92,6 +99,11 @@ class BaseRegistry(object): """Returns Queue object associated with this registry.""" return Queue(self.name, connection=self.connection) + def get_expiration_time(self, job): + """Returns job's expiration time.""" + score = self.connection.zscore(self.key, job.id) + return datetime.utcfromtimestamp(score) + class StartedJobRegistry(BaseRegistry): """ @@ -221,6 +233,70 @@ class DeferredJobRegistry(BaseRegistry): pass +class ScheduledJobRegistry(BaseRegistry): + """ + Registry of scheduled jobs. + """ + key_template = 'rq:scheduled:{0}' + + def __init__(self, *args, **kwargs): + super(ScheduledJobRegistry, self).__init__(*args, **kwargs) + # The underlying implementation of get_jobs_to_enqueue() is + # the same as get_expired_job_ids, but get_expired_job_ids() doesn't + # make sense in this context + self.get_jobs_to_enqueue = self.get_expired_job_ids + + def schedule(self, job, scheduled_datetime, pipeline=None): + """ + Adds job to registry, scored by its execution time (in UTC). + If datetime has no tzinfo, it will assume localtimezone. + """ + # If datetime has no timezone, assume server's local timezone + # if we're on Python 3. 
If we're on Python 2.7, raise an
+        # exception since Python < 3.2 has no builtin `timezone` class
+        if not scheduled_datetime.tzinfo:
+            try:
+                from datetime import timezone
+            except ImportError:
+                raise ValueError('datetime object with no timezone')
+            tz = timezone(timedelta(seconds=-time.timezone))
+            scheduled_datetime = scheduled_datetime.replace(tzinfo=tz)
+
+        timestamp = calendar.timegm(scheduled_datetime.utctimetuple())
+        # Use the passed-in pipeline (if any) so callers like `enqueue_at()`
+        # can schedule the job atomically with other commands
+        if pipeline is not None:
+            return pipeline.zadd(self.key, {job.id: timestamp})
+        return self.connection.zadd(self.key, {job.id: timestamp})
+
+    def cleanup(self):
+        """This method is only here to prevent errors because it is
+        automatically called by the `count()` and `get_job_ids()` methods
+        implemented in BaseRegistry."""
+        pass
+
+    def remove_jobs(self, timestamp=None, pipeline=None):
+        """Remove jobs whose timestamp is in the past from registry."""
+        connection = pipeline if pipeline is not None else self.connection
+        score = timestamp if timestamp is not None else current_timestamp()
+        return connection.zremrangebyscore(self.key, 0, score)
+
+    def get_jobs_to_schedule(self, timestamp=None):
+        """Returns job IDs scheduled to run on or before the given timestamp
+        (defaults to now)."""
+        score = timestamp if timestamp is not None else current_timestamp()
+        return [as_text(job_id) for job_id in
+                self.connection.zrangebyscore(self.key, 0, score)]
+
+    def get_scheduled_time(self, job_or_id):
+        """Returns datetime (UTC) at which job is scheduled to be enqueued"""
+        if isinstance(job_or_id, self.job_class):
+            job_id = job_or_id.id
+        else:
+            job_id = job_or_id
+
+        score = self.connection.zscore(self.key, job_id)
+        if not score:
+            raise NoSuchJobError
+
+        return datetime.fromtimestamp(score, tz=utc)
+
+
 def clean_registries(queue):
     """Cleans StartedJobRegistry and FinishedJobRegistry of a queue."""
     registry = FinishedJobRegistry(name=queue.name,
diff --git a/rq/scheduler.py b/rq/scheduler.py
new file mode 100644
index 0000000..d4ebec5
--- /dev/null
+++ b/rq/scheduler.py
@@ -0,0 +1,197 @@
+import logging
+import os
+import signal
+import time
+import traceback
+
+from datetime import datetime
+from multiprocessing import Process
+
+from .job import Job
+from .queue import Queue
+from .registry import ScheduledJobRegistry
+from .utils import current_timestamp, enum
+
+
+SCHEDULER_KEY_TEMPLATE = 'rq:scheduler:%s'
+SCHEDULER_LOCKING_KEY_TEMPLATE = 'rq:scheduler-lock:%s'
+
+format = "%(asctime)s: %(message)s"
+logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")
+
+
+class RQScheduler(object):
+
+    # STARTED: scheduler has been started but sleeping
+    # WORKING: scheduler is in the midst of scheduling jobs
+    # STOPPED: scheduler is in stopped condition
+
+    Status = enum(
+        'SchedulerStatus',
+        STARTED='started',
+        WORKING='working',
+        STOPPED='stopped'
+    )
+
+    def __init__(self, queues, connection, interval=1):
+        self._queue_names = set(parse_names(queues))
+        self._acquired_locks = set([])
+        self._scheduled_job_registries = []
+        self.lock_acquisition_time = None
+        self.connection = connection
+        self.interval = interval
+        self._stop_requested = False
+        self._status = self.Status.STOPPED
+        self._process = None
+
+    @property
+    def acquired_locks(self):
+        return self._acquired_locks
+
+    @property
+    def status(self):
+        return self._status
+
+    @property
+    def should_reacquire_locks(self):
+        """Returns True if locks were last acquired more than 15 minutes ago"""
+        if self._queue_names == self.acquired_locks:
+            return False
+        if not self.lock_acquisition_time:
+            return True
+        return (datetime.now() - self.lock_acquisition_time).total_seconds() > 900
+
+    def 
acquire_locks(self, auto_start=False): + """Returns names of queue it successfully acquires lock on""" + successful_locks = set([]) + pid = os.getpid() + logging.info("Trying to acquire locks for %s", ", ".join(self._queue_names)) + for name in self._queue_names: + if self.connection.set(self.get_locking_key(name), pid, nx=True, ex=5): + successful_locks.add(name) + self._acquired_locks = self._acquired_locks.union(successful_locks) + if self._acquired_locks: + self.prepare_registries(self._acquired_locks) + + self.lock_acquisition_time = datetime.now() + + # If auto_start is requested and scheduler is not started, + # run self.start() + if self._acquired_locks and auto_start: + if not self._process: + self.start() + + return successful_locks + + def prepare_registries(self, queue_names): + """Prepare scheduled job registries for use""" + self._scheduled_job_registries = [] + for name in queue_names: + self._scheduled_job_registries.append( + ScheduledJobRegistry(name, connection=self.connection) + ) + + @classmethod + def get_locking_key(self, name): + """Returns scheduler key for a given queue name""" + return SCHEDULER_LOCKING_KEY_TEMPLATE % name + + def enqueue_scheduled_jobs(self): + """Enqueue jobs whose timestamp is in the past""" + self._status = self.Status.WORKING + for registry in self._scheduled_job_registries: + timestamp = current_timestamp() + + # TODO: try to use Lua script to make get_jobs_to_schedule() + # and remove_jobs() atomic + job_ids = registry.get_jobs_to_schedule(timestamp) + + if not job_ids: + continue + + queue = Queue(registry.name, connection=self.connection) + + with self.connection.pipeline() as pipeline: + # This should be done in bulk + for job_id in job_ids: + job = Job.fetch(job_id, connection=self.connection) + queue.enqueue_job(job, pipeline=pipeline) + registry.remove_jobs(timestamp) + pipeline.execute() + self._status = self.Status.STARTED + + def _install_signal_handlers(self): + """Installs signal handlers for handling SIGINT and SIGTERM + gracefully. 
+ """ + signal.signal(signal.SIGINT, self.request_stop) + signal.signal(signal.SIGTERM, self.request_stop) + + def request_stop(self, signum=None, frame=None): + """Toggle self._stop_requested that's checked on every loop""" + self._stop_requested = True + + def heartbeat(self): + """Updates the TTL on scheduler keys and the locks""" + logging.info("Scheduler sending heartbeat to %s", ", ".join(self.acquired_locks)) + if len(self._queue_names) > 1: + with self.connection.pipeline() as pipeline: + for name in self._queue_names: + key = self.get_locking_key(name) + pipeline.expire(key, self.interval + 5) + pipeline.execute() + else: + key = self.get_locking_key(next(iter(self._queue_names))) + self.connection.expire(key, self.interval + 5) + + def stop(self): + logging.info("Scheduler stopping, releasing locks for %s...", + ','.join(self._queue_names)) + keys = [self.get_locking_key(name) for name in self._queue_names] + self.connection.delete(*keys) + self._status = self.Status.STOPPED + + def start(self): + self._status = self.Status.STARTED + self._process = Process(target=run, args=(self,), name='Scheduler') + self._process.start() + return self._process + + def work(self): + self._install_signal_handlers() + while True: + if self._stop_requested: + self.stop() + break + + if self.should_reacquire_locks: + self.acquire_locks() + + self.enqueue_scheduled_jobs() + self.heartbeat() + time.sleep(self.interval) + + +def run(scheduler): + logging.info("Scheduler for %s started with PID %s", + ','.join(scheduler._queue_names), os.getpid()) + try: + scheduler.work() + except: # noqa + logging.error( + 'Scheduler [PID %s] raised an exception.\n%s', + os.getpid(), traceback.format_exc() + ) + raise + logging.info("Scheduler with PID %s has stopped", os.getpid()) + + +def parse_names(queues_or_names): + """Given a list of strings or queues, returns queue names""" + names = [] + for queue_or_name in queues_or_names: + if isinstance(queue_or_name, Queue): + names.append(queue_or_name.name) + else: + names.append(str(queue_or_name)) + return names diff --git a/rq/utils.py b/rq/utils.py index b479ec7..c399b55 100644 --- a/rq/utils.py +++ b/rq/utils.py @@ -125,10 +125,7 @@ class ColorizingStreamHandler(logging.StreamHandler): def __init__(self, exclude=None, *args, **kwargs): self.exclude = exclude - if is_python_version((2, 6)): - logging.StreamHandler.__init__(self, *args, **kwargs) - else: - super(ColorizingStreamHandler, self).__init__(*args, **kwargs) + super(ColorizingStreamHandler, self).__init__(*args, **kwargs) @property def is_tty(self): diff --git a/rq/worker.py b/rq/worker.py index 6960d34..25b98e8 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -33,8 +33,8 @@ from .exceptions import DequeueTimeout, ShutDownImminentException from .job import Job, JobStatus from .logutils import setup_loghandlers from .queue import Queue -from .registry import (FailedJobRegistry, FinishedJobRegistry, - StartedJobRegistry, clean_registries) +from .registry import FailedJobRegistry, StartedJobRegistry, clean_registries +from .scheduler import RQScheduler from .suspension import is_suspended from .timeouts import JobTimeoutException, HorseMonitorTimeoutException, UnixSignalDeathPenalty from .utils import (backend_class, ensure_list, enum, @@ -204,6 +204,7 @@ class Worker(object): self.failed_job_count = 0 self.total_working_time = 0 self.birth_date = None + self.scheduler = None self.disable_default_exception_handler = disable_default_exception_handler @@ -221,11 +222,11 @@ class Worker(object): def 
queue_names(self): """Returns the queue names of this worker's queues.""" - return list(map(lambda q: q.name, self.queues)) + return [queue.name for queue in self.queues] def queue_keys(self): """Returns the Redis keys representing this worker's queues.""" - return list(map(lambda q: q.key, self.queues)) + return [queue.key for queue in self.queues] @property def key(self): @@ -411,7 +412,11 @@ class Worker(object): self.set_shutdown_requested_date() self.log.debug('Stopping after current horse is finished. ' 'Press Ctrl+C again for a cold shutdown.') + if self.scheduler: + self.stop_scheduler() else: + if self.scheduler: + self.stop_scheduler() raise StopRequested() def handle_warm_shutdown_request(self): @@ -440,8 +445,22 @@ class Worker(object): if before_state: self.set_state(before_state) + def run_maintenance_tasks(self): + """ + Runs periodic maintenance tasks, these include: + 1. Check if scheduler should be started. This check should not be run + on first run since worker.work() already calls + `scheduler.enqueue_scheduled_jobs()` on startup. + 2. Cleaning registries + """ + # No need to try to start scheduler on first run + if self.last_cleaned_at: + if self.scheduler and not self.scheduler._process: + self.scheduler.acquire_locks(auto_start=True) + self.clean_registries() + def work(self, burst=False, logging_level="INFO", date_format=DEFAULT_LOGGING_DATE_FORMAT, - log_format=DEFAULT_LOGGING_FORMAT, max_jobs=None): + log_format=DEFAULT_LOGGING_FORMAT, max_jobs=None, with_scheduler=False): """Starts the work loop. Pops and performs all jobs on the current list of queues. When all @@ -450,8 +469,7 @@ class Worker(object): The return value indicates whether any jobs were processed. """ - setup_loghandlers(logging_level, date_format, log_format) - self._install_signal_handlers() + setup_loghandlers(logging_level, date_format, log_format) completed_jobs = 0 self.register_birth() self.log.info("Worker %s: started, version %s", self.key, VERSION) @@ -459,13 +477,27 @@ class Worker(object): qnames = self.queue_names() self.log.info('*** Listening on %s...', green(', '.join(qnames))) + if with_scheduler: + self.scheduler = RQScheduler(self.queues, connection=self.connection) + self.scheduler.acquire_locks() + # If lock is acquired, start scheduler + if self.scheduler.acquired_locks: + # If worker is run on burst mode, enqueue_scheduled_jobs() + # before working. 
Otherwise, start scheduler in a separate process + if burst: + self.scheduler.enqueue_scheduled_jobs() + else: + self.scheduler.start() + + self._install_signal_handlers() + try: while True: try: self.check_for_suspension(burst) if self.should_run_maintenance_tasks: - self.clean_registries() + self.run_maintenance_tasks() if self._stop_requested: self.log.info('Worker %s: stopping on request', self.key) @@ -507,9 +539,23 @@ class Worker(object): break finally: if not self.is_horse: + + if self.scheduler: + self.stop_scheduler() + self.register_death() return bool(completed_jobs) + def stop_scheduler(self): + """Ensure scheduler process is stopped""" + if self.scheduler._process and self.scheduler._process.pid: + # Send the kill signal to scheduler process + try: + os.kill(self.scheduler._process.pid, signal.SIGTERM) + except OSError: + pass + self.scheduler._process.join() + def dequeue_job_and_maintain_ttl(self, timeout): result = None qnames = ','.join(self.queue_names()) @@ -521,6 +567,9 @@ class Worker(object): while True: self.heartbeat() + if self.should_run_maintenance_tasks: + self.run_maintenance_tasks() + try: result = self.queue_class.dequeue_any(self.queues, timeout, connection=self.connection, @@ -798,9 +847,7 @@ class Worker(object): # Don't clobber the user's meta dictionary! job.save(pipeline=pipeline, include_meta=False) - finished_job_registry = FinishedJobRegistry(job.origin, - self.connection, - job_class=self.job_class) + finished_job_registry = queue.finished_job_registry finished_job_registry.add(job, result_ttl, pipeline) job.cleanup(result_ttl, pipeline=pipeline, @@ -819,9 +866,7 @@ class Worker(object): self.prepare_job_execution(job, heartbeat_ttl) push_connection(self.connection) - started_job_registry = StartedJobRegistry(job.origin, - self.connection, - job_class=self.job_class) + started_job_registry = queue.started_job_registry try: job.started_at = utcnow() @@ -837,7 +882,7 @@ class Worker(object): self.handle_job_success(job=job, queue=queue, started_job_registry=started_job_registry) - except: + except: # NOQA job.ended_at = utcnow() exc_info = sys.exc_info() exc_string = self._get_safe_exception_string( diff --git a/tests/fixtures.py b/tests/fixtures.py index 882bdad..46cdaac 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -8,6 +8,7 @@ from __future__ import (absolute_import, division, print_function, import os import time +import signal import sys from rq import Connection, get_current_job, get_current_connection, Queue @@ -164,3 +165,13 @@ def run_dummy_heroku_worker(sandbox, _imminent_shutdown_delay): class DummyQueue(object): pass + + +def kill_worker(pid, double_kill, interval=0.5): + # wait for the worker to be started over on the main process + time.sleep(interval) + os.kill(pid, signal.SIGTERM) + if double_kill: + # give the worker time to switch signal handler + time.sleep(interval) + os.kill(pid, signal.SIGTERM) diff --git a/tests/test_cli.py b/tests/test_cli.py index 343e286..1dea1e3 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -2,14 +2,17 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) +from datetime import datetime + from click.testing import CliRunner from redis import Redis from rq import Queue +from rq.compat import utc from rq.cli import main from rq.cli.helpers import read_config_file, CliConfig from rq.job import Job -from rq.registry import FailedJobRegistry +from rq.registry import FailedJobRegistry, ScheduledJobRegistry from rq.worker import Worker, WorkerStatus import 
pytest @@ -244,6 +247,21 @@ class TestRQCli(RQTestCase): self.assertTrue(len(pid.read()) > 0) self.assert_normal_execution(result) + def test_worker_with_scheduler(self): + """rq worker -u --with-scheduler""" + queue = Queue(connection=self.connection) + queue.enqueue_at(datetime(2019, 1, 1, tzinfo=utc), say_hello) + registry = ScheduledJobRegistry(queue=queue) + + runner = CliRunner() + result = runner.invoke(main, ['worker', '-u', self.redis_url, '-b']) + self.assert_normal_execution(result) + self.assertEqual(len(registry), 1) # 1 job still scheduled + + result = runner.invoke(main, ['worker', '-u', self.redis_url, '-b', '--with-scheduler']) + self.assert_normal_execution(result) + self.assertEqual(len(registry), 0) # Job has been enqueued + def test_worker_logging_options(self): """--quiet and --verbose logging options are supported""" runner = CliRunner() @@ -256,7 +274,7 @@ class TestRQCli(RQTestCase): # --quiet and --verbose are mutually exclusive result = runner.invoke(main, args + ['--quiet', '--verbose']) self.assertNotEqual(result.exit_code, 0) - + def test_exception_handlers(self): """rq worker -u -b --exception-handler """ connection = Redis.from_url(self.redis_url) diff --git a/tests/test_queue.py b/tests/test_queue.py index 22b6b11..865e14c 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -2,11 +2,18 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) +from datetime import datetime, timedelta + from rq import Queue -from rq.exceptions import InvalidJobDependency, NoSuchJobError +from rq.compat import utc +from rq.exceptions import NoSuchJobError + from rq.job import Job, JobStatus -from rq.registry import DeferredJobRegistry +from rq.registry import (DeferredJobRegistry, FailedJobRegistry, + FinishedJobRegistry, ScheduledJobRegistry, + StartedJobRegistry) from rq.worker import Worker + from tests import RQTestCase from tests.fixtures import echo, say_hello @@ -236,7 +243,7 @@ class TestQueue(RQTestCase): None ) self.assertEqual(q.count, 0) - + def test_enqueue_with_ttl(self): """Negative TTL value is not allowed""" queue = Queue() @@ -520,3 +527,23 @@ class TestQueue(RQTestCase): job_fetch = q1.fetch_job(job_orig.id) self.assertIsNotNone(job_fetch) + + def test_getting_registries(self): + """Getting job registries from queue object""" + queue = Queue('example') + self.assertEqual(queue.scheduled_job_registry, ScheduledJobRegistry(queue=queue)) + self.assertEqual(queue.started_job_registry, StartedJobRegistry(queue=queue)) + self.assertEqual(queue.failed_job_registry, FailedJobRegistry(queue=queue)) + self.assertEqual(queue.deferred_job_registry, DeferredJobRegistry(queue=queue)) + self.assertEqual(queue.finished_job_registry, FinishedJobRegistry(queue=queue)) + + +class TestJobScheduling(RQTestCase): + def test_enqueue_at(self): + """enqueue_at() creates a job in ScheduledJobRegistry""" + queue = Queue(connection=self.testconn) + scheduled_time = datetime.now(utc) + timedelta(seconds=10) + job = queue.enqueue_at(scheduled_time, say_hello) + registry = ScheduledJobRegistry(queue=queue) + self.assertIn(job, registry) + self.assertTrue(registry.get_expiration_time(job), scheduled_time) diff --git a/tests/test_registry.py b/tests/test_registry.py index d0d7b09..b7921d1 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -1,6 +1,8 @@ # -*- coding: utf-8 -*- from __future__ import absolute_import +from datetime import datetime, timedelta + from rq.compat import as_text from rq.defaults import DEFAULT_FAILURE_TTL from 
rq.exceptions import InvalidJobOperation @@ -57,6 +59,18 @@ class TestRegistry(RQTestCase): self.assertTrue(job in registry) self.assertTrue(job.id in registry) + def test_get_expiration_time(self): + """registry.get_expiration_time() returns correct datetime objects""" + registry = StartedJobRegistry(connection=self.testconn) + queue = Queue(connection=self.testconn) + job = queue.enqueue(say_hello) + + registry.add(job, 5) + self.assertEqual( + registry.get_expiration_time(job), + (datetime.utcnow() + timedelta(seconds=5)).replace(microsecond=0) + ) + def test_add_and_remove(self): """Adding and removing job to StartedJobRegistry.""" timestamp = current_timestamp() diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py new file mode 100644 index 0000000..459fec8 --- /dev/null +++ b/tests/test_scheduler.py @@ -0,0 +1,291 @@ +import os +import time + +from datetime import datetime, timedelta +from multiprocessing import Process + +from rq import Queue +from rq.compat import utc, PY2 +from rq.exceptions import NoSuchJobError +from rq.job import Job +from rq.registry import FinishedJobRegistry, ScheduledJobRegistry +from rq.scheduler import RQScheduler +from rq.utils import current_timestamp +from rq.worker import Worker + +from .fixtures import kill_worker, say_hello +from tests import RQTestCase + +import mock + + +class TestScheduledJobRegistry(RQTestCase): + + def test_get_jobs_to_enqueue(self): + """Getting job ids to enqueue from ScheduledJobRegistry.""" + queue = Queue(connection=self.testconn) + registry = ScheduledJobRegistry(queue=queue) + timestamp = current_timestamp() + + self.testconn.zadd(registry.key, {'foo': 1}) + self.testconn.zadd(registry.key, {'bar': timestamp + 10}) + self.testconn.zadd(registry.key, {'baz': timestamp + 30}) + + self.assertEqual(registry.get_jobs_to_enqueue(), ['foo']) + self.assertEqual(registry.get_jobs_to_enqueue(timestamp + 20), + ['foo', 'bar']) + + def test_get_scheduled_time(self): + """get_scheduled_time() returns job's scheduled datetime""" + queue = Queue(connection=self.testconn) + registry = ScheduledJobRegistry(queue=queue) + + job = Job.create('myfunc', connection=self.testconn) + job.save() + dt = datetime(2019, 1, 1, tzinfo=utc) + registry.schedule(job, datetime(2019, 1, 1, tzinfo=utc)) + self.assertEqual(registry.get_scheduled_time(job), dt) + # get_scheduled_time() should also work with job ID + self.assertEqual(registry.get_scheduled_time(job.id), dt) + + # registry.get_scheduled_time() raises NoSuchJobError if + # job.id is not found + self.assertRaises(NoSuchJobError, registry.get_scheduled_time, '123') + + def test_schedule(self): + """Adding job with the correct score to ScheduledJobRegistry""" + queue = Queue(connection=self.testconn) + job = Job.create('myfunc', connection=self.testconn) + job.save() + registry = ScheduledJobRegistry(queue=queue) + + if PY2: + # On Python 2, datetime needs to have timezone + self.assertRaises(ValueError, registry.schedule, job, datetime(2019, 1, 1)) + registry.schedule(job, datetime(2019, 1, 1, tzinfo=utc)) + self.assertEqual(self.testconn.zscore(registry.key, job.id), + 1546300800) # 2019-01-01 UTC in Unix timestamp + else: + from datetime import timezone + # If we pass in a datetime with no timezone, `schedule()` + # assumes local timezone so depending on your local timezone, + # the timestamp maybe different + registry.schedule(job, datetime(2019, 1, 1)) + self.assertEqual(self.testconn.zscore(registry.key, job.id), + 1546300800 + time.timezone) # 2019-01-01 UTC in Unix timestamp + + 
# Score is always stored in UTC even if datetime is in a different tz + tz = timezone(timedelta(hours=7)) + job = Job.create('myfunc', connection=self.testconn) + job.save() + registry.schedule(job, datetime(2019, 1, 1, 7, tzinfo=tz)) + self.assertEqual(self.testconn.zscore(registry.key, job.id), + 1546300800) # 2019-01-01 UTC in Unix timestamp + + +class TestScheduler(RQTestCase): + + def test_init(self): + """Scheduler can be instantiated with queues or queue names""" + foo_queue = Queue('foo', connection=self.testconn) + scheduler = RQScheduler([foo_queue, 'bar'], connection=self.testconn) + self.assertEqual(scheduler._queue_names, {'foo', 'bar'}) + self.assertEqual(scheduler.status, RQScheduler.Status.STOPPED) + + def test_should_reacquire_locks(self): + """scheduler.should_reacquire_locks works properly""" + queue = Queue(connection=self.testconn) + scheduler = RQScheduler([queue], connection=self.testconn) + self.assertTrue(scheduler.should_reacquire_locks) + scheduler.acquire_locks() + self.assertIsNotNone(scheduler.lock_acquisition_time) + + # scheduler.should_reacquire_locks always returns False if + # scheduler.acquired_locks and scheduler._queue_names are the same + self.assertFalse(scheduler.should_reacquire_locks) + scheduler.lock_acquisition_time = datetime.now() - timedelta(minutes=16) + self.assertFalse(scheduler.should_reacquire_locks) + + scheduler._queue_names = set(['default', 'foo']) + self.assertTrue(scheduler.should_reacquire_locks) + scheduler.acquire_locks() + self.assertFalse(scheduler.should_reacquire_locks) + + def test_lock_acquisition(self): + """Test lock acquisition""" + name_1 = 'lock-test-1' + name_2 = 'lock-test-2' + name_3 = 'lock-test-3' + scheduler = RQScheduler([name_1], self.testconn) + + self.assertEqual(scheduler.acquire_locks(), {name_1}) + self.assertEqual(scheduler._acquired_locks, {name_1}) + self.assertEqual(scheduler.acquire_locks(), set([])) + + # Only name_2 is returned since name_1 is already locked + scheduler = RQScheduler([name_1, name_2], self.testconn) + self.assertEqual(scheduler.acquire_locks(), {name_2}) + self.assertEqual(scheduler._acquired_locks, {name_2}) + + # When a new lock is successfully acquired, _acquired_locks is added + scheduler._queue_names.add(name_3) + self.assertEqual(scheduler.acquire_locks(), {name_3}) + self.assertEqual(scheduler._acquired_locks, {name_2, name_3}) + + def test_lock_acquisition_with_auto_start(self): + """Test lock acquisition with auto_start=True""" + scheduler = RQScheduler(['auto-start'], self.testconn) + with mock.patch.object(scheduler, 'start') as mocked: + scheduler.acquire_locks(auto_start=True) + self.assertEqual(mocked.call_count, 1) + + # If process has started, scheduler.start() won't be called + scheduler = RQScheduler(['auto-start2'], self.testconn) + scheduler._process = 1 + with mock.patch.object(scheduler, 'start') as mocked: + scheduler.acquire_locks(auto_start=True) + self.assertEqual(mocked.call_count, 0) + + def test_heartbeat(self): + """Test that heartbeat updates locking keys TTL""" + name_1 = 'lock-test-1' + name_2 = 'lock-test-2' + scheduler = RQScheduler([name_1, name_2], self.testconn) + scheduler.acquire_locks() + + locking_key_1 = RQScheduler.get_locking_key(name_1) + locking_key_2 = RQScheduler.get_locking_key(name_2) + + with self.testconn.pipeline() as pipeline: + pipeline.expire(locking_key_1, 1000) + pipeline.expire(locking_key_2, 1000) + + scheduler.heartbeat() + self.assertEqual(self.testconn.ttl(locking_key_1), 6) + 
self.assertEqual(self.testconn.ttl(locking_key_1), 6) + + # scheduler.stop() releases locks and sets status to STOPPED + scheduler._status = scheduler.Status.WORKING + scheduler.stop() + self.assertFalse(self.testconn.exists(locking_key_1)) + self.assertFalse(self.testconn.exists(locking_key_2)) + self.assertEqual(scheduler.status, scheduler.Status.STOPPED) + + # Heartbeat also works properly for schedulers with a single queue + scheduler = RQScheduler([name_1], self.testconn) + scheduler.acquire_locks() + self.testconn.expire(locking_key_1, 1000) + scheduler.heartbeat() + self.assertEqual(self.testconn.ttl(locking_key_1), 6) + + def test_enqueue_scheduled_jobs(self): + """Scheduler can enqueue scheduled jobs""" + queue = Queue(connection=self.testconn) + registry = ScheduledJobRegistry(queue=queue) + job = Job.create('myfunc', connection=self.testconn) + job.save() + registry.schedule(job, datetime(2019, 1, 1, tzinfo=utc)) + scheduler = RQScheduler([queue], connection=self.testconn) + scheduler.acquire_locks() + scheduler.enqueue_scheduled_jobs() + self.assertEqual(len(queue), 1) + + # After job is scheduled, registry should be empty + self.assertEqual(len(registry), 0) + + # Jobs scheduled in the far future should not be affected + registry.schedule(job, datetime(2100, 1, 1, tzinfo=utc)) + scheduler.enqueue_scheduled_jobs() + self.assertEqual(len(queue), 1) + + def test_prepare_registries(self): + """prepare_registries() creates self._scheduled_job_registries""" + foo_queue = Queue('foo', connection=self.testconn) + bar_queue = Queue('bar', connection=self.testconn) + scheduler = RQScheduler([foo_queue, bar_queue], connection=self.testconn) + self.assertEqual(scheduler._scheduled_job_registries, []) + scheduler.prepare_registries([foo_queue.name]) + self.assertEqual(scheduler._scheduled_job_registries, [ScheduledJobRegistry(queue=foo_queue)]) + scheduler.prepare_registries([foo_queue.name, bar_queue.name]) + self.assertEqual( + scheduler._scheduled_job_registries, + [ScheduledJobRegistry(queue=foo_queue), ScheduledJobRegistry(queue=bar_queue)] + ) + + +class TestWorker(RQTestCase): + + def test_work_burst(self): + """worker.work() with scheduler enabled works properly""" + queue = Queue(connection=self.testconn) + worker = Worker(queues=[queue], connection=self.testconn) + worker.work(burst=True, with_scheduler=False) + self.assertIsNone(worker.scheduler) + + worker = Worker(queues=[queue], connection=self.testconn) + worker.work(burst=True, with_scheduler=True) + self.assertIsNotNone(worker.scheduler) + + @mock.patch.object(RQScheduler, 'acquire_locks') + def test_run_maintenance_tasks(self, mocked): + """scheduler.acquire_locks() is called only when scheduled is enabled""" + queue = Queue(connection=self.testconn) + worker = Worker(queues=[queue], connection=self.testconn) + + worker.run_maintenance_tasks() + self.assertEqual(mocked.call_count, 0) + + worker.last_cleaned_at = None + worker.scheduler = RQScheduler([queue], connection=self.testconn) + worker.run_maintenance_tasks() + self.assertEqual(mocked.call_count, 0) + + worker.last_cleaned_at = datetime.now() + worker.run_maintenance_tasks() + self.assertEqual(mocked.call_count, 1) + + def test_work(self): + queue = Queue(connection=self.testconn) + worker = Worker(queues=[queue], connection=self.testconn) + p = Process(target=kill_worker, args=(os.getpid(), False, 5)) + + p.start() + queue.enqueue_at(datetime(2019, 1, 1, tzinfo=utc), say_hello) + worker.work(burst=False, with_scheduler=True) + p.join(1) + 
self.assertIsNotNone(worker.scheduler) + registry = FinishedJobRegistry(queue=queue) + self.assertEqual(len(registry), 1) + + +class TestQueue(RQTestCase): + + def test_enqueue_at(self): + """queue.enqueue_at() puts job in the scheduled""" + queue = Queue(connection=self.testconn) + registry = ScheduledJobRegistry(queue=queue) + scheduler = RQScheduler([queue], connection=self.testconn) + scheduler.acquire_locks() + # Jobs created using enqueue_at is put in the ScheduledJobRegistry + queue.enqueue_at(datetime(2019, 1, 1, tzinfo=utc), say_hello) + self.assertEqual(len(queue), 0) + self.assertEqual(len(registry), 1) + + # After enqueue_scheduled_jobs() is called, the registry is empty + # and job is enqueued + scheduler.enqueue_scheduled_jobs() + self.assertEqual(len(queue), 1) + self.assertEqual(len(registry), 0) + + def test_enqueue_in(self): + """queue.enqueue_in() schedules job correctly""" + queue = Queue(connection=self.testconn) + registry = ScheduledJobRegistry(queue=queue) + + job = queue.enqueue_in(timedelta(seconds=30), say_hello) + now = datetime.now(utc) + scheduled_time = registry.get_scheduled_time(job) + # Ensure that job is scheduled roughly 30 seconds from now + self.assertTrue( + now + timedelta(seconds=28) < scheduled_time < now + timedelta(seconds=32) + ) diff --git a/tests/test_worker.py b/tests/test_worker.py index 25fde13..e983bf5 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -22,9 +22,9 @@ from mock import Mock from tests import RQTestCase, slow from tests.fixtures import ( - create_file, create_file_after_timeout, div_by_zero, do_nothing, say_hello, - say_pid, run_dummy_heroku_worker, access_self, modify_self, - modify_self_and_error, long_running_job, save_key_ttl + access_self, create_file, create_file_after_timeout, div_by_zero, do_nothing, + kill_worker, long_running_job, modify_self, modify_self_and_error, + run_dummy_heroku_worker, save_key_ttl, say_hello, say_pid, ) from rq import Queue, SimpleWorker, Worker, get_current_connection @@ -59,6 +59,9 @@ class TestWorker(RQTestCase): self.assertEqual(w.queues[0].name, 'foo') self.assertEqual(w.queues[1].name, 'bar') + self.assertEqual(w.queue_keys(), [w.queues[0].key, w.queues[1].key]) + self.assertEqual(w.queue_names(), ['foo', 'bar']) + # With iterable of strings w = Worker(iter(['foo', 'bar'])) self.assertEqual(w.queues[0].name, 'foo') @@ -952,16 +955,6 @@ class TestWorker(RQTestCase): self.assertEqual(worker.python_version, python_version) -def kill_worker(pid, double_kill): - # wait for the worker to be started over on the main process - time.sleep(0.5) - os.kill(pid, signal.SIGTERM) - if double_kill: - # give the worker time to switch signal handler - time.sleep(0.5) - os.kill(pid, signal.SIGTERM) - - def wait_and_kill_work_horse(pid, time_to_wait=0.0): time.sleep(time_to_wait) os.kill(pid, signal.SIGKILL) @@ -1203,7 +1196,7 @@ class TestExceptionHandlerMessageEncoding(RQTestCase): # Mimic how exception info is actually passed forwards try: raise Exception(u"💪") - except: + except Exception: self.exc_info = sys.exc_info() def test_handle_exception_handles_non_ascii_in_exception_message(self):