diff --git a/rq/job.py b/rq/job.py index 6e0333d..006283c 100644 --- a/rq/job.py +++ b/rq/job.py @@ -1377,7 +1377,7 @@ class Job: self.set_status(JobStatus.SCHEDULED) queue.schedule_job(self, scheduled_datetime, pipeline=pipeline) else: - queue.enqueue_job(self, pipeline=pipeline) + queue._enqueue_job(self, pipeline=pipeline) def register_dependency(self, pipeline: Optional['Pipeline'] = None): """Jobs may have dependencies. Jobs are enqueued only if the jobs they diff --git a/rq/queue.py b/rq/queue.py index c9d7283..985e02e 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -660,12 +660,7 @@ class Queue: on_success=on_success, on_failure=on_failure, ) - - job = self.setup_dependencies(job, pipeline=pipeline) - # If we do not depend on an unfinished job, enqueue the job. - if job.get_status(refresh=False) != JobStatus.DEFERRED: - return self.enqueue_job(job, pipeline=pipeline, at_front=at_front) - return job + return self.enqueue_job(job, pipeline=pipeline, at_front=at_front) @staticmethod def prepare_data( @@ -736,7 +731,7 @@ class Queue: """ pipe = pipeline if pipeline is not None else self.connection.pipeline() jobs = [ - self.enqueue_job( + self._enqueue_job( self.create_job( job_data.func, args=job_data.args, @@ -977,7 +972,26 @@ class Queue: return self.enqueue_at(datetime.now(timezone.utc) + time_delta, func, *args, **kwargs) def enqueue_job(self, job: 'Job', pipeline: Optional['Pipeline'] = None, at_front: bool = False) -> Job: - """Enqueues a job for delayed execution. + """Enqueues a job for delayed execution checking dependencies. + + Args: + job (Job): The job to enqueue + pipeline (Optional[Pipeline], optional): The Redis pipeline to use. Defaults to None. + at_front (bool, optional): Whether should enqueue at the front of the queue. Defaults to False. + + Returns: + Job: The enqued job + """ + job.origin = self.name + job = self.setup_dependencies(job, pipeline=pipeline) + # If we do not depend on an unfinished job, enqueue the job. + if job.get_status(refresh=False) != JobStatus.DEFERRED: + return self._enqueue_job(job, pipeline=pipeline, at_front=at_front) + return job + + + def _enqueue_job(self, job: 'Job', pipeline: Optional['Pipeline'] = None, at_front: bool = False) -> Job: + """Enqueues a job for delayed execution without checking dependencies. If Queue is instantiated with is_async=False, job is executed immediately. @@ -1103,10 +1117,10 @@ class Queue: registry.remove(dependent, pipeline=pipe) if dependent.origin == self.name: - self.enqueue_job(dependent, pipeline=pipe, at_front=enqueue_at_front) + self._enqueue_job(dependent, pipeline=pipe, at_front=enqueue_at_front) else: queue = self.__class__(name=dependent.origin, connection=self.connection) - queue.enqueue_job(dependent, pipeline=pipe, at_front=enqueue_at_front) + queue._enqueue_job(dependent, pipeline=pipe, at_front=enqueue_at_front) # Only delete dependents_key if all dependents have been enqueued if len(jobs_to_enqueue) == len(dependent_job_ids): diff --git a/rq/registry.py b/rq/registry.py index 2b51fff..f7534fd 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -194,7 +194,7 @@ class BaseRegistry: job.ended_at = None job._exc_info = '' job.save() - job = queue.enqueue_job(job, pipeline=pipeline, at_front=at_front) + job = queue._enqueue_job(job, pipeline=pipeline, at_front=at_front) pipeline.execute() return job diff --git a/rq/scheduler.py b/rq/scheduler.py index e49b57e..86566ba 100644 --- a/rq/scheduler.py +++ b/rq/scheduler.py @@ -165,7 +165,7 @@ class RQScheduler: jobs = Job.fetch_many(job_ids, connection=self.connection, serializer=self.serializer) for job in jobs: if job is not None: - queue.enqueue_job(job, pipeline=pipeline, at_front=bool(job.enqueue_at_front)) + queue._enqueue_job(job, pipeline=pipeline, at_front=bool(job.enqueue_at_front)) registry.remove(job, pipeline=pipeline) pipeline.execute() self._status = self.Status.STARTED diff --git a/tests/test_dependencies.py b/tests/test_dependencies.py index 980e805..a290a87 100644 --- a/tests/test_dependencies.py +++ b/tests/test_dependencies.py @@ -130,6 +130,24 @@ class TestDependencies(RQTestCase): self.assertEqual(job.get_status(), JobStatus.FINISHED) + def test_enqueue_job_dependency(self): + """Enqueue via Queue.enqueue_job() with depencency""" + q = Queue(connection=self.testconn) + w = SimpleWorker([q], connection=q.connection) + + # enqueue dependent job when parent successfully finishes + parent_job = Job.create(say_hello) + parent_job.save() + job = Job.create(say_hello, depends_on=parent_job) + q.enqueue_job(job) + w.work(burst=True) + self.assertEqual(job.get_status(), JobStatus.DEFERRED) + q.enqueue_job(parent_job) + w.work(burst=True) + self.assertEqual(parent_job.get_status(), JobStatus.FINISHED) + self.assertEqual(job.get_status(), JobStatus.FINISHED) + + def test_dependencies_are_met_if_parent_is_canceled(self): """When parent job is canceled, it should be treated as failed""" queue = Queue(connection=self.testconn)