tsdb: expand mem per worker based on sql pool size
Previously, the memory limit per `tsdb` worker was set at a static
64 MiB. This cap caused the issues seen in cockroachdb#24018, where the limit
was hit on a 30-node cluster. To alleviate the issue, the number of workers
was reduced.

We have now hit this limit again during load testing with larger
clusters and have decided to make the per-worker memory limit
dynamic. The per-worker limit is now raised based on the amount of memory
available to the SQL pool via the `MemoryPoolSize` configuration
variable, which defaults to 25% of system memory. The
`tsdb` memory cap per worker is doubled until it reaches `1/128` of
the memory pool setting.

For example, on a node with 128-256 GiB of system memory, this
corresponds to 512 MiB allocated per worker.
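
As a rough illustration of the sizing rule (a sketch, not code from this
commit; it assumes the default 64 MiB base cap and the default pool of 25%
of system memory, and the helper name `tsdbWorkerCap` is made up):

package main

import "fmt"

// tsdbWorkerCap mirrors the doubling rule added in pkg/ts/server.go:
// the 64 MiB default cap is doubled until it reaches 1/128 of the SQL
// memory pool. Illustrative only; not part of the commit.
func tsdbWorkerCap(memoryPoolSize int64) int64 {
	workerCap := int64(64 << 20) // 64 MiB default per-worker cap
	for workerCap < memoryPoolSize/128 {
		workerCap *= 2
	}
	return workerCap
}

func main() {
	const GiB = int64(1) << 30
	fmt.Println(tsdbWorkerCap(8*GiB) >> 20)  // 8 GiB pool (32 GiB RAM)  -> 64 MiB
	fmt.Println(tsdbWorkerCap(48*GiB) >> 20) // 48 GiB pool (192 GiB RAM) -> 512 MiB
}

Under these assumptions the cap stays at the 64 MiB default up to an
8 GiB pool and reaches 512 MiB once the pool exceeds 32 GiB.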

TODO(davidh): Can the tests be faster? Each case starts a new server.
TODO(davidh): Is 1/128 a good setting? How do we validate this?
TODO(davidh): Should this behavior be gated behind a feature flag? It's
possible that on some memory-constrained deployments a sudden spike in
memory usage by tsdb could cause problems.

Resolves cockroachdb#72986

Release note (ops change): customers running clusters with 240 nodes or
more can effectively access tsdb metrics.
dhartunian committed Jan 11, 2022
1 parent 72c74dc commit ea73a12
Showing 4 changed files with 48 additions and 1 deletion.
2 changes: 1 addition & 1 deletion pkg/server/server.go
@@ -555,7 +555,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
	nodeCountFn := func() int64 {
		return nodeLiveness.Metrics().LiveNodes.Value()
	}
-	sTS := ts.MakeServer(cfg.AmbientCtx, tsDB, nodeCountFn, cfg.TimeSeriesServerConfig, stopper)
+	sTS := ts.MakeServer(cfg.AmbientCtx, tsDB, nodeCountFn, cfg.TimeSeriesServerConfig, cfg.MemoryPoolSize, stopper)

	ctSender := sidetransport.NewSender(stopper, st, clock, nodeDialer)
	stores := kvserver.NewStores(cfg.AmbientCtx, clock)
8 changes: 8 additions & 0 deletions pkg/server/testserver.go
@@ -450,6 +450,14 @@ func (ts *TestServer) TsDB() *ts.DB {
	return nil
}

// TsServer returns the ts.Server instance used by the TestServer.
func (ts *TestServer) TsServer() *ts.Server {
	if ts != nil {
		return ts.tsServer
	}
	return nil
}

// DB returns the client.DB instance used by the TestServer.
func (ts *TestServer) DB() *kv.DB {
	if ts != nil {
13 changes: 13 additions & 0 deletions pkg/ts/server.go
Expand Up @@ -104,6 +104,7 @@ func MakeServer(
	db *DB,
	nodeCountFn ClusterNodeCountFn,
	cfg ServerConfig,
	memoryPoolSize int64,
	stopper *stop.Stopper,
) Server {
	ambient.AddLogTag("ts-srv", nil)
@@ -114,6 +115,13 @@
		queryWorkerMax = cfg.QueryWorkerMax
	}
	queryMemoryMax := queryMemoryMax
	// Double size until we hit 1/128 of the memory pool setting. Our
	// typical default here is 64 MiB which corresponds to a pool of 8 GiB
	// which corresponds to 32 GiB of system memory (assuming a default
	// setting of 25% for the pool).
	for queryMemoryMax < memoryPoolSize/128 {
		queryMemoryMax *= 2
	}
	if cfg.QueryMemoryMax != 0 {
		queryMemoryMax = cfg.QueryMemoryMax
	}
@@ -446,3 +454,8 @@ func dumpTimeseriesAllSources(
	}
	return nil
}

// GetQueryWorkerMax is used by tests to verify the memory cap.
func (s *Server) GetQueryWorkerMax() int64 {
	return s.queryMemoryMax
}
26 changes: 26 additions & 0 deletions pkg/ts/server_test.go
@@ -294,6 +294,32 @@ func TestServerQueryStarvation(t *testing.T) {
	}
}

func TestServerMemoryCap(t *testing.T) {
	defer leaktest.AfterTest(t)()

	MiB := 1024 * 1024
	GiB := MiB * 1024
	tcs := []struct {
		poolSize              int64
		expectedTsDBWorkerMax int64
	}{
		{poolSize: int64(2 * GiB), expectedTsDBWorkerMax: int64(64 * MiB)},
		{poolSize: int64(32 * GiB), expectedTsDBWorkerMax: int64(256 * MiB)},
		{poolSize: int64(48 * GiB), expectedTsDBWorkerMax: int64(512 * MiB)},
	}

	for _, tc := range tcs {
		t.Run(fmt.Sprintf("%d pool should have %d worker max memory", tc.poolSize, tc.expectedTsDBWorkerMax),
			func(t *testing.T) {
				s, _, _ := serverutils.StartServer(t, base.TestServerArgs{SQLMemoryPoolSize: tc.poolSize})
				defer s.Stopper().Stop(context.Background())

				tsServer := s.(*server.TestServer).TsServer()
				require.Equal(t, tc.expectedTsDBWorkerMax, tsServer.GetQueryWorkerMax())
			})
	}
}

// TestServerQueryMemoryManagement verifies that queries succeed under
// constrained memory requirements.
func TestServerQueryMemoryManagement(t *testing.T) {
