Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
add kafka cluster implementation next to nsq
Browse files Browse the repository at this point in the history
note that we needed to move the config parameters out of mdata package,
otherwise the names were conflicting
  • Loading branch information
Dieterbe committed Jul 18, 2016
1 parent 5e52db2 commit 02a8a92
Show file tree
Hide file tree
Showing 5 changed files with 257 additions and 34 deletions.
171 changes: 171 additions & 0 deletions mdata/clkafka.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
package mdata

import (
"bytes"
"encoding/binary"
"encoding/json"
"sync"
"time"

"github.com/Shopify/sarama"
"github.com/bsm/sarama-cluster"
"github.com/raintank/met"
cfg "github.com/raintank/metrictank/mdata/clkafka"
"github.com/raintank/worldping-api/pkg/log"
)

// ClKafka exchanges chunk-persist notifications with other instances over
// kafka: it consumes persist messages via a sarama-cluster consumer and
// publishes batches of locally saved chunks via a sarama sync producer.
type ClKafka struct {
	// in receives chunks from Send; it is drained by the produce loop.
	in chan SavedChunk
	// buf accumulates chunks received on in until the next flush.
	buf []SavedChunk
	// wg is waited on by Stop before StopChan is closed; it tracks the
	// consume and notifications goroutines.
	wg sync.WaitGroup
	// instance is stamped on every outgoing PersistMessageBatch.
	instance string
	consumer *cluster.Consumer
	producer sarama.SyncProducer
	// StopChan is closed once the goroutines tracked by wg have finished
	// after a call to Stop.
	StopChan chan int
	// Cl is embedded; presumably it supplies the Handle method used by
	// consume — defined outside this file, confirm there.
	Cl
}

// NewKafka creates the kafka consumer and producer described by the clkafka
// config package and starts the background goroutines that process
// rebalance notifications, consume incoming persist messages and publish
// outgoing ones.
//
// Failure to create the consumer or the producer is fatal.
// The stats argument is currently not used by this implementation.
func NewKafka(instance string, metrics Metrics, stats met.Backend) *ClKafka {
	consumer, err := cluster.NewConsumer(cfg.Brokers, cfg.Group, cfg.Topics, cfg.CConfig)
	if err != nil {
		log.Fatal(2, "kafka-cluster failed to start consumer: %s", err)
	}
	log.Info("kafka-cluster consumer started without error")

	producer, err := sarama.NewSyncProducer(cfg.Brokers, cfg.PConfig)
	if err != nil {
		log.Fatal(2, "kafka-cluster failed to start producer: %s", err)
	}

	c := &ClKafka{
		in:       make(chan SavedChunk),
		consumer: consumer,
		producer: producer,
		instance: instance,
		Cl: Cl{
			instance: instance,
			metrics:  metrics,
		},
		StopChan: make(chan int),
	}

	// Register both tracked goroutines before starting them. Calling
	// wg.Add from inside the goroutines would race with wg.Wait in Stop:
	// Wait could return (and StopChan be closed) before they registered.
	c.wg.Add(2)
	go c.notifications()
	go c.consume()
	go c.produce()

	return c
}

// consume handles every message delivered by the consumer and marks its
// offset as processed. It returns once the consumer's message channel is
// closed (which Stop triggers by closing the consumer).
//
// The caller must have called c.wg.Add(1) before starting this goroutine.
func (c *ClKafka) consume() {
	defer c.wg.Done()
	for msg := range c.consumer.Messages() {
		if LogLevel < 2 {
			log.Debug("CLU kafka-cluster received message: Topic %s, Partition: %d, Offset: %d, Key: %x", msg.Topic, msg.Partition, msg.Offset, msg.Key)
		}
		c.Handle(msg.Value)
		c.consumer.MarkOffset(msg, "")
	}
	log.Info("CLU kafka-cluster consumer ended.")
}

