diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 2b9f4ef..c9faf55 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -22,6 +22,8 @@ set(FrameworkProtoFiles ${CMAKE_SOURCE_DIR}/src/tateyama/proto/framework/request.proto ${CMAKE_SOURCE_DIR}/src/tateyama/proto/framework/response.proto ${CMAKE_SOURCE_DIR}/src/tateyama/proto/diagnostics.proto + ${CMAKE_SOURCE_DIR}/src/tateyama/proto/core/request.proto + ${CMAKE_SOURCE_DIR}/src/tateyama/proto/core/response.proto ${CMAKE_SOURCE_DIR}/src/tateyama/proto/endpoint/request.proto ${CMAKE_SOURCE_DIR}/src/tateyama/proto/endpoint/response.proto ${CMAKE_SOURCE_DIR}/src/tateyama/proto/auth/request.proto diff --git a/src/ogawayama/transport/transport.h b/src/ogawayama/transport/transport.h index 0ae6fda..2591223 100644 --- a/src/ogawayama/transport/transport.h +++ b/src/ogawayama/transport/transport.h @@ -26,6 +26,8 @@ #include #include #include +#include +#include #include #include @@ -45,13 +47,17 @@ constexpr static tateyama::common::wire::message_header::index_type OPT_INDEX = class transport { constexpr static std::size_t HEADER_MESSAGE_VERSION_MAJOR = 0; constexpr static std::size_t HEADER_MESSAGE_VERSION_MINOR = 0; + constexpr static std::size_t CORE_MESSAGE_VERSION_MAJOR = 0; + constexpr static std::size_t CORE_MESSAGE_VERSION_MINOR = 0; constexpr static std::size_t SQL_MESSAGE_VERSION_MAJOR = 1; constexpr static std::size_t SQL_MESSAGE_VERSION_MINOR = 0; constexpr static std::size_t ENDPOINT_MESSAGE_VERSION_MAJOR = 0; constexpr static std::size_t ENDPOINT_MESSAGE_VERSION_MINOR = 0; + constexpr static std::uint32_t SERVICE_ID_ROUTING = 0; // from tateyama/framework/component_ids.h constexpr static std::uint32_t SERVICE_ID_ENDPOINT_BROKER = 1; // from tateyama/framework/component_ids.h constexpr static std::uint32_t SERVICE_ID_SQL = 3; // from tateyama/framework/component_ids.h constexpr static std::uint32_t SERVICE_ID_FDW = 4; // from tateyama/framework/component_ids.h + constexpr static std::uint32_t MAX_EXPIRATION_TIME = (24 * 60 * 60 * 1000); public: transport() = delete; @@ -70,6 +76,13 @@ class transport { if (handshake_response.value().result_case() != tateyama::proto::endpoint::response::Handshake::ResultCase::kSuccess) { throw std::runtime_error("handshake error"); } + auto update_expiration_time_response = update_expiration_time(); + if (!update_expiration_time_response) { + throw std::runtime_error("update_expiration_time error"); + } + if (update_expiration_time_response.value().result_case() != tateyama::proto::core::response::UpdateExpirationTime::ResultCase::kSuccess) { + throw std::runtime_error("update_expiration_time error"); + } } /** @@ -323,6 +336,26 @@ class transport { return receive(index); } + template + std::optional send(::tateyama::proto::core::request::Request& request, tateyama::common::wire::message_header::index_type index = 0) { + tateyama::proto::framework::request::Header fwrq_header{}; + fwrq_header.set_service_message_version_major(HEADER_MESSAGE_VERSION_MAJOR); + fwrq_header.set_service_message_version_minor(HEADER_MESSAGE_VERSION_MINOR); + fwrq_header.set_service_id(SERVICE_ID_ROUTING); + + std::stringstream sst{}; + if(auto res = tateyama::utils::SerializeDelimitedToOstream(fwrq_header, std::addressof(sst)); ! res) { + return std::nullopt; + } + request.set_service_message_version_major(CORE_MESSAGE_VERSION_MAJOR); + request.set_service_message_version_minor(CORE_MESSAGE_VERSION_MINOR); + if(auto res = tateyama::utils::SerializeDelimitedToOstream(request, std::addressof(sst)); ! res) { + return std::nullopt; + } + wire_.write(sst.str(), index); + return receive(index); + } + template std::optional send(::tateyama::proto::endpoint::request::Request& request, tateyama::common::wire::message_header::index_type index = 0) { tateyama::proto::framework::request::Header fwrq_header{}; @@ -434,6 +467,16 @@ class transport { return send(request); } + std::optional update_expiration_time() { + tateyama::proto::core::request::UpdateExpirationTime uet_request{}; + uet_request.set_expiration_time(MAX_EXPIRATION_TIME); + + tateyama::proto::core::request::Request request{}; + *(request.mutable_update_expiration_time()) = uet_request; + + return send(request); + } + std::optional receive(tateyama::common::wire::message_header::index_type index) { auto& response_wire = wire_.get_response_wire(); diff --git a/src/tateyama/proto/core/request.proto b/src/tateyama/proto/core/request.proto new file mode 100644 index 0000000..4a44657 --- /dev/null +++ b/src/tateyama/proto/core/request.proto @@ -0,0 +1,56 @@ +syntax = "proto3"; + +package tateyama.proto.core.request; + +option java_multiple_files = false; +option java_package = "com.tsurugidb.core.proto"; +option java_outer_classname = "CoreRequest"; + +// the request message to tateyama core service. +message Request { + // service message version (major) + uint64 service_message_version_major = 1; + + // service message version (minor) + uint64 service_message_version_minor = 2; + + // reserved for system use + reserved 3 to 10; + + // the request command. + oneof command { + // update session expiration time operation. + UpdateExpirationTime update_expiration_time = 11; + + // shutdown operation. + Shutdown shutdown = 12; + } + reserved 13 to 99; +} + +// update session expiration time +message UpdateExpirationTime { + + // the expiration time (milliseconds from now) to be set + uint64 expiration_time = 1; +} + +// kind of shutdown type. +enum ShutdownType { + + // The default shutdown type. + SHUTDOWN_TYPE_NOT_SET = 0; + + // Waits for the ongoing requests and safely shutdown the session. + GRACEFUL = 1; + + // Cancelling the ongoing requests and safely shutdown the session. + FORCEFUL = 2; +} + +// request shutdown to the session. +message Shutdown { + + // the shutdown type. + ShutdownType type = 1; +} \ No newline at end of file diff --git a/src/tateyama/proto/core/response.proto b/src/tateyama/proto/core/response.proto new file mode 100644 index 0000000..c90a728 --- /dev/null +++ b/src/tateyama/proto/core/response.proto @@ -0,0 +1,35 @@ +syntax = "proto3"; + +package tateyama.proto.core.response; + +option java_multiple_files = false; +option java_package = "com.tsurugidb.core.proto"; +option java_outer_classname = "CoreResponse"; + +// empty message +message Void {} + +// unknown error was occurred. +message UnknownError { + // the error message. + string message = 1; +} + +// update session expiration time +message UpdateExpirationTime { + reserved 1 to 10; + + // the response body. + oneof result { + // request is successfully completed. + Void success = 11; + + // unknown error was occurred. + UnknownError unknown_error = 12; + } +} + +// shutdown operation. +message Shutdown { + // no special message +} \ No newline at end of file