Skip to content

Commit

Permalink
Use MMSFValidator in Instance
Browse files Browse the repository at this point in the history
And implement checks in send() and receive() to check if the port's operator allows sending/receiving.
  • Loading branch information
maarten-ic committed Aug 27, 2024
1 parent 73f1b41 commit 2ac5533
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 7 deletions.
41 changes: 34 additions & 7 deletions libmuscle/cpp/src/libmuscle/instance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <libmuscle/data.hpp>
#include <libmuscle/mcp/data_pack.hpp>
#include <libmuscle/logger.hpp>
#include <libmuscle/mmsf_validator.hpp>
#include <libmuscle/mmp_client.hpp>
#include <libmuscle/peer_info.hpp>
#include <libmuscle/port_manager.hpp>
Expand Down Expand Up @@ -130,6 +131,7 @@ class Instance::Impl {
SettingsManager settings_manager_;
std::unique_ptr<SnapshotManager> snapshot_manager_;
std::unique_ptr<TriggerManager> trigger_manager_;
std::unique_ptr<MMSFValidator> mmsf_validator_;
Optional<bool> first_run_;
Optional<bool> do_reuse_;
bool do_resume_;
Expand All @@ -151,8 +153,8 @@ class Instance::Impl {

std::vector<::ymmsl::Port> list_declared_ports_() const;
void check_port_(
std::string const & port_name, Optional<int> slot = {},
bool allow_slot_out_of_range = false);
std::string const & port_name, Optional<int> slot,
bool is_send, bool allow_slot_out_of_range = false);

bool receive_settings_();
bool have_f_init_connections_();
Expand Down Expand Up @@ -239,6 +241,10 @@ Instance::Impl::Impl(
set_local_log_level_();
set_remote_log_level_();
setup_profiling_();
// MMSFValidator needs a connected port manager, and does some logging
if (! (InstanceFlags::SKIP_MMSF_SEQUENCE_CHECKS & flags_)) {
mmsf_validator_.reset(new MMSFValidator(*port_manager_, *logger_));
}
#ifdef MUSCLE_ENABLE_MPI
auto sbase_data = Data(settings_manager_.base);
msgpack::sbuffer sbuf;
Expand Down Expand Up @@ -268,6 +274,7 @@ Instance::Impl::~Impl() {

bool Instance::Impl::reuse_instance() {
api_guard_->verify_reuse_instance();
if (mmsf_validator_) mmsf_validator_->reuse_instance();

bool do_reuse;
if (do_reuse_.is_set()) {
Expand All @@ -278,6 +285,9 @@ bool Instance::Impl::reuse_instance() {
do_reuse = decide_reuse_instance_();
}

if (do_resume_ && !do_init_ && mmsf_validator_)
mmsf_validator_->skip_f_init();

// now first_run_, do_resume_ and do_init_ are also set correctly
#ifdef MUSCLE_ENABLE_MPI
if (mpi_barrier_.is_root()) {
Expand Down Expand Up @@ -411,7 +421,8 @@ void Instance::Impl::send(std::string const & port_name, Message const & message
if (mpi_barrier_.is_root()) {
#endif

check_port_(port_name);
check_port_(port_name, {}, true);
if (mmsf_validator_) mmsf_validator_->check_send(port_name, {});
if (!message.has_settings()) {
Message msg(message);
msg.set_settings(settings_manager_.overlay);
Expand All @@ -434,7 +445,8 @@ void Instance::Impl::send(
try {
#endif

check_port_(port_name, slot);
check_port_(port_name, slot, true);
if (mmsf_validator_) mmsf_validator_->check_send(port_name, slot);
if (!message.has_settings()) {
Message msg(message);
msg.set_settings(settings_manager_.overlay);
Expand Down Expand Up @@ -570,7 +582,8 @@ Message Instance::Impl::receive_message(
try {
#endif

check_port_(port_name, slot, true);
check_port_(port_name, slot, false, true);
if (mmsf_validator_) mmsf_validator_->check_receive(port_name, slot);

Reference port_ref(port_name);
auto const & port = port_manager_->get_port(port_name);
Expand Down Expand Up @@ -842,7 +855,7 @@ std::vector<::ymmsl::Port> Instance::Impl::list_declared_ports_() const {
*/
void Instance::Impl::check_port_(
std::string const & port_name, Optional<int> slot,
bool allow_slot_out_of_range)
bool is_send, bool allow_slot_out_of_range)
{
if (!port_manager_->port_exists(port_name)) {
std::ostringstream oss;
Expand All @@ -852,8 +865,22 @@ void Instance::Impl::check_port_(
throw std::logic_error(oss.str());
}

auto & port = port_manager_->get_port(port_name);
if (is_send) {
if (!::ymmsl::allows_sending(port.oper)) {
std::ostringstream oss;
oss << " Port " << port_name << " does not allow sending messages.";
throw std::logic_error(oss.str());
}
} else {
if (!::ymmsl::allows_receiving(port.oper)) {
std::ostringstream oss;
oss << " Port " << port_name << " does not allow receiving messages.";
throw std::logic_error(oss.str());
}
}

if (slot.is_set()) {
auto & port = port_manager_->get_port(port_name);
if (!port.is_vector()) {
std::ostringstream oss;
oss << "Port \"" << port_name << "\" is not a vector port, but a slot was";
Expand Down
7 changes: 7 additions & 0 deletions libmuscle/cpp/src/libmuscle/instance.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,13 @@ enum class InstanceFlags : int {
* `ymmsl.KeepsStateForNextUse.NECESSARY`).
*/
STATE_NOT_REQUIRED_FOR_NEXT_USE = 8,

/** Disable the checks whether the MMSF is strictly followed when
* sending/receiving messages.
*
* See MMSFValidator for a detailed description of the checks.
*/
SKIP_MMSF_SEQUENCE_CHECKS = 16,
};

inline InstanceFlags operator|(InstanceFlags a, InstanceFlags b) {
Expand Down
12 changes: 12 additions & 0 deletions libmuscle/cpp/src/libmuscle/tests/test_instance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <libmuscle/logging.cpp>
#include <libmuscle/mcp/data_pack.cpp>
#include <libmuscle/message.cpp>
#include <libmuscle/mmsf_validator.cpp>
#include <libmuscle/port.cpp>
#include <libmuscle/timestamp.cpp>

Expand Down Expand Up @@ -426,6 +427,8 @@ TEST_F(libmuscle_instance, get_setting) {
}

TEST_F(libmuscle_instance, list_ports) {
ASSERT_TRUE(port_manager_.list_ports.called_once_with());
port_manager_.list_ports.call_args_list.clear();
instance_.list_ports();
ASSERT_TRUE(port_manager_.list_ports.called_once_with());
}
Expand Down Expand Up @@ -554,10 +557,19 @@ TEST_F(libmuscle_instance, send_after_resize) {
instance_.send("out_r", mock_msg, 13);
}

TEST_F(libmuscle_instance, send_on_receiving_port) {
Message mock_msg(0.0);
ASSERT_THROW((instance_.send("in_v", mock_msg, 3)), std::logic_error);
}

TEST_F(libmuscle_instance, receive_on_invalid_port) {
ASSERT_THROW(instance_.receive("does_not_exist"), std::logic_error);
}

TEST_F(libmuscle_instance, receive_on_sending_port) {
ASSERT_THROW(instance_.receive("out_v", 3), std::logic_error);
}

TEST_F(libmuscle_instance, receive_f_init) {
Message mock_msg(0.0, Settings());
communicator_.receive_message.return_value = std::make_tuple(mock_msg, 0.0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <libmuscle/logging.cpp>
#include <libmuscle/mcp/data_pack.cpp>
#include <libmuscle/message.cpp>
#include <libmuscle/mmsf_validator.cpp>
#include <libmuscle/port.cpp>
#include <libmuscle/settings_manager.cpp>
#include <libmuscle/snapshot_manager.cpp>
Expand Down

0 comments on commit 2ac5533

Please sign in to comment.