Skip to content

Commit

Permalink
server/heapprofiler: don't consider "goIdle" memory
Browse files Browse the repository at this point in the history
Base decisions about whether to take a heap profile or not on
RSS - goIdle, not RSS alone. goIdle is memory allocated from the OS
that's not actually in use. I've seen it be up to several gigs.
The rationale behind the patch is that we want profiles when the heap is
large more than when the RSS is large. I'm looking at a case where we
took a heap profile when the heap was 4.5 gigs with 2 gigs idle and then
never took one again because of how the heuristic works. And then we
OOMed when the heap was larger and the idle space was lower, but the RSS
was about the same. With this patch, we would have taken a profile at a
more interesting time.

I've also further changed the profile collection policy slightly: before,
once we got above the 85% threshold, we wouldn't take another profile
until we go below it and then again above it. With this patch, you take
one profile a minute while you're above 85%, period. Seems simpler and
it has a chance of catching some further heap increases.

Release note: None
  • Loading branch information
andreimatei committed May 16, 2019
1 parent cf0e3e9 commit 841a2ff
Show file tree
Hide file tree
Showing 5 changed files with 184 additions and 139 deletions.
39 changes: 39 additions & 0 deletions pkg/base/mem_stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2018 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package base

import "time"

// GoMemStats represents information from the Go allocator, together with the
// time at which it was sampled. All sizes are in bytes.
type GoMemStats struct {
// GoAllocated is bytes of allocated heap objects.
GoAllocated uint64
// GoIdle is space allocated from the OS but not used. It counts towards the
// memory obtained from the OS without representing memory in active use.
GoIdle uint64
// GoTotal is approximately GoAllocated + GoIdle (it also includes stacks and
// other things not included in GoAllocated).
GoTotal uint64

// Collected is the timestamp at which these values were collected. It lets
// consumers tell how stale the sizes above are.
Collected time.Time
}

// MemStats groups GoMemStats and an RSS measurement (RSS referring to the whole
// process, beyond the Go parts).
type MemStats struct {
// Go holds the statistics reported by the Go allocator.
Go GoMemStats
// RSSBytes is the RSS measurement for the whole process, in bytes.
RSSBytes uint64
}
95 changes: 45 additions & 50 deletions pkg/server/heapprofiler/heapprofiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand All @@ -49,46 +50,17 @@ var (
)

type stats struct {
rssBytes int64
systemMemoryBytes int64
lastProfileTime time.Time
aboveSysMemThresholdSinceLastProfile bool
currentTime func() time.Time
}
memStats base.MemStats
systemMemoryBytes uint64
lastProfileTime time.Time

type heuristic struct {
name string
isTrue func(s *stats, st *cluster.Settings) (score int64, isTrue bool)
}

// fractionSystemMemoryHeuristic is true if latest Rss is more than
// systemMemoryThresholdFraction of system memory. No new profile is
// taken if Rss has been above threshold since the last time profile was taken,
// but a new profile will be triggered if Rss has dipped below threshold since
// the last profile. score is the latest value of Rss.
// At max one profile will be taken in minProfileInterval.
var fractionSystemMemoryHeuristic = heuristic{
name: "fraction_system_memory",
isTrue: func(s *stats, st *cluster.Settings) (score int64, isTrue bool) {
currentValue := s.rssBytes
if float64(currentValue)/float64(s.systemMemoryBytes) > systemMemoryThresholdFraction.Get(&st.SV) {
if s.currentTime().Sub(s.lastProfileTime) < minProfileInterval ||
s.aboveSysMemThresholdSinceLastProfile {
return 0, false
}
s.aboveSysMemThresholdSinceLastProfile = true
return currentValue, true
}
s.aboveSysMemThresholdSinceLastProfile = false
return 0, false
},
now func() time.Time
}

