Skip to content

Commit

Permalink
Refactor receive timeout logic
Browse files Browse the repository at this point in the history
Reduces the complexity of the Communicator
  • Loading branch information
maarten-ic committed Aug 16, 2024
1 parent c3a13b1 commit a727b1c
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 71 deletions.
75 changes: 13 additions & 62 deletions libmuscle/python/libmuscle/communicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,16 @@
from ymmsl import Identifier, Reference, Settings

from libmuscle.endpoint import Endpoint
from libmuscle.mmp_client import MMPClient
from libmuscle.mpp_message import ClosePort, MPPMessage
from libmuscle.mpp_client import MPPClient
from libmuscle.mpp_server import MPPServer
from libmuscle.mcp.tcp_util import SocketClosed
from libmuscle.mcp.transport_client import TimeoutHandler
from libmuscle.peer_info import PeerInfo
from libmuscle.port_manager import PortManager
from libmuscle.profiler import Profiler
from libmuscle.profiling import (
ProfileEvent, ProfileEventType, ProfileTimestamp)
from libmuscle.receive_timeout_handler import ReceiveTimeoutHandlerFactory


_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -62,48 +61,6 @@ def __init__(self, timestamp: float, next_timestamp: Optional[float] = None,
self.settings = settings


class RecvTimeoutHandler(TimeoutHandler):
    """Reports stalled receives to the Muscle Manager.

    When a receive from a peer times out, this handler tells the manager
    that we are waiting; it sends a second notification once the message
    does arrive. The manager uses these notifications to detect deadlocks,
    i.e. a cycle of instances that are all waiting on each other.
    """

    def __init__(
            self, manager: MMPClient,
            peer_instance: str, port_name: str, slot: Optional[int],
            timeout: float
            ) -> None:
        """Create a timeout handler for a single receive.

        Args:
            manager: Connection to the muscle manager.
            peer_instance: The peer instance we try to receive from.
            port_name: The name of the port we try to receive on.
            slot: The slot we try to receive on.
            timeout: Value exposed through the ``timeout`` property
                (seconds, going by the other docs in this module).
        """
        # What we are waiting for; forwarded verbatim to the manager.
        self._peer_instance = peer_instance
        self._port_name = port_name
        self._slot = slot

        self._manager = manager
        self._timeout = timeout

    @property
    def timeout(self) -> float:
        """The timeout this handler was constructed with."""
        return self._timeout

    def on_timeout(self) -> None:
        """Tell the manager that we are waiting for a message."""
        notify = self._manager.waiting_for_receive
        notify(self._peer_instance, self._port_name, self._slot)

    def on_receive(self) -> None:
        """Tell the manager that we are no longer waiting for a message."""
        notify = self._manager.waiting_for_receive_done
        notify(self._peer_instance, self._port_name, self._slot)


class Communicator:
"""Communication engine for MUSCLE3.
Expand All @@ -114,8 +71,7 @@ class Communicator:
"""
def __init__(
self, kernel: Reference, index: List[int],
port_manager: PortManager, profiler: Profiler,
manager: MMPClient) -> None:
port_manager: PortManager, profiler: Profiler) -> None:
"""Create a Communicator.
The instance reference must start with one or more Identifiers,
Expand All @@ -133,10 +89,10 @@ def __init__(
self._index = index
self._port_manager = port_manager
self._profiler = profiler
self._manager = manager
# Notify manager, by default, after 10 seconds waiting in receive_message()
self._receive_timeout = 10.0

# Note: Instance will set this attribute with a call to
# set_receive_timeout_factory() before it is used
self._receive_timeout_handler_factory: ReceiveTimeoutHandlerFactory
self._server = MPPServer()

# indexed by remote instance id
Expand Down Expand Up @@ -165,15 +121,12 @@ def set_peer_info(self, peer_info: PeerInfo) -> None:
"""
self._peer_info = peer_info

