
TBB may not handle RAII struct very well in parallel_reduce #1251

Closed

LXYan2333 opened this issue Nov 9, 2023 · 9 comments

@LXYan2333 commented Nov 9, 2023

Hello, I have an OpenMP-parallelized program, and I want to migrate it to TBB.

Basically it creates 3 matrices on each thread and does a lot of calculation. At the end, all these matrices on the different threads are added up into 3 final matrices.

So I chose tbb::parallel_reduce to parallelize it:

struct res_struct {
    std::vector<double> a;
    std::vector<double> b;
    std::vector<double> c;
};

auto res = tbb::parallel_reduce(
        tbb::blocked_range{0, ij_end_index * (ij_end_index + 1) / 2, 1024},
        res_struct{std::vector<double>(m_size * m_size * batch_size, 0),
                   std::vector<double>(m_size * m_size * batch_size, 0),
                   std::vector<double>(m_size * m_size * batch_size, 0)},
        [&](const auto& range, res_struct res) {
            // do a lot work about res
            return res;
        },
        [&](res_struct res1, res_struct res2) {
            std::transform(res1.a.begin(),
                           res1.a.end(),
                           res2.a.begin(),
                           res1.a.begin(),
                           std::plus<double>());
            std::transform(res1.b.begin(),
                           res1.b.end(),
                           res2.b.begin(),
                           res1.b.begin(),
                           std::plus<double>());
            std::transform(res1.c.begin(),
                           res1.c.end(),
                           res2.c.begin(),
                           res1.c.begin(),
                           std::plus<double>());
            return res1;
        });

After migration, the program works as before, but the performance dropped. The run time rose from 23.125 s to 26.932 s.

I did some tests using VTune, and found that memory access is significantly higher than before.

OpenMP (23s)

[screenshot]

TBB parallel_reduce (26s)

[screenshot]

And in the threading analysis, there is a lot of spin time:

Spin time of TBB parallel_reduce

[screenshot]

I guess it might be caused by TBB copying these matrices (instead of moving them) in some cases, so I wrote a small demo:

#include <chrono>
#include <iostream>
#include <tbb/tbb.h>
#include <thread>

tbb::spin_mutex cout_mutex;

tbb::spin_mutex pig_count_mutex;
static int pig_count = 0;

struct Pig {

    int order;

    Pig() : order(pig_count) {
        tbb::spin_mutex::scoped_lock lock(pig_count_mutex);
        pig_count++;
        std::cout << "I'm the " << order << " pig, "
                  << "construct!" << std::endl;
    };

    Pig(const Pig& other) {
        {
            tbb::spin_mutex::scoped_lock lock(pig_count_mutex);
            order = pig_count;
            pig_count++;
        }
        std::cout << "I'm the " << order << " pig, "
                  << "copy construct from" << other.order << std::endl;
    };
    Pig(Pig&& other) noexcept {
        {
            tbb::spin_mutex::scoped_lock lock(pig_count_mutex);
            order = pig_count;
            pig_count++;
        }
        std::cout << "I'm the " << order << " pig, "
                  << "move construct from" << other.order << std::endl;
    };

    Pig& operator=(const Pig& other) {
        std::cout << "I'm the " << order << " pig, "
                  << "copy assignment from " << other.order << std::endl;
        return *this;
    };

    Pig& operator=(Pig&& other) noexcept {
        std::cout << "I'm the " << order << " pig, "
                  << "move assignment! from" << other.order << std::endl;
        return *this;
    };

    ~Pig() {
        std::cout << "I'm the " << order << " pig, "
                  << "destruct!" << std::endl;
    };

    void do_something() const {
        std::cout << "something is done to pig " << order << std::endl;
    }
};

int main() {
    {
        tbb::parallel_reduce(
            tbb::blocked_range<int>(0, 2),
            Pig{},
            [](const tbb::blocked_range<int>& range, Pig pig) {
                {
                    tbb::spin_mutex::scoped_lock lock(cout_mutex);
                    std::cout << "real body! range " << range.begin() << " "
                              << range.end() << std::endl;
                    pig.do_something();
                }
                std::this_thread::sleep_for(std::chrono::seconds(2));
                return pig;
            },
            [](Pig left, const Pig&) {
                {
                    tbb::spin_mutex::scoped_lock lock(cout_mutex);
                    std::cout << "reduction" << std::endl;
                    left.do_something();
                }
                std::this_thread::sleep_for(std::chrono::seconds(2));
                return left;
            });
    }
    std::cout << "reduce complete!" << std::endl;
}

run it:

$ taskset -c 0,1 ./pig
I'm the 0 pig, construct!
I'm the 1 pig, copy construct from0                  # why? since pig 1 is not used later, and is move-assigned from pig 5
I'm the 2 pig, copy construct from1
real body! range 0 1
something is done to pig 2
I'm the 3 pig, copy construct from0                 # why? pig 4 can directly copy/move from pig 0
I'm the 4 pig, copy construct from3                 # since it is the last reduce element, I guess it could be directly moved from pig 0?
real body! range 1 2
something is done to pig 4
I'm the 5 pig, move construct from2
I'm the 2 pig, destruct!
I'm the 1 pig, move assignment! from5
I'm the 5 pig, destruct!
I'm the 6 pig, move construct from4
I'm the 4 pig, destruct!
I'm the 3 pig, move assignment! from6
I'm the 6 pig, destruct!
I'm the 7 pig, copy construct from1                # why not move from pig 5?
reduction
something is done to pig 7
I'm the 8 pig, move construct from7
I'm the 7 pig, destruct!
I'm the 1 pig, move assignment! from8
I'm the 8 pig, destruct!
I'm the 3 pig, destruct!
I'm the 9 pig, copy construct from1              # why? just move pig 1
I'm the 1 pig, destruct!
I'm the 9 pig, destruct!
I'm the 0 pig, destruct!
reduce complete!

In the ideal case, we only need to copy Pig{} once so that the 2 starting threads can each have their own identity Pig{}. However, the pig is copied 6 times instead. This may lead to an observable performance penalty.

I guess I can always use TLS to address this issue.
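
For illustration, here is a minimal sketch of that TLS idea using tbb::enumerable_thread_specific, under the same assumptions about m_size, batch_size, and ij_end_index as in the snippet above (the per-range work is elided). Each thread lazily creates one res_struct, so no identity value is copied per task; only the final combine is serial:

    // One res_struct per thread, created lazily on first use.
    tbb::enumerable_thread_specific<res_struct> tls([&] {
        return res_struct{std::vector<double>(m_size * m_size * batch_size, 0),
                          std::vector<double>(m_size * m_size * batch_size, 0),
                          std::vector<double>(m_size * m_size * batch_size, 0)};
    });

    tbb::parallel_for(
        tbb::blocked_range{0, ij_end_index * (ij_end_index + 1) / 2, 1024},
        [&](const auto& range) {
            res_struct& res = tls.local();  // this thread's instance, never copied
            // do a lot of work about res over `range`
        });

    // Serial final combine: one accumulation per participating thread.
    res_struct total{std::vector<double>(m_size * m_size * batch_size, 0),
                     std::vector<double>(m_size * m_size * batch_size, 0),
                     std::vector<double>(m_size * m_size * batch_size, 0)};
    tls.combine_each([&](const res_struct& r) {
        std::transform(r.a.begin(), r.a.end(), total.a.begin(), total.a.begin(),
                       std::plus<double>());
        std::transform(r.b.begin(), r.b.end(), total.b.begin(), total.b.begin(),
                       std::plus<double>());
        std::transform(r.c.begin(), r.c.end(), total.c.begin(), total.c.begin(),
                       std::plus<double>());
    });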

@LXYan2333 (Author) commented Nov 10, 2023

To further test my guess, I tried to use a POD struct instead of std containers, and manage resource allocation and deallocation manually. The code looks like this:

struct res_struct {
    double* a = nullptr;
    double* b = nullptr;
    double* c = nullptr;
    bool is_initialized = false;
};
auto res = tbb::parallel_reduce(
        tbb::blocked_range{0, ij_end_index * (ij_end_index + 1) / 2, 1024},
        res_struct{},
        [&](const auto& range, res_struct res) {
            if (res.is_initialized == false) {
                res.a = (double*)malloc(sizeof(double) * m_size
                                                   * m_size * batch_size);
                res.b = (double*)malloc(sizeof(double) * m_size
                                                   * m_size * batch_size);
                res.c = (double*)malloc(sizeof(double) * m_size
                                                     * m_size * batch_size);
                memset(res.a,
                       0,
                       sizeof(double) * m_size * m_size * batch_size);
                memset(res.b,
                       0,
                       sizeof(double) * m_size * m_size * batch_size);
                memset(res.c,
                       0,
                       sizeof(double) * m_size * m_size * batch_size);
                res.is_initialized = true;
            }
            // do a lot work about res
            return res;
        },
        [&](res_struct res1, res_struct res2) {
            if (res1.is_initialized == false) {
                return res2;
            }
            if (res2.is_initialized == false) {
                return res1;
            }
            for (int m = 0; m < batch_size; m++) {
                for (int i = 0; i < m_size * m_size; i++) {
                    res1.a[m * m_size * m_size + i] +=
                        res2.a[m * m_size * m_size + i];
                    res1.b[m * m_size * m_size + i] +=
                        res2.b[m * m_size * m_size + i];
                    res1.c[m * m_size * m_size + i] +=
                        res2.c[m * m_size * m_size + i];
                }
            }
            free(res2.a);
            free(res2.b);
            free(res2.c);
            return res1;
        });

Now TBB performs better than OpenMP:

[screenshot]

Please fix it…

@LXYan2333 (Author) commented Nov 13, 2023

I found this in the documentation:

Use oneapi::tbb::parallel_reduce when the objects are inexpensive to construct. It works even if the reduction operation is not commutative.
Use oneapi::tbb::parallel_for and oneapi::tbb::combinable if the reduction operation is commutative and instances of the type are expensive.

However, the combine() function of the combinable class is still sequential, according to Pro TBB (and my own test). So what is the appropriate method to reduce large vectors in parallel?

@pavelkumbrasev (Contributor) commented:

Hi @LXYan2333, what is the performance with oneapi::tbb::parallel_for and oneapi::tbb::combinable?
Basically, the approach with pointers is the right workaround to avoid excessive copies.
Could you elaborate more on your use case? I see you didn't use static_partitioner in your example, which might result in more tasks than actual threads (and thus more copies).

@LXYan2333 (Author) commented Nov 13, 2023

Hello, after benchmarking several strategies, parallel_for with tbb::combinable performs best. However, the combine method of tbb::combinable is actually serial.

I used the ITT API and VTune to trace the whole process:

[screenshot]

The yellow task is the parallel parallel_for, and the blue task is the serial combine method.

Now my question is: is there any parallel way to reduce several large vectors/matrices stored on different threads?

  1. tbb::parallel_reduce can reduce them in parallel, but the performance is poor due to unnecessarily copying the vectors (instead of moving them).
  2. tbb::parallel_for + tbb::combinable performs better than tbb::parallel_reduce, but the reduction is serial.

@LXYan2333 (Author) commented:

BTW, directly using a vector as the reduction identity is actually an example in the book Pro TBB:

https://github.com/Apress/pro-TBB/blob/54f27a680e540ab66a3ffbe881d056904109feb4/ch05/fig_5_27.cpp#L69

on page 170.

@pavelkumbrasev (Contributor) commented:

I'm wondering if there is a chance to do the reduction on the original vector without creating a copy of the structure per thread, since that introduces a lot of overhead (just working with ranges).
You can also try to parallelize the reduction with ETS instead of tbb::combinable, since ETS can provide a range that might be used for parallelization.
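
For illustration, here is a rough sketch of one way to make the combine itself parallel with tbb::enumerable_thread_specific: accumulate into per-thread vectors first, then split the final combine over element indices so that each task sums a disjoint slice of every thread-local vector and needs no locking. The size n and the per-element work are placeholder assumptions, not taken from the original program:

    #include <cstddef>
    #include <tbb/tbb.h>
    #include <vector>

    int main() {
        constexpr std::size_t n = 8'000'000;  // placeholder size

        // Phase 1: each thread accumulates into its own vector, created lazily.
        tbb::enumerable_thread_specific<std::vector<double>> tls(
            [] { return std::vector<double>(n, 0.0); });

        tbb::parallel_for(tbb::blocked_range<std::size_t>(0, n),
                          [&](const auto& range) {
                              std::vector<double>& local = tls.local();
                              for (std::size_t i = range.begin(); i != range.end(); ++i)
                                  local[i] += 1.0;  // placeholder for real work
                          });

        // Phase 2: parallel combine. Each task owns a disjoint slice of `result`,
        // so it can read every thread-local vector without synchronization.
        std::vector<double> result(n, 0.0);
        tbb::parallel_for(tbb::blocked_range<std::size_t>(0, n),
                          [&](const auto& range) {
                              for (const std::vector<double>& local : tls)
                                  for (std::size_t i = range.begin(); i != range.end(); ++i)
                                      result[i] += local[i];
                          });
    }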

@LXYan2333 (Author) commented:

Maybe this should be natively and efficiently supported by TBB, just like in OpenMP?

https://github.com/OpenMP/Examples/blob/075683d57463d9251d483badd944e1a60e15192f/data_environment/sources/reduction.7.c#L22

@LXYan2333 (Author) commented:

Hi @LXYan2333, what is the performance with oneapi::tbb::parallel_for and oneapi::tbb::combinable? Basically, the approach with pointers is the right workaround to avoid excessive copies. Could you elaborate more on your use case? I see you didn't use static_partitioner in your example, which might result in more tasks than actual threads (and thus more copies).

Hello,

I wrote a demo:

#include <benchmark/benchmark.h>
#include <cmath>
#include <cstddef>
#include <iostream>
#include <random>
#include <tbb/tbb.h>
#include <valarray>

static constexpr size_t m_size = 8000000;

auto testset() {
    std::mt19937 gen(20240228);
    std::uniform_real_distribution<double> dist(0, 20.0);

    std::valarray<double> vec(m_size);

    for (size_t i = 0; i < m_size; ++i) {
        vec[i] = dist(gen);
    }

    return vec;
}

static void tbb_parallel_reduce(benchmark::State& state) {
    auto test_set = testset();
    for (auto _ : state) {
        std::valarray<double> res = tbb::parallel_reduce(
            tbb::blocked_range{0uz, m_size, 4},
            std::valarray<double>(m_size),
            [&](const auto& range, std::valarray<double> res) {
                for (size_t i = range.begin(); i < range.end(); ++i) {
                    res[i] += std::sin(std::pow(std::exp(test_set[i]), 8.0));
                }
                return res;
            },
            [&](std::valarray<double> res1, std::valarray<double> res2) {
                res1 += res2;
                return res1;
            });
        benchmark::DoNotOptimize(res);
    }
}

static void openmp_parallel_reduce(benchmark::State& state) {
    auto test_set = testset();
    for (auto _ : state) {
        std::valarray<double> res(m_size);
#pragma omp parallel
        {
            std::valarray<double> thread_private_res(m_size);
#pragma omp for schedule(dynamic, 4) nowait
            for (size_t i = 0; i < m_size; ++i) {
                thread_private_res[i] +=
                    std::sin(std::pow(std::exp(test_set[i]), 8.0));
            }
#pragma omp critical
            { res += thread_private_res; }
        }
        benchmark::DoNotOptimize(res);
    }
}

BENCHMARK(tbb_parallel_reduce);
BENCHMARK(openmp_parallel_reduce);

BENCHMARK_MAIN();

It creates a valarray on each thread and adds something to that valarray. Finally, all the valarrays are added into the res valarray.

I compiled it using clang 17 with the flags "-O3 -ffast-math -DNDEBUG -march=native" and ran the program on my own PC, getting this result:

Run on (8 X 4000 MHz CPU s)
CPU Caches:
  L1 Data 48 KiB (x4)
  L1 Instruction 32 KiB (x4)
  L2 Unified 1280 KiB (x4)
  L3 Unified 8192 KiB (x1)
Load Average: 1.73, 1.79, 1.08
***WARNING*** CPU scaling is enabled, the benchmark real time measurements may be noisy and will incur extra overhead.
-----------------------------------------------------------------
Benchmark                       Time             CPU   Iterations
-----------------------------------------------------------------
tbb_parallel_reduce    7081146946 ns   6209464864 ns            1
openmp_parallel_reduce  504056713 ns    503715576 ns            1

TBB is significantly slower than OpenMP.

I've checked the output and both versions produce the same result.

So the conclusion is: do not use anything that is expensive to copy as the identity within tbb::parallel_reduce? (I still feel this is a bug…)

Thank you @pavelkumbrasev for your help and advice.

@kboyarinov (Contributor) commented:

Hi @LXYan2333,
Sorry for replying after closing the issue.

Unfortunately, for the parallel_reduce algorithm, we have to copy the identity element into each task to ensure that each task has its own value to reduce into without any additional synchronization.
Regarding the unnecessary copies of the value, we have added a better rvalue-friendly API for parallel_reduce (see #1307). It is part of the master branch now and will be included (and documented) in one of the next releases of oneTBB.
For your use case, I would propose to rewrite it in this way:

struct res_struct {
    res_struct() = default;

    std::vector<double> a;
    std::vector<double> b;
    std::vector<double> c;
    
    bool empty() {
        return a.empty() && b.empty() && c.empty();
    }
};

auto res = tbb::parallel_reduce(
        tbb::blocked_range{0, ij_end_index * (ij_end_index + 1) / 2, 1024},
        res_struct{}, // using empty res_struct as inexpensive to copy

        [&](const auto& range, res_struct&& res) {
            if (res.empty()) {
                res.a.resize(m_size * m_size * batch_size, 0);
                // same for b and c
            }
            
            // do a lot of work about res
            // assuming res is rvalue
            return std::move(res);
        },         
        
        [&](res_struct&& res1, res_struct&& res2) {
            // combine res1 and res2
            // as rvalues
            return std::move(res1);
        });

If you are forced to use a previous TBB version, consider using the parallel_reduce interface with a complete Body structure (defining operator() and join) instead of the lambda-friendly interface. It is possible to achieve the same goal that way; see the sketch below.
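
For example, a minimal sketch of that Body-style interface, under the same assumptions about m_size, batch_size, and ij_end_index as in the earlier snippets (an illustration, not the issue's actual code): a freshly split body starts empty and allocates lazily, and join can move the right-hand body's buffers instead of copying them.

    struct ReduceBody {
        std::size_t n;  // elements per matrix, e.g. m_size * m_size * batch_size
        std::vector<double> a, b, c;

        explicit ReduceBody(std::size_t n_) : n(n_) {}
        // Splitting constructor: the new body starts empty but knows the size.
        ReduceBody(ReduceBody& other, tbb::split) : n(other.n) {}

        void operator()(const tbb::blocked_range<int>& range) {
            if (a.empty()) {
                a.assign(n, 0.0);
                b.assign(n, 0.0);
                c.assign(n, 0.0);
            }
            // do a lot of work about a, b, c over `range`
        }

        void join(ReduceBody& rhs) {
            if (rhs.a.empty()) return;      // nothing to add
            if (a.empty()) {                // steal rhs's buffers, no copy
                a = std::move(rhs.a);
                b = std::move(rhs.b);
                c = std::move(rhs.c);
                return;
            }
            std::transform(rhs.a.begin(), rhs.a.end(), a.begin(), a.begin(),
                           std::plus<double>());
            std::transform(rhs.b.begin(), rhs.b.end(), b.begin(), b.begin(),
                           std::plus<double>());
            std::transform(rhs.c.begin(), rhs.c.end(), c.begin(), c.begin(),
                           std::plus<double>());
        }
    };

    ReduceBody body(m_size * m_size * batch_size);
    tbb::parallel_reduce(
        tbb::blocked_range{0, ij_end_index * (ij_end_index + 1) / 2, 1024}, body);
    // body.a, body.b, body.c now hold the reduced matrices.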
In case of any further questions, don't hesitate to contact us again.
