Added job_id property, storing the id of the currently executing job.

main
Marc Brinkmann 11 years ago
parent 606d4fa10f
commit c745f6811a

@ -93,6 +93,7 @@ class Worker(object):
worker = cls([], name, connection=connection) worker = cls([], name, connection=connection)
queues = as_text(connection.hget(worker.key, 'queues')) queues = as_text(connection.hget(worker.key, 'queues'))
worker._state = connection.hget(worker.key, 'state') or '?' worker._state = connection.hget(worker.key, 'state') or '?'
worker._job_id = connection.hget(worker.key, 'current_job') or None
if queues: if queues:
worker.queues = [Queue(queue, connection=connection) worker.queues = [Queue(queue, connection=connection)
for queue in queues.split(',')] for queue in queues.split(',')]
@ -226,6 +227,21 @@ class Worker(object):
state = property(get_state, set_state) state = property(get_state, set_state)
def set_job_id(self, new_job_id, pipeline=None):
self._job_id = new_job_id
connection = pipeline if pipeline is not None else self.connection
if new_job_id is None:
connection.hdel(self.key, 'current_job')
else:
connection.hset(self.key, 'current_job', new_job_id)
def get_job_id(self):
return self._job_id
job_id = property(get_job_id, set_job_id)
@property @property
def stopped(self): def stopped(self):
return self._stopped return self._stopped
@ -320,6 +336,8 @@ class Worker(object):
self.state = 'busy' self.state = 'busy'
job, queue = result job, queue = result
self.job_id = job.id
# Use the public setter here, to immediately update Redis # Use the public setter here, to immediately update Redis
job.status = Status.STARTED job.status = Status.STARTED
self.log.info('%s: %s (%s)' % (green(queue.name), self.log.info('%s: %s (%s)' % (green(queue.name),
@ -328,6 +346,8 @@ class Worker(object):
self.connection.expire(self.key, (job.timeout or Queue.DEFAULT_TIMEOUT) + 60) self.connection.expire(self.key, (job.timeout or Queue.DEFAULT_TIMEOUT) + 60)
self.fork_and_perform_job(job) self.fork_and_perform_job(job)
self.connection.expire(self.key, self.default_worker_ttl) self.connection.expire(self.key, self.default_worker_ttl)
self.job_id = None
if job.status == 'finished': if job.status == 'finished':
queue.enqueue_dependents(job) queue.enqueue_dependents(job)

Loading…
Cancel
Save