diff --git a/CHANGES.md b/CHANGES.md index abd2043..f57c58a 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,7 @@ +### Unreleased + +- Added `max_jobs` to `Worker.work` and `--max-jobs` to `rq worker` CLI. + ### 1.0 (2019-04-06) Backward incompatible changes: diff --git a/docs/docs/workers.md b/docs/docs/workers.md index b84859e..77048e1 100644 --- a/docs/docs/workers.md +++ b/docs/docs/workers.md @@ -68,6 +68,7 @@ In addition to `--burst`, `rq worker` also accepts these arguments: * `--log-format`: Format for the worker logs, defaults to `'%(asctime)s %(message)s'` * `--date-format`: Datetime format for the worker logs, defaults to `'%H:%M:%S'` * `--disable-job-desc-logging`: Turn off job description logging. +* `--max-jobs`: Maximum number of jobs to execute. ## Inside the worker diff --git a/rq/cli/cli.py b/rq/cli/cli.py index 7b22986..e621441 100755 --- a/rq/cli/cli.py +++ b/rq/cli/cli.py @@ -188,11 +188,12 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues, @click.option('--exception-handler', help='Exception handler(s) to use', multiple=True) @click.option('--pid', help='Write the process ID number to a file at the specified path') @click.option('--disable-default-exception-handler', '-d', is_flag=True, help='Disable RQ\'s default exception handler') +@click.option('--max-jobs', type=int, default=None, help='Maximum number of jobs to execute') @click.argument('queues', nargs=-1) @pass_cli_config def worker(cli_config, burst, logging_level, name, results_ttl, worker_ttl, job_monitoring_interval, disable_job_desc_logging, verbose, quiet, sentry_dsn, - exception_handler, pid, disable_default_exception_handler, queues, + exception_handler, pid, disable_default_exception_handler, max_jobs, queues, log_format, date_format, **options): """Starts an RQ worker.""" settings = read_config_file(cli_config.config) if cli_config.config else {} @@ -237,7 +238,7 @@ def worker(cli_config, burst, logging_level, name, results_ttl, from rq.contrib.sentry import register_sentry register_sentry(sentry_dsn) - worker.work(burst=burst, logging_level=logging_level, date_format=date_format, log_format=log_format) + worker.work(burst=burst, logging_level=logging_level, date_format=date_format, log_format=log_format, max_jobs=max_jobs) except ConnectionError as e: print(e) sys.exit(1) diff --git a/rq/worker.py b/rq/worker.py index b5b141b..fcf32e9 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -435,7 +435,7 @@ class Worker(object): self.set_state(before_state) def work(self, burst=False, logging_level="INFO", date_format=DEFAULT_LOGGING_DATE_FORMAT, - log_format=DEFAULT_LOGGING_FORMAT): + log_format=DEFAULT_LOGGING_FORMAT, max_jobs=None): """Starts the work loop. Pops and performs all jobs on the current list of queues. When all @@ -446,7 +446,7 @@ class Worker(object): """ setup_loghandlers(logging_level, date_format, log_format) self._install_signal_handlers() - did_perform_work = False + completed_jobs = 0 self.register_birth() self.log.info("RQ worker %r started, version %s", self.key, VERSION) self.set_state(WorkerStatus.STARTED) @@ -477,7 +477,14 @@ class Worker(object): self.execute_job(job, queue) self.heartbeat() - did_perform_work = True + completed_jobs += 1 + if max_jobs is not None: + if completed_jobs >= max_jobs: + self.log.info( + "RQ Worker %r finished executing %d jobs, quitting", + self.key, completed_jobs + ) + break except StopRequested: break @@ -488,14 +495,14 @@ class Worker(object): except: # noqa self.log.error( - 'Worker %s: found an unhandled exception, quitting...', - self.name, exc_info=True + 'RQ Worker %r: found an unhandled exception, quitting...', + self.key, exc_info=True ) break finally: if not self.is_horse: self.register_death() - return did_perform_work + return bool(completed_jobs) def dequeue_job_and_maintain_ttl(self, timeout): result = None diff --git a/tests/test_worker.py b/tests/test_worker.py index 91fa93b..1c17a76 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -340,6 +340,17 @@ class TestWorker(RQTestCase): # total_working_time should be around 0.05 seconds self.assertTrue(0.05 <= worker.total_working_time < 0.06) + def test_max_jobs(self): + """Worker exits after number of jobs complete.""" + queue = Queue() + job1 = queue.enqueue(do_nothing) + job2 = queue.enqueue(do_nothing) + worker = Worker([queue]) + worker.work(max_jobs=1) + + self.assertEqual(JobStatus.FINISHED, job1.get_status()) + self.assertEqual(JobStatus.QUEUED, job2.get_status()) + def test_disable_default_exception_handler(self): """ Job is not moved to FailedJobRegistry when default custom exception @@ -616,7 +627,7 @@ class TestWorker(RQTestCase): # Updates worker statuses self.assertEqual(worker.get_state(), 'busy') self.assertEqual(worker.get_current_job_id(), job.id) - + def test_prepare_job_execution_inf_timeout(self): """Prepare job execution handles infinite job timeout""" queue = Queue(connection=self.testconn) @@ -632,7 +643,7 @@ class TestWorker(RQTestCase): # Score in queue is +inf self.assertEqual(self.testconn.zscore(registry.key, job.id), float('Inf')) - + def test_work_unicode_friendly(self): """Worker processes work with unicode description, then quits."""