Skip to content

Commit

Permalink
Catch exception in lightweight node (#623)
Browse files Browse the repository at this point in the history
* Add raii to aggregator to prevent stuck

Signed-off-by: Mishin, Ilya <[email protected]>

* Add test

Signed-off-by: Mishin, Ilya <[email protected]>

* Reuse tbb raii guard

Signed-off-by: Mishin, Ilya <[email protected]>

* Add header

Signed-off-by: Mishin, Ilya <[email protected]>

* execute_and_wait lightweight task

Signed-off-by: Mishin, Ilya <[email protected]>

* remove unused headers

Signed-off-by: Mishin, Ilya <[email protected]>

* remove space

Signed-off-by: Mishin, Ilya <[email protected]>

* remove std::function

Signed-off-by: Mishin, Ilya <[email protected]>

* noexcept lightweight body

Signed-off-by: Mishin, Ilya <[email protected]>

* fix cmake

Signed-off-by: Mishin, Ilya <[email protected]>

* fix space

Signed-off-by: Mishin, Ilya <[email protected]>

* gcc error

Signed-off-by: Mishin, Ilya <[email protected]>

* declval gateway_type

Signed-off-by: Mishin, Ilya <[email protected]>

* new tests

Signed-off-by: Mishin, Ilya <[email protected]>

* template tests

Signed-off-by: Mishin, Ilya <[email protected]>

* macro TBB_USE_EXCEPTIONS

Signed-off-by: Mishin, Ilya <[email protected]>

* Apply suggestions from code review

Co-authored-by: Aleksei Fedotov <[email protected]>

Co-authored-by: Aleksei Fedotov <[email protected]>
  • Loading branch information
Iliamish and aleksei-fedotov committed Dec 8, 2021
1 parent 4eec89f commit 324afd9
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 25 deletions.
16 changes: 10 additions & 6 deletions include/oneapi/tbb/detail/_flow_graph_node_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ class function_input_base : public receiver<Input>, no_assign {
static_assert(!has_policy<queueing, Policy>::value || !has_policy<rejecting, Policy>::value, "");

//! Constructor for function_input_base
function_input_base( graph &g, size_t max_concurrency, node_priority_t a_priority )
function_input_base( graph &g, size_t max_concurrency, node_priority_t a_priority, bool is_no_throw )
: my_graph_ref(g), my_max_concurrency(max_concurrency)
, my_concurrency(0), my_priority(a_priority)
, my_concurrency(0), my_priority(a_priority), my_is_no_throw(is_no_throw)
, my_queue(!has_policy<rejecting, Policy>::value ? new input_queue_type() : NULL)
, my_predecessors(this)
, forwarder_busy(false)
Expand All @@ -75,7 +75,7 @@ class function_input_base : public receiver<Input>, no_assign {

//! Copy constructor
function_input_base( const function_input_base& src )
: function_input_base(src.my_graph_ref, src.my_max_concurrency, src.my_priority) {}
: function_input_base(src.my_graph_ref, src.my_max_concurrency, src.my_priority, src.my_is_no_throw) {}

//! Destructor
// The queue is allocated by the constructor for {multi}function_node.
Expand All @@ -87,7 +87,10 @@ class function_input_base : public receiver<Input>, no_assign {
}

graph_task* try_put_task( const input_type& t) override {
return try_put_task_impl(t, has_policy<lightweight, Policy>());
if ( my_is_no_throw )
return try_put_task_impl(t, has_policy<lightweight, Policy>());
else
return try_put_task_impl(t, std::false_type());
}

//! Adds src to the list of cached predecessors.
Expand Down Expand Up @@ -121,6 +124,7 @@ class function_input_base : public receiver<Input>, no_assign {
const size_t my_max_concurrency;
size_t my_concurrency;
node_priority_t my_priority;
const bool my_is_no_throw;
input_queue_type *my_queue;
predecessor_cache<input_type, null_mutex > my_predecessors;

Expand Down Expand Up @@ -357,7 +361,7 @@ class function_input : public function_input_base<Input, Policy, A, function_inp
template<typename Body>
function_input(
graph &g, size_t max_concurrency, Body& body, node_priority_t a_priority )
: base_type(g, max_concurrency, a_priority)
: base_type(g, max_concurrency, a_priority, noexcept(body(input_type())))
, my_body( new function_body_leaf< input_type, output_type, Body>(body) )
, my_init_body( new function_body_leaf< input_type, output_type, Body>(body) ) {
}
Expand Down Expand Up @@ -492,7 +496,7 @@ class multifunction_input : public function_input_base<Input, Policy, A, multifu
// constructor
template<typename Body>
multifunction_input(graph &g, size_t max_concurrency,Body& body, node_priority_t a_priority )
: base_type(g, max_concurrency, a_priority)
: base_type(g, max_concurrency, a_priority, noexcept(body(input_type(), my_output_ports)))
, my_body( new multifunction_body_leaf<input_type, output_ports_type, Body>(body) )
, my_init_body( new multifunction_body_leaf<input_type, output_ports_type, Body>(body) )
, my_output_ports(init_output_ports<output_ports_type>::call(g, my_output_ports)){
Expand Down
8 changes: 4 additions & 4 deletions include/oneapi/tbb/flow_graph.h
Original file line number Diff line number Diff line change
Expand Up @@ -2820,21 +2820,21 @@ class async_body_base: no_assign {

template<typename Input, typename Ports, typename Gateway, typename Body>
class async_body: public async_body_base<Gateway> {
private:
Body my_body;

public:
typedef async_body_base<Gateway> base_type;
typedef Gateway gateway_type;

async_body(const Body &body, gateway_type *gateway)
: base_type(gateway), my_body(body) { }

void operator()( const Input &v, Ports & ) {
void operator()( const Input &v, Ports & ) noexcept(noexcept(my_body(v, std::declval<gateway_type&>()))) {
my_body(v, *this->my_gateway);
}

Body get_body() { return my_body; }

private:
Body my_body;
};

//! Implements async node
Expand Down
110 changes: 96 additions & 14 deletions test/common/graph_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "config.h"

#include "oneapi/tbb/flow_graph.h"
#include "oneapi/tbb/task.h"
#include "oneapi/tbb/null_rw_mutex.h"
#include "oneapi/tbb/concurrent_unordered_set.h"

Expand Down Expand Up @@ -688,7 +689,7 @@ class native_loop_body {
public:
native_loop_body(NodeType& node) : my_node(node) {}

void operator()(int) const {
void operator()(int) const noexcept {
std::thread::id this_id = std::this_thread::get_id();
my_node.try_put(this_id);
}
Expand All @@ -701,9 +702,9 @@ class concurrency_checker_body {
concurrency_checker_body() { g_body_count = 0; }

template<typename gateway_type>
void operator()(const std::thread::id& input, gateway_type&) { increase_and_check(input); }
void operator()(const std::thread::id& input, gateway_type&) noexcept { increase_and_check(input); }

output_tuple_type operator()(const std::thread::id& input) {
output_tuple_type operator()(const std::thread::id& input) noexcept {
increase_and_check(input);
return output_tuple_type();
}
Expand Down Expand Up @@ -740,7 +741,7 @@ class native_loop_limited_body {
public:
native_loop_limited_body(NodeType& node, utils::SpinBarrier& barrier):
my_node(node), my_barrier(barrier) {}
void operator()(int) const {
void operator()(int) const noexcept {
std::thread::id this_id = std::this_thread::get_id();
my_node.try_put(this_id);
if(!lightweight_work_processed) {
Expand All @@ -760,6 +761,7 @@ struct condition_predicate {
std::atomic<unsigned> g_lightweight_count;
std::atomic<unsigned> g_task_count;

template <bool NoExcept>
class limited_lightweight_checker_body {
public:
limited_lightweight_checker_body() {
Expand All @@ -770,10 +772,9 @@ class limited_lightweight_checker_body {
private:
void increase_and_check(const std::thread::id& /*input*/) {
++g_body_count;
// TODO revamp: in order not to rely on scheduler functionality anymore add
// __TBB_EXTRA_DEBUG for counting the number of tasks actually created by the flow graph,
// hence consider moving lightweight testing into whitebox test for the flow graph.
bool is_inside_task = false;/*tbb::task::self().state() == tbb::task::executing;*/

bool is_inside_task = oneapi::tbb::task::current_context() != nullptr;

if(is_inside_task) {
++g_task_count;
} else {
Expand All @@ -785,10 +786,10 @@ class limited_lightweight_checker_body {
}
public:
template<typename gateway_type>
void operator()(const std::thread::id& input, gateway_type&) {
void operator()(const std::thread::id& input, gateway_type&) noexcept(NoExcept) {
increase_and_check(input);
}
output_tuple_type operator()(const std::thread::id& input) {
output_tuple_type operator()(const std::thread::id& input) noexcept(NoExcept) {
increase_and_check(input);
return output_tuple_type();
}
Expand All @@ -799,32 +800,113 @@ void test_limited_lightweight_execution(unsigned N, unsigned concurrency) {
CHECK_MESSAGE(concurrency != tbb::flow::unlimited,
"Test for limited concurrency cannot be called with unlimited concurrency argument");
tbb::flow::graph g;
NodeType node(g, concurrency, limited_lightweight_checker_body());
NodeType node(g, concurrency, limited_lightweight_checker_body</*NoExcept*/true>());
// Execute first body as lightweight, then wait for all other threads to fill internal buffer.
// Then unblock the lightweight thread and check if other body executions are inside oneTBB task.
utils::SpinBarrier barrier(N - concurrency);
utils::NativeParallelFor(N, native_loop_limited_body<NodeType>(node, barrier));
g.wait_for_all();
CHECK_MESSAGE(g_body_count == N, "Body needs to be executed N times");
// TODO revamp: enable the following checks once whitebox flow graph testing is ready for it.
// CHECK_MESSAGE(g_lightweight_count == concurrency, "Body needs to be executed as lightweight once");
// CHECK_MESSAGE(g_task_count == N - concurrency, "Body needs to be executed as not lightweight N - 1 times");
CHECK_MESSAGE(g_lightweight_count == concurrency, "Body needs to be executed as lightweight once");
CHECK_MESSAGE(g_task_count == N - concurrency, "Body needs to be executed as not lightweight N - 1 times");
work_submitted = false;
lightweight_work_processed = false;
}

template<typename NodeType>
void test_limited_lightweight_execution_with_throwing_body(unsigned N, unsigned concurrency) {
CHECK_MESSAGE(concurrency != tbb::flow::unlimited,
"Test for limited concurrency cannot be called with unlimited concurrency argument");
tbb::flow::graph g;
NodeType node(g, concurrency, limited_lightweight_checker_body</*NoExcept*/false>());
// Body is no noexcept, in this case it must be executed as tasks, instead of lightweight execution
utils::SpinBarrier barrier(N);
utils::NativeParallelFor(N, native_loop_limited_body<NodeType>(node, barrier));
g.wait_for_all();
CHECK_MESSAGE(g_body_count == N, "Body needs to be executed N times");
CHECK_MESSAGE(g_lightweight_count == 0, "Body needs to be executed with queueing policy");
CHECK_MESSAGE(g_task_count == N, "Body needs to be executed as task N times");
work_submitted = false;
lightweight_work_processed = false;
}

template <int Threshold>
struct throwing_body{
std::atomic<int>& my_counter;

throwing_body(std::atomic<int>& counter) : my_counter(counter) {}

template<typename input_type, typename gateway_type>
void operator()(const input_type&, gateway_type&) {
++my_counter;
if(my_counter == Threshold)
throw Threshold;
}

template<typename input_type>
output_tuple_type operator()(const input_type&) {
++my_counter;
if(my_counter == Threshold)
throw Threshold;
return output_tuple_type();
}
};

#if TBB_USE_EXCEPTIONS
//! Test excesption thrown in node with lightweight policy was rethrown by graph
template<template<typename, typename, typename> class NodeType>
void test_exception_ligthweight_policy(){
std::atomic<int> counter {0};
constexpr int threshold = 10;

using IndexerNodeType = oneapi::tbb::flow::indexer_node<int, int>;
using FuncNodeType = NodeType<IndexerNodeType::output_type, output_tuple_type, tbb::flow::lightweight>;
oneapi::tbb::flow::graph g;

IndexerNodeType indexer(g);
FuncNodeType tested_node(g, oneapi::tbb::flow::serial, throwing_body<threshold>(counter));
oneapi::tbb::flow::make_edge(indexer, tested_node);

utils::NativeParallelFor( threshold * 2, [&](int i){
if(i % 2)
std::get<1>(indexer.input_ports()).try_put(1);
else
std::get<0>(indexer.input_ports()).try_put(0);
});

bool catchException = false;
try
{
g.wait_for_all();
}
catch (const int& exc)
{
catchException = true;
CHECK_MESSAGE( exc == threshold, "graph.wait_for_all() rethrow current exception" );
}
CHECK_MESSAGE( catchException, "The exception must be thrown from graph.wait_for_all()" );
CHECK_MESSAGE( counter == threshold, "Graph must cancel all tasks after exception" );
}
#endif /* TBB_USE_EXCEPTIONS */

template<typename NodeType>
void test_lightweight(unsigned N) {
test_unlimited_lightweight_execution<NodeType>(N);
test_limited_lightweight_execution<NodeType>(N, tbb::flow::serial);
test_limited_lightweight_execution<NodeType>(N, (std::min)(std::thread::hardware_concurrency() / 2, N/2));

test_limited_lightweight_execution_with_throwing_body<NodeType>(N, tbb::flow::serial);
}

template<template<typename, typename, typename> class NodeType>
void test(unsigned N) {
typedef std::thread::id input_type;
typedef NodeType<input_type, output_tuple_type, tbb::flow::queueing_lightweight> node_type;
test_lightweight<node_type>(N);

#if TBB_USE_EXCEPTIONS
test_exception_ligthweight_policy<NodeType>();
#endif /* TBB_USE_EXCEPTIONS */
}

} // namespace lightweight_testing
Expand Down
6 changes: 5 additions & 1 deletion test/tbb/test_flow_graph_priorities.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,11 @@ struct DeciderBody {

struct AsyncSubmissionBody {
AsyncActivity* my_activity;
void operator()(data_type input, async_node_type::gateway_type& gateway) {
// It is important that async_node in the test executes without spawning a TBB task, because
// it passes the work to asynchronous thread, which unlocks the barrier that is waited
// by every execution thread (asynchronous thread and any TBB worker or main thread).
// This is why async_node's body marked noexcept.
void operator()(data_type input, async_node_type::gateway_type& gateway) noexcept {
my_activity->submit(input, &gateway);
}
AsyncSubmissionBody(AsyncActivity* activity) : my_activity(activity) {}
Expand Down

0 comments on commit 324afd9

Please sign in to comment.