def set_receive_timeout(self, receive_timeout: float) -> None:
"""Update the timeout after which the manager is notified that we are waiting
for a message.
Args:
receive_timeout: Timeout (seconds). A negative number disables the deadlock
notification mechanism.
def set_receive_timeout_factory(
self, factory: ReceiveTimeoutHandlerFactory) -> None:
"""Set the factory function for the receive timeout handler (used for deadlock
detection).
"""
self._receive_timeout = receive_timeout
self._receive_timeout_handler_factory = factory

def send_message(
self, port_name: str, message: Message,
Expand Down Expand Up @@ -286,11 +239,9 @@ def receive_message(
snd_endpoint = self._peer_info.get_peer_endpoints(
recv_endpoint.port, slot_list)[0]
client = self.__get_client(snd_endpoint.instance())
timeout_handler = None
if self._receive_timeout >= 0:
timeout_handler = RecvTimeoutHandler(
self._manager, str(snd_endpoint.instance()),
port_name, slot, self._receive_timeout)
# Set timeout handler for message receives (deadlock detection)
timeout_handler = self._receive_timeout_handler_factory(
str(snd_endpoint), port_name, slot)
try:
mpp_message_bytes, profile = client.receive(
recv_endpoint.ref(), timeout_handler)
Expand Down
13 changes: 7 additions & 6 deletions libmuscle/python/libmuscle/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from libmuscle.profiler import Profiler
from libmuscle.profiling import (
ProfileEvent, ProfileEventType, ProfileTimestamp)
from libmuscle.receive_timeout_handler import ReceiveTimeoutHandlerFactory
from libmuscle.snapshot_manager import SnapshotManager
from libmuscle.util import extract_log_file_location

Expand Down Expand Up @@ -142,8 +143,7 @@ def __init__(
"""PortManager for this instance."""

self._communicator = Communicator(
self._name, self._index, self._port_manager, self._profiler,
self.__manager)
self._name, self._index, self._port_manager, self._profiler)
"""Communicator for this instance."""

self._declared_ports = ports
Expand Down Expand Up @@ -183,7 +183,7 @@ def __init__(
# Note: self._setup_checkpointing() needs to have the ports initialized
# so it comes after self._connect()
self._setup_checkpointing()
# profiling and logging need settings, so come after register_()
# profiling, logging and receive timeout need settings: come after register_()
self._set_local_log_level()
self._set_remote_log_level()
self._setup_profiling()
Expand Down Expand Up @@ -816,12 +816,13 @@ def _setup_receive_timeout(self) -> None:
"""
try:
timeout = self.get_setting('muscle_deadlock_receive_timeout', 'float')
self._communicator.set_receive_timeout(timeout)
except KeyError:
pass # do nothing and keep the default
timeout = 10.0 # Use 10 seconds as default timeout
_logger.debug(
"Timeout on receiving messages set to %f",
self._communicator._receive_timeout)
timeout)
factory = ReceiveTimeoutHandlerFactory(self.__manager, timeout)
self._communicator.set_receive_timeout_factory(factory)

