Skip to content

Commit

Permalink
test: Add coverage for --shm-metadata-msg-size
Browse files Browse the repository at this point in the history
  • Loading branch information
rbx authored and dennisklein committed Jun 13, 2023
1 parent 3decac5 commit 25614e3
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 37 deletions.
63 changes: 57 additions & 6 deletions test/message/_message.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,15 @@ auto AsStringView(Message const& msg) -> string_view
return {static_cast<char const*>(msg.GetData()), msg.GetSize()};
}

auto RunPushPullWithMsgResize(string const & transport, string const & _address) -> void
auto RunPushPullWithMsgResize(string const & transport, string const & _address, bool expandedShmMetadata = false) -> void
{
ProgOptions config;
config.SetProperty<string>("session", tools::Uuid());
config.SetProperty<size_t>("shm-segment-size", 100000000);
config.SetProperty<bool>("shm-monitor", true);
if (expandedShmMetadata) {
config.SetProperty<size_t>("shm-metadata-msg-size", 2048);
}
auto factory(TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config));

Channel push{"Push", "push", factory};
Expand Down Expand Up @@ -100,12 +103,15 @@ auto RunPushPullWithMsgResize(string const & transport, string const & _address)
}
}

auto RunMsgRebuild(const string& transport) -> void
auto RunMsgRebuild(const string& transport, bool expandedShmMetadata = false) -> void
{
ProgOptions config;
config.SetProperty<string>("session", tools::Uuid());
config.SetProperty<size_t>("shm-segment-size", 100000000);
config.SetProperty<bool>("shm-monitor", true);
if (expandedShmMetadata) {
config.SetProperty<size_t>("shm-metadata-msg-size", 2048);
}
auto factory(TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config));

size_t const msgSize{100};
Expand Down Expand Up @@ -134,12 +140,15 @@ auto CheckMsgAlignment(Message const& msg, fair::mq::Alignment alignment) -> boo
return (reinterpret_cast<uintptr_t>(msg.GetData()) % static_cast<size_t>(alignment)) == 0; // NOLINT
}

auto RunPushPullWithAlignment(string const& transport, string const& _address) -> void
auto RunPushPullWithAlignment(string const& transport, string const& _address, bool expandedShmMetadata = false) -> void
{
ProgOptions config;
config.SetProperty<string>("session", tools::Uuid());
config.SetProperty<size_t>("shm-segment-size", 100000000);
config.SetProperty<bool>("shm-monitor", true);
if (expandedShmMetadata) {
config.SetProperty<size_t>("shm-metadata-msg-size", 2048);
}
auto factory(TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config));

Channel push{"Push", "push", factory};
Expand Down Expand Up @@ -189,12 +198,15 @@ auto RunPushPullWithAlignment(string const& transport, string const& _address) -
ASSERT_TRUE(CheckMsgAlignment(*msgCopy, align32));
}

auto EmptyMessage(string const& transport, string const& _address) -> void
auto EmptyMessage(string const& transport, string const& _address, bool expandedShmMetadata = false) -> void
{
ProgOptions config;
config.SetProperty<string>("session", tools::Uuid());
config.SetProperty<size_t>("shm-segment-size", 100000000);
config.SetProperty<bool>("shm-monitor", true);
if (expandedShmMetadata) {
config.SetProperty<size_t>("shm-metadata-msg-size", 2048);
}
auto factory(TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config));

Channel push{"Push", "push", factory};
Expand Down Expand Up @@ -241,12 +253,15 @@ auto EmptyMessage(string const& transport, string const& _address) -> void

