diff --git a/rq/job.py b/rq/job.py index 3a5c7c3..da2b674 100644 --- a/rq/job.py +++ b/rq/job.py @@ -172,6 +172,10 @@ class Job(object): def is_deferred(self): return self.get_status() == JobStatus.DEFERRED + @property + def is_scheduled(self): + return self.get_status() == JobStatus.SCHEDULED + @property def _dependency_id(self): """Returns the first item in self._dependency_ids. Present @@ -618,6 +622,13 @@ class Job(object): job_class=self.__class__) registry.remove(self, pipeline=pipeline) + elif self.is_scheduled: + from .registry import ScheduledJobRegistry + registry = ScheduledJobRegistry(self.origin, + connection=self.connection, + job_class=self.__class__) + registry.remove(self, pipeline=pipeline) + elif self.is_failed: self.failed_job_registry.remove(self, pipeline=pipeline) diff --git a/rq/queue.py b/rq/queue.py index 89bbea3..273a142 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -248,14 +248,14 @@ class Queue(object): def create_job(self, func, args=None, kwargs=None, timeout=None, result_ttl=None, ttl=None, failure_ttl=None, description=None, depends_on=None, job_id=None, - meta=None): + meta=None, status=JobStatus.QUEUED): """Creates a job based on parameters given.""" timeout = parse_timeout(timeout) or self._default_timeout job = self.job_class.create( func, args=args, kwargs=kwargs, connection=self.connection, result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl, - status=JobStatus.QUEUED, description=description, + status=status, description=description, depends_on=depends_on, timeout=timeout, id=job_id, origin=self.name, meta=meta ) @@ -384,7 +384,7 @@ class Queue(object): """Schedules a job to be enqueued at specified time""" from .registry import ScheduledJobRegistry - job = self.create_job(func, *args, **kwargs) + job = self.create_job(func, status=JobStatus.SCHEDULED, *args, **kwargs) registry = ScheduledJobRegistry(queue=self) with self.connection.pipeline() as pipeline: job.save(pipeline=pipeline) diff --git a/tests/test_job.py b/tests/test_job.py index 8f168d2..aa0bdad 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -14,7 +14,8 @@ from rq.exceptions import NoSuchJobError, UnpickleError from rq.job import Job, JobStatus, cancel_job, get_current_job from rq.queue import Queue from rq.registry import (DeferredJobRegistry, FailedJobRegistry, - FinishedJobRegistry, StartedJobRegistry) + FinishedJobRegistry, StartedJobRegistry, + ScheduledJobRegistry) from rq.utils import utcformat from rq.worker import Worker from tests import RQTestCase, fixtures @@ -579,6 +580,16 @@ class TestJob(RQTestCase): job.delete() self.assertFalse(job in registry) + job = Job.create(func=fixtures.say_hello, status=JobStatus.SCHEDULED, + connection=self.testconn, origin='default') + job.save() + + registry = ScheduledJobRegistry(connection=self.testconn) + registry.add(job, 500) + + job.delete() + self.assertFalse(job in registry) + def test_job_with_dependents_delete_parent_with_saved(self): """job.delete() deletes itself from Redis but not dependents. If the dependent job was saved, it will remain in redis.""" diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 459fec8..88c7fa5 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -267,10 +267,13 @@ class TestQueue(RQTestCase): scheduler = RQScheduler([queue], connection=self.testconn) scheduler.acquire_locks() # Jobs created using enqueue_at is put in the ScheduledJobRegistry - queue.enqueue_at(datetime(2019, 1, 1, tzinfo=utc), say_hello) + job = queue.enqueue_at(datetime(2019, 1, 1, tzinfo=utc), say_hello) self.assertEqual(len(queue), 0) self.assertEqual(len(registry), 1) + # enqueue_at set job status to "scheduled" + self.assertTrue(job.get_status() == 'scheduled') + # After enqueue_scheduled_jobs() is called, the registry is empty # and job is enqueued scheduler.enqueue_scheduled_jobs()