Skip to content

Commit

Permalink
Add integration tests for deadlock detection
Browse files Browse the repository at this point in the history
And fix bugs with the C++ implementation.
  • Loading branch information
maarten-ic committed Aug 19, 2024
1 parent 45a6cd7 commit f3f8e31
Show file tree
Hide file tree
Showing 5 changed files with 199 additions and 4 deletions.
15 changes: 12 additions & 3 deletions integration_test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,12 @@ def _python_wrapper(instance_name, muscle_manager, callable):
callable()


def run_manager_with_actors(ymmsl_text, tmpdir, actors):
def run_manager_with_actors(ymmsl_text, tmpdir, actors, expect_success=True):
"""Start muscle_manager along with C++ and python actors.
Args:
ymmsl_text: YMMSL configuration for the simulation
tmpdir: Temporary folder to use as runpath a
actors: a dictionary of lists containing details for each actor:
``{"instance_name": ("language", "details", ...)}``.
Expand Down Expand Up @@ -155,10 +157,17 @@ def run_manager_with_actors(ymmsl_text, tmpdir, actors):
# check results
for proc in native_processes:
proc.wait()
assert proc.returncode == 0
if expect_success:
assert proc.returncode == 0
for proc in python_processes:
proc.join()
assert proc.exitcode == 0
if expect_success:
assert proc.exitcode == 0
if not expect_success:
# Check that at least one process has failed
assert (
any(proc.returncode != 0 for proc in native_processes) or
any(proc.exitcode != 0 for proc in python_processes))


@pytest.fixture
Expand Down
184 changes: 184 additions & 0 deletions integration_test/test_deadlock_detection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
import functools
import sys

from ymmsl import Operator

from libmuscle import Instance, Message

from .conftest import skip_if_python_only, run_manager_with_actors


def suppress_deadlock_exception_output(func):
@functools.wraps(func)
def wrapper():
try:
func()
except RuntimeError as exc:
exc_str = str(exc).lower()
if "deadlock" not in exc_str and "did the peer crash?" not in exc_str:
raise
sys.exit(1)
return wrapper


@suppress_deadlock_exception_output
def deadlocking_micro():
instance = Instance({Operator.F_INIT: ["in"], Operator.O_F: ["out"]})

counter = 5 # Deadlock after 5 iterations
while instance.reuse_instance():
message = instance.receive("in")
counter -= 1
if counter > 0:
instance.send("out", message)


@suppress_deadlock_exception_output
def micro():
instance = Instance({Operator.F_INIT: ["in"], Operator.O_F: ["out"]})

while instance.reuse_instance():
message = instance.receive("in")
instance.send("out", message)


@suppress_deadlock_exception_output
def deadlocking_macro():
instance = Instance({Operator.O_I: ["out"], Operator.S: ["in"]})

while instance.reuse_instance():
for i in range(10):
message = Message(float(i), data="testing")
instance.send("out", message)
instance.receive("in")
# Deadlock:
instance.receive("in")


@suppress_deadlock_exception_output
def macro():
instance = Instance({Operator.O_I: ["out"], Operator.S: ["in"]})

while instance.reuse_instance():
for i in range(10):
message = Message(float(i), data="testing")
instance.send("out", message)
instance.receive("in")


MACRO_MICRO_CONFIG = """
ymmsl_version: v0.1
model:
name: test_model
components:
macro:
implementation: macro
micro:
implementation: micro
conduits:
macro.out: micro.in
micro.out: macro.in
settings:
muscle_deadlock_receive_timeout: 0.1
"""
MACRO_MICRO_WITH_DISPATCH_CONFIG = """
ymmsl_version: v0.1
model:
name: test_model
components:
macro:
implementation: macro
micro1:
implementation: micro
micro2:
implementation: micro
micro3:
implementation: micro
conduits:
macro.out: micro1.in
micro1.out: micro2.in
micro2.out: micro3.in
micro3.out: macro.in
settings:
muscle_deadlock_receive_timeout: 0.1
"""


def test_no_deadlock(tmp_path):
run_manager_with_actors(
MACRO_MICRO_CONFIG, tmp_path,
{"macro": ("python", macro),
"micro": ("python", micro)})


def test_deadlock1(tmp_path):
run_manager_with_actors(
MACRO_MICRO_CONFIG, tmp_path,
{"macro": ("python", macro),
"micro": ("python", deadlocking_micro)},
expect_success=False)


def test_deadlock2(tmp_path):
run_manager_with_actors(
MACRO_MICRO_CONFIG, tmp_path,
{"macro": ("python", deadlocking_macro),
"micro": ("python", micro)},
expect_success=False)


def test_no_deadlock_with_dispatch(tmp_path):
run_manager_with_actors(
MACRO_MICRO_WITH_DISPATCH_CONFIG, tmp_path,
{"macro": ("python", macro),
"micro1": ("python", micro),
"micro2": ("python", micro),
"micro3": ("python", micro)})


def test_deadlock1_with_dispatch(tmp_path):
run_manager_with_actors(
MACRO_MICRO_WITH_DISPATCH_CONFIG, tmp_path,
{"macro": ("python", macro),
"micro1": ("python", micro),
"micro2": ("python", deadlocking_micro),
"micro3": ("python", micro)},
expect_success=False)


def test_deadlock2_with_dispatch(tmp_path):
run_manager_with_actors(
MACRO_MICRO_WITH_DISPATCH_CONFIG, tmp_path,
{"macro": ("python", deadlocking_macro),
"micro1": ("python", micro),
"micro2": ("python", micro),
"micro3": ("python", micro)},
expect_success=False)


@skip_if_python_only
def test_no_deadlock_cpp(tmp_path):
run_manager_with_actors(
MACRO_MICRO_CONFIG, tmp_path,
{"macro": ("cpp", "component_test"),
"micro": ("python", micro)})


@skip_if_python_only
def test_deadlock1_cpp(tmp_path):
run_manager_with_actors(
MACRO_MICRO_CONFIG, tmp_path,
{"macro": ("cpp", "component_test"),
"micro": ("python", deadlocking_micro)},
expect_success=False)


@skip_if_python_only
def test_deadlock2_cpp(tmp_path):
run_manager_with_actors(
MACRO_MICRO_WITH_DISPATCH_CONFIG, tmp_path,
{"macro": ("cpp", "component_test"),
"micro1": ("python", micro),
"micro2": ("python", deadlocking_micro),
"micro3": ("cpp", "component_test")},
expect_success=False)
1 change: 1 addition & 0 deletions libmuscle/cpp/src/libmuscle/mcp/protocol.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ enum class RequestType {
// Connection deadlock detection
waiting_for_receive = 9,
waiting_for_receive_done = 10,
is_deadlocked = 11,

// MUSCLE Peer Protocol
get_next_message = 21
Expand Down
2 changes: 1 addition & 1 deletion libmuscle/cpp/src/libmuscle/mmp_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ void MMPClient::waiting_for_receive_done(

bool MMPClient::is_deadlocked() {
auto request = Data::list(
static_cast<int>(RequestType::waiting_for_receive_done),
static_cast<int>(RequestType::is_deadlocked),
static_cast<std::string>(instance_id_));

auto response = call_manager_(request);
Expand Down
1 change: 1 addition & 0 deletions libmuscle/cpp/src/libmuscle/receive_timeout_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ void ReceiveTimeoutHandler::on_timeout()
else
if (manager_.is_deadlocked())
throw Deadlock();
num_timeout_ ++;
}

void ReceiveTimeoutHandler::on_receive()
Expand Down

0 comments on commit f3f8e31

Please sign in to comment.