refactor: job status check (#1035)

main
Chyroc 6 years ago committed by Selwin Ong
parent 7eb95bf405
commit d9798fd64f

@ -25,7 +25,6 @@ except ImportError: # noqa # pragma: no cover
dumps = partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL) dumps = partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
loads = pickle.loads loads = pickle.loads
JobStatus = enum( JobStatus = enum(
'JobStatus', 'JobStatus',
QUEUED='queued', QUEUED='queued',
@ -183,6 +182,10 @@ class Job(object):
def is_started(self): def is_started(self):
return self.get_status() == JobStatus.STARTED return self.get_status() == JobStatus.STARTED
@property
def is_deferred(self):
return self.get_status() == JobStatus.DEFERRED
@property @property
def dependency(self): def dependency(self):
"""Returns a job's dependency. To avoid repeated Redis fetches, we cache """Returns a job's dependency. To avoid repeated Redis fetches, we cache
@ -457,7 +460,6 @@ class Job(object):
# Fallback to uncompressed string # Fallback to uncompressed string
self.exc_info = as_text(raw_exc_info) self.exc_info = as_text(raw_exc_info)
def to_dict(self, include_meta=True): def to_dict(self, include_meta=True):
""" """
Returns a serialization of the current job instance Returns a serialization of the current job instance
@ -545,28 +547,28 @@ class Job(object):
self.cancel(pipeline=pipeline) self.cancel(pipeline=pipeline)
connection = pipeline if pipeline is not None else self.connection connection = pipeline if pipeline is not None else self.connection
if self.get_status() == JobStatus.FINISHED: if self.is_finished:
from .registry import FinishedJobRegistry from .registry import FinishedJobRegistry
registry = FinishedJobRegistry(self.origin, registry = FinishedJobRegistry(self.origin,
connection=self.connection, connection=self.connection,
job_class=self.__class__) job_class=self.__class__)
registry.remove(self, pipeline=pipeline) registry.remove(self, pipeline=pipeline)
elif self.get_status() == JobStatus.DEFERRED: elif self.is_deferred:
from .registry import DeferredJobRegistry from .registry import DeferredJobRegistry
registry = DeferredJobRegistry(self.origin, registry = DeferredJobRegistry(self.origin,
connection=self.connection, connection=self.connection,
job_class=self.__class__) job_class=self.__class__)
registry.remove(self, pipeline=pipeline) registry.remove(self, pipeline=pipeline)
elif self.get_status() == JobStatus.STARTED: elif self.is_started:
from .registry import StartedJobRegistry from .registry import StartedJobRegistry
registry = StartedJobRegistry(self.origin, registry = StartedJobRegistry(self.origin,
connection=self.connection, connection=self.connection,
job_class=self.__class__) job_class=self.__class__)
registry.remove(self, pipeline=pipeline) registry.remove(self, pipeline=pipeline)
elif self.get_status() == JobStatus.FAILED: elif self.is_failed:
from .queue import get_failed_queue from .queue import get_failed_queue
failed_queue = get_failed_queue(connection=self.connection, failed_queue = get_failed_queue(connection=self.connection,
job_class=self.__class__) job_class=self.__class__)

Loading…
Cancel
Save