Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
69379: admission,goschedstats: reduce scheduler sampling frequency when unde… r=sumeerbhola a=sumeerbhola

…rloaded

The kvSlotAdjuster keeps track of a history of CPULoad ticks
to decide whether sufficient fraction of that history was
underloaded or not. If underloaded we disable slot and token
enforcement in kvGranter and tokenGranter since the reduced
frequency of CPULoad could cause us to not adjust slots fast
enough. This information is fed back to goschedstats so that
it can adjust the period of the ticker.

Fixes #66881

Release justification: Fix for high-priority issue in new
functionality.

Release note: None

69438: build: point people to `dev generate bazel` over `make bazel-generate` r=rail a=rickystewart

These two methods for generating Bazel files do the exact same thing,
but making this change might train people to use `dev` a little bit.

Release justification: Non-production code change
Release note: None

69494: ccl/serverccl: increase http client timeout in tests r=dhartunian a=dhartunian

Tests run under stress would sometimes timeout waiting on HTTP calls to
return. This change creates a custom HTTP client with a 15s timeout. The
original client has a default 3s timeout.

Resolves #68879

Release justification: non-production code change

Release note: None

Co-authored-by: sumeerbhola <sumeer@cockroachlabs.com>
Co-authored-by: Ricky Stewart <ricky@cockroachlabs.com>
Co-authored-by: David Hartunian <davidh@cockroachlabs.com>
  • Loading branch information
4 people committed Aug 30, 2021
4 parents b78f3fc + 6a8467d + 9d7c9d7 + a9b0283 commit cb6889c
Show file tree
Hide file tree
Showing 14 changed files with 343 additions and 93 deletions.
12 changes: 4 additions & 8 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -1785,14 +1785,10 @@ fuzz: ## Run fuzz tests.
fuzz: bin/fuzz
bin/fuzz $(TESTFLAGS) -tests $(TESTS) -timeout $(TESTTIMEOUT) $(PKG)

# Short hand to re-generate all bazel BUILD files.
#
# Even with --symlink_prefix, some sub-command somewhere hardcodes the
# creation of a "bazel-out" symlink. This bazel-out symlink can only
# be blocked by the existence of a file before the bazel command is
# invoked. For now, this is left as an exercise for the user.
#
bazel-generate: ## Generate all bazel BUILD files.
# Short hand to re-generate all bazel BUILD files. (Does the same thing as
# `./dev generate bazel`.)
.PHONY: bazel-generate
bazel-generate:
@echo 'Generating DEPS.bzl and BUILD files using gazelle'
./build/bazelutil/bazel-generate.sh

