Skip to content

Commit

Permalink
Implement timeout for shutdown after a deadlock
Browse files Browse the repository at this point in the history
Improve documentation of the DeadlockDetector and add unit tests
  • Loading branch information
maarten-ic committed Aug 13, 2024
1 parent 523d1fd commit 1f6de94
Show file tree
Hide file tree
Showing 2 changed files with 226 additions and 26 deletions.
146 changes: 120 additions & 26 deletions libmuscle/python/libmuscle/manager/deadlock_detector.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,86 @@
import logging
from threading import Thread
import time
from typing import Callable, Dict, List, Optional, Tuple
from queue import Queue
from queue import Empty, Queue


_logger = logging.getLogger(__name__)
_QueueItem = Tuple[bool, str, str, str, Optional[int]]


class DeadlockDetector(Thread):
"""TODO"""
"""The DeadlockDetector attempts to detect when multiple instances are stuck waiting
for each other.
This class is responsible for handling WAITING_FOR_RECEIVE and
WAITING_FOR_RECEIVE_DONE MMP messages, which are submitted by the MMPServer.
When a deadlock is detected, the cycle of instances that is waiting on each other is
logged with FATAL severity. If this deadlock does not get resoled in
``wait_before_shutdown`` seconds, the simulation is shut down.
"""

def __init__(
self, shutdown_callback: Callable[[], None], wait_before_shutdown: float
) -> None:
"""Construct a new DeadlockDetector thread.
Args:
shutdown_callback: function to execute when a deadlock is detected. This
callback (which is executed in this thread!) is responsible for stopping
the simulation when a deadlock is detected.
wait_before_shutdown: Number of seconds to wait before executing
:param:`shutdown_callback` after a deadlock is detected. If the deadlock
is resolved (although this is unlikely), the simulation will not shut
down.
"""
super().__init__(name="DeadlockDetector")

self._shutdown_callback = shutdown_callback
self._wait_before_shutdown = wait_before_shutdown

self._queue: Queue[Optional[_QueueItem]] = Queue()
"""Queue of incoming messages. Incoming messages can come in any communication
thread and will be consumed and processed in this worker thread.
"""
self._waiting_instances: Dict[str, str] = {}
"""Maps instance IDs to the peer instance IDs they are waiting for."""
self._waiting_instance_ports: Dict[str, Tuple[str, Optional[int]]] = {}
"""Maps instance IDs to the port/slot they are waiting on.."""

self._detected_deadlocks: List[str] = []
self._detected_deadlocks: List[List[str]] = []
"""List of deadlocked instance cycles. Set by _handle_potential_deadlock.
"""
self._shutdown_time: Optional[float] = None
"""Future time when we confirm the potential deadlock and abort the simulation.
"""

def run(self) -> None:
"""Logic that is executed in the thread."""
while True:
item = self._queue.get()
if item is None: # Shutdown sentinal
# Set a timeout when a deadlock was detected
timeout = None
if self._shutdown_time is not None:
timeout = max(0, self._shutdown_time - time.monotonic())

# Grab a new item from the queue, this raises Empty when timeout expires:
try:
item = self._queue.get(timeout=timeout)
if item is None: # On shutdown, None is pushed to the queue
return # exit thread
self._process_queue_item(item)

except Empty:
# timeout expired and queue is empty, call shutdown callback
formatted_deadlocks = "\n\n".join(
self._format_deadlock(instances)
for instances in self._detected_deadlocks)
_logger.fatal(
"Aborting simulation: deadlock detected.\n%s",
formatted_deadlocks)
self._shutdown_callback()
return
# Handle item
self._process_queue_item(item)

def shutdown(self) -> None:
"""Stop the deadlock detector thread."""
Expand All @@ -42,18 +90,40 @@ def put_waiting(
self, instance_id: str, peer_instance_id: str,
port_name: str, slot: Optional[int]
) -> None:
"""Process a WATING_FOR_RECEIVE message from an instance."""
"""Queue a WAITING_FOR_RECEIVE message from an instance for processing.
This method can be called from any thread.
Args:
instance_id: ID of instance that is waiting to receive a message.
peer_instance_id: ID of the peer that the instance is waiting on.
port_name: Name of the input port.
slot: Optional slot number of the input port.
"""
self._queue.put((True, instance_id, peer_instance_id, port_name, slot))

