Replace 'async' keyword with 'is_async' for Queue objects (#977)

* Replaced async keyword with is_async in the Queue class to fix reserved keyword syntax errors in Python 3.7

* Updated tests to use is_async keyword when instantiating Queue objects

* Updated docs to reference is_async keyword for Queue objects

* Updated tox.ini, setup.py and .travis.yml with references to Python 3.7
main
chevell 7 years ago committed by Selwin Ong
parent 9c32a80d11
commit c2b939d2df

@ -8,7 +8,7 @@ python:
- "3.4" - "3.4"
- "3.5" - "3.5"
- "3.6" - "3.6"
- "3.6-dev" - "3.7-dev"
- "pypy" - "pypy"
install: install:
- pip install -e . - pip install -e .

@ -65,7 +65,7 @@ job function.
and marked as `failed`. Its default unit is the second, and it can be an integer or a string representing an integer (e.g. `2`, `'2'`). Furthermore, it can be a string with a specified unit, including hour, minute, or second (e.g. `'1h'`, `'3m'`, `'5s'`). and marked as `failed`. Its default unit is the second, and it can be an integer or a string representing an integer (e.g. `2`, `'2'`). Furthermore, it can be a string with a specified unit, including hour, minute, or second (e.g. `'1h'`, `'3m'`, `'5s'`).
* `result_ttl` specifies the expiry time of the key where the job result will * `result_ttl` specifies the expiry time of the key where the job result will
be stored be stored
* `ttl` specifies the maximum queued time of the job before it'll be cancelled. * `ttl` specifies the maximum queued time of the job before it'll be cancelled.
If you specify a value of `-1` you indicate an infinite job ttl and it will run indefinitely If you specify a value of `-1` you indicate an infinite job ttl and it will run indefinitely
* `depends_on` specifies another job (or job id) that must complete before this * `depends_on` specifies another job (or job id) that must complete before this
job will be queued job will be queued
@ -104,7 +104,7 @@ from rq import Queue
from redis import Redis from redis import Redis
redis_conn = Redis() redis_conn = Redis()
q = Queue(connection=redis_conn) q = Queue(connection=redis_conn)
# Getting the number of jobs in the queue # Getting the number of jobs in the queue
print len(q) print len(q)
@ -168,10 +168,10 @@ print job.result
For testing purposes, you can enqueue jobs without delegating the actual For testing purposes, you can enqueue jobs without delegating the actual
execution to a worker (available since version 0.3.1). To do this, pass the execution to a worker (available since version 0.3.1). To do this, pass the
`async=False` argument into the Queue constructor: `is_async=False` argument into the Queue constructor:
{% highlight pycon %} {% highlight pycon %}
>>> q = Queue('low', async=False, connection=my_redis_conn) >>> q = Queue('low', is_async=False, connection=my_redis_conn)
>>> job = q.enqueue(fib, 8) >>> job = q.enqueue(fib, 8)
>>> job.result >>> job.result
21 21

@ -5,7 +5,7 @@ layout: docs
## Workers inside unit tests ## Workers inside unit tests
You may wish to include your RQ tasks inside unit tests. However many frameworks (such as Django) use in-memory databases which do not play nicely with the default `fork()` behaviour of RQ. You may wish to include your RQ tasks inside unit tests. However many frameworks (such as Django) use in-memory databases which do not play nicely with the default `fork()` behaviour of RQ.
Therefore, you must use the SimpleWorker class to avoid fork(); Therefore, you must use the SimpleWorker class to avoid fork();
@ -23,19 +23,19 @@ worker.work(burst=True) # Runs enqueued job
## Running Jobs in unit tests ## Running Jobs in unit tests
Another solution for testing purposes is to use the `async=False` queue Another solution for testing purposes is to use the `is_async=False` queue
parameter, that instructs it to instantly perform the job in the same parameter, that instructs it to instantly perform the job in the same
thread instead of dispatching it to the workers. Workers are not required thread instead of dispatching it to the workers. Workers are not required
anymore. anymore.
Additionally, we can use fakeredis to mock a redis instance, so we don't have to Additionally, we can use fakeredis to mock a redis instance, so we don't have to
run a redis server separately. The instance of the fake redis server can run a redis server separately. The instance of the fake redis server can
be directly passed as the connection argument to the queue: be directly passed as the connection argument to the queue:
{% highlight python %} {% highlight python %}
from fakeredis import FakeStrictRedis from fakeredis import FakeStrictRedis
from rq import Queue from rq import Queue
queue = Queue(async=False, connection=FakeStrictRedis()) queue = Queue(is_async=False, connection=FakeStrictRedis())
job = queue.enqueue(my_long_running_job) job = queue.enqueue(my_long_running_job)
assert job.is_finished assert job.is_finished
{% endhighlight %} {% endhighlight %}

@ -3,6 +3,7 @@ from __future__ import (absolute_import, division, print_function,
unicode_literals) unicode_literals)
import uuid import uuid
import warnings
from redis import WatchError from redis import WatchError
@ -58,13 +59,17 @@ class Queue(object):
return cls(name, connection=connection, job_class=job_class) return cls(name, connection=connection, job_class=job_class)
def __init__(self, name='default', default_timeout=None, connection=None, def __init__(self, name='default', default_timeout=None, connection=None,
async=True, job_class=None): is_async=True, job_class=None, **kwargs):
self.connection = resolve_connection(connection) self.connection = resolve_connection(connection)
prefix = self.redis_queue_namespace_prefix prefix = self.redis_queue_namespace_prefix
self.name = name self.name = name
self._key = '{0}{1}'.format(prefix, name) self._key = '{0}{1}'.format(prefix, name)
self._default_timeout = parse_timeout(default_timeout) self._default_timeout = parse_timeout(default_timeout)
self._async = async self._is_async = is_async
if 'async' in kwargs:
self._is_async = kwargs['async']
warnings.warn('The `async` keyword is deprecated. Use `is_async` instead', DeprecationWarning)
# override class attribute job_class if one was passed # override class attribute job_class if one was passed
if job_class is not None: if job_class is not None:
@ -303,7 +308,7 @@ class Queue(object):
def enqueue_job(self, job, pipeline=None, at_front=False): def enqueue_job(self, job, pipeline=None, at_front=False):
"""Enqueues a job for delayed execution. """Enqueues a job for delayed execution.
If Queue is instantiated with async=False, job is executed immediately. If Queue is instantiated with is_async=False, job is executed immediately.
""" """
pipe = pipeline if pipeline is not None else self.connection._pipeline() pipe = pipeline if pipeline is not None else self.connection._pipeline()
@ -319,13 +324,13 @@ class Queue(object):
job.save(pipeline=pipe) job.save(pipeline=pipe)
job.cleanup(ttl=job.ttl, pipeline=pipe) job.cleanup(ttl=job.ttl, pipeline=pipe)
if self._async: if self._is_async:
self.push_job_id(job.id, pipeline=pipe, at_front=at_front) self.push_job_id(job.id, pipeline=pipe, at_front=at_front)
if pipeline is None: if pipeline is None:
pipe.execute() pipe.execute()
if not self._async: if not self._is_async:
job = self.run_job(job) job = self.run_job(job)
return job return job

