Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Additional metrics for peers bootstrapper #3060

Merged
merged 13 commits into from
Jan 14, 2021
223 changes: 150 additions & 73 deletions src/dbnode/storage/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,42 +70,154 @@ const (

type bootstrapFn func() error

type bootstrapManager struct {
sync.RWMutex

database database
mediator databaseMediator
type instrumentation struct {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(here, and elsewhere) I would vote for extracting instrumentation to its own file (as you suggested in PR comments) to keep these non-functional concerns separately (IMHO that would be one of the benefits of this new pattern).
But let's also hear other reviewers opinions before deciding.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree.

opts Options
log *zap.Logger
bootstrapFn bootstrapFn
nowFn clock.NowFn
sleepFn sleepFn
processProvider bootstrap.ProcessProvider
state BootstrapState
hasPending bool
status tally.Gauge
bootstrapDuration tally.Timer
bootstrapNamespacesDuration tally.Timer
durableStatus tally.Gauge
lastBootstrapCompletionTime xtime.UnixNano
start time.Time
startNamespaces time.Time
logFields []zapcore.Field
vpranckaitis marked this conversation as resolved.
Show resolved Hide resolved
}

func (i *instrumentation) bootstrapFnFailed(retry int) {
i.log.Warn("retrying bootstrap after backoff",
zap.Duration("backoff", bootstrapRetryInterval),
zap.Int("numRetries", retry+1))
i.sleepFn(bootstrapRetryInterval)
soundvibe marked this conversation as resolved.
Show resolved Hide resolved
}

func (i *instrumentation) bootstrapPreparing() {
i.start = i.nowFn()
i.log.Info("bootstrap prepare")
}

func (i *instrumentation) bootstrapPrepareFailed(err error) {
i.log.Error("bootstrap prepare failed", zap.Error(err))
}

func (i *instrumentation) bootstrapStarted(shards int) {
i.logFields = []zapcore.Field{
zap.Int("numShards", shards),
}
i.log.Info("bootstrap started", i.logFields...)
}

func (i *instrumentation) bootstrapSucceeded() {
bootstrapDuration := i.nowFn().Sub(i.start)
i.bootstrapDuration.Record(bootstrapDuration)
i.logFields = append(i.logFields, zap.Duration("bootstrapDuration", bootstrapDuration))
i.log.Info("bootstrap succeeded, marking namespaces complete", i.logFields...)
}

func (i *instrumentation) bootstrapFailed(err error) {
bootstrapDuration := i.nowFn().Sub(i.start)
i.bootstrapDuration.Record(bootstrapDuration)
i.logFields = append(i.logFields, zap.Duration("bootstrapDuration", bootstrapDuration))
i.log.Error("bootstrap failed", append(i.logFields, zap.Error(err))...)
}

func (i *instrumentation) bootstrapNamespaceFailed(err error, namespaceID string) {
soundvibe marked this conversation as resolved.
Show resolved Hide resolved
i.log.Info("bootstrap namespace error", append(i.logFields, []zapcore.Field{
zap.String("namespace", namespaceID),
zap.Error(err),
}...)...)
}

func (i *instrumentation) bootstrapNamespacesFailed(err error) {
duration := i.nowFn().Sub(i.startNamespaces)
i.bootstrapNamespacesDuration.Record(duration)
i.logFields = append(i.logFields, zap.Duration("bootstrapNamespacesDuration", duration))
i.log.Info("bootstrap namespaces failed", append(i.logFields, zap.Error(err))...)
}

func (i *instrumentation) bootstrapNamespacesStarted() {
i.startNamespaces = i.nowFn()
i.log.Info("bootstrap namespaces start", i.logFields...)
}

func (i *instrumentation) bootstrapNamespacesSucceeded() {
duration := i.nowFn().Sub(i.startNamespaces)
i.bootstrapNamespacesDuration.Record(duration)
i.logFields = append(i.logFields, zap.Duration("bootstrapNamespacesDuration", duration))
i.log.Info("bootstrap namespaces success", i.logFields...)
}

func (i *instrumentation) bootstrapCompletion() {
i.lastBootstrapCompletionTime = xtime.ToUnixNano(i.nowFn())
}

func (i *instrumentation) setIsBootstrapped(isBootstrapped bool) {
if isBootstrapped {
i.status.Update(1)
} else {
i.status.Update(0)
}
soundvibe marked this conversation as resolved.
Show resolved Hide resolved
}

func (i *instrumentation) setIsBootstrappedAndDurable(isBootstrappedAndDurable bool) {
if isBootstrappedAndDurable {
i.durableStatus.Update(1)
} else {
i.durableStatus.Update(0)
}
soundvibe marked this conversation as resolved.
Show resolved Hide resolved
}

func (i *instrumentation) missingNamespaceFromResult(err error) {
instrument.EmitAndLogInvariantViolation(i.opts.InstrumentOptions(), func(l *zap.Logger) {
l.Error("bootstrap failed", append(i.logFields, zap.Error(err))...)
})
}

func (i *instrumentation) bootstrapDataAccumulatorCloseFailed(err error) {
instrument.EmitAndLogInvariantViolation(i.opts.InstrumentOptions(),
func(l *zap.Logger) {
l.Error("could not close bootstrap data accumulator",
zap.Error(err))
})
}
soundvibe marked this conversation as resolved.
Show resolved Hide resolved

func newInstrumentation(opts Options) *instrumentation {
soundvibe marked this conversation as resolved.
Show resolved Hide resolved
scope := opts.InstrumentOptions().MetricsScope()
return &instrumentation{
opts: opts,
log: opts.InstrumentOptions().Logger(),
nowFn: opts.ClockOptions().NowFn(),
sleepFn: time.Sleep,
status: scope.Gauge("bootstrapped"),
bootstrapDuration: scope.Timer("bootstrap-duration"),
bootstrapNamespacesDuration: scope.Timer("bootstrap-namespaces-duration"),
durableStatus: scope.Gauge("bootstrapped-durable"),
}
}

type bootstrapManager struct {
sync.RWMutex

database database
mediator databaseMediator
bootstrapFn bootstrapFn
processProvider bootstrap.ProcessProvider
state BootstrapState
hasPending bool
instrumentation *instrumentation
}

func newBootstrapManager(
database database,
mediator databaseMediator,
opts Options,
) databaseBootstrapManager {
scope := opts.InstrumentOptions().MetricsScope()
m := &bootstrapManager{
database: database,
mediator: mediator,
opts: opts,
log: opts.InstrumentOptions().Logger(),
nowFn: opts.ClockOptions().NowFn(),
sleepFn: time.Sleep,
processProvider: opts.BootstrapProcessProvider(),
status: scope.Gauge("bootstrapped"),
bootstrapDuration: scope.Timer("bootstrap-duration"),
durableStatus: scope.Gauge("bootstrapped-durable"),
database: database,
mediator: mediator,
processProvider: opts.BootstrapProcessProvider(),
instrumentation: newInstrumentation(opts),
}
m.bootstrapFn = m.bootstrap
return m
Expand All @@ -120,7 +232,7 @@ func (m *bootstrapManager) IsBootstrapped() bool {

func (m *bootstrapManager) LastBootstrapCompletionTime() (xtime.UnixNano, bool) {
m.RLock()
bsTime := m.lastBootstrapCompletionTime
bsTime := m.instrumentation.lastBootstrapCompletionTime
soundvibe marked this conversation as resolved.
Show resolved Hide resolved
m.RUnlock()
return bsTime, bsTime > 0
}
Expand Down Expand Up @@ -176,10 +288,7 @@ func (m *bootstrapManager) Bootstrap() (BootstrapResult, error) {
// NB(r): Last bootstrap failed, since this could be due to transient
// failure we retry the bootstrap again. This is to avoid operators
// needing to manually intervene for cases where failures are transient.
m.log.Warn("retrying bootstrap after backoff",
zap.Duration("backoff", bootstrapRetryInterval),
zap.Int("numRetries", i+1))
m.sleepFn(bootstrapRetryInterval)
m.instrumentation.bootstrapFnFailed(i + 1)
soundvibe marked this conversation as resolved.
Show resolved Hide resolved
continue
}

Expand All @@ -196,23 +305,14 @@ func (m *bootstrapManager) Bootstrap() (BootstrapResult, error) {
// on its own course so that the load of ticking and flushing is more spread out
// across the cluster.
m.Lock()
m.lastBootstrapCompletionTime = xtime.ToUnixNano(m.nowFn())
m.instrumentation.bootstrapCompletion()
m.Unlock()
return result, nil
}

func (m *bootstrapManager) Report() {
if m.IsBootstrapped() {
m.status.Update(1)
} else {
m.status.Update(0)
}

if m.database.IsBootstrappedAndDurable() {
m.durableStatus.Update(1)
} else {
m.durableStatus.Update(0)
}
m.instrumentation.setIsBootstrapped(m.IsBootstrapped())
m.instrumentation.setIsBootstrappedAndDurable(m.database.IsBootstrappedAndDurable())
}

type bootstrapNamespace struct {
soundvibe marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -243,18 +343,12 @@ func (m *bootstrapManager) bootstrap() error {
// an error returned.
for _, accumulator := range accmulators {
if err := accumulator.Close(); err != nil {
instrument.EmitAndLogInvariantViolation(m.opts.InstrumentOptions(),
func(l *zap.Logger) {
l.Error("could not close bootstrap data accumulator",
zap.Error(err))
})
m.instrumentation.bootstrapDataAccumulatorCloseFailed(err)
}
}
}()

start := m.nowFn()
m.log.Info("bootstrap prepare")

m.instrumentation.bootstrapPreparing()
var (
bootstrapNamespaces = make([]bootstrapNamespace, len(namespaces))
prepareWg sync.WaitGroup
Expand Down Expand Up @@ -288,7 +382,7 @@ func (m *bootstrapManager) bootstrap() error {
prepareWg.Wait()

if err := prepareMultiErr.FinalError(); err != nil {
m.log.Error("bootstrap prepare failed", zap.Error(err))
m.instrumentation.bootstrapPrepareFailed(err)
return err
}

Expand Down Expand Up @@ -329,58 +423,41 @@ func (m *bootstrapManager) bootstrap() error {
})
}

logFields := []zapcore.Field{
zap.Int("numShards", len(uniqueShards)),
}
m.log.Info("bootstrap started", logFields...)

m.instrumentation.bootstrapStarted(len(uniqueShards))
// Run the bootstrap.
bootstrapResult, err := process.Run(ctx, start, targets)

bootstrapDuration := m.nowFn().Sub(start)
m.bootstrapDuration.Record(bootstrapDuration)
logFields = append(logFields,
zap.Duration("bootstrapDuration", bootstrapDuration))

bootstrapResult, err := process.Run(ctx, m.instrumentation.start, targets)
if err != nil {
m.log.Error("bootstrap failed",
append(logFields, zap.Error(err))...)
m.instrumentation.bootstrapFailed(err)
return err
}

m.log.Info("bootstrap succeeded, marking namespaces complete", logFields...)
m.instrumentation.bootstrapSucceeded()
// Use a multi-error here because we want to at least bootstrap
// as many of the namespaces as possible.

m.instrumentation.bootstrapNamespacesStarted()
soundvibe marked this conversation as resolved.
Show resolved Hide resolved
multiErr := xerrors.NewMultiError()
for _, namespace := range namespaces {
id := namespace.ID()
result, ok := bootstrapResult.Results.Get(id)
if !ok {
err := fmt.Errorf("missing namespace from bootstrap result: %v",
id.String())
i := m.opts.InstrumentOptions()
instrument.EmitAndLogInvariantViolation(i, func(l *zap.Logger) {
l.Error("bootstrap failed",
append(logFields, zap.Error(err))...)
})
m.instrumentation.missingNamespaceFromResult(err)
return err
}

if err := namespace.Bootstrap(ctx, result); err != nil {
m.log.Info("bootstrap error", append(logFields, []zapcore.Field{
zap.String("namespace", id.String()),
zap.Error(err),
}...)...)
m.instrumentation.bootstrapNamespaceFailed(err, id.String())
multiErr = multiErr.Add(err)
}
}

if err := multiErr.FinalError(); err != nil {
m.log.Info("bootstrap namespaces failed",
append(logFields, zap.Error(err))...)
m.instrumentation.bootstrapNamespacesFailed(err)
return err
}

m.log.Info("bootstrap success", logFields...)
m.instrumentation.bootstrapNamespacesSucceeded()
return nil
}
Loading