From 9ec27cb32dbbaef7ccccdafde35f8c61e4edabdb Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Wed, 3 May 2017 17:39:31 -0400 Subject: [PATCH] add tool to see which metrics are out of order --- cmd/mt-kafka-mdm-sniff-out-of-order/main.go | 131 ++++++++++++++++++++ docs/tools.md | 20 +++ 2 files changed, 151 insertions(+) create mode 100644 cmd/mt-kafka-mdm-sniff-out-of-order/main.go diff --git a/cmd/mt-kafka-mdm-sniff-out-of-order/main.go b/cmd/mt-kafka-mdm-sniff-out-of-order/main.go new file mode 100644 index 0000000000..b5a564a4cd --- /dev/null +++ b/cmd/mt-kafka-mdm-sniff-out-of-order/main.go @@ -0,0 +1,131 @@ +package main + +import ( + "flag" + "fmt" + "math/rand" + "os" + "os/signal" + "strconv" + "strings" + "sync" + "syscall" + "text/template" + "time" + + inKafkaMdm "github.com/raintank/metrictank/input/kafkamdm" + "github.com/raintank/metrictank/stats" + "github.com/raintank/worldping-api/pkg/log" + "github.com/rakyll/globalconf" + "gopkg.in/raintank/schema.v1" +) + +var ( + confFile = flag.String("config", "/etc/metrictank/metrictank.ini", "configuration file path") + format = flag.String("format", "{{.First.Seen}} {{.First.Time}} | {{.Seen}} {{.Time}} {{.Part}} {{.OrgId}} {{.Id}} {{.Name}} {{.Metric}} {{.Interval}} {{.Value}} {{.Unit}} {{.Mtype}} {{.Tags}}", "template to render the data with") + prefix = flag.String("prefix", "", "only show metrics that have this prefix") + substr = flag.String("substr", "", "only show metrics that have this substring") +) + +type Data struct { + Part int32 + Seen time.Time + schema.MetricData +} + +type TplData struct { + Data // currently seen + First Data // seen the first time +} + +// find out of order metrics +type inputOOOFinder struct { + template.Template + data map[string]Data // by metric name + lock sync.Mutex +} + +func newInputOOOFinder(format string) *inputOOOFinder { + tpl := template.Must(template.New("format").Parse(format + "\n")) + return &inputOOOFinder{ + *tpl, + make(map[string]Data), + sync.Mutex{}, + } +} + +func (ip *inputOOOFinder) Process(metric *schema.MetricData, partition int32) { + if *prefix != "" && !strings.HasPrefix(metric.Metric, *prefix) { + return + } + if *substr != "" && !strings.Contains(metric.Metric, *substr) { + return + } + now := Data{ + Part: partition, + Seen: time.Now(), + MetricData: *metric, + } + ip.lock.Lock() + first, ok := ip.data[metric.Name] + if !ok { + ip.data[metric.Name] = now + } else { + if metric.Time > first.Time { + ip.data[metric.Name] = now + } else { + t := TplData{now, first} + err := ip.Execute(os.Stdout, t) + if err != nil { + log.Error(0, "executing template: %s", err) + } + } + } + ip.lock.Unlock() +} + +func main() { + flag.Usage = func() { + fmt.Fprintln(os.Stderr, "mt-kafka-mdm-sniff-out-of-order") + fmt.Fprintln(os.Stderr) + fmt.Fprintln(os.Stderr, "Inspects what's flowing through kafka (in mdm format) and reports it to you") + fmt.Fprintf(os.Stderr, "\nFlags:\n\n") + flag.PrintDefaults() + } + flag.Parse() + log.NewLogger(0, "console", fmt.Sprintf(`{"level": %d, "formatting":false}`, 2)) + instance := "mt-kafka-mdm-sniff-out-of-order" + strconv.Itoa(rand.Int()) + + // Only try and parse the conf file if it exists + path := "" + if _, err := os.Stat(*confFile); err == nil { + path = *confFile + } + conf, err := globalconf.NewWithOptions(&globalconf.Options{ + Filename: path, + EnvPrefix: "MT_", + }) + if err != nil { + log.Fatal(4, "error with configuration file: %s", err) + os.Exit(1) + } + inKafkaMdm.ConfigSetup() + conf.ParseAll() + + // config may have had it disabled + inKafkaMdm.Enabled = true + // important: we don't want to share the same offset tracker as the mdm input of MT itself + inKafkaMdm.DataDir = "/tmp/" + instance + + inKafkaMdm.ConfigProcess(instance) + + stats.NewDevnull() // make sure metrics don't pile up without getting discarded + + mdm := inKafkaMdm.New() + mdm.Start(newInputOOOFinder(*format)) + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + <-sigChan + log.Info("stopping") + mdm.Stop() +} diff --git a/docs/tools.md b/docs/tools.md index e06684012d..7df0ae02a3 100644 --- a/docs/tools.md +++ b/docs/tools.md @@ -210,6 +210,26 @@ Flags: ``` +## mt-kafka-mdm-sniff-out-of-order + +``` +mt-kafka-mdm-sniff-out-of-order + +Inspects what's flowing through kafka (in mdm format) and reports it to you + +Flags: + + -config string + configuration file path (default "/etc/metrictank/metrictank.ini") + -format string + template to render the data with (default "{{.First.Seen}} {{.First.Time}} | {{.Seen}} {{.Time}} {{.Part}} {{.OrgId}} {{.Id}} {{.Name}} {{.Metric}} {{.Interval}} {{.Value}} {{.Unit}} {{.Mtype}} {{.Tags}}") + -prefix string + only show metrics that have this prefix + -substr string + only show metrics that have this substring +``` + + ## mt-replicator ```