// HeapProfiler is used to take heap profiles if an OOM situation is
// detected. It stores relevant functions and stats for heuristics to use.
type HeapProfiler struct {
*stats
heuristics []heuristic
takeHeapProfile func(ctx context.Context, dir string, prefix string, suffix string)
gcProfiles func(ctx context.Context, dir, prefix string, maxCount int64)
dir string
Expand All @@ -99,30 +71,54 @@ const memprof = "memprof."
// MaybeTakeProfile takes a heap profile if an OOM situation is detected using
// heuristics enabled in o. At max one profile is taken in a call of this
// function. This function is also responsible for updating stats in o.
func (o *HeapProfiler) MaybeTakeProfile(ctx context.Context, st *cluster.Settings, rssBytes int64) {
o.rssBytes = rssBytes
func (o *HeapProfiler) MaybeTakeProfile(
ctx context.Context, st *cluster.Settings, memStats base.MemStats,
) {
o.memStats = memStats
profileTaken := false
for _, h := range o.heuristics {
if score, isTrue := h.isTrue(o.stats, st); isTrue {
if !profileTaken {
prefix := memprof + h.name + "."
const format = "2006-01-02T15_04_05.999"
suffix := fmt.Sprintf("%018d_%s", score, o.currentTime().Format(format))
o.takeHeapProfile(ctx, o.dir, prefix, suffix)
o.lastProfileTime = o.currentTime()
profileTaken = true
if o.gcProfiles != nil {
o.gcProfiles(ctx, o.dir, memprof, maxProfiles.Get(&st.SV))
}
if score, isTrue := o.fractionSystemMemoryHeuristic(st); isTrue {
if !profileTaken {
prefix := memprof
const format = "2006-01-02T15_04_05.999"
suffix := fmt.Sprintf("%018d_%s", score, o.now().Format(format))
o.takeHeapProfile(ctx, o.dir, prefix, suffix)
o.lastProfileTime = o.now()
profileTaken = true
if o.gcProfiles != nil {
o.gcProfiles(ctx, o.dir, memprof, maxProfiles.Get(&st.SV))
}
}
}
}

// fractionSystemMemoryHeuristic decides whether a heap profile should be taken
// right now, based on the most recent memory stats stored in o.stats.
//
// It reports true when both of the following hold:
//   - the latest (RSS - goIdle) is at least systemMemoryThresholdFraction of
//     the system memory, and
//   - more than minProfileInterval has elapsed since the last profile.
//
// Consequently, while memory use stays above the threshold, at most one
// profile is taken per minProfileInterval.
//
// When isTrue is set, score is the (RSS - goIdle) value, in bytes, that
// triggered the profile; otherwise score is 0.
func (o *HeapProfiler) fractionSystemMemoryHeuristic(
	st *cluster.Settings,
) (score uint64, isTrue bool) {
	threshold := systemMemoryThresholdFraction.Get(&st.SV)

	// Both figures are unsigned and are not necessarily sampled at the same
	// instant (the Go stats may be older than the RSS reading), so goIdle can
	// exceed RSS. Clamp at zero instead of letting the subtraction wrap around
	// to a huge value, which would trigger a bogus profile.
	rss := o.stats.memStats.RSSBytes
	goIdle := o.stats.memStats.Go.GoIdle
	var curMemUse uint64
	if rss > goIdle {
		curMemUse = rss - goIdle
	}

	if float64(curMemUse)/float64(o.stats.systemMemoryBytes) < threshold {
		// Plenty of memory left in the system. No profile.
		return 0, false
	}

	if o.stats.now().Sub(o.stats.lastProfileTime) > minProfileInterval {
		// It's been a while since the last profile. Let's take a new one.
		return curMemUse, true
	}

	// Looks like we had another recent profile that was good enough. No need to
	// take a new one.
	return 0, false
}

// NewHeapProfiler returns a HeapProfiler which has
// systemMemoryThresholdFraction heuristic enabled. dir is the directory in
// which profiles are stored.
func NewHeapProfiler(dir string, systemMemoryBytes int64) (*HeapProfiler, error) {
func NewHeapProfiler(dir string, systemMemoryBytes uint64) (*HeapProfiler, error) {
if dir == "" {
return nil, errors.New("directory to store profiles could not be determined")
}
Expand All @@ -133,9 +129,8 @@ func NewHeapProfiler(dir string, systemMemoryBytes int64) (*HeapProfiler, error)
hp := &HeapProfiler{
stats: &stats{
systemMemoryBytes: systemMemoryBytes,
currentTime: timeutil.Now,
now: timeutil.Now,
},
heuristics: []heuristic{fractionSystemMemoryHeuristic},
takeHeapProfile: takeHeapProfile,
gcProfiles: gcProfiles,
dir: dir,
Expand Down
52 changes: 28 additions & 24 deletions pkg/server/heapprofiler/heapprofiler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,32 +21,36 @@ import (
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/stretchr/testify/assert"
)

type rssVal struct {
secs time.Duration // secs is the time at which this rss value was emitted
rss int64
secs time.Duration // secs is the time at which this rss value was emitted
rssBytes uint64
goIdleBytes uint64
}

func testHelper(
ctx context.Context,
t *testing.T,
hp *HeapProfiler,
st *cluster.Settings,
rssValues []rssVal,
expectedScores []int64,
expectedPrefixes []string,
) {
baseTime := time.Time{}
numProfiles := 0
mockHeapProfile := func(
ctx context.Context, dir string, prefix string, suffix string,
) {
assert.Equal(t, prefix, expectedPrefixes[numProfiles])
score, err := strconv.ParseInt(strings.Split(suffix, "_")[0], 10, 64)
assert.Nil(t, err)
if len(expectedScores) <= numProfiles {
t.Fatalf("unexected profile: %d", numProfiles)
}
assert.Equal(t, expectedScores[numProfiles], score)
numProfiles++
}
Expand All @@ -58,42 +62,42 @@ func testHelper(
return currentTime
}
hp.takeHeapProfile = mockHeapProfile
hp.currentTime = now
hp.now = now
// set a large negative time so that first profile is triggered correctly
// since we start time from 0 in test.
// Not needed in main code as time will never be 0.
hp.lastProfileTime = time.Time{}.Add(-1000 * time.Second)

ctx := context.TODO()
for _, r := range rssValues {
currentTime = baseTime.Add(time.Second * r.secs)
hp.MaybeTakeProfile(ctx, st, r.rss)
ms := base.MemStats{RSSBytes: r.rssBytes}
ms.Go.GoIdle = r.goIdleBytes
hp.MaybeTakeProfile(ctx, st, ms)
}
assert.Equal(t, numProfiles, len(expectedScores))
}

func TestPercentSystemMemoryHeuristic(t *testing.T) {
rssValues := []rssVal{
{0, 30}, {20, 40}, // random small values
{30, 88}, // should trigger
{80, 89}, // should not trigger as less than 60s before last profile
{130, 10}, {140, 4}, // random small values
{150, 90}, // should trigger
{260, 92}, // should not trigger as continues above threshold
{290, 30}, // random small value
{380, 99}, // should trigger
{390, 30}, // random small value
{430, 91}, // should not trigger as less than 60s before last profile
{500, 95}, // should trigger
{0, 30, 0}, {20, 40, 0}, // random small values
{30, 88, 0}, // should trigger
{80, 89, 0}, // should not trigger as less than 60s before last profile
{130, 10, 0}, {140, 4, 0}, // random small values
{150, 90, 0}, // should trigger
{290, 30, 0}, // random small value
{380, 99, 0}, // should trigger
{390, 30, 0}, // random small value
{430, 91, 0}, // should not trigger as less than 60s before last profile
{500, 95, 0}, // should trigger
{600, 30, 0}, // random small value
{700, 100, 90}, // should not trigger; large idle heap which is discounted
{700, 100, 10}, // should trigger; some idle heap but not big enough to matter
}
expectedScores := []int64{88, 90, 99, 95}
prefix := "memprof.fraction_system_memory."
expectedPrefixes := []string{prefix, prefix, prefix, prefix}
expectedScores := []int64{88, 90, 99, 95, 90}
hp := &HeapProfiler{
stats: &stats{systemMemoryBytes: 100},
heuristics: []heuristic{fractionSystemMemoryHeuristic},
stats: &stats{systemMemoryBytes: 100},
}
st := &cluster.Settings{}
systemMemoryThresholdFraction.Override(&st.SV, .85)
testHelper(t, hp, st, rssValues, expectedScores, expectedPrefixes)
testHelper(context.Background(), t, hp, st, rssValues, expectedScores)
}
63 changes: 38 additions & 25 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1920,52 +1920,65 @@ func (s *Server) startSampleEnvironment(frequency time.Duration) {
if err != nil {
log.Warningf(ctx, "Could not compute system memory due to: %s", err)
} else {
heapProfiler, err = heapprofiler.NewHeapProfiler(s.cfg.HeapProfileDirName, systemMemoryBytes)
heapProfiler, err = heapprofiler.NewHeapProfiler(
s.cfg.HeapProfileDirName, uint64(systemMemoryBytes))
if err != nil {
log.Infof(ctx, "Could not start heap profiler worker due to: %s", err)
}
}
}

// We run two separate sampling loops, one for memory stats (via
// ReadMemStats) and one for all other runtime stats. This is necessary
// because as of go1.11, runtime.ReadMemStats() "stops the world" and
// requires waiting for any current GC run to finish. With a large heap, a
// single GC run may take longer than the default sampling period (10s).
s.stopper.RunWorker(ctx, func(ctx context.Context) {
var goMemStats atomic.Value // *base.GoMemStats
goMemStats.Store(&base.GoMemStats{})
var collectingMemStats int32 // atomic, 1 when stats call is ongoing

timer := timeutil.NewTimer()
defer timer.Stop()
timer.Reset(frequency)

for {
select {
case <-timer.C:
timer.Read = true
s.runtime.SampleMemStats(ctx)
timer.Reset(frequency)
case <-s.stopper.ShouldStop():
return
}
}
})

s.stopper.RunWorker(ctx, func(ctx context.Context) {
timer := timeutil.NewTimer()
defer timer.Stop()
timer.Reset(frequency)
for {
select {
case <-timer.C:
timer.Read = true
s.runtime.SampleEnvironment(ctx)
timer.Reset(frequency)

// We read the heap stats on another goroutine and give up after 1s.
// This is necessary because as of Go 1.12, runtime.ReadMemStats()
// "stops the world" and that requires first waiting for any current GC
// run to finish. With a large heap, a single GC run may take longer
// than the default sampling period (10s).
statsCollected := make(chan struct{})
if atomic.CompareAndSwapInt32(&collectingMemStats, 0, 1) {
if err := s.stopper.RunTask(ctx, "mem-stats-sample", func(ctx context.Context) {
curStats := s.runtime.SampleMemStats(ctx)
goMemStats.Store(&curStats)
atomic.StoreInt32(&collectingMemStats, 0)
close(statsCollected)
}); err != nil {
close(statsCollected)
}
}

select {
case <-statsCollected:
// Good; we managed to read the Go memory stats quickly enough.
case <-time.After(time.Second):
log.Infof(ctx, "SampleMemStats hasn't finished in one second; "+
"will use previously collected heap stats")
}

curStats := goMemStats.Load().(*base.GoMemStats)
memStats := s.runtime.SampleEnvironment(ctx, *curStats)
if goroutineDumper != nil {
goroutineDumper.MaybeDump(ctx, s.ClusterSettings(), s.runtime.Goroutines.Value())
}
if heapProfiler != nil {
heapProfiler.MaybeTakeProfile(ctx, s.ClusterSettings(), s.runtime.RSSBytes.Value())
heapProfiler.MaybeTakeProfile(ctx, s.ClusterSettings(), memStats)
}
timer.Reset(frequency)
case <-s.stopper.ShouldStop():
return

}
}
})
Expand Down
Loading

0 comments on commit 841a2ff

Please sign in to comment.