|
|
@ -843,25 +843,23 @@ class Worker(object):
|
|
|
|
self.connection,
|
|
|
|
self.connection,
|
|
|
|
job_class=self.job_class
|
|
|
|
job_class=self.job_class
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
# Requeue/reschedule if retry is configured
|
|
|
|
# Requeue/reschedule if retry is configured
|
|
|
|
if job.retries_left and job.retries_left > 0:
|
|
|
|
if job.retries_left and job.retries_left > 0:
|
|
|
|
retry = True
|
|
|
|
retry = True
|
|
|
|
retry_interval = job.get_retry_interval()
|
|
|
|
retry_interval = job.get_retry_interval()
|
|
|
|
job.retries_left = job.retries_left - 1
|
|
|
|
job.retries_left = job.retries_left - 1
|
|
|
|
else:
|
|
|
|
else:
|
|
|
|
retry = False
|
|
|
|
retry = False
|
|
|
|
job.set_status(JobStatus.FAILED, pipeline=pipeline)
|
|
|
|
job.set_status(JobStatus.FAILED, pipeline=pipeline)
|
|
|
|
|
|
|
|
|
|
|
|
started_job_registry.remove(job, pipeline=pipeline)
|
|
|
|
started_job_registry.remove(job, pipeline=pipeline)
|
|
|
|
|
|
|
|
|
|
|
|
if not self.disable_default_exception_handler:
|
|
|
|
if not self.disable_default_exception_handler:
|
|
|
|
|
|
|
|
|
|
|
|
failed_job_registry = FailedJobRegistry(job.origin, job.connection,
|
|
|
|
failed_job_registry = FailedJobRegistry(job.origin, job.connection,
|
|
|
|
job_class=self.job_class)
|
|
|
|
job_class=self.job_class)
|
|
|
|
failed_job_registry.add(job, ttl=job.failure_ttl,
|
|
|
|
failed_job_registry.add(job, ttl=job.failure_ttl,
|
|
|
|
exc_string=exc_string, pipeline=pipeline)
|
|
|
|
exc_string=exc_string, pipeline=pipeline)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self.set_current_job_id(None, pipeline=pipeline)
|
|
|
|
self.set_current_job_id(None, pipeline=pipeline)
|
|
|
|
self.increment_failed_job_count(pipeline)
|
|
|
|
self.increment_failed_job_count(pipeline)
|
|
|
@ -869,7 +867,7 @@ class Worker(object):
|
|
|
|
self.increment_total_working_time(
|
|
|
|
self.increment_total_working_time(
|
|
|
|
job.ended_at - job.started_at, pipeline
|
|
|
|
job.ended_at - job.started_at, pipeline
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
if retry:
|
|
|
|
if retry:
|
|
|
|
if retry_interval:
|
|
|
|
if retry_interval:
|
|
|
|
scheduled_datetime = datetime.now(timezone.utc) + timedelta(seconds=retry_interval)
|
|
|
|
scheduled_datetime = datetime.now(timezone.utc) + timedelta(seconds=retry_interval)
|
|
|
|