From c16ca44a595740ffb6fd8f473e762d3791d71b76 Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Mon, 23 Jan 2023 15:15:23 -0500 Subject: [PATCH] kvflowcontroller: implement kvflowcontrol.Controller MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Part of #95563. kvflowcontroller.Controller is a concrete implementation of the kvflowcontrol.Controller interface. It provides flow control for replication traffic in KV and is typically held at the node-level. Internally it maintains flow token buckets for {regular,elastic} work traveling along each kvflowcontrol.Stream. When work tries to Admit(), the controller blocks until there are flow tokens available for the specified stream/work class. For each stream we maintain two flow token buckets, one for each work class (regular and elastic). To provide coarse grained prioritization, we want the following properties: - When elastic tokens are deducted/returned, they only affect the elastic bucket. It cannot cause blocking for regular work (which would be a form of priority inversion). - When regular tokens get deducted, they deduct from both the regular and elastic buckets. We do want regular work to starve out elastic work. If sufficient regular work has not yet been logically admitted, and other regular work is waiting for it, we don't want elastic work to be able to go through. 
To get a sense of what this ends up looking like, consider the following datadriven snippet: history ---- regular | elastic +16MiB | +8.0MiB ====================================== -1.0MiB regular +15MiB | +7.0MiB -7.0MiB regular +8.0MiB | +0B (elastic blocked) -2.0MiB regular +6.0MiB | -2.0MiB (elastic blocked) -2.0MiB regular +4.0MiB | -4.0MiB (elastic blocked) +2.0MiB regular +6.0MiB | -2.0MiB (elastic blocked) -2.0MiB regular +4.0MiB | -4.0MiB (elastic blocked) +4.0MiB regular +8.0MiB | +0B (elastic blocked) +2.0MiB regular +10MiB | +2.0MiB -2.0MiB elastic +10MiB | +0B (elastic blocked) -2.0MiB regular +8.0MiB | -2.0MiB (elastic blocked) +2.0MiB regular +10MiB | +0B (elastic blocked) +2.0MiB elastic +10MiB | +2.0MiB +6.0MiB regular +16MiB | +8.0MiB The kvflowcontrol.Controller implementation is furnished with the following metrics: - kvadmission.flow_controller.{regular,elastic}_tokens_available - kvadmission.flow_controller.{regular,elastic}_tokens_deducted - kvadmission.flow_controller.{regular,elastic}_tokens_returned - kvadmission.flow_controller.{regular,elastic}_tokens_unaccounted - kvadmission.flow_controller.{regular,elastic}_requests_waiting - kvadmission.flow_controller.{regular,elastic}_requests_admitted - kvadmission.flow_controller.{regular,elastic}_requests_errored - kvadmission.flow_controller.{regular,elastic}_wait_duration - kvadmission.flow_controller.{regular,elastic}_stream_count - kvadmission.flow_controller.{regular,elastic}_blocked_stream_count This commit also introduces a deterministic simulator to understand the kind of traffic shaping this flow controller does. The simulator is able to spit out each of the metrics above. To do all this, we added a utility asciitsdb library that comes 'batteries-included' -- it integrates with our metrics registry, is able to scrape relevant metrics, and spit out ascii plots for metric values, all of which we use in tests added in this package. 
It looks like the following: # Note again the mirroring between token returns which immediately # allows admission, followed by token deductions. plot start=15s end=30s kvadmission.flow_controller.regular_tokens_{deducted,returned} unit=MiB rate=true ---- 2.1 ┤ ╭╮ 2.0 ┤ ╭╮│ ╭╮ ╭╮╮ 1.9 ┤ │╰╮ ││╮ │││ 1.7 ┤ ╭╯╯│ ╭╯╰╮ ╭╯╰╮ 1.6 ┤ ││ │╮ │╯ │ │╯ │ 1.4 ┤ │╯ ││ ╭╯ │ ╭╯ │╮ 1.3 ┤ ╭╯ ╰╮ │ ╰╮ │╯ ││ 1.1 ┤ │╯ │╮ ╭╯ │ ╭╯ ╰╮ 1.0 ┤ ╭╯ ││ │╯ │ │╯ │╮ 0.9 ┤ │╯ ╰╮ ╭╯ │╮ ╭╯ ││ ╭ 0.7 ┤ ╭╯ │ │ ╰╮ ╭╯ ││ ╭╯ 0.6 ┤ ╭╯╯ │╮ ╭╯ │ │╯ ╰╮ │╯ 0.4 ┤ │╯ ││╭╯ │ ╭╯ │╮╭╯ 0.3 ┤╭╯ ╰─╯│ ╰─╯ │╭╯ 0.1 ┼╯╯ │╭╯ ╰╯ ╰╯╯ -0.0 ┼╯ ╰╯ rate(regular_tokens_{deducted,returned}) (MiB) # So we're still admitting work choppily, and observe corresponding # deductions in the waiting request rate. But given the open-loop # thread above, the # of waiting request is still growing # unboundedly. plot start=15s end=30s kvadmission.flow_controller.regular_requests_{admitted,waiting} unit=reqs/s rate=true kvadmission.flow_controller.regular_requests_waiting unit=reqs ---- ---- 10.7 ┤ ╭╮ 9.9 ┼╮ ││ ╭╮ ╭╮ ╭╮ 9.2 ┤╰╮ ╭╯│ │╰╮ ╭─╮ ╭╮ ╭╯│ ││ 8.4 ┤ │ │ ╰╮ │ │ ╭╯ │ ╭╯╰╮ ╭╯ │ │╰╮ 7.7 ┤ │ ╭╯ │ │ ╰╮ │ │ │ │ │ ╰╮╭╯ │ 6.9 ┤ ╰╮ │ │╭╯ │ ╭╯ ╰╮│ ╰╮ ╭╯ ││ ╰╮ 6.1 ┤ ╰╮╭╯ ││ ╰╮│ ╭╯ │ │ ││ ╰ 5.4 ┤ ││ ╰│ │╯ │ ╰╮╯ ╭╯ 4.6 ┤ ╰╮ ╭╯ ╰╮ │ ╭╰╮ │╮ 3.9 ┤ ╭╰╮ ││ ╭╯│ │╮ │ │ ││ ╭ 3.1 ┤ ╭╯ ╰╮ │╰╮ │ ╰╮ ╭╯│ ╭╯ ╰╮ ╭╯│ ╭╯ 2.3 ┤ ╭╯ │ │ │ ╭╯ ╰╮ │ │ ╭╯ ╰╮ │ ╰╮╭╯ 1.6 ┤ │ ╰╮╭╯ │ │ │ │ ╰╮│ │ │ ││ 0.8 ┤╭╯ ││ │╭╯ ╰─╯ ╰╯ ╰╮│ ╰╯ 0.1 ┼╯ ││ ╰╯ ╰╯ -0.7 ┤ ╰╯ rate(regular_requests_{admitted,waiting}) (reqs/s) 118 ┤ ╭ 115 ┤ ╭──╯ 112 ┤ ╭╯ 108 ┤ ╭╯ 105 ┤ ╭─────╯ 102 ┤ ╭─╯ 99 ┤ ╭─╯ 96 ┤ ╭─╯ 92 ┤ ╭╯ 89 ┤ ╭─────╯ 86 ┤ ╭─╯ 83 ┤ ╭─╯ 80 ┤ ╭╯ 76 ┤ ╭╯ 73 ┤ ╭──────╯ 70 ┼─╯ regular_requests_waiting (reqs) ---- ---- Release note: None --- DEPS.bzl | 10 + build/bazelutil/distdir_files.bzl | 1 + go.mod | 1 + go.sum | 2 + pkg/BUILD.bazel | 6 + pkg/kv/kvserver/kvflowcontrol/BUILD.bazel | 2 + pkg/kv/kvserver/kvflowcontrol/doc.go | 2 + .../kvserver/kvflowcontrol/kvflowcontrol.go | 34 +- 
.../kvflowcontroller/BUILD.bazel | 55 ++ .../kvflowcontrol_token_adjustment_test.go | 159 ++++++ .../kvflowcontroller/kvflowcontroller.go | 424 ++++++++++++++ .../kvflowcontroller_metrics.go | 238 ++++++++ .../kvflowcontroller_simulation_test.go | 523 ++++++++++++++++++ .../testdata/flow_token_adjustment | 65 +++ .../simulation/admitting_waiting_choppy | 198 +++++++ .../testdata/simulation/over_admission | 97 ++++ .../testdata/simulation/overview | 188 +++++++ .../simulation/regular_elastic_prioritization | 122 ++++ pkg/util/admission/admissionpb/admissionpb.go | 30 +- pkg/util/admission/testdata/store_work_queue | 2 +- pkg/util/admission/testdata/work_queue | 134 ++--- pkg/util/asciitsdb/BUILD.bazel | 18 + pkg/util/asciitsdb/asciitsdb.go | 311 +++++++++++ 23 files changed, 2549 insertions(+), 73 deletions(-) create mode 100644 pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/BUILD.bazel create mode 100644 pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontrol_token_adjustment_test.go create mode 100644 pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go create mode 100644 pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller_metrics.go create mode 100644 pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller_simulation_test.go create mode 100644 pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/testdata/flow_token_adjustment create mode 100644 pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/testdata/simulation/admitting_waiting_choppy create mode 100644 pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/testdata/simulation/over_admission create mode 100644 pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/testdata/simulation/overview create mode 100644 pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/testdata/simulation/regular_elastic_prioritization create mode 100644 pkg/util/asciitsdb/BUILD.bazel create mode 100644 pkg/util/asciitsdb/asciitsdb.go diff --git a/DEPS.bzl b/DEPS.bzl index 5763daa75831..f07b5ba1e02d 100644 --- a/DEPS.bzl +++ 
b/DEPS.bzl @@ -6023,6 +6023,16 @@ def go_deps(): "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/mjibson/esc/com_github_mjibson_esc-v0.2.0.zip", ], ) + go_repository( + name = "com_github_mkungla_bexp_v3", + build_file_proto_mode = "disable_global", + importpath = "github.com/mkungla/bexp/v3", + sha256 = "903a932e83da8be3426e29c1484d79f814a825b2c2743c36d43b054910a9f886", + strip_prefix = "github.com/mkungla/bexp/v3@v3.0.1", + urls = [ + "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/mkungla/bexp/v3/com_github_mkungla_bexp_v3-v3.0.1.zip", + ], + ) go_repository( name = "com_github_mmatczuk_go_generics", build_file_proto_mode = "disable_global", diff --git a/build/bazelutil/distdir_files.bzl b/build/bazelutil/distdir_files.bzl index d8ac51caeb57..d608b2af1401 100644 --- a/build/bazelutil/distdir_files.bzl +++ b/build/bazelutil/distdir_files.bzl @@ -627,6 +627,7 @@ DISTDIR_FILES = { "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/mitchellh/osext/com_github_mitchellh_osext-v0.0.0-20151018003038-5e2d6d41470f.zip": "d8e6e5f6bd749cfa0c1c17c40f5dc0fd19e4a0a83245f46bde23bea4e65d1a20", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/mitchellh/reflectwalk/com_github_mitchellh_reflectwalk-v1.0.0.zip": "318ab84e22d4554a7540c7ebc9b4fb607e2608578c3a5bb72434203988048145", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/mjibson/esc/com_github_mjibson_esc-v0.2.0.zip": "9f090786bd43dddb5c0d798b449d5e8aede4cb7d106f56dcac0aebd8fd1929cc", + "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/mkungla/bexp/v3/com_github_mkungla_bexp_v3-v3.0.1.zip": "903a932e83da8be3426e29c1484d79f814a825b2c2743c36d43b054910a9f886", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/mmatczuk/go_generics/com_github_mmatczuk_go_generics-v0.0.0-20181212143635-0aaa050f9bab.zip": "18c1e95c93f1f82be0184bc13bf49eb4350c7a4ff524b1bf440b3eb9ff14acc9", 
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/mmcloughlin/geohash/com_github_mmcloughlin_geohash-v0.9.0.zip": "7162856858d9bb3c411d4b42ad19dfff579341ddf0580122e3f1ac3be05c7441", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/moby/locker/com_github_moby_locker-v1.0.1.zip": "f07361346d12a24e168db7fb2f21281883bee6060f1aedf7507bccf20c4a793f", diff --git a/go.mod b/go.mod index 55a7d0d49559..a0dbcdb2745c 100644 --- a/go.mod +++ b/go.mod @@ -176,6 +176,7 @@ require ( github.com/mattn/goveralls v0.0.2 github.com/mibk/dupl v1.0.0 github.com/mitchellh/reflectwalk v1.0.0 + github.com/mkungla/bexp/v3 v3.0.1 github.com/mmatczuk/go_generics v0.0.0-20181212143635-0aaa050f9bab github.com/montanaflynn/stats v0.6.3 github.com/mozillazg/go-slugify v0.2.0 diff --git a/go.sum b/go.sum index 1f5f69b057c6..b39406359cca 100644 --- a/go.sum +++ b/go.sum @@ -1673,6 +1673,8 @@ github.com/mitchellh/osext v0.0.0-20151018003038-5e2d6d41470f/go.mod h1:OkQIRizQ github.com/mitchellh/reflectwalk v1.0.0 h1:9D+8oIskB4VJBN5SFlmc27fSlIBZaov1Wpk/IfikLNY= github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= github.com/mjibson/esc v0.2.0/go.mod h1:9Hw9gxxfHulMF5OJKCyhYD7PzlSdhzXyaGEBRPH1OPs= +github.com/mkungla/bexp/v3 v3.0.1 h1:UqcWAaxWn+rmJ+3ZgwokMrUerGMUeOFihUPrTLPFZ9Q= +github.com/mkungla/bexp/v3 v3.0.1/go.mod h1:zkzndAaEcYdZcu+cB4WkNYQuG7pwjzMOZLn3vM8KD+8= github.com/mmatczuk/go_generics v0.0.0-20181212143635-0aaa050f9bab h1:QjLgi/L+MjxysinrA8KkNZLf3cAhTluBoSXUvFeN144= github.com/mmatczuk/go_generics v0.0.0-20181212143635-0aaa050f9bab/go.mod h1:Fs8p4al9GNa3b42YfZOoUVMdjQ2WlNEYlnoboJKPpJ8= github.com/mmcloughlin/geohash v0.9.0 h1:FihR004p/aE1Sju6gcVq5OLDqGcMnpBY+8moBqIsVOs= diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 861d35784cd7..b2a6437a20f3 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -204,6 +204,7 @@ ALL_TESTS = [ "//pkg/kv/kvserver/gc:gc_test", "//pkg/kv/kvserver/idalloc:idalloc_test", 
"//pkg/kv/kvserver/intentresolver:intentresolver_test", + "//pkg/kv/kvserver/kvflowcontrol/kvflowcontroller:kvflowcontroller_test", "//pkg/kv/kvserver/kvstorage:kvstorage_test", "//pkg/kv/kvserver/liveness:liveness_test", "//pkg/kv/kvserver/logstore:logstore_test", @@ -1216,6 +1217,8 @@ GO_TARGETS = [ "//pkg/kv/kvserver/intentresolver:intentresolver", "//pkg/kv/kvserver/intentresolver:intentresolver_test", "//pkg/kv/kvserver/kvadmission:kvadmission", + "//pkg/kv/kvserver/kvflowcontrol/kvflowcontroller:kvflowcontroller", + "//pkg/kv/kvserver/kvflowcontrol/kvflowcontroller:kvflowcontroller_test", "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb:kvflowcontrolpb", "//pkg/kv/kvserver/kvflowcontrol:kvflowcontrol", "//pkg/kv/kvserver/kvserverbase:kvserverbase", @@ -2018,6 +2021,7 @@ GO_TARGETS = [ "//pkg/util/admission:admission", "//pkg/util/admission:admission_test", "//pkg/util/arith:arith", + "//pkg/util/asciitsdb:asciitsdb", "//pkg/util/binfetcher:binfetcher", "//pkg/util/binfetcher:binfetcher_test", "//pkg/util/bitarray:bitarray", @@ -2615,6 +2619,7 @@ GET_X_DATA_TARGETS = [ "//pkg/kv/kvserver/intentresolver:get_x_data", "//pkg/kv/kvserver/kvadmission:get_x_data", "//pkg/kv/kvserver/kvflowcontrol:get_x_data", + "//pkg/kv/kvserver/kvflowcontrol/kvflowcontroller:get_x_data", "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb:get_x_data", "//pkg/kv/kvserver/kvserverbase:get_x_data", "//pkg/kv/kvserver/kvserverpb:get_x_data", @@ -3119,6 +3124,7 @@ GET_X_DATA_TARGETS = [ "//pkg/util/admission:get_x_data", "//pkg/util/admission/admissionpb:get_x_data", "//pkg/util/arith:get_x_data", + "//pkg/util/asciitsdb:get_x_data", "//pkg/util/binfetcher:get_x_data", "//pkg/util/bitarray:get_x_data", "//pkg/util/bufalloc:get_x_data", diff --git a/pkg/kv/kvserver/kvflowcontrol/BUILD.bazel b/pkg/kv/kvserver/kvflowcontrol/BUILD.bazel index 04ef54d4893d..4817b09ce5a9 100644 --- a/pkg/kv/kvserver/kvflowcontrol/BUILD.bazel +++ b/pkg/kv/kvserver/kvflowcontrol/BUILD.bazel @@ -13,6 +13,8 @@ 
go_library( "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb", "//pkg/roachpb", "//pkg/util/admission/admissionpb", + "@com_github_cockroachdb_redact//:redact", + "@com_github_dustin_go_humanize//:go-humanize", ], ) diff --git a/pkg/kv/kvserver/kvflowcontrol/doc.go b/pkg/kv/kvserver/kvflowcontrol/doc.go index f4b1a0ed64a3..285cb16cce1e 100644 --- a/pkg/kv/kvserver/kvflowcontrol/doc.go +++ b/pkg/kv/kvserver/kvflowcontrol/doc.go @@ -353,3 +353,5 @@ package kvflowcontrol // the receiver to send messages to return tokens up to some point, and // (b) the receiver has either not received the message for which we've // deducted tokens, or forgotten about it. +// - Ensure that flow tokens aren't leaked, by checking that after the tests +// quiesce, flow tokens are back to their original limits. diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go index 68f2cfa7b3a0..635b946f6b14 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go @@ -19,6 +19,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/redact" + "github.com/dustin/go-humanize" ) // Stream models the stream over which we replicate data traffic, the @@ -34,7 +36,10 @@ type Stream struct { // Tokens represent the finite capacity of a given stream, expressed in bytes // for data we're looking to replicate. Use of replication streams are // predicated on tokens being available. -type Tokens uint64 +// +// NB: We use a signed integer to accommodate data structures that deal with +// token deltas, or buckets that are allowed to go into debt. +type Tokens int64 // Controller provides flow control for replication traffic in KV, held at the // node-level. 
@@ -136,3 +141,30 @@ type DispatchReader interface { PendingDispatch() []roachpb.NodeID PendingDispatchFor(roachpb.NodeID) []kvflowcontrolpb.AdmittedRaftLogEntries } + +func (t Tokens) String() string { + return redact.StringWithoutMarkers(t) +} + +// SafeFormat implements the redact.SafeFormatter interface. +func (t Tokens) SafeFormat(p redact.SafePrinter, verb rune) { + sign := "+" + if t < 0 { + sign = "-" + t = -t + } + p.Printf("%s%s", sign, humanize.IBytes(uint64(t))) +} + +func (s Stream) String() string { + return redact.StringWithoutMarkers(s) +} + +// SafeFormat implements the redact.SafeFormatter interface. +func (s Stream) SafeFormat(p redact.SafePrinter, verb rune) { + tenantSt := s.TenantID.String() + if s.TenantID.IsSystem() { + tenantSt = "1" + } + p.Printf("t%s/s%s", tenantSt, s.StoreID.String()) +} diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/BUILD.bazel b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/BUILD.bazel new file mode 100644 index 000000000000..eaf9f5c6311b --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/BUILD.bazel @@ -0,0 +1,55 @@ +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "kvflowcontroller", + srcs = [ + "kvflowcontroller.go", + "kvflowcontroller_metrics.go", + ], + importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller", + visibility = ["//visibility:public"], + deps = [ + "//pkg/base", + "//pkg/kv/kvserver/kvflowcontrol", + "//pkg/settings", + "//pkg/settings/cluster", + "//pkg/util/admission/admissionpb", + "//pkg/util/buildutil", + "//pkg/util/hlc", + "//pkg/util/log", + "//pkg/util/metric", + "//pkg/util/syncutil", + ], +) + +go_test( + name = "kvflowcontroller_test", + srcs = [ + "kvflowcontrol_token_adjustment_test.go", + "kvflowcontroller_simulation_test.go", + ], + args = ["-test.timeout=295s"], + data = glob(["testdata/**"]), + embed = 
[":kvflowcontroller"], + deps = [ + "//pkg/kv/kvserver/kvflowcontrol", + "//pkg/roachpb", + "//pkg/settings/cluster", + "//pkg/testutils/datapathutils", + "//pkg/util/admission/admissionpb", + "//pkg/util/asciitsdb", + "//pkg/util/hlc", + "//pkg/util/leaktest", + "//pkg/util/log", + "//pkg/util/metric", + "//pkg/util/timeutil", + "@com_github_cockroachdb_datadriven//:datadriven", + "@com_github_dustin_go_humanize//:go-humanize", + "@com_github_guptarohit_asciigraph//:asciigraph", + "@com_github_mkungla_bexp_v3//:bexp", + "@com_github_stretchr_testify//require", + ], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontrol_token_adjustment_test.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontrol_token_adjustment_test.go new file mode 100644 index 000000000000..9bd06ef6f547 --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontrol_token_adjustment_test.go @@ -0,0 +1,159 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package kvflowcontroller + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/datadriven" + "github.com/dustin/go-humanize" + "github.com/stretchr/testify/require" +) + +func TestFlowTokenAdjustment(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + var ( + ctx = context.Background() + controller *Controller + adjustments []adjustment + stream = kvflowcontrol.Stream{ + TenantID: roachpb.SystemTenantID, + StoreID: roachpb.StoreID(1), + } + ) + + datadriven.RunTest(t, "testdata/flow_token_adjustment", + func(t *testing.T, d *datadriven.TestData) string { + switch d.Cmd { + case "init": + controller = New( + metric.NewRegistry(), + cluster.MakeTestingClusterSettings(), + hlc.NewClockWithSystemTimeSource(time.Nanosecond), + ) + adjustments = nil + return "" + + case "adjust": + require.NotNilf(t, controller, "uninitialized flow controller (did you use 'init'?)") + + for _, line := range strings.Split(d.Input, "\n") { + parts := strings.Fields(line) + require.Len(t, parts, 2, "expected form 'class={regular,elastic} delta={+,-}") + + var delta kvflowcontrol.Tokens + var pri admissionpb.WorkPriority + + // Parse class={regular,elastic}. 
+ parts[0] = strings.TrimSpace(parts[0]) + require.True(t, strings.HasPrefix(parts[0], "class=")) + parts[0] = strings.TrimPrefix(strings.TrimSpace(parts[0]), "class=") + switch parts[0] { + case "regular": + pri = admissionpb.NormalPri + case "elastic": + pri = admissionpb.BulkNormalPri + } + + // Parse delta={+,-} + parts[1] = strings.TrimSpace(parts[1]) + require.True(t, strings.HasPrefix(parts[1], "delta=")) + parts[1] = strings.TrimPrefix(strings.TrimSpace(parts[1]), "delta=") + require.True(t, strings.HasPrefix(parts[1], "+") || strings.HasPrefix(parts[1], "-")) + isPositive := strings.Contains(parts[1], "+") + parts[1] = strings.TrimPrefix(parts[1], "+") + parts[1] = strings.TrimPrefix(parts[1], "-") + bytes, err := humanize.ParseBytes(parts[1]) + require.NoError(t, err) + delta = kvflowcontrol.Tokens(int64(bytes)) + if !isPositive { + delta = -delta + } + + controller.adjustTokens(ctx, pri, delta, stream) + adjustments = append(adjustments, adjustment{ + pri: pri, + delta: delta, + post: controller.testingGetTokensForStream(stream), + }) + } + return "" + + case "history": + limit := controller.testingGetLimit() + + var buf strings.Builder + buf.WriteString(" regular | elastic\n") + buf.WriteString(fmt.Sprintf(" %8s | %8s\n", + printTrimmedTokens(limit[admissionpb.RegularWorkClass]), + printTrimmedTokens(limit[admissionpb.ElasticWorkClass]), + )) + buf.WriteString("======================================\n") + for _, h := range adjustments { + buf.WriteString(fmt.Sprintf("%s\n", h)) + } + return buf.String() + + default: + return fmt.Sprintf("unknown command: %s", d.Cmd) + } + }) +} + +type adjustment struct { + pri admissionpb.WorkPriority + delta kvflowcontrol.Tokens + post tokensPerWorkClass +} + +func printTrimmedTokens(t kvflowcontrol.Tokens) string { + return strings.ReplaceAll(t.String(), " ", "") +} + +func (h adjustment) String() string { + class := admissionpb.WorkClassFromPri(h.pri) + + comment := "" + if h.post[admissionpb.RegularWorkClass] <= 0 
{ + comment = "regular" + } + if h.post[admissionpb.ElasticWorkClass] <= 0 { + if len(comment) == 0 { + comment = "elastic" + } else { + comment = "regular and elastic" + } + } + if len(comment) != 0 { + comment = fmt.Sprintf(" (%s blocked)", comment) + } + return fmt.Sprintf("%8s %7s %8s | %8s%s", + printTrimmedTokens(h.delta), + class, + printTrimmedTokens(h.post[admissionpb.RegularWorkClass]), + printTrimmedTokens(h.post[admissionpb.ElasticWorkClass]), + comment, + ) +} diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go new file mode 100644 index 000000000000..69a760d47c59 --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go @@ -0,0 +1,424 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package kvflowcontroller + +import ( + "context" + "fmt" + "time" + + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" + "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/cockroach/pkg/util/buildutil" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" +) + +var regularTokensPerStream = settings.RegisterByteSizeSetting( + settings.SystemOnly, + "kvadmission.flow_controller.regular_tokens_per_stream", + "flow tokens available for regular work on a per-stream basis", + 16<<20, // 16 MiB + validateTokenRange, +) + +var elasticTokensPerStream = settings.RegisterByteSizeSetting( + settings.SystemOnly, + "kvadmission.flow_controller.elastic_tokens_per_stream", + "flow tokens available for elastic work on a per-stream basis", + 8<<20, // 8 MiB + validateTokenRange, +) + +// Aliases to make the code below slightly easier to read. +const regular, elastic = admissionpb.RegularWorkClass, admissionpb.ElasticWorkClass + +// Controller is a concrete implementation of the kvflowcontrol.Controller +// interface. It provides flow control for replication traffic in KV and is +// typically held at the node-level. +type Controller struct { + mu struct { + syncutil.Mutex + + // Token limit per work class, tracking + // kvadmission.flow_controller.{regular,elastic}_tokens_per_stream. + limit tokensPerWorkClass + + // We maintain flow token buckets for {regular,elastic} work along each + // stream. This is lazily instantiated. + // + // TODO(irfansharif): Sort out the GC story for these buckets. 
When + // streams get closed permanently (tenants get deleted, nodes removed) + // or when completely inactive (no tokens deducted/returned over 30+ + // minutes), clear these out. + buckets map[kvflowcontrol.Stream]bucket + } + metrics *metrics + clock *hlc.Clock +} + +var _ kvflowcontrol.Controller = &Controller{} + +// New constructs a new Controller. +func New(registry *metric.Registry, settings *cluster.Settings, clock *hlc.Clock) *Controller { + c := &Controller{ + clock: clock, + } + + regularTokens := kvflowcontrol.Tokens(regularTokensPerStream.Get(&settings.SV)) + elasticTokens := kvflowcontrol.Tokens(elasticTokensPerStream.Get(&settings.SV)) + c.mu.limit = map[admissionpb.WorkClass]kvflowcontrol.Tokens{ + regular: regularTokens, + elastic: elasticTokens, + } + c.mu.buckets = make(map[kvflowcontrol.Stream]bucket) + regularTokensPerStream.SetOnChange(&settings.SV, func(ctx context.Context) { + c.mu.Lock() + defer c.mu.Unlock() + + before := tokensPerWorkClass{ + regular: c.mu.limit[regular], + elastic: c.mu.limit[elastic], + } + now := tokensPerWorkClass{ + regular: kvflowcontrol.Tokens(regularTokensPerStream.Get(&settings.SV)), + elastic: kvflowcontrol.Tokens(elasticTokensPerStream.Get(&settings.SV)), + } + adjustment := tokensPerWorkClass{ + regular: now[regular] - before[regular], + elastic: now[elastic] - before[elastic], + } + c.mu.limit = now + for _, b := range c.mu.buckets { + b.tokens[regular] += adjustment[regular] + b.tokens[elastic] += adjustment[elastic] + c.metrics.onTokenAdjustment(adjustment) + if adjustment[regular] > 0 || adjustment[elastic] > 0 { + b.signal() // signal a waiter, if any + } + } + }) + c.metrics = newMetrics(c) + registry.AddMetricStruct(c.metrics) + return c +} + +// Admit is part of the kvflowcontrol.Controller interface. It blocks until +// there are flow tokens available for replication over the given stream for +// work of the given priority. 
+func (c *Controller) Admit( + ctx context.Context, pri admissionpb.WorkPriority, _ time.Time, stream kvflowcontrol.Stream, +) error { + class := admissionpb.WorkClassFromPri(pri) + c.metrics.onWaiting(class) + + logged := false + tstart := c.clock.PhysicalTime() + for { + c.mu.Lock() + b := c.getBucketLocked(stream) + tokens := b.tokens[class] + c.mu.Unlock() + + if tokens > 0 { + if log.ExpensiveLogEnabled(ctx, 2) { + log.Infof(ctx, "flow tokens available (pri=%s stream=%s tokens=%s wait-duration=%s)", + pri, stream, tokens, c.clock.PhysicalTime().Sub(tstart)) + } + + // TODO(irfansharif): Right now we continue forwarding admission + // grants to request while the available tokens > 0, which can lead + // to over-admission since deduction is deferred (see + // testdata/simulation/over_admission). + // A. One mitigation could be terminating grant forwarding if the + // 'tentatively deducted tokens' exceeds some amount (say, 16 + // MiB). When tokens are actually deducted, we'll reduce from + // this 'tentatively deducted' count. We can re-signal() on every + // actual token deduction where available tokens is still > 0. + // B. The pathological case with A is when all these tentatively + // deducted tokens are due to requests that are waiting on + // latches and locks. And the next request that wants to be + // admitted is not contending with those latches/locks but gets + // stuck behind it anyway. We could instead count the # of + // requests that go through this bucket and are also past the + // latch/lock acquisition but not yet evaluated, and block if + // that count is greater than some small multiple of GOMAXPROCS. 
+ + b.signal() // signal a waiter, if any + c.metrics.onAdmitted(class, c.clock.PhysicalTime().Sub(tstart)) + return nil + } + + if !logged && log.ExpensiveLogEnabled(ctx, 2) { + log.Infof(ctx, "waiting for flow tokens (pri=%s stream=%s tokens=%s)", + pri, stream, tokens) + logged = true + } + + select { + case <-b.wait(): // wait for a signal + case <-ctx.Done(): + if ctx.Err() != nil { + c.metrics.onErrored(class) + } + return ctx.Err() + } + } + + // TODO(irfansharif): Use the create time for ordering among waiting + // requests. Integrate it with epoch-LIFO. +} + +// DeductTokens is part of the kvflowcontrol.Controller interface. +func (c *Controller) DeductTokens( + ctx context.Context, + pri admissionpb.WorkPriority, + tokens kvflowcontrol.Tokens, + stream kvflowcontrol.Stream, +) bool { + if tokens < 0 { + log.Fatalf(ctx, "malformed argument: -ve tokens deducted (pri=%s tokens=%s stream=%s)", + pri, tokens, stream) + } + c.adjustTokens(ctx, pri, -tokens, stream) + return true +} + +// ReturnTokens is part of the kvflowcontrol.Controller interface. 
+func (c *Controller) ReturnTokens( + ctx context.Context, + pri admissionpb.WorkPriority, + tokens kvflowcontrol.Tokens, + stream kvflowcontrol.Stream, +) { + if tokens < 0 { + log.Fatalf(ctx, "malformed argument: -ve tokens deducted (pri=%s tokens=%s stream=%s)", + pri, tokens, stream) + } + c.adjustTokens(ctx, pri, tokens, stream) +} + +func (c *Controller) adjustTokens( + ctx context.Context, + pri admissionpb.WorkPriority, + delta kvflowcontrol.Tokens, + stream kvflowcontrol.Stream, +) { + class := admissionpb.WorkClassFromPri(pri) + + c.mu.Lock() + defer c.mu.Unlock() + + b := c.getBucketLocked(stream) + adjustment, unaccounted := b.adjust(ctx, class, delta, c.mu.limit) + c.metrics.onTokenAdjustment(adjustment) + c.metrics.onUnaccounted(unaccounted) + if adjustment[regular] > 0 || adjustment[elastic] > 0 { + b.signal() // signal a waiter, if any + } + + if log.ExpensiveLogEnabled(ctx, 2) { + log.Infof(ctx, "adjusted flow tokens (pri=%s stream=%s delta=%s): regular=%s elastic=%s", + pri, stream, delta, b.tokens[regular], b.tokens[elastic]) + } +} + +func (c *Controller) getBucketLocked(stream kvflowcontrol.Stream) bucket { + b, ok := c.mu.buckets[stream] + if !ok { + b = newBucket(c.mu.limit) + c.mu.buckets[stream] = b + } + return b +} + +// bucket holds flow tokens for {regular,elastic} traffic over a +// kvflowcontrol.Stream. It's used to synchronize handoff between threads +// returning and waiting for flow tokens. +type bucket struct { + tokens tokensPerWorkClass // protected by Controller.mu + // Waiting requests do so by waiting on signalCh without holding mutexes. + // Requests first check for available tokens, waiting if unavailable. + // - Whenever tokens are returned, signalCh is signaled, waking up a single + // waiting request. If the request finds no available tokens, it starts + // waiting again. + // - Whenever a request gets admitted, it signals the next waiter if any. 
+ // The invariant we are ensuring is that whenever there are tokens + // available, the channel will have an entry, so at least one request that + // observed unavailable tokens will get unblocked, which will in turn + // unblock others. The concurrent request that adds tokens also tops up the + // channel so that the at least one waiter that observed unavailable tokens + // will find the channel signaled. + signalCh chan struct{} +} + +func newBucket(t tokensPerWorkClass) bucket { + return bucket{ + tokens: map[admissionpb.WorkClass]kvflowcontrol.Tokens{ + regular: t[regular], + elastic: t[elastic], + }, + signalCh: make(chan struct{}, 1), + } +} + +func (b *bucket) signal() { + select { + case b.signalCh <- struct{}{}: // non-blocking channel write that ensures it's topped up to 1 entry + default: + } +} + +func (b *bucket) wait() chan struct{} { + return b.signalCh +} + +func (b *bucket) adjust( + ctx context.Context, + class admissionpb.WorkClass, + delta kvflowcontrol.Tokens, + limit tokensPerWorkClass, +) (adjustment, unaccounted tokensPerWorkClass) { + unaccounted = tokensPerWorkClass{ + regular: 0, + elastic: 0, + } + + before := tokensPerWorkClass{ + regular: b.tokens[regular], + elastic: b.tokens[elastic], + } + + switch class { + case elastic: + // Elastic {deductions,returns} only affect elastic flow tokens. 
+ b.tokens[class] += delta + if delta > 0 && b.tokens[class] > limit[class] { + unaccounted[class] = b.tokens[class] - limit[class] + b.tokens[class] = limit[class] // enforce ceiling + } + case regular: + b.tokens[class] += delta + if delta > 0 && b.tokens[class] > limit[class] { + unaccounted[class] = b.tokens[class] - limit[class] + b.tokens[class] = limit[class] // enforce ceiling + } + + b.tokens[elastic] += delta + if delta > 0 && b.tokens[elastic] > limit[elastic] { + unaccounted[elastic] = b.tokens[elastic] - limit[elastic] + b.tokens[elastic] = limit[elastic] // enforce ceiling + } + } + + if buildutil.CrdbTestBuild && (unaccounted[regular] != 0 || unaccounted[elastic] != 0) { + log.Fatalf(ctx, "unaccounted[regular]=%s unaccounted[elastic]=%s for class=%s delta=%s limit[regular]=%s limit[elastic]=%s", + unaccounted[regular], unaccounted[elastic], class, delta, limit[regular], limit[elastic]) + } + + adjustment = tokensPerWorkClass{ + regular: b.tokens[regular] - before[regular], + elastic: b.tokens[elastic] - before[elastic], + } + return adjustment, unaccounted +} + +type tokensPerWorkClass map[admissionpb.WorkClass]kvflowcontrol.Tokens + +const ( + minTokensPerStream kvflowcontrol.Tokens = 1 << 20 // 1 MiB + maxTokensPerStream kvflowcontrol.Tokens = 64 << 20 // 64 MiB +) + +func validateTokenRange(b int64) error { + t := kvflowcontrol.Tokens(b) + if t < minTokensPerStream { + return fmt.Errorf("minimum flowed tokens allowed is %s, got %s", minTokensPerStream, t) + } + if t > maxTokensPerStream { + return fmt.Errorf("maximum flow tokens allowed is %s, got %s", maxTokensPerStream, t) + } + return nil +} + +func (c *Controller) testingGetTokensForStream(stream kvflowcontrol.Stream) tokensPerWorkClass { + c.mu.Lock() + defer c.mu.Unlock() + ret := make(map[admissionpb.WorkClass]kvflowcontrol.Tokens) + for wc, c := range c.getBucketLocked(stream).tokens { + ret[wc] = c + } + return ret +} + +func (c *Controller) testingGetLimit() tokensPerWorkClass { + 
c.mu.Lock() + defer c.mu.Unlock() + return c.mu.limit +} + +// testingNonBlockingAdmit is a non-blocking alternative to Admit() for use in +// tests. +// - it checks if we have a non-zero number of flow tokens +// - if we do, we return immediately with admitted=true +// - if we don't, we return admitted=false and two callbacks: +// - signaled, which can be polled to check whether we're ready to try and +// admitting again; +// - admit, which can be used to try and admit again. If still not admitted, +// callers are to wait until they're signaled again. +func (c *Controller) testingNonBlockingAdmit( + pri admissionpb.WorkPriority, stream kvflowcontrol.Stream, +) (admitted bool, signaled func() bool, admit func() bool) { + class := admissionpb.WorkClassFromPri(pri) + c.metrics.onWaiting(class) + + tstart := c.clock.PhysicalTime() + admit = func() bool { + c.mu.Lock() + b := c.getBucketLocked(stream) + tokens := b.tokens[class] + c.mu.Unlock() + + if tokens <= 0 { + return false + } + + b.signal() // signal a waiter, if any + c.metrics.onAdmitted(class, c.clock.PhysicalTime().Sub(tstart)) + return true + } + + if admit() { + return true, nil, nil + } + + b := c.testingGetBucket(stream) + return false, b.testingSignaled, admit +} + +func (c *Controller) testingGetBucket(stream kvflowcontrol.Stream) bucket { + c.mu.Lock() + defer c.mu.Unlock() + return c.getBucketLocked(stream) +} + +func (b *bucket) testingSignaled() bool { + select { + case <-b.wait(): // check if signaled + return true + default: + return false + } +} diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller_metrics.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller_metrics.go new file mode 100644 index 000000000000..9a1eb4f1c40f --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller_metrics.go @@ -0,0 +1,238 @@ +// Copyright 2023 The Cockroach Authors. 
+// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvflowcontroller + +import ( + "fmt" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/cockroach/pkg/util/metric" +) + +var ( + // Metric templates for the flow token controller. We make use of aggmetrics + // to get per-tenant metrics and an aggregation across all tenants. Within a + // tenant, we maintain flow tokens for each admissionpb.WorkClass with + // appropriately segmented metrics. We don't export metrics at a per + // kvflowcontrol.Stream-level; streams are identified (partly) by the store + // ID receiving replication traffic, which is a high cardinality measure + // (storeIDs could ratchet up arbitrarily). A similar argument technically + // applies for per-tenant metrics, but there we'd rather eat the cost. + // + // TODO(irfansharif): Actually use aggmetrics. + // TODO(irfansharif): Consider aggregated metrics per remote store, + // aggregated across all tenants. + // TODO(irfansharif): To improve upon the tenant*stores cardinality, + // consider "inspectz" style pages to give token counts and queue lengths of + // individual buckets (#66772). 
+ + flowTokensAvailable = metric.Metadata{ + Name: "kvadmission.flow_controller.%s_tokens_available", + Help: "Flow tokens available for %s requests, across all replication streams", + Measurement: "Bytes", + Unit: metric.Unit_BYTES, + } + + flowTokensDeducted = metric.Metadata{ + Name: "kvadmission.flow_controller.%s_tokens_deducted", + Help: "Flow tokens deducted by %s requests, across all replication streams", + Measurement: "Bytes", + Unit: metric.Unit_BYTES, + } + + flowTokensReturned = metric.Metadata{ + Name: "kvadmission.flow_controller.%s_tokens_returned", + Help: "Flow tokens returned by %s requests, across all replication streams", + Measurement: "Bytes", + Unit: metric.Unit_BYTES, + } + + flowTokensUnaccounted = metric.Metadata{ + Name: "kvadmission.flow_controller.%s_tokens_unaccounted", + Help: "Flow tokens returned by %s requests that were unaccounted for, across all replication streams", + Measurement: "Bytes", + Unit: metric.Unit_BYTES, + } + + requestsWaiting = metric.Metadata{ + Name: "kvadmission.flow_controller.%s_requests_waiting", + Help: "Number of %s requests waiting for flow tokens", + Measurement: "Requests", + Unit: metric.Unit_COUNT, + } + + requestsAdmitted = metric.Metadata{ + Name: "kvadmission.flow_controller.%s_requests_admitted", + Help: "Number of %s requests admitted by the flow controller", + Measurement: "Requests", + Unit: metric.Unit_COUNT, + } + + requestsErrored = metric.Metadata{ + Name: "kvadmission.flow_controller.%s_requests_errored", + Help: "Number of %s requests that errored out while waiting for flow tokens", + Measurement: "Requests", + Unit: metric.Unit_COUNT, + } + + waitDuration = metric.Metadata{ + Name: "kvadmission.flow_controller.%s_wait_duration", + Help: "Latency histogram for time %s requests spent waiting for flow tokens", + Measurement: "Nanoseconds", + Unit: metric.Unit_NANOSECONDS, + } + + totalStreamCount = metric.Metadata{ + Name: "kvadmission.flow_controller.%s_stream_count", + Help: "Total number 
of replication streams for %s requests", + Measurement: "Count", + Unit: metric.Unit_COUNT, + } + + blockedStreamCount = metric.Metadata{ + Name: "kvadmission.flow_controller.%s_blocked_stream_count", + Help: "Number of replication streams with no flow tokens available for %s requests", + Measurement: "Count", + Unit: metric.Unit_COUNT, + } +) + +// annotateMetricTemplateWithWorkClass uses the given metric template to build +// one suitable for the specific work class. +func annotateMetricTemplateWithWorkClass( + wc admissionpb.WorkClass, tmpl metric.Metadata, +) metric.Metadata { + rv := tmpl + rv.Name = fmt.Sprintf(tmpl.Name, wc) + rv.Help = fmt.Sprintf(tmpl.Help, wc) + return rv +} + +type metrics struct { + FlowTokensAvailable [admissionpb.NumWorkClasses]*metric.Gauge + FlowTokensDeducted [admissionpb.NumWorkClasses]*metric.Counter + FlowTokensReturned [admissionpb.NumWorkClasses]*metric.Counter + FlowTokensUnaccounted [admissionpb.NumWorkClasses]*metric.Counter + RequestsWaiting [admissionpb.NumWorkClasses]*metric.Gauge + RequestsAdmitted [admissionpb.NumWorkClasses]*metric.Counter + RequestsErrored [admissionpb.NumWorkClasses]*metric.Counter + WaitDuration [admissionpb.NumWorkClasses]metric.IHistogram + TotalStreamCount [admissionpb.NumWorkClasses]*metric.Gauge + BlockedStreamCount [admissionpb.NumWorkClasses]*metric.Gauge +} + +var _ metric.Struct = &metrics{} + +func newMetrics(c *Controller) *metrics { + m := &metrics{} + for _, wc := range []admissionpb.WorkClass{ + admissionpb.RegularWorkClass, + admissionpb.ElasticWorkClass, + } { + wc := wc // copy loop variable + m.FlowTokensAvailable[wc] = metric.NewFunctionalGauge( + annotateMetricTemplateWithWorkClass(wc, flowTokensAvailable), + func() int64 { + sum := int64(0) + c.mu.Lock() + defer c.mu.Unlock() + for _, wbc := range c.mu.buckets { + sum += int64(wbc.tokens[wc]) + } + return sum + }, + ) + m.FlowTokensDeducted[wc] = metric.NewCounter( + annotateMetricTemplateWithWorkClass(wc, flowTokensDeducted), 
+ ) + m.FlowTokensReturned[wc] = metric.NewCounter( + annotateMetricTemplateWithWorkClass(wc, flowTokensReturned), + ) + m.FlowTokensUnaccounted[wc] = metric.NewCounter( + annotateMetricTemplateWithWorkClass(wc, flowTokensUnaccounted), + ) + m.RequestsWaiting[wc] = metric.NewGauge( + annotateMetricTemplateWithWorkClass(wc, requestsWaiting), + ) + m.RequestsAdmitted[wc] = metric.NewCounter( + annotateMetricTemplateWithWorkClass(wc, requestsAdmitted), + ) + m.RequestsErrored[wc] = metric.NewCounter( + annotateMetricTemplateWithWorkClass(wc, requestsErrored), + ) + m.WaitDuration[wc] = metric.NewHistogram( + metric.HistogramOptions{ + Metadata: annotateMetricTemplateWithWorkClass(wc, waitDuration), + Duration: base.DefaultHistogramWindowInterval(), + Buckets: metric.IOLatencyBuckets, + Mode: metric.HistogramModePrometheus, + }, + ) + m.TotalStreamCount[wc] = metric.NewFunctionalGauge( + annotateMetricTemplateWithWorkClass(wc, totalStreamCount), + func() int64 { + c.mu.Lock() + defer c.mu.Unlock() + return int64(len(c.mu.buckets)) + }, + ) + m.BlockedStreamCount[wc] = metric.NewFunctionalGauge( + annotateMetricTemplateWithWorkClass(wc, blockedStreamCount), + func() int64 { + count := int64(0) + c.mu.Lock() + defer c.mu.Unlock() + for _, wbc := range c.mu.buckets { + if wbc.tokens[wc] <= 0 { + count += 1 + } + } + return count + }, + ) + } + return m +} + +func (m *metrics) onWaiting(class admissionpb.WorkClass) { + m.RequestsWaiting[class].Inc(1) +} + +func (m *metrics) onAdmitted(class admissionpb.WorkClass, dur time.Duration) { + m.RequestsAdmitted[class].Inc(1) + m.RequestsWaiting[class].Dec(1) + m.WaitDuration[class].RecordValue(dur.Nanoseconds()) +} + +func (m *metrics) onErrored(class admissionpb.WorkClass) { + m.RequestsErrored[class].Inc(1) +} + +func (m *metrics) onTokenAdjustment(adjustment tokensPerWorkClass) { + for class, delta := range adjustment { + if delta < 0 { + m.FlowTokensDeducted[class].Inc(-int64(delta)) + } else if delta > 0 { + 
m.FlowTokensReturned[class].Inc(int64(delta)) + } + } +} + +func (m *metrics) onUnaccounted(unaccounted tokensPerWorkClass) { + for class, delta := range unaccounted { + m.FlowTokensUnaccounted[class].Inc(int64(delta)) + } +} + +// MetricStruct implements the metric.Struct interface. +func (m *metrics) MetricStruct() {} diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller_simulation_test.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller_simulation_test.go new file mode 100644 index 000000000000..b595f06d645d --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller_simulation_test.go @@ -0,0 +1,523 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package kvflowcontroller + +import ( + "context" + "fmt" + "strconv" + "strings" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/testutils/datapathutils" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/cockroach/pkg/util/asciitsdb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/datadriven" + "github.com/dustin/go-humanize" + "github.com/guptarohit/asciigraph" + "github.com/mkungla/bexp/v3" + "github.com/stretchr/testify/require" +) + +// TestUsingSimulation is a data-driven test for kvflowcontrol.Controller. +// It gives package authors a way to understand how flow tokens are maintained +// for individual replication streams, how write bandwidth is shaped by these +// tokens, and how requests queue/dequeue internally. We provide the following +// syntax: +// +// - "init" +// Initialize the flow controller and test simulator. +// +// - "timeline" +// start= end= class={regular,elastic} \ +// stream=t/s adjust={+,-}/s rate=/s \ +// [deduction-delay=] +// .... +// Creates a "thread" that operates between t='start' to (non-inclusive) +// t='end', issuing the specified 'rate' of requests of the given work +// 'class', over the given 'stream', where the flow tokens are +// {deducted,returned} with the given bandwidth. The 'rate' controls the +// granularity of token adjustment, i.e. if adjust=+100bytes/s and +// rate=5/s, then each return adjusts by +100/5 = +20bytes. If flow tokens +// are being deducted (-ve 'adjust'), they go through Admit() followed by +// DeductTokens(). 
If they're being returned (+ve 'adjust'), they simply go +// through ReturnTokens(). The optional 'deduction-delay' parameter controls +// the number of ticks between each request being granted admission and it +// deducting the corresponding flow tokens. +// +// - "simulate" [end=] +// Simulate timelines until the optionally specified timestamp. If no +// timestamp is specified, the largest end time of all registered timelines +// is used instead. +// +// - "plot" [height=] [width=] [precision=] \ +// [start=] [end=] +// unit= [rate=true] +// .... +// Plot the flow controller specified metrics (and optionally its rate of +// change) with the specified units. The following metrics are supported: +// a. kvadmission.flow_controller.{regular,elastic}_tokens_{available,deducted,returned} +// b. kvadmission.flow_controller.{regular,elastic}_requests_{waiting,admitted,errored} +// c. kvadmission.flow_controller.{regular,elastic}{,_blocked}_stream_count +// d. kvadmission.flow_controller.{regular,elastic}_wait_duration +// To overlay metrics onto the same plot, the selector supports curly brace +// expansion. If the unit is one of {MiB,MB,KB,KiB,s,ms,us,μs}, or the +// bandwidth equivalents (/s), the y-axis is automatically +// converted. +// +// Internally we make use of a test-only non-blocking interface for the flow +// controller in order to enable deterministic simulation. 
+func TestUsingSimulation(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + datadriven.Walk(t, datapathutils.TestDataPath(t, "simulation"), func(t *testing.T, path string) { + var ( + controller *Controller + simulator *simulator + tsdb *asciitsdb.TSDB + mtime *timeutil.ManualTime + ) + + ctx := context.Background() + datadriven.RunTest(t, path, + func(t *testing.T, d *datadriven.TestData) string { + switch d.Cmd { + case "init": + registry := metric.NewRegistry() + tsdb = asciitsdb.New(t, registry) + mtime = timeutil.NewManualTime(tstart) + controller = New( + registry, + cluster.MakeTestingClusterSettings(), + hlc.NewClock(mtime, time.Nanosecond /* maxOffset */), + ) + tsdb.Register(controller.metrics) + simulator = newSimulator(t, controller, tsdb, mtime) + return "" + + case "timeline": + require.NotNilf(t, controller, "uninitialized flow controller (did you use 'init'?)") + require.NotNilf(t, simulator, "uninitialized simulator (did you use 'init'?)") + + for _, line := range strings.Split(d.Input, "\n") { + parts := strings.Fields(line) + require.True(t, len(parts) >= 6, `expected form: + start= end= class={regular,elastic} \ + stream=t/s adjust={+,-}/s rate=/s \ + [deduction-delay=] +`) + + var ( + start, end time.Time + pri admissionpb.WorkPriority + stream kvflowcontrol.Stream + delta kvflowcontrol.Tokens + rate int + ) + + for i := range parts { + parts[i] = strings.TrimSpace(parts[i]) + } + + require.True(t, strings.HasPrefix(parts[0], "start=")) + require.True(t, strings.HasPrefix(parts[1], "end=")) + require.True(t, strings.HasPrefix(parts[2], "class=")) + require.True(t, strings.HasPrefix(parts[3], "stream=")) + require.True(t, strings.HasPrefix(parts[4], "adjust=")) + require.True(t, strings.HasPrefix(parts[5], "rate=")) + + for i := range parts[:6] { + inner := strings.Split(parts[i], "=") + require.Len(t, inner, 2) + parts[i] = strings.TrimSpace(inner[1]) + } + + // Parse start=. 
+ dur, err := time.ParseDuration(parts[0]) + require.NoError(t, err) + start = mtime.Now().Add(dur) + + // Parse end=. + dur, err = time.ParseDuration(parts[1]) + require.NoError(t, err) + end = mtime.Now().Add(dur) + + // Parse class={regular,elastic}. + switch parts[2] { + case "regular": + pri = admissionpb.NormalPri + case "elastic": + pri = admissionpb.BulkNormalPri + default: + t.Fatalf("unexpected class: %s", parts[1]) + } + + { // Parse stream=t/s. + inner := strings.Split(parts[3], "/") + require.Len(t, inner, 2) + + ti, err := strconv.Atoi(strings.TrimPrefix(inner[0], "t")) + require.NoError(t, err) + + si, err := strconv.Atoi(strings.TrimPrefix(inner[1], "s")) + require.NoError(t, err) + + stream = kvflowcontrol.Stream{ + TenantID: roachpb.MustMakeTenantID(uint64(ti)), + StoreID: roachpb.StoreID(si), + } + } + + // Parse adjust={+,-}/s. + isPositive := strings.Contains(parts[4], "+") + parts[4] = strings.TrimPrefix(parts[4], "+") + parts[4] = strings.TrimPrefix(parts[4], "-") + bytes, err := humanize.ParseBytes(strings.TrimSuffix(parts[4], "/s")) + require.NoError(t, err) + delta = kvflowcontrol.Tokens(int64(bytes)) + if !isPositive { + delta = -delta + } + + // Parse rate=/s. + rate, err = strconv.Atoi(strings.TrimSuffix(parts[5], "/s")) + require.NoError(t, err) + + deductionDelay := 0 + if len(parts) > 6 { + for _, part := range parts[6:] { + part = strings.TrimSpace(part) + if strings.HasPrefix(part, "deduction-delay=") { + part = strings.TrimPrefix(part, "deduction-delay=") + dur, err := time.ParseDuration(part) + require.NoError(t, err) + deductionDelay = int(dur.Nanoseconds() / tick.Nanoseconds()) + } + } + } + simulator.timeline(start, end, pri, stream, delta, rate, deductionDelay) + } + return "" + + case "simulate": + require.NotNilf(t, simulator, "uninitialized simulator (did you use 'init'?)") + var end time.Time + if d.HasArg("end") { + // Parse end=. 
+ var endStr string + d.ScanArgs(t, "end", &endStr) + dur, err := time.ParseDuration(endStr) + require.NoError(t, err) + end = mtime.Now().Add(dur) + } + simulator.simulate(ctx, end) + return "" + + case "plot": + var h, w, p = 15, 40, 1 + if d.HasArg("height") { + d.ScanArgs(t, "height", &h) + } + if d.HasArg("width") { + d.ScanArgs(t, "width", &w) + } + if d.HasArg("precision") { + d.ScanArgs(t, "precision", &p) + } + + var buf strings.Builder + for i, line := range strings.Split(d.Input, "\n") { + var ( + selector, unit string + rated bool + ) + parts := strings.Fields(line) + for i, part := range parts { + part = strings.TrimSpace(part) + if i == 0 { + selector = part + continue + } + + if strings.HasPrefix(part, "rate=") { + var err error + rated, err = strconv.ParseBool(strings.TrimPrefix(part, "rate=")) + require.NoError(t, err) + } + + if strings.HasPrefix(part, "unit=") { + unit = strings.TrimPrefix(part, "unit=") + } + } + + caption := strings.TrimPrefix(selector, "kvadmission.flow_controller.") + if rated { + caption = fmt.Sprintf("rate(%s)", caption) + } + caption = fmt.Sprintf("%s (%s)", caption, unit) + + options := []asciitsdb.Option{ + asciitsdb.WithGraphOptions( + asciigraph.Height(h), + asciigraph.Width(w), + asciigraph.Precision(uint(p)), + asciigraph.Caption(caption), + ), + } + if rated { + options = append(options, asciitsdb.WithRate(int(time.Second/metricTick))) + } + switch unit { + case "μs", "us", "microseconds": + time.Microsecond.Nanoseconds() + options = append(options, asciitsdb.WithDivisor(float64(time.Microsecond.Nanoseconds())) /* ns => μs conversion */) + case "ms", "milliseconds": + options = append(options, asciitsdb.WithDivisor(float64(time.Millisecond.Nanoseconds())) /* ns => ms conversion */) + case "s", "seconds": + options = append(options, asciitsdb.WithDivisor(float64(time.Second.Nanoseconds())) /* ns => s conversion */) + case "MiB", "MiB/s": + options = append(options, asciitsdb.WithDivisor(humanize.MiByte) /* 1 MiB */) 
+ case "MB", "MB/s": + options = append(options, asciitsdb.WithDivisor(humanize.MByte) /* 1 MB */) + case "KiB", "KiB/s": + options = append(options, asciitsdb.WithDivisor(humanize.KiByte) /* 1 KiB */) + case "KB", "KB/s": + options = append(options, asciitsdb.WithDivisor(humanize.KByte) /* 1 KB */) + default: + } + + start := tstart + if d.HasArg("start") { + // Parse start=. + var startStr string + d.ScanArgs(t, "start", &startStr) + dur, err := time.ParseDuration(startStr) + require.NoError(t, err) + start = tstart.Add(dur) + options = append(options, asciitsdb.WithOffset(start.Sub(tstart).Nanoseconds()/metricTick.Nanoseconds())) + } + + if d.HasArg("end") { + // Parse end=. + var endStr string + d.ScanArgs(t, "end", &endStr) + dur, err := time.ParseDuration(endStr) + require.NoError(t, err) + end := tstart.Add(dur) + options = append(options, asciitsdb.WithLimit(end.Sub(start).Nanoseconds()/metricTick.Nanoseconds())) + } + + if i > 0 { + buf.WriteString("\n\n\n") + } + metrics := bexp.Parse(strings.TrimSpace(selector)) + buf.WriteString(tsdb.Plot(metrics, options...)) + } + return buf.String() + + default: + return fmt.Sprintf("unknown command: %s", d.Cmd) + } + }, + ) + }) +} + +var tstart = timeutil.Unix(0, 0) + +const tick = time.Millisecond +const metricTick = 100 * tick + +type simulator struct { + t *testing.T + + controller *Controller + ticker []ticker + tsdb *asciitsdb.TSDB + mtime *timeutil.ManualTime +} + +func newSimulator( + t *testing.T, controller *Controller, tsdb *asciitsdb.TSDB, mtime *timeutil.ManualTime, +) *simulator { + return &simulator{ + t: t, + tsdb: tsdb, + controller: controller, + mtime: mtime, + } +} + +func (s *simulator) timeline( + start, end time.Time, + pri admissionpb.WorkPriority, + stream kvflowcontrol.Stream, + delta kvflowcontrol.Tokens, + rate, deductionDelay int, +) { + if rate == 0 { + return // nothing to do + } + s.ticker = append(s.ticker, ticker{ + t: s.t, + controller: s.controller, + + start: start, + end: end, + 
pri: pri, + stream: stream, + + deductionDelay: deductionDelay, + deduct: make(map[time.Time][]func()), + waitHandles: make(map[time.Time]waitHandle), + + // Using the parameters above, we figure out two things: + // - On which ticks do we adjust flow tokens? + // - How much by, each time? + // + // If the request rate we're simulating is: + // - 1000/sec, we adjust flow tokens every tick(=1ms). + // - 500/sec, we adjust flow tokens every 2 ticks (=2ms). + // - .... + // + // How much do we adjust by each time? Given we're making 'rate' requests + // per second, and have to deduct 'delta' tokens per second, each request + // just deducts delta/rate. + mod: int(time.Second/tick) / rate, + delta: kvflowcontrol.Tokens(int(delta) / rate), + }) +} + +func (s *simulator) simulate(ctx context.Context, end time.Time) { + s.mtime.Backwards(s.mtime.Since(tstart)) // reset to tstart + s.tsdb.Clear() + for i := range s.ticker { + s.ticker[i].reset() + if s.ticker[i].end.After(end) { + end = s.ticker[i].end + } + } + + for { + t := s.mtime.Now() + if t.After(end) || t.Equal(end) { + break + } + for i := range s.ticker { + s.ticker[i].tick(ctx, t) + } + if t.UnixNano()%metricTick.Nanoseconds() == 0 { + s.tsdb.Scrape(ctx) + } + s.mtime.Advance(tick) + } +} + +type waitHandle struct { + ctx context.Context + signaled func() bool + admit func() bool +} + +type ticker struct { + t *testing.T + start, end time.Time + pri admissionpb.WorkPriority + stream kvflowcontrol.Stream + delta kvflowcontrol.Tokens + controller *Controller + mod, ticks int // used to control the ticks at which we interact with the controller + + deduct map[time.Time][]func() + waitHandles map[time.Time]waitHandle + + deductionDelay int +} + +func (ti *ticker) tick(ctx context.Context, t time.Time) { + if ds, ok := ti.deduct[t]; ok { + for _, deduct := range ds { + deduct() + } + delete(ti.deduct, t) + } + for key, handle := range ti.waitHandles { + // Process all waiting requests from earlier. 
Do this even if t > + // ti.end since these requests could've been generated earlier. + if handle.ctx.Err() != nil { + delete(ti.waitHandles, key) + continue + } + if !handle.signaled() { + continue + } + if handle.admit() { + if ti.deductionDelay == 0 { + ti.controller.adjustTokens(ctx, ti.pri, ti.delta, ti.stream) + } else { + future := t.Add(tick * time.Duration(ti.deductionDelay)) + ti.deduct[future] = append(ti.deduct[future], func() { + ti.controller.adjustTokens(ctx, ti.pri, ti.delta, ti.stream) + }) + } + delete(ti.waitHandles, key) + return + } + } + + if t.Before(ti.start) || (t.After(ti.end) || t.Equal(ti.end)) { + return // we're outside our [ti.start, ti.end), there's nothing left to do + } + + defer func() { ti.ticks += 1 }() + if ti.ticks%ti.mod != 0 { + return // nothing to do in this tick + } + + if ti.delta >= 0 { // return tokens + ti.controller.adjustTokens(ctx, ti.pri, ti.delta, ti.stream) + return + } + + admitted, signaled, admit := ti.controller.testingNonBlockingAdmit(ti.pri, ti.stream) + if admitted { + if ti.deductionDelay == 0 { + ti.controller.adjustTokens(ctx, ti.pri, ti.delta, ti.stream) + } else { + future := t.Add(tick * time.Duration(ti.deductionDelay)) + ti.deduct[future] = append(ti.deduct[future], func() { + ti.controller.adjustTokens(ctx, ti.pri, ti.delta, ti.stream) + }) + } + return + } + ti.waitHandles[t] = waitHandle{ + ctx: ctx, + signaled: signaled, + admit: admit, + } +} + +func (ti *ticker) reset() { + ti.ticks = 0 + ti.deduct = make(map[time.Time][]func()) + ti.waitHandles = make(map[time.Time]waitHandle) +} diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/testdata/flow_token_adjustment b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/testdata/flow_token_adjustment new file mode 100644 index 000000000000..5e4c3a460894 --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/testdata/flow_token_adjustment @@ -0,0 +1,65 @@ +init +---- + +adjust +class=regular delta=-1MiB +class=regular delta=-7MiB 
+class=regular delta=-2MiB +class=regular delta=-2MiB +class=regular delta=+2MiB +class=regular delta=-2MiB +class=regular delta=+4MiB +class=regular delta=+2MiB +class=elastic delta=-2MiB +class=regular delta=-2MiB +class=regular delta=+2MiB +class=elastic delta=+2MiB +class=regular delta=+6MiB +---- + +history +---- + regular | elastic + +16MiB | +8.0MiB +====================================== + -1.0MiB regular +15MiB | +7.0MiB + -7.0MiB regular +8.0MiB | +0B (elastic blocked) + -2.0MiB regular +6.0MiB | -2.0MiB (elastic blocked) + -2.0MiB regular +4.0MiB | -4.0MiB (elastic blocked) + +2.0MiB regular +6.0MiB | -2.0MiB (elastic blocked) + -2.0MiB regular +4.0MiB | -4.0MiB (elastic blocked) + +4.0MiB regular +8.0MiB | +0B (elastic blocked) + +2.0MiB regular +10MiB | +2.0MiB + -2.0MiB elastic +10MiB | +0B (elastic blocked) + -2.0MiB regular +8.0MiB | -2.0MiB (elastic blocked) + +2.0MiB regular +10MiB | +0B (elastic blocked) + +2.0MiB elastic +10MiB | +2.0MiB + +6.0MiB regular +16MiB | +8.0MiB + +# ------------------------------------------------------ + +init +---- + +adjust +class=elastic delta=-7MiB +class=regular delta=-7MiB +class=elastic delta=+6MiB +class=regular delta=-1MiB +class=regular delta=-6MiB +class=regular delta=+6MiB +class=regular delta=-9MiB +---- + +history +---- + regular | elastic + +16MiB | +8.0MiB +====================================== + -7.0MiB elastic +16MiB | +1.0MiB + -7.0MiB regular +9.0MiB | -6.0MiB (elastic blocked) + +6.0MiB elastic +9.0MiB | +0B (elastic blocked) + -1.0MiB regular +8.0MiB | -1.0MiB (elastic blocked) + -6.0MiB regular +2.0MiB | -7.0MiB (elastic blocked) + +6.0MiB regular +8.0MiB | -1.0MiB (elastic blocked) + -9.0MiB regular -1.0MiB | -10MiB (regular and elastic blocked) diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/testdata/simulation/admitting_waiting_choppy b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/testdata/simulation/admitting_waiting_choppy new file mode 100644 index 
000000000000..371f31f87dc3 --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/testdata/simulation/admitting_waiting_choppy @@ -0,0 +1,198 @@ +# Show how waiting work ends up getting admitted choppily if the flow tokens +# being returned are being done so in a similarly choppy way. This is showing +# that we're shaping incoming writes to exactly the rate of flow token returns, +# i.e. we're controlling the flow tightly. +init +---- + +# Set up an open-loop thread issuing 2MiB/s of regular writes from t=0s to +# t=25s. +timeline +start=0s end=25s class=regular stream=t1/s1 adjust=-2MiB/s rate=10/s +---- + +# Set up choppy flow token returns starting at t=15s. The average rate of +# returns is lower than 2MiB/s, so we should always have some waiting work. +timeline +start=15s end=16s class=regular stream=t1/s1 adjust=+0.9MiB/s rate=10/s +start=16s end=17s class=regular stream=t1/s1 adjust=+2.1MiB/s rate=10/s +start=17s end=18s class=regular stream=t1/s1 adjust=+0.1MiB/s rate=10/s +start=18s end=19s class=regular stream=t1/s1 adjust=+0.9MiB/s rate=10/s +start=19s end=20s class=regular stream=t1/s1 adjust=+2.1MiB/s rate=10/s +start=20s end=21s class=regular stream=t1/s1 adjust=+0.1MiB/s rate=10/s +start=21s end=22s class=regular stream=t1/s1 adjust=+0.9MiB/s rate=10/s +start=22s end=23s class=regular stream=t1/s1 adjust=+2.1MiB/s rate=10/s +start=23s end=24s class=regular stream=t1/s1 adjust=+0.1MiB/s rate=10/s +start=24s end=25s class=regular stream=t1/s1 adjust=+0.9MiB/s rate=10/s +---- + +simulate +---- + +# Observe the initial smooth rate of flow token deductions, and later the +# choppier rate of flow token returns which we induced above. Notice that the +# rate of flow token deductions exactly mirrors the flow token returns, so +# traffic shaping is happening. 
+plot +kvadmission.flow_controller.regular_tokens_{deducted,returned} unit=MiB/s rate=true +kvadmission.flow_controller.regular_tokens_{deducted,returned} unit=MiB +kvadmission.flow_controller.regular_tokens_available unit=MiB +---- +---- + 2.0 ┤ ╭──────────╮ ╭╮ ╭╮ + 1.9 ┤ │ │ ││ ╭╮ + 1.7 ┤ │ │ ╭╮╮ ││ ││ + 1.6 ┤ │ │ │││ ││ ││ + 1.5 ┤ │ ╰╮ │╰╮ ││ ╭╯│ + 1.3 ┤ │ │ │ │ ││ │╯│ + 1.2 ┤ │ │ │ │ ╭╯│╮ │ │ + 1.1 ┤ │ │ │ │ │ ╰╮ │ │ + 0.9 ┤ │ │ ╭╯ │ │ │ │ │╮╭ + 0.8 ┤ │ │ │╯ │ │ │╭╯ │││ + 0.7 ┤ │ │ │ │ │ ││ ╰╮│ + 0.5 ┤ │ │ │ │╭╯ ││ ││ + 0.4 ┤ │ │ ╭╯ ││╯ ││ ╰╯ + 0.3 ┤ │ │ │╯ ││ ╰╯ ╰╯ + 0.1 ┤ │ ╰╮ │ ╰╯ ╰╯ + 0.0 ┼───────────────────────╯ + rate(regular_tokens_{deducted,returned}) (MiB/s) + + + 26.2 ┤ ╭── + 24.5 ┤ ╭─╯ + 22.7 ┤ ╭──╯ + 21.0 ┤ ╭─╯ + 19.2 ┤ ╭──╯ + 17.5 ┤ ╭─╯ + 15.7 ┤ ╭────────────╯ + 14.0 ┤ ╭╯ + 12.2 ┤ ╭─╯ + 10.5 ┤ ╭╯ ╭─ + 8.7 ┤ ╭─╯ ╭──╯ + 7.0 ┤ ╭╯ ╭──╯ + 5.2 ┤ ╭╯ ╭─╯ + 3.5 ┤ ╭─╯ ╭──╯ + 1.7 ┤╭╯ ╭─╯ + 0.0 ┼────────────────────────╯ + regular_tokens_{deducted,returned} (MiB) + + + 15.8 ┼╮ + 14.7 ┤╰╮ + 13.7 ┤ │ + 12.6 ┤ ╰╮ + 11.5 ┤ ╰╮ + 10.5 ┤ ╰╮ + 9.4 ┤ ╰╮ + 8.3 ┤ ╰╮ + 7.3 ┤ │ + 6.2 ┤ ╰╮ + 5.1 ┤ ╰╮ + 4.1 ┤ ╰╮ + 3.0 ┤ ╰╮ + 1.9 ┤ ╰╮ + 0.9 ┤ │ + -0.2 ┤ ╰─────────────────────────── + regular_tokens_available (MiB) +---- +---- + + +# Zoom into the more interesting second half of the graph, where flow tokens +# are being returned choppily. Given the average rate of returns is lower +# than what's being requested (2MiB/s), the total flow tokens available bounces +# off of zero. 
+plot start=15s end=30s +kvadmission.flow_controller.regular_tokens_available unit=MiB +---- + 0.2 ┤ ╭╮ + 0.2 ┤ ╭╮ ╭╯│ + 0.1 ┤ ╭╯│ ╭╯ │ + 0.1 ┤ ╭╯ │ ╭╯ │ + 0.1 ┤ ╭╯ │ ╭╮ ╭╮ │ │ + 0.1 ┤ │ │ ││╭╯│ │ │ + 0.0 ┤ │ │ ╭╮ │╰╯ │ │ │ + 0.0 ┤ │ │ ╭╯│ │ │ ╭╮│ │ ╭╮ + -0.0 ┤╭╮ │ │ │ │ │ │ ╭──╯││ │ ││ ╭ + -0.0 ┤││╭╯ │ │ ╰╮│ │ ╭╯ ││ │ ││╭╯ + -0.1 ┤│╰╯ │ │ ││ │╭╯ ╰╯ │ │╰╯ + -0.1 ┤│ │ │ ╰╯ ╰╯ │ │ + -0.1 ┼╯ │ ╭╯ │ ╭╯ + -0.1 ┤ │ ╭╯ │ ╭╯ + -0.2 ┤ │╭╯ │╭╯ + -0.2 ┤ ╰╯ ╰╯ + regular_tokens_available (MiB) + + +# Note again the mirroring between token returns which immediately allows +# admission, followed by token deductions. +plot start=15s end=30s +kvadmission.flow_controller.regular_tokens_{deducted,returned} unit=MiB rate=true +---- + 2.1 ┤ ╭╮ + 2.0 ┤ ╭╮│ ╭╮ ╭╮╮ + 1.9 ┤ │╰╮ ││╮ │││ + 1.7 ┤ ╭╯╯│ ╭╯╰╮ ╭╯╰╮ + 1.6 ┤ ││ │╮ │╯ │ │╯ │ + 1.4 ┤ │╯ ││ ╭╯ │ ╭╯ │╮ + 1.3 ┤ ╭╯ ╰╮ │ ╰╮ │╯ ││ + 1.1 ┤ │╯ │╮ ╭╯ │ ╭╯ ╰╮ + 1.0 ┤ ╭╯ ││ │╯ │ │╯ │╮ + 0.9 ┤ │╯ ╰╮ ╭╯ │╮ ╭╯ ││ ╭ + 0.7 ┤ ╭╯ │ │ ╰╮ ╭╯ ││ ╭╯ + 0.6 ┤ ╭╯╯ │╮ ╭╯ │ │╯ ╰╮ │╯ + 0.4 ┤ │╯ ││╭╯ │ ╭╯ │╮╭╯ + 0.3 ┤╭╯ ╰─╯│ ╰─╯ │╭╯ + 0.1 ┼╯╯ │╭╯ ╰╯ ╰╯╯ +-0.0 ┼╯ ╰╯ + rate(regular_tokens_{deducted,returned}) (MiB) + + +# So we're still admitting work choppily, and observe corresponding deductions +# in the waiting request rate. But given the open-loop thread above, the # of +# waiting request is still growing unboundedly. 
+plot start=15s end=30s +kvadmission.flow_controller.regular_requests_{admitted,waiting} unit=reqs/s rate=true +kvadmission.flow_controller.regular_requests_waiting unit=reqs +---- +---- + 10.7 ┤ ╭╮ + 9.9 ┼╮ ││ ╭╮ ╭╮ ╭╮ + 9.2 ┤╰╮ ╭╯│ │╰╮ ╭─╮ ╭╮ ╭╯│ ││ + 8.4 ┤ │ │ ╰╮ │ │ ╭╯ │ ╭╯╰╮ ╭╯ │ │╰╮ + 7.7 ┤ │ ╭╯ │ │ ╰╮ │ │ │ │ │ ╰╮╭╯ │ + 6.9 ┤ ╰╮ │ │╭╯ │ ╭╯ ╰╮│ ╰╮ ╭╯ ││ ╰╮ + 6.1 ┤ ╰╮╭╯ ││ ╰╮│ ╭╯ │ │ ││ ╰ + 5.4 ┤ ││ ╰│ │╯ │ ╰╮╯ ╭╯ + 4.6 ┤ ╰╮ ╭╯ ╰╮ │ ╭╰╮ │╮ + 3.9 ┤ ╭╰╮ ││ ╭╯│ │╮ │ │ ││ ╭ + 3.1 ┤ ╭╯ ╰╮ │╰╮ │ ╰╮ ╭╯│ ╭╯ ╰╮ ╭╯│ ╭╯ + 2.3 ┤ ╭╯ │ │ │ ╭╯ ╰╮ │ │ ╭╯ ╰╮ │ ╰╮╭╯ + 1.6 ┤ │ ╰╮╭╯ │ │ │ │ ╰╮│ │ │ ││ + 0.8 ┤╭╯ ││ │╭╯ ╰─╯ ╰╯ ╰╮│ ╰╯ + 0.1 ┼╯ ││ ╰╯ ╰╯ + -0.7 ┤ ╰╯ + rate(regular_requests_{admitted,waiting}) (reqs/s) + + + 118 ┤ ╭ + 115 ┤ ╭──╯ + 112 ┤ ╭╯ + 108 ┤ ╭╯ + 105 ┤ ╭─────╯ + 102 ┤ ╭─╯ + 99 ┤ ╭─╯ + 96 ┤ ╭─╯ + 92 ┤ ╭╯ + 89 ┤ ╭─────╯ + 86 ┤ ╭─╯ + 83 ┤ ╭─╯ + 80 ┤ ╭╯ + 76 ┤ ╭╯ + 73 ┤ ╭──────╯ + 70 ┼─╯ + regular_requests_waiting (reqs) +---- +---- + +# vim:ft=sh diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/testdata/simulation/over_admission b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/testdata/simulation/over_admission new file mode 100644 index 000000000000..39b74c681351 --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/testdata/simulation/over_admission @@ -0,0 +1,97 @@ +# Demonstrate how any delay in token deduction after being admitted can lead to +# over-admission. +init +---- + +# Take away all 16MiB of regular flow tokens; we want a buildup of waiting +# requests to over-admit from. +timeline +start=0s end=1s class=regular stream=t1/s1 adjust=-16MiB/s rate=1/s +---- + +# Queue 10/s*10s=100 requests for admission, asking for 10*4MiB=40MiB of +# tokens. For these requests, induce a 2s delay between Admit() and +# DeductTokens(). +timeline +start=10s end=20s class=regular stream=t1/s1 adjust=-4MiB/s rate=10/s deduction-delay=2s +---- + +# Return 1KiB of flow tokens at t=30. 
+timeline +start=30s end=31s class=regular stream=t1/s1 adjust=+1KiB/s rate=1/s +---- + +simulate end=40s +---- + +# Observe how the single 1KiB flow token return ends up admitting all 100 +# waiting requests, over-admitting by 40MiB. +# +# TODO(irfansharif): Introduce a "tentative deducted counter" on a per-stream +# basis, to prevent this kind of over-admission. It's likely to occur any time +# there's AC queueing due to CPU control, waiting on locks/latches, etc. +plot start=0s end=40s +kvadmission.flow_controller.regular_tokens_available unit=MiB +kvadmission.flow_controller.regular_requests_admitted unit=reqs/s rate=true +kvadmission.flow_controller.regular_requests_waiting unit=reqs +---- +---- + 0.0 ┼───────────────────────────────╮ + -2.7 ┤ │ + -5.3 ┤ │ + -8.0 ┤ │ + -10.7 ┤ │ + -13.3 ┤ │ + -16.0 ┤ │ + -18.7 ┤ │ + -21.3 ┤ │ + -24.0 ┤ │ + -26.7 ┤ │ + -29.3 ┤ │ + -32.0 ┤ │ + -34.7 ┤ │ + -37.3 ┤ │ + -40.0 ┤ ╰─────── + regular_tokens_available (MiB) + + + 100.0 ┤ ╭╮ + 93.3 ┤ ││ + 86.7 ┤ ││ + 80.0 ┤ ││ + 73.3 ┤ ││ + 66.7 ┤ ││ + 60.0 ┤ ││ + 53.3 ┤ ││ + 46.7 ┤ ││ + 40.0 ┤ ││ + 33.3 ┤ ││ + 26.7 ┤ ││ + 20.0 ┤ ││ + 13.3 ┤ ││ + 6.7 ┤ ││ + 0.0 ┼─────────────────────────────╯╰──────── + rate(regular_requests_admitted) (reqs/s) + + + 100.0 ┤ ╭─────────╮ + 93.3 ┤ ╭╯ │ + 86.7 ┤ ╭╯ │ + 80.0 ┤ │ │ + 73.3 ┤ ╭╯ │ + 66.7 ┤ ╭╯ │ + 60.0 ┤ │ │ + 53.3 ┤ ╭╯ │ + 46.7 ┤ ╭╯ │ + 40.0 ┤ │ │ + 33.3 ┤ ╭╯ │ + 26.7 ┤ ╭╯ │ + 20.0 ┤ │ │ + 13.3 ┤ ╭╯ │ + 6.7 ┤ │ │ + 0.0 ┼──────────╯ ╰───────── + regular_requests_waiting (reqs) +---- +---- + +# vim:ft=sh diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/testdata/simulation/overview b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/testdata/simulation/overview new file mode 100644 index 000000000000..a70ee8cd31d4 --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/testdata/simulation/overview @@ -0,0 +1,188 @@ +# Walk through the basics of the datadriven syntax. 
+init +---- + +# Set up a worker deducting flow tokens at 2MiB/s over 100 reqs/s for regular +# write work to two stores, s1 and s2. With 16MiB of regular tokens, we expect +# it to get depleted in 8s. The remaining 8s of incoming work ends up getting +# queued. +timeline +start=0s end=16s class=regular stream=t1/s1 adjust=-2MiB/s rate=100/s +start=0s end=16s class=regular stream=t1/s2 adjust=-2MiB/s rate=100/s +---- + +# Schedule the return of flow tokens at t=20s in similar increments. +timeline +start=20s end=36s class=regular stream=t1/s1 adjust=+2MiB/s rate=100/s +start=20s end=36s class=regular stream=t1/s2 adjust=+2MiB/s rate=100/s +---- + +simulate end=40s +---- + +# There are two replication streams, so we initially have 16*2=32MiB of regular +# flow tokens and 8*2=16MiB of elastic flow tokens. Since we're not returning +# flow tokens until t=20s, we deplete them at t=8s. Also note that despite this +# being regular traffic, we deduct elastic flow tokens for coarse intra-tenant +# prioritization -- those tokens get depleted at t=4s. Both are deducted at +# 4MiB/s (2MiB/s for each stream) until we have 0MiB regular tokens and -16MiB +# elastic tokens. 
+plot start=0s end=20s +kvadmission.flow_controller.{regular,elastic}_tokens_available unit=MiB +kvadmission.flow_controller.{regular,elastic}_tokens_deducted unit=MiB/s rate=true +---- +---- + 32.0 ┼╮ + 28.8 ┤╰─╮ + 25.6 ┤ ╰╮ + 22.4 ┤ ╰─╮ + 19.2 ┤ ╰─╮ + 16.0 ┼╮ ╰╮ + 12.8 ┤╰─╮ ╰─╮ + 9.6 ┤ ╰╮ ╰╮ + 6.4 ┤ ╰─╮ ╰─╮ + 3.2 ┤ ╰─╮ ╰╮ + -0.0 ┤ ╰╮ ╰──────────────────────── + -3.2 ┤ ╰─╮ + -6.4 ┤ ╰╮ + -9.6 ┤ ╰─╮ + -12.8 ┤ ╰╮ + -16.0 ┤ ╰──────────────────────── + {regular,elastic}_tokens_available (MiB) + + + 4.0 ┤ ╭─────────────╮ + 3.7 ┤ │ │ + 3.5 ┤ │ ╰╮ + 3.2 ┤ │ │ + 2.9 ┤ │ │ + 2.7 ┤ │ │ + 2.4 ┤ │ │ + 2.1 ┤ │ │ + 1.9 ┤ │ │ + 1.6 ┤ │ │ + 1.3 ┤ │ ╰╮ + 1.1 ┤ │ │ + 0.8 ┤ │ │ + 0.5 ┤ │ │ + 0.3 ┤ │ │ + 0.0 ┼─╯ ╰───────────────────── + rate({regular,elastic}_tokens_deducted) (MiB/s) +---- +---- + +# Plot the rates at which we (i) admit work when there are flow tokens +# available, and (ii) enqueue work when there are none. Since we're generating +# 100 requests/s for two streams, we observe an aggregate admission rate of +# ~200/s, and then later 200/s of waiting requests growth. There are no +# errors. +plot start=0s end=20s +kvadmission.flow_controller.regular_requests_{admitted,waiting} unit=reqs/s rate=true +kvadmission.flow_controller.{regular,elastic}_requests_errored unit=reqs/s rate=true +---- +---- + 200 ┤ ╭─────────────╮ ╭─────────────╮ + 187 ┤ │ │ │ │ + 173 ┤ │ ╰╮│ │ + 160 ┤ │ ││ │ + 147 ┤ │ ││ │ + 133 ┤ │ ╭╯ ╰╮ + 120 ┤ │ │ │ + 107 ┤ │ │ │ + 93 ┤ │ │ │ + 80 ┤ │ │ │ + 67 ┤ │ │╮ │ + 53 ┤ │ ││ │ + 40 ┤ │ ││ │ + 27 ┤ │ ╭╯│ ╰╮ + 13 ┤ │ │ │ │ + 0 ┼───────────────╯ ╰───────────────╰───── + rate(regular_requests_{admitted,waiting}) (reqs/s) + + + 0.0 ┼─────────────────────────────────────── + rate({regular,elastic}_requests_errored) (reqs/s) +---- +---- + +# Confirm that there are two streams underneath, both of which eventually get +# blocked for {regular,elastic} traffic, with the latter happening first. 
+plot start=0s end=20s +kvadmission.flow_controller.regular_stream_count unit=streams +kvadmission.flow_controller.{regular,elastic}_blocked_stream_count unit=streams +---- +---- + 2.0 ┼─────────────────────────────────────── + regular_stream_count (streams) + + + 2.0 ┤ ╭─────────────────────────────── + 1.9 ┤ │ │ + 1.7 ┤ │ │ + 1.6 ┤ │ │ + 1.5 ┤ │ │ + 1.3 ┤ │ │ + 1.2 ┤ │ │ + 1.1 ┤ │ │ + 0.9 ┤ │ │ + 0.8 ┤ │ │ + 0.7 ┤ │ │ + 0.5 ┤ │ │ + 0.4 ┤ │ │ + 0.3 ┤ │ │ + 0.1 ┤ │ │ + 0.0 ┼───────╯───────╯ + {regular,elastic}_blocked_stream_count (streams) +---- +---- + +# Observe what happens once flow tokens are returned -- we start admitting work +# at ~200/s, which matches the rate at which we're reducing the number of +# waiting requests. By t=36s we'll have returned all {regular,elastic} flow +# tokens, including for the requests that had to wait for admission. +plot start=18s end=40s +kvadmission.flow_controller.regular_requests_{admitted,waiting} unit=reqs/s rate=true +kvadmission.flow_controller.{regular,elastic}_tokens_available unit=MiB +---- +---- + 200 ┤ ╭───────────╮ + 175 ┤ │ ╰╮ + 150 ┤ ╭╯ │ + 125 ┤ │ │ + 100 ┤ │ │ + 75 ┤ │ │ + 50 ┤ ╭╯ ╰╮ + 25 ┤ │ │ + 0 ┼───╮ ╭─────────────────── + -25 ┤ │ │ + -50 ┤ ╰╮ ╭╯ + -75 ┤ │ │ + -100 ┤ │ │ + -125 ┤ │ │ + -150 ┤ ╰╮ │ + -175 ┤ │ ╭╯ + -200 ┤ ╰───────────╯ + rate(regular_requests_{admitted,waiting}) (reqs/s) + + + 32.0 ┤ ╭─────── + 28.8 ┤ ╭─╯ + 25.6 ┤ ╭╯ + 22.4 ┤ ╭╯ + 19.2 ┤ ╭─╯ + 16.0 ┤ ╭╯ ╭─────── + 12.8 ┤ ╭─╯ ╭─╯ + 9.6 ┤ ╭╯ ╭╯ + 6.4 ┤ ╭─╯ ╭╯ + 3.2 ┤ ╭╯ ╭─╯ + -0.0 ┼──────────────────╯ ╭╯ + -3.2 ┤ ╭─╯ + -6.4 ┤ ╭╯ + -9.6 ┤ ╭─╯ + -12.8 ┤ ╭╯ + -16.0 ┼──────────────────╯ + {regular,elastic}_tokens_available (MiB) +---- +---- + +# vim:ft=sh diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/testdata/simulation/regular_elastic_prioritization b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/testdata/simulation/regular_elastic_prioritization new file mode 100644 index 000000000000..43df08beebc0 --- /dev/null +++ 
b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/testdata/simulation/regular_elastic_prioritization @@ -0,0 +1,122 @@ +# Show that elastic work can get completely starved out by regular work, but +# not the other way around. +init +---- + +# Kick off two threads, one for each work class, each writing at 1MiB/s over +# across reqs/s. +timeline +start=0s end=16s class=regular stream=t1/s1 adjust=-1MiB/s rate=100/s +start=0s end=8s class=elastic stream=t1/s1 adjust=-1MiB/s rate=100/s +---- + +# start=16s end=32s class=regular stream=t1/s1 adjust=+2MiB/s rate=100/s +# start=32s end=36s class=elastic stream=t1/s1 adjust=+2MiB/s rate=100/s + +simulate +---- + +# Observe that initially elastic tokens deplete faster than regular tokens +# (while elastic tokens > 0MiB), since regular work deducts from both but +# elastic work only deducts from the elastic bucket. Eventually the rate of +# elastic token deductions slows down since elastic requests stop being +# admitted (and thus deducting tokens) once elastic tokens <= 0 MiB. So it's +# only regular request deductions from that point on. See +# TestFlowTokenAdjustment for more details. +plot +kvadmission.flow_controller.{regular,elastic}_tokens_available unit=MiB +---- + 16.0 ┼╮ + 14.1 ┤╰────╮ + 12.3 ┤ ╰───╮ + 10.4 ┤ ╰────╮ + 8.6 ┤ ╰───╮ + 6.7 ┼─╮ ╰────╮ + 4.8 ┤ ╰──╮ ╰───╮ + 3.0 ┤ ╰─╮ ╰────╮ + 1.1 ┤ ╰─╮ ╰───╮ + -0.7 ┤ ╰───╮ ╰── + -2.6 ┤ ╰───╮ + -4.5 ┤ ╰────╮ + -6.3 ┤ ╰───╮ + -8.2 ┤ ╰────╮ + -10.0 ┤ ╰───╮ + -11.9 ┤ ╰──── + {regular,elastic}_tokens_available (MiB) + + +# Confirm that all throughout we're able to admit regular requests at its +# incoming rate of 100/s. But for elastic requests, once we're out of elastic +# flow tokens, we stop admitting and start waiting instead. We run of elastic +# tokens faster since there are fewer of them (8MiB instead of 16MiB), and also +# they're deducted by both regular and elastic work, compared to regular tokens +# that are deducted only by regular work. 
+plot +kvadmission.flow_controller.regular_requests_{admitted,waiting} unit=reqs/s rate=true +kvadmission.flow_controller.elastic_requests_{admitted,waiting} unit=reqs/s rate=true +---- +---- + 100.0 ┤ ╭──────────────────────────────────── + 93.3 ┤ │ + 86.7 ┤ │ + 80.0 ┤ │ + 73.3 ┤ │ + 66.7 ┤ │ + 60.0 ┤ │ + 53.3 ┤ │ + 46.7 ┤ │ + 40.0 ┤ │ + 33.3 ┤ │ + 26.7 ┤ │ + 20.0 ┤ │ + 13.3 ┤ │ + 6.7 ┤ │ + 0.0 ┼─────────────────────────────────────── + rate(regular_requests_{admitted,waiting}) (reqs/s) + + + 100.0 ┤ ╭──────╮ ╭──────╮ + 93.3 ┤ │ ╰╮╭╯ │ + 86.7 ┤ │ ││ ╰╮ + 80.0 ┤ │ ││ │ + 73.3 ┤ │ ││ │ + 66.7 ┤ │ ││ │ + 60.0 ┤ │ ││ │ + 53.3 ┤ │ ╰│ │ + 46.7 ┤ │ ╭╯ │ + 40.0 ┤ │ ││ ╰╮ + 33.3 ┤ │ ││ │ + 26.7 ┤ │ ││ │ + 20.0 ┤ │ ││ │ + 13.3 ┤ │ ││ │ + 6.7 ┤ │ ╭╯╰╮ │ + 0.0 ┼─────────╯ ╰────────╰───────────────── + rate(elastic_requests_{admitted,waiting}) (reqs/s) +---- +---- + +# Confirm the above -- when both regular and elastic work gets admitted, we're +# deducting elastic tokens at 2MiB/s, and at t=4s when elastic work gets +# blocked, we start deducting at 1MiB/s. +plot +kvadmission.flow_controller.elastic_tokens_deducted unit=MiB rate=true +---- + 2.0 ┤ ╭──────╮ + 1.9 ┤ │ ╰╮ + 1.7 ┤ │ │ + 1.6 ┤ │ │ + 1.5 ┤ │ ╰╮ + 1.3 ┤ │ │ + 1.2 ┤ │ │ + 1.1 ┤ │ ╰─────────────────────────── + 0.9 ┤ │ + 0.8 ┤ │ + 0.7 ┤ │ + 0.5 ┤ │ + 0.4 ┤ │ + 0.3 ┤ │ + 0.1 ┤ │ + 0.0 ┼──╯ + rate(elastic_tokens_deducted) (MiB) + +# vim:ft=sh diff --git a/pkg/util/admission/admissionpb/admissionpb.go b/pkg/util/admission/admissionpb/admissionpb.go index cd5f4a80dd0a..57dc9911080f 100644 --- a/pkg/util/admission/admissionpb/admissionpb.go +++ b/pkg/util/admission/admissionpb/admissionpb.go @@ -10,7 +10,11 @@ package admissionpb -import "math" +import ( + "math" + + "github.com/cockroachdb/redact" +) // WorkPriority represents the priority of work. In an WorkQueue, it is only // used for ordering within a tenant. 
High priority work can starve lower @@ -39,6 +43,19 @@ const ( OneAboveHighPri int = int(HighPri) + 1 ) +func (w WorkPriority) String() string { + return redact.StringWithoutMarkers(w) +} + +// SafeFormat implements the redact.SafeFormatter interface. +func (w WorkPriority) SafeFormat(p redact.SafePrinter, verb rune) { + if s, ok := WorkPriorityDict[w]; ok { + p.Print(s) + return + } + p.Printf("custom-pri=%d", w) +} + // WorkPriorityDict is a mapping of the priorities to a short string name. The // name is used as the suffix on exported work queue metrics. var WorkPriorityDict = map[WorkPriority]string{ @@ -80,13 +97,18 @@ func WorkClassFromPri(pri WorkPriority) WorkClass { } func (w WorkClass) String() string { + return redact.StringWithoutMarkers(w) +} + +// SafeFormat implements the redact.SafeFormatter interface. +func (w WorkClass) SafeFormat(p redact.SafePrinter, verb rune) { switch w { case RegularWorkClass: - return "regular" + p.Printf("regular") case ElasticWorkClass: - return "elastic" + p.Print("elastic") default: - return "" + p.Print("") } } diff --git a/pkg/util/admission/testdata/store_work_queue b/pkg/util/admission/testdata/store_work_queue index ae615d87acd6..26fba8dc46e4 100644 --- a/pkg/util/admission/testdata/store_work_queue +++ b/pkg/util/admission/testdata/store_work_queue @@ -63,7 +63,7 @@ print regular workqueue: closed epoch: 0 tenantHeap len: 1 top tenant: 57 tenant-id: 53 used: 101, w: 1, fifo: -128 tenant-id: 55 used: 600, w: 1, fifo: -128 - tenant-id: 57 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 0] + tenant-id: 57 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 0] elastic workqueue: closed epoch: 0 tenantHeap len: 0 stats:{admittedCount:2 writeAccountedBytes:0 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} 
estimates:{writeTokens:100} diff --git a/pkg/util/admission/testdata/work_queue b/pkg/util/admission/testdata/work_queue index 3a736dcc48c3..a625fea3755c 100644 --- a/pkg/util/admission/testdata/work_queue +++ b/pkg/util/admission/testdata/work_queue @@ -26,7 +26,7 @@ tryGet: returning false print ---- closed epoch: 0 tenantHeap len: 1 top tenant: 53 - tenant-id: 53 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 3, epoch: 0, qt: 100] + tenant-id: 53 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 3, epoch: 0, qt: 100] admit id=3 tenant=53 priority=0 create-time-millis=2 bypass=false ---- @@ -36,7 +36,7 @@ admit id=3 tenant=53 priority=0 create-time-millis=2 bypass=false print ---- closed epoch: 0 tenantHeap len: 1 top tenant: 53 - tenant-id: 53 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 2, epoch: 0, qt: 100] [1: pri: 0, ct: 3, epoch: 0, qt: 100] + tenant-id: 53 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 2, epoch: 0, qt: 100] [1: pri: normal-pri, ct: 3, epoch: 0, qt: 100] # Request from tenant 71. 
admit id=4 tenant=71 priority=-128 create-time-millis=4 bypass=false @@ -51,8 +51,8 @@ admit id=5 tenant=71 priority=0 create-time-millis=5 bypass=false print ---- closed epoch: 0 tenantHeap len: 2 top tenant: 71 - tenant-id: 53 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 2, epoch: 0, qt: 100] [1: pri: 0, ct: 3, epoch: 0, qt: 100] - tenant-id: 71 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 5, epoch: 0, qt: 100] [1: pri: -128, ct: 4, epoch: 0, qt: 100] + tenant-id: 53 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 2, epoch: 0, qt: 100] [1: pri: normal-pri, ct: 3, epoch: 0, qt: 100] + tenant-id: 71 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 5, epoch: 0, qt: 100] [1: pri: low-pri, ct: 4, epoch: 0, qt: 100] granted chain-id=5 ---- @@ -65,8 +65,8 @@ granted: returned 1 print ---- closed epoch: 0 tenantHeap len: 2 top tenant: 71 - tenant-id: 53 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 2, epoch: 0, qt: 100] [1: pri: 0, ct: 3, epoch: 0, qt: 100] - tenant-id: 71 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: -128, ct: 4, epoch: 0, qt: 100] + tenant-id: 53 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 2, epoch: 0, qt: 100] [1: pri: normal-pri, ct: 3, epoch: 0, qt: 100] + tenant-id: 71 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: low-pri, ct: 4, epoch: 0, qt: 100] # Cancel a request from tenant 53. 
cancel-work id=3 @@ -76,8 +76,8 @@ id 3: admit failed print ---- closed epoch: 0 tenantHeap len: 2 top tenant: 71 - tenant-id: 53 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 3, epoch: 0, qt: 100] - tenant-id: 71 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: -128, ct: 4, epoch: 0, qt: 100] + tenant-id: 53 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 3, epoch: 0, qt: 100] + tenant-id: 71 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: low-pri, ct: 4, epoch: 0, qt: 100] # The work admitted for tenant 53 is done. work-done id=1 @@ -88,8 +88,8 @@ returnGrant 1 print ---- closed epoch: 0 tenantHeap len: 2 top tenant: 53 - tenant-id: 53 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 3, epoch: 0, qt: 100] - tenant-id: 71 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: -128, ct: 4, epoch: 0, qt: 100] + tenant-id: 53 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 3, epoch: 0, qt: 100] + tenant-id: 71 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: low-pri, ct: 4, epoch: 0, qt: 100] # A request from the system tenant bypasses admission control, but is # reflected in the WorkQueue state. 
@@ -102,8 +102,8 @@ print ---- closed epoch: 0 tenantHeap len: 2 top tenant: 53 tenant-id: 1 used: 1, w: 1, fifo: -128 - tenant-id: 53 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 3, epoch: 0, qt: 100] - tenant-id: 71 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: -128, ct: 4, epoch: 0, qt: 100] + tenant-id: 53 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 3, epoch: 0, qt: 100] + tenant-id: 71 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: low-pri, ct: 4, epoch: 0, qt: 100] granted chain-id=7 ---- @@ -151,12 +151,12 @@ tryGet: returning false advance-time millis=205 ---- closed epoch: 2 tenantHeap len: 1 top tenant: 53 - tenant-id: 53 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] + tenant-id: 53 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] print ---- closed epoch: 2 tenantHeap len: 1 top tenant: 53 - tenant-id: 53 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] + tenant-id: 53 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] granted chain-id=5 ---- @@ -189,7 +189,7 @@ admit id=4 tenant=53 priority=0 create-time-millis=400 bypass=false print ---- closed epoch: 3 tenantHeap len: 1 top tenant: 53 - tenant-id: 53 used: 1, w: 1, fifo: 1 waiting work heap: [0: pri: 0, ct: 399, epoch: 3, qt: 405, lifo-ordering] [1: pri: 0, ct: 50, epoch: 0, qt: 405, lifo-ordering] open epochs heap: [0: pri: 0, ct: 400, epoch: 4, qt: 405] + tenant-id: 53 used: 1, w: 1, fifo: 1 waiting work heap: [0: pri: normal-pri, ct: 399, epoch: 3, qt: 405, lifo-ordering] [1: pri: normal-pri, ct: 50, epoch: 0, qt: 405, lifo-ordering] open epochs heap: [0: pri: normal-pri, ct: 400, epoch: 4, qt: 405] # Latest request in closed epoch is granted. 
granted chain-id=6 @@ -209,7 +209,7 @@ granted: returned 1 print ---- closed epoch: 3 tenantHeap len: 1 top tenant: 53 - tenant-id: 53 used: 3, w: 1, fifo: 1 open epochs heap: [0: pri: 0, ct: 400, epoch: 4, qt: 405] + tenant-id: 53 used: 3, w: 1, fifo: 1 open epochs heap: [0: pri: normal-pri, ct: 400, epoch: 4, qt: 405] # Add request to closed epoch. admit id=5 tenant=53 priority=0 create-time-millis=300 bypass=false @@ -224,7 +224,7 @@ admit id=6 tenant=53 priority=0 create-time-millis=500 bypass=false print ---- closed epoch: 3 tenantHeap len: 1 top tenant: 53 - tenant-id: 53 used: 3, w: 1, fifo: 1 waiting work heap: [0: pri: 0, ct: 300, epoch: 3, qt: 405, lifo-ordering] open epochs heap: [0: pri: 0, ct: 400, epoch: 4, qt: 405] [1: pri: 0, ct: 500, epoch: 5, qt: 405] + tenant-id: 53 used: 3, w: 1, fifo: 1 waiting work heap: [0: pri: normal-pri, ct: 300, epoch: 3, qt: 405, lifo-ordering] open epochs heap: [0: pri: normal-pri, ct: 400, epoch: 4, qt: 405] [1: pri: normal-pri, ct: 500, epoch: 5, qt: 405] # Add high priority request in open epoch 5. admit id=7 tenant=53 priority=127 create-time-millis=550 bypass=false @@ -235,13 +235,13 @@ admit id=7 tenant=53 priority=127 create-time-millis=550 bypass=false print ---- closed epoch: 3 tenantHeap len: 1 top tenant: 53 - tenant-id: 53 used: 3, w: 1, fifo: 1 waiting work heap: [0: pri: 127, ct: 550, epoch: 5, qt: 405] [1: pri: 0, ct: 300, epoch: 3, qt: 405, lifo-ordering] open epochs heap: [0: pri: 0, ct: 400, epoch: 4, qt: 405] [1: pri: 0, ct: 500, epoch: 5, qt: 405] + tenant-id: 53 used: 3, w: 1, fifo: 1 waiting work heap: [0: pri: high-pri, ct: 550, epoch: 5, qt: 405] [1: pri: normal-pri, ct: 300, epoch: 3, qt: 405, lifo-ordering] open epochs heap: [0: pri: normal-pri, ct: 400, epoch: 4, qt: 405] [1: pri: normal-pri, ct: 500, epoch: 5, qt: 405] # Make the request wait for 60ms so we don't switch back to fifo. 
advance-time millis=60 ---- closed epoch: 3 tenantHeap len: 1 top tenant: 53 - tenant-id: 53 used: 3, w: 1, fifo: 1 waiting work heap: [0: pri: 127, ct: 550, epoch: 5, qt: 405] [1: pri: 0, ct: 300, epoch: 3, qt: 405, lifo-ordering] open epochs heap: [0: pri: 0, ct: 400, epoch: 4, qt: 405] [1: pri: 0, ct: 500, epoch: 5, qt: 405] + tenant-id: 53 used: 3, w: 1, fifo: 1 waiting work heap: [0: pri: high-pri, ct: 550, epoch: 5, qt: 405] [1: pri: normal-pri, ct: 300, epoch: 3, qt: 405, lifo-ordering] open epochs heap: [0: pri: normal-pri, ct: 400, epoch: 4, qt: 405] [1: pri: normal-pri, ct: 500, epoch: 5, qt: 405] granted chain-id=8 ---- @@ -262,13 +262,13 @@ admit id=8 tenant=53 priority=0 create-time-millis=350 bypass=false print ---- closed epoch: 3 tenantHeap len: 1 top tenant: 53 - tenant-id: 53 used: 5, w: 1, fifo: 1 waiting work heap: [0: pri: 0, ct: 350, epoch: 3, qt: 465, lifo-ordering] open epochs heap: [0: pri: 0, ct: 400, epoch: 4, qt: 405] [1: pri: 0, ct: 500, epoch: 5, qt: 405] + tenant-id: 53 used: 5, w: 1, fifo: 1 waiting work heap: [0: pri: normal-pri, ct: 350, epoch: 3, qt: 465, lifo-ordering] open epochs heap: [0: pri: normal-pri, ct: 400, epoch: 4, qt: 405] [1: pri: normal-pri, ct: 500, epoch: 5, qt: 405] # One request moved from open to closed epoch heap. 
advance-time millis=40 ---- closed epoch: 4 tenantHeap len: 1 top tenant: 53 - tenant-id: 53 used: 5, w: 1, fifo: 1 waiting work heap: [0: pri: 0, ct: 400, epoch: 4, qt: 405, lifo-ordering] [1: pri: 0, ct: 350, epoch: 3, qt: 465, lifo-ordering] open epochs heap: [0: pri: 0, ct: 500, epoch: 5, qt: 405] + tenant-id: 53 used: 5, w: 1, fifo: 1 waiting work heap: [0: pri: normal-pri, ct: 400, epoch: 4, qt: 405, lifo-ordering] [1: pri: normal-pri, ct: 350, epoch: 3, qt: 465, lifo-ordering] open epochs heap: [0: pri: normal-pri, ct: 500, epoch: 5, qt: 405] granted chain-id=10 ---- @@ -279,7 +279,7 @@ granted: returned 1 print ---- closed epoch: 4 tenantHeap len: 1 top tenant: 53 - tenant-id: 53 used: 6, w: 1, fifo: 1 waiting work heap: [0: pri: 0, ct: 350, epoch: 3, qt: 465, lifo-ordering] open epochs heap: [0: pri: 0, ct: 500, epoch: 5, qt: 405] + tenant-id: 53 used: 6, w: 1, fifo: 1 waiting work heap: [0: pri: normal-pri, ct: 350, epoch: 3, qt: 465, lifo-ordering] open epochs heap: [0: pri: normal-pri, ct: 500, epoch: 5, qt: 405] granted chain-id=11 ---- @@ -290,7 +290,7 @@ granted: returned 1 print ---- closed epoch: 4 tenantHeap len: 1 top tenant: 53 - tenant-id: 53 used: 7, w: 1, fifo: 1 open epochs heap: [0: pri: 0, ct: 500, epoch: 5, qt: 405] + tenant-id: 53 used: 7, w: 1, fifo: 1 open epochs heap: [0: pri: normal-pri, ct: 500, epoch: 5, qt: 405] # Can dequeue from the open epochs heap if nothing else is remaining. granted chain-id=12 @@ -312,13 +312,13 @@ tryGet: returning false print ---- closed epoch: 4 tenantHeap len: 1 top tenant: 53 - tenant-id: 53 used: 8, w: 1, fifo: 1 waiting work heap: [0: pri: 0, ct: 380, epoch: 3, qt: 505, lifo-ordering] + tenant-id: 53 used: 8, w: 1, fifo: 1 waiting work heap: [0: pri: normal-pri, ct: 380, epoch: 3, qt: 505, lifo-ordering] # This time advance means the previous request will see significant queueing. 
advance-time millis=100 ---- closed epoch: 5 tenantHeap len: 1 top tenant: 53 - tenant-id: 53 used: 8, w: 1, fifo: 1 waiting work heap: [0: pri: 0, ct: 380, epoch: 3, qt: 505, lifo-ordering] + tenant-id: 53 used: 8, w: 1, fifo: 1 waiting work heap: [0: pri: normal-pri, ct: 380, epoch: 3, qt: 505, lifo-ordering] # This request in an already closed epoch gets ahead because of higher # create-time-millis. @@ -328,7 +328,7 @@ admit id=10 tenant=53 priority=0 create-time-millis=390 bypass=false print ---- closed epoch: 5 tenantHeap len: 1 top tenant: 53 - tenant-id: 53 used: 8, w: 1, fifo: 1 waiting work heap: [0: pri: 0, ct: 390, epoch: 3, qt: 605, lifo-ordering] [1: pri: 0, ct: 380, epoch: 3, qt: 505, lifo-ordering] + tenant-id: 53 used: 8, w: 1, fifo: 1 waiting work heap: [0: pri: normal-pri, ct: 390, epoch: 3, qt: 605, lifo-ordering] [1: pri: normal-pri, ct: 380, epoch: 3, qt: 505, lifo-ordering] granted chain-id=12 ---- @@ -339,13 +339,13 @@ granted: returned 1 print ---- closed epoch: 5 tenantHeap len: 1 top tenant: 53 - tenant-id: 53 used: 9, w: 1, fifo: 1 waiting work heap: [0: pri: 0, ct: 380, epoch: 3, qt: 505, lifo-ordering] + tenant-id: 53 used: 9, w: 1, fifo: 1 waiting work heap: [0: pri: normal-pri, ct: 380, epoch: 3, qt: 505, lifo-ordering] # This advance will switch all priorities back to FIFO. 
advance-time millis=100 ---- closed epoch: 6 tenantHeap len: 1 top tenant: 53 - tenant-id: 53 used: 9, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 380, epoch: 3, qt: 505, lifo-ordering] + tenant-id: 53 used: 9, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 380, epoch: 3, qt: 505, lifo-ordering] admit id=11 tenant=53 priority=0 create-time-millis=610 bypass=false ---- @@ -359,7 +359,7 @@ admit id=12 tenant=53 priority=-128 create-time-millis=615 bypass=false print ---- closed epoch: 6 tenantHeap len: 1 top tenant: 53 - tenant-id: 53 used: 9, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 610, epoch: 6, qt: 705] [1: pri: 0, ct: 380, epoch: 3, qt: 505, lifo-ordering] [2: pri: -128, ct: 615, epoch: 6, qt: 705] + tenant-id: 53 used: 9, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 610, epoch: 6, qt: 705] [1: pri: normal-pri, ct: 380, epoch: 3, qt: 505, lifo-ordering] [2: pri: low-pri, ct: 615, epoch: 6, qt: 705] granted chain-id=13 ---- @@ -372,7 +372,7 @@ granted: returned 1 print ---- closed epoch: 6 tenantHeap len: 1 top tenant: 53 - tenant-id: 53 used: 10, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 380, epoch: 3, qt: 505, lifo-ordering] [1: pri: -128, ct: 615, epoch: 6, qt: 705] + tenant-id: 53 used: 10, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 380, epoch: 3, qt: 505, lifo-ordering] [1: pri: low-pri, ct: 615, epoch: 6, qt: 705] granted chain-id=14 ---- @@ -401,7 +401,7 @@ tryGet: returning false print ---- closed epoch: 7 tenantHeap len: 1 top tenant: 53 - tenant-id: 53 used: 12, w: 1, fifo: 1 open epochs heap: [0: pri: 0, ct: 810, epoch: 8, qt: 805] + tenant-id: 53 used: 12, w: 1, fifo: 1 open epochs heap: [0: pri: normal-pri, ct: 810, epoch: 8, qt: 805] # Cancel that request. 
cancel-work id=13 @@ -446,19 +446,19 @@ tryGet: returning false print ---- closed epoch: 0 tenantHeap len: 1 top tenant: 5 - tenant-id: 5 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] + tenant-id: 5 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] # Weight is unchanged for tenant 5. set-tenant-weights weights=10:11 ---- closed epoch: 0 tenantHeap len: 1 top tenant: 5 - tenant-id: 5 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] + tenant-id: 5 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] # Now tenant 5 has a new weight. set-tenant-weights weights=5:6,10:11 ---- closed epoch: 0 tenantHeap len: 1 top tenant: 5 - tenant-id: 5 used: 0, w: 6, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] + tenant-id: 5 used: 0, w: 6, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] admit id=2 tenant=10 priority=0 create-time-millis=1 bypass=false ---- @@ -466,8 +466,8 @@ admit id=2 tenant=10 priority=0 create-time-millis=1 bypass=false print ---- closed epoch: 0 tenantHeap len: 2 top tenant: 5 - tenant-id: 5 used: 0, w: 6, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] - tenant-id: 10 used: 0, w: 11, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] + tenant-id: 5 used: 0, w: 6, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] + tenant-id: 10 used: 0, w: 11, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] granted chain-id=1 ---- @@ -500,8 +500,8 @@ admit id=4 tenant=10 priority=0 create-time-millis=1 bypass=false print ---- closed epoch: 0 tenantHeap len: 2 top tenant: 10 - tenant-id: 5 used: 1, w: 6, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] - tenant-id: 10 used: 1, w: 11, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] + tenant-id: 5 
used: 1, w: 6, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] + tenant-id: 10 used: 1, w: 11, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] # Grant to tenant 10 so it is using 2 slots. granted chain-id=3 @@ -513,7 +513,7 @@ granted: returned 1 print ---- closed epoch: 0 tenantHeap len: 1 top tenant: 5 - tenant-id: 5 used: 1, w: 6, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] + tenant-id: 5 used: 1, w: 6, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] tenant-id: 10 used: 2, w: 11, fifo: -128 # Another request from tenant 10. @@ -524,8 +524,8 @@ admit id=5 tenant=10 priority=0 create-time-millis=1 bypass=false print ---- closed epoch: 0 tenantHeap len: 2 top tenant: 5 - tenant-id: 5 used: 1, w: 6, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] - tenant-id: 10 used: 2, w: 11, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] + tenant-id: 5 used: 1, w: 6, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] + tenant-id: 10 used: 2, w: 11, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] # Increase tenant 10's weight so it becomes the top of the heap. Weight # scaling is also applied here to make the max weight 20, and reduce tenant @@ -533,22 +533,22 @@ closed epoch: 0 tenantHeap len: 2 top tenant: 5 set-tenant-weights weights=5:6,10:30 ---- closed epoch: 0 tenantHeap len: 2 top tenant: 10 - tenant-id: 5 used: 1, w: 4, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] - tenant-id: 10 used: 2, w: 20, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] + tenant-id: 5 used: 1, w: 4, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] + tenant-id: 10 used: 2, w: 20, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] # Restore all weights to 1. Tenant 5 is now top of the heap. 
set-tenant-weights weights=5:1,10:1 ---- closed epoch: 0 tenantHeap len: 2 top tenant: 5 - tenant-id: 5 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] - tenant-id: 10 used: 2, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] + tenant-id: 5 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] + tenant-id: 10 used: 2, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] # Adjust weights to make tenant 10 just slightly preferable over tenant 5. set-tenant-weights weights=5:6,10:13 ---- closed epoch: 0 tenantHeap len: 2 top tenant: 10 - tenant-id: 5 used: 1, w: 6, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] - tenant-id: 10 used: 2, w: 13, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] + tenant-id: 5 used: 1, w: 6, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] + tenant-id: 10 used: 2, w: 13, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] # Add another request for tenant 10. admit id=6 tenant=10 priority=0 create-time-millis=1 bypass=false @@ -565,16 +565,16 @@ granted: returned 1 print ---- closed epoch: 0 tenantHeap len: 2 top tenant: 5 - tenant-id: 5 used: 1, w: 6, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] - tenant-id: 10 used: 3, w: 13, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] + tenant-id: 5 used: 1, w: 6, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] + tenant-id: 10 used: 3, w: 13, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] # Bump up tenant 10's weight to a huge value. Tenant 5's weight is not scaled # down to 0, since the minimum weight is 1. 
set-tenant-weights weights=5:1,10:100000 ---- closed epoch: 0 tenantHeap len: 2 top tenant: 10 - tenant-id: 5 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] - tenant-id: 10 used: 3, w: 20, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] + tenant-id: 5 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] + tenant-id: 10 used: 3, w: 20, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] granted chain-id=5 ---- @@ -623,27 +623,27 @@ admit id=8 tenant=8 priority=0 create-time-millis=1 bypass=false print ---- closed epoch: 0 tenantHeap len: 8 top tenant: 1 - tenant-id: 1 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] - tenant-id: 2 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] - tenant-id: 3 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] - tenant-id: 4 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] - tenant-id: 5 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] - tenant-id: 6 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] - tenant-id: 7 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] - tenant-id: 8 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] + tenant-id: 1 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] + tenant-id: 2 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] + tenant-id: 3 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] + tenant-id: 4 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] + tenant-id: 5 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] + 
tenant-id: 6 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] + tenant-id: 7 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] + tenant-id: 8 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] # Set weights for 7 out of the 8 tenants. set-tenant-weights weights=1:2,2:3,3:4,4:5,5:6,7:8,8:9 ---- closed epoch: 0 tenantHeap len: 8 top tenant: 1 - tenant-id: 1 used: 0, w: 2, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] - tenant-id: 2 used: 0, w: 3, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] - tenant-id: 3 used: 0, w: 4, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] - tenant-id: 4 used: 0, w: 5, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] - tenant-id: 5 used: 0, w: 6, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] - tenant-id: 6 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] - tenant-id: 7 used: 0, w: 8, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] - tenant-id: 8 used: 0, w: 9, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] + tenant-id: 1 used: 0, w: 2, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] + tenant-id: 2 used: 0, w: 3, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] + tenant-id: 3 used: 0, w: 4, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] + tenant-id: 4 used: 0, w: 5, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] + tenant-id: 5 used: 0, w: 6, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] + tenant-id: 6 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] + tenant-id: 7 used: 0, w: 8, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 
100] + tenant-id: 8 used: 0, w: 9, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] granted chain-id=1 ---- diff --git a/pkg/util/asciitsdb/BUILD.bazel b/pkg/util/asciitsdb/BUILD.bazel new file mode 100644 index 000000000000..8a81a4c655cc --- /dev/null +++ b/pkg/util/asciitsdb/BUILD.bazel @@ -0,0 +1,18 @@ +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "asciitsdb", + srcs = ["asciitsdb.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/util/asciitsdb", + visibility = ["//visibility:public"], + deps = [ + "//pkg/util/log", + "//pkg/util/metric", + "//pkg/util/syncutil", + "@com_github_guptarohit_asciigraph//:asciigraph", + "@com_github_stretchr_testify//require", + ], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/util/asciitsdb/asciitsdb.go b/pkg/util/asciitsdb/asciitsdb.go new file mode 100644 index 000000000000..a5d0918f3975 --- /dev/null +++ b/pkg/util/asciitsdb/asciitsdb.go @@ -0,0 +1,311 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package asciitsdb + +import ( + "context" + "fmt" + "math" + "reflect" + "testing" + + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/guptarohit/asciigraph" + "github.com/stretchr/testify/require" +) + +// TSDB is used to plot ASCII timeseries for tests that want to observe how CRDB +// metrics behave. 
+type TSDB struct { + t *testing.T + reg *metric.Registry + + mu struct { + syncutil.Mutex + scraped bool + points map[string][]float64 // registered metric name => scraped data points + } +} + +// New returns a new ASCII TSDB. +func New(t *testing.T, reg *metric.Registry) *TSDB { + tsdb := &TSDB{t: t, reg: reg} + tsdb.mu.points = make(map[string][]float64) + return tsdb +} + +// Register registers a metric struct for plotting. For histograms, we follow +// CRDB's internal TSDB naming conventions, i.e. we export values for: +// - {metric name}-count +// - {metric name}-avg +// - {metric name}-max +// - {metric name}-p99 +// - {metric name}-p90 +// - {metric name}-p75 +// - {metric name}-p50 +func (t *TSDB) Register(mstruct interface{}) { + // NB: This code was cargo culted from the metric registry's + // AddStruct method. + + v := reflect.ValueOf(mstruct) + if v.Kind() == reflect.Ptr { + v = v.Elem() + } + typ := v.Type() + + for i := 0; i < v.NumField(); i++ { + vfield, tfield := v.Field(i), typ.Field(i) + tname := tfield.Name + if !vfield.CanInterface() { + t.t.Logf("skipping unexported field %s", tname) + continue + } + switch vfield.Kind() { + case reflect.Array: + for i := 0; i < vfield.Len(); i++ { + velem := vfield.Index(i) + telemName := fmt.Sprintf("%s[%d]", tname, i) + // Permit elements in the array to be nil. + const skipNil = true + t.registerMetricValue(velem, telemName, skipNil) + } + default: + // No metric fields should be nil. + const skipNil = false + t.registerMetricValue(vfield, tname, skipNil) + } + } +} + +// Scrape all registered metrics. It records a single data point for all +// registered metrics. +func (t *TSDB) Scrape(ctx context.Context) { + // TB: This code is cargo culted entirely from the TSDB scraper. 
+ + t.mu.Lock() + defer t.mu.Unlock() + t.mu.scraped = true + + t.reg.Each(func(name string, val interface{}) { + if _, ok := t.mu.points[name]; !ok { + return + } + + switch mtr := val.(type) { + case metric.WindowedHistogram: + n := float64(mtr.TotalCountWindowed()) + if _, ok := t.mu.points[name]; !ok { + return + } + t.mu.points[name+"-count"] = append(t.mu.points[name+"-count"], n) + avg := mtr.TotalSumWindowed() / n + if math.IsNaN(avg) || math.IsInf(avg, +1) || math.IsInf(avg, -1) { + avg = 0 + } + t.mu.points[name+"-avg"] = append(t.mu.points[name+"-avg"], avg) + for _, pt := range []quantile{ + {"-max", 100}, + {"-p99", 99}, + {"-p90", 90}, + {"-p75", 75}, + {"-p50", 50}, + } { + t.mu.points[name+pt.suffix] = append(t.mu.points[name+pt.suffix], mtr.ValueAtQuantileWindowed(pt.quantile)) + } + case metric.PrometheusExportable: + // NB: this branch is intentionally at the bottom since all metrics + // implement it. + m := mtr.ToPrometheusMetric() + if m.Gauge != nil { + t.mu.points[name] = append(t.mu.points[name], *m.Gauge.Value) + } else if m.Counter != nil { + t.mu.points[name] = append(t.mu.points[name], *m.Counter.Value) + } + default: + log.Fatalf(ctx, "cannot extract value for type %T", mtr) + } + }) +} + +// Plot plots the given metrics, using the given options. +func (t *TSDB) Plot(metrics []string, options ...Option) string { + c := configure(config{ + limit: math.MaxInt, + }, options) + + var plots [][]float64 + for _, metric := range metrics { + points, ok := t.read(metric) + require.Truef(t.t, ok, "%s not found", metric) + if c.rate != 0 { + points = rate(points, c.rate) + } + if c.divisor != 0 { + points = divide(points, c.divisor) + } + c.limit = min(c.limit, int64(len(points))-c.offset) + points = points[c.offset : c.offset+c.limit] + plots = append(plots, points) + } + return asciigraph.PlotMany(plots, c.graphopts...) +} + +// Clear all recorded data points. 
+func (t *TSDB) Clear() { + t.mu.Lock() + defer t.mu.Unlock() + for metric := range t.mu.points { + t.mu.points[metric] = nil + } +} + +// read reads all data points for the given metric name. +func (t *TSDB) read(metric string) ([]float64, bool) { + t.mu.Lock() + defer t.mu.Unlock() + + points, ok := t.mu.points[metric] + return points, ok +} + +func (t *TSDB) registerMetricValue(val reflect.Value, name string, skipNil bool) { + if val.Kind() == reflect.Ptr && val.IsNil() { + if skipNil { + t.t.Logf("skipping nil metric field %s", name) + } else { + t.t.Fatalf("found nil metric field %s", name) + } + return + } + switch typ := val.Interface().(type) { + case metric.Iterable: + t.registerIterable(typ) + case metric.Struct: + t.Register(typ) + default: + t.t.Logf("skipping non-metric field %s", name) + } +} + +func (t *TSDB) registerIterable(metric metric.Iterable) { + t.mu.Lock() + defer t.mu.Unlock() + if t.mu.scraped { + t.t.Fatalf("register all metrics upfront before Scrape()") + } + t.mu.points[metric.GetName()] = []float64{} +} + +// Option represents a configuration setting. +type Option interface { + apply(c *config) +} + +// WithRate plots the rate of growth for a given metric. The parameter is used +// to control how far back (with respect to individual points) compute the delta +// over. +func WithRate(r int) Option { + return optionFunc(func(c *config) { + c.rate = r + }) +} + +// WithDivisor divides all individual points of a metric by the given divisor. +func WithDivisor(d float64) Option { + return optionFunc(func(c *config) { + c.divisor = d + }) +} + +// WithGraphOptions configures the look and feel of the generated ASCII graphs. +func WithGraphOptions(opts ...asciigraph.Option) Option { + return optionFunc(func(c *config) { + c.graphopts = opts + }) +} + +// WithOffset is used to offset a specified number of points in the graph. 
+func WithOffset(o int64) Option { + return optionFunc(func(c *config) { + c.offset = o + }) +} + +// WithLimit is used to limit the number of points in the graph. +func WithLimit(l int64) Option { + return optionFunc(func(c *config) { + c.limit = l + }) +} + +// config holds the various graphing options. +type config struct { + rate int + divisor float64 + offset, limit int64 + graphopts []asciigraph.Option +} + +// An optionFunc applies an option. +type optionFunc func(*config) + +// apply implements the Option interface. +func (of optionFunc) apply(c *config) { of(c) } + +func configure(defaults config, options []Option) *config { + for _, o := range options { + o.apply(&defaults) + } + return &defaults +} + +// rate takes a list of individual data points (typically read from a cumulative +// counter) and returns a derivative, computed simply by taking each +// corresponding input point and subtracting one r points ago. If r points are +// collected per-{time unit}, this ends up computing the rate of growth +// per-{time unit}. +func rate(input []float64, r int) []float64 { + rated := make([]float64, len(input)) + for i := 0; i < len(input); i++ { + if i < r { + rated[i] = 0 + continue + } + + delta := input[i] - input[i-r] + rated[i] = delta + } + return rated +} + +// divide takes a list of individual data points and returns another, where each +// point is the corresponding input divided by the divisor. +func divide(input []float64, divisor float64) []float64 { + output := make([]float64, len(input)) + for i := 0; i < len(input); i++ { + output[i] = input[i] / divisor + } + return output +} + +type quantile struct { + suffix string + quantile float64 +} + +func min(i, j int64) int64 { + if i < j { + return i + } + return j +}