|
|
@ -14,7 +14,7 @@ import traceback
|
|
|
|
import logging
|
|
|
|
import logging
|
|
|
|
from .queue import Queue, get_failed_queue
|
|
|
|
from .queue import Queue, get_failed_queue
|
|
|
|
from .connections import get_current_connection
|
|
|
|
from .connections import get_current_connection
|
|
|
|
from .job import Status
|
|
|
|
from .job import Job, Status
|
|
|
|
from .utils import make_colorizer, utcnow, utcformat
|
|
|
|
from .utils import make_colorizer, utcnow, utcformat
|
|
|
|
from .logutils import setup_loghandlers
|
|
|
|
from .logutils import setup_loghandlers
|
|
|
|
from .exceptions import NoQueueError, DequeueTimeout
|
|
|
|
from .exceptions import NoQueueError, DequeueTimeout
|
|
|
@ -91,6 +91,7 @@ class Worker(object):
|
|
|
|
worker = cls([], name, connection=connection)
|
|
|
|
worker = cls([], name, connection=connection)
|
|
|
|
queues = as_text(connection.hget(worker.key, 'queues'))
|
|
|
|
queues = as_text(connection.hget(worker.key, 'queues'))
|
|
|
|
worker._state = connection.hget(worker.key, 'state') or '?'
|
|
|
|
worker._state = connection.hget(worker.key, 'state') or '?'
|
|
|
|
|
|
|
|
worker._job_id = connection.hget(worker.key, 'current_job') or None
|
|
|
|
if queues:
|
|
|
|
if queues:
|
|
|
|
worker.queues = [Queue(queue, connection=connection)
|
|
|
|
worker.queues = [Queue(queue, connection=connection)
|
|
|
|
for queue in queues.split(',')]
|
|
|
|
for queue in queues.split(',')]
|
|
|
@ -213,9 +214,10 @@ class Worker(object):
|
|
|
|
p.expire(self.key, 60)
|
|
|
|
p.expire(self.key, 60)
|
|
|
|
p.execute()
|
|
|
|
p.execute()
|
|
|
|
|
|
|
|
|
|
|
|
def set_state(self, state):
|
|
|
|
def set_state(self, state, pipeline=None):
|
|
|
|
self._state = state
|
|
|
|
self._state = state
|
|
|
|
self.connection.hset(self.key, 'state', state)
|
|
|
|
connection = pipeline if pipeline is not None else self.connection
|
|
|
|
|
|
|
|
connection.hset(self.key, 'state', state)
|
|
|
|
|
|
|
|
|
|
|
|
def _set_state(self, state):
|
|
|
|
def _set_state(self, state):
|
|
|
|
self.set_state(state)
|
|
|
|
self.set_state(state)
|
|
|
@ -232,6 +234,28 @@ class Worker(object):
|
|
|
|
|
|
|
|
|
|
|
|
state = property(_get_state, _set_state)
|
|
|
|
state = property(_get_state, _set_state)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def set_job_id(self, new_job_id, pipeline=None):
|
|
|
|
|
|
|
|
self._job_id = new_job_id
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
connection = pipeline if pipeline is not None else self.connection
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if new_job_id is None:
|
|
|
|
|
|
|
|
connection.hdel(self.key, 'current_job')
|
|
|
|
|
|
|
|
else:
|
|
|
|
|
|
|
|
connection.hset(self.key, 'current_job', new_job_id)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_job_id(self):
|
|
|
|
|
|
|
|
return self._job_id
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_current_job(self):
|
|
|
|
|
|
|
|
"""Returns the job id of the currently executing job."""
|
|
|
|
|
|
|
|
job_id = self.get_job_id()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if job_id is None:
|
|
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return Job.safe_fetch(job_id)
|
|
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
@property
|
|
|
|
def stopped(self):
|
|
|
|
def stopped(self):
|
|
|
|
return self._stopped
|
|
|
|
return self._stopped
|
|
|
@ -322,6 +346,8 @@ class Worker(object):
|
|
|
|
self.set_state('busy')
|
|
|
|
self.set_state('busy')
|
|
|
|
|
|
|
|
|
|
|
|
job, queue = result
|
|
|
|
job, queue = result
|
|
|
|
|
|
|
|
self.set_job_id(job.id)
|
|
|
|
|
|
|
|
|
|
|
|
# Use the public setter here, to immediately update Redis
|
|
|
|
# Use the public setter here, to immediately update Redis
|
|
|
|
job.status = Status.STARTED
|
|
|
|
job.status = Status.STARTED
|
|
|
|
self.log.info('%s: %s (%s)' % (green(queue.name),
|
|
|
|
self.log.info('%s: %s (%s)' % (green(queue.name),
|
|
|
@ -330,6 +356,8 @@ class Worker(object):
|
|
|
|
self.heartbeat((job.timeout or 180) + 60)
|
|
|
|
self.heartbeat((job.timeout or 180) + 60)
|
|
|
|
self.execute_job(job)
|
|
|
|
self.execute_job(job)
|
|
|
|
self.heartbeat()
|
|
|
|
self.heartbeat()
|
|
|
|
|
|
|
|
self.set_job_id(None)
|
|
|
|
|
|
|
|
|
|
|
|
if job.status == Status.FINISHED:
|
|
|
|
if job.status == Status.FINISHED:
|
|
|
|
queue.enqueue_dependents(job)
|
|
|
|
queue.enqueue_dependents(job)
|
|
|
|
|
|
|
|
|
|
|
|