diff --git a/DEPS.bzl b/DEPS.bzl index 5763daa75831..f07b5ba1e02d 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -6023,6 +6023,16 @@ def go_deps(): "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/mjibson/esc/com_github_mjibson_esc-v0.2.0.zip", ], ) + go_repository( + name = "com_github_mkungla_bexp_v3", + build_file_proto_mode = "disable_global", + importpath = "github.com/mkungla/bexp/v3", + sha256 = "903a932e83da8be3426e29c1484d79f814a825b2c2743c36d43b054910a9f886", + strip_prefix = "github.com/mkungla/bexp/v3@v3.0.1", + urls = [ + "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/mkungla/bexp/v3/com_github_mkungla_bexp_v3-v3.0.1.zip", + ], + ) go_repository( name = "com_github_mmatczuk_go_generics", build_file_proto_mode = "disable_global", diff --git a/build/bazelutil/distdir_files.bzl b/build/bazelutil/distdir_files.bzl index d8ac51caeb57..d608b2af1401 100644 --- a/build/bazelutil/distdir_files.bzl +++ b/build/bazelutil/distdir_files.bzl @@ -627,6 +627,7 @@ DISTDIR_FILES = { "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/mitchellh/osext/com_github_mitchellh_osext-v0.0.0-20151018003038-5e2d6d41470f.zip": "d8e6e5f6bd749cfa0c1c17c40f5dc0fd19e4a0a83245f46bde23bea4e65d1a20", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/mitchellh/reflectwalk/com_github_mitchellh_reflectwalk-v1.0.0.zip": "318ab84e22d4554a7540c7ebc9b4fb607e2608578c3a5bb72434203988048145", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/mjibson/esc/com_github_mjibson_esc-v0.2.0.zip": "9f090786bd43dddb5c0d798b449d5e8aede4cb7d106f56dcac0aebd8fd1929cc", + "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/mkungla/bexp/v3/com_github_mkungla_bexp_v3-v3.0.1.zip": "903a932e83da8be3426e29c1484d79f814a825b2c2743c36d43b054910a9f886", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/mmatczuk/go_generics/com_github_mmatczuk_go_generics-v0.0.0-20181212143635-0aaa050f9bab.zip": "18c1e95c93f1f82be0184bc13bf49eb4350c7a4ff524b1bf440b3eb9ff14acc9", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/mmcloughlin/geohash/com_github_mmcloughlin_geohash-v0.9.0.zip": "7162856858d9bb3c411d4b42ad19dfff579341ddf0580122e3f1ac3be05c7441", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/moby/locker/com_github_moby_locker-v1.0.1.zip": "f07361346d12a24e168db7fb2f21281883bee6060f1aedf7507bccf20c4a793f", diff --git a/go.mod b/go.mod index 55a7d0d49559..a0dbcdb2745c 100644 --- a/go.mod +++ b/go.mod @@ -176,6 +176,7 @@ require ( github.com/mattn/goveralls v0.0.2 github.com/mibk/dupl v1.0.0 github.com/mitchellh/reflectwalk v1.0.0 + github.com/mkungla/bexp/v3 v3.0.1 github.com/mmatczuk/go_generics v0.0.0-20181212143635-0aaa050f9bab github.com/montanaflynn/stats v0.6.3 github.com/mozillazg/go-slugify v0.2.0 diff --git a/go.sum b/go.sum index 1f5f69b057c6..b39406359cca 100644 --- a/go.sum +++ b/go.sum @@ -1673,6 +1673,8 @@ github.com/mitchellh/osext v0.0.0-20151018003038-5e2d6d41470f/go.mod h1:OkQIRizQ github.com/mitchellh/reflectwalk v1.0.0 h1:9D+8oIskB4VJBN5SFlmc27fSlIBZaov1Wpk/IfikLNY= github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= github.com/mjibson/esc v0.2.0/go.mod h1:9Hw9gxxfHulMF5OJKCyhYD7PzlSdhzXyaGEBRPH1OPs= +github.com/mkungla/bexp/v3 v3.0.1 h1:UqcWAaxWn+rmJ+3ZgwokMrUerGMUeOFihUPrTLPFZ9Q= +github.com/mkungla/bexp/v3 v3.0.1/go.mod h1:zkzndAaEcYdZcu+cB4WkNYQuG7pwjzMOZLn3vM8KD+8= github.com/mmatczuk/go_generics v0.0.0-20181212143635-0aaa050f9bab h1:QjLgi/L+MjxysinrA8KkNZLf3cAhTluBoSXUvFeN144= github.com/mmatczuk/go_generics v0.0.0-20181212143635-0aaa050f9bab/go.mod h1:Fs8p4al9GNa3b42YfZOoUVMdjQ2WlNEYlnoboJKPpJ8= github.com/mmcloughlin/geohash v0.9.0 h1:FihR004p/aE1Sju6gcVq5OLDqGcMnpBY+8moBqIsVOs= diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 861d35784cd7..b2a6437a20f3 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -204,6 +204,7 @@ ALL_TESTS = [ "//pkg/kv/kvserver/gc:gc_test", "//pkg/kv/kvserver/idalloc:idalloc_test", "//pkg/kv/kvserver/intentresolver:intentresolver_test", + "//pkg/kv/kvserver/kvflowcontrol/kvflowcontroller:kvflowcontroller_test", "//pkg/kv/kvserver/kvstorage:kvstorage_test", "//pkg/kv/kvserver/liveness:liveness_test", "//pkg/kv/kvserver/logstore:logstore_test", @@ -1216,6 +1217,8 @@ GO_TARGETS = [ "//pkg/kv/kvserver/intentresolver:intentresolver", "//pkg/kv/kvserver/intentresolver:intentresolver_test", "//pkg/kv/kvserver/kvadmission:kvadmission", + "//pkg/kv/kvserver/kvflowcontrol/kvflowcontroller:kvflowcontroller", + "//pkg/kv/kvserver/kvflowcontrol/kvflowcontroller:kvflowcontroller_test", "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb:kvflowcontrolpb", "//pkg/kv/kvserver/kvflowcontrol:kvflowcontrol", "//pkg/kv/kvserver/kvserverbase:kvserverbase", @@ -2018,6 +2021,7 @@ GO_TARGETS = [ "//pkg/util/admission:admission", "//pkg/util/admission:admission_test", "//pkg/util/arith:arith", + "//pkg/util/asciitsdb:asciitsdb", "//pkg/util/binfetcher:binfetcher", "//pkg/util/binfetcher:binfetcher_test", "//pkg/util/bitarray:bitarray", @@ -2615,6 +2619,7 @@ GET_X_DATA_TARGETS = [ "//pkg/kv/kvserver/intentresolver:get_x_data", "//pkg/kv/kvserver/kvadmission:get_x_data", "//pkg/kv/kvserver/kvflowcontrol:get_x_data", + "//pkg/kv/kvserver/kvflowcontrol/kvflowcontroller:get_x_data", "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb:get_x_data", "//pkg/kv/kvserver/kvserverbase:get_x_data", "//pkg/kv/kvserver/kvserverpb:get_x_data", @@ -3119,6 +3124,7 @@ GET_X_DATA_TARGETS = [ "//pkg/util/admission:get_x_data", "//pkg/util/admission/admissionpb:get_x_data", "//pkg/util/arith:get_x_data", + "//pkg/util/asciitsdb:get_x_data", "//pkg/util/binfetcher:get_x_data", "//pkg/util/bitarray:get_x_data", "//pkg/util/bufalloc:get_x_data", diff --git a/pkg/kv/kvserver/kvflowcontrol/BUILD.bazel b/pkg/kv/kvserver/kvflowcontrol/BUILD.bazel index 04ef54d4893d..4817b09ce5a9 100644 --- a/pkg/kv/kvserver/kvflowcontrol/BUILD.bazel +++ b/pkg/kv/kvserver/kvflowcontrol/BUILD.bazel @@ -13,6 +13,8 @@ go_library( "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb", "//pkg/roachpb", "//pkg/util/admission/admissionpb", + "@com_github_cockroachdb_redact//:redact", + "@com_github_dustin_go_humanize//:go-humanize", ], ) diff --git a/pkg/kv/kvserver/kvflowcontrol/doc.go b/pkg/kv/kvserver/kvflowcontrol/doc.go index f4b1a0ed64a3..285cb16cce1e 100644 --- a/pkg/kv/kvserver/kvflowcontrol/doc.go +++ b/pkg/kv/kvserver/kvflowcontrol/doc.go @@ -353,3 +353,5 @@ package kvflowcontrol // the receiver to send messages to return tokens up to some point, and // (b) the receiver has either not received the message for which we've // deducted tokens, or forgotten about it. +// - Ensure that flow tokens aren't leaked, by checking that after the tests +// quiesce, flow tokens are back to their original limits. diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go index 68f2cfa7b3a0..635b946f6b14 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go @@ -19,6 +19,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/redact" + "github.com/dustin/go-humanize" ) // Stream models the stream over which we replicate data traffic, the @@ -34,7 +36,10 @@ type Stream struct { // Tokens represent the finite capacity of a given stream, expressed in bytes // for data we're looking to replicate. Use of replication streams are // predicated on tokens being available. -type Tokens uint64 +// +// NB: We use a signed integer to accommodate data structures that deal with +// token deltas, or buckets that are allowed to go into debt. +type Tokens int64 // Controller provides flow control for replication traffic in KV, held at the // node-level. @@ -136,3 +141,30 @@ type DispatchReader interface { PendingDispatch() []roachpb.NodeID PendingDispatchFor(roachpb.NodeID) []kvflowcontrolpb.AdmittedRaftLogEntries } + +func (t Tokens) String() string { + return redact.StringWithoutMarkers(t) +} + +// SafeFormat implements the redact.SafeFormatter interface. +func (t Tokens) SafeFormat(p redact.SafePrinter, verb rune) { + sign := "+" + if t < 0 { + sign = "-" + t = -t + } + p.Printf("%s%s", sign, humanize.IBytes(uint64(t))) +} + +func (s Stream) String() string { + return redact.StringWithoutMarkers(s) +} + +// SafeFormat implements the redact.SafeFormatter interface. +func (s Stream) SafeFormat(p redact.SafePrinter, verb rune) { + tenantSt := s.TenantID.String() + if s.TenantID.IsSystem() { + tenantSt = "1" + } + p.Printf("t%s/s%s", tenantSt, s.StoreID.String()) +} diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/BUILD.bazel b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/BUILD.bazel new file mode 100644 index 000000000000..eaf9f5c6311b --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/BUILD.bazel @@ -0,0 +1,55 @@ +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "kvflowcontroller", + srcs = [ + "kvflowcontroller.go", + "kvflowcontroller_metrics.go", + ], + importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller", + visibility = ["//visibility:public"], + deps = [ + "//pkg/base", + "//pkg/kv/kvserver/kvflowcontrol", + "//pkg/settings", + "//pkg/settings/cluster", + "//pkg/util/admission/admissionpb", + "//pkg/util/buildutil", + "//pkg/util/hlc", + "//pkg/util/log", + "//pkg/util/metric", + "//pkg/util/syncutil", + ], +) + +go_test( + name = "kvflowcontroller_test", + srcs = [ + "kvflowcontrol_token_adjustment_test.go", + "kvflowcontroller_simulation_test.go", + ], + args = ["-test.timeout=295s"], + data = glob(["testdata/**"]), + embed = [":kvflowcontroller"], + deps = [ + "//pkg/kv/kvserver/kvflowcontrol", + "//pkg/roachpb", + "//pkg/settings/cluster", + "//pkg/testutils/datapathutils", + "//pkg/util/admission/admissionpb", + "//pkg/util/asciitsdb", + "//pkg/util/hlc", + "//pkg/util/leaktest", + "//pkg/util/log", + "//pkg/util/metric", + "//pkg/util/timeutil", + "@com_github_cockroachdb_datadriven//:datadriven", + "@com_github_dustin_go_humanize//:go-humanize", + "@com_github_guptarohit_asciigraph//:asciigraph", + "@com_github_mkungla_bexp_v3//:bexp", + "@com_github_stretchr_testify//require", + ], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontrol_token_adjustment_test.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontrol_token_adjustment_test.go new file mode 100644 index 000000000000..9bd06ef6f547 --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontrol_token_adjustment_test.go @@ -0,0 +1,159 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvflowcontroller + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/datadriven" + "github.com/dustin/go-humanize" + "github.com/stretchr/testify/require" +) + +func TestFlowTokenAdjustment(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + var ( + ctx = context.Background() + controller *Controller + adjustments []adjustment + stream = kvflowcontrol.Stream{ + TenantID: roachpb.SystemTenantID, + StoreID: roachpb.StoreID(1), + } + ) + + datadriven.RunTest(t, "testdata/flow_token_adjustment", + func(t *testing.T, d *datadriven.TestData) string { + switch d.Cmd { + case "init": + controller = New( + metric.NewRegistry(), + cluster.MakeTestingClusterSettings(), + hlc.NewClockWithSystemTimeSource(time.Nanosecond), + ) + adjustments = nil + return "" + + case "adjust": + require.NotNilf(t, controller, "uninitialized flow controller (did you use 'init'?)") + + for _, line := range strings.Split(d.Input, "\n") { + parts := strings.Fields(line) + require.Len(t, parts, 2, "expected form 'class={regular,elastic} delta={+,-}") + + var delta kvflowcontrol.Tokens + var pri admissionpb.WorkPriority + + // Parse class={regular,elastic}. + parts[0] = strings.TrimSpace(parts[0]) + require.True(t, strings.HasPrefix(parts[0], "class=")) + parts[0] = strings.TrimPrefix(strings.TrimSpace(parts[0]), "class=") + switch parts[0] { + case "regular": + pri = admissionpb.NormalPri + case "elastic": + pri = admissionpb.BulkNormalPri + } + + // Parse delta={+,-} + parts[1] = strings.TrimSpace(parts[1]) + require.True(t, strings.HasPrefix(parts[1], "delta=")) + parts[1] = strings.TrimPrefix(strings.TrimSpace(parts[1]), "delta=") + require.True(t, strings.HasPrefix(parts[1], "+") || strings.HasPrefix(parts[1], "-")) + isPositive := strings.Contains(parts[1], "+") + parts[1] = strings.TrimPrefix(parts[1], "+") + parts[1] = strings.TrimPrefix(parts[1], "-") + bytes, err := humanize.ParseBytes(parts[1]) + require.NoError(t, err) + delta = kvflowcontrol.Tokens(int64(bytes)) + if !isPositive { + delta = -delta + } + + controller.adjustTokens(ctx, pri, delta, stream) + adjustments = append(adjustments, adjustment{ + pri: pri, + delta: delta, + post: controller.testingGetTokensForStream(stream), + }) + } + return "" + + case "history": + limit := controller.testingGetLimit() + + var buf strings.Builder + buf.WriteString(" regular | elastic\n") + buf.WriteString(fmt.Sprintf(" %8s | %8s\n", + printTrimmedTokens(limit[admissionpb.RegularWorkClass]), + printTrimmedTokens(limit[admissionpb.ElasticWorkClass]), + )) + buf.WriteString("======================================\n") + for _, h := range adjustments { + buf.WriteString(fmt.Sprintf("%s\n", h)) + } + return buf.String() + + default: + return fmt.Sprintf("unknown command: %s", d.Cmd) + } + }) +} + +type adjustment struct { + pri admissionpb.WorkPriority + delta kvflowcontrol.Tokens + post tokensPerWorkClass +} + +func printTrimmedTokens(t kvflowcontrol.Tokens) string { + return strings.ReplaceAll(t.String(), " ", "") +} + +func (h adjustment) String() string { + class := admissionpb.WorkClassFromPri(h.pri) + + comment := "" + if h.post[admissionpb.RegularWorkClass] <= 0 { + comment = "regular" + } + if h.post[admissionpb.ElasticWorkClass] <= 0 { + if len(comment) == 0 { + comment = "elastic" + } else { + comment = "regular and elastic" + } + } + if len(comment) != 0 { + comment = fmt.Sprintf(" (%s blocked)", comment) + } + return fmt.Sprintf("%8s %7s %8s | %8s%s", + printTrimmedTokens(h.delta), + class, + printTrimmedTokens(h.post[admissionpb.RegularWorkClass]), + printTrimmedTokens(h.post[admissionpb.ElasticWorkClass]), + comment, + ) +} diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go new file mode 100644 index 000000000000..7d86c3c07445 --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go @@ -0,0 +1,414 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvflowcontroller + +import ( + "context" + "fmt" + "time" + + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" + "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/cockroach/pkg/util/buildutil" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" +) + +var regularTokensPerStream = settings.RegisterByteSizeSetting( + settings.SystemOnly, + "kvadmission.flow_controller.regular_tokens_per_stream", + "flow tokens available for regular work on a per-stream basis", + 16<<20, // 16 MiB + validateTokenRange, +) + +var elasticTokensPerStream = settings.RegisterByteSizeSetting( + settings.SystemOnly, + "kvadmission.flow_controller.elastic_tokens_per_stream", + "flow tokens available for elastic work on a per-stream basis", + 8<<20, // 8 MiB + validateTokenRange, +) + +// Aliases to make the code below slightly easier to read. +const regular, elastic = admissionpb.RegularWorkClass, admissionpb.ElasticWorkClass + +// Controller is a concrete implementation of the kvflowcontrol.Controller +// interface. It provides flow control for replication traffic in KV and is +// typically held at the node-level. +type Controller struct { + mu struct { + syncutil.Mutex + + // Token limit per work class, tracking + // kvadmission.flow_controller.{regular,elastic}_tokens_per_stream. + limit tokensPerWorkClass + + // We maintain flow token buckets for {regular,elastic} work along each + // stream. This is lazily instantiated. + buckets map[kvflowcontrol.Stream]bucket + } + metrics *metrics + clock *hlc.Clock +} + +var _ kvflowcontrol.Controller = &Controller{} + +// New constructs a new Controller. +func New(registry *metric.Registry, settings *cluster.Settings, clock *hlc.Clock) *Controller { + c := &Controller{ + clock: clock, + } + + regularTokens := kvflowcontrol.Tokens(regularTokensPerStream.Get(&settings.SV)) + elasticTokens := kvflowcontrol.Tokens(elasticTokensPerStream.Get(&settings.SV)) + c.mu.limit = map[admissionpb.WorkClass]kvflowcontrol.Tokens{ + regular: regularTokens, + elastic: elasticTokens, + } + c.mu.buckets = make(map[kvflowcontrol.Stream]bucket) + regularTokensPerStream.SetOnChange(&settings.SV, func(ctx context.Context) { + c.mu.Lock() + defer c.mu.Unlock() + + before := tokensPerWorkClass{ + regular: c.mu.limit[regular], + elastic: c.mu.limit[elastic], + } + now := tokensPerWorkClass{ + regular: kvflowcontrol.Tokens(regularTokensPerStream.Get(&settings.SV)), + elastic: kvflowcontrol.Tokens(elasticTokensPerStream.Get(&settings.SV)), + } + adjustment := tokensPerWorkClass{ + regular: now[regular] - before[regular], + elastic: now[elastic] - before[elastic], + } + c.mu.limit = now + for _, b := range c.mu.buckets { + b.tokens[regular] += adjustment[regular] + b.tokens[elastic] += adjustment[elastic] + c.metrics.onTokenAdjustment(adjustment) + if adjustment[regular] > 0 || adjustment[elastic] > 0 { + b.signal() // signal a waiter, if any + } + } + }) + c.metrics = newMetrics(c) + registry.AddMetricStruct(c.metrics) + return c +} + +// Admit is part of the kvflowcontrol.Controller interface. It blocks until +// there are flow tokens available for replication over the given stream for +// work of the given priority. +func (c *Controller) Admit( + ctx context.Context, pri admissionpb.WorkPriority, _ time.Time, stream kvflowcontrol.Stream, +) error { + class := admissionpb.WorkClassFromPri(pri) + c.metrics.onWaiting(class) + + logged := false + tstart := c.clock.PhysicalTime() + for { + c.mu.Lock() + b := c.getBucketLocked(stream) + tokens := b.tokens[class] + c.mu.Unlock() + + if tokens > 0 { + if log.ExpensiveLogEnabled(ctx, 2) { + log.Infof(ctx, "flow tokens available (pri=%s stream=%s tokens=%s wait-duration=%s)", + pri, stream, tokens, c.clock.PhysicalTime().Sub(tstart)) + } + + // TODO(irfansharif): Right now we continue forwarding admission + // grants to request while the available tokens > 0, which can lead + // to over-admission since deduction is deferred (see + // testdata/simulation/over_admission). One mitigation could be + // terminating grant forwarding if the 'tentatively deducted tokens' + // exceeds some amount (say, 16 MiB). When tokens are actually + // deducted, we'll reduce from this 'tentatively deducted' count. + // We can re-signal() on every actual token deduction where + // available tokens is still > 0. + + b.signal() // signal a waiter, if any + c.metrics.onAdmitted(class, c.clock.PhysicalTime().Sub(tstart)) + return nil + } + + if !logged && log.ExpensiveLogEnabled(ctx, 2) { + log.Infof(ctx, "waiting for flow tokens (pri=%s stream=%s tokens=%s)", + pri, stream, tokens) + logged = true + } + + select { + case <-b.wait(): // wait for a signal + case <-ctx.Done(): + if ctx.Err() != nil { + c.metrics.onErrored(class) + } + return ctx.Err() + } + } + + // TODO(irfansharif): Use the create time for ordering among waiting + // requests. Integrate it with epoch-LIFO. +} + +// DeductTokens is part of the kvflowcontrol.Controller interface. +func (c *Controller) DeductTokens( + ctx context.Context, + pri admissionpb.WorkPriority, + tokens kvflowcontrol.Tokens, + stream kvflowcontrol.Stream, +) bool { + if tokens < 0 { + log.Fatalf(ctx, "malformed argument: -ve tokens deducted (pri=%s tokens=%s stream=%s)", + pri, tokens, stream) + } + c.adjustTokens(ctx, pri, -tokens, stream) + return true +} + +// ReturnTokens is part of the kvflowcontrol.Controller interface. +func (c *Controller) ReturnTokens( + ctx context.Context, + pri admissionpb.WorkPriority, + tokens kvflowcontrol.Tokens, + stream kvflowcontrol.Stream, +) { + if tokens < 0 { + log.Fatalf(ctx, "malformed argument: -ve tokens deducted (pri=%s tokens=%s stream=%s)", + pri, tokens, stream) + } + c.adjustTokens(ctx, pri, tokens, stream) +} + +func (c *Controller) adjustTokens( + ctx context.Context, + pri admissionpb.WorkPriority, + delta kvflowcontrol.Tokens, + stream kvflowcontrol.Stream, +) { + class := admissionpb.WorkClassFromPri(pri) + + c.mu.Lock() + defer c.mu.Unlock() + + b := c.getBucketLocked(stream) + adjustment, unaccounted := b.adjust(ctx, class, delta, c.mu.limit) + c.metrics.onTokenAdjustment(adjustment) + c.metrics.onUnaccounted(unaccounted) + if adjustment[regular] > 0 || adjustment[elastic] > 0 { + b.signal() // signal a waiter, if any + } + + if log.ExpensiveLogEnabled(ctx, 2) { + log.Infof(ctx, "adjusted flow tokens (pri=%s stream=%s delta=%s): regular=%s elastic=%s", + pri, stream, delta, b.tokens[regular], b.tokens[elastic]) + } +} + +func (c *Controller) getBucketLocked(stream kvflowcontrol.Stream) bucket { + b, ok := c.mu.buckets[stream] + if !ok { + b = newBucket(c.mu.limit) + c.mu.buckets[stream] = b + } + return b +} + +// bucket holds flow tokens for {regular,elastic} traffic over a +// kvflowcontrol.Stream. It's used to synchronize handoff between threads +// returning and waiting for flow tokens. +// - Tokens are protected by Controller.mu; +// - Waiting requests do so by waiting on channel signalCh, without +// holding mutexes. Requests first check for available tokens, waiting if +// unavailable. +// - Whenever tokens are returned, signalCh is signaled, waking up a single +// waiter. +// - If the request finds no available tokens, it starts waiting again. +// - Whenever a request gets admitted, it signals the next waiter if any. +type bucket struct { + tokens tokensPerWorkClass + signalCh chan struct{} +} + +func newBucket(t tokensPerWorkClass) bucket { + return bucket{ + tokens: map[admissionpb.WorkClass]kvflowcontrol.Tokens{ + regular: t[regular], + elastic: t[elastic], + }, + signalCh: make(chan struct{}, 1), + } +} + +func (b *bucket) signal() { + select { + case b.signalCh <- struct{}{}: + default: + } +} + +func (b *bucket) wait() chan struct{} { + return b.signalCh +} + +func (b *bucket) adjust( + ctx context.Context, + class admissionpb.WorkClass, + delta kvflowcontrol.Tokens, + limit tokensPerWorkClass, +) (adjustment, unaccounted tokensPerWorkClass) { + unaccounted = tokensPerWorkClass{ + regular: 0, + elastic: 0, + } + + before := tokensPerWorkClass{ + regular: b.tokens[regular], + elastic: b.tokens[elastic], + } + + switch class { + case elastic: + // Elastic {deductions,returns} only affect elastic flow tokens. + b.tokens[class] += delta + if delta > 0 && b.tokens[class] > limit[class] { + unaccounted[class] = b.tokens[class] - limit[class] + b.tokens[class] = limit[class] // enforce ceiling + } + case regular: + b.tokens[class] += delta + if delta > 0 && b.tokens[class] > limit[class] { + unaccounted[class] = b.tokens[class] - limit[class] + b.tokens[class] = limit[class] // enforce ceiling + } + + b.tokens[elastic] += delta + if delta > 0 && b.tokens[elastic] > limit[elastic] { + unaccounted[elastic] = b.tokens[elastic] - limit[elastic] + b.tokens[elastic] = limit[elastic] // enforce ceiling + } + } + + if buildutil.CrdbTestBuild && (unaccounted[regular] != 0 || unaccounted[elastic] != 0) { + log.Fatalf(ctx, "unaccounted[regular]=%s unaccounted[elastic]=%s for class=%s delta=%s limit[regular]=%s limit[elastic]=%s", + unaccounted[regular], unaccounted[elastic], class, delta, limit[regular], limit[elastic]) + } + + adjustment = tokensPerWorkClass{ + regular: b.tokens[regular] - before[regular], + elastic: b.tokens[elastic] - before[elastic], + } + return adjustment, unaccounted +} + +type tokensPerWorkClass map[admissionpb.WorkClass]kvflowcontrol.Tokens + +const ( + minTokensPerStream kvflowcontrol.Tokens = 1 << 20 // 1 MiB + maxTokensPerStream kvflowcontrol.Tokens = 64 << 20 // 64 MiB +) + +func validateTokenRange(b int64) error { + t := kvflowcontrol.Tokens(b) + if t < minTokensPerStream { + return fmt.Errorf("minimum flowed tokens allowed is %s, got %s", minTokensPerStream, t) + } + if t > maxTokensPerStream { + return fmt.Errorf("maximum flow tokens allowed is %s, got %s", maxTokensPerStream, t) + } + return nil +} + +func (c *Controller) testingGetTokensForStream(stream kvflowcontrol.Stream) tokensPerWorkClass { + c.mu.Lock() + defer c.mu.Unlock() + ret := make(map[admissionpb.WorkClass]kvflowcontrol.Tokens) + for wc, c := range c.getBucketLocked(stream).tokens { + ret[wc] = c + } + return ret +} + +func (c *Controller) testingGetLimit() tokensPerWorkClass { + c.mu.Lock() + defer c.mu.Unlock() + return c.mu.limit +} + +// testingNonBlockingAdmit is a non-blocking alternative to Admit() for use in +// tests. +// - it checks if we have a non-zero number of flow tokens +// - if we do, we return immediately with admitted=true +// - if we don't, we return admitted=false and two callbacks: +// - signaled, which can be polled to check whether we're ready to try and +// admitting again; +// - admit, which can be used to try and admit again. If still not admitted, +// callers are to wait until they're signaled again. +func (c *Controller) testingNonBlockingAdmit( + pri admissionpb.WorkPriority, stream kvflowcontrol.Stream, +) (admitted bool, signaled func() bool, admit func() bool) { + class := admissionpb.WorkClassFromPri(pri) + c.metrics.onWaiting(class) + + tstart := c.clock.PhysicalTime() + admit = func() bool { + c.mu.Lock() + b := c.getBucketLocked(stream) + tokens := b.tokens[class] + c.mu.Unlock() + + if tokens <= 0 { + return false + } + + b.signal() // signal a waiter, if any + c.metrics.onAdmitted(class, c.clock.PhysicalTime().Sub(tstart)) + return true + } + + if admit() { + return true, nil, nil + } + + b := c.testingGetBucket(stream) + return false, b.testingSignaled, admit +} + +func (c *Controller) testingGetBucket(stream kvflowcontrol.Stream) bucket { + c.mu.Lock() + defer c.mu.Unlock() + return c.getBucketLocked(stream) +} + +func (b *bucket) testingSignaled() bool { + select { + case <-b.wait(): // check if signaled + return true + default: + return false + } +} + +func min(i, j kvflowcontrol.Tokens) kvflowcontrol.Tokens { + if i < j { + return i + } + return j +} diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller_metrics.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller_metrics.go new file mode 100644 index 000000000000..696c8a3adb55 --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller_metrics.go @@ -0,0 +1,235 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvflowcontroller + +import ( + "fmt" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/cockroach/pkg/util/metric" +) + +var ( + // Metric templates for the flow token controller. We make use of aggmetrics + // to get per-tenant metrics and an aggregation across all tenants. Within a + // tenant, we maintain flow tokens for each admissionpb.WorkClass with + // appropriately segmented metrics. We don't export metrics at a per + // kvflowcontrol.Stream-level; streams are identified (partly) by the store + // ID receiving replication traffic, which is a high cardinality measure + // (storeIDs could ratchet up arbitrarily). The similar argument technically + // applies for per-tenant metrics, but there we'd rather eat the cost. + // + // TODO(irfansharif): Actually use aggmetrics. + // TODO(irfansharif): Consider aggregated metrics per remote store, + // aggregated across all tenants. + // TODO(irfansharif): To improve upon the tenant*stores cardinality, + // consider "inspectz" style pages to give token counts and queue lengths of + // individual buckets (#66772). + + flowTokensAvailable = metric.Metadata{ + Name: "kvadmission.flow_controller.%s_tokens_available", + Help: "Flow tokens available for %s requests, across all replication streams", + Measurement: "Bytes", + Unit: metric.Unit_BYTES, + } + + flowTokensDeducted = metric.Metadata{ + Name: "kvadmission.flow_controller.%s_tokens_deducted", + Help: "Flow tokens deducted by %s requests, across all replication streams", + Measurement: "Bytes", + Unit: metric.Unit_BYTES, + } + + flowTokensReturned = metric.Metadata{ + Name: "kvadmission.flow_controller.%s_tokens_returned", + Help: "Flow tokens returned by %s requests, across all replication streams", + Measurement: "Bytes", + Unit: metric.Unit_BYTES, + } + + flowTokensUnaccounted = metric.Metadata{ + Name: "kvadmission.flow_controller.%s_tokens_unaccounted", + Help: "Flow tokens returned by %s requests that were unaccounted for, across all replication streams", + Measurement: "Bytes", + Unit: metric.Unit_BYTES, + } + + requestsWaiting = metric.Metadata{ + Name: "kvadmission.flow_controller.%s_requests_waiting", + Help: "Number of %s requests waiting for flow tokens", + Measurement: "Requests", + Unit: metric.Unit_COUNT, + } + + requestsAdmitted = metric.Metadata{ + Name: "kvadmission.flow_controller.%s_requests_admitted", + Help: "Number of %s requests admitted by the flow controller", + Measurement: "Requests", + Unit: metric.Unit_COUNT, + } + + requestsErrored = metric.Metadata{ + Name: "kvadmission.flow_controller.%s_requests_errored", + Help: "Number of %s requests that errored out while waiting for flow tokens", + Measurement: "Requests", + Unit: metric.Unit_COUNT, + } + + waitDuration = metric.Metadata{ + Name: "kvadmission.flow_controller.%s_wait_duration", + Help: "Latency histogram for time %s requests spent waiting for flow tokens", + Measurement: "Nanoseconds", + Unit: metric.Unit_NANOSECONDS, + } + + totalStreamCount = metric.Metadata{ + Name: "kvadmission.flow_controller.%s_stream_count", + Help: "Total number of replication streams for %s requests", + Measurement: "Count", + Unit: metric.Unit_COUNT, + } + + blockedStreamCount = metric.Metadata{ + Name: "kvadmission.flow_controller.%s_blocked_stream_count", + Help: "Number of replication streams with no flow tokens available for %s requests", + Measurement: "Count", + Unit: metric.Unit_COUNT, + } +) + +// annotateMetricTemplateWithWorkClass uses the given metric template to build +// one suitable for the specific work class. +func annotateMetricTemplateWithWorkClass( + wc admissionpb.WorkClass, tmpl metric.Metadata, +) metric.Metadata { + rv := tmpl + rv.Name = fmt.Sprintf(tmpl.Name, wc) + rv.Help = fmt.Sprintf(tmpl.Help, wc) + return rv +} + +type metrics struct { + FlowTokensAvailable [admissionpb.NumWorkClasses]*metric.Gauge + FlowTokensDeducted [admissionpb.NumWorkClasses]*metric.Counter + FlowTokensReturned [admissionpb.NumWorkClasses]*metric.Counter + FlowTokensUnaccounted [admissionpb.NumWorkClasses]*metric.Counter + RequestsWaiting [admissionpb.NumWorkClasses]*metric.Gauge + RequestsAdmitted [admissionpb.NumWorkClasses]*metric.Counter + RequestsErrored [admissionpb.NumWorkClasses]*metric.Counter + WaitDuration [admissionpb.NumWorkClasses]*metric.Histogram + TotalStreamCount [admissionpb.NumWorkClasses]*metric.Gauge + BlockedStreamCount [admissionpb.NumWorkClasses]*metric.Gauge +} + +var _ metric.Struct = &metrics{} + +func newMetrics(c *Controller) *metrics { + m := &metrics{} + for _, wc := range []admissionpb.WorkClass{ + admissionpb.RegularWorkClass, + admissionpb.ElasticWorkClass, + } { + wc := wc // copy loop variable + m.FlowTokensAvailable[wc] = metric.NewFunctionalGauge( + annotateMetricTemplateWithWorkClass(wc, flowTokensAvailable), + func() int64 { + sum := int64(0) + c.mu.Lock() + defer c.mu.Unlock() + for _, wbc := range c.mu.buckets { + sum += int64(wbc.tokens[wc]) + } + return sum + }, + ) + m.FlowTokensDeducted[wc] = metric.NewCounter( + annotateMetricTemplateWithWorkClass(wc, flowTokensDeducted), + ) + m.FlowTokensReturned[wc] = metric.NewCounter( + annotateMetricTemplateWithWorkClass(wc, flowTokensReturned), + ) + m.FlowTokensUnaccounted[wc] = metric.NewCounter( + annotateMetricTemplateWithWorkClass(wc, flowTokensUnaccounted), + ) + m.RequestsWaiting[wc] = metric.NewGauge( + annotateMetricTemplateWithWorkClass(wc, requestsWaiting), + ) + m.RequestsAdmitted[wc] = metric.NewCounter( + annotateMetricTemplateWithWorkClass(wc, requestsAdmitted), + ) + m.RequestsErrored[wc] = metric.NewCounter( + annotateMetricTemplateWithWorkClass(wc, requestsErrored), + ) + m.WaitDuration[wc] = metric.NewHistogram( + annotateMetricTemplateWithWorkClass(wc, waitDuration), + base.DefaultHistogramWindowInterval(), + metric.IOLatencyBuckets, + ) + m.TotalStreamCount[wc] = metric.NewFunctionalGauge( + annotateMetricTemplateWithWorkClass(wc, totalStreamCount), + func() int64 { + c.mu.Lock() + defer c.mu.Unlock() + return int64(len(c.mu.buckets)) + }, + ) + m.BlockedStreamCount[wc] = metric.NewFunctionalGauge( + annotateMetricTemplateWithWorkClass(wc, blockedStreamCount), + func() int64 { + count := int64(0) + c.mu.Lock() + defer c.mu.Unlock() + for _, wbc := range c.mu.buckets { + if wbc.tokens[wc] <= 0 { + count += 1 + } + } + return count + }, + ) + } + return m +} + +func (m *metrics) onWaiting(class admissionpb.WorkClass) { + m.RequestsWaiting[class].Inc(1) +} + +func (m *metrics) onAdmitted(class admissionpb.WorkClass, dur time.Duration) { + m.RequestsAdmitted[class].Inc(1) + m.RequestsWaiting[class].Dec(1) + m.WaitDuration[class].RecordValue(dur.Nanoseconds()) +} + +func (m *metrics) onErrored(class admissionpb.WorkClass) { + m.RequestsErrored[class].Inc(1) +} + +func (m *metrics) onTokenAdjustment(adjustment tokensPerWorkClass) { + for class, delta := range adjustment { + if delta < 0 { + m.FlowTokensDeducted[class].Inc(-int64(delta)) + } else if delta > 0 { + m.FlowTokensReturned[class].Inc(int64(delta)) + } + } +} + +func (m *metrics) onUnaccounted(unaccounted tokensPerWorkClass) { + for class, delta := range unaccounted { + m.FlowTokensUnaccounted[class].Inc(int64(delta)) + } +} + +// MetricStruct implements the metric.Struct interface. +func (m *metrics) MetricStruct() {} diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller_simulation_test.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller_simulation_test.go new file mode 100644 index 000000000000..b595f06d645d --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller_simulation_test.go @@ -0,0 +1,523 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvflowcontroller + +import ( + "context" + "fmt" + "strconv" + "strings" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/testutils/datapathutils" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/cockroach/pkg/util/asciitsdb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/datadriven" + "github.com/dustin/go-humanize" + "github.com/guptarohit/asciigraph" + "github.com/mkungla/bexp/v3" + "github.com/stretchr/testify/require" +) + +// TestUsingSimulation is a data-driven test for kvflowcontrol.Controller. +// It gives package authors a way to understand how flow tokens are maintained +// for individual replication streams, how write bandwidth is shaped by these +// tokens, and how requests queue/dequeue internally. We provide the following +// syntax: +// +// - "init" +// Initialize the flow controller and test simulator. +// +// - "timeline" +// start= end= class={regular,elastic} \ +// stream=t/s adjust={+,-}/s rate=/s \ +// [deduction-delay=] +// .... +// Creates a "thread" that operates between t='start' to (non-inclusive) +// t='end', issuing the specified 'rate' of requests of the given work +// 'class', over the given 'stream', where the flow tokens are +// {deducted,returned} with the given bandwidth. The 'rate' controls the +// granularity of token adjustment, i.e. if adjust=+100bytes/s and +// rate=5/s, then each return adjusts by +100/5 = +20bytes. If flow tokens +// are being deducted (-ve 'adjust'), they go through Admit() followed by +// DeductTokens(). If they're being returned (+ve 'adjust'), they simply go +// through ReturnTokens(). The optional 'deduction-delay' parameter controls +// the number of ticks between each request being granted admission and it +// deducting the corresponding flow tokens. +// +// - "simulate" [end=] +// Simulate timelines until the optionally specified timestamp. If no +// timestamp is specified, the largest end time of all registered timelines +// is used instead. +// +// - "plot" [height=] [width=] [precision=] \ +// [start=] [end=] +// unit= [rate=true] +// .... +// Plot the flow controller specified metrics (and optionally its rate of +// change) with the specified units. The following metrics are supported: +// a. kvadmission.flow_controller.{regular,elastic}_tokens_{available,deducted,returned} +// b. kvadmission.flow_controller.{regular,elastic}_requests_{waiting,admitted,errored} +// c. kvadmission.flow_controller.{regular,elastic}{,_blocked}_stream_count +// d. kvadmission.flow_controller.{regular,elastic}_wait_duration +// To overlay metrics onto the same plot, the selector supports curly brace +// expansion. If the unit is one of {MiB,MB,KB,KiB,s,ms,us,μs}, or the +// bandwidth equivalents (/s), the y-axis is automatically +// converted. +// +// Internally we make use of a test-only non-blocking interface for the flow +// controller in order to enable deterministic simulation. +func TestUsingSimulation(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + datadriven.Walk(t, datapathutils.TestDataPath(t, "simulation"), func(t *testing.T, path string) { + var ( + controller *Controller + simulator *simulator + tsdb *asciitsdb.TSDB + mtime *timeutil.ManualTime + ) + + ctx := context.Background() + datadriven.RunTest(t, path, + func(t *testing.T, d *datadriven.TestData) string { + switch d.Cmd { + case "init": + registry := metric.NewRegistry() + tsdb = asciitsdb.New(t, registry) + mtime = timeutil.NewManualTime(tstart) + controller = New( + registry, + cluster.MakeTestingClusterSettings(), + hlc.NewClock(mtime, time.Nanosecond /* maxOffset */), + ) + tsdb.Register(controller.metrics) + simulator = newSimulator(t, controller, tsdb, mtime) + return "" + + case "timeline": + require.NotNilf(t, controller, "uninitialized flow controller (did you use 'init'?)") + require.NotNilf(t, simulator, "uninitialized simulator (did you use 'init'?)") + + for _, line := range strings.Split(d.Input, "\n") { + parts := strings.Fields(line) + require.True(t, len(parts) >= 6, `expected form: + start= end= class={regular,elastic} \ + stream=t/s adjust={+,-}/s rate=/s \ + [deduction-delay=] +`) + + var ( + start, end time.Time + pri admissionpb.WorkPriority + stream kvflowcontrol.Stream + delta kvflowcontrol.Tokens + rate int + ) + + for i := range parts { + parts[i] = strings.TrimSpace(parts[i]) + } + + require.True(t, strings.HasPrefix(parts[0], "start=")) + require.True(t, strings.HasPrefix(parts[1], "end=")) + require.True(t, strings.HasPrefix(parts[2], "class=")) + require.True(t, strings.HasPrefix(parts[3], "stream=")) + require.True(t, strings.HasPrefix(parts[4], "adjust=")) + require.True(t, strings.HasPrefix(parts[5], "rate=")) + + for i := range parts[:6] { + inner := strings.Split(parts[i], "=") + require.Len(t, inner, 2) + parts[i] = strings.TrimSpace(inner[1]) + } + + // Parse start=. + dur, err := time.ParseDuration(parts[0]) + require.NoError(t, err) + start = mtime.Now().Add(dur) + + // Parse end=. + dur, err = time.ParseDuration(parts[1]) + require.NoError(t, err) + end = mtime.Now().Add(dur) + + // Parse class={regular,elastic}. + switch parts[2] { + case "regular": + pri = admissionpb.NormalPri + case "elastic": + pri = admissionpb.BulkNormalPri + default: + t.Fatalf("unexpected class: %s", parts[1]) + } + + { // Parse stream=t/s. + inner := strings.Split(parts[3], "/") + require.Len(t, inner, 2) + + ti, err := strconv.Atoi(strings.TrimPrefix(inner[0], "t")) + require.NoError(t, err) + + si, err := strconv.Atoi(strings.TrimPrefix(inner[1], "s")) + require.NoError(t, err) + + stream = kvflowcontrol.Stream{ + TenantID: roachpb.MustMakeTenantID(uint64(ti)), + StoreID: roachpb.StoreID(si), + } + } + + // Parse adjust={+,-}/s. + isPositive := strings.Contains(parts[4], "+") + parts[4] = strings.TrimPrefix(parts[4], "+") + parts[4] = strings.TrimPrefix(parts[4], "-") + bytes, err := humanize.ParseBytes(strings.TrimSuffix(parts[4], "/s")) + require.NoError(t, err) + delta = kvflowcontrol.Tokens(int64(bytes)) + if !isPositive { + delta = -delta + } + + // Parse rate=/s. + rate, err = strconv.Atoi(strings.TrimSuffix(parts[5], "/s")) + require.NoError(t, err) + + deductionDelay := 0 + if len(parts) > 6 { + for _, part := range parts[6:] { + part = strings.TrimSpace(part) + if strings.HasPrefix(part, "deduction-delay=") { + part = strings.TrimPrefix(part, "deduction-delay=") + dur, err := time.ParseDuration(part) + require.NoError(t, err) + deductionDelay = int(dur.Nanoseconds() / tick.Nanoseconds()) + } + } + } + simulator.timeline(start, end, pri, stream, delta, rate, deductionDelay) + } + return "" + + case "simulate": + require.NotNilf(t, simulator, "uninitialized simulator (did you use 'init'?)") + var end time.Time + if d.HasArg("end") { + // Parse end=. + var endStr string + d.ScanArgs(t, "end", &endStr) + dur, err := time.ParseDuration(endStr) + require.NoError(t, err) + end = mtime.Now().Add(dur) + } + simulator.simulate(ctx, end) + return "" + + case "plot": + var h, w, p = 15, 40, 1 + if d.HasArg("height") { + d.ScanArgs(t, "height", &h) + } + if d.HasArg("width") { + d.ScanArgs(t, "width", &w) + } + if d.HasArg("precision") { + d.ScanArgs(t, "precision", &p) + } + + var buf strings.Builder + for i, line := range strings.Split(d.Input, "\n") { + var ( + selector, unit string + rated bool + ) + parts := strings.Fields(line) + for i, part := range parts { + part = strings.TrimSpace(part) + if i == 0 { + selector = part + continue + } + + if strings.HasPrefix(part, "rate=") { + var err error + rated, err = strconv.ParseBool(strings.TrimPrefix(part, "rate=")) + require.NoError(t, err) + } + + if strings.HasPrefix(part, "unit=") { + unit = strings.TrimPrefix(part, "unit=") + } + } + + caption := strings.TrimPrefix(selector, "kvadmission.flow_controller.") + if rated { + caption = fmt.Sprintf("rate(%s)", caption) + } + caption = fmt.Sprintf("%s (%s)", caption, unit) + + options := []asciitsdb.Option{ + asciitsdb.WithGraphOptions( + asciigraph.Height(h), + asciigraph.Width(w), + asciigraph.Precision(uint(p)), + asciigraph.Caption(caption), + ), + } + if rated { + options = append(options, asciitsdb.WithRate(int(time.Second/metricTick))) + } + switch unit { + case "μs", "us", "microseconds": + time.Microsecond.Nanoseconds() + options = append(options, asciitsdb.WithDivisor(float64(time.Microsecond.Nanoseconds())) /* ns => μs conversion */) + case "ms", "milliseconds": + options = append(options, asciitsdb.WithDivisor(float64(time.Millisecond.Nanoseconds())) /* ns => μs conversion */) + case "s", "seconds": + options = append(options, asciitsdb.WithDivisor(float64(time.Second.Nanoseconds())) /* ns => μs conversion */) + case "MiB", "MiB/s": + options = append(options, asciitsdb.WithDivisor(humanize.MiByte) /* 1 MiB */) + case "MB", "MB/s": + options = append(options, asciitsdb.WithDivisor(humanize.MByte) /* 1 MB */) + case "KiB", "KiB/s": + options = append(options, asciitsdb.WithDivisor(humanize.KiByte) /* 1 KiB */) + case "KB", "KB/s": + options = append(options, asciitsdb.WithDivisor(humanize.KByte) /* 1 KB */) + default: + } + + start := tstart + if d.HasArg("start") { + // Parse start=. + var startStr string + d.ScanArgs(t, "start", &startStr) + dur, err := time.ParseDuration(startStr) + require.NoError(t, err) + start = tstart.Add(dur) + options = append(options, asciitsdb.WithOffset(start.Sub(tstart).Nanoseconds()/metricTick.Nanoseconds())) + } + + if d.HasArg("end") { + // Parse end=. + var endStr string + d.ScanArgs(t, "end", &endStr) + dur, err := time.ParseDuration(endStr) + require.NoError(t, err) + end := tstart.Add(dur) + options = append(options, asciitsdb.WithLimit(end.Sub(start).Nanoseconds()/metricTick.Nanoseconds())) + } + + if i > 0 { + buf.WriteString("\n\n\n") + } + metrics := bexp.Parse(strings.TrimSpace(selector)) + buf.WriteString(tsdb.Plot(metrics, options...)) + } + return buf.String() + + default: + return fmt.Sprintf("unknown command: %s", d.Cmd) + } + }, + ) + }) +} + +var tstart = timeutil.Unix(0, 0) + +const tick = time.Millisecond +const metricTick = 100 * tick + +type simulator struct { + t *testing.T + + controller *Controller + ticker []ticker + tsdb *asciitsdb.TSDB + mtime *timeutil.ManualTime +} + +func newSimulator( + t *testing.T, controller *Controller, tsdb *asciitsdb.TSDB, mtime *timeutil.ManualTime, +) *simulator { + return &simulator{ + t: t, + tsdb: tsdb, + controller: controller, + mtime: mtime, + } +} + +func (s *simulator) timeline( + start, end time.Time, + pri admissionpb.WorkPriority, + stream kvflowcontrol.Stream, + delta kvflowcontrol.Tokens, + rate, deductionDelay int, +) { + if rate == 0 { + return // nothing to do + } + s.ticker = append(s.ticker, ticker{ + t: s.t, + controller: s.controller, + + start: start, + end: end, + pri: pri, + stream: stream, + + deductionDelay: deductionDelay, + deduct: make(map[time.Time][]func()), + waitHandles: make(map[time.Time]waitHandle), + + // Using the parameters above, we figure out two things: + // - On which ticks do we adjust flow tokens? + // - How much by, each time? + // + // If the request rate we're simulating is: + // - 1000/sec, we adjust flow tokens every tick(=1ms). + // - 500/sec, we adjust flow tokens every 2 ticks (=2ms). + // - .... + // + // How much do we adjust by each time? Given we're making 'rate' requests + // per second, and have to deduct 'delta' tokens per second, each request + // just deducts delta/rate. + mod: int(time.Second/tick) / rate, + delta: kvflowcontrol.Tokens(int(delta) / rate), + }) +} + +func (s *simulator) simulate(ctx context.Context, end time.Time) { + s.mtime.Backwards(s.mtime.Since(tstart)) // reset to tstart + s.tsdb.Clear() + for i := range s.ticker { + s.ticker[i].reset() + if s.ticker[i].end.After(end) { + end = s.ticker[i].end + } + } + + for { + t := s.mtime.Now() + if t.After(end) || t.Equal(end) { + break + } + for i := range s.ticker { + s.ticker[i].tick(ctx, t) + } + if t.UnixNano()%metricTick.Nanoseconds() == 0 { + s.tsdb.Scrape(ctx) + } + s.mtime.Advance(tick) + } +} + +type waitHandle struct { + ctx context.Context + signaled func() bool + admit func() bool +} + +type ticker struct { + t *testing.T + start, end time.Time + pri admissionpb.WorkPriority + stream kvflowcontrol.Stream + delta kvflowcontrol.Tokens + controller *Controller + mod, ticks int // used to control the ticks at which we interact with the controller + + deduct map[time.Time][]func() + waitHandles map[time.Time]waitHandle + + deductionDelay int +} + +func (ti *ticker) tick(ctx context.Context, t time.Time) { + if ds, ok := ti.deduct[t]; ok { + for _, deduct := range ds { + deduct() + } + delete(ti.deduct, t) + } + for key, handle := range ti.waitHandles { + // Process all waiting requests from earlier. Do this even if t > + // ti.end since these requests could've been generated earlier. + if handle.ctx.Err() != nil { + delete(ti.waitHandles, key) + continue + } + if !handle.signaled() { + continue + } + if handle.admit() { + if ti.deductionDelay == 0 { + ti.controller.adjustTokens(ctx, ti.pri, ti.delta, ti.stream) + } else { + future := t.Add(tick * time.Duration(ti.deductionDelay)) + ti.deduct[future] = append(ti.deduct[future], func() { + ti.controller.adjustTokens(ctx, ti.pri, ti.delta, ti.stream) + }) + } + delete(ti.waitHandles, key) + return + } + } + + if t.Before(ti.start) || (t.After(ti.end) || t.Equal(ti.end)) { + return // we're outside our [ti.start, ti.end), there's nothing left to do + } + + defer func() { ti.ticks += 1 }() + if ti.ticks%ti.mod != 0 { + return // nothing to do in this tick + } + + if ti.delta >= 0 { // return tokens + ti.controller.adjustTokens(ctx, ti.pri, ti.delta, ti.stream) + return + } + + admitted, signaled, admit := ti.controller.testingNonBlockingAdmit(ti.pri, ti.stream) + if admitted { + if ti.deductionDelay == 0 { + ti.controller.adjustTokens(ctx, ti.pri, ti.delta, ti.stream) + } else { + future := t.Add(tick * time.Duration(ti.deductionDelay)) + ti.deduct[future] = append(ti.deduct[future], func() { + ti.controller.adjustTokens(ctx, ti.pri, ti.delta, ti.stream) + }) + } + return + } + ti.waitHandles[t] = waitHandle{ + ctx: ctx, + signaled: signaled, + admit: admit, + } +} + +func (ti *ticker) reset() { + ti.ticks = 0 + ti.deduct = make(map[time.Time][]func()) + ti.waitHandles = make(map[time.Time]waitHandle) +} diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/testdata/flow_token_adjustment b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/testdata/flow_token_adjustment new file mode 100644 index 000000000000..5e4c3a460894 --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/testdata/flow_token_adjustment @@ -0,0 +1,65 @@ +init +---- + +adjust +class=regular delta=-1MiB +class=regular delta=-7MiB +class=regular delta=-2MiB +class=regular delta=-2MiB +class=regular delta=+2MiB +class=regular delta=-2MiB +class=regular delta=+4MiB +class=regular delta=+2MiB +class=elastic delta=-2MiB +class=regular delta=-2MiB +class=regular delta=+2MiB +class=elastic delta=+2MiB +class=regular delta=+6MiB +---- + +history +---- + regular | elastic + +16MiB | +8.0MiB +====================================== + -1.0MiB regular +15MiB | +7.0MiB + -7.0MiB regular +8.0MiB | +0B (elastic blocked) + -2.0MiB regular +6.0MiB | -2.0MiB (elastic blocked) + -2.0MiB regular +4.0MiB | -4.0MiB (elastic blocked) + +2.0MiB regular +6.0MiB | -2.0MiB (elastic blocked) + -2.0MiB regular +4.0MiB | -4.0MiB (elastic blocked) + +4.0MiB regular +8.0MiB | +0B (elastic blocked) + +2.0MiB regular +10MiB | +2.0MiB + -2.0MiB elastic +10MiB | +0B (elastic blocked) + -2.0MiB regular +8.0MiB | -2.0MiB (elastic blocked) + +2.0MiB regular +10MiB | +0B (elastic blocked) + +2.0MiB elastic +10MiB | +2.0MiB + +6.0MiB regular +16MiB | +8.0MiB + +# ------------------------------------------------------ + +init +---- + +adjust +class=elastic delta=-7MiB +class=regular delta=-7MiB +class=elastic delta=+6MiB +class=regular delta=-1MiB +class=regular delta=-6MiB +class=regular delta=+6MiB +class=regular delta=-9MiB +---- + +history +---- + regular | elastic + +16MiB | +8.0MiB +====================================== + -7.0MiB elastic +16MiB | +1.0MiB + -7.0MiB regular +9.0MiB | -6.0MiB (elastic blocked) + +6.0MiB elastic +9.0MiB | +0B (elastic blocked) + -1.0MiB regular +8.0MiB | -1.0MiB (elastic blocked) + -6.0MiB regular +2.0MiB | -7.0MiB (elastic blocked) + +6.0MiB regular +8.0MiB | -1.0MiB (elastic blocked) + -9.0MiB regular -1.0MiB | -10MiB (regular and elastic blocked) diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/testdata/simulation/admitting_waiting_choppy b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/testdata/simulation/admitting_waiting_choppy new file mode 100644 index 000000000000..212cd333d935 --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/testdata/simulation/admitting_waiting_choppy @@ -0,0 +1,198 @@ +# Show how waiting work ends up getting admitted choppily if the flow tokens +# being returned are being done so in a similarly choppy way. This is showing +# that we're shaping incoming writes to exactly the rate of flow token returns, +# i.e. we're controlling the flow tightly. +init +---- + +# Set up an open-loop thread issuing 2MiB/s of regular writes from t=0s to +# t=25s. +timeline +start=0s end=25s class=regular stream=t1/s1 adjust=-2MiB/s rate=10/s +---- + +# Set up choppy flow token returns starting at t=15s. The average rate of +# returns is lower than 2MiB/s, so we should always have some waiting work. +timeline +start=15s end=16s class=regular stream=t1/s1 adjust=+0.9MiB/s rate=10/s +start=16s end=17s class=regular stream=t1/s1 adjust=+2.1MiB/s rate=10/s +start=17s end=18s class=regular stream=t1/s1 adjust=+0.1MiB/s rate=10/s +start=18s end=19s class=regular stream=t1/s1 adjust=+0.9MiB/s rate=10/s +start=19s end=20s class=regular stream=t1/s1 adjust=+2.1MiB/s rate=10/s +start=20s end=21s class=regular stream=t1/s1 adjust=+0.1MiB/s rate=10/s +start=21s end=22s class=regular stream=t1/s1 adjust=+0.9MiB/s rate=10/s +start=22s end=23s class=regular stream=t1/s1 adjust=+2.1MiB/s rate=10/s +start=23s end=24s class=regular stream=t1/s1 adjust=+0.1MiB/s rate=10/s +start=24s end=25s class=regular stream=t1/s1 adjust=+0.9MiB/s rate=10/s +---- + +simulate +---- + +# Observe the initial smooth rate of flow token deductions, and later the +# choppier rate of flow token returns which we induced above. Notice that the +# rate of flow token deductions exactly mirrors the flow token returns, so +# traffic shaping is happening. +plot +kvadmission.flow_controller.regular_tokens_{deducted,returned} unit=MiB/s rate=true +kvadmission.flow_controller.regular_tokens_{deducted,returned} unit=MiB +kvadmission.flow_controller.regular_tokens_available unit=MiB +---- +---- + 2.0 ┤ ╭──────────╮ ╭╮ ╭╮ + 1.9 ┤ │ │ ││ ╭╮ + 1.7 ┤ │ │ ╭╮╮ ││ ││ + 1.6 ┤ │ │ │││ ││ ││ + 1.5 ┤ │ ╰╮ │╰╮ ││ ╭╯│ + 1.3 ┤ │ │ │ │ ││ │╯│ + 1.2 ┤ │ │ │ │ ╭╯│╮ │ │ + 1.1 ┤ │ │ │ │ │ ╰╮ │ │ + 0.9 ┤ │ │ ╭╯ │ │ │ │ │╮╭ + 0.8 ┤ │ │ │╯ │ │ │╭╯ │││ + 0.7 ┤ │ │ │ │ │ ││ ╰╮│ + 0.5 ┤ │ │ │ │╭╯ ││ ││ + 0.4 ┤ │ │ ╭╯ ││╯ ││ ╰╯ + 0.3 ┤ │ │ │╯ ││ ╰╯ ╰╯ + 0.1 ┤ │ ╰╮ │ ╰╯ ╰╯ + 0.0 ┼───────────────────────╯ + rate(regular_tokens_{deducted,returned}) (MiB/s) + + + 26.2 ┤ ╭── + 24.5 ┤ ╭─╯ + 22.7 ┤ ╭──╯ + 21.0 ┤ ╭─╯ + 19.2 ┤ ╭──╯ + 17.5 ┤ ╭─╯ + 15.7 ┤ ╭────────────╯ + 14.0 ┤ ╭╯ + 12.2 ┤ ╭─╯ + 10.5 ┤ ╭╯ ╭─ + 8.7 ┤ ╭─╯ ╭──╯ + 7.0 ┤ ╭╯ ╭──╯ + 5.2 ┤ ╭╯ ╭─╯ + 3.5 ┤ ╭─╯ ╭──╯ + 1.7 ┤╭╯ ╭─╯ + 0.0 ┼────────────────────────╯ + regular_tokens_{deducted,returned} (MiB) + + + 15.8 ┼╮ + 14.7 ┤╰╮ + 13.7 ┤ │ + 12.6 ┤ ╰╮ + 11.5 ┤ ╰╮ + 10.5 ┤ ╰╮ + 9.4 ┤ ╰╮ + 8.3 ┤ ╰╮ + 7.3 ┤ │ + 6.2 ┤ ╰╮ + 5.1 ┤ ╰╮ + 4.1 ┤ ╰╮ + 3.0 ┤ ╰╮ + 1.9 ┤ ╰╮ + 0.9 ┤ │ + -0.2 ┤ ╰─────────────────────────── + regular_tokens_available (MiB) +---- +---- + + +# Zoom into the more interesting second half of the graph, where flow tokens +# are being returned choppily. Given the average rate of returns is lower +# than what's being requested (2MiB/s), the total flow tokens available bounces +# off of zero. +plot start=15s end=30s +kvadmission.flow_controller.regular_tokens_available unit=MiB +---- + 0.2 ┤ ╭╮ + 0.2 ┤ ╭╮ ╭╯│ + 0.1 ┤ ╭╯│ ╭╯ │ + 0.1 ┤ ╭╯ │ ╭╯ │ + 0.1 ┤ ╭╯ │ ╭╮ ╭╮ │ │ + 0.1 ┤ │ │ ││╭╯│ │ │ + 0.0 ┤ │ │ ╭╮ │╰╯ │ │ │ + 0.0 ┤ │ │ ╭╯│ │ │ ╭╮│ │ ╭╮ + -0.0 ┤╭╮ │ │ │ │ │ │ ╭──╯││ │ ││ ╭ + -0.0 ┤││╭╯ │ │ ╰╮│ │ ╭╯ ││ │ ││╭╯ + -0.1 ┤│╰╯ │ │ ││ │╭╯ ╰╯ │ │╰╯ + -0.1 ┤│ │ │ ╰╯ ╰╯ │ │ + -0.1 ┼╯ │ ╭╯ │ ╭╯ + -0.1 ┤ │ ╭╯ │ ╭╯ + -0.2 ┤ │╭╯ │╭╯ + -0.2 ┤ ╰╯ ╰╯ + regular_tokens_available (MiB) + + +# Note again the mirroring between token returns which immediately allows +# admission, followed by token deductions. +plot start=15s end=30s +kvadmission.flow_controller.regular_tokens_{deducted,returned} unit=MiB rate=true +---- + 2.1 ┤ ╭╮ + 2.0 ┤ ╭╮│ ╭╮ ╭╮╮ + 1.9 ┤ │╰╮ ││╮ │││ + 1.7 ┤ ╭╯╯│ ╭╯╰╮ ╭╯╰╮ + 1.6 ┤ ││ │╮ │╯ │ │╯ │ + 1.4 ┤ │╯ ││ ╭╯ │ ╭╯ │╮ + 1.3 ┤ ╭╯ ╰╮ │ ╰╮ │╯ ││ + 1.1 ┤ │╯ │╮ ╭╯ │ ╭╯ ╰╮ + 1.0 ┤ ╭╯ ││ │╯ │ │╯ │╮ + 0.9 ┤ │╯ ╰╮ ╭╯ │╮ ╭╯ ││ ╭ + 0.7 ┤ ╭╯ │ │ ╰╮ ╭╯ ││ ╭╯ + 0.6 ┤ ╭╯╯ │╮ ╭╯ │ │╯ ╰╮ │╯ + 0.4 ┤ │╯ ││╭╯ │ ╭╯ │╮╭╯ + 0.3 ┤╭╯ ╰─╯│ ╰─╯ │╭╯ + 0.1 ┼╯╯ │╭╯ ╰╯ ╰╯╯ +-0.0 ┼╯ ╰╯ + rate(regular_tokens_{deducted,returned}) (MiB) + + +# So we're still admitting work choppily, and observe corresponding deductions +# in the waiting request rate. But given the open-loop thread above, the # of +# waiting request is still growing unboundedly. +plot start=15s end=30s +kvadmission.flow_controller.regular_requests_{admitted,waiting} unit=reqs/s rate=true +kvadmission.flow_controller.regular_requests_waiting unit=reqs/s +---- +---- + 10.7 ┤ ╭╮ + 9.9 ┼╮ ││ ╭╮ ╭╮ ╭╮ + 9.2 ┤╰╮ ╭╯│ │╰╮ ╭─╮ ╭╮ ╭╯│ ││ + 8.4 ┤ │ │ ╰╮ │ │ ╭╯ │ ╭╯╰╮ ╭╯ │ │╰╮ + 7.7 ┤ │ ╭╯ │ │ ╰╮ │ │ │ │ │ ╰╮╭╯ │ + 6.9 ┤ ╰╮ │ │╭╯ │ ╭╯ ╰╮│ ╰╮ ╭╯ ││ ╰╮ + 6.1 ┤ ╰╮╭╯ ││ ╰╮│ ╭╯ │ │ ││ ╰ + 5.4 ┤ ││ ╰│ │╯ │ ╰╮╯ ╭╯ + 4.6 ┤ ╰╮ ╭╯ ╰╮ │ ╭╰╮ │╮ + 3.9 ┤ ╭╰╮ ││ ╭╯│ │╮ │ │ ││ ╭ + 3.1 ┤ ╭╯ ╰╮ │╰╮ │ ╰╮ ╭╯│ ╭╯ ╰╮ ╭╯│ ╭╯ + 2.3 ┤ ╭╯ │ │ │ ╭╯ ╰╮ │ │ ╭╯ ╰╮ │ ╰╮╭╯ + 1.6 ┤ │ ╰╮╭╯ │ │ │ │ ╰╮│ │ │ ││ + 0.8 ┤╭╯ ││ │╭╯ ╰─╯ ╰╯ ╰╮│ ╰╯ + 0.1 ┼╯ ││ ╰╯ ╰╯ + -0.7 ┤ ╰╯ + rate(regular_requests_{admitted,waiting}) (reqs/s) + + + 118 ┤ ╭ + 115 ┤ ╭──╯ + 112 ┤ ╭╯ + 108 ┤ ╭╯ + 105 ┤ ╭─────╯ + 102 ┤ ╭─╯ + 99 ┤ ╭─╯ + 96 ┤ ╭─╯ + 92 ┤ ╭╯ + 89 ┤ ╭─────╯ + 86 ┤ ╭─╯ + 83 ┤ ╭─╯ + 80 ┤ ╭╯ + 76 ┤ ╭╯ + 73 ┤ ╭──────╯ + 70 ┼─╯ + regular_requests_waiting (reqs/s) +---- +---- + +# vim:ft=sh diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/testdata/simulation/over_admission b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/testdata/simulation/over_admission new file mode 100644 index 000000000000..39b74c681351 --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/testdata/simulation/over_admission @@ -0,0 +1,97 @@ +# Demonstrate how any delay in token deduction after being admitted can lead to +# over-admission. +init +---- + +# Take away all 16MiB of regular flow tokens; we want a buildup of waiting +# requests to over-admit from. +timeline +start=0s end=1s class=regular stream=t1/s1 adjust=-16MiB/s rate=1/s +---- + +# Queue 10/s*10s=100 requests for admission, asking for 10*4MiB=40MiB of +# tokens. For these requests, induce a 2s delay between Admit() and +# DeductTokens(). +timeline +start=10s end=20s class=regular stream=t1/s1 adjust=-4MiB/s rate=10/s deduction-delay=2s +---- + +# Return 1KiB of flow tokens at t=30. +timeline +start=30s end=31s class=regular stream=t1/s1 adjust=+1KiB/s rate=1/s +---- + +simulate end=40s +---- + +# Observe how the single 1KiB flow token return ends up admitting all 100 +# waiting requests, over-admitting by 40MiB. +# +# TODO(irfansharif): Introduce a "tentative deducted counter" on a per-stream +# basis, to prevent this kind of over-admission. It's likely to occur any time +# there's AC queueing due to CPU control, waiting on locks/latches, etc. +plot start=0s end=40s +kvadmission.flow_controller.regular_tokens_available unit=MiB +kvadmission.flow_controller.regular_requests_admitted unit=reqs/s rate=true +kvadmission.flow_controller.regular_requests_waiting unit=reqs +---- +---- + 0.0 ┼───────────────────────────────╮ + -2.7 ┤ │ + -5.3 ┤ │ + -8.0 ┤ │ + -10.7 ┤ │ + -13.3 ┤ │ + -16.0 ┤ │ + -18.7 ┤ │ + -21.3 ┤ │ + -24.0 ┤ │ + -26.7 ┤ │ + -29.3 ┤ │ + -32.0 ┤ │ + -34.7 ┤ │ + -37.3 ┤ │ + -40.0 ┤ ╰─────── + regular_tokens_available (MiB) + + + 100.0 ┤ ╭╮ + 93.3 ┤ ││ + 86.7 ┤ ││ + 80.0 ┤ ││ + 73.3 ┤ ││ + 66.7 ┤ ││ + 60.0 ┤ ││ + 53.3 ┤ ││ + 46.7 ┤ ││ + 40.0 ┤ ││ + 33.3 ┤ ││ + 26.7 ┤ ││ + 20.0 ┤ ││ + 13.3 ┤ ││ + 6.7 ┤ ││ + 0.0 ┼─────────────────────────────╯╰──────── + rate(regular_requests_admitted) (reqs/s) + + + 100.0 ┤ ╭─────────╮ + 93.3 ┤ ╭╯ │ + 86.7 ┤ ╭╯ │ + 80.0 ┤ │ │ + 73.3 ┤ ╭╯ │ + 66.7 ┤ ╭╯ │ + 60.0 ┤ │ │ + 53.3 ┤ ╭╯ │ + 46.7 ┤ ╭╯ │ + 40.0 ┤ │ │ + 33.3 ┤ ╭╯ │ + 26.7 ┤ ╭╯ │ + 20.0 ┤ │ │ + 13.3 ┤ ╭╯ │ + 6.7 ┤ │ │ + 0.0 ┼──────────╯ ╰───────── + regular_requests_waiting (reqs) +---- +---- + +# vim:ft=sh diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/testdata/simulation/overview b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/testdata/simulation/overview new file mode 100644 index 000000000000..690a869a349b --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/testdata/simulation/overview @@ -0,0 +1,188 @@ +# Walk through the basics of the datadriven syntax. +init +---- + +# Set up a worker deducting flow tokens at 2MiB/s over 100 reqs/s for regular +# write work to two stores, s1 and s2. With 16MiB or regular tokens, we expect +# it to get depleted in 8s. The remaining 8s of incoming work ends up getting +# queued. +timeline +start=0s end=16s class=regular stream=t1/s1 adjust=-2MiB/s rate=100/s +start=0s end=16s class=regular stream=t1/s2 adjust=-2MiB/s rate=100/s +---- + +# Schedule the return of flow tokens at t=20s in similar increments. +timeline +start=20s end=36s class=regular stream=t1/s1 adjust=+2MiB/s rate=100/s +start=20s end=36s class=regular stream=t1/s2 adjust=+2MiB/s rate=100/s +---- + +simulate end=40s +---- + +# There two replication streams, so we initially have 16*2=32MiB of regular +# flow tokens and 8*2=16MiB of elastic flow tokens. Since we're not returning +# flow tokens until t=20s, we deplete them at t=8s. Also note that despite this +# being regular traffic, we deduct elastic flow tokens for coarse intra-tenant +# prioritization -- those tokens get deducted at t=4s. Both are deducted at +# 4MiB/s (2MiB/s for each stream) until we have 0MiB regular tokens and -16MiB +# elastic tokens. +plot start=0s end=20s +kvadmission.flow_controller.{regular,elastic}_tokens_available unit=MiB +kvadmission.flow_controller.{regular,elastic}_tokens_deducted unit=MiB/s rate=true +---- +---- + 32.0 ┼╮ + 28.8 ┤╰─╮ + 25.6 ┤ ╰╮ + 22.4 ┤ ╰─╮ + 19.2 ┤ ╰─╮ + 16.0 ┼╮ ╰╮ + 12.8 ┤╰─╮ ╰─╮ + 9.6 ┤ ╰╮ ╰╮ + 6.4 ┤ ╰─╮ ╰─╮ + 3.2 ┤ ╰─╮ ╰╮ + -0.0 ┤ ╰╮ ╰──────────────────────── + -3.2 ┤ ╰─╮ + -6.4 ┤ ╰╮ + -9.6 ┤ ╰─╮ + -12.8 ┤ ╰╮ + -16.0 ┤ ╰──────────────────────── + {regular,elastic}_tokens_available (MiB) + + + 4.0 ┤ ╭─────────────╮ + 3.7 ┤ │ │ + 3.5 ┤ │ ╰╮ + 3.2 ┤ │ │ + 2.9 ┤ │ │ + 2.7 ┤ │ │ + 2.4 ┤ │ │ + 2.1 ┤ │ │ + 1.9 ┤ │ │ + 1.6 ┤ │ │ + 1.3 ┤ │ ╰╮ + 1.1 ┤ │ │ + 0.8 ┤ │ │ + 0.5 ┤ │ │ + 0.3 ┤ │ │ + 0.0 ┼─╯ ╰───────────────────── + rate({regular,elastic}_tokens_deducted) (MiB/s) +---- +---- + +# Plot the rates at which we (i) admit work when there are flow tokens +# available, and (ii) enqueue work when there are none. Since we're generating +# 100 requests/s for two streams, we observe an aggregate admission rate of +# ~200/s, and then later 200/s of waiting requests growth. There are no +# errors. +plot start=0s end=20s +kvadmission.flow_controller.regular_requests_{admitted,waiting} unit=reqs/s rate=true +kvadmission.flow_controller.{regular,elastic}_requests_errored unit=reqs/s rate=true +---- +---- + 200 ┤ ╭─────────────╮ ╭─────────────╮ + 187 ┤ │ │ │ │ + 173 ┤ │ ╰╮│ │ + 160 ┤ │ ││ │ + 147 ┤ │ ││ │ + 133 ┤ │ ╭╯ ╰╮ + 120 ┤ │ │ │ + 107 ┤ │ │ │ + 93 ┤ │ │ │ + 80 ┤ │ │ │ + 67 ┤ │ │╮ │ + 53 ┤ │ ││ │ + 40 ┤ │ ││ │ + 27 ┤ │ ╭╯│ ╰╮ + 13 ┤ │ │ │ │ + 0 ┼───────────────╯ ╰───────────────╰───── + rate(regular_requests_{admitted,waiting}) (reqs/s) + + + 0.0 ┼─────────────────────────────────────── + rate({regular,elastic}_requests_errored) (reqs/s) +---- +---- + +# Confirm that there are two streams underneath, both of which eventually get +# blocked for {regular,elastic} traffic, with the latter happening first. +plot start=0s end=20s +kvadmission.flow_controller.regular_stream_count unit=streams +kvadmission.flow_controller.{regular,elastic}_blocked_stream_count unit=streams +---- +---- + 2.0 ┼─────────────────────────────────────── + regular_stream_count (streams) + + + 2.0 ┤ ╭─────────────────────────────── + 1.9 ┤ │ │ + 1.7 ┤ │ │ + 1.6 ┤ │ │ + 1.5 ┤ │ │ + 1.3 ┤ │ │ + 1.2 ┤ │ │ + 1.1 ┤ │ │ + 0.9 ┤ │ │ + 0.8 ┤ │ │ + 0.7 ┤ │ │ + 0.5 ┤ │ │ + 0.4 ┤ │ │ + 0.3 ┤ │ │ + 0.1 ┤ │ │ + 0.0 ┼───────╯───────╯ + {regular,elastic}_blocked_stream_count (streams) +---- +---- + +# Observe what happens once flow tokens are returned -- we start admitting work +# at ~200/s, which matches the rate at which we're reducing the number of +# waiting requests. By t=36s we'll have returned all {regular,elastic} flow +# tokens, including for the requests that had to wait for admission. +plot start=18s end=40s +kvadmission.flow_controller.regular_requests_{admitted,waiting} unit=reqs/s rate=true +kvadmission.flow_controller.{regular,elastic}_tokens_available unit=MiB +---- +---- + 200 ┤ ╭───────────╮ + 175 ┤ │ ╰╮ + 150 ┤ ╭╯ │ + 125 ┤ │ │ + 100 ┤ │ │ + 75 ┤ │ │ + 50 ┤ ╭╯ ╰╮ + 25 ┤ │ │ + 0 ┼───╮ ╭─────────────────── + -25 ┤ │ │ + -50 ┤ ╰╮ ╭╯ + -75 ┤ │ │ + -100 ┤ │ │ + -125 ┤ │ │ + -150 ┤ ╰╮ │ + -175 ┤ │ ╭╯ + -200 ┤ ╰───────────╯ + rate(regular_requests_{admitted,waiting}) (reqs/s) + + + 32.0 ┤ ╭─────── + 28.8 ┤ ╭─╯ + 25.6 ┤ ╭╯ + 22.4 ┤ ╭╯ + 19.2 ┤ ╭─╯ + 16.0 ┤ ╭╯ ╭─────── + 12.8 ┤ ╭─╯ ╭─╯ + 9.6 ┤ ╭╯ ╭╯ + 6.4 ┤ ╭─╯ ╭╯ + 3.2 ┤ ╭╯ ╭─╯ + -0.0 ┼──────────────────╯ ╭╯ + -3.2 ┤ ╭─╯ + -6.4 ┤ ╭╯ + -9.6 ┤ ╭─╯ + -12.8 ┤ ╭╯ + -16.0 ┼──────────────────╯ + {regular,elastic}_tokens_available (MiB) +---- +---- + +# vim:ft=sh diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/testdata/simulation/regular_elastic_prioritization b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/testdata/simulation/regular_elastic_prioritization new file mode 100644 index 000000000000..43df08beebc0 --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/testdata/simulation/regular_elastic_prioritization @@ -0,0 +1,122 @@ +# Show that elastic work can get completely starved out by regular work, but +# not the other way around. +init +---- + +# Kick off two threads, one for each work class, each writing at 1MiB/s over +# across reqs/s. +timeline +start=0s end=16s class=regular stream=t1/s1 adjust=-1MiB/s rate=100/s +start=0s end=8s class=elastic stream=t1/s1 adjust=-1MiB/s rate=100/s +---- + +# start=16s end=32s class=regular stream=t1/s1 adjust=+2MiB/s rate=100/s +# start=32s end=36s class=elastic stream=t1/s1 adjust=+2MiB/s rate=100/s + +simulate +---- + +# Observe that initially elastic tokens deplete faster than regular tokens +# (while elastic tokens > 0MiB), since regular work deducts from both but +# elastic work only deducts from the elastic bucket. Eventually the rate of +# elastic token deductions slows down since elastic requests stop being +# admitted (and thus deducting tokens) once elastic tokens <= 0 MiB. So it's +# only regular request deductions from that point on. See +# TestFlowTokenAdjustment for more details. +plot +kvadmission.flow_controller.{regular,elastic}_tokens_available unit=MiB +---- + 16.0 ┼╮ + 14.1 ┤╰────╮ + 12.3 ┤ ╰───╮ + 10.4 ┤ ╰────╮ + 8.6 ┤ ╰───╮ + 6.7 ┼─╮ ╰────╮ + 4.8 ┤ ╰──╮ ╰───╮ + 3.0 ┤ ╰─╮ ╰────╮ + 1.1 ┤ ╰─╮ ╰───╮ + -0.7 ┤ ╰───╮ ╰── + -2.6 ┤ ╰───╮ + -4.5 ┤ ╰────╮ + -6.3 ┤ ╰───╮ + -8.2 ┤ ╰────╮ + -10.0 ┤ ╰───╮ + -11.9 ┤ ╰──── + {regular,elastic}_tokens_available (MiB) + + +# Confirm that all throughout we're able to admit regular requests at its +# incoming rate of 100/s. But for elastic requests, once we're out of elastic +# flow tokens, we stop admitting and start waiting instead. We run of elastic +# tokens faster since there are fewer of them (8MiB instead of 16MiB), and also +# they're deducted by both regular and elastic work, compared to regular tokens +# that are deducted only by regular work. +plot +kvadmission.flow_controller.regular_requests_{admitted,waiting} unit=reqs/s rate=true +kvadmission.flow_controller.elastic_requests_{admitted,waiting} unit=reqs/s rate=true +---- +---- + 100.0 ┤ ╭──────────────────────────────────── + 93.3 ┤ │ + 86.7 ┤ │ + 80.0 ┤ │ + 73.3 ┤ │ + 66.7 ┤ │ + 60.0 ┤ │ + 53.3 ┤ │ + 46.7 ┤ │ + 40.0 ┤ │ + 33.3 ┤ │ + 26.7 ┤ │ + 20.0 ┤ │ + 13.3 ┤ │ + 6.7 ┤ │ + 0.0 ┼─────────────────────────────────────── + rate(regular_requests_{admitted,waiting}) (reqs/s) + + + 100.0 ┤ ╭──────╮ ╭──────╮ + 93.3 ┤ │ ╰╮╭╯ │ + 86.7 ┤ │ ││ ╰╮ + 80.0 ┤ │ ││ │ + 73.3 ┤ │ ││ │ + 66.7 ┤ │ ││ │ + 60.0 ┤ │ ││ │ + 53.3 ┤ │ ╰│ │ + 46.7 ┤ │ ╭╯ │ + 40.0 ┤ │ ││ ╰╮ + 33.3 ┤ │ ││ │ + 26.7 ┤ │ ││ │ + 20.0 ┤ │ ││ │ + 13.3 ┤ │ ││ │ + 6.7 ┤ │ ╭╯╰╮ │ + 0.0 ┼─────────╯ ╰────────╰───────────────── + rate(elastic_requests_{admitted,waiting}) (reqs/s) +---- +---- + +# Confirm the above -- when both regular and elastic work gets admitted, we're +# deducting elastic tokens at 2MiB/s, and at t=4s when elastic work gets +# blocked, we start deducting at 1MiB/s. +plot +kvadmission.flow_controller.elastic_tokens_deducted unit=MiB rate=true +---- + 2.0 ┤ ╭──────╮ + 1.9 ┤ │ ╰╮ + 1.7 ┤ │ │ + 1.6 ┤ │ │ + 1.5 ┤ │ ╰╮ + 1.3 ┤ │ │ + 1.2 ┤ │ │ + 1.1 ┤ │ ╰─────────────────────────── + 0.9 ┤ │ + 0.8 ┤ │ + 0.7 ┤ │ + 0.5 ┤ │ + 0.4 ┤ │ + 0.3 ┤ │ + 0.1 ┤ │ + 0.0 ┼──╯ + rate(elastic_tokens_deducted) (MiB) + +# vim:ft=sh diff --git a/pkg/util/admission/admissionpb/admissionpb.go b/pkg/util/admission/admissionpb/admissionpb.go index cd5f4a80dd0a..57dc9911080f 100644 --- a/pkg/util/admission/admissionpb/admissionpb.go +++ b/pkg/util/admission/admissionpb/admissionpb.go @@ -10,7 +10,11 @@ package admissionpb -import "math" +import ( + "math" + + "github.com/cockroachdb/redact" +) // WorkPriority represents the priority of work. In an WorkQueue, it is only // used for ordering within a tenant. High priority work can starve lower @@ -39,6 +43,19 @@ const ( OneAboveHighPri int = int(HighPri) + 1 ) +func (w WorkPriority) String() string { + return redact.StringWithoutMarkers(w) +} + +// SafeFormat implements the redact.SafeFormatter interface. +func (w WorkPriority) SafeFormat(p redact.SafePrinter, verb rune) { + if s, ok := WorkPriorityDict[w]; ok { + p.Print(s) + return + } + p.Printf("custom-pri=%d", w) +} + // WorkPriorityDict is a mapping of the priorities to a short string name. The // name is used as the suffix on exported work queue metrics. var WorkPriorityDict = map[WorkPriority]string{ @@ -80,13 +97,18 @@ func WorkClassFromPri(pri WorkPriority) WorkClass { } func (w WorkClass) String() string { + return redact.StringWithoutMarkers(w) +} + +// SafeFormat implements the redact.SafeFormatter interface. +func (w WorkClass) SafeFormat(p redact.SafePrinter, verb rune) { switch w { case RegularWorkClass: - return "regular" + p.Printf("regular") case ElasticWorkClass: - return "elastic" + p.Print("elastic") default: - return "" + p.Print("") } } diff --git a/pkg/util/admission/testdata/store_work_queue b/pkg/util/admission/testdata/store_work_queue index ae615d87acd6..26fba8dc46e4 100644 --- a/pkg/util/admission/testdata/store_work_queue +++ b/pkg/util/admission/testdata/store_work_queue @@ -63,7 +63,7 @@ print regular workqueue: closed epoch: 0 tenantHeap len: 1 top tenant: 57 tenant-id: 53 used: 101, w: 1, fifo: -128 tenant-id: 55 used: 600, w: 1, fifo: -128 - tenant-id: 57 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 0] + tenant-id: 57 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 0] elastic workqueue: closed epoch: 0 tenantHeap len: 0 stats:{admittedCount:2 writeAccountedBytes:0 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} estimates:{writeTokens:100} diff --git a/pkg/util/admission/testdata/work_queue b/pkg/util/admission/testdata/work_queue index 3a736dcc48c3..a625fea3755c 100644 --- a/pkg/util/admission/testdata/work_queue +++ b/pkg/util/admission/testdata/work_queue @@ -26,7 +26,7 @@ tryGet: returning false print ---- closed epoch: 0 tenantHeap len: 1 top tenant: 53 - tenant-id: 53 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 3, epoch: 0, qt: 100] + tenant-id: 53 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 3, epoch: 0, qt: 100] admit id=3 tenant=53 priority=0 create-time-millis=2 bypass=false ---- @@ -36,7 +36,7 @@ admit id=3 tenant=53 priority=0 create-time-millis=2 bypass=false print ---- closed epoch: 0 tenantHeap len: 1 top tenant: 53 - tenant-id: 53 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 2, epoch: 0, qt: 100] [1: pri: 0, ct: 3, epoch: 0, qt: 100] + tenant-id: 53 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 2, epoch: 0, qt: 100] [1: pri: normal-pri, ct: 3, epoch: 0, qt: 100] # Request from tenant 71. admit id=4 tenant=71 priority=-128 create-time-millis=4 bypass=false @@ -51,8 +51,8 @@ admit id=5 tenant=71 priority=0 create-time-millis=5 bypass=false print ---- closed epoch: 0 tenantHeap len: 2 top tenant: 71 - tenant-id: 53 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 2, epoch: 0, qt: 100] [1: pri: 0, ct: 3, epoch: 0, qt: 100] - tenant-id: 71 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 5, epoch: 0, qt: 100] [1: pri: -128, ct: 4, epoch: 0, qt: 100] + tenant-id: 53 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 2, epoch: 0, qt: 100] [1: pri: normal-pri, ct: 3, epoch: 0, qt: 100] + tenant-id: 71 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 5, epoch: 0, qt: 100] [1: pri: low-pri, ct: 4, epoch: 0, qt: 100] granted chain-id=5 ---- @@ -65,8 +65,8 @@ granted: returned 1 print ---- closed epoch: 0 tenantHeap len: 2 top tenant: 71 - tenant-id: 53 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 2, epoch: 0, qt: 100] [1: pri: 0, ct: 3, epoch: 0, qt: 100] - tenant-id: 71 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: -128, ct: 4, epoch: 0, qt: 100] + tenant-id: 53 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 2, epoch: 0, qt: 100] [1: pri: normal-pri, ct: 3, epoch: 0, qt: 100] + tenant-id: 71 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: low-pri, ct: 4, epoch: 0, qt: 100] # Cancel a request from tenant 53. cancel-work id=3 @@ -76,8 +76,8 @@ id 3: admit failed print ---- closed epoch: 0 tenantHeap len: 2 top tenant: 71 - tenant-id: 53 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 3, epoch: 0, qt: 100] - tenant-id: 71 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: -128, ct: 4, epoch: 0, qt: 100] + tenant-id: 53 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 3, epoch: 0, qt: 100] + tenant-id: 71 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: low-pri, ct: 4, epoch: 0, qt: 100] # The work admitted for tenant 53 is done. work-done id=1 @@ -88,8 +88,8 @@ returnGrant 1 print ---- closed epoch: 0 tenantHeap len: 2 top tenant: 53 - tenant-id: 53 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 3, epoch: 0, qt: 100] - tenant-id: 71 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: -128, ct: 4, epoch: 0, qt: 100] + tenant-id: 53 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 3, epoch: 0, qt: 100] + tenant-id: 71 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: low-pri, ct: 4, epoch: 0, qt: 100] # A request from the system tenant bypasses admission control, but is # reflected in the WorkQueue state. @@ -102,8 +102,8 @@ print ---- closed epoch: 0 tenantHeap len: 2 top tenant: 53 tenant-id: 1 used: 1, w: 1, fifo: -128 - tenant-id: 53 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 3, epoch: 0, qt: 100] - tenant-id: 71 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: -128, ct: 4, epoch: 0, qt: 100] + tenant-id: 53 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 3, epoch: 0, qt: 100] + tenant-id: 71 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: low-pri, ct: 4, epoch: 0, qt: 100] granted chain-id=7 ---- @@ -151,12 +151,12 @@ tryGet: returning false advance-time millis=205 ---- closed epoch: 2 tenantHeap len: 1 top tenant: 53 - tenant-id: 53 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] + tenant-id: 53 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] print ---- closed epoch: 2 tenantHeap len: 1 top tenant: 53 - tenant-id: 53 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] + tenant-id: 53 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] granted chain-id=5 ---- @@ -189,7 +189,7 @@ admit id=4 tenant=53 priority=0 create-time-millis=400 bypass=false print ---- closed epoch: 3 tenantHeap len: 1 top tenant: 53 - tenant-id: 53 used: 1, w: 1, fifo: 1 waiting work heap: [0: pri: 0, ct: 399, epoch: 3, qt: 405, lifo-ordering] [1: pri: 0, ct: 50, epoch: 0, qt: 405, lifo-ordering] open epochs heap: [0: pri: 0, ct: 400, epoch: 4, qt: 405] + tenant-id: 53 used: 1, w: 1, fifo: 1 waiting work heap: [0: pri: normal-pri, ct: 399, epoch: 3, qt: 405, lifo-ordering] [1: pri: normal-pri, ct: 50, epoch: 0, qt: 405, lifo-ordering] open epochs heap: [0: pri: normal-pri, ct: 400, epoch: 4, qt: 405] # Latest request in closed epoch is granted. granted chain-id=6 @@ -209,7 +209,7 @@ granted: returned 1 print ---- closed epoch: 3 tenantHeap len: 1 top tenant: 53 - tenant-id: 53 used: 3, w: 1, fifo: 1 open epochs heap: [0: pri: 0, ct: 400, epoch: 4, qt: 405] + tenant-id: 53 used: 3, w: 1, fifo: 1 open epochs heap: [0: pri: normal-pri, ct: 400, epoch: 4, qt: 405] # Add request to closed epoch. admit id=5 tenant=53 priority=0 create-time-millis=300 bypass=false @@ -224,7 +224,7 @@ admit id=6 tenant=53 priority=0 create-time-millis=500 bypass=false print ---- closed epoch: 3 tenantHeap len: 1 top tenant: 53 - tenant-id: 53 used: 3, w: 1, fifo: 1 waiting work heap: [0: pri: 0, ct: 300, epoch: 3, qt: 405, lifo-ordering] open epochs heap: [0: pri: 0, ct: 400, epoch: 4, qt: 405] [1: pri: 0, ct: 500, epoch: 5, qt: 405] + tenant-id: 53 used: 3, w: 1, fifo: 1 waiting work heap: [0: pri: normal-pri, ct: 300, epoch: 3, qt: 405, lifo-ordering] open epochs heap: [0: pri: normal-pri, ct: 400, epoch: 4, qt: 405] [1: pri: normal-pri, ct: 500, epoch: 5, qt: 405] # Add high priority request in open epoch 5. admit id=7 tenant=53 priority=127 create-time-millis=550 bypass=false @@ -235,13 +235,13 @@ admit id=7 tenant=53 priority=127 create-time-millis=550 bypass=false print ---- closed epoch: 3 tenantHeap len: 1 top tenant: 53 - tenant-id: 53 used: 3, w: 1, fifo: 1 waiting work heap: [0: pri: 127, ct: 550, epoch: 5, qt: 405] [1: pri: 0, ct: 300, epoch: 3, qt: 405, lifo-ordering] open epochs heap: [0: pri: 0, ct: 400, epoch: 4, qt: 405] [1: pri: 0, ct: 500, epoch: 5, qt: 405] + tenant-id: 53 used: 3, w: 1, fifo: 1 waiting work heap: [0: pri: high-pri, ct: 550, epoch: 5, qt: 405] [1: pri: normal-pri, ct: 300, epoch: 3, qt: 405, lifo-ordering] open epochs heap: [0: pri: normal-pri, ct: 400, epoch: 4, qt: 405] [1: pri: normal-pri, ct: 500, epoch: 5, qt: 405] # Make the request wait for 60ms so we don't switch back to fifo. advance-time millis=60 ---- closed epoch: 3 tenantHeap len: 1 top tenant: 53 - tenant-id: 53 used: 3, w: 1, fifo: 1 waiting work heap: [0: pri: 127, ct: 550, epoch: 5, qt: 405] [1: pri: 0, ct: 300, epoch: 3, qt: 405, lifo-ordering] open epochs heap: [0: pri: 0, ct: 400, epoch: 4, qt: 405] [1: pri: 0, ct: 500, epoch: 5, qt: 405] + tenant-id: 53 used: 3, w: 1, fifo: 1 waiting work heap: [0: pri: high-pri, ct: 550, epoch: 5, qt: 405] [1: pri: normal-pri, ct: 300, epoch: 3, qt: 405, lifo-ordering] open epochs heap: [0: pri: normal-pri, ct: 400, epoch: 4, qt: 405] [1: pri: normal-pri, ct: 500, epoch: 5, qt: 405] granted chain-id=8 ---- @@ -262,13 +262,13 @@ admit id=8 tenant=53 priority=0 create-time-millis=350 bypass=false print ---- closed epoch: 3 tenantHeap len: 1 top tenant: 53 - tenant-id: 53 used: 5, w: 1, fifo: 1 waiting work heap: [0: pri: 0, ct: 350, epoch: 3, qt: 465, lifo-ordering] open epochs heap: [0: pri: 0, ct: 400, epoch: 4, qt: 405] [1: pri: 0, ct: 500, epoch: 5, qt: 405] + tenant-id: 53 used: 5, w: 1, fifo: 1 waiting work heap: [0: pri: normal-pri, ct: 350, epoch: 3, qt: 465, lifo-ordering] open epochs heap: [0: pri: normal-pri, ct: 400, epoch: 4, qt: 405] [1: pri: normal-pri, ct: 500, epoch: 5, qt: 405] # One request moved from open to closed epoch heap. advance-time millis=40 ---- closed epoch: 4 tenantHeap len: 1 top tenant: 53 - tenant-id: 53 used: 5, w: 1, fifo: 1 waiting work heap: [0: pri: 0, ct: 400, epoch: 4, qt: 405, lifo-ordering] [1: pri: 0, ct: 350, epoch: 3, qt: 465, lifo-ordering] open epochs heap: [0: pri: 0, ct: 500, epoch: 5, qt: 405] + tenant-id: 53 used: 5, w: 1, fifo: 1 waiting work heap: [0: pri: normal-pri, ct: 400, epoch: 4, qt: 405, lifo-ordering] [1: pri: normal-pri, ct: 350, epoch: 3, qt: 465, lifo-ordering] open epochs heap: [0: pri: normal-pri, ct: 500, epoch: 5, qt: 405] granted chain-id=10 ---- @@ -279,7 +279,7 @@ granted: returned 1 print ---- closed epoch: 4 tenantHeap len: 1 top tenant: 53 - tenant-id: 53 used: 6, w: 1, fifo: 1 waiting work heap: [0: pri: 0, ct: 350, epoch: 3, qt: 465, lifo-ordering] open epochs heap: [0: pri: 0, ct: 500, epoch: 5, qt: 405] + tenant-id: 53 used: 6, w: 1, fifo: 1 waiting work heap: [0: pri: normal-pri, ct: 350, epoch: 3, qt: 465, lifo-ordering] open epochs heap: [0: pri: normal-pri, ct: 500, epoch: 5, qt: 405] granted chain-id=11 ---- @@ -290,7 +290,7 @@ granted: returned 1 print ---- closed epoch: 4 tenantHeap len: 1 top tenant: 53 - tenant-id: 53 used: 7, w: 1, fifo: 1 open epochs heap: [0: pri: 0, ct: 500, epoch: 5, qt: 405] + tenant-id: 53 used: 7, w: 1, fifo: 1 open epochs heap: [0: pri: normal-pri, ct: 500, epoch: 5, qt: 405] # Can dequeue from the open epochs heap if nothing else is remaining. granted chain-id=12 @@ -312,13 +312,13 @@ tryGet: returning false print ---- closed epoch: 4 tenantHeap len: 1 top tenant: 53 - tenant-id: 53 used: 8, w: 1, fifo: 1 waiting work heap: [0: pri: 0, ct: 380, epoch: 3, qt: 505, lifo-ordering] + tenant-id: 53 used: 8, w: 1, fifo: 1 waiting work heap: [0: pri: normal-pri, ct: 380, epoch: 3, qt: 505, lifo-ordering] # This time advance means the previous request will see significant queueing. advance-time millis=100 ---- closed epoch: 5 tenantHeap len: 1 top tenant: 53 - tenant-id: 53 used: 8, w: 1, fifo: 1 waiting work heap: [0: pri: 0, ct: 380, epoch: 3, qt: 505, lifo-ordering] + tenant-id: 53 used: 8, w: 1, fifo: 1 waiting work heap: [0: pri: normal-pri, ct: 380, epoch: 3, qt: 505, lifo-ordering] # This request in an already closed epoch gets ahead because of higher # create-time-millis. @@ -328,7 +328,7 @@ admit id=10 tenant=53 priority=0 create-time-millis=390 bypass=false print ---- closed epoch: 5 tenantHeap len: 1 top tenant: 53 - tenant-id: 53 used: 8, w: 1, fifo: 1 waiting work heap: [0: pri: 0, ct: 390, epoch: 3, qt: 605, lifo-ordering] [1: pri: 0, ct: 380, epoch: 3, qt: 505, lifo-ordering] + tenant-id: 53 used: 8, w: 1, fifo: 1 waiting work heap: [0: pri: normal-pri, ct: 390, epoch: 3, qt: 605, lifo-ordering] [1: pri: normal-pri, ct: 380, epoch: 3, qt: 505, lifo-ordering] granted chain-id=12 ---- @@ -339,13 +339,13 @@ granted: returned 1 print ---- closed epoch: 5 tenantHeap len: 1 top tenant: 53 - tenant-id: 53 used: 9, w: 1, fifo: 1 waiting work heap: [0: pri: 0, ct: 380, epoch: 3, qt: 505, lifo-ordering] + tenant-id: 53 used: 9, w: 1, fifo: 1 waiting work heap: [0: pri: normal-pri, ct: 380, epoch: 3, qt: 505, lifo-ordering] # This advance will switch all priorities back to FIFO. advance-time millis=100 ---- closed epoch: 6 tenantHeap len: 1 top tenant: 53 - tenant-id: 53 used: 9, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 380, epoch: 3, qt: 505, lifo-ordering] + tenant-id: 53 used: 9, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 380, epoch: 3, qt: 505, lifo-ordering] admit id=11 tenant=53 priority=0 create-time-millis=610 bypass=false ---- @@ -359,7 +359,7 @@ admit id=12 tenant=53 priority=-128 create-time-millis=615 bypass=false print ---- closed epoch: 6 tenantHeap len: 1 top tenant: 53 - tenant-id: 53 used: 9, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 610, epoch: 6, qt: 705] [1: pri: 0, ct: 380, epoch: 3, qt: 505, lifo-ordering] [2: pri: -128, ct: 615, epoch: 6, qt: 705] + tenant-id: 53 used: 9, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 610, epoch: 6, qt: 705] [1: pri: normal-pri, ct: 380, epoch: 3, qt: 505, lifo-ordering] [2: pri: low-pri, ct: 615, epoch: 6, qt: 705] granted chain-id=13 ---- @@ -372,7 +372,7 @@ granted: returned 1 print ---- closed epoch: 6 tenantHeap len: 1 top tenant: 53 - tenant-id: 53 used: 10, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 380, epoch: 3, qt: 505, lifo-ordering] [1: pri: -128, ct: 615, epoch: 6, qt: 705] + tenant-id: 53 used: 10, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 380, epoch: 3, qt: 505, lifo-ordering] [1: pri: low-pri, ct: 615, epoch: 6, qt: 705] granted chain-id=14 ---- @@ -401,7 +401,7 @@ tryGet: returning false print ---- closed epoch: 7 tenantHeap len: 1 top tenant: 53 - tenant-id: 53 used: 12, w: 1, fifo: 1 open epochs heap: [0: pri: 0, ct: 810, epoch: 8, qt: 805] + tenant-id: 53 used: 12, w: 1, fifo: 1 open epochs heap: [0: pri: normal-pri, ct: 810, epoch: 8, qt: 805] # Cancel that request. cancel-work id=13 @@ -446,19 +446,19 @@ tryGet: returning false print ---- closed epoch: 0 tenantHeap len: 1 top tenant: 5 - tenant-id: 5 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] + tenant-id: 5 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] # Weight is unchanged for tenant 5. set-tenant-weights weights=10:11 ---- closed epoch: 0 tenantHeap len: 1 top tenant: 5 - tenant-id: 5 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] + tenant-id: 5 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] # Now tenant 5 has a new weight. set-tenant-weights weights=5:6,10:11 ---- closed epoch: 0 tenantHeap len: 1 top tenant: 5 - tenant-id: 5 used: 0, w: 6, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] + tenant-id: 5 used: 0, w: 6, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] admit id=2 tenant=10 priority=0 create-time-millis=1 bypass=false ---- @@ -466,8 +466,8 @@ admit id=2 tenant=10 priority=0 create-time-millis=1 bypass=false print ---- closed epoch: 0 tenantHeap len: 2 top tenant: 5 - tenant-id: 5 used: 0, w: 6, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] - tenant-id: 10 used: 0, w: 11, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] + tenant-id: 5 used: 0, w: 6, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] + tenant-id: 10 used: 0, w: 11, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] granted chain-id=1 ---- @@ -500,8 +500,8 @@ admit id=4 tenant=10 priority=0 create-time-millis=1 bypass=false print ---- closed epoch: 0 tenantHeap len: 2 top tenant: 10 - tenant-id: 5 used: 1, w: 6, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] - tenant-id: 10 used: 1, w: 11, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] + tenant-id: 5 used: 1, w: 6, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] + tenant-id: 10 used: 1, w: 11, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] # Grant to tenant 10 so it is using 2 slots. granted chain-id=3 @@ -513,7 +513,7 @@ granted: returned 1 print ---- closed epoch: 0 tenantHeap len: 1 top tenant: 5 - tenant-id: 5 used: 1, w: 6, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] + tenant-id: 5 used: 1, w: 6, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] tenant-id: 10 used: 2, w: 11, fifo: -128 # Another request from tenant 10. @@ -524,8 +524,8 @@ admit id=5 tenant=10 priority=0 create-time-millis=1 bypass=false print ---- closed epoch: 0 tenantHeap len: 2 top tenant: 5 - tenant-id: 5 used: 1, w: 6, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] - tenant-id: 10 used: 2, w: 11, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] + tenant-id: 5 used: 1, w: 6, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] + tenant-id: 10 used: 2, w: 11, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] # Increase tenant 10's weight so it becomes the top of the heap. Weight # scaling is also applied here to make the max weight 20, and reduce tenant @@ -533,22 +533,22 @@ closed epoch: 0 tenantHeap len: 2 top tenant: 5 set-tenant-weights weights=5:6,10:30 ---- closed epoch: 0 tenantHeap len: 2 top tenant: 10 - tenant-id: 5 used: 1, w: 4, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] - tenant-id: 10 used: 2, w: 20, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] + tenant-id: 5 used: 1, w: 4, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] + tenant-id: 10 used: 2, w: 20, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] # Restore all weights to 1. Tenant 5 is now top of the heap. set-tenant-weights weights=5:1,10:1 ---- closed epoch: 0 tenantHeap len: 2 top tenant: 5 - tenant-id: 5 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] - tenant-id: 10 used: 2, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] + tenant-id: 5 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] + tenant-id: 10 used: 2, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] # Adjust weights to make tenant 10 just slightly preferable over tenant 5. set-tenant-weights weights=5:6,10:13 ---- closed epoch: 0 tenantHeap len: 2 top tenant: 10 - tenant-id: 5 used: 1, w: 6, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] - tenant-id: 10 used: 2, w: 13, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] + tenant-id: 5 used: 1, w: 6, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] + tenant-id: 10 used: 2, w: 13, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] # Add another request for tenant 10. admit id=6 tenant=10 priority=0 create-time-millis=1 bypass=false @@ -565,16 +565,16 @@ granted: returned 1 print ---- closed epoch: 0 tenantHeap len: 2 top tenant: 5 - tenant-id: 5 used: 1, w: 6, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] - tenant-id: 10 used: 3, w: 13, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] + tenant-id: 5 used: 1, w: 6, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] + tenant-id: 10 used: 3, w: 13, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] # Bump up tenant 10's weight to a huge value. Tenant 5's weight is not scaled # down to 0, since the minimum weight is 1. set-tenant-weights weights=5:1,10:100000 ---- closed epoch: 0 tenantHeap len: 2 top tenant: 10 - tenant-id: 5 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] - tenant-id: 10 used: 3, w: 20, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] + tenant-id: 5 used: 1, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] + tenant-id: 10 used: 3, w: 20, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] granted chain-id=5 ---- @@ -623,27 +623,27 @@ admit id=8 tenant=8 priority=0 create-time-millis=1 bypass=false print ---- closed epoch: 0 tenantHeap len: 8 top tenant: 1 - tenant-id: 1 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] - tenant-id: 2 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] - tenant-id: 3 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] - tenant-id: 4 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] - tenant-id: 5 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] - tenant-id: 6 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] - tenant-id: 7 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] - tenant-id: 8 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] + tenant-id: 1 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] + tenant-id: 2 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] + tenant-id: 3 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] + tenant-id: 4 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] + tenant-id: 5 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] + tenant-id: 6 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] + tenant-id: 7 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] + tenant-id: 8 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] # Set weights for 7 out of the 8 tenants. set-tenant-weights weights=1:2,2:3,3:4,4:5,5:6,7:8,8:9 ---- closed epoch: 0 tenantHeap len: 8 top tenant: 1 - tenant-id: 1 used: 0, w: 2, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] - tenant-id: 2 used: 0, w: 3, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] - tenant-id: 3 used: 0, w: 4, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] - tenant-id: 4 used: 0, w: 5, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] - tenant-id: 5 used: 0, w: 6, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] - tenant-id: 6 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] - tenant-id: 7 used: 0, w: 8, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] - tenant-id: 8 used: 0, w: 9, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100] + tenant-id: 1 used: 0, w: 2, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] + tenant-id: 2 used: 0, w: 3, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] + tenant-id: 3 used: 0, w: 4, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] + tenant-id: 4 used: 0, w: 5, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] + tenant-id: 5 used: 0, w: 6, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] + tenant-id: 6 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] + tenant-id: 7 used: 0, w: 8, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] + tenant-id: 8 used: 0, w: 9, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 100] granted chain-id=1 ---- diff --git a/pkg/util/asciitsdb/BUILD.bazel b/pkg/util/asciitsdb/BUILD.bazel new file mode 100644 index 000000000000..8a81a4c655cc --- /dev/null +++ b/pkg/util/asciitsdb/BUILD.bazel @@ -0,0 +1,18 @@ +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "asciitsdb", + srcs = ["asciitsdb.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/util/asciitsdb", + visibility = ["//visibility:public"], + deps = [ + "//pkg/util/log", + "//pkg/util/metric", + "//pkg/util/syncutil", + "@com_github_guptarohit_asciigraph//:asciigraph", + "@com_github_stretchr_testify//require", + ], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/util/asciitsdb/asciitsdb.go b/pkg/util/asciitsdb/asciitsdb.go new file mode 100644 index 000000000000..b4c1da25f4f9 --- /dev/null +++ b/pkg/util/asciitsdb/asciitsdb.go @@ -0,0 +1,311 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package asciitsdb + +import ( + "context" + "fmt" + "math" + "reflect" + "testing" + + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/guptarohit/asciigraph" + "github.com/stretchr/testify/require" +) + +// TSDB is used to plot ASCII timeseries for tests that want to observe how CRDB +// metrics behave. +type TSDB struct { + t *testing.T + reg *metric.Registry + + mu struct { + syncutil.Mutex + scraped bool + points map[string][]float64 // registered metric name => scraped data points + } +} + +// New returns a new ASCII TSDB. +func New(t *testing.T, reg *metric.Registry) *TSDB { + tsdb := &TSDB{t: t, reg: reg} + tsdb.mu.points = make(map[string][]float64) + return tsdb +} + +// Register registers a metric struct for plotting. For histograms, we follow +// CRDB's internal TSDB naming conventions, i.e. we export values for: +// - {metric name}-count +// - {metric name}-avg +// - {metric name}-max +// - {metric name}-p99 +// - {metric name}-p90 +// - {metric name}-p75 +// - {metric name}-p50 +func (t *TSDB) Register(mstruct interface{}) { + // NB: This code was cargo culted from the metric registry's + // AddStruct method. + + v := reflect.ValueOf(mstruct) + if v.Kind() == reflect.Ptr { + v = v.Elem() + } + typ := v.Type() + + for i := 0; i < v.NumField(); i++ { + vfield, tfield := v.Field(i), typ.Field(i) + tname := tfield.Name + if !vfield.CanInterface() { + t.t.Logf("skipping unexported field %s", tname) + continue + } + switch vfield.Kind() { + case reflect.Array: + for i := 0; i < vfield.Len(); i++ { + velem := vfield.Index(i) + telemName := fmt.Sprintf("%s[%d]", tname, i) + // Permit elements in the array to be nil. + const skipNil = true + t.registerMetricValue(velem, telemName, skipNil) + } + default: + // No metric fields should be nil. + const skipNil = false + t.registerMetricValue(vfield, tname, skipNil) + } + } +} + +// Scrape all registered metrics. It records a single data point for all +// registered metrics. +func (t *TSDB) Scrape(ctx context.Context) { + // TB: This code is cargo culted entirely from the TSDB scraper. + + t.mu.Lock() + defer t.mu.Unlock() + t.mu.scraped = true + + t.reg.Each(func(name string, val interface{}) { + if _, ok := t.mu.points[name]; !ok { + return + } + + switch mtr := val.(type) { + case metric.WindowedHistogram: + n := float64(mtr.TotalCountWindowed()) + if _, ok := t.mu.points[name]; !ok { + return + } + t.mu.points[name+"-count"] = append(t.mu.points[name+"-count"], n) + avg := mtr.TotalSumWindowed() / n + if math.IsNaN(avg) || math.IsInf(avg, +1) || math.IsInf(avg, -1) { + avg = 0 + } + t.mu.points[name+"-avg"] = append(t.mu.points[name+"-avg"], avg) + for _, pt := range []quantile{ + {"-max", 100}, + {"-p99", 99}, + {"-p90", 90}, + {"-p75", 75}, + {"-p50", 50}, + } { + t.mu.points[name+pt.suffix] = append(t.mu.points[name+pt.suffix], mtr.ValueAtQuantileWindowed(pt.quantile)) + } + case metric.PrometheusExportable: + // NB: this branch is intentionally at the bottom since all metrics + // implement it. + m := mtr.ToPrometheusMetric() + if m.Gauge != nil { + t.mu.points[name] = append(t.mu.points[name], *m.Gauge.Value) + } else if m.Counter != nil { + t.mu.points[name] = append(t.mu.points[name], *m.Counter.Value) + } + default: + log.Fatalf(ctx, "cannot extract value for type %T", mtr) + } + }) +} + +// Plot plots the given metrics, using the given options. +func (t *TSDB) Plot(metrics []string, options ...Option) string { + c := configure(config{ + limit: math.MaxInt, + }, options) + + var plots [][]float64 + for _, metric := range metrics { + points, ok := t.read(metric) + require.Truef(t.t, ok, "%s not found", metric) + if c.rate != 0 { + points = rate(points, c.rate) + } + if c.divisor != 0 { + points = divide(points, c.divisor) + } + c.limit = min(c.limit, int64(len(points))-c.offset) + points = points[c.offset : c.offset+c.limit] + plots = append(plots, points) + } + return asciigraph.PlotMany(plots, c.graphopts...) +} + +// Clear all recorded data points. +func (t *TSDB) Clear() { + t.mu.Lock() + defer t.mu.Unlock() + for metric := range t.mu.points { + t.mu.points[metric] = nil + } +} + +// read reads all data points for the given metric name. +func (t *TSDB) read(metric string) ([]float64, bool) { + t.mu.Lock() + defer t.mu.Unlock() + + points, ok := t.mu.points[metric] + return points, ok +} + +func (t *TSDB) registerMetricValue(val reflect.Value, name string, skipNil bool) { + if val.Kind() == reflect.Ptr && val.IsNil() { + if skipNil { + t.t.Logf("skipping nil metric field %s", name) + } else { + t.t.Fatalf("found nil metric field %s", name) + } + return + } + switch typ := val.Interface().(type) { + case metric.Iterable: + t.registerIterable(typ) + case metric.Struct: + t.Register(typ) + default: + t.t.Logf("skipping non-metric field %s", name) + } +} + +func (t *TSDB) registerIterable(metric metric.Iterable) { + t.mu.Lock() + defer t.mu.Unlock() + if t.mu.scraped { + t.t.Fatalf("register all metrics upfront before Scrape()") + } + t.mu.points[metric.GetName()] = []float64{} +} + +// Option represents a configuration setting. +type Option interface { + apply(c *config) +} + +// WithRate plots the rate of growth for a given metric. The parameter is used +// to control how far back (with respect to individual points) compute the delta +// over. +func WithRate(r int) Option { + return optionFunc(func(c *config) { + c.rate = r + }) +} + +// WithDivisor divides all individual points of a metric by the given divisor. +func WithDivisor(d float64) Option { + return optionFunc(func(c *config) { + c.divisor = d + }) +} + +// WithGraphOptions configures the look and feel of the generated ASCII graphs. +func WithGraphOptions(opts ...asciigraph.Option) Option { + return optionFunc(func(c *config) { + c.graphopts = opts + }) +} + +// WithOffset is used to offset a specified number of points in the graph. +func WithOffset(o int64) Option { + return optionFunc(func(c *config) { + c.offset = o + }) +} + +// WithLimit is used to limit the number of points in the graph. +func WithLimit(l int64) Option { + return optionFunc(func(c *config) { + c.limit = l + }) +} + +// config holds the various graphing options. +type config struct { + rate int + divisor float64 + offset, limit int64 + graphopts []asciigraph.Option +} + +// An optionFunc applies an option. +type optionFunc func(*config) + +// apply implements the Option interface. +func (of optionFunc) apply(c *config) { of(c) } + +func configure(defaults config, options []Option) *config { + for _, o := range options { + o.apply(&defaults) + } + return &defaults +} + +// rate takes a list of individual data points (typically read from a cumulative +// counter) and returns a derivative, computed simply by taking each +// corresponding input point and subtracting one r points ago. If r points are +// collected per-{time unit}, this ends up commuting the rate of growth +// per-{time unit}. +func rate(input []float64, r int) []float64 { + rated := make([]float64, len(input)) + for i := 0; i < len(input); i++ { + if i < r { + rated[i] = 0 + continue + } + + delta := input[i] - input[i-r] + rated[i] = delta + } + return rated +} + +// divide takes a list of individual data points and returns another, where each +// point is the corresponding input divided by the divisor. +func divide(input []float64, divisor float64) []float64 { + output := make([]float64, len(input)) + for i := 0; i < len(input); i++ { + output[i] = input[i] / divisor + } + return output +} + +type quantile struct { + suffix string + quantile float64 +} + +func min(i, j int64) int64 { + if i < j { + return i + } + return j +}