// The "zero copy" property of the Copy() method is an implementation detail and is not guaranteed.
// Currently it holds true for the shmem (across devices) and for zeromq (within same device) transports.
auto ZeroCopy() -> void
auto ZeroCopy(bool expandedShmMetadata = false) -> void
{
ProgOptions config;
config.SetProperty<string>("session", tools::Uuid());
config.SetProperty<size_t>("shm-segment-size", 100000000);
config.SetProperty<bool>("shm-monitor", true);
if (expandedShmMetadata) {
config.SetProperty<size_t>("shm-metadata-msg-size", 2048);
}
auto factory(TransportFactory::CreateTransportFactory("shmem", tools::Uuid(), &config));

unique_ptr<string> str(make_unique<string>("asdf"));
Expand All @@ -272,7 +287,7 @@ auto ZeroCopy() -> void

// The "zero copy" property of the Copy() method is an implementation detail and is not guaranteed.
// Currently it holds true for the shmem (across devices) and for zeromq (within same device) transports.
auto ZeroCopyFromUnmanaged(string const& address) -> void
auto ZeroCopyFromUnmanaged(string const& address, bool expandedShmMetadata = false) -> void
{
ProgOptions config1;
ProgOptions config2;
Expand All @@ -285,6 +300,12 @@ auto ZeroCopyFromUnmanaged(string const& address) -> void
config2.SetProperty<bool>("shm-monitor", true);
// ref counts should be accessible accross different segments
config2.SetProperty<uint16_t>("shm-segment-id", 2);
if (expandedShmMetadata) {
config1.SetProperty<size_t>("shm-metadata-msg-size", 2048);
}
if (expandedShmMetadata) {
config2.SetProperty<size_t>("shm-metadata-msg-size", 2048);
}
auto factory1(TransportFactory::CreateTransportFactory("shmem", tools::Uuid(), &config1));
auto factory2(TransportFactory::CreateTransportFactory("shmem", tools::Uuid(), &config2));

Expand Down Expand Up @@ -378,6 +399,11 @@ TEST(Resize, shmem) // NOLINT
RunPushPullWithMsgResize("shmem", "ipc://test_message_resize");
}

TEST(Resize, shmem_expanded_metadata) // NOLINT
{
RunPushPullWithMsgResize("shmem", "ipc://test_message_resize", true);
}

TEST(Rebuild, zeromq) // NOLINT
{
RunMsgRebuild("zeromq");
Expand All @@ -388,11 +414,21 @@ TEST(Rebuild, shmem) // NOLINT
RunMsgRebuild("shmem");
}

TEST(Rebuild, shmem_expanded_metadata) // NOLINT
{
RunMsgRebuild("shmem", true);
}

TEST(Alignment, shmem) // NOLINT
{
RunPushPullWithAlignment("shmem", "ipc://test_message_alignment");
}

TEST(Alignment, shmem_expanded_metadata) // NOLINT
{
RunPushPullWithAlignment("shmem", "ipc://test_message_alignment", true);
}

TEST(Alignment, zeromq) // NOLINT
{
RunPushPullWithAlignment("zeromq", "ipc://test_message_alignment");
Expand All @@ -408,14 +444,29 @@ TEST(EmptyMessage, shmem) // NOLINT
EmptyMessage("shmem", "ipc://test_empty_message");
}

TEST(EmptyMessage, shmem_expanded_metadata) // NOLINT
{
EmptyMessage("shmem", "ipc://test_empty_message", true);
}

TEST(ZeroCopy, shmem) // NOLINT
{
ZeroCopy();
}

TEST(ZeroCopy, shmem_expanded_metadata) // NOLINT
{
ZeroCopy(true);
}

TEST(ZeroCopyFromUnmanaged, shmem) // NOLINT
{
ZeroCopyFromUnmanaged("ipc://test_zerocopy_unmanaged");
}

TEST(ZeroCopyFromUnmanaged, shmem_expanded_metadata) // NOLINT
{
ZeroCopyFromUnmanaged("ipc://test_zerocopy_unmanaged", true);
}

} // namespace
17 changes: 12 additions & 5 deletions test/protocols/_pair.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ using namespace std;
using namespace fair::mq::test;
using namespace fair::mq::tools;

auto RunPair(string transport) -> void
auto RunPair(const string& transport, const string& extraDeviceCmdArgs) -> void
{
size_t session{fair::mq::tools::UuidHash()};
string ipcFile("/tmp/fmq_" + to_string(session) + "_data_" + transport);
Expand All @@ -38,6 +38,7 @@ auto RunPair(string transport) -> void
<< " --shm-segment-size 100000000"
<< " --session " << session
<< " --color false"
<< extraDeviceCmdArgs
<< " --channel-config name=data,type=pair,method=bind,address=" << address;
pairleft = execute(cmd.str(), "[PAIR L]");
});
Expand All @@ -52,6 +53,7 @@ auto RunPair(string transport) -> void
<< " --shm-segment-size 100000000"
<< " --session " << session
<< " --color false"
<< extraDeviceCmdArgs
<< " --channel-config name=data,type=pair,method=connect,address=" << address;
pairright = execute(cmd.str(), "[PAIR R]");
});
Expand All @@ -65,14 +67,19 @@ auto RunPair(string transport) -> void
exit(pairleft.exit_code + pairright.exit_code);
}

TEST(Pair, SingleMsg_MP_tcp_zeromq)
TEST(Pair, SingleMsg_MultiThreaded_tcp_zeromq)
{
EXPECT_EXIT(RunPair("zeromq"), ::testing::ExitedWithCode(0), "PAIR test successfull");
EXPECT_EXIT(RunPair("zeromq", ""), ::testing::ExitedWithCode(0), "PAIR test successfull");
}

