Multidependencies (#1397)

* Also accept lists and tuples as value of `depends_on`.

* The elements of the lists/tuples may be either Jobs or Job IDs.
* A single Job / Job ID is still accepted as well.
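
For illustration, a minimal sketch of the accepted formats (assumes a local
Redis server and the `say_hello` test fixture; the queue and job IDs here are
illustrative):

    from redis import Redis
    from rq import Queue
    from tests.fixtures import say_hello

    q = Queue(connection=Redis())
    job_a = q.enqueue(say_hello, job_id='A')
    job_b = q.enqueue(say_hello, job_id='B')

    # All of the following now declare the same two dependencies:
    q.enqueue(say_hello, depends_on=[job_a, job_b])   # list of Jobs
    q.enqueue(say_hello, depends_on=('A', 'B'))       # tuple of job IDs
    q.enqueue(say_hello, depends_on=[job_a, 'B'])     # mixed Jobs and IDs
    q.enqueue(say_hello, depends_on=job_a)            # a single Job still works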

* Represent _all_ job dependencies in `Job.to_dict()`.

We now represent the entire list, instead of just the first element.
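
A rough sketch of the resulting dict (values illustrative; see the
`Job.to_dict()` hunk below):

    job = Job.create(func=say_hello, depends_on=['A', 'B'])
    d = job.to_dict()
    # d['dependency_ids'] -> '["A", "B"]'  (JSON-encoded list of all dependencies)
    # d['dependency_id']  -> 'A'           (singular key, kept for backwards compatibility)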

* Fix some docstring text regarding plurality of dependencies.

* Add unit tests for job dependencies.

* One unit test establishes a pattern for checking execution order as affected by dependencies.

* Another unit test applies this pattern to the new capability to name multiple dependencies.

* Add unit test for new `depends_on` input formats.

Also test that these are properly persisted.

* Repair `Job.restore()`.

Need to convert bytes back to strings when reloading `dependency_ids`.
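
Roughly, the round trip looks like this (a sketch; Redis hash values come back
as bytes):

    raw = connection.hget(job.key, 'dependency_ids')   # e.g. b'["A", "B"]'
    dependency_ids = json.loads(raw.decode())          # ['A', 'B'], str again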

* Maintain backwards compatibility in `Job.to_dict()`.

Keep the old `dependency_id` (singular) key.

* Provide coverage for new test fixture.

* Simplify some code.

Cut some superfluous `as_text()` calls left over from an earlier commit.

* Check for `dependency_id` in `Job.restore()` for backwards compatibility.

Also eliminate use of `as_text()` here, in favor of `.decode()`.

* Switch to snake case instead of camel case.

* Eliminate some usages of `as_text()`.

Also cut some `print` statements.

* Cleanup.

* Accept arbitrary iterables for `Job`'s `depends_on` kwarg.

Instead of requiring a list or tuple, we now make use of `ensure_list()`.
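
`ensure_list()` comes from `rq.utils`; its behavior is roughly this (a
paraphrase, not the exact source):

    def ensure_list(obj):
        # Pass real iterables (list, tuple, generator, ...) through as-is;
        # wrap a single Job / job ID so callers can always iterate.
        return obj if is_nonstring_iterable(obj) else [obj]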

* Add test fixtures.

* Provide a system to get two workers working simultaneously, using `multiprocessing`.
* Define a simple job that just says whether its dependencies are met.
* In `rpush`, make an option to record the name of the worker.
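
Together these let a test drive two named workers and read back who did what,
along the lines of this sketch (key and values illustrative):

    queue.enqueue(fixtures.rpush, args=['order', 'fast', True])       # True: append worker name
    queue.enqueue(fixtures.rpush, args=['order', 'slow', True, 0.5])  # sleep 0.5s before pushing
    fixtures.burst_two_workers(queue)
    redis.lrange('order', 0, -1)   # e.g. [b'fast:w1', b'slow:w2']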

* Improve unit tests on execution order with dependencies.

These now actually have two workers going, which makes a more thorough test.

* Add unit test examining `Job.dependencies_are_met()` at execution time.

* Redesign dependency execution order unit tests.

* Simplify assertions.

* Improve docstring text and formatting.

* Move fixture tests to new, dedicated module `test_fixtures.py`.

* Use `enqueue` instead of `enqueue_call` in new unit tests.
Branch: main
skieffer authored 4 years ago, committed via GitHub
commit 59d1b40d14 · parent 617b18a496

rq/job.py

@@ -19,7 +19,7 @@ from .exceptions import NoSuchJobError
 from .local import LocalStack
 from .serializers import resolve_serializer
 from .utils import (enum, get_version, import_attribute, parse_timeout, str_to_date,
-                    utcformat, utcnow)
+                    utcformat, utcnow, ensure_list)
 
 # Serialize pickle dumps using the highest pickle protocol (binary, default
 # uses ascii)
@@ -126,9 +126,10 @@ class Job(object):
         job._status = status
         job.meta = meta or {}
 
-        # dependency could be job instance or id
+        # dependency could be job instance or id, or iterable thereof
         if depends_on is not None:
-            job._dependency_ids = [depends_on.id if isinstance(depends_on, Job) else depends_on]
+            job._dependency_ids = [dep.id if isinstance(dep, Job) else dep
+                                   for dep in ensure_list(depends_on)]
         return job
 
     def get_position(self):
@@ -175,7 +176,7 @@ class Job(object):
 
     @property
     def _dependency_id(self):
-        """Returns the first item in self._dependency_ids. Present
+        """Returns the first item in self._dependency_ids. Present to
         preserve compatibility with third party packages..
         """
         if self._dependency_ids:
@@ -183,7 +184,7 @@ class Job(object):
 
     @property
     def dependency(self):
-        """Returns a job's dependency. To avoid repeated Redis fetches, we cache
+        """Returns a job's first dependency. To avoid repeated Redis fetches, we cache
         job.dependency as job._dependency.
         """
         if not self._dependency_ids:
@@ -498,8 +499,10 @@ class Job(object):
         self.failure_ttl = int(obj.get('failure_ttl')) if obj.get('failure_ttl') else None  # noqa
         self._status = obj.get('status').decode() if obj.get('status') else None
 
-        dependency_id = obj.get('dependency_id', None)
-        self._dependency_ids = [as_text(dependency_id)] if dependency_id else []
+        dep_ids = obj.get('dependency_ids')
+        dep_id = obj.get('dependency_id')  # for backwards compatibility
+        self._dependency_ids = (json.loads(dep_ids.decode()) if dep_ids
+                                else [dep_id.decode()] if dep_id else [])
 
         self.ttl = int(obj.get('ttl')) if obj.get('ttl') else None
         self.meta = self.serializer.loads(obj.get('meta')) if obj.get('meta') else {}
@@ -571,7 +574,8 @@ class Job(object):
         if self._status is not None:
             obj['status'] = self._status
         if self._dependency_ids:
-            obj['dependency_id'] = self._dependency_ids[0]
+            obj['dependency_id'] = self._dependency_ids[0]  # for backwards compatibility
+            obj['dependency_ids'] = json.dumps(self._dependency_ids)
         if self.meta and include_meta:
             obj['meta'] = self.serializer.dumps(self.meta)
         if self.ttl:
@@ -786,14 +790,14 @@ class Job(object):
         return self.retry_intervals[index]
 
     def register_dependency(self, pipeline=None):
-        """Jobs may have dependencies. Jobs are enqueued only if the job they
-        depend on is successfully performed. We record this relation as
+        """Jobs may have dependencies. Jobs are enqueued only if the jobs they
+        depend on are successfully performed. We record this relation as
         a reverse dependency (a Redis set), with a key that looks something
         like:
 
            rq:job:job_id:dependents = {'job_id_1', 'job_id_2'}
 
-        This method adds the job in its dependency's dependents set
+        This method adds the job in its dependencies' dependents sets,
         and adds the job to DeferredJobRegistry.
         """
         from .registry import DeferredJobRegistry
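
For intuition, a hedged sketch of the reverse-dependency bookkeeping this
docstring describes (key names follow the rq:job:<id>:dependents pattern
above):

    # After enqueueing job "C" with depends_on=['A', 'B']:
    connection.smembers('rq:job:A:dependents')   # -> {b'C'}
    connection.smembers('rq:job:B:dependents')   # -> {b'C'}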

tests/fixtures.py

@@ -11,11 +11,14 @@ import time
 import signal
 import sys
 import subprocess
+import contextlib
+from multiprocessing import Process
 
+from redis import Redis
 from rq import Connection, get_current_job, get_current_connection, Queue
 from rq.decorators import job
 from rq.compat import text_type
-from rq.worker import HerokuWorker
+from rq.worker import HerokuWorker, Worker
 
 
 def say_pid():
@@ -55,6 +58,18 @@ def some_calculation(x, y, z=1):
     """
     return x * y / z
 
 
+def rpush(key, value, append_worker_name=False, sleep=0):
+    """Push a value into a list in Redis. Useful for detecting the order in
+    which jobs were executed."""
+    if sleep:
+        time.sleep(sleep)
+    if append_worker_name:
+        value += ':' + get_current_job().worker_name
+    redis = get_current_connection()
+    redis.rpush(key, value)
+
+
+def check_dependencies_are_met():
+    return get_current_job().dependencies_are_met()
+
+
 def create_file(path):
     """Creates a file at the given path. Actually, leaves evidence that the
@@ -196,3 +211,45 @@ class Serializer(object):
     def dumps(self): pass
 
 
+def start_worker(queue_name, conn_kwargs, worker_name, burst):
+    """
+    Start a worker. We accept only serializable args, so that this can be
+    executed via multiprocessing.
+    """
+    # Silence stdout (thanks to <https://stackoverflow.com/a/28321717/14153673>)
+    with open(os.devnull, 'w') as devnull:
+        with contextlib.redirect_stdout(devnull):
+            w = Worker([queue_name], name=worker_name, connection=Redis(**conn_kwargs))
+            w.work(burst=burst)
+
+
+def start_worker_process(queue_name, connection=None, worker_name=None, burst=False):
+    """
+    Use multiprocessing to start a new worker in a separate process.
+    """
+    connection = connection or get_current_connection()
+    conn_kwargs = connection.connection_pool.connection_kwargs
+    p = Process(target=start_worker, args=(queue_name, conn_kwargs, worker_name, burst))
+    p.start()
+    return p
+
+
+def burst_two_workers(queue, timeout=2, tries=5, pause=0.1):
+    """
+    Get two workers working simultaneously in burst mode, on a given queue.
+    Return after both workers have finished handling jobs, up to a fixed timeout
+    on the worker that runs in another process.
+    """
+    w1 = start_worker_process(queue.name, worker_name='w1', burst=True)
+    w2 = Worker(queue, name='w2')
+    jobs = queue.jobs
+    if jobs:
+        first_job = jobs[0]
+        # Give the first worker process time to get started on the first job.
+        # This is helpful in tests where we want to control which worker takes which job.
+        n = 0
+        while n < tries and not first_job.is_started:
+            time.sleep(pause)
+            n += 1
+    # Now can start the second worker.
+    w2.work(burst=True)
+    w1.join(timeout)

tests/test_fixtures.py

@@ -0,0 +1,20 @@
+# -*- coding: utf-8 -*-
+from __future__ import (absolute_import, division, print_function,
+                        unicode_literals)
+
+from rq import Queue
+
+from tests import RQTestCase, fixtures
+
+
+class TestFixtures(RQTestCase):
+    def test_rpush_fixture(self):
+        fixtures.rpush('foo', 'bar')
+        assert self.testconn.lrange('foo', 0, 0)[0].decode() == 'bar'
+
+    def test_start_worker_fixture(self):
+        queue = Queue(name='testing', connection=self.testconn)
+        queue.enqueue(fixtures.say_hello)
+        conn_kwargs = self.testconn.connection_pool.connection_kwargs
+        fixtures.start_worker(queue.name, conn_kwargs, 'w1', True)
+        assert not queue.jobs

tests/test_job.py

@@ -432,6 +432,35 @@ class TestJob(RQTestCase):
         Job.fetch(job.id, connection=self.testconn)
         self.assertEqual(job.description, "tests.fixtures.say_hello('Lionel')")
 
+    def test_multiple_dependencies_are_accepted_and_persisted(self):
+        """Ensure job._dependency_ids accepts different input formats, and
+        is set and restored properly"""
+        job_A = Job.create(func=fixtures.some_calculation, args=(3, 1, 4), id="A")
+        job_B = Job.create(func=fixtures.some_calculation, args=(2, 7, 2), id="B")
+
+        # No dependencies
+        job = Job.create(func=fixtures.say_hello)
+        job.save()
+        Job.fetch(job.id, connection=self.testconn)
+        self.assertEqual(job._dependency_ids, [])
+
+        # Various ways of specifying dependencies
+        cases = [
+            ["A", ["A"]],
+            [job_A, ["A"]],
+            [["A", "B"], ["A", "B"]],
+            [[job_A, job_B], ["A", "B"]],
+            [["A", job_B], ["A", "B"]],
+            [("A", "B"), ["A", "B"]],
+            [(job_A, job_B), ["A", "B"]],
+            [(job_A, "B"), ["A", "B"]],
+        ]
+        for given, expected in cases:
+            job = Job.create(func=fixtures.say_hello, depends_on=given)
+            job.save()
+            Job.fetch(job.id, connection=self.testconn)
+            self.assertEqual(job._dependency_ids, expected)
+
     def test_prepare_for_execution(self):
         """job.prepare_for_execution works properly"""
         job = Job.create(func=fixtures.say_hello)
@@ -944,6 +973,78 @@ class TestJob(RQTestCase):
         assert dependent_job.dependencies_are_met()
         assert dependent_job.get_status() == JobStatus.QUEUED
 
+    def test_dependencies_are_met_at_execution_time(self):
+        queue = Queue(connection=self.testconn)
+
+        queue.enqueue(fixtures.say_hello, job_id="A")
+        queue.enqueue(fixtures.say_hello, job_id="B")
+        job_C = queue.enqueue(fixtures.check_dependencies_are_met, job_id="C", depends_on=["A", "B"])
+        w = Worker([queue])
+        w.work(burst=True)
+
+        assert job_C.result
+
+    def test_execution_order_with_sole_dependency(self):
+        queue = Queue(connection=self.testconn)
+        key = 'test_job:job_order'
+
+        # When there are no dependencies, the two fast jobs ("A" and "B") run in the order enqueued.
+        # Worker 1 will be busy with the slow job, so worker 2 will complete both fast jobs.
+        job_slow = queue.enqueue(fixtures.rpush, args=[key, "slow", True, 0.5], job_id='slow_job')
+        job_A = queue.enqueue(fixtures.rpush, args=[key, "A", True])
+        job_B = queue.enqueue(fixtures.rpush, args=[key, "B", True])
+        fixtures.burst_two_workers(queue)
+        time.sleep(0.75)
+        jobs_completed = [v.decode() for v in self.testconn.lrange(key, 0, 2)]
+        self.assertEqual(queue.count, 0)
+        self.assertTrue(all(job.is_finished for job in [job_slow, job_A, job_B]))
+        self.assertEqual(jobs_completed, ["A:w2", "B:w2", "slow:w1"])
+        self.testconn.delete(key)
+
+        # When job "A" depends on the slow job, then job "B" finishes before "A".
+        # There is no clear requirement on which worker should take job "A", so we stay silent on that.
+        job_slow = queue.enqueue(fixtures.rpush, args=[key, "slow", True, 0.5], job_id='slow_job')
+        job_A = queue.enqueue(fixtures.rpush, args=[key, "A", False], depends_on='slow_job')
+        job_B = queue.enqueue(fixtures.rpush, args=[key, "B", True])
+        fixtures.burst_two_workers(queue)
+        time.sleep(0.75)
+        jobs_completed = [v.decode() for v in self.testconn.lrange(key, 0, 2)]
+        self.assertEqual(queue.count, 0)
+        self.assertTrue(all(job.is_finished for job in [job_slow, job_A, job_B]))
+        self.assertEqual(jobs_completed, ["B:w2", "slow:w1", "A"])
+
+    def test_execution_order_with_dual_dependency(self):
+        queue = Queue(connection=self.testconn)
+        key = 'test_job:job_order'
+
+        # When there are no dependencies, the two fast jobs ("A" and "B") run in the order enqueued.
+        job_slow_1 = queue.enqueue(fixtures.rpush, args=[key, "slow_1", True, 0.5], job_id='slow_1')
+        job_slow_2 = queue.enqueue(fixtures.rpush, args=[key, "slow_2", True, 0.75], job_id='slow_2')
+        job_A = queue.enqueue(fixtures.rpush, args=[key, "A", True])
+        job_B = queue.enqueue(fixtures.rpush, args=[key, "B", True])
+        fixtures.burst_two_workers(queue)
+        time.sleep(1)
+        jobs_completed = [v.decode() for v in self.testconn.lrange(key, 0, 3)]
+        self.assertEqual(queue.count, 0)
+        self.assertTrue(all(job.is_finished for job in [job_slow_1, job_slow_2, job_A, job_B]))
+        self.assertEqual(jobs_completed, ["slow_1:w1", "A:w1", "B:w1", "slow_2:w2"])
+        self.testconn.delete(key)
+
+        # This time job "A" depends on two slow jobs, while job "B" depends only on the faster of
+        # the two. Job "B" should be completed before job "A".
+        # There is no clear requirement on which worker should take job "A", so we stay silent on that.
+        job_slow_1 = queue.enqueue(fixtures.rpush, args=[key, "slow_1", True, 0.5], job_id='slow_1')
+        job_slow_2 = queue.enqueue(fixtures.rpush, args=[key, "slow_2", True, 0.75], job_id='slow_2')
+        job_A = queue.enqueue(fixtures.rpush, args=[key, "A", False], depends_on=['slow_1', 'slow_2'])
+        job_B = queue.enqueue(fixtures.rpush, args=[key, "B", True], depends_on=['slow_1'])
+        fixtures.burst_two_workers(queue)
+        time.sleep(1)
+        jobs_completed = [v.decode() for v in self.testconn.lrange(key, 0, 3)]
+        self.assertEqual(queue.count, 0)
+        self.assertTrue(all(job.is_finished for job in [job_slow_1, job_slow_2, job_A, job_B]))
+        self.assertEqual(jobs_completed, ["slow_1:w1", "B:w1", "slow_2:w2", "A"])
+
     def test_retry(self):
         """Retry parses `max` and `interval` correctly"""
         retry = Retry(max=1)
