Moved some logic into worker.prepare_job_execution to make things testable.

main
Selwin Ong 11 years ago
parent a28575088b
commit b0c0a84ab0

@ -23,6 +23,7 @@ from .queue import get_failed_queue, Queue
from .timeouts import UnixSignalDeathPenalty
from .utils import import_attribute, make_colorizer, utcformat, utcnow
from .version import VERSION
from .working_queue import WorkingQueue
try:
from procname import setprocname
@ -403,7 +404,7 @@ class Worker(object):
self.heartbeat()
return result
def heartbeat(self, timeout=0):
def heartbeat(self, timeout=0, pipeline=None):
"""Specifies a new worker timeout, typically by extending the
expiration time of the worker, effectively making this a "heartbeat"
to not expire the worker until the timeout passes.
@ -415,7 +416,8 @@ class Worker(object):
only larger.
"""
timeout = max(timeout, self.default_worker_ttl)
self.connection.expire(self.key, timeout)
connection = pipeline if pipeline is not None else self.connection
connection.expire(self.key, timeout)
self.log.debug('Sent heartbeat to prevent worker timeout. '
'Next one should arrive within {0} seconds.'.format(timeout))
@ -468,19 +470,26 @@ class Worker(object):
# constrast to the regular sys.exit()
os._exit(int(not success))
def perform_job(self, job):
"""Performs the actual work of a job. Will/should only be called
inside the work horse's process.
def prepare_job_execution(self, job):
"""Performs misc bookkeeping like updating states prior to
job execution.
"""
self.set_state('busy')
self.set_current_job_id(job.id)
self.heartbeat((job.timeout or 180) + 60)
with self.connection._pipeline() as pipeline:
self.set_state('busy', pipeline=pipeline)
self.set_current_job_id(job.id, pipeline=pipeline)
self.heartbeat((job.timeout or 180) + 60, pipeline=pipeline)
pipeline.execute()
self.procline('Processing %s from %s since %s' % (
job.func_name,
job.origin, time.time()))
def perform_job(self, job):
"""Performs the actual work of a job. Will/should only be called
inside the work horse's process.
"""
self.prepare_job_execution(job)
with self.connection._pipeline() as pipeline:
try:
job.set_status(Status.STARTED)

Loading…
Cancel
Save