Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(conf): use DSN_DEFINE_uint32 to load uint32 type of configs #1352

Merged
merged 4 commits into from
Feb 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions src/geo/lib/geo_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ DSN_DEFINE_group_validator(min_max_level, [](std::string &message) -> bool {
}
return true;
});
DSN_DEFINE_uint32(geo_client.lib, latitude_index, 5, "latitude index in value");
DSN_DEFINE_uint32(geo_client.lib, longitude_index, 4, "longitude index in value");

struct SearchResultNearer
{
Expand Down Expand Up @@ -99,14 +101,11 @@ geo_client::geo_client(const char *config_file,
_geo_data_client = pegasus_client_factory::get_client(cluster_name, geo_app_name);
CHECK_NOTNULL(_geo_data_client, "init pegasus _geo_data_client failed");

uint32_t latitude_index = (uint32_t)dsn_config_get_value_uint64(
"geo_client.lib", "latitude_index", 5, "latitude index in value");

uint32_t longitude_index = (uint32_t)dsn_config_get_value_uint64(
"geo_client.lib", "longitude_index", 4, "longitude index in value");

dsn::error_s s = _codec.set_latlng_indices(latitude_index, longitude_index);
CHECK(s.is_ok(), "set_latlng_indices({}, {}) failed", latitude_index, longitude_index);
dsn::error_s s = _codec.set_latlng_indices(FLAGS_latitude_index, FLAGS_longitude_index);
CHECK(s.is_ok(),
"set_latlng_indices({}, {}) failed",
FLAGS_latitude_index,
FLAGS_longitude_index);
}

dsn::error_s geo_client::set_max_level(int level)
Expand Down
12 changes: 6 additions & 6 deletions src/meta/test/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ DEFINE_TASK_CODE(TASK_META_TEST, TASK_PRIORITY_COMMON, THREAD_POOL_META_TEST)

meta_service_test_app *g_app;

DSN_DEFINE_uint32(tools.simulator, random_seed, 0, "random seed");

// as it is not easy to clean test environment in some cases, we simply run these tests in several
// commands,
// please check the script "run.sh" to modify the GTEST_FILTER
Expand Down Expand Up @@ -80,13 +82,11 @@ TEST(meta, app_envs_basic_test) { g_app->app_envs_basic_test(); }

dsn::error_code meta_service_test_app::start(const std::vector<std::string> &args)
{
uint32_t seed =
(uint32_t)dsn_config_get_value_uint64("tools.simulator", "random_seed", 0, "random seed");
if (seed == 0) {
seed = time(0);
LOG_ERROR("initial seed: {}", seed);
if (FLAGS_random_seed == 0) {
FLAGS_random_seed = static_cast<uint32_t>(time(nullptr));
LOG_INFO("initial seed: {}", FLAGS_random_seed);
}
srand(seed);
srand(FLAGS_random_seed);

int argc = args.size();
char *argv[20];
Expand Down
4 changes: 0 additions & 4 deletions src/runtime/rpc/asio_net_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,6 @@ error_code asio_network_provider::start(rpc_channel channel, int port, bool clie
if (_acceptor != nullptr)
return ERR_SERVICE_ALREADY_RUNNING;

// get connection threshold from config, default value 0 means no threshold
_cfg_conn_threshold_per_ip = (uint32_t)dsn_config_get_value_uint64(
"network", "conn_threshold_per_ip", 0, "max connection count to each server per ip");

for (int i = 0; i < FLAGS_io_service_worker_count; i++) {
_workers.push_back(std::make_shared<std::thread>([this, i]() {
task::set_tls_dsn_context(node(), nullptr);
Expand Down
12 changes: 8 additions & 4 deletions src/runtime/rpc/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@
#include "utils/strings.h"

namespace dsn {
DSN_DEFINE_uint32(network,
conn_threshold_per_ip,
0,
"max connection count to each server per ip, 0 means no limit");

/*static*/ join_point<void, rpc_session *>
rpc_session::on_rpc_session_connected("rpc.session.connected");
/*static*/ join_point<void, rpc_session *>
Expand Down Expand Up @@ -582,7 +587,6 @@ uint32_t network::get_local_ipv4()
connection_oriented_network::connection_oriented_network(rpc_engine *srv, network *inner_provider)
: network(srv, inner_provider)
{
_cfg_conn_threshold_per_ip = 0;
_client_session_count.init_global_counter("server",
"network",
"client_session_count",
Expand Down Expand Up @@ -744,7 +748,7 @@ void connection_oriented_network::on_server_session_disconnected(rpc_session_ptr

bool connection_oriented_network::check_if_conn_threshold_exceeded(::dsn::rpc_address ep)
{
if (_cfg_conn_threshold_per_ip <= 0) {
if (FLAGS_conn_threshold_per_ip <= 0) {
LOG_DEBUG("new client from {} is connecting to server {}, no connection threshold",
ep.ipv4_str(),
address());
Expand All @@ -760,7 +764,7 @@ bool connection_oriented_network::check_if_conn_threshold_exceeded(::dsn::rpc_ad
ip_conn_count = it->second;
}
}
if (ip_conn_count >= _cfg_conn_threshold_per_ip) {
if (ip_conn_count >= FLAGS_conn_threshold_per_ip) {
exceeded = true;
}

Expand All @@ -769,7 +773,7 @@ bool connection_oriented_network::check_if_conn_threshold_exceeded(::dsn::rpc_ad
ep.ipv4_str(),
address(),
ip_conn_count,
_cfg_conn_threshold_per_ip);
FLAGS_conn_threshold_per_ip);

return exceeded;
}
Expand Down
1 change: 0 additions & 1 deletion src/runtime/rpc/network.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,6 @@ class connection_oriented_network : public network
ip_connection_count _ip_conn_count; // from_ip => connection count
utils::rw_lock_nr _servers_lock;

uint32_t _cfg_conn_threshold_per_ip;
perf_counter_wrapper _client_session_count;
};

Expand Down
25 changes: 9 additions & 16 deletions src/runtime/rpc/network.sim.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,17 @@
#include "utils/rand.h"
#include "runtime/node_scoper.h"
#include "network.sim.h"
#include "utils/flags.h"

namespace dsn {
namespace tools {

DSN_DEFINE_uint32(tools.simulator, min_message_delay_microseconds, 1, "min message delay (us)");
DSN_DEFINE_uint32(tools.simulator,
max_message_delay_microseconds,
100000,
"max message delay (us)");

// switch[channel][header_format]
// multiple machines connect to the same switch
// 10 should be >= than rpc_channel::max_value() + 1
Expand Down Expand Up @@ -153,20 +160,6 @@ sim_network_provider::sim_network_provider(rpc_engine *rpc, network *inner_provi
: connection_oriented_network(rpc, inner_provider)
{
_address.assign_ipv4("localhost", 1);

_min_message_delay_microseconds = 1;
_max_message_delay_microseconds = 100000;

_min_message_delay_microseconds =
(uint32_t)dsn_config_get_value_uint64("tools.simulator",
"min_message_delay_microseconds",
_min_message_delay_microseconds,
"min message delay (us)");
_max_message_delay_microseconds =
(uint32_t)dsn_config_get_value_uint64("tools.simulator",
"max_message_delay_microseconds",
_max_message_delay_microseconds,
"max message delay (us)");
}

error_code sim_network_provider::start(rpc_channel channel, int port, bool client_only)
Expand Down Expand Up @@ -194,8 +187,8 @@ error_code sim_network_provider::start(rpc_channel channel, int port, bool clien

uint32_t sim_network_provider::net_delay_milliseconds() const
{
return static_cast<uint32_t>(
rand::next_u32(_min_message_delay_microseconds, _max_message_delay_microseconds)) /
return rand::next_u32(FLAGS_min_message_delay_microseconds,
FLAGS_max_message_delay_microseconds) /
1000;
}
}
Expand Down
2 changes: 0 additions & 2 deletions src/runtime/rpc/network.sim.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,6 @@ class sim_network_provider : public connection_oriented_network

private:
::dsn::rpc_address _address;
uint32_t _min_message_delay_microseconds;
uint32_t _max_message_delay_microseconds;
};

//------------- inline implementations -------------
Expand Down
4 changes: 3 additions & 1 deletion src/runtime/rpc/rpc_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@
#include "runtime/rpc/serialization.h"
#include "utils/rand.h"
#include <set>
#include "utils/flags.h"

namespace dsn {
DSN_DECLARE_uint32(local_hash);

DEFINE_TASK_CODE(LPC_RPC_TIMEOUT, TASK_PRIORITY_COMMON, THREAD_POOL_DEFAULT)

Expand Down Expand Up @@ -735,7 +737,7 @@ void rpc_engine::reply(message_ex *response, error_code err)
sizeof(response->header->server.error_name) - 1);
response->header->server.error_name[sizeof(response->header->server.error_name) - 1] = '\0';
response->header->server.error_code.local_code = err;
response->header->server.error_code.local_hash = message_ex::s_local_hash;
response->header->server.error_code.local_hash = FLAGS_local_hash;

// response rpc code may be TASK_CODE_INVALID when request rpc code is not exist
auto sp = response->local_rpc_code == TASK_CODE_INVALID
Expand Down
23 changes: 15 additions & 8 deletions src/runtime/rpc/rpc_message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,20 @@
#include <cctype>

#include "runtime/task/task_engine.h"
#include "utils/flags.h"

using namespace dsn::utils;

namespace dsn {
// init common for all per-node providers
DSN_DEFINE_uint32(core,
local_hash,
0,
"a same hash value from two processes indicate the rpc codes are registered in "
"the same order, and therefore the mapping between rpc code string and integer "
"is the same, which we leverage for fast rpc handler lookup optimization");

std::atomic<uint64_t> message_ex::_id(0);
uint32_t message_ex::s_local_hash = 0;

message_ex::message_ex()
: header(nullptr),
Expand Down Expand Up @@ -82,11 +89,11 @@ error_code message_ex::error()
{
dsn::error_code code;
auto binary_hash = header->server.error_code.local_hash;
if (binary_hash != 0 && binary_hash == ::dsn::message_ex::s_local_hash) {
if (binary_hash != 0 && binary_hash == FLAGS_local_hash) {
code = dsn::error_code(header->server.error_code.local_code);
} else {
code = error_code::try_get(header->server.error_name, dsn::ERR_UNKNOWN);
header->server.error_code.local_hash = ::dsn::message_ex::s_local_hash;
header->server.error_code.local_hash = FLAGS_local_hash;
header->server.error_code.local_code = code;
}
return code;
Expand All @@ -99,11 +106,11 @@ task_code message_ex::rpc_code()
}

auto binary_hash = header->rpc_code.local_hash;
if (binary_hash != 0 && binary_hash == ::dsn::message_ex::s_local_hash) {
if (binary_hash != 0 && binary_hash == FLAGS_local_hash) {
local_rpc_code = dsn::task_code(header->rpc_code.local_code);
} else {
local_rpc_code = dsn::task_code::try_get(header->rpc_name, ::dsn::TASK_CODE_INVALID);
header->rpc_code.local_hash = ::dsn::message_ex::s_local_hash;
header->rpc_code.local_hash = FLAGS_local_hash;
header->rpc_code.local_code = local_rpc_code.code();
}

Expand Down Expand Up @@ -307,7 +314,7 @@ message_ex *message_ex::create_request(dsn::task_code rpc_code,
strncpy(hdr.rpc_name, sp->name.c_str(), sizeof(hdr.rpc_name) - 1);
hdr.rpc_name[sizeof(hdr.rpc_name) - 1] = '\0';
hdr.rpc_code.local_code = (uint32_t)rpc_code;
hdr.rpc_code.local_hash = s_local_hash;
hdr.rpc_code.local_hash = FLAGS_local_hash;

hdr.id = new_id();

Expand Down Expand Up @@ -348,7 +355,7 @@ message_ex *message_ex::create_response()
strncpy(hdr.rpc_name, response_sp->name.c_str(), sizeof(hdr.rpc_name) - 1);
hdr.rpc_name[sizeof(hdr.rpc_name) - 1] = '\0';
hdr.rpc_code.local_code = msg->local_rpc_code;
hdr.rpc_code.local_hash = s_local_hash;
hdr.rpc_code.local_hash = FLAGS_local_hash;

// join point
request_sp->on_rpc_create_response.execute(this, msg);
Expand All @@ -359,7 +366,7 @@ message_ex *message_ex::create_response()
strncpy(hdr.rpc_name, ack_rpc_name.c_str(), sizeof(hdr.rpc_name) - 1);
hdr.rpc_name[sizeof(hdr.rpc_name) - 1] = '\0';
hdr.rpc_code.local_code = TASK_CODE_INVALID;
hdr.rpc_code.local_hash = s_local_hash;
hdr.rpc_code.local_hash = FLAGS_local_hash;
}

return msg;
Expand Down
3 changes: 0 additions & 3 deletions src/runtime/rpc/rpc_message.h
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,6 @@ class message_ex : public ref_counter, public extensible_object<message_ex, 4>
int _rw_offset; // current buffer offset
bool _rw_committed; // mark if it is in middle state of reading/writing
bool _is_read; // is for read(recv) or write(send)

public:
static uint32_t s_local_hash; // used by fast_rpc_name
};
typedef dsn::ref_ptr<message_ex> message_ptr;

Expand Down
16 changes: 1 addition & 15 deletions src/runtime/service_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,21 +194,7 @@ service_engine::service_engine()

service_engine::~service_engine() { _nodes_by_app_id.clear(); }

void service_engine::init_before_toollets(const service_spec &spec)
{
_spec = spec;

// init common for all per-node providers
message_ex::s_local_hash =
(uint32_t)dsn_config_get_value_uint64("core",
"local_hash",
0,
"a same hash value from two processes indicate the "
"rpc code are registered in the same order, "
"and therefore the mapping between rpc code string "
"and integer is the same, which we leverage "
"for fast rpc handler lookup optimization");
}
void service_engine::init_before_toollets(const service_spec &spec) { _spec = spec; }

void service_engine::init_after_toollets()
{
Expand Down
Loading