refactored worker code

Moved code into a new handle_job_success() method and narrowed the
scope in which Redis pipelines are used.
main
Stefan Hammer 8 years ago
parent 44f98693c7
commit a0cee2d2a0

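In outline, the refactor splits the success path out of perform_job() into a new handle_job_success() method, and each handler now opens and executes its own Redis pipeline instead of sharing one owned by perform_job(). A minimal sketch of the resulting control flow (an assumed simplification with stubbed bodies, not rq's actual implementation; the real methods are in the diff below):

    class WorkerSketch(object):
        """Sketch only: mirrors the shape of the refactored Worker."""

        def __init__(self, connection):
            self.connection = connection  # a redis.StrictRedis instance

        def perform_job(self, job, queue, started_job_registry):
            try:
                rv = job.perform()
                self.handle_job_success(job=job, queue=queue,
                                        started_job_registry=started_job_registry)
            except Exception:
                self.handle_job_failure(job=job,
                                        started_job_registry=started_job_registry)
                return False
            return True

        def handle_job_success(self, job, queue, started_job_registry):
            # Owns its own pipeline; WATCH/retry loop lives here (see diff).
            with self.connection.pipeline() as pipeline:
                pass

        def handle_job_failure(self, job, started_job_registry=None):
            # Owns its own pipeline; executes it best-effort (see diff).
            with self.connection.pipeline() as pipeline:
                pass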
@@ -536,29 +536,23 @@ class Worker(object):
                     # Job completed and its ttl has expired
                     break
                 if job_status not in [JobStatus.FINISHED, JobStatus.FAILED]:
-                    with self.connection._pipeline() as pipeline:
-                        self.handle_job_failure(
-                            job=job,
-                            pipeline=pipeline
-                        )
-                        try:
-                            pipeline.execute()
-                        except Exception:
-                            pass
+                    self.handle_job_failure(
+                        job=job
+                    )
 
                     #Unhandled failure: move the job to the failed queue
                     self.log.warning(
                         'Moving job to {0!r} queue'.format(
                             self.failed_queue.name
                         )
                     )
                     self.failed_queue.quarantine(
                         job,
                         exc_info=(
                             "Work-horse proccess "
                             "was terminated unexpectedly"
                         )
                     )
                 break
             except OSError as e:
                 # In case we encountered an OSError due to EINTR (which is
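The pipeline bookkeeping deleted at this call site is not lost: handle_job_failure() now creates and executes its own pipeline (see the next hunks). The underlying pattern, batching writes and swallowing a failed execute() so follow-up handlers still run, looks roughly like this in plain redis-py (a sketch with hypothetical key and field names, not rq code):

    import redis

    def mark_failed_best_effort(connection, job_key):
        # Batch the status updates so they land atomically, but treat the
        # final execute() as best effort: if Redis is down, the caller's
        # exception handlers must still get a chance to run.
        with connection.pipeline() as pipe:
            pipe.hset(job_key, 'status', 'failed')
            pipe.hdel(job_key, 'started_at')
            try:
                pipe.execute()
            except redis.exceptions.RedisError:
                pass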
@@ -631,8 +625,7 @@ class Worker(object):
     def handle_job_failure(
             self,
             job,
-            started_job_registry=None,
-            pipeline=None
+            started_job_registry=None
     ):
         """Handles the failure or an executing job by:
             1. Setting the job status to failed
@@ -640,89 +633,99 @@ class Worker(object):
             3. Setting the workers current job to None
         """
-        if started_job_registry is None:
-            started_job_registry = StartedJobRegistry(
-                job.origin,
-                self.connection
-            )
-        job.set_status(JobStatus.FAILED, pipeline=pipeline)
-        started_job_registry.remove(job, pipeline=pipeline)
-        self.set_current_job_id(None, pipeline=pipeline)
-
-    def perform_job(self, job, queue):
-        """Performs the actual work of a job. Will/should only be called
-        inside the work horse's process.
-        """
-        self.prepare_job_execution(job)
-
+        with self.connection._pipeline() as pipeline:
+            if started_job_registry is None:
+                started_job_registry = StartedJobRegistry(
+                    job.origin,
+                    self.connection
+                )
+            job.set_status(JobStatus.FAILED, pipeline=pipeline)
+            started_job_registry.remove(job, pipeline=pipeline)
+            self.set_current_job_id(None, pipeline=pipeline)
+            try:
+                pipeline.execute()
+            except Exception:
+                # Ensure that custom exception handlers are called
+                # even if Redis is down
+                pass
+
+    def handle_job_success(
+            self,
+            job,
+            queue,
+            started_job_registry
+    ):
         with self.connection._pipeline() as pipeline:
-            push_connection(self.connection)
-
-            started_job_registry = StartedJobRegistry(job.origin, self.connection)
-
-            try:
-                with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT):
-                    rv = job.perform()
-
-                # Pickle the result in the same try-except block since we need
-                # to use the same exc handling when pickling fails
-                job._result = rv
-
-                result_ttl = job.get_result_ttl(self.default_result_ttl)
-                if result_ttl != 0:
-                    job.ended_at = utcnow()
-
-                while True:
-                    try:
-                        # if dependencies are inserted after enqueue_dependents
-                        # a WatchError is thrown by execute()
-                        pipeline.watch(job.dependents_key)
-                        # enqueue_dependents calls multi() on the pipeline!
-                        queue.enqueue_dependents(job, pipeline=pipeline)
-
-                        self.set_current_job_id(None, pipeline=pipeline)
-
-                        if result_ttl != 0:
-                            job.set_status(JobStatus.FINISHED, pipeline=pipeline)
-                            job.save(pipeline=pipeline)
-
-                            finished_job_registry = FinishedJobRegistry(job.origin,
-                                                                        self.connection)
-                            finished_job_registry.add(job, result_ttl, pipeline)
-
-                        job.cleanup(result_ttl, pipeline=pipeline,
-                                    remove_from_queue=False)
-                        started_job_registry.remove(job, pipeline=pipeline)
-
-                        pipeline.execute()
-                        break
-                    except WatchError:
-                        continue
-
-            except Exception:
-                self.handle_job_failure(
-                    job=job,
-                    started_job_registry=started_job_registry,
-                    pipeline=pipeline
-                )
-                try:
-                    pipeline.execute()
-                except Exception:
-                    # Ensure that custom exception handlers are called
-                    # even if Redis is down
-                    pass
-                self.handle_exception(job, *sys.exc_info())
-                return False
-            finally:
-                pop_connection()
+            while True:
+                try:
+                    # if dependencies are inserted after enqueue_dependents
+                    # a WatchError is thrown by execute()
+                    pipeline.watch(job.dependents_key)
+                    # enqueue_dependents calls multi() on the pipeline!
+                    queue.enqueue_dependents(job, pipeline=pipeline)
+
+                    self.set_current_job_id(None, pipeline=pipeline)
+
+                    result_ttl = job.get_result_ttl(self.default_result_ttl)
+                    if result_ttl != 0:
+                        job.set_status(JobStatus.FINISHED, pipeline=pipeline)
+                        job.save(pipeline=pipeline)
+
+                        finished_job_registry = FinishedJobRegistry(job.origin,
+                                                                    self.connection)
+                        finished_job_registry.add(job, result_ttl, pipeline)
+
+                    job.cleanup(result_ttl, pipeline=pipeline,
+                                remove_from_queue=False)
+                    started_job_registry.remove(job, pipeline=pipeline)
+
+                    pipeline.execute()
+                    break
+                except WatchError:
+                    continue
+
+    def perform_job(self, job, queue):
+        """Performs the actual work of a job. Will/should only be called
+        inside the work horse's process.
+        """
+        self.prepare_job_execution(job)
+
+        push_connection(self.connection)
+
+        started_job_registry = StartedJobRegistry(job.origin, self.connection)
+
+        try:
+            with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT):
+                rv = job.perform()
+
+            job.ended_at = utcnow()
+
+            # Pickle the result in the same try-except block since we need
+            # to use the same exc handling when pickling fails
+            job._result = rv
+
+            self.handle_job_success(
+                job=job,
+                queue=queue,
+                started_job_registry=started_job_registry
+            )
+        except Exception:
+            self.handle_job_failure(
+                job=job,
+                started_job_registry=started_job_registry
+            )
+            self.handle_exception(job, *sys.exc_info())
+            return False
+        finally:
+            pop_connection()
 
         self.log.info('{0}: {1} ({2})'.format(green(job.origin), blue('Job OK'), job.id))
         if rv is not None:
             log_result = "{0!r}".format(as_text(text_type(rv)))
             self.log.debug('Result: {0}'.format(yellow(log_result)))
 
+        result_ttl = job.get_result_ttl(self.default_result_ttl)
         if result_ttl == 0:
             self.log.info('Result discarded immediately')
         elif result_ttl > 0:
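handle_job_success() keeps the optimistic-locking loop around enqueue_dependents(): WATCH the job's dependents key, queue the writes, and if another client modifies that key before EXEC, redis-py raises WatchError and the whole transaction is retried. A generic sketch of that pattern (hypothetical keys, not rq's implementation):

    import redis
    from redis.exceptions import WatchError

    def transfer_dependents(connection, dependents_key, target_queue_key):
        # Optimistic locking: WATCH the dependents set, buffer our writes
        # with MULTI, and retry from scratch if the key changed before EXEC.
        with connection.pipeline() as pipe:
            while True:
                try:
                    pipe.watch(dependents_key)
                    dependents = pipe.smembers(dependents_key)  # immediate mode
                    pipe.multi()                                # start buffering
                    for job_id in dependents:
                        pipe.rpush(target_queue_key, job_id)
                    pipe.delete(dependents_key)
                    pipe.execute()                              # raises WatchError
                    break                                       # on interference
                except WatchError:
                    continue  # a dependent was added mid-flight; retry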

@@ -578,7 +578,7 @@ class TestWorker(RQTestCase):
         def new_enqueue_dependents(self, job, *args, **kwargs):
             orig_enqueue_dependents(self, job, *args, **kwargs)
-            if hasattr(Queue, '_add_enqueue') and Queue._add_enqueue.id == job.id:
+            if hasattr(Queue, '_add_enqueue') and Queue._add_enqueue is not None and Queue._add_enqueue.id == job.id:
                 Queue._add_enqueue = None
                 Queue().enqueue_call(say_hello, depends_on=job)
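The added is not None check matters because hasattr() only tests that the attribute exists: the test sets Queue._add_enqueue = None after the first hit, so the old guard went on to evaluate None.id and raised AttributeError. A toy illustration (hypothetical class, unrelated to rq's Queue):

    class Holder(object):
        _add_enqueue = None  # attribute exists, but its value is None

    assert hasattr(Holder, '_add_enqueue')  # True even though the value is None
    # Holder._add_enqueue.id                # would raise AttributeError

    # The fixed guard short-circuits before touching .id:
    if hasattr(Holder, '_add_enqueue') and Holder._add_enqueue is not None:
        print(Holder._add_enqueue.id)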
