|
|
|
@ -13,6 +13,8 @@ from .registry import ScheduledJobRegistry
|
|
|
|
|
from .utils import current_timestamp, enum
|
|
|
|
|
from .logutils import setup_loghandlers
|
|
|
|
|
|
|
|
|
|
from redis import Redis
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
SCHEDULER_KEY_TEMPLATE = 'rq:scheduler:%s'
|
|
|
|
|
SCHEDULER_LOCKING_KEY_TEMPLATE = 'rq:scheduler-lock:%s'
|
|
|
|
@ -43,11 +45,19 @@ class RQScheduler(object):
|
|
|
|
|
self._acquired_locks = set()
|
|
|
|
|
self._scheduled_job_registries = []
|
|
|
|
|
self.lock_acquisition_time = None
|
|
|
|
|
self.connection = connection
|
|
|
|
|
self._connection_kwargs = connection.connection_pool.connection_kwargs
|
|
|
|
|
self._connection = None
|
|
|
|
|
self.interval = interval
|
|
|
|
|
self._stop_requested = False
|
|
|
|
|
self._status = self.Status.STOPPED
|
|
|
|
|
self._process = None
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def connection(self):
|
|
|
|
|
if self._connection:
|
|
|
|
|
return self._connection
|
|
|
|
|
self._connection = Redis(**self._connection_kwargs)
|
|
|
|
|
return Redis(**self._connection_kwargs)
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def acquired_locks(self):
|
|
|
|
@ -59,12 +69,12 @@ class RQScheduler(object):
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def should_reacquire_locks(self):
|
|
|
|
|
"""Returns True if lock_acquisition_time is longer than 15 minutes ago"""
|
|
|
|
|
"""Returns True if lock_acquisition_time is longer than 10 minutes ago"""
|
|
|
|
|
if self._queue_names == self.acquired_locks:
|
|
|
|
|
return False
|
|
|
|
|
if not self.lock_acquisition_time:
|
|
|
|
|
return True
|
|
|
|
|
return (datetime.now() - self.lock_acquisition_time).total_seconds() > 900
|
|
|
|
|
return (datetime.now() - self.lock_acquisition_time).total_seconds() > 600
|
|
|
|
|
|
|
|
|
|
def acquire_locks(self, auto_start=False):
|
|
|
|
|
"""Returns names of queue it successfully acquires lock on"""
|
|
|
|
@ -74,9 +84,10 @@ class RQScheduler(object):
|
|
|
|
|
for name in self._queue_names:
|
|
|
|
|
if self.connection.set(self.get_locking_key(name), pid, nx=True, ex=5):
|
|
|
|
|
successful_locks.add(name)
|
|
|
|
|
self._acquired_locks = self._acquired_locks.union(successful_locks)
|
|
|
|
|
if self._acquired_locks:
|
|
|
|
|
self.prepare_registries(self._acquired_locks)
|
|
|
|
|
|
|
|
|
|
# Always reset _scheduled_job_registries when acquiring locks
|
|
|
|
|
self._scheduled_job_registries = []
|
|
|
|
|
self._acquired_locks = self._acquired_locks.union(successful_locks)
|
|
|
|
|
|
|
|
|
|
self.lock_acquisition_time = datetime.now()
|
|
|
|
|
|
|
|
|
@ -88,9 +99,11 @@ class RQScheduler(object):
|
|
|
|
|
|
|
|
|
|
return successful_locks
|
|
|
|
|
|
|
|
|
|
def prepare_registries(self, queue_names):
|
|
|
|
|
def prepare_registries(self, queue_names=None):
|
|
|
|
|
"""Prepare scheduled job registries for use"""
|
|
|
|
|
self._scheduled_job_registries = []
|
|
|
|
|
if not queue_names:
|
|
|
|
|
queue_names = self._acquired_locks
|
|
|
|
|
for name in queue_names:
|
|
|
|
|
self._scheduled_job_registries.append(
|
|
|
|
|
ScheduledJobRegistry(name, connection=self.connection)
|
|
|
|
@ -104,6 +117,10 @@ class RQScheduler(object):
|
|
|
|
|
def enqueue_scheduled_jobs(self):
|
|
|
|
|
"""Enqueue jobs whose timestamp is in the past"""
|
|
|
|
|
self._status = self.Status.WORKING
|
|
|
|
|
|
|
|
|
|
if not self._scheduled_job_registries and self._acquired_locks:
|
|
|
|
|
self.prepare_registries()
|
|
|
|
|
|
|
|
|
|
for registry in self._scheduled_job_registries:
|
|
|
|
|
timestamp = current_timestamp()
|
|
|
|
|
|
|
|
|
@ -158,12 +175,16 @@ class RQScheduler(object):
|
|
|
|
|
|
|
|
|
|
def start(self):
|
|
|
|
|
self._status = self.Status.STARTED
|
|
|
|
|
# Redis instance can't be pickled across processes so we need to
|
|
|
|
|
# clean this up before forking
|
|
|
|
|
self._connection = None
|
|
|
|
|
self._process = Process(target=run, args=(self,), name='Scheduler')
|
|
|
|
|
self._process.start()
|
|
|
|
|
return self._process
|
|
|
|
|
|
|
|
|
|
def work(self):
|
|
|
|
|
self._install_signal_handlers()
|
|
|
|
|
|
|
|
|
|
while True:
|
|
|
|
|
if self._stop_requested:
|
|
|
|
|
self.stop()
|
|
|
|
|