diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 22cc1bd..4d38bb2 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -12,7 +12,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: [3.5, 3.6, 3.7, 3.8] + python-version: [3.5, 3.6, 3.7, 3.8.3] redis-version: [3, 4, 5, 6] redis-py-version: [3.5.0] diff --git a/rq/registry.py b/rq/registry.py index 9cb410c..579d0c5 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -37,7 +37,10 @@ class BaseRegistry(object): return self.count def __eq__(self, other): - return (self.name == other.name and self.connection == other.connection) + return ( + self.name == other.name and + self.connection.connection_pool.connection_kwargs == other.connection.connection_pool.connection_kwargs + ) def __contains__(self, item): """ diff --git a/rq/scheduler.py b/rq/scheduler.py index cc1b999..4d2312f 100644 --- a/rq/scheduler.py +++ b/rq/scheduler.py @@ -13,6 +13,8 @@ from .registry import ScheduledJobRegistry from .utils import current_timestamp, enum from .logutils import setup_loghandlers +from redis import Redis + SCHEDULER_KEY_TEMPLATE = 'rq:scheduler:%s' SCHEDULER_LOCKING_KEY_TEMPLATE = 'rq:scheduler-lock:%s' @@ -43,11 +45,19 @@ class RQScheduler(object): self._acquired_locks = set() self._scheduled_job_registries = [] self.lock_acquisition_time = None - self.connection = connection + self._connection_kwargs = connection.connection_pool.connection_kwargs + self._connection = None self.interval = interval self._stop_requested = False self._status = self.Status.STOPPED self._process = None + + @property + def connection(self): + if self._connection: + return self._connection + self._connection = Redis(**self._connection_kwargs) + return Redis(**self._connection_kwargs) @property def acquired_locks(self): @@ -59,12 +69,12 @@ class RQScheduler(object): @property def should_reacquire_locks(self): - """Returns True if lock_acquisition_time is longer than 15 minutes ago""" + """Returns True if lock_acquisition_time is longer than 10 minutes ago""" if self._queue_names == self.acquired_locks: return False if not self.lock_acquisition_time: return True - return (datetime.now() - self.lock_acquisition_time).total_seconds() > 900 + return (datetime.now() - self.lock_acquisition_time).total_seconds() > 600 def acquire_locks(self, auto_start=False): """Returns names of queue it successfully acquires lock on""" @@ -74,9 +84,10 @@ class RQScheduler(object): for name in self._queue_names: if self.connection.set(self.get_locking_key(name), pid, nx=True, ex=5): successful_locks.add(name) - self._acquired_locks = self._acquired_locks.union(successful_locks) - if self._acquired_locks: - self.prepare_registries(self._acquired_locks) + + # Always reset _scheduled_job_registries when acquiring locks + self._scheduled_job_registries = [] + self._acquired_locks = self._acquired_locks.union(successful_locks) self.lock_acquisition_time = datetime.now() @@ -88,9 +99,11 @@ class RQScheduler(object): return successful_locks - def prepare_registries(self, queue_names): + def prepare_registries(self, queue_names=None): """Prepare scheduled job registries for use""" self._scheduled_job_registries = [] + if not queue_names: + queue_names = self._acquired_locks for name in queue_names: self._scheduled_job_registries.append( ScheduledJobRegistry(name, connection=self.connection) @@ -104,6 +117,10 @@ class RQScheduler(object): def enqueue_scheduled_jobs(self): """Enqueue jobs whose timestamp is in the past""" self._status = self.Status.WORKING + + if not self._scheduled_job_registries and self._acquired_locks: + self.prepare_registries() + for registry in self._scheduled_job_registries: timestamp = current_timestamp() @@ -158,12 +175,16 @@ class RQScheduler(object): def start(self): self._status = self.Status.STARTED + # Redis instance can't be pickled across processes so we need to + # clean this up before forking + self._connection = None self._process = Process(target=run, args=(self,), name='Scheduler') self._process.start() return self._process def work(self): self._install_signal_handlers() + while True: if self._stop_requested: self.stop() diff --git a/rq/worker.py b/rq/worker.py index 7cfa130..87d405f 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -1006,10 +1006,10 @@ class Worker(object): @property def should_run_maintenance_tasks(self): - """Maintenance tasks should run on first startup or 15 minutes.""" + """Maintenance tasks should run on first startup or every 10 minutes.""" if self.last_cleaned_at is None: return True - if (utcnow() - self.last_cleaned_at) > timedelta(minutes=15): + if (utcnow() - self.last_cleaned_at) > timedelta(minutes=10): return True return False