Add a "stopped" JobStatus and the machinery to properly handle it (#1394)

* Add a "stopped" JobStatus and the machinery to properly handle it

fixes #1389

* Apply requested changes
main
Daniel Alley 4 years ago committed by GitHub
parent 14ca7881e4
commit fc7940c77b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -99,7 +99,7 @@ print('Status: %s' % job.get_status())
```
Some interesting job attributes include:
* `job.get_status()` Possible values are `queued`, `started`, `deferred`, `finished`, and `failed`
* `job.get_status()` Possible values are `queued`, `started`, `deferred`, `finished`, `stopped`, and `failed`
* `job.origin` queue name of this job
* `job.func_name`
* `job.args` arguments passed to the underlying job function
@ -136,6 +136,8 @@ redis = Redis()
send_stop_job_command(redis, job_id)
```
Unlike failed jobs, stopped jobs will *not* be automatically retried if retry is configured. Subclasses of `Worker` which override `handle_job_failure()` should likewise take care to handle jobs with a `stopped` status appropriately.
## Job / Queue Creation with Custom Serializer
When creating a job or queue, you can pass in a custom serializer that will be used for serializing / de-serializing job arguments.

@ -72,6 +72,9 @@ def handle_stop_job_command(worker, payload):
job_id = payload.get('job_id')
worker.log.debug('Received command to stop job %s', job_id)
if job_id and worker.get_current_job_id() == job_id:
# Sets the '_stopped_job_id' so that the job failure handler knows it
# was intentional.
worker._stopped_job_id = job_id
worker.kill_horse()
else:
worker.log.info('Not working on job %s, command ignored.', job_id)

@ -35,6 +35,7 @@ JobStatus = enum(
STARTED='started',
DEFERRED='deferred',
SCHEDULED='scheduled',
STOPPED='stopped',
)
# Sentinel value to mark that some of our lazily evaluated properties have not
@ -175,6 +176,10 @@ class Job(object):
def is_scheduled(self):
return self.get_status() == JobStatus.SCHEDULED
@property
def is_stopped(self):
return self.get_status() == JobStatus.STOPPED
@property
def _dependency_id(self):
"""Returns the first item in self._dependency_ids. Present to

@ -208,6 +208,8 @@ class Worker(object):
self._is_horse = False
self._horse_pid = 0
self._stop_requested = False
self._stopped_job_id = None
self.log = logger
self.log_job_description = log_job_description
self.last_cleaned_at = None
@ -797,8 +799,14 @@ class Worker(object):
job_status = job.get_status()
if job_status is None: # Job completed and its ttl has expired
return
if job_status not in [JobStatus.FINISHED, JobStatus.FAILED]:
elif job_status == JobStatus.STOPPED:
# Work-horse killed deliberately
self.log.warning('Job stopped by user, moving job to FailedJobRegistry')
self.handle_job_failure(
job, queue=queue,
exc_string="Job stopped by user, work-horse terminated."
)
elif job_status not in [JobStatus.FINISHED, JobStatus.FAILED]:
if not job.ended_at:
job.ended_at = utcnow()
@ -811,7 +819,7 @@ class Worker(object):
self.handle_job_failure(
job, queue=queue,
exc_string="Work-horse was terminated unexpectedly "
"(waitpid returned %s)" % ret_val
"(waitpid returned %s)" % ret_val
)
def execute_job(self, job, queue):
@ -895,14 +903,19 @@ class Worker(object):
job_class=self.job_class
)
job.worker_name = None
# Requeue/reschedule if retry is configured
if job.retries_left and job.retries_left > 0:
retry = True
retry_interval = job.get_retry_interval()
job.retries_left = job.retries_left - 1
# check whether a job was stopped intentionally and set the job
# status appropriately if it was this job.
job_is_stopped = self._stopped_job_id == job.id
retry = job.retries_left and job.retries_left > 0 and not job_is_stopped
if job_is_stopped:
job.set_status(JobStatus.STOPPED, pipeline=pipeline)
self._stopped_job_id = None
else:
retry = False
job.set_status(JobStatus.FAILED, pipeline=pipeline)
# Requeue/reschedule if retry is configured, otherwise
if not retry:
job.set_status(JobStatus.FAILED, pipeline=pipeline)
started_job_registry.remove(job, pipeline=pipeline)
@ -920,6 +933,8 @@ class Worker(object):
)
if retry:
retry_interval = job.get_retry_interval()
job.retries_left = job.retries_left - 1
if retry_interval:
scheduled_datetime = datetime.now(timezone.utc) + timedelta(seconds=retry_interval)
job.set_status(JobStatus.SCHEDULED)

@ -92,6 +92,10 @@ class TestCommands(RQTestCase):
send_stop_job_command(connection, job_id=job.id)
time.sleep(0.25)
# Job status is set appropriately
self.assertTrue(job.is_stopped)
# Worker has stopped working
worker.refresh()
self.assertEqual(worker.get_state(), WorkerStatus.IDLE)

Loading…
Cancel
Save