Skip to content

Commit

Permalink
tsdb: expand mem per worker based on sql pool size
Browse files Browse the repository at this point in the history
Previously, the memory limit for all `tsdb` workers was set at a static
64MiB. This cap created issues seen in #24018 where this limit was hit
on a 30 node cluster. To alleviate the issue, the number of workers was
reduced, raising the per-worker allocation.

We've currently hit this limit again as part of load testing with larger
clusters and have decided to make the per-query worker memory limit
dynamic.

This PR introduces a new CLI flag `--max-tsdb-memory` to mirror the
functionality of the `--max-sql-memory` flag by accepting bytes or a
percentage of system memory. The default is set to be `1%` of system
memory or 64 MiB, whichever is greater. This ensures that performance
after this PR is equal or better for timeseries queries without eating
too far into memory budgets for SQL.

In addition, the ts server is now connected to the same `BytesMonitor`
instance as the SQL memory monitor and workers will becapped at double
the query limit. Results are monitored as before but a cap is not
introduced there since we didn't have one present previously.

Resolves #72986

Release note (cli change, ops change): A new CLI flag `--max-tsdb-memory`
is now available, that can set the memory budget for timeseries queries
when processing requests from the Metrics page in DB Console. Most
customers should not need to tweak this setting as the default of 1% of
system memory or 64 MiB, whichever is greater, is adequate for most
deployments. In the case where a deployment of hundreds of nodes has
low per-node memory available (below 8 GiB for instance) it may be
necessary to increase this value to `2%` or higher in order to render
timeseries graphs for the cluster using the DB Console. Otherwise, the
default settings will be adequate for the vast majority of deployments.
  • Loading branch information
dhartunian committed Feb 2, 2022
1 parent baeba80 commit 97150df
Show file tree
Hide file tree
Showing 7 changed files with 116 additions and 70 deletions.
15 changes: 15 additions & 0 deletions pkg/cli/cliflags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,21 @@ percentage of physical memory (e.g. .25). If left unspecified, defaults to 25% o
physical memory.`,
}

TSDBMem = FlagInfo{
Name: "max-tsdb-memory",
Description: `
Maximum memory capacity available to store temporary data for use by the
time-series database to display metrics in the DB Console. Accepts numbers
interpreted as bytes, size suffixes (e.g. 1GB and 1GiB) or a
percentage of physical memory (e.g. 0.01). If left unspecified, defaults to
1% of physical memory or 64MiB whichever is greater. It maybe necessary to
manually increase this value on a cluster with hundreds of nodes where
individual nodes have very limited memory available. This can constrain
the ability of the DB Console to process time-series queries used to render
metrics for the entire cluster. This capacity constraint does not affect
SQL query execution.`,
}

SQLTempStorage = FlagInfo{
Name: "max-disk-temp-storage",
Description: `
Expand Down
11 changes: 11 additions & 0 deletions pkg/cli/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/ts"
"github.com/cockroachdb/cockroach/pkg/util/log/logconfig"
"github.com/cockroachdb/cockroach/pkg/util/log/logcrash"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand Down Expand Up @@ -127,6 +128,16 @@ func setServerContextDefaults() {
if bytes, _ := memoryPercentResolver(25); bytes != 0 {
serverCfg.SQLConfig.MemoryPoolSize = bytes
}

// Attempt to set serverCfg.TimeSeriesServerConfig.QueryMemoryMax to
// the default (64MiB) or 1% of system memory, whichever is greater.
if bytes, _ := memoryPercentResolver(1); bytes != 0 {
if bytes > ts.DefaultQueryMemoryMax {
serverCfg.TimeSeriesServerConfig.QueryMemoryMax = bytes
} else {
serverCfg.TimeSeriesServerConfig.QueryMemoryMax = ts.DefaultQueryMemoryMax
}
}
}

