diff --git a/rq/contrib/legacy.py b/rq/contrib/legacy.py new file mode 100644 index 0000000..2a20b9f --- /dev/null +++ b/rq/contrib/legacy.py @@ -0,0 +1,24 @@ +import logging +from rq import get_current_connection +from rq import Worker + + +logger = logging.getLogger(__name__) + + +def cleanup_ghosts(): + """ + RQ versions < 0.3.6 suffered from a race condition where workers, when + abruptly terminated, did not have a chance to clean up their worker + registration, leading to reports of ghosted workers in `rqinfo`. Since + 0.3.6, new worker registrations automatically expire, and the worker will + make sure to refresh the registrations as long as it's alive. + + This function will clean up any of such legacy ghosted workers. + """ + conn = get_current_connection() + for worker in Worker.all(): + if conn._ttl(worker.key) == -1: + ttl = worker.default_worker_ttl + conn.expire(worker.key, ttl) + logger.info('Marked ghosted worker {0} to expire in {1} seconds.'.format(worker.name, ttl)) diff --git a/rq/scripts/rqworker.py b/rq/scripts/rqworker.py index 28f3921..61ecbb0 100755 --- a/rq/scripts/rqworker.py +++ b/rq/scripts/rqworker.py @@ -7,6 +7,7 @@ import logging.config from rq import Queue, Worker from rq.logutils import setup_loghandlers from redis.exceptions import ConnectionError +from rq.contrib.legacy import cleanup_ghosts from rq.scripts import add_standard_arguments from rq.scripts import setup_redis from rq.scripts import read_config_file @@ -48,6 +49,8 @@ def main(): setup_loghandlers(args.verbose) setup_redis(args) + cleanup_ghosts() + try: queues = map(Queue, args.queues) w = Worker(queues, name=args.name)