Moved some logic into worker.prepare_job_execution to make things testable.

main
Selwin Ong 11 years ago
parent 90c7eeb111
commit f38d0dc791

@ -23,6 +23,7 @@ from .queue import get_failed_queue, Queue
from .timeouts import UnixSignalDeathPenalty from .timeouts import UnixSignalDeathPenalty
from .utils import import_attribute, make_colorizer, utcformat, utcnow from .utils import import_attribute, make_colorizer, utcformat, utcnow
from .version import VERSION from .version import VERSION
from .working_queue import WorkingQueue
try: try:
from procname import setprocname from procname import setprocname
@ -403,7 +404,7 @@ class Worker(object):
self.heartbeat() self.heartbeat()
return result return result
def heartbeat(self, timeout=0): def heartbeat(self, timeout=0, pipeline=None):
"""Specifies a new worker timeout, typically by extending the """Specifies a new worker timeout, typically by extending the
expiration time of the worker, effectively making this a "heartbeat" expiration time of the worker, effectively making this a "heartbeat"
to not expire the worker until the timeout passes. to not expire the worker until the timeout passes.
@ -415,7 +416,8 @@ class Worker(object):
only larger. only larger.
""" """
timeout = max(timeout, self.default_worker_ttl) timeout = max(timeout, self.default_worker_ttl)
self.connection.expire(self.key, timeout) connection = pipeline if pipeline is not None else self.connection
connection.expire(self.key, timeout)
self.log.debug('Sent heartbeat to prevent worker timeout. ' self.log.debug('Sent heartbeat to prevent worker timeout. '
'Next one should arrive within {0} seconds.'.format(timeout)) 'Next one should arrive within {0} seconds.'.format(timeout))
@ -468,19 +470,26 @@ class Worker(object):
# constrast to the regular sys.exit() # constrast to the regular sys.exit()
os._exit(int(not success)) os._exit(int(not success))
def perform_job(self, job): def prepare_job_execution(self, job):
"""Performs the actual work of a job. Will/should only be called """Performs misc bookkeeping like updating states prior to
inside the work horse's process. job execution.
""" """
with self.connection._pipeline() as pipeline:
self.set_state('busy') self.set_state('busy', pipeline=pipeline)
self.set_current_job_id(job.id) self.set_current_job_id(job.id, pipeline=pipeline)
self.heartbeat((job.timeout or 180) + 60) self.heartbeat((job.timeout or 180) + 60, pipeline=pipeline)
pipeline.execute()
self.procline('Processing %s from %s since %s' % ( self.procline('Processing %s from %s since %s' % (
job.func_name, job.func_name,
job.origin, time.time())) job.origin, time.time()))
def perform_job(self, job):
"""Performs the actual work of a job. Will/should only be called
inside the work horse's process.
"""
self.prepare_job_execution(job)
with self.connection._pipeline() as pipeline: with self.connection._pipeline() as pipeline:
try: try:
job.set_status(Status.STARTED) job.set_status(Status.STARTED)

Loading…
Cancel
Save