def put_waiting_done(
self, instance_id: str, peer_instance_id: str,
port_name: str, slot: Optional[int]
) -> None:
"""Process a WATING_FOR_RECEIVE_DONE message from an instance."""
"""Queue a WAITING_FOR_RECEIVE_DONE message from an instance for processing.
This method can be called from any thread.
Args:
instance_id: ID of instance that is waiting to receive a message.
peer_instance_id: ID of the peer that the instance is waiting on.
port_name: Name of the input port.
slot: Optional slot number of the input port.
"""
self._queue.put((False, instance_id, peer_instance_id, port_name, slot))

def _process_queue_item(self, item: _QueueItem) -> None:
_logger.info("Processing queue item: %s", item)
"""Actually process a WAITING_FOR_RECEIVE[_DONE] request.
This method should be called inside the worker thread.
"""
_logger.debug("Processing queue item: %s", item)
is_waiting, instance_id, peer_instance_id, port_name, slot = item
if is_waiting:
# Sanity checks, triggering this is a bug in the instance or the manager
Expand Down Expand Up @@ -88,6 +158,15 @@ def _process_queue_item(self, item: _QueueItem) -> None:
del self._waiting_instances[instance_id]
del self._waiting_instance_ports[instance_id]

# Check if we were part of a deadlock
for i, instance_list in enumerate(self._detected_deadlocks):
if instance_id in instance_list:
del self._detected_deadlocks[i]
break
if not self._detected_deadlocks:
# There are no deadlocks anymore: cancel shutdown
self._shutdown_time = None

def _check_for_deadlock(self, instance_id: str) -> None:
"""Check if there is a cycle of waiting instances that involves this instance.
"""
Expand All @@ -96,25 +175,40 @@ def _check_for_deadlock(self, instance_id: str) -> None:
while cur_instance in self._waiting_instances:
cur_instance = self._waiting_instances[cur_instance]
if cur_instance == instance_id:
break # Found a deadlocked cycle
self._handle_potential_deadlock(deadlock_instances)
return
deadlock_instances.append(cur_instance)
else: # No cycle detected
_logger.info("No deadlock detected")
return

_logger.warning(
"Potential deadlock detected, aborting run in %d seconds.\n%s",
self._wait_before_shutdown,
self._format_deadlock(deadlock_instances),
)
# TODO: wait and abort
self._shutdown_callback()
_logger.debug("No deadlock detected")

def _handle_potential_deadlock(self, deadlock_instances: List[str]) -> None:
"""Handle a potential deadlock.
def _format_deadlock(self, instances: List[str]) -> str:
"""Create and return formatted deadlock debug info."""
num_instances = str(len(instances))
Args:
deadlock_instances: list of instances waiting on eachother
"""
shutdown_delay = self._wait_before_shutdown
if self._shutdown_time is not None:
# Get time until shutdown
shutdown_delay = self._shutdown_time - time.monotonic()
_logger.fatal(
"Potential deadlock detected, aborting run in %d seconds.\n%s",
shutdown_delay,
self._format_deadlock(deadlock_instances),
)

self._detected_deadlocks.append(deadlock_instances)
if self._shutdown_time is None:
self._shutdown_time = time.monotonic() + self._wait_before_shutdown

def _format_deadlock(self, deadlock_instances: List[str]) -> str:
"""Create and return formatted deadlock debug info.
Args:
deadlock_instances: list of instances waiting on eachother
"""
num_instances = str(len(deadlock_instances))
lines = [f"The following {num_instances} instances are dead-locked:"]
for i, instance in enumerate(instances):
for i, instance in enumerate(deadlock_instances):
num = str(i+1).rjust(len(num_instances))
peer_instance = self._waiting_instances[instance]
port, slot = self._waiting_instance_ports[instance]
Expand Down
106 changes: 106 additions & 0 deletions libmuscle/python/libmuscle/manager/test/test_deadlock_detector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import logging
import time
from typing import Iterator
from unittest.mock import Mock

