Automated cherry pick of #9761 #9942

14 changes: 14 additions & 0 deletions etcdserver/metrics.go
@@ -41,6 +41,18 @@ var (
Name: "leader_changes_seen_total",
Help: "The number of leader changes seen.",
})
heartbeatSendFailures = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "heartbeat_send_failures_total",
Help: "The total number of leader heartbeat send failures (likely overloaded from slow disk).",
})
slowApplies = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "slow_apply_total",
Help: "The total number of slow apply requests (likely overloaded from slow disk).",
})
proposalsCommitted = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "etcd",
Subsystem: "server",
@@ -96,6 +108,8 @@ func init() {
prometheus.MustRegister(hasLeader)
prometheus.MustRegister(isLeader)
prometheus.MustRegister(leaderChanges)
prometheus.MustRegister(heartbeatSendFailures)
prometheus.MustRegister(slowApplies)
prometheus.MustRegister(proposalsCommitted)
prometheus.MustRegister(proposalsApplied)
prometheus.MustRegister(proposalsPending)
1 change: 1 addition & 0 deletions etcdserver/raft.go
@@ -346,6 +346,7 @@ func (r *raftNode) processMessages(ms []raftpb.Message) []raftpb.Message {
// TODO: limit request rate.
plog.Warningf("failed to send out heartbeat on time (exceeded the %v timeout for %v)", r.heartbeat, exceed)
plog.Warningf("server is likely overloaded")
heartbeatSendFailures.Inc()
}
}
}
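
For reference, the two counters added above follow the usual client_golang pattern: declare the counter, register it once at startup, and call Inc() on the failure path. A minimal standalone sketch of that pattern (the HTTP port and handler wiring are illustrative only, not part of etcd):

package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// Mirrors the counter added in etcdserver/metrics.go.
var heartbeatSendFailures = prometheus.NewCounter(prometheus.CounterOpts{
	Namespace: "etcd",
	Subsystem: "server",
	Name:      "heartbeat_send_failures_total",
	Help:      "The total number of leader heartbeat send failures (likely overloaded from slow disk).",
})

func main() {
	prometheus.MustRegister(heartbeatSendFailures)

	// In etcd this happens inside raftNode.processMessages whenever a
	// heartbeat misses its deadline; here it is simulated once.
	heartbeatSendFailures.Inc()

	// The new series is then visible on the /metrics endpoint as
	// etcd_server_heartbeat_send_failures_total 1
	http.Handle("/metrics", promhttp.Handler())
	log.Fatal(http.ListenAndServe(":2112", nil))
}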
1 change: 1 addition & 0 deletions etcdserver/util.go
@@ -146,6 +146,7 @@ func warnOfExpensiveGenericRequest(now time.Time, reqStringer fmt.Stringer, pref
result = resp
}
plog.Warningf("%srequest %q with result %q took too long (%v) to execute", prefix, reqStringer.String(), result, d)
slowApplies.Inc()
}
}

5 changes: 5 additions & 0 deletions mvcc/backend/backend.go
@@ -288,6 +288,8 @@ func (b *backend) Defrag() error {
}

func (b *backend) defrag() error {
now := time.Now()

// TODO: make this non-blocking?
// lock batchTx to ensure nobody is using previous tx, and then
// close previous ongoing tx.
@@ -351,6 +353,9 @@ func (b *backend) defrag() error {
atomic.StoreInt64(&b.size, size)
atomic.StoreInt64(&b.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize)))

took := time.Since(now)
defragDurations.Observe(took.Seconds())

return nil
}

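
The defrag change above is the start/observe timing pattern used throughout this PR. A simplified sketch of that pattern (the timedDefrag helper is hypothetical, and unlike the real defrag it records the duration even when the work fails):

package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// Mirrors the histogram added in mvcc/backend/metrics.go.
var defragDurations = prometheus.NewHistogram(prometheus.HistogramOpts{
	Namespace: "etcd",
	Subsystem: "disk",
	Name:      "backend_defrag_duration_seconds",
	Help:      "The latency distribution of backend defragmentation.",
	Buckets:   prometheus.ExponentialBuckets(0.1, 2, 13),
})

func init() { prometheus.MustRegister(defragDurations) }

// timedDefrag captures the start time, runs the work, then observes the
// elapsed seconds, as (*backend).defrag now does before returning nil.
func timedDefrag(work func() error) error {
	now := time.Now()
	err := work()
	defragDurations.Observe(time.Since(now).Seconds())
	return err
}

func main() {
	_ = timedDefrag(func() error { time.Sleep(50 * time.Millisecond); return nil })
}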
12 changes: 7 additions & 5 deletions mvcc/backend/batch_tx.go
@@ -141,15 +141,15 @@ func unsafeForEach(tx *bolt.Tx, bucket []byte, visitor func(k, v []byte) error)
// Commit commits a previous tx and begins a new writable one.
func (t *batchTx) Commit() {
t.Lock()
defer t.Unlock()
t.commit(false)
t.Unlock()
}

// CommitAndStop commits the previous tx and does not create a new one.
func (t *batchTx) CommitAndStop() {
t.Lock()
defer t.Unlock()
t.commit(true)
t.Unlock()
}

func (t *batchTx) Unlock() {
@@ -167,9 +167,11 @@ func (t *batchTx) commit(stop bool) {
}

start := time.Now()

// gofail: var beforeCommit struct{}
err := t.tx.Commit()
// gofail: var afterCommit struct{}

commitDurations.Observe(time.Since(start).Seconds())
atomic.AddInt64(&t.backend.commits, 1)

@@ -214,21 +216,21 @@ func (t *batchTxBuffered) Unlock() {

func (t *batchTxBuffered) Commit() {
t.Lock()
defer t.Unlock()
t.commit(false)
t.Unlock()
}

func (t *batchTxBuffered) CommitAndStop() {
t.Lock()
defer t.Unlock()
t.commit(true)
t.Unlock()
}

func (t *batchTxBuffered) commit(stop bool) {
// all read txs must be closed to acquire boltdb commit rwlock
t.backend.readTx.mu.Lock()
defer t.backend.readTx.mu.Unlock()
t.unsafeCommit(stop)
t.backend.readTx.mu.Unlock()
}

func (t *batchTxBuffered) unsafeCommit(stop bool) {
22 changes: 20 additions & 2 deletions mvcc/backend/metrics.go
@@ -22,20 +22,38 @@ var (
Subsystem: "disk",
Name: "backend_commit_duration_seconds",
Help: "The latency distributions of commit called by backend.",
Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),

// lowest bucket start of upper bound 0.001 sec (1 ms) with factor 2
// highest bucket start of 0.001 sec * 2^13 == 8.192 sec
Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
})

defragDurations = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "etcd",
Subsystem: "disk",
Name: "backend_defrag_duration_seconds",
Help: "The latency distribution of backend defragmentation.",

// 100 MB usually takes 1 sec, so start with 10 MB of 100 ms
// lowest bucket start of upper bound 0.1 sec (100 ms) with factor 2
// highest bucket start of 0.1 sec * 2^12 == 409.6 sec
Buckets: prometheus.ExponentialBuckets(.1, 2, 13),
})

snapshotDurations = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "etcd",
Subsystem: "disk",
Name: "backend_snapshot_duration_seconds",
Help: "The latency distribution of backend snapshots.",
// 10 ms -> 655 seconds

// lowest bucket start of upper bound 0.01 sec (10 ms) with factor 2
// highest bucket start of 0.01 sec * 2^16 == 655.36 sec
Buckets: prometheus.ExponentialBuckets(.01, 2, 17),
})
)

func init() {
prometheus.MustRegister(commitDurations)
prometheus.MustRegister(defragDurations)
prometheus.MustRegister(snapshotDurations)
}
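
The bucket comments above can be checked directly: ExponentialBuckets(start, factor, count) returns count upper bounds start, start*factor, ..., start*factor^(count-1). A quick sketch printing the three bucket sets defined in this file:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// commit: 0.001s doubling up to 0.001 * 2^13 == 8.192s (14 buckets)
	fmt.Println(prometheus.ExponentialBuckets(0.001, 2, 14))
	// defrag: 0.1s doubling up to 0.1 * 2^12 == 409.6s (13 buckets)
	fmt.Println(prometheus.ExponentialBuckets(0.1, 2, 13))
	// snapshot: 0.01s doubling up to 0.01 * 2^16 == 655.36s (17 buckets)
	fmt.Println(prometheus.ExponentialBuckets(0.01, 2, 17))
}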
4 changes: 4 additions & 0 deletions mvcc/kvstore.go
@@ -150,8 +150,12 @@ func (s *store) compactBarrier(ctx context.Context, ch chan struct{}) {
}

func (s *store) Hash() (hash uint32, revision int64, err error) {
start := time.Now()

s.b.ForceCommit()
h, err := s.b.Hash(DefaultIgnores)

hashDurations.Observe(time.Since(start).Seconds())
return h, s.currentRev, err
}

15 changes: 14 additions & 1 deletion mvcc/metrics.go
@@ -173,7 +173,19 @@
)
// overridden by mvcc initialization
reportDbTotalSizeInUseInBytesMu sync.RWMutex
reportDbTotalSizeInUseInBytes = func() float64 { return 0 }
reportDbTotalSizeInUseInBytes func() float64 = func() float64 { return 0 }

hashDurations = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "etcd",
Subsystem: "mvcc",
Name: "hash_duration_seconds",
Help: "The latency distribution of storage hash operation.",

// 100 MB usually takes 100 ms, so start with 10 MB of 10 ms
// lowest bucket start of upper bound 0.01 sec (10 ms) with factor 2
// highest bucket start of 0.01 sec * 2^14 == 163.84 sec
Buckets: prometheus.ExponentialBuckets(.01, 2, 15),
})
)

func init() {
@@ -193,6 +205,7 @@ func init() {
prometheus.MustRegister(dbTotalSizeDebugging)
prometheus.MustRegister(dbTotalSize)
prometheus.MustRegister(dbTotalSizeInUse)
prometheus.MustRegister(hashDurations)
}

// ReportEventReceived reports that an event is received.