diff --git a/src/common/replication_common.cpp b/src/common/replication_common.cpp index f2ff1a4105..08d2d5dfad 100644 --- a/src/common/replication_common.cpp +++ b/src/common/replication_common.cpp @@ -100,7 +100,6 @@ replication_options::replication_options() group_check_disabled = false; checkpoint_disabled = false; - checkpoint_min_decree_gap = 10000; gc_disabled = false; @@ -209,11 +208,6 @@ void replication_options::initialize() "checkpoint_disabled", checkpoint_disabled, "whether checkpoint is disabled"); - checkpoint_min_decree_gap = - (int64_t)dsn_config_get_value_uint64("replication", - "checkpoint_min_decree_gap", - checkpoint_min_decree_gap, - "minimum decree gap that triggers checkpoint"); gc_disabled = dsn_config_get_value_bool( "replication", "gc_disabled", gc_disabled, "whether to disable garbage collection"); diff --git a/src/common/replication_common.h b/src/common/replication_common.h index 970ad3a459..e9bf90545a 100644 --- a/src/common/replication_common.h +++ b/src/common/replication_common.h @@ -76,7 +76,6 @@ class replication_options bool group_check_disabled; bool checkpoint_disabled; - int64_t checkpoint_min_decree_gap; bool gc_disabled; diff --git a/src/server/config.ini b/src/server/config.ini index 86d37ee8c7..18c776e239 100644 --- a/src/server/config.ini +++ b/src/server/config.ini @@ -255,7 +255,6 @@ stateful = true checkpoint_disabled = false checkpoint_interval_seconds = 300 - checkpoint_min_decree_gap = 10000 checkpoint_max_interval_hours = 2 gc_disabled = false diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp index 4629304b92..1013e4d712 100644 --- a/src/server/pegasus_server_impl.cpp +++ b/src/server/pegasus_server_impl.cpp @@ -58,6 +58,10 @@ DSN_DEFINE_int32(pegasus.server, hotkey_analyse_time_interval_s, 10, "hotkey analyse interval in seconds"); +DSN_DEFINE_int32(pegasus.server, + update_rdb_stat_interval, + 60, + "The interval seconds to update RocksDB statistics, in seconds."); static std::string chkpt_get_dir_name(int64_t decree) { @@ -1695,7 +1699,7 @@ dsn::error_code pegasus_server_impl::start(int argc, char **argv) dsn::tasking::enqueue_timer(LPC_REPLICATION_LONG_COMMON, &_tracker, [this]() { this->update_replica_rocksdb_statistics(); }, - _update_rdb_stat_interval); + std::chrono::seconds(FLAGS_update_rdb_stat_interval)); // These counters are singletons on this server shared by all replicas, their metrics update // task should be scheduled once an interval on the server view. diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h index 84687353c3..8e651b9a39 100644 --- a/src/server/pegasus_server_impl.h +++ b/src/server/pegasus_server_impl.h @@ -464,7 +464,6 @@ class pegasus_server_impl : public pegasus_read_service pegasus_context_cache _context_cache; - std::chrono::seconds _update_rdb_stat_interval; ::dsn::task_ptr _update_replica_rdb_stat; static ::dsn::task_ptr _update_server_rdb_stat; diff --git a/src/server/pegasus_server_impl_init.cpp b/src/server/pegasus_server_impl_init.cpp index 571f7e09ba..edbb49c7c7 100644 --- a/src/server/pegasus_server_impl_init.cpp +++ b/src/server/pegasus_server_impl_init.cpp @@ -194,6 +194,11 @@ DSN_DEFINE_uint32(pegasus.server, checkpoint_reserve_time_seconds, 1800, "Minimum seconds of checkpoint to reserve, 0 means no check."); +DSN_DEFINE_int32(pegasus.server, + rocksdb_max_open_files, + -1, + "The number of opened files that can be used by a replica(namely a DB instance). " + "The default value is -1 which means always keep files opened."); static const std::unordered_map INDEX_TYPE_STRING_MAP = { @@ -445,12 +450,7 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r) _db_opts.write_buffer_manager = _s_write_buffer_manager; } - int64_t max_open_files = dsn_config_get_value_int64( - "pegasus.server", - "rocksdb_max_open_files", - -1, /* always keep files opened, default by rocksdb */ - "number of opened files that can be used by a replica(namely a DB instance)"); - _db_opts.max_open_files = static_cast(max_open_files); + _db_opts.max_open_files = FLAGS_rocksdb_max_open_files; LOG_INFO_PREFIX("rocksdb_max_open_files = {}", _db_opts.max_open_files); _db_opts.max_log_file_size = static_cast(FLAGS_rocksdb_max_log_file_size); @@ -593,10 +593,6 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r) _checkpoint_reserve_min_count = FLAGS_checkpoint_reserve_min_count; _checkpoint_reserve_time_seconds = FLAGS_checkpoint_reserve_time_seconds; - // TODO(yingchun): signed integral type of at least 35 bits, int64_t - _update_rdb_stat_interval = std::chrono::seconds(dsn_config_get_value_uint64( - "pegasus.server", "update_rdb_stat_interval", 60, "update_rdb_stat_interval, in seconds")); - // TODO: move the qps/latency counters and it's statistics to replication_app_base layer std::string str_gpid = _gpid.to_string(); char name[256]; diff --git a/src/server/pegasus_write_service.cpp b/src/server/pegasus_write_service.cpp index a923a3df48..338e81190d 100644 --- a/src/server/pegasus_write_service.cpp +++ b/src/server/pegasus_write_service.cpp @@ -29,6 +29,12 @@ namespace pegasus { namespace server { +DSN_DEFINE_int64(pegasus.server, + dup_lagging_write_threshold_ms, + 10 * 1000, + "If the duration that a write flows from master to slave is larger than this " + "threshold, the write is defined a lagging write."); + DEFINE_TASK_CODE(LPC_INGESTION, TASK_PRIORITY_COMMON, THREAD_POOL_INGESTION) pegasus_write_service::pegasus_write_service(pegasus_server_impl *server) @@ -130,12 +136,6 @@ pegasus_write_service::pegasus_write_service(pegasus_server_impl *server) COUNTER_TYPE_NUMBER_PERCENTILES, "the time (in ms) lag between master and slave in the duplication"); - _dup_lagging_write_threshold_ms = dsn_config_get_value_int64( - "pegasus.server", - "dup_lagging_write_threshold_ms", - 10 * 1000, - "If the duration that a write flows from master to slave is larger than this threshold, " - "the write is defined a lagging write."); _pfc_dup_lagging_writes.init_app_counter( "app.pegasus", fmt::format("dup.lagging_writes@{}", app_name()).c_str(), @@ -327,7 +327,7 @@ int pegasus_write_service::duplicate(int64_t decree, _pfc_duplicate_qps->increment(); auto cleanup = dsn::defer([this, &request]() { uint64_t latency_ms = (dsn_now_us() - request.timestamp) / 1000; - if (latency_ms > _dup_lagging_write_threshold_ms) { + if (latency_ms > FLAGS_dup_lagging_write_threshold_ms) { _pfc_dup_lagging_writes->increment(); } _pfc_dup_time_lag->set(latency_ms); diff --git a/src/server/pegasus_write_service.h b/src/server/pegasus_write_service.h index a93fe779bd..2315189f5f 100644 --- a/src/server/pegasus_write_service.h +++ b/src/server/pegasus_write_service.h @@ -197,7 +197,6 @@ class pegasus_write_service : dsn::replication::replica_base uint64_t _batch_start_time; capacity_unit_calculator *_cu_calculator; - int64_t _dup_lagging_write_threshold_ms; ::dsn::perf_counter_wrapper _pfc_put_qps; ::dsn::perf_counter_wrapper _pfc_multi_put_qps; diff --git a/src/server/test/config.ini b/src/server/test/config.ini index bc742fc0d4..a1125b265a 100644 --- a/src/server/test/config.ini +++ b/src/server/test/config.ini @@ -165,7 +165,6 @@ group_check_interval_ms = 100000 checkpoint_disabled = false checkpoint_interval_seconds = 100 -checkpoint_min_decree_gap = 10000 checkpoint_max_interval_hours = 1 gc_disabled = false diff --git a/src/test/pressure_test/main.cpp b/src/test/pressure_test/main.cpp index 2f8b926bd1..8a03ed61d0 100644 --- a/src/test/pressure_test/main.cpp +++ b/src/test/pressure_test/main.cpp @@ -40,10 +40,14 @@ DSN_DEFINE_int32(pressureclient, hashkey_len, 64, "hashkey length"); DSN_DEFINE_int32(pressureclient, sortkey_len, 64, "sortkey length"); DSN_DEFINE_int32(pressureclient, value_len, 64, "value length"); DSN_DEFINE_validator(qps, [](int32_t value) -> bool { return value > 0; }); - -// generate hashkey/sortkey between [0, ****key_limit] -static int64_t hashkey_limit; -static int64_t sortkey_limit; +DSN_DEFINE_int64(pressureclient, + hashkey_limit, + 0, + "The hashkey range to generate, in format [0, ****key_limit]."); +DSN_DEFINE_int64(pressureclient, + sortkey_limit, + 0, + "The sortkey range to generate, in format [0, ****key_limit]."); // for app static pegasus_client *pg_client = nullptr; @@ -59,7 +63,7 @@ std::string fill_string(const std::string &str, int len) std::string get_hashkey() { - std::string key = to_string(dsn::rand::next_u64(0, hashkey_limit)); + std::string key = to_string(dsn::rand::next_u64(0, FLAGS_hashkey_limit)); if (key.size() >= FLAGS_hashkey_len) { return key; } else { @@ -69,7 +73,7 @@ std::string get_hashkey() std::string get_sortkey() { - std::string key = to_string(dsn::rand::next_u64(0, sortkey_limit)); + std::string key = to_string(dsn::rand::next_u64(0, FLAGS_sortkey_limit)); if (key.size() >= FLAGS_sortkey_len) { return key; } else { @@ -230,12 +234,6 @@ int main(int argc, const char **argv) op_name = dsn_config_get_value_string("pressureclient", "operation_name", "", "operation name"); - hashkey_limit = - (int64_t)dsn_config_get_value_uint64("pressureclient", "hashkey_limit", 0, "hashkey limit"); - - sortkey_limit = - (int64_t)dsn_config_get_value_uint64("pressureclient", "sortkey_limit", 0, "sortkey limit"); - CHECK(!op_name.empty(), "must assign operation name"); LOG_INFO("pressureclient {} qps = {}", op_name, FLAGS_qps);