Skip to content

Commit

Permalink
Set proxy's memory limit by TiFlash (pingcap#9753)
Browse files Browse the repository at this point in the history
close pingcap#9745

Signed-off-by: Calvin Neo <calvinneo1995@gmail.com>
Signed-off-by: JaySon-Huang <tshent@qq.com>

Co-authored-by: JaySon <tshent@qq.com>
  • Loading branch information
CalvinNeo and JaySon-Huang committed Feb 8, 2025
1 parent ea2b31f commit 0918afd
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 37 deletions.
128 changes: 92 additions & 36 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
#include <Poco/StringTokenizer.h>
#include <Poco/Timestamp.h>
#include <Poco/Util/HelpFormatter.h>
#include <Poco/Util/LayeredConfiguration.h>
#include <Server/BgStorageInit.h>
#include <Server/Bootstrap.h>
#include <Server/CertificateReloader.h>
Expand Down Expand Up @@ -284,31 +285,33 @@ struct TiFlashProxyConfig
args.push_back(iter->second.data());
}

explicit TiFlashProxyConfig(Poco::Util::LayeredConfiguration & config, bool has_s3_config)
// Try to parse start args from `config`.
// Return true if proxy need to be started, and `val_map` will be filled with the
// proxy start params.
// Return false if proxy is not need.
bool tryParseFromConfig(const Poco::Util::LayeredConfiguration & config, bool has_s3_config, const LoggerPtr & log)
{
auto disaggregated_mode = getDisaggregatedMode(config);

// tiflash_compute doesn't need proxy.
auto disaggregated_mode = getDisaggregatedMode(config);
if (disaggregated_mode == DisaggregatedMode::Compute && useAutoScaler(config))
{
LOG_INFO(
Logger::get(),
"TiFlash Proxy will not start because AutoScale Disaggregated Compute Mode is specified.");
return;
LOG_INFO(log, "TiFlash Proxy will not start because AutoScale Disaggregated Compute Mode is specified.");
return false;
}

Poco::Util::AbstractConfiguration::Keys keys;
config.keys("flash.proxy", keys);
if (!config.has("raft.pd_addr"))
{
LOG_WARNING(Logger::get(), "TiFlash Proxy will not start because `raft.pd_addr` is not configured.");
LOG_WARNING(log, "TiFlash Proxy will not start because `raft.pd_addr` is not configured.");
if (!keys.empty())
LOG_WARNING(Logger::get(), "`flash.proxy.*` is ignored because TiFlash Proxy will not start.");
LOG_WARNING(log, "`flash.proxy.*` is ignored because TiFlash Proxy will not start.");

return;
return false;
}

{
// config items start from `flash.proxy.`
std::unordered_map<std::string, std::string> args_map;
for (const auto & key : keys)
args_map[key] = config.getString("flash.proxy." + key);
Expand All @@ -327,14 +330,54 @@ struct TiFlashProxyConfig
for (auto && [k, v] : args_map)
val_map.emplace("--" + k, std::move(v));
}
return true;
}

TiFlashProxyConfig(
Poco::Util::LayeredConfiguration & config,
bool has_s3_config,
const StorageFormatVersion & format_version,
const Settings & settings,
const LoggerPtr & log)
{
is_proxy_runnable = tryParseFromConfig(config, has_s3_config, log);

args.push_back("TiFlash Proxy");
for (const auto & v : val_map)
{
args.push_back(v.first.data());
args.push_back(v.second.data());
}
is_proxy_runnable = true;

// Enable unips according to `format_version`
if (format_version.page == PageFormat::V4)
{
LOG_INFO(log, "Using UniPS for proxy");
addExtraArgs("unips-enabled", "1");
}

// Set the proxy's memory by size or ratio
std::visit(
[&](auto && arg) {
using T = std::decay_t<decltype(arg)>;
if constexpr (std::is_same_v<T, UInt64>)
{
if (arg != 0)
{
LOG_INFO(log, "Limit proxy's memory, size={}", arg);
addExtraArgs("memory-limit-size", std::to_string(arg));
}
}
else if constexpr (std::is_same_v<T, double>)
{
if (arg > 0 && arg <= 1.0)
{
LOG_INFO(log, "Limit proxy's memory, ratio={}", arg);
addExtraArgs("memory-limit-ratio", std::to_string(arg));
}
}
},
settings.max_memory_usage_for_all_queries.get());
}
};