// baseCfg points to the base.Config inside serverCfg.
Expand Down
2 changes: 2 additions & 0 deletions pkg/cli/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,7 @@ func init() {
// Engine flags.
varFlag(f, cacheSizeValue, cliflags.Cache)
varFlag(f, sqlSizeValue, cliflags.SQLMem)
varFlag(f, tsdbSizeValue, cliflags.TSDBMem)
// N.B. diskTempStorageSizeValue.ResolvePercentage() will be called after
// the stores flag has been parsed and the storage device that a percentage
// refers to becomes known.
Expand Down Expand Up @@ -986,6 +987,7 @@ func init() {

// Engine flags.
varFlag(f, sqlSizeValue, cliflags.SQLMem)
varFlag(f, tsdbSizeValue, cliflags.TSDBMem)
// N.B. diskTempStorageSizeValue.ResolvePercentage() will be called after
// the stores flag has been parsed and the storage device that a percentage
// refers to becomes known.
Expand Down
88 changes: 49 additions & 39 deletions pkg/cli/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,51 +140,61 @@ func TestClusterNameFlag(t *testing.T) {
}
}

func TestSQLMemoryPoolFlagValue(t *testing.T) {
func TestMemoryPoolFlagValues(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// Avoid leaking configuration changes after the test ends.
defer initCLIDefaults()

f := startCmd.Flags()

// Check absolute values.
testCases := []struct {
value string
expected int64
for _, tc := range []struct {
flag string
config *int64
}{
{"100MB", 100 * 1000 * 1000},
{".5GiB", 512 * 1024 * 1024},
{"1.3", 1},
}
for _, c := range testCases {
args := []string{"--max-sql-memory", c.value}
if err := f.Parse(args); err != nil {
t.Fatal(err)
}
if c.expected != serverCfg.MemoryPoolSize {
t.Errorf("expected %d, but got %d", c.expected, serverCfg.MemoryPoolSize)
}
}
{flag: "--max-sql-memory", config: &serverCfg.MemoryPoolSize},
{flag: "--max-tsdb-memory", config: &serverCfg.TimeSeriesServerConfig.QueryMemoryMax},
} {
t.Run(tc.flag, func(t *testing.T) {
// Avoid leaking configuration changes after the test ends.
defer initCLIDefaults()

f := startCmd.Flags()

// Check absolute values.
testCases := []struct {
value string
expected int64
}{
{"100MB", 100 * 1000 * 1000},
{".5GiB", 512 * 1024 * 1024},
{"1.3", 1},
}
for _, c := range testCases {
args := []string{tc.flag, c.value}
if err := f.Parse(args); err != nil {
t.Fatal(err)
}
if c.expected != *tc.config {
t.Errorf("expected %d, but got %d", c.expected, tc.config)
}
}

for _, c := range []string{".30", "0.3"} {
args := []string{"--max-sql-memory", c}
if err := f.Parse(args); err != nil {
t.Fatal(err)
}
for _, c := range []string{".30", "0.3"} {
args := []string{tc.flag, c}
if err := f.Parse(args); err != nil {
t.Fatal(err)
}

// Check fractional values.
maxMem, err := status.GetTotalMemory(context.Background())
if err != nil {
t.Logf("total memory unknown: %v", err)
return
}
expectedLow := (maxMem * 28) / 100
expectedHigh := (maxMem * 32) / 100
if serverCfg.MemoryPoolSize < expectedLow || serverCfg.MemoryPoolSize > expectedHigh {
t.Errorf("expected %d-%d, but got %d", expectedLow, expectedHigh, serverCfg.MemoryPoolSize)
}
// Check fractional values.
maxMem, err := status.GetTotalMemory(context.Background())
if err != nil {
t.Logf("total memory unknown: %v", err)
return
}
expectedLow := (maxMem * 28) / 100
expectedHigh := (maxMem * 32) / 100
if *tc.config < expectedLow || *tc.config > expectedHigh {
t.Errorf("expected %d-%d, but got %d", expectedLow, expectedHigh, *tc.config)
}
}
})
}
}

Expand Down
7 changes: 4 additions & 3 deletions pkg/cli/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ func initTraceDir(ctx context.Context, dir string) {
var cacheSizeValue = newBytesOrPercentageValue(&serverCfg.CacheSize, memoryPercentResolver)
var sqlSizeValue = newBytesOrPercentageValue(&serverCfg.MemoryPoolSize, memoryPercentResolver)
var diskTempStorageSizeValue = newBytesOrPercentageValue(nil /* v */, nil /* percentResolver */)
var tsdbSizeValue = newBytesOrPercentageValue(&serverCfg.TimeSeriesServerConfig.QueryMemoryMax, memoryPercentResolver)

func initExternalIODir(ctx context.Context, firstStore base.StoreSpec) (string, error) {
externalIODir := startCtx.externalIODir
Expand Down Expand Up @@ -1089,12 +1090,12 @@ func maybeWarnMemorySizes(ctx context.Context) {

// Check that the total suggested "max" memory is well below the available memory.
if maxMemory, err := status.GetTotalMemory(ctx); err == nil {
requestedMem := serverCfg.CacheSize + serverCfg.MemoryPoolSize
requestedMem := serverCfg.CacheSize + serverCfg.MemoryPoolSize + serverCfg.TimeSeriesServerConfig.QueryMemoryMax
maxRecommendedMem := int64(.75 * float64(maxMemory))
if requestedMem > maxRecommendedMem {
log.Ops.Shoutf(ctx, severity.WARNING,
"the sum of --max-sql-memory (%s) and --cache (%s) is larger than 75%% of total RAM (%s).\nThis server is running at increased risk of memory-related failures.",
sqlSizeValue, cacheSizeValue, humanizeutil.IBytes(maxRecommendedMem))
"the sum of --max-sql-memory (%s), --cache (%s), and --max-tsdb-memory (%s) is larger than 75%% of total RAM (%s).\nThis server is running at increased risk of memory-related failures.",
sqlSizeValue, cacheSizeValue, tsdbSizeValue, humanizeutil.IBytes(maxRecommendedMem))
}
}
}
Expand Down
17 changes: 10 additions & 7 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,13 +553,6 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
cfg.AmbientCtx, st, nodeDialer, grpcServer.Server, stopper,
)

tsDB := ts.NewDB(db, cfg.Settings)
registry.AddMetricStruct(tsDB.Metrics())
nodeCountFn := func() int64 {
return nodeLiveness.Metrics().LiveNodes.Value()
}
sTS := ts.MakeServer(cfg.AmbientCtx, tsDB, nodeCountFn, cfg.TimeSeriesServerConfig, stopper)

ctSender := sidetransport.NewSender(stopper, st, clock, nodeDialer)
stores := kvserver.NewStores(cfg.AmbientCtx, clock)
ctReceiver := sidetransport.NewReceiver(nodeIDContainer, stopper, stores, nil /* testingKnobs */)
Expand Down Expand Up @@ -609,6 +602,16 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
kvMemoryMonitor.Stop(ctx)
}))

