
Commit

use node priority for routing requests
- add a priority attribute to the cluster nodes
- when getting peers needed to handle a query, use the nodes
  with the lowest priority.
- update kafkamdm to track Kafka lag, and use this lag as the
  node priority.  So if a node has a large lag (either due to
  processing the backlog at startup, or because of a fault) it
  will stop receiving queries.  But if all nodes in the cluster
  have high lag, then queries will still be processed and the user
  will get all of the data available (see the sketch after this list).
- closes #519
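To make the routing rule in the bullets concrete, here is a minimal, self-contained Go sketch. It is not code from this commit; the `node` type and `pickForPartition` helper are invented for illustration, and the real selection (in cluster/cluster.go below) additionally prefers the local node and otherwise picks one of the lowest-priority candidates at random.

```go
package main

import "fmt"

type node struct {
	Name     string
	Priority int // lower number == higher priority (i.e. less Kafka lag)
}

// pickForPartition returns the candidates that share the lowest priority value.
func pickForPartition(candidates []node) []node {
	var best []node
	for _, n := range candidates {
		switch {
		case len(best) == 0 || n.Priority < best[0].Priority:
			best = []node{n}
		case n.Priority == best[0].Priority:
			best = append(best, n)
		}
	}
	return best
}

func main() {
	// "b" is still working through a backlog, so its lag-derived priority is
	// high and it is skipped while healthy nodes exist.
	fmt.Println(pickForPartition([]node{{"a", 0}, {"b", 250}, {"c", 0}}))
	// -> [{a 0} {c 0}]

	// If every node lags, the least-lagged node still serves queries,
	// so the user gets whatever data is available.
	fmt.Println(pickForPartition([]node{{"a", 300}, {"b", 250}}))
	// -> [{b 250}]
}
```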
woodsaj committed Feb 22, 2017
1 parent 3a1fdf5 commit a81c6b3
Showing 6 changed files with 218 additions and 17 deletions.
49 changes: 38 additions & 11 deletions cluster/cluster.go
@@ -79,20 +79,34 @@ func Start() {
log.Info("CLU Start: joined to %d nodes in cluster", n)
}

type partitionCandidates struct {
priority int
nodes []Node
}

// return the list of nodes to broadcast requests to
// Only 1 member per partition is returned. This list includes
// ThisNode if it is capable of handling queries.
// If partitions are assigned to nodes in groups
// (a[0,1], b[0,1], c[2,3], d[2,3] as opposed to a[0,1], b[0,2], c[1,3], d[2,3]),
// only 1 member per partition is returned.
// The nodes are selected based on priority, preferring thisNode if it
// has the lowest prio, otherwise using a random selection from all
// nodes with the lowest prio.
func MembersForQuery() []Node {
thisNode := Manager.ThisNode()
// If we are running in single mode, just return thisNode
if Mode == ModeSingle {
return []Node{thisNode}
}

membersMap := make(map[int32][]Node)
// store the available nodes for each partition, grouped by
// priority
membersMap := make(map[int32]*partitionCandidates)
if thisNode.IsReady() {
for _, part := range thisNode.Partitions {
membersMap[part] = []Node{thisNode}
membersMap[part] = &partitionCandidates{
priority: thisNode.Priority,
nodes: []Node{thisNode},
}
}
}

@@ -101,7 +115,22 @@ func MembersForQuery() []Node {
continue
}
for _, part := range member.Partitions {
membersMap[part] = append(membersMap[part], member)
if _, ok := membersMap[part]; !ok {
membersMap[part] = &partitionCandidates{
priority: member.Priority,
nodes: []Node{member},
}
continue
}
if membersMap[part].priority == member.Priority {
membersMap[part].nodes = append(membersMap[part].nodes, member)
} else if membersMap[part].priority > member.Priority {
// this node has a higher priority (lower number) than previously seen candidates
membersMap[part] = &partitionCandidates{
priority: member.Priority,
nodes: []Node{member},
}
}
}
}

@@ -111,25 +140,23 @@ func MembersForQuery() []Node {
// needed to cover all partitions

LOOP:
for _, nodes := range membersMap {
// always prefer the local node which will be nodes[0]
// if it has this partition
if nodes[0].Name == thisNode.Name {
for _, candidates := range membersMap {
if candidates.nodes[0].Name == thisNode.Name {
if _, ok := selectedMembers[thisNode.Name]; !ok {
selectedMembers[thisNode.Name] = struct{}{}
answer = append(answer, thisNode)
}
continue LOOP
}

for _, n := range nodes {
for _, n := range candidates.nodes {
if _, ok := selectedMembers[n.Name]; ok {
continue LOOP
}
}
// if no nodes have been selected yet then grab a
// random node from the set of available nodes
selected := nodes[rand.Intn(len(nodes))]
selected := candidates.nodes[rand.Intn(len(candidates.nodes))]
selectedMembers[selected.Name] = struct{}{}
answer = append(answer, selected)
}
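For orientation only, here is a hedged sketch of how a query path might consume MembersForQuery. The Request and Result types and the queryPeer variable are hypothetical placeholders, not part of this commit or of the existing API; in reality the local node may be served in-process rather than queried over the network.

```go
package cluster

// Sketch only: Request, Result and queryPeer are hypothetical stand-ins
// for however peers are actually queried (e.g. over HTTP).
type Request struct{ Pattern string }
type Result struct{ SeriesName string }

var queryPeer func(peer Node, req Request) ([]Result, error)

// fanOut sends req to one member per covered partition group, as chosen
// by MembersForQuery, and merges the per-node results.
func fanOut(req Request) ([]Result, error) {
	var out []Result
	for _, peer := range MembersForQuery() {
		res, err := queryPeer(peer, req)
		if err != nil {
			return nil, err
		}
		out = append(out, res...)
	}
	return out, nil
}
```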
19 changes: 19 additions & 0 deletions cluster/manager.go
@@ -24,6 +24,8 @@ var (
nodePrimary = stats.NewBool("cluster.self.state.primary")
// metric cluster.self.partitions is the number of partitions this instance consumes
nodePartitions = stats.NewGauge32("cluster.self.partitions")
// metric cluster.self.priority is the priority of the node. A lower number gives higher priority
nodePriority = stats.NewGauge32("cluster.self.priority")

// metric cluster.total.state.primary-ready is the number of nodes we know to be primary and ready
totalPrimaryReady = stats.NewGauge32("cluster.total.state.primary-ready")
@@ -309,3 +311,20 @@ func (c *ClusterManager) GetPartitions() []int32 {
defer c.RUnlock()
return c.members[c.nodeName].Partitions
}

// set the priority of this node.
// lower values == higher priority
func (c *ClusterManager) SetPriority(prio int) {
c.Lock()
if c.members[c.nodeName].Priority == prio {
c.Unlock()
return
}
node := c.members[c.nodeName]
node.Priority = prio
node.Updated = time.Now()
c.members[c.nodeName] = node
c.Unlock()
nodePriority.Set(prio)
c.BroadcastUpdate()
}
1 change: 1 addition & 0 deletions cluster/node.go
@@ -77,6 +77,7 @@ type Node struct {
Primary bool `json:"primary"`
PrimaryChange time.Time `json:"primaryChange"`
State NodeState `json:"state"`
Priority int `json:"priority"`
Started time.Time `json:"started"`
StateChange time.Time `json:"stateChange"`
Partitions []int32 `json:"partitions"`
28 changes: 22 additions & 6 deletions input/kafkamdm/kafkamdm.go
@@ -27,10 +27,10 @@ var metricsDecodeErr = stats.NewCounter32("input.kafka-mdm.metrics_decode_err")

type KafkaMdm struct {
input.Handler
consumer sarama.Consumer
client sarama.Client

wg sync.WaitGroup
consumer sarama.Consumer
client sarama.Client
lagMonitor *LagMonitor
wg sync.WaitGroup

// signal to PartitionConsumers to shutdown
stopConsuming chan struct{}
@@ -187,6 +187,7 @@ func New() *KafkaMdm {
k := KafkaMdm{
consumer: consumer,
client: client,
lagMonitor: NewLagMonitor(10, partitions),
stopConsuming: make(chan struct{}),
}

@@ -215,6 +216,8 @@ func (k *KafkaMdm) Start(handler input.Handler) {
go k.consumePartition(topic, partition, offset)
}
}

go k.setClusterPrio()
}

// this will continually consume from the topic until k.stopConsuming is triggered.
@@ -248,7 +251,6 @@ func (k *KafkaMdm) consumePartition(topic string, partition int32, currentOffset
log.Info("kafka-mdm: consuming from %s:%d from offset %d", topic, partition, currentOffset)
messages := pc.Messages()
ticker := time.NewTicker(offsetCommitInterval)

for {
select {
case msg := <-messages:
@@ -273,7 +275,9 @@
}
partitionOffsetMetric.Set(int(currentOffset))
if err == nil {
partitionLagMetric.Set(int(offset - currentOffset))
lag := int(offset - currentOffset)
partitionLagMetric.Set(lag)
k.lagMonitor.Store(partition, lag)
}
case <-k.stopConsuming:
pc.Close()
@@ -307,3 +311,15 @@ func (k *KafkaMdm) Stop() {
k.client.Close()
offsetMgr.Close()
}

func (k *KafkaMdm) setClusterPrio() {
ticker := time.NewTicker(time.Second * 10)
for {
select {
case <-k.stopConsuming:
return
case <-ticker.C:
cluster.Manager.SetPriority(k.lagMonitor.Metric())
}
}
}
89 changes: 89 additions & 0 deletions input/kafkamdm/lag_monitor.go
@@ -0,0 +1,89 @@
package kafkamdm

import (
"sync"
)

type lagLogger struct {
sync.Mutex
pos int
measurements []int
}

func newLagLogger(size int) *lagLogger {
return &lagLogger{
pos: 0,
measurements: make([]int, 0, size),
}
}

func (l *lagLogger) Store(lag int) {
l.Lock()
defer l.Unlock()
l.pos++
if len(l.measurements) < cap(l.measurements) {
l.measurements = append(l.measurements, lag)
return
}

if l.pos >= cap(l.measurements) {
l.pos = 0
}
l.measurements[l.pos] = lag
}

func (l *lagLogger) Min() int {
l.Lock()
defer l.Unlock()
min := -1
for _, m := range l.measurements {
if min < 0 || m < min {
min = m
}
}
if min < 0 {
min = 0
}
return min
}

/*
LagMonitor is used to determine how up to date this node is.
We periodically collect the lag for each partition, keeping the last N
measurements in a moving window. Using those measurements we then compute
an overall score for this node: the maximum of the per-partition minimum lags.
For each partition we take the minimum lag seen in the last N measurements;
using the minimum ensures that transient issues don't affect the health score.
From those per-partition values we then take the maximum, which ensures
that overall health is based on the worst-performing partition.
*/
type LagMonitor struct {
lag map[int32]*lagLogger
}

func NewLagMonitor(size int, partitions []int32) *LagMonitor {
m := &LagMonitor{
lag: make(map[int32]*lagLogger),
}
for _, p := range partitions {
m.lag[p] = newLagLogger(size)
}
return m
}

func (l *LagMonitor) Metric() int {
max := 0
for _, lag := range l.lag {
val := lag.Min()
if val > max {
max = val
}
}
return max
}

func (l *LagMonitor) Store(partition int32, val int) {
l.lag[partition].Store(val)
}
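As a quick worked illustration of that scoring rule, here is a sketch of a Go example test (not part of this commit) exercising LagMonitor from within the kafkamdm package: the window minimum smooths the transient values on partition 0 (its score is 3 despite the 12), while the consistently lagging partition 1 dominates the overall metric.

```go
package kafkamdm

import "fmt"

// ExampleLagMonitor is a sketch, not code from this commit: it shows the
// max-of-min score, i.e. the worst partition's best recent lag.
func ExampleLagMonitor() {
	mon := NewLagMonitor(3, []int32{0, 1})
	for _, lag := range []int{8, 3, 12} {
		mon.Store(0, lag) // minimum of partition 0's window: 3
	}
	for _, lag := range []int{40, 55, 47} {
		mon.Store(1, lag) // minimum of partition 1's window: 40
	}
	fmt.Println(mon.Metric()) // max(3, 40)
	// Output: 40
}
```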
49 changes: 49 additions & 0 deletions input/kafkamdm/lag_monitor_test.go
@@ -0,0 +1,49 @@
package kafkamdm

import (
"testing"

. "github.com/smartystreets/goconvey/convey"
)

func TestLagLogger(t *testing.T) {
logger := newLagLogger(5)
Convey("with 0 measurements", t, func() {
So(logger.Min(), ShouldEqual, 0)
})
Convey("with 1 measurements", t, func() {
logger.Store(10)
So(logger.Min(), ShouldEqual, 10)
})
Convey("with 2 measurements", t, func() {
logger.Store(5)
So(logger.Min(), ShouldEqual, 5)
})
Convey("with lots of measurements", t, func() {
for i := 0; i < 100; i++ {
logger.Store(i)
}
So(logger.Min(), ShouldEqual, 95)
})
}

func TestLagMonitor(t *testing.T) {
mon := NewLagMonitor(10, []int32{0, 1, 2, 3})
Convey("with 0 measurements", t, func() {
So(mon.Metric(), ShouldEqual, 0)
})
Convey("with lots of measurements", t, func() {
for part := range mon.lag {
for i := 0; i < 100; i++ {
mon.Store(part, i)
}
}
So(mon.Metric(), ShouldEqual, 90)
})
Convey("metric should be worst partition", t, func() {
for part := range mon.lag {
mon.Store(part, 10+int(part))
}
So(mon.Metric(), ShouldEqual, 13)
})
}
