diff --git a/dashboard.json b/dashboard.json index 0806ea22a8..2bf627c76d 100644 --- a/dashboard.json +++ b/dashboard.json @@ -1247,11 +1247,11 @@ "targets": [ { "refId": "A", - "target": "aliasByNode(stats.$environment.timers.metrictank.$instance.cluster.message_size.{mean,upper}, 6, 7)" + "target": "aliasByNode(stats.$environment.timers.metrictank.$instance.notifier.*.message_size.{mean,upper}, 5, 6, 7)" }, { "refId": "B", - "target": "alias(stats.$environment.metrictank.$instance.cluster.messages-published, 'msg published')" + "target": "aliasByNode(stats.$environment.metrictank.$instance.notifier.*.messages-published, 5, 6)" } ], "timeFrom": null, diff --git a/in/in.go b/in/in.go deleted file mode 100644 index 6652610c75..0000000000 --- a/in/in.go +++ /dev/null @@ -1,132 +0,0 @@ -// Package in provides interfaces, concrete implementations, and utilities -// to ingest data into metrictank -package in - -import ( - "fmt" - "time" - - "github.com/raintank/met" - "github.com/raintank/metrictank/idx" - "github.com/raintank/metrictank/mdata" - "github.com/raintank/metrictank/usage" - "github.com/raintank/worldping-api/pkg/log" - "gopkg.in/raintank/schema.v1" - "gopkg.in/raintank/schema.v1/msg" -) - -// In is a base handler for a metrics packet, aimed to be embedded by concrete implementations -type In struct { - metricsPerMessage met.Meter - metricsReceived met.Count - MetricsDecodeErr met.Count // metric metrics_decode_err is a count of times an input message (MetricData, MetricDataArray or carbon line) failed to parse - MetricInvalid met.Count // metric metric_invalid is a count of times a metric did not validate - msgsAge met.Meter // in ms - tmp msg.MetricData - - metrics mdata.Metrics - metricIndex idx.MetricIndex - usage *usage.Usage -} - -func New(metrics mdata.Metrics, metricIndex idx.MetricIndex, usage *usage.Usage, input string, stats met.Backend) In { - return In{ - metricsPerMessage: stats.NewMeter(fmt.Sprintf("%s.metrics_per_message", input), 0), - metricsReceived: stats.NewCount(fmt.Sprintf("%s.metrics_received", input)), - MetricsDecodeErr: stats.NewCount(fmt.Sprintf("%s.metrics_decode_err", input)), - MetricInvalid: stats.NewCount(fmt.Sprintf("%s.metric_invalid", input)), - msgsAge: stats.NewMeter(fmt.Sprintf("%s.message_age", input), 0), - tmp: msg.MetricData{Metrics: make([]*schema.MetricData, 1)}, - - metrics: metrics, - metricIndex: metricIndex, - usage: usage, - } -} - -// process makes sure the data is stored and the metadata is in the index, -// and the usage is tracked, if enabled. -// concurrency-safe. -func (in In) process(metric *schema.MetricData) { - if metric == nil { - return - } - err := metric.Validate() - if err != nil { - in.MetricInvalid.Inc(1) - log.Debug("Invalid metric %s %v", err, metric) - return - } - if metric.Time == 0 { - log.Warn("invalid metric. metric.Time is 0. %s", metric.Id) - } else { - in.metricIndex.Add(metric) - m := in.metrics.GetOrCreate(metric.Id) - m.Add(uint32(metric.Time), metric.Value) - if in.usage != nil { - in.usage.Add(metric.OrgId, metric.Id) - } - } -} - -// HandleLegacy processes legacy datapoints. we don't track msgsAge here -// concurrency-safe -func (in In) HandleLegacy(name string, val float64, ts uint32, interval int) { - // TODO reuse? 
- md := &schema.MetricData{ - Name: name, - Metric: name, - Interval: interval, - Value: val, - Unit: "unknown", - Time: int64(ts), - Mtype: "gauge", - Tags: []string{}, - OrgId: 1, // admin org - } - md.SetId() - in.metricsPerMessage.Value(int64(1)) - in.metricsReceived.Inc(1) - in.process(md) -} - -// Handle processes simple messages without format spec or produced timestamp, so we don't track msgsAge here -// concurrency-safe -func (in In) Handle(data []byte) { - // TODO reuse? - md := schema.MetricData{} - _, err := md.UnmarshalMsg(data) - if err != nil { - in.MetricsDecodeErr.Inc(1) - log.Error(3, "skipping message. %s", err) - return - } - in.metricsPerMessage.Value(int64(1)) - in.metricsReceived.Inc(1) - in.process(&md) -} - -// HandleArray processes MetricDataArray messages that have a format spec and produced timestamp. -// not concurrency-safe! -func (in In) HandleArray(data []byte) { - err := in.tmp.InitFromMsg(data) - if err != nil { - in.MetricsDecodeErr.Inc(1) - log.Error(3, "skipping message. %s", err) - return - } - in.msgsAge.Value(time.Now().Sub(in.tmp.Produced).Nanoseconds() / 1000) - - err = in.tmp.DecodeMetricData() // reads metrics from in.tmp.Msg and unsets it - if err != nil { - in.MetricsDecodeErr.Inc(1) - log.Error(3, "skipping message. %s", err) - return - } - in.metricsPerMessage.Value(int64(len(in.tmp.Metrics))) - in.metricsReceived.Inc(int64(len(in.tmp.Metrics))) - - for _, metric := range in.tmp.Metrics { - in.process(metric) - } -} diff --git a/in/in_test.go b/in/in_test.go deleted file mode 100644 index bc9f050535..0000000000 --- a/in/in_test.go +++ /dev/null @@ -1,57 +0,0 @@ -package in - -import ( - "testing" - - "github.com/benbjohnson/clock" - "github.com/raintank/met/helper" - "github.com/raintank/metrictank/idx/memory" - "github.com/raintank/metrictank/mdata" - "github.com/raintank/metrictank/mdata/chunk" - "github.com/raintank/metrictank/usage" - "gopkg.in/raintank/schema.v1" -) - -func BenchmarkHandle(b *testing.B) { - stats, _ := helper.New(false, "", "standard", "metrictank", "") - mdata.CluStatus = mdata.NewClusterStatus("default", false) - mdata.InitMetrics(stats) - - store := mdata.NewDevnullStore() - aggmetrics := mdata.NewAggMetrics(store, 600, 10, 800, 8000, 10000, 0, make([]mdata.AggSetting, 0)) - metricIndex := memory.New() - metricIndex.Init(stats) - usage := usage.New(300, aggmetrics, metricIndex, clock.New()) - in := New(aggmetrics, metricIndex, usage, "test", stats) - - // timestamps start at 10 and go up from there. 
(we can't use 0, see AggMetric.Add()) - datas := make([][]byte, b.N) - for i := 0; i < b.N; i++ { - metric := &schema.MetricData{ - Id: "some.id.of.a.metric", - OrgId: 500, - Name: "some.id", - Metric: "metric", - Interval: 10, - Value: 1234.567, - Unit: "ms", - Time: int64((i + 1) * 10), - Mtype: "gauge", - Tags: []string{"some_tag", "ok"}, - } - data, err := metric.MarshalMsg(nil) - if err != nil { - b.Fatal(err.Error()) - } - datas[i] = data - } - - b.ResetTimer() - go func() { - for range chunk.TotalPoints { - } - }() - for i := 0; i < b.N; i++ { - in.Handle(datas[i]) - } -} diff --git a/in/carbon/carbon.go b/input/carbon/carbon.go similarity index 53% rename from in/carbon/carbon.go rename to input/carbon/carbon.go index 30f125ae76..736914f9cc 100644 --- a/in/carbon/carbon.go +++ b/input/carbon/carbon.go @@ -7,24 +7,65 @@ import ( "flag" "io" "net" + "sync" "github.com/lomik/go-carbon/persister" "github.com/metrics20/go-metrics20/carbon20" "github.com/raintank/met" "github.com/raintank/metrictank/idx" - "github.com/raintank/metrictank/in" + "github.com/raintank/metrictank/input" "github.com/raintank/metrictank/mdata" "github.com/raintank/metrictank/usage" "github.com/raintank/worldping-api/pkg/log" "github.com/rakyll/globalconf" + "gopkg.in/raintank/schema.v1" ) type Carbon struct { - in.In - addrStr string - addr *net.TCPAddr - schemas persister.WhisperSchemas - stats met.Backend + input.Input + addrStr string + addr *net.TCPAddr + schemas persister.WhisperSchemas + stats met.Backend + listener *net.TCPListener + handlerWaitGroup sync.WaitGroup + quit chan struct{} + connTrack *ConnTrack +} + +type ConnTrack struct { + sync.Mutex + conns map[string]net.Conn +} + +func NewConnTrack() *ConnTrack { + return &ConnTrack{ + conns: make(map[string]net.Conn), + } +} + +func (c *ConnTrack) Add(conn net.Conn) { + c.Lock() + c.conns[conn.RemoteAddr().String()] = conn + c.Unlock() +} + +func (c *ConnTrack) Remove(conn net.Conn) { + c.Lock() + delete(c.conns, conn.RemoteAddr().String()) + c.Unlock() +} + +func (c *ConnTrack) CloseAll() { + c.Lock() + for _, conn := range c.conns { + conn.Close() + } + c.Unlock() +} + +func (c *Carbon) Name() string { + return "carbon" } var Enabled bool @@ -47,7 +88,7 @@ func ConfigProcess() { var err error schemas, err = persister.ReadWhisperSchemas(schemasFile) if err != nil { - log.Fatal(4, "can't read schemas file %q: %s", schemasFile, err.Error()) + log.Fatal(4, "carbon-in: can't read schemas file %q: %s", schemasFile, err.Error()) } var defaultFound bool for _, schema := range schemas { @@ -61,7 +102,7 @@ func ConfigProcess() { if !defaultFound { // good graphite health (not sure what graphite does if there's no .*) // but we definitely need to always be able to determine which interval to use - log.Fatal(4, "storage-conf does not have a default '.*' pattern") + log.Fatal(4, "carbon-in: storage-conf does not have a default '.*' pattern") } } @@ -69,39 +110,61 @@ func ConfigProcess() { func New(stats met.Backend) *Carbon { addrT, err := net.ResolveTCPAddr("tcp", addr) if err != nil { - log.Fatal(4, err.Error()) + log.Fatal(4, "carbon-in: %s", err.Error()) } return &Carbon{ - addrStr: addr, - addr: addrT, - schemas: schemas, - stats: stats, + addrStr: addr, + addr: addrT, + schemas: schemas, + stats: stats, + quit: make(chan struct{}), + connTrack: NewConnTrack(), } } func (c *Carbon) Start(metrics mdata.Metrics, metricIndex idx.MetricIndex, usg *usage.Usage) { - c.In = in.New(metrics, metricIndex, usg, "carbon", c.stats) + c.Input = input.New(metrics, 
metricIndex, usg, "carbon", c.stats) l, err := net.ListenTCP("tcp", c.addr) if nil != err { - log.Fatal(4, err.Error()) + log.Fatal(4, "carbon-in: %s", err.Error()) } + c.listener = l log.Info("carbon-in: listening on %v/tcp", c.addr) - go c.accept(l) + go c.accept() } -func (c *Carbon) accept(l *net.TCPListener) { +func (c *Carbon) accept() { for { - conn, err := l.AcceptTCP() + conn, err := c.listener.AcceptTCP() if nil != err { - log.Error(4, err.Error()) - break + select { + case <-c.quit: + // we are shutting down. + return + default: + } + log.Error(4, "carbon-in: Accept Error: %s", err.Error()) + return } + c.handlerWaitGroup.Add(1) + c.connTrack.Add(conn) go c.handle(conn) } } +func (c *Carbon) Stop() { + log.Info("carbon-in: shutting down.") + close(c.quit) + c.listener.Close() + c.connTrack.CloseAll() + c.handlerWaitGroup.Wait() +} + func (c *Carbon) handle(conn net.Conn) { - defer conn.Close() + defer func() { + conn.Close() + c.connTrack.Remove(conn) + }() // TODO c.SetTimeout(60e9) r := bufio.NewReaderSize(conn, 4096) for { @@ -109,15 +172,21 @@ func (c *Carbon) handle(conn net.Conn) { buf, _, err := r.ReadLine() if nil != err { + select { + case <-c.quit: + // we are shutting down. + break + default: + } if io.EOF != err { - log.Error(4, err.Error()) + log.Error(4, "carbon-in: Recv error: %s", err.Error()) } break } key, val, ts, err := carbon20.ValidatePacket(buf, carbon20.Medium) if err != nil { - c.In.MetricsDecodeErr.Inc(1) + c.Input.MetricsDecodeErr.Inc(1) log.Error(4, "carbon-in: invalid metric: %s", err.Error()) continue } @@ -127,6 +196,20 @@ func (c *Carbon) handle(conn net.Conn) { log.Fatal(4, "carbon-in: couldn't find a schema for %q - this is impossible since we asserted there was a default with patt .*", name) } interval := s.Retentions[0].SecondsPerPoint() - c.HandleLegacy(string(key), val, ts, interval) + md := &schema.MetricData{ + Name: name, + Metric: name, + Interval: interval, + Value: val, + Unit: "unknown", + Time: int64(ts), + Mtype: "gauge", + Tags: []string{}, + OrgId: 1, // admin org + } + md.SetId() + c.Input.MetricsPerMessage.Value(int64(1)) + c.Input.Process(md) } + c.handlerWaitGroup.Done() } diff --git a/input/carbon/carbon_test.go b/input/carbon/carbon_test.go new file mode 100644 index 0000000000..bfb6e80144 --- /dev/null +++ b/input/carbon/carbon_test.go @@ -0,0 +1,109 @@ +package carbon + +import ( + "fmt" + "net" + "regexp" + "sync" + "testing" + "time" + + "github.com/benbjohnson/clock" + "github.com/lomik/go-carbon/persister" + "github.com/raintank/met/helper" + "github.com/raintank/metrictank/idx/memory" + "github.com/raintank/metrictank/mdata" + "github.com/raintank/metrictank/usage" + "gopkg.in/raintank/schema.v1" +) + +func Test_HandleMessage(t *testing.T) { + stats, _ := helper.New(false, "", "standard", "metrictank", "") + mdata.CluStatus = mdata.NewClusterStatus("test", false) + mdata.InitMetrics(stats) + store := mdata.NewDevnullStore() + aggmetrics := mdata.NewAggMetrics(store, 600, 10, 800, 8000, 10000, 0, make([]mdata.AggSetting, 0)) + metricIndex := memory.New() + metricIndex.Init(stats) + usage := usage.New(300, aggmetrics, metricIndex, clock.New()) + Enabled = true + addr = "localhost:2003" + var err error + s := persister.Schema{ + Name: "default", + RetentionStr: "1s:1d", + Pattern: regexp.MustCompile(".*"), + } + s.Retentions, err = persister.ParseRetentionDefs(s.RetentionStr) + if err != nil { + panic(err) + } + + schemas = persister.WhisperSchemas{s} + c := New(stats) + c.Start(aggmetrics, metricIndex, usage) + + 
allMetrics := make(map[string]int) + var mu sync.Mutex + var wg sync.WaitGroup + for i := 0; i < 5; i++ { + wg.Add(1) + go func(i int, t *testing.T) { + metrics := test_handleMessage(i, t) + mu.Lock() + for mId, id := range metrics { + allMetrics[mId] = id + } + mu.Unlock() + wg.Done() + }(i, t) + } + wg.Wait() + c.Stop() + defs := metricIndex.List(-1) + if len(defs) != len(allMetrics) { + t.Fatalf("query for org -1 should result in %d distinct metrics. not %d", len(allMetrics), len(defs)) + } + + for _, d := range defs { + id := allMetrics[d.Id] + if d.Name != fmt.Sprintf("some.id.%d", id) { + t.Fatalf("incorrect name for %s : %s, expected %s", d.Id, d.Name, fmt.Sprintf("some.id.%d", id)) + } + if d.OrgId != 1 { + t.Fatalf("incorrect OrgId for %s : %d", d.Id, d.OrgId) + } + } +} + +func test_handleMessage(worker int, t *testing.T) map[string]int { + conn, _ := net.Dial("tcp", "127.0.0.1:2003") + defer conn.Close() + metrics := make(map[string]int) + for m := 0; m < 4; m++ { + id := (worker + 1) * (m + 1) + t.Logf("worker %d metric %d -> adding metric with id %d and orgid %d", worker, m, id, 1) + md := &schema.MetricData{ + Name: fmt.Sprintf("some.id.%d", id), + Metric: fmt.Sprintf("some.id.%d", id), + Interval: 1, + Value: 1234.567, + Unit: "unknown", + Time: int64(id), + Mtype: "gauge", + Tags: []string{}, + OrgId: 1, // admin org + } + md.SetId() + metrics[md.Id] = id + t.Logf("%s = %s", md.Id, md.Name) + _, err := fmt.Fprintf(conn, fmt.Sprintf("%s %f %d\n", md.Name, md.Value, md.Time)) + if err != nil { + t.Fatal(err) + } + } + // as soon as this function ends, the server will close the socket. We need to sleep here + // to ensure that the packets have time to be procesed by the kernel and passed to the server. + time.Sleep(time.Millisecond * 100) + return metrics +} diff --git a/input/input.go b/input/input.go new file mode 100644 index 0000000000..b59bf5404f --- /dev/null +++ b/input/input.go @@ -0,0 +1,67 @@ +// Package in provides interfaces, concrete implementations, and utilities +// to ingest data into metrictank +package input + +import ( + "fmt" + + "github.com/raintank/met" + "github.com/raintank/metrictank/idx" + "github.com/raintank/metrictank/mdata" + "github.com/raintank/metrictank/usage" + "github.com/raintank/worldping-api/pkg/log" + "gopkg.in/raintank/schema.v1" +) + +// In is a base handler for a metrics packet, aimed to be embedded by concrete implementations +type Input struct { + MetricsPerMessage met.Meter + metricsReceived met.Count + MetricsDecodeErr met.Count // metric metrics_decode_err is a count of times an input message (MetricData, MetricDataArray or carbon line) failed to parse + MetricInvalid met.Count // metric metric_invalid is a count of times a metric did not validate + MsgsAge met.Meter // in ms + + metrics mdata.Metrics + metricIndex idx.MetricIndex + usage *usage.Usage +} + +func New(metrics mdata.Metrics, metricIndex idx.MetricIndex, usage *usage.Usage, input string, stats met.Backend) Input { + return Input{ + MetricsPerMessage: stats.NewMeter(fmt.Sprintf("%s.metrics_per_message", input), 0), + metricsReceived: stats.NewCount(fmt.Sprintf("%s.metrics_received", input)), + MetricsDecodeErr: stats.NewCount(fmt.Sprintf("%s.metrics_decode_err", input)), + MetricInvalid: stats.NewCount(fmt.Sprintf("%s.metric_invalid", input)), + MsgsAge: stats.NewMeter(fmt.Sprintf("%s.message_age", input), 0), + + metrics: metrics, + metricIndex: metricIndex, + usage: usage, + } +} + +// process makes sure the data is stored and the metadata is in the index, +// and 
the usage is tracked, if enabled. +// concurrency-safe. +func (in Input) Process(metric *schema.MetricData) { + if metric == nil { + return + } + in.metricsReceived.Inc(1) + err := metric.Validate() + if err != nil { + in.MetricInvalid.Inc(1) + log.Debug("in: Invalid metric %s %v", err, metric) + return + } + if metric.Time == 0 { + log.Warn("in: invalid metric. metric.Time is 0. %s", metric.Id) + } else { + in.metricIndex.Add(metric) + m := in.metrics.GetOrCreate(metric.Id) + m.Add(uint32(metric.Time), metric.Value) + if in.usage != nil { + in.usage.Add(metric.OrgId, metric.Id) + } + } +} diff --git a/input/input_test.go b/input/input_test.go new file mode 100644 index 0000000000..a5c0b5b7a0 --- /dev/null +++ b/input/input_test.go @@ -0,0 +1,126 @@ +package input + +import ( + "fmt" + "testing" + + "github.com/benbjohnson/clock" + "github.com/raintank/met/helper" + "github.com/raintank/metrictank/idx/memory" + "github.com/raintank/metrictank/mdata" + "github.com/raintank/metrictank/mdata/chunk" + "github.com/raintank/metrictank/usage" + "gopkg.in/raintank/schema.v1" +) + +func Test_Process(t *testing.T) { + stats, _ := helper.New(false, "", "standard", "metrictank", "") + mdata.CluStatus = mdata.NewClusterStatus("test", false) + mdata.InitMetrics(stats) + store := mdata.NewDevnullStore() + aggmetrics := mdata.NewAggMetrics(store, 600, 10, 800, 8000, 10000, 0, make([]mdata.AggSetting, 0)) + metricIndex := memory.New() + metricIndex.Init(stats) + usage := usage.New(300, aggmetrics, metricIndex, clock.New()) + in := New(aggmetrics, metricIndex, usage, "test", stats) + + allMetrics := make(map[string]int) + for i := 0; i < 5; i++ { + metrics := test_Process(i, &in, t) + for mId, id := range metrics { + allMetrics[mId] = id + } + } + defs := metricIndex.List(-1) + if len(defs) != 13 { + t.Fatalf("query for org -1 should result in 13 distinct metrics. not %d", len(defs)) + } + + for _, d := range defs { + id := allMetrics[d.Id] + if d.Name != fmt.Sprintf("some.id.%d", id) { + t.Fatalf("incorrect name for %s : %s", d.Id, d.Name) + } + if d.OrgId != id { + t.Fatalf("incorrect OrgId for %s : %d", d.Id, d.OrgId) + } + if d.Tags[0] != fmt.Sprintf("%d", id) { + t.Fatalf("incorrect tags for %s : %s", d.Id, d.Tags) + } + } + + defs = metricIndex.List(2) + if len(defs) != 1 { + t.Fatalf("len of defs should be exactly 1. 
got defs with len %d: %v", len(defs), defs) + } + d := defs[0] + if d.OrgId != 2 { + t.Fatalf("incorrect metricdef returned: %v", d) + } +} + +func test_Process(worker int, in *Input, t *testing.T) map[string]int { + var metric *schema.MetricData + metrics := make(map[string]int) + for m := 0; m < 4; m++ { + id := (worker + 1) * (m + 1) + t.Logf("worker %d metric %d -> adding metric with id and orgid %d", worker, m, id) + + metric = &schema.MetricData{ + Id: "", + OrgId: id, + Name: fmt.Sprintf("some.id.%d", id), + Metric: fmt.Sprintf("some.id.%d", id), + Interval: 60, + Value: 1234.567, + Unit: "ms", + Time: int64(id), + Mtype: "gauge", + Tags: []string{fmt.Sprintf("%d", id)}, + } + metric.SetId() + metrics[metric.Id] = id + in.Process(metric) + } + return metrics +} + +func BenchmarkProcess(b *testing.B) { + stats, _ := helper.New(false, "", "standard", "metrictank", "") + mdata.CluStatus = mdata.NewClusterStatus("default", false) + mdata.InitMetrics(stats) + + store := mdata.NewDevnullStore() + aggmetrics := mdata.NewAggMetrics(store, 600, 10, 800, 8000, 10000, 0, make([]mdata.AggSetting, 0)) + metricIndex := memory.New() + metricIndex.Init(stats) + usage := usage.New(300, aggmetrics, metricIndex, clock.New()) + in := New(aggmetrics, metricIndex, usage, "test", stats) + + // timestamps start at 10 and go up from there. (we can't use 0, see AggMetric.Add()) + datas := make([]*schema.MetricData, b.N) + for i := 0; i < b.N; i++ { + metric := &schema.MetricData{ + Id: "some.id.of.a.metric", + OrgId: 500, + Name: "some.id", + Metric: "metric", + Interval: 10, + Value: 1234.567, + Unit: "ms", + Time: int64((i + 1) * 10), + Mtype: "gauge", + Tags: []string{"some_tag", "ok"}, + } + datas[i] = metric + } + + b.ResetTimer() + go func() { + for range chunk.TotalPoints { + } + }() + for i := 0; i < b.N; i++ { + in.Process(datas[i]) + } +} diff --git a/in/kafkamdam/kafkamdam.go b/input/kafkamdam/kafkamdam.go similarity index 77% rename from in/kafkamdam/kafkamdam.go rename to input/kafkamdam/kafkamdam.go index 7d3e3ffc62..30a6ff2920 100644 --- a/in/kafkamdam/kafkamdam.go +++ b/input/kafkamdam/kafkamdam.go @@ -4,6 +4,7 @@ import ( "flag" "strings" "sync" + "time" "github.com/Shopify/sarama" "github.com/raintank/worldping-api/pkg/log" @@ -12,19 +13,23 @@ import ( "github.com/bsm/sarama-cluster" "github.com/raintank/met" "github.com/raintank/metrictank/idx" - "github.com/raintank/metrictank/in" + "github.com/raintank/metrictank/input" "github.com/raintank/metrictank/mdata" "github.com/raintank/metrictank/usage" + "gopkg.in/raintank/schema.v1" + schemaMsg "gopkg.in/raintank/schema.v1/msg" ) type KafkaMdam struct { - in.In + input.Input consumer *cluster.Consumer stats met.Backend wg sync.WaitGroup - // read from this channel to block until consumer is cleanly stopped - StopChan chan int +} + +func (k *KafkaMdam) Name() string { + return "kafkaMdam" } var Enabled bool @@ -72,14 +77,13 @@ func New(stats met.Backend) *KafkaMdam { k := KafkaMdam{ consumer: consumer, stats: stats, - StopChan: make(chan int), } return &k } func (k *KafkaMdam) Start(metrics mdata.Metrics, metricIndex idx.MetricIndex, usg *usage.Usage) { - k.In = in.New(metrics, metricIndex, usg, "kafka-mdam", k.stats) + k.Input = input.New(metrics, metricIndex, usg, "kafka-mdam", k.stats) go k.notifications() go k.consume() } @@ -87,15 +91,36 @@ func (k *KafkaMdam) Start(metrics mdata.Metrics, metricIndex idx.MetricIndex, us func (k *KafkaMdam) consume() { k.wg.Add(1) messageChan := k.consumer.Messages() + tmp := schemaMsg.MetricData{Metrics: 
make([]*schema.MetricData, 1)} for msg := range messageChan { log.Debug("kafka-mdam received message: Topic %s, Partition: %d, Offset: %d, Key: %x", msg.Topic, msg.Partition, msg.Offset, msg.Key) - k.In.HandleArray(msg.Value) + k.handleMsg(msg.Value, &tmp) k.consumer.MarkOffset(msg, "") } log.Info("kafka-mdam consumer ended.") k.wg.Done() } +func (k *KafkaMdam) handleMsg(data []byte, tmp *schemaMsg.MetricData) { + err := tmp.InitFromMsg(data) + if err != nil { + k.Input.MetricsDecodeErr.Inc(1) + log.Error(3, "kafka-mdam skipping message. %s", err) + return + } + k.Input.MsgsAge.Value(time.Now().Sub(tmp.Produced).Nanoseconds() / 1000) + err = tmp.DecodeMetricData() // reads metrics from in.tmp.Msg and unsets it + if err != nil { + k.Input.MetricsDecodeErr.Inc(1) + log.Error(3, "kafka-mdam skipping message. %s", err) + return + } + k.Input.MetricsPerMessage.Value(int64(len(tmp.Metrics))) + for _, metric := range tmp.Metrics { + k.Input.Process(metric) + } +} + func (k *KafkaMdam) notifications() { k.wg.Add(1) for msg := range k.consumer.Notifications() { @@ -124,14 +149,9 @@ func (k *KafkaMdam) notifications() { } // Stop will initiate a graceful stop of the Consumer (permanent) -// -// NOTE: receive on StopChan to block until this process completes +// and block until it is stopped. func (k *KafkaMdam) Stop() { // closes notifications and messages channels, amongst others k.consumer.Close() - - go func() { - k.wg.Wait() - close(k.StopChan) - }() + k.wg.Wait() } diff --git a/in/kafkamdm/kafkamdm.go b/input/kafkamdm/kafkamdm.go similarity index 92% rename from in/kafkamdm/kafkamdm.go rename to input/kafkamdm/kafkamdm.go index 16f1eea85a..5eebdc8f9f 100644 --- a/in/kafkamdm/kafkamdm.go +++ b/input/kafkamdm/kafkamdm.go @@ -12,26 +12,29 @@ import ( "github.com/raintank/met" "github.com/raintank/metrictank/idx" - "github.com/raintank/metrictank/in" + "github.com/raintank/metrictank/input" "github.com/raintank/metrictank/kafka" "github.com/raintank/metrictank/mdata" "github.com/raintank/metrictank/usage" + "gopkg.in/raintank/schema.v1" ) type KafkaMdm struct { - in.In + input.Input consumer sarama.Consumer client sarama.Client stats met.Backend wg sync.WaitGroup - // read from this channel to block until consumer is cleanly stopped - StopChan chan int // signal to PartitionConsumers to shutdown stopConsuming chan struct{} } +func (k *KafkaMdm) Name() string { + return "kafkaMdm" +} + var LogLevel int var Enabled bool var brokerStr string @@ -131,7 +134,6 @@ func New(stats met.Backend) *KafkaMdm { consumer: consumer, client: client, stats: stats, - StopChan: make(chan int), stopConsuming: make(chan struct{}), } @@ -139,7 +141,7 @@ func New(stats met.Backend) *KafkaMdm { } func (k *KafkaMdm) Start(metrics mdata.Metrics, metricIndex idx.MetricIndex, usg *usage.Usage) { - k.In = in.New(metrics, metricIndex, usg, "kafka-mdm", k.stats) + k.Input = input.New(metrics, metricIndex, usg, "kafka-mdm", k.stats) for _, topic := range topics { // get partitions. 
partitions, err := k.consumer.Partitions(topic) @@ -185,7 +187,7 @@ func (k *KafkaMdm) consumePartition(topic string, partition int32, partitionOffs if LogLevel < 2 { log.Debug("kafka-mdm received message: Topic %s, Partition: %d, Offset: %d, Key: %x", msg.Topic, msg.Partition, msg.Offset, msg.Key) } - k.In.Handle(msg.Value) + k.handleMsg(msg.Value) currentOffset = msg.Offset case <-ticker.C: if err := offsetMgr.Commit(topic, partition, currentOffset); err != nil { @@ -202,15 +204,23 @@ func (k *KafkaMdm) consumePartition(topic string, partition int32, partitionOffs } } +func (k *KafkaMdm) handleMsg(data []byte) { + md := schema.MetricData{} + _, err := md.UnmarshalMsg(data) + if err != nil { + k.Input.MetricsDecodeErr.Inc(1) + log.Error(3, "kafka-mdm decode error, skipping message. %s", err) + return + } + k.Input.MetricsPerMessage.Value(int64(1)) + k.Input.Process(&md) +} + // Stop will initiate a graceful stop of the Consumer (permanent) -// -// NOTE: receive on StopChan to block until this process completes +// and block until it stopped. func (k *KafkaMdm) Stop() { // closes notifications and messages channels, amongst others close(k.stopConsuming) - go func() { - k.wg.Wait() - offsetMgr.Close() - close(k.StopChan) - }() + k.wg.Wait() + offsetMgr.Close() } diff --git a/input/kafkamdm/kafkamdm_test.go b/input/kafkamdm/kafkamdm_test.go new file mode 100644 index 0000000000..a41652552e --- /dev/null +++ b/input/kafkamdm/kafkamdm_test.go @@ -0,0 +1,95 @@ +package kafkamdm + +import ( + "fmt" + "testing" + + "github.com/benbjohnson/clock" + "github.com/raintank/met/helper" + "github.com/raintank/metrictank/idx/memory" + "github.com/raintank/metrictank/input" + "github.com/raintank/metrictank/mdata" + "github.com/raintank/metrictank/usage" + + "gopkg.in/raintank/schema.v1" +) + +func Test_HandleMessage(t *testing.T) { + stats, _ := helper.New(false, "", "standard", "metrictank", "") + mdata.CluStatus = mdata.NewClusterStatus("test", false) + mdata.InitMetrics(stats) + store := mdata.NewDevnullStore() + aggmetrics := mdata.NewAggMetrics(store, 600, 10, 800, 8000, 10000, 0, make([]mdata.AggSetting, 0)) + metricIndex := memory.New() + metricIndex.Init(stats) + usage := usage.New(300, aggmetrics, metricIndex, clock.New()) + k := KafkaMdm{ + Input: input.New(aggmetrics, metricIndex, usage, "test", stats), + } + + allMetrics := make(map[string]int) + for i := 0; i < 5; i++ { + metrics := test_handleMessage(i, &k, t) + for mId, id := range metrics { + allMetrics[mId] = id + } + } + defs := metricIndex.List(-1) + if len(defs) != 13 { + t.Fatalf("query for org -1 should result in 13 distinct metrics. not %d", len(defs)) + } + + for _, d := range defs { + id := allMetrics[d.Id] + if d.Name != fmt.Sprintf("some.id.%d", id) { + t.Fatalf("incorrect name for %s : %s", d.Id, d.Name) + } + if d.OrgId != id { + t.Fatalf("incorrect OrgId for %s : %d", d.Id, d.OrgId) + } + if d.Tags[0] != fmt.Sprintf("%d", id) { + t.Fatalf("incorrect tags for %s : %s", d.Id, d.Tags) + } + } + + defs = metricIndex.List(2) + if len(defs) != 1 { + t.Fatalf("len of defs should be exactly 1. 
got defs with len %d: %v", len(defs), defs) + } + d := defs[0] + if d.OrgId != 2 { + t.Fatalf("incorrect metricdef returned: %v", d) + } +} + +func test_handleMessage(worker int, k *KafkaMdm, t *testing.T) map[string]int { + var metric *schema.MetricData + metrics := make(map[string]int) + for m := 0; m < 4; m++ { + id := (worker + 1) * (m + 1) + t.Logf("worker %d metric %d -> adding metric with id and orgid %d", worker, m, id) + + metric = &schema.MetricData{ + Id: "", + OrgId: id, + Name: fmt.Sprintf("some.id.%d", id), + Metric: fmt.Sprintf("some.id.%d", id), + Interval: 60, + Value: 1234.567, + Unit: "ms", + Time: int64(id), + Mtype: "gauge", + Tags: []string{fmt.Sprintf("%d", id)}, + } + metric.SetId() + metrics[metric.Id] = id + var data []byte + var err error + data, err = metric.MarshalMsg(data[:]) + if err != nil { + t.Fatal(err.Error()) + } + k.handleMsg(data) + } + return metrics +} diff --git a/in/plugin.go b/input/plugin.go similarity index 74% rename from in/plugin.go rename to input/plugin.go index 3e047032a9..9f65d9084b 100644 --- a/in/plugin.go +++ b/input/plugin.go @@ -1,4 +1,4 @@ -package in +package input import ( "github.com/raintank/metrictank/idx" @@ -7,6 +7,7 @@ import ( ) type Plugin interface { + Name() string Start(metrics mdata.Metrics, metricIndex idx.MetricIndex, usg *usage.Usage) - Stop() + Stop() // Should block until shutdown is complete. } diff --git a/mdata/chunk/chunk.go b/mdata/chunk/chunk.go index 6aec6ed377..6b6d71349b 100644 --- a/mdata/chunk/chunk.go +++ b/mdata/chunk/chunk.go @@ -16,7 +16,7 @@ func init() { // Chunk is a chunk of data. not concurrency safe. type Chunk struct { - *tsz.Series + tsz.Series LastTs uint32 // last TS seen, not computed or anything NumPoints uint32 Saved bool @@ -27,7 +27,7 @@ type Chunk struct { func New(t0 uint32) *Chunk { // we must set LastWrite here as well to make sure a new Chunk doesn't get immediately // garbage collected right after creating it, before we can push to it - return &Chunk{tsz.New(t0), 0, 0, false, false, uint32(time.Now().Unix())} + return &Chunk{*tsz.New(t0), 0, 0, false, false, uint32(time.Now().Unix())} } func (c *Chunk) String() string { diff --git a/mdata/cluster.go b/mdata/cluster.go index 2e93835aa1..bf1ccfbc4a 100644 --- a/mdata/cluster.go +++ b/mdata/cluster.go @@ -7,26 +7,13 @@ import ( "sync" "time" - "github.com/raintank/met" "github.com/raintank/worldping-api/pkg/log" ) var ( - CluStatus *ClusterStatus - clusterHandlers []ClusterHandler - persistMessageBatch *PersistMessageBatch - - messagesPublished met.Count - messagesSize met.Meter + CluStatus *ClusterStatus ) -type ClusterHandler interface { - Send(SavedChunk) -} - -//PersistMessage format version -const PersistMessageBatchV1 = 1 - // ClusterStatus has Exported fields but don't touch them directly // it's only for json marshaling. use the accessor methods. 
type ClusterStatus struct { @@ -103,77 +90,3 @@ func (c *ClusterStatus) getClusterStatus(w http.ResponseWriter, req *http.Reques } w.Write(resp) } - -type PersistMessage struct { - Instance string `json:"instance"` - Key string `json:"key"` - T0 uint32 `json:"t0"` -} - -type PersistMessageBatch struct { - Instance string `json:"instance"` - SavedChunks []SavedChunk `json:"saved_chunks"` -} - -type SavedChunk struct { - Key string `json:"key"` - T0 uint32 `json:"t0"` -} - -func SendPersistMessage(key string, t0 uint32) { - sc := SavedChunk{Key: key, T0: t0} - for _, h := range clusterHandlers { - h.Send(sc) - } -} - -func InitCluster(stats met.Backend, handlers ...ClusterHandler) { - messagesPublished = stats.NewCount("cluster.messages-published") - messagesSize = stats.NewMeter("cluster.message_size", 0) - clusterHandlers = handlers -} - -type Cl struct { - instance string - metrics Metrics -} - -func (cl Cl) Handle(data []byte) { - version := uint8(data[0]) - if version == uint8(PersistMessageBatchV1) { - // new batch format. - batch := PersistMessageBatch{} - err := json.Unmarshal(data[1:], &batch) - if err != nil { - log.Error(3, "failed to unmarsh batch message. skipping.", err) - return - } - if batch.Instance == cl.instance { - log.Debug("CLU skipping batch message we generated.") - return - } - for _, c := range batch.SavedChunks { - if agg, ok := cl.metrics.Get(c.Key); ok { - agg.(*AggMetric).SyncChunkSaveState(c.T0) - } - } - } else { - // assume the old format. - ms := PersistMessage{} - err := json.Unmarshal(data, &ms) - if err != nil { - log.Error(3, "skipping message. %s", err) - return - } - if ms.Instance == cl.instance { - log.Debug("CLU skipping message we generated. %s - %s:%d", ms.Instance, ms.Key, ms.T0) - return - } - - // get metric - if agg, ok := cl.metrics.Get(ms.Key); ok { - agg.(*AggMetric).SyncChunkSaveState(ms.T0) - } - } - return -} diff --git a/mdata/notifier.go b/mdata/notifier.go new file mode 100644 index 0000000000..ebd3308e01 --- /dev/null +++ b/mdata/notifier.go @@ -0,0 +1,92 @@ +package mdata + +import ( + "encoding/json" + + "github.com/raintank/met" + "github.com/raintank/worldping-api/pkg/log" +) + +var ( + notifierHandlers []NotifierHandler + persistMessageBatch *PersistMessageBatch +) + +type NotifierHandler interface { + Send(SavedChunk) +} + +//PersistMessage format version +const PersistMessageBatchV1 = 1 + +type PersistMessage struct { + Instance string `json:"instance"` + Key string `json:"key"` + T0 uint32 `json:"t0"` +} + +type PersistMessageBatch struct { + Instance string `json:"instance"` + SavedChunks []SavedChunk `json:"saved_chunks"` +} + +type SavedChunk struct { + Key string `json:"key"` + T0 uint32 `json:"t0"` +} + +func SendPersistMessage(key string, t0 uint32) { + sc := SavedChunk{Key: key, T0: t0} + for _, h := range notifierHandlers { + h.Send(sc) + } +} + +func InitPersistNotifier(stats met.Backend, handlers ...NotifierHandler) { + notifierHandlers = handlers +} + +type Notifier struct { + Instance string + Metrics Metrics +} + +func (cl Notifier) Handle(data []byte) { + version := uint8(data[0]) + if version == uint8(PersistMessageBatchV1) { + // new batch format. + batch := PersistMessageBatch{} + err := json.Unmarshal(data[1:], &batch) + if err != nil { + log.Error(3, "failed to unmarsh batch message. 
skipping.", err) + return + } + if batch.Instance == cl.Instance { + log.Debug("CLU skipping batch message we generated.") + return + } + for _, c := range batch.SavedChunks { + if agg, ok := cl.Metrics.Get(c.Key); ok { + agg.(*AggMetric).SyncChunkSaveState(c.T0) + } + } + } else { + // assume the old format. + ms := PersistMessage{} + err := json.Unmarshal(data, &ms) + if err != nil { + log.Error(3, "skipping message. %s", err) + return + } + if ms.Instance == cl.Instance { + log.Debug("CLU skipping message we generated. %s - %s:%d", ms.Instance, ms.Key, ms.T0) + return + } + + // get metric + if agg, ok := cl.Metrics.Get(ms.Key); ok { + agg.(*AggMetric).SyncChunkSaveState(ms.T0) + } + } + return +} diff --git a/mdata/clkafka/cfg.go b/mdata/notifierKafka/cfg.go similarity index 50% rename from mdata/clkafka/cfg.go rename to mdata/notifierKafka/cfg.go index 1389721acc..b000572a64 100644 --- a/mdata/clkafka/cfg.go +++ b/mdata/notifierKafka/cfg.go @@ -1,4 +1,4 @@ -package clkafka +package notifierKafka import ( "flag" @@ -7,27 +7,31 @@ import ( "time" "github.com/Shopify/sarama" + "github.com/raintank/met" "github.com/rakyll/globalconf" ) var Enabled bool var brokerStr string -var Brokers []string -var Topic string -var OffsetStr string -var DataDir string -var Config *sarama.Config -var OffsetDuration time.Duration -var OffsetCommitInterval time.Duration +var brokers []string +var topic string +var offsetStr string +var dataDir string +var config *sarama.Config +var offsetDuration time.Duration +var offsetCommitInterval time.Duration + +var messagesPublished met.Count +var messagesSize met.Meter func ConfigSetup() { fs := flag.NewFlagSet("kafka-cluster", flag.ExitOnError) fs.BoolVar(&Enabled, "enabled", false, "") fs.StringVar(&brokerStr, "brokers", "kafka:9092", "tcp address for kafka (may be given multiple times as comma separated list)") - fs.StringVar(&Topic, "topic", "metricpersist", "kafka topic") - fs.StringVar(&OffsetStr, "offset", "last", "Set the offset to start consuming from. Can be one of newest, oldest,last or a time duration") - fs.StringVar(&DataDir, "data-dir", "", "Directory to store partition offsets index") - fs.DurationVar(&OffsetCommitInterval, "offset-commit-interval", time.Second*5, "Interval at which offsets should be saved.") + fs.StringVar(&topic, "topic", "metricpersist", "kafka topic") + fs.StringVar(&offsetStr, "offset", "last", "Set the offset to start consuming from. Can be one of newest, oldest,last or a time duration") + fs.StringVar(&dataDir, "data-dir", "", "Directory to store partition offsets index") + fs.DurationVar(&offsetCommitInterval, "offset-commit-interval", time.Second*5, "Interval at which offsets should be saved.") globalconf.Register("kafka-cluster", fs) } @@ -36,25 +40,25 @@ func ConfigProcess(instance string) { return } var err error - switch OffsetStr { + switch offsetStr { case "last": case "oldest": case "newest": default: - OffsetDuration, err = time.ParseDuration(OffsetStr) + offsetDuration, err = time.ParseDuration(offsetStr) if err != nil { log.Fatal(4, "kafka-cluster: invalid offest format. 
%s", err) } } - Brokers = strings.Split(brokerStr, ",") + brokers = strings.Split(brokerStr, ",") - Config = sarama.NewConfig() - Config.ClientID = instance + "-cluster" - Config.Version = sarama.V0_10_0_0 - Config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message - Config.Producer.Retry.Max = 10 // Retry up to 10 times to produce the message - Config.Producer.Compression = sarama.CompressionNone - err = Config.Validate() + config = sarama.NewConfig() + config.ClientID = instance + "-cluster" + config.Version = sarama.V0_10_0_0 + config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message + config.Producer.Retry.Max = 10 // Retry up to 10 times to produce the message + config.Producer.Compression = sarama.CompressionNone + err = config.Validate() if err != nil { log.Fatal(2, "kafka-cluster invalid consumer config: %s", err) } diff --git a/mdata/clkafka.go b/mdata/notifierKafka/notifierKafka.go similarity index 53% rename from mdata/clkafka.go rename to mdata/notifierKafka/notifierKafka.go index 6d5c728e31..9171c59f88 100644 --- a/mdata/clkafka.go +++ b/mdata/notifierKafka/notifierKafka.go @@ -1,4 +1,4 @@ -package mdata +package notifierKafka import ( "bytes" @@ -10,13 +10,13 @@ import ( "github.com/Shopify/sarama" "github.com/raintank/met" "github.com/raintank/metrictank/kafka" - cfg "github.com/raintank/metrictank/mdata/clkafka" + "github.com/raintank/metrictank/mdata" "github.com/raintank/worldping-api/pkg/log" ) -type ClKafka struct { - in chan SavedChunk - buf []SavedChunk +type NotifierKafka struct { + in chan mdata.SavedChunk + buf []mdata.SavedChunk wg sync.WaitGroup instance string consumer sarama.Consumer @@ -26,40 +26,43 @@ type ClKafka struct { StopChan chan int // signal to PartitionConsumers to shutdown stopConsuming chan struct{} - Cl + mdata.Notifier } -func NewKafka(instance string, metrics Metrics, stats met.Backend) *ClKafka { - client, err := sarama.NewClient(cfg.Brokers, cfg.Config) +func NewNotifierKafka(instance string, metrics mdata.Metrics, stats met.Backend) *NotifierKafka { + messagesPublished = stats.NewCount("notifier.kafka.messages-published") + messagesSize = stats.NewMeter("notifier.kafka.message_size", 0) + + client, err := sarama.NewClient(brokers, config) if err != nil { - log.Fatal(2, "kafka-cluster failed to start client: %s", err) + log.Fatal(2, "kafka-notifier failed to start client: %s", err) } consumer, err := sarama.NewConsumerFromClient(client) if err != nil { - log.Fatal(2, "kafka-cluster failed to initialize consumer: %s", err) + log.Fatal(2, "kafka-notifier failed to initialize consumer: %s", err) } - log.Info("kafka-cluster consumer initialized without error") + log.Info("kafka-notifier consumer initialized without error") producer, err := sarama.NewSyncProducerFromClient(client) if err != nil { - log.Fatal(2, "kafka-cluster failed to initialize producer: %s", err) + log.Fatal(2, "kafka-notifier failed to initialize producer: %s", err) } - offsetMgr, err := kafka.NewOffsetMgr(cfg.DataDir) + offsetMgr, err := kafka.NewOffsetMgr(dataDir) if err != nil { - log.Fatal(2, "kafka-cluster couldnt create offsetMgr. %s", err) + log.Fatal(2, "kafka-notifier couldnt create offsetMgr. 
%s", err) } - c := ClKafka{ - in: make(chan SavedChunk), + c := NotifierKafka{ + in: make(chan mdata.SavedChunk), offsetMgr: offsetMgr, client: client, consumer: consumer, producer: producer, instance: instance, - Cl: Cl{ - instance: instance, - metrics: metrics, + Notifier: mdata.Notifier{ + Instance: instance, + Metrics: metrics, }, StopChan: make(chan int), stopConsuming: make(chan struct{}), @@ -70,16 +73,15 @@ func NewKafka(instance string, metrics Metrics, stats met.Backend) *ClKafka { return &c } -func (c *ClKafka) start() { +func (c *NotifierKafka) start() { // get partitions. - topic := cfg.Topic partitions, err := c.consumer.Partitions(topic) if err != nil { - log.Fatal(4, "kafka-cluster: Faild to get partitions for topic %s. %s", topic, err) + log.Fatal(4, "kafka-notifier: Faild to get partitions for topic %s. %s", topic, err) } for _, partition := range partitions { var offset int64 - switch cfg.OffsetStr { + switch offsetStr { case "oldest": offset = -2 case "newest": @@ -87,45 +89,45 @@ func (c *ClKafka) start() { case "last": offset, err = c.offsetMgr.Last(topic, partition) default: - offset, err = c.client.GetOffset(topic, partition, time.Now().Add(-1*cfg.OffsetDuration).UnixNano()/int64(time.Millisecond)) + offset, err = c.client.GetOffset(topic, partition, time.Now().Add(-1*offsetDuration).UnixNano()/int64(time.Millisecond)) } if err != nil { - log.Fatal(4, "kafka-mdm: failed to get %q offset for %s:%d. %q", cfg.OffsetStr, topic, partition, err) + log.Fatal(4, "kafka-notifier: failed to get %q offset for %s:%d. %q", offsetStr, topic, partition, err) } go c.consumePartition(topic, partition, offset) } } -func (c *ClKafka) consumePartition(topic string, partition int32, partitionOffset int64) { +func (c *NotifierKafka) consumePartition(topic string, partition int32, partitionOffset int64) { c.wg.Add(1) defer c.wg.Done() pc, err := c.consumer.ConsumePartition(topic, partition, partitionOffset) if err != nil { - log.Fatal(4, "kafka-cluster: failed to start partitionConsumer for %s:%d. %s", topic, partition, err) + log.Fatal(4, "kafka-notifier: failed to start partitionConsumer for %s:%d. 
%s", topic, partition, err) } - log.Info("kafka-cluster: consuming from %s:%d from offset %d", topic, partition, partitionOffset) + log.Info("kafka-notifier: consuming from %s:%d from offset %d", topic, partition, partitionOffset) currentOffset := partitionOffset messages := pc.Messages() - ticker := time.NewTicker(cfg.OffsetCommitInterval) + ticker := time.NewTicker(offsetCommitInterval) for { select { case msg := <-messages: - if LogLevel < 2 { - log.Debug("kafka-cluster received message: Topic %s, Partition: %d, Offset: %d, Key: %x", msg.Topic, msg.Partition, msg.Offset, msg.Key) + if mdata.LogLevel < 2 { + log.Debug("kafka-notifier received message: Topic %s, Partition: %d, Offset: %d, Key: %x", msg.Topic, msg.Partition, msg.Offset, msg.Key) } c.Handle(msg.Value) currentOffset = msg.Offset case <-ticker.C: if err := c.offsetMgr.Commit(topic, partition, currentOffset); err != nil { - log.Error(3, "kafka-cluster failed to commit offset for %s:%d, %s", topic, partition, err) + log.Error(3, "kafka-notifier failed to commit offset for %s:%d, %s", topic, partition, err) } case <-c.stopConsuming: pc.Close() if err := c.offsetMgr.Commit(topic, partition, currentOffset); err != nil { - log.Error(3, "kafka-cluster failed to commit offset for %s:%d, %s", topic, partition, err) + log.Error(3, "kafka-notifier failed to commit offset for %s:%d, %s", topic, partition, err) } - log.Info("kafka-cluster consumer for %s:%d ended.", topic, partition) + log.Info("kafka-notifier consumer for %s:%d ended.", topic, partition) return } } @@ -134,7 +136,7 @@ func (c *ClKafka) consumePartition(topic string, partition int32, partitionOffse // Stop will initiate a graceful stop of the Consumer (permanent) // // NOTE: receive on StopChan to block until this process completes -func (c *ClKafka) Stop() { +func (c *NotifierKafka) Stop() { // closes notifications and messages channels, amongst others close(c.stopConsuming) c.producer.Close() @@ -146,11 +148,11 @@ func (c *ClKafka) Stop() { }() } -func (c *ClKafka) Send(sc SavedChunk) { +func (c *NotifierKafka) Send(sc mdata.SavedChunk) { c.in <- sc } -func (c *ClKafka) produce() { +func (c *NotifierKafka) produce() { ticker := time.NewTicker(time.Second) max := 5000 for { @@ -167,27 +169,27 @@ func (c *ClKafka) produce() { } // flush makes sure the batch gets sent, asynchronously. 
-func (c *ClKafka) flush() { +func (c *NotifierKafka) flush() { if len(c.buf) == 0 { return } - msg := PersistMessageBatch{Instance: c.instance, SavedChunks: c.buf} + msg := mdata.PersistMessageBatch{Instance: c.instance, SavedChunks: c.buf} c.buf = nil go func() { - log.Debug("CLU kafka-cluster sending %d batch metricPersist messages", len(msg.SavedChunks)) + log.Debug("kafka-notifier sending %d batch metricPersist messages", len(msg.SavedChunks)) data, err := json.Marshal(&msg) if err != nil { - log.Fatal(4, "CLU kafka-cluster failed to marshal persistMessage to json.") + log.Fatal(4, "kafka-notifier failed to marshal persistMessage to json.") } buf := new(bytes.Buffer) - binary.Write(buf, binary.LittleEndian, uint8(PersistMessageBatchV1)) + binary.Write(buf, binary.LittleEndian, uint8(mdata.PersistMessageBatchV1)) buf.Write(data) messagesSize.Value(int64(buf.Len())) payload := &sarama.ProducerMessage{ - Topic: cfg.Topic, + Topic: topic, Value: sarama.ByteEncoder(buf.Bytes()), } @@ -196,7 +198,7 @@ func (c *ClKafka) flush() { // note: currently we don't do partitioning yet for cluster msgs, so no key needed _, _, err := c.producer.SendMessage(payload) if err != nil { - log.Warn("CLU kafka-cluster publisher %s", err) + log.Warn("kafka-notifier publisher %s", err) } else { sent = true } diff --git a/mdata/clnsq/cfg.go b/mdata/notifierNsq/cfg.go similarity index 53% rename from mdata/clnsq/cfg.go rename to mdata/notifierNsq/cfg.go index 3fc40bc68b..f52ea27e62 100644 --- a/mdata/clnsq/cfg.go +++ b/mdata/notifierNsq/cfg.go @@ -4,7 +4,7 @@ // and refer to the config variables (clNSQ.Enabled etc), so we currently have this hybrid approach // with config handling in this package, but the rest of clNSQ is in the mdata package -package clnsq +package notifierNsq import ( "flag" @@ -12,35 +12,38 @@ import ( "strings" "github.com/nsqio/go-nsq" + "github.com/raintank/met" "github.com/raintank/misc/app" "github.com/rakyll/globalconf" ) var ( - Enabled bool - NsqdTCPAddrs string - LookupdHTTPAddrs string - NsqdAdds []string - LookupdAdds []string - Topic string - Channel string - MaxInFlight int - ProducerOpts string - ConsumerOpts string - PCfg *nsq.Config - CCfg *nsq.Config + Enabled bool + nsqdTCPAddrs string + lookupdHTTPAddrs string + nsqdAdds []string + lookupdAdds []string + topic string + channel string + maxInFlight int + producerOpts string + consumerOpts string + pCfg *nsq.Config + cCfg *nsq.Config + messagesPublished met.Count + messagesSize met.Meter ) func ConfigSetup() { fs := flag.NewFlagSet("nsq-cluster", flag.ExitOnError) fs.BoolVar(&Enabled, "enabled", false, "") - fs.StringVar(&NsqdTCPAddrs, "nsqd-tcp-address", "", "nsqd TCP address (may be given multiple times as comma-separated list)") - fs.StringVar(&LookupdHTTPAddrs, "lookupd-http-address", "", "lookupd HTTP address (may be given multiple times as comma-separated list)") - fs.StringVar(&Topic, "topic", "metricpersist", "NSQ topic for persist messages") - fs.StringVar(&Channel, "channel", "tank", "NSQ channel for persist messages") - fs.StringVar(&ProducerOpts, "producer-opt", "", "option to passthrough to nsq.Producer (may be given multiple times as comma-separated list, see http://godoc.org/github.com/nsqio/go-nsq#Config)") - fs.StringVar(&ConsumerOpts, "consumer-opt", "", "option to passthrough to nsq.Consumer (may be given multiple times as comma-separated list, http://godoc.org/github.com/nsqio/go-nsq#Config)") - fs.IntVar(&MaxInFlight, "max-in-flight", 200, "max number of messages to allow in flight for consumer") + 
fs.StringVar(&nsqdTCPAddrs, "nsqd-tcp-address", "", "nsqd TCP address (may be given multiple times as comma-separated list)") + fs.StringVar(&lookupdHTTPAddrs, "lookupd-http-address", "", "lookupd HTTP address (may be given multiple times as comma-separated list)") + fs.StringVar(&topic, "topic", "metricpersist", "NSQ topic for persist messages") + fs.StringVar(&channel, "channel", "tank", "NSQ channel for persist messages") + fs.StringVar(&producerOpts, "producer-opt", "", "option to passthrough to nsq.Producer (may be given multiple times as comma-separated list, see http://godoc.org/github.com/nsqio/go-nsq#Config)") + fs.StringVar(&consumerOpts, "consumer-opt", "", "option to passthrough to nsq.Consumer (may be given multiple times as comma-separated list, http://godoc.org/github.com/nsqio/go-nsq#Config)") + fs.IntVar(&maxInFlight, "max-in-flight", 200, "max number of messages to allow in flight for consumer") globalconf.Register("nsq-cluster", fs) } @@ -48,34 +51,34 @@ func ConfigProcess() { if !Enabled { return } - if Topic == "" { + if topic == "" { log.Fatal(4, "topic for nsq-cluster cannot be empty") } - NsqdAdds = strings.Split(NsqdTCPAddrs, ",") - if len(NsqdAdds) == 1 && NsqdAdds[0] == "" { - NsqdAdds = []string{} + nsqdAdds = strings.Split(nsqdTCPAddrs, ",") + if len(nsqdAdds) == 1 && nsqdAdds[0] == "" { + nsqdAdds = []string{} } - LookupdAdds = strings.Split(LookupdHTTPAddrs, ",") - if len(LookupdAdds) == 1 && LookupdAdds[0] == "" { - LookupdAdds = []string{} + lookupdAdds = strings.Split(lookupdHTTPAddrs, ",") + if len(lookupdAdds) == 1 && lookupdAdds[0] == "" { + lookupdAdds = []string{} } // producers - PCfg = nsq.NewConfig() - PCfg.UserAgent = "metrictank-cluster" - err := app.ParseOpts(PCfg, ProducerOpts) + pCfg = nsq.NewConfig() + pCfg.UserAgent = "metrictank-cluster" + err := app.ParseOpts(pCfg, producerOpts) if err != nil { log.Fatal(4, "nsq-cluster: failed to parse nsq producer options. %s", err) } // consumer - CCfg = nsq.NewConfig() - CCfg.UserAgent = "metrictank-cluster" - err = app.ParseOpts(CCfg, ConsumerOpts) + cCfg = nsq.NewConfig() + cCfg.UserAgent = "metrictank-cluster" + err = app.ParseOpts(cCfg, consumerOpts) if err != nil { log.Fatal(4, "nsq-cluster: failed to parse nsq consumer options. 
%s", err) } - CCfg.MaxInFlight = MaxInFlight + cCfg.MaxInFlight = maxInFlight } diff --git a/mdata/clnsq.go b/mdata/notifierNsq/notifierNsq.go similarity index 66% rename from mdata/clnsq.go rename to mdata/notifierNsq/notifierNsq.go index 5ee4d3719c..d1a04ec843 100644 --- a/mdata/clnsq.go +++ b/mdata/notifierNsq/notifierNsq.go @@ -1,4 +1,4 @@ -package mdata +package notifierNsq import ( "bytes" @@ -9,7 +9,7 @@ import ( "github.com/bitly/go-hostpool" "github.com/nsqio/go-nsq" "github.com/raintank/met" - cfg "github.com/raintank/metrictank/mdata/clnsq" + "github.com/raintank/metrictank/mdata" "github.com/raintank/misc/instrumented_nsq" "github.com/raintank/worldping-api/pkg/log" ) @@ -19,20 +19,22 @@ var ( producers map[string]*nsq.Producer ) -type ClNSQ struct { - in chan SavedChunk - buf []SavedChunk +type NotifierNSQ struct { + in chan mdata.SavedChunk + buf []mdata.SavedChunk instance string - Cl + mdata.Notifier } -func NewNSQ(instance string, metrics Metrics, stats met.Backend) *ClNSQ { +func NewNSQ(instance string, metrics mdata.Metrics, stats met.Backend) *NotifierNSQ { + messagesPublished = stats.NewCount("notifier.nsq.messages-published") + messagesSize = stats.NewMeter("notifier.nsq.message_size", 0) // producers - hostPool = hostpool.NewEpsilonGreedy(cfg.NsqdAdds, 0, &hostpool.LinearEpsilonValueCalculator{}) + hostPool = hostpool.NewEpsilonGreedy(nsqdAdds, 0, &hostpool.LinearEpsilonValueCalculator{}) producers = make(map[string]*nsq.Producer) - for _, addr := range cfg.NsqdAdds { - producer, err := nsq.NewProducer(addr, cfg.PCfg) + for _, addr := range nsqdAdds { + producer, err := nsq.NewProducer(addr, pCfg) if err != nil { log.Fatal(4, "nsq-cluster failed creating producer %s", err.Error()) } @@ -40,27 +42,27 @@ func NewNSQ(instance string, metrics Metrics, stats met.Backend) *ClNSQ { } // consumers - consumer, err := insq.NewConsumer(cfg.Topic, cfg.Channel, cfg.CCfg, "metric_persist.%s", stats) + consumer, err := insq.NewConsumer(topic, channel, cCfg, "metric_persist.%s", stats) if err != nil { log.Fatal(4, "nsq-cluster failed to create NSQ consumer. %s", err) } - c := &ClNSQ{ - in: make(chan SavedChunk), + c := &NotifierNSQ{ + in: make(chan mdata.SavedChunk), instance: instance, - Cl: Cl{ - instance: instance, - metrics: metrics, + Notifier: mdata.Notifier{ + Instance: instance, + Metrics: metrics, }, } consumer.AddConcurrentHandlers(c, 2) - err = consumer.ConnectToNSQDs(cfg.NsqdAdds) + err = consumer.ConnectToNSQDs(nsqdAdds) if err != nil { log.Fatal(4, "nsq-cluster failed to connect to NSQDs. %s", err) } log.Info("nsq-cluster persist consumer connected to nsqd") - err = consumer.ConnectToNSQLookupds(cfg.LookupdAdds) + err = consumer.ConnectToNSQLookupds(lookupdAdds) if err != nil { log.Fatal(4, "nsq-cluster failed to connect to NSQLookupds. %s", err) } @@ -68,16 +70,16 @@ func NewNSQ(instance string, metrics Metrics, stats met.Backend) *ClNSQ { return c } -func (c *ClNSQ) HandleMessage(m *nsq.Message) error { +func (c *NotifierNSQ) HandleMessage(m *nsq.Message) error { c.Handle(m.Body) return nil } -func (c *ClNSQ) Send(sc SavedChunk) { +func (c *NotifierNSQ) Send(sc mdata.SavedChunk) { c.in <- sc } -func (c *ClNSQ) run() { +func (c *NotifierNSQ) run() { ticker := time.NewTicker(time.Second) max := 5000 for { @@ -94,12 +96,12 @@ func (c *ClNSQ) run() { } // flush makes sure the batch gets sent, asynchronously. 
-func (c *ClNSQ) flush() { +func (c *NotifierNSQ) flush() { if len(c.buf) == 0 { return } - msg := PersistMessageBatch{Instance: c.instance, SavedChunks: c.buf} + msg := mdata.PersistMessageBatch{Instance: c.instance, SavedChunks: c.buf} c.buf = nil go func() { @@ -110,7 +112,7 @@ func (c *ClNSQ) flush() { log.Fatal(4, "CLU nsq-cluster failed to marshal persistMessage to json.") } buf := new(bytes.Buffer) - binary.Write(buf, binary.LittleEndian, uint8(PersistMessageBatchV1)) + binary.Write(buf, binary.LittleEndian, uint8(mdata.PersistMessageBatchV1)) buf.Write(data) messagesSize.Value(int64(buf.Len())) @@ -121,7 +123,7 @@ func (c *ClNSQ) flush() { // will result in this loop repeating forever until we successfully publish our msg. hostPoolResponse := hostPool.Get() prod := producers[hostPoolResponse.Host()] - err = prod.Publish(cfg.Topic, buf.Bytes()) + err = prod.Publish(topic, buf.Bytes()) // Hosts that are marked as dead will be retried after 30seconds. If we published // successfully, then sending a nil error will mark the host as alive again. hostPoolResponse.Mark(err) diff --git a/metrictank.go b/metrictank.go index 6b8d280181..15ec97a708 100644 --- a/metrictank.go +++ b/metrictank.go @@ -9,6 +9,7 @@ import ( "runtime" "strconv" "strings" + "sync" "syscall" "time" @@ -25,25 +26,25 @@ import ( "github.com/raintank/metrictank/idx/cassandra" "github.com/raintank/metrictank/idx/elasticsearch" "github.com/raintank/metrictank/idx/memory" - "github.com/raintank/metrictank/in" - inCarbon "github.com/raintank/metrictank/in/carbon" - inKafkaMdam "github.com/raintank/metrictank/in/kafkamdam" - inKafkaMdm "github.com/raintank/metrictank/in/kafkamdm" + "github.com/raintank/metrictank/input" + inCarbon "github.com/raintank/metrictank/input/carbon" + inKafkaMdam "github.com/raintank/metrictank/input/kafkamdam" + inKafkaMdm "github.com/raintank/metrictank/input/kafkamdm" "github.com/raintank/metrictank/mdata" "github.com/raintank/metrictank/mdata/chunk" - clKafka "github.com/raintank/metrictank/mdata/clkafka" - clNSQ "github.com/raintank/metrictank/mdata/clnsq" + "github.com/raintank/metrictank/mdata/notifierKafka" + "github.com/raintank/metrictank/mdata/notifierNsq" "github.com/raintank/metrictank/usage" "github.com/raintank/worldping-api/pkg/log" "github.com/rakyll/globalconf" ) var ( - inCarbonInst *inCarbon.Carbon - inKafkaMdmInst *inKafkaMdm.KafkaMdm - inKafkaMdamInst *inKafkaMdam.KafkaMdam - clKafkaInst *mdata.ClKafka - clNSQInst *mdata.ClNSQ + inCarbonInst *inCarbon.Carbon + inKafkaMdmInst *inKafkaMdm.KafkaMdm + inKafkaMdamInst *inKafkaMdam.KafkaMdam + notifierKafkaInst *notifierKafka.NotifierKafka + notifierNsqInst *notifierNsq.NotifierNSQ logLevel int warmupPeriod time.Duration @@ -182,8 +183,8 @@ func main() { inKafkaMdam.ConfigSetup() // load config for cluster handlers - clNSQ.ConfigSetup() - clKafka.ConfigSetup() + notifierNsq.ConfigSetup() + notifierKafka.ConfigSetup() // load config for metricIndexers memory.ConfigSetup() @@ -227,8 +228,8 @@ func main() { inCarbon.ConfigProcess() inKafkaMdm.ConfigProcess(*instance) inKafkaMdam.ConfigProcess(*instance) - clNSQ.ConfigProcess() - clKafka.ConfigProcess(*instance) + notifierNsq.ConfigProcess() + notifierKafka.ConfigProcess(*instance) if !inCarbon.Enabled && !inKafkaMdm.Enabled && !inKafkaMdam.Enabled { log.Fatal(4, "you should enable at least 1 input plugin") @@ -316,18 +317,26 @@ func main() { } store.InitMetrics(stats) + /*********************************** + Initialize our Inputs + ***********************************/ + inputs := 
make([]input.Plugin, 0) // note. all these New functions must either return a valid instance or call log.Fatal - if inCarbon.Enabled { inCarbonInst = inCarbon.New(stats) + inputs = append(inputs, inCarbonInst) } if inKafkaMdm.Enabled { + sarama.Logger = l.New(os.Stdout, "[Sarama] ", l.LstdFlags) inKafkaMdmInst = inKafkaMdm.New(stats) + inputs = append(inputs, inKafkaMdmInst) } if inKafkaMdam.Enabled { + sarama.Logger = l.New(os.Stdout, "[Sarama] ", l.LstdFlags) inKafkaMdamInst = inKafkaMdam.New(stats) + inputs = append(inputs, inKafkaMdamInst) } accountingPeriod := dur.MustParseUNsec("accounting-period", *accountingPeriodStr) @@ -367,25 +376,19 @@ func main() { usg := usage.New(accountingPeriod, metrics, metricIndex, clock.New()) - handlers := make([]mdata.ClusterHandler, 0) - if clKafka.Enabled { - clKafkaInst = mdata.NewKafka(*instance, metrics, stats) - handlers = append(handlers, clKafkaInst) + handlers := make([]mdata.NotifierHandler, 0) + if notifierKafka.Enabled { + notifierKafkaInst = notifierKafka.NewNotifierKafka(*instance, metrics, stats) + handlers = append(handlers, notifierKafkaInst) } - mdata.InitCluster(stats, handlers...) - - if inCarbon.Enabled { - inCarbonInst.Start(metrics, metricIndex, usg) - } + mdata.InitPersistNotifier(stats, handlers...) - if inKafkaMdm.Enabled { - sarama.Logger = l.New(os.Stdout, "[Sarama] ", l.LstdFlags) - inKafkaMdmInst.Start(metrics, metricIndex, usg) - } - if inKafkaMdam.Enabled { - sarama.Logger = l.New(os.Stdout, "[Sarama] ", l.LstdFlags) - inKafkaMdamInst.Start(metrics, metricIndex, usg) + /*********************************** + Start our inputs + ***********************************/ + for _, plugin := range inputs { + plugin.Start(metrics, metricIndex, usg) } promotionReadyAtChan <- (uint32(time.Now().Unix())/highestChunkSpan + 1) * highestChunkSpan @@ -406,39 +409,36 @@ func main() { log.Info("%s", http.ListenAndServe(*listenAddr, nil)) }() - type waiter struct { - key string - plugin in.Plugin - ch chan int - } - - waiters := make([]waiter, 0) - - if inKafkaMdm.Enabled { - waiters = append(waiters, waiter{ - "kafka-mdm", - inKafkaMdmInst, - inKafkaMdmInst.StopChan, - }) - } - if inKafkaMdam.Enabled { - waiters = append(waiters, waiter{ - "kafka-mdam", - inKafkaMdamInst, - inKafkaMdamInst.StopChan, - }) - } + /*********************************** + Wait for Shutdown + ***********************************/ <-sigChan - for _, w := range waiters { - log.Info("Shutting down %s consumer", w.key) - w.plugin.Stop() + + // shutdown our input plugins. These may take a while as we allow them + // to finish processing any metrics that have already been ingested. + timer := time.NewTimer(time.Second * 10) + var wg sync.WaitGroup + for _, plugin := range inputs { + wg.Add(1) + go func(plugin input.Plugin) { + log.Info("Shutting down %s consumer", plugin.Name()) + plugin.Stop() + log.Info("%s consumer finished shutdown", plugin.Name()) + wg.Done() + }(plugin) } - for _, w := range waiters { - // the order here is arbitrary, they could stop in either order, but it doesn't really matter - log.Info("waiting for %s consumer to finish shutdown", w.key) - <-w.ch - log.Info("%s consumer finished shutdown", w.key) + pluginsStopped := make(chan struct{}) + go func() { + wg.Wait() + close(pluginsStopped) + }() + select { + case <-timer.C: + log.Warn("Plugins taking too long to shutdown, not waiting any longer.") + case <-pluginsStopped: + timer.Stop() } + log.Info("closing store") store.Stop() metricIndex.Stop()
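
For reference, below is a minimal standalone sketch of the bounded-shutdown pattern that the rewritten metrictank.go main now follows: every enabled input satisfies the input.Plugin interface (whose Stop() is documented above to block until shutdown is complete), the plugins are stopped concurrently, and the overall wait is capped so a stuck consumer cannot hang shutdown. The Plugin interface mirrors input/plugin.go; fakePlugin, the durations, and the surrounding main are hypothetical illustration only, and time.After is used as a simplified stand-in for the timer in the actual diff.

package main

import (
	"log"
	"sync"
	"time"
)

// Plugin mirrors the input.Plugin interface from input/plugin.go:
// Stop() must block until the plugin has fully shut down.
type Plugin interface {
	Name() string
	Stop()
}

// fakePlugin is a hypothetical stand-in for the carbon / kafka-mdm / kafka-mdam inputs.
type fakePlugin struct{ name string }

func (p fakePlugin) Name() string { return p.name }
func (p fakePlugin) Stop()        { time.Sleep(time.Second) } // pretend to drain in-flight metrics

func main() {
	inputs := []Plugin{fakePlugin{"carbon"}, fakePlugin{"kafka-mdm"}}

	// Stop every plugin concurrently; each Stop blocks until that plugin is done.
	var wg sync.WaitGroup
	for _, p := range inputs {
		wg.Add(1)
		go func(p Plugin) {
			defer wg.Done()
			log.Printf("Shutting down %s consumer", p.Name())
			p.Stop()
			log.Printf("%s consumer finished shutdown", p.Name())
		}(p)
	}

	// Bound the total wait so one misbehaving plugin cannot block shutdown forever.
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		log.Print("all input plugins stopped cleanly")
	case <-time.After(10 * time.Second):
		log.Print("plugins taking too long to shutdown, not waiting any longer")
	}
}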