tsDB := ts.NewDB(db, cfg.Settings)
registry.AddMetricStruct(tsDB.Metrics())
nodeCountFn := func() int64 {
return nodeLiveness.Metrics().LiveNodes.Value()
}
sTS := ts.MakeServer(
cfg.AmbientCtx, tsDB, nodeCountFn, cfg.TimeSeriesServerConfig,
sqlMonitorAndMetrics.rootSQLMemoryMonitor, stopper,
)

storeCfg := kvserver.StoreConfig{
DefaultSpanConfig: cfg.DefaultZoneConfig.AsSpanConfig(),
Settings: st,
Expand Down
46 changes: 25 additions & 21 deletions pkg/ts/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ const (
// queryWorkerMax is the default maximum number of worker goroutines that
// the time series server can use to service incoming queries.
queryWorkerMax = 8
// queryMemoryMax is a soft limit for the amount of total memory used by
// time series queries. This is not currently enforced, but is used for
// monitoring purposes.
queryMemoryMax = int64(64 * 1024 * 1024) // 64MiB
// DefaultQueryMemoryMax is a soft limit for the amount of total
// memory used by time series queries. This is not currently enforced,
// but is used for monitoring purposes.
DefaultQueryMemoryMax = int64(64 * 1024 * 1024) // 64MiB
// dumpBatchSize is the number of keys processed in each batch by the dump
// command.
dumpBatchSize = 100
Expand Down Expand Up @@ -104,50 +104,54 @@ func MakeServer(
db *DB,
nodeCountFn ClusterNodeCountFn,
cfg ServerConfig,
memoryMonitor *mon.BytesMonitor,
stopper *stop.Stopper,
) Server {
ambient.AddLogTag("ts-srv", nil)
ctx := ambient.AnnotateCtx(context.Background())

// Override default values from configuration.
queryWorkerMax := queryWorkerMax
if cfg.QueryWorkerMax != 0 {
queryWorkerMax = cfg.QueryWorkerMax
}
queryMemoryMax := queryMemoryMax
if cfg.QueryMemoryMax != 0 {
queryMemoryMax := DefaultQueryMemoryMax
if cfg.QueryMemoryMax > DefaultQueryMemoryMax {
queryMemoryMax = cfg.QueryMemoryMax
}
workerSem := quotapool.NewIntPool("ts.Server worker", uint64(queryWorkerMax))
stopper.AddCloser(workerSem.Closer("stopper"))
return Server{
s := Server{
AmbientContext: ambient,
db: db,
stopper: stopper,
nodeCountFn: nodeCountFn,
workerMemMonitor: mon.NewUnlimitedMonitor(
context.Background(),
workerMemMonitor: mon.NewMonitorInheritWithLimit(
"timeseries-workers",
mon.MemoryResource,
nil,
nil,
// Begin logging messages if we exceed our planned memory usage by
// more than double.
queryMemoryMax*2,
db.st,
memoryMonitor,
),
resultMemMonitor: mon.NewUnlimitedMonitor(
context.Background(),
resultMemMonitor: mon.NewMonitorInheritWithLimit(
"timeseries-results",
mon.MemoryResource,
nil,
nil,
math.MaxInt64,
db.st,
memoryMonitor,
),
queryMemoryMax: queryMemoryMax,
queryWorkerMax: queryWorkerMax,
workerSem: workerSem,
}

s.workerMemMonitor.Start(ctx, memoryMonitor, mon.BoundAccount{})
stopper.AddCloser(stop.CloserFn(func() {
s.workerMemMonitor.Stop(ctx)
}))

s.resultMemMonitor.Start(ambient.AnnotateCtx(context.Background()), memoryMonitor, mon.BoundAccount{})
stopper.AddCloser(stop.CloserFn(func() {
s.resultMemMonitor.Stop(ctx)
}))

return s
}

// RegisterService registers the GRPC service.
Expand Down

0 comments on commit 97150df

Please sign in to comment.