Expand Down
2 changes: 1 addition & 1 deletion WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ git_repository(

# Load up cockroachdb's go dependencies (the ones listed under go.mod). The
# `DEPS.bzl` file is kept up to date using the `update-repos` Gazelle command
# (see `make bazel-generate`).
# (see `build/bazelutil/bazel-generate.sh`).
#
# gazelle:repository_macro DEPS.bzl%go_deps
load("//:DEPS.bzl", "go_deps")
Expand Down
2 changes: 1 addition & 1 deletion build/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ is missing, ensure it is used in code. This can be a blank dependency, e.g.
`import _ "golang.org/api/compute/v1"`. These changes must then be committed in the submodule directory
(see [Working with Submodules](#working-with-submodules)).

Finally, run `make bazel-generate` to regenerate `DEPS.bzl` with the updated Go dependency information.
Finally, run `./dev generate bazel` to regenerate `DEPS.bzl` with the updated Go dependency information.

Programs can then be run using `go build ...` or `go test ...`.

Expand Down
5 changes: 5 additions & 0 deletions build/bazelutil/bazel-generate.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

set -exuo pipefail

# Even with --symlink_prefix, some sub-command somewhere hardcodes the
# creation of a "bazel-out" symlink. This bazel-out symlink can only
# be blocked by the existence of a file before the bazel command is
# invoked. For now, this is left as an exercise for the user.

bazel run //:gazelle -- update-repos -from_file=go.mod -build_file_proto_mode=disable_global -to_macro=DEPS.bzl%go_deps -prune=true
bazel run //pkg/cmd/generate-test-suites --run_under="cd $PWD && " > pkg/BUILD.bazel
bazel run //:gazelle
2 changes: 1 addition & 1 deletion build/teamcity-check-genfiles.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ rm artifacts/buildshort.log
TEAMCITY_BAZEL_SUPPORT_GENERATE=1 # See teamcity-bazel-support.sh.
run run_bazel build/bazelutil/bazel-generate.sh &> artifacts/buildshort.log || (cat artifacts/buildshort.log && false)
rm artifacts/buildshort.log
check_clean "Run \`make bazel-generate\` to automatically regenerate these."
check_clean "Run \`./dev generate bazel\` to automatically regenerate these."
run build/builder.sh make generate &> artifacts/generate.log || (cat artifacts/generate.log && false)
rm artifacts/generate.log
check_clean "Run \`make generate\` to automatically regenerate these."
Expand Down
10 changes: 6 additions & 4 deletions pkg/ccl/serverccl/tenant_grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"io/ioutil"
"net/http"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand All @@ -38,6 +39,7 @@ func TestTenantGRPCServices(t *testing.T) {
defer log.Scope(t).Close(t)

ctx := context.Background()
httpClient := httputil.NewClientWithTimeout(15 * time.Second)

serverParams, _ := tests.CreateTestServerParams()
testCluster := serverutils.StartNewTestCluster(t, 3, base.TestClusterArgs{
Expand Down Expand Up @@ -77,7 +79,7 @@ func TestTenantGRPCServices(t *testing.T) {
})

t.Run("gRPC Gateway is running", func(t *testing.T) {
resp, err := httputil.Get(ctx, "http://"+tenant.HTTPAddr()+"/_status/statements")
resp, err := httpClient.Get(ctx, "http://"+tenant.HTTPAddr()+"/_status/statements")
defer http.DefaultClient.CloseIdleConnections()
require.NoError(t, err)
defer resp.Body.Close()
Expand All @@ -98,7 +100,7 @@ func TestTenantGRPCServices(t *testing.T) {
defer connTenant2.Close()

t.Run("statements endpoint fans out request to multiple pods", func(t *testing.T) {
resp, err := httputil.Get(ctx, "http://"+tenant2.HTTPAddr()+"/_status/statements")
resp, err := httpClient.Get(ctx, "http://"+tenant2.HTTPAddr()+"/_status/statements")
defer http.DefaultClient.CloseIdleConnections()
require.NoError(t, err)
defer resp.Body.Close()
Expand All @@ -115,7 +117,7 @@ func TestTenantGRPCServices(t *testing.T) {
defer connTenant3.Close()

t.Run("fanout of statements endpoint is segregated by tenant", func(t *testing.T) {
resp, err := httputil.Get(ctx, "http://"+tenant3.HTTPAddr()+"/_status/statements")
resp, err := httpClient.Get(ctx, "http://"+tenant3.HTTPAddr()+"/_status/statements")
defer http.DefaultClient.CloseIdleConnections()
require.NoError(t, err)
defer resp.Body.Close()
Expand Down Expand Up @@ -160,7 +162,7 @@ func TestTenantGRPCServices(t *testing.T) {
})

t.Run("sessions endpoint is available", func(t *testing.T) {
resp, err := httputil.Get(ctx, "http://"+tenant.HTTPAddr()+"/_status/sessions")
resp, err := httpClient.Get(ctx, "http://"+tenant.HTTPAddr()+"/_status/sessions")
defer http.DefaultClient.CloseIdleConnections()
require.NoError(t, err)
require.Equal(t, 200, resp.StatusCode)
Expand Down
3 changes: 1 addition & 2 deletions pkg/cmd/dev/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,7 @@ func (d *dev) generateBazel(cmd *cobra.Command) error {
if err != nil {
return err
}
_, err = d.exec.CommandContext(ctx, filepath.Join(workspace, "build", "bazelutil", "bazel-generate.sh"))
return err
return d.exec.CommandContextInheritingStdStreams(ctx, filepath.Join(workspace, "build", "bazelutil", "bazel-generate.sh"))
}

func (d *dev) generateDocs(cmd *cobra.Command) error {
Expand Down
6 changes: 0 additions & 6 deletions pkg/cmd/dev/io/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,6 @@ func WithRecording(r *recording.Recording) func(e *Exec) {
}
}

// CommandContext wraps around exec.CommandContext, executing the named program
// with the given arguments.
func (e *Exec) CommandContext(ctx context.Context, name string, args ...string) ([]byte, error) {
return e.commandContextImpl(ctx, nil, false, name, args...)
}

// CommandContextSilent is like CommandContext, but does not take over
// stdout/stderr. It's to be used for "internal" operations.
func (e *Exec) CommandContextSilent(
Expand Down
81 changes: 54 additions & 27 deletions pkg/util/admission/granter.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ type tokenGranter struct {
requester requester
availableBurstTokens int
maxBurstTokens int
skipTokenEnforcement bool
// Optional. Practically, both uses of tokenGranter, for SQLKVResponseWork
// and SQLSQLResponseWork have a non-nil value. We don't expect to use
// memory overload indicators here since memory accounting and disk spilling
Expand All @@ -396,8 +397,9 @@ func (tg *tokenGranter) getPairedRequester() requester {
return tg.requester
}

func (tg *tokenGranter) refillBurstTokens() {
func (tg *tokenGranter) refillBurstTokens(skipTokenEnforcement bool) {
tg.availableBurstTokens = tg.maxBurstTokens
tg.skipTokenEnforcement = skipTokenEnforcement
}

func (tg *tokenGranter) grantKind() grantKind {
Expand All @@ -412,7 +414,7 @@ func (tg *tokenGranter) tryGetLocked() grantResult {
if tg.cpuOverload != nil && tg.cpuOverload.isOverloaded() {
return grantFailDueToSharedResource
}
if tg.availableBurstTokens > 0 {
if tg.availableBurstTokens > 0 || tg.skipTokenEnforcement {
tg.availableBurstTokens--
return grantSuccess
}
Expand Down Expand Up @@ -446,10 +448,12 @@ func (tg *tokenGranter) continueGrantChain(grantChainID grantChainID) {
// KVWork, that are limited by slots (CPU bound work) and/or tokens (IO
// bound work).
type kvGranter struct {
coord *GrantCoordinator
requester requester
usedSlots int
totalSlots int
coord *GrantCoordinator
requester requester
usedSlots int
totalSlots int
skipSlotEnforcement bool

ioTokensEnabled bool
// There is no rate limiting in granting these tokens. That is, they are all
// burst tokens.
Expand Down Expand Up @@ -478,7 +482,7 @@ func (sg *kvGranter) tryGet() bool {
}

func (sg *kvGranter) tryGetLocked() grantResult {
if sg.usedSlots < sg.totalSlots {
if sg.usedSlots < sg.totalSlots || sg.skipSlotEnforcement {
if !sg.ioTokensEnabled || sg.availableIOTokens > 0 {
sg.usedSlots++
if sg.usedSlotsMetric != nil {
Expand Down Expand Up @@ -555,8 +559,11 @@ func (sg *kvGranter) setAvailableIOTokensLocked(tokens int64) {
// StoreGrantCoordinators) for KVWork that uses that store. See the
// NewGrantCoordinators and NewGrantCoordinatorSQL functions.
type GrantCoordinator struct {
settings *cluster.Settings
settings *cluster.Settings
lastCPULoadSamplePeriod time.Duration

// mu is ordered before any mutex acquired in a requester implementation.
// TODO(sumeer): move everything covered by mu into a nested struct.
mu syncutil.Mutex
// NB: Some granters can be nil.
granters [numWorkKinds]granterWithLockedCalls
Expand Down Expand Up @@ -842,20 +849,38 @@ func (coord *GrantCoordinator) GetWorkQueue(workKind WorkKind) *WorkQueue {
return coord.queues[workKind].(*WorkQueue)
}

// CPULoad implements CPULoadListener and is called every 1ms. The same
// frequency is used for refilling the burst tokens since synchronizing the
// two means that the refilled burst can take into account the latest
// schedulers stats (indirectly, via the implementation of
// cpuOverloadIndicator).
// TODO(sumeer): after experimentation, possibly generalize the 1ms ticks used
// for CPULoad.
func (coord *GrantCoordinator) CPULoad(runnable int, procs int) {
// CPULoad implements CPULoadListener and is called periodically (see
// CPULoadListener for details). The same frequency is used for refilling the
// burst tokens since synchronizing the two means that the refilled burst can
// take into account the latest schedulers stats (indirectly, via the
// implementation of cpuOverloadIndicator).
func (coord *GrantCoordinator) CPULoad(runnable int, procs int, samplePeriod time.Duration) {
if coord.lastCPULoadSamplePeriod != 0 && coord.lastCPULoadSamplePeriod != samplePeriod &&
KVAdmissionControlEnabled.Get(&coord.settings.SV) {
log.Infof(context.Background(), "CPULoad switching to period %s", samplePeriod.String())
}
coord.lastCPULoadSamplePeriod = samplePeriod

coord.mu.Lock()
defer coord.mu.Unlock()
coord.numProcs = procs
coord.cpuLoadListener.CPULoad(runnable, procs)
coord.granters[SQLKVResponseWork].(*tokenGranter).refillBurstTokens()
coord.granters[SQLSQLResponseWork].(*tokenGranter).refillBurstTokens()
coord.cpuLoadListener.CPULoad(runnable, procs, samplePeriod)

// Slot adjustment and token refilling requires 1ms periods to work well. If
// the CPULoad ticks are less frequent, there is no guarantee that the
// tokens or slots will be sufficient to service requests. This is
// particularly the case for slots where we dynamically adjust them, and
// high contention can suddenly result in high slot utilization even while
// cpu utilization stays low. We don't want to artificially bottleneck
// request processing when we are in this slow CPULoad ticks regime since we
// can't adjust slots or refill tokens fast enough. So we explicitly tell
// the granters to not do token or slot enforcement.
skipEnforcement := samplePeriod > time.Millisecond
coord.granters[SQLKVResponseWork].(*tokenGranter).refillBurstTokens(skipEnforcement)
coord.granters[SQLSQLResponseWork].(*tokenGranter).refillBurstTokens(skipEnforcement)
if coord.granters[KVWork] != nil {
coord.granters[KVWork].(*kvGranter).skipSlotEnforcement = skipEnforcement
}
if coord.grantChainActive && !coord.tryTerminateGrantChain() {
return
}
Expand Down Expand Up @@ -1264,12 +1289,11 @@ type cpuOverloadIndicator interface {
}

// CPULoadListener listens to the latest CPU load information. Currently we
// expect this to be called every 1ms.
// TODO(sumeer): experiment with more smoothing. It is possible that rapid
// slot fluctuation may be resulting in under-utilization at a time scale that
// is not observable at our metrics frequency.
// expect this to be called every 1ms, unless the cpu is extremely
// underloaded. If the samplePeriod is > 1ms, admission control enforcement
// for CPU is disabled.
type CPULoadListener interface {
CPULoad(runnable int, procs int)
CPULoad(runnable int, procs int, samplePeriod time.Duration)
}

// kvSlotAdjuster is an implementer of CPULoadListener and
Expand All @@ -1293,8 +1317,9 @@ type kvSlotAdjuster struct {
var _ cpuOverloadIndicator = &kvSlotAdjuster{}
var _ CPULoadListener = &kvSlotAdjuster{}

func (kvsa *kvSlotAdjuster) CPULoad(runnable int, procs int) {
func (kvsa *kvSlotAdjuster) CPULoad(runnable int, procs int, _ time.Duration) {
threshold := int(KVSlotAdjusterOverloadThreshold.Get(&kvsa.settings.SV))

// Simple heuristic, which worked ok in experiments. More sophisticated ones
// could be devised.
if runnable >= threshold*procs {
Expand Down Expand Up @@ -1332,7 +1357,7 @@ func (kvsa *kvSlotAdjuster) CPULoad(runnable int, procs int) {
}

func (kvsa *kvSlotAdjuster) isOverloaded() bool {
return kvsa.granter.usedSlots >= kvsa.granter.totalSlots
return kvsa.granter.usedSlots >= kvsa.granter.totalSlots && !kvsa.granter.skipSlotEnforcement
}

// sqlNodeCPUOverloadIndicator is the implementation of cpuOverloadIndicator
Expand Down Expand Up @@ -1559,7 +1584,9 @@ func (io *ioLoadListener) adjustTokens(m pebble.Metrics) {
var _ cpuOverloadIndicator = &sqlNodeCPUOverloadIndicator{}
var _ CPULoadListener = &sqlNodeCPUOverloadIndicator{}

func (sn *sqlNodeCPUOverloadIndicator) CPULoad(runnable int, procs int) {
func (sn *sqlNodeCPUOverloadIndicator) CPULoad(
runnable int, procs int, samplePeriod time.Duration,
) {
}

func (sn *sqlNodeCPUOverloadIndicator) isOverloaded() bool {
Expand Down
13 changes: 11 additions & 2 deletions pkg/util/admission/granter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"sort"
"strings"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand Down Expand Up @@ -83,7 +84,7 @@ func (tr *testRequester) getAdmittedCount() uint64 {
// return-grant work=<kind>
// took-without-permission work=<kind>
// continue-grant-chain work=<kind>
// cpu-load runnable=<int> procs=<int>
// cpu-load runnable=<int> procs=<int> [infrequent=<bool>]
// set-io-tokens tokens=<int>
func TestGranterBasic(t *testing.T) {
defer leaktest.AfterTest(t)()
Expand Down Expand Up @@ -159,7 +160,15 @@ func TestGranterBasic(t *testing.T) {
var runnable, procs int
d.ScanArgs(t, "runnable", &runnable)
d.ScanArgs(t, "procs", &procs)
coord.CPULoad(runnable, procs)
infrequent := false
if d.HasArg("infrequent") {
d.ScanArgs(t, "infrequent", &infrequent)
}
samplePeriod := time.Millisecond
if infrequent {
samplePeriod = 250 * time.Millisecond
}
coord.CPULoad(runnable, procs, samplePeriod)
return flushAndReset()

case "set-io-tokens":
Expand Down
Loading

0 comments on commit cb6889c

Please sign in to comment.