Skip to content

Commit

Permalink
coro channel (#533)
Browse files Browse the repository at this point in the history
  • Loading branch information
qicosmos authored Feb 29, 2024
1 parent fab9204 commit 9e2e7ef
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 0 deletions.
24 changes: 24 additions & 0 deletions include/cinatra/ylt/coro_io/coro_io.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#include <asio/connect.hpp>
#include <asio/dispatch.hpp>
#include <asio/experimental/channel.hpp>
#include <asio/ip/tcp.hpp>
#include <asio/read.hpp>
#include <asio/read_at.hpp>
Expand Down Expand Up @@ -332,6 +333,29 @@ post(Func func,
co_return co_await awaitor.await_resume(helper);
}

template <typename T>
async_simple::coro::Lazy<std::error_code> async_send(
asio::experimental::channel<void(std::error_code, T)> &channel, T val) {
callback_awaitor<std::error_code> awaitor;
co_return co_await awaitor.await_resume(
[&, val = std::move(val)](auto handler) {
channel.async_send({}, std::move(val), [handler](const auto &ec) {
handler.set_value_then_resume(ec);
});
});
}

template <typename R>
async_simple::coro::Lazy<std::pair<std::error_code, R>> async_receive(
asio::experimental::channel<void(std::error_code, R)> &channel) {
callback_awaitor<std::pair<std::error_code, R>> awaitor;
co_return co_await awaitor.await_resume([&](auto handler) {
channel.async_receive([handler](auto ec, auto val) {
handler.set_value_then_resume(std::make_pair(ec, std::move(val)));
});
});
}

template <typename Socket, typename AsioBuffer>
std::pair<asio::error_code, size_t> read_some(Socket &sock,
AsioBuffer &&buffer) {
Expand Down
21 changes: 21 additions & 0 deletions tests/test_cinatra.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,27 @@ TEST_CASE("test cinatra::string SSO to no SSO") {
CHECK(s == sum);
}

TEST_CASE("test coro channel") {
auto ctx = coro_io::get_global_block_executor()->get_asio_executor();
asio::experimental::channel<void(std::error_code, int)> ch(ctx, 10000);
auto ec = async_simple::coro::syncAwait(coro_io::async_send(ch, 41));
CHECK(!ec);
ec = async_simple::coro::syncAwait(coro_io::async_send(ch, 42));
CHECK(!ec);

std::error_code err;
int val;
std::tie(err, val) =
async_simple::coro::syncAwait(coro_io::async_receive<int>(ch));
CHECK(!err);
CHECK(val == 41);

std::tie(err, val) =
async_simple::coro::syncAwait(coro_io::async_receive<int>(ch));
CHECK(!err);
CHECK(val == 42);
}

async_simple::coro::Lazy<void> test_collect_all() {
asio::io_context ioc;
std::thread thd([&] {
Expand Down

0 comments on commit 9e2e7ef

Please sign in to comment.