Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Additional metrics for peers bootstrapper #3060

Merged
merged 13 commits into from
Jan 14, 2021
4 changes: 2 additions & 2 deletions src/dbnode/client/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -937,7 +937,7 @@ func (s *session) setTopologyWithLock(topoMap topology.Map, queues []hostQueue,

if s.pools.multiReaderIteratorArray == nil {
s.pools.multiReaderIteratorArray = encoding.NewMultiReaderIteratorArrayPool([]pool.Bucket{
pool.Bucket{
{
Capacity: replicas,
Count: s.opts.SeriesIteratorPoolSize(),
},
Expand Down Expand Up @@ -3853,7 +3853,7 @@ func (c *enqueueCh) enqueueDelayed(numToEnqueue int) (enqueueDelayedFn, enqueueD
return nil, nil, errEnqueueChIsClosed
}
c.sending++ // NB(r): This is decremented by calling the returned enqueue done function
c.enqueued += (numToEnqueue)
c.enqueued += numToEnqueue
c.Unlock()
return c.enqueueDelayedFn, c.enqueueDelayedDoneFn, nil
}
Expand Down
105 changes: 34 additions & 71 deletions src/dbnode/storage/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,7 @@ import (
"github.com/m3db/m3/src/x/instrument"
xtime "github.com/m3db/m3/src/x/time"

"github.com/uber-go/tally"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

var (
Expand Down Expand Up @@ -70,42 +68,38 @@ const (

// bootstrapFn is a no-argument function that reports failure through its
// returned error. newBootstrapManager stores the manager's own bootstrap
// method in a field of this type.
type bootstrapFn func() error

// bootstrapNamespace groups a namespace with a slice of shards. bootstrap()
// allocates one entry per namespace while preparing a bootstrap run.
type bootstrapNamespace struct {
// namespace is the database namespace this entry refers to.
namespace databaseNamespace
// shards holds the database shards tracked alongside the namespace
// (presumably the shards selected for bootstrapping; confirm in bootstrap()).
shards []databaseShard
}

type bootstrapManager struct {
sync.RWMutex

database database
mediator databaseMediator
opts Options
log *zap.Logger
bootstrapFn bootstrapFn
nowFn clock.NowFn
sleepFn sleepFn
processProvider bootstrap.ProcessProvider
state BootstrapState
hasPending bool
status tally.Gauge
bootstrapDuration tally.Timer
durableStatus tally.Gauge
sleepFn sleepFn
nowFn clock.NowFn
lastBootstrapCompletionTime xtime.UnixNano
instrumentation *bootstrapInstrumentation
}

func newBootstrapManager(
database database,
mediator databaseMediator,
opts Options,
) databaseBootstrapManager {
scope := opts.InstrumentOptions().MetricsScope()
m := &bootstrapManager{
database: database,
mediator: mediator,
opts: opts,
log: opts.InstrumentOptions().Logger(),
nowFn: opts.ClockOptions().NowFn(),
sleepFn: time.Sleep,
processProvider: opts.BootstrapProcessProvider(),
status: scope.Gauge("bootstrapped"),
bootstrapDuration: scope.Timer("bootstrap-duration"),
durableStatus: scope.Gauge("bootstrapped-durable"),
database: database,
mediator: mediator,
processProvider: opts.BootstrapProcessProvider(),
sleepFn: time.Sleep,
nowFn: opts.ClockOptions().NowFn(),
instrumentation: newInstrumentation(opts),
}
m.bootstrapFn = m.bootstrap
return m
Expand Down Expand Up @@ -176,9 +170,7 @@ func (m *bootstrapManager) Bootstrap() (BootstrapResult, error) {
// NB(r): Last bootstrap failed, since this could be due to transient
// failure we retry the bootstrap again. This is to avoid operators
// needing to manually intervene for cases where failures are transient.
m.log.Warn("retrying bootstrap after backoff",
zap.Duration("backoff", bootstrapRetryInterval),
zap.Int("numRetries", i+1))
m.instrumentation.bootstrapFnFailed(i + 1)
soundvibe marked this conversation as resolved.
Show resolved Hide resolved
m.sleepFn(bootstrapRetryInterval)
continue
}
Expand All @@ -202,22 +194,8 @@ func (m *bootstrapManager) Bootstrap() (BootstrapResult, error) {
}

func (m *bootstrapManager) Report() {
if m.IsBootstrapped() {
m.status.Update(1)
} else {
m.status.Update(0)
}

if m.database.IsBootstrappedAndDurable() {
m.durableStatus.Update(1)
} else {
m.durableStatus.Update(0)
}
}

type bootstrapNamespace struct {
namespace databaseNamespace
shards []databaseShard
m.instrumentation.setIsBootstrapped(m.IsBootstrapped())
m.instrumentation.setIsBootstrappedAndDurable(m.database.IsBootstrappedAndDurable())
}

func (m *bootstrapManager) bootstrap() error {
Expand All @@ -243,7 +221,7 @@ func (m *bootstrapManager) bootstrap() error {
// an error returned.
for _, accumulator := range accmulators {
if err := accumulator.Close(); err != nil {
instrument.EmitAndLogInvariantViolation(m.opts.InstrumentOptions(),
instrument.EmitAndLogInvariantViolation(m.instrumentation.opts.InstrumentOptions(),
soundvibe marked this conversation as resolved.
Show resolved Hide resolved
func(l *zap.Logger) {
l.Error("could not close bootstrap data accumulator",
zap.Error(err))
Expand All @@ -252,9 +230,7 @@ func (m *bootstrapManager) bootstrap() error {
}
}()

start := m.nowFn()
m.log.Info("bootstrap prepare")

instrCtx := m.instrumentation.bootstrapPreparing()
var (
bootstrapNamespaces = make([]bootstrapNamespace, len(namespaces))
prepareWg sync.WaitGroup
Expand Down Expand Up @@ -288,7 +264,7 @@ func (m *bootstrapManager) bootstrap() error {
prepareWg.Wait()

if err := prepareMultiErr.FinalError(); err != nil {
m.log.Error("bootstrap prepare failed", zap.Error(err))
m.instrumentation.bootstrapPrepareFailed(err)
return err
}

Expand Down Expand Up @@ -329,26 +305,17 @@ func (m *bootstrapManager) bootstrap() error {
})
}

logFields := []zapcore.Field{
zap.Int("numShards", len(uniqueShards)),
}
m.log.Info("bootstrap started", logFields...)

m.instrumentation.bootstrapStarted(instrCtx, len(uniqueShards))
soundvibe marked this conversation as resolved.
Show resolved Hide resolved
// Run the bootstrap.
bootstrapResult, err := process.Run(ctx, start, targets)

bootstrapDuration := m.nowFn().Sub(start)
m.bootstrapDuration.Record(bootstrapDuration)
logFields = append(logFields,
zap.Duration("bootstrapDuration", bootstrapDuration))

bootstrapResult, err := process.Run(ctx, instrCtx.start, targets)
if err != nil {
m.log.Error("bootstrap failed",
append(logFields, zap.Error(err))...)
m.instrumentation.bootstrapFailed(instrCtx, err)
return err
}

m.log.Info("bootstrap succeeded, marking namespaces complete", logFields...)
m.instrumentation.bootstrapSucceeded(instrCtx)

instrCtx = m.instrumentation.bootstrapNamespacesStarted(instrCtx)
// Use a multi-error here because we want to at least bootstrap
// as many of the namespaces as possible.
multiErr := xerrors.NewMultiError()
Expand All @@ -358,29 +325,25 @@ func (m *bootstrapManager) bootstrap() error {
if !ok {
err := fmt.Errorf("missing namespace from bootstrap result: %v",
id.String())
i := m.opts.InstrumentOptions()
instrument.EmitAndLogInvariantViolation(i, func(l *zap.Logger) {
l.Error("bootstrap failed",
append(logFields, zap.Error(err))...)
})
instrument.EmitAndLogInvariantViolation(m.instrumentation.opts.InstrumentOptions(),
func(l *zap.Logger) {
l.Error("bootstrap failed",
append(instrCtx.logFields, zap.Error(err))...)
})
soundvibe marked this conversation as resolved.
Show resolved Hide resolved
return err
}

if err := namespace.Bootstrap(ctx, result); err != nil {
m.log.Info("bootstrap error", append(logFields, []zapcore.Field{
zap.String("namespace", id.String()),
zap.Error(err),
}...)...)
m.instrumentation.bootstrapNamespaceFailed(instrCtx, err, id)
multiErr = multiErr.Add(err)
}
}

if err := multiErr.FinalError(); err != nil {
m.log.Info("bootstrap namespaces failed",
append(logFields, zap.Error(err))...)
m.instrumentation.bootstrapNamespacesFailed(instrCtx, err)
return err
}

m.log.Info("bootstrap success", logFields...)
m.instrumentation.bootstrapNamespacesSucceeded(instrCtx)
return nil
}
2 changes: 1 addition & 1 deletion src/dbnode/storage/bootstrap/bootstrapper/peers/peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

// Package peers implements peers bootstrapping.
soundvibe marked this conversation as resolved.
Show resolved Hide resolved
package peers

import (
Expand Down
Loading