// notifications logs the partitions claimed, released and currently held
// after each consumer group rebalance. It returns once the consumer's
// notification channel is closed.
//
// The caller must have called c.wg.Add(1) before starting this goroutine.
func (c *ClKafka) notifications() {
	defer c.wg.Done()
	for msg := range c.consumer.Notifications() {
		// ranging over an empty map is a no-op, so no length guards are needed
		for topic, partitions := range msg.Claimed {
			log.Info("CLU kafka-cluster consumer claimed %d partitions on topic: %s", len(partitions), topic)
		}
		for topic, partitions := range msg.Released {
			log.Info("CLU kafka-cluster consumer released %d partitions on topic: %s", len(partitions), topic)
		}

		if len(msg.Current) == 0 {
			log.Info("CLU kafka-cluster consumer is no longer consuming from any partitions.")
			continue
		}
		log.Info("CLU kafka-cluster Current partitions:")
		for topic, partitions := range msg.Current {
			log.Info("CLU kafka-cluster Current partitions: %s: %v", topic, partitions)
		}
	}
	log.Info("CLU kafka-cluster notification processing stopped")
}

// Stop will initiate a graceful stop of the Consumer (permanent).
// It closes the consumer and the producer, logging (rather than silently
// dropping) any error they report, and then returns immediately.
//
// NOTE: receive on StopChan to block until this process completes
func (c *ClKafka) Stop() {
	// closes notifications and messages channels, amongst others
	if err := c.consumer.Close(); err != nil {
		log.Warn("CLU kafka-cluster failed to close consumer: %s", err)
	}
	if err := c.producer.Close(); err != nil {
		log.Warn("CLU kafka-cluster failed to close producer: %s", err)
	}

	// signal completion once the tracked goroutines have drained and exited
	go func() {
		c.wg.Wait()
		close(c.StopChan)
	}()
}

// Send queues a saved chunk for publication. The in channel is unbuffered,
// so this blocks until the produce loop receives the chunk.
func (c *ClKafka) Send(sc SavedChunk) {
	c.in <- sc
}

// produce collects chunks arriving on c.in into c.buf and flushes the buffer
// either when it holds batchSize chunks or once per second, whichever comes
// first. The loop has no exit condition; it runs for the life of the process.
func (c *ClKafka) produce() {
	const batchSize = 5000
	tick := time.NewTicker(time.Second)
	for {
		select {
		case <-tick.C:
			// periodic flush; flush itself ignores an empty buffer
			c.flush()
		case sc := <-c.in:
			c.buf = append(c.buf, sc)
			if len(c.buf) == batchSize {
				c.flush()
			}
		}
	}
}

// flush makes sure the batch gets sent, asynchronously.
//
// The buffered chunks are moved into a PersistMessageBatch and c.buf is
// reset, so the caller can keep accumulating while a background goroutine
// serializes the batch (one version byte followed by the JSON document) and
// publishes it. Publishing is retried once per second until it succeeds.
// A JSON marshalling failure is fatal. An empty buffer is a no-op.
func (c *ClKafka) flush() {
	if len(c.buf) == 0 {
		return
	}

	msg := PersistMessageBatch{Instance: c.instance, SavedChunks: c.buf}
	c.buf = nil

	go func() {
		log.Debug("CLU kafka-cluster sending %d batch metricPersist messages", len(msg.SavedChunks))

		data, err := json.Marshal(&msg)
		if err != nil {
			log.Fatal(4, "CLU kafka-cluster failed to marshal persistMessage to json: %s", err)
		}
		buf := new(bytes.Buffer)
		// writes to a bytes.Buffer cannot fail, so the results are not checked
		binary.Write(buf, binary.LittleEndian, uint8(PersistMessageBatchV1))
		buf.Write(data)
		messagesSize.Value(int64(buf.Len()))
		payload := &sarama.ProducerMessage{
			Topic: cfg.Topic,
			Value: sarama.ByteEncoder(buf.Bytes()),
		}

		for {
			// note: currently we don't do partitioning yet for cluster msgs, so no key needed
			_, _, err := c.producer.SendMessage(payload)
			if err == nil {
				break
			}
			log.Warn("CLU kafka-cluster publisher %s", err)
			// back off only after a failure; a successful send must not be
			// followed by a needless one second delay
			time.Sleep(time.Second)
		}
		messagesPublished.Inc(1)
	}()
}
57 changes: 57 additions & 0 deletions mdata/clkafka/cfg.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package clkafka

import (
"flag"
"log"

"github.com/Shopify/sarama"
"github.com/bsm/sarama-cluster"
"github.com/rakyll/globalconf"
)

