Add to worker redis record scheduler info (#1787)

* add scheduler_pid property to queue

* Update return type

* Reformat code
main
Daniel M 2 years ago committed by GitHub
parent a02ad29cef
commit 3d840a79ad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -3,11 +3,11 @@ import sys
import traceback
import uuid
import warnings
from collections import namedtuple
from datetime import datetime, timezone, timedelta
from functools import total_ordering
from typing import TYPE_CHECKING, Dict, List, Any, Callable, Optional, Tuple, Type, Union
from redis import WatchError
if TYPE_CHECKING:
@ -24,7 +24,6 @@ from .types import FunctionReferenceType, JobDependencyType
from .serializers import resolve_serializer
from .utils import backend_class, get_version, import_attribute, make_colorizer, parse_timeout, utcnow, compact
green = make_colorizer('darkgreen')
yellow = make_colorizer('darkyellow')
blue = make_colorizer('darkblue')
@ -69,7 +68,7 @@ class Queue:
@classmethod
def all(
cls, connection: Optional['Redis'] = None, job_class: Optional[Type['Job']] = None, serializer=None
cls, connection: Optional['Redis'] = None, job_class: Optional[Type['Job']] = None, serializer=None
) -> List['Queue']:
"""Returns an iterable of all Queues.
@ -94,11 +93,11 @@ class Queue:
@classmethod
def from_queue_key(
cls,
queue_key: str,
connection: Optional['Redis'] = None,
job_class: Optional['Job'] = None,
serializer: Any = None,
cls,
queue_key: str,
connection: Optional['Redis'] = None,
job_class: Optional['Job'] = None,
serializer: Any = None,
) -> 'Queue':
"""Returns a Queue instance, based on the naming conventions for naming
the internal Redis keys. Can be used to reverse-lookup Queues by their
@ -119,18 +118,18 @@ class Queue:
prefix = cls.redis_queue_namespace_prefix
if not queue_key.startswith(prefix):
raise ValueError('Not a valid RQ queue key: {0}'.format(queue_key))
name = queue_key[len(prefix) :]
name = queue_key[len(prefix):]
return cls(name, connection=connection, job_class=job_class, serializer=serializer)
def __init__(
self,
name: str = 'default',
default_timeout: Optional[int] = None,
connection: Optional['Redis'] = None,
is_async: bool = True,
job_class: Union[str, Type['Job'], None] = None,
serializer: Any = None,
**kwargs,
self,
name: str = 'default',
default_timeout: Optional[int] = None,
connection: Optional['Redis'] = None,
is_async: bool = True,
job_class: Union[str, Type['Job'], None] = None,
serializer: Any = None,
**kwargs,
):
"""Initializes a Queue object.
@ -196,6 +195,12 @@ class Queue:
"""Redis key used to indicate this queue has been cleaned."""
return 'rq:clean_registries:%s' % self.name
@property
def scheduler_pid(self) -> int:
from rq.scheduler import RQScheduler
pid = self.connection.get(RQScheduler.get_locking_key(self.name))
return int(pid.decode()) if pid is not None else None
def acquire_cleaning_lock(self) -> bool:
"""Returns a boolean indicating whether a lock to clean this queue
is acquired. A lock expires in 899 seconds (15 minutes - 1 second)
@ -453,23 +458,23 @@ class Queue:
self.log.debug(f"Pushed job {blue(job_id)} into {green(self.name)}, {result} job(s) are in queue.")
def create_job(
self,
func: 'FunctionReferenceType',
args: Union[Tuple, List, None] = None,
kwargs: Optional[Dict] = None,
timeout: Optional[int] = None,
result_ttl: Optional[int] = None,
ttl: Optional[int] = None,
failure_ttl: Optional[int] = None,
description: Optional[str] = None,
depends_on: Optional['JobDependencyType'] = None,
job_id: Optional[str] = None,
meta: Optional[Dict] = None,
status: JobStatus = JobStatus.QUEUED,
retry: Optional['Retry'] = None,
*,
on_success: Optional[Callable] = None,
on_failure: Optional[Callable] = None,
self,
func: 'FunctionReferenceType',
args: Union[Tuple, List, None] = None,
kwargs: Optional[Dict] = None,
timeout: Optional[int] = None,
result_ttl: Optional[int] = None,
ttl: Optional[int] = None,
failure_ttl: Optional[int] = None,
description: Optional[str] = None,
depends_on: Optional['JobDependencyType'] = None,
job_id: Optional[str] = None,
meta: Optional[Dict] = None,
status: JobStatus = JobStatus.QUEUED,
retry: Optional['Retry'] = None,
*,
on_success: Optional[Callable] = None,
on_failure: Optional[Callable] = None,
) -> Job:
"""Creates a job based on parameters given
@ -595,23 +600,23 @@ class Queue:
return job
def enqueue_call(
self,
func: 'FunctionReferenceType',
args: Union[Tuple, List, None] = None,
kwargs: Optional[Dict] = None,
timeout: Optional[int] = None,
result_ttl: Optional[int] = None,
ttl: Optional[int] = None,
failure_ttl: Optional[int] = None,
description: Optional[str] = None,
depends_on: Optional['JobDependencyType'] = None,
job_id: Optional[str] = None,
at_front: bool = False,
meta: Optional[Dict] = None,
retry: Optional['Retry'] = None,
on_success: Optional[Callable[..., Any]] = None,
on_failure: Optional[Callable[..., Any]] = None,
pipeline: Optional['Pipeline'] = None,
self,
func: 'FunctionReferenceType',
args: Union[Tuple, List, None] = None,
kwargs: Optional[Dict] = None,
timeout: Optional[int] = None,
result_ttl: Optional[int] = None,
ttl: Optional[int] = None,
failure_ttl: Optional[int] = None,
description: Optional[str] = None,
depends_on: Optional['JobDependencyType'] = None,
job_id: Optional[str] = None,
at_front: bool = False,
meta: Optional[Dict] = None,
retry: Optional['Retry'] = None,
on_success: Optional[Callable[..., Any]] = None,
on_failure: Optional[Callable[..., Any]] = None,
pipeline: Optional['Pipeline'] = None,
) -> Job:
"""Creates a job to represent the delayed function call and enqueues it.
@ -667,20 +672,20 @@ class Queue:
@staticmethod
def prepare_data(
func: 'FunctionReferenceType',
args: Union[Tuple, List, None] = None,
kwargs: Optional[Dict] = None,
timeout: Optional[int] = None,
result_ttl: Optional[int] = None,
ttl: Optional[int] = None,
failure_ttl: Optional[int] = None,
description: Optional[str] = None,
job_id: Optional[str] = None,
at_front: bool = False,
meta: Optional[Dict] = None,
retry: Optional['Retry'] = None,
on_success: Optional[Callable] = None,
on_failure: Optional[Callable] = None,
func: 'FunctionReferenceType',
args: Union[Tuple, List, None] = None,
kwargs: Optional[Dict] = None,
timeout: Optional[int] = None,
result_ttl: Optional[int] = None,
ttl: Optional[int] = None,
failure_ttl: Optional[int] = None,
description: Optional[str] = None,
job_id: Optional[str] = None,
at_front: bool = False,
meta: Optional[Dict] = None,
retry: Optional['Retry'] = None,
on_success: Optional[Callable] = None,
on_failure: Optional[Callable] = None,
) -> EnqueueData:
"""Need this till support dropped for python_version < 3.7, where defaults can be specified for named tuples
And can keep this logic within EnqueueData
@ -771,7 +776,7 @@ class Queue:
Job: _description_
"""
job.perform()
result_ttl = job.get_result_ttl(default_ttl=DEFAULT_RESULT_TTL)
result_ttl = job.get_result_ttl(default_ttl=DEFAULT_RESULT_TTL)
with self.connection.pipeline() as pipeline:
job._handle_success(result_ttl=result_ttl, pipeline=pipeline)
job.cleanup(result_ttl, pipeline=pipeline)
@ -1043,7 +1048,7 @@ class Queue:
return job
def enqueue_dependents(
self, job: 'Job', pipeline: Optional['Pipeline'] = None, exclude_job_id: Optional[str] = None
self, job: 'Job', pipeline: Optional['Pipeline'] = None, exclude_job_id: Optional[str] = None
):
"""Enqueues all jobs in the given job's dependents set and clears it.
@ -1081,7 +1086,7 @@ class Queue:
dependent_job_ids, connection=self.connection, serializer=self.serializer
)
if dependent_job
and dependent_job.dependencies_are_met(
and dependent_job.dependencies_are_met(
parent_job=job,
pipeline=pipe,
exclude_job_id=exclude_job_id,
@ -1181,12 +1186,12 @@ class Queue:
@classmethod
def dequeue_any(
cls,
queues: List['Queue'],
timeout: int,
connection: Optional['Redis'] = None,
job_class: Optional['Job'] = None,
serializer: Any = None,
cls,
queues: List['Queue'],
timeout: int,
connection: Optional['Redis'] = None,
job_class: Optional['Job'] = None,
serializer: Any = None,
) -> Tuple['Job', 'Queue']:
"""Class method returning the job_class instance at the front of the given
set of Queues, where the order of the queues is important.

@ -35,14 +35,14 @@ class RQScheduler:
Status = SchedulerStatus
def __init__(
self,
queues,
connection,
interval=1,
logging_level=logging.INFO,
date_format=DEFAULT_LOGGING_DATE_FORMAT,
log_format=DEFAULT_LOGGING_FORMAT,
serializer=None,
self,
queues,
connection,
interval=1,
logging_level=logging.INFO,
date_format=DEFAULT_LOGGING_DATE_FORMAT,
log_format=DEFAULT_LOGGING_FORMAT,
serializer=None,
):
self._queue_names = set(parse_names(queues))
self._acquired_locks = set()

@ -1,9 +1,10 @@
import os
from datetime import datetime, timedelta, timezone
from multiprocessing import Process
from unittest import mock
from rq import Queue
from rq.defaults import DEFAULT_MAINTENANCE_TASK_INTERVAL
from rq.exceptions import NoSuchJobError
from rq.job import Job, Retry
from rq.registry import FinishedJobRegistry, ScheduledJobRegistry
@ -11,10 +12,7 @@ from rq.scheduler import RQScheduler
from rq.serializers import JSONSerializer
from rq.utils import current_timestamp
from rq.worker import Worker
from rq.defaults import DEFAULT_MAINTENANCE_TASK_INTERVAL
from tests import RQTestCase, find_empty_redis_database, ssl_test
from .fixtures import kill_worker, say_hello
@ -140,7 +138,7 @@ class TestScheduler(RQTestCase):
# scheduler.should_reacquire_locks always returns False if
# scheduler.acquired_locks and scheduler._queue_names are the same
self.assertFalse(scheduler.should_reacquire_locks)
scheduler.lock_acquisition_time = datetime.now() - timedelta(seconds=DEFAULT_MAINTENANCE_TASK_INTERVAL+6)
scheduler.lock_acquisition_time = datetime.now() - timedelta(seconds=DEFAULT_MAINTENANCE_TASK_INTERVAL + 6)
self.assertFalse(scheduler.should_reacquire_locks)
scheduler._queue_names = set(['default', 'foo'])
@ -196,6 +194,12 @@ class TestScheduler(RQTestCase):
self.assertEqual(mocked.call_count, 1)
self.assertEqual(stopped_process.is_alive.call_count, 1)
def test_queue_scheduler_pid(self):
queue = Queue(connection=self.testconn)
scheduler = RQScheduler([queue, ], connection=self.testconn)
scheduler.acquire_locks()
assert queue.scheduler_pid == os.getpid()
def test_heartbeat(self):
"""Test that heartbeat updates locking keys TTL"""
name_1 = 'lock-test-1'

Loading…
Cancel
Save