|
|
|
@ -231,25 +231,24 @@ class Worker(object):
|
|
|
|
|
"worker.state is deprecated, use worker.get_state() instead."
|
|
|
|
|
)
|
|
|
|
|
return self.get_state()
|
|
|
|
|
|
|
|
|
|
state = property(_get_state, _set_state)
|
|
|
|
|
|
|
|
|
|
def set_job_id(self, new_job_id, pipeline=None):
|
|
|
|
|
self._job_id = new_job_id
|
|
|
|
|
state = property(_get_state, _set_state)
|
|
|
|
|
|
|
|
|
|
def set_current_job_id(self, job_id, pipeline=None):
|
|
|
|
|
connection = pipeline if pipeline is not None else self.connection
|
|
|
|
|
|
|
|
|
|
if new_job_id is None:
|
|
|
|
|
if job_id is None:
|
|
|
|
|
connection.hdel(self.key, 'current_job')
|
|
|
|
|
else:
|
|
|
|
|
connection.hset(self.key, 'current_job', new_job_id)
|
|
|
|
|
connection.hset(self.key, 'current_job', job_id)
|
|
|
|
|
|
|
|
|
|
def get_job_id(self):
|
|
|
|
|
return self._job_id
|
|
|
|
|
def get_current_job_id(self, pipeline=None):
|
|
|
|
|
connection = pipeline if pipeline is not None else self.connection
|
|
|
|
|
return connection.hget(self.key, 'current_job')
|
|
|
|
|
|
|
|
|
|
def get_current_job(self):
|
|
|
|
|
"""Returns the job id of the currently executing job."""
|
|
|
|
|
job_id = self.get_job_id()
|
|
|
|
|
job_id = self.get_current_job_id()
|
|
|
|
|
|
|
|
|
|
if job_id is None:
|
|
|
|
|
return None
|
|
|
|
@ -346,7 +345,7 @@ class Worker(object):
|
|
|
|
|
self.set_state('busy')
|
|
|
|
|
|
|
|
|
|
job, queue = result
|
|
|
|
|
self.set_job_id(job.id)
|
|
|
|
|
self.set_current_job_id(job.id)
|
|
|
|
|
|
|
|
|
|
# Use the public setter here, to immediately update Redis
|
|
|
|
|
job.status = Status.STARTED
|
|
|
|
@ -356,7 +355,7 @@ class Worker(object):
|
|
|
|
|
self.heartbeat((job.timeout or 180) + 60)
|
|
|
|
|
self.execute_job(job)
|
|
|
|
|
self.heartbeat()
|
|
|
|
|
self.set_job_id(None)
|
|
|
|
|
self.set_current_job_id(None)
|
|
|
|
|
|
|
|
|
|
if job.status == Status.FINISHED:
|
|
|
|
|
queue.enqueue_dependents(job)
|
|
|
|
|