From 5ec1fa4bb1e78de157f75246456a63baefce0562 Mon Sep 17 00:00:00 2001 From: qicosmos Date: Tue, 14 May 2024 17:28:02 +0800 Subject: [PATCH] some improve --- include/cinatra/ylt/coro_io/coro_io.hpp | 30 +++++++++++++- tests/test_cinatra.cpp | 54 +++++++++++++++++++++++++ 2 files changed, 82 insertions(+), 2 deletions(-) diff --git a/include/cinatra/ylt/coro_io/coro_io.hpp b/include/cinatra/ylt/coro_io/coro_io.hpp index dbc22f04..54f5957a 100644 --- a/include/cinatra/ylt/coro_io/coro_io.hpp +++ b/include/cinatra/ylt/coro_io/coro_io.hpp @@ -350,19 +350,37 @@ struct coro_channel using return_type = R; using ValueType = std::pair; using asio::experimental::channel::channel; + coro_channel(coro_io::ExecutorWrapper<> *executor, size_t capacity) + : executor_(executor), + asio::experimental::channel( + executor->get_asio_executor(), capacity) {} + auto get_executor() { return executor_; } + + private: + coro_io::ExecutorWrapper<> *executor_; }; template inline coro_channel create_channel( size_t capacity, - asio::io_context::executor_type executor = - coro_io::get_global_block_executor()->get_asio_executor()) { + coro_io::ExecutorWrapper<> *executor = coro_io::get_global_executor()) { return coro_channel(executor, capacity); } +template +inline auto create_shared_channel( + size_t capacity, + coro_io::ExecutorWrapper<> *executor = coro_io::get_global_executor()) { + return std::make_shared>(executor, capacity); +} + template inline async_simple::coro::Lazy async_send( asio::experimental::channel &channel, T val) { + bool r = channel.try_send(std::error_code{}, val); + if (r) { + co_return std::error_code{}; + } callback_awaitor awaitor; co_return co_await awaitor.await_resume( [&, val = std::move(val)](auto handler) { @@ -376,6 +394,14 @@ template async_simple::coro::Lazy> inline async_receive(Channel &channel) { + using value_type = typename Channel::return_type; + value_type val; + bool r = channel.try_receive([&val](std::error_code, value_type result) { + val = result; + }); + if (r) { + co_return std::make_pair(std::error_code{}, val); + } callback_awaitor> awaitor; co_return co_await awaitor.await_resume([&](auto handler) { diff --git a/tests/test_cinatra.cpp b/tests/test_cinatra.cpp index 6a3dcf81..45d623eb 100644 --- a/tests/test_cinatra.cpp +++ b/tests/test_cinatra.cpp @@ -187,7 +187,61 @@ TEST_CASE("test cinatra::string SSO to no SSO") { CHECK(s == sum); } +async_simple::coro::Lazy send_data(auto &ch, size_t count) { + for (int i = 0; i < count; i++) { + co_await coro_io::async_send(ch, i); + } +} + +async_simple::coro::Lazy recieve_data(auto &ch, auto &vec, size_t count) { + while (true) { + if (vec.size() == count) { + std::cout << std::this_thread::get_id() << "\n"; + break; + } + + auto [ec, i] = co_await coro_io::async_receive(ch); + vec.push_back(i); + } +} + +TEST_CASE("test coro channel with multi thread") { + size_t count = 10000; + auto ch = coro_io::create_channel(count); + send_data(ch, count).via(ch.get_executor()).start([](auto &&) { + }); + + std::vector vec; + std::vector group; + for (int i = 0; i < 10; i++) { + group.emplace_back(std::thread([&]() { + async_simple::coro::syncAwait( + recieve_data(ch, vec, count).via(ch.get_executor())); + })); + } + for (auto &thd : group) { + thd.join(); + } + + for (int i = 0; i < count; i++) { + CHECK(vec.at(i) == i); + } +} + TEST_CASE("test coro channel") { + { + auto ch = coro_io::create_shared_channel(100); + auto ec = async_simple::coro::syncAwait( + coro_io::async_send(*ch, std::string("test"))); + CHECK(!ec); + + std::string val; + std::error_code err; + std::tie(err, val) = + async_simple::coro::syncAwait(coro_io::async_receive(*ch)); + CHECK(!err); + CHECK(val == "test"); + } auto ch = coro_io::create_channel(1000); auto ec = async_simple::coro::syncAwait(coro_io::async_send(ch, 41)); CHECK(!ec);