diff --git a/rq/worker.py b/rq/worker.py index 3dd7bd6..a324ad6 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -13,7 +13,7 @@ from datetime import timedelta from enum import Enum from uuid import uuid4 from random import shuffle -from typing import Callable, List, Optional, TYPE_CHECKING, Type +from typing import Any, Callable, List, Optional, TYPE_CHECKING, Tuple, Type, Union if TYPE_CHECKING: from redis import Redis @@ -155,15 +155,15 @@ class Worker: return [as_text(key) for key in get_keys(queue=queue, connection=connection)] @classmethod - def count(cls, connection: Optional['Redis'] = None, queue: Optional['Queue'] = None): - """Returns the number of workers by queue or connection + def count(cls, connection: Optional['Redis'] = None, queue: Optional['Queue'] = None) -> int: + """Returns the number of workers by queue or connection. Args: - connection (Optional['Redis'], optional): _description_. Defaults to None. - queue (Optional['Queue'], optional): _description_. Defaults to None. + connection (Optional[Redis], optional): Redis connection. Defaults to None. + queue (Optional[Queue], optional): The queue to use. Defaults to None. Returns: - _type_: _description_ + count (int): The number of workers. """ return len(get_keys(queue=queue, connection=connection)) @@ -172,13 +172,26 @@ class Worker: cls, worker_key: str, connection: Optional['Redis'] = None, - job_class: Type['Job'] = None, - queue_class: Type['Queue'] = None, + job_class: Optional[Type['Job']] = None, + queue_class: Optional[Type['Queue']] = None, serializer=None, - ): + ) -> 'Worker': """Returns a Worker instance, based on the naming conventions for naming the internal Redis keys. Can be used to reverse-lookup Workers by their Redis keys. + + Args: + worker_key (str): The worker key + connection (Optional[Redis], optional): Redis connection. Defaults to None. + job_class (Optional[Type[Job]], optional): The job class if a custom class is being used. Defaults to None. 
+ queue_class (Optional[Type[Queue]], optional): The queue class if a custom class is being used. Defaults to None. + serializer (Any, optional): The serializer to use. Defaults to None. + + Raises: + ValueError: If the key doesn't start with `rq:worker:`, the default worker namespace prefix. + + Returns: + worker (Worker): The Worker instance. """ prefix = cls.redis_worker_namespace_prefix if not worker_key.startswith(prefix): @@ -202,7 +215,6 @@ class Worker: ) worker.refresh() - return worker def __init__( @@ -262,7 +274,7 @@ class Worker: self.successful_job_count: int = 0 self.failed_job_count: int = 0 self.total_working_time: int = 0 - self.current_job_working_time: int = 0 + self.current_job_working_time: float = 0 self.birth_date = None self.scheduler: Optional[RQScheduler] = None self.pubsub = None @@ -324,12 +336,20 @@ class Worker: if not isinstance(queue, self.queue_class): raise TypeError('{0} is not of type {1} or string types'.format(queue, self.queue_class)) - def queue_names(self): - """Returns the queue names of this worker's queues.""" + def queue_names(self) -> List[str]: + """Returns the queue names of this worker's queues. + + Returns: + List[str]: The queue names. + """ return [queue.name for queue in self.queues] - def queue_keys(self): - """Returns the Redis keys representing this worker's queues.""" + def queue_keys(self) -> List[str]: + """Returns the Redis keys representing this worker's queues. 
+ + Returns: + List[str]: The list of strings with queues keys + """ return [queue.key for queue in self.queues] @property @@ -423,13 +443,6 @@ class Worker: """Sets the date on which the worker received a (warm) shutdown request""" self.connection.hset(self.key, 'shutdown_requested_date', utcformat(utcnow())) - # @property - # def birth_date(self): - # """Fetches birth date from Redis.""" - # birth_timestamp = self.connection.hget(self.key, 'birth') - # if birth_timestamp is not None: - # return utcparse(as_text(birth_timestamp)) - @property def shutdown_requested_date(self): """Fetches shutdown_requested_date from Redis.""" @@ -444,7 +457,13 @@ class Worker: if death_timestamp is not None: return utcparse(as_text(death_timestamp)) - def set_state(self, state, pipeline: Optional['Pipeline'] = None): + def set_state(self, state: str, pipeline: Optional['Pipeline'] = None): + """Sets the worker's state. + + Args: + state (str): The state + pipeline (Optional[Pipeline], optional): The pipeline to use. Defaults to None. + """ self._state = state connection = pipeline if pipeline is not None else self.connection connection.hset(self.key, 'state', state) @@ -454,7 +473,7 @@ class Worker: warnings.warn("worker.state is deprecated, use worker.set_state() instead.", DeprecationWarning) self.set_state(state) - def get_state(self): + def get_state(self) -> str: return self._state def _get_state(self): @@ -464,42 +483,65 @@ class Worker: state = property(_get_state, _set_state) - def set_current_job_working_time(self, current_job_working_time, pipeline: Optional['Pipeline'] = None): + def set_current_job_working_time(self, current_job_working_time: float, pipeline: Optional['Pipeline'] = None): + """Sets the current job working time in seconds + + Args: + current_job_working_time (float): The current job working time in seconds + pipeline (Optional[Pipeline], optional): Pipeline to use. Defaults to None. 
+ """ self.current_job_working_time = current_job_working_time connection = pipeline if pipeline is not None else self.connection connection.hset(self.key, 'current_job_working_time', current_job_working_time) def set_current_job_id(self, job_id: Optional[str] = None, pipeline: Optional['Pipeline'] = None): - connection = pipeline if pipeline is not None else self.connection + """Sets the current job id. + If `None` is used it will delete the current job key. + Args: + job_id (Optional[str], optional): The job id. Defaults to None. + pipeline (Optional[Pipeline], optional): The pipeline to use. Defaults to None. + """ + connection = pipeline if pipeline is not None else self.connection if job_id is None: connection.hdel(self.key, 'current_job') else: connection.hset(self.key, 'current_job', job_id) - def get_current_job_id(self, pipeline: Optional['Pipeline'] = None): + def get_current_job_id(self, pipeline: Optional['Pipeline'] = None) -> Optional[str]: + """Retrieves the current job id. + + Args: + pipeline (Optional[Pipeline], optional): The pipeline to use. Defaults to None. + + Returns: + job_id (Optional[str]): The job id + """ connection = pipeline if pipeline is not None else self.connection return as_text(connection.hget(self.key, 'current_job')) - def get_current_job(self): - """Returns the job id of the currently executing job.""" - job_id = self.get_current_job_id() + def get_current_job(self) -> Optional['Job']: + """Returns the currently executing job instance. + Returns: + job (Optional[Job]): The job instance, or None if there is no current job id. + """ + job_id = self.get_current_job_id() if job_id is None: return None - return self.job_class.fetch(job_id, self.connection, self.serializer) def _install_signal_handlers(self): - """Installs signal handlers for handling SIGINT and SIGTERM - gracefully. + """Installs signal handlers for handling SIGINT and SIGTERM gracefully. 
""" signal.signal(signal.SIGINT, self.request_stop) signal.signal(signal.SIGTERM, self.request_stop) - def kill_horse(self, sig=SIGKILL): - """ - Kill the horse but catch "No such process" error has the horse could already be dead. + def kill_horse(self, sig: signal.Signals = SIGKILL): + """Kill the horse but catch "No such process" error has the horse could already be dead. + + Args: + sig (signal.Signals, optional): _description_. Defaults to SIGKILL. """ try: os.killpg(os.getpgid(self.horse_pid), sig) @@ -511,9 +553,9 @@ class Worker: else: raise - def wait_for_horse(self): - """ - A waiting the end of the horse process and recycling resources. + def wait_for_horse(self) -> Tuple[Optional[int], Optional[int]]: + """Waits for the horse process to complete. + Uses `0` as argument as to include "any child in the process group of the current process". """ pid = None stat = None @@ -525,7 +567,15 @@ class Worker: return pid, stat def request_force_stop(self, signum, frame): - """Terminates the application (cold shutdown).""" + """Terminates the application (cold shutdown). + + Args: + signum (Any): Signum + frame (Any): Frame + + Raises: + SystemExit: SystemExit + """ self.log.warning('Cold shut down') # Take down the horse with the worker @@ -538,6 +588,10 @@ class Worker: def request_stop(self, signum, frame): """Stops the current worker loop but waits for child processes to end gracefully (warm shutdown). 
+ + Args: + signum (Any): Signum + frame (Any): Frame """ self.log.debug('Got signal %s', signal_name(signum)) @@ -566,8 +620,9 @@ class Worker: def handle_warm_shutdown_request(self): self.log.info('Warm shut down requested') - def check_for_suspension(self, burst): - """Check to see if workers have been suspended by `rq suspend`""" + def check_for_suspension(self, burst: bool): + """Check to see if workers have been suspended by `rq suspend` + """ before_state = None notified = False @@ -595,8 +650,9 @@ class Worker: on first run since worker.work() already calls `scheduler.enqueue_scheduled_jobs()` on startup. 2. Cleaning registries + + No need to try to start scheduler on first run """ - # No need to try to start scheduler on first run if self.last_cleaned_at: if self.scheduler and (not self.scheduler._process or not self.scheduler._process.is_alive()): self.scheduler.acquire_locks(auto_start=True) @@ -619,17 +675,23 @@ class Worker: self.pubsub.close() def reorder_queues(self, reference_queue): + """Method placeholder to workers that implement some reordering strategy. + `pass` here means that the queue will remain with the same job order. + + Args: + reference_queue (Union[Queue, str]): The queue + """ pass def work( self, burst: bool = False, logging_level: str = "INFO", - date_format=DEFAULT_LOGGING_DATE_FORMAT, - log_format=DEFAULT_LOGGING_FORMAT, - max_jobs=None, + date_format: str = DEFAULT_LOGGING_DATE_FORMAT, + log_format: str = DEFAULT_LOGGING_FORMAT, + max_jobs: Optional[int] = None, with_scheduler: bool = False, - ): + ) -> bool: """Starts the work loop. Pops and performs all jobs on the current list of queues. When all @@ -637,6 +699,17 @@ class Worker: queues, unless `burst` mode is enabled. The return value indicates whether any jobs were processed. + + Args: + burst (bool, optional): Whether to work on burst mode. Defaults to False. + logging_level (str, optional): Logging level to use. Defaults to "INFO". 
+ date_format (str, optional): Date Format. Defaults to DEFAULT_LOGGING_DATE_FORMAT. + log_format (str, optional): Log Format. Defaults to DEFAULT_LOGGING_FORMAT. + max_jobs (Optional[int], optional): Max number of jobs. Defaults to None. + with_scheduler (bool, optional): Whether to run the scheduler in a separate process. Defaults to False. + + Returns: + worked (bool): Will return True if any job was processed, False otherwise. """ setup_loghandlers(logging_level, date_format, log_format) completed_jobs = 0 @@ -723,16 +796,24 @@ class Worker: return bool(completed_jobs) def stop_scheduler(self): - """Ensure scheduler process is stopped""" + """Ensure scheduler process is stopped + Will send the kill signal to scheduler process, + if there's an OSError, just passes and `join()`'s the scheduler process, + waiting for the process to finish. + """ if self.scheduler._process and self.scheduler._process.pid: - # Send the kill signal to scheduler process try: os.kill(self.scheduler._process.pid, signal.SIGTERM) except OSError: pass self.scheduler._process.join() - def dequeue_job_and_maintain_ttl(self, timeout): + def dequeue_job_and_maintain_ttl(self, timeout: int) -> Tuple['Job', 'Queue']: + """Dequeues a job while maintaining the TTL. + + Returns: + result (Tuple[Job, Queue]): A tuple with the job and the queue. + """ result = None qnames = ','.join(self.queue_names()) @@ -781,7 +862,7 @@ class Worker: self.heartbeat() return result - def heartbeat(self, timeout=None, pipeline: Optional['Pipeline'] = None): + def heartbeat(self, timeout: Optional[int] = None, pipeline: Optional['Pipeline'] = None): """Specifies a new worker timeout, typically by extending the expiration time of the worker, effectively making this a "heartbeat" to not expire the worker until the timeout passes. @@ -791,6 +872,10 @@ class Worker: If no timeout is given, the worker_ttl will be used to update the expiration time of the worker. 
+ + Args: + timeout (Optional[int]): Timeout + pipeline (Optional[Pipeline]): A Redis pipeline """ timeout = timeout or self.worker_ttl + 60 connection = pipeline if pipeline is not None else self.connection @@ -801,6 +886,9 @@ class Worker: ) def refresh(self): + """Refreshes the worker data. + It will get the data from the datastore and update the Worker's attributes + """ data = self.connection.hmget( self.key, 'queues', @@ -868,18 +956,43 @@ class Worker: ] def increment_failed_job_count(self, pipeline: Optional['Pipeline'] = None): + """Used to keep the worker stats up to date in Redis. + Increments the failed job count. + + Args: + pipeline (Optional[Pipeline], optional): A Redis Pipeline. Defaults to None. + """ connection = pipeline if pipeline is not None else self.connection connection.hincrby(self.key, 'failed_job_count', 1) def increment_successful_job_count(self, pipeline: Optional['Pipeline'] = None): + """Used to keep the worker stats up to date in Redis. + Increments the successful job count. + + Args: + pipeline (Optional[Pipeline], optional): A Redis Pipeline. Defaults to None. + """ connection = pipeline if pipeline is not None else self.connection connection.hincrby(self.key, 'successful_job_count', 1) - def increment_total_working_time(self, job_execution_time, pipeline): + def increment_total_working_time(self, job_execution_time: timedelta, pipeline: 'Pipeline'): + """Used to keep the worker stats up to date in Redis. + Increments the time the worker has been working for (in seconds). + + Args: + job_execution_time (timedelta): A timedelta object. + pipeline (Pipeline): A Redis Pipeline (required). + """ pipeline.hincrbyfloat(self.key, 'total_working_time', job_execution_time.total_seconds()) def fork_work_horse(self, job: 'Job', queue: 'Queue'): - """Spawns a work horse to perform the actual work and passes it a job.""" + """Spawns a work horse to perform the actual work and passes it a job. 
+ This is where the `fork()` actually happens. + + Args: + job (Job): The Job that will be run + queue (Queue): The queue + """ child_pid = os.fork() os.environ['RQ_WORKER_ID'] = self.name os.environ['RQ_JOB_ID'] = job.id @@ -891,7 +1004,15 @@ class Worker: self._horse_pid = child_pid self.procline('Forked {0} at {1}'.format(child_pid, time.time())) - def get_heartbeat_ttl(self, job: 'Job'): + def get_heartbeat_ttl(self, job: 'Job') -> Union[float, int]: + """Gets the TTL for the next heartbeat. + + Args: + job (Job): The Job + + Returns: + ttl (float | int): The heartbeat TTL. + """ if job.timeout and job.timeout > 0: remaining_execution_time = job.timeout - self.current_job_working_time return min(remaining_execution_time, self.job_monitoring_interval) + 60 @@ -902,8 +1023,11 @@ class Worker: """The worker will monitor the work horse and make sure that it either executes successfully or the status of the job is set to failed - """ + Args: + job (Job): The Job being monitored + queue (Queue): The Queue + """ ret_val = None job.started_at = utcnow() while True: @@ -998,11 +1122,14 @@ class Worker: self.connection.delete(job.key) def main_work_horse(self, job: 'Job', queue: 'Queue'): - """This is the entry point of the newly spawned work horse.""" - # After fork()'ing, always assure we are generating random sequences - # that are different from the worker. - random.seed() + """This is the entry point of the newly spawned work horse. + After fork()'ing, always assure we are generating random sequences + that are different from the worker. 
+ os._exit() is the way to exit from child processes after a fork(), in + contrast to the regular sys.exit() + """ + random.seed() self.setup_work_horse_signals() self._is_horse = True self.log = logger @@ -1010,18 +1137,18 @@ class Worker: self.perform_job(job, queue) except: # noqa os._exit(1) - - # os._exit() is the way to exit from childs after a fork(), in - # contrast to the regular sys.exit() os._exit(0) def setup_work_horse_signals(self): - """Setup signal handing for the newly spawned work horse.""" - # Always ignore Ctrl+C in the work horse, as it might abort the - # currently running job. - # The main worker catches the Ctrl+C and requests graceful shutdown - # after the current work is done. When cold shutdown is requested, it - # kills the current job anyway. + """Set up signal handling for the newly spawned work horse. + + Always ignore Ctrl+C in the work horse, as it might abort the + currently running job. + + The main worker catches the Ctrl+C and requests graceful shutdown + after the current work is done. When cold shutdown is requested, it + kills the current job anyway. + """ signal.signal(signal.SIGINT, signal.SIG_IGN) signal.signal(signal.SIGTERM, signal.SIG_DFL) @@ -1102,6 +1229,22 @@ class Worker: pass def handle_job_success(self, job: 'Job', queue: 'Queue', started_job_registry: StartedJobRegistry): + """Handles the successful execution of a job. + It will remove the job from the `StartedJobRegistry`, adding it to the `SuccessfulJobRegistry`, + and run a few maintenance tasks including: + - Resetting the current job ID + - Enqueueing dependents + - Incrementing the job count and working time + - Handling the job's successful execution + + Runs within a loop with the `watch` method so that interactions + with dependents keys are protected. + + Args: + job (Job): The job that was successful. 
+ queue (Queue): The queue + started_job_registry (StartedJobRegistry): The started registry + """ self.log.debug('Handling successful execution of job %s', job.id) with self.connection.pipeline() as pipeline: @@ -1137,26 +1280,42 @@ class Worker: except redis.exceptions.WatchError: continue - def execute_success_callback(self, job: 'Job', result): - """Executes success_callback with timeout""" + def execute_success_callback(self, job: 'Job', result: Any): + """Executes success_callback for a job, + with a timeout. + + Args: + job (Job): The Job + result (Any): The job's result. + """ self.log.debug(f"Running success callbacks for {job.id}") job.heartbeat(utcnow(), CALLBACK_TIMEOUT) with self.death_penalty_class(CALLBACK_TIMEOUT, JobTimeoutException, job_id=job.id): job.success_callback(job, self.connection, result) - def execute_failure_callback(self, job): - """Executes failure_callback with timeout""" + def execute_failure_callback(self, job: 'Job'): + """Executes failure_callback with timeout + + Args: + job (Job): The Job + """ self.log.debug(f"Running failure callbacks for {job.id}") job.heartbeat(utcnow(), CALLBACK_TIMEOUT) with self.death_penalty_class(CALLBACK_TIMEOUT, JobTimeoutException, job_id=job.id): job.failure_callback(job, self.connection, *sys.exc_info()) - def perform_job(self, job: 'Job', queue: 'Queue'): + def perform_job(self, job: 'Job', queue: 'Queue') -> bool: """Performs the actual work of a job. Will/should only be called inside the work horse's process. + + Args: + job (Job): The Job + queue (Queue): The Queue + + Returns: + bool: True after finished. """ push_connection(self.connection) - started_job_registry = queue.started_job_registry self.log.debug("Started Job Registry set.") @@ -1220,13 +1379,13 @@ class Worker: return True def handle_exception(self, job: 'Job', *exc_info): - """Walks the exception handler stack to delegate exception handling.""" + """Walks the exception handler stack to delegate exception handling. 
+ If the job cannot be deserialized, it will raise when func_name or + the other properties are accessed, which will stop exceptions from + being properly logged, so we guard against it here. + """ self.log.debug(f"Handling exception for {job.id}.") exc_string = ''.join(traceback.format_exception(*exc_info)) - - # If the job cannot be deserialized, it will raise when func_name or - # the other properties are accessed, which will stop exceptions from - # being properly logged, so we guard against it here. try: extra = { 'func': job.func_name, @@ -1308,9 +1467,16 @@ class SimpleWorker(Worker): self.perform_job(job, queue) self.set_state(WorkerStatus.IDLE) - def get_heartbeat_ttl(self, job: 'Job'): - # "-1" means that jobs never timeout. In this case, we should _not_ do -1 + 60 = 59. - # # We should just stick to DEFAULT_WORKER_TTL. + def get_heartbeat_ttl(self, job: 'Job') -> Union[float, int]: + """A timeout of -1 means that jobs never timeout. In this case, we should _not_ do -1 + 60 = 59. + We should just stick to DEFAULT_WORKER_TTL. + + Args: + job (Job): The Job + + Returns: + ttl (float | int): The heartbeat TTL + """ if job.timeout == -1: return DEFAULT_WORKER_TTL else: @@ -1324,9 +1490,7 @@ class HerokuWorker(Worker): * sends SIGRTMIN to work horses on SIGTERM to the main process which in turn causes the horse to crash `imminent_shutdown_delay` seconds later """ - imminent_shutdown_delay = 6 - frame_properties = ['f_code', 'f_lasti', 'f_lineno', 'f_locals', 'f_trace'] def setup_work_horse_signals(self):