Skip to content

Commit

Permalink
set a timeout deadline of 86400 seconds at the start of sessions
Browse files Browse the repository at this point in the history
  • Loading branch information
t-horikawa committed Jul 18, 2024
1 parent 9701edd commit 860607f
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ set(FrameworkProtoFiles
${CMAKE_SOURCE_DIR}/src/tateyama/proto/framework/request.proto
${CMAKE_SOURCE_DIR}/src/tateyama/proto/framework/response.proto
${CMAKE_SOURCE_DIR}/src/tateyama/proto/diagnostics.proto
${CMAKE_SOURCE_DIR}/src/tateyama/proto/core/request.proto
${CMAKE_SOURCE_DIR}/src/tateyama/proto/core/response.proto
${CMAKE_SOURCE_DIR}/src/tateyama/proto/endpoint/request.proto
${CMAKE_SOURCE_DIR}/src/tateyama/proto/endpoint/response.proto
${CMAKE_SOURCE_DIR}/src/tateyama/proto/auth/request.proto
Expand Down
43 changes: 43 additions & 0 deletions src/ogawayama/transport/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
#include <tateyama/utils/protobuf_utils.h>
#include <tateyama/proto/framework/request.pb.h>
#include <tateyama/proto/framework/response.pb.h>
#include <tateyama/proto/core/request.pb.h>
#include <tateyama/proto/core/response.pb.h>
#include <tateyama/proto/endpoint/request.pb.h>
#include <tateyama/proto/endpoint/response.pb.h>

Expand All @@ -45,13 +47,17 @@ constexpr static tateyama::common::wire::message_header::index_type OPT_INDEX =
class transport {
constexpr static std::size_t HEADER_MESSAGE_VERSION_MAJOR = 0;
constexpr static std::size_t HEADER_MESSAGE_VERSION_MINOR = 0;
constexpr static std::size_t CORE_MESSAGE_VERSION_MAJOR = 0;
constexpr static std::size_t CORE_MESSAGE_VERSION_MINOR = 0;
constexpr static std::size_t SQL_MESSAGE_VERSION_MAJOR = 1;
constexpr static std::size_t SQL_MESSAGE_VERSION_MINOR = 0;
constexpr static std::size_t ENDPOINT_MESSAGE_VERSION_MAJOR = 0;
constexpr static std::size_t ENDPOINT_MESSAGE_VERSION_MINOR = 0;
constexpr static std::uint32_t SERVICE_ID_ROUTING = 0; // from tateyama/framework/component_ids.h
constexpr static std::uint32_t SERVICE_ID_ENDPOINT_BROKER = 1; // from tateyama/framework/component_ids.h
constexpr static std::uint32_t SERVICE_ID_SQL = 3; // from tateyama/framework/component_ids.h
constexpr static std::uint32_t SERVICE_ID_FDW = 4; // from tateyama/framework/component_ids.h
constexpr static std::uint32_t MAX_EXPIRATION_TIME = (24 * 60 * 60 * 1000);

public:
transport() = delete;
Expand All @@ -70,6 +76,13 @@ class transport {
if (handshake_response.value().result_case() != tateyama::proto::endpoint::response::Handshake::ResultCase::kSuccess) {
throw std::runtime_error("handshake error");
}
auto update_expiration_time_response = update_expiration_time();
if (!update_expiration_time_response) {
throw std::runtime_error("update_expiration_time error");
}
if (update_expiration_time_response.value().result_case() != tateyama::proto::core::response::UpdateExpirationTime::ResultCase::kSuccess) {
throw std::runtime_error("update_expiration_time error");
}
}

/**
Expand Down Expand Up @@ -323,6 +336,26 @@ class transport {
return receive<T>(index);
}

template <typename T>
std::optional<T> send(::tateyama::proto::core::request::Request& request, tateyama::common::wire::message_header::index_type index = 0) {
tateyama::proto::framework::request::Header fwrq_header{};
fwrq_header.set_service_message_version_major(HEADER_MESSAGE_VERSION_MAJOR);
fwrq_header.set_service_message_version_minor(HEADER_MESSAGE_VERSION_MINOR);
fwrq_header.set_service_id(SERVICE_ID_ROUTING);

std::stringstream sst{};
if(auto res = tateyama::utils::SerializeDelimitedToOstream(fwrq_header, std::addressof(sst)); ! res) {
return std::nullopt;
}
request.set_service_message_version_major(CORE_MESSAGE_VERSION_MAJOR);
request.set_service_message_version_minor(CORE_MESSAGE_VERSION_MINOR);
if(auto res = tateyama::utils::SerializeDelimitedToOstream(request, std::addressof(sst)); ! res) {
return std::nullopt;
}
wire_.write(sst.str(), index);
return receive<T>(index);
}

template <typename T>
std::optional<T> send(::tateyama::proto::endpoint::request::Request& request, tateyama::common::wire::message_header::index_type index = 0) {
tateyama::proto::framework::request::Header fwrq_header{};
Expand Down Expand Up @@ -434,6 +467,16 @@ class transport {
return send<tateyama::proto::endpoint::response::Handshake>(request);
}

std::optional<tateyama::proto::core::response::UpdateExpirationTime> update_expiration_time() {
tateyama::proto::core::request::UpdateExpirationTime uet_request{};
uet_request.set_expiration_time(MAX_EXPIRATION_TIME);

tateyama::proto::core::request::Request request{};
*(request.mutable_update_expiration_time()) = uet_request;

return send<tateyama::proto::core::response::UpdateExpirationTime>(request);
}

std::optional<std::string> receive(tateyama::common::wire::message_header::index_type index) {
auto& response_wire = wire_.get_response_wire();

Expand Down
56 changes: 56 additions & 0 deletions src/tateyama/proto/core/request.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
syntax = "proto3";

package tateyama.proto.core.request;

option java_multiple_files = false;
option java_package = "com.tsurugidb.core.proto";
option java_outer_classname = "CoreRequest";

// the request message to tateyama core service.
message Request {
// service message version (major)
uint64 service_message_version_major = 1;

// service message version (minor)
uint64 service_message_version_minor = 2;

// reserved for system use
reserved 3 to 10;

// the request command.
oneof command {
// update session expiration time operation.
UpdateExpirationTime update_expiration_time = 11;

// shutdown operation.
Shutdown shutdown = 12;
}
reserved 13 to 99;
}

// update session expiration time
message UpdateExpirationTime {

// the expiration time (milliseconds from now) to be set
uint64 expiration_time = 1;
}

// kind of shutdown type.
enum ShutdownType {

// The default shutdown type.
SHUTDOWN_TYPE_NOT_SET = 0;

// Waits for the ongoing requests and safely shutdown the session.
GRACEFUL = 1;

// Cancelling the ongoing requests and safely shutdown the session.
FORCEFUL = 2;
}

// request shutdown to the session.
message Shutdown {

// the shutdown type.
ShutdownType type = 1;
}
35 changes: 35 additions & 0 deletions src/tateyama/proto/core/response.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
syntax = "proto3";

package tateyama.proto.core.response;

option java_multiple_files = false;
option java_package = "com.tsurugidb.core.proto";
option java_outer_classname = "CoreResponse";

// empty message
message Void {}

// unknown error was occurred.
message UnknownError {
// the error message.
string message = 1;
}

// update session expiration time
message UpdateExpirationTime {
reserved 1 to 10;

// the response body.
oneof result {
// request is successfully completed.
Void success = 11;

// unknown error was occurred.
UnknownError unknown_error = 12;
}
}

// shutdown operation.
message Shutdown {
// no special message
}

0 comments on commit 860607f

Please sign in to comment.