Add the queue to the Redis queues set when scheduling a job (#1238)

* Add the queue to the queues set when scheduling a job

* Fix the registry properties docstrings
main
Pierre Mdawar 5 years ago committed by GitHub
parent 0dd9ff0ec9
commit eb92d688a8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -192,25 +192,25 @@ class Queue(object):
@property @property
def started_job_registry(self): def started_job_registry(self):
"""Returns this queue's FailedJobRegistry.""" """Returns this queue's StartedJobRegistry."""
from rq.registry import StartedJobRegistry from rq.registry import StartedJobRegistry
return StartedJobRegistry(queue=self, job_class=self.job_class) return StartedJobRegistry(queue=self, job_class=self.job_class)
@property @property
def finished_job_registry(self): def finished_job_registry(self):
"""Returns this queue's FailedJobRegistry.""" """Returns this queue's FinishedJobRegistry."""
from rq.registry import FinishedJobRegistry from rq.registry import FinishedJobRegistry
return FinishedJobRegistry(queue=self) return FinishedJobRegistry(queue=self)
@property @property
def deferred_job_registry(self): def deferred_job_registry(self):
"""Returns this queue's FailedJobRegistry.""" """Returns this queue's DeferredJobRegistry."""
from rq.registry import DeferredJobRegistry from rq.registry import DeferredJobRegistry
return DeferredJobRegistry(queue=self, job_class=self.job_class) return DeferredJobRegistry(queue=self, job_class=self.job_class)
@property @property
def scheduled_job_registry(self): def scheduled_job_registry(self):
"""Returns this queue's FailedJobRegistry.""" """Returns this queue's ScheduledJobRegistry."""
from rq.registry import ScheduledJobRegistry from rq.registry import ScheduledJobRegistry
return ScheduledJobRegistry(queue=self, job_class=self.job_class) return ScheduledJobRegistry(queue=self, job_class=self.job_class)
@ -401,6 +401,8 @@ class Queue(object):
registry = ScheduledJobRegistry(queue=self) registry = ScheduledJobRegistry(queue=self)
with self.connection.pipeline() as pipeline: with self.connection.pipeline() as pipeline:
# Add Queue key set
pipeline.sadd(self.redis_queues_keys, self.key)
job.save(pipeline=pipeline) job.save(pipeline=pipeline)
registry.schedule(job, datetime, pipeline=pipeline) registry.schedule(job, datetime, pipeline=pipeline)
pipeline.execute() pipeline.execute()

Loading…
Cancel
Save