Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(task): add shared_task #76

Merged
merged 8 commits into from
Sep 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions example/multi_fan_out.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#include <co_context/co/when_all.hpp>
#include <co_context/lazy_io.hpp>
#include <co_context/shared_task.hpp>
#include <co_context/utility/mpl.hpp>
#include <iostream>

using namespace co_context;

shared_task<std::string> fan_out() {
std::cout << "fan_out(): start\n";
co_await timeout(std::chrono::seconds{1});
std::cout << "fan_out(): done\n";
co_return "string from fan_out()";
}

task<size_t> mapped_task(shared_task<std::string> dependency) {
auto result = co_await dependency;
std::cout << "post_task(): " << result << "\n";
co_return result.size();
}

template<typename T>
task<void> reduce_task(task<T> all_task) {
auto results = co_await all_task;
constexpr size_t n = std::tuple_size_v<T>;
size_t sum_size = 0;
mpl::static_for<0, n>([&](auto i) { sum_size += std::get<i>(results); });
std::cout << "reduce_task(): total bytes: " << sum_size << "\n";
}

int main() {
io_context ctx;
{
auto f = fan_out();
auto maps = all(mapped_task(f), mapped_task(f), mapped_task(f));
auto reduce = reduce_task(std::move(maps));
ctx.co_spawn(std::move(reduce));
}
ctx.start();
ctx.join();
return 0;
}
6 changes: 5 additions & 1 deletion include/co_context/co/mutex.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ class mutex final {
public:
explicit lock_awaiter(mutex &mtx) noexcept
: mtx(mtx)
, resume_ctx(detail::this_thread.ctx) {}
, resume_ctx(detail::this_thread.ctx) {
assert(
resume_ctx != nullptr && "locking mutex without an io_context"
);
}

static constexpr bool await_ready() noexcept { return false; }

Expand Down
10 changes: 5 additions & 5 deletions include/co_context/co/when_all.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ template<typename... Ts>
using to_all_meta_t = typename clear_void_t<Ts...>::template to<all_meta>;

template<safety is_thread_safe, size_t idx, typename... Ts>
task<void> evaluate_to(
task<void> all_evaluate_to(
to_all_meta_t<Ts...> &meta, task<mpl::select_t<idx, Ts...>> &&node
) {
using node_return_type = mpl::select_t<idx, Ts...>;
Expand Down Expand Up @@ -107,17 +107,17 @@ task<void> evaluate_to(
namespace co_context {

template<safety is_thread_safe = safety::safe, typename... Ts>
task<detail::tuple_or_void<Ts...>> all(task<Ts> &&...node) {
task<detail::tuple_or_void<Ts...>> all(task<Ts>... node) {
constexpr size_t n = sizeof...(Ts);
static_assert(n >= 2, "too few tasks for `all(...)`");

using meta_type = detail::to_all_meta_t<Ts...>;
meta_type meta{co_await lazy::who_am_i(), n};

auto spawn_all = [&]<size_t... idx>(std::index_sequence<idx...>) {
(...,
co_spawn(evaluate_to<is_thread_safe, idx, Ts...>(meta, std::move(node))
));
(..., co_spawn(all_evaluate_to<is_thread_safe, idx, Ts...>(
meta, std::move(node)
)));
};

if constexpr (is_thread_safe) {
Expand Down
4 changes: 2 additions & 2 deletions include/co_context/co/when_any.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ task<void> any_evaluate_to(
namespace co_context {

template<safety is_thread_safe = safety::safe, typename... Ts>
task<detail::any_return_type<Ts...>> any(task<Ts> &&...node) {
task<detail::any_return_type<Ts...>> any(task<Ts>... node) {
constexpr uint32_t n = sizeof...(Ts);
static_assert(n >= 2, "too few tasks for `any(...)`");

Expand Down Expand Up @@ -266,7 +266,7 @@ namespace co_context {

template<safety is_thread_safe = safety::safe, typename... Ts>
task<detail::some_return_type<Ts...>>
some(uint32_t min_complete, task<Ts> &&...node) {
some(uint32_t min_complete, task<Ts>... node) {
constexpr uint32_t n = sizeof...(Ts);
static_assert(n >= 2, "too few tasks for `some(...)`");
assert(n >= min_complete && "too few tasks for `some(...)`");
Expand Down
6 changes: 3 additions & 3 deletions include/co_context/detail/lazy_io_awaiter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -877,16 +877,16 @@ class lazy_resume_on {
static constexpr bool await_ready() noexcept { return false; }

void await_suspend(std::coroutine_handle<> current) noexcept {
resume_ctx.worker.co_spawn_auto(current);
resume_ctx->worker.co_spawn_auto(current);
}

constexpr void await_resume() const noexcept {}

explicit lazy_resume_on(co_context::io_context &resume_context) noexcept
: resume_ctx(resume_context) {}
: resume_ctx(&resume_context) {}

private:
co_context::io_context &resume_ctx;
co_context::io_context *resume_ctx;
};

/****************************
Expand Down
23 changes: 23 additions & 0 deletions include/co_context/detail/type_traits.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#pragma once

#include <type_traits> // IWYU pragma: export

namespace co_context::detail {

template<typename T>
struct remove_rvalue_reference {
using type = T;
};

template<typename T>
struct remove_rvalue_reference<T &&> {
using type = T;
};

template<typename T>
using remove_rvalue_reference_t = typename remove_rvalue_reference<T>::type;

template<typename Awaiter>
using get_awaiter_result_t = decltype(std::declval<Awaiter>().await_resume());

} // namespace co_context::detail
3 changes: 2 additions & 1 deletion include/co_context/detail/worker_meta.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@

#include <coroutine>
#include <cstdint>
#if CO_CONTEXT_IS_USING_EVENTFD
#include <mutex>
#include <queue>
#include <thread>
#endif

#if CO_CONTEXT_IS_USING_EVENTFD
#include <sys/eventfd.h>
Expand Down
7 changes: 6 additions & 1 deletion include/co_context/io_context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,16 @@
#include <uring/uring.hpp>

#include <cstdint>
#include <queue>
#include <sys/types.h>
#include <thread>

namespace co_context {

namespace detail {
class lazy_resume_on;
class shared_task_promise_base;
} // namespace detail

using config::cache_line_size;

class [[nodiscard]] io_context final {
Expand Down Expand Up @@ -135,6 +139,7 @@ class [[nodiscard]] io_context final {
friend class co_context::condition_variable;
friend class co_context::counting_semaphore;
friend class co_context::detail::lazy_resume_on;
friend class co_context::detail::shared_task_promise_base;
}; // class io_context

inline void io_context::co_spawn(task<void> &&entrance) noexcept {
Expand Down
Loading
Loading