TEST(Pair, SingleMsg_MP_tcp_shmem)
TEST(Pair, SingleMsg_MultiThreaded_tcp_shmem)
{
EXPECT_EXIT(RunPair("shmem"), ::testing::ExitedWithCode(0), "PAIR test successfull");
EXPECT_EXIT(RunPair("shmem", ""), ::testing::ExitedWithCode(0), "PAIR test successfull");
}

TEST(Pair, SingleMsg_MultiThreaded_tcp_shmem_expanded_metadata)
{
EXPECT_EXIT(RunPair("shmem", " --shm-metadata-msg-size 2048"), ::testing::ExitedWithCode(0), "PAIR test successfull");
}

} // namespace
17 changes: 12 additions & 5 deletions test/protocols/_push_pull.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ using namespace std;
using namespace fair::mq::test;
using namespace fair::mq::tools;

auto RunPushPull(string transport) -> void
auto RunPushPull(string transport, const string& extraDeviceCmdArgs) -> void
{
size_t session(fair::mq::tools::UuidHash());
string ipcFile("/tmp/fmq_" + to_string(session) + "_data_" + transport);
Expand All @@ -38,6 +38,7 @@ auto RunPushPull(string transport) -> void
<< " --shm-segment-size 100000000"
<< " --session " << session
<< " --color false"
<< extraDeviceCmdArgs
<< " --channel-config name=data,type=push,method=bind,address=" << address;
push = execute(cmd.str(), "[PUSH]");
});
Expand All @@ -52,6 +53,7 @@ auto RunPushPull(string transport) -> void
<< " --shm-segment-size 100000000"
<< " --session " << session
<< " --color false"
<< extraDeviceCmdArgs
<< " --channel-config name=data,type=pull,method=connect,address=" << address;
pull = execute(cmd.str(), "[PULL]");
});
Expand All @@ -65,14 +67,19 @@ auto RunPushPull(string transport) -> void
exit(push.exit_code + pull.exit_code);
}

TEST(PushPull, SingleMsg_MP_ipc_zeromq)
TEST(PushPull, SingleMsg_MultiThreaded_ipc_zeromq)
{
EXPECT_EXIT(RunPushPull("zeromq"), ::testing::ExitedWithCode(0), "PUSH-PULL test successfull");
EXPECT_EXIT(RunPushPull("zeromq", ""), ::testing::ExitedWithCode(0), "PUSH-PULL test successfull");
}

TEST(PushPull, SingleMsg_MP_ipc_shmem)
TEST(PushPull, SingleMsg_MultiThreaded_ipc_shmem)
{
EXPECT_EXIT(RunPushPull("shmem"), ::testing::ExitedWithCode(0), "PUSH-PULL test successfull");
EXPECT_EXIT(RunPushPull("shmem", ""), ::testing::ExitedWithCode(0), "PUSH-PULL test successfull");
}

TEST(PushPull, SingleMsg_MultiThreaded_ipc_shmem_expanded_metadata)
{
EXPECT_EXIT(RunPushPull("shmem", " --shm-metadata-msg-size 2048"), ::testing::ExitedWithCode(0), "PUSH-PULL test successfull");
}

} // namespace
62 changes: 44 additions & 18 deletions test/protocols/_push_pull_multipart.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@ namespace
using namespace std;
using namespace fair::mq;

auto RunSingleThreadedMultipart(string transport, string address1, string address2) -> void {
auto RunSingleThreadedMultipart(string transport, string address1, string address2, bool expandedShmMetadata) -> void {

fair::mq::ProgOptions config;
config.SetProperty<string>("session", tools::Uuid());
config.SetProperty<size_t>("shm-segment-size", 100000000);
config.SetProperty<bool>("shm-monitor", true);
if (expandedShmMetadata) {
config.SetProperty<size_t>("shm-metadata-msg-size", 2048);
}

auto factory = TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config);

Expand Down Expand Up @@ -104,13 +107,16 @@ auto RunSingleThreadedMultipart(string transport, string address1, string addres
}
}

auto RunMultiThreadedMultipart(string transport, string address1) -> void
auto RunMultiThreadedMultipart(string transport, string address1, bool expandedShmMetadata) -> void
{
ProgOptions config;
config.SetProperty<string>("session", tools::Uuid());
config.SetProperty<int>("io-threads", 1);
config.SetProperty<size_t>("shm-segment-size", 20000000); // NOLINT
config.SetProperty<bool>("shm-monitor", true);
if (expandedShmMetadata) {
config.SetProperty<size_t>("shm-metadata-msg-size", 2048);
}

auto factory = TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config);

