diff --git a/docs/docs/index.md b/docs/docs/index.md index 7cfc0a7..dc2d95a 100644 --- a/docs/docs/index.md +++ b/docs/docs/index.md @@ -201,6 +201,80 @@ def report_failure(job, connection, type, value, traceback): Failure callbacks are limited to 60 seconds of execution time. +### CLI Enqueueing + +_New in version 1.10.0._ + +If you prefer enqueueing jobs via the command line interface or do not use python +you can use this. + + +#### Usage: +```bash +rq enqueue [OPTIONS] FUNCTION [ARGUMENTS] +``` + +#### Options: +* `-q, --queue [value]` The name of the queue. +* `--timeout [value]` Specifies the maximum runtime of the job before it is + interrupted and marked as failed. +* `--result-ttl [value]` Specifies how long successful jobs and their results + are kept. +* `--ttl [value]` Specifies the maximum queued time of the job before + it is discarded. +* `--failure-ttl [value]` Specifies how long failed jobs are kept. +* `--description [value]` Additional description of the job +* `--depends-on [value]` Specifies another job id that must complete before this + job will be queued. +* `--job-id [value]` The id of this job +* `--at-front` Will place the job at the front of the queue, instead + of the end +* `--retry-max [value]` Maximum number of retries +* `--retry-interval [value]` Interval between retries in seconds +* `--schedule-in [value]` Delay until the function is enqueued (e.g. 10s, 5m, 2d). +* `--schedule-at [value]` Schedule job to be enqueued at a certain time formatted + in ISO 8601 without timezone (e.g. 2021-05-27T21:45:00). +* `--quiet` Only logs errors. + +#### Function: +There are two options: +* Execute a function: dot-separated string of package, module and function (Just like + passing a string to `queue.enqueue()`). +* Execute a python file: dot-separated pathname of the file. Because it is technically + an import `__name__ == '__main__'` will not work. + +#### Arguments: + +| | plain text | json | [literal-eval](https://docs.python.org/3/library/ast.html#ast.literal_eval) | +|-|-|-|-| +| keyword | `[key]=[value]` | `[key]:=[value]` | `[key]%=[value]` | +| no keyword | `[value]` | `:[value]` | `%[value]` | + +Where `[key]` is the keyword and `[value]` is the value which is parsed with the corresponding +parsing method. + +If the first character of `[value]` is `@` the subsequent path will be read. + +##### Examples: + +* `rq enqueue path.to.func abc` -> `queue.enqueue(path.to.func, 'abc')` +* `rq enqueue path.to.func abc=def` -> `queue.enqueue(path.to.func, abc='def')` +* `rq enqueue path.to.func ':{"json": "abc"}'` -> `queue.enqueue(path.to.func, {'json': 'abc'})` +* `rq enqueue path.to.func 'key:={"json": "abc"}'` -> `queue.enqueue(path.to.func, key={'json': 'abc'})` +* `rq enqueue path.to.func '%1, 2'` -> `queue.enqueue(path.to.func, (1, 2))` +* `rq enqueue path.to.func '%None'` -> `queue.enqueue(path.to.func, None)` +* `rq enqueue path.to.func '%True'` -> `queue.enqueue(path.to.func, True)` +* `rq enqueue path.to.func 'key%=(1, 2)'` -> `queue.enqueue(path.to.func, key=(1, 2))` +* `rq enqueue path.to.func 'key%={"foo": True}'` -> `queue.enqueue(path.to.func, key={"foo": True})` +* `rq enqueue path.to.func @path/to/file` -> `queue.enqueue(path.to.func, open('path/to/file', 'r').read())` +* `rq enqueue path.to.func key=@path/to/file` -> `queue.enqueue(path.to.func, key=open('path/to/file', 'r').read())` +* `rq enqueue path.to.func :@path/to/file.json` -> `queue.enqueue(path.to.func, json.loads(open('path/to/file.json', 'r').read()))` +* `rq enqueue path.to.func key:=@path/to/file.json` -> `queue.enqueue(path.to.func, key=json.loads(open('path/to/file.json', 'r').read()))` + +**Warning:** Do not use plain text without keyword if you do not know what the value is. +If the value starts with `@`, `:` or `%` or includes `=` it would be recognised as something else. + + ## Working with Queues Besides enqueuing jobs, Queues have a few useful methods: diff --git a/rq/cli/cli.py b/rq/cli/cli.py index 1486329..d1463de 100755 --- a/rq/cli/cli.py +++ b/rq/cli/cli.py @@ -12,10 +12,11 @@ import sys import click from redis.exceptions import ConnectionError -from rq import Connection, __version__ as version +from rq import Connection, Retry, __version__ as version from rq.cli.helpers import (read_config_file, refresh, setup_loghandlers_from_args, - show_both, show_queues, show_workers, CliConfig) + show_both, show_queues, show_workers, CliConfig, parse_function_args, + parse_schedule) from rq.contrib.legacy import cleanup_ghosts from rq.defaults import (DEFAULT_CONNECTION_CLASS, DEFAULT_JOB_CLASS, DEFAULT_QUEUE_CLASS, DEFAULT_WORKER_CLASS, @@ -25,11 +26,14 @@ from rq.defaults import (DEFAULT_CONNECTION_CLASS, DEFAULT_JOB_CLASS, DEFAULT_SERIALIZER_CLASS) from rq.exceptions import InvalidJobOperationError from rq.registry import FailedJobRegistry, clean_registries -from rq.utils import import_attribute +from rq.utils import import_attribute, get_call_string, make_colorizer +from rq.serializers import DefaultSerializer from rq.suspension import (suspend as connection_suspend, resume as connection_resume, is_suspended) from rq.worker_registration import clean_worker_registry +from rq.job import JobStatus +blue = make_colorizer('darkblue') # Disable the warning that Click displays (as of Click version 5.0) when users @@ -303,3 +307,53 @@ def resume(cli_config, **options): """Resumes processing of queues, that were suspended with `rq suspend`""" connection_resume(cli_config.connection) click.echo("Resuming workers.") + + +@main.command() +@click.option('--queue', '-q', help='The name of the queue.', default='default') +@click.option('--timeout', + help='Specifies the maximum runtime of the job before it is interrupted and marked as failed.') +@click.option('--result-ttl', help='Specifies how long successful jobs and their results are kept.') +@click.option('--ttl', help='Specifies the maximum queued time of the job before it is discarded.') +@click.option('--failure-ttl', help='Specifies how long failed jobs are kept.') +@click.option('--description', help='Additional description of the job') +@click.option('--depends-on', help='Specifies another job id that must complete before this job will be queued.', + multiple=True) +@click.option('--job-id', help='The id of this job') +@click.option('--at-front', is_flag=True, help='Will place the job at the front of the queue, instead of the end') +@click.option('--retry-max', help='Maximum amound of retries', default=0, type=int) +@click.option('--retry-interval', help='Interval between retries in seconds', multiple=True, type=int, default=[0]) +@click.option('--schedule-in', help='Delay until the function is enqueued (e.g. 10s, 5m, 2d).') +@click.option('--schedule-at', help='Schedule job to be enqueued at a certain time formatted in ISO 8601 without ' + 'timezone (e.g. 2021-05-27T21:45:00).') +@click.option('--quiet', is_flag=True, help='Only logs errors.') +@click.argument('function') +@click.argument('arguments', nargs=-1) +@pass_cli_config +def enqueue(cli_config, queue, timeout, result_ttl, ttl, failure_ttl, description, depends_on, job_id, at_front, + retry_max, retry_interval, schedule_in, schedule_at, quiet, function, arguments, **options): + """Enqueues a job from the command line""" + args, kwargs = parse_function_args(arguments) + + function_string = get_call_string(function, args, kwargs) + description = description or function_string + + retry = None + if retry_max > 0: + retry = Retry(retry_max, retry_interval) + + schedule = parse_schedule(schedule_in, schedule_at) + + with Connection(cli_config.connection): + queue = cli_config.queue_class(queue) + + if schedule is None: + job = queue.enqueue_call(function, args, kwargs, timeout, result_ttl, ttl, failure_ttl, + description, depends_on, job_id, at_front, None, retry) + else: + job = queue.create_job(function, args, kwargs, timeout, result_ttl, ttl, failure_ttl, + description, depends_on, job_id, None, JobStatus.SCHEDULED, retry) + queue.schedule_job(job, schedule) + + if not quiet: + click.echo('Enqueued %s with job-id \'%s\'.' % (blue(function_string), job.id)) diff --git a/rq/cli/helpers.py b/rq/cli/helpers.py index 01a63bf..ca585f7 100644 --- a/rq/cli/helpers.py +++ b/rq/cli/helpers.py @@ -7,6 +7,11 @@ import importlib import time import os from functools import partial +from enum import Enum + +from datetime import datetime, timezone, timedelta +from json import loads, JSONDecodeError +from ast import literal_eval import click import redis @@ -15,7 +20,7 @@ from redis.sentinel import Sentinel from rq.defaults import (DEFAULT_CONNECTION_CLASS, DEFAULT_JOB_CLASS, DEFAULT_QUEUE_CLASS, DEFAULT_WORKER_CLASS) from rq.logutils import setup_loghandlers -from rq.utils import import_attribute +from rq.utils import import_attribute, parse_timeout from rq.worker import WorkerStatus red = partial(click.style, fg='red') @@ -208,6 +213,83 @@ def setup_loghandlers_from_args(verbose, quiet, date_format, log_format): setup_loghandlers(level, date_format=date_format, log_format=log_format) +def parse_function_arg(argument, arg_pos): + class ParsingMode(Enum): + PLAIN_TEXT = 0 + JSON = 1 + LITERAL_EVAL = 2 + + keyword = None + if argument.startswith(':'): # no keyword, json + mode = ParsingMode.JSON + value = argument[1:] + elif argument.startswith('%'): # no keyword, literal_eval + mode = ParsingMode.LITERAL_EVAL + value = argument[1:] + else: + index = argument.find('=') + if index > 0: + if ':' in argument and argument.index(':') + 1 == index: # keyword, json + mode = ParsingMode.JSON + keyword = argument[:index - 1] + elif '%' in argument and argument.index('%') + 1 == index: # keyword, literal_eval + mode = ParsingMode.LITERAL_EVAL + keyword = argument[:index - 1] + else: # keyword, text + mode = ParsingMode.PLAIN_TEXT + keyword = argument[:index] + value = argument[index + 1:] + else: # no keyword, text + mode = ParsingMode.PLAIN_TEXT + value = argument + + if value.startswith('@'): + try: + with open(value[1:], 'r') as file: + value = file.read() + except FileNotFoundError: + raise click.FileError(value[1:], 'Not found') + + if mode == ParsingMode.JSON: # json + try: + value = loads(value) + except JSONDecodeError: + raise click.BadParameter('Unable to parse %s as JSON.' % (keyword or '%s. non keyword argument' % arg_pos)) + elif mode == ParsingMode.LITERAL_EVAL: # literal_eval + try: + value = literal_eval(value) + except Exception: + raise click.BadParameter('Unable to eval %s as Python object. See ' + 'https://docs.python.org/3/library/ast.html#ast.literal_eval' + % (keyword or '%s. non keyword argument' % arg_pos)) + + return keyword, value + + +def parse_function_args(arguments): + args = [] + kwargs = {} + + for argument in arguments: + keyword, value = parse_function_arg(argument, len(args) + 1) + if keyword is not None: + if keyword in kwargs: + raise click.BadParameter('You can\'t specify multiple values for the same keyword.') + kwargs[keyword] = value + else: + args.append(value) + return args, kwargs + + +def parse_schedule(schedule_in, schedule_at): + if schedule_in is not None: + if schedule_at is not None: + raise click.BadArgumentUsage('You can\'t specify both --schedule-in and --schedule-at') + return datetime.now(timezone.utc) + timedelta(seconds=parse_timeout(schedule_in)) + elif schedule_at is not None: + return datetime.strptime(schedule_at, '%Y-%m-%dT%H:%M:%S') + + class CliConfig: """A helper class to be used with click commands, to handle shared options""" def __init__(self, url=None, config=None, worker_class=DEFAULT_WORKER_CLASS, diff --git a/tests/test.json b/tests/test.json new file mode 100644 index 0000000..7ae9fec --- /dev/null +++ b/tests/test.json @@ -0,0 +1,3 @@ +{ + "test": true +} diff --git a/tests/test_cli.py b/tests/test_cli.py index ee4756a..e7f94fd 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -2,20 +2,24 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) -from datetime import datetime, timezone +from datetime import datetime, timezone, timedelta +from time import sleep +from uuid import uuid4 import os +import json from click.testing import CliRunner from redis import Redis from rq import Queue from rq.cli import main -from rq.cli.helpers import read_config_file, CliConfig +from rq.cli.helpers import read_config_file, CliConfig, parse_function_arg, parse_schedule from rq.job import Job from rq.registry import FailedJobRegistry, ScheduledJobRegistry from rq.serializers import JSONSerializer from rq.worker import Worker, WorkerStatus +from rq.scheduler import RQScheduler import pytest @@ -369,3 +373,287 @@ class TestRQCli(RQTestCase): runner.invoke(main, ['worker', '-u', self.redis_url, '--serializer rq.serializer.JSONSerializer']) self.assertIn(job.id, q.job_ids) + + def test_cli_enqueue(self): + """rq enqueue -u tests.fixtures.say_hello""" + queue = Queue(connection=self.connection) + self.assertTrue(queue.is_empty()) + + runner = CliRunner() + result = runner.invoke(main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.say_hello']) + self.assert_normal_execution(result) + + prefix = 'Enqueued tests.fixtures.say_hello() with job-id \'' + suffix = '\'.\n' + + print(result.stdout) + + self.assertTrue(result.stdout.startswith(prefix)) + self.assertTrue(result.stdout.endswith(suffix)) + + job_id = result.stdout[len(prefix):-len(suffix)] + queue_key = 'rq:queue:default' + self.assertEqual(self.connection.llen(queue_key), 1) + self.assertEqual(self.connection.lrange(queue_key, 0, -1)[0].decode('ascii'), job_id) + + worker = Worker(queue) + worker.work(True) + self.assertEqual(Job(job_id).result, 'Hi there, Stranger!') + + def test_cli_enqueue_args(self): + """rq enqueue -u tests.fixtures.echo hello ':[1, {"key": "value"}]' json:=["abc"] nojson=def""" + queue = Queue(connection=self.connection) + self.assertTrue(queue.is_empty()) + + runner = CliRunner() + result = runner.invoke(main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.echo', 'hello', + ':[1, {"key": "value"}]', ':@tests/test.json', '%1, 2', 'json:=[3.0, true]', + 'nojson=abc', 'file=@tests/test.json']) + self.assert_normal_execution(result) + + job_id = self.connection.lrange('rq:queue:default', 0, -1)[0].decode('ascii') + + worker = Worker(queue) + worker.work(True) + + args, kwargs = Job(job_id).result + + self.assertEqual(args, ('hello', [1, {'key': 'value'}], {"test": True}, (1, 2))) + self.assertEqual(kwargs, {'json': [3.0, True], 'nojson': 'abc', 'file': '{\n "test": true\n}\n'}) + + def test_cli_enqueue_schedule_in(self): + """rq enqueue -u tests.fixtures.say_hello --schedule-in 1s""" + queue = Queue(connection=self.connection) + registry = ScheduledJobRegistry(queue=queue) + worker = Worker(queue) + scheduler = RQScheduler(queue, self.connection) + + self.assertTrue(len(queue) == 0) + self.assertTrue(len(registry) == 0) + + runner = CliRunner() + result = runner.invoke(main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.say_hello', + '--schedule-in', '10s']) + self.assert_normal_execution(result) + + scheduler.acquire_locks() + scheduler.enqueue_scheduled_jobs() + + self.assertTrue(len(queue) == 0) + self.assertTrue(len(registry) == 1) + + self.assertFalse(worker.work(True)) + + sleep(11) + + scheduler.enqueue_scheduled_jobs() + + self.assertTrue(len(queue) == 1) + self.assertTrue(len(registry) == 0) + + self.assertTrue(worker.work(True)) + + def test_cli_enqueue_schedule_at(self): + """ + rq enqueue -u tests.fixtures.say_hello --schedule-at 2021-01-01T00:00:00 + + rq enqueue -u tests.fixtures.say_hello --schedule-at 2100-01-01T00:00:00 + """ + queue = Queue(connection=self.connection) + registry = ScheduledJobRegistry(queue=queue) + worker = Worker(queue) + scheduler = RQScheduler(queue, self.connection) + + self.assertTrue(len(queue) == 0) + self.assertTrue(len(registry) == 0) + + runner = CliRunner() + result = runner.invoke(main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.say_hello', + '--schedule-at', '2021-01-01T00:00:00']) + self.assert_normal_execution(result) + + scheduler.acquire_locks() + + self.assertTrue(len(queue) == 0) + self.assertTrue(len(registry) == 1) + + scheduler.enqueue_scheduled_jobs() + + self.assertTrue(len(queue) == 1) + self.assertTrue(len(registry) == 0) + + self.assertTrue(worker.work(True)) + + self.assertTrue(len(queue) == 0) + self.assertTrue(len(registry) == 0) + + result = runner.invoke(main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.say_hello', + '--schedule-at', '2100-01-01T00:00:00']) + self.assert_normal_execution(result) + + self.assertTrue(len(queue) == 0) + self.assertTrue(len(registry) == 1) + + scheduler.enqueue_scheduled_jobs() + + self.assertTrue(len(queue) == 0) + self.assertTrue(len(registry) == 1) + + self.assertFalse(worker.work(True)) + + def test_cli_enqueue_retry(self): + """rq enqueue -u tests.fixtures.say_hello --retry-max 3 --retry-interval 10 --retry-interval 20 + --retry-interval 40""" + queue = Queue(connection=self.connection) + self.assertTrue(queue.is_empty()) + + runner = CliRunner() + result = runner.invoke(main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.say_hello', '--retry-max', '3', + '--retry-interval', '10', '--retry-interval', '20', '--retry-interval', '40']) + self.assert_normal_execution(result) + + job = Job.fetch(self.connection.lrange('rq:queue:default', 0, -1)[0].decode('ascii'), + connection=self.connection) + + self.assertEqual(job.retries_left, 3) + self.assertEqual(job.retry_intervals, [10, 20, 40]) + + def test_cli_enqueue_errors(self): + """ + rq enqueue -u tests.fixtures.echo :invalid_json + + rq enqueue -u tests.fixtures.echo %invalid_eval_statement + + rq enqueue -u tests.fixtures.echo key=value key=value + + rq enqueue -u tests.fixtures.echo --schedule-in 1s --schedule-at 2000-01-01T00:00:00 + + rq enqueue -u tests.fixtures.echo @not_existing_file + """ + runner = CliRunner() + + result = runner.invoke(main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.echo', ':invalid_json']) + self.assertNotEqual(result.exit_code, 0) + self.assertIn('Unable to parse 1. non keyword argument as JSON.', result.output) + + result = runner.invoke(main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.echo', + '%invalid_eval_statement']) + self.assertNotEqual(result.exit_code, 0) + self.assertIn('Unable to eval 1. non keyword argument as Python object.', result.output) + + result = runner.invoke(main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.echo', 'key=value', 'key=value']) + self.assertNotEqual(result.exit_code, 0) + self.assertIn('You can\'t specify multiple values for the same keyword.', result.output) + + result = runner.invoke(main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.echo', '--schedule-in', '1s', + '--schedule-at', '2000-01-01T00:00:00']) + self.assertNotEqual(result.exit_code, 0) + self.assertIn('You can\'t specify both --schedule-in and --schedule-at', result.output) + + result = runner.invoke(main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.echo', '@not_existing_file']) + self.assertNotEqual(result.exit_code, 0) + self.assertIn('Not found', result.output) + + def test_parse_schedule(self): + """executes the rq.cli.helpers.parse_schedule function""" + self.assertEqual(parse_schedule(None, '2000-01-23T23:45:01'), datetime(2000, 1, 23, 23, 45, 1)) + + start = datetime.now(timezone.utc) + timedelta(minutes=5) + middle = parse_schedule('5m', None) + end = datetime.now(timezone.utc) + timedelta(minutes=5) + + self.assertGreater(middle, start) + self.assertLess(middle, end) + + def test_parse_function_arg(self): + """executes the rq.cli.helpers.parse_function_arg function""" + self.assertEqual(parse_function_arg('abc', 0), (None, 'abc')) + self.assertEqual(parse_function_arg(':{"json": true}', 1), (None, {'json': True})) + self.assertEqual(parse_function_arg('%1, 2', 2), (None, (1, 2))) + self.assertEqual(parse_function_arg('key=value', 3), ('key', 'value')) + self.assertEqual(parse_function_arg('jsonkey:=["json", "value"]', 4), ('jsonkey', ['json', 'value'])) + self.assertEqual(parse_function_arg('evalkey%=1.2', 5), ('evalkey', 1.2)) + self.assertEqual(parse_function_arg(':@tests/test.json', 6), (None, {'test': True})) + self.assertEqual(parse_function_arg('@tests/test.json', 7), (None, '{\n "test": true\n}\n')) + + def test_cli_enqueue_doc_test(self): + """tests the examples of the documentation""" + runner = CliRunner() + + id = str(uuid4()) + result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', 'abc']) + self.assert_normal_execution(result) + job = Job.fetch(id) + self.assertEqual((job.args, job.kwargs), (['abc'], {})) + + id = str(uuid4()) + result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', 'abc=def']) + self.assert_normal_execution(result) + job = Job.fetch(id) + self.assertEqual((job.args, job.kwargs), ([], {'abc': 'def'})) + + id = str(uuid4()) + result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', ':{"json": "abc"}']) + self.assert_normal_execution(result) + job = Job.fetch(id) + self.assertEqual((job.args, job.kwargs), ([{'json': 'abc'}], {})) + + id = str(uuid4()) + result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', 'key:={"json": "abc"}']) + self.assert_normal_execution(result) + job = Job.fetch(id) + self.assertEqual((job.args, job.kwargs), ([], {'key': {'json': 'abc'}})) + + id = str(uuid4()) + result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', '%1, 2']) + self.assert_normal_execution(result) + job = Job.fetch(id) + self.assertEqual((job.args, job.kwargs), ([(1, 2)], {})) + + id = str(uuid4()) + result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', '%None']) + self.assert_normal_execution(result) + job = Job.fetch(id) + self.assertEqual((job.args, job.kwargs), ([None], {})) + + id = str(uuid4()) + result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', '%True']) + self.assert_normal_execution(result) + job = Job.fetch(id) + self.assertEqual((job.args, job.kwargs), ([True], {})) + + id = str(uuid4()) + result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', 'key%=(1, 2)']) + self.assert_normal_execution(result) + job = Job.fetch(id) + self.assertEqual((job.args, job.kwargs), ([], {'key': (1, 2)})) + + id = str(uuid4()) + result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', 'key%={"foo": True}']) + self.assert_normal_execution(result) + job = Job.fetch(id) + self.assertEqual((job.args, job.kwargs), ([], {'key': {"foo": True}})) + + id = str(uuid4()) + result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', '@tests/test.json']) + self.assert_normal_execution(result) + job = Job.fetch(id) + self.assertEqual((job.args, job.kwargs), ([open('tests/test.json', 'r').read()], {})) + + id = str(uuid4()) + result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', 'key=@tests/test.json']) + self.assert_normal_execution(result) + job = Job.fetch(id) + self.assertEqual((job.args, job.kwargs), ([], {'key': open('tests/test.json', 'r').read()})) + + id = str(uuid4()) + result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', ':@tests/test.json']) + self.assert_normal_execution(result) + job = Job.fetch(id) + self.assertEqual((job.args, job.kwargs), ([json.loads(open('tests/test.json', 'r').read())], {})) + + id = str(uuid4()) + result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', 'key:=@tests/test.json']) + self.assert_normal_execution(result) + job = Job.fetch(id) + self.assertEqual((job.args, job.kwargs), ([], {'key': json.loads(open('tests/test.json', 'r').read())}))