Expand Down Expand Up @@ -518,7 +561,7 @@ struct RaftStoreProxyRunner : boost::noncopyable
pthread_attr_t attribute;
pthread_attr_init(&attribute);
pthread_attr_setstacksize(&attribute, parms.stack_size);
LOG_INFO(log, "start raft store proxy");
LOG_INFO(log, "Start raft store proxy. Args: {}", parms.conf.args);
pthread_create(&thread, &attribute, runRaftStoreProxyFFI, &parms);
pthread_attr_destroy(&attribute);
}
Expand Down Expand Up @@ -1023,20 +1066,39 @@ int Server::main(const std::vector<std::string> & /*args*/)
// Set whether to use safe point v2.
PDClientHelper::enable_safepoint_v2 = config().getBool("enable_safe_point_v2", false);

/** Context contains all that query execution is dependent:
* settings, available functions, data types, aggregate functions, databases...
*/
global_context = Context::createGlobal();
/// Initialize users config reloader.
auto users_config_reloader = UserConfig::parseSettings(config(), config_path, global_context, log);

/// Load global settings from default_profile and system_profile.
/// It internally depends on UserConfig::parseSettings.
// TODO: Parse the settings from config file at the program beginning
global_context->setDefaultProfiles(config());
LOG_INFO(
log,
"Loaded global settings from default_profile and system_profile, changed configs: {{{}}}",
global_context->getSettingsRef().toString());
Settings & settings = global_context->getSettingsRef();

// Init Proxy's config
TiFlashProxyConfig proxy_conf(config(), storage_config.s3_config.isS3Enabled());
TiFlashProxyConfig proxy_conf( //
config(),
storage_config.s3_config.isS3Enabled(),
STORAGE_FORMAT_CURRENT,
settings,
log);
EngineStoreServerWrap tiflash_instance_wrap{};
auto helper = GetEngineStoreServerHelper(&tiflash_instance_wrap);

if (STORAGE_FORMAT_CURRENT.page == PageFormat::V4)
{
LOG_INFO(log, "Using UniPS for proxy");
proxy_conf.addExtraArgs("unips-enabled", "1");
}
else
{
LOG_INFO(log, "UniPS is not enabled for proxy, page_version={}", STORAGE_FORMAT_CURRENT.page);
}
#ifdef USE_JEMALLOC
LOG_INFO(log, "Using Jemalloc for TiFlash");
#else
LOG_INFO(log, "Not using Jemalloc for TiFlash");
#endif


RaftStoreProxyRunner proxy_runner(RaftStoreProxyRunner::RunRaftStoreProxyParms{&helper, proxy_conf}, log);

Expand Down Expand Up @@ -1090,10 +1152,14 @@ int Server::main(const std::vector<std::string> & /*args*/)
gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG);
gpr_set_log_function(&printGRPCLog);

/** Context contains all that query execution is dependent:
* settings, available functions, data types, aggregate functions, databases...
*/
global_context = Context::createGlobal();
SCOPE_EXIT({
if (!proxy_conf.is_proxy_runnable)
return;

LOG_INFO(log, "Unlink tiflash_instance_wrap.tmt");
// Reset the `tiflash_instance_wrap.tmt` before `global_context` get released, or it will be a dangling pointer
tiflash_instance_wrap.tmt = nullptr;
});
global_context->setApplicationType(Context::ApplicationType::SERVER);
global_context->getSharedContextDisagg()->disaggregated_mode = disaggregated_mode;
global_context->getSharedContextDisagg()->use_autoscaler = use_autoscaler;
Expand Down Expand Up @@ -1277,21 +1343,11 @@ int Server::main(const std::vector<std::string> & /*args*/)
/// Init TiFlash metrics.
global_context->initializeTiFlashMetrics();

/// Initialize users config reloader.
auto users_config_reloader = UserConfig::parseSettings(config(), config_path, global_context, log);

/// Load global settings from default_profile and system_profile.
/// It internally depends on UserConfig::parseSettings.
// TODO: Parse the settings from config file at the program beginning
global_context->setDefaultProfiles(config());
LOG_INFO(log, "Loaded global settings from default_profile and system_profile.");

///
/// The config value in global settings can only be used from here because we just loaded it from config file.
///

/// Initialize the background & blockable background thread pool.
Settings & settings = global_context->getSettingsRef();
LOG_INFO(log, "Background & Blockable Background pool size: {}", settings.background_pool_size);
auto & bg_pool = global_context->initializeBackgroundPool(settings.background_pool_size);
auto & blockable_bg_pool = global_context->initializeBlockableBackgroundPool(settings.background_pool_size);
Expand Down

0 comments on commit 0918afd

Please sign in to comment.