Skip to content

Commit

Permalink
Implement C++ components of deadlock detection
Browse files Browse the repository at this point in the history
  • Loading branch information
maarten-ic committed Aug 16, 2024
1 parent f98e5a3 commit 94869da
Show file tree
Hide file tree
Showing 19 changed files with 270 additions and 25 deletions.
16 changes: 12 additions & 4 deletions libmuscle/cpp/src/libmuscle/communicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,17 @@ Communicator::Communicator(
ymmsl::Reference const & kernel,
std::vector<int> const & index,
PortManager & port_manager,
Logger & logger, Profiler & profiler)
Logger & logger, Profiler & profiler,
MMPClient & manager)
: kernel_(kernel)
, index_(index)
, port_manager_(port_manager)
, logger_(logger)
, profiler_(profiler)
, manager_(manager)
, server_()
, clients_()
, receive_timeout_(10.0) // Notify manager, by default, after 10 seconds waiting in receive_message()
{}

std::vector<std::string> Communicator::get_locations() const {
Expand Down Expand Up @@ -138,8 +141,12 @@ std::tuple<Message, double> Communicator::receive_message(
Endpoint snd_endpoint = peer_info_.get().get_peer_endpoints(
recv_endpoint.port, slot_list).at(0);
MPPClient & client = get_client_(snd_endpoint.instance());
std::string peer_instance = static_cast<std::string>(snd_endpoint.instance());
ReceiveTimeoutHandler handler(
manager_, peer_instance, port_name, slot, receive_timeout_);
ReceiveTimeoutHandler *timeout_handler = receive_timeout_ < 0 ? nullptr : &handler;
auto msg_and_profile = try_receive_(
client, recv_endpoint.ref(), snd_endpoint.kernel);
client, recv_endpoint.ref(), snd_endpoint.kernel, timeout_handler);
auto & msg = std::get<0>(msg_and_profile);

ProfileEvent recv_decode_event(
Expand Down Expand Up @@ -289,9 +296,10 @@ Endpoint Communicator::get_endpoint_(
}

std::tuple<std::vector<char>, mcp::ProfileData> Communicator::try_receive_(
MPPClient & client, Reference const & receiver, Reference const & peer) {
MPPClient & client, Reference const & receiver, Reference const & peer,
ReceiveTimeoutHandler *timeout_handler) {
try {
return client.receive(receiver);
return client.receive(receiver, timeout_handler);
} catch(std::runtime_error const & err) {
throw std::runtime_error(
"Error while receiving a message: connection with peer '" +
Expand Down
22 changes: 20 additions & 2 deletions libmuscle/cpp/src/libmuscle/communicator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include <libmuscle/logger.hpp>
#include <libmuscle/message.hpp>
#include <libmuscle/mmp_client.hpp>
#include <libmuscle/mpp_client.hpp>
#include <libmuscle/mpp_server.hpp>
#include <libmuscle/namespace.hpp>
Expand All @@ -14,6 +15,7 @@
#include <libmuscle/port_manager.hpp>
#include <libmuscle/ports_description.hpp>
#include <libmuscle/profiler.hpp>
#include <libmuscle/receive_timeout_handler.hpp>
#include <libmuscle/test_support.hpp>
#include <libmuscle/util.hpp>

Expand Down Expand Up @@ -53,7 +55,8 @@ class Communicator {
ymmsl::Reference const & kernel,
std::vector<int> const & index,
PortManager & port_manager,
Logger & logger, Profiler & profiler);
Logger & logger, Profiler & profiler,
MMPClient & manager);

/** Returns a list of locations that we can be reached at.
*
Expand Down Expand Up @@ -127,6 +130,19 @@ class Communicator {
*/
void shutdown();

/** Update the timeout after which the manager is notified that we are
* waiting for a message.
*
* @param receive_timeout Timeout (seconds). A negative number disables
* the deadlock notification mechanism.
*/
void set_receive_timeout(double receive_timeout) { receive_timeout_ = receive_timeout; }

/** Get the timeout after which the manager is notified that we are
* waiting for a message.
*/
double get_receive_timeout() const { return receive_timeout_; }

PRIVATE:
using Ports_ = std::unordered_map<std::string, Port>;

Expand All @@ -140,7 +156,7 @@ class Communicator {

std::tuple<std::vector<char>, mcp::ProfileData> try_receive_(
MPPClient & client, ymmsl::Reference const & receiver,
ymmsl::Reference const & peer);
ymmsl::Reference const & peer, ReceiveTimeoutHandler *handler);

void close_port_(std::string const & port_name, Optional<int> slot = {});

Expand Down Expand Up @@ -186,9 +202,11 @@ class Communicator {
PortManager & port_manager_;
Logger & logger_;
Profiler & profiler_;
MMPClient & manager_;
MPPServer server_;
std::unordered_map<ymmsl::Reference, std::unique_ptr<MPPClient>> clients_;
Optional<PeerInfo> peer_info_;
double receive_timeout_;
};

} }
Expand Down
22 changes: 21 additions & 1 deletion libmuscle/cpp/src/libmuscle/instance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ class Instance::Impl {
void deregister_();
void setup_checkpointing_();
void setup_profiling_();
void setup_receive_timeout_();

::ymmsl::Reference make_full_name_(int argc, char const * const argv[]) const;
std::string extract_manager_location_(int argc, char const * const argv[]) const;
Expand Down Expand Up @@ -226,7 +227,7 @@ Instance::Impl::Impl(
port_manager_.reset(new PortManager(index_(), ports));
communicator_.reset(
new Communicator(
name_(), index_(), *port_manager_, *logger_, *profiler_));
name_(), index_(), *port_manager_, *logger_, *profiler_, *manager_));
snapshot_manager_.reset(new SnapshotManager(
instance_name_, *manager_, *port_manager_, *logger_));
trigger_manager_.reset(new TriggerManager());
Expand All @@ -239,6 +240,7 @@ Instance::Impl::Impl(
set_local_log_level_();
set_remote_log_level_();
setup_profiling_();
setup_receive_timeout_();
#ifdef MUSCLE_ENABLE_MPI
auto sbase_data = Data(settings_manager_.base);
msgpack::sbuffer sbuf;
Expand Down Expand Up @@ -556,6 +558,24 @@ void Instance::Impl::setup_profiling_() {
profiler_->set_level(profile_level_str);
}

void Instance::Impl::setup_receive_timeout_() {
double timeout;
try {
timeout = settings_manager_.get_setting(
instance_name_, "muscle_deadlock_receive_timeout").as<double>();
communicator_->set_receive_timeout(timeout);
}
catch (std::runtime_error const & e) {
logger_->error(e.what() + std::string(" in muscle_deadlock_receive_timeout"));
}
catch (std::out_of_range const &) {
// muscle_deadlock_receive_timeout not set, do nothing and keep the default
}
logger_->debug(
"Timeout on receiving messages set to ",
communicator_->get_receive_timeout());
}

Message Instance::Impl::receive_message(
std::string const & port_name,
Optional<int> slot,
Expand Down
3 changes: 3 additions & 0 deletions libmuscle/cpp/src/libmuscle/mcp/protocol.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ enum class RequestType {
submit_profile_events = 6,
submit_snapshot = 7,
get_checkpoint_info = 8,
// Connection deadlock detection
waiting_for_receive = 9,
waiting_for_receive_done = 10,

// MUSCLE Peer Protocol
get_next_message = 21
Expand Down
45 changes: 43 additions & 2 deletions libmuscle/cpp/src/libmuscle/mcp/tcp_transport_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include <algorithm>
#include <cstring>
#include <chrono>
#include <memory>
#include <string>
#include <unistd.h>
Expand All @@ -13,6 +14,7 @@
#include <sys/socket.h>
#include <netdb.h>
#include <netinet/tcp.h>
#include <poll.h>


namespace {
Expand Down Expand Up @@ -128,12 +130,51 @@ TcpTransportClient::~TcpTransportClient() {
}

std::tuple<std::vector<char>, ProfileData> TcpTransportClient::call(
char const * req_buf, std::size_t req_len
char const * req_buf, std::size_t req_len,
TimeoutHandler* timeout_handler
) const {
ProfileTimestamp start_wait;
send_frame(socket_fd_, req_buf, req_len);

int64_t length = recv_int64(socket_fd_);
int64_t length;
if (timeout_handler == nullptr) {
length = recv_int64(socket_fd_);
} else {
using std::chrono::duration;
using std::chrono::steady_clock;
using std::chrono::milliseconds;
using std::chrono::duration_cast;

const auto timeout_duration = duration<double>(timeout_handler->get_timeout());
const auto deadline = steady_clock::now() + timeout_duration;
int poll_result;
pollfd socket_poll_fd;
socket_poll_fd.fd = socket_fd_;
socket_poll_fd.events = POLLIN;
do {
int timeout_ms = duration_cast<milliseconds>(deadline - steady_clock::now()).count();
poll_result = poll(&socket_poll_fd, 1, timeout_ms);

if (poll_result >= 0)
break;

if (errno != EINTR)
throw std::runtime_error("Unexpected error during poll(): "+std::to_string(errno));

// poll() was interrupted by a signal: retry with re-calculated timeout
} while (1);

if (poll_result == 0) {
// time limit expired
timeout_handler->on_timeout();
length = recv_int64(socket_fd_);
timeout_handler->on_receive();
} else {
// socket is ready for a receive, this call shouldn't block:
length = recv_int64(socket_fd_);
}
}

ProfileTimestamp start_transfer;
std::vector<char> result(length);
recv_all(socket_fd_, result.data(), result.size());
Expand Down
3 changes: 2 additions & 1 deletion libmuscle/cpp/src/libmuscle/mcp/tcp_transport_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ class TcpTransportClient : public TransportClient {
* @return A byte array with the received data.
*/
virtual std::tuple<std::vector<char>, ProfileData> call(
char const * req_buf, std::size_t req_len) const override;
char const * req_buf, std::size_t req_len,
TimeoutHandler* timeout_handler=nullptr) const override;

/** Closes this client.
*
Expand Down
18 changes: 17 additions & 1 deletion libmuscle/cpp/src/libmuscle/mcp/transport_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,21 @@ namespace libmuscle { namespace _MUSCLE_IMPL_NS { namespace mcp {
using ProfileData = std::tuple<
ProfileTimestamp, ProfileTimestamp, ProfileTimestamp>;

class TimeoutHandler {
public:
virtual ~TimeoutHandler() = default;

/** Timeout (in seconds) after which on_timeout is called. */
virtual double get_timeout() = 0;
/** Callback when getTimeout seconds have passed without a response from * the peer.
*/
virtual void on_timeout() = 0;
/** Callback when receiving a response from the peer.
*
* Note: this method is only called when the request has timed out.
*/
virtual void on_receive() = 0;
};

/** A client that connects to an MCP transport server.
*
Expand Down Expand Up @@ -73,7 +88,8 @@ class TransportClient {
* received data, and the timestamps.
*/
virtual std::tuple<std::vector<char>, ProfileData> call(
char const * req_buf, std::size_t req_len) const = 0;
char const * req_buf, std::size_t req_len,
TimeoutHandler* timeout_handler=nullptr) const = 0;

/** Closes this client.
*
Expand Down
24 changes: 24 additions & 0 deletions libmuscle/cpp/src/libmuscle/mmp_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,30 @@ void MMPClient::deregister_instance() {
}
}

void MMPClient::waiting_for_receive(
std::string const & peer_instance_id, std::string const & port_name,
Optional<int> slot)
{
auto request = Data::list(
static_cast<int>(RequestType::waiting_for_receive),
static_cast<std::string>(instance_id_),
peer_instance_id, port_name, encode_optional(slot));

auto response = call_manager_(request);
}

void MMPClient::waiting_for_receive_done(
std::string const & peer_instance_id, std::string const & port_name,
Optional<int> slot)
{
auto request = Data::list(
static_cast<int>(RequestType::waiting_for_receive_done),
static_cast<std::string>(instance_id_),
peer_instance_id, port_name, encode_optional(slot));

auto response = call_manager_(request);
}

DataConstRef MMPClient::call_manager_(DataConstRef const & request) {
std::lock_guard<std::mutex> lock(mutex_);

Expand Down
10 changes: 10 additions & 0 deletions libmuscle/cpp/src/libmuscle/mmp_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,16 @@ class MMPClient {

void deregister_instance();

/** Notify the manager that we're waiting to receive a message. */
void waiting_for_receive(
std::string const & peer_instance_id, std::string const & port_name,
Optional<int> slot);

/** Notify the manager that we're done waiting to receive a message. */
void waiting_for_receive_done(
std::string const & peer_instance_id, std::string const & port_name,
Optional<int> slot);

private:
ymmsl::Reference instance_id_;
mcp::TcpTransportClient transport_client_;
Expand Down
4 changes: 2 additions & 2 deletions libmuscle/cpp/src/libmuscle/mpp_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ MPPClient::MPPClient(std::vector<std::string> const & locations) {
}

std::tuple<std::vector<char>, ProfileData> MPPClient::receive(
Reference const & receiver)
Reference const & receiver, mcp::TimeoutHandler *timeout_handler)
{
auto request = Data::list(
static_cast<int>(RequestType::get_next_message),
Expand All @@ -37,7 +37,7 @@ std::tuple<std::vector<char>, ProfileData> MPPClient::receive(
// can then overwrite after encoding with the length?
msgpack::pack(sbuf, request);

return transport_client_->call(sbuf.data(), sbuf.size());
return transport_client_->call(sbuf.data(), sbuf.size(), timeout_handler);
}

void MPPClient::close() {
Expand Down
3 changes: 2 additions & 1 deletion libmuscle/cpp/src/libmuscle/mpp_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ class MPPClient {
* @return The received message.
*/
std::tuple<std::vector<char>, mcp::ProfileData> receive(
::ymmsl::Reference const & receiver);
::ymmsl::Reference const & receiver,
mcp::TimeoutHandler *timeout_handler=nullptr);

/** Closes this client.
*
Expand Down
29 changes: 29 additions & 0 deletions libmuscle/cpp/src/libmuscle/receive_timeout_handler.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#include "receive_timeout_handler.hpp"

namespace libmuscle { namespace _MUSCLE_IMPL_NS {

ReceiveTimeoutHandler::ReceiveTimeoutHandler(
MMPClient& manager, std::string const& peer_instance,
std::string const& port_name, Optional<int> slot, double timeout)
: manager_(manager)
, peer_instance_(peer_instance)
, port_name_(port_name)
, slot_(slot)
, timeout_(timeout) {}

double ReceiveTimeoutHandler::get_timeout()
{
return timeout_;
}

void ReceiveTimeoutHandler::on_timeout()
{
manager_.waiting_for_receive(peer_instance_, port_name_, slot_);
}

void ReceiveTimeoutHandler::on_receive()
{
manager_.waiting_for_receive_done(peer_instance_, port_name_, slot_);
}

} }
Loading

0 comments on commit 94869da

Please sign in to comment.