Skip to content

Commit

Permalink
feat: enable targeted-load for 2PC loadgen
Browse files Browse the repository at this point in the history
This sets up the 2PC load-generator to read the config values the Test
Controller already supports* to limit throughput to a set amount,
optionally increasing it according to a definable schedule. Among
other benefits, this allows the tester to constrain load-generators
directly rather than by relying on the preseed count to artifically
reduce their load.

This also includes the relevant config changes to enable respected
per-loadgen log-levels.

Signed-off-by: Sam Stuewe <stuewe@mit.edu>
  • Loading branch information
HalosGhost committed Jul 11, 2024
1 parent d792c5e commit 5e59516
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 1 deletion.
23 changes: 23 additions & 0 deletions src/util/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,13 @@ namespace cbdc::config {
return ss.str();
}

auto get_loadgen_loglevel_key(size_t loadgen_id) -> std::string {
std::stringstream ss;
ss << loadgen_prefix << loadgen_id << config_separator
<< loglevel_postfix;
return ss.str();
}

auto get_sentinel_private_key_key(size_t sentinel_id) -> std::string {
auto ss = std::stringstream();
get_sentinel_key_prefix(ss, sentinel_id);
Expand Down Expand Up @@ -614,6 +621,22 @@ namespace cbdc::config {

opts.m_loadgen_count
= cfg.get_ulong(loadgen_count_key).value_or(opts.m_loadgen_count);
opts.m_loadgen_tps_target = cfg.get_ulong(tps_target_key)
.value_or(opts.m_loadgen_tps_target);
opts.m_loadgen_tps_step_time
= cfg.get_ulong(tps_steptime_key)
.value_or(opts.m_loadgen_tps_step_time);
opts.m_loadgen_tps_step_size
= cfg.get_ulong(tps_stepsize_key)
.value_or(opts.m_loadgen_tps_step_size);
opts.m_loadgen_tps_initial = cfg.get_ulong(tps_initial_key)
.value_or(opts.m_loadgen_tps_initial);
for (size_t i{0}; i < opts.m_loadgen_count; ++i) {
const auto loadgen_loglevel_key = get_loadgen_loglevel_key(i);
const auto loadgen_loglevel = cfg.get_loglevel(loadgen_loglevel_key)
.value_or(defaults::log_level);
opts.m_loadgen_loglevels.push_back(loadgen_loglevel);
}
}

auto read_options(const std::string& config_file)
Expand Down
17 changes: 17 additions & 0 deletions src/util/common/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ namespace cbdc::config {
static constexpr auto output_count_key = "loadgen_sendtx_output_count";
static constexpr auto invalid_rate_key = "loadgen_invalid_tx_rate";
static constexpr auto fixed_tx_rate_key = "loadgen_fixed_tx_rate";
static constexpr auto loadgen_prefix = "loadgen";
static constexpr auto tps_target_key = "loadgen_tps_target";
static constexpr auto tps_steptime_key = "loadgen_tps_step_time";
static constexpr auto tps_stepsize_key = "loadgen_tps_step_percentage";
static constexpr auto tps_initial_key = "loadgen_tps_step_start";
static constexpr auto archiver_count_key = "archiver_count";
static constexpr auto watchtower_count_key = "watchtower_count";
static constexpr auto watchtower_prefix = "watchtower";
Expand Down Expand Up @@ -250,6 +255,18 @@ namespace cbdc::config {
/// Number of load generators over which to split pre-seeded UTXOs.
size_t m_loadgen_count{0};

/// List of loadgen log levels, ordered by loadgen ID.
std::vector<logging::log_level> m_loadgen_loglevels;

/// Maximum Tx/s the loadgens should produce
size_t m_loadgen_tps_target{0};
/// Initial Tx/s to send at test-start
size_t m_loadgen_tps_initial{0};
/// Tx/s to increase on each step
size_t m_loadgen_tps_step_size{0};
/// Time (in miliseconds) to wait before the next step up
size_t m_loadgen_tps_step_time{0};

/// Private keys for sentinels.
std::unordered_map<size_t, privkey_t> m_sentinel_private_keys;

Expand Down
112 changes: 111 additions & 1 deletion tools/bench/twophase_gen.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,14 @@ auto main(int argc, char** argv) -> int {
auto cfg = std::get<cbdc::config::options>(cfg_or_err);

auto gen_id = std::stoull(args[2]);
if (gen_id >= cfg.m_loadgen_count) {
std::cerr << "Attempted to run more loadgens than configured"
<< std::endl;
return -1;
}

auto logger
= std::make_shared<cbdc::logging::log>(cbdc::logging::log_level::info);
= std::make_shared<cbdc::logging::log>(cfg.m_loadgen_loglevels[gen_id]);

auto sha2_impl = SHA256AutoDetect();
logger->info("using sha2: ", sha2_impl);
Expand Down Expand Up @@ -115,6 +121,26 @@ auto main(int argc, char** argv) -> int {
logger->info("Mint confirmed");
}

size_t per_gen_send_limit = 0;
size_t per_gen_step_size = 0;
size_t per_gen_send_tgt = 0;
if (cfg.m_loadgen_tps_target != 0) {
per_gen_send_tgt = cfg.m_loadgen_tps_target / cfg.m_loadgen_count;
per_gen_send_limit = cfg.m_loadgen_tps_initial / cfg.m_loadgen_count;
size_t range = cfg.m_loadgen_tps_target - cfg.m_loadgen_tps_initial;
size_t per_gen_range = range / cfg.m_loadgen_count;
per_gen_step_size =
std::max(
std::min(
static_cast<size_t>(per_gen_range * (cfg.m_loadgen_tps_step_size * .01)),
per_gen_range),
size_t{1});
}

if (cfg.m_loadgen_tps_step_time == 0) {
per_gen_send_limit = per_gen_send_tgt;
}

static constexpr auto lookup_timeout = std::chrono::milliseconds(5000);
auto status_client = cbdc::locking_shard::rpc::status_client(
cfg.m_locking_shard_readonly_endpoints,
Expand Down Expand Up @@ -161,7 +187,11 @@ auto main(int argc, char** argv) -> int {

constexpr auto send_amt = 5;

uint64_t send_gap{};
uint64_t gen_avg{};
auto ramp_timer_full = std::chrono::nanoseconds(cfg.m_loadgen_tps_step_time * 1000000);
auto ramp_timer = ramp_timer_full;
auto ramping = ramp_timer.count() != 0 && per_gen_send_limit != per_gen_send_tgt;
auto gen_thread = std::thread([&]() {
while(running) {
// Determine if we should attempt to send a double-spending
Expand Down Expand Up @@ -219,8 +249,49 @@ auto main(int argc, char** argv) -> int {
gen_avg = static_cast<uint64_t>(
(static_cast<double>(gen_t.count()) * average_factor)
+ (static_cast<double>(gen_avg) * (1.0 - average_factor)));
if (ramping) {
if (gen_t >= ramp_timer) {
logger->debug("Ramp Timer Exhausted (gen_t). Resetting");
ramp_timer = ramp_timer_full;
per_gen_send_limit = std::min(
per_gen_send_tgt,
per_gen_send_limit + per_gen_step_size
);
logger->debug("New Send Limit:", per_gen_send_limit);
if (per_gen_send_limit == per_gen_send_tgt) {
ramping = false;
logger->info("Reached Target Throughput");
}
} else {
ramp_timer -= gen_t;
}
}
auto total_send_time = std::chrono::nanoseconds(gen_avg * per_gen_send_limit);
if (total_send_time < std::chrono::seconds(1)) {
send_gap = (std::chrono::seconds(1)
- total_send_time).count() / per_gen_send_limit;
logger->trace("New send-gap:", send_gap);
}
} else {
std::this_thread::sleep_for(std::chrono::nanoseconds(gen_avg));
if (ramping) {
auto avg = std::chrono::nanoseconds(gen_avg);
if (avg >= ramp_timer) {
logger->debug("Ramp Timer Exhausted (dbl-spend gen_avg). Resetting");
ramp_timer = ramp_timer_full;
per_gen_send_limit = std::min(
per_gen_send_tgt,
per_gen_send_limit + per_gen_step_size
);
logger->debug("New Send Limit:", per_gen_send_limit);
if (per_gen_send_limit == per_gen_send_tgt) {
ramping = false;
logger->info("Reached Target Throughput");
}
} else {
ramp_timer -= avg;
}
}
}

// We couldn't send a double-spend or a newly generated valid
Expand All @@ -234,6 +305,23 @@ auto main(int argc, char** argv) -> int {
// instead.
static constexpr auto send_delay = std::chrono::seconds(1);
std::this_thread::sleep_for(send_delay);
if (ramping) {
if (send_delay >= ramp_timer) {
logger->debug("Ramp Timer Exhausted (send_delay). Resetting");
ramp_timer = ramp_timer_full;
per_gen_send_limit = std::min(
per_gen_send_tgt,
per_gen_send_limit + per_gen_step_size
);
logger->debug("New Send Limit:", per_gen_send_limit);
if (per_gen_send_limit == per_gen_send_tgt) {
ramping = false;
logger->info("Reached Target Throughput");
}
} else {
ramp_timer -= send_delay;
}
}
continue;
}

Expand Down Expand Up @@ -280,6 +368,28 @@ auto main(int argc, char** argv) -> int {
logger->error("Failure sending transaction to sentinel");
wallet.confirm_inputs(tx.value().m_inputs);
}

auto gap = std::chrono::nanoseconds(send_gap);
if (gap < std::chrono::seconds(1)) {
std::this_thread::sleep_for(gap);
if (ramping) {
if (gap >= ramp_timer) {
logger->debug("Ramp Timer Exhausted (gap). Resetting");
ramp_timer = ramp_timer_full;
per_gen_send_limit = std::min(
per_gen_send_tgt,
per_gen_send_limit + per_gen_step_size
);
logger->debug("New Send Limit:", per_gen_send_limit);
if (per_gen_send_limit == per_gen_send_tgt) {
ramping = false;
logger->info("Reached Target Throughput");
}
} else {
ramp_timer -= gap;
}
}
}
}
});

Expand Down

0 comments on commit 5e59516

Please sign in to comment.