Skip to content

Commit

Permalink
Finalize Python implementation of MMSFValidator
Browse files Browse the repository at this point in the history
  • Loading branch information
maarten-ic committed Aug 23, 2024
1 parent 39eb75b commit 06ddd33
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 21 deletions.
8 changes: 5 additions & 3 deletions docs/source/examples/python/interact_coupling.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
from typing import Any, Optional, Tuple, Dict

from libmuscle import Instance, Message, USES_CHECKPOINT_API
from libmuscle import Instance, InstanceFlags, Message
from libmuscle.runner import run_simulation
from ymmsl import (
Component, Conduit, Configuration, Model, Operator, Ports, Settings)
Expand Down Expand Up @@ -241,7 +241,8 @@ def temporal_coupler() -> None:
"""
instance = Instance({
Operator.O_I: ['a_out', 'b_out'],
Operator.S: ['a_in', 'b_in']})
Operator.S: ['a_in', 'b_in']},
InstanceFlags.SKIP_MMSF_SEQUENCE_CHECKS)

while instance.reuse_instance():
# Receive initial messages and initialise state
Expand Down Expand Up @@ -275,7 +276,8 @@ def checkpointing_temporal_coupler() -> None:
"""
instance = Instance({
Operator.O_I: ['a_out', 'b_out'],
Operator.S: ['a_in', 'b_in']}, USES_CHECKPOINT_API)
Operator.S: ['a_in', 'b_in']},
InstanceFlags.USES_CHECKPOINT_API | InstanceFlags.SKIP_MMSF_SEQUENCE_CHECKS)

while instance.reuse_instance():
if instance.resuming():
Expand Down
5 changes: 3 additions & 2 deletions integration_test/test_snapshot_complex_coupling.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
from ymmsl import Operator, load, dump

from libmuscle import (
Instance, Message, KEEPS_NO_STATE_FOR_NEXT_USE, USES_CHECKPOINT_API)
Instance, Message, KEEPS_NO_STATE_FOR_NEXT_USE, USES_CHECKPOINT_API,
SKIP_MMSF_SEQUENCE_CHECKS)
from libmuscle.manager.run_dir import RunDir

from .conftest import run_manager_with_actors, ls_snapshots
Expand Down Expand Up @@ -58,7 +59,7 @@ def cache_component(max_channels=2):
def echo_component(max_channels=2):
ports = {Operator.F_INIT: [f'in{i+1}' for i in range(max_channels)],
Operator.O_F: [f'out{i+1}' for i in range(max_channels)]}
instance = Instance(ports, KEEPS_NO_STATE_FOR_NEXT_USE)
instance = Instance(ports, KEEPS_NO_STATE_FOR_NEXT_USE | SKIP_MMSF_SEQUENCE_CHECKS)

