diff --git a/src/dbnode/client/session.go b/src/dbnode/client/session.go index 6c8bdda825..4ac0b3002b 100644 --- a/src/dbnode/client/session.go +++ b/src/dbnode/client/session.go @@ -3857,7 +3857,7 @@ func (c *enqueueCh) enqueueDelayed(numToEnqueue int) (enqueueDelayedFn, enqueueD return nil, nil, errEnqueueChIsClosed } c.sending++ // NB(r): This is decremented by calling the returned enqueue done function - c.enqueued += (numToEnqueue) + c.enqueued += numToEnqueue c.Unlock() return c.enqueueDelayedFn, c.enqueueDelayedDoneFn, nil } diff --git a/src/dbnode/storage/bootstrap.go b/src/dbnode/storage/bootstrap.go index a154e09493..8481389cf9 100644 --- a/src/dbnode/storage/bootstrap.go +++ b/src/dbnode/storage/bootstrap.go @@ -30,12 +30,7 @@ import ( "github.com/m3db/m3/src/x/clock" "github.com/m3db/m3/src/x/context" xerrors "github.com/m3db/m3/src/x/errors" - "github.com/m3db/m3/src/x/instrument" xtime "github.com/m3db/m3/src/x/time" - - "github.com/uber-go/tally" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" ) var ( @@ -70,23 +65,24 @@ const ( type bootstrapFn func() error +type bootstrapNamespace struct { + namespace databaseNamespace + shards []databaseShard +} + type bootstrapManager struct { sync.RWMutex database database mediator databaseMediator - opts Options - log *zap.Logger bootstrapFn bootstrapFn - nowFn clock.NowFn - sleepFn sleepFn processProvider bootstrap.ProcessProvider state BootstrapState hasPending bool - status tally.Gauge - bootstrapDuration tally.Timer - durableStatus tally.Gauge + sleepFn sleepFn + nowFn clock.NowFn lastBootstrapCompletionTime xtime.UnixNano + instrumentation *bootstrapInstrumentation } func newBootstrapManager( @@ -94,18 +90,13 @@ func newBootstrapManager( mediator databaseMediator, opts Options, ) databaseBootstrapManager { - scope := opts.InstrumentOptions().MetricsScope() m := &bootstrapManager{ - database: database, - mediator: mediator, - opts: opts, - log: opts.InstrumentOptions().Logger(), - nowFn: opts.ClockOptions().NowFn(), - sleepFn: time.Sleep, - processProvider: opts.BootstrapProcessProvider(), - status: scope.Gauge("bootstrapped"), - bootstrapDuration: scope.Timer("bootstrap-duration"), - durableStatus: scope.Gauge("bootstrapped-durable"), + database: database, + mediator: mediator, + processProvider: opts.BootstrapProcessProvider(), + sleepFn: time.Sleep, + nowFn: opts.ClockOptions().NowFn(), + instrumentation: newBootstrapInstrumentation(opts), } m.bootstrapFn = m.bootstrap return m @@ -176,9 +167,7 @@ func (m *bootstrapManager) Bootstrap() (BootstrapResult, error) { // NB(r): Last bootstrap failed, since this could be due to transient // failure we retry the bootstrap again. This is to avoid operators // needing to manually intervene for cases where failures are transient. - m.log.Warn("retrying bootstrap after backoff", - zap.Duration("backoff", bootstrapRetryInterval), - zap.Int("numRetries", i+1)) + m.instrumentation.bootstrapFailed(i + 1) m.sleepFn(bootstrapRetryInterval) continue } @@ -202,22 +191,8 @@ func (m *bootstrapManager) Bootstrap() (BootstrapResult, error) { } func (m *bootstrapManager) Report() { - if m.IsBootstrapped() { - m.status.Update(1) - } else { - m.status.Update(0) - } - - if m.database.IsBootstrappedAndDurable() { - m.durableStatus.Update(1) - } else { - m.durableStatus.Update(0) - } -} - -type bootstrapNamespace struct { - namespace databaseNamespace - shards []databaseShard + m.instrumentation.setIsBootstrapped(m.IsBootstrapped()) + m.instrumentation.setIsBootstrappedAndDurable(m.database.IsBootstrappedAndDurable()) } func (m *bootstrapManager) bootstrap() error { @@ -236,6 +211,8 @@ func (m *bootstrapManager) bootstrap() error { return err } + instrCtx := m.instrumentation.bootstrapPreparing() + accmulators := make([]bootstrap.NamespaceDataAccumulator, 0, len(namespaces)) defer func() { // Close all accumulators at bootstrap completion, only error @@ -243,18 +220,11 @@ func (m *bootstrapManager) bootstrap() error { // an error returned. for _, accumulator := range accmulators { if err := accumulator.Close(); err != nil { - instrument.EmitAndLogInvariantViolation(m.opts.InstrumentOptions(), - func(l *zap.Logger) { - l.Error("could not close bootstrap data accumulator", - zap.Error(err)) - }) + instrCtx.emitAndLogInvariantViolation(err, "could not close bootstrap data accumulator") } } }() - start := m.nowFn() - m.log.Info("bootstrap prepare") - var ( bootstrapNamespaces = make([]bootstrapNamespace, len(namespaces)) prepareWg sync.WaitGroup @@ -288,7 +258,7 @@ func (m *bootstrapManager) bootstrap() error { prepareWg.Wait() if err := prepareMultiErr.FinalError(); err != nil { - m.log.Error("bootstrap prepare failed", zap.Error(err)) + m.instrumentation.bootstrapPrepareFailed(err) return err } @@ -329,26 +299,17 @@ func (m *bootstrapManager) bootstrap() error { }) } - logFields := []zapcore.Field{ - zap.Int("numShards", len(uniqueShards)), - } - m.log.Info("bootstrap started", logFields...) - + instrCtx.bootstrapStarted(len(uniqueShards)) // Run the bootstrap. - bootstrapResult, err := process.Run(ctx, start, targets) - - bootstrapDuration := m.nowFn().Sub(start) - m.bootstrapDuration.Record(bootstrapDuration) - logFields = append(logFields, - zap.Duration("bootstrapDuration", bootstrapDuration)) - + bootstrapResult, err := process.Run(ctx, instrCtx.start, targets) if err != nil { - m.log.Error("bootstrap failed", - append(logFields, zap.Error(err))...) + instrCtx.bootstrapFailed(err) return err } - m.log.Info("bootstrap succeeded, marking namespaces complete", logFields...) + instrCtx.bootstrapSucceeded() + + instrCtx.bootstrapNamespacesStarted() // Use a multi-error here because we want to at least bootstrap // as many of the namespaces as possible. multiErr := xerrors.NewMultiError() @@ -358,29 +319,21 @@ func (m *bootstrapManager) bootstrap() error { if !ok { err := fmt.Errorf("missing namespace from bootstrap result: %v", id.String()) - i := m.opts.InstrumentOptions() - instrument.EmitAndLogInvariantViolation(i, func(l *zap.Logger) { - l.Error("bootstrap failed", - append(logFields, zap.Error(err))...) - }) + instrCtx.emitAndLogInvariantViolation(err, "bootstrap failed") return err } if err := namespace.Bootstrap(ctx, result); err != nil { - m.log.Info("bootstrap error", append(logFields, []zapcore.Field{ - zap.String("namespace", id.String()), - zap.Error(err), - }...)...) + instrCtx.bootstrapNamespaceFailed(err, id) multiErr = multiErr.Add(err) } } if err := multiErr.FinalError(); err != nil { - m.log.Info("bootstrap namespaces failed", - append(logFields, zap.Error(err))...) + instrCtx.bootstrapNamespacesFailed(err) return err } - m.log.Info("bootstrap success", logFields...) + instrCtx.bootstrapNamespacesSucceeded() return nil } diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/peers.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/peers.go index fbca789c3f..cb536f5edc 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/peers.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/peers.go @@ -18,6 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. +// Package peers implements peers bootstrapping. package peers import ( diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go index c8d409cfad..d451dca727 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go @@ -28,7 +28,6 @@ import ( "time" "github.com/opentracing/opentracing-go" - "github.com/uber-go/tally" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -45,11 +44,9 @@ import ( "github.com/m3db/m3/src/dbnode/storage/index/convert" "github.com/m3db/m3/src/dbnode/storage/series" "github.com/m3db/m3/src/dbnode/topology" - "github.com/m3db/m3/src/dbnode/tracepoint" "github.com/m3db/m3/src/m3ninx/doc" "github.com/m3db/m3/src/m3ninx/index/segment/fst" idxpersist "github.com/m3db/m3/src/m3ninx/persist" - "github.com/m3db/m3/src/x/clock" "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/instrument" @@ -60,14 +57,9 @@ import ( type peersSource struct { opts Options - log *zap.Logger newPersistManager func() (persist.Manager, error) - nowFn clock.NowFn - metrics peersSourceMetrics -} - -type peersSourceMetrics struct { - persistedIndexBlocksOutOfRetention tally.Counter + log *zap.Logger + instrumentation *instrumentation } type persistenceFlush struct { @@ -82,19 +74,14 @@ func newPeersSource(opts Options) (bootstrap.Source, error) { return nil, err } - iopts := opts.ResultOptions().InstrumentOptions() - scope := iopts.MetricsScope().SubScope("peers-bootstrapper") - iopts = iopts.SetMetricsScope(scope) + instrumentation := newInstrumentation(opts) return &peersSource{ opts: opts, - log: iopts.Logger().With(zap.String("bootstrapper", "peers")), newPersistManager: func() (persist.Manager, error) { return fs.NewPersistManager(opts.FilesystemOptions()) }, - nowFn: opts.ResultOptions().ClockOptions().NowFn(), - metrics: peersSourceMetrics{ - persistedIndexBlocksOutOfRetention: scope.Counter("persist-index-blocks-out-of-retention"), - }, + log: instrumentation.log, + instrumentation: instrumentation, }, nil } @@ -132,8 +119,8 @@ func (s *peersSource) Read( namespaces bootstrap.Namespaces, cache bootstrap.Cache, ) (bootstrap.NamespaceResults, error) { - ctx, span, _ := ctx.StartSampledTraceSpan(tracepoint.BootstrapperPeersSourceRead) - defer span.Finish() + instrCtx := s.instrumentation.peersBootstrapperSourceReadStarted(ctx) + defer instrCtx.finish() timeRangesEmpty := true for _, elem := range namespaces.Namespaces.Iter() { @@ -157,10 +144,8 @@ func (s *peersSource) Read( } // NB(r): Perform all data bootstrapping first then index bootstrapping - // to more clearly deliniate which process is slower than the other. - start := s.nowFn() - s.log.Info("bootstrapping time series data start") - span.LogEvent("bootstrap_data_start") + // to more clearly delineate which process is slower than the other. + instrCtx.bootstrapDataStarted() for _, elem := range namespaces.Namespaces.Iter() { namespace := elem.Value() md := namespace.Metadata @@ -178,30 +163,24 @@ func (s *peersSource) Read( DataResult: r, }) } - s.log.Info("bootstrapping time series data success", - zap.Duration("took", s.nowFn().Sub(start))) - span.LogEvent("bootstrap_data_done") - + instrCtx.bootstrapDataCompleted() // NB(bodu): We need to evict the info file cache before reading index data since we've // maybe fetched blocks from peers so the cached info file state is now stale. cache.Evict() - start = s.nowFn() - s.log.Info("bootstrapping index metadata start") - span.LogEvent("bootstrap_index_start") + + instrCtx.bootstrapIndexStarted() for _, elem := range namespaces.Namespaces.Iter() { namespace := elem.Value() md := namespace.Metadata if !md.Options().IndexOptions().Enabled() { s.log.Info("skipping bootstrap for namespace based on options", zap.Stringer("namespace", md.ID())) - - // Not bootstrapping for index. continue } r, err := s.readIndex(md, namespace.IndexRunOptions.ShardTimeRanges, - span, + instrCtx.span, cache, namespace.IndexRunOptions.RunOptions, ) @@ -220,9 +199,7 @@ func (s *peersSource) Read( results.Results.Set(md.ID(), result) } - s.log.Info("bootstrapping index metadata success", - zap.Duration("took", s.nowFn().Sub(start))) - span.LogEvent("bootstrap_index_done") + instrCtx.bootstrapIndexCompleted() return results, nil } @@ -277,10 +254,8 @@ func (s *peersSource) readData( concurrency = s.opts.ShardPersistenceConcurrency() } - s.log.Info("peers bootstrapper bootstrapping shards for ranges", - zap.Int("shards", count), - zap.Int("concurrency", concurrency), - zap.Bool("shouldPersist", shouldPersist)) + instrCtx := s.instrumentation.bootstrapShardsStarted(count, concurrency, shouldPersist) + defer instrCtx.bootstrapShardsCompleted() if shouldPersist { // Spin up persist workers. for i := 0; i < s.opts.ShardPersistenceFlushConcurrency(); i++ { @@ -485,27 +460,28 @@ func (s *peersSource) logFetchBootstrapBlocksFromPeersOutcome( shardResult result.ShardResult, err error, ) { - if err == nil { - shardBlockSeriesCounter := map[xtime.UnixNano]int64{} - for _, entry := range shardResult.AllSeries().Iter() { - series := entry.Value() - for blockStart := range series.Blocks.AllBlocks() { - shardBlockSeriesCounter[blockStart]++ - } - } - - for block, numSeries := range shardBlockSeriesCounter { - s.log.Info("peer bootstrapped shard", - zap.Uint32("shard", shard), - zap.Int64("numSeries", numSeries), - zap.Time("blockStart", block.ToTime()), - ) - } - } else { + if err != nil { s.log.Error("error fetching bootstrap blocks", zap.Uint32("shard", shard), zap.Error(err), ) + return + } + + shardBlockSeriesCounter := map[xtime.UnixNano]int64{} + for _, entry := range shardResult.AllSeries().Iter() { // nolint + series := entry.Value() + for blockStart := range series.Blocks.AllBlocks() { + shardBlockSeriesCounter[blockStart]++ + } + } + + for block, numSeries := range shardBlockSeriesCounter { + s.log.Info("peer bootstrapped shard", + zap.Uint32("shard", shard), + zap.Int64("numSeries", numSeries), + zap.Time("blockStart", block.ToTime()), + ) } } @@ -718,8 +694,7 @@ func (s *peersSource) readIndex( readersCh = make(chan bootstrapper.TimeWindowReaders, indexSegmentConcurrency) ) s.log.Info("peers bootstrapper bootstrapping index for ranges", - zap.Int("shards", count), - ) + zap.Int("shards", count)) go bootstrapper.EnqueueReaders(bootstrapper.EnqueueReadersOptions{ NsMD: ns, @@ -733,9 +708,9 @@ func (s *peersSource) readIndex( // NB(bodu): We only read metadata when performing a peers bootstrap // so we do not need to sort the data fileset reader. OptimizedReadMetadataOnly: true, - Logger: s.log, + Logger: s.instrumentation.log, Span: span, - NowFn: s.nowFn, + NowFn: s.instrumentation.nowFn, Cache: cache, }) @@ -960,8 +935,7 @@ func (s *peersSource) processReaders( // Bail early if the index segment is already out of retention. // This can happen when the edge of requested ranges at time of data bootstrap // is now out of retention. - s.log.Debug("skipping out of retention index segment", buildIndexLogFields...) - s.metrics.persistedIndexBlocksOutOfRetention.Inc(1) + s.instrumentation.outOfRetentionIndexSegmentSkipped(buildIndexLogFields) return remainingRanges, timesWithErrors } else if err != nil { instrument.EmitAndLogInvariantViolation(iopts, func(l *zap.Logger) { @@ -984,7 +958,6 @@ func (s *peersSource) processReaders( blockEnd, ) if err != nil { - iopts := s.opts.ResultOptions().InstrumentOptions() instrument.EmitAndLogInvariantViolation(iopts, func(l *zap.Logger) { l.Error("build fs index bootstrap failed", zap.Stringer("namespace", ns.ID()), @@ -1060,8 +1033,8 @@ func (s *peersSource) markRunResultErrorsAndUnfulfilled( for i := range timesWithErrors { timesWithErrorsString[i] = timesWithErrors[i].String() } - s.log.Info("encounted errors for range", - zap.String("requestedRanges", requestedRanges.SummaryString()), + s.log.Info("encountered errors for range", + zap.String("requestedRanges", remainingRanges.SummaryString()), zap.Strings("timesWithErrors", timesWithErrorsString)) } @@ -1140,16 +1113,15 @@ func (s *peersSource) peerAvailability( if available == 0 { // Can't peer bootstrap if there are no available peers. - s.log.Debug( - "0 available peers, unable to peer bootstrap", - zap.Int("total", total), zap.Uint32("shard", shardIDUint)) + s.log.Debug("0 available peers, unable to peer bootstrap", + zap.Int("total", total), + zap.Uint32("shard", shardIDUint)) continue } if !topology.ReadConsistencyAchieved( bootstrapConsistencyLevel, majorityReplicas, total, available) { - s.log.Debug( - "read consistency not achieved, unable to peer bootstrap", + s.log.Debug("read consistency not achieved, unable to peer bootstrap", zap.Any("level", bootstrapConsistencyLevel), zap.Int("replicas", majorityReplicas), zap.Int("total", total), diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source_instrumentation.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source_instrumentation.go new file mode 100644 index 0000000000..72d2cd12bd --- /dev/null +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source_instrumentation.go @@ -0,0 +1,172 @@ +// Copyright (c) 2021 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package peers + +import ( + "time" + + "github.com/opentracing/opentracing-go" + opentracinglog "github.com/opentracing/opentracing-go/log" + "github.com/uber-go/tally" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + + "github.com/m3db/m3/src/dbnode/tracepoint" + "github.com/m3db/m3/src/x/clock" + "github.com/m3db/m3/src/x/context" +) + +type instrumentationContext struct { + nowFn clock.NowFn + log *zap.Logger + start time.Time + span opentracing.Span + bootstrapDataDuration tally.Timer + bootstrapIndexDuration tally.Timer +} + +func newInstrumentationContext( + nowFn clock.NowFn, + log *zap.Logger, + span opentracing.Span, + scope tally.Scope, +) *instrumentationContext { + return &instrumentationContext{ + nowFn: nowFn, + log: log, + span: span, + bootstrapDataDuration: scope.Timer("data-duration"), + bootstrapIndexDuration: scope.Timer("index-duration"), + } +} + +func (i *instrumentationContext) finish() { + i.span.Finish() +} + +func (i *instrumentationContext) bootstrapDataStarted() { + i.log.Info("bootstrapping time series data start") + i.span.LogFields(opentracinglog.String("event", "bootstrap_data_start")) + i.start = i.nowFn() +} + +func (i *instrumentationContext) bootstrapDataCompleted() { + duration := i.nowFn().Sub(i.start) + i.bootstrapDataDuration.Record(duration) + i.log.Info("bootstrapping time series data success", zap.Duration("took", duration)) + i.span.LogFields(opentracinglog.String("event", "bootstrap_data_done")) +} + +func (i *instrumentationContext) bootstrapIndexStarted() { + i.log.Info("bootstrapping index metadata start") + i.span.LogFields(opentracinglog.String("event", "bootstrap_index_start")) + i.start = i.nowFn() +} + +func (i *instrumentationContext) bootstrapIndexCompleted() { + duration := i.nowFn().Sub(i.start) + i.bootstrapIndexDuration.Record(duration) + i.log.Info("bootstrapping index metadata success", zap.Duration("took", duration)) + i.span.LogFields(opentracinglog.String("event", "bootstrap_index_done")) +} + +type instrumentationReadShardsContext struct { + nowFn clock.NowFn + log *zap.Logger + start time.Time + bootstrapShardsDuration tally.Timer +} + +func newInstrumentationReadShardsContext( + nowFn clock.NowFn, + log *zap.Logger, + scope tally.Scope, +) *instrumentationReadShardsContext { + return &instrumentationReadShardsContext{ + nowFn: nowFn, + log: log, + start: nowFn(), + bootstrapShardsDuration: scope.Timer("shards-duration"), + } +} + +func (i *instrumentationReadShardsContext) bootstrapShardsCompleted() { + duration := i.nowFn().Sub(i.start) + i.bootstrapShardsDuration.Record(duration) + i.log.Info("bootstrapping shards success", zap.Duration("took", duration)) +} + +type instrumentation struct { + opts Options + scope tally.Scope + log *zap.Logger + nowFn clock.NowFn + persistedIndexBlocksOutOfRetention tally.Counter +} + +func newInstrumentation(opts Options) *instrumentation { + var ( + scope = opts.ResultOptions().InstrumentOptions(). + MetricsScope().SubScope("peers-bootstrapper") + instrumentOptions = opts.ResultOptions().InstrumentOptions().SetMetricsScope(scope) + ) + + return &instrumentation{ + opts: opts, + scope: scope, + log: instrumentOptions.Logger().With(zap.String("bootstrapper", "peers")), + nowFn: opts.ResultOptions().ClockOptions().NowFn(), + persistedIndexBlocksOutOfRetention: scope.Counter("persist-index-blocks-out-of-retention"), + } +} + +func (i *instrumentation) peersBootstrapperSourceReadStarted( + ctx context.Context, +) *instrumentationContext { + _, span, _ := ctx.StartSampledTraceSpan(tracepoint.BootstrapperPeersSourceRead) + return newInstrumentationContext( + i.nowFn, + i.log, + span, + i.scope, + ) +} + +func (i *instrumentation) bootstrapShardsStarted( + count int, + concurrency int, + shouldPersist bool, +) *instrumentationReadShardsContext { + i.log.Info("peers bootstrapper bootstrapping shards for ranges", + zap.Int("shards", count), + zap.Int("concurrency", concurrency), + zap.Bool("shouldPersist", shouldPersist)) + return newInstrumentationReadShardsContext( + i.nowFn, + i.log, + i.scope, + ) +} + +func (i *instrumentation) outOfRetentionIndexSegmentSkipped(fields []zapcore.Field) { + i.log.Debug("skipping out of retention index segment", fields...) + i.persistedIndexBlocksOutOfRetention.Inc(1) +} diff --git a/src/dbnode/storage/bootstrap_instrumentation.go b/src/dbnode/storage/bootstrap_instrumentation.go new file mode 100644 index 0000000000..37e593fd92 --- /dev/null +++ b/src/dbnode/storage/bootstrap_instrumentation.go @@ -0,0 +1,167 @@ +// Copyright (c) 2021 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package storage + +import ( + "time" + + "github.com/uber-go/tally" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + + "github.com/m3db/m3/src/x/clock" + "github.com/m3db/m3/src/x/ident" + "github.com/m3db/m3/src/x/instrument" +) + +type instrumentationContext struct { + start time.Time + log *zap.Logger + logFields []zapcore.Field + bootstrapDuration tally.Timer + bootstrapNamespacesDuration tally.Timer + nowFn clock.NowFn + instrumentOptions instrument.Options +} + +func newInstrumentationContext( + nowFn clock.NowFn, + log *zap.Logger, + scope tally.Scope, + opts Options, +) *instrumentationContext { + return &instrumentationContext{ + start: nowFn(), + log: log, + nowFn: nowFn, + bootstrapDuration: scope.Timer("bootstrap-duration"), + bootstrapNamespacesDuration: scope.Timer("bootstrap-namespaces-duration"), + instrumentOptions: opts.InstrumentOptions(), + } +} + +func (i *instrumentationContext) bootstrapStarted(shards int) { + i.logFields = append(i.logFields, zap.Int("numShards", shards)) + i.log.Info("bootstrap started", i.logFields...) +} + +func (i *instrumentationContext) bootstrapSucceeded() { + bootstrapDuration := i.nowFn().Sub(i.start) + i.bootstrapDuration.Record(bootstrapDuration) + i.logFields = append(i.logFields, zap.Duration("bootstrapDuration", bootstrapDuration)) + i.log.Info("bootstrap succeeded, marking namespaces complete", i.logFields...) +} + +func (i *instrumentationContext) bootstrapFailed(err error) { + bootstrapDuration := i.nowFn().Sub(i.start) + i.bootstrapDuration.Record(bootstrapDuration) + i.logFields = append(i.logFields, zap.Duration("bootstrapDuration", bootstrapDuration)) + i.log.Error("bootstrap failed", append(i.logFields, zap.Error(err))...) +} + +func (i *instrumentationContext) bootstrapNamespacesStarted() { + i.start = i.nowFn() + i.log.Info("bootstrap namespaces start", i.logFields...) +} + +func (i *instrumentationContext) bootstrapNamespacesSucceeded() { + duration := i.nowFn().Sub(i.start) + i.bootstrapNamespacesDuration.Record(duration) + i.logFields = append(i.logFields, zap.Duration("bootstrapNamespacesDuration", duration)) + i.log.Info("bootstrap namespaces success", i.logFields...) +} + +func (i *instrumentationContext) bootstrapNamespaceFailed( + err error, + namespaceID ident.ID, +) { + i.log.Info("bootstrap namespace error", append(i.logFields, + zap.String("namespace", namespaceID.String()), + zap.Error(err))...) +} + +func (i *instrumentationContext) bootstrapNamespacesFailed(err error) { + duration := i.nowFn().Sub(i.start) + i.bootstrapNamespacesDuration.Record(duration) + i.logFields = append(i.logFields, zap.Duration("bootstrapNamespacesDuration", duration)) + i.log.Info("bootstrap namespaces failed", append(i.logFields, zap.Error(err))...) +} + +func (i *instrumentationContext) emitAndLogInvariantViolation(err error, msg string) { + instrument.EmitAndLogInvariantViolation(i.instrumentOptions, func(l *zap.Logger) { + l.Error(msg, append(i.logFields, zap.Error(err))...) + }) +} + +type bootstrapInstrumentation struct { + opts Options + scope tally.Scope + log *zap.Logger + nowFn clock.NowFn + status tally.Gauge + durableStatus tally.Gauge + numRetries tally.Counter +} + +func newBootstrapInstrumentation(opts Options) *bootstrapInstrumentation { + scope := opts.InstrumentOptions().MetricsScope() + return &bootstrapInstrumentation{ + opts: opts, + scope: scope, + log: opts.InstrumentOptions().Logger(), + nowFn: opts.ClockOptions().NowFn(), + status: scope.Gauge("bootstrapped"), + durableStatus: scope.Gauge("bootstrapped-durable"), + numRetries: scope.Counter("bootstrap-retries"), + } +} + +func (i *bootstrapInstrumentation) bootstrapPreparing() *instrumentationContext { + i.log.Info("bootstrap prepare") + return newInstrumentationContext(i.nowFn, i.log, i.scope, i.opts) +} + +func (i *bootstrapInstrumentation) bootstrapFailed(retry int) { + i.numRetries.Inc(1) + i.log.Warn("retrying bootstrap after backoff", + zap.Duration("backoff", bootstrapRetryInterval), + zap.Int("numRetries", retry)) +} + +func (i *bootstrapInstrumentation) bootstrapPrepareFailed(err error) { + i.log.Error("bootstrap prepare failed", zap.Error(err)) +} + +func (i *bootstrapInstrumentation) setIsBootstrapped(isBootstrapped bool) { + var status float64 = 0 + if isBootstrapped { + status = 1 + } + i.status.Update(status) +} + +func (i *bootstrapInstrumentation) setIsBootstrappedAndDurable(isBootstrappedAndDurable bool) { + var status float64 = 0 + if isBootstrappedAndDurable { + status = 1 + } + i.durableStatus.Update(status) +}