@ -72,6 +72,7 @@ setup(
'Programming Language :: Python :: 3.4', 'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5', 'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6', 'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Topic :: Software Development :: Libraries :: Python Modules', 'Topic :: Software Development :: Libraries :: Python Modules',
'Topic :: Internet', 'Topic :: Internet',
'Topic :: Scientific/Engineering', 'Topic :: Scientific/Engineering',

@ -389,17 +389,17 @@ class TestJob(RQTestCase):
assert get_failed_queue(self.testconn).count == 0 assert get_failed_queue(self.testconn).count == 0
def test_job_access_within_synchronous_job_function(self): def test_job_access_within_synchronous_job_function(self):
queue = Queue(async=False) queue = Queue(is_async=False)
queue.enqueue(fixtures.access_self) queue.enqueue(fixtures.access_self)
def test_job_async_status_finished(self): def test_job_async_status_finished(self):
queue = Queue(async=False) queue = Queue(is_async=False)
job = queue.enqueue(fixtures.say_hello) job = queue.enqueue(fixtures.say_hello)
self.assertEqual(job.result, 'Hi there, Stranger!') self.assertEqual(job.result, 'Hi there, Stranger!')
self.assertEqual(job.get_status(), JobStatus.FINISHED) self.assertEqual(job.get_status(), JobStatus.FINISHED)
def test_enqueue_job_async_status_finished(self): def test_enqueue_job_async_status_finished(self):
queue = Queue(async=False) queue = Queue(is_async=False)
job = Job.create(func=fixtures.say_hello) job = Job.create(func=fixtures.say_hello)
job = queue.enqueue_job(job) job = queue.enqueue_job(job)
self.assertEqual(job.result, 'Hi there, Stranger!') self.assertEqual(job.result, 'Hi there, Stranger!')

@ -645,8 +645,8 @@ class TestFailedQueue(RQTestCase):
self.assertEqual(int(job_from_queue.result_ttl), 10) self.assertEqual(int(job_from_queue.result_ttl), 10)
def test_async_false(self): def test_async_false(self):
"""Job executes and cleaned up immediately if async=False.""" """Job executes and cleaned up immediately if is_async=False."""
q = Queue(async=False) q = Queue(is_async=False)
job = q.enqueue(some_calculation, args=(2, 3)) job = q.enqueue(some_calculation, args=(2, 3))
self.assertEqual(job.return_value, 6) self.assertEqual(job.return_value, 6)
self.assertNotEqual(self.testconn.ttl(job.key), -1) self.assertNotEqual(self.testconn.ttl(job.key), -1)

@ -1,5 +1,5 @@
[tox] [tox]
envlist=py26,py27,py33,py34,py35,pypy,flake8 envlist=py26,py27,py33,py34,py35,py36,py37,pypy,flake8
[testenv] [testenv]
commands=py.test --cov rq --durations=5 {posargs} commands=py.test --cov rq --durations=5 {posargs}

Loading…
Cancel
Save