From 95cb7e357d1189fdc765759f57b3ccb4537d5f85 Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Fri, 29 Jan 2021 17:22:00 +0200 Subject: [PATCH] changes after review. --- .../commitlog/source_instrumentation.go | 42 +++++++---- .../bootstrapper/fs/source_instrumentation.go | 42 +++++++---- .../peers/source_instrumentation.go | 42 +++++++---- src/dbnode/storage/profiler/profiler.go | 70 +++++++++---------- src/dbnode/storage/profiler/profiler_test.go | 6 +- src/dbnode/storage/profiler/types.go | 38 +++------- 6 files changed, 134 insertions(+), 106 deletions(-) diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_instrumentation.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_instrumentation.go index 74d5a117d9..8895706022 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_instrumentation.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_instrumentation.go @@ -43,7 +43,6 @@ type instrumentationContext struct { bootstrapSnapshotsDuration tally.Timer bootstrapCommitLogDuration tally.Timer pOpts profiler.Options - pCtx profiler.ProfileContext } func newInstrumentationContext( @@ -65,24 +64,37 @@ func newInstrumentationContext( } } +const ( + snapshotsProfileName = "commitlog-snapshots" + readProfileName = "commitlog-read" +) + func (i *instrumentationContext) finish() { i.span.Finish() } -func (i *instrumentationContext) startProfileIfEnabled(name string) { +func (i *instrumentationContext) startCPUProfileIfEnabled(name string) { if i.pOpts.Enabled() { - profileContext, err := i.pOpts.Profiler().StartProfile(name) + err := i.pOpts.Profiler().StartCPUProfile(name) if err != nil { - i.log.Error("unable to start profile", zap.Error(err)) + i.log.Error("unable to start cpu profile", zap.Error(err)) } - i.pCtx = profileContext } } -func (i *instrumentationContext) stopProfileIfEnabled() { - if i.pOpts.Enabled() && i.pCtx != nil { - if err := i.pCtx.StopProfile(); err != nil { - i.log.Error("unable to stop profile", zap.Error(err)) +func (i *instrumentationContext) stopCPUProfileIfEnabled() { + if i.pOpts.Enabled() { + if err := i.pOpts.Profiler().StopCPUProfile(); err != nil { + i.log.Error("unable to stop cpu profile", zap.Error(err)) + } + } +} + +func (i *instrumentationContext) writeHeapProfileIfEnabled(name string) { + if i.pOpts.Enabled() { + err := i.pOpts.Profiler().WriteHeapProfile(name) + if err != nil { + i.log.Error("unable to write heap profile", zap.Error(err)) } } } @@ -91,7 +103,8 @@ func (i *instrumentationContext) bootstrapSnapshotsStarted() { i.log.Info("read snapshots start") i.span.LogFields(opentracinglog.String("event", "read_snapshots_start")) i.start = i.nowFn() - i.startProfileIfEnabled("commitlog-snapshots") + i.startCPUProfileIfEnabled(snapshotsProfileName) + i.writeHeapProfileIfEnabled(snapshotsProfileName) } func (i *instrumentationContext) bootstrapSnapshotsCompleted() { @@ -99,14 +112,16 @@ func (i *instrumentationContext) bootstrapSnapshotsCompleted() { i.bootstrapSnapshotsDuration.Record(duration) i.log.Info("read snapshots done", zap.Duration("took", duration)) i.span.LogFields(opentracinglog.String("event", "read_snapshots_done")) - i.stopProfileIfEnabled() + i.stopCPUProfileIfEnabled() + i.writeHeapProfileIfEnabled(snapshotsProfileName) } func (i *instrumentationContext) readCommitLogStarted() { i.log.Info("read commit log start") i.span.LogFields(opentracinglog.String("event", "read_commitlog_start")) i.start = i.nowFn() - i.startProfileIfEnabled("commitlog-read") + i.startCPUProfileIfEnabled(readProfileName) + i.writeHeapProfileIfEnabled(readProfileName) } func (i *instrumentationContext) readCommitLogCompleted() { @@ -114,7 +129,8 @@ func (i *instrumentationContext) readCommitLogCompleted() { i.bootstrapCommitLogDuration.Record(duration) i.log.Info("read commit log done", zap.Duration("took", duration)) i.span.LogFields(opentracinglog.String("event", "read_commitlog_done")) - i.stopProfileIfEnabled() + i.stopCPUProfileIfEnabled() + i.writeHeapProfileIfEnabled(readProfileName) } type instrumentation struct { diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/source_instrumentation.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/source_instrumentation.go index db0cbb5d60..7016f6a6de 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/fs/source_instrumentation.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/source_instrumentation.go @@ -45,7 +45,6 @@ type instrumentationContext struct { bootstrapDataDuration tally.Timer bootstrapIndexDuration tally.Timer pOpts profiler.Options - pCtx profiler.ProfileContext logFields []zapcore.Field } @@ -71,24 +70,37 @@ func newInstrumentationContext( } } +const ( + dataProfile = "fs-data" + indexProfile = "fs-index" +) + func (i *instrumentationContext) finish() { i.span.Finish() } -func (i *instrumentationContext) startProfileIfEnabled(name string) { +func (i *instrumentationContext) startCPUProfileIfEnabled(name string) { if i.pOpts.Enabled() { - profileContext, err := i.pOpts.Profiler().StartProfile(name) + err := i.pOpts.Profiler().StartCPUProfile(name) if err != nil { - i.log.Error("unable to start profile", zap.Error(err)) + i.log.Error("unable to start cpu profile", zap.Error(err)) } - i.pCtx = profileContext } } -func (i *instrumentationContext) stopProfileIfEnabled() { - if i.pOpts.Enabled() && i.pCtx != nil { - if err := i.pCtx.StopProfile(); err != nil { - i.log.Error("unable to stop profile", zap.Error(err)) +func (i *instrumentationContext) stopCPUProfileIfEnabled() { + if i.pOpts.Enabled() { + if err := i.pOpts.Profiler().StopCPUProfile(); err != nil { + i.log.Error("unable to stop cpu profile", zap.Error(err)) + } + } +} + +func (i *instrumentationContext) writeHeapProfileIfEnabled(name string) { + if i.pOpts.Enabled() { + err := i.pOpts.Profiler().WriteHeapProfile(name) + if err != nil { + i.log.Error("unable to write heap profile", zap.Error(err)) } } } @@ -97,7 +109,8 @@ func (i *instrumentationContext) bootstrapDataStarted() { i.log.Info("bootstrapping time series data start", i.logFields...) i.span.LogFields(opentracinglog.String("event", "bootstrap_data_start")) i.start = i.nowFn() - i.startProfileIfEnabled("fs-data") + i.startCPUProfileIfEnabled(dataProfile) + i.writeHeapProfileIfEnabled(dataProfile) } func (i *instrumentationContext) bootstrapDataCompleted() { @@ -106,14 +119,16 @@ func (i *instrumentationContext) bootstrapDataCompleted() { i.log.Info("bootstrapping time series data success", append(i.logFields, zap.Duration("took", duration))...) i.span.LogFields(opentracinglog.String("event", "bootstrap_data_done")) - i.stopProfileIfEnabled() + i.stopCPUProfileIfEnabled() + i.writeHeapProfileIfEnabled(dataProfile) } func (i *instrumentationContext) bootstrapIndexStarted() { i.log.Info("bootstrapping index metadata start") i.span.LogFields(opentracinglog.String("event", "bootstrap_index_start")) i.start = i.nowFn() - i.startProfileIfEnabled("fs-index") + i.startCPUProfileIfEnabled(indexProfile) + i.writeHeapProfileIfEnabled(indexProfile) } func (i *instrumentationContext) bootstrapIndexCompleted() { @@ -121,7 +136,8 @@ func (i *instrumentationContext) bootstrapIndexCompleted() { i.bootstrapIndexDuration.Record(duration) i.log.Info("bootstrapping index metadata success", zap.Duration("took", duration)) i.span.LogFields(opentracinglog.String("event", "bootstrap_index_done")) - i.stopProfileIfEnabled() + i.stopCPUProfileIfEnabled() + i.writeHeapProfileIfEnabled(indexProfile) } type instrumentation struct { diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source_instrumentation.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source_instrumentation.go index 2d74d99875..96950a038d 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source_instrumentation.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source_instrumentation.go @@ -43,7 +43,6 @@ type instrumentationContext struct { bootstrapDataDuration tally.Timer bootstrapIndexDuration tally.Timer pOpts profiler.Options - pCtx profiler.ProfileContext } func newInstrumentationContext( @@ -63,24 +62,37 @@ func newInstrumentationContext( } } +const ( + dataProfile = "peers-data" + indexProfile = "peers-index" +) + func (i *instrumentationContext) finish() { i.span.Finish() } -func (i *instrumentationContext) startProfileIfEnabled(name string) { +func (i *instrumentationContext) startCPUProfileIfEnabled(name string) { if i.pOpts.Enabled() { - profileContext, err := i.pOpts.Profiler().StartProfile(name) + err := i.pOpts.Profiler().StartCPUProfile(name) if err != nil { - i.log.Error("unable to start profile", zap.Error(err)) + i.log.Error("unable to start cpu profile", zap.Error(err)) } - i.pCtx = profileContext } } -func (i *instrumentationContext) stopProfileIfEnabled() { - if i.pOpts.Enabled() && i.pCtx != nil { - if err := i.pCtx.StopProfile(); err != nil { - i.log.Error("unable to stop profile", zap.Error(err)) +func (i *instrumentationContext) stopCPUProfileIfEnabled() { + if i.pOpts.Enabled() { + if err := i.pOpts.Profiler().StopCPUProfile(); err != nil { + i.log.Error("unable to stop cpu profile", zap.Error(err)) + } + } +} + +func (i *instrumentationContext) writeHeapProfileIfEnabled(name string) { + if i.pOpts.Enabled() { + err := i.pOpts.Profiler().WriteHeapProfile(name) + if err != nil { + i.log.Error("unable to write heap profile", zap.Error(err)) } } } @@ -89,7 +101,8 @@ func (i *instrumentationContext) bootstrapDataStarted() { i.log.Info("bootstrapping time series data start") i.span.LogFields(opentracinglog.String("event", "bootstrap_data_start")) i.start = i.nowFn() - i.startProfileIfEnabled("peers-data") + i.startCPUProfileIfEnabled(dataProfile) + i.writeHeapProfileIfEnabled(dataProfile) } func (i *instrumentationContext) bootstrapDataCompleted() { @@ -97,14 +110,16 @@ func (i *instrumentationContext) bootstrapDataCompleted() { i.bootstrapDataDuration.Record(duration) i.log.Info("bootstrapping time series data success", zap.Duration("took", duration)) i.span.LogFields(opentracinglog.String("event", "bootstrap_data_done")) - i.stopProfileIfEnabled() + i.stopCPUProfileIfEnabled() + i.writeHeapProfileIfEnabled(dataProfile) } func (i *instrumentationContext) bootstrapIndexStarted() { i.log.Info("bootstrapping index metadata start") i.span.LogFields(opentracinglog.String("event", "bootstrap_index_start")) i.start = i.nowFn() - i.startProfileIfEnabled("peers-index") + i.startCPUProfileIfEnabled(indexProfile) + i.writeHeapProfileIfEnabled(indexProfile) } func (i *instrumentationContext) bootstrapIndexCompleted() { @@ -112,7 +127,8 @@ func (i *instrumentationContext) bootstrapIndexCompleted() { i.bootstrapIndexDuration.Record(duration) i.log.Info("bootstrapping index metadata success", zap.Duration("took", duration)) i.span.LogFields(opentracinglog.String("event", "bootstrap_index_done")) - i.stopProfileIfEnabled() + i.stopCPUProfileIfEnabled() + i.writeHeapProfileIfEnabled(indexProfile) } type instrumentationReadShardsContext struct { diff --git a/src/dbnode/storage/profiler/profiler.go b/src/dbnode/storage/profiler/profiler.go index e4b3a13ac8..207c0090c3 100644 --- a/src/dbnode/storage/profiler/profiler.go +++ b/src/dbnode/storage/profiler/profiler.go @@ -23,7 +23,6 @@ package profiler import ( "fmt" - "io" "io/ioutil" "os" "path/filepath" @@ -36,18 +35,6 @@ const ( ProfileFileExtension = ".pb.gz" ) -// FileProfileContext is file profiler context data. -type FileProfileContext struct { - path string - profileName *profileName -} - -// StopProfile stops started profile. -func (f FileProfileContext) StopProfile() error { - stopCPUProfile() - return writeHeapProfile(f.path, f.profileName) -} - // FileProfiler is profiler which writes its profiles to given path directory. type FileProfiler struct { path string @@ -62,21 +49,31 @@ func NewFileProfiler(path string) *FileProfiler { } } -// StartProfile starts a new named profile. -func (f FileProfiler) StartProfile(name string) (ProfileContext, error) { +// StartCPUProfile starts named cpu profile. +func (f FileProfiler) StartCPUProfile(name string) error { + profileName := f.profileName(name) + return startCPUProfile(f.path, profileName) +} + +// StopCPUProfile stops started cpu profile. +func (f FileProfiler) StopCPUProfile() error { + stopCPUProfile() + return nil +} + +// WriteHeapProfile writes named heap profile. +func (f FileProfiler) WriteHeapProfile(name string) error { + profileName := f.profileName(name) + return writeHeapProfile(f.path, profileName) +} + +func (f FileProfiler) profileName(name string) *profileName { profileName, ok := f.profileNames[name] if !ok { profileName = newProfileName(name) f.profileNames[name] = profileName } - if err := startCPUProfile(f.path, profileName); err != nil { - return nil, err - } - - return FileProfileContext{ - path: f.path, - profileName: profileName, - }, nil + return profileName } type profileType int @@ -129,15 +126,13 @@ func startCPUProfile(path string, profileName *profileName) error { return err } - forceStartCPUProfile(file) - return nil -} - -func forceStartCPUProfile(writer io.Writer) { - if err := pprof.StartCPUProfile(writer); err != nil { - // cpu profile is already started, so we stop it and start our own. - pprof.StopCPUProfile() - forceStartCPUProfile(writer) + for { + if err := pprof.StartCPUProfile(file); err != nil { + // cpu profile is already started, so we stop it and start our own. + pprof.StopCPUProfile() + continue + } + return nil } } @@ -170,7 +165,12 @@ func newProfileFile(path string, profileName *profileName, pType profileType) (* return nil, err } - filename := fmt.Sprintf("%s.%d%s", - profileName.withProfileType(pType), profileName.inc(pType), ProfileFileExtension) - return os.Create(filepath.Join(path, filename)) + for { + filename := fmt.Sprintf("%s.%d%s", + profileName.withProfileType(pType), profileName.inc(pType), ProfileFileExtension) + // fails if a file with given name exists + if file, err := os.OpenFile(filepath.Join(path, filepath.Clean(filename)), os.O_CREATE|os.O_EXCL, 0600); err == nil { + return file, nil + } + } } diff --git a/src/dbnode/storage/profiler/profiler_test.go b/src/dbnode/storage/profiler/profiler_test.go index 859e69b685..fa95080833 100644 --- a/src/dbnode/storage/profiler/profiler_test.go +++ b/src/dbnode/storage/profiler/profiler_test.go @@ -72,10 +72,12 @@ func TestFileProfile(t *testing.T) { sut := NewFileProfiler(tmpDir) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - profile, err := sut.StartProfile(tt.name) + err := sut.StartCPUProfile(tt.name) require.NoError(t, err) time.Sleep(100 * time.Millisecond) - err = profile.StopProfile() + err = sut.StopCPUProfile() + require.NoError(t, err) + err = sut.WriteHeapProfile(tt.name) require.NoError(t, err) for _, fileName := range tt.fileNames { diff --git a/src/dbnode/storage/profiler/types.go b/src/dbnode/storage/profiler/types.go index f288405622..f11b580902 100644 --- a/src/dbnode/storage/profiler/types.go +++ b/src/dbnode/storage/profiler/types.go @@ -18,40 +18,18 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -/* - * // Copyright (c) 2021 Uber Technologies, Inc. - * // - * // Permission is hereby granted, free of charge, to any person obtaining a copy - * // of this software and associated documentation files (the "Software"), to deal - * // in the Software without restriction, including without limitation the rights - * // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * // copies of the Software, and to permit persons to whom the Software is - * // furnished to do so, subject to the following conditions: - * // - * // The above copyright notice and this permission notice shall be included in - * // all copies or substantial portions of the Software. - * // - * // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - * // THE SOFTWARE. - */ - package profiler -// ProfileContext represents the context of started profile. -type ProfileContext interface { - // StopProfile stops previously started profile. - StopProfile() error -} - // Profiler represents profiler for profiling long-running tasks. type Profiler interface { - // StartProfile starts the named profile and returns profile context. - StartProfile(name string) (ProfileContext, error) + // StartCPUProfile starts the named cpu profile. + StartCPUProfile(name string) error + + // StopCPUProfile stops started cpu profile. + StopCPUProfile() error + + // WriteHeapProfile writes heap profile. + WriteHeapProfile(name string) error } // Options represents the profiler options.