Skip to content

Commit

Permalink
[WIP] Implement communication deadlock detection mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
maarten-ic committed Aug 12, 2024
1 parent 1209bb6 commit 303e2d9
Show file tree
Hide file tree
Showing 13 changed files with 341 additions and 31 deletions.
2 changes: 1 addition & 1 deletion integration_test/test_cpp_tcp_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def test_cpp_tcp_server(log_file_in_tmpdir):
assert TcpTransportClient.can_connect_to(location)

client = MPPClient([location])
msg_bytes, _ = client.receive(Reference('test_receiver.port'))
msg_bytes, _ = client.receive(Reference('test_receiver.port'), None)
msg = MPPMessage.from_bytes(msg_bytes)
client.close()

Expand Down
46 changes: 41 additions & 5 deletions libmuscle/python/libmuscle/communicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
from ymmsl import Identifier, Reference, Settings

from libmuscle.endpoint import Endpoint
from libmuscle.mmp_client import MMPClient
from libmuscle.mpp_message import ClosePort, MPPMessage
from libmuscle.mpp_client import MPPClient
from libmuscle.mpp_server import MPPServer
from libmuscle.mcp.tcp_util import SocketClosed
from libmuscle.mcp.transport_client import TimeoutHandler
from libmuscle.peer_info import PeerInfo
from libmuscle.port_manager import PortManager
from libmuscle.profiler import Profiler
Expand Down Expand Up @@ -60,6 +62,31 @@ def __init__(self, timestamp: float, next_timestamp: Optional[float] = None,
self.settings = settings


class RecvTimeoutHandler(TimeoutHandler):
    """Timeout handler for receive calls.

    Notifies the manager that the instance is waiting on a receive, so
    that communication deadlocks can be detected, and reports back when
    the message does arrive.
    """

    def __init__(
            self, manager: MMPClient,
            peer_instance: str, port_name: str, slot: Optional[int],
            timeout: float
            ) -> None:
        """Create a timeout handler for a single receive call.

        Args:
            manager: Connection to the MUSCLE manager, used to send the
                waiting-for-receive notifications.
            peer_instance: The instance we expect a message from.
            port_name: The port we are receiving on.
            slot: The slot we are receiving on, if any.
            timeout: Seconds to wait before notifying the manager.
        """
        self._manager = manager
        self._timeout = timeout
        # Identifies the receive we are blocked on, in the argument
        # order the manager API expects.
        self._receive_args = (peer_instance, port_name, slot)

    @property
    def timeout(self) -> float:
        """Seconds after which on_timeout() is invoked."""
        return self._timeout

    def on_timeout(self) -> None:
        """Tell the manager we are (still) waiting on this receive."""
        self._manager.waiting_for_receive(*self._receive_args)

    def on_receive(self) -> None:
        """Tell the manager the message was received after all."""
        self._manager.waiting_for_receive_done(*self._receive_args)


class Communicator:
"""Communication engine for MUSCLE3.
Expand Down Expand Up @@ -88,6 +115,8 @@ def __init__(
self._index = index
self._port_manager = port_manager
self._profiler = profiler
# TODO: make this a proper argument of __init__()
self._manager = profiler._manager

self._server = MPPServer()

Expand Down Expand Up @@ -178,7 +207,8 @@ def send_message(
self._profiler.record_event(profile_event)

def receive_message(
self, port_name: str, slot: Optional[int] = None) -> Tuple[Message, float]:
self, port_name: str, slot: Optional[int], timeout: float
) -> Tuple[Message, float]:
"""Receive a message and attached settings overlay.
Receiving is a blocking operation. This function will contact
Expand Down Expand Up @@ -228,8 +258,14 @@ def receive_message(
snd_endpoint = self._peer_info.get_peer_endpoints(
recv_endpoint.port, slot_list)[0]
client = self.__get_client(snd_endpoint.instance())
timeout_handler = None
if timeout >= 0:
timeout_handler = RecvTimeoutHandler(
self._manager, str(snd_endpoint.instance()),
port_name, slot, timeout)
try:
mpp_message_bytes, profile = client.receive(recv_endpoint.ref())
mpp_message_bytes, profile = client.receive(
recv_endpoint.ref(), timeout_handler)
except (ConnectionError, SocketClosed) as exc:
raise RuntimeError(
"Error while receiving a message: connection with peer"
Expand Down Expand Up @@ -288,7 +324,7 @@ def receive_message(
_logger.debug(f'Discarding received message on {port_and_slot}'
': resuming from weakly consistent snapshot')
port.set_resumed(slot)
return self.receive_message(port_name, slot)
return self.receive_message(port_name, slot, timeout)
raise RuntimeError(f'Received message on {port_and_slot} with'
' unexpected message number'
f' {mpp_message.message_number}. Was expecting'
Expand Down Expand Up @@ -398,7 +434,7 @@ def _drain_incoming_port(self, port_name: str) -> None:
port = self._port_manager.get_port(port_name)
while port.is_open():
# TODO: log warning if not a ClosePort
self.receive_message(port_name)
self.receive_message(port_name, None, 10.0) # FIXME: timeout

def _drain_incoming_vector_port(self, port_name: str) -> None:
"""Receives messages until a ClosePort is received.
Expand All @@ -413,7 +449,7 @@ def _drain_incoming_vector_port(self, port_name: str) -> None:
for slot in range(port.get_length())]):
for slot in range(port.get_length()):
if port.is_open(slot):
self.receive_message(port_name, slot)
self.receive_message(port_name, slot, 10.0) # FIXME: timeout

def _close_incoming_ports(self) -> None:
"""Closes incoming ports.
Expand Down
24 changes: 21 additions & 3 deletions libmuscle/python/libmuscle/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ def __init__(
self._name, self._index, self._port_manager, self._profiler)
"""Communicator for this instance."""

self._receive_timeout = 10.0
"""Timeout in seconds on message receives after which the manager is notified
that we are waiting for a message. Used to detect communication deadlocks."""

self._declared_ports = ports
"""Declared ports for this instance."""

Expand Down Expand Up @@ -186,6 +190,7 @@ def __init__(
self._set_local_log_level()
self._set_remote_log_level()
self._setup_profiling()
self._setup_receive_timeout()

def reuse_instance(self) -> bool:
"""Decide whether to run this instance again.
Expand Down Expand Up @@ -809,6 +814,16 @@ def _setup_profiling(self) -> None:

self._profiler.set_level(profile_level_str)

def _setup_receive_timeout(self) -> None:
    """Configures the receive timeout from the settings.

    Reads the muscle_deadlock_receive_timeout setting; when it is not
    specified, the default value is kept.
    """
    try:
        timeout = self.get_setting(
            'muscle_deadlock_receive_timeout', 'float')
    except KeyError:
        # Setting not provided: keep the default timeout
        pass
    else:
        self._receive_timeout = timeout
    _logger.debug("Timeout on receiving messages set to %f", self._receive_timeout)

def _decide_reuse_instance(self) -> bool:
"""Decide whether and how to reuse the instance.
Expand Down Expand Up @@ -933,7 +948,8 @@ def __receive_message(
return default

else:
msg, saved_until = self._communicator.receive_message(port_name, slot)
msg, saved_until = self._communicator.receive_message(
port_name, slot, self._receive_timeout)
if not port.is_open(slot):
err_msg = (('Port {} was closed while trying to'
' receive on it, did the peer crash?'
Expand Down Expand Up @@ -1077,7 +1093,8 @@ def __receive_settings(self) -> bool:
Returns:
False iff the port is connected and ClosePort was received.
"""
message, saved_until = self._communicator.receive_message('muscle_settings_in')
message, saved_until = self._communicator.receive_message(
'muscle_settings_in', None, self._receive_timeout)

if isinstance(message.data, ClosePort):
return False
Expand Down Expand Up @@ -1107,7 +1124,8 @@ def __pre_receive_f_init(self) -> None:
apply_overlay = InstanceFlags.DONT_APPLY_OVERLAY not in self._flags

def pre_receive(port_name: str, slot: Optional[int]) -> None:
msg, saved_until = self._communicator.receive_message(port_name, slot)
msg, saved_until = self._communicator.receive_message(
port_name, slot, self._receive_timeout)
self._f_init_cache[(port_name, slot)] = msg
if apply_overlay:
self.__apply_overlay(msg)
Expand Down
126 changes: 126 additions & 0 deletions libmuscle/python/libmuscle/manager/deadlock_detector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import logging
from threading import Thread
from typing import Callable, Dict, List, Optional, Tuple
from queue import Queue


_logger = logging.getLogger(__name__)
# (is_waiting, instance_id, peer_instance_id, port_name, slot)
_QueueItem = Tuple[bool, str, str, str, Optional[int]]


class DeadlockDetector(Thread):
    """Detects communication deadlocks between instances.

    Instances notify the manager when a receive call is taking a long
    time (see :meth:`put_waiting`) and notify it again when the message
    actually arrives (see :meth:`put_waiting_done`). This thread
    processes those notifications and looks for cycles of instances that
    are all waiting on each other. Such a cycle can never make progress,
    so it is reported as a deadlock and the shutdown callback is invoked.
    """

    def __init__(
            self, shutdown_callback: Callable[[], None],
            wait_before_shutdown: float
            ) -> None:
        """Create a DeadlockDetector thread.

        Args:
            shutdown_callback: Called when a deadlock is detected, to
                abort the run. Note: invoked from this thread.
            wait_before_shutdown: Intended grace period (in seconds)
                between detecting a potential deadlock and aborting.
                NOTE(review): waiting is not implemented yet; the
                callback is currently invoked immediately (see TODO in
                _check_for_deadlock).
        """
        super().__init__(name="DeadlockDetector")

        self._shutdown_callback = shutdown_callback
        self._wait_before_shutdown = wait_before_shutdown

        # Incoming notifications; None is the shutdown sentinel.
        self._queue: Queue[Optional[_QueueItem]] = Queue()
        # Maps a waiting instance to the peer instance it waits on.
        self._waiting_instances: Dict[str, str] = {}
        # Maps a waiting instance to the (port_name, slot) it is
        # receiving on. Was annotated Tuple[str, int], but slot may be
        # None: fixed to Optional[int].
        self._waiting_instance_ports: Dict[str, Tuple[str, Optional[int]]] = {}

        # Reserved for delayed-abort logic (see TODO below); not read yet.
        self._detected_deadlocks: List[str] = []

    def run(self) -> None:
        """Logic that is executed in the thread.

        Processes queued notifications until the shutdown sentinel is
        received.
        """
        while True:
            item = self._queue.get()
            if item is None:  # Shutdown sentinel
                return
            self._process_queue_item(item)

    def shutdown(self) -> None:
        """Stop the deadlock detector thread."""
        self._queue.put(None)

    def put_waiting(
            self, instance_id: str, peer_instance_id: str,
            port_name: str, slot: Optional[int]
            ) -> None:
        """Process a WAITING_FOR_RECEIVE message from an instance."""
        self._queue.put((True, instance_id, peer_instance_id, port_name, slot))

    def put_waiting_done(
            self, instance_id: str, peer_instance_id: str,
            port_name: str, slot: Optional[int]
            ) -> None:
        """Process a WAITING_FOR_RECEIVE_DONE message from an instance."""
        self._queue.put((False, instance_id, peer_instance_id, port_name, slot))

    def _process_queue_item(self, item: _QueueItem) -> None:
        """Update the waiting-instance bookkeeping for one notification.

        Args:
            item: Tuple (is_waiting, instance_id, peer_instance_id,
                port_name, slot) as put on the queue by put_waiting[_done].
        """
        _logger.debug("Processing queue item: %s", item)
        is_waiting, instance_id, peer_instance_id, port_name, slot = item
        if is_waiting:
            # Sanity checks, triggering this is a bug in the instance or the manager
            if instance_id in self._waiting_instances:
                _logger.error(
                    "Instance %s was already waiting on a receive call. "
                    "Did we miss a WAITING DONE event?",
                    instance_id)
            # Register that the instance is waiting
            self._waiting_instances[instance_id] = peer_instance_id
            self._waiting_instance_ports[instance_id] = (port_name, slot)
            self._check_for_deadlock(instance_id)

        else:
            # Sanity checks, triggering these is a bug in the instance or the manager
            if instance_id not in self._waiting_instances:
                _logger.error(
                    "Instance %s is not waiting on a receive call.", instance_id)
            elif self._waiting_instances[instance_id] != peer_instance_id:
                _logger.error(
                    "Instance %s was waiting for %s, not for %s.",
                    instance_id,
                    self._waiting_instances[instance_id],
                    peer_instance_id)
            elif self._waiting_instance_ports[instance_id] != (port_name, slot):
                _logger.error(
                    "Instance %s was waiting on port[slot] %s[%s], not on %s[%s]",
                    instance_id,
                    *self._waiting_instance_ports[instance_id],
                    port_name, slot)
            else:
                # Everything consistent: the receive completed.
                del self._waiting_instances[instance_id]
                del self._waiting_instance_ports[instance_id]

    def _check_for_deadlock(self, instance_id: str) -> None:
        """Check if there is a cycle of waiting instances that involves this instance.
        """
        deadlock_instances = [instance_id]
        cur_instance = instance_id
        while cur_instance in self._waiting_instances:
            cur_instance = self._waiting_instances[cur_instance]
            if cur_instance == instance_id:
                break  # Found a deadlocked cycle
            deadlock_instances.append(cur_instance)
        else:  # No cycle detected
            _logger.debug("No deadlock detected")
            return

        # %s instead of %d: the grace period is a float, not an int.
        _logger.warning(
            "Potential deadlock detected, aborting run in %s seconds.\n%s",
            self._wait_before_shutdown,
            self._format_deadlock(deadlock_instances),
        )
        # TODO: wait self._wait_before_shutdown seconds before aborting,
        # so the deadlock can still resolve itself; currently aborts
        # immediately.
        self._shutdown_callback()

    def _format_deadlock(self, instances: List[str]) -> str:
        """Create and return formatted deadlock debug info.

        Args:
            instances: The instances forming the deadlocked cycle; each
                must be present in the waiting-instance bookkeeping.
        """
        num_instances = str(len(instances))
        lines = [f"The following {num_instances} instances are dead-locked:"]
        for i, instance in enumerate(instances):
            num = str(i+1).rjust(len(num_instances))
            peer_instance = self._waiting_instances[instance]
            port, slot = self._waiting_instance_ports[instance]
            slot_txt = "" if slot is None else f"[{slot}]"
            lines.append(
                f"{num}. Instance '{instance}' is waiting on instance '{peer_instance}'"
                f" in a receive on port '{port}{slot_txt}'."
            )
        return "\n".join(lines)
9 changes: 8 additions & 1 deletion libmuscle/python/libmuscle/manager/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from libmuscle.manager.run_dir import RunDir
from libmuscle.manager.snapshot_registry import SnapshotRegistry
from libmuscle.manager.topology_store import TopologyStore
from libmuscle.manager.deadlock_detector import DeadlockDetector


_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -79,11 +80,14 @@ def __init__(
self._snapshot_registry = SnapshotRegistry(
configuration, snapshot_dir, self._topology_store)
self._snapshot_registry.start()
# FIXME configure timeout:
self._deadlock_detector = DeadlockDetector(self.stop, 5.0)
self._deadlock_detector.start()

self._server = MMPServer(
self._logger, self._profile_store, self._configuration,
self._instance_registry, self._topology_store,
self._snapshot_registry, run_dir)
self._snapshot_registry, self._deadlock_detector, run_dir)

if self._instance_manager:
self._instance_manager.set_manager_location(
Expand Down Expand Up @@ -121,6 +125,9 @@ def stop(self) -> None:
"""Shuts down the manager."""
if self._instance_manager:
self._instance_manager.shutdown()
self._deadlock_detector.shutdown()
# Note: don't join() deadlock detector, as this method may be called from the
# DeadlockDetector thread. join() would (ironically) deadlock the shutdown :)
self._server.stop()
self._snapshot_registry.shutdown()
self._snapshot_registry.join()
Expand Down
Loading

0 comments on commit 303e2d9

Please sign in to comment.