Skip to content

Commit

Permalink
feat: introduce metrics/logs of when pipelining is being throttled (#…
Browse files Browse the repository at this point in the history
…4000)

* feat: introduce metrics/logs of when pipelining is being throttled

Fixes #3999 following up on discussion at #3997.
---------

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
  • Loading branch information
romange authored Oct 28, 2024
1 parent 5dcad85 commit b0d52c6
Show file tree
Hide file tree
Showing 11 changed files with 78 additions and 64 deletions.
28 changes: 22 additions & 6 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,18 @@ ABSL_FLAG(string, admin_bind, "",
ABSL_FLAG(uint64_t, request_cache_limit, 64_MB,
"Amount of memory to use for request cache in bytes - per IO thread.");

ABSL_FLAG(uint64_t, pipeline_buffer_limit, 8_MB,
"Amount of memory to use for parsing pipeline requests - per IO thread.");
ABSL_FLAG(uint64_t, pipeline_buffer_limit, 128_MB,
"Amount of memory to use for storing pipeline requests - per IO thread."
"Please note that clients that send excecissively huge pipelines, "
"may deadlock themselves. See https://github.com/dragonflydb/dragonfly/discussions/3997"
"for details.");

ABSL_FLAG(uint32_t, pipeline_queue_limit, 10000,
"Pipeline queue max length, the server will stop reading from the client socket"
" once its pipeline queue crosses this limit, and will resume once it processes "
"excessive requests. This is to prevent OOM states. Users of huge pipelines sizes "
"may require increasing this limit to prevent the risk of deadlocking."
"See https://github.com/dragonflydb/dragonfly/discussions/3997 for details");

ABSL_FLAG(uint64_t, publish_buffer_limit, 128_MB,
"Amount of memory to use for storing pub commands in bytes - per IO thread");
Expand All @@ -63,10 +73,6 @@ ABSL_FLAG(bool, no_tls_on_admin_port, false, "Allow non-tls connections on admin
ABSL_FLAG(uint32_t, pipeline_squash, 10,
"Number of queued pipelined commands above which squashing is enabled, 0 means disabled");

ABSL_FLAG(uint32_t, pipeline_queue_limit, 1000,
"Pipeline queue max length, the server will stop reading from the client socket"
" once the pipeline reaches this limit");

// When changing this constant, also update `test_large_cmd` test in connection_test.py.
ABSL_FLAG(uint32_t, max_multi_bulk_len, 1u << 16,
"Maximum multi-bulk (array) length that is "
Expand Down Expand Up @@ -1020,6 +1026,10 @@ void Connection::DispatchSingle(bool has_more, absl::FunctionRef<void()> invoke_

if (optimize_for_async && queue_backpressure_->IsPipelineBufferOverLimit(
stats_->dispatch_queue_bytes, dispatch_q_.size())) {
stats_->pipeline_throttle_count++;
LOG_EVERY_T(WARNING, 10) << "Pipeline buffer over limit: pipeline_bytes "
<< stats_->dispatch_queue_bytes << " queue_size " << dispatch_q_.size()
<< ", consider increasing pipeline_buffer_limit/pipeline_queue_limit";
fb2::NoOpLock noop;
queue_backpressure_->pipeline_cnd.wait(noop, [this] {
bool over_limits = queue_backpressure_->IsPipelineBufferOverLimit(
Expand Down Expand Up @@ -1826,6 +1836,12 @@ void Connection::BreakOnce(uint32_t ev_mask) {

void Connection::SetMaxQueueLenThreadLocal(uint32_t val) {
tl_queue_backpressure_.pipeline_queue_max_len = val;
tl_queue_backpressure_.pipeline_cnd.notify_all();
}

void Connection::SetPipelineBufferLimit(size_t val) {
tl_queue_backpressure_.pipeline_buffer_limit = val;
tl_queue_backpressure_.pipeline_cnd.notify_all();
}

void Connection::GetRequestSizeHistogramThreadLocal(std::string* hist) {
Expand Down
1 change: 1 addition & 0 deletions src/facade/dragonfly_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ class Connection : public util::Connection {

// Sets max queue length locally in the calling thread.
static void SetMaxQueueLenThreadLocal(uint32_t val);
static void SetPipelineBufferLimit(size_t val);
static void GetRequestSizeHistogramThreadLocal(std::string* hist);
static void TrackRequestSize(bool enable);

Expand Down
3 changes: 2 additions & 1 deletion src/facade/facade.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ constexpr size_t kSizeConnStats = sizeof(ConnectionStats);

ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) {
// To break this code deliberately if we add/remove a field to this struct.
static_assert(kSizeConnStats == 112u);
static_assert(kSizeConnStats == 120u);

ADD(read_buf_capacity);
ADD(dispatch_queue_entries);
Expand All @@ -37,6 +37,7 @@ ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) {
ADD(num_replicas);
ADD(num_blocked_clients);
ADD(num_migrations);
ADD(pipeline_throttle_count);

return *this;
}
Expand Down
3 changes: 3 additions & 0 deletions src/facade/facade_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ struct ConnectionStats {
uint32_t num_blocked_clients = 0;
uint64_t num_migrations = 0;

// Number of events when the pipeline queue was over the limit and was throttled.
uint64_t pipeline_throttle_count = 0;

ConnectionStats& operator+=(const ConnectionStats& o);
};

Expand Down
2 changes: 1 addition & 1 deletion src/facade/reply_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ void SinkReplyBuilder::Send(const iovec* v, uint32_t len) {
send_active_ = true;
tl_facade_stats->reply_stats.io_write_cnt++;
tl_facade_stats->reply_stats.io_write_bytes += bsize;
DVLOG(2) << "Writing " << bsize << " bytes of len " << len;
DVLOG(2) << "Writing " << bsize + batch_.size() << " bytes of len " << len;

if (batch_.empty()) {
ec = sink_->Write(v, len);
Expand Down
10 changes: 0 additions & 10 deletions src/server/acl/acl_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -706,16 +706,6 @@ void AclFamily::Init(facade::Listener* main_listener, UserRegistry* registry) {
return;
}
registry_->Init(&CategoryToIdx(), &reverse_cat_table_, &CategoryToCommandsIndex());
config_registry.RegisterMutable("aclfile");
config_registry.RegisterMutable("acllog_max_len", [this](const absl::CommandLineFlag& flag) {
auto res = flag.TryGet<size_t>();
if (res.has_value()) {
pool_->AwaitFiberOnAll([&res](auto index, auto* context) {
ServerState::tlocal()->acl_log.SetTotalEntries(res.value());
});
}
return res.has_value();
});
}

std::string AclFamily::AclCatToString(uint32_t acl_category, User::Sign sign) const {
Expand Down
2 changes: 1 addition & 1 deletion src/server/acl/acl_log.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
#include "facade/dragonfly_connection.h"
#include "server/conn_context.h"

ABSL_FLAG(size_t, acllog_max_len, 32,
ABSL_FLAG(uint32_t, acllog_max_len, 32,
"Specify the number of log entries. Logs are kept locally for each thread "
"and therefore the total number of entries are acllog_max_len * threads");

Expand Down
12 changes: 12 additions & 0 deletions src/server/config_registry.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,18 @@ class ConfigRegistry {
return *this;
}

template <typename T>
ConfigRegistry& RegisterSetter(std::string_view name, std::function<void(const T&)> f) {
return RegisterMutable(name, [f](const absl::CommandLineFlag& flag) {
auto res = flag.TryGet<T>();
if (res.has_value()) {
f(*res);
return true;
}
return false;
});
}

enum class SetResult : uint8_t {
OK,
UNKNOWN,
Expand Down
54 changes: 23 additions & 31 deletions src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -883,14 +883,8 @@ Service::~Service() {
void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*> listeners) {
InitRedisTables();

config_registry.RegisterMutable("maxmemory", [](const absl::CommandLineFlag& flag) {
auto res = flag.TryGet<MemoryBytesFlag>();
if (!res)
return false;

max_memory_limit = res->value;
return true;
});
config_registry.RegisterSetter<MemoryBytesFlag>(
"maxmemory", [](const MemoryBytesFlag& flag) { max_memory_limit = flag.value; });

config_registry.RegisterMutable("dbfilename");
config_registry.Register("dbnum"); // equivalent to databases in redis.
Expand All @@ -901,32 +895,24 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>
config_registry.RegisterMutable("max_eviction_per_heartbeat");
config_registry.RegisterMutable("max_segment_to_consider");

config_registry.RegisterMutable("oom_deny_ratio", [](const absl::CommandLineFlag& flag) {
auto res = flag.TryGet<double>();
if (res.has_value()) {
SetOomDenyRatioOnAllThreads(*res);
}
return res.has_value();
config_registry.RegisterSetter<double>("oom_deny_ratio",
[](double val) { SetOomDenyRatioOnAllThreads(val); });

config_registry.RegisterSetter<double>("rss_oom_deny_ratio",
[](double val) { SetRssOomDenyRatioOnAllThreads(val); });

config_registry.RegisterMutable("pipeline_squash");

config_registry.RegisterSetter<uint32_t>("pipeline_queue_limit", [](uint32_t val) {
shard_set->pool()->AwaitBrief(
[val](unsigned, auto*) { facade::Connection::SetMaxQueueLenThreadLocal(val); });
});

config_registry.RegisterMutable("rss_oom_deny_ratio", [](const absl::CommandLineFlag& flag) {
auto res = flag.TryGet<double>();
if (res.has_value()) {
SetRssOomDenyRatioOnAllThreads(*res);
}
return res.has_value();
config_registry.RegisterSetter<size_t>("pipeline_buffer_limit", [](size_t val) {
shard_set->pool()->AwaitBrief(
[val](unsigned, auto*) { facade::Connection::SetPipelineBufferLimit(val); });
});
config_registry.RegisterMutable("pipeline_squash");
config_registry.RegisterMutable("pipeline_queue_limit",
[pool = &pp_](const absl::CommandLineFlag& flag) {
auto res = flag.TryGet<uint32_t>();
if (res.has_value()) {
pool->AwaitBrief([val = *res](unsigned, auto*) {
facade::Connection::SetMaxQueueLenThreadLocal(val);
});
}
return res.has_value();
});

config_registry.RegisterMutable("replica_partial_sync");
config_registry.RegisterMutable("replication_timeout");
config_registry.RegisterMutable("table_growth_margin");
Expand All @@ -951,6 +937,12 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>
return true;
});

config_registry.RegisterMutable("aclfile");
config_registry.RegisterSetter<uint32_t>("acllog_max_len", [](uint32_t val) {
shard_set->pool()->AwaitFiberOnAll(
[val](auto index, auto* context) { ServerState::tlocal()->acl_log.SetTotalEntries(val); });
});

serialization_max_chunk_size = GetFlag(FLAGS_serialization_max_chunk_size);
uint32_t shard_num = GetFlag(FLAGS_num_shards);
if (shard_num == 0 || shard_num > pp_.size()) {
Expand Down
25 changes: 12 additions & 13 deletions src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -816,12 +816,8 @@ void ServerFamily::Init(util::AcceptServer* acceptor, std::vector<facade::Listen
LOG_FIRST_N(INFO, 1) << "Host OS: " << os_string << " with " << shard_set->pool()->size()
<< " threads";
SetMaxClients(listeners_, absl::GetFlag(FLAGS_maxclients));
config_registry.RegisterMutable("maxclients", [this](const absl::CommandLineFlag& flag) {
auto res = flag.TryGet<uint32_t>();
if (res.has_value())
SetMaxClients(listeners_, res.value());
return res.has_value();
});
config_registry.RegisterSetter<uint32_t>(
"maxclients", [this](uint32_t val) { SetMaxClients(listeners_, val); });

SetSlowLogThreshold(service_.proactor_pool(), absl::GetFlag(FLAGS_slowlog_log_slower_than));
config_registry.RegisterMutable("slowlog_log_slower_than",
Expand All @@ -832,12 +828,8 @@ void ServerFamily::Init(util::AcceptServer* acceptor, std::vector<facade::Listen
return res.has_value();
});
SetSlowLogMaxLen(service_.proactor_pool(), absl::GetFlag(FLAGS_slowlog_max_len));
config_registry.RegisterMutable("slowlog_max_len", [this](const absl::CommandLineFlag& flag) {
auto res = flag.TryGet<uint32_t>();
if (res.has_value())
SetSlowLogMaxLen(service_.proactor_pool(), res.value());
return res.has_value();
});
config_registry.RegisterSetter<uint32_t>(
"slowlog_max_len", [this](uint32_t val) { SetSlowLogMaxLen(service_.proactor_pool(), val); });

// We only reconfigure TLS when the 'tls' config key changes. Therefore to
// update TLS certs, first update tls_cert_file, then set 'tls true'.
Expand Down Expand Up @@ -1280,6 +1272,8 @@ void PrintPrometheusMetrics(const Metrics& m, DflyCmd* dfly_cmd, StringResponse*
MetricType::GAUGE, &resp->body());
AppendMetricWithoutLabels("pipeline_queue_length", "", conn_stats.dispatch_queue_entries,
MetricType::GAUGE, &resp->body());
AppendMetricWithoutLabels("pipeline_throttle_total", "", conn_stats.pipeline_throttle_count,
MetricType::COUNTER, &resp->body());
AppendMetricWithoutLabels("pipeline_cmd_cache_bytes", "", conn_stats.pipeline_cmd_cache_bytes,
MetricType::GAUGE, &resp->body());
AppendMetricWithoutLabels("pipeline_commands_total", "", conn_stats.pipelined_cmd_cnt,
Expand Down Expand Up @@ -2298,6 +2292,7 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
append("instantaneous_ops_per_sec", m.qps);
append("total_pipelined_commands", conn_stats.pipelined_cmd_cnt);
append("total_pipelined_squashed_commands", m.coordinator_stats.squashed_commands);
append("pipeline_throttle_total", conn_stats.pipeline_throttle_count);
append("pipelined_latency_usec", conn_stats.pipelined_cmd_latency);
append("total_net_input_bytes", conn_stats.io_read_bytes);
append("connection_migrations", conn_stats.num_migrations);
Expand Down Expand Up @@ -2327,9 +2322,13 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
append("defrag_task_invocation_total", m.shard_stats.defrag_task_invocation_total);
append("reply_count", reply_stats.send_stats.count);
append("reply_latency_usec", reply_stats.send_stats.total_duration);

// Number of connections that are currently blocked on grabbing interpreter.
append("blocked_on_interpreter", m.coordinator_stats.blocked_on_interpreter);
append("lua_interpreter_cnt", m.lua_stats.interpreter_cnt);
append("lua_blocked", m.lua_stats.blocked_cnt);

// Total number of events of when a connection was blocked on grabbing interpreter.
append("lua_blocked_total", m.lua_stats.blocked_cnt);
}

if (should_enter("TIERED", true)) {
Expand Down
2 changes: 1 addition & 1 deletion src/server/test_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ ABSL_DECLARE_FLAG(string, dbfilename);
ABSL_DECLARE_FLAG(double, rss_oom_deny_ratio);
ABSL_DECLARE_FLAG(uint32_t, num_shards);
ABSL_FLAG(bool, force_epoll, false, "If true, uses epoll api instead iouring to run tests");
ABSL_DECLARE_FLAG(size_t, acllog_max_len);
ABSL_DECLARE_FLAG(uint32_t, acllog_max_len);
namespace dfly {

std::ostream& operator<<(std::ostream& os, const DbStats& stats) {
Expand Down

0 comments on commit b0d52c6

Please sign in to comment.