Merge branch 'master' into 739_fix_race_condition

main
Stefan Hammer 8 years ago
commit f9bff3d12b

2
.gitignore vendored

@ -11,3 +11,5 @@
.vagrant .vagrant
Vagrantfile Vagrantfile
.idea/ .idea/
.coverage.*
/.cache

@ -1,6 +1,16 @@
### 0.7.0 ### 0.7.0
- ... - Better support for Heroku workers (#584, #715)
- Support for connecting using a custom connection class (#741)
- Fix: connection stack in default worker (#479, #641)
- Fix: `fetch_job` now checks that a job requested actually comes from the
intended queue (#728, #733)
- Fix: Properly raise exception if a job dependency does not exist (#747)
- Fix: Job status not updated when horse dies unexpectedly (#710)
- Fix: `request_force_stop_sigrtmin` failing for Python 3 (#727)
- Fix `Job.cancel()` method on failed queue (#707)
- Python 3.5 compatibility improvements (#729)
- Improved signal name lookup (#722)
### 0.6.0 ### 0.6.0

@ -37,12 +37,12 @@ config_option = click.option('--config', '-c',
help='Module containing RQ settings.') help='Module containing RQ settings.')
def connect(url, config=None): def connect(url, config=None, connection_class=StrictRedis):
if url: if url:
return StrictRedis.from_url(url) return connection_class.from_url(url)
settings = read_config_file(config) if config else {} settings = read_config_file(config) if config else {}
return get_redis_from_config(settings) return get_redis_from_config(settings, connection_class)
@click.group() @click.group()
@ -143,6 +143,7 @@ def info(url, config, path, interval, raw, only_queues, only_workers, by_queue,
@click.option('--worker-class', '-w', default='rq.Worker', help='RQ Worker class to use') @click.option('--worker-class', '-w', default='rq.Worker', help='RQ Worker class to use')
@click.option('--job-class', '-j', default='rq.job.Job', help='RQ Job class to use') @click.option('--job-class', '-j', default='rq.job.Job', help='RQ Job class to use')
@click.option('--queue-class', default='rq.Queue', help='RQ Queue class to use') @click.option('--queue-class', default='rq.Queue', help='RQ Queue class to use')
@click.option('--connection-class', default='redis.StrictRedis', help='Redis client class to use')
@click.option('--path', '-P', default='.', help='Specify the import path.') @click.option('--path', '-P', default='.', help='Specify the import path.')
@click.option('--results-ttl', type=int, help='Default results timeout to be used') @click.option('--results-ttl', type=int, help='Default results timeout to be used')
@click.option('--worker-ttl', type=int, help='Default worker timeout to be used') @click.option('--worker-ttl', type=int, help='Default worker timeout to be used')
@ -152,8 +153,8 @@ def info(url, config, path, interval, raw, only_queues, only_workers, by_queue,
@click.option('--exception-handler', help='Exception handler(s) to use', multiple=True) @click.option('--exception-handler', help='Exception handler(s) to use', multiple=True)
@click.option('--pid', help='Write the process ID number to a file at the specified path') @click.option('--pid', help='Write the process ID number to a file at the specified path')
@click.argument('queues', nargs=-1) @click.argument('queues', nargs=-1)
def worker(url, config, burst, name, worker_class, job_class, queue_class, path, results_ttl, worker_ttl, def worker(url, config, burst, name, worker_class, job_class, queue_class, connection_class, path, results_ttl,
verbose, quiet, sentry_dsn, exception_handler, pid, queues): worker_ttl, verbose, quiet, sentry_dsn, exception_handler, pid, queues):
"""Starts an RQ worker.""" """Starts an RQ worker."""
if path: if path:
@ -170,7 +171,8 @@ def worker(url, config, burst, name, worker_class, job_class, queue_class, path,
setup_loghandlers_from_args(verbose, quiet) setup_loghandlers_from_args(verbose, quiet)
conn = connect(url, config) connection_class = import_attribute(connection_class)
conn = connect(url, config, connection_class)
cleanup_ghosts(conn) cleanup_ghosts(conn)
worker_class = import_attribute(worker_class) worker_class = import_attribute(worker_class)
queue_class = import_attribute(queue_class) queue_class = import_attribute(queue_class)

@ -26,10 +26,10 @@ def read_config_file(module):
if k.upper() == k]) if k.upper() == k])
def get_redis_from_config(settings): def get_redis_from_config(settings, connection_class=StrictRedis):
"""Returns a StrictRedis instance from a dictionary of settings.""" """Returns a StrictRedis instance from a dictionary of settings."""
if settings.get('REDIS_URL') is not None: if settings.get('REDIS_URL') is not None:
return StrictRedis.from_url(settings['REDIS_URL']) return connection_class.from_url(settings['REDIS_URL'])
kwargs = { kwargs = {
'host': settings.get('REDIS_HOST', 'localhost'), 'host': settings.get('REDIS_HOST', 'localhost'),
@ -52,7 +52,7 @@ def get_redis_from_config(settings):
if not version_info >= (2, 10): if not version_info >= (2, 10):
raise RuntimeError('Using SSL requires a redis-py version >= 2.10') raise RuntimeError('Using SSL requires a redis-py version >= 2.10')
kwargs['ssl'] = use_ssl kwargs['ssl'] = use_ssl
return StrictRedis(**kwargs) return connection_class(**kwargs)
def pad(s, pad_to_length): def pad(s, pad_to_length):

@ -7,6 +7,10 @@ class NoSuchJobError(Exception):
pass pass
class InvalidJobDependency(Exception):
pass
class InvalidJobOperationError(Exception): class InvalidJobOperationError(Exception):
pass pass

@ -579,4 +579,5 @@ class Job(object):
def __hash__(self): def __hash__(self):
return hash(self.id) return hash(self.id)
_job_stack = LocalStack() _job_stack = LocalStack()

@ -9,8 +9,8 @@ from redis import WatchError
from .compat import as_text, string_types, total_ordering from .compat import as_text, string_types, total_ordering
from .connections import resolve_connection from .connections import resolve_connection
from .defaults import DEFAULT_RESULT_TTL from .defaults import DEFAULT_RESULT_TTL
from .exceptions import (DequeueTimeout, InvalidJobOperationError, from .exceptions import (DequeueTimeout, InvalidJobDependency,
NoSuchJobError, UnpickleError) InvalidJobOperationError, NoSuchJobError, UnpickleError)
from .job import Job, JobStatus from .job import Job, JobStatus
from .utils import import_attribute, utcnow from .utils import import_attribute, utcnow
@ -107,9 +107,12 @@ class Queue(object):
def fetch_job(self, job_id): def fetch_job(self, job_id):
try: try:
return self.job_class.fetch(job_id, connection=self.connection) job = self.job_class.fetch(job_id, connection=self.connection)
except NoSuchJobError: except NoSuchJobError:
self.remove(job_id) self.remove(job_id)
else:
if job.origin == self.name:
return job
def get_job_ids(self, offset=0, length=-1): def get_job_ids(self, offset=0, length=-1):
"""Returns a slice of job IDs in the queue.""" """Returns a slice of job IDs in the queue."""
@ -202,6 +205,13 @@ class Queue(object):
while True: while True:
try: try:
pipe.watch(depends_on.key) pipe.watch(depends_on.key)
# If the dependency does not exist, we raise an
# exception. So the caller is able to avoid an orphaned
# job.
if not self.job_class.exists(depends_on.id):
raise InvalidJobDependency('Job {0} does not exist'.format(depends_on.id))
if depends_on.get_status() != JobStatus.FINISHED: if depends_on.get_status() != JobStatus.FINISHED:
pipe.multi() pipe.multi()
job.set_status(JobStatus.DEFERRED) job.set_status(JobStatus.DEFERRED)

@ -1,67 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import (absolute_import, division, print_function,
unicode_literals)
import optparse
from rq import dummy, Queue, use_connection
def parse_args():
parser = optparse.OptionParser()
parser.add_option('-n', '--count', type='int', dest='count', default=1)
opts, args = parser.parse_args()
return (opts, args, parser)
def main():
import sys
sys.path.insert(0, '.')
opts, args, parser = parse_args()
use_connection()
queues = ('default', 'high', 'low')
sample_calls = [
(dummy.do_nothing, [], {}),
(dummy.sleep, [1], {}),
(dummy.fib, [8], {}), # normal result
(dummy.fib, [24], {}), # takes pretty long
(dummy.div_by_zero, [], {}), # 5 / 0 => div by zero exc
(dummy.random_failure, [], {}), # simulate random failure (handy for requeue testing)
]
for i in range(opts.count):
import random
f, args, kwargs = random.choice(sample_calls)
q = Queue(random.choice(queues))
q.enqueue(f, *args, **kwargs)
# q = Queue('foo')
# q.enqueue(do_nothing)
# q.enqueue(sleep, 3)
# q = Queue('bar')
# q.enqueue(yield_stuff)
# q.enqueue(do_nothing)
# q.enqueue(do_nothing)
# q.enqueue(do_nothing)
# q.enqueue(do_nothing)
# q.enqueue(do_nothing)
# q.enqueue(do_nothing)
# q.enqueue(do_nothing)
# q.enqueue(do_nothing)
# q.enqueue(do_nothing)
# q.enqueue(do_nothing)
# q.enqueue(do_nothing)
# q.enqueue(do_nothing)
# q.enqueue(do_nothing)
# q.enqueue(do_nothing)
# q.enqueue(do_nothing)
# q.enqueue(do_nothing)
if __name__ == '__main__':
main()

@ -2,4 +2,4 @@
from __future__ import (absolute_import, division, print_function, from __future__ import (absolute_import, division, print_function,
unicode_literals) unicode_literals)
VERSION = '0.7.0dev0' VERSION = '0.7.0'

@ -57,6 +57,7 @@ def iterable(x):
def compact(l): def compact(l):
return [x for x in l if x is not None] return [x for x in l if x is not None]
_signames = dict((getattr(signal, signame), signame) _signames = dict((getattr(signal, signame), signame)
for signame in dir(signal) for signame in dir(signal)
if signame.startswith('SIG') and '_' not in signame) if signame.startswith('SIG') and '_' not in signame)
@ -540,7 +541,7 @@ class Worker(object):
job=job job=job
) )
#Unhandled failure: move the job to the failed queue # Unhandled failure: move the job to the failed queue
self.log.warning( self.log.warning(
'Moving job to {0!r} queue'.format( 'Moving job to {0!r} queue'.format(
self.failed_queue.name self.failed_queue.name
@ -843,7 +844,7 @@ class HerokuWorker(Worker):
self.request_force_stop_sigrtmin(signum, frame) self.request_force_stop_sigrtmin(signum, frame)
else: else:
self.log.warning('Imminent shutdown, raising ShutDownImminentException in %d seconds', self.log.warning('Imminent shutdown, raising ShutDownImminentException in %d seconds',
self.imminent_shutdown_delay) self.imminent_shutdown_delay)
signal.signal(signal.SIGRTMIN, self.request_force_stop_sigrtmin) signal.signal(signal.SIGRTMIN, self.request_force_stop_sigrtmin)
signal.signal(signal.SIGALRM, self.request_force_stop_sigrtmin) signal.signal(signal.SIGALRM, self.request_force_stop_sigrtmin)
signal.alarm(self.imminent_shutdown_delay) signal.alarm(self.imminent_shutdown_delay)

@ -7,7 +7,7 @@ from tests.fixtures import (div_by_zero, echo, Number, say_hello,
some_calculation) some_calculation)
from rq import get_failed_queue, Queue from rq import get_failed_queue, Queue
from rq.exceptions import InvalidJobOperationError from rq.exceptions import InvalidJobDependency, InvalidJobOperationError
from rq.job import Job, JobStatus from rq.job import Job, JobStatus
from rq.registry import DeferredJobRegistry from rq.registry import DeferredJobRegistry
from rq.worker import Worker from rq.worker import Worker
@ -401,6 +401,7 @@ class TestQueue(RQTestCase):
"""Jobs are enqueued only when their dependencies are finished.""" """Jobs are enqueued only when their dependencies are finished."""
# Job with unfinished dependency is not immediately enqueued # Job with unfinished dependency is not immediately enqueued
parent_job = Job.create(func=say_hello) parent_job = Job.create(func=say_hello)
parent_job.save()
q = Queue() q = Queue()
job = q.enqueue_call(say_hello, depends_on=parent_job) job = q.enqueue_call(say_hello, depends_on=parent_job)
self.assertEqual(q.job_ids, []) self.assertEqual(q.job_ids, [])
@ -417,6 +418,7 @@ class TestQueue(RQTestCase):
def test_enqueue_job_with_dependency_by_id(self): def test_enqueue_job_with_dependency_by_id(self):
"""Can specify job dependency with job object or job id.""" """Can specify job dependency with job object or job id."""
parent_job = Job.create(func=say_hello) parent_job = Job.create(func=say_hello)
parent_job.save()
q = Queue() q = Queue()
q.enqueue_call(say_hello, depends_on=parent_job.id) q.enqueue_call(say_hello, depends_on=parent_job.id)
@ -433,6 +435,7 @@ class TestQueue(RQTestCase):
"""Jobs remember their timeout when enqueued as a dependency.""" """Jobs remember their timeout when enqueued as a dependency."""
# Job with unfinished dependency is not immediately enqueued # Job with unfinished dependency is not immediately enqueued
parent_job = Job.create(func=say_hello) parent_job = Job.create(func=say_hello)
parent_job.save()
q = Queue() q = Queue()
job = q.enqueue_call(say_hello, depends_on=parent_job, timeout=123) job = q.enqueue_call(say_hello, depends_on=parent_job, timeout=123)
self.assertEqual(q.job_ids, []) self.assertEqual(q.job_ids, [])
@ -445,6 +448,46 @@ class TestQueue(RQTestCase):
self.assertEqual(q.job_ids, [job.id]) self.assertEqual(q.job_ids, [job.id])
self.assertEqual(job.timeout, 123) self.assertEqual(job.timeout, 123)
def test_enqueue_job_with_invalid_dependency(self):
"""Enqueuing a job fails, if the dependency does not exist at all."""
parent_job = Job.create(func=say_hello)
# without save() the job is not visible to others
q = Queue()
with self.assertRaises(InvalidJobDependency):
q.enqueue_call(say_hello, depends_on=parent_job)
with self.assertRaises(InvalidJobDependency):
q.enqueue_call(say_hello, depends_on=parent_job.id)
self.assertEqual(q.job_ids, [])
def test_fetch_job_successful(self):
"""Fetch a job from a queue."""
q = Queue('example')
job_orig = q.enqueue(say_hello)
job_fetch = q.fetch_job(job_orig.id)
self.assertIsNotNone(job_fetch)
self.assertEqual(job_orig.id, job_fetch.id)
self.assertEqual(job_orig.description, job_fetch.description)
def test_fetch_job_missing(self):
"""Fetch a job from a queue which doesn't exist."""
q = Queue('example')
job = q.fetch_job('123')
self.assertIsNone(job)
def test_fetch_job_different_queue(self):
"""Fetch a job from a queue which is in a different queue."""
q1 = Queue('example1')
q2 = Queue('example2')
job_orig = q1.enqueue(say_hello)
job_fetch = q2.fetch_job(job_orig.id)
self.assertIsNone(job_fetch)
job_fetch = q1.fetch_job(job_orig.id)
self.assertIsNotNone(job_fetch)
class TestFailedQueue(RQTestCase): class TestFailedQueue(RQTestCase):
def test_requeue_job(self): def test_requeue_job(self):

@ -613,6 +613,7 @@ def kill_worker(pid, double_kill):
time.sleep(0.5) time.sleep(0.5)
os.kill(pid, signal.SIGTERM) os.kill(pid, signal.SIGTERM)
def wait_and_kill_work_horse(pid, time_to_wait=0.0): def wait_and_kill_work_horse(pid, time_to_wait=0.0):
time.sleep(time_to_wait) time.sleep(time_to_wait)
os.kill(pid, signal.SIGKILL) os.kill(pid, signal.SIGKILL)
@ -714,6 +715,7 @@ class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase):
self.assertEqual(failed_q.count, 1) self.assertEqual(failed_q.count, 1)
self.assertEqual(fooq.count, 0) self.assertEqual(fooq.count, 0)
def schedule_access_self(): def schedule_access_self():
q = Queue('default', connection=get_current_connection()) q = Queue('default', connection=get_current_connection())
q.enqueue(access_self) q.enqueue(access_self)

Loading…
Cancel
Save