Skip to content

Commit

Permalink
select coro
Browse files Browse the repository at this point in the history
  • Loading branch information
qicosmos committed Mar 16, 2024
1 parent 3dbb380 commit 0006d7b
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 97 deletions.
74 changes: 4 additions & 70 deletions include/cinatra/ylt/coro_io/coro_io.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -376,76 +376,10 @@ async_simple::coro::Lazy<std::pair<
});
}

template <typename... Channel>
class select_t {
public:
select_t(Channel &...channels)
: coros_(std::make_tuple(get_lazy(channels)...)) {}

select_t(Channel &&...channels)
: coros_(std::make_tuple(get_lazy(channels)...)){};

template <typename... Func>
async_simple::coro::Lazy<void> on_recieve(Func... fn) {
std::tuple<Func...> tuple(std::move(fn)...);
helper<std::tuple<Func...>> helper{std::move(tuple)};
co_await std::apply(helper, coros_);
}

private:
template <typename T>
auto get_lazy(T &t) {
using U = std::decay_t<T>;
if constexpr (util::is_specialization_v<U, coro_channel>) {
return std::move(async_receive(t));
}
else {
return std::move(t);
}
}

template <typename Tuple>
struct helper {
template <size_t Idx>
void call(auto &fn, auto &result) {
using ValueType = std::remove_cvref_t<decltype(std::get<Idx>(result))>;
using Inner =
std::remove_cvref_t<decltype(std::declval<ValueType>().value())>;
if constexpr (std::is_same_v<Inner, async_simple::Try<void>> ||
std::is_void_v<Inner>) {
fn();
}
else {
fn(std::move(std::get<Idx>(result).value()));
}
}

template <class Result, std::size_t... Is>
void tuple_switch_impl(std::size_t i, Tuple &t, Result &result,
std::index_sequence<Is...>) {
((void)(i == Is && (call<Is>(std::get<Is>(t), result), false)), ...);
}

template <class Result>
void tuple_switch(std::size_t i, Tuple &t, Result &result) {
tuple_switch_impl(i, t, result,
std::make_index_sequence<std::tuple_size_v<Tuple>>{});
}

async_simple::coro::Lazy<void> operator()(auto &&...tests) {
auto result =
co_await async_simple::coro::collectAny(std::move(tests)...);

tuple_switch(result.index(), tuple_, result);
}

Tuple tuple_;
};

std::tuple<async_simple::coro::Lazy<
typename std::remove_reference_t<Channel>::ValueType>...>
coros_;
};
template <typename... T>
auto select(T &&...args) {
return async_simple::coro::collectAny(std::forward<T>(args)...);
}

template <typename Socket, typename AsioBuffer>
std::pair<asio::error_code, size_t> read_some(Socket &sock,
Expand Down
103 changes: 76 additions & 27 deletions tests/test_cinatra.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,45 +209,94 @@ TEST_CASE("test coro channel") {

async_simple::coro::Lazy<void> test_select_channel() {
using namespace coro_io;
using namespace async_simple;
using namespace async_simple::coro;

auto ch1 = coro_io::create_channel<int>(1000);
auto ch2 = coro_io::create_channel<int>(1000);

co_await async_send(ch1, 41);
co_await async_send(ch2, 42);

std::array<int, 2> arr{41, 42};
int val;

size_t index =
co_await select(std::pair{async_receive(ch1),
[&val](auto value) {
auto [ec, r] = value.value();
val = r;
}},
std::pair{async_receive(ch2), [&val](auto value) {
auto [ec, r] = value.value();
val = r;
}});

CHECK(val == arr[index]);

co_await async_send(ch1, 41);
co_await async_send(ch2, 42);

co_await select_t(ch1, ch2).on_recieve(
[](std::pair<std::error_code, int> val) {
std::cout << val.second << "\n";
},
[](std::pair<std::error_code, int> val) {
std::cout << val.second << "\n";
});
std::vector<Lazy<std::pair<std::error_code, int>>> vec;
vec.push_back(async_receive(ch1));
vec.push_back(async_receive(ch2));

index = co_await select(std::move(vec), [&](size_t i, auto result) {
val = result.value().second;
});
CHECK(val == arr[index]);

period_timer timer1(coro_io::get_global_executor());
timer1.expires_after(100ms);
period_timer timer2(coro_io::get_global_executor());
timer2.expires_after(200ms);

co_await select_t(timer1.async_await(), timer2.async_await())
.on_recieve(
[](bool) {
std::cout << "timer1\n";
},
[](bool) {
std::cout << "timer2\n";
});

co_await select_t(coro_io::post([] {
}),
coro_io::post([] {
}))
.on_recieve(
[]() {
std::cout << "post1\n";
},
[]() {
std::cout << "post2\n";
});
int val1;
index = co_await select(std::pair{timer1.async_await(),
[&](auto val) {
CHECK(val.value());
val1 = 0;
}},
std::pair{timer2.async_await(), [&](auto val) {
CHECK(val.value());
val1 = 0;
}});
CHECK(index == val1);

int val2;
index = co_await select(std::pair{coro_io::post([] {
}),
[&](auto) {
std::cout << "post1\n";
val2 = 0;
}},
std::pair{coro_io::post([] {
}),
[&](auto) {
std::cout << "post2\n";
val2 = 1;
}});
CHECK(index == val2);

co_await async_send(ch1, 43);
auto lazy = coro_io::post([] {
});

int val3 = -1;
index = co_await select(std::pair{async_receive(ch1),
[&](auto result) {
val3 = result.value().second;
}},
std::pair{std::move(lazy), [&](auto) {
val3 = 0;
}});

if (index == 0) {
CHECK(val3 == 43);
}
else if (index == 1) {
CHECK(val3 == 0);
}
}

TEST_CASE("test select coro channel") {
Expand Down

0 comments on commit 0006d7b

Please sign in to comment.