Allows enqueueing via the CLI (#1466)

* Allows enqueueing via the CLI

#372

* schedule support

* `_` to `-`

* fix flake8

* echo job-id

* Some improvements

 - Description as in Python jobs
 - return result
 - quiet mode
 - allows `--boolean` and `--integer`
 - raises errors if not used correctly

* added tests

* add schedule tests

* add retry test

* use click exceptions

* add error test

* add job_func test

* change messages

https://github.com/rq/rq/pull/1466#discussion_r640211128
https://github.com/rq/rq/pull/1466#discussion_r640210850

* Use different format for arguments

See https://github.com/rq/rq/pull/1466#discussion_r650510889

* Add file support

Usage: @filename

* ast.literal_eval support with `#` instead of `:`

* func -> function

Makes error messages more readable

* click Error

* print function string

* add docs

* increase seconds in test

* Update `parse_function_arg`

Add `ParsingMode` enum (https://github.com/rq/rq/pull/1466#discussion_r656676114)
Change error messages (https://github.com/rq/rq/pull/1466#discussion_r656676800, https://github.com/rq/rq/pull/1466#discussion_r656677082)

* `#` to `%`

`#` is the comment character in bash

* Add some tests

(https://github.com/rq/rq/pull/1466#discussion_r656674539, https://github.com/rq/rq/pull/1466#discussion_r656676543)

* Add some tests

* docs: Add some examples

* catch all literal_eval exceptions

Some edge cases raise other exceptions

* remove job_func
(https://github.com/rq/rq/pull/1466#pullrequestreview-690110118)

* edit docs

https://github.com/rq/rq/pull/1466#pullrequestreview-695758691

* format examples

* format examples

`queue.enqueue(path.to.func, args=['abc'])` to `queue.enqueue(path.to.func, 'abc')`

https://github.com/rq/rq/pull/1466#discussion_r673615464

* add examples

https://github.com/rq/rq/pull/1466#discussion_r673658933

* add doc test

https://github.com/rq/rq/pull/1466#discussion_r673659124

* Update index.md

* Update test_cli.py

* Update test_cli.py

* Add version info

Co-authored-by: rpkak <rpkak@users.noreply.github.com>

@@ -201,6 +201,80 @@ def report_failure(job, connection, type, value, traceback):
Failure callbacks are limited to 60 seconds of execution time.
### CLI Enqueueing
_New in version 1.10.0._
If you prefer enqueueing jobs via the command line interface or do not use Python,
you can use this.
#### Usage:
```bash
rq enqueue [OPTIONS] FUNCTION [ARGUMENTS]
```
#### Options:
* `-q, --queue [value]` The name of the queue.
* `--timeout [value]` Specifies the maximum runtime of the job before it is
interrupted and marked as failed.
* `--result-ttl [value]` Specifies how long successful jobs and their results
are kept.
* `--ttl [value]` Specifies the maximum queued time of the job before
it is discarded.
* `--failure-ttl [value]` Specifies how long failed jobs are kept.
* `--description [value]` Additional description of the job.
* `--depends-on [value]` Specifies another job id that must complete before this
job will be queued.
* `--job-id [value]` The id of this job.
* `--at-front` Will place the job at the front of the queue, instead
of the end.
* `--retry-max [value]` Maximum number of retries.
* `--retry-interval [value]` Interval between retries in seconds.
* `--schedule-in [value]` Delay until the function is enqueued (e.g. 10s, 5m, 2d).
* `--schedule-at [value]` Schedule job to be enqueued at a certain time formatted
in ISO 8601 without timezone (e.g. 2021-05-27T21:45:00).
* `--quiet` Only logs errors.
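For illustration, a single invocation combining several of these options (the queue name, module path, and argument values here are placeholders):
```bash
rq enqueue -q high --schedule-in 10m --retry-max 3 --retry-interval 60 path.to.func abc key=value
```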
#### Function:
There are two options:
* Execute a function: dot-separated string of package, module and function (just like
passing a string to `queue.enqueue()`).
* Execute a Python file: dot-separated pathname of the file. Because it is technically
an import, `__name__ == '__main__'` will not work.
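For illustration, both forms side by side (`path.to.module` and `func` are placeholder names):
```bash
rq enqueue path.to.module.func   # enqueues the function func from path/to/module.py
rq enqueue path.to.module        # imports path/to/module.py; its top-level code runs in the worker
```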
#### Arguments:
| | plain text | json | [literal-eval](https://docs.python.org/3/library/ast.html#ast.literal_eval) |
|-|-|-|-|
| keyword | `[key]=[value]` | `[key]:=[value]` | `[key]%=[value]` |
| no keyword | `[value]` | `:[value]` | `%[value]` |
Where `[key]` is the keyword and `[value]` is the value, which is parsed with the corresponding
parsing method.
If the first character of `[value]` is `@`, the rest of the value is treated as a file path and the file's content is read.
##### Examples:
* `rq enqueue path.to.func abc` -> `queue.enqueue(path.to.func, 'abc')`
* `rq enqueue path.to.func abc=def` -> `queue.enqueue(path.to.func, abc='def')`
* `rq enqueue path.to.func ':{"json": "abc"}'` -> `queue.enqueue(path.to.func, {'json': 'abc'})`
* `rq enqueue path.to.func 'key:={"json": "abc"}'` -> `queue.enqueue(path.to.func, key={'json': 'abc'})`
* `rq enqueue path.to.func '%1, 2'` -> `queue.enqueue(path.to.func, (1, 2))`
* `rq enqueue path.to.func '%None'` -> `queue.enqueue(path.to.func, None)`
* `rq enqueue path.to.func '%True'` -> `queue.enqueue(path.to.func, True)`
* `rq enqueue path.to.func 'key%=(1, 2)'` -> `queue.enqueue(path.to.func, key=(1, 2))`
* `rq enqueue path.to.func 'key%={"foo": True}'` -> `queue.enqueue(path.to.func, key={"foo": True})`
* `rq enqueue path.to.func @path/to/file` -> `queue.enqueue(path.to.func, open('path/to/file', 'r').read())`
* `rq enqueue path.to.func key=@path/to/file` -> `queue.enqueue(path.to.func, key=open('path/to/file', 'r').read())`
* `rq enqueue path.to.func :@path/to/file.json` -> `queue.enqueue(path.to.func, json.loads(open('path/to/file.json', 'r').read()))`
* `rq enqueue path.to.func key:=@path/to/file.json` -> `queue.enqueue(path.to.func, key=json.loads(open('path/to/file.json', 'r').read()))`
**Warning:** Do not use plain text without a keyword if you do not know what the value will be.
If the value starts with `@`, `:` or `%`, or includes `=`, it will be interpreted as something else.
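A sketch of this pitfall (`path.to.func` is a placeholder):
```bash
rq enqueue path.to.func 'a=b'    # becomes the keyword argument a='b', not the positional string 'a=b'
rq enqueue path.to.func ':text'  # parsed as JSON and rejected, not passed as the string ':text'
```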
## Working with Queues
Besides enqueuing jobs, Queues have a few useful methods:

@@ -12,10 +12,11 @@ import sys
import click
from redis.exceptions import ConnectionError
from rq import Connection, Retry, __version__ as version
from rq.cli.helpers import (read_config_file, refresh,
                            setup_loghandlers_from_args,
                            show_both, show_queues, show_workers, CliConfig, parse_function_args,
                            parse_schedule)
from rq.contrib.legacy import cleanup_ghosts
from rq.defaults import (DEFAULT_CONNECTION_CLASS, DEFAULT_JOB_CLASS,
                         DEFAULT_QUEUE_CLASS, DEFAULT_WORKER_CLASS,
@@ -25,11 +26,14 @@ from rq.defaults import (DEFAULT_CONNECTION_CLASS, DEFAULT_JOB_CLASS,
                         DEFAULT_SERIALIZER_CLASS)
from rq.exceptions import InvalidJobOperationError
from rq.registry import FailedJobRegistry, clean_registries
from rq.utils import import_attribute, get_call_string, make_colorizer
from rq.serializers import DefaultSerializer
from rq.suspension import (suspend as connection_suspend,
                           resume as connection_resume, is_suspended)
from rq.worker_registration import clean_worker_registry
from rq.job import JobStatus

blue = make_colorizer('darkblue')

# Disable the warning that Click displays (as of Click version 5.0) when users
@@ -303,3 +307,53 @@ def resume(cli_config, **options):
    """Resumes processing of queues, that were suspended with `rq suspend`"""
    connection_resume(cli_config.connection)
    click.echo("Resuming workers.")

@main.command()
@click.option('--queue', '-q', help='The name of the queue.', default='default')
@click.option('--timeout',
              help='Specifies the maximum runtime of the job before it is interrupted and marked as failed.')
@click.option('--result-ttl', help='Specifies how long successful jobs and their results are kept.')
@click.option('--ttl', help='Specifies the maximum queued time of the job before it is discarded.')
@click.option('--failure-ttl', help='Specifies how long failed jobs are kept.')
@click.option('--description', help='Additional description of the job')
@click.option('--depends-on', help='Specifies another job id that must complete before this job will be queued.',
              multiple=True)
@click.option('--job-id', help='The id of this job')
@click.option('--at-front', is_flag=True, help='Will place the job at the front of the queue, instead of the end')
@click.option('--retry-max', help='Maximum number of retries', default=0, type=int)
@click.option('--retry-interval', help='Interval between retries in seconds', multiple=True, type=int, default=[0])
@click.option('--schedule-in', help='Delay until the function is enqueued (e.g. 10s, 5m, 2d).')
@click.option('--schedule-at', help='Schedule job to be enqueued at a certain time formatted in ISO 8601 without '
                                    'timezone (e.g. 2021-05-27T21:45:00).')
@click.option('--quiet', is_flag=True, help='Only logs errors.')
@click.argument('function')
@click.argument('arguments', nargs=-1)
@pass_cli_config
def enqueue(cli_config, queue, timeout, result_ttl, ttl, failure_ttl, description, depends_on, job_id, at_front,
            retry_max, retry_interval, schedule_in, schedule_at, quiet, function, arguments, **options):
    """Enqueues a job from the command line"""
    args, kwargs = parse_function_args(arguments)
    function_string = get_call_string(function, args, kwargs)
    description = description or function_string

    retry = None
    if retry_max > 0:
        retry = Retry(retry_max, retry_interval)

    schedule = parse_schedule(schedule_in, schedule_at)

    with Connection(cli_config.connection):
        queue = cli_config.queue_class(queue)

        if schedule is None:
            job = queue.enqueue_call(function, args, kwargs, timeout, result_ttl, ttl, failure_ttl,
                                     description, depends_on, job_id, at_front, None, retry)
        else:
            job = queue.create_job(function, args, kwargs, timeout, result_ttl, ttl, failure_ttl,
                                   description, depends_on, job_id, None, JobStatus.SCHEDULED, retry)
            queue.schedule_job(job, schedule)

    if not quiet:
        click.echo('Enqueued %s with job-id \'%s\'.' % (blue(function_string), job.id))

@@ -7,6 +7,11 @@ import importlib
import time
import os
from functools import partial
from enum import Enum
from datetime import datetime, timezone, timedelta
from json import loads, JSONDecodeError
from ast import literal_eval

import click
import redis
@@ -15,7 +20,7 @@ from redis.sentinel import Sentinel
from rq.defaults import (DEFAULT_CONNECTION_CLASS, DEFAULT_JOB_CLASS,
                         DEFAULT_QUEUE_CLASS, DEFAULT_WORKER_CLASS)
from rq.logutils import setup_loghandlers
from rq.utils import import_attribute, parse_timeout
from rq.worker import WorkerStatus

red = partial(click.style, fg='red')
@@ -208,6 +213,83 @@ def setup_loghandlers_from_args(verbose, quiet, date_format, log_format):
    setup_loghandlers(level, date_format=date_format, log_format=log_format)

def parse_function_arg(argument, arg_pos):
    class ParsingMode(Enum):
        PLAIN_TEXT = 0
        JSON = 1
        LITERAL_EVAL = 2

    keyword = None
    if argument.startswith(':'):  # no keyword, json
        mode = ParsingMode.JSON
        value = argument[1:]
    elif argument.startswith('%'):  # no keyword, literal_eval
        mode = ParsingMode.LITERAL_EVAL
        value = argument[1:]
    else:
        index = argument.find('=')
        if index > 0:
            if ':' in argument and argument.index(':') + 1 == index:  # keyword, json
                mode = ParsingMode.JSON
                keyword = argument[:index - 1]
            elif '%' in argument and argument.index('%') + 1 == index:  # keyword, literal_eval
                mode = ParsingMode.LITERAL_EVAL
                keyword = argument[:index - 1]
            else:  # keyword, text
                mode = ParsingMode.PLAIN_TEXT
                keyword = argument[:index]
            value = argument[index + 1:]
        else:  # no keyword, text
            mode = ParsingMode.PLAIN_TEXT
            value = argument

    if value.startswith('@'):
        try:
            with open(value[1:], 'r') as file:
                value = file.read()
        except FileNotFoundError:
            raise click.FileError(value[1:], 'Not found')

    if mode == ParsingMode.JSON:  # json
        try:
            value = loads(value)
        except JSONDecodeError:
            raise click.BadParameter('Unable to parse %s as JSON.' % (keyword or '%s. non keyword argument' % arg_pos))
    elif mode == ParsingMode.LITERAL_EVAL:  # literal_eval
        try:
            value = literal_eval(value)
        except Exception:
            raise click.BadParameter('Unable to eval %s as Python object. See '
                                     'https://docs.python.org/3/library/ast.html#ast.literal_eval'
                                     % (keyword or '%s. non keyword argument' % arg_pos))

    return keyword, value

def parse_function_args(arguments):
    args = []
    kwargs = {}

    for argument in arguments:
        keyword, value = parse_function_arg(argument, len(args) + 1)
        if keyword is not None:
            if keyword in kwargs:
                raise click.BadParameter('You can\'t specify multiple values for the same keyword.')
            kwargs[keyword] = value
        else:
            args.append(value)

    return args, kwargs

def parse_schedule(schedule_in, schedule_at):
    if schedule_in is not None:
        if schedule_at is not None:
            raise click.BadArgumentUsage('You can\'t specify both --schedule-in and --schedule-at')
        return datetime.now(timezone.utc) + timedelta(seconds=parse_timeout(schedule_in))
    elif schedule_at is not None:
        return datetime.strptime(schedule_at, '%Y-%m-%dT%H:%M:%S')

class CliConfig:
    """A helper class to be used with click commands, to handle shared options"""

    def __init__(self, url=None, config=None, worker_class=DEFAULT_WORKER_CLASS,

@@ -0,0 +1,3 @@
{
"test": true
}

@@ -2,20 +2,24 @@
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

from datetime import datetime, timezone, timedelta
from time import sleep
from uuid import uuid4

import os
import json

from click.testing import CliRunner
from redis import Redis

from rq import Queue
from rq.cli import main
from rq.cli.helpers import read_config_file, CliConfig, parse_function_arg, parse_schedule
from rq.job import Job
from rq.registry import FailedJobRegistry, ScheduledJobRegistry
from rq.serializers import JSONSerializer
from rq.worker import Worker, WorkerStatus
from rq.scheduler import RQScheduler

import pytest
@@ -369,3 +373,287 @@ class TestRQCli(RQTestCase):
        runner.invoke(main, ['worker', '-u', self.redis_url,
                             '--serializer rq.serializer.JSONSerializer'])
        self.assertIn(job.id, q.job_ids)

    def test_cli_enqueue(self):
        """rq enqueue -u <url> tests.fixtures.say_hello"""
        queue = Queue(connection=self.connection)
        self.assertTrue(queue.is_empty())

        runner = CliRunner()
        result = runner.invoke(main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.say_hello'])
        self.assert_normal_execution(result)

        prefix = 'Enqueued tests.fixtures.say_hello() with job-id \''
        suffix = '\'.\n'

        print(result.stdout)
        self.assertTrue(result.stdout.startswith(prefix))
        self.assertTrue(result.stdout.endswith(suffix))

        job_id = result.stdout[len(prefix):-len(suffix)]
        queue_key = 'rq:queue:default'
        self.assertEqual(self.connection.llen(queue_key), 1)
        self.assertEqual(self.connection.lrange(queue_key, 0, -1)[0].decode('ascii'), job_id)

        worker = Worker(queue)
        worker.work(True)
        self.assertEqual(Job(job_id).result, 'Hi there, Stranger!')

    def test_cli_enqueue_args(self):
        """rq enqueue -u <url> tests.fixtures.echo hello ':[1, {"key": "value"}]' json:=["abc"] nojson=def"""
        queue = Queue(connection=self.connection)
        self.assertTrue(queue.is_empty())

        runner = CliRunner()
        result = runner.invoke(main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.echo', 'hello',
                                      ':[1, {"key": "value"}]', ':@tests/test.json', '%1, 2', 'json:=[3.0, true]',
                                      'nojson=abc', 'file=@tests/test.json'])
        self.assert_normal_execution(result)

        job_id = self.connection.lrange('rq:queue:default', 0, -1)[0].decode('ascii')

        worker = Worker(queue)
        worker.work(True)

        args, kwargs = Job(job_id).result
        self.assertEqual(args, ('hello', [1, {'key': 'value'}], {"test": True}, (1, 2)))
        self.assertEqual(kwargs, {'json': [3.0, True], 'nojson': 'abc', 'file': '{\n "test": true\n}\n'})

    def test_cli_enqueue_schedule_in(self):
        """rq enqueue -u <url> tests.fixtures.say_hello --schedule-in 10s"""
        queue = Queue(connection=self.connection)
        registry = ScheduledJobRegistry(queue=queue)

        worker = Worker(queue)
        scheduler = RQScheduler(queue, self.connection)

        self.assertTrue(len(queue) == 0)
        self.assertTrue(len(registry) == 0)

        runner = CliRunner()
        result = runner.invoke(main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.say_hello',
                                      '--schedule-in', '10s'])
        self.assert_normal_execution(result)

        scheduler.acquire_locks()
        scheduler.enqueue_scheduled_jobs()

        self.assertTrue(len(queue) == 0)
        self.assertTrue(len(registry) == 1)
        self.assertFalse(worker.work(True))

        sleep(11)
        scheduler.enqueue_scheduled_jobs()

        self.assertTrue(len(queue) == 1)
        self.assertTrue(len(registry) == 0)
        self.assertTrue(worker.work(True))

    def test_cli_enqueue_schedule_at(self):
        """
        rq enqueue -u <url> tests.fixtures.say_hello --schedule-at 2021-01-01T00:00:00
        rq enqueue -u <url> tests.fixtures.say_hello --schedule-at 2100-01-01T00:00:00
        """
        queue = Queue(connection=self.connection)
        registry = ScheduledJobRegistry(queue=queue)

        worker = Worker(queue)
        scheduler = RQScheduler(queue, self.connection)

        self.assertTrue(len(queue) == 0)
        self.assertTrue(len(registry) == 0)

        runner = CliRunner()
        result = runner.invoke(main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.say_hello',
                                      '--schedule-at', '2021-01-01T00:00:00'])
        self.assert_normal_execution(result)

        scheduler.acquire_locks()

        self.assertTrue(len(queue) == 0)
        self.assertTrue(len(registry) == 1)

        scheduler.enqueue_scheduled_jobs()

        self.assertTrue(len(queue) == 1)
        self.assertTrue(len(registry) == 0)
        self.assertTrue(worker.work(True))

        self.assertTrue(len(queue) == 0)
        self.assertTrue(len(registry) == 0)

        result = runner.invoke(main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.say_hello',
                                      '--schedule-at', '2100-01-01T00:00:00'])
        self.assert_normal_execution(result)

        self.assertTrue(len(queue) == 0)
        self.assertTrue(len(registry) == 1)

        scheduler.enqueue_scheduled_jobs()

        self.assertTrue(len(queue) == 0)
        self.assertTrue(len(registry) == 1)
        self.assertFalse(worker.work(True))

    def test_cli_enqueue_retry(self):
        """rq enqueue -u <url> tests.fixtures.say_hello --retry-max 3 --retry-interval 10 --retry-interval 20
        --retry-interval 40"""
        queue = Queue(connection=self.connection)
        self.assertTrue(queue.is_empty())

        runner = CliRunner()
        result = runner.invoke(main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.say_hello', '--retry-max', '3',
                                      '--retry-interval', '10', '--retry-interval', '20', '--retry-interval', '40'])
        self.assert_normal_execution(result)

        job = Job.fetch(self.connection.lrange('rq:queue:default', 0, -1)[0].decode('ascii'),
                        connection=self.connection)

        self.assertEqual(job.retries_left, 3)
        self.assertEqual(job.retry_intervals, [10, 20, 40])

    def test_cli_enqueue_errors(self):
        """
        rq enqueue -u <url> tests.fixtures.echo :invalid_json
        rq enqueue -u <url> tests.fixtures.echo %invalid_eval_statement
        rq enqueue -u <url> tests.fixtures.echo key=value key=value
        rq enqueue -u <url> tests.fixtures.echo --schedule-in 1s --schedule-at 2000-01-01T00:00:00
        rq enqueue -u <url> tests.fixtures.echo @not_existing_file
        """
        runner = CliRunner()

        result = runner.invoke(main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.echo', ':invalid_json'])
        self.assertNotEqual(result.exit_code, 0)
        self.assertIn('Unable to parse 1. non keyword argument as JSON.', result.output)

        result = runner.invoke(main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.echo',
                                      '%invalid_eval_statement'])
        self.assertNotEqual(result.exit_code, 0)
        self.assertIn('Unable to eval 1. non keyword argument as Python object.', result.output)

        result = runner.invoke(main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.echo', 'key=value', 'key=value'])
        self.assertNotEqual(result.exit_code, 0)
        self.assertIn('You can\'t specify multiple values for the same keyword.', result.output)

        result = runner.invoke(main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.echo', '--schedule-in', '1s',
                                      '--schedule-at', '2000-01-01T00:00:00'])
        self.assertNotEqual(result.exit_code, 0)
        self.assertIn('You can\'t specify both --schedule-in and --schedule-at', result.output)

        result = runner.invoke(main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.echo', '@not_existing_file'])
        self.assertNotEqual(result.exit_code, 0)
        self.assertIn('Not found', result.output)

    def test_parse_schedule(self):
        """executes the rq.cli.helpers.parse_schedule function"""
        self.assertEqual(parse_schedule(None, '2000-01-23T23:45:01'), datetime(2000, 1, 23, 23, 45, 1))

        start = datetime.now(timezone.utc) + timedelta(minutes=5)
        middle = parse_schedule('5m', None)
        end = datetime.now(timezone.utc) + timedelta(minutes=5)

        self.assertGreater(middle, start)
        self.assertLess(middle, end)

    def test_parse_function_arg(self):
        """executes the rq.cli.helpers.parse_function_arg function"""
        self.assertEqual(parse_function_arg('abc', 0), (None, 'abc'))
        self.assertEqual(parse_function_arg(':{"json": true}', 1), (None, {'json': True}))
        self.assertEqual(parse_function_arg('%1, 2', 2), (None, (1, 2)))
        self.assertEqual(parse_function_arg('key=value', 3), ('key', 'value'))
        self.assertEqual(parse_function_arg('jsonkey:=["json", "value"]', 4), ('jsonkey', ['json', 'value']))
        self.assertEqual(parse_function_arg('evalkey%=1.2', 5), ('evalkey', 1.2))
        self.assertEqual(parse_function_arg(':@tests/test.json', 6), (None, {'test': True}))
        self.assertEqual(parse_function_arg('@tests/test.json', 7), (None, '{\n "test": true\n}\n'))

    def test_cli_enqueue_doc_test(self):
        """tests the examples of the documentation"""
        runner = CliRunner()

        id = str(uuid4())
        result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', 'abc'])
        self.assert_normal_execution(result)
        job = Job.fetch(id)
        self.assertEqual((job.args, job.kwargs), (['abc'], {}))

        id = str(uuid4())
        result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', 'abc=def'])
        self.assert_normal_execution(result)
        job = Job.fetch(id)
        self.assertEqual((job.args, job.kwargs), ([], {'abc': 'def'}))

        id = str(uuid4())
        result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', ':{"json": "abc"}'])
        self.assert_normal_execution(result)
        job = Job.fetch(id)
        self.assertEqual((job.args, job.kwargs), ([{'json': 'abc'}], {}))

        id = str(uuid4())
        result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', 'key:={"json": "abc"}'])
        self.assert_normal_execution(result)
        job = Job.fetch(id)
        self.assertEqual((job.args, job.kwargs), ([], {'key': {'json': 'abc'}}))

        id = str(uuid4())
        result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', '%1, 2'])
        self.assert_normal_execution(result)
        job = Job.fetch(id)
        self.assertEqual((job.args, job.kwargs), ([(1, 2)], {}))

        id = str(uuid4())
        result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', '%None'])
        self.assert_normal_execution(result)
        job = Job.fetch(id)
        self.assertEqual((job.args, job.kwargs), ([None], {}))

        id = str(uuid4())
        result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', '%True'])
        self.assert_normal_execution(result)
        job = Job.fetch(id)
        self.assertEqual((job.args, job.kwargs), ([True], {}))

        id = str(uuid4())
        result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', 'key%=(1, 2)'])
        self.assert_normal_execution(result)
        job = Job.fetch(id)
        self.assertEqual((job.args, job.kwargs), ([], {'key': (1, 2)}))

        id = str(uuid4())
        result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', 'key%={"foo": True}'])
        self.assert_normal_execution(result)
        job = Job.fetch(id)
        self.assertEqual((job.args, job.kwargs), ([], {'key': {"foo": True}}))

        id = str(uuid4())
        result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', '@tests/test.json'])
        self.assert_normal_execution(result)
        job = Job.fetch(id)
        self.assertEqual((job.args, job.kwargs), ([open('tests/test.json', 'r').read()], {}))

        id = str(uuid4())
        result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', 'key=@tests/test.json'])
        self.assert_normal_execution(result)
        job = Job.fetch(id)
        self.assertEqual((job.args, job.kwargs), ([], {'key': open('tests/test.json', 'r').read()}))

        id = str(uuid4())
        result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', ':@tests/test.json'])
        self.assert_normal_execution(result)
        job = Job.fetch(id)
        self.assertEqual((job.args, job.kwargs), ([json.loads(open('tests/test.json', 'r').read())], {}))

        id = str(uuid4())
        result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', 'key:=@tests/test.json'])
        self.assert_normal_execution(result)
        job = Job.fetch(id)
        self.assertEqual((job.args, job.kwargs), ([], {'key': json.loads(open('tests/test.json', 'r').read())}))
