Add job status setting in enqueue_at (and in enqueue_in) methods (#1181)

* Add job status setting in enqueue_at (and in enqueue_in) methods

Update tests for this change
Closes: #1179

* Add status param to create_job func, rework enqueue_at status setting
main
Ivan Kiryanov 5 years ago committed by Selwin Ong
parent ccfd4a02cb
commit ed67de22c6

@ -172,6 +172,10 @@ class Job(object):
def is_deferred(self): def is_deferred(self):
return self.get_status() == JobStatus.DEFERRED return self.get_status() == JobStatus.DEFERRED
@property
def is_scheduled(self):
return self.get_status() == JobStatus.SCHEDULED
@property @property
def _dependency_id(self): def _dependency_id(self):
"""Returns the first item in self._dependency_ids. Present """Returns the first item in self._dependency_ids. Present
@ -618,6 +622,13 @@ class Job(object):
job_class=self.__class__) job_class=self.__class__)
registry.remove(self, pipeline=pipeline) registry.remove(self, pipeline=pipeline)
elif self.is_scheduled:
from .registry import ScheduledJobRegistry
registry = ScheduledJobRegistry(self.origin,
connection=self.connection,
job_class=self.__class__)
registry.remove(self, pipeline=pipeline)
elif self.is_failed: elif self.is_failed:
self.failed_job_registry.remove(self, pipeline=pipeline) self.failed_job_registry.remove(self, pipeline=pipeline)

@ -248,14 +248,14 @@ class Queue(object):
def create_job(self, func, args=None, kwargs=None, timeout=None, def create_job(self, func, args=None, kwargs=None, timeout=None,
result_ttl=None, ttl=None, failure_ttl=None, result_ttl=None, ttl=None, failure_ttl=None,
description=None, depends_on=None, job_id=None, description=None, depends_on=None, job_id=None,
meta=None): meta=None, status=JobStatus.QUEUED):
"""Creates a job based on parameters given.""" """Creates a job based on parameters given."""
timeout = parse_timeout(timeout) or self._default_timeout timeout = parse_timeout(timeout) or self._default_timeout
job = self.job_class.create( job = self.job_class.create(
func, args=args, kwargs=kwargs, connection=self.connection, func, args=args, kwargs=kwargs, connection=self.connection,
result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl, result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl,
status=JobStatus.QUEUED, description=description, status=status, description=description,
depends_on=depends_on, timeout=timeout, id=job_id, depends_on=depends_on, timeout=timeout, id=job_id,
origin=self.name, meta=meta origin=self.name, meta=meta
) )
@ -384,7 +384,7 @@ class Queue(object):
"""Schedules a job to be enqueued at specified time""" """Schedules a job to be enqueued at specified time"""
from .registry import ScheduledJobRegistry from .registry import ScheduledJobRegistry
job = self.create_job(func, *args, **kwargs) job = self.create_job(func, status=JobStatus.SCHEDULED, *args, **kwargs)
registry = ScheduledJobRegistry(queue=self) registry = ScheduledJobRegistry(queue=self)
with self.connection.pipeline() as pipeline: with self.connection.pipeline() as pipeline:
job.save(pipeline=pipeline) job.save(pipeline=pipeline)

@ -14,7 +14,8 @@ from rq.exceptions import NoSuchJobError, UnpickleError
from rq.job import Job, JobStatus, cancel_job, get_current_job from rq.job import Job, JobStatus, cancel_job, get_current_job
from rq.queue import Queue from rq.queue import Queue
from rq.registry import (DeferredJobRegistry, FailedJobRegistry, from rq.registry import (DeferredJobRegistry, FailedJobRegistry,
FinishedJobRegistry, StartedJobRegistry) FinishedJobRegistry, StartedJobRegistry,
ScheduledJobRegistry)
from rq.utils import utcformat from rq.utils import utcformat
from rq.worker import Worker from rq.worker import Worker
from tests import RQTestCase, fixtures from tests import RQTestCase, fixtures
@ -579,6 +580,16 @@ class TestJob(RQTestCase):
job.delete() job.delete()
self.assertFalse(job in registry) self.assertFalse(job in registry)
job = Job.create(func=fixtures.say_hello, status=JobStatus.SCHEDULED,
connection=self.testconn, origin='default')
job.save()
registry = ScheduledJobRegistry(connection=self.testconn)
registry.add(job, 500)
job.delete()
self.assertFalse(job in registry)
def test_job_with_dependents_delete_parent_with_saved(self): def test_job_with_dependents_delete_parent_with_saved(self):
"""job.delete() deletes itself from Redis but not dependents. If the """job.delete() deletes itself from Redis but not dependents. If the
dependent job was saved, it will remain in redis.""" dependent job was saved, it will remain in redis."""

@ -267,10 +267,13 @@ class TestQueue(RQTestCase):
scheduler = RQScheduler([queue], connection=self.testconn) scheduler = RQScheduler([queue], connection=self.testconn)
scheduler.acquire_locks() scheduler.acquire_locks()
# Jobs created using enqueue_at is put in the ScheduledJobRegistry # Jobs created using enqueue_at is put in the ScheduledJobRegistry
queue.enqueue_at(datetime(2019, 1, 1, tzinfo=utc), say_hello) job = queue.enqueue_at(datetime(2019, 1, 1, tzinfo=utc), say_hello)
self.assertEqual(len(queue), 0) self.assertEqual(len(queue), 0)
self.assertEqual(len(registry), 1) self.assertEqual(len(registry), 1)
# enqueue_at set job status to "scheduled"
self.assertTrue(job.get_status() == 'scheduled')
# After enqueue_scheduled_jobs() is called, the registry is empty # After enqueue_scheduled_jobs() is called, the registry is empty
# and job is enqueued # and job is enqueued
scheduler.enqueue_scheduled_jobs() scheduler.enqueue_scheduled_jobs()

Loading…
Cancel
Save