Skip to content

Commit

Permalink
changes after code review
Browse files Browse the repository at this point in the history
  • Loading branch information
soundvibe committed Jan 13, 2021
1 parent 4097249 commit e765375
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 115 deletions.
9 changes: 3 additions & 6 deletions src/dbnode/storage/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/m3db/m3/src/x/clock"
"github.com/m3db/m3/src/x/context"
xerrors "github.com/m3db/m3/src/x/errors"
"github.com/m3db/m3/src/x/instrument"
xtime "github.com/m3db/m3/src/x/time"
)

Expand Down Expand Up @@ -168,7 +167,7 @@ func (m *bootstrapManager) Bootstrap() (BootstrapResult, error) {
// NB(r): The last bootstrap failed. Since this could be due to a transient
// failure, we retry the bootstrap. This avoids operators needing to
// manually intervene in cases where failures are transient.
m.instrumentation.bootstrapFnFailed(i + 1)
m.instrumentation.bootstrapFailed(i + 1)
m.sleepFn(bootstrapRetryInterval)
continue
}
Expand Down Expand Up @@ -221,8 +220,7 @@ func (m *bootstrapManager) bootstrap() error {
// an error returned.
for _, accumulator := range accmulators {
if err := accumulator.Close(); err != nil {
instrument.EmitAndLogInvariantViolation(instrCtx.instrumentOptions,
instrCtx.logFn(err, "could not close bootstrap data accumulator"))
instrCtx.emitAndLogInvariantViolation(err, "could not close bootstrap data accumulator")
}
}
}()
Expand Down Expand Up @@ -321,8 +319,7 @@ func (m *bootstrapManager) bootstrap() error {
if !ok {
err := fmt.Errorf("missing namespace from bootstrap result: %v",
id.String())
instrument.EmitAndLogInvariantViolation(instrCtx.instrumentOptions,
instrCtx.logFn(err, "bootstrap failed"))
instrCtx.emitAndLogInvariantViolation(err, "bootstrap failed")
return err
}

Expand Down
60 changes: 38 additions & 22 deletions src/dbnode/storage/bootstrap/bootstrapper/peers/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import (
type peersSource struct {
opts Options
newPersistManager func() (persist.Manager, error)
log *zap.Logger
instrumentation *instrumentation
}

Expand All @@ -73,12 +74,14 @@ func newPeersSource(opts Options) (bootstrap.Source, error) {
return nil, err
}

instrumentation := newInstrumentation(opts)
return &peersSource{
opts: opts,
newPersistManager: func() (persist.Manager, error) {
return fs.NewPersistManager(opts.FilesystemOptions())
},
instrumentation: newInstrumentation(opts),
log: instrumentation.log,
instrumentation: instrumentation,
}, nil
}

Expand Down Expand Up @@ -170,7 +173,8 @@ func (s *peersSource) Read(
namespace := elem.Value()
md := namespace.Metadata
if !md.Options().IndexOptions().Enabled() {
instrCtx.bootstrapIndexSkipped(md.ID())
s.log.Info("skipping bootstrap for namespace based on options",
zap.Stringer("namespace", md.ID()))
continue
}

Expand Down Expand Up @@ -230,7 +234,7 @@ func (s *peersSource) readData(
result := result.NewDataBootstrapResult()
session, err := s.opts.AdminClient().DefaultAdminSession()
if err != nil {
s.instrumentation.getDefaultAdminSessionFailed(err)
s.log.Error("peers bootstrapper cannot get default admin session", zap.Error(err))
result.SetUnfulfilled(shardTimeRanges)
return nil, err
}
Expand All @@ -251,6 +255,7 @@ func (s *peersSource) readData(
}

instrCtx := s.instrumentation.bootstrapShardsStarted(count, concurrency, shouldPersist)
defer instrCtx.bootstrapShardsCompleted()
if shouldPersist {
// Spin up persist workers.
for i := 0; i < s.opts.ShardPersistenceFlushConcurrency(); i++ {
Expand Down Expand Up @@ -294,7 +299,6 @@ func (s *peersSource) readData(
}
}

instrCtx.bootstrapShardsCompleted()
return result, nil
}

Expand Down Expand Up @@ -353,7 +357,8 @@ func (s *peersSource) runPersistenceQueueWorkerLoop(
}

// Remove results and make unfulfilled if an error occurred.
s.instrumentation.persistenceFlushFailed(err)
s.log.Error("peers bootstrapper bootstrap with persistence flush encountered error",
zap.Error(err))

// Make unfulfilled.
lock.Lock()
Expand Down Expand Up @@ -431,14 +436,14 @@ func (s *peersSource) fetchBootstrapBlocksFromPeers(
continue
}
unfulfill(currRange)
s.instrumentation.seriesCheckoutFailed(err)
s.log.Error("could not checkout series", zap.Error(err))
continue
}

for _, block := range entry.Blocks.AllBlocks() {
if err := ref.Series.LoadBlock(block, series.WarmWrite); err != nil {
unfulfill(currRange)
s.instrumentation.seriesLoadFailed(err)
s.log.Error("could not load series block", zap.Error(err))
}
}

Expand All @@ -456,7 +461,10 @@ func (s *peersSource) logFetchBootstrapBlocksFromPeersOutcome(
err error,
) {
if err != nil {
s.instrumentation.fetchBootstrapBlocksFailed(err, shard)
s.log.Error("error fetching bootstrap blocks",
zap.Uint32("shard", shard),
zap.Error(err),
)
return
}

Expand All @@ -469,7 +477,11 @@ func (s *peersSource) logFetchBootstrapBlocksFromPeersOutcome(
}

for block, numSeries := range shardBlockSeriesCounter {
s.instrumentation.shardBootstrapped(shard, numSeries, block.ToTime())
s.log.Info("peer bootstrapped shard",
zap.Uint32("shard", shard),
zap.Int64("numSeries", numSeries),
zap.Time("blockStart", block.ToTime()),
)
}
}

Expand Down Expand Up @@ -681,7 +693,8 @@ func (s *peersSource) readIndex(
indexSegmentConcurrency = s.opts.IndexSegmentConcurrency()
readersCh = make(chan bootstrapper.TimeWindowReaders, indexSegmentConcurrency)
)
s.instrumentation.peersBootstrapperIndexForRanges(count)
s.log.Info("peers bootstrapper bootstrapping index for ranges",
zap.Int("shards", count))

go bootstrapper.EnqueueReaders(bootstrapper.EnqueueReadersOptions{
NsMD: ns,
Expand Down Expand Up @@ -861,7 +874,8 @@ func (s *peersSource) processReaders(
xtime.NewRanges(timeRange),
))
} else {
s.instrumentation.processingReadersFailed(err, start)
s.log.Error("error processing readers", zap.Error(err),
zap.Time("timeRange.start", start))
timesWithErrors = append(timesWithErrors, timeRange.Start)
}
}
Expand Down Expand Up @@ -905,7 +919,7 @@ func (s *peersSource) processReaders(
zap.String("remainingRanges", remainingRanges.SummaryString()),
}
if shouldPersist {
s.instrumentation.buildingFileSetIndexSegmentStarted(buildIndexLogFields)
s.log.Debug("building file set index segment", buildIndexLogFields...)
indexBlock, err = bootstrapper.PersistBootstrapIndexSegment(
ns,
requestedRanges,
Expand All @@ -932,7 +946,7 @@ func (s *peersSource) processReaders(
})
}
} else {
s.instrumentation.buildingInMemoryIndexSegmentStarted(buildIndexLogFields)
s.log.Info("building in-memory index segment", buildIndexLogFields...)
indexBlock, err = bootstrapper.BuildBootstrapIndexSegment(
ns,
requestedRanges,
Expand Down Expand Up @@ -1019,9 +1033,9 @@ func (s *peersSource) markRunResultErrorsAndUnfulfilled(
for i := range timesWithErrors {
timesWithErrorsString[i] = timesWithErrors[i].String()
}
s.instrumentation.errorsForRangeEncountered(
remainingRanges.SummaryString(),
timesWithErrorsString)
s.log.Info("encountered errors for range",
zap.String("requestedRanges", remainingRanges.SummaryString()),
zap.Strings("timesWithErrors", timesWithErrorsString))
}

if !remainingRanges.IsEmpty() {
Expand Down Expand Up @@ -1099,17 +1113,19 @@ func (s *peersSource) peerAvailability(

if available == 0 {
// Can't peer bootstrap if there are no available peers.
s.instrumentation.noPeersAvailable(total, shardIDUint)
s.log.Debug("0 available peers, unable to peer bootstrap",
zap.Int("total", total),
zap.Uint32("shard", shardIDUint))
continue
}

if !topology.ReadConsistencyAchieved(
bootstrapConsistencyLevel, majorityReplicas, total, available) {
s.instrumentation.readConsistencyNotAchieved(
bootstrapConsistencyLevel,
majorityReplicas,
total,
available)
s.log.Debug("read consistency not achieved, unable to peer bootstrap",
zap.Any("level", bootstrapConsistencyLevel),
zap.Int("replicas", majorityReplicas),
zap.Int("total", total),
zap.Int("available", available))
continue
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,9 @@ import (
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/m3db/m3/src/dbnode/topology"
"github.com/m3db/m3/src/dbnode/tracepoint"
"github.com/m3db/m3/src/x/clock"
"github.com/m3db/m3/src/x/context"
"github.com/m3db/m3/src/x/ident"
)

type instrumentationContext struct {
Expand Down Expand Up @@ -83,11 +81,6 @@ func (i *instrumentationContext) bootstrapIndexStarted() {
i.start = i.nowFn()
}

// bootstrapIndexSkipped logs, at info level, that bootstrap is being skipped
// for the namespace identified by namespaceID based on its options.
func (i *instrumentationContext) bootstrapIndexSkipped(namespaceID ident.ID) {
	namespaceField := zap.Stringer("namespace", namespaceID)
	i.log.Info("skipping bootstrap for namespace based on options", namespaceField)
}

func (i *instrumentationContext) bootstrapIndexCompleted() {
duration := i.nowFn().Sub(i.start)
i.bootstrapIndexDuration.Record(duration)
Expand Down Expand Up @@ -157,10 +150,6 @@ func (i *instrumentation) peersBootstrapperSourceReadStarted(
)
}

// getDefaultAdminSessionFailed logs, at error level, that the peers
// bootstrapper could not get the default admin session, attaching err.
func (i *instrumentation) getDefaultAdminSessionFailed(err error) {
	errField := zap.Error(err)
	i.log.Error("peers bootstrapper cannot get default admin session", errField)
}

func (i *instrumentation) bootstrapShardsStarted(
count int,
concurrency int,
Expand All @@ -177,79 +166,7 @@ func (i *instrumentation) bootstrapShardsStarted(
)
}

// persistenceFlushFailed logs, at error level, that a bootstrap with
// persistence flush encountered err.
func (i *instrumentation) persistenceFlushFailed(err error) {
	const msg = "peers bootstrapper bootstrap with persistence flush encountered error"
	i.log.Error(msg, zap.Error(err))
}

// seriesCheckoutFailed logs, at error level, that a series could not be
// checked out, attaching err.
func (i *instrumentation) seriesCheckoutFailed(err error) {
	errField := zap.Error(err)
	i.log.Error("could not checkout series", errField)
}

// seriesLoadFailed logs, at error level, that a series block could not be
// loaded, attaching err.
func (i *instrumentation) seriesLoadFailed(err error) {
	errField := zap.Error(err)
	i.log.Error("could not load series block", errField)
}

// shardBootstrapped logs, at info level, that a shard was peer bootstrapped,
// recording the shard, the number of series and the block start time.
func (i *instrumentation) shardBootstrapped(shard uint32, numSeries int64, blockTime time.Time) {
	fields := []zap.Field{
		zap.Uint32("shard", shard),
		zap.Int64("numSeries", numSeries),
		zap.Time("blockStart", blockTime),
	}
	i.log.Info("peer bootstrapped shard", fields...)
}

// fetchBootstrapBlocksFailed logs, at error level, that fetching bootstrap
// blocks failed for the given shard, attaching err.
func (i *instrumentation) fetchBootstrapBlocksFailed(err error, shard uint32) {
	fields := []zap.Field{
		zap.Uint32("shard", shard),
		zap.Error(err),
	}
	i.log.Error("error fetching bootstrap blocks", fields...)
}

// peersBootstrapperIndexForRanges logs, at info level, that the peers
// bootstrapper is bootstrapping the index, recording count under the
// "shards" field.
func (i *instrumentation) peersBootstrapperIndexForRanges(count int) {
	shardsField := zap.Int("shards", count)
	i.log.Info("peers bootstrapper bootstrapping index for ranges", shardsField)
}

// processingReadersFailed logs, at error level, that processing readers
// failed, attaching err and the start of the affected time range.
func (i *instrumentation) processingReadersFailed(err error, start time.Time) {
	fields := []zap.Field{
		zap.Error(err),
		zap.Time("timeRange.start", start),
	}
	i.log.Error("error processing readers", fields...)
}

// buildingFileSetIndexSegmentStarted logs, at debug level, that a file set
// index segment is being built, attaching the caller-supplied fields.
func (i *instrumentation) buildingFileSetIndexSegmentStarted(fields []zapcore.Field) {
	const msg = "building file set index segment"
	i.log.Debug(msg, fields...)
}

// outOfRetentionIndexSegmentSkipped logs, at debug level, that an out of
// retention index segment is being skipped, and then increments the
// persistedIndexBlocksOutOfRetention counter by one.
func (i *instrumentation) outOfRetentionIndexSegmentSkipped(fields []zapcore.Field) {
	const msg = "skipping out of retention index segment"
	i.log.Debug(msg, fields...)

	// Count the skipped segment after it has been logged.
	i.persistedIndexBlocksOutOfRetention.Inc(1)
}

// buildingInMemoryIndexSegmentStarted logs, at info level, that an in-memory
// index segment is being built, attaching the caller-supplied fields.
func (i *instrumentation) buildingInMemoryIndexSegmentStarted(fields []zapcore.Field) {
	const msg = "building in-memory index segment"
	i.log.Info(msg, fields...)
}

// errorsForRangeEncountered logs, at info level, that errors were encountered
// for a range. summaryString is recorded under "requestedRanges" and
// errorsString under "timesWithErrors".
func (i *instrumentation) errorsForRangeEncountered(summaryString string, errorsString []string) {
	fields := []zap.Field{
		zap.String("requestedRanges", summaryString),
		zap.Strings("timesWithErrors", errorsString),
	}
	i.log.Info("encountered errors for range", fields...)
}

// noPeersAvailable logs, at debug level, that there are zero available peers
// so peer bootstrapping is not possible, recording the total and the shard ID.
func (i *instrumentation) noPeersAvailable(total int, shardIDUint uint32) {
	fields := []zap.Field{
		zap.Int("total", total),
		zap.Uint32("shard", shardIDUint),
	}
	i.log.Debug("0 available peers, unable to peer bootstrap", fields...)
}

// readConsistencyNotAchieved logs, at debug level, that read consistency was
// not achieved so peer bootstrapping is not possible. The consistency level,
// majority replica count, total and available counts are attached as fields.
func (i *instrumentation) readConsistencyNotAchieved(
	bootstrapConsistencyLevel topology.ReadConsistencyLevel,
	majorityReplicas int,
	total int,
	available int,
) {
	fields := []zap.Field{
		zap.Any("level", bootstrapConsistencyLevel),
		zap.Int("replicas", majorityReplicas),
		zap.Int("total", total),
		zap.Int("available", available),
	}
	i.log.Debug("read consistency not achieved, unable to peer bootstrap", fields...)
}
8 changes: 4 additions & 4 deletions src/dbnode/storage/bootstrap_instrumentation.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,10 @@ func (i *instrumentationContext) bootstrapNamespacesFailed(err error) {
i.log.Info("bootstrap namespaces failed", append(i.logFields, zap.Error(err))...)
}

func (i *instrumentationContext) logFn(err error, msg string) func(l *zap.Logger) {
return func(l *zap.Logger) {
func (i *instrumentationContext) emitAndLogInvariantViolation(err error, msg string) {
instrument.EmitAndLogInvariantViolation(i.instrumentOptions, func(l *zap.Logger) {
l.Error(msg, append(i.logFields, zap.Error(err))...)
}
})
}

type bootstrapInstrumentation struct {
Expand Down Expand Up @@ -139,7 +139,7 @@ func (i *bootstrapInstrumentation) bootstrapPreparing() *instrumentationContext
return newInstrumentationContext(i.nowFn, i.log, i.scope, i.opts)
}

func (i *bootstrapInstrumentation) bootstrapFnFailed(retry int) {
func (i *bootstrapInstrumentation) bootstrapFailed(retry int) {
i.numRetries.Inc(1)
i.log.Warn("retrying bootstrap after backoff",
zap.Duration("backoff", bootstrapRetryInterval),
Expand Down

0 comments on commit e765375

Please sign in to comment.