Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(conf): use DSN_DEFINE_int64 to load int64 type of configs #1357

Merged
merged 2 commits into from
Feb 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions src/common/replication_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ replication_options::replication_options()
group_check_disabled = false;

checkpoint_disabled = false;
checkpoint_min_decree_gap = 10000;

gc_disabled = false;

Expand Down Expand Up @@ -209,11 +208,6 @@ void replication_options::initialize()
"checkpoint_disabled",
checkpoint_disabled,
"whether checkpoint is disabled");
checkpoint_min_decree_gap =
acelyc111 marked this conversation as resolved.
Show resolved Hide resolved
(int64_t)dsn_config_get_value_uint64("replication",
"checkpoint_min_decree_gap",
checkpoint_min_decree_gap,
"minimum decree gap that triggers checkpoint");

gc_disabled = dsn_config_get_value_bool(
"replication", "gc_disabled", gc_disabled, "whether to disable garbage collection");
Expand Down
1 change: 0 additions & 1 deletion src/common/replication_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ class replication_options
bool group_check_disabled;

bool checkpoint_disabled;
int64_t checkpoint_min_decree_gap;

bool gc_disabled;

Expand Down
1 change: 0 additions & 1 deletion src/server/config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,6 @@ stateful = true

checkpoint_disabled = false
checkpoint_interval_seconds = 300
checkpoint_min_decree_gap = 10000
checkpoint_max_interval_hours = 2

gc_disabled = false
Expand Down
6 changes: 5 additions & 1 deletion src/server/pegasus_server_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ DSN_DEFINE_int32(pegasus.server,
hotkey_analyse_time_interval_s,
10,
"hotkey analyse interval in seconds");
DSN_DEFINE_int32(pegasus.server,
update_rdb_stat_interval,
60,
"The interval seconds to update RocksDB statistics, in seconds.");

static std::string chkpt_get_dir_name(int64_t decree)
{
Expand Down Expand Up @@ -1695,7 +1699,7 @@ dsn::error_code pegasus_server_impl::start(int argc, char **argv)
dsn::tasking::enqueue_timer(LPC_REPLICATION_LONG_COMMON,
&_tracker,
[this]() { this->update_replica_rocksdb_statistics(); },
_update_rdb_stat_interval);
std::chrono::seconds(FLAGS_update_rdb_stat_interval));

// These counters are singletons on this server shared by all replicas, their metrics update
// task should be scheduled once an interval on the server view.
Expand Down
1 change: 0 additions & 1 deletion src/server/pegasus_server_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,6 @@ class pegasus_server_impl : public pegasus_read_service

pegasus_context_cache _context_cache;

std::chrono::seconds _update_rdb_stat_interval;
::dsn::task_ptr _update_replica_rdb_stat;
static ::dsn::task_ptr _update_server_rdb_stat;

Expand Down
16 changes: 6 additions & 10 deletions src/server/pegasus_server_impl_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,11 @@ DSN_DEFINE_uint32(pegasus.server,
checkpoint_reserve_time_seconds,
1800,
"Minimum seconds of checkpoint to reserve, 0 means no check.");
DSN_DEFINE_int32(pegasus.server,
rocksdb_max_open_files,
-1,
"The number of opened files that can be used by a replica(namely a DB instance). "
"The default value is -1 which means always keep files opened.");

static const std::unordered_map<std::string, rocksdb::BlockBasedTableOptions::IndexType>
INDEX_TYPE_STRING_MAP = {
Expand Down Expand Up @@ -445,12 +450,7 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
_db_opts.write_buffer_manager = _s_write_buffer_manager;
}

int64_t max_open_files = dsn_config_get_value_int64(
"pegasus.server",
"rocksdb_max_open_files",
-1, /* always keep files opened, default by rocksdb */
"number of opened files that can be used by a replica(namely a DB instance)");
_db_opts.max_open_files = static_cast<int>(max_open_files);
_db_opts.max_open_files = FLAGS_rocksdb_max_open_files;
LOG_INFO_PREFIX("rocksdb_max_open_files = {}", _db_opts.max_open_files);

_db_opts.max_log_file_size = static_cast<size_t>(FLAGS_rocksdb_max_log_file_size);
Expand Down Expand Up @@ -593,10 +593,6 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
_checkpoint_reserve_min_count = FLAGS_checkpoint_reserve_min_count;
_checkpoint_reserve_time_seconds = FLAGS_checkpoint_reserve_time_seconds;

// TODO(yingchun): signed integral type of at least 35 bits, int64_t
_update_rdb_stat_interval = std::chrono::seconds(dsn_config_get_value_uint64(
"pegasus.server", "update_rdb_stat_interval", 60, "update_rdb_stat_interval, in seconds"));

