Worker docstrings (#1810)

* docs: worker docstrings

* docs: more type annotaions
main
lowercase00 2 years ago committed by GitHub
parent eef6dbbb3c
commit 9311eeba17
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -13,7 +13,7 @@ from datetime import timedelta
from enum import Enum
from uuid import uuid4
from random import shuffle
from typing import Callable, List, Optional, TYPE_CHECKING, Type
from typing import Any, Callable, List, Optional, TYPE_CHECKING, Tuple, Type, Union
if TYPE_CHECKING:
from redis import Redis
@ -155,15 +155,15 @@ class Worker:
return [as_text(key) for key in get_keys(queue=queue, connection=connection)]
@classmethod
def count(cls, connection: Optional['Redis'] = None, queue: Optional['Queue'] = None):
"""Returns the number of workers by queue or connection
def count(cls, connection: Optional['Redis'] = None, queue: Optional['Queue'] = None) -> int:
"""Returns the number of workers by queue or connection.
Args:
connection (Optional['Redis'], optional): _description_. Defaults to None.
queue (Optional['Queue'], optional): _description_. Defaults to None.
connection (Optional[Redis], optional): Redis connection. Defaults to None.
queue (Optional[Queue], optional): The queue to use. Defaults to None.
Returns:
_type_: _description_
length (int): The queue length.
"""
return len(get_keys(queue=queue, connection=connection))
@ -172,13 +172,26 @@ class Worker:
cls,
worker_key: str,
connection: Optional['Redis'] = None,
job_class: Type['Job'] = None,
queue_class: Type['Queue'] = None,
job_class: Optional[Type['Job']] = None,
queue_class: Optional[Type['Queue']] = None,
serializer=None,
):
) -> 'Worker':
"""Returns a Worker instance, based on the naming conventions for
naming the internal Redis keys. Can be used to reverse-lookup Workers
by their Redis keys.
Args:
worker_key (str): The worker key
connection (Optional[Redis], optional): Redis connection. Defaults to None.
job_class (Optional[Type[Job]], optional): The job class if custom class is being used. Defaults to None.
queue_class (Optional[Type[Queue]], optional): The queue class if a custom class is being used. Defaults to None.
serializer (Any, optional): The serializer to use. Defaults to None.
Raises:
ValueError: If the key doesn't start with `rq:worker:`, the default worker namespace prefix.
Returns:
worker (Worker): The Worker instance.
"""
prefix = cls.redis_worker_namespace_prefix
if not worker_key.startswith(prefix):
@ -202,7 +215,6 @@ class Worker:
)
worker.refresh()
return worker
def __init__(
@ -262,7 +274,7 @@ class Worker:
self.successful_job_count: int = 0
self.failed_job_count: int = 0
self.total_working_time: int = 0
self.current_job_working_time: int = 0
self.current_job_working_time: float = 0
self.birth_date = None
self.scheduler: Optional[RQScheduler] = None
self.pubsub = None
@ -324,12 +336,20 @@ class Worker:
if not isinstance(queue, self.queue_class):
raise TypeError('{0} is not of type {1} or string types'.format(queue, self.queue_class))
def queue_names(self):
"""Returns the queue names of this worker's queues."""
def queue_names(self) -> List[str]:
"""Returns the queue names of this worker's queues.
Returns:
List[str]: The queue names.
"""
return [queue.name for queue in self.queues]
def queue_keys(self):
"""Returns the Redis keys representing this worker's queues."""
def queue_keys(self) -> List[str]:
"""Returns the Redis keys representing this worker's queues.
Returns:
List[str]: The list of strings with queues keys
"""
return [queue.key for queue in self.queues]
@property
@ -423,13 +443,6 @@ class Worker:
"""Sets the date on which the worker received a (warm) shutdown request"""
self.connection.hset(self.key, 'shutdown_requested_date', utcformat(utcnow()))
# @property
# def birth_date(self):
# """Fetches birth date from Redis."""
# birth_timestamp = self.connection.hget(self.key, 'birth')
# if birth_timestamp is not None:
# return utcparse(as_text(birth_timestamp))
@property
def shutdown_requested_date(self):
"""Fetches shutdown_requested_date from Redis."""
@ -444,7 +457,13 @@ class Worker:
if death_timestamp is not None:
return utcparse(as_text(death_timestamp))
def set_state(self, state, pipeline: Optional['Pipeline'] = None):
def set_state(self, state: str, pipeline: Optional['Pipeline'] = None):
"""Sets the worker's state.
Args:
state (str): The state
pipeline (Optional[Pipeline], optional): The pipeline to use. Defaults to None.
"""
self._state = state
connection = pipeline if pipeline is not None else self.connection
connection.hset(self.key, 'state', state)
@ -454,7 +473,7 @@ class Worker:
warnings.warn("worker.state is deprecated, use worker.set_state() instead.", DeprecationWarning)
self.set_state(state)
def get_state(self):
def get_state(self) -> str:
return self._state
def _get_state(self):
@ -464,42 +483,65 @@ class Worker:
state = property(_get_state, _set_state)
def set_current_job_working_time(self, current_job_working_time, pipeline: Optional['Pipeline'] = None):
def set_current_job_working_time(self, current_job_working_time: float, pipeline: Optional['Pipeline'] = None):
"""Sets the current job working time in seconds
Args:
current_job_working_time (float): The current job working time in seconds
pipeline (Optional[Pipeline], optional): Pipeline to use. Defaults to None.
"""
self.current_job_working_time = current_job_working_time
connection = pipeline if pipeline is not None else self.connection
connection.hset(self.key, 'current_job_working_time', current_job_working_time)
def set_current_job_id(self, job_id: Optional[str] = None, pipeline: Optional['Pipeline'] = None):
connection = pipeline if pipeline is not None else self.connection
"""Sets the current job id.
If `None` is used it will delete the current job key.
Args:
job_id (Optional[str], optional): The job id. Defaults to None.
pipeline (Optional[Pipeline], optional): The pipeline to use. Defaults to None.
"""
connection = pipeline if pipeline is not None else self.connection
if job_id is None:
connection.hdel(self.key, 'current_job')
else:
connection.hset(self.key, 'current_job', job_id)
def get_current_job_id(self, pipeline: Optional['Pipeline'] = None):
def get_current_job_id(self, pipeline: Optional['Pipeline'] = None) -> Optional[str]:
"""Retrieves the current job id.
Args:
pipeline (Optional['Pipeline'], optional): The pipeline to use. Defaults to None.
Returns:
job_id (Optional[str): The job id
"""
connection = pipeline if pipeline is not None else self.connection
return as_text(connection.hget(self.key, 'current_job'))
def get_current_job(self):
"""Returns the job id of the currently executing job."""
job_id = self.get_current_job_id()
def get_current_job(self) -> Optional['Job']:
"""Returns the currently executing job instance.
Returns:
job (Job): The job instance.
"""
job_id = self.get_current_job_id()
if job_id is None:
return None
return self.job_class.fetch(job_id, self.connection, self.serializer)
def _install_signal_handlers(self):
"""Installs signal handlers for handling SIGINT and SIGTERM
gracefully.
"""Installs signal handlers for handling SIGINT and SIGTERM gracefully.
"""
signal.signal(signal.SIGINT, self.request_stop)
signal.signal(signal.SIGTERM, self.request_stop)
def kill_horse(self, sig=SIGKILL):
"""
Kill the horse but catch "No such process" error has the horse could already be dead.
def kill_horse(self, sig: signal.Signals = SIGKILL):
"""Kill the horse but catch "No such process" error has the horse could already be dead.
Args:
sig (signal.Signals, optional): _description_. Defaults to SIGKILL.
"""
try:
os.killpg(os.getpgid(self.horse_pid), sig)
@ -511,9 +553,9 @@ class Worker:
else:
raise
def wait_for_horse(self):
"""
A waiting the end of the horse process and recycling resources.
def wait_for_horse(self) -> Tuple[Optional[int], Optional[int]]:
"""Waits for the horse process to complete.
Uses `0` as argument as to include "any child in the process group of the current process".
"""
pid = None
stat = None
@ -525,7 +567,15 @@ class Worker:
return pid, stat
def request_force_stop(self, signum, frame):
"""Terminates the application (cold shutdown)."""
"""Terminates the application (cold shutdown).
Args:
signum (Any): Signum
frame (Any): Frame
Raises:
SystemExit: SystemExit
"""
self.log.warning('Cold shut down')
# Take down the horse with the worker
@ -538,6 +588,10 @@ class Worker:
def request_stop(self, signum, frame):
"""Stops the current worker loop but waits for child processes to
end gracefully (warm shutdown).
Args:
signum (Any): Signum
frame (Any): Frame
"""
self.log.debug('Got signal %s', signal_name(signum))
@ -566,8 +620,9 @@ class Worker:
def handle_warm_shutdown_request(self):
self.log.info('Warm shut down requested')
def check_for_suspension(self, burst):
"""Check to see if workers have been suspended by `rq suspend`"""
def check_for_suspension(self, burst: bool):
"""Check to see if workers have been suspended by `rq suspend`
"""
before_state = None
notified = False
@ -595,8 +650,9 @@ class Worker:
on first run since worker.work() already calls
`scheduler.enqueue_scheduled_jobs()` on startup.
2. Cleaning registries
No need to try to start scheduler on first run
"""
# No need to try to start scheduler on first run
if self.last_cleaned_at:
if self.scheduler and (not self.scheduler._process or not self.scheduler._process.is_alive()):
self.scheduler.acquire_locks(auto_start=True)
@ -619,17 +675,23 @@ class Worker:
self.pubsub.close()
def reorder_queues(self, reference_queue):
"""Method placeholder to workers that implement some reordering strategy.
`pass` here means that the queue will remain with the same job order.
Args:
reference_queue (Union[Queue, str]): The queue
"""
pass
def work(
self,
burst: bool = False,
logging_level: str = "INFO",
date_format=DEFAULT_LOGGING_DATE_FORMAT,
log_format=DEFAULT_LOGGING_FORMAT,
max_jobs=None,
date_format: str = DEFAULT_LOGGING_DATE_FORMAT,
log_format: str = DEFAULT_LOGGING_FORMAT,
max_jobs: Optional[int] = None,
with_scheduler: bool = False,
):
) -> bool:
"""Starts the work loop.
Pops and performs all jobs on the current list of queues. When all
@ -637,6 +699,17 @@ class Worker:
queues, unless `burst` mode is enabled.
The return value indicates whether any jobs were processed.
Args:
burst (bool, optional): Whether to work on burst mode. Defaults to False.
logging_level (str, optional): Logging level to use. Defaults to "INFO".
date_format (str, optional): Date Format. Defaults to DEFAULT_LOGGING_DATE_FORMAT.
log_format (str, optional): Log Format. Defaults to DEFAULT_LOGGING_FORMAT.
max_jobs (Optional[int], optional): Max number of jobs. Defaults to None.
with_scheduler (bool, optional): Whether to run the scheduler in a separate process. Defaults to False.
Returns:
worked (bool): Will return True if any job was processed, False otherwise.
"""
setup_loghandlers(logging_level, date_format, log_format)
completed_jobs = 0
@ -723,16 +796,24 @@ class Worker:
return bool(completed_jobs)
def stop_scheduler(self):
"""Ensure scheduler process is stopped"""
"""Ensure scheduler process is stopped
Will send the kill signal to scheduler process,
if there's an OSError, just passes and `join()`'s the scheduler process,
waiting for the process to finish.
"""
if self.scheduler._process and self.scheduler._process.pid:
# Send the kill signal to scheduler process
try:
os.kill(self.scheduler._process.pid, signal.SIGTERM)
except OSError:
pass
self.scheduler._process.join()
def dequeue_job_and_maintain_ttl(self, timeout):
def dequeue_job_and_maintain_ttl(self, timeout: int) -> Tuple['Job', 'Queue']:
"""Dequeues a job while maintaining the TTL.
Returns:
result (Tuple[Job, Queue]): A tuple with the job and the queue.
"""
result = None
qnames = ','.join(self.queue_names())
@ -781,7 +862,7 @@ class Worker:
self.heartbeat()
return result
def heartbeat(self, timeout=None, pipeline: Optional['Pipeline'] = None):
def heartbeat(self, timeout: Optional[int] = None, pipeline: Optional['Pipeline'] = None):
"""Specifies a new worker timeout, typically by extending the
expiration time of the worker, effectively making this a "heartbeat"
to not expire the worker until the timeout passes.
@ -791,6 +872,10 @@ class Worker:
If no timeout is given, the worker_ttl will be used to update
the expiration time of the worker.
Args:
timeout (Optional[int]): Timeout
pipeline (Optional[Redis]): A Redis pipeline
"""
timeout = timeout or self.worker_ttl + 60
connection = pipeline if pipeline is not None else self.connection
@ -801,6 +886,9 @@ class Worker:
)
def refresh(self):
"""Refreshes the worker data.
It will get the data from the datastore and update the Worker's attributes
"""
data = self.connection.hmget(
self.key,
'queues',
@ -868,18 +956,43 @@ class Worker:
]
def increment_failed_job_count(self, pipeline: Optional['Pipeline'] = None):
"""Used to keep the worker stats up to date in Redis.
Increments the failed job count.
Args:
pipeline (Optional[Pipeline], optional): A Redis Pipeline. Defaults to None.
"""
connection = pipeline if pipeline is not None else self.connection
connection.hincrby(self.key, 'failed_job_count', 1)
def increment_successful_job_count(self, pipeline: Optional['Pipeline'] = None):
"""Used to keep the worker stats up to date in Redis.
Increments the successful job count.
Args:
pipeline (Optional[Pipeline], optional): A Redis Pipeline. Defaults to None.
"""
connection = pipeline if pipeline is not None else self.connection
connection.hincrby(self.key, 'successful_job_count', 1)
def increment_total_working_time(self, job_execution_time, pipeline):
def increment_total_working_time(self, job_execution_time: timedelta, pipeline: 'Pipeline'):
"""Used to keep the worker stats up to date in Redis.
Increments the time the worker has been workig for (in seconds).
Args:
job_execution_time (timedelta): A timedelta object.
pipeline (Optional[Pipeline], optional): A Redis Pipeline. Defaults to None.
"""
pipeline.hincrbyfloat(self.key, 'total_working_time', job_execution_time.total_seconds())
def fork_work_horse(self, job: 'Job', queue: 'Queue'):
"""Spawns a work horse to perform the actual work and passes it a job."""
"""Spawns a work horse to perform the actual work and passes it a job.
This is where the `fork()` actually happens.
Args:
job (Job): The Job that will be ran
queue (Queue): The queue
"""
child_pid = os.fork()
os.environ['RQ_WORKER_ID'] = self.name
os.environ['RQ_JOB_ID'] = job.id
@ -891,7 +1004,15 @@ class Worker:
self._horse_pid = child_pid
self.procline('Forked {0} at {1}'.format(child_pid, time.time()))
def get_heartbeat_ttl(self, job: 'Job'):
def get_heartbeat_ttl(self, job: 'Job') -> Union[float, int]:
"""Get's the TTL for the next heartbeat.
Args:
job (Job): The Job
Returns:
int: The heartbeat TTL.
"""
if job.timeout and job.timeout > 0:
remaining_execution_time = job.timeout - self.current_job_working_time
return min(remaining_execution_time, self.job_monitoring_interval) + 60
@ -902,8 +1023,11 @@ class Worker:
"""The worker will monitor the work horse and make sure that it
either executes successfully or the status of the job is set to
failed
"""
Args:
job (Job): _description_
queue (Queue): _description_
"""
ret_val = None
job.started_at = utcnow()
while True:
@ -998,11 +1122,14 @@ class Worker:
self.connection.delete(job.key)
def main_work_horse(self, job: 'Job', queue: 'Queue'):
"""This is the entry point of the newly spawned work horse."""
# After fork()'ing, always assure we are generating random sequences
# that are different from the worker.
random.seed()
"""This is the entry point of the newly spawned work horse.
After fork()'ing, always assure we are generating random sequences
that are different from the worker.
os._exit() is the way to exit from childs after a fork(), in
contrast to the regular sys.exit()
"""
random.seed()
self.setup_work_horse_signals()
self._is_horse = True
self.log = logger
@ -1010,18 +1137,18 @@ class Worker:
self.perform_job(job, queue)
except: # noqa
os._exit(1)
# os._exit() is the way to exit from childs after a fork(), in
# contrast to the regular sys.exit()
os._exit(0)
def setup_work_horse_signals(self):
"""Setup signal handing for the newly spawned work horse."""
# Always ignore Ctrl+C in the work horse, as it might abort the
# currently running job.
# The main worker catches the Ctrl+C and requests graceful shutdown
# after the current work is done. When cold shutdown is requested, it
# kills the current job anyway.
"""Setup signal handing for the newly spawned work horse
Always ignore Ctrl+C in the work horse, as it might abort the
currently running job.
The main worker catches the Ctrl+C and requests graceful shutdown
after the current work is done. When cold shutdown is requested, it
kills the current job anyway.
"""
signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
@ -1102,6 +1229,22 @@ class Worker:
pass
def handle_job_success(self, job: 'Job', queue: 'Queue', started_job_registry: StartedJobRegistry):
"""Handles the successful execution of certain job.
It will remove the job from the `StartedJobRegistry`, adding it to the `SuccessfulJobRegistry`,
and run a few maintenance tasks including:
- Resting the current job ID
- Enqueue dependents
- Incrementing the job count and working time
- Handling of the job successful execution
Runs within a loop with the `watch` method so that protects interactions
with dependents keys.
Args:
job (Job): The job that was successful.
queue (Queue): The queue
started_job_registry (StartedJobRegistry): The started registry
"""
self.log.debug('Handling successful execution of job %s', job.id)
with self.connection.pipeline() as pipeline:
@ -1137,26 +1280,42 @@ class Worker:
except redis.exceptions.WatchError:
continue
def execute_success_callback(self, job: 'Job', result):
"""Executes success_callback with timeout"""
def execute_success_callback(self, job: 'Job', result: Any):
"""Executes success_callback for a job.
with timeout .
Args:
job (Job): The Job
result (Any): The job's result.
"""
self.log.debug(f"Running success callbacks for {job.id}")
job.heartbeat(utcnow(), CALLBACK_TIMEOUT)
with self.death_penalty_class(CALLBACK_TIMEOUT, JobTimeoutException, job_id=job.id):
job.success_callback(job, self.connection, result)
def execute_failure_callback(self, job):
"""Executes failure_callback with timeout"""
def execute_failure_callback(self, job: 'Job'):
"""Executes failure_callback with timeout
Args:
job (Job): The Job
"""
self.log.debug(f"Running failure callbacks for {job.id}")
job.heartbeat(utcnow(), CALLBACK_TIMEOUT)
with self.death_penalty_class(CALLBACK_TIMEOUT, JobTimeoutException, job_id=job.id):
job.failure_callback(job, self.connection, *sys.exc_info())
def perform_job(self, job: 'Job', queue: 'Queue'):
def perform_job(self, job: 'Job', queue: 'Queue') -> bool:
"""Performs the actual work of a job. Will/should only be called
inside the work horse's process.
Args:
job (Job): The Job
queue (Queue): The Queue
Returns:
bool: True after finished.
"""
push_connection(self.connection)
started_job_registry = queue.started_job_registry
self.log.debug("Started Job Registry set.")
@ -1220,13 +1379,13 @@ class Worker:
return True
def handle_exception(self, job: 'Job', *exc_info):
"""Walks the exception handler stack to delegate exception handling."""
"""Walks the exception handler stack to delegate exception handling.
If the job cannot be deserialized, it will raise when func_name or
the other properties are accessed, which will stop exceptions from
being properly logged, so we guard against it here.
"""
self.log.debug(f"Handling exception for {job.id}.")
exc_string = ''.join(traceback.format_exception(*exc_info))
# If the job cannot be deserialized, it will raise when func_name or
# the other properties are accessed, which will stop exceptions from
# being properly logged, so we guard against it here.
try:
extra = {
'func': job.func_name,
@ -1308,9 +1467,16 @@ class SimpleWorker(Worker):
self.perform_job(job, queue)
self.set_state(WorkerStatus.IDLE)
def get_heartbeat_ttl(self, job: 'Job'):
# "-1" means that jobs never timeout. In this case, we should _not_ do -1 + 60 = 59.
# # We should just stick to DEFAULT_WORKER_TTL.
def get_heartbeat_ttl(self, job: 'Job') -> Union[float, int]:
"""-1" means that jobs never timeout. In this case, we should _not_ do -1 + 60 = 59.
We should just stick to DEFAULT_WORKER_TTL.
Args:
job (Job): The Job
Returns:
ttl (float | int): TTL
"""
if job.timeout == -1:
return DEFAULT_WORKER_TTL
else:
@ -1324,9 +1490,7 @@ class HerokuWorker(Worker):
* sends SIGRTMIN to work horses on SIGTERM to the main process which in turn
causes the horse to crash `imminent_shutdown_delay` seconds later
"""
imminent_shutdown_delay = 6
frame_properties = ['f_code', 'f_lasti', 'f_lineno', 'f_locals', 'f_trace']
def setup_work_horse_signals(self):

Loading…
Cancel
Save