Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: mem dump when about to OOM #59234

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 66 additions & 19 deletions br/cmd/br/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,17 @@ const (

flagVersion = "version"
flagVersionShort = "V"

// Memory management related constants
quarterGiB uint64 = 256 * size.MB
halfGiB uint64 = 512 * size.MB
fourGiB uint64 = 4 * size.GB

// Environment variables
envBRHeapDumpDir = "BR_HEAP_DUMP_DIR"

// Default paths
defaultHeapDumpDir = "/tmp/br_heap_dumps"
)

func timestampLogFileName() string {
Expand Down Expand Up @@ -113,10 +124,6 @@ func DefineCommonFlags(cmd *cobra.Command) {
_ = cmd.PersistentFlags().MarkHidden(FlagRedactLog)
}

const quarterGiB uint64 = 256 * size.MB
const halfGiB uint64 = 512 * size.MB
const fourGiB uint64 = 4 * size.GB

func calculateMemoryLimit(memleft uint64) uint64 {
// memreserved = f(memleft) = 512MB * memleft / (memleft + 4GB)
// * f(0) = 0
Expand All @@ -132,6 +139,57 @@ func calculateMemoryLimit(memleft uint64) uint64 {
return memlimit
}

// setupMemoryMonitoring derives a soft memory limit from the memory that is
// still free, applies it to the Go runtime and starts the heap-dump monitor
// for that limit.
//
// It returns nil without doing anything when memUsed >= memTotal (the sizes
// are treated as unreliable), and nil without setting a limit or starting the
// monitor when the computed limit does not fit in an int64. The only error it
// returns is the one reported by utils.StartMemoryMonitor; that error is not
// logged here so the caller reports the failure exactly once.
func setupMemoryMonitoring(ctx context.Context, memTotal, memUsed uint64) error {
	if memUsed >= memTotal {
		log.Warn("failed to obtain memory size, skip setting memory limit",
			zap.Uint64("memused", memUsed), zap.Uint64("memtotal", memTotal))
		return nil
	}

	memleft := memTotal - memUsed
	// BR command needs 256 MiB at least, if the left memory is less than 256 MiB,
	// the memory limit cannot limit anyway and then finally OOM.
	memlimit := max(calculateMemoryLimit(memleft), quarterGiB)

	log.Info("calculate the rest memory",
		zap.Uint64("memtotal", memTotal),
		zap.Uint64("memused", memUsed),
		zap.Uint64("memlimit", memlimit))

	// No need to set memory limit because the left memory is sufficient.
	if memlimit >= uint64(math.MaxInt64) {
		return nil
	}

	debug.SetMemoryLimit(int64(memlimit))

	// The dump directory can be overridden through the environment; an unset
	// or empty variable selects the built-in default.
	dumpDir := os.Getenv(envBRHeapDumpDir)
	if dumpDir == "" {
		dumpDir = defaultHeapDumpDir
	}

	monitorCfg := utils.MemoryMonitorConfig{
		DumpDir:         dumpDir,
		MemoryLimit:     memlimit,
		MinDumpInterval: 1 * time.Minute,
	}

	// Return the error as-is instead of logging it as well: the caller already
	// logs it, and handling an error at two layers produces duplicate entries.
	if err := utils.StartMemoryMonitor(ctx, monitorCfg); err != nil {
		return err
	}

	log.Info("Memory monitor started",
		zap.String("dump_dir", dumpDir),
		zap.Uint64("memory_limit_mb", memlimit/1024/1024))

	return nil
}

// Init initializes BR cli.
func Init(cmd *cobra.Command) (err error) {
initOnce.Do(func() {
Expand Down Expand Up @@ -198,21 +256,10 @@ func Init(cmd *cobra.Command) (err error) {
err = e
return
}
if memused >= memtotal {
log.Warn("failed to obtain memory size, skip setting memory limit",
zap.Uint64("memused", memused), zap.Uint64("memtotal", memtotal))
} else {
memleft := memtotal - memused
memlimit := calculateMemoryLimit(memleft)
// BR command needs 256 MiB at least, if the left memory is less than 256 MiB,
// the memory limit cannot limit anyway and then finally OOM.
memlimit = max(memlimit, quarterGiB)
log.Info("calculate the rest memory",
zap.Uint64("memtotal", memtotal), zap.Uint64("memused", memused), zap.Uint64("memlimit", memlimit))
// No need to set memory limit because the left memory is sufficient.
if memlimit < uint64(math.MaxInt64) {
debug.SetMemoryLimit(int64(memlimit))
}

if e := setupMemoryMonitoring(GetDefaultContext(), memtotal, memused); e != nil {
// only log the error, don't fail initialization
log.Error("Failed to setup memory monitoring", zap.Error(e))
}
}

Expand Down
2 changes: 1 addition & 1 deletion br/cmd/br/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
func main() {
gCtx := context.Background()
ctx, cancel := utils.StartExitSingleListener(gCtx)
defer cancel()

rootCmd := &cobra.Command{
Use: "br",
Expand All @@ -34,7 +35,6 @@ func main() {

rootCmd.SetArgs(os.Args[1:])
if err := rootCmd.Execute(); err != nil {
cancel()
log.Error("br failed", zap.Error(err))
os.Exit(1) // nolint:gocritic
}
Expand Down
4 changes: 3 additions & 1 deletion br/pkg/utils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_library(
"error_handling.go",
"json.go",
"key.go",
"memory_monitor.go",
"misc.go",
"pointer.go",
"pprof.go",
Expand Down Expand Up @@ -78,6 +79,7 @@ go_test(
"json_test.go",
"key_test.go",
"main_test.go",
"memory_monitor_test.go",
"misc_test.go",
"progress_test.go",
"register_test.go",
Expand All @@ -86,7 +88,7 @@ go_test(
],
embed = [":utils"],
flaky = True,
shard_count = 34,
shard_count = 36,
deps = [
"//br/pkg/errors",
"//pkg/kv",
Expand Down
175 changes: 175 additions & 0 deletions br/pkg/utils/memory_monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
// Copyright 2024 PingCAP, Inc. Licensed under Apache-2.0.

package utils

import (
"context"
"fmt"
"os"
"path/filepath"
"runtime"
"runtime/pprof"
"sort"
"time"

"github.com/docker/go-units"
"github.com/pingcap/log"
"go.uber.org/zap"
)

// Tunables for the memory monitor started by StartMemoryMonitor.
const (
// MemoryWarningThreshold is the usage ratio (0..1) at or above which a
// "High memory usage detected" warning is logged.
MemoryWarningThreshold = 0.8
// MemoryDumpThreshold is the usage ratio (0..1) at or above which a heap
// dump is attempted, subject to the minimum dump interval.
MemoryDumpThreshold = 0.9

// MonitorInterval is the period of the ticker that samples memory statistics.
MonitorInterval = 1 * time.Second
// DefaultDumpInterval is the minimum time between heap dumps used when
// MemoryMonitorConfig.MinDumpInterval is zero.
DefaultDumpInterval = 5 * time.Minute

// DumpDirPerm is the permission mode passed to os.MkdirAll when creating
// the dump directory.
DumpDirPerm = 0755

// HeapDumpFilePattern is the fmt pattern for dump file names; it is filled
// with a TimeFormat timestamp and the allocated size in MiB.
HeapDumpFilePattern = "br_heap_%s_%d.pprof"
// TimeFormat is the time layout used for the timestamp in dump file names.
TimeFormat = "20060102_150405"

// DefaultHeapDumpDir is the directory used when MemoryMonitorConfig.DumpDir
// is empty.
DefaultHeapDumpDir = "/tmp/heapdump"

// MaxHeapDumps is the number of heap dump files kept in the dump directory;
// cleanupOldHeapDumps removes the files that sort before the last MaxHeapDumps.
MaxHeapDumps = 3
)

// MemoryMonitorConfig holds the configuration for memory monitoring.
// StartMemoryMonitor takes it by value and substitutes defaults for zero
// fields on its own copy.
type MemoryMonitorConfig struct {
// DumpDir is the directory where heap dumps will be written. When empty,
// StartMemoryMonitor uses DefaultHeapDumpDir.
DumpDir string
// MemoryLimit is the memory limit in bytes (if known). When zero, usage is
// measured against runtime.MemStats.Sys instead.
MemoryLimit uint64
// MinDumpInterval is the minimum time between heap dumps. When zero,
// StartMemoryMonitor uses DefaultDumpInterval.
MinDumpInterval time.Duration
}

// cleanupOldHeapDumps prunes the br_heap_*.pprof files found in dumpDir so
// that at most MaxHeapDumps remain. Files are ordered by name, which starts
// with the dump timestamp, and the ones sorting first are deleted. Failures
// are logged and never returned: a file that cannot be removed is skipped.
func cleanupOldHeapDumps(dumpDir string) {
	pattern := filepath.Join(dumpDir, "br_heap_*.pprof")
	dumps, err := filepath.Glob(pattern)
	if err != nil {
		log.Warn("Failed to list heap dumps for cleanup", zap.Error(err))
		return
	}

	// Number of files beyond the retention limit; nothing to do when the
	// directory holds MaxHeapDumps files or fewer.
	surplus := len(dumps) - MaxHeapDumps
	if surplus <= 0 {
		return
	}

	// sort by filename which contains timestamp
	sort.Strings(dumps)

	// The first `surplus` entries are the ones to drop; the tail is kept.
	for _, stale := range dumps[:surplus] {
		if rmErr := os.Remove(stale); rmErr != nil {
			log.Warn("Failed to remove old heap dump",
				zap.String("file", stale),
				zap.Error(rmErr))
			continue
		}
		log.Info("Removed old heap dump", zap.String("file", stale))
	}
}

// StartMemoryMonitor starts monitoring memory usage and dumps heap when
// thresholds are exceeded.
//
// Zero fields of cfg are defaulted: an empty DumpDir becomes DefaultHeapDumpDir
// (or os.TempDir() if that directory cannot be created) and a zero
// MinDumpInterval becomes DefaultDumpInterval. The dump directory is created
// if needed and pruned of surplus dumps before monitoring begins.
//
// The function returns as soon as the monitoring goroutine has been started;
// that goroutine samples runtime.MemStats every MonitorInterval and exits when
// ctx is done. The returned error is non-nil only when the dump directory
// cannot be created, and it wraps the underlying error so callers can inspect
// it with errors.Is / errors.As.
func StartMemoryMonitor(ctx context.Context, cfg MemoryMonitorConfig) error {
	if cfg.DumpDir == "" {
		cfg.DumpDir = DefaultHeapDumpDir
		if err := os.MkdirAll(cfg.DumpDir, DumpDirPerm); err != nil {
			log.Warn("Failed to create default heap dump directory, falling back to temp dir",
				zap.String("default_dir", cfg.DumpDir),
				zap.Error(err))
			cfg.DumpDir = os.TempDir()
		}
	}
	if cfg.MinDumpInterval == 0 {
		cfg.MinDumpInterval = DefaultDumpInterval
	}

	// Ensure dump directory exists
	if err := os.MkdirAll(cfg.DumpDir, DumpDirPerm); err != nil {
		return fmt.Errorf("failed to create dump directory: %w", err)
	}

	// Clean up any existing old heap dumps on startup
	cleanupOldHeapDumps(cfg.DumpDir)

	log.Info("Memory monitor will store heap dumps in",
		zap.String("dump_dir", cfg.DumpDir),
		zap.Bool("using_temp_dir", cfg.DumpDir == os.TempDir()),
		zap.Int("max_dumps", MaxHeapDumps))

	go func() {
		// State used only by the monitoring loop. lastDump starts at the zero
		// time, so the first dump is never held back by MinDumpInterval.
		var lastDump time.Time
		var memStats runtime.MemStats

		ticker := time.NewTicker(MonitorInterval)
		defer ticker.Stop()

		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				runtime.ReadMemStats(&memStats)

				var memoryUsage float64
				if cfg.MemoryLimit > 0 {
					memoryUsage = float64(memStats.Alloc) / float64(cfg.MemoryLimit)
				} else {
					// if no limit specified, use system memory as reference
					memoryUsage = float64(memStats.Alloc) / float64(memStats.Sys)
				}

				if memoryUsage >= MemoryWarningThreshold {
					log.Warn("High memory usage detected",
						zap.Float64("usage_percentage", memoryUsage*100),
						zap.Uint64("alloc_bytes", memStats.Alloc),
						zap.Uint64("sys_bytes", memStats.Sys))
				}

				// dump heap at dump threshold if enough time has passed since last dump
				if memoryUsage >= MemoryDumpThreshold && time.Since(lastDump) >= cfg.MinDumpInterval {
					dumpPath := filepath.Join(cfg.DumpDir,
						fmt.Sprintf(HeapDumpFilePattern,
							time.Now().Format(TimeFormat),
							memStats.Alloc/units.MiB))

					if err := dumpHeap(dumpPath); err != nil {
						log.Error("Failed to dump heap",
							zap.Error(err),
							zap.String("path", dumpPath))
						// lastDump is left untouched on failure, so the dump
						// is attempted again on the next tick.
						continue
					}

					log.Info("Heap dump created",
						zap.String("path", dumpPath),
						zap.Float64("usage_percentage", memoryUsage*100),
						zap.Uint64("alloc_mb", memStats.Alloc/units.MiB))

					lastDump = time.Now()

					// clean up old dumps after creating a new one
					cleanupOldHeapDumps(cfg.DumpDir)
				}
			}
		}
	}()

	return nil
}

// dumpHeap creates a heap profile at the specified path.
//
// The file is created (or truncated) with os.Create and filled by
// pprof.WriteHeapProfile. Any failure to create, write or close the file is
// returned wrapped, so the underlying error stays reachable through
// errors.Is / errors.As. When writing or closing fails, the incomplete file
// is removed so that a truncated profile is never left behind.
func dumpHeap(path string) (err error) {
	f, err := os.Create(path)
	if err != nil {
		return fmt.Errorf("failed to create heap dump file: %w", err)
	}
	defer func() {
		// A failed Close can mean buffered data never reached the disk, so
		// report it unless an earlier error is already being returned.
		if closeErr := f.Close(); closeErr != nil && err == nil {
			err = fmt.Errorf("failed to close heap dump file: %w", closeErr)
		}
		if err != nil {
			// Best effort: the original error is what matters to the caller.
			_ = os.Remove(path)
		}
	}()

	if writeErr := pprof.WriteHeapProfile(f); writeErr != nil {
		return fmt.Errorf("failed to write heap profile: %w", writeErr)
	}

	return nil
}
Loading
Loading