add the ability to have the worker stop executing after a max amount of jobs (#1094)

* add the ability to have the worker stop executing after a max amount of jobs

* rename to max-jobs

* updated logging messages
main
Paul Robertson 6 years ago committed by Selwin Ong
parent 2baef02dbd
commit e1c135d4de

@ -1,3 +1,7 @@
### Unreleased
- Added `max_jobs` to `Worker.work` and `--max-jobs` to `rq worker` CLI.
### 1.0 (2019-04-06) ### 1.0 (2019-04-06)
Backward incompatible changes: Backward incompatible changes:

@ -68,6 +68,7 @@ In addition to `--burst`, `rq worker` also accepts these arguments:
* `--log-format`: Format for the worker logs, defaults to `'%(asctime)s %(message)s'` * `--log-format`: Format for the worker logs, defaults to `'%(asctime)s %(message)s'`
* `--date-format`: Datetime format for the worker logs, defaults to `'%H:%M:%S'` * `--date-format`: Datetime format for the worker logs, defaults to `'%H:%M:%S'`
* `--disable-job-desc-logging`: Turn off job description logging. * `--disable-job-desc-logging`: Turn off job description logging.
* `--max-jobs`: Maximum number of jobs to execute.
## Inside the worker ## Inside the worker

@ -188,11 +188,12 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues,
@click.option('--exception-handler', help='Exception handler(s) to use', multiple=True) @click.option('--exception-handler', help='Exception handler(s) to use', multiple=True)
@click.option('--pid', help='Write the process ID number to a file at the specified path') @click.option('--pid', help='Write the process ID number to a file at the specified path')
@click.option('--disable-default-exception-handler', '-d', is_flag=True, help='Disable RQ\'s default exception handler') @click.option('--disable-default-exception-handler', '-d', is_flag=True, help='Disable RQ\'s default exception handler')
@click.option('--max-jobs', type=int, default=None, help='Maximum number of jobs to execute')
@click.argument('queues', nargs=-1) @click.argument('queues', nargs=-1)
@pass_cli_config @pass_cli_config
def worker(cli_config, burst, logging_level, name, results_ttl, def worker(cli_config, burst, logging_level, name, results_ttl,
worker_ttl, job_monitoring_interval, disable_job_desc_logging, verbose, quiet, sentry_dsn, worker_ttl, job_monitoring_interval, disable_job_desc_logging, verbose, quiet, sentry_dsn,
exception_handler, pid, disable_default_exception_handler, queues, exception_handler, pid, disable_default_exception_handler, max_jobs, queues,
log_format, date_format, **options): log_format, date_format, **options):
"""Starts an RQ worker.""" """Starts an RQ worker."""
settings = read_config_file(cli_config.config) if cli_config.config else {} settings = read_config_file(cli_config.config) if cli_config.config else {}
@ -237,7 +238,7 @@ def worker(cli_config, burst, logging_level, name, results_ttl,
from rq.contrib.sentry import register_sentry from rq.contrib.sentry import register_sentry
register_sentry(sentry_dsn) register_sentry(sentry_dsn)
worker.work(burst=burst, logging_level=logging_level, date_format=date_format, log_format=log_format) worker.work(burst=burst, logging_level=logging_level, date_format=date_format, log_format=log_format, max_jobs=max_jobs)
except ConnectionError as e: except ConnectionError as e:
print(e) print(e)
sys.exit(1) sys.exit(1)

@ -435,7 +435,7 @@ class Worker(object):
self.set_state(before_state) self.set_state(before_state)
def work(self, burst=False, logging_level="INFO", date_format=DEFAULT_LOGGING_DATE_FORMAT, def work(self, burst=False, logging_level="INFO", date_format=DEFAULT_LOGGING_DATE_FORMAT,
log_format=DEFAULT_LOGGING_FORMAT): log_format=DEFAULT_LOGGING_FORMAT, max_jobs=None):
"""Starts the work loop. """Starts the work loop.
Pops and performs all jobs on the current list of queues. When all Pops and performs all jobs on the current list of queues. When all
@ -446,7 +446,7 @@ class Worker(object):
""" """
setup_loghandlers(logging_level, date_format, log_format) setup_loghandlers(logging_level, date_format, log_format)
self._install_signal_handlers() self._install_signal_handlers()
did_perform_work = False completed_jobs = 0
self.register_birth() self.register_birth()
self.log.info("RQ worker %r started, version %s", self.key, VERSION) self.log.info("RQ worker %r started, version %s", self.key, VERSION)
self.set_state(WorkerStatus.STARTED) self.set_state(WorkerStatus.STARTED)
@ -477,7 +477,14 @@ class Worker(object):
self.execute_job(job, queue) self.execute_job(job, queue)
self.heartbeat() self.heartbeat()
did_perform_work = True completed_jobs += 1
if max_jobs is not None:
if completed_jobs >= max_jobs:
self.log.info(
"RQ Worker %r finished executing %d jobs, quitting",
self.key, completed_jobs
)
break
except StopRequested: except StopRequested:
break break
@ -488,14 +495,14 @@ class Worker(object):
except: # noqa except: # noqa
self.log.error( self.log.error(
'Worker %s: found an unhandled exception, quitting...', 'RQ Worker %r: found an unhandled exception, quitting...',
self.name, exc_info=True self.key, exc_info=True
) )
break break
finally: finally:
if not self.is_horse: if not self.is_horse:
self.register_death() self.register_death()
return did_perform_work return bool(completed_jobs)
def dequeue_job_and_maintain_ttl(self, timeout): def dequeue_job_and_maintain_ttl(self, timeout):
result = None result = None

@ -340,6 +340,17 @@ class TestWorker(RQTestCase):
# total_working_time should be around 0.05 seconds # total_working_time should be around 0.05 seconds
self.assertTrue(0.05 <= worker.total_working_time < 0.06) self.assertTrue(0.05 <= worker.total_working_time < 0.06)
def test_max_jobs(self):
"""Worker exits after number of jobs complete."""
queue = Queue()
job1 = queue.enqueue(do_nothing)
job2 = queue.enqueue(do_nothing)
worker = Worker([queue])
worker.work(max_jobs=1)
self.assertEqual(JobStatus.FINISHED, job1.get_status())
self.assertEqual(JobStatus.QUEUED, job2.get_status())
def test_disable_default_exception_handler(self): def test_disable_default_exception_handler(self):
""" """
Job is not moved to FailedJobRegistry when default custom exception Job is not moved to FailedJobRegistry when default custom exception
@ -616,7 +627,7 @@ class TestWorker(RQTestCase):
# Updates worker statuses # Updates worker statuses
self.assertEqual(worker.get_state(), 'busy') self.assertEqual(worker.get_state(), 'busy')
self.assertEqual(worker.get_current_job_id(), job.id) self.assertEqual(worker.get_current_job_id(), job.id)
def test_prepare_job_execution_inf_timeout(self): def test_prepare_job_execution_inf_timeout(self):
"""Prepare job execution handles infinite job timeout""" """Prepare job execution handles infinite job timeout"""
queue = Queue(connection=self.testconn) queue = Queue(connection=self.testconn)
@ -632,7 +643,7 @@ class TestWorker(RQTestCase):
# Score in queue is +inf # Score in queue is +inf
self.assertEqual(self.testconn.zscore(registry.key, job.id), float('Inf')) self.assertEqual(self.testconn.zscore(registry.key, job.id), float('Inf'))
def test_work_unicode_friendly(self): def test_work_unicode_friendly(self):
"""Worker processes work with unicode description, then quits.""" """Worker processes work with unicode description, then quits."""

Loading…
Cancel
Save