enqueue_at should support explicit args and kwargs (#1211)

main
Selwin Ong 5 years ago committed by GitHub
parent 8f7dbf1b1d
commit d8bd455c12
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -250,7 +250,19 @@ class Queue(object):
description=None, depends_on=None, job_id=None, description=None, depends_on=None, job_id=None,
meta=None, status=JobStatus.QUEUED): meta=None, status=JobStatus.QUEUED):
"""Creates a job based on parameters given.""" """Creates a job based on parameters given."""
timeout = parse_timeout(timeout) or self._default_timeout timeout = parse_timeout(timeout)
if timeout is None:
timeout = self._default_timeout
elif timeout == 0:
raise ValueError('0 timeout is not allowed. Use -1 for infinite timeout')
result_ttl = parse_timeout(result_ttl)
failure_ttl = parse_timeout(failure_ttl)
ttl = parse_timeout(ttl)
if ttl is not None and ttl <= 0:
raise ValueError('Job ttl must be greater than 0')
job = self.job_class.create( job = self.job_class.create(
func, args=args, kwargs=kwargs, connection=self.connection, func, args=args, kwargs=kwargs, connection=self.connection,
@ -273,25 +285,11 @@ class Queue(object):
and kwargs as explicit arguments. Any kwargs passed to this function and kwargs as explicit arguments. Any kwargs passed to this function
contain options for RQ itself. contain options for RQ itself.
""" """
timeout = parse_timeout(timeout)
if timeout is None:
timeout = self._default_timeout
elif timeout == 0:
raise ValueError('0 timeout is not allowed. Use -1 for infinite timeout')
result_ttl = parse_timeout(result_ttl)
failure_ttl = parse_timeout(failure_ttl)
ttl = parse_timeout(ttl)
if ttl is not None and ttl <= 0:
raise ValueError('Job ttl must be greater than 0')
job = self.job_class.create( job = self.create_job(
func, args=args, kwargs=kwargs, connection=self.connection, func, args=args, kwargs=kwargs, result_ttl=result_ttl, ttl=ttl,
result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl, failure_ttl=failure_ttl, description=description, depends_on=depends_on,
description=description, depends_on=depends_on, origin=self.name, job_id=job_id, meta=meta, status=JobStatus.QUEUED, timeout=timeout,
id=job_id, meta=meta, status=JobStatus.QUEUED, timeout=timeout,
) )
# If a _dependent_ job depends on any unfinished job, register all the # If a _dependent_ job depends on any unfinished job, register all the
@ -338,12 +336,10 @@ class Queue(object):
job.cleanup(DEFAULT_RESULT_TTL) job.cleanup(DEFAULT_RESULT_TTL)
return job return job
def enqueue(self, f, *args, **kwargs): @classmethod
"""Creates a job to represent the delayed function call and enqueues def parse_args(cls, f, *args, **kwargs):
it. """
Parses arguments passed to `queue.enqueue()` and `queue.enqueue_at()`
Expects the function to call, along with the arguments and keyword
arguments.
The function argument `f` may be any of the following: The function argument `f` may be any of the following:
@ -373,6 +369,15 @@ class Queue(object):
args = kwargs.pop('args', None) args = kwargs.pop('args', None)
kwargs = kwargs.pop('kwargs', None) kwargs = kwargs.pop('kwargs', None)
return (f, timeout, description, result_ttl, ttl, failure_ttl,
depends_on, job_id, at_front, meta, args, kwargs)
def enqueue(self, f, *args, **kwargs):
"""Creates a job to represent the delayed function call and enqueues it."""
(f, timeout, description, result_ttl, ttl, failure_ttl,
depends_on, job_id, at_front, meta, args, kwargs) = Queue.parse_args(f, *args, **kwargs)
return self.enqueue_call( return self.enqueue_call(
func=f, args=args, kwargs=kwargs, timeout=timeout, func=f, args=args, kwargs=kwargs, timeout=timeout,
result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl, result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl,
@ -380,11 +385,17 @@ class Queue(object):
at_front=at_front, meta=meta at_front=at_front, meta=meta
) )
def enqueue_at(self, datetime, func, *args, **kwargs): def enqueue_at(self, datetime, f, *args, **kwargs):
"""Schedules a job to be enqueued at specified time""" """Schedules a job to be enqueued at specified time"""
from .registry import ScheduledJobRegistry from .registry import ScheduledJobRegistry
job = self.create_job(func, status=JobStatus.SCHEDULED, *args, **kwargs) (f, timeout, description, result_ttl, ttl, failure_ttl,
depends_on, job_id, at_front, meta, args, kwargs) = Queue.parse_args(f, *args, **kwargs)
job = self.create_job(f, status=JobStatus.SCHEDULED, args=args, kwargs=kwargs,
timeout=timeout, result_ttl=result_ttl, ttl=ttl,
failure_ttl=failure_ttl, description=description,
depends_on=depends_on, job_id=job_id, meta=meta)
registry = ScheduledJobRegistry(queue=self) registry = ScheduledJobRegistry(queue=self)
with self.connection.pipeline() as pipeline: with self.connection.pipeline() as pipeline:
job.save(pipeline=pipeline) job.save(pipeline=pipeline)

@ -327,6 +327,19 @@ class TestQueue(RQTestCase):
((1,), {'timeout': 1, 'result_ttl': 1}) ((1,), {'timeout': 1, 'result_ttl': 1})
) )
# Explicit args and kwargs should also work with enqueue_at
time = datetime.now(utc) + timedelta(seconds=10)
job = q.enqueue_at(time, echo, job_timeout=2, result_ttl=2, args=[1], kwargs=kwargs)
self.assertEqual(job.timeout, 2)
self.assertEqual(job.result_ttl, 2)
self.assertEqual(
job.perform(),
((1,), {'timeout': 1, 'result_ttl': 1})
)
# Positional arguments is not allowed if explicit args and kwargs are used
self.assertRaises(Exception, q.enqueue, echo, 1, kwargs=kwargs)
def test_all_queues(self): def test_all_queues(self):
"""All queues""" """All queues"""
q1 = Queue('first-queue') q1 = Queue('first-queue')

Loading…
Cancel
Save