Skip to content

Commit

Permalink
feat(metrics): Add Raft leadership metrics. (#7338)
Browse files: browse the repository at this point in the history
* dgraph_raft_has_leader

* dgraph_raft_is_leader

* dgraph_raft_leader_changes_total
  • Loading branch information
danielmai authored Jan 25, 2021
1 parent fde7e64 commit eba6e5a
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 30 deletions.
3 changes: 3 additions & 0 deletions dgraph/cmd/alpha/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ func TestMetrics(t *testing.T) {
"dgraph_active_mutations_total", "dgraph_pending_proposals_total",
"dgraph_pending_queries_total",
"dgraph_num_queries_total", "dgraph_alpha_health_status",

// Raft metrics
"dgraph_raft_has_leader", "dgraph_raft_is_leader", "dgraph_raft_leader_changes_total",
}
for _, requiredM := range requiredMetrics {
_, ok := metricsMap[requiredM]
Expand Down
27 changes: 25 additions & 2 deletions dgraph/cmd/zero/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ import (
"sync"
"time"

otrace "go.opencensus.io/trace"

"github.com/dgraph-io/dgraph/conn"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
Expand All @@ -39,6 +37,9 @@ import (
"github.com/pkg/errors"
"go.etcd.io/etcd/raft"
"go.etcd.io/etcd/raft/raftpb"
ostats "go.opencensus.io/stats"
"go.opencensus.io/tag"
otrace "go.opencensus.io/trace"
)

var raftDefault = "idx=1; learner=false"
Expand Down Expand Up @@ -809,6 +810,12 @@ func (n *node) calculateAndProposeSnapshot() error {
const tickDur = 100 * time.Millisecond

func (n *node) Run() {
// lastLead holds the most recently observed Raft leader ID so that
// leadership changes can be detected.
//
// etcd uses a similar mechanism to track leader changes: its
// raftReadyHandler.getLead() function returns the previous leader.
lastLead := uint64(math.MaxUint64)

var leader bool
licenseApplied := false
ticker := time.NewTicker(tickDur)
Expand Down Expand Up @@ -861,6 +868,22 @@ func (n *node) Run() {
n.server.updateLeases()
}
leader = rd.RaftState == raft.StateLeader
// group id hardcoded as 0
ctx, _ := tag.New(n.ctx, tag.Upsert(x.KeyGroup, "0"))
if rd.SoftState.Lead != lastLead {
lastLead = rd.SoftState.Lead
ostats.Record(ctx, x.RaftLeaderChanges.M(1))
}
if rd.SoftState.Lead != raft.None {
ostats.Record(ctx, x.RaftHasLeader.M(1))
} else {
ostats.Record(ctx, x.RaftHasLeader.M(0))
}
if leader {
ostats.Record(ctx, x.RaftIsLeader.M(1))
} else {
ostats.Record(ctx, x.RaftIsLeader.M(0))
}
// Oracle stream would close the stream once it steps down as leader
// predicate move would cancel any in progress move on stepping down.
n.triggerLeaderChange()
Expand Down
24 changes: 24 additions & 0 deletions worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"encoding/binary"
"encoding/hex"
"fmt"
"math"
"sort"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -1052,6 +1053,12 @@ const tickDur = 100 * time.Millisecond
func (n *node) Run() {
defer n.closer.Done() // CLOSER:1

// lastLead holds the most recently observed Raft leader ID so that
// leadership changes can be detected.
//
// etcd uses a similar mechanism to track leader changes: its
// raftReadyHandler.getLead() function returns the previous leader.
lastLead := uint64(math.MaxUint64)

firstRun := true
var leader bool
// See also our configuration of HeartbeatTick and ElectionTick.
Expand Down Expand Up @@ -1106,6 +1113,23 @@ func (n *node) Run() {
if rd.SoftState != nil {
groups().triggerMembershipSync()
leader = rd.RaftState == raft.StateLeader
// create context with group id
ctx, _ := tag.New(n.ctx, tag.Upsert(x.KeyGroup, fmt.Sprintf("%d", n.gid)))
// detect leadership changes
if rd.SoftState.Lead != lastLead {
lastLead = rd.SoftState.Lead
ostats.Record(ctx, x.RaftLeaderChanges.M(1))
}
if rd.SoftState.Lead != raft.None {
ostats.Record(ctx, x.RaftHasLeader.M(1))
} else {
ostats.Record(ctx, x.RaftHasLeader.M(0))
}
if leader {
ostats.Record(ctx, x.RaftIsLeader.M(1))
} else {
ostats.Record(ctx, x.RaftIsLeader.M(0))
}
}
if leader {
// Leader can send messages in parallel with writing to disk.
Expand Down
91 changes: 63 additions & 28 deletions x/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,25 @@ var (
// PLCacheHitRatio records the hit ratio of posting list cache.
PLCacheHitRatio = stats.Float64("hit_ratio_posting_cache",
"Hit ratio of posting list cache", stats.UnitDimensionless)
// RaftHasLeader records whether a leader currently exists for this instance's group.
RaftHasLeader = stats.Int64("raft_has_leader",
"Whether or not a leader exists for the group", stats.UnitDimensionless)
// RaftIsLeader records whether this instance is the leader of its group.
RaftIsLeader = stats.Int64("raft_is_leader",
"Whether or not this instance is the leader of the group", stats.UnitDimensionless)
// RaftLeaderChanges records the total number of leader changes seen.
RaftLeaderChanges = stats.Int64("raft_leader_changes_total",
"Total number of leader changes seen", stats.UnitDimensionless)

// Conf holds the metrics config.
// TODO: Request statistics, latencies, 500, timeouts
Conf *expvar.Map

// Tag keys.

// KeyGroup is the tag key used to record the group for Raft metrics.
KeyGroup, _ = tag.NewKey("group")

// KeyStatus is the tag key used to record the status of the server.
KeyStatus, _ = tag.NewKey("status")
// KeyMethod is the tag key used to record the method (e.g read or mutate).
Expand All @@ -141,6 +153,8 @@ var (
KeyStatus, KeyMethod,
}

allRaftKeys = []tag.Key{KeyGroup}

allViews = []*view.View{
{
Name: LatencyMs.Name(),
Expand All @@ -163,34 +177,6 @@ var (
Aggregation: view.Count(),
TagKeys: allTagKeys,
},
{
Name: RaftAppliedIndex.Name(),
Measure: RaftAppliedIndex,
Description: RaftAppliedIndex.Description(),
Aggregation: view.LastValue(),
TagKeys: allTagKeys,
},
{
Name: RaftApplyCh.Name(),
Measure: RaftApplyCh,
Description: RaftApplyCh.Description(),
Aggregation: view.LastValue(),
TagKeys: allTagKeys,
},
{
Name: RaftPendingSize.Name(),
Measure: RaftPendingSize,
Description: RaftPendingSize.Description(),
Aggregation: view.LastValue(),
TagKeys: allTagKeys,
},
{
Name: MaxAssignedTs.Name(),
Measure: MaxAssignedTs,
Description: MaxAssignedTs.Description(),
Aggregation: view.LastValue(),
TagKeys: allTagKeys,
},
{
Name: TxnAborts.Name(),
Measure: TxnAborts,
Expand Down Expand Up @@ -277,6 +263,55 @@ var (
Aggregation: view.LastValue(),
TagKeys: allTagKeys,
},
{
Name: RaftAppliedIndex.Name(),
Measure: RaftAppliedIndex,
Description: RaftAppliedIndex.Description(),
Aggregation: view.LastValue(),
TagKeys: allRaftKeys,
},
{
Name: RaftApplyCh.Name(),
Measure: RaftApplyCh,
Description: RaftApplyCh.Description(),
Aggregation: view.LastValue(),
TagKeys: allRaftKeys,
},
{
Name: RaftPendingSize.Name(),
Measure: RaftPendingSize,
Description: RaftPendingSize.Description(),
Aggregation: view.LastValue(),
TagKeys: allRaftKeys,
},
{
Name: RaftHasLeader.Name(),
Measure: RaftHasLeader,
Description: RaftHasLeader.Description(),
Aggregation: view.LastValue(),
TagKeys: allRaftKeys,
},
{
Name: RaftIsLeader.Name(),
Measure: RaftIsLeader,
Description: RaftIsLeader.Description(),
Aggregation: view.LastValue(),
TagKeys: allRaftKeys,
},
{
Name: RaftLeaderChanges.Name(),
Measure: RaftLeaderChanges,
Description: RaftLeaderChanges.Description(),
Aggregation: view.Count(),
TagKeys: allRaftKeys,
},
{
Name: MaxAssignedTs.Name(),
Measure: MaxAssignedTs,
Description: MaxAssignedTs.Description(),
Aggregation: view.LastValue(),
TagKeys: allTagKeys,
},
}
)

Expand Down

0 comments on commit eba6e5a

Please sign in to comment.