// Settings for the kafka based cluster transport. The flag-backed values are
// registered in ConfigSetup; the derived values are filled in by
// ConfigProcess.
var (
	// Enabled is bound to the "enabled" flag; ConfigProcess does nothing when it is false.
	Enabled bool
	// broker is bound to the "broker" flag.
	broker string
	// topic is not referenced anywhere in this file.
	topic string
	// Brokers is the broker list derived from broker.
	Brokers []string
	// Topic is bound to the "topic" flag.
	Topic string
	// Topics is the topic list derived from Topic.
	Topics []string
	// Group is bound to the "group" flag.
	Group string
	// CConfig is the consumer configuration built by ConfigProcess.
	CConfig *cluster.Config
	// PConfig is the producer configuration built by ConfigProcess.
	PConfig *sarama.Config
)

// ConfigSetup declares the "kafka-cluster" flag set (enabled, broker, topic,
// group), binds it to this package's variables and registers it with
// globalconf.
func ConfigSetup() {
	fs := flag.NewFlagSet("kafka-cluster", flag.ExitOnError)

	fs.BoolVar(&Enabled, "enabled", false, "")
	fs.StringVar(&broker, "broker", "kafka:9092", "tcp address for kafka")
	fs.StringVar(&Topic, "topic", "metricpersist", "kafka topic")
	fs.StringVar(&Group, "group", "group1", "kafka consumer group")

	globalconf.Register("kafka-cluster", fs)
}

// ConfigProcess derives the runtime configuration from the parsed flags.
// It is a no-op when the kafka-cluster section is not enabled. Otherwise it
// fills in Brokers and Topics and builds and validates the consumer
// (CConfig) and producer (PConfig) configurations; an invalid configuration
// is fatal.
//
// instance is used to build the consumer's client id.
func ConfigProcess(instance string) {
	if !Enabled {
		return
	}
	Brokers = []string{broker}
	Topics = []string{Topic}

	CConfig = cluster.NewConfig()
	// see https://github.com/raintank/metrictank/issues/236
	CConfig.Consumer.Offsets.Initial = sarama.OffsetNewest
	CConfig.ClientID = instance + "-cluster"
	CConfig.Group.Return.Notifications = true
	CConfig.Config.Version = sarama.V0_10_0_0
	// This file imports the standard library logger, so the formatting
	// variant must be used: log.Fatal would print the arguments verbatim
	// (including a literal "%s") instead of formatting them.
	if err := CConfig.Validate(); err != nil {
		log.Fatalf("kafka-cluster invalid consumer config: %s", err)
	}

	PConfig = sarama.NewConfig()
	PConfig.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message
	PConfig.Producer.Retry.Max = 10                   // Retry up to 10 times to produce the message
	PConfig.Producer.Compression = sarama.CompressionNone
	// PConfig is handed to sarama.NewSyncProducer, which requires successes
	// to be reported back; set it explicitly instead of relying on the
	// library to adjust the config.
	PConfig.Producer.Return.Successes = true
	if err := PConfig.Validate(); err != nil {
		log.Fatalf("kafka-cluster invalid producer config: %s", err)
	}
}
49 changes: 18 additions & 31 deletions mdata/clnsq.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,14 @@ import (
"github.com/bitly/go-hostpool"
"github.com/nsqio/go-nsq"
"github.com/raintank/met"
clNSQ "github.com/raintank/metrictank/mdata/clnsq"
cfg "github.com/raintank/metrictank/mdata/clnsq"
"github.com/raintank/misc/instrumented_nsq"
"github.com/raintank/worldping-api/pkg/log"
)

var (
hostPool hostpool.HostPool
producers map[string]*nsq.Producer
nsqdAdds []string
lookupdAdds []string
topic string
channel string
pCfg *nsq.Config
cCfg *nsq.Config
hostPool hostpool.HostPool
producers map[string]*nsq.Producer
)

