tsdb: expand mem per worker based on sql pool size
Previously, the memory limit for all `tsdb` workers was set at a static
64 MiB. This cap caused the problems seen in cockroachdb#24018, where the
limit was hit on a 30-node cluster. To alleviate the issue at the time, the
number of workers was reduced, raising the per-worker allocation.

We have now hit this limit again during load testing with larger clusters
and have decided to make the per-query worker memory limit dynamic. The
per-worker limit is raised based on the amount of memory available to the
SQL pool via the `MemoryPoolSize` configuration variable, which defaults
to 25% of system memory. The per-worker `tsdb` memory cap is doubled
until it reaches 1/128 of the memory pool setting.

For example, on a node with 256 GiB of memory (a 64 GiB SQL pool at the
default 25%), this corresponds to 512 MiB available to all running `tsdb`
queries; a 128 GiB node (32 GiB pool) gets 256 MiB. The arithmetic is
sketched below.
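
The growth rule, as a minimal standalone sketch (illustrative only;
`growQueryMemoryMax` is a hypothetical helper mirroring the logic added to
`pkg/ts/server.go` in this commit):

package main

import (
	"fmt"
	"math"
)

// growQueryMemoryMax doubles the default cap, in whole powers of two,
// until it reaches 1/128 of the SQL memory pool.
func growQueryMemoryMax(defaultMax, memoryPoolSize int64) int64 {
	queryMemoryMax := defaultMax
	if queryMemoryMax < memoryPoolSize/128 {
		queryMemoryMax <<= int(math.Log2((float64(memoryPoolSize) / 128) / float64(queryMemoryMax)))
	}
	return queryMemoryMax
}

func main() {
	const MiB = int64(1) << 20
	const GiB = int64(1) << 30
	for _, pool := range []int64{2 * GiB, 32 * GiB, 48 * GiB, 64 * GiB} {
		fmt.Printf("pool=%2dGiB -> cap=%3dMiB\n", pool/GiB, growQueryMemoryMax(64*MiB, pool)/MiB)
	}
	// pool= 2GiB -> cap= 64MiB
	// pool=32GiB -> cap=256MiB
	// pool=48GiB -> cap=256MiB
	// pool=64GiB -> cap=512MiB
}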

In addition, the ts server is now connected to the same `BytesMonitor`
instance as the SQL memory monitor, and workers are capped at double the
query limit. Results are monitored as before, but no cap is introduced
there since none was present previously. A toy sketch of this monitor
hierarchy follows.
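
As a toy illustration (a hypothetical `budget` type standing in for
`mon.BytesMonitor`, not the real API), parenting the worker monitor under
the SQL pool means a reservation must fit both the worker cap and the
shared pool:

package main

import (
	"errors"
	"fmt"
)

// budget is a toy stand-in for cockroach's mon.BytesMonitor: reservations
// made against a child count against its own limit and its parent's.
type budget struct {
	name   string
	limit  int64
	used   int64
	parent *budget
}

func (b *budget) grow(n int64) error {
	if b.used+n > b.limit {
		return errors.New(b.name + ": budget exceeded")
	}
	if b.parent != nil {
		if err := b.parent.grow(n); err != nil {
			return err
		}
	}
	b.used += n
	return nil
}

func main() {
	const MiB = int64(1) << 20
	sqlPool := &budget{name: "root-sql", limit: 2048 * MiB}
	// Worker monitor capped at double the 64 MiB query limit, as in this commit.
	workers := &budget{name: "timeseries-workers", limit: 128 * MiB, parent: sqlPool}

	fmt.Println(workers.grow(100 * MiB)) // <nil>: fits both budgets
	fmt.Println(workers.grow(100 * MiB)) // timeseries-workers: budget exceeded
}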

This behavior is gated behind a private cluster setting
(`server.ts.auto_memory_growth.enabled`) that is enabled by default.

TODO(davidh): Can the tests be faster? They create a new server per case.
TODO(davidh): Is 1/128 a good setting? How do we validate this?

Resolves cockroachdb#72986

Release note (ops change): customers running clusters with 240 nodes or
more can effectively access tsdb metrics.
dhartunian committed Jan 14, 2022
1 parent 72c74dc commit 10f488b
Showing 4 changed files with 100 additions and 22 deletions.
18 changes: 11 additions & 7 deletions pkg/server/server.go
@@ -550,13 +550,6 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
cfg.AmbientCtx, st, nodeDialer, grpcServer.Server, stopper,
)

tsDB := ts.NewDB(db, cfg.Settings)
registry.AddMetricStruct(tsDB.Metrics())
nodeCountFn := func() int64 {
return nodeLiveness.Metrics().LiveNodes.Value()
}
sTS := ts.MakeServer(cfg.AmbientCtx, tsDB, nodeCountFn, cfg.TimeSeriesServerConfig, stopper)

ctSender := sidetransport.NewSender(stopper, st, clock, nodeDialer)
stores := kvserver.NewStores(cfg.AmbientCtx, clock)
ctReceiver := sidetransport.NewReceiver(nodeIDContainer, stopper, stores, nil /* testingKnobs */)
@@ -603,6 +596,17 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
kvMemoryMonitor.Stop(ctx)
}))

tsDB := ts.NewDB(db, cfg.Settings)
registry.AddMetricStruct(tsDB.Metrics())
nodeCountFn := func() int64 {
return nodeLiveness.Metrics().LiveNodes.Value()
}
sTS := ts.MakeServer(
cfg.AmbientCtx, tsDB, nodeCountFn, cfg.TimeSeriesServerConfig,
cfg.MemoryPoolSize, sqlMonitorAndMetrics.rootSQLMemoryMonitor,
cfg.Settings, stopper,
)

storeCfg := kvserver.StoreConfig{
DefaultSpanConfig: cfg.DefaultZoneConfig.AsSpanConfig(),
Settings: st,
8 changes: 8 additions & 0 deletions pkg/server/testserver.go
@@ -450,6 +450,14 @@ func (ts *TestServer) TsDB() *ts.DB {
return nil
}

// TsServer returns the ts.Server instance used by the TestServer.
func (ts *TestServer) TsServer() *ts.Server {
if ts != nil {
return ts.tsServer
}
return nil
}

// DB returns the client.DB instance used by the TestServer.
func (ts *TestServer) DB() *kv.DB {
if ts != nil {
69 changes: 54 additions & 15 deletions pkg/ts/server.go
@@ -17,6 +17,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/ts/catalog"
"github.com/cockroachdb/cockroach/pkg/ts/tspb"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -46,6 +48,16 @@
dumpBatchSize = 100
)

// TSDBAutoMemoryGrowthEnabled gates automatic growth of the memory budget
// for tsdb query workers based on the SQL memory pool size.
var TSDBAutoMemoryGrowthEnabled = func() *settings.BoolSetting {
s := settings.RegisterBoolSetting(
settings.TenantWritable,
"server.ts.auto_memory_growth.enabled",
"enables or disables automatic growth of memory allocated to the TSDB's query facility based on the SQL memory pool size",
true,
)
return s
}()

// ClusterNodeCountFn is a function that returns the number of nodes active on
// the cluster.
type ClusterNodeCountFn func() int64
@@ -104,50 +116,66 @@ func MakeServer(
db *DB,
nodeCountFn ClusterNodeCountFn,
cfg ServerConfig,
memoryPoolSize int64,
memoryMonitor *mon.BytesMonitor,
settings *cluster.Settings,
stopper *stop.Stopper,
) Server {
ambient.AddLogTag("ts-srv", nil)
ctx := ambient.AnnotateCtx(context.Background())

// Override default values from configuration.
queryWorkerMax := queryWorkerMax
if cfg.QueryWorkerMax != 0 {
queryWorkerMax = cfg.QueryWorkerMax
}
queryMemoryMax := queryMemoryMax
if TSDBAutoMemoryGrowthEnabled.Get(&settings.SV) {
// Double size until we hit 1/128 of the memory pool setting. Our
// typical default here is 64 MiB which corresponds to a pool of 2 GiB
// which corresponds to 8 GiB of system memory (assuming a default
// setting of 25% for the pool).
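// Note: the shift below applies whole doublings only, so the resulting
// cap is a power-of-two multiple of the default and never exceeds
// memoryPoolSize/128.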
if queryMemoryMax < memoryPoolSize/128 {
queryMemoryMax = queryMemoryMax << int(math.Log2((float64(memoryPoolSize)/128)/float64(queryMemoryMax)))
log.Infof(ctx, "ts: setting query memory max to %d bytes", queryMemoryMax)
}
}
if cfg.QueryMemoryMax != 0 {
queryMemoryMax = cfg.QueryMemoryMax
}
workerSem := quotapool.NewIntPool("ts.Server worker", uint64(queryWorkerMax))
stopper.AddCloser(workerSem.Closer("stopper"))
s := Server{
AmbientContext: ambient,
db: db,
stopper: stopper,
nodeCountFn: nodeCountFn,
workerMemMonitor: mon.NewMonitorInheritWithLimit(
"timeseries-workers",
// Begin logging messages if we exceed our planned memory usage by
// more than double.
queryMemoryMax*2,
memoryMonitor,
),
resultMemMonitor: mon.NewMonitorInheritWithLimit(
"timeseries-results",
math.MaxInt64,
memoryMonitor,
),
queryMemoryMax: queryMemoryMax,
queryWorkerMax: queryWorkerMax,
workerSem: workerSem,
}

s.workerMemMonitor.Start(ctx, memoryMonitor, mon.BoundAccount{})
stopper.AddCloser(stop.CloserFn(func() {
s.workerMemMonitor.Stop(ctx)
}))

s.resultMemMonitor.Start(ctx, memoryMonitor, mon.BoundAccount{})
stopper.AddCloser(stop.CloserFn(func() {
s.resultMemMonitor.Stop(ctx)
}))

return s
}

// RegisterService registers the GRPC service.
Expand Down Expand Up @@ -446,3 +474,14 @@ func dumpTimeseriesAllSources(
}
return nil
}

// GetQueryWorkerMax returns the soft limit on the number of parallel
// queries executing against this server. Used by tests.
func (s *Server) GetQueryWorkerMax() int64 {
return int64(s.queryWorkerMax)
}

// GetQueryMemoryMax returns the soft memory limit on all running
// queries.
func (s *Server) GetQueryMemoryMax() int64 {
return s.queryMemoryMax
}
27 changes: 27 additions & 0 deletions pkg/ts/server_test.go
@@ -294,6 +294,33 @@ func TestServerQueryStarvation(t *testing.T) {
}
}

func TestServerMemoryCap(t *testing.T) {
defer leaktest.AfterTest(t)()

MiB := 1024 * 1024
GiB := MiB * 1024
tcs := []struct {
poolSize int64
expectedTsDBWorkerMax int64
}{
{poolSize: int64(2 * GiB), expectedTsDBWorkerMax: int64(64 * MiB)},
{poolSize: int64(32 * GiB), expectedTsDBWorkerMax: int64(256 * MiB)},
{poolSize: int64(48 * GiB), expectedTsDBWorkerMax: int64(256 * MiB)},
{poolSize: int64(64 * GiB), expectedTsDBWorkerMax: int64(512 * MiB)},
}

for _, tc := range tcs {
t.Run(fmt.Sprintf("%d pool should have %d worker max memory", tc.poolSize, tc.expectedTsDBWorkerMax),
func(t *testing.T) {
s, _, _ := serverutils.StartServer(t, base.TestServerArgs{SQLMemoryPoolSize: tc.poolSize})
defer s.Stopper().Stop(context.Background())

tsServer := s.(*server.TestServer).TsServer()
require.Equal(t, tc.expectedTsDBWorkerMax, tsServer.GetQueryMemoryMax())
})
}
}

// TestServerQueryMemoryManagement verifies that queries succeed under
// constrained memory requirements.
func TestServerQueryMemoryManagement(t *testing.T) {
