Skip to content

Commit

Permalink
small adjustments.
Browse files Browse the repository at this point in the history
* minimum GOGC on heap watchdog to prevent GC overscheduling.
* write heap _profiles_ instead of heap _dumps_.
* minor logging adjustments.
  • Loading branch information
raulk committed Jan 20, 2021
1 parent 8022773 commit 267fadc
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 55 deletions.
110 changes: 59 additions & 51 deletions watchdog.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"path/filepath"
"runtime"
"runtime/debug"
"runtime/pprof"
"sync"
"time"

Expand Down Expand Up @@ -38,24 +39,24 @@ var (
// NotifyGC, if non-nil, will be called when a GC has happened.
NotifyGC func() = func() {}

// HeapdumpThreshold sets the utilization threshold that will trigger a
// heap dump to be taken automatically. A zero value disables this feature.
// HeapProfileThreshold sets the utilization threshold that will trigger a
// heap profile to be taken automatically. A zero value disables this feature.
// By default, it is disabled.
HeapdumpThreshold float64
HeapProfileThreshold float64

// HeapdumpMaxCaptures sets the maximum amount of heapdumps a process will generate.
// HeapProfileMaxCaptures sets the maximum number of heap profiles a process will generate.
// This limits the number of episodes that will be captured, in case the
// utilization climbs repeatedly over the threshold. By default, it is 10.
HeapdumpMaxCaptures = uint(10)
HeapProfileMaxCaptures = uint(10)

// HeapdumpDir is the directory where the watchdog will write the heapdump.
// HeapProfileDir is the directory where the watchdog will write the heap profile.
// It will be created if it doesn't exist upon initialization. An error when
// creating the dir will not prevent watchdog initialization; it will just
// disable the heapdump capture feature. If zero-valued, the feature is
// disable the heap profile capture feature. If zero-valued, the feature is
// disabled.
//
// Heapdumps will be written to path <HeapdumpDir>/<RFC3339Nano formatted timestamp>.heap.
HeapdumpDir string
// Heap profiles will be written to path <HeapProfileDir>/<RFC3339Nano formatted timestamp>.heap.
HeapProfileDir string
)

var (
Expand Down Expand Up @@ -118,8 +119,8 @@ var _watchdog struct {

scope UtilizationType

hdleft uint // tracks the amount of heapdumps left.
hdcurr bool // tracks whether a heapdump has already been taken for this episode.
hpleft uint // tracks the amount of heap profiles left.
hpcurr bool // tracks whether a heap profile has already been taken for this episode.

closing chan struct{}
wg sync.WaitGroup
Expand Down Expand Up @@ -149,13 +150,12 @@ type Policy interface {
Evaluate(scope UtilizationType, used uint64) (next uint64)
}

// HeapDriven starts a singleton heap-driven watchdog.
// HeapDriven starts a singleton heap-driven watchdog, which adjusts GOGC
// dynamically after every GC, to honour the policy requirements.
//
// The heap-driven watchdog adjusts GOGC dynamically after every GC, to honour
// the policy requirements.
//
// A zero-valued limit will error.
func HeapDriven(limit uint64, policyCtor PolicyCtor) (err error, stopFn func()) {
// Providing a zero-valued limit will error. A minimum GOGC value is required,
// so as to avoid overscheduling GC, and overfitting to a specific target.
func HeapDriven(limit uint64, minGOGC int, policyCtor PolicyCtor) (err error, stopFn func()) {
if limit == 0 {
return fmt.Errorf("cannot use zero limit for heap-driven watchdog"), nil
}
Expand Down Expand Up @@ -196,10 +196,12 @@ func HeapDriven(limit uint64, policyCtor PolicyCtor) (err error, stopFn func())
// recompute the next trigger.
memstatsFn(&memstats)

maybeCaptureHeapdump(memstats.HeapAlloc, limit)
maybeCaptureHeapProfile(memstats.HeapAlloc, limit)

// heapMarked is the amount of heap that was marked as live by GC.
// it is inferred from our current GOGC and the new target picked.
//
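// this accurately represents the marked heap only while currGOGC equals the
// GOGC value that was in effect when the runtime computed NextGC.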
heapMarked := uint64(float64(memstats.NextGC) / (1 + float64(currGOGC)/100))
if heapMarked == 0 {
// this shouldn't happen, but just in case; avoiding a div by 0.
Expand All @@ -218,16 +220,16 @@ func HeapDriven(limit uint64, policyCtor PolicyCtor) (err error, stopFn func())
Logger.Debugf("heap watchdog: requested GOGC percent higher than default; capping at default; requested: %d; default: %d", currGOGC, originalGOGC)
currGOGC = originalGOGC
} else {
if currGOGC < 1 {
currGOGC = 1
if currGOGC < minGOGC {
currGOGC = minGOGC // clamp GOGC to the configured minimum to avoid overscheduling GC.
}
Logger.Infof("heap watchdog: setting GOGC percent: %d", currGOGC)
Logger.Debugf("heap watchdog: setting GOGC percent: %d", currGOGC)
}

debug.SetGCPercent(currGOGC)

memstatsFn(&memstats)
Logger.Infof("heap watchdog stats: heap_alloc: %d, heap_marked: %d, next_gc: %d, policy_next_gc: %d, gogc: %d",
Logger.Infof("gc finished; heap watchdog stats: heap_alloc: %d, heap_marked: %d, next_gc: %d, policy_next_gc: %d, gogc: %d",
memstats.HeapAlloc, heapMarked, memstats.NextGC, next, currGOGC)
}
}()
Expand Down Expand Up @@ -324,8 +326,8 @@ func pollingWatchdog(policy Policy, frequency time.Duration, limit uint64, usage
continue
}

// evaluate if a heapdump needs to be captured.
maybeCaptureHeapdump(usage, limit)
// evaluate if a heap profile needs to be captured.
maybeCaptureHeapProfile(usage, limit)

if usage < threshold {
// nothing to do.
Expand Down Expand Up @@ -411,7 +413,7 @@ func start(scope UtilizationType) error {
_watchdog.scope = scope
_watchdog.closing = make(chan struct{})

initHeapdumpCapture()
initHeapProfileCapture()

return nil
}
Expand All @@ -429,58 +431,64 @@ func stop() {
_watchdog.state = stateUnstarted
}

func initHeapdumpCapture() {
if HeapdumpDir == "" || HeapdumpThreshold <= 0 {
Logger.Debugf("heapdump capture disabled")
func initHeapProfileCapture() {
if HeapProfileDir == "" || HeapProfileThreshold <= 0 {
Logger.Debugf("heap profile capture disabled")
return
}
if HeapdumpThreshold >= 1 {
Logger.Warnf("failed to initialize heapdump capture: threshold must be 0 < t < 1")
if HeapProfileThreshold >= 1 {
Logger.Warnf("failed to initialize heap profile capture: threshold must be 0 < t < 1")
return
}
if fi, err := os.Stat(HeapdumpDir); os.IsNotExist(err) {
if err := os.MkdirAll(HeapdumpDir, 0777); err != nil {
Logger.Warnf("failed to initialize heapdump capture: failed to create dir: %s; err: %s", HeapdumpDir, err)
if fi, err := os.Stat(HeapProfileDir); os.IsNotExist(err) {
if err := os.MkdirAll(HeapProfileDir, 0777); err != nil {
Logger.Warnf("failed to initialize heap profile capture: failed to create dir: %s; err: %s", HeapProfileDir, err)
return
}
} else if err != nil {
Logger.Warnf("failed to initialize heapdump capture: failed to stat path: %s; err: %s", HeapdumpDir, err)
Logger.Warnf("failed to initialize heap profile capture: failed to stat path: %s; err: %s", HeapProfileDir, err)
return
} else if !fi.IsDir() {
Logger.Warnf("failed to initialize heapdump capture: path exists but is not a directory: %s", HeapdumpDir)
Logger.Warnf("failed to initialize heap profile capture: path exists but is not a directory: %s", HeapProfileDir)
return
}
// all good, set the amount of heapdump captures left.
_watchdog.hdleft = HeapdumpMaxCaptures
Logger.Infof("initialized heap dump capture; threshold: %f; max captures: %d; dir: %s", HeapdumpThreshold, HeapdumpMaxCaptures, HeapdumpDir)
// all good, set the amount of heap profile captures left.
_watchdog.hpleft = HeapProfileMaxCaptures
Logger.Infof("initialized heap profile capture; threshold: %f; max captures: %d; dir: %s", HeapProfileThreshold, HeapProfileMaxCaptures, HeapProfileDir)
}

func maybeCaptureHeapdump(usage, limit uint64) {
if _watchdog.hdleft <= 0 {
func maybeCaptureHeapProfile(usage, limit uint64) {
if _watchdog.hpleft <= 0 {
// nothing to do; no captures remaining (or captures disabled), or
// already captured a heapdump for this episode.
// already captured a heap profile for this episode.
return
}
if float64(usage)/float64(limit) < HeapdumpThreshold {
// we are below the threshold, reset the hdcurr flag.
_watchdog.hdcurr = false
if float64(usage)/float64(limit) < HeapProfileThreshold {
// we are below the threshold, reset the hpcurr flag.
_watchdog.hpcurr = false
return
}
// we are above the threshold.
if _watchdog.hdcurr {
if _watchdog.hpcurr {
return // we've already captured this episode, skip.
}
path := filepath.Join(HeapdumpDir, time.Now().Format(time.RFC3339Nano)+".heap")

path := filepath.Join(HeapProfileDir, time.Now().Format(time.RFC3339Nano)+".heap")
file, err := os.Create(path)
if err != nil {
Logger.Warnf("failed to write heapdump; failed to create file in path %s; err: %s", path, err)
Logger.Warnf("failed to create heap profile file; path: %s; err: %s", path, err)
return
}
defer file.Close()
debug.WriteHeapDump(file.Fd())
Logger.Infof("heap dump captured; path: %s", path)
_watchdog.hdcurr = true
_watchdog.hdleft--

if err = pprof.WriteHeapProfile(file); err != nil {
Logger.Warnf("failed to write heap profile; path: %s; err: %s", path, err)
return
}

Logger.Infof("heap profile captured; path: %s", path)
_watchdog.hpcurr = true
_watchdog.hpleft--
}

func wdrecover() {
Expand Down
8 changes: 4 additions & 4 deletions watchdog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func TestHeapDriven_Isolated(t *testing.T) {
}

// limit is 64MiB.
err, stopFn := HeapDriven(limit64MiB, NewAdaptivePolicy(0.5))
err, stopFn := HeapDriven(limit64MiB, 0, NewAdaptivePolicy(0.5))
require.NoError(t, err)
defer stopFn()

Expand Down Expand Up @@ -200,9 +200,9 @@ func TestHeapdumpCapture(t *testing.T) {
require.Len(t, glob, expected)
}

HeapdumpDir = dir
HeapdumpThreshold = 0.5
HeapdumpMaxCaptures = 5
HeapProfileDir = dir
HeapProfileThreshold = 0.5
HeapProfileMaxCaptures = 5

// mock clock.
clk := clock.NewMock()
Expand Down

0 comments on commit 267fadc

Please sign in to comment.