diff --git a/cluster/cluster.go b/cluster/cluster.go
index 41140034b8..2a41153cee 100644
--- a/cluster/cluster.go
+++ b/cluster/cluster.go
@@ -79,9 +79,18 @@ func Start() {
 	log.Info("CLU Start: joined to %d nodes in cluster", n)
 }
 
+type partitionCandidates struct {
+	priority int
+	nodes    []Node
+}
+
 // return the list of nodes to broadcast requests to
-// Only 1 member per partition is returned. This list includes
-// ThisNode if it is capable of handling queries.
+// If partitions are assigned to nodes in groups
+// (a[0,1], b[0,1], c[2,3], d[2,3] as opposed to a[0,1], b[0,2], c[1,3], d[2,3]),
+// only 1 member per partition is returned.
+// The nodes are selected based on priority, preferring thisNode if it
+// has the lowest priority, otherwise using a random selection from all
+// nodes with the lowest priority.
 func MembersForQuery() []Node {
 	thisNode := Manager.ThisNode()
 	// If we are running in single mode, just return thisNode
@@ -89,10 +98,15 @@ func MembersForQuery() []Node {
 		return []Node{thisNode}
 	}
 
-	membersMap := make(map[int32][]Node)
+	// store the available nodes for each partition, grouped by
+	// priority
+	membersMap := make(map[int32]*partitionCandidates)
 	if thisNode.IsReady() {
 		for _, part := range thisNode.Partitions {
-			membersMap[part] = []Node{thisNode}
+			membersMap[part] = &partitionCandidates{
+				priority: thisNode.Priority,
+				nodes:    []Node{thisNode},
+			}
 		}
 	}
 
@@ -101,7 +115,22 @@ func MembersForQuery() []Node {
 			continue
 		}
 		for _, part := range member.Partitions {
-			membersMap[part] = append(membersMap[part], member)
+			if _, ok := membersMap[part]; !ok {
+				membersMap[part] = &partitionCandidates{
+					priority: member.Priority,
+					nodes:    []Node{member},
+				}
+				continue
+			}
+			if membersMap[part].priority == member.Priority {
+				membersMap[part].nodes = append(membersMap[part].nodes, member)
+			} else if membersMap[part].priority > member.Priority {
+				// this node has a higher priority (lower number) than previously seen candidates
+				membersMap[part] = &partitionCandidates{
+					priority: member.Priority,
+					nodes:    []Node{member},
+				}
+			}
 		}
 	}
 
@@ -111,10 +140,8 @@ func MembersForQuery() []Node {
 	// needed to cover all partitions
 
 LOOP:
-	for _, nodes := range membersMap {
-		// always prefer the local node which will be nodes[0]
-		// if it has this partition
-		if nodes[0].Name == thisNode.Name {
+	for _, candidates := range membersMap {
+		if candidates.nodes[0].Name == thisNode.Name {
 			if _, ok := selectedMembers[thisNode.Name]; !ok {
 				selectedMembers[thisNode.Name] = struct{}{}
 				answer = append(answer, thisNode)
@@ -122,14 +149,14 @@ LOOP:
 			continue LOOP
 		}
 
-		for _, n := range nodes {
+		for _, n := range candidates.nodes {
 			if _, ok := selectedMembers[n.Name]; ok {
 				continue LOOP
 			}
 		}
 		// if no nodes have been selected yet then grab a
 		// random node from the set of available nodes
-		selected := nodes[rand.Intn(len(nodes))]
+		selected := candidates.nodes[rand.Intn(len(candidates.nodes))]
 		selectedMembers[selected.Name] = struct{}{}
 		answer = append(answer, selected)
 	}
diff --git a/cluster/manager.go b/cluster/manager.go
index 24b23fa4b4..f4b9f1660e 100644
--- a/cluster/manager.go
+++ b/cluster/manager.go
@@ -24,6 +24,8 @@ var (
 	nodePrimary = stats.NewBool("cluster.self.state.primary")
 	// metric cluster.self.partitions is the number of partitions this instance consumes
 	nodePartitions = stats.NewGauge32("cluster.self.partitions")
+	// metric cluster.self.priority is the priority of the node. A lower number gives higher priority
+	nodePriority = stats.NewGauge32("cluster.self.priority")
 
 	// metric cluster.total.state.primary-ready is the number of nodes we know to be primary and ready
 	totalPrimaryReady = stats.NewGauge32("cluster.total.state.primary-ready")
@@ -309,3 +311,20 @@ func (c *ClusterManager) GetPartitions() []int32 {
 	defer c.RUnlock()
 	return c.members[c.nodeName].Partitions
 }
+
+// set the priority of this node.
+// lower values == higher priority
+func (c *ClusterManager) SetPriority(prio int) {
+	c.Lock()
+	if c.members[c.nodeName].Priority == prio {
+		c.Unlock()
+		return
+	}
+	node := c.members[c.nodeName]
+	node.Priority = prio
+	node.Updated = time.Now()
+	c.members[c.nodeName] = node
+	c.Unlock()
+	nodePriority.Set(prio)
+	c.BroadcastUpdate()
+}
diff --git a/cluster/node.go b/cluster/node.go
index c0269d08cb..f6de6f8ecd 100644
--- a/cluster/node.go
+++ b/cluster/node.go
@@ -77,6 +77,7 @@ type Node struct {
 	Primary       bool      `json:"primary"`
 	PrimaryChange time.Time `json:"primaryChange"`
 	State         NodeState `json:"state"`
+	Priority      int       `json:"priority"`
 	Started       time.Time `json:"started"`
 	StateChange   time.Time `json:"stateChange"`
 	Partitions    []int32   `json:"partitions"`
diff --git a/input/kafkamdm/kafkamdm.go b/input/kafkamdm/kafkamdm.go
index c0464ec1ae..e9c74035c6 100644
--- a/input/kafkamdm/kafkamdm.go
+++ b/input/kafkamdm/kafkamdm.go
@@ -27,10 +27,10 @@ var metricsDecodeErr = stats.NewCounter32("input.kafka-mdm.metrics_decode_err")
 
 type KafkaMdm struct {
 	input.Handler
-	consumer sarama.Consumer
-	client   sarama.Client
-
-	wg sync.WaitGroup
+	consumer   sarama.Consumer
+	client     sarama.Client
+	lagMonitor *LagMonitor
+	wg         sync.WaitGroup
 
 	// signal to PartitionConsumers to shutdown
 	stopConsuming chan struct{}
@@ -187,6 +187,7 @@ func New() *KafkaMdm {
 	k := KafkaMdm{
 		consumer:      consumer,
 		client:        client,
+		lagMonitor:    NewLagMonitor(10, partitions),
 		stopConsuming: make(chan struct{}),
 	}
 
@@ -215,6 +216,8 @@ func (k *KafkaMdm) Start(handler input.Handler) {
 			go k.consumePartition(topic, partition, offset)
 		}
 	}
+
+	go k.setClusterPrio()
 }
 
 // this will continually consume from the topic until k.stopConsuming is triggered.
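
Not part of the patch: a minimal, self-contained Go sketch of the reporting loop that setClusterPrio (added above) implements. Every tick, a lag-derived score is published as the node priority until a stop channel closes. The prioritySetter interface, logSetter, and the fake score closure are hypothetical stand-ins for cluster.Manager and LagMonitor.Metric(), which the real code uses.

package main

import (
	"fmt"
	"time"
)

// prioritySetter is a hypothetical stand-in for the SetPriority method the
// ClusterManager gains in this patch; it is not the real cluster package API.
type prioritySetter interface {
	SetPriority(prio int)
}

// logSetter just prints the priority it receives so the loop can be observed.
type logSetter struct{}

func (logSetter) SetPriority(prio int) {
	fmt.Println("cluster priority set to", prio)
}

// reportPriority mirrors the shape of KafkaMdm.setClusterPrio: on every tick it
// reads a lag-derived score and publishes it as the node priority, until stop is closed.
func reportPriority(setter prioritySetter, score func() int, interval time.Duration, stop chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-stop:
			return
		case <-ticker.C:
			setter.SetPriority(score())
		}
	}
}

func main() {
	stop := make(chan struct{})
	lag := 100
	// fake lag source standing in for LagMonitor.Metric(): pretend the consumer
	// slowly catches up, so the reported priority improves over time
	score := func() int {
		if lag > 0 {
			lag -= 20
		}
		return lag
	}
	go reportPriority(logSetter{}, score, 50*time.Millisecond, stop)
	time.Sleep(300 * time.Millisecond)
	close(stop)
}
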
@@ -248,7 +251,6 @@ func (k *KafkaMdm) consumePartition(topic string, partition int32, currentOffset
 	log.Info("kafka-mdm: consuming from %s:%d from offset %d", topic, partition, currentOffset)
 	messages := pc.Messages()
 	ticker := time.NewTicker(offsetCommitInterval)
-
 	for {
 		select {
 		case msg := <-messages:
@@ -273,7 +275,9 @@ func (k *KafkaMdm) consumePartition(topic string, partition int32, currentOffset
 			}
 			partitionOffsetMetric.Set(int(currentOffset))
 			if err == nil {
-				partitionLagMetric.Set(int(offset - currentOffset))
+				lag := int(offset - currentOffset)
+				partitionLagMetric.Set(lag)
+				k.lagMonitor.Store(partition, lag)
 			}
 		case <-k.stopConsuming:
 			pc.Close()
@@ -307,3 +311,15 @@ func (k *KafkaMdm) Stop() {
 	k.client.Close()
 	offsetMgr.Close()
 }
+
+func (k *KafkaMdm) setClusterPrio() {
+	ticker := time.NewTicker(time.Second * 10)
+	for {
+		select {
+		case <-k.stopConsuming:
+			return
+		case <-ticker.C:
+			cluster.Manager.SetPriority(k.lagMonitor.Metric())
+		}
+	}
+}
diff --git a/input/kafkamdm/lag_monitor.go b/input/kafkamdm/lag_monitor.go
new file mode 100644
index 0000000000..9206392250
--- /dev/null
+++ b/input/kafkamdm/lag_monitor.go
@@ -0,0 +1,89 @@
+package kafkamdm
+
+import (
+	"sync"
+)
+
+type lagLogger struct {
+	sync.Mutex
+	pos          int
+	measurements []int
+}
+
+func newLagLogger(size int) *lagLogger {
+	return &lagLogger{
+		pos:          0,
+		measurements: make([]int, 0, size),
+	}
+}
+
+func (l *lagLogger) Store(lag int) {
+	l.Lock()
+	defer l.Unlock()
+	l.pos++
+	if len(l.measurements) < cap(l.measurements) {
+		l.measurements = append(l.measurements, lag)
+		return
+	}
+
+	if l.pos >= cap(l.measurements) {
+		l.pos = 0
+	}
+	l.measurements[l.pos] = lag
+}
+
+func (l *lagLogger) Min() int {
+	l.Lock()
+	defer l.Unlock()
+	min := -1
+	for _, m := range l.measurements {
+		if min < 0 || m < min {
+			min = m
+		}
+	}
+	if min < 0 {
+		min = 0
+	}
+	return min
+}
+
+/*
+   LagMonitor is used to determine how up to date this node is.
+   We periodically collect the lag for each partition, keeping the last N
+   measurements in a moving window. Using those measurements we then
+   compute an overall score for this node: the maximum of the per-partition
+   minimum lags.
+
+   For each partition we take the minimum lag seen in the last N measurements,
+   so that transient spikes don't affect the health score.
+   From those per-partition minimums we then take the maximum, so that
+   overall health reflects the worst performing partition.
+*/
+type LagMonitor struct {
+	lag map[int32]*lagLogger
+}
+
+func NewLagMonitor(size int, partitions []int32) *LagMonitor {
+	m := &LagMonitor{
+		lag: make(map[int32]*lagLogger),
+	}
+	for _, p := range partitions {
+		m.lag[p] = newLagLogger(size)
+	}
+	return m
+}
+
+func (l *LagMonitor) Metric() int {
+	max := 0
+	for _, lag := range l.lag {
+		val := lag.Min()
+		if val > max {
+			max = val
+		}
+	}
+	return max
+}
+
+func (l *LagMonitor) Store(partition int32, val int) {
+	l.lag[partition].Store(val)
+}
diff --git a/input/kafkamdm/lag_monitor_test.go b/input/kafkamdm/lag_monitor_test.go
new file mode 100644
index 0000000000..0a9b4c90b9
--- /dev/null
+++ b/input/kafkamdm/lag_monitor_test.go
@@ -0,0 +1,49 @@
+package kafkamdm
+
+import (
+	"testing"
+
+	. "github.com/smartystreets/goconvey/convey"
+)
+
+func TestLagLogger(t *testing.T) {
+	logger := newLagLogger(5)
+	Convey("with 0 measurements", t, func() {
+		So(logger.Min(), ShouldEqual, 0)
+	})
+	Convey("with 1 measurement", t, func() {
+		logger.Store(10)
+		So(logger.Min(), ShouldEqual, 10)
+	})
+	Convey("with 2 measurements", t, func() {
+		logger.Store(5)
+		So(logger.Min(), ShouldEqual, 5)
+	})
+	Convey("with lots of measurements", t, func() {
+		for i := 0; i < 100; i++ {
+			logger.Store(i)
+		}
+		So(logger.Min(), ShouldEqual, 95)
+	})
+}
+
+func TestLagMonitor(t *testing.T) {
+	mon := NewLagMonitor(10, []int32{0, 1, 2, 3})
+	Convey("with 0 measurements", t, func() {
+		So(mon.Metric(), ShouldEqual, 0)
+	})
+	Convey("with lots of measurements", t, func() {
+		for part := range mon.lag {
+			for i := 0; i < 100; i++ {
+				mon.Store(part, i)
+			}
+		}
+		So(mon.Metric(), ShouldEqual, 90)
+	})
+	Convey("metric should be worst partition", t, func() {
+		for part := range mon.lag {
+			mon.Store(part, 10+int(part))
+		}
+		So(mon.Metric(), ShouldEqual, 13)
+	})
+}