import pytest

from libmuscle.manager.deadlock_detector import DeadlockDetector


@pytest.fixture
def shutdown_callback() -> Mock:
return Mock()


@pytest.fixture
def detector(shutdown_callback) -> Iterator[DeadlockDetector]:
# Using a very short delay (10ms) to speed up unit testing
detector = DeadlockDetector(shutdown_callback, 0.01)
detector.start()
yield detector
if detector.is_alive():
detector.shutdown()
detector.join()


def test_no_deadlock(shutdown_callback: Mock, detector: DeadlockDetector) -> None:
detector.put_waiting("macro", "micro", "s", None)
detector.put_waiting_done("macro", "micro", "s", None)
time.sleep(0.05)
detector.shutdown()
detector.join()
shutdown_callback.assert_not_called()


def test_double_waiting_log_error(
caplog: pytest.LogCaptureFixture, detector: DeadlockDetector) -> None:
detector.put_waiting("macro", "micro", "s", 0)
detector.put_waiting("macro", "micro", "s", 1)
detector.shutdown()
detector.join()
assert len(caplog.record_tuples) == 1
assert caplog.record_tuples[0][:2] == (
"libmuscle.manager.deadlock_detector", logging.ERROR)


def test_not_waiting_log_error(
caplog: pytest.LogCaptureFixture, detector: DeadlockDetector) -> None:
detector.put_waiting_done("macro", "micro", "s", 0)
detector.shutdown()
detector.join()
assert len(caplog.record_tuples) == 1
assert caplog.record_tuples[0][:2] == (
"libmuscle.manager.deadlock_detector", logging.ERROR)


def test_waiting_for_different_instance_log_error(
caplog: pytest.LogCaptureFixture, detector: DeadlockDetector) -> None:
detector.put_waiting("macro", "micro", "s", 0)
detector.put_waiting_done("macro", "meso", "s", 0)
detector.shutdown()
detector.join()
assert len(caplog.record_tuples) == 1
assert caplog.record_tuples[0][:2] == (
"libmuscle.manager.deadlock_detector", logging.ERROR)


def test_waiting_for_different_port_log_error(
caplog: pytest.LogCaptureFixture, detector: DeadlockDetector) -> None:
detector.put_waiting("macro", "micro", "s", 0)
detector.put_waiting_done("macro", "micro", "f_init", 0)
detector.shutdown()
detector.join()
assert len(caplog.record_tuples) == 1
assert caplog.record_tuples[0][:2] == (
"libmuscle.manager.deadlock_detector", logging.ERROR)


def test_deadlock(shutdown_callback: Mock, detector: DeadlockDetector) -> None:
detector.put_waiting("macro", "micro", "s", None)
detector.put_waiting("micro", "macro", "f_init", None)
time.sleep(0.05)
assert not detector.is_alive()
shutdown_callback.assert_called_once()


def test_deadlock_cancelled(
shutdown_callback: Mock, detector: DeadlockDetector) -> None:
detector.put_waiting("macro", "micro", "s", None)
detector.put_waiting("micro", "macro", "f_init", None)
detector.put_waiting_done("macro", "micro", "s", None)
time.sleep(0.05)
detector.shutdown()
detector.join()
shutdown_callback.assert_not_called()


def test_double_deadlock(shutdown_callback: Mock, detector: DeadlockDetector) -> None:
detector.put_waiting("macro", "micro", "s", None)
detector.put_waiting("micro", "macro", "f_init", None)
detector.put_waiting("cycle2", "peer2", "s", None)
detector.put_waiting("peer2", "cycle2", "f_init", None)
detector.put_waiting_done("macro", "micro", "s", None)
time.sleep(0.05)
assert not detector.is_alive()
shutdown_callback.assert_called()

0 comments on commit 1f6de94

Please sign in to comment.