Expand Down Expand Up @@ -147,44 +153,64 @@ auto RunMultiThreadedMultipart(string transport, string address1) -> void
puller.join();
}

TEST(PushPull, Multipart_ST_inproc_zeromq) // NOLINT
TEST(PushPull, Multipart_SingleThreaded_inproc_zeromq) // NOLINT
{
RunSingleThreadedMultipart("zeromq", "inproc://test1", "inproc://test2", false);
}

TEST(PushPull, Multipart_SingleThreaded_inproc_shmem) // NOLINT
{
RunSingleThreadedMultipart("shmem", "inproc://test1", "inproc://test2", false);
}

TEST(PushPull, Multipart_SingleThreaded_inproc_shmem_expanded_metadata) // NOLINT
{
RunSingleThreadedMultipart("shmem", "inproc://test1", "inproc://test2", true);
}

TEST(PushPull, Multipart_SingleThreaded_ipc_zeromq) // NOLINT
{
RunSingleThreadedMultipart("zeromq", "ipc://test_Multipart_SingleThreaded_ipc_zeromq_1", "ipc://test_Multipart_SingleThreaded_ipc_zeromq_2", false);
}

TEST(PushPull, Multipart_SingleThreaded_ipc_shmem) // NOLINT
{
RunSingleThreadedMultipart("zeromq", "inproc://test1", "inproc://test2");
RunSingleThreadedMultipart("shmem", "ipc://test_Multipart_SingleThreaded_ipc_shmem_1", "ipc://test_Multipart_SingleThreaded_ipc_shmem_2", false);
}

TEST(PushPull, Multipart_ST_inproc_shmem) // NOLINT
TEST(PushPull, Multipart_SingleThreaded_ipc_shmem_expanded_metadata) // NOLINT
{
RunSingleThreadedMultipart("shmem", "inproc://test1", "inproc://test2");
RunSingleThreadedMultipart("shmem", "ipc://test_Multipart_SingleThreaded_ipc_shmem_1", "ipc://test_Multipart_SingleThreaded_ipc_shmem_2", true);
}

TEST(PushPull, Multipart_ST_ipc_zeromq) // NOLINT
TEST(PushPull, Multipart_MultiThreaded_inproc_zeromq) // NOLINT
{
RunSingleThreadedMultipart("zeromq", "ipc://test_Multipart_ST_ipc_zeromq_1", "ipc://test_Multipart_ST_ipc_zeromq_2");
RunMultiThreadedMultipart("zeromq", "inproc://test_1", false);
}

TEST(PushPull, Multipart_ST_ipc_shmem) // NOLINT
TEST(PushPull, Multipart_MultiThreaded_inproc_shmem) // NOLINT
{
RunSingleThreadedMultipart("shmem", "ipc://test_Multipart_ST_ipc_shmem_1", "ipc://test_Multipart_ST_ipc_shmem_2");
RunMultiThreadedMultipart("shmem", "inproc://test_1", false);
}

TEST(PushPull, Multipart_MT_inproc_zeromq) // NOLINT
TEST(PushPull, Multipart_MultiThreaded_inproc_shmem_expanded_metadata) // NOLINT
{
RunMultiThreadedMultipart("zeromq", "inproc://test_1");
RunMultiThreadedMultipart("shmem", "inproc://test_1", true);
}

TEST(PushPull, Multipart_MT_inproc_shmem) // NOLINT
TEST(PushPull, Multipart_MultiThreaded_ipc_zeromq) // NOLINT
{
RunMultiThreadedMultipart("shmem", "inproc://test_1");
RunMultiThreadedMultipart("zeromq", "ipc://test_Multipart_MultiThreaded_ipc_zeromq_1", false);
}

TEST(PushPull, Multipart_MT_ipc_zeromq) // NOLINT
TEST(PushPull, Multipart_MultiThreaded_ipc_shmem) // NOLINT
{
RunMultiThreadedMultipart("zeromq", "ipc://test_Multipart_MT_ipc_zeromq_1");
RunMultiThreadedMultipart("shmem", "ipc://test_Multipart_MultiThreaded_ipc_shmem_1", false);
}

TEST(PushPull, Multipart_MT_ipc_shmem) // NOLINT
TEST(PushPull, Multipart_MultiThreaded_ipc_shmem_expanded_metadata) // NOLINT
{
RunMultiThreadedMultipart("shmem", "ipc://test_Multipart_MT_ipc_shmem_1");
RunMultiThreadedMultipart("shmem", "ipc://test_Multipart_MultiThreaded_ipc_shmem_1", true);
}

} // namespace
Loading

0 comments on commit 25614e3

Please sign in to comment.