Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Add RAII to aggregator #596

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions include/oneapi/tbb/detail/_aggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include "_assert.h"
#include "_utils.h"
#include "_template_helpers.h"
#include <atomic>
#if !__TBBMALLOC_BUILD // TODO: check this macro with TBB Malloc
#include "../profiling.h"
Expand Down Expand Up @@ -114,8 +115,13 @@ class aggregator_generic {
// only one thread can possibly spin here at a time
spin_wait_until_eq(handler_busy, uintptr_t(0));
call_itt_notify(acquired, &handler_busy);

// acquire fence not necessary here due to causality rule and surrounding atomics
handler_busy.store(1, std::memory_order_relaxed);
auto handler_lock = make_raii_guard([&](){
// release the handler
handler_busy.store(0, std::memory_order_release);
});

// ITT note: &pending_operations tag covers access to the handler_busy flag
// itself. Capturing the state of the pending_operations signifies that
Expand All @@ -127,9 +133,6 @@ class aggregator_generic {

// handle all the operations
handle_operations(op_list);

// release the handler
handler_busy.store(0, std::memory_order_release);
}

// An atomically updated list (aka mailbox) of pending operations
Expand Down
42 changes: 42 additions & 0 deletions test/tbb/test_multifunction_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,42 @@ void test_ports_return_references() {
test_output_ports_return_ref(mf_node);
}

void test_exception_ligthweight_policy(){
using IndexerNodeType = oneapi::tbb::flow::indexer_node<int,int>;
using MultifunctionNodeType = oneapi::tbb::flow::multifunction_node<IndexerNodeType::output_type,
std::tuple<int>, oneapi::tbb::flow::lightweight>;

oneapi::tbb::flow::graph g;
auto inputNodeBody = [](oneapi::tbb::flow_control &){ return 1; };
auto multifunctionNodeBody = [](MultifunctionNodeType::input_type, MultifunctionNodeType::output_ports_type)
{
throw std::exception();
};

oneapi::tbb::flow::input_node<int> input1(g, inputNodeBody);
oneapi::tbb::flow::input_node<int> input2(g, inputNodeBody);

IndexerNodeType indexer(g);
MultifunctionNodeType multi(g, oneapi::tbb::flow::serial, multifunctionNodeBody);
oneapi::tbb::flow::make_edge(indexer, multi);
oneapi::tbb::flow::make_edge(input1, indexer);
oneapi::tbb::flow::make_edge(input2, oneapi::tbb::flow::input_port<1>(indexer));

input1.activate();
input2.activate();

bool catchException = false;
try
{
g.wait_for_all();
}
catch (const std::exception&)
{
catchException = true;
}
CHECK_MESSAGE( catchException, "The exception must be thrown from graph.wait_for_all()" );
}

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
#include <array>
#include <vector>
Expand Down Expand Up @@ -547,6 +583,12 @@ TEST_CASE("Lightweight testing"){
lightweight_testing::test<tbb::flow::multifunction_node>(10);
}

//! Test exception thrown in node with lightweight policy was rethrown by graph
//! \brief \ref error_guessing
TEST_CASE("Exception in lightweight node"){
test_exception_ligthweight_policy();
}

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
//! Test follows and precedes API
//! \brief \ref error_guessing
Expand Down