Skip to content

Commit

Permalink
Refactor ReceiveTimeoutHandler to a separate file.
Browse files Browse the repository at this point in the history
  • Loading branch information
maarten-ic committed Aug 16, 2024
1 parent cd79b7d commit f98e5a3
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 44 deletions.
46 changes: 2 additions & 44 deletions libmuscle/python/libmuscle/communicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@
from libmuscle.mpp_client import MPPClient
from libmuscle.mpp_server import MPPServer
from libmuscle.mcp.tcp_util import SocketClosed
from libmuscle.mcp.transport_client import TimeoutHandler
from libmuscle.peer_info import PeerInfo
from libmuscle.port_manager import PortManager
from libmuscle.profiler import Profiler
from libmuscle.profiling import (
ProfileEvent, ProfileEventType, ProfileTimestamp)
from libmuscle.receive_timeout_handler import ReceiveTimeoutHandler


_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -62,48 +62,6 @@ def __init__(self, timestamp: float, next_timestamp: Optional[float] = None,
self.settings = settings


class RecvTimeoutHandler(TimeoutHandler):
"""Timeout handler when receiving messages from peers.
This handler sends a message to the Muscle Manager when the receive times out (and
another message when the message does arrive).
This is used by the manager to detect if the simulation is in a deadlock, where a
cycle of instances is waiting on each other.
"""

def __init__(
self, manager: MMPClient,
peer_instance: str, port_name: str, slot: Optional[int],
timeout: float
) -> None:
"""Initialize a new timeout handler.
Args:
manager: Connection to the muscle manager.
peer_instance: the peer instance we try to receive from.
port_name: the name of the port we try to receive on.
slot: the slot we try to receive on.
"""
self._manager = manager
self._peer_instance = peer_instance
self._port_name = port_name
self._slot = slot
self._timeout = timeout

@property
def timeout(self) -> float:
return self._timeout

def on_timeout(self) -> None:
self._manager.waiting_for_receive(
self._peer_instance, self._port_name, self._slot)

def on_receive(self) -> None:
self._manager.waiting_for_receive_done(
self._peer_instance, self._port_name, self._slot)


class Communicator:
"""Communication engine for MUSCLE3.
Expand Down Expand Up @@ -288,7 +246,7 @@ def receive_message(
client = self.__get_client(snd_endpoint.instance())
timeout_handler = None
if self._receive_timeout >= 0:
timeout_handler = RecvTimeoutHandler(
timeout_handler = ReceiveTimeoutHandler(
self._manager, str(snd_endpoint.instance()),
port_name, slot, self._receive_timeout)
try:
Expand Down
47 changes: 47 additions & 0 deletions libmuscle/python/libmuscle/receive_timeout_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from typing import Optional

from libmuscle.mcp.transport_client import TimeoutHandler
from libmuscle.mmp_client import MMPClient


class ReceiveTimeoutHandler(TimeoutHandler):
"""Timeout handler when receiving messages from peers.
This handler sends a message to the Muscle Manager when the receive times out (and
another message when the message does arrive).
This is used by the manager to detect if the simulation is in a deadlock, where a
cycle of instances is waiting on each other.
"""

def __init__(
self, manager: MMPClient,
peer_instance: str, port_name: str, slot: Optional[int],
timeout: float
) -> None:
"""Initialize a new timeout handler.
Args:
manager: Connection to the muscle manager.
peer_instance: the peer instance we try to receive from.
port_name: the name of the port we try to receive on.
slot: the slot we try to receive on.
timeout: Timeout in seconds.
"""
self._manager = manager
self._peer_instance = peer_instance
self._port_name = port_name
self._slot = slot
self._timeout = timeout

@property
def timeout(self) -> float:
return self._timeout

def on_timeout(self) -> None:
self._manager.waiting_for_receive(
self._peer_instance, self._port_name, self._slot)

def on_receive(self) -> None:
self._manager.waiting_for_receive_done(
self._peer_instance, self._port_name, self._slot)

0 comments on commit f98e5a3

Please sign in to comment.