type ClNSQ struct {
Expand All @@ -33,29 +27,22 @@ type ClNSQ struct {
}

func NewNSQ(instance string, metrics Metrics, stats met.Backend) *ClNSQ {
nsqdAdds = clNSQ.NsqdAdds
lookupdAdds = clNSQ.LookupdAdds
topic = clNSQ.Topic
channel = clNSQ.Channel
pCfg = clNSQ.PCfg
cCfg = clNSQ.CCfg

// producers
hostPool = hostpool.NewEpsilonGreedy(nsqdAdds, 0, &hostpool.LinearEpsilonValueCalculator{})
hostPool = hostpool.NewEpsilonGreedy(cfg.NsqdAdds, 0, &hostpool.LinearEpsilonValueCalculator{})
producers = make(map[string]*nsq.Producer)

for _, addr := range nsqdAdds {
producer, err := nsq.NewProducer(addr, pCfg)
for _, addr := range cfg.NsqdAdds {
producer, err := nsq.NewProducer(addr, cfg.PCfg)
if err != nil {
log.Fatal(4, "failed creating producer %s", err.Error())
log.Fatal(4, "nsq-cluster failed creating producer %s", err.Error())
}
producers[addr] = producer
}

// consumers
consumer, err := insq.NewConsumer(topic, channel, cCfg, "metric_persist.%s", stats)
consumer, err := insq.NewConsumer(cfg.Topic, cfg.Channel, cfg.CCfg, "metric_persist.%s", stats)
if err != nil {
log.Fatal(4, "Failed to create NSQ consumer. %s", err)
log.Fatal(4, "nsq-cluster failed to create NSQ consumer. %s", err)
}
c := &ClNSQ{
in: make(chan SavedChunk),
Expand All @@ -67,15 +54,15 @@ func NewNSQ(instance string, metrics Metrics, stats met.Backend) *ClNSQ {
}
consumer.AddConcurrentHandlers(c, 2)

err = consumer.ConnectToNSQDs(nsqdAdds)
err = consumer.ConnectToNSQDs(cfg.NsqdAdds)
if err != nil {
log.Fatal(4, "failed to connect to NSQDs. %s", err)
log.Fatal(4, "nsq-cluster failed to connect to NSQDs. %s", err)
}
log.Info("persist consumer connected to nsqd")
log.Info("nsq-cluster persist consumer connected to nsqd")

err = consumer.ConnectToNSQLookupds(lookupdAdds)
err = consumer.ConnectToNSQLookupds(cfg.LookupdAdds)
if err != nil {
log.Fatal(4, "failed to connect to NSQLookupds. %s", err)
log.Fatal(4, "nsq-cluster failed to connect to NSQLookupds. %s", err)
}
go c.run()
return c
Expand Down Expand Up @@ -116,11 +103,11 @@ func (c *ClNSQ) flush() {
c.buf = nil

go func() {
log.Debug("CLU sending %d batch metricPersist messages", len(msg.SavedChunks))
log.Debug("CLU nsq-cluster sending %d batch metricPersist messages", len(msg.SavedChunks))

data, err := json.Marshal(&msg)
if err != nil {
log.Fatal(4, "failed to marshal persistMessage to json.")
log.Fatal(4, "CLU nsq-cluster failed to marshal persistMessage to json.")
}
buf := new(bytes.Buffer)
binary.Write(buf, binary.LittleEndian, uint8(PersistMessageBatchV1))
Expand All @@ -134,12 +121,12 @@ func (c *ClNSQ) flush() {
// will result in this loop repeating forever until we successfully publish our msg.
hostPoolResponse := hostPool.Get()
prod := producers[hostPoolResponse.Host()]
err = prod.Publish(topic, buf.Bytes())
err = prod.Publish(cfg.Topic, buf.Bytes())
// Hosts that are marked as dead will be retried after 30seconds. If we published
// successfully, then sending a nil error will mark the host as alive again.
hostPoolResponse.Mark(err)
if err != nil {
log.Warn("publisher marking host %s as faulty due to %s", hostPoolResponse.Host(), err)
log.Warn("CLU nsq-cluster publisher marking host %s as faulty due to %s", hostPoolResponse.Host(), err)
} else {
sent = true
}
Expand Down
4 changes: 2 additions & 2 deletions mdata/clnsq/cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,15 @@ func ConfigProcess() {
PCfg.UserAgent = "metrictank-cluster"
err := app.ParseOpts(PCfg, ProducerOpts)
if err != nil {
log.Fatal(4, "failed to parse nsq producer options. %s", err)
log.Fatal(4, "nsq-cluster: failed to parse nsq producer options. %s", err)
}

// consumer
CCfg = nsq.NewConfig()
CCfg.UserAgent = "metrictank-cluster"
err = app.ParseOpts(CCfg, ConsumerOpts)
if err != nil {
log.Fatal(4, "failed to parse nsq consumer options. %s", err)
log.Fatal(4, "nsq-cluster: failed to parse nsq consumer options. %s", err)
}
CCfg.MaxInFlight = MaxInFlight
}
Loading

0 comments on commit 02a8a92

Please sign in to comment.