From d25041e71719699f177cffcee0d9a27fe8379819 Mon Sep 17 00:00:00 2001 From: qicosmos Date: Wed, 3 Jul 2024 17:33:46 +0800 Subject: [PATCH] base --- include/cinatra/ylt/coro_io/coro_file.hpp | 412 +++++++++++++++++++++- tests/test_corofile.cpp | 69 +++- 2 files changed, 478 insertions(+), 3 deletions(-) diff --git a/include/cinatra/ylt/coro_io/coro_file.hpp b/include/cinatra/ylt/coro_io/coro_file.hpp index be35cb71..5f5b813e 100644 --- a/include/cinatra/ylt/coro_io/coro_file.hpp +++ b/include/cinatra/ylt/coro_io/coro_file.hpp @@ -22,7 +22,9 @@ #include #include #include +#include +#include "async_simple/coro/SyncAwait.h" #include "io_context_pool.hpp" #if defined(ENABLE_FILE_IO_URING) #include @@ -108,11 +110,417 @@ enum class read_type { pread, }; +enum class read_mode { seq, random }; + +enum class async_mode { native_async, thread_pool }; + +constexpr flags to_flags(std::ios::ios_base::openmode mode) { + flags access = flags::read_write; + + if ((mode & (std::ios::app)) != 0) + access = flags::append; + + if ((mode & (std::ios::trunc)) != 0) + access = flags::truncate; + + if ((mode & (std::ios::in | std::ios::out)) != 0) + access = flags::read_write; + else if ((mode & std::ios::in) != 0) + access = flags::read_only; + else if ((mode & std::ios::out) != 0) + access = flags::write_only; + + return access; +} + +#if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) +template +inline bool open_native_async_file(File &file, Executor &executor, + std::string_view filepath, + flags open_flags) { + if (file.is_open()) { + return true; + } + + try { + if constexpr (seq) { + file = asio::stream_file(executor.get_asio_executor(), filepath, + static_cast(open_flags)); + } + else { + file = asio::random_access_file( + executor.get_asio_executor(), filepath, + static_cast(open_flags)); + } + } catch (std::exception &ex) { + std::cout << "line " << __LINE__ << " coro_file open failed" << ex.what() + << "\n"; + return false; + } + + return true; +} +#endif + +enum class execution_type { native_async, thread_pool }; + +template +class seq_coro_file { + public: + seq_coro_file(coro_io::ExecutorWrapper<> *executor = + coro_io::get_global_block_executor()) + : seq_coro_file(executor->get_asio_executor()) {} + + seq_coro_file(asio::io_context::executor_type executor) + : executor_wrapper_(executor) {} + + bool open(std::string_view filepath, + std::ios::ios_base::openmode open_flags) { + if constexpr (execute_type == execution_type::thread_pool) { + return open_stream_file_in_pool(filepath, open_flags); + } + else { +#if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) + return open_native_async_file(async_seq_file_, executor_wrapper_, + filepath, to_flags(open_flags), true); +#else + return open_stream_file_in_pool(filepath, open_flags); +#endif + } + } + + async_simple::coro::Lazy> async_read( + char *buf, size_t size) { + if constexpr (execute_type == execution_type::thread_pool) { + co_return co_await async_read_write({buf, size}); + } + else { +#if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) + auto [ec, read_size] = co_await coro_io::async_read( + async_seq_file_, asio::buffer(buf, size)); + if (ec == asio::error::eof) { + eof_ = true; + co_return std::make_pair(std::error_code{}, read_size); + } + + co_return std::make_pair(std::error_code{}, read_size); +#else + co_return co_await async_read_write({buf, size}); +#endif + } + } + + template + async_simple::coro::Lazy> async_read_write( + std::span buf) { + auto result = co_await coro_io::post( + [this, buf] { + if constexpr (is_read) { + if (frw_seq_file_.read(buf.data(), buf.size())) { + return std::pair(std::error_code{}, + frw_seq_file_.tellg()); + } + } + else { + if (frw_seq_file_.write(buf.data(), buf.size())) { + return std::pair(std::error_code{}, + buf.size()); + } + } + + return std::pair( + std::make_error_code(std::errc::io_error), 0); + }, + &executor_wrapper_); + + co_return result.value(); + } + + async_simple::coro::Lazy> async_write( + std::string_view buf) { + if constexpr (execute_type == execution_type::thread_pool) { + co_return co_await async_read_write( + std::span(const_cast(buf.data()), buf.size())); + } + else { +#if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) + auto [ec, size] = + co_await coro_io::async_write(async_seq_file_, asio::buffer(buf)); + co_return std::make_pair(ec, size); +#else + co_return co_await async_read_write( + std::span(const_cast(buf.data()), buf.size())); +#endif + } + } + +#if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) + asio::stream_file &get_async_stream_file() { return async_seq_file_; } +#endif + + std::fstream &get_stream_file() { return frw_seq_file_; } + + bool is_open() { +#if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) + if (async_seq_file_.is_open()) { + return true; + } +#endif + return frw_seq_file_.is_open(); + } + + bool eof() { return eof_; } + + void close() { +#if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) + if (async_seq_file_.is_open()) { + std::error_code ec; + async_seq_file_.close(ec); + } +#endif + if (frw_seq_file_) { + frw_seq_file_.close(); + } + } + + bool is_in_thread_pool() { return frw_seq_file_.is_open(); } + + private: + bool open_stream_file_in_pool(std::string_view filepath, + std::ios::ios_base::openmode flags) { + if (frw_seq_file_.is_open()) { + return true; + } + auto coro_func = coro_io::post( + [this, flags, filepath] { + frw_seq_file_.open(filepath.data(), flags); + if (!frw_seq_file_.is_open()) { + std::cout << "line " << __LINE__ << " coro_file open failed " + << filepath << "\n"; + return false; + } + return true; + }, + &executor_wrapper_); + auto result = async_simple::coro::syncAwait(coro_func); + return result.value(); + } + + coro_io::ExecutorWrapper<> executor_wrapper_; +#if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) + asio::stream_file async_seq_file_; // seq +#endif + std::fstream frw_seq_file_; // fread/fwrite seq file + bool eof_ = false; +}; + +template +class random_coro_file { + public: + random_coro_file(coro_io::ExecutorWrapper<> *executor = + coro_io::get_global_block_executor()) + : random_coro_file(executor->get_asio_executor()) {} + + random_coro_file(asio::io_context::executor_type executor) + : executor_wrapper_(executor) {} + + bool open(std::string_view filepath, + std::ios::ios_base::openmode open_flags) { + if constexpr (execute_type == execution_type::thread_pool) { + return open_fd(filepath, to_flags(open_flags)); + } + else { +#if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) + return open_native_async_file(async_random_file_, + executor_wrapper_, filepath, + to_flags(open_flags), true); +#else + return open_fd(filepath, to_flags(open_flags)); +#endif + } + } + + async_simple::coro::Lazy> async_read_at( + uint64_t offset, char *buf, size_t size) { + if constexpr (execute_type == execution_type::thread_pool) { + co_return co_await async_pread(offset, buf, size); + } + else { +#if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) + auto [ec, read_size] = co_await coro_io::async_read_at( + offset, async_random_file_, asio::buffer(data, size)); + + if (ec == asio::error::eof) { + eof_ = true; + co_return std::make_pair(std::error_code{}, read_size); + } + + co_return std::make_pair(std::error_code{}, read_size); +#else + co_return co_await async_pread(offset, buf, size); +#endif + } + } + + async_simple::coro::Lazy> async_write_at( + uint64_t offset, std::string_view buf) { + if constexpr (execute_type == execution_type::thread_pool) { + co_return co_await async_pwrite(offset, buf.data(), buf.size()); + } + else { +#if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) + auto [ec, write_size] = co_await coro_io::async_write_at( + offset, async_random_file_, asio::buffer(buf)); + + co_return std::make_pair(ec, write_size); +#else + co_return co_await async_pwrite(offset, buf.data(), buf.size()); +#endif + } + } + +#if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) + asio::random_access_file &get_async_stream_file() { + return async_random_file_; + } +#endif + + std::shared_ptr get_pread_file() { return prw_random_file_; } + + bool is_open() { +#if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) + if (async_seq_file_.is_open()) { + return true; + } +#endif + return prw_random_file_ != nullptr; + } + + bool eof() { return eof_; } + + bool is_in_thread_pool() { return prw_random_file_ != nullptr; } + + void close() { +#if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) + std::error_code ec; + async_seq_file_.close(ec); +#endif + prw_random_file_ = nullptr; + } + + private: + bool open_fd(std::string_view filepath, flags open_flags) { + if (prw_random_file_) { + return true; + } + +#if defined(ASIO_WINDOWS) + int fd = _open(filepath.data(), adjust_flags(open_flags)); +#else + int fd = ::open(filepath.data(), open_flags); +#endif + if (fd < 0) { + return false; + } + + prw_random_file_ = std::shared_ptr(new int(fd), [](int *ptr) { +#if defined(ASIO_WINDOWS) + _close(*ptr); +#else + ::close(*ptr); +#endif + delete ptr; + }); + return true; + } + + async_simple::coro::Lazy> async_pread( + size_t offset, char *data, size_t size) { +#if defined(ASIO_WINDOWS) + auto pread = [](int fd, void *buf, uint64_t count, + uint64_t offset) -> int64_t { + DWORD bytes_read = 0; + OVERLAPPED overlapped; + memset(&overlapped, 0, sizeof(OVERLAPPED)); + overlapped.Offset = offset & 0xFFFFFFFF; + overlapped.OffsetHigh = (offset >> 32) & 0xFFFFFFFF; + + BOOL ok = ReadFile(reinterpret_cast(_get_osfhandle(fd)), buf, + count, &bytes_read, &overlapped); + if (!ok && (errno = GetLastError()) != ERROR_HANDLE_EOF) { + return -1; + } + + co_return bytes_read; + }; +#endif + co_return co_await async_prw(pread, true, offset, data, size); + } + + async_simple::coro::Lazy> async_pwrite( + size_t offset, const char *data, size_t size) { +#if defined(ASIO_WINDOWS) + auto pwrite = [](int fd, const void *buf, uint64_t count, + uint64_t offset) -> int64_t { + DWORD bytes_write = 0; + OVERLAPPED overlapped; + memset(&overlapped, 0, sizeof(OVERLAPPED)); + overlapped.Offset = offset & 0xFFFFFFFF; + overlapped.OffsetHigh = (offset >> 32) & 0xFFFFFFFF; + + BOOL ok = WriteFile(reinterpret_cast(_get_osfhandle(fd)), buf, + count, &bytes_write, &overlapped); + if (!ok) { + return -1; + } + + return bytes_write; + }; +#endif + co_return co_await async_prw(pwrite, false, offset, (char *)data, size); + } + + async_simple::coro::Lazy> async_prw( + auto io_func, bool is_read, size_t offset, char *buf, size_t size) { + std::function func = [=, this] { + int fd = *prw_random_file_; + return io_func(fd, buf, size, offset); + }; + + std::error_code ec{}; + size_t op_size = 0; + + auto len_val = co_await coro_io::post(std::move(func), &executor_wrapper_); + int len = len_val.value(); + if (len == 0) { + if (is_read) { + eof_ = true; + } + } + else if (len > 0) { + op_size = len; + } + else { + ec = std::make_error_code(std::errc::io_error); + op_size = len; + } + + co_return std::make_pair(ec, op_size); + } + + coro_io::ExecutorWrapper<> executor_wrapper_; +#if defined(ENABLE_FILE_IO_URING) + asio::random_access_file async_random_file_; // random file +#endif + std::shared_ptr prw_random_file_ = nullptr; // pread/pwrite random file + bool eof_ = false; +}; + class coro_file { public: #if defined(ENABLE_FILE_IO_URING) - coro_file( - coro_io::ExecutorWrapper<> *executor = coro_io::get_global_executor()) + coro_file(coro_io::ExecutorWrapper<> *executor = + coro_io::get_global_block_executor()) : coro_file(executor->get_asio_executor()) {} coro_file(asio::io_context::executor_type executor) diff --git a/tests/test_corofile.cpp b/tests/test_corofile.cpp index 02fda594..a0997284 100644 --- a/tests/test_corofile.cpp +++ b/tests/test_corofile.cpp @@ -69,9 +69,76 @@ void create_files(const std::vector &files, size_t file_size) { } } +template +void test_random_read_write(std::string_view filename) { + create_files({std::string(filename)}, 190); + coro_io::random_coro_file file{}; + file.open(filename, std::ios::binary | std::ios::in | std::ios::out); + CHECK(file.is_open()); + + char buf[100]; + auto pair = async_simple::coro::syncAwait(file.async_read_at(0, buf, 10)); + CHECK(std::string_view(buf, pair.second) == "AAAAAAAAAA"); + CHECK(!file.eof()); + + pair = async_simple::coro::syncAwait(file.async_read_at(10, buf, 100)); + CHECK(!file.eof()); + CHECK(pair.second == 100); + + pair = async_simple::coro::syncAwait(file.async_read_at(110, buf, 100)); + CHECK(pair.second == 80); + + // only read size equal 0 is eof. + pair = async_simple::coro::syncAwait(file.async_read_at(200, buf, 100)); + CHECK(file.eof()); + CHECK(pair.second == 0); + + coro_io::random_coro_file file1; + file1.open(filename, std::ios::binary | std::ios::in | std::ios::out); + CHECK(file1.is_open()); + std::string buf1 = "cccccccccc"; + auto [ec, size] = + async_simple::coro::syncAwait(file1.async_write_at(0, buf1)); + CHECK(!ec); + + std::string buf2 = "dddddddddd"; + auto [ec2, sz] = + async_simple::coro::syncAwait(file1.async_write_at(10, buf2)); + CHECK(!ec2); +} + +template +void test_seq_read_write(std::string_view filename) { + create_files({std::string(filename)}, 190); + coro_io::seq_coro_file file; + file.open(filename, std::ios::binary | std::ios::in | std::ios::out); + CHECK(file.get_stream_file().is_open()); + char buf[100]; + std::error_code ec; + size_t size; + std::tie(ec, size) = async_simple::coro::syncAwait(file.async_read(buf, 10)); + CHECK(size == 10); + + std::string str = "test"; + std::tie(ec, size) = async_simple::coro::syncAwait(file.async_write(str)); + std::cout << ec.message() << "\n"; + CHECK(size == 4); +} + +TEST_CASE("test seq and random") { + std::string filename = "validate.tmp"; + { + test_random_read_write(filename); + test_random_read_write(filename); + } + { + test_seq_read_write(filename); + test_seq_read_write(filename); + } +} + TEST_CASE("validate corofile") { std::string filename = "validate.tmp"; - create_files({filename}, 190); { coro_io::coro_file file{}; async_simple::coro::syncAwait(file.async_open(