Skip to content

Commit

Permalink
changes after review.
Browse files Browse the repository at this point in the history
  • Loading branch information
soundvibe committed Jan 29, 2021
1 parent eb7a649 commit 95cb7e3
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ type instrumentationContext struct {
bootstrapSnapshotsDuration tally.Timer
bootstrapCommitLogDuration tally.Timer
pOpts profiler.Options
pCtx profiler.ProfileContext
}

func newInstrumentationContext(
Expand All @@ -65,24 +64,37 @@ func newInstrumentationContext(
}
}

const (
snapshotsProfileName = "commitlog-snapshots"
readProfileName = "commitlog-read"
)

func (i *instrumentationContext) finish() {
i.span.Finish()
}

func (i *instrumentationContext) startProfileIfEnabled(name string) {
func (i *instrumentationContext) startCPUProfileIfEnabled(name string) {
if i.pOpts.Enabled() {
profileContext, err := i.pOpts.Profiler().StartProfile(name)
err := i.pOpts.Profiler().StartCPUProfile(name)
if err != nil {
i.log.Error("unable to start profile", zap.Error(err))
i.log.Error("unable to start cpu profile", zap.Error(err))
}
i.pCtx = profileContext
}
}

func (i *instrumentationContext) stopProfileIfEnabled() {
if i.pOpts.Enabled() && i.pCtx != nil {
if err := i.pCtx.StopProfile(); err != nil {
i.log.Error("unable to stop profile", zap.Error(err))
func (i *instrumentationContext) stopCPUProfileIfEnabled() {
if i.pOpts.Enabled() {
if err := i.pOpts.Profiler().StopCPUProfile(); err != nil {
i.log.Error("unable to stop cpu profile", zap.Error(err))
}
}
}

func (i *instrumentationContext) writeHeapProfileIfEnabled(name string) {
if i.pOpts.Enabled() {
err := i.pOpts.Profiler().WriteHeapProfile(name)
if err != nil {
i.log.Error("unable to write heap profile", zap.Error(err))
}
}
}
Expand All @@ -91,30 +103,34 @@ func (i *instrumentationContext) bootstrapSnapshotsStarted() {
i.log.Info("read snapshots start")
i.span.LogFields(opentracinglog.String("event", "read_snapshots_start"))
i.start = i.nowFn()
i.startProfileIfEnabled("commitlog-snapshots")
i.startCPUProfileIfEnabled(snapshotsProfileName)
i.writeHeapProfileIfEnabled(snapshotsProfileName)
}

func (i *instrumentationContext) bootstrapSnapshotsCompleted() {
duration := i.nowFn().Sub(i.start)
i.bootstrapSnapshotsDuration.Record(duration)
i.log.Info("read snapshots done", zap.Duration("took", duration))
i.span.LogFields(opentracinglog.String("event", "read_snapshots_done"))
i.stopProfileIfEnabled()
i.stopCPUProfileIfEnabled()
i.writeHeapProfileIfEnabled(snapshotsProfileName)
}

func (i *instrumentationContext) readCommitLogStarted() {
i.log.Info("read commit log start")
i.span.LogFields(opentracinglog.String("event", "read_commitlog_start"))
i.start = i.nowFn()
i.startProfileIfEnabled("commitlog-read")
i.startCPUProfileIfEnabled(readProfileName)
i.writeHeapProfileIfEnabled(readProfileName)
}

func (i *instrumentationContext) readCommitLogCompleted() {
duration := i.nowFn().Sub(i.start)
i.bootstrapCommitLogDuration.Record(duration)
i.log.Info("read commit log done", zap.Duration("took", duration))
i.span.LogFields(opentracinglog.String("event", "read_commitlog_done"))
i.stopProfileIfEnabled()
i.stopCPUProfileIfEnabled()
i.writeHeapProfileIfEnabled(readProfileName)
}

type instrumentation struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ type instrumentationContext struct {
bootstrapDataDuration tally.Timer
bootstrapIndexDuration tally.Timer
pOpts profiler.Options
pCtx profiler.ProfileContext
logFields []zapcore.Field
}

Expand All @@ -71,24 +70,37 @@ func newInstrumentationContext(
}
}

const (
dataProfile = "fs-data"
indexProfile = "fs-index"
)

func (i *instrumentationContext) finish() {
i.span.Finish()
}

func (i *instrumentationContext) startProfileIfEnabled(name string) {
func (i *instrumentationContext) startCPUProfileIfEnabled(name string) {
if i.pOpts.Enabled() {
profileContext, err := i.pOpts.Profiler().StartProfile(name)
err := i.pOpts.Profiler().StartCPUProfile(name)
if err != nil {
i.log.Error("unable to start profile", zap.Error(err))
i.log.Error("unable to start cpu profile", zap.Error(err))
}
i.pCtx = profileContext
}
}

func (i *instrumentationContext) stopProfileIfEnabled() {
if i.pOpts.Enabled() && i.pCtx != nil {
if err := i.pCtx.StopProfile(); err != nil {
i.log.Error("unable to stop profile", zap.Error(err))
func (i *instrumentationContext) stopCPUProfileIfEnabled() {
if i.pOpts.Enabled() {
if err := i.pOpts.Profiler().StopCPUProfile(); err != nil {
i.log.Error("unable to stop cpu profile", zap.Error(err))
}
}
}

func (i *instrumentationContext) writeHeapProfileIfEnabled(name string) {
if i.pOpts.Enabled() {
err := i.pOpts.Profiler().WriteHeapProfile(name)
if err != nil {
i.log.Error("unable to write heap profile", zap.Error(err))
}
}
}
Expand All @@ -97,7 +109,8 @@ func (i *instrumentationContext) bootstrapDataStarted() {
i.log.Info("bootstrapping time series data start", i.logFields...)
i.span.LogFields(opentracinglog.String("event", "bootstrap_data_start"))
i.start = i.nowFn()
i.startProfileIfEnabled("fs-data")
i.startCPUProfileIfEnabled(dataProfile)
i.writeHeapProfileIfEnabled(dataProfile)
}

func (i *instrumentationContext) bootstrapDataCompleted() {
Expand All @@ -106,22 +119,25 @@ func (i *instrumentationContext) bootstrapDataCompleted() {
i.log.Info("bootstrapping time series data success",
append(i.logFields, zap.Duration("took", duration))...)
i.span.LogFields(opentracinglog.String("event", "bootstrap_data_done"))
i.stopProfileIfEnabled()
i.stopCPUProfileIfEnabled()
i.writeHeapProfileIfEnabled(dataProfile)
}

func (i *instrumentationContext) bootstrapIndexStarted() {
i.log.Info("bootstrapping index metadata start")
i.span.LogFields(opentracinglog.String("event", "bootstrap_index_start"))
i.start = i.nowFn()
i.startProfileIfEnabled("fs-index")
i.startCPUProfileIfEnabled(indexProfile)
i.writeHeapProfileIfEnabled(indexProfile)
}

func (i *instrumentationContext) bootstrapIndexCompleted() {
duration := i.nowFn().Sub(i.start)
i.bootstrapIndexDuration.Record(duration)
i.log.Info("bootstrapping index metadata success", zap.Duration("took", duration))
i.span.LogFields(opentracinglog.String("event", "bootstrap_index_done"))
i.stopProfileIfEnabled()
i.stopCPUProfileIfEnabled()
i.writeHeapProfileIfEnabled(indexProfile)
}

type instrumentation struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ type instrumentationContext struct {
bootstrapDataDuration tally.Timer
bootstrapIndexDuration tally.Timer
pOpts profiler.Options
pCtx profiler.ProfileContext
}

func newInstrumentationContext(
Expand All @@ -63,24 +62,37 @@ func newInstrumentationContext(
}
}

const (
dataProfile = "peers-data"
indexProfile = "peers-index"
)

func (i *instrumentationContext) finish() {
i.span.Finish()
}

func (i *instrumentationContext) startProfileIfEnabled(name string) {
func (i *instrumentationContext) startCPUProfileIfEnabled(name string) {
if i.pOpts.Enabled() {
profileContext, err := i.pOpts.Profiler().StartProfile(name)
err := i.pOpts.Profiler().StartCPUProfile(name)
if err != nil {
i.log.Error("unable to start profile", zap.Error(err))
i.log.Error("unable to start cpu profile", zap.Error(err))
}
i.pCtx = profileContext
}
}

func (i *instrumentationContext) stopProfileIfEnabled() {
if i.pOpts.Enabled() && i.pCtx != nil {
if err := i.pCtx.StopProfile(); err != nil {
i.log.Error("unable to stop profile", zap.Error(err))
func (i *instrumentationContext) stopCPUProfileIfEnabled() {
if i.pOpts.Enabled() {
if err := i.pOpts.Profiler().StopCPUProfile(); err != nil {
i.log.Error("unable to stop cpu profile", zap.Error(err))
}
}
}

func (i *instrumentationContext) writeHeapProfileIfEnabled(name string) {
if i.pOpts.Enabled() {
err := i.pOpts.Profiler().WriteHeapProfile(name)
if err != nil {
i.log.Error("unable to write heap profile", zap.Error(err))
}
}
}
Expand All @@ -89,30 +101,34 @@ func (i *instrumentationContext) bootstrapDataStarted() {
i.log.Info("bootstrapping time series data start")
i.span.LogFields(opentracinglog.String("event", "bootstrap_data_start"))
i.start = i.nowFn()
i.startProfileIfEnabled("peers-data")
i.startCPUProfileIfEnabled(dataProfile)
i.writeHeapProfileIfEnabled(dataProfile)
}

func (i *instrumentationContext) bootstrapDataCompleted() {
duration := i.nowFn().Sub(i.start)
i.bootstrapDataDuration.Record(duration)
i.log.Info("bootstrapping time series data success", zap.Duration("took", duration))
i.span.LogFields(opentracinglog.String("event", "bootstrap_data_done"))
i.stopProfileIfEnabled()
i.stopCPUProfileIfEnabled()
i.writeHeapProfileIfEnabled(dataProfile)
}

func (i *instrumentationContext) bootstrapIndexStarted() {
i.log.Info("bootstrapping index metadata start")
i.span.LogFields(opentracinglog.String("event", "bootstrap_index_start"))
i.start = i.nowFn()
i.startProfileIfEnabled("peers-index")
i.startCPUProfileIfEnabled(indexProfile)
i.writeHeapProfileIfEnabled(indexProfile)
}

func (i *instrumentationContext) bootstrapIndexCompleted() {
duration := i.nowFn().Sub(i.start)
i.bootstrapIndexDuration.Record(duration)
i.log.Info("bootstrapping index metadata success", zap.Duration("took", duration))
i.span.LogFields(opentracinglog.String("event", "bootstrap_index_done"))
i.stopProfileIfEnabled()
i.stopCPUProfileIfEnabled()
i.writeHeapProfileIfEnabled(indexProfile)
}

type instrumentationReadShardsContext struct {
Expand Down
70 changes: 35 additions & 35 deletions src/dbnode/storage/profiler/profiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ package profiler

import (
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
Expand All @@ -36,18 +35,6 @@ const (
ProfileFileExtension = ".pb.gz"
)

// FileProfileContext is file profiler context data.
type FileProfileContext struct {
path string
profileName *profileName
}

// StopProfile stops started profile.
func (f FileProfileContext) StopProfile() error {
stopCPUProfile()
return writeHeapProfile(f.path, f.profileName)
}

// FileProfiler is profiler which writes its profiles to given path directory.
type FileProfiler struct {
path string
Expand All @@ -62,21 +49,31 @@ func NewFileProfiler(path string) *FileProfiler {
}
}

// StartProfile starts a new named profile.
func (f FileProfiler) StartProfile(name string) (ProfileContext, error) {
// StartCPUProfile starts named cpu profile.
func (f FileProfiler) StartCPUProfile(name string) error {
profileName := f.profileName(name)
return startCPUProfile(f.path, profileName)
}

// StopCPUProfile stops started cpu profile.
func (f FileProfiler) StopCPUProfile() error {
stopCPUProfile()
return nil
}

// WriteHeapProfile writes named heap profile.
func (f FileProfiler) WriteHeapProfile(name string) error {
profileName := f.profileName(name)
return writeHeapProfile(f.path, profileName)
}

func (f FileProfiler) profileName(name string) *profileName {
profileName, ok := f.profileNames[name]
if !ok {
profileName = newProfileName(name)
f.profileNames[name] = profileName
}
if err := startCPUProfile(f.path, profileName); err != nil {
return nil, err
}

return FileProfileContext{
path: f.path,
profileName: profileName,
}, nil
return profileName
}

type profileType int
Expand Down Expand Up @@ -129,15 +126,13 @@ func startCPUProfile(path string, profileName *profileName) error {
return err
}

forceStartCPUProfile(file)
return nil
}

func forceStartCPUProfile(writer io.Writer) {
if err := pprof.StartCPUProfile(writer); err != nil {
// cpu profile is already started, so we stop it and start our own.
pprof.StopCPUProfile()
forceStartCPUProfile(writer)
for {
if err := pprof.StartCPUProfile(file); err != nil {
// cpu profile is already started, so we stop it and start our own.
pprof.StopCPUProfile()
continue
}
return nil
}
}

Expand Down Expand Up @@ -170,7 +165,12 @@ func newProfileFile(path string, profileName *profileName, pType profileType) (*
return nil, err
}

filename := fmt.Sprintf("%s.%d%s",
profileName.withProfileType(pType), profileName.inc(pType), ProfileFileExtension)
return os.Create(filepath.Join(path, filename))
for {
filename := fmt.Sprintf("%s.%d%s",
profileName.withProfileType(pType), profileName.inc(pType), ProfileFileExtension)
// fails if a file with given name exists
if file, err := os.OpenFile(filepath.Join(path, filepath.Clean(filename)), os.O_CREATE|os.O_EXCL, 0600); err == nil {
return file, nil
}
}
}
Loading

0 comments on commit 95cb7e3

Please sign in to comment.