From a9b02837cc423b86585ce2e9e23b106cd8d534ef Mon Sep 17 00:00:00 2001
From: David Hartunian
Date: Fri, 27 Aug 2021 16:11:20 -0400
Subject: [PATCH 1/3] ccl/serverccl: increase http client timeout in tests

Tests run under stress would sometimes time out waiting on HTTP calls
to return. This change creates a custom HTTP client with a 15s timeout.
The original client has a default 3s timeout.

Resolves #68879

Release justification: non-production code change

Release note: None
---
 pkg/ccl/serverccl/tenant_grpc_test.go | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/pkg/ccl/serverccl/tenant_grpc_test.go b/pkg/ccl/serverccl/tenant_grpc_test.go
index aa8e66aa9f05..c65afd779dc0 100644
--- a/pkg/ccl/serverccl/tenant_grpc_test.go
+++ b/pkg/ccl/serverccl/tenant_grpc_test.go
@@ -13,6 +13,7 @@ import (
 	"io/ioutil"
 	"net/http"
 	"testing"
+	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -38,6 +39,7 @@ func TestTenantGRPCServices(t *testing.T) {
 	defer log.Scope(t).Close(t)
 
 	ctx := context.Background()
+	httpClient := httputil.NewClientWithTimeout(15 * time.Second)
 
 	serverParams, _ := tests.CreateTestServerParams()
 	testCluster := serverutils.StartNewTestCluster(t, 3, base.TestClusterArgs{
@@ -77,7 +79,7 @@ func TestTenantGRPCServices(t *testing.T) {
 	})
 
 	t.Run("gRPC Gateway is running", func(t *testing.T) {
-		resp, err := httputil.Get(ctx, "http://"+tenant.HTTPAddr()+"/_status/statements")
+		resp, err := httpClient.Get(ctx, "http://"+tenant.HTTPAddr()+"/_status/statements")
 		defer http.DefaultClient.CloseIdleConnections()
 		require.NoError(t, err)
 		defer resp.Body.Close()
@@ -98,7 +100,7 @@ func TestTenantGRPCServices(t *testing.T) {
 	defer connTenant2.Close()
 
 	t.Run("statements endpoint fans out request to multiple pods", func(t *testing.T) {
-		resp, err := httputil.Get(ctx, "http://"+tenant2.HTTPAddr()+"/_status/statements")
+		resp, err := httpClient.Get(ctx, "http://"+tenant2.HTTPAddr()+"/_status/statements")
 		defer http.DefaultClient.CloseIdleConnections()
 		require.NoError(t, err)
 		defer resp.Body.Close()
@@ -115,7 +117,7 @@ func TestTenantGRPCServices(t *testing.T) {
 	defer connTenant3.Close()
 
 	t.Run("fanout of statements endpoint is segregated by tenant", func(t *testing.T) {
-		resp, err := httputil.Get(ctx, "http://"+tenant3.HTTPAddr()+"/_status/statements")
+		resp, err := httpClient.Get(ctx, "http://"+tenant3.HTTPAddr()+"/_status/statements")
 		defer http.DefaultClient.CloseIdleConnections()
 		require.NoError(t, err)
 		defer resp.Body.Close()
@@ -160,7 +162,7 @@ func TestTenantGRPCServices(t *testing.T) {
 	})
 
 	t.Run("sessions endpoint is available", func(t *testing.T) {
-		resp, err := httputil.Get(ctx, "http://"+tenant.HTTPAddr()+"/_status/sessions")
+		resp, err := httpClient.Get(ctx, "http://"+tenant.HTTPAddr()+"/_status/sessions")
 		defer http.DefaultClient.CloseIdleConnections()
 		require.NoError(t, err)
 		require.Equal(t, 200, resp.StatusCode)
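For illustration only (not part of the patch): a minimal sketch of the timeout pattern this commit relies on, written against the standard library alone. httputil.NewClientWithTimeout is CockroachDB's wrapper, and its Get also takes a context as the diff above shows; the endpoint URL below is just a placeholder.

    package main

    import (
    	"fmt"
    	"net/http"
    	"time"
    )

    func main() {
    	// A client whose requests fail with an error once 15s have elapsed,
    	// instead of the tighter default timeout used elsewhere in tests.
    	client := &http.Client{Timeout: 15 * time.Second}
    	resp, err := client.Get("http://localhost:8080/_status/statements")
    	if err != nil {
    		fmt.Println("request failed:", err)
    		return
    	}
    	defer resp.Body.Close()
    	fmt.Println("status:", resp.StatusCode)
    }
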
From 9d7c9d735aa57e1bafa90272ed2eca8a420549f2 Mon Sep 17 00:00:00 2001
From: Ricky Stewart
Date: Thu, 26 Aug 2021 15:50:11 -0500
Subject: [PATCH 2/3] build: point people to `dev generate bazel` over `make bazel-generate`

These two methods for generating Bazel files do the exact same thing,
but making this change might train people to use `dev` a little bit.

Release justification: Non-production code change

Release note: None
---
 Makefile                          | 12 ++++--------
 WORKSPACE                         |  2 +-
 build/README.md                   |  2 +-
 build/bazelutil/bazel-generate.sh |  5 +++++
 build/teamcity-check-genfiles.sh  |  2 +-
 pkg/cmd/dev/generate.go           |  3 +--
 pkg/cmd/dev/io/exec/exec.go       |  6 ------
 7 files changed, 13 insertions(+), 19 deletions(-)

diff --git a/Makefile b/Makefile
index ef1c06ae54cb..3cdbbbd94931 100644
--- a/Makefile
+++ b/Makefile
@@ -1785,14 +1785,10 @@ fuzz: ## Run fuzz tests.
 fuzz: bin/fuzz
 	bin/fuzz $(TESTFLAGS) -tests $(TESTS) -timeout $(TESTTIMEOUT) $(PKG)
 
-# Short hand to re-generate all bazel BUILD files.
-#
-# Even with --symlink_prefix, some sub-command somewhere hardcodes the
-# creation of a "bazel-out" symlink. This bazel-out symlink can only
-# be blocked by the existence of a file before the bazel command is
-# invoked. For now, this is left as an exercise for the user.
-#
-bazel-generate: ## Generate all bazel BUILD files.
+# Short hand to re-generate all bazel BUILD files. (Does the same thing as
+# `./dev generate bazel`.)
+.PHONY: bazel-generate
+bazel-generate:
 	@echo 'Generating DEPS.bzl and BUILD files using gazelle'
 	./build/bazelutil/bazel-generate.sh
 
diff --git a/WORKSPACE b/WORKSPACE
index 81f06d954a40..8e6623aefef9 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -34,7 +34,7 @@ git_repository(
 
 # Load up cockroachdb's go dependencies (the ones listed under go.mod). The
 # `DEPS.bzl` file is kept up to date using the `update-repos` Gazelle command
-# (see `make bazel-generate`).
+# (see `build/bazelutil/bazel-generate.sh`).
 #
 # gazelle:repository_macro DEPS.bzl%go_deps
 load("//:DEPS.bzl", "go_deps")
diff --git a/build/README.md b/build/README.md
index 07aae554a0f2..e5bc8047b828 100644
--- a/build/README.md
+++ b/build/README.md
@@ -160,7 +160,7 @@ is missing, ensure it is used in code. This can be a blank dependency, e.g.
 `import _ "golang.org/api/compute/v1"`.
 
 These changes must then be committed in the submodule directory (see [Working with Submodules](#working-with-submodules)).
 
-Finally, run `make bazel-generate` to regenerate `DEPS.bzl` with the updated Go dependency information.
+Finally, run `./dev generate bazel` to regenerate `DEPS.bzl` with the updated Go dependency information.
 
 Programs can then be run using `go build ...` or `go test ...`.
diff --git a/build/bazelutil/bazel-generate.sh b/build/bazelutil/bazel-generate.sh
index 7573ebcb2d90..06fc3be84cd2 100755
--- a/build/bazelutil/bazel-generate.sh
+++ b/build/bazelutil/bazel-generate.sh
@@ -2,6 +2,11 @@
 
 set -exuo pipefail
 
+# Even with --symlink_prefix, some sub-command somewhere hardcodes the
+# creation of a "bazel-out" symlink. This bazel-out symlink can only
+# be blocked by the existence of a file before the bazel command is
+# invoked. For now, this is left as an exercise for the user.
+
 bazel run //:gazelle -- update-repos -from_file=go.mod -build_file_proto_mode=disable_global -to_macro=DEPS.bzl%go_deps -prune=true
 bazel run //pkg/cmd/generate-test-suites --run_under="cd $PWD && " > pkg/BUILD.bazel
 bazel run //:gazelle
diff --git a/build/teamcity-check-genfiles.sh b/build/teamcity-check-genfiles.sh
index bc6f0b6755e4..94aee64134bf 100755
--- a/build/teamcity-check-genfiles.sh
+++ b/build/teamcity-check-genfiles.sh
@@ -30,7 +30,7 @@ rm artifacts/buildshort.log
 TEAMCITY_BAZEL_SUPPORT_GENERATE=1 # See teamcity-bazel-support.sh.
 run run_bazel build/bazelutil/bazel-generate.sh &> artifacts/buildshort.log || (cat artifacts/buildshort.log && false)
 rm artifacts/buildshort.log
-check_clean "Run \`make bazel-generate\` to automatically regenerate these."
+check_clean "Run \`./dev generate bazel\` to automatically regenerate these."
 run build/builder.sh make generate &> artifacts/generate.log || (cat artifacts/generate.log && false)
 rm artifacts/generate.log
 check_clean "Run \`make generate\` to automatically regenerate these."
diff --git a/pkg/cmd/dev/generate.go b/pkg/cmd/dev/generate.go
index df7623a9bec2..50dc7f8ca522 100644
--- a/pkg/cmd/dev/generate.go
+++ b/pkg/cmd/dev/generate.go
@@ -80,8 +80,7 @@ func (d *dev) generateBazel(cmd *cobra.Command) error {
 	if err != nil {
 		return err
 	}
-	_, err = d.exec.CommandContext(ctx, filepath.Join(workspace, "build", "bazelutil", "bazel-generate.sh"))
-	return err
+	return d.exec.CommandContextInheritingStdStreams(ctx, filepath.Join(workspace, "build", "bazelutil", "bazel-generate.sh"))
 }
 
 func (d *dev) generateDocs(cmd *cobra.Command) error {
diff --git a/pkg/cmd/dev/io/exec/exec.go b/pkg/cmd/dev/io/exec/exec.go
index eabf2b8206dd..ea44aa082fc9 100644
--- a/pkg/cmd/dev/io/exec/exec.go
+++ b/pkg/cmd/dev/io/exec/exec.go
@@ -80,12 +80,6 @@ func WithRecording(r *recording.Recording) func(e *Exec) {
 	}
 }
 
-// CommandContext wraps around exec.CommandContext, executing the named program
-// with the given arguments.
-func (e *Exec) CommandContext(ctx context.Context, name string, args ...string) ([]byte, error) {
-	return e.commandContextImpl(ctx, nil, false, name, args...)
-}
-
 // CommandContextSilent is like CommandContext, but does not take over
 // stdout/stderr. It's to be used for "internal" operations.
 func (e *Exec) CommandContextSilent(
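For illustration only (not part of the patch): the generate.go hunk above switches from a captured-output CommandContext call to CommandContextInheritingStdStreams. The sketch below shows, with plain os/exec, what inheriting the standard streams generally looks like; it is an assumption about the technique, not the dev tool's actual implementation.

    package main

    import (
    	"context"
    	"os"
    	"os/exec"
    )

    func main() {
    	cmd := exec.CommandContext(context.Background(), "build/bazelutil/bazel-generate.sh")
    	// Wire the child process directly to the parent's streams so gazelle's
    	// progress output is visible as it runs, rather than being captured.
    	cmd.Stdout = os.Stdout
    	cmd.Stderr = os.Stderr
    	cmd.Stdin = os.Stdin
    	if err := cmd.Run(); err != nil {
    		os.Exit(1)
    	}
    }
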
From 6a8467d0404ecdff71253bb521d4508469c1de69 Mon Sep 17 00:00:00 2001
From: sumeerbhola
Date: Wed, 25 Aug 2021 11:57:06 -0400
Subject: [PATCH 3/3] admission,goschedstats: reduce scheduler sampling frequency when underloaded

The goschedstats package determines the tick interval every 1s, and
either ticks at 1ms or 250ms. 250ms is used when the cpu is very
underloaded. The admission control code disables slot and token
enforcement if the tick interval is greater than 1ms. This is done
since the reduced frequency of CPULoad could prevent us from adjusting
slots fast enough.

Fixes #66881

Release justification: Fix for high-priority issue in new functionality.

Release note: None
---
 pkg/util/admission/granter.go          |  81 ++++++++++-----
 pkg/util/admission/granter_test.go     |  13 ++-
 pkg/util/admission/testdata/granter    |  80 +++++++++++++++
 pkg/util/goschedstats/BUILD.bazel      |   6 +-
 pkg/util/goschedstats/runnable.go      | 137 +++++++++++++++++--------
 pkg/util/goschedstats/runnable_test.go |  77 ++++++++++++++
 6 files changed, 324 insertions(+), 70 deletions(-)
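For illustration only (not part of the patch): a toy granter showing the two behaviors this commit combines, enforcement that is skipped when the sample period is coarser than 1ms, and accounting that still happens, which is why the testdata below reports values such as "avail: -1" and "used: 2, total: 1". All names here are illustrative stand-ins for tokenGranter/kvGranter; the real logic is in the granter.go diff that follows.

    package main

    import (
    	"fmt"
    	"time"
    )

    type miniGranter struct {
    	available       int
    	skipEnforcement bool
    }

    func (g *miniGranter) tryGet() bool {
    	if g.available > 0 || g.skipEnforcement {
    		g.available-- // still accounted for, even when enforcement is off
    		return true
    	}
    	return false
    }

    func main() {
    	g := &miniGranter{available: 1}
    	samplePeriod := 250 * time.Millisecond // what CPULoad reports when very underloaded
    	g.skipEnforcement = samplePeriod > time.Millisecond

    	fmt.Println(g.tryGet(), g.available) // true 0
    	fmt.Println(g.tryGet(), g.available) // true -1 (granted despite exhaustion)
    }
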
diff --git a/pkg/util/admission/granter.go b/pkg/util/admission/granter.go
index fcc2d3c1abd6..d4dea88783ee 100644
--- a/pkg/util/admission/granter.go
+++ b/pkg/util/admission/granter.go
@@ -382,6 +382,7 @@ type tokenGranter struct {
 	requester            requester
 	availableBurstTokens int
 	maxBurstTokens       int
+	skipTokenEnforcement bool
 	// Optional. Practically, both uses of tokenGranter, for SQLKVResponseWork
 	// and SQLSQLResponseWork have a non-nil value. We don't expect to use
 	// memory overload indicators here since memory accounting and disk spilling
@@ -396,8 +397,9 @@ func (tg *tokenGranter) getPairedRequester() requester {
 	return tg.requester
 }
 
-func (tg *tokenGranter) refillBurstTokens() {
+func (tg *tokenGranter) refillBurstTokens(skipTokenEnforcement bool) {
 	tg.availableBurstTokens = tg.maxBurstTokens
+	tg.skipTokenEnforcement = skipTokenEnforcement
 }
 
 func (tg *tokenGranter) grantKind() grantKind {
@@ -412,7 +414,7 @@ func (tg *tokenGranter) tryGetLocked() grantResult {
 	if tg.cpuOverload != nil && tg.cpuOverload.isOverloaded() {
 		return grantFailDueToSharedResource
 	}
-	if tg.availableBurstTokens > 0 {
+	if tg.availableBurstTokens > 0 || tg.skipTokenEnforcement {
 		tg.availableBurstTokens--
 		return grantSuccess
 	}
@@ -446,10 +448,12 @@ func (tg *tokenGranter) continueGrantChain(grantChainID grantChainID) {
 // KVWork, that are limited by slots (CPU bound work) and/or tokens (IO
 // bound work).
 type kvGranter struct {
-	coord      *GrantCoordinator
-	requester  requester
-	usedSlots  int
-	totalSlots int
+	coord               *GrantCoordinator
+	requester           requester
+	usedSlots           int
+	totalSlots          int
+	skipSlotEnforcement bool
+
 	ioTokensEnabled bool
 	// There is no rate limiting in granting these tokens. That is, they are all
 	// burst tokens.
@@ -478,7 +482,7 @@ func (sg *kvGranter) tryGet() bool {
 }
 
 func (sg *kvGranter) tryGetLocked() grantResult {
-	if sg.usedSlots < sg.totalSlots {
+	if sg.usedSlots < sg.totalSlots || sg.skipSlotEnforcement {
 		if !sg.ioTokensEnabled || sg.availableIOTokens > 0 {
 			sg.usedSlots++
 			if sg.usedSlotsMetric != nil {
@@ -555,8 +559,11 @@ func (sg *kvGranter) setAvailableIOTokensLocked(tokens int64) {
 // StoreGrantCoordinators) for KVWork that uses that store. See the
 // NewGrantCoordinators and NewGrantCoordinatorSQL functions.
 type GrantCoordinator struct {
-	settings *cluster.Settings
+	settings                *cluster.Settings
+	lastCPULoadSamplePeriod time.Duration
 
+	// mu is ordered before any mutex acquired in a requester implementation.
+	// TODO(sumeer): move everything covered by mu into a nested struct.
 	mu syncutil.Mutex
 	// NB: Some granters can be nil.
 	granters [numWorkKinds]granterWithLockedCalls
@@ -842,20 +849,38 @@ func (coord *GrantCoordinator) GetWorkQueue(workKind WorkKind) *WorkQueue {
 	return coord.queues[workKind].(*WorkQueue)
 }
 
-// CPULoad implements CPULoadListener and is called every 1ms. The same
-// frequency is used for refilling the burst tokens since synchronizing the
-// two means that the refilled burst can take into account the latest
-// schedulers stats (indirectly, via the implementation of
-// cpuOverloadIndicator).
-// TODO(sumeer): after experimentation, possibly generalize the 1ms ticks used
-// for CPULoad.
-func (coord *GrantCoordinator) CPULoad(runnable int, procs int) {
+// CPULoad implements CPULoadListener and is called periodically (see
+// CPULoadListener for details). The same frequency is used for refilling the
+// burst tokens since synchronizing the two means that the refilled burst can
+// take into account the latest schedulers stats (indirectly, via the
+// implementation of cpuOverloadIndicator).
+func (coord *GrantCoordinator) CPULoad(runnable int, procs int, samplePeriod time.Duration) {
+	if coord.lastCPULoadSamplePeriod != 0 && coord.lastCPULoadSamplePeriod != samplePeriod &&
+		KVAdmissionControlEnabled.Get(&coord.settings.SV) {
+		log.Infof(context.Background(), "CPULoad switching to period %s", samplePeriod.String())
+	}
+	coord.lastCPULoadSamplePeriod = samplePeriod
+
 	coord.mu.Lock()
 	defer coord.mu.Unlock()
 	coord.numProcs = procs
-	coord.cpuLoadListener.CPULoad(runnable, procs)
-	coord.granters[SQLKVResponseWork].(*tokenGranter).refillBurstTokens()
-	coord.granters[SQLSQLResponseWork].(*tokenGranter).refillBurstTokens()
+	coord.cpuLoadListener.CPULoad(runnable, procs, samplePeriod)
+
+	// Slot adjustment and token refilling requires 1ms periods to work well. If
+	// the CPULoad ticks are less frequent, there is no guarantee that the
+	// tokens or slots will be sufficient to service requests. This is
+	// particularly the case for slots where we dynamically adjust them, and
+	// high contention can suddenly result in high slot utilization even while
+	// cpu utilization stays low. We don't want to artificially bottleneck
+	// request processing when we are in this slow CPULoad ticks regime since we
+	// can't adjust slots or refill tokens fast enough. So we explicitly tell
+	// the granters to not do token or slot enforcement.
+	skipEnforcement := samplePeriod > time.Millisecond
+	coord.granters[SQLKVResponseWork].(*tokenGranter).refillBurstTokens(skipEnforcement)
+	coord.granters[SQLSQLResponseWork].(*tokenGranter).refillBurstTokens(skipEnforcement)
+	if coord.granters[KVWork] != nil {
+		coord.granters[KVWork].(*kvGranter).skipSlotEnforcement = skipEnforcement
+	}
 	if coord.grantChainActive && !coord.tryTerminateGrantChain() {
 		return
 	}
@@ -1264,12 +1289,11 @@ type cpuOverloadIndicator interface {
 }
 
 // CPULoadListener listens to the latest CPU load information. Currently we
-// expect this to be called every 1ms.
-// TODO(sumeer): experiment with more smoothing. It is possible that rapid
-// slot fluctuation may be resulting in under-utilization at a time scale that
-// is not observable at our metrics frequency.
+// expect this to be called every 1ms, unless the cpu is extremely
+// underloaded. If the samplePeriod is > 1ms, admission control enforcement
+// for CPU is disabled.
 type CPULoadListener interface {
-	CPULoad(runnable int, procs int)
+	CPULoad(runnable int, procs int, samplePeriod time.Duration)
 }
 
 // kvSlotAdjuster is an implementer of CPULoadListener and
@@ -1293,8 +1317,9 @@ type kvSlotAdjuster struct {
 var _ cpuOverloadIndicator = &kvSlotAdjuster{}
 var _ CPULoadListener = &kvSlotAdjuster{}
 
-func (kvsa *kvSlotAdjuster) CPULoad(runnable int, procs int) {
+func (kvsa *kvSlotAdjuster) CPULoad(runnable int, procs int, _ time.Duration) {
 	threshold := int(KVSlotAdjusterOverloadThreshold.Get(&kvsa.settings.SV))
+
 	// Simple heuristic, which worked ok in experiments. More sophisticated ones
 	// could be devised.
 	if runnable >= threshold*procs {
@@ -1332,7 +1357,7 @@ func (kvsa *kvSlotAdjuster) CPULoad(runnable int, procs int) {
 }
 
 func (kvsa *kvSlotAdjuster) isOverloaded() bool {
-	return kvsa.granter.usedSlots >= kvsa.granter.totalSlots
+	return kvsa.granter.usedSlots >= kvsa.granter.totalSlots && !kvsa.granter.skipSlotEnforcement
 }
 
 // sqlNodeCPUOverloadIndicator is the implementation of cpuOverloadIndicator
@@ -1559,7 +1584,9 @@ func (io *ioLoadListener) adjustTokens(m pebble.Metrics) {
 var _ cpuOverloadIndicator = &sqlNodeCPUOverloadIndicator{}
 var _ CPULoadListener = &sqlNodeCPUOverloadIndicator{}
 
-func (sn *sqlNodeCPUOverloadIndicator) CPULoad(runnable int, procs int) {
+func (sn *sqlNodeCPUOverloadIndicator) CPULoad(
+	runnable int, procs int, samplePeriod time.Duration,
+) {
 }
 
 func (sn *sqlNodeCPUOverloadIndicator) isOverloaded() bool {
diff --git a/pkg/util/admission/granter_test.go b/pkg/util/admission/granter_test.go
index b6c694385b7e..789bfbcd029c 100644
--- a/pkg/util/admission/granter_test.go
+++ b/pkg/util/admission/granter_test.go
@@ -16,6 +16,7 @@ import (
 	"sort"
 	"strings"
 	"testing"
+	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -83,7 +84,7 @@ func (tr *testRequester) getAdmittedCount() uint64 {
 // return-grant work=
 // took-without-permission work=
 // continue-grant-chain work=
-// cpu-load runnable= procs=
+// cpu-load runnable= procs= [infrequent=]
 // set-io-tokens tokens=
 func TestGranterBasic(t *testing.T) {
 	defer leaktest.AfterTest(t)()
@@ -159,7 +160,15 @@ func TestGranterBasic(t *testing.T) {
 			var runnable, procs int
 			d.ScanArgs(t, "runnable", &runnable)
 			d.ScanArgs(t, "procs", &procs)
-			coord.CPULoad(runnable, procs)
+			infrequent := false
+			if d.HasArg("infrequent") {
+				d.ScanArgs(t, "infrequent", &infrequent)
+			}
+			samplePeriod := time.Millisecond
+			if infrequent {
+				samplePeriod = 250 * time.Millisecond
+			}
+			coord.CPULoad(runnable, procs, samplePeriod)
 			return flushAndReset()
 
 		case "set-io-tokens":
diff --git a/pkg/util/admission/testdata/granter b/pkg/util/admission/testdata/granter
index 6edb98551d2a..c68edae0ceb8 100644
--- a/pkg/util/admission/testdata/granter
+++ b/pkg/util/admission/testdata/granter
@@ -366,3 +366,83 @@ kv: granted in chain 0, and returning true
 GrantCoordinator:
 (chain: id: 6 active: false index: 0) kv: used: 3, total: 3 io-avail: 0 sql-kv-response: avail: 0
 sql-sql-response: avail: 1 sql-leaf-start: used: 2, total: 2 sql-root-start: used: 1, total: 1
+
+#####################################################################
+# Test skipping of enforcements when CPULoad has high sampling period.
+init-grant-coordinator min-cpu=1 max-cpu=3 sql-kv-tokens=1 sql-sql-tokens=1 sql-leaf=2 sql-root=2
+----
+GrantCoordinator:
+(chain: id: 1 active: false index: 0) kv: used: 0, total: 1 sql-kv-response: avail: 1
+sql-sql-response: avail: 1 sql-leaf-start: used: 0, total: 2 sql-root-start: used: 0, total: 2
+
+# No more slots after this slot is granted.
+try-get work=kv
+----
+kv: tryGet returned true
+GrantCoordinator:
+(chain: id: 1 active: false index: 0) kv: used: 1, total: 1 sql-kv-response: avail: 1
+sql-sql-response: avail: 1 sql-leaf-start: used: 0, total: 2 sql-root-start: used: 0, total: 2
+
+# Since no more KV slots, cannot grant token to sql-kv-response.
+try-get work=sql-kv-response
+----
+sql-kv-response: tryGet returned false
+GrantCoordinator:
+(chain: id: 1 active: false index: 0) kv: used: 1, total: 1 sql-kv-response: avail: 1
+sql-sql-response: avail: 1 sql-leaf-start: used: 0, total: 2 sql-root-start: used: 0, total: 2
+
+# Since no more KV slots, cannot grant token to sql-sql-response.
+try-get work=sql-sql-response
+----
+sql-sql-response: tryGet returned false
+GrantCoordinator:
+(chain: id: 1 active: false index: 0) kv: used: 1, total: 1 sql-kv-response: avail: 1
+sql-sql-response: avail: 1 sql-leaf-start: used: 0, total: 2 sql-root-start: used: 0, total: 2
+
+# CPULoad shows overload, so cannot increase KV slots, but since it is
+# infrequent, slot and token enforcement is disabled.
+cpu-load runnable=20 procs=1 infrequent=true
+----
+GrantCoordinator:
+(chain: id: 1 active: false index: 5) kv: used: 1, total: 1 sql-kv-response: avail: 1
+sql-sql-response: avail: 1 sql-leaf-start: used: 0, total: 2 sql-root-start: used: 0, total: 2
+
+# sql-kv-response can get a token.
+try-get work=sql-kv-response
+----
+sql-kv-response: tryGet returned true
+GrantCoordinator:
+(chain: id: 1 active: false index: 5) kv: used: 1, total: 1 sql-kv-response: avail: 0
+sql-sql-response: avail: 1 sql-leaf-start: used: 0, total: 2 sql-root-start: used: 0, total: 2
+
+# sql-kv-response can get another token, even though tokens are exhausted.
+try-get work=sql-kv-response
+----
+sql-kv-response: tryGet returned true
+GrantCoordinator:
+(chain: id: 1 active: false index: 5) kv: used: 1, total: 1 sql-kv-response: avail: -1
+sql-sql-response: avail: 1 sql-leaf-start: used: 0, total: 2 sql-root-start: used: 0, total: 2
+
+# sql-sql-response can get a token.
+try-get work=sql-sql-response
+----
+sql-sql-response: tryGet returned true
+GrantCoordinator:
+(chain: id: 1 active: false index: 5) kv: used: 1, total: 1 sql-kv-response: avail: -1
+sql-sql-response: avail: 0 sql-leaf-start: used: 0, total: 2 sql-root-start: used: 0, total: 2
+
+# sql-sql-response can get another token, even though tokens are exhausted.
+try-get work=sql-sql-response
+----
+sql-sql-response: tryGet returned true
+GrantCoordinator:
+(chain: id: 1 active: false index: 5) kv: used: 1, total: 1 sql-kv-response: avail: -1
+sql-sql-response: avail: -1 sql-leaf-start: used: 0, total: 2 sql-root-start: used: 0, total: 2
+
+# KV can get another slot even though slots are exhausted.
+try-get work=kv
+----
+kv: tryGet returned true
+GrantCoordinator:
+(chain: id: 1 active: false index: 5) kv: used: 2, total: 1 sql-kv-response: avail: -1
+sql-sql-response: avail: -1 sql-leaf-start: used: 0, total: 2 sql-root-start: used: 0, total: 2
diff --git a/pkg/util/goschedstats/BUILD.bazel b/pkg/util/goschedstats/BUILD.bazel
index 24c4ed5d81e0..32f01d9d7672 100644
--- a/pkg/util/goschedstats/BUILD.bazel
+++ b/pkg/util/goschedstats/BUILD.bazel
@@ -19,5 +19,9 @@ go_test(
     name = "goschedstats_test",
     srcs = ["runnable_test.go"],
     embed = [":goschedstats"],
-    deps = ["//pkg/testutils"],
+    deps = [
+        "//pkg/testutils",
+        "//pkg/util/timeutil",
+        "@com_github_stretchr_testify//require",
+    ],
 )
diff --git a/pkg/util/goschedstats/runnable.go b/pkg/util/goschedstats/runnable.go
index 19b463184512..9baa03f317eb 100644
--- a/pkg/util/goschedstats/runnable.go
+++ b/pkg/util/goschedstats/runnable.go
@@ -54,8 +54,22 @@ func RecentNormalizedRunnableGoroutines() float64 {
 // will have to add a new version of that file.
 var _ = numRunnableGoroutines
 
-// We sample the number of runnable goroutines once per samplePeriod.
-const samplePeriod = time.Millisecond
+// We sample the number of runnable goroutines once per samplePeriodShort or
+// samplePeriodLong (if the system is underloaded). Using samplePeriodLong can
+// cause sluggish response to a load spike, from the perspective of
+// RunnableCountCallback implementers (admission control), so it is not ideal.
+// We support this behavior only because we have observed 5-10% of cpu
+// utilization on CockroachDB nodes that are doing no other work, even though
+// 1ms polling (samplePeriodShort) is extremely cheap. The cause may be a poor
+// interaction with processor idle state
+// https://github.com/golang/go/issues/30740#issuecomment-471634471. See
+// #66881.
+const samplePeriodShort = time.Millisecond
+const samplePeriodLong = 250 * time.Millisecond
+
+// The system is underloaded if the number of runnable goroutines per proc
+// is below this threshold.
+const underloadedRunnablePerProcThreshold = 1 * toFixedPoint
 
 // We "report" the average value every reportingPeriod.
 // Note: if this is changed from 1s, CumulativeNormalizedRunnableGoroutines()
@@ -78,8 +92,8 @@ var total uint64
 var ewma uint64
 
 // RunnableCountCallback is provided the current value of runnable goroutines,
-// and GOMAXPROCS.
-type RunnableCountCallback func(numRunnable int, numProcs int)
+// GOMAXPROCS, and the current sampling period.
+type RunnableCountCallback func(numRunnable int, numProcs int, samplePeriod time.Duration)
 
 type callbackWithID struct {
 	RunnableCountCallback
@@ -95,7 +109,8 @@ var callbackInfo struct {
 }
 
 // RegisterRunnableCountCallback registers a callback to be run with the
-// runnable and procs info every 1ms. This is exclusively for use by admission
+// runnable and procs info, every 1ms, unless cpu load is very low (see the
+// commentary for samplePeriodShort). This is exclusively for use by admission
 // control that wants to react extremely quickly to cpu changes. Past
 // experience in other systems (not CockroachDB) motivated not consuming a
 // smoothed signal for admission control. The CockroachDB setting may possibly
@@ -140,48 +155,90 @@ func UnregisterRunnableCountCallback(id int64) {
 
 func init() {
 	go func() {
-		lastTime := timeutil.Now()
-		// sum accumulates the sum of the number of runnable goroutines per CPU,
-		// multiplied by toFixedPoint, for all samples since the last reporting.
-		var sum uint64
-		var numSamples int
-
-		ticker := time.NewTicker(samplePeriod)
-		// We keep local versions of "total" and "ewma" and we just Store the
-		// updated values to the globals.
-		var localTotal, localEWMA uint64
+		sst := schedStatsTicker{
+			lastTime:              timeutil.Now(),
+			curPeriod:             samplePeriodShort,
+			numRunnableGoroutines: numRunnableGoroutines,
+		}
+		ticker := time.NewTicker(sst.curPeriod)
 		for {
 			t := <-ticker.C
-			if t.Sub(lastTime) > reportingPeriod {
-				if numSamples > 0 {
-					// We want the average value over the reporting period, so we divide
-					// by numSamples.
-					newValue := sum / uint64(numSamples)
-					localTotal += newValue
-					atomic.StoreUint64(&total, localTotal)
-
-					// ewma(t) = c * value(t) + (1 - c) * ewma(t-1)
-					// We use c = 0.5.
-					localEWMA = (newValue + localEWMA) / 2
-					atomic.StoreUint64(&ewma, localEWMA)
-				}
-				lastTime = t
-				sum = 0
-				numSamples = 0
-			}
-			runnable, numProcs := numRunnableGoroutines()
 			callbackInfo.mu.Lock()
 			cbs := callbackInfo.cbs
 			callbackInfo.mu.Unlock()
-			for i := range cbs {
-				cbs[i].RunnableCountCallback(runnable, numProcs)
-			}
-			// The value of the sample is the ratio of runnable to numProcs (scaled
-			// for fixed-point arithmetic).
-			sum += uint64(runnable) * toFixedPoint / uint64(numProcs)
-			numSamples++
+			sst.getStatsOnTick(t, cbs, ticker)
 		}
 	}()
 }
 
+// timeTickerInterface abstracts time.Ticker for testing.
+type timeTickerInterface interface {
+	Reset(d time.Duration)
+}
+
+// schedStatsTicker contains the local state maintained across stats collection
+// ticks.
+type schedStatsTicker struct {
+	lastTime              time.Time
+	curPeriod             time.Duration
+	numRunnableGoroutines func() (numRunnable int, numProcs int)
+	// sum accumulates the sum of the number of runnable goroutines per CPU,
+	// multiplied by toFixedPoint, for all samples since the last reporting.
+	sum uint64
+	// numSamples is the number of samples since the last reporting.
+	numSamples int
+	// We keep local versions of "total" and "ewma" and we just Store the
+	// updated values to the globals.
+	localTotal, localEWMA uint64
+}
+
+// getStatsOnTick gets scheduler stats as the ticker has ticked.
+func (s *schedStatsTicker) getStatsOnTick(
+	t time.Time, cbs []callbackWithID, ticker timeTickerInterface,
+) {
+	if t.Sub(s.lastTime) > reportingPeriod {
+		var avgValue uint64
+		if s.numSamples > 0 {
+			// We want the average value over the reporting period, so we divide
+			// by numSamples.
+			avgValue = s.sum / uint64(s.numSamples)
+			s.localTotal += avgValue
+			atomic.StoreUint64(&total, s.localTotal)
+
+			// ewma(t) = c * value(t) + (1 - c) * ewma(t-1)
+			// We use c = 0.5.
+			s.localEWMA = (avgValue + s.localEWMA) / 2
+			atomic.StoreUint64(&ewma, s.localEWMA)
+		}
+		nextPeriod := samplePeriodShort
+		// Both the mean over the last 1s, and the exponentially weighted average
+		// must be low for the system to be considered underloaded.
+		if avgValue < underloadedRunnablePerProcThreshold &&
+			s.localEWMA < underloadedRunnablePerProcThreshold {
+			// Underloaded, so switch to longer sampling period.
+			nextPeriod = samplePeriodLong
+		}
+		// We switch the sample period only at reportingPeriod boundaries
+		// since it ensures that all samples contributing to a reporting
+		// period were at equal intervals (this is desirable since we average
+		// them). It also naturally reduces the frequency at which we reset a
+		// ticker.
+		if nextPeriod != s.curPeriod {
+			ticker.Reset(nextPeriod)
+			s.curPeriod = nextPeriod
+		}
+		s.lastTime = t
+		s.sum = 0
+		s.numSamples = 0
+	}
+	runnable, numProcs := s.numRunnableGoroutines()
+	for i := range cbs {
+		cbs[i].RunnableCountCallback(runnable, numProcs, s.curPeriod)
+	}
+	// The value of the sample is the ratio of runnable to numProcs (scaled
+	// for fixed-point arithmetic).
+	s.sum += uint64(runnable) * toFixedPoint / uint64(numProcs)
+	s.numSamples++
+}
+
 var _ = RecentNormalizedRunnableGoroutines
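For illustration only (not part of the patch): the adaptive-period ticker pattern on its own, under the assumption that a rough goroutines-per-proc ratio stands in for the scheduler's runnable count (which the real code reads via unexported runtime internals). The real decision in getStatsOnTick above also folds in the 1s reporting boundary, the EWMA, and the fixed-point threshold.

    package main

    import (
    	"fmt"
    	"runtime"
    	"time"
    )

    func main() {
    	const short, long = time.Millisecond, 250 * time.Millisecond
    	cur := short
    	ticker := time.NewTicker(cur)
    	defer ticker.Stop()

    	deadline := time.Now().Add(3 * time.Second)
    	for t := range ticker.C {
    		// Stand-in load signal: goroutines per proc.
    		load := float64(runtime.NumGoroutine()) / float64(runtime.GOMAXPROCS(0))
    		next := short
    		if load < 1 {
    			next = long // underloaded: poll less often
    		}
    		if next != cur {
    			ticker.Reset(next)
    			cur = next
    		}
    		if t.After(deadline) {
    			break
    		}
    	}
    	fmt.Println("final period:", cur)
    }
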
diff --git a/pkg/util/goschedstats/runnable_test.go b/pkg/util/goschedstats/runnable_test.go
index b9f32fac80fe..41d72ff6ed5b 100644
--- a/pkg/util/goschedstats/runnable_test.go
+++ b/pkg/util/goschedstats/runnable_test.go
@@ -14,8 +14,11 @@ import (
 	"fmt"
 	"runtime"
 	"testing"
+	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/testutils"
+	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+	"github.com/stretchr/testify/require"
 )
 
 func TestNumRunnableGoroutines(t *testing.T) {
@@ -39,3 +42,77 @@ func TestNumRunnableGoroutines(t *testing.T) {
 		return nil
 	})
 }
+
+type testTimeTicker struct {
+	numResets         int
+	lastResetDuration time.Duration
+}
+
+func (t *testTimeTicker) Reset(d time.Duration) {
+	t.numResets++
+	t.lastResetDuration = d
+}
+
+func TestSchedStatsTicker(t *testing.T) {
+	runnable := 0
+	numRunnable := func() (numRunnable int, numProcs int) {
+		return runnable, 1
+	}
+	var callbackSamplePeriod time.Duration
+	var numCallbacks int
+	cb := func(numRunnable int, numProcs int, samplePeriod time.Duration) {
+		require.Equal(t, runnable, numRunnable)
+		require.Equal(t, 1, numProcs)
+		callbackSamplePeriod = samplePeriod
+		numCallbacks++
+	}
+	cbs := []callbackWithID{{cb, 0}}
+	now := timeutil.UnixEpoch
+	startTime := now
+	sst := schedStatsTicker{
+		lastTime:              now,
+		curPeriod:             samplePeriodShort,
+		numRunnableGoroutines: numRunnable,
+	}
+	tt := testTimeTicker{}
+	// Tick every 1ms until the reportingPeriod has elapsed.
+	for i := 1; ; i++ {
+		now = now.Add(samplePeriodShort)
+		sst.getStatsOnTick(now, cbs, &tt)
+		if now.Sub(startTime) <= reportingPeriod {
+			// No reset of the time ticker.
+			require.Equal(t, 0, tt.numResets)
+			// Each tick causes a callback.
+			require.Equal(t, i, numCallbacks)
+			require.Equal(t, samplePeriodShort, callbackSamplePeriod)
+		} else {
+			break
+		}
+	}
+	// Since underloaded, the time ticker is reset to samplePeriodLong, and this
+	// period is provided to the latest callback.
+	require.Equal(t, 1, tt.numResets)
+	require.Equal(t, samplePeriodLong, tt.lastResetDuration)
+	require.Equal(t, samplePeriodLong, callbackSamplePeriod)
+	// Increase load so no longer underloaded.
+	runnable = 2
+	startTime = now
+	tt.numResets = 0
+	for i := 1; ; i++ {
+		now = now.Add(samplePeriodLong)
+		sst.getStatsOnTick(now, cbs, &tt)
+		if now.Sub(startTime) <= reportingPeriod {
+			// No reset of the time ticker.
+			require.Equal(t, 0, tt.numResets)
+			// Each tick causes a callback.
+			require.Equal(t, samplePeriodLong, callbackSamplePeriod)
+		} else {
+			break
+		}
+	}
+	// No longer underloaded, so the time ticker is reset to samplePeriodShort,
+	// and this period is provided to the latest callback.
+	require.Equal(t, 1, tt.numResets)
+	require.Equal(t, samplePeriodShort, tt.lastResetDuration)
+	require.Equal(t, samplePeriodShort, callbackSamplePeriod)
+}
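For illustration only (not part of the patch): a worked example of the underload test. toFixedPoint is defined elsewhere in runnable.go and is not shown in this diff, so the value below is an assumption used purely to make the arithmetic concrete.

    package main

    import "fmt"

    const toFixedPoint = 1 << 16 // assumption for illustration only

    func main() {
    	const underloadedRunnablePerProcThreshold = 1 * toFixedPoint

    	// An average of 2 runnable goroutines on an 8-proc machine over the
    	// last reporting period is 0.25 runnable per proc, well under the
    	// threshold of 1, so sampling would drop from 1ms to 250ms at the
    	// next 1s boundary (provided the EWMA is also below the threshold).
    	runnable, procs := uint64(2), uint64(8)
    	sample := runnable * toFixedPoint / procs
    	fmt.Println(sample < underloadedRunnablePerProcThreshold) // true
    }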