// TODO: move the qps/latency counters and it's statistics to replication_app_base layer
std::string str_gpid = _gpid.to_string();
char name[256];
Expand Down
14 changes: 7 additions & 7 deletions src/server/pegasus_write_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@
namespace pegasus {
namespace server {

DSN_DEFINE_int64(pegasus.server,
dup_lagging_write_threshold_ms,
10 * 1000,
"If the duration that a write flows from master to slave is larger than this "
"threshold, the write is defined a lagging write.");

DEFINE_TASK_CODE(LPC_INGESTION, TASK_PRIORITY_COMMON, THREAD_POOL_INGESTION)

pegasus_write_service::pegasus_write_service(pegasus_server_impl *server)
Expand Down Expand Up @@ -130,12 +136,6 @@ pegasus_write_service::pegasus_write_service(pegasus_server_impl *server)
COUNTER_TYPE_NUMBER_PERCENTILES,
"the time (in ms) lag between master and slave in the duplication");

_dup_lagging_write_threshold_ms = dsn_config_get_value_int64(
"pegasus.server",
"dup_lagging_write_threshold_ms",
10 * 1000,
"If the duration that a write flows from master to slave is larger than this threshold, "
"the write is defined a lagging write.");
_pfc_dup_lagging_writes.init_app_counter(
"app.pegasus",
fmt::format("dup.lagging_writes@{}", app_name()).c_str(),
Expand Down Expand Up @@ -327,7 +327,7 @@ int pegasus_write_service::duplicate(int64_t decree,
_pfc_duplicate_qps->increment();
auto cleanup = dsn::defer([this, &request]() {
uint64_t latency_ms = (dsn_now_us() - request.timestamp) / 1000;
if (latency_ms > _dup_lagging_write_threshold_ms) {
if (latency_ms > FLAGS_dup_lagging_write_threshold_ms) {
_pfc_dup_lagging_writes->increment();
}
_pfc_dup_time_lag->set(latency_ms);
Expand Down
1 change: 0 additions & 1 deletion src/server/pegasus_write_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,6 @@ class pegasus_write_service : dsn::replication::replica_base
uint64_t _batch_start_time;

capacity_unit_calculator *_cu_calculator;
int64_t _dup_lagging_write_threshold_ms;

::dsn::perf_counter_wrapper _pfc_put_qps;
::dsn::perf_counter_wrapper _pfc_multi_put_qps;
Expand Down
1 change: 0 additions & 1 deletion src/server/test/config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,6 @@ group_check_interval_ms = 100000

checkpoint_disabled = false
checkpoint_interval_seconds = 100
checkpoint_min_decree_gap = 10000
checkpoint_max_interval_hours = 1

gc_disabled = false
Expand Down
22 changes: 10 additions & 12 deletions src/test/pressure_test/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,14 @@ DSN_DEFINE_int32(pressureclient, hashkey_len, 64, "hashkey length");
DSN_DEFINE_int32(pressureclient, sortkey_len, 64, "sortkey length");
DSN_DEFINE_int32(pressureclient, value_len, 64, "value length");
DSN_DEFINE_validator(qps, [](int32_t value) -> bool { return value > 0; });

// generate hashkey/sortkey between [0, ****key_limit]
static int64_t hashkey_limit;
static int64_t sortkey_limit;
DSN_DEFINE_int64(pressureclient,
hashkey_limit,
0,
"The hashkey range to generate, in format [0, ****key_limit].");
DSN_DEFINE_int64(pressureclient,
sortkey_limit,
0,
"The sortkey range to generate, in format [0, ****key_limit].");

// for app
static pegasus_client *pg_client = nullptr;
Expand All @@ -59,7 +63,7 @@ std::string fill_string(const std::string &str, int len)

std::string get_hashkey()
{
std::string key = to_string(dsn::rand::next_u64(0, hashkey_limit));
std::string key = to_string(dsn::rand::next_u64(0, FLAGS_hashkey_limit));
if (key.size() >= FLAGS_hashkey_len) {
return key;
} else {
Expand All @@ -69,7 +73,7 @@ std::string get_hashkey()

std::string get_sortkey()
{
std::string key = to_string(dsn::rand::next_u64(0, sortkey_limit));
std::string key = to_string(dsn::rand::next_u64(0, FLAGS_sortkey_limit));
if (key.size() >= FLAGS_sortkey_len) {
return key;
} else {
Expand Down Expand Up @@ -230,12 +234,6 @@ int main(int argc, const char **argv)

op_name = dsn_config_get_value_string("pressureclient", "operation_name", "", "operation name");

hashkey_limit =
(int64_t)dsn_config_get_value_uint64("pressureclient", "hashkey_limit", 0, "hashkey limit");

sortkey_limit =
(int64_t)dsn_config_get_value_uint64("pressureclient", "sortkey_limit", 0, "sortkey limit");

CHECK(!op_name.empty(), "must assign operation name");

LOG_INFO("pressureclient {} qps = {}", op_name, FLAGS_qps);
Expand Down