diff --git a/rq/queue.py b/rq/queue.py index 273a142..cff48fb 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -250,7 +250,19 @@ class Queue(object): description=None, depends_on=None, job_id=None, meta=None, status=JobStatus.QUEUED): """Creates a job based on parameters given.""" - timeout = parse_timeout(timeout) or self._default_timeout + timeout = parse_timeout(timeout) + + if timeout is None: + timeout = self._default_timeout + elif timeout == 0: + raise ValueError('0 timeout is not allowed. Use -1 for infinite timeout') + + result_ttl = parse_timeout(result_ttl) + failure_ttl = parse_timeout(failure_ttl) + + ttl = parse_timeout(ttl) + if ttl is not None and ttl <= 0: + raise ValueError('Job ttl must be greater than 0') job = self.job_class.create( func, args=args, kwargs=kwargs, connection=self.connection, @@ -273,25 +285,11 @@ class Queue(object): and kwargs as explicit arguments. Any kwargs passed to this function contain options for RQ itself. """ - timeout = parse_timeout(timeout) - - if timeout is None: - timeout = self._default_timeout - elif timeout == 0: - raise ValueError('0 timeout is not allowed. Use -1 for infinite timeout') - result_ttl = parse_timeout(result_ttl) - failure_ttl = parse_timeout(failure_ttl) - - ttl = parse_timeout(ttl) - if ttl is not None and ttl <= 0: - raise ValueError('Job ttl must be greater than 0') - - job = self.job_class.create( - func, args=args, kwargs=kwargs, connection=self.connection, - result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl, - description=description, depends_on=depends_on, origin=self.name, - id=job_id, meta=meta, status=JobStatus.QUEUED, timeout=timeout, + job = self.create_job( + func, args=args, kwargs=kwargs, result_ttl=result_ttl, ttl=ttl, + failure_ttl=failure_ttl, description=description, depends_on=depends_on, + job_id=job_id, meta=meta, status=JobStatus.QUEUED, timeout=timeout, ) # If a _dependent_ job depends on any unfinished job, register all the @@ -338,12 +336,10 @@ class Queue(object): job.cleanup(DEFAULT_RESULT_TTL) return job - def enqueue(self, f, *args, **kwargs): - """Creates a job to represent the delayed function call and enqueues - it. - - Expects the function to call, along with the arguments and keyword - arguments. + @classmethod + def parse_args(cls, f, *args, **kwargs): + """ + Parses arguments passed to `queue.enqueue()` and `queue.enqueue_at()` The function argument `f` may be any of the following: @@ -373,6 +369,15 @@ class Queue(object): args = kwargs.pop('args', None) kwargs = kwargs.pop('kwargs', None) + return (f, timeout, description, result_ttl, ttl, failure_ttl, + depends_on, job_id, at_front, meta, args, kwargs) + + def enqueue(self, f, *args, **kwargs): + """Creates a job to represent the delayed function call and enqueues it.""" + + (f, timeout, description, result_ttl, ttl, failure_ttl, + depends_on, job_id, at_front, meta, args, kwargs) = Queue.parse_args(f, *args, **kwargs) + return self.enqueue_call( func=f, args=args, kwargs=kwargs, timeout=timeout, result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl, @@ -380,11 +385,17 @@ class Queue(object): at_front=at_front, meta=meta ) - def enqueue_at(self, datetime, func, *args, **kwargs): + def enqueue_at(self, datetime, f, *args, **kwargs): """Schedules a job to be enqueued at specified time""" from .registry import ScheduledJobRegistry - job = self.create_job(func, status=JobStatus.SCHEDULED, *args, **kwargs) + (f, timeout, description, result_ttl, ttl, failure_ttl, + depends_on, job_id, at_front, meta, args, kwargs) = Queue.parse_args(f, *args, **kwargs) + job = self.create_job(f, status=JobStatus.SCHEDULED, args=args, kwargs=kwargs, + timeout=timeout, result_ttl=result_ttl, ttl=ttl, + failure_ttl=failure_ttl, description=description, + depends_on=depends_on, job_id=job_id, meta=meta) + registry = ScheduledJobRegistry(queue=self) with self.connection.pipeline() as pipeline: job.save(pipeline=pipeline) diff --git a/tests/test_queue.py b/tests/test_queue.py index 865e14c..ffe08c5 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -327,6 +327,19 @@ class TestQueue(RQTestCase): ((1,), {'timeout': 1, 'result_ttl': 1}) ) + # Explicit args and kwargs should also work with enqueue_at + time = datetime.now(utc) + timedelta(seconds=10) + job = q.enqueue_at(time, echo, job_timeout=2, result_ttl=2, args=[1], kwargs=kwargs) + self.assertEqual(job.timeout, 2) + self.assertEqual(job.result_ttl, 2) + self.assertEqual( + job.perform(), + ((1,), {'timeout': 1, 'result_ttl': 1}) + ) + + # Positional arguments is not allowed if explicit args and kwargs are used + self.assertRaises(Exception, q.enqueue, echo, 1, kwargs=kwargs) + def test_all_queues(self): """All queues""" q1 = Queue('first-queue')