Skip to content

Commit

Permalink
*: decouple FlashGrpcServerHolder from Server.cpp (#5516)
Browse files Browse the repository at this point in the history
ref #4609
  • Loading branch information
ywqzzy authored Aug 2, 2022
1 parent bebd45a commit 4972cf3
Show file tree
Hide file tree
Showing 7 changed files with 260 additions and 203 deletions.
12 changes: 6 additions & 6 deletions dbms/src/Flash/DiagnosticsService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ ::grpc::Status DiagnosticsService::server_info(
::diagnosticspb::ServerInfoResponse * response)
try
{
const TiFlashRaftProxyHelper * helper = server.context().getTMTContext().getKVStore()->getProxyHelper();
const TiFlashRaftProxyHelper * helper = context.getTMTContext().getKVStore()->getProxyHelper();
if (helper)
{
std::string req = request->SerializeAsString();
Expand All @@ -63,18 +63,18 @@ catch (const std::exception & e)
}

// get & filter(ts of last record < start-time) all files in same log directory.
std::list<std::string> getFilesToSearch(IServer & server, Poco::Logger * log, const int64_t start_time)
std::list<std::string> getFilesToSearch(Poco::Util::LayeredConfiguration & config, Poco::Logger * log, const int64_t start_time)
{
std::list<std::string> files_to_search;

std::string log_dir; // log directory
auto error_log_file_prefix = server.config().getString("logger.errorlog", "*");
auto tracing_log_file_prefix = server.config().getString("logger.tracing_log", "*");
auto error_log_file_prefix = config.getString("logger.errorlog", "*");
auto tracing_log_file_prefix = config.getString("logger.tracing_log", "*");
// ignore tiflash error log and mpp task tracing log
std::vector<String> ignore_log_file_prefixes = {error_log_file_prefix, tracing_log_file_prefix};

{
auto log_file_prefix = server.config().getString("logger.log");
auto log_file_prefix = config.getString("logger.log");
if (auto it = log_file_prefix.rfind('/'); it != std::string::npos)
{
log_dir = std::string(log_file_prefix.begin(), log_file_prefix.begin() + it);
Expand Down Expand Up @@ -163,7 +163,7 @@ ::grpc::Status DiagnosticsService::search_log(
LOG_FMT_DEBUG(log, "Handling SearchLog done: {}", request->DebugString());
});

auto files_to_search = getFilesToSearch(server, log, start_time);
auto files_to_search = getFilesToSearch(config, log, start_time);

for (const auto & path : files_to_search)
{
Expand Down
8 changes: 5 additions & 3 deletions dbms/src/Flash/DiagnosticsService.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ class DiagnosticsService final : public ::diagnosticspb::Diagnostics::Service
, private boost::noncopyable
{
public:
explicit DiagnosticsService(IServer & _server)
explicit DiagnosticsService(Context & context_, Poco::Util::LayeredConfiguration & config_)
: log(&Poco::Logger::get("DiagnosticsService"))
, server(_server)
, context(context_)
, config(config_)
{}
~DiagnosticsService() override = default;

Expand All @@ -51,8 +52,9 @@ class DiagnosticsService final : public ::diagnosticspb::Diagnostics::Service

private:
Poco::Logger * log;
Context & context;

IServer & server;
Poco::Util::LayeredConfiguration & config;
};

} // namespace DB
1 change: 1 addition & 0 deletions dbms/src/Server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ configure_file (config_tools.h.in ${CMAKE_CURRENT_BINARY_DIR}/config_tools.h)

add_library (clickhouse-server-lib
HTTPHandler.cpp
FlashGrpcServerHolder.cpp
MetricsTransmitter.cpp
MetricsPrometheus.cpp
NotFoundHandler.cpp
Expand Down
198 changes: 198 additions & 0 deletions dbms/src/Server/FlashGrpcServerHolder.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <Server/FlashGrpcServerHolder.h>
namespace DB
{
namespace ErrorCodes
{
extern const int IP_ADDRESS_NOT_ALLOWED;
} // namespace ErrorCodes
namespace
{
void handleRpcs(grpc::ServerCompletionQueue * curcq, const LoggerPtr & log)
{
GET_METRIC(tiflash_thread_count, type_total_rpc_async_worker).Increment();
SCOPE_EXIT({
GET_METRIC(tiflash_thread_count, type_total_rpc_async_worker).Decrement();
});
void * tag = nullptr; // uniquely identifies a request.
bool ok = false;
while (true)
{
String err_msg;
try
{
// Block waiting to read the next event from the completion queue. The
// event is uniquely identified by its tag, which in this case is the
// memory address of a EstablishCallData instance.
// The return value of Next should always be checked. This return value
// tells us whether there is any kind of event or cq is shutting down.
if (!curcq->Next(&tag, &ok))
{
LOG_FMT_INFO(log, "CQ is fully drained and shut down");
break;
}
GET_METRIC(tiflash_thread_count, type_active_rpc_async_worker).Increment();
SCOPE_EXIT({
GET_METRIC(tiflash_thread_count, type_active_rpc_async_worker).Decrement();
});
// If ok is false, it means server is shutdown.
// We need not log all not ok events, since the volumn is large which will pollute the content of log.
if (ok)
static_cast<EstablishCallData *>(tag)->proceed();
else
static_cast<EstablishCallData *>(tag)->cancel();
}
catch (Exception & e)
{
err_msg = e.displayText();
LOG_FMT_ERROR(log, "handleRpcs meets error: {} Stack Trace : {}", err_msg, e.getStackTrace().toString());
}
catch (pingcap::Exception & e)
{
err_msg = e.message();
LOG_FMT_ERROR(log, "handleRpcs meets error: {}", err_msg);
}
catch (std::exception & e)
{
err_msg = e.what();
LOG_FMT_ERROR(log, "handleRpcs meets error: {}", err_msg);
}
catch (...)
{
err_msg = "unrecovered error";
LOG_FMT_ERROR(log, "handleRpcs meets error: {}", err_msg);
throw;
}
}
}
} // namespace

FlashGrpcServerHolder::FlashGrpcServerHolder(Context & context, Poco::Util::LayeredConfiguration & config_, TiFlashSecurityConfig & security_config, const TiFlashRaftConfig & raft_config, const LoggerPtr & log_)
: log(log_)
, is_shutdown(std::make_shared<std::atomic<bool>>(false))
{
grpc::ServerBuilder builder;
if (security_config.has_tls_config)
{
grpc::SslServerCredentialsOptions server_cred(GRPC_SSL_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_AND_VERIFY);
auto options = security_config.readAndCacheSecurityInfo();
server_cred.pem_root_certs = options.pem_root_certs;
server_cred.pem_key_cert_pairs.push_back(
grpc::SslServerCredentialsOptions::PemKeyCertPair{options.pem_private_key, options.pem_cert_chain});
builder.AddListeningPort(raft_config.flash_server_addr, grpc::SslServerCredentials(server_cred));
}
else
{
builder.AddListeningPort(raft_config.flash_server_addr, grpc::InsecureServerCredentials());
}

/// Init and register flash service.
bool enable_async_server = context.getSettingsRef().enable_async_server;
if (enable_async_server)
flash_service = std::make_unique<AsyncFlashService>(security_config, context);
else
flash_service = std::make_unique<FlashService>(security_config, context);
diagnostics_service = std::make_unique<DiagnosticsService>(context, config_);
builder.SetOption(grpc::MakeChannelArgumentOption(GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS, 5 * 1000));
builder.SetOption(grpc::MakeChannelArgumentOption(GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS, 10 * 1000));
builder.SetOption(grpc::MakeChannelArgumentOption(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1));
// number of grpc thread pool's non-temporary threads, better tune it up to avoid frequent creation/destruction of threads
auto max_grpc_pollers = context.getSettingsRef().max_grpc_pollers;
if (max_grpc_pollers > 0 && max_grpc_pollers <= std::numeric_limits<int>::max())
builder.SetSyncServerOption(grpc::ServerBuilder::SyncServerOption::MAX_POLLERS, max_grpc_pollers);
builder.RegisterService(flash_service.get());
LOG_FMT_INFO(log, "Flash service registered");
builder.RegisterService(diagnostics_service.get());
LOG_FMT_INFO(log, "Diagnostics service registered");

/// Kick off grpc server.
// Prevent TiKV from throwing "Received message larger than max (4404462 vs. 4194304)" error.
builder.SetMaxReceiveMessageSize(-1);
builder.SetMaxSendMessageSize(-1);
thread_manager = DB::newThreadManager();
int async_cq_num = context.getSettingsRef().async_cqs;
if (enable_async_server)
{
for (int i = 0; i < async_cq_num; ++i)
{
cqs.emplace_back(builder.AddCompletionQueue());
notify_cqs.emplace_back(builder.AddCompletionQueue());
}
}
flash_grpc_server = builder.BuildAndStart();
if (!flash_grpc_server)
{
throw Exception("Exception happens when start grpc server, the flash.service_addr may be invalid, flash.service_addr is " + raft_config.flash_server_addr, ErrorCodes::IP_ADDRESS_NOT_ALLOWED);
}
LOG_FMT_INFO(log, "Flash grpc server listening on [{}]", raft_config.flash_server_addr);
Debug::setServiceAddr(raft_config.flash_server_addr);
if (enable_async_server)
{
int preallocated_request_count_per_poller = context.getSettingsRef().preallocated_request_count_per_poller;
int pollers_per_cq = context.getSettingsRef().async_pollers_per_cq;
for (int i = 0; i < async_cq_num * pollers_per_cq; ++i)
{
auto * cq = cqs[i / pollers_per_cq].get();
auto * notify_cq = notify_cqs[i / pollers_per_cq].get();
for (int j = 0; j < preallocated_request_count_per_poller; ++j)
{
// EstablishCallData will handle its lifecycle by itself.
EstablishCallData::spawn(assert_cast<AsyncFlashService *>(flash_service.get()), cq, notify_cq, is_shutdown);
}
thread_manager->schedule(false, "async_poller", [cq, this] { handleRpcs(cq, log); });
thread_manager->schedule(false, "async_poller", [notify_cq, this] { handleRpcs(notify_cq, log); });
}
}
}

FlashGrpcServerHolder::~FlashGrpcServerHolder()
{
try
{
/// Shut down grpc server.
LOG_FMT_INFO(log, "Begin to shut down flash grpc server");
flash_grpc_server->Shutdown();
*is_shutdown = true;
// Wait all existed MPPTunnels done to prevent crash.
// If all existed MPPTunnels are done, almost in all cases it means all existed MPPTasks and ExchangeReceivers are also done.
const int max_wait_cnt = 300;
int wait_cnt = 0;
while (GET_METRIC(tiflash_object_count, type_count_of_mpptunnel).Value() >= 1 && (wait_cnt++ < max_wait_cnt))
std::this_thread::sleep_for(std::chrono::seconds(1));

for (auto & cq : cqs)
cq->Shutdown();
for (auto & cq : notify_cqs)
cq->Shutdown();
thread_manager->wait();
flash_grpc_server->Wait();
flash_grpc_server.reset();
if (GRPCCompletionQueuePool::global_instance)
GRPCCompletionQueuePool::global_instance->markShutdown();
LOG_FMT_INFO(log, "Shut down flash grpc server");

/// Close flash service.
LOG_FMT_INFO(log, "Begin to shut down flash service");
flash_service.reset();
LOG_FMT_INFO(log, "Shut down flash service");
}
catch (...)
{
auto message = getCurrentExceptionMessage(false);
LOG_FMT_FATAL(log, "Exception happens in destructor of FlashGrpcServerHolder with message: {}", message);
std::terminate();
}
}
} // namespace DB
48 changes: 48 additions & 0 deletions dbms/src/Server/FlashGrpcServerHolder.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once

#include <Common/assert_cast.h>
#include <Debug/astToExecutor.h>
#include <Flash/DiagnosticsService.h>
#include <Flash/FlashService.h>
#include <Flash/Mpp/GRPCCompletionQueuePool.h>
#include <Server/RaftConfigParser.h>

namespace DB
{
class FlashGrpcServerHolder
{
public:
FlashGrpcServerHolder(
Context & context,
Poco::Util::LayeredConfiguration & config_,
TiFlashSecurityConfig & security_config,
const TiFlashRaftConfig & raft_config,
const LoggerPtr & log_);
~FlashGrpcServerHolder();

private:
const LoggerPtr & log;
std::shared_ptr<std::atomic<bool>> is_shutdown;
std::unique_ptr<FlashService> flash_service = nullptr;
std::unique_ptr<DiagnosticsService> diagnostics_service = nullptr;
std::unique_ptr<grpc::Server> flash_grpc_server = nullptr;
// cqs and notify_cqs are used for processing async grpc events (currently only EstablishMPPConnection).
std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> cqs;
std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> notify_cqs;
std::shared_ptr<ThreadManager> thread_manager;
};

} // namespace DB
Loading

0 comments on commit 4972cf3

Please sign in to comment.