-
Notifications
You must be signed in to change notification settings - Fork 3.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
kvflowcontroller: implement kvflowcontrol.Controller
Part of #95563. kvflowcontroller.Controller is a concrete implementation of the kvflowcontrol.Controller interface. It provides flow control for replication traffic in KV and is typically held at the node-level. Internally it maintain flow token buckets for {regular,elastic} work traveling along each kvflowcontrol.Stream. When work tries to Admit(), the controller blocks until there are flow tokens available the specified stream/work class. For each stream we maintain two flow token buckets, one for each work class (regular and elastic). To provide coarse grained prioritization, we want the following properties: - When elastic tokens are deducted/returned, they only affect the elastic bucket. It cannot cause blocking for regular work (which would be a form of priority inversion). - When regular tokens get deducted, they deduct from both the regular and elastic buckets. We do want regular work to starve out elastic work. If sufficient regular work has not yet been logically admitted, and other regular work is waiting for it, we don't want elastic work to be able to go through. To get a sense of what this ends up looking like, consider the following datadriven snippet: history ---- regular | elastic +16MiB | +8.0MiB ====================================== -1.0MiB regular +15MiB | +7.0MiB -7.0MiB regular +8.0MiB | +0B (elastic blocked) -2.0MiB regular +6.0MiB | -2.0MiB (elastic blocked) -2.0MiB regular +4.0MiB | -4.0MiB (elastic blocked) +2.0MiB regular +6.0MiB | -2.0MiB (elastic blocked) -2.0MiB regular +4.0MiB | -4.0MiB (elastic blocked) +4.0MiB regular +8.0MiB | +0B (elastic blocked) +2.0MiB regular +10MiB | +2.0MiB -2.0MiB elastic +10MiB | +0B (elastic blocked) -2.0MiB regular +8.0MiB | -2.0MiB (elastic blocked) +2.0MiB regular +10MiB | +0B (elastic blocked) +2.0MiB elastic +10MiB | +2.0MiB +6.0MiB regular +16MiB | +8.0MiB The kvflowcontrol.Controller implementation is furnished with the following metrics: - kvadmission.flow_controller.{regular,elastic}_tokens_available - kvadmission.flow_controller.{regular,elastic}_tokens_deducted - kvadmission.flow_controller.{regular,elastic}_tokens_returned - kvadmission.flow_controller.{regular,elastic}_tokens_unaccounted - kvadmission.flow_controller.{regular,elastic}_requests_waiting - kvadmission.flow_controller.{regular,elastic}_requests_admitted - kvadmission.flow_controller.{regular,elastic}_requests_errored - kvadmission.flow_controller.{regular,elastic}_wait_duration - kvadmission.flow_controller.{regular,elastic}_stream_count - kvadmission.flow_controller.{regular,elastic}_blocked_stream_count This commit also introduces a deterministic simulator to understand the kind of traffic shaping this flow controller does. The simulator is able to spit out each of the metrics above. To do all this, we added a utility asciitsdb library that comes 'batteries-included' -- it integrates with our metrics registry, is able to scrape relevant metrics, and spit out ascii plots for metric values, all of which we use in tests added in this package. It looks like the following: # Note again the mirroring between token returns which immediately # allows admission, followed by token deductions. plot start=15s end=30s kvadmission.flow_controller.regular_tokens_{deducted,returned} unit=MiB rate=true ---- 2.1 ┤ ╭╮ 2.0 ┤ ╭╮│ ╭╮ ╭╮╮ 1.9 ┤ │╰╮ ││╮ │││ 1.7 ┤ ╭╯╯│ ╭╯╰╮ ╭╯╰╮ 1.6 ┤ ││ │╮ │╯ │ │╯ │ 1.4 ┤ │╯ ││ ╭╯ │ ╭╯ │╮ 1.3 ┤ ╭╯ ╰╮ │ ╰╮ │╯ ││ 1.1 ┤ │╯ │╮ ╭╯ │ ╭╯ ╰╮ 1.0 ┤ ╭╯ ││ │╯ │ │╯ │╮ 0.9 ┤ │╯ ╰╮ ╭╯ │╮ ╭╯ ││ ╭ 0.7 ┤ ╭╯ │ │ ╰╮ ╭╯ ││ ╭╯ 0.6 ┤ ╭╯╯ │╮ ╭╯ │ │╯ ╰╮ │╯ 0.4 ┤ │╯ ││╭╯ │ ╭╯ │╮╭╯ 0.3 ┤╭╯ ╰─╯│ ╰─╯ │╭╯ 0.1 ┼╯╯ │╭╯ ╰╯ ╰╯╯ -0.0 ┼╯ ╰╯ rate(regular_tokens_{deducted,returned}) (MiB) # So we're still admitting work choppily, and observe corresponding # deductions in the waiting request rate. But given the open-loop # thread above, the # of waiting request is still growing # unboundedly. plot start=15s end=30s kvadmission.flow_controller.regular_requests_{admitted,waiting} unit=reqs/s rate=true kvadmission.flow_controller.regular_requests_waiting unit=reqs/s ---- ---- 10.7 ┤ ╭╮ 9.9 ┼╮ ││ ╭╮ ╭╮ ╭╮ 9.2 ┤╰╮ ╭╯│ │╰╮ ╭─╮ ╭╮ ╭╯│ ││ 8.4 ┤ │ │ ╰╮ │ │ ╭╯ │ ╭╯╰╮ ╭╯ │ │╰╮ 7.7 ┤ │ ╭╯ │ │ ╰╮ │ │ │ │ │ ╰╮╭╯ │ 6.9 ┤ ╰╮ │ │╭╯ │ ╭╯ ╰╮│ ╰╮ ╭╯ ││ ╰╮ 6.1 ┤ ╰╮╭╯ ││ ╰╮│ ╭╯ │ │ ││ ╰ 5.4 ┤ ││ ╰│ │╯ │ ╰╮╯ ╭╯ 4.6 ┤ ╰╮ ╭╯ ╰╮ │ ╭╰╮ │╮ 3.9 ┤ ╭╰╮ ││ ╭╯│ │╮ │ │ ││ ╭ 3.1 ┤ ╭╯ ╰╮ │╰╮ │ ╰╮ ╭╯│ ╭╯ ╰╮ ╭╯│ ╭╯ 2.3 ┤ ╭╯ │ │ │ ╭╯ ╰╮ │ │ ╭╯ ╰╮ │ ╰╮╭╯ 1.6 ┤ │ ╰╮╭╯ │ │ │ │ ╰╮│ │ │ ││ 0.8 ┤╭╯ ││ │╭╯ ╰─╯ ╰╯ ╰╮│ ╰╯ 0.1 ┼╯ ││ ╰╯ ╰╯ -0.7 ┤ ╰╯ rate(regular_requests_{admitted,waiting}) (reqs/s) Release note: None
- Loading branch information
1 parent
8e24570
commit 0bbcf4a
Showing
23 changed files
with
2,536 additions
and
73 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
55 changes: 55 additions & 0 deletions
55
pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/BUILD.bazel
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") | ||
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") | ||
|
||
go_library( | ||
name = "kvflowcontroller", | ||
srcs = [ | ||
"kvflowcontroller.go", | ||
"kvflowcontroller_metrics.go", | ||
], | ||
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller", | ||
visibility = ["//visibility:public"], | ||
deps = [ | ||
"//pkg/base", | ||
"//pkg/kv/kvserver/kvflowcontrol", | ||
"//pkg/settings", | ||
"//pkg/settings/cluster", | ||
"//pkg/util/admission/admissionpb", | ||
"//pkg/util/buildutil", | ||
"//pkg/util/hlc", | ||
"//pkg/util/log", | ||
"//pkg/util/metric", | ||
"//pkg/util/syncutil", | ||
], | ||
) | ||
|
||
go_test( | ||
name = "kvflowcontroller_test", | ||
srcs = [ | ||
"kvflowcontrol_token_adjustment_test.go", | ||
"kvflowcontroller_simulation_test.go", | ||
], | ||
args = ["-test.timeout=295s"], | ||
data = glob(["testdata/**"]), | ||
embed = [":kvflowcontroller"], | ||
deps = [ | ||
"//pkg/kv/kvserver/kvflowcontrol", | ||
"//pkg/roachpb", | ||
"//pkg/settings/cluster", | ||
"//pkg/testutils/datapathutils", | ||
"//pkg/util/admission/admissionpb", | ||
"//pkg/util/asciitsdb", | ||
"//pkg/util/hlc", | ||
"//pkg/util/leaktest", | ||
"//pkg/util/log", | ||
"//pkg/util/metric", | ||
"//pkg/util/timeutil", | ||
"@com_github_cockroachdb_datadriven//:datadriven", | ||
"@com_github_dustin_go_humanize//:go-humanize", | ||
"@com_github_guptarohit_asciigraph//:asciigraph", | ||
"@com_github_mkungla_bexp_v3//:bexp", | ||
"@com_github_stretchr_testify//require", | ||
], | ||
) | ||
|
||
get_x_data(name = "get_x_data") |
159 changes: 159 additions & 0 deletions
159
pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontrol_token_adjustment_test.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,159 @@ | ||
// Copyright 2023 The Cockroach Authors. | ||
// | ||
// Use of this software is governed by the Business Source License | ||
// included in the file licenses/BSL.txt. | ||
// | ||
// As of the Change Date specified in that file, in accordance with | ||
// the Business Source License, use of this software will be governed | ||
// by the Apache License, Version 2.0, included in the file | ||
// licenses/APL.txt. | ||
|
||
package kvflowcontroller | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"strings" | ||
"testing" | ||
"time" | ||
|
||
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" | ||
"github.com/cockroachdb/cockroach/pkg/roachpb" | ||
"github.com/cockroachdb/cockroach/pkg/settings/cluster" | ||
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" | ||
"github.com/cockroachdb/cockroach/pkg/util/hlc" | ||
"github.com/cockroachdb/cockroach/pkg/util/leaktest" | ||
"github.com/cockroachdb/cockroach/pkg/util/log" | ||
"github.com/cockroachdb/cockroach/pkg/util/metric" | ||
"github.com/cockroachdb/datadriven" | ||
"github.com/dustin/go-humanize" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestFlowTokenAdjustment(t *testing.T) { | ||
defer leaktest.AfterTest(t)() | ||
defer log.Scope(t).Close(t) | ||
|
||
var ( | ||
ctx = context.Background() | ||
controller *Controller | ||
adjustments []adjustment | ||
stream = kvflowcontrol.Stream{ | ||
TenantID: roachpb.SystemTenantID, | ||
StoreID: roachpb.StoreID(1), | ||
} | ||
) | ||
|
||
datadriven.RunTest(t, "testdata/flow_token_adjustment", | ||
func(t *testing.T, d *datadriven.TestData) string { | ||
switch d.Cmd { | ||
case "init": | ||
controller = New( | ||
metric.NewRegistry(), | ||
cluster.MakeTestingClusterSettings(), | ||
hlc.NewClockWithSystemTimeSource(time.Nanosecond), | ||
) | ||
adjustments = nil | ||
return "" | ||
|
||
case "adjust": | ||
require.NotNilf(t, controller, "uninitialized flow controller (did you use 'init'?)") | ||
|
||
for _, line := range strings.Split(d.Input, "\n") { | ||
parts := strings.Fields(line) | ||
require.Len(t, parts, 2, "expected form 'class={regular,elastic} delta={+,-}<size>") | ||
|
||
var delta kvflowcontrol.Tokens | ||
var pri admissionpb.WorkPriority | ||
|
||
// Parse class={regular,elastic}. | ||
parts[0] = strings.TrimSpace(parts[0]) | ||
require.True(t, strings.HasPrefix(parts[0], "class=")) | ||
parts[0] = strings.TrimPrefix(strings.TrimSpace(parts[0]), "class=") | ||
switch parts[0] { | ||
case "regular": | ||
pri = admissionpb.NormalPri | ||
case "elastic": | ||
pri = admissionpb.BulkNormalPri | ||
} | ||
|
||
// Parse delta={+,-}<size> | ||
parts[1] = strings.TrimSpace(parts[1]) | ||
require.True(t, strings.HasPrefix(parts[1], "delta=")) | ||
parts[1] = strings.TrimPrefix(strings.TrimSpace(parts[1]), "delta=") | ||
require.True(t, strings.HasPrefix(parts[1], "+") || strings.HasPrefix(parts[1], "-")) | ||
isPositive := strings.Contains(parts[1], "+") | ||
parts[1] = strings.TrimPrefix(parts[1], "+") | ||
parts[1] = strings.TrimPrefix(parts[1], "-") | ||
bytes, err := humanize.ParseBytes(parts[1]) | ||
require.NoError(t, err) | ||
delta = kvflowcontrol.Tokens(int64(bytes)) | ||
if !isPositive { | ||
delta = -delta | ||
} | ||
|
||
controller.adjustTokens(ctx, pri, delta, stream) | ||
adjustments = append(adjustments, adjustment{ | ||
pri: pri, | ||
delta: delta, | ||
post: controller.testingGetTokensForStream(stream), | ||
}) | ||
} | ||
return "" | ||
|
||
case "history": | ||
limit := controller.testingGetLimit() | ||
|
||
var buf strings.Builder | ||
buf.WriteString(" regular | elastic\n") | ||
buf.WriteString(fmt.Sprintf(" %8s | %8s\n", | ||
printTrimmedTokens(limit[admissionpb.RegularWorkClass]), | ||
printTrimmedTokens(limit[admissionpb.ElasticWorkClass]), | ||
)) | ||
buf.WriteString("======================================\n") | ||
for _, h := range adjustments { | ||
buf.WriteString(fmt.Sprintf("%s\n", h)) | ||
} | ||
return buf.String() | ||
|
||
default: | ||
return fmt.Sprintf("unknown command: %s", d.Cmd) | ||
} | ||
}) | ||
} | ||
|
||
type adjustment struct { | ||
pri admissionpb.WorkPriority | ||
delta kvflowcontrol.Tokens | ||
post tokensPerWorkClass | ||
} | ||
|
||
func printTrimmedTokens(t kvflowcontrol.Tokens) string { | ||
return strings.ReplaceAll(t.String(), " ", "") | ||
} | ||
|
||
func (h adjustment) String() string { | ||
class := admissionpb.WorkClassFromPri(h.pri) | ||
|
||
comment := "" | ||
if h.post[admissionpb.RegularWorkClass] <= 0 { | ||
comment = "regular" | ||
} | ||
if h.post[admissionpb.ElasticWorkClass] <= 0 { | ||
if len(comment) == 0 { | ||
comment = "elastic" | ||
} else { | ||
comment = "regular and elastic" | ||
} | ||
} | ||
if len(comment) != 0 { | ||
comment = fmt.Sprintf(" (%s blocked)", comment) | ||
} | ||
return fmt.Sprintf("%8s %7s %8s | %8s%s", | ||
printTrimmedTokens(h.delta), | ||
class, | ||
printTrimmedTokens(h.post[admissionpb.RegularWorkClass]), | ||
printTrimmedTokens(h.post[admissionpb.ElasticWorkClass]), | ||
comment, | ||
) | ||
} |
Oops, something went wrong.