Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
add cluster metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
woodsaj committed Jan 16, 2017
1 parent 13a13f0 commit b0d033d
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 8 deletions.
8 changes: 3 additions & 5 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"time"

"github.com/hashicorp/memberlist"
"github.com/raintank/metrictank/stats"
"github.com/raintank/worldping-api/pkg/log"
)

Expand All @@ -26,10 +25,9 @@ func validMode(m string) bool {
}

var (
Mode ModeType
Manager *ClusterManager
cfg *memberlist.Config
clusterPrimary = stats.NewBool("cluster.primary")
Mode ModeType
Manager *ClusterManager
cfg *memberlist.Config
)

func Init(name, version string, started time.Time, apiScheme string, apiPort int) {
Expand Down
70 changes: 67 additions & 3 deletions cluster/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,35 @@ import (
"time"

"github.com/hashicorp/memberlist"
"github.com/raintank/metrictank/stats"
"github.com/raintank/worldping-api/pkg/log"
)

// Metrics describing this node and its view of the cluster.
var (
// nodeCount is the total number of nodes this instance thinks are in the
// cluster; clusterStats sets it to the number of known peers.
nodeCount = stats.NewGauge32("cluster.node_count")

// nodePartitionCount is the number of partitions this node is consuming;
// SetPartitions sets it to the length of the partition list it is given.
nodePartitionCount = stats.NewGauge32("cluster.node_partition_count")

// clusterPartitionCount is the number of distinct partitions in the cluster
// that this node is aware of, counted across all known peers.
clusterPartitionCount = stats.NewGauge32("cluster.cluster_partition_count")

// joinEvents, updateEvents and leaveEvents count the cluster join, update
// and leave notifications seen by this node. Each is incremented at the
// start of the matching Notify* callback.
joinEvents = stats.NewCounter32("cluster.join_events")
updateEvents = stats.NewCounter32("cluster.update_events")
leaveEvents = stats.NewCounter32("cluster.leave_events")

// nodeReady and nodePrimary expose the current state of this node:
// whether its state is NodeReady, and whether it is a primary.
nodeReady = stats.NewBool("cluster.ready")
nodePrimary = stats.NewBool("cluster.primary")

// clusterReadyNodes is the number of known peers that report being ready.
clusterReadyNodes = stats.NewGauge32("cluster.ready_nodes")
// clusterPrimaryNodes is the number of known peers flagged as primary.
clusterPrimaryNodes = stats.NewGauge32("cluster.primary_nodes")
)

type ClusterManager struct {
sync.RWMutex
Peers map[string]Node
Expand Down Expand Up @@ -40,7 +66,32 @@ func (c *ClusterManager) PeersList() []Node {
return list
}

// clusterStats recomputes the cluster-wide gauges from the current peer
// table: the number of ready peers, the number of primary peers, the number
// of distinct partitions seen across all peers, and the total peer count.
// It is called every time the cluster state changes. c.Peers is read without
// locking here, so the caller must already hold the lock.
func (c *ClusterManager) clusterStats() {
	var (
		ready     int
		primaries int
	)
	// Used purely as a set: only the number of distinct partition ids matters.
	seen := make(map[int32]struct{})

	for _, peer := range c.Peers {
		if peer.Primary {
			primaries++
		}
		if peer.IsReady() {
			ready++
		}
		for _, id := range peer.Partitions {
			seen[id] = struct{}{}
		}
	}

	clusterReadyNodes.Set(ready)
	clusterPrimaryNodes.Set(primaries)
	clusterPartitionCount.Set(len(seen))
	nodeCount.Set(len(c.Peers))
}

func (c *ClusterManager) NotifyJoin(node *memberlist.Node) {
joinEvents.Inc()
c.Lock()
defer c.Unlock()
if len(node.Meta) == 0 {
Expand All @@ -58,16 +109,20 @@ func (c *ClusterManager) NotifyJoin(node *memberlist.Node) {
peer.local = true
}
c.Peers[node.Name] = peer
c.clusterStats()
}

// NotifyLeave handles a memberlist leave notification: it counts the event,
// drops the departed node from the peer table and refreshes the cluster
// gauges, all while holding the manager's write lock.
func (c *ClusterManager) NotifyLeave(node *memberlist.Node) {
	leaveEvents.Inc()

	c.Lock()
	defer c.Unlock()

	name := node.Name
	log.Info("CLU manager: Node %s has left the cluster", name)
	delete(c.Peers, name)

	// The peer table changed, so the derived gauges must be recomputed.
	c.clusterStats()
}

func (c *ClusterManager) NotifyUpdate(node *memberlist.Node) {
updateEvents.Inc()
c.Lock()
defer c.Unlock()
if len(node.Meta) == 0 {
Expand All @@ -93,6 +148,7 @@ func (c *ClusterManager) NotifyUpdate(node *memberlist.Node) {
}
c.Peers[node.Name] = peer
log.Info("CLU manager: Node %s at %s has been updated - %s", node.Name, node.Addr.String(), node.Meta)
c.clusterStats()
}

func (c *ClusterManager) BroadcastUpdate() {
Expand Down Expand Up @@ -170,6 +226,7 @@ func (c *ClusterManager) MergeRemoteState(buf []byte, join bool) {
c.Peers[name] = meta
}
}
c.clusterStats()
c.Unlock()
}

Expand All @@ -183,14 +240,20 @@ func (c *ClusterManager) IsReady() bool {

// SetReady marks this node as ready to accept requests from users.
// It is shorthand for SetState(NodeReady).
func (c *ClusterManager) SetReady() {
c.SetState(NodeReady)
}

// Set the state of this node.
func (c *ClusterManager) SetState(state NodeState) {
c.Lock()
if c.node.State == NodeReady {
if c.node.State == state {
c.Unlock()
return
}
c.node.State = NodeReady
c.node.State = state
c.node.Updated = time.Now()
c.Unlock()
nodeReady.Set(state == NodeReady)
c.BroadcastUpdate()
}

Expand Down Expand Up @@ -222,7 +285,7 @@ func (c *ClusterManager) SetPrimary(p bool) {
c.node.PrimaryChange = time.Now()
c.node.Updated = time.Now()
c.Unlock()
clusterPrimary.Set(p)
nodePrimary.Set(p)
c.BroadcastUpdate()
}

Expand All @@ -232,6 +295,7 @@ func (c *ClusterManager) SetPartitions(part []int32) {
c.node.Partitions = part
c.node.Updated = time.Now()
c.Unlock()
nodePartitionCount.Set(len(part))
c.BroadcastUpdate()
}

Expand Down
20 changes: 20 additions & 0 deletions docs/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -170,3 +170,23 @@ how long it takes to persist a chunk (and chunks preceeding it)
this is subject to backpressure from the store when the store's queue runs full
* `tank.total_points`:
the number of points currently held in the in-memory ringbuffer
* `cluster.node_count`:
total number of nodes this instance thinks are in the cluster.
* `cluster.node_partition_count`:
number of partitions this node is consuming from
* `cluster.cluster_partition_count`:
number of partitions in the cluster which this node is aware of.
* `cluster.join_events`:
count of cluster join events processed
* `cluster.update_events`:
count of cluster update events processed
* `cluster.leave_events`:
count of cluster leave events processed
* `cluster.ready`:
flag of whether the node is ready to accept queries or not.
* `cluster.primary`:
flag of whether the node is a primary node who writes to cassandra.
* `cluster.ready_nodes`:
number of nodes in the cluster that are currently in the ready state
* `cluster.primary_nodes`:
number of nodes in the cluster that are currently a primary

0 comments on commit b0d033d

Please sign in to comment.