while instance.reuse_instance():
for p_in, p_out in zip(ports[Operator.F_INIT], ports[Operator.O_F]):
Expand Down
22 changes: 22 additions & 0 deletions libmuscle/python/libmuscle/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from libmuscle.logging_handler import MuscleManagerHandler
from libmuscle.mpp_message import ClosePort
from libmuscle.mmp_client import MMPClient
from libmuscle.mmsf_validator import MMSFValidator
from libmuscle.peer_info import PeerInfo
from libmuscle.port_manager import PortManager
from libmuscle.profiler import Profiler
Expand Down Expand Up @@ -89,6 +90,14 @@ class InstanceFlags(Flag):
:external:py:attr:`ymmsl.KeepsStateForNextUse.NECESSARY`).
"""

SKIP_MMSF_SEQUENCE_CHECKS = auto()
"""Disable the checks whether the MMSF is strictly followed when sending/receiving
messages.
See :class:`~libmuscle.mmsf_validator.MMSFValidator` for a detailed description of
the checks.
"""


_CHECKPOINT_SUPPORT_MASK = (
InstanceFlags.USES_CHECKPOINT_API |
Expand Down Expand Up @@ -186,6 +195,10 @@ def __init__(
self._set_local_log_level()
self._set_remote_log_level()
self._setup_profiling()
# MMSFValidator needs a connected port manager, and does some logging
self._mmsf_validator = (
None if InstanceFlags.SKIP_MMSF_SEQUENCE_CHECKS in self._flags
else MMSFValidator(self._port_manager))

def reuse_instance(self) -> bool:
"""Decide whether to run this instance again.
Expand Down Expand Up @@ -222,6 +235,8 @@ def reuse_instance(self) -> bool:
:meth:`save_final_snapshot`, or the checkpointing tutorial.
"""
self._api_guard.verify_reuse_instance()
if self._mmsf_validator:
self._mmsf_validator.reuse_instance()

if self._do_reuse is not None:
# thank you, should_save_final_snapshot, for running this already
Expand All @@ -230,6 +245,9 @@ def reuse_instance(self) -> bool:
else:
do_reuse = self._decide_reuse_instance()

if self._do_resume and not self._do_init and self._mmsf_validator:
self._mmsf_validator.skip_f_init()

# now _first_run, _do_resume and _do_init are also set correctly

do_implicit_checkpoint = (
Expand Down Expand Up @@ -428,6 +446,8 @@ def send(self, port_name: str, message: Message,
slot: The slot to send the message on, if any.
"""
self.__check_port(port_name, slot)
if self._mmsf_validator:
self._mmsf_validator.check_send(port_name, slot)
if message.settings is None:
message = copy(message)
message.settings = self._settings_manager.overlay
Expand Down Expand Up @@ -883,6 +903,8 @@ def __receive_message(
description of those.
"""
self.__check_port(port_name, slot, True)
if self._mmsf_validator:
self._mmsf_validator.check_receive(port_name, slot)

port = self._port_manager.get_port(port_name)
if port.operator == Operator.F_INIT:
Expand Down
74 changes: 58 additions & 16 deletions libmuscle/python/libmuscle/mmsf_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,30 @@


class MMSFValidator:
"""The MMSF Validator checks whether Instances are following the Multiscale
Modelling and Simulation Framework when sending and receiving messages.
In particular it checks that in order:
1. reuse_instance() is called
2. Messages are received on all F_INIT ports
3. The following sub-items happen in order, 0 or more times:
a. Messages are sent on all O_I ports
b. Messages are received on all S ports
4. Messages are sent on all O_F ports
If any message is sent or received out of order, a warning is logged to indicate
that the instance is not following the MMSF pattern. In some cases (for example the
time bridge in ``examples/python/interact_coupling.py``) this is expected and the
warnings can be disabled by setting the
:attr:`~libmuscle.instance.InstanceFlags.SKIP_MMSF_SEQUENCE_CHECKS` flag.
Note:
Checks on vector ports are not implemented. When the instance uses vector ports,
the MMSF Validator will be disabled.
"""
def __init__(self, port_manager: PortManager) -> None:
self._port_manager = port_manager

Expand Down Expand Up @@ -64,18 +88,28 @@ def __init__(self, port_manager: PortManager) -> None:
self._current_operator: Operator = Operator.NONE

def check_send(self, port_name: str, slot: Optional[int]) -> None:
    """Check that sending on the provided port adheres to the MMSF.

    Sends and receives are validated by the same routine, so this forwards
    directly to ``_check_send_receive``.

    Args:
        port_name: Name of the port the message is sent on.
        slot: Slot the message is sent on, or None if no slot is used.
    """
    self._check_send_receive(port_name, slot)

def check_receive(self, port_name: str, slot: Optional[int]) -> None:
    """Check that receiving on the provided port adheres to the MMSF.

    Sends and receives are validated by the same routine, so this forwards
    directly to ``_check_send_receive``.

    Args:
        port_name: Name of the port the message is received on.
        slot: Slot the message is received on, or None if no slot is used.
    """
    self._check_send_receive(port_name, slot)

def reuse_instance(self) -> None:
    """Verify that calling reuse_instance() at this point follows the MMSF.

    Nothing is checked while the validator is disabled. Otherwise the call is
    treated as a transition to ``Operator.NONE`` and validated as such.
    """
    if self._enabled:
        self._check_transition(Operator.NONE)

def skip_f_init(self) -> None:
    """Call when resuming from an intermediate snapshot: F_INIT is skipped.

    Puts the validator in the state it would be in after a completed F_INIT
    phase, so that the next send or receive is checked from that point.
    """
    # Pretend we're now in F_INIT and we have already received on all F_INIT ports:
    self._current_operator = Operator.F_INIT
    # Take a copy: _current_ports_used is appended to while checking sends and
    # receives, and that must never modify the list kept in _connected_ports.
    self._current_ports_used = list(
        self._connected_ports.get(Operator.F_INIT, []))

def _check_send_receive(
self, port_name: str, slot: Optional[int]) -> None:
"""Actual implementation of check_send/check_receive."""
if not self._enabled:
return

Expand All @@ -92,6 +126,15 @@ def _check_send_receive(
self._current_ports_used.append(port_name)

def _check_transition(self, operator: Operator, port_name: str = "") -> None:
"""Check that a transition to the provided operator is allowed.
Log a warning when the transition does not adhere to the MMSF.
Args:
operator: Operator to transition to.
port_name: The name of the port that was sent/received on. This is only
used for constructing the warning message.
"""
connected_ports = self._connected_ports.get(self._current_operator, [])
expected: str = ""

Expand All @@ -100,11 +143,13 @@ def _check_transition(self, operator: Operator, port_name: str = "") -> None:
if port not in self._current_ports_used]
if unused_ports:
# We didn't complete the current phase
if operator in (Operator.F_INIT, Operator.S):
if self._current_operator in (Operator.F_INIT, Operator.S):
expected = "a receive"
else:
expected = "a send"
expected += " on any of these ports: " + ", ".join(unused_ports)
expected += (
f" on any of these {self._current_operator.name} ports: "
+ ", ".join(unused_ports))

elif (self._current_operator, operator) not in self._allowed_transitions:
# Transition to the operator is not allowed, now figure out what we were
Expand Down Expand Up @@ -136,28 +181,25 @@ def _check_transition(self, operator: Operator, port_name: str = "") -> None:
action = f"Receive on port '{port_name}'"
else:
action = f"Send on port '{port_name}'"
file_and_line = ""

# Find the file:line where the user called send/receive/reuse_instance
try:
# Try to find the file:line where the user called
# Instance.send/receive/reuse_instance
frame: Optional[types.FrameType] = sys._getframe()
while frame and frame.f_code.co_qualname.startswith("MMSFValidator."):
frame = frame.f_back
while (frame
and frame.f_code.co_filename.endswith("libmuscle/instance.py")):
frame = frame.f_back
if frame:
code = frame.f_code
file_and_line = f" ({code.co_filename}:{code.co_firstlineno})"
except Exception:
pass
frame = None # sys._getframe() is not guaranteed available
while (frame
and frame.f_globals.get("__name__", "").startswith("libmuscle.")):
# This frame is still part of a libmuscle module, step up:
frame = frame.f_back
loc = f" ({frame.f_code.co_filename}:{frame.f_lineno})" if frame else ""

_logger.warning(
"%s%s does not adhere to the MMSF: was expecting %s. "
"%s%s does not adhere to the MMSF: was expecting %s.\n"
"Not adhering to the Multiscale Modelling and Simulation Framework "
"may lead to deadlocks. You can disable this warning by "
"setting the flag InstanceFlags.SKIP_MMSF_SEQUENCE_CHECKS "
"when creating the libmuscle.Instance.",
action, file_and_line, expected)
action, loc, expected)

self._current_operator = operator
self._current_ports_used = []
2 changes: 2 additions & 0 deletions libmuscle/python/libmuscle/test/test_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,8 @@ def test_get_setting(instance, settings_manager):


def test_list_ports(instance, port_manager):
    """Check that Instance.list_ports() calls list_ports() on the port manager."""
    # list_ports() has already been called once before this test body runs
    # (presumably while the instance fixture was being set up -- confirm).
    # Verify that, then clear the recorded calls so only the call below counts.
    port_manager.list_ports.assert_called_once_with()
    port_manager.list_ports.reset_mock()
    instance.list_ports()
    port_manager.list_ports.assert_called_once_with()

Expand Down
29 changes: 29 additions & 0 deletions libmuscle/python/libmuscle/test/test_mmsf_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,3 +123,32 @@ def test_only_f_i(mock_peer_info):
validator.check_receive("f_i", None)
with pytest.raises(TestMMSFValidatorException):
validator.check_receive("f_i", None)


def test_micro(mock_peer_info):
    """A micro model (F_INIT then O_F) may repeat; an incomplete cycle is flagged."""
    port_manager = PortManager([], {Operator.F_INIT: ["f_i"], Operator.O_F: ["o_f"]})
    port_manager.connect_ports(mock_peer_info)
    validator = MMSFValidator(port_manager)

    # Complete reuse cycles: receive on F_INIT, then send on O_F
    for _ in range(5):
        validator.reuse_instance()
        validator.check_receive("f_i", None)
        # o_f is an O_F (sending) port, so exercise it through check_send
        validator.check_send("o_f", None)
    # Start another cycle but leave out the send on o_f
    validator.reuse_instance()
    validator.check_receive("f_i", None)
    # NOTE(review): the exception presumably comes from a test fixture that turns
    # the validator's warning into TestMMSFValidatorException -- confirm.
    with pytest.raises(TestMMSFValidatorException):
        validator.reuse_instance()
    with pytest.raises(TestMMSFValidatorException):
        validator.check_receive("f_i", None)


def test_not_all_ports_used(mock_peer_info):
    """Sending on O_F before receiving on every F_INIT port must be reported."""
    port_manager = PortManager([], {
        Operator.F_INIT: ["f_i1", "f_i2"], Operator.O_F: ["o_f"]})
    port_manager.connect_ports(mock_peer_info)
    validator = MMSFValidator(port_manager)

    validator.reuse_instance()
    validator.check_receive("f_i1", None)
    # f_i2 has not been received on yet, so moving on to O_F is a violation
    with pytest.raises(TestMMSFValidatorException):
        validator.check_send("o_f", None)

0 comments on commit 06ddd33

Please sign in to comment.