def _decide_reuse_instance(self) -> bool:
"""Decide whether and how to reuse the instance.
Expand Down
79 changes: 79 additions & 0 deletions libmuscle/python/libmuscle/receive_timeout_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
from typing import Optional

from libmuscle.mcp.transport_client import TimeoutHandler
from libmuscle.mmp_client import MMPClient


class ReceiveTimeoutHandler(TimeoutHandler):
    """Reports stalled receives to the Muscle Manager.

    When a receive from a peer times out, this handler tells the manager
    that we are waiting; it sends a second notification once the message
    does arrive. The manager uses these notifications to detect deadlocks,
    i.e. a cycle of instances that are all waiting on each other.
    """

    def __init__(
        self,
        manager: MMPClient,
        peer_instance: str,
        port_name: str,
        slot: Optional[int],
        timeout: float,
    ) -> None:
        """Create a timeout handler for a single receive.

        Args:
            manager: Connection to the muscle manager.
            peer_instance: The peer instance we try to receive from.
            port_name: The name of the port we try to receive on.
            slot: The slot we try to receive on.
            timeout: Timeout in seconds; exposed through the ``timeout``
                property.
        """
        # What we are waiting for; forwarded verbatim to the manager.
        self._peer_instance = peer_instance
        self._port_name = port_name
        self._slot = slot

        self._manager = manager
        self._timeout = timeout

    @property
    def timeout(self) -> float:
        """The timeout this handler was constructed with."""
        return self._timeout

    def on_timeout(self) -> None:
        """Tell the manager that we are waiting for a message."""
        notify = self._manager.waiting_for_receive
        notify(self._peer_instance, self._port_name, self._slot)

    def on_receive(self) -> None:
        """Tell the manager that we are no longer waiting for a message."""
        notify = self._manager.waiting_for_receive_done
        notify(self._peer_instance, self._port_name, self._slot)


class ReceiveTimeoutHandlerFactory:
    """Callable that produces ReceiveTimeoutHandler instances.

    The manager connection and the timeout are fixed when the factory is
    created; the peer, port and slot are supplied on each call.
    """

    def __init__(self, manager: MMPClient, timeout: float) -> None:
        """Set up the factory.

        Args:
            manager: Connection to the muscle manager.
            timeout: Timeout in seconds before the manager is notified that
                we are waiting for a message. A negative value makes the
                factory return None instead of a handler.
        """
        self.timeout = timeout
        self.manager = manager

    def __call__(
        self, peer_instance_id: str, port_name: str, slot: Optional[int]
    ) -> Optional[TimeoutHandler]:
        """Create a timeout handler for one receive.

        Args:
            peer_instance_id: Identifies the peer we try to receive from.
            port_name: The name of the port we try to receive on.
            slot: The slot we try to receive on.

        Returns:
            A new ReceiveTimeoutHandler built from the given arguments and
            this factory's manager and timeout, or None when the timeout is
            negative.
        """
        # Same `< 0` comparison as a guard clause would use, so that values
        # that are not less than zero (including NaN) still yield a handler.
        disabled = self.timeout < 0
        return None if disabled else ReceiveTimeoutHandler(
            self.manager, peer_instance_id, port_name, slot, self.timeout)
7 changes: 4 additions & 3 deletions libmuscle/python/libmuscle/test/test_communicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from libmuscle.communicator import Communicator, Message
from libmuscle.mpp_message import ClosePort, MPPMessage
from libmuscle.peer_info import PeerInfo
from libmuscle.receive_timeout_handler import ReceiveTimeoutHandlerFactory
from ymmsl import Conduit, Reference as Ref, Settings


Expand Down Expand Up @@ -46,7 +47,9 @@ def mpp_client(MPPClient):

@pytest.fixture
def communicator(connected_port_manager, profiler):
return Communicator(Ref('component'), [], connected_port_manager, profiler, Mock())
communicator = Communicator(Ref('component'), [], connected_port_manager, profiler)
communicator.set_receive_timeout_factory(ReceiveTimeoutHandlerFactory(Mock(), -1))
return communicator


@pytest.fixture
Expand Down Expand Up @@ -115,7 +118,6 @@ def test_receive_message(connected_communicator, mpp_client):

mpp_client.receive.return_value = msg.encoded(), MagicMock()

connected_communicator.set_receive_timeout(-1)
recv_msg, saved_until = connected_communicator.receive_message('in')

mpp_client.receive.assert_called_with(Ref('component.in'), None)
Expand All @@ -136,7 +138,6 @@ def test_receive_message_vector(connected_communicator, mpp_client):

mpp_client.receive.return_value = msg.encoded(), MagicMock()

connected_communicator.set_receive_timeout(-1)
recv_msg, saved_until = connected_communicator.receive_message('in_v', 5)

mpp_client.receive.assert_called_with(Ref('component.in_v[5]'), None)
Expand Down

0 comments on commit a727b1c

Please sign in to comment.