From 088ab97205b9dcc48392491222741a4858e3b932 Mon Sep 17 00:00:00 2001 From: Wenqi Mou Date: Fri, 31 Jan 2025 11:26:07 -0500 Subject: [PATCH 1/4] br: mem dump Signed-off-by: Wenqi Mou --- br/cmd/br/cmd.go | 85 +++++++++++--- br/pkg/utils/memory_monitor.go | 175 +++++++++++++++++++++++++++ br/pkg/utils/memory_monitor_test.go | 176 ++++++++++++++++++++++++++++ 3 files changed, 417 insertions(+), 19 deletions(-) create mode 100644 br/pkg/utils/memory_monitor.go create mode 100644 br/pkg/utils/memory_monitor_test.go diff --git a/br/cmd/br/cmd.go b/br/cmd/br/cmd.go index 695d9975717a9..192a0f88c2f60 100644 --- a/br/cmd/br/cmd.go +++ b/br/cmd/br/cmd.go @@ -79,6 +79,17 @@ const ( flagVersion = "version" flagVersionShort = "V" + + // Memory management related constants + quarterGiB uint64 = 256 * size.MB + halfGiB uint64 = 512 * size.MB + fourGiB uint64 = 4 * size.GB + + // Environment variables + envBRHeapDumpDir = "BR_HEAP_DUMP_DIR" + + // Default paths + defaultHeapDumpDir = "/tmp/br_heap_dumps" ) func timestampLogFileName() string { @@ -113,10 +124,6 @@ func DefineCommonFlags(cmd *cobra.Command) { _ = cmd.PersistentFlags().MarkHidden(FlagRedactLog) } -const quarterGiB uint64 = 256 * size.MB -const halfGiB uint64 = 512 * size.MB -const fourGiB uint64 = 4 * size.GB - func calculateMemoryLimit(memleft uint64) uint64 { // memreserved = f(memleft) = 512MB * memleft / (memleft + 4GB) // * f(0) = 0 @@ -132,6 +139,57 @@ func calculateMemoryLimit(memleft uint64) uint64 { return memlimit } +// setupMemoryMonitoring configures memory limits and starts the memory monitor. +// It returns an error if the setup fails. +func setupMemoryMonitoring(ctx context.Context, memTotal, memUsed uint64) error { + if memUsed >= memTotal { + log.Warn("failed to obtain memory size, skip setting memory limit", + zap.Uint64("memused", memUsed), zap.Uint64("memtotal", memTotal)) + return nil + } + + memleft := memTotal - memUsed + memlimit := calculateMemoryLimit(memleft) + // BR command needs 256 MiB at least, if the left memory is less than 256 MiB, + // the memory limit cannot limit anyway and then finally OOM. + memlimit = max(memlimit, quarterGiB) + + log.Info("calculate the rest memory", + zap.Uint64("memtotal", memTotal), + zap.Uint64("memused", memUsed), + zap.Uint64("memlimit", memlimit)) + + // No need to set memory limit because the left memory is sufficient. + if memlimit >= uint64(math.MaxInt64) { + return nil + } + + debug.SetMemoryLimit(int64(memlimit)) + + // Configure and start memory monitoring + dumpDir := os.Getenv(envBRHeapDumpDir) + if dumpDir == "" { + dumpDir = defaultHeapDumpDir + } + + monitorCfg := utils.MemoryMonitorConfig{ + DumpDir: dumpDir, + MemoryLimit: memlimit, + MinDumpInterval: 1 * time.Minute, + } + + if err := utils.StartMemoryMonitor(ctx, monitorCfg); err != nil { + log.Warn("Failed to start memory monitor", zap.Error(err)) + return err + } + + log.Info("Memory monitor started", + zap.String("dump_dir", dumpDir), + zap.Uint64("memory_limit_mb", memlimit/1024/1024)) + + return nil +} + // Init initializes BR cli. func Init(cmd *cobra.Command) (err error) { initOnce.Do(func() { @@ -198,21 +256,10 @@ func Init(cmd *cobra.Command) (err error) { err = e return } - if memused >= memtotal { - log.Warn("failed to obtain memory size, skip setting memory limit", - zap.Uint64("memused", memused), zap.Uint64("memtotal", memtotal)) - } else { - memleft := memtotal - memused - memlimit := calculateMemoryLimit(memleft) - // BR command needs 256 MiB at least, if the left memory is less than 256 MiB, - // the memory limit cannot limit anyway and then finally OOM. - memlimit = max(memlimit, quarterGiB) - log.Info("calculate the rest memory", - zap.Uint64("memtotal", memtotal), zap.Uint64("memused", memused), zap.Uint64("memlimit", memlimit)) - // No need to set memory limit because the left memory is sufficient. - if memlimit < uint64(math.MaxInt64) { - debug.SetMemoryLimit(int64(memlimit)) - } + + if e := setupMemoryMonitoring(cmd.Context(), memtotal, memused); e != nil { + // only log the error, don't fail initialization + log.Error("Failed to setup memory monitoring", zap.Error(e)) } } diff --git a/br/pkg/utils/memory_monitor.go b/br/pkg/utils/memory_monitor.go new file mode 100644 index 0000000000000..3bc69eafb311c --- /dev/null +++ b/br/pkg/utils/memory_monitor.go @@ -0,0 +1,175 @@ +// Copyright 2024 PingCAP, Inc. Licensed under Apache-2.0. + +package utils + +import ( + "context" + "fmt" + "os" + "path/filepath" + "runtime" + "runtime/pprof" + "sort" + "time" + + "github.com/docker/go-units" + "github.com/pingcap/log" + "go.uber.org/zap" +) + +const ( + MemoryWarningThreshold = 0.8 + MemoryDumpThreshold = 0.9 + + MonitorInterval = 1 * time.Second + DefaultDumpInterval = 5 * time.Minute + + DumpDirPerm = 0755 + + HeapDumpFilePattern = "br_heap_%s_%d.pprof" + TimeFormat = "20060102_150405" + + DefaultHeapDumpDir = "/tmp/heapdump" + + MaxHeapDumps = 3 +) + +// MemoryMonitorConfig holds configuration for memory monitoring +type MemoryMonitorConfig struct { + // DumpDir is where heap dumps will be written + DumpDir string + // MemoryLimit is the memory limit in bytes (if known) + MemoryLimit uint64 + // MinDumpInterval is the minimum time between heap dumps + MinDumpInterval time.Duration +} + +// cleanupOldHeapDumps removes old heap dumps if there are more than MaxHeapDumps +func cleanupOldHeapDumps(dumpDir string) { + files, err := filepath.Glob(filepath.Join(dumpDir, "br_heap_*.pprof")) + if err != nil { + log.Warn("Failed to list heap dumps for cleanup", zap.Error(err)) + return + } + + if len(files) <= MaxHeapDumps { + return + } + + // sort by filename which contains timestamp + sort.Strings(files) + + // remove older files (keeping the last MaxHeapDumps files) + for i := 0; i < len(files)-MaxHeapDumps; i++ { + if err := os.Remove(files[i]); err != nil { + log.Warn("Failed to remove old heap dump", + zap.String("file", files[i]), + zap.Error(err)) + continue + } + log.Info("Removed old heap dump", zap.String("file", files[i])) + } +} + +// StartMemoryMonitor starts monitoring memory usage and dumps heap when thresholds are exceeded +func StartMemoryMonitor(ctx context.Context, cfg MemoryMonitorConfig) error { + if cfg.DumpDir == "" { + cfg.DumpDir = DefaultHeapDumpDir + if err := os.MkdirAll(cfg.DumpDir, DumpDirPerm); err != nil { + log.Warn("Failed to create default heap dump directory, falling back to temp dir", + zap.String("default_dir", cfg.DumpDir), + zap.Error(err)) + cfg.DumpDir = os.TempDir() + } + } + if cfg.MinDumpInterval == 0 { + cfg.MinDumpInterval = DefaultDumpInterval + } + + // Ensure dump directory exists + if err := os.MkdirAll(cfg.DumpDir, DumpDirPerm); err != nil { + return fmt.Errorf("failed to create dump directory: %v", err) + } + + // Clean up any existing old heap dumps on startup + cleanupOldHeapDumps(cfg.DumpDir) + + log.Info("Memory monitor will store heap dumps in", + zap.String("dump_dir", cfg.DumpDir), + zap.Bool("using_temp_dir", cfg.DumpDir == os.TempDir()), + zap.Int("max_dumps", MaxHeapDumps)) + + var lastDump time.Time + var memStats runtime.MemStats + + go func() { + ticker := time.NewTicker(MonitorInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + runtime.ReadMemStats(&memStats) + + var memoryUsage float64 + if cfg.MemoryLimit > 0 { + memoryUsage = float64(memStats.Alloc) / float64(cfg.MemoryLimit) + } else { + // if no limit specified, use system memory as reference + memoryUsage = float64(memStats.Alloc) / float64(memStats.Sys) + } + + if memoryUsage >= MemoryWarningThreshold { + log.Warn("High memory usage detected", + zap.Float64("usage_percentage", memoryUsage*100), + zap.Uint64("alloc_bytes", memStats.Alloc), + zap.Uint64("sys_bytes", memStats.Sys)) + } + + // dump heap at dump threshold if enough time has passed since last dump + if memoryUsage >= MemoryDumpThreshold && time.Since(lastDump) >= cfg.MinDumpInterval { + dumpPath := filepath.Join(cfg.DumpDir, + fmt.Sprintf(HeapDumpFilePattern, + time.Now().Format(TimeFormat), + memStats.Alloc/units.MiB)) + + if err := dumpHeap(dumpPath); err != nil { + log.Error("Failed to dump heap", + zap.Error(err), + zap.String("path", dumpPath)) + continue + } + + log.Info("Heap dump created", + zap.String("path", dumpPath), + zap.Float64("usage_percentage", memoryUsage*100), + zap.Uint64("alloc_mb", memStats.Alloc/units.MiB)) + + lastDump = time.Now() + + // clean up old dumps after creating a new one + cleanupOldHeapDumps(cfg.DumpDir) + } + } + } + }() + + return nil +} + +// dumpHeap creates a heap profile at the specified path +func dumpHeap(path string) error { + f, err := os.Create(path) + if err != nil { + return fmt.Errorf("failed to create heap dump file: %v", err) + } + defer f.Close() + + if err := pprof.WriteHeapProfile(f); err != nil { + return fmt.Errorf("failed to write heap profile: %v", err) + } + + return nil +} diff --git a/br/pkg/utils/memory_monitor_test.go b/br/pkg/utils/memory_monitor_test.go new file mode 100644 index 0000000000000..f36225ae5b516 --- /dev/null +++ b/br/pkg/utils/memory_monitor_test.go @@ -0,0 +1,176 @@ +// Copyright 2024 PingCAP, Inc. Licensed under Apache-2.0. + +package utils + +import ( + "context" + "fmt" + "os" + "path/filepath" + "sort" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +const ( + testTimeGap = time.Minute + testMemoryLimit = 100 * 1024 * 1024 + smallMemoryLimit = 1024 + testWaitTime = 100 * time.Millisecond + testDumpContent = "test heap dump %d" + testFilePerm = 0644 + readOnlyDirPerm = 0555 +) + +func createTestHeapDumps(t *testing.T, dir string, count int, timeGap time.Duration) []string { + var files []string + baseTime := time.Now().Add(-timeGap * time.Duration(count)) + + for i := 0; i < count; i++ { + fileName := fmt.Sprintf("br_heap_%s_%d.pprof", + baseTime.Add(timeGap*time.Duration(i)).Format(TimeFormat), + 100+i) + path := filepath.Join(dir, fileName) + + err := os.WriteFile(path, []byte(fmt.Sprintf(testDumpContent, i)), testFilePerm) + require.NoError(t, err) + files = append(files, path) + } + return files +} + +func TestCleanupOldHeapDumps(t *testing.T) { + t.Run("cleanup works correctly", func(t *testing.T) { + dir := t.TempDir() + files := createTestHeapDumps(t, dir, MaxHeapDumps+2, testTimeGap) + sort.Strings(files) + + // verify all test files were created + for _, f := range files { + _, err := os.Stat(f) + require.NoError(t, err) + } + + cleanupOldHeapDumps(dir) + + // verify newest files exist + for i := len(files) - MaxHeapDumps; i < len(files); i++ { + _, err := os.Stat(files[i]) + require.NoError(t, err) + } + + // verify oldest files are gone + for i := 0; i < len(files)-MaxHeapDumps; i++ { + _, err := os.Stat(files[i]) + require.True(t, os.IsNotExist(err)) + } + }) + + t.Run("no cleanup needed when files <= MaxHeapDumps", func(t *testing.T) { + dir := t.TempDir() + files := createTestHeapDumps(t, dir, MaxHeapDumps, testTimeGap) + cleanupOldHeapDumps(dir) + + for _, f := range files { + _, err := os.Stat(f) + require.NoError(t, err) + } + }) + + t.Run("handles non-existent directory", func(t *testing.T) { + dir := t.TempDir() + nonExistentDir := filepath.Join(dir, "non-existent") + cleanupOldHeapDumps(nonExistentDir) + }) + + t.Run("handles empty directory", func(t *testing.T) { + dir := t.TempDir() + emptyDir := filepath.Join(dir, "empty") + err := os.MkdirAll(emptyDir, DumpDirPerm) + require.NoError(t, err) + cleanupOldHeapDumps(emptyDir) + }) +} + +func TestStartMemoryMonitor(t *testing.T) { + dir := t.TempDir() + + t.Run("creates dump directory if not exists", func(t *testing.T) { + dumpDir := filepath.Join(dir, "new-dir") + cfg := MemoryMonitorConfig{ + DumpDir: dumpDir, + MemoryLimit: testMemoryLimit, + } + + err := StartMemoryMonitor(context.Background(), cfg) + require.NoError(t, err) + + stat, err := os.Stat(dumpDir) + require.NoError(t, err) + require.True(t, stat.IsDir()) + }) + + t.Run("uses temp dir as fallback", func(t *testing.T) { + readOnlyDir := filepath.Join(dir, "readonly") + err := os.MkdirAll(readOnlyDir, readOnlyDirPerm) + require.NoError(t, err) + + cfg := MemoryMonitorConfig{ + DumpDir: readOnlyDir, + MemoryLimit: testMemoryLimit, + } + + err = StartMemoryMonitor(context.Background(), cfg) + require.NoError(t, err) + }) + + t.Run("respects context cancellation", func(t *testing.T) { + cfg := MemoryMonitorConfig{ + DumpDir: dir, + MemoryLimit: smallMemoryLimit, + } + + ctx, cancel := context.WithCancel(context.Background()) + err := StartMemoryMonitor(ctx, cfg) + require.NoError(t, err) + + time.Sleep(testWaitTime) + cancel() + + time.Sleep(2 * MonitorInterval) + files, err := filepath.Glob(filepath.Join(dir, "br_heap_*.pprof")) + require.NoError(t, err) + fileCount := len(files) + + time.Sleep(2 * MonitorInterval) + files, err = filepath.Glob(filepath.Join(dir, "br_heap_*.pprof")) + require.NoError(t, err) + require.Equal(t, fileCount, len(files)) + }) + + t.Run("enforces minimum dump interval", func(t *testing.T) { + cfg := MemoryMonitorConfig{ + DumpDir: dir, + MemoryLimit: smallMemoryLimit, + MinDumpInterval: time.Second, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + err := StartMemoryMonitor(ctx, cfg) + require.NoError(t, err) + + time.Sleep(testWaitTime) + files, err := filepath.Glob(filepath.Join(dir, "br_heap_*.pprof")) + require.NoError(t, err) + initialCount := len(files) + + time.Sleep(500 * time.Millisecond) + files, err = filepath.Glob(filepath.Join(dir, "br_heap_*.pprof")) + require.NoError(t, err) + require.Equal(t, initialCount, len(files)) + }) +} From e259425b31f087f7d9091eb5bcc74fa4b6be83db Mon Sep 17 00:00:00 2001 From: Wenqi Mou Date: Fri, 31 Jan 2025 11:38:48 -0500 Subject: [PATCH 2/4] minor fix Signed-off-by: Wenqi Mou --- br/pkg/utils/BUILD.bazel | 4 +++- br/pkg/utils/memory_monitor_test.go | 14 +++++++++++--- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/br/pkg/utils/BUILD.bazel b/br/pkg/utils/BUILD.bazel index 85d5d3c36c7a1..71ed4b9d1b2ec 100644 --- a/br/pkg/utils/BUILD.bazel +++ b/br/pkg/utils/BUILD.bazel @@ -11,6 +11,7 @@ go_library( "error_handling.go", "json.go", "key.go", + "memory_monitor.go", "misc.go", "pointer.go", "pprof.go", @@ -78,6 +79,7 @@ go_test( "json_test.go", "key_test.go", "main_test.go", + "memory_monitor_test.go", "misc_test.go", "progress_test.go", "register_test.go", @@ -86,7 +88,7 @@ go_test( ], embed = [":utils"], flaky = True, - shard_count = 34, + shard_count = 36, deps = [ "//br/pkg/errors", "//pkg/kv", diff --git a/br/pkg/utils/memory_monitor_test.go b/br/pkg/utils/memory_monitor_test.go index f36225ae5b516..b54ddbc88ae21 100644 --- a/br/pkg/utils/memory_monitor_test.go +++ b/br/pkg/utils/memory_monitor_test.go @@ -98,13 +98,16 @@ func TestStartMemoryMonitor(t *testing.T) { dir := t.TempDir() t.Run("creates dump directory if not exists", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + dumpDir := filepath.Join(dir, "new-dir") cfg := MemoryMonitorConfig{ DumpDir: dumpDir, MemoryLimit: testMemoryLimit, } - err := StartMemoryMonitor(context.Background(), cfg) + err := StartMemoryMonitor(ctx, cfg) require.NoError(t, err) stat, err := os.Stat(dumpDir) @@ -113,6 +116,9 @@ func TestStartMemoryMonitor(t *testing.T) { }) t.Run("uses temp dir as fallback", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + readOnlyDir := filepath.Join(dir, "readonly") err := os.MkdirAll(readOnlyDir, readOnlyDirPerm) require.NoError(t, err) @@ -122,7 +128,7 @@ func TestStartMemoryMonitor(t *testing.T) { MemoryLimit: testMemoryLimit, } - err = StartMemoryMonitor(context.Background(), cfg) + err = StartMemoryMonitor(ctx, cfg) require.NoError(t, err) }) @@ -133,6 +139,8 @@ func TestStartMemoryMonitor(t *testing.T) { } ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + err := StartMemoryMonitor(ctx, cfg) require.NoError(t, err) @@ -158,7 +166,7 @@ func TestStartMemoryMonitor(t *testing.T) { } ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + t.Cleanup(cancel) err := StartMemoryMonitor(ctx, cfg) require.NoError(t, err) From adbd4809c6aad37b3bc9e8485cd197ec8046b970 Mon Sep 17 00:00:00 2001 From: Wenqi Mou Date: Fri, 31 Jan 2025 12:18:28 -0500 Subject: [PATCH 3/4] fix leak Signed-off-by: Wenqi Mou --- br/cmd/br/cmd.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/br/cmd/br/cmd.go b/br/cmd/br/cmd.go index 192a0f88c2f60..354796b5ce2c8 100644 --- a/br/cmd/br/cmd.go +++ b/br/cmd/br/cmd.go @@ -257,7 +257,7 @@ func Init(cmd *cobra.Command) (err error) { return } - if e := setupMemoryMonitoring(cmd.Context(), memtotal, memused); e != nil { + if e := setupMemoryMonitoring(GetDefaultContext(), memtotal, memused); e != nil { // only log the error, don't fail initialization log.Error("Failed to setup memory monitoring", zap.Error(e)) } From e66a0c00dd3f08e9f782e24b21c1501a09b74823 Mon Sep 17 00:00:00 2001 From: Wenqi Mou Date: Fri, 31 Jan 2025 13:32:37 -0500 Subject: [PATCH 4/4] should cancel anyway Signed-off-by: Wenqi Mou --- br/cmd/br/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/br/cmd/br/main.go b/br/cmd/br/main.go index cad081606a0ea..b0f1931d86ea2 100644 --- a/br/cmd/br/main.go +++ b/br/cmd/br/main.go @@ -13,6 +13,7 @@ import ( func main() { gCtx := context.Background() ctx, cancel := utils.StartExitSingleListener(gCtx) + defer cancel() rootCmd := &cobra.Command{ Use: "br", @@ -34,7 +35,6 @@ func main() { rootCmd.SetArgs(os.Args[1:]) if err := rootCmd.Execute(); err != nil { - cancel() log.Error("br failed", zap.Error(err)) os.Exit(1) // nolint:gocritic }