From 68a9aac3702339465183bfb2e79ed933d7712c2e Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Thu, 6 Feb 2020 03:57:37 -0500 Subject: [PATCH 01/60] Add mt-parrot command to generate deterministic artificial metrics for each metrictank partition. --- .gitignore | 1 + Gopkg.lock | 1 + cmd/mt-parrot/generate.go | 62 ++++++++++++++++++++++++++ cmd/mt-parrot/main.go | 91 +++++++++++++++++++++++++++++++++++++++ cmd/mt-parrot/monitor.go | 81 ++++++++++++++++++++++++++++++++++ docs/tools.md | 21 +++++++++ 6 files changed, 257 insertions(+) create mode 100644 cmd/mt-parrot/generate.go create mode 100644 cmd/mt-parrot/main.go create mode 100644 cmd/mt-parrot/monitor.go diff --git a/.gitignore b/.gitignore index 83ae8b873b..b4cae775f3 100644 --- a/.gitignore +++ b/.gitignore @@ -16,6 +16,7 @@ /cmd/mt-kafka-mdm-sniff/mt-kafka-mdm-sniff /cmd/mt-kafka-persist-sniff/mt-kafka-persist-sniff /cmd/mt-keygen/mt-keygen +/cmd/mt-parrot/mt-parrot /cmd/mt-schemas-explain/mt-schemas-explain /cmd/mt-split-metrics-by-ttl/mt-split-metrics-by-ttl /cmd/mt-store-cat/mt-store-cat diff --git a/Gopkg.lock b/Gopkg.lock index a20f6320ac..3154af9dc3 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1216,6 +1216,7 @@ "github.com/raintank/gziper", "github.com/raintank/met", "github.com/raintank/met/helper", + "github.com/raintank/met/statsd", "github.com/raintank/worldping-api/pkg/log", "github.com/rs/cors", "github.com/sergi/go-diff/diffmatchpatch", diff --git a/cmd/mt-parrot/generate.go b/cmd/mt-parrot/generate.go new file mode 100644 index 0000000000..92d5daa056 --- /dev/null +++ b/cmd/mt-parrot/generate.go @@ -0,0 +1,62 @@ +package main + +import ( + "fmt" + log "github.com/sirupsen/logrus" + "time" + + "github.com/grafana/metrictank/schema" +) + +func produceArtificialMetrics(schemas []*schema.MetricData) { + for tick := range time.NewTicker(artificialMetricsInterval).C { + for _, metric := range schemas { + metric.Time = tick.Unix() + metric.Value = float64(tick.Unix()) + } + gateway.Flush(schemas) + log.Infof("flushed schemas for ts %d", tick.Unix()) + } +} + +//generateSchemas generates a MetricData that hashes to each of numPartitions partitions +func generateSchemas(numPartitions int32) []*schema.MetricData { + var metrics []*schema.MetricData + for i := int32(0); i < numPartitions; i++ { + metrics = append(metrics, generateSchema(i)) + } + return metrics +} + +//generateSchema generates a single MetricData that hashes to the given partition +func generateSchema(desiredPartition int32) *schema.MetricData { + metric := schema.MetricData{ + OrgId: orgId, + Unit: "partyparrots", + Mtype: "gauge", + Interval: int(artificialMetricsInterval.Seconds()), + } + + for i := 1; true; i++ { + metric.Name = fmt.Sprintf("parrot.testdata.%d.generated.%s", desiredPartition, generatePartitionSuffix(i)) + id, err := metric.PartitionID(partitionMethod, partitionCount) + if err != nil { + log.Fatal(err) + } + if id == desiredPartition { + log.Infof("metric for partition %d: %s", desiredPartition, metric.Name) + return &metric + } + } + return nil +} + +var alphabet = []rune("abcdefghijklmnopqrstuvwxyz") + +//generatePartitionSuffix deterministically generates a suffix for partition by brute force +func generatePartitionSuffix(i int) string { + if i == 0 { + return "" + } + return generatePartitionSuffix(i/26) + string(alphabet[i%26]) +} diff --git a/cmd/mt-parrot/main.go b/cmd/mt-parrot/main.go new file mode 100644 index 0000000000..24c8d5eff8 --- /dev/null +++ b/cmd/mt-parrot/main.go @@ -0,0 +1,91 @@ +package main + +import ( + "github.com/grafana/metrictank/cmd/mt-fakemetrics/out" + "github.com/grafana/metrictank/cmd/mt-fakemetrics/out/gnet" + "github.com/grafana/metrictank/logger" + "github.com/grafana/metrictank/schema" + "github.com/raintank/met/statsd" + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + "time" +) + +var ( + gatewayAddress string + gatewayKey string + orgId int + partitionCount int32 + partitionMethodString string + artificialMetricsInterval time.Duration + queryInterval time.Duration + logLevel string + + partitionMethod schema.PartitionByMethod + gateway out.Out +) + +func init() { + parrotCmd.Flags().StringVar(&gatewayAddress, "gateway-address", "http://localhost:6059/metrics", "the url of the metrics gateway to publish to") + parrotCmd.Flags().StringVar(&gatewayKey, "gateway-key", "", "the bearer token to include with gateway requests") + parrotCmd.Flags().IntVar(&orgId, "org-id", 1, "org id to publish parrot metrics to") + parrotCmd.Flags().Int32Var(&partitionCount, "partition-count", 8, "number of partitions to publish parrot metrics to") + parrotCmd.Flags().StringVar(&partitionMethodString, "partition-method", "bySeries", "the partition method to use, must be one of bySeries|bySeriesWithTags|bySeriesWithTagsFnv") + parrotCmd.Flags().DurationVar(&artificialMetricsInterval, "artificial-metrics-interval", 5*time.Second, "interval to send metrics") + parrotCmd.Flags().DurationVar(&queryInterval, "query-interval", 10*time.Second, "interval to query to validate metrics") + + parrotCmd.Flags().StringVar(&logLevel, "log-level", "info", "log level. panic|fatal|error|warning|info|debug") + + formatter := &logger.TextFormatter{} + formatter.TimestampFormat = "2006-01-02 15:04:05.000" + log.SetFormatter(formatter) +} + +func main() { + err := parrotCmd.Execute() + if err != nil { + log.Fatal(err) + } +} + +var parrotCmd = &cobra.Command{ + Use: "parrot", + Short: "generate deterministic metrics for each metrictank partition", + Run: func(cmd *cobra.Command, args []string) { + lvl, err := log.ParseLevel(logLevel) + if err != nil { + log.Fatalf("failed to parse log-level, %s", err.Error()) + } + log.SetLevel(lvl) + parsePartitionMethod() + initGateway() + + schemas := generateSchemas(partitionCount) + go produceArtificialMetrics(schemas) + + monitor() + }, +} + +//parsePartitionMethod parses the partitionScheme cli flag, +//exiting if an invalid partition schema is entered or if org based partitioning is used (not currently allowed by parrot). +func parsePartitionMethod() { + var err error + partitionMethod, err = schema.PartitonMethodFromString(partitionMethodString) + if err != nil { + log.Fatal(err) + } + if partitionMethod == schema.PartitionByOrg { + log.Fatal("byOrg not supported") + } +} + +func initGateway() { + var err error + backend, _ := statsd.New(false, "", "") + gateway, err = gnet.New(gatewayAddress, gatewayKey, backend) + if err != nil { + log.Fatal(err) + } + log.Info("gateway initialized") +} diff --git a/cmd/mt-parrot/monitor.go b/cmd/mt-parrot/monitor.go new file mode 100644 index 0000000000..293327e3df --- /dev/null +++ b/cmd/mt-parrot/monitor.go @@ -0,0 +1,81 @@ +package main + +import ( + "fmt" + "github.com/grafana/metrictank/stacktest/graphite" + log "github.com/sirupsen/logrus" + "math" + "net/http" + "strconv" + "time" +) + +type seriesStats struct { + lastTs uint32 + //the partition currently being checked + partition int32 + //the number of nans present in the time series + nans int32 + //the sum of abs(value - ts) across the time series + deltaSum float64 + //the number of timestamps where value != ts + numNonMatching int32 + + //tracks the last seen non-NaN time stamp (useful for lag + lastSeen uint32 +} + +func monitor() { + for range time.NewTicker(queryInterval).C { + + query := graphite.ExecuteRenderQuery(buildRequest()) + + for _, s := range query.Decoded { + log.Infof("%d - %d", s.Datapoints[0].Ts, s.Datapoints[len(s.Datapoints)-1].Ts) + + stats := seriesStats{} + stats.lastTs = s.Datapoints[len(s.Datapoints)-1].Ts + + for _, dp := range s.Datapoints { + + if math.IsNaN(dp.Val) { + stats.nans += 1 + continue + } + if diff := dp.Val - float64(dp.Ts); diff != 0 { + stats.lastSeen = dp.Ts + stats.deltaSum += diff + stats.numNonMatching += 1 + } + } + + //TODO create/emit metrics for each partition + + //number of missing values for each series + fmt.Printf("parrot.monitoring.nanCount;partition=%d; %d\n", stats.partition, stats.nans) + //time since the last value was recorded + fmt.Printf("parrot.monitoring.lag;partition=%d; %d\n", stats.partition, stats.lastTs-stats.lastSeen) + //total amount of drift between expected value and actual values + fmt.Printf("parrot.monitoring.deltaSum;partition=%d; %f\n", stats.partition, stats.deltaSum) + //total number of entries where drift occurred + fmt.Printf("parrot.monitoring.nonMatching;partition=%d; %d\n", stats.partition, stats.numNonMatching) + fmt.Println() + } + } +} + +func buildRequest() *http.Request { + req, _ := http.NewRequest("GET", fmt.Sprintf("%s/render", gatewayAddress), nil) + q := req.URL.Query() + q.Set("target", "parrot.testdata.*.generated.*") + //TODO parameterize this + q.Set("from", "-5min") + q.Set("until", "now") + q.Set("format", "json") + q.Set("X-Org-Id", strconv.Itoa(orgId)) + req.URL.RawQuery = q.Encode() + if len(gatewayKey) != 0 { + req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", gatewayKey)) + } + return req +} diff --git a/docs/tools.md b/docs/tools.md index 5ef9410655..f623821f9c 100644 --- a/docs/tools.md +++ b/docs/tools.md @@ -534,6 +534,27 @@ It prints the MKey ``` +## mt-parrot + +``` +generate deterministic metrics for each metrictank partition + +Usage: + parrot [flags] + +Flags: + --artificial-metrics-interval duration interval to send metrics (default 5s) + --gateway-address string the url of the metrics gateway to publish to (default "http://localhost:6059/metrics") + --gateway-key string the bearer token to include with gateway requests + -h, --help help for parrot + --log-level string log level. panic|fatal|error|warning|info|debug (default "info") + --org-id int org id to publish parrot metrics to (default 1) + --partition-count int32 number of partitions to publish parrot metrics to (default 8) + --partition-method string the partition method to use, must be one of bySeries|bySeriesWithTags|bySeriesWithTagsFnv (default "bySeries") + --query-interval duration interval to query to validate metrics (default 10s) +``` + + ## mt-schemas-explain ``` From 646865801ec2693bcdc73e374ac62d583ada7aea Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Thu, 20 Feb 2020 22:43:45 -0500 Subject: [PATCH 02/60] only use base url for gatewayAddress so that we can query it --- cmd/mt-parrot/main.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/mt-parrot/main.go b/cmd/mt-parrot/main.go index 24c8d5eff8..a2244bd1b4 100644 --- a/cmd/mt-parrot/main.go +++ b/cmd/mt-parrot/main.go @@ -26,7 +26,7 @@ var ( ) func init() { - parrotCmd.Flags().StringVar(&gatewayAddress, "gateway-address", "http://localhost:6059/metrics", "the url of the metrics gateway to publish to") + parrotCmd.Flags().StringVar(&gatewayAddress, "gateway-address", "http://localhost:6059", "the url of the metrics gateway") parrotCmd.Flags().StringVar(&gatewayKey, "gateway-key", "", "the bearer token to include with gateway requests") parrotCmd.Flags().IntVar(&orgId, "org-id", 1, "org id to publish parrot metrics to") parrotCmd.Flags().Int32Var(&partitionCount, "partition-count", 8, "number of partitions to publish parrot metrics to") @@ -83,7 +83,7 @@ func parsePartitionMethod() { func initGateway() { var err error backend, _ := statsd.New(false, "", "") - gateway, err = gnet.New(gatewayAddress, gatewayKey, backend) + gateway, err = gnet.New(gatewayAddress + "/metrics", gatewayKey, backend) if err != nil { log.Fatal(err) } From 993dcc74bc3bed1d55c3852ccb120907b1dcf06d Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Thu, 20 Feb 2020 22:44:39 -0500 Subject: [PATCH 03/60] query with explicit time ranges --- cmd/mt-parrot/monitor.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/cmd/mt-parrot/monitor.go b/cmd/mt-parrot/monitor.go index 293327e3df..4427591b01 100644 --- a/cmd/mt-parrot/monitor.go +++ b/cmd/mt-parrot/monitor.go @@ -26,9 +26,9 @@ type seriesStats struct { } func monitor() { - for range time.NewTicker(queryInterval).C { + for tick := range time.NewTicker(queryInterval).C { - query := graphite.ExecuteRenderQuery(buildRequest()) + query := graphite.ExecuteRenderQuery(buildRequest(tick)) for _, s := range query.Decoded { log.Infof("%d - %d", s.Datapoints[0].Ts, s.Datapoints[len(s.Datapoints)-1].Ts) @@ -64,13 +64,12 @@ func monitor() { } } -func buildRequest() *http.Request { +func buildRequest(now time.Time) *http.Request { req, _ := http.NewRequest("GET", fmt.Sprintf("%s/render", gatewayAddress), nil) q := req.URL.Query() q.Set("target", "parrot.testdata.*.generated.*") - //TODO parameterize this - q.Set("from", "-5min") - q.Set("until", "now") + q.Set("from", strconv.FormatInt(now.Add(-5*time.Minute).Unix(), 10)) + q.Set("until", strconv.FormatInt(now.Unix(), 10)) q.Set("format", "json") q.Set("X-Org-Id", strconv.Itoa(orgId)) req.URL.RawQuery = q.Encode() From 1ba76d33291c6136eb80314aaf0fd0918a37802a Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Thu, 20 Feb 2020 22:47:05 -0500 Subject: [PATCH 04/60] publish stats instead of printing them --- cmd/mt-parrot/monitor.go | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/cmd/mt-parrot/monitor.go b/cmd/mt-parrot/monitor.go index 4427591b01..2987973e33 100644 --- a/cmd/mt-parrot/monitor.go +++ b/cmd/mt-parrot/monitor.go @@ -3,6 +3,7 @@ package main import ( "fmt" "github.com/grafana/metrictank/stacktest/graphite" + "github.com/grafana/metrictank/stats" log "github.com/sirupsen/logrus" "math" "net/http" @@ -33,33 +34,30 @@ func monitor() { for _, s := range query.Decoded { log.Infof("%d - %d", s.Datapoints[0].Ts, s.Datapoints[len(s.Datapoints)-1].Ts) - stats := seriesStats{} - stats.lastTs = s.Datapoints[len(s.Datapoints)-1].Ts + serStats := seriesStats{} + serStats.lastTs = s.Datapoints[len(s.Datapoints)-1].Ts for _, dp := range s.Datapoints { if math.IsNaN(dp.Val) { - stats.nans += 1 + serStats.nans += 1 continue } if diff := dp.Val - float64(dp.Ts); diff != 0 { - stats.lastSeen = dp.Ts - stats.deltaSum += diff - stats.numNonMatching += 1 + serStats.lastSeen = dp.Ts + serStats.deltaSum += diff + serStats.numNonMatching += 1 } } - //TODO create/emit metrics for each partition - //number of missing values for each series - fmt.Printf("parrot.monitoring.nanCount;partition=%d; %d\n", stats.partition, stats.nans) + stats.NewGauge32(fmt.Sprintf("parrot.monitoring.nancount;partition=%d", serStats.partition)).Set(int(serStats.nans)) //time since the last value was recorded - fmt.Printf("parrot.monitoring.lag;partition=%d; %d\n", stats.partition, stats.lastTs-stats.lastSeen) + stats.NewGauge32(fmt.Sprintf("parrot.monitoring.lag;partition=%d", serStats.partition)).Set(int(serStats.lastTs - serStats.lastSeen)) //total amount of drift between expected value and actual values - fmt.Printf("parrot.monitoring.deltaSum;partition=%d; %f\n", stats.partition, stats.deltaSum) + stats.NewGauge32(fmt.Sprintf("parrot.monitoring.deltaSum;partition=%d", serStats.partition)).Set(int(serStats.deltaSum)) //total number of entries where drift occurred - fmt.Printf("parrot.monitoring.nonMatching;partition=%d; %d\n", stats.partition, stats.numNonMatching) - fmt.Println() + stats.NewGauge32(fmt.Sprintf("parrot.monitoring.nonMatching;partition=%d", serStats.partition)).Set(int(serStats.numNonMatching)) } } } From e36561f094021763861e5545e05b1795478d9ae0 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Thu, 20 Feb 2020 22:47:21 -0500 Subject: [PATCH 05/60] extract partition via regex --- cmd/mt-parrot/monitor.go | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/cmd/mt-parrot/monitor.go b/cmd/mt-parrot/monitor.go index 2987973e33..89bc323534 100644 --- a/cmd/mt-parrot/monitor.go +++ b/cmd/mt-parrot/monitor.go @@ -1,12 +1,14 @@ package main import ( + "errors" "fmt" "github.com/grafana/metrictank/stacktest/graphite" "github.com/grafana/metrictank/stats" log "github.com/sirupsen/logrus" "math" "net/http" + "regexp" "strconv" "time" ) @@ -34,6 +36,12 @@ func monitor() { for _, s := range query.Decoded { log.Infof("%d - %d", s.Datapoints[0].Ts, s.Datapoints[len(s.Datapoints)-1].Ts) + partition, err := extractPartition(s.Target) + if err != nil { + log.Debug("unable to extract partition", err) + stats.NewCounter32("parrot.monitoring.error.parsePartition").Inc() + continue + } serStats := seriesStats{} serStats.lastTs = s.Datapoints[len(s.Datapoints)-1].Ts @@ -62,6 +70,21 @@ func monitor() { } } +var pattern = regexp.MustCompile(`parrot.testdata.(\d+).generated.\w+`) + +func extractPartition(target string) (int32, error) { + submatch := pattern.FindStringSubmatch(target) + if len(submatch) < 2 { + return -1, errors.New(fmt.Sprintf("target [%s] did not match pattern", target)) + } + partition, err := strconv.Atoi(submatch[1]) + if err != nil { + return -1, err + } + return int32(partition), nil + +} + func buildRequest(now time.Time) *http.Request { req, _ := http.NewRequest("GET", fmt.Sprintf("%s/render", gatewayAddress), nil) q := req.URL.Query() From 8c917ceb83901a5ae96c9760871bd03484d37e3f Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Thu, 20 Feb 2020 22:47:30 -0500 Subject: [PATCH 06/60] more logging --- cmd/mt-parrot/monitor.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cmd/mt-parrot/monitor.go b/cmd/mt-parrot/monitor.go index 89bc323534..2b17113449 100644 --- a/cmd/mt-parrot/monitor.go +++ b/cmd/mt-parrot/monitor.go @@ -52,6 +52,7 @@ func monitor() { continue } if diff := dp.Val - float64(dp.Ts); diff != 0 { + log.Debugf("partition=%d dp.Val=%f dp.Ts=%d diff=%f", partition, dp.Val, dp.Ts, diff) serStats.lastSeen = dp.Ts serStats.deltaSum += diff serStats.numNonMatching += 1 From 9fccc2195cf3818fc9532a5aea7f8b593e414b1b Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Thu, 20 Feb 2020 22:49:11 -0500 Subject: [PATCH 07/60] stats for other failure modes --- cmd/mt-parrot/monitor.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/cmd/mt-parrot/monitor.go b/cmd/mt-parrot/monitor.go index 2b17113449..a09d34d51b 100644 --- a/cmd/mt-parrot/monitor.go +++ b/cmd/mt-parrot/monitor.go @@ -32,6 +32,12 @@ func monitor() { for tick := range time.NewTicker(queryInterval).C { query := graphite.ExecuteRenderQuery(buildRequest(tick)) + if query.HTTPErr != nil { + stats.NewCounter32("parrot.monitoring.error;error=http").Inc() + } + if query.DecodeErr != nil { + stats.NewCounter32("parrot.monitoring.error;error=decode").Inc() + } for _, s := range query.Decoded { log.Infof("%d - %d", s.Datapoints[0].Ts, s.Datapoints[len(s.Datapoints)-1].Ts) @@ -39,7 +45,7 @@ func monitor() { partition, err := extractPartition(s.Target) if err != nil { log.Debug("unable to extract partition", err) - stats.NewCounter32("parrot.monitoring.error.parsePartition").Inc() + stats.NewCounter32("parrot.monitoring.error;error=parsePartition").Inc() continue } serStats := seriesStats{} From f3875477ccf814f5c6959145d32eb3a84c532214 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Thu, 20 Feb 2020 22:52:25 -0500 Subject: [PATCH 08/60] go fmt --- cmd/mt-parrot/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/mt-parrot/main.go b/cmd/mt-parrot/main.go index a2244bd1b4..3d448f4056 100644 --- a/cmd/mt-parrot/main.go +++ b/cmd/mt-parrot/main.go @@ -83,7 +83,7 @@ func parsePartitionMethod() { func initGateway() { var err error backend, _ := statsd.New(false, "", "") - gateway, err = gnet.New(gatewayAddress + "/metrics", gatewayKey, backend) + gateway, err = gnet.New(gatewayAddress+"/metrics", gatewayKey, backend) if err != nil { log.Fatal(err) } From 74116e5ae303c54cee47ce1fa965bf4ac2d8634a Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Thu, 20 Feb 2020 22:53:50 -0500 Subject: [PATCH 09/60] update docs --- docs/tools.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/tools.md b/docs/tools.md index f623821f9c..7a5dbbf752 100644 --- a/docs/tools.md +++ b/docs/tools.md @@ -544,7 +544,7 @@ Usage: Flags: --artificial-metrics-interval duration interval to send metrics (default 5s) - --gateway-address string the url of the metrics gateway to publish to (default "http://localhost:6059/metrics") + --gateway-address string the url of the metrics gateway (default "http://localhost:6059") --gateway-key string the bearer token to include with gateway requests -h, --help help for parrot --log-level string log level. panic|fatal|error|warning|info|debug (default "info") From e5f70fbe2773340bdfb67e04f29c19dc0a4ce1a0 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Tue, 25 Feb 2020 16:48:22 -0500 Subject: [PATCH 10/60] FormatInt -> Itoa --- cmd/mt-parrot/monitor.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/mt-parrot/monitor.go b/cmd/mt-parrot/monitor.go index a09d34d51b..c0fed6e277 100644 --- a/cmd/mt-parrot/monitor.go +++ b/cmd/mt-parrot/monitor.go @@ -96,8 +96,8 @@ func buildRequest(now time.Time) *http.Request { req, _ := http.NewRequest("GET", fmt.Sprintf("%s/render", gatewayAddress), nil) q := req.URL.Query() q.Set("target", "parrot.testdata.*.generated.*") - q.Set("from", strconv.FormatInt(now.Add(-5*time.Minute).Unix(), 10)) - q.Set("until", strconv.FormatInt(now.Unix(), 10)) + q.Set("from", strconv.Itoa(int(now.Add(-5*time.Minute).Unix()))) + q.Set("until", strconv.Itoa(int(now.Unix()))) q.Set("format", "json") q.Set("X-Org-Id", strconv.Itoa(orgId)) req.URL.RawQuery = q.Encode() From ec7ebf62e09c2a2c687dc7ef79a792e7dc7bc785 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Tue, 25 Feb 2020 16:53:41 -0500 Subject: [PATCH 11/60] update flags to clarify that parrot doesn't control the number of partitions --- cmd/mt-parrot/main.go | 4 ++-- docs/tools.md | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/cmd/mt-parrot/main.go b/cmd/mt-parrot/main.go index 3d448f4056..2f6058088b 100644 --- a/cmd/mt-parrot/main.go +++ b/cmd/mt-parrot/main.go @@ -29,8 +29,8 @@ func init() { parrotCmd.Flags().StringVar(&gatewayAddress, "gateway-address", "http://localhost:6059", "the url of the metrics gateway") parrotCmd.Flags().StringVar(&gatewayKey, "gateway-key", "", "the bearer token to include with gateway requests") parrotCmd.Flags().IntVar(&orgId, "org-id", 1, "org id to publish parrot metrics to") - parrotCmd.Flags().Int32Var(&partitionCount, "partition-count", 8, "number of partitions to publish parrot metrics to") - parrotCmd.Flags().StringVar(&partitionMethodString, "partition-method", "bySeries", "the partition method to use, must be one of bySeries|bySeriesWithTags|bySeriesWithTagsFnv") + parrotCmd.Flags().Int32Var(&partitionCount, "partition-count", 8, "number of kafka partitions in use") + parrotCmd.Flags().StringVar(&partitionMethodString, "partition-method", "bySeries", "the partition method in use on the gateway, must be one of bySeries|bySeriesWithTags|bySeriesWithTagsFnv") parrotCmd.Flags().DurationVar(&artificialMetricsInterval, "artificial-metrics-interval", 5*time.Second, "interval to send metrics") parrotCmd.Flags().DurationVar(&queryInterval, "query-interval", 10*time.Second, "interval to query to validate metrics") diff --git a/docs/tools.md b/docs/tools.md index 7a5dbbf752..bd783e469a 100644 --- a/docs/tools.md +++ b/docs/tools.md @@ -549,8 +549,8 @@ Flags: -h, --help help for parrot --log-level string log level. panic|fatal|error|warning|info|debug (default "info") --org-id int org id to publish parrot metrics to (default 1) - --partition-count int32 number of partitions to publish parrot metrics to (default 8) - --partition-method string the partition method to use, must be one of bySeries|bySeriesWithTags|bySeriesWithTagsFnv (default "bySeries") + --partition-count int32 number of kafka partitions in use (default 8) + --partition-method string the partition method in use on the gateway, must be one of bySeries|bySeriesWithTags|bySeriesWithTagsFnv (default "bySeries") --query-interval duration interval to query to validate metrics (default 10s) ``` From d18681fc9987908b614f600ac01030719d4f4ab1 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Tue, 25 Feb 2020 16:55:34 -0500 Subject: [PATCH 12/60] update command name --- cmd/mt-parrot/main.go | 2 +- docs/tools.md | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cmd/mt-parrot/main.go b/cmd/mt-parrot/main.go index 2f6058088b..591499296d 100644 --- a/cmd/mt-parrot/main.go +++ b/cmd/mt-parrot/main.go @@ -49,7 +49,7 @@ func main() { } var parrotCmd = &cobra.Command{ - Use: "parrot", + Use: "mt-parrot", Short: "generate deterministic metrics for each metrictank partition", Run: func(cmd *cobra.Command, args []string) { lvl, err := log.ParseLevel(logLevel) diff --git a/docs/tools.md b/docs/tools.md index bd783e469a..54a9b5f31c 100644 --- a/docs/tools.md +++ b/docs/tools.md @@ -540,13 +540,13 @@ It prints the MKey generate deterministic metrics for each metrictank partition Usage: - parrot [flags] + mt-parrot [flags] Flags: --artificial-metrics-interval duration interval to send metrics (default 5s) --gateway-address string the url of the metrics gateway (default "http://localhost:6059") --gateway-key string the bearer token to include with gateway requests - -h, --help help for parrot + -h, --help help for mt-parrot --log-level string log level. panic|fatal|error|warning|info|debug (default "info") --org-id int org id to publish parrot metrics to (default 1) --partition-count int32 number of kafka partitions in use (default 8) From 62a9a737614d5ca54ec151914b9667a22d2c3890 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Tue, 25 Feb 2020 23:14:41 -0500 Subject: [PATCH 13/60] declare counters once --- cmd/mt-parrot/monitor.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/cmd/mt-parrot/monitor.go b/cmd/mt-parrot/monitor.go index c0fed6e277..2c8aa646d9 100644 --- a/cmd/mt-parrot/monitor.go +++ b/cmd/mt-parrot/monitor.go @@ -13,6 +13,11 @@ import ( "time" ) +var ( + httpError = stats.NewCounter32("parrot.monitoring.error;error=http") + decodeError = stats.NewCounter32("parrot.monitoring.error;error=decode") +) + type seriesStats struct { lastTs uint32 //the partition currently being checked @@ -33,10 +38,10 @@ func monitor() { query := graphite.ExecuteRenderQuery(buildRequest(tick)) if query.HTTPErr != nil { - stats.NewCounter32("parrot.monitoring.error;error=http").Inc() + httpError.Inc() } if query.DecodeErr != nil { - stats.NewCounter32("parrot.monitoring.error;error=decode").Inc() + decodeError.Inc() } for _, s := range query.Decoded { From 4b4fb0f5f2c07171b8bbb0d0567ca1667bc20d0e Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Tue, 25 Feb 2020 23:26:08 -0500 Subject: [PATCH 14/60] simplify partition parsing logic --- cmd/mt-parrot/monitor.go | 41 +++++++++++----------------------------- 1 file changed, 11 insertions(+), 30 deletions(-) diff --git a/cmd/mt-parrot/monitor.go b/cmd/mt-parrot/monitor.go index 2c8aa646d9..effa7e1db5 100644 --- a/cmd/mt-parrot/monitor.go +++ b/cmd/mt-parrot/monitor.go @@ -1,28 +1,25 @@ package main import ( - "errors" "fmt" "github.com/grafana/metrictank/stacktest/graphite" "github.com/grafana/metrictank/stats" log "github.com/sirupsen/logrus" "math" "net/http" - "regexp" "strconv" "time" ) var ( - httpError = stats.NewCounter32("parrot.monitoring.error;error=http") - decodeError = stats.NewCounter32("parrot.monitoring.error;error=decode") + httpError = stats.NewCounter32("parrot.monitoring.error;error=http") + decodeError = stats.NewCounter32("parrot.monitoring.error;error=decode") + parsePartitionError = stats.NewCounter32("parrot.monitoring.error;error=parsePartition") ) type seriesStats struct { lastTs uint32 //the partition currently being checked - partition int32 - //the number of nans present in the time series nans int32 //the sum of abs(value - ts) across the time series deltaSum float64 @@ -46,11 +43,10 @@ func monitor() { for _, s := range query.Decoded { log.Infof("%d - %d", s.Datapoints[0].Ts, s.Datapoints[len(s.Datapoints)-1].Ts) - - partition, err := extractPartition(s.Target) + partition, err := strconv.Atoi(s.Target) if err != nil { - log.Debug("unable to extract partition", err) - stats.NewCounter32("parrot.monitoring.error;error=parsePartition").Inc() + log.Debug("unable to parse partition", err) + parsePartitionError.Inc() continue } serStats := seriesStats{} @@ -71,36 +67,21 @@ func monitor() { } //number of missing values for each series - stats.NewGauge32(fmt.Sprintf("parrot.monitoring.nancount;partition=%d", serStats.partition)).Set(int(serStats.nans)) + stats.NewGauge32(fmt.Sprintf("parrot.monitoring.nancount;partition=%d", partition)).Set(int(serStats.nans)) //time since the last value was recorded - stats.NewGauge32(fmt.Sprintf("parrot.monitoring.lag;partition=%d", serStats.partition)).Set(int(serStats.lastTs - serStats.lastSeen)) + stats.NewGauge32(fmt.Sprintf("parrot.monitoring.lag;partition=%d", partition)).Set(int(serStats.lastTs - serStats.lastSeen)) //total amount of drift between expected value and actual values - stats.NewGauge32(fmt.Sprintf("parrot.monitoring.deltaSum;partition=%d", serStats.partition)).Set(int(serStats.deltaSum)) + stats.NewGauge32(fmt.Sprintf("parrot.monitoring.deltaSum;partition=%d", partition)).Set(int(serStats.deltaSum)) //total number of entries where drift occurred - stats.NewGauge32(fmt.Sprintf("parrot.monitoring.nonMatching;partition=%d", serStats.partition)).Set(int(serStats.numNonMatching)) + stats.NewGauge32(fmt.Sprintf("parrot.monitoring.nonMatching;partition=%d", partition)).Set(int(serStats.numNonMatching)) } } } -var pattern = regexp.MustCompile(`parrot.testdata.(\d+).generated.\w+`) - -func extractPartition(target string) (int32, error) { - submatch := pattern.FindStringSubmatch(target) - if len(submatch) < 2 { - return -1, errors.New(fmt.Sprintf("target [%s] did not match pattern", target)) - } - partition, err := strconv.Atoi(submatch[1]) - if err != nil { - return -1, err - } - return int32(partition), nil - -} - func buildRequest(now time.Time) *http.Request { req, _ := http.NewRequest("GET", fmt.Sprintf("%s/render", gatewayAddress), nil) q := req.URL.Query() - q.Set("target", "parrot.testdata.*.generated.*") + q.Set("target", "aliasByNode(parrot.testdata.*.generated.*, 2)") q.Set("from", strconv.Itoa(int(now.Add(-5*time.Minute).Unix()))) q.Set("until", strconv.Itoa(int(now.Unix()))) q.Set("format", "json") From 1080059b0401c0dc7bd579e0da296f1257a17ec4 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Wed, 26 Feb 2020 08:34:29 -0500 Subject: [PATCH 15/60] tweak parition suffix generation function --- cmd/mt-parrot/generate.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cmd/mt-parrot/generate.go b/cmd/mt-parrot/generate.go index 92d5daa056..04d847edbb 100644 --- a/cmd/mt-parrot/generate.go +++ b/cmd/mt-parrot/generate.go @@ -55,8 +55,8 @@ var alphabet = []rune("abcdefghijklmnopqrstuvwxyz") //generatePartitionSuffix deterministically generates a suffix for partition by brute force func generatePartitionSuffix(i int) string { - if i == 0 { - return "" + if i > 25 { + return generatePartitionSuffix((i/26)-1) + string(alphabet[i%26]) } - return generatePartitionSuffix(i/26) + string(alphabet[i%26]) + return string(alphabet[i%26]) } From 1a498fc7dd20a06e619996e824febef36beff9cd Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Wed, 26 Feb 2020 08:36:18 -0500 Subject: [PATCH 16/60] artificial -> test --- cmd/mt-parrot/generate.go | 6 +++--- cmd/mt-parrot/main.go | 20 ++++++++++---------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/cmd/mt-parrot/generate.go b/cmd/mt-parrot/generate.go index 04d847edbb..5820535616 100644 --- a/cmd/mt-parrot/generate.go +++ b/cmd/mt-parrot/generate.go @@ -8,8 +8,8 @@ import ( "github.com/grafana/metrictank/schema" ) -func produceArtificialMetrics(schemas []*schema.MetricData) { - for tick := range time.NewTicker(artificialMetricsInterval).C { +func produceTestMetrics(schemas []*schema.MetricData) { + for tick := range time.NewTicker(testMetricsInterval).C { for _, metric := range schemas { metric.Time = tick.Unix() metric.Value = float64(tick.Unix()) @@ -34,7 +34,7 @@ func generateSchema(desiredPartition int32) *schema.MetricData { OrgId: orgId, Unit: "partyparrots", Mtype: "gauge", - Interval: int(artificialMetricsInterval.Seconds()), + Interval: int(testMetricsInterval.Seconds()), } for i := 1; true; i++ { diff --git a/cmd/mt-parrot/main.go b/cmd/mt-parrot/main.go index 591499296d..87166dcceb 100644 --- a/cmd/mt-parrot/main.go +++ b/cmd/mt-parrot/main.go @@ -12,14 +12,14 @@ import ( ) var ( - gatewayAddress string - gatewayKey string - orgId int - partitionCount int32 - partitionMethodString string - artificialMetricsInterval time.Duration - queryInterval time.Duration - logLevel string + gatewayAddress string + gatewayKey string + orgId int + partitionCount int32 + partitionMethodString string + testMetricsInterval time.Duration + queryInterval time.Duration + logLevel string partitionMethod schema.PartitionByMethod gateway out.Out @@ -31,7 +31,7 @@ func init() { parrotCmd.Flags().IntVar(&orgId, "org-id", 1, "org id to publish parrot metrics to") parrotCmd.Flags().Int32Var(&partitionCount, "partition-count", 8, "number of kafka partitions in use") parrotCmd.Flags().StringVar(&partitionMethodString, "partition-method", "bySeries", "the partition method in use on the gateway, must be one of bySeries|bySeriesWithTags|bySeriesWithTagsFnv") - parrotCmd.Flags().DurationVar(&artificialMetricsInterval, "artificial-metrics-interval", 5*time.Second, "interval to send metrics") + parrotCmd.Flags().DurationVar(&testMetricsInterval, "test-metrics-interval", 5*time.Second, "interval to send test metrics") parrotCmd.Flags().DurationVar(&queryInterval, "query-interval", 10*time.Second, "interval to query to validate metrics") parrotCmd.Flags().StringVar(&logLevel, "log-level", "info", "log level. panic|fatal|error|warning|info|debug") @@ -61,7 +61,7 @@ var parrotCmd = &cobra.Command{ initGateway() schemas := generateSchemas(partitionCount) - go produceArtificialMetrics(schemas) + go produceTestMetrics(schemas) monitor() }, From af6b666d69483717293a25d349a99536c322ca63 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Wed, 26 Feb 2020 08:38:13 -0500 Subject: [PATCH 17/60] bump default test interval 5s -> 10s + update docs --- cmd/mt-parrot/main.go | 2 +- docs/tools.md | 18 +++++++++--------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/cmd/mt-parrot/main.go b/cmd/mt-parrot/main.go index 87166dcceb..4b63990e18 100644 --- a/cmd/mt-parrot/main.go +++ b/cmd/mt-parrot/main.go @@ -31,7 +31,7 @@ func init() { parrotCmd.Flags().IntVar(&orgId, "org-id", 1, "org id to publish parrot metrics to") parrotCmd.Flags().Int32Var(&partitionCount, "partition-count", 8, "number of kafka partitions in use") parrotCmd.Flags().StringVar(&partitionMethodString, "partition-method", "bySeries", "the partition method in use on the gateway, must be one of bySeries|bySeriesWithTags|bySeriesWithTagsFnv") - parrotCmd.Flags().DurationVar(&testMetricsInterval, "test-metrics-interval", 5*time.Second, "interval to send test metrics") + parrotCmd.Flags().DurationVar(&testMetricsInterval, "test-metrics-interval", 10*time.Second, "interval to send test metrics") parrotCmd.Flags().DurationVar(&queryInterval, "query-interval", 10*time.Second, "interval to query to validate metrics") parrotCmd.Flags().StringVar(&logLevel, "log-level", "info", "log level. panic|fatal|error|warning|info|debug") diff --git a/docs/tools.md b/docs/tools.md index 54a9b5f31c..09881d7561 100644 --- a/docs/tools.md +++ b/docs/tools.md @@ -543,15 +543,15 @@ Usage: mt-parrot [flags] Flags: - --artificial-metrics-interval duration interval to send metrics (default 5s) - --gateway-address string the url of the metrics gateway (default "http://localhost:6059") - --gateway-key string the bearer token to include with gateway requests - -h, --help help for mt-parrot - --log-level string log level. panic|fatal|error|warning|info|debug (default "info") - --org-id int org id to publish parrot metrics to (default 1) - --partition-count int32 number of kafka partitions in use (default 8) - --partition-method string the partition method in use on the gateway, must be one of bySeries|bySeriesWithTags|bySeriesWithTagsFnv (default "bySeries") - --query-interval duration interval to query to validate metrics (default 10s) + --gateway-address string the url of the metrics gateway (default "http://localhost:6059") + --gateway-key string the bearer token to include with gateway requests + -h, --help help for mt-parrot + --log-level string log level. panic|fatal|error|warning|info|debug (default "info") + --org-id int org id to publish parrot metrics to (default 1) + --partition-count int32 number of kafka partitions in use (default 8) + --partition-method string the partition method in use on the gateway, must be one of bySeries|bySeriesWithTags|bySeriesWithTagsFnv (default "bySeries") + --query-interval duration interval to query to validate metrics (default 10s) + --test-metrics-interval duration interval to send test metrics (default 10s) ``` From f330794f6f4b709cd7bc486c5f178aa66f6b3dab Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Wed, 26 Feb 2020 14:38:25 -0500 Subject: [PATCH 18/60] gateway -> publisher --- cmd/mt-parrot/generate.go | 2 +- cmd/mt-parrot/main.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cmd/mt-parrot/generate.go b/cmd/mt-parrot/generate.go index 5820535616..60ee83c22f 100644 --- a/cmd/mt-parrot/generate.go +++ b/cmd/mt-parrot/generate.go @@ -14,7 +14,7 @@ func produceTestMetrics(schemas []*schema.MetricData) { metric.Time = tick.Unix() metric.Value = float64(tick.Unix()) } - gateway.Flush(schemas) + publisher.Flush(schemas) log.Infof("flushed schemas for ts %d", tick.Unix()) } } diff --git a/cmd/mt-parrot/main.go b/cmd/mt-parrot/main.go index 4b63990e18..c3bd708280 100644 --- a/cmd/mt-parrot/main.go +++ b/cmd/mt-parrot/main.go @@ -22,7 +22,7 @@ var ( logLevel string partitionMethod schema.PartitionByMethod - gateway out.Out + publisher out.Out ) func init() { @@ -83,7 +83,7 @@ func parsePartitionMethod() { func initGateway() { var err error backend, _ := statsd.New(false, "", "") - gateway, err = gnet.New(gatewayAddress+"/metrics", gatewayKey, backend) + publisher, err = gnet.New(gatewayAddress+"/metrics", gatewayKey, backend) if err != nil { log.Fatal(err) } From 69e110b2ea2b05402baeff9d25497e4f4ea7528b Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Wed, 26 Feb 2020 14:55:38 -0500 Subject: [PATCH 19/60] fix lastSeen tracking --- cmd/mt-parrot/monitor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/mt-parrot/monitor.go b/cmd/mt-parrot/monitor.go index effa7e1db5..92edc37bf6 100644 --- a/cmd/mt-parrot/monitor.go +++ b/cmd/mt-parrot/monitor.go @@ -58,9 +58,9 @@ func monitor() { serStats.nans += 1 continue } + serStats.lastSeen = dp.Ts if diff := dp.Val - float64(dp.Ts); diff != 0 { log.Debugf("partition=%d dp.Val=%f dp.Ts=%d diff=%f", partition, dp.Val, dp.Ts, diff) - serStats.lastSeen = dp.Ts serStats.deltaSum += diff serStats.numNonMatching += 1 } From 755f474a8fdc867920a1846b27aaba59e585d293 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Wed, 26 Feb 2020 14:57:06 -0500 Subject: [PATCH 20/60] limit number of error types tracked --- cmd/mt-parrot/monitor.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/cmd/mt-parrot/monitor.go b/cmd/mt-parrot/monitor.go index 92edc37bf6..70b8930d42 100644 --- a/cmd/mt-parrot/monitor.go +++ b/cmd/mt-parrot/monitor.go @@ -13,8 +13,7 @@ import ( var ( httpError = stats.NewCounter32("parrot.monitoring.error;error=http") - decodeError = stats.NewCounter32("parrot.monitoring.error;error=decode") - parsePartitionError = stats.NewCounter32("parrot.monitoring.error;error=parsePartition") + invalidError = stats.NewCounter32("parrot.monitoring.error;error=invalid") ) type seriesStats struct { @@ -38,7 +37,7 @@ func monitor() { httpError.Inc() } if query.DecodeErr != nil { - decodeError.Inc() + invalidError.Inc() } for _, s := range query.Decoded { @@ -46,7 +45,7 @@ func monitor() { partition, err := strconv.Atoi(s.Target) if err != nil { log.Debug("unable to parse partition", err) - parsePartitionError.Inc() + invalidError.Inc() continue } serStats := seriesStats{} From d9672614655756d2cfeb33d38f26ff30f9d4e188 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Thu, 27 Feb 2020 10:52:45 -0500 Subject: [PATCH 21/60] initialize metrics up front --- cmd/mt-parrot/monitor.go | 44 ++++++++++++++++++++++++++++++---------- 1 file changed, 33 insertions(+), 11 deletions(-) diff --git a/cmd/mt-parrot/monitor.go b/cmd/mt-parrot/monitor.go index 70b8930d42..94fdc1b6f6 100644 --- a/cmd/mt-parrot/monitor.go +++ b/cmd/mt-parrot/monitor.go @@ -12,8 +12,8 @@ import ( ) var ( - httpError = stats.NewCounter32("parrot.monitoring.error;error=http") - invalidError = stats.NewCounter32("parrot.monitoring.error;error=invalid") + httpError = stats.NewCounter32("parrot.monitoring.error;error=http") + invalidError = stats.NewCounter32("parrot.monitoring.error;error=invalid") ) type seriesStats struct { @@ -24,12 +24,23 @@ type seriesStats struct { deltaSum float64 //the number of timestamps where value != ts numNonMatching int32 - //tracks the last seen non-NaN time stamp (useful for lag lastSeen uint32 } +type partitionMetrics struct { + //number of missing values for each series + nanCount *stats.Gauge32 + //time since the last value was recorded + lag *stats.Gauge32 + //total amount of drift between expected value and actual values + deltaSum *stats.Gauge32 + //total number of entries where drift occurred + nonMatching *stats.Gauge32 +} + func monitor() { + metricsBySeries := initMetricsBySeries() for tick := range time.NewTicker(queryInterval).C { query := graphite.ExecuteRenderQuery(buildRequest(tick)) @@ -65,16 +76,27 @@ func monitor() { } } - //number of missing values for each series - stats.NewGauge32(fmt.Sprintf("parrot.monitoring.nancount;partition=%d", partition)).Set(int(serStats.nans)) - //time since the last value was recorded - stats.NewGauge32(fmt.Sprintf("parrot.monitoring.lag;partition=%d", partition)).Set(int(serStats.lastTs - serStats.lastSeen)) - //total amount of drift between expected value and actual values - stats.NewGauge32(fmt.Sprintf("parrot.monitoring.deltaSum;partition=%d", partition)).Set(int(serStats.deltaSum)) - //total number of entries where drift occurred - stats.NewGauge32(fmt.Sprintf("parrot.monitoring.nonMatching;partition=%d", partition)).Set(int(serStats.numNonMatching)) + metrics := metricsBySeries[partition] + metrics.nanCount.Set(int(serStats.nans)) + metrics.lag.Set(int(serStats.lastTs - serStats.lastSeen)) + metrics.deltaSum.Set(int(serStats.deltaSum)) + metrics.nonMatching.Set(int(serStats.numNonMatching)) + } + } +} + +func initMetricsBySeries() []partitionMetrics { + var metricsBySeries []partitionMetrics + for p := 0; p < int(partitionCount); p++ { + metrics := partitionMetrics{ + nanCount: stats.NewGauge32(fmt.Sprintf("parrot.monitoring.nancount;partition=%d", p)), + lag: stats.NewGauge32(fmt.Sprintf("parrot.monitoring.lag;partition=%d", p)), + deltaSum: stats.NewGauge32(fmt.Sprintf("parrot.monitoring.deltaSum;partition=%d", p)), + nonMatching: stats.NewGauge32(fmt.Sprintf("parrot.monitoring.nonMatching;partition=%d", p)), } + metricsBySeries = append(metricsBySeries, metrics) } + return metricsBySeries } func buildRequest(now time.Time) *http.Request { From 14187eb5d818ce8485b91f1a2b0e0f720c08af56 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Thu, 27 Feb 2020 11:00:14 -0500 Subject: [PATCH 22/60] use aligned lossless tick --- cmd/mt-parrot/generate.go | 7 +++---- cmd/mt-parrot/monitor.go | 3 ++- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/cmd/mt-parrot/generate.go b/cmd/mt-parrot/generate.go index 60ee83c22f..53fff370ab 100644 --- a/cmd/mt-parrot/generate.go +++ b/cmd/mt-parrot/generate.go @@ -2,14 +2,13 @@ package main import ( "fmt" - log "github.com/sirupsen/logrus" - "time" - + "github.com/grafana/metrictank/clock" "github.com/grafana/metrictank/schema" + log "github.com/sirupsen/logrus" ) func produceTestMetrics(schemas []*schema.MetricData) { - for tick := range time.NewTicker(testMetricsInterval).C { + for tick := range clock.AlignedTickLossless(testMetricsInterval) { for _, metric := range schemas { metric.Time = tick.Unix() metric.Value = float64(tick.Unix()) diff --git a/cmd/mt-parrot/monitor.go b/cmd/mt-parrot/monitor.go index 94fdc1b6f6..96d3ebe723 100644 --- a/cmd/mt-parrot/monitor.go +++ b/cmd/mt-parrot/monitor.go @@ -2,6 +2,7 @@ package main import ( "fmt" + "github.com/grafana/metrictank/clock" "github.com/grafana/metrictank/stacktest/graphite" "github.com/grafana/metrictank/stats" log "github.com/sirupsen/logrus" @@ -41,7 +42,7 @@ type partitionMetrics struct { func monitor() { metricsBySeries := initMetricsBySeries() - for tick := range time.NewTicker(queryInterval).C { + for tick := range clock.AlignedTickLossless(queryInterval) { query := graphite.ExecuteRenderQuery(buildRequest(tick)) if query.HTTPErr != nil { From c19b30457a8b55859454784eec794fbc36f3deda Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Thu, 27 Feb 2020 14:11:43 -0500 Subject: [PATCH 23/60] bring decode error back --- cmd/mt-parrot/monitor.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cmd/mt-parrot/monitor.go b/cmd/mt-parrot/monitor.go index 96d3ebe723..0f92f26db9 100644 --- a/cmd/mt-parrot/monitor.go +++ b/cmd/mt-parrot/monitor.go @@ -14,6 +14,7 @@ import ( var ( httpError = stats.NewCounter32("parrot.monitoring.error;error=http") + decodeError = stats.NewCounter32("parrot.monitoring.error;error=decode") invalidError = stats.NewCounter32("parrot.monitoring.error;error=invalid") ) @@ -49,7 +50,7 @@ func monitor() { httpError.Inc() } if query.DecodeErr != nil { - invalidError.Inc() + decodeError.Inc() } for _, s := range query.Decoded { From 86c7cff45d05d7d11a03870f764aeb0c0083af23 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Fri, 28 Feb 2020 06:19:21 -0500 Subject: [PATCH 24/60] refactor, pull series metrics gathering into a function --- cmd/mt-parrot/monitor.go | 67 +++++++++++++++++++++------------------- 1 file changed, 36 insertions(+), 31 deletions(-) diff --git a/cmd/mt-parrot/monitor.go b/cmd/mt-parrot/monitor.go index 0f92f26db9..bc3f7fb51f 100644 --- a/cmd/mt-parrot/monitor.go +++ b/cmd/mt-parrot/monitor.go @@ -14,10 +14,12 @@ import ( var ( httpError = stats.NewCounter32("parrot.monitoring.error;error=http") - decodeError = stats.NewCounter32("parrot.monitoring.error;error=decode") + decodeError = stats.NewCounter32("parrot.monitoring.error;error=decode") invalidError = stats.NewCounter32("parrot.monitoring.error;error=invalid") ) +var metricsBySeries []partitionMetrics + type seriesStats struct { lastTs uint32 //the partition currently being checked @@ -42,7 +44,7 @@ type partitionMetrics struct { } func monitor() { - metricsBySeries := initMetricsBySeries() + initMetricsBySeries() for tick := range clock.AlignedTickLossless(queryInterval) { query := graphite.ExecuteRenderQuery(buildRequest(tick)) @@ -54,41 +56,45 @@ func monitor() { } for _, s := range query.Decoded { - log.Infof("%d - %d", s.Datapoints[0].Ts, s.Datapoints[len(s.Datapoints)-1].Ts) - partition, err := strconv.Atoi(s.Target) - if err != nil { - log.Debug("unable to parse partition", err) - invalidError.Inc() - continue - } - serStats := seriesStats{} - serStats.lastTs = s.Datapoints[len(s.Datapoints)-1].Ts + processPartitionSeries(s) + } + } +} - for _, dp := range s.Datapoints { - if math.IsNaN(dp.Val) { - serStats.nans += 1 - continue - } - serStats.lastSeen = dp.Ts - if diff := dp.Val - float64(dp.Ts); diff != 0 { - log.Debugf("partition=%d dp.Val=%f dp.Ts=%d diff=%f", partition, dp.Val, dp.Ts, diff) - serStats.deltaSum += diff - serStats.numNonMatching += 1 - } - } +func processPartitionSeries(s graphite.Series) { + log.Infof("%d - %d", s.Datapoints[0].Ts, s.Datapoints[len(s.Datapoints)-1].Ts) + partition, err := strconv.Atoi(s.Target) + if err != nil { + log.Debug("unable to parse partition", err) + invalidError.Inc() + return + } + serStats := seriesStats{} + serStats.lastTs = s.Datapoints[len(s.Datapoints)-1].Ts + + for _, dp := range s.Datapoints { - metrics := metricsBySeries[partition] - metrics.nanCount.Set(int(serStats.nans)) - metrics.lag.Set(int(serStats.lastTs - serStats.lastSeen)) - metrics.deltaSum.Set(int(serStats.deltaSum)) - metrics.nonMatching.Set(int(serStats.numNonMatching)) + if math.IsNaN(dp.Val) { + serStats.nans += 1 + continue + } + serStats.lastSeen = dp.Ts + if diff := dp.Val - float64(dp.Ts); diff != 0 { + log.Debugf("partition=%d dp.Val=%f dp.Ts=%d diff=%f", partition, dp.Val, dp.Ts, diff) + serStats.deltaSum += diff + serStats.numNonMatching += 1 } } + + metrics := metricsBySeries[partition] + metrics.nanCount.Set(int(serStats.nans)) + metrics.lag.Set(int(serStats.lastTs - serStats.lastSeen)) + metrics.deltaSum.Set(int(serStats.deltaSum)) + metrics.nonMatching.Set(int(serStats.numNonMatching)) } -func initMetricsBySeries() []partitionMetrics { - var metricsBySeries []partitionMetrics +func initMetricsBySeries() { for p := 0; p < int(partitionCount); p++ { metrics := partitionMetrics{ nanCount: stats.NewGauge32(fmt.Sprintf("parrot.monitoring.nancount;partition=%d", p)), @@ -98,7 +104,6 @@ func initMetricsBySeries() []partitionMetrics { } metricsBySeries = append(metricsBySeries, metrics) } - return metricsBySeries } func buildRequest(now time.Time) *http.Request { From 6ccddd9bf21471941468b926752aa52e1376d091 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Tue, 3 Mar 2020 11:18:28 -0500 Subject: [PATCH 25/60] add and validate parameter to control how far back we query --- cmd/mt-parrot/main.go | 5 +++++ cmd/mt-parrot/monitor.go | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/cmd/mt-parrot/main.go b/cmd/mt-parrot/main.go index c3bd708280..d020493add 100644 --- a/cmd/mt-parrot/main.go +++ b/cmd/mt-parrot/main.go @@ -19,6 +19,7 @@ var ( partitionMethodString string testMetricsInterval time.Duration queryInterval time.Duration + lookbackPeriod time.Duration logLevel string partitionMethod schema.PartitionByMethod @@ -33,6 +34,7 @@ func init() { parrotCmd.Flags().StringVar(&partitionMethodString, "partition-method", "bySeries", "the partition method in use on the gateway, must be one of bySeries|bySeriesWithTags|bySeriesWithTagsFnv") parrotCmd.Flags().DurationVar(&testMetricsInterval, "test-metrics-interval", 10*time.Second, "interval to send test metrics") parrotCmd.Flags().DurationVar(&queryInterval, "query-interval", 10*time.Second, "interval to query to validate metrics") + parrotCmd.Flags().DurationVar(&lookbackPeriod, "lookback-period", 5*time.Minute, "how far to look back when validating metrics") parrotCmd.Flags().StringVar(&logLevel, "log-level", "info", "log level. panic|fatal|error|warning|info|debug") @@ -56,6 +58,9 @@ var parrotCmd = &cobra.Command{ if err != nil { log.Fatalf("failed to parse log-level, %s", err.Error()) } + if int(lookbackPeriod.Seconds())%int(testMetricsInterval) != 0 { + log.Fatal("lookback period must be evenly divisible by test metrics interval") + } log.SetLevel(lvl) parsePartitionMethod() initGateway() diff --git a/cmd/mt-parrot/monitor.go b/cmd/mt-parrot/monitor.go index bc3f7fb51f..e3019b54f6 100644 --- a/cmd/mt-parrot/monitor.go +++ b/cmd/mt-parrot/monitor.go @@ -110,7 +110,7 @@ func buildRequest(now time.Time) *http.Request { req, _ := http.NewRequest("GET", fmt.Sprintf("%s/render", gatewayAddress), nil) q := req.URL.Query() q.Set("target", "aliasByNode(parrot.testdata.*.generated.*, 2)") - q.Set("from", strconv.Itoa(int(now.Add(-5*time.Minute).Unix()))) + q.Set("from", strconv.Itoa(int(now.Add(-1*lookbackPeriod).Unix()-1))) q.Set("until", strconv.Itoa(int(now.Unix()))) q.Set("format", "json") q.Set("X-Org-Id", strconv.Itoa(orgId)) From 0b0c9ceb3ac7210dd53d37c356096122d530eb1a Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Tue, 3 Mar 2020 11:18:47 -0500 Subject: [PATCH 26/60] break out if we encounter an error --- cmd/mt-parrot/monitor.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cmd/mt-parrot/monitor.go b/cmd/mt-parrot/monitor.go index e3019b54f6..dba95ec9f3 100644 --- a/cmd/mt-parrot/monitor.go +++ b/cmd/mt-parrot/monitor.go @@ -50,9 +50,11 @@ func monitor() { query := graphite.ExecuteRenderQuery(buildRequest(tick)) if query.HTTPErr != nil { httpError.Inc() + continue } if query.DecodeErr != nil { decodeError.Inc() + continue } for _, s := range query.Decoded { From 713927e755e3091bab3685bfa695d71d70f54f5f Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Tue, 3 Mar 2020 11:19:38 -0500 Subject: [PATCH 27/60] Add additional validation for correct number of points that are correctly spaced etc --- cmd/mt-parrot/monitor.go | 34 +++++++++++++++++++++++++++++----- 1 file changed, 29 insertions(+), 5 deletions(-) diff --git a/cmd/mt-parrot/monitor.go b/cmd/mt-parrot/monitor.go index dba95ec9f3..6e0e835dd9 100644 --- a/cmd/mt-parrot/monitor.go +++ b/cmd/mt-parrot/monitor.go @@ -30,6 +30,12 @@ type seriesStats struct { numNonMatching int32 //tracks the last seen non-NaN time stamp (useful for lag lastSeen uint32 + //the expected number of points were received + correctNumPoints bool + //the last ts matches `now` + correctAlignment bool + //all points are sorted and 1 period apart + correctSpacing bool } type partitionMetrics struct { @@ -58,25 +64,31 @@ func monitor() { } for _, s := range query.Decoded { - processPartitionSeries(s) + processPartitionSeries(s, tick) } } } - -func processPartitionSeries(s graphite.Series) { - log.Infof("%d - %d", s.Datapoints[0].Ts, s.Datapoints[len(s.Datapoints)-1].Ts) +func processPartitionSeries(s graphite.Series, now time.Time) { partition, err := strconv.Atoi(s.Target) if err != nil { log.Debug("unable to parse partition", err) invalidError.Inc() return } + if len(s.Datapoints) < 2 { + log.Debugf("partition has invalid number of datapoints: %d", len(s.Datapoints)) + invalidError.Inc() + return + } + serStats := seriesStats{} serStats.lastTs = s.Datapoints[len(s.Datapoints)-1].Ts + serStats.correctAlignment = int64(serStats.lastTs) == now.Unix() + serStats.correctNumPoints = len(s.Datapoints) == int(lookbackPeriod/testMetricsInterval)+1 + serStats.correctSpacing = checkSpacing(s.Datapoints) for _, dp := range s.Datapoints { - if math.IsNaN(dp.Val) { serStats.nans += 1 continue @@ -96,6 +108,18 @@ func processPartitionSeries(s graphite.Series) { metrics.nonMatching.Set(int(serStats.numNonMatching)) } +func checkSpacing(points []graphite.Point) bool { + previous := points[0].Ts + for i := 1; i < len(points); i++ { + current := points[i].Ts + if current-previous != uint32(testMetricsInterval.Seconds()) { + return false + } + previous = current + } + return true +} + func initMetricsBySeries() { for p := 0; p < int(partitionCount); p++ { metrics := partitionMetrics{ From 362f914fae36599eef92dac3221ebf0de24c2f5b Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Tue, 3 Mar 2020 11:26:16 -0500 Subject: [PATCH 28/60] set lag based on last publish time --- cmd/mt-parrot/generate.go | 2 ++ cmd/mt-parrot/main.go | 1 + cmd/mt-parrot/monitor.go | 3 ++- 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/cmd/mt-parrot/generate.go b/cmd/mt-parrot/generate.go index 53fff370ab..6504e6fa17 100644 --- a/cmd/mt-parrot/generate.go +++ b/cmd/mt-parrot/generate.go @@ -5,6 +5,7 @@ import ( "github.com/grafana/metrictank/clock" "github.com/grafana/metrictank/schema" log "github.com/sirupsen/logrus" + "sync/atomic" ) func produceTestMetrics(schemas []*schema.MetricData) { @@ -14,6 +15,7 @@ func produceTestMetrics(schemas []*schema.MetricData) { metric.Value = float64(tick.Unix()) } publisher.Flush(schemas) + atomic.StoreInt64(&lastPublish, tick.Unix()) log.Infof("flushed schemas for ts %d", tick.Unix()) } } diff --git a/cmd/mt-parrot/main.go b/cmd/mt-parrot/main.go index d020493add..5a494841f5 100644 --- a/cmd/mt-parrot/main.go +++ b/cmd/mt-parrot/main.go @@ -21,6 +21,7 @@ var ( queryInterval time.Duration lookbackPeriod time.Duration logLevel string + lastPublish int64 partitionMethod schema.PartitionByMethod publisher out.Out diff --git a/cmd/mt-parrot/monitor.go b/cmd/mt-parrot/monitor.go index 6e0e835dd9..ad87a3341e 100644 --- a/cmd/mt-parrot/monitor.go +++ b/cmd/mt-parrot/monitor.go @@ -103,7 +103,8 @@ func processPartitionSeries(s graphite.Series, now time.Time) { metrics := metricsBySeries[partition] metrics.nanCount.Set(int(serStats.nans)) - metrics.lag.Set(int(serStats.lastTs - serStats.lastSeen)) + lag := lastPublish - int64(serStats.lastSeen) + metrics.lag.Set(int(lag)) metrics.deltaSum.Set(int(serStats.deltaSum)) metrics.nonMatching.Set(int(serStats.numNonMatching)) } From ab338f2c61860bb793f5eed6963ae824fa77c75b Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Tue, 3 Mar 2020 11:32:05 -0500 Subject: [PATCH 29/60] fix validation of lookback parameter --- cmd/mt-parrot/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/mt-parrot/main.go b/cmd/mt-parrot/main.go index 5a494841f5..0e375cca1c 100644 --- a/cmd/mt-parrot/main.go +++ b/cmd/mt-parrot/main.go @@ -59,7 +59,7 @@ var parrotCmd = &cobra.Command{ if err != nil { log.Fatalf("failed to parse log-level, %s", err.Error()) } - if int(lookbackPeriod.Seconds())%int(testMetricsInterval) != 0 { + if int(lookbackPeriod.Seconds())%int(testMetricsInterval.Seconds()) != 0 { log.Fatal("lookback period must be evenly divisible by test metrics interval") } log.SetLevel(lvl) From d06aa84b0ac8dc0aca6ebcc2030c018311b6a4d4 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Tue, 3 Mar 2020 11:36:38 -0500 Subject: [PATCH 30/60] actually export metrics for new stats --- cmd/mt-parrot/monitor.go | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/cmd/mt-parrot/monitor.go b/cmd/mt-parrot/monitor.go index ad87a3341e..aec13545e8 100644 --- a/cmd/mt-parrot/monitor.go +++ b/cmd/mt-parrot/monitor.go @@ -47,6 +47,12 @@ type partitionMetrics struct { deltaSum *stats.Gauge32 //total number of entries where drift occurred nonMatching *stats.Gauge32 + //the expected number of points were received + correctNumPoints *stats.Bool + //the last ts matches `now` + correctAlignment *stats.Bool + //all points are sorted and 1 period apart + correctSpacing *stats.Bool } func monitor() { @@ -107,6 +113,9 @@ func processPartitionSeries(s graphite.Series, now time.Time) { metrics.lag.Set(int(lag)) metrics.deltaSum.Set(int(serStats.deltaSum)) metrics.nonMatching.Set(int(serStats.numNonMatching)) + metrics.correctNumPoints.Set(serStats.correctNumPoints) + metrics.correctAlignment.Set(serStats.correctAlignment) + metrics.correctSpacing.Set(serStats.correctSpacing) } func checkSpacing(points []graphite.Point) bool { @@ -124,10 +133,13 @@ func checkSpacing(points []graphite.Point) bool { func initMetricsBySeries() { for p := 0; p < int(partitionCount); p++ { metrics := partitionMetrics{ - nanCount: stats.NewGauge32(fmt.Sprintf("parrot.monitoring.nancount;partition=%d", p)), - lag: stats.NewGauge32(fmt.Sprintf("parrot.monitoring.lag;partition=%d", p)), - deltaSum: stats.NewGauge32(fmt.Sprintf("parrot.monitoring.deltaSum;partition=%d", p)), - nonMatching: stats.NewGauge32(fmt.Sprintf("parrot.monitoring.nonMatching;partition=%d", p)), + nanCount: stats.NewGauge32(fmt.Sprintf("parrot.monitoring.nancount;partition=%d", p)), + lag: stats.NewGauge32(fmt.Sprintf("parrot.monitoring.lag;partition=%d", p)), + deltaSum: stats.NewGauge32(fmt.Sprintf("parrot.monitoring.deltaSum;partition=%d", p)), + nonMatching: stats.NewGauge32(fmt.Sprintf("parrot.monitoring.nonMatching;partition=%d", p)), + correctNumPoints: stats.NewBool(fmt.Sprintf("parrot.monitoring.correctNumPoints;partition=%d", p)), + correctAlignment: stats.NewBool(fmt.Sprintf("parrot.monitoring.correctAlignment;partition=%d", p)), + correctSpacing: stats.NewBool(fmt.Sprintf("parrot.monitoring.correctSpacing;partition=%d", p)), } metricsBySeries = append(metricsBySeries, metrics) } From c57c5affca5e52e99b92924f11ee1703ee254499 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Wed, 4 Mar 2020 00:23:58 -0500 Subject: [PATCH 31/60] update docs --- docs/tools.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/tools.md b/docs/tools.md index 09881d7561..01be644a47 100644 --- a/docs/tools.md +++ b/docs/tools.md @@ -547,6 +547,7 @@ Flags: --gateway-key string the bearer token to include with gateway requests -h, --help help for mt-parrot --log-level string log level. panic|fatal|error|warning|info|debug (default "info") + --lookback-period duration how far to look back when validating metrics (default 5m0s) --org-id int org id to publish parrot metrics to (default 1) --partition-count int32 number of kafka partitions in use (default 8) --partition-method string the partition method in use on the gateway, must be one of bySeries|bySeriesWithTags|bySeriesWithTagsFnv (default "bySeries") From 0a7553e02df11e97f7e44403a1f2e69cedfa2a2c Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Wed, 11 Mar 2020 20:04:17 +0200 Subject: [PATCH 32/60] clearer desc --- cmd/mt-parrot/monitor.go | 61 ++++++++++++++++++---------------------- 1 file changed, 27 insertions(+), 34 deletions(-) diff --git a/cmd/mt-parrot/monitor.go b/cmd/mt-parrot/monitor.go index aec13545e8..78ed7dbb10 100644 --- a/cmd/mt-parrot/monitor.go +++ b/cmd/mt-parrot/monitor.go @@ -21,38 +21,24 @@ var ( var metricsBySeries []partitionMetrics type seriesStats struct { - lastTs uint32 - //the partition currently being checked - nans int32 - //the sum of abs(value - ts) across the time series - deltaSum float64 - //the number of timestamps where value != ts - numNonMatching int32 - //tracks the last seen non-NaN time stamp (useful for lag - lastSeen uint32 - //the expected number of points were received - correctNumPoints bool - //the last ts matches `now` - correctAlignment bool - //all points are sorted and 1 period apart - correctSpacing bool + lastTs uint32 + nans int32 // the partition currently being checked - nope? + deltaSum float64 // sum of abs(value - ts) across the time series + numNonMatching int32 // number of timestamps where value != ts + lastSeen uint32 // the last seen non-NaN time stamp (useful for lag) + correctNumPoints bool // whether the expected number of points were received + correctAlignment bool // whether the last ts matches `now` + correctSpacing bool // whether all points are sorted and 1 period apart } type partitionMetrics struct { - //number of missing values for each series - nanCount *stats.Gauge32 - //time since the last value was recorded - lag *stats.Gauge32 - //total amount of drift between expected value and actual values - deltaSum *stats.Gauge32 - //total number of entries where drift occurred - nonMatching *stats.Gauge32 - //the expected number of points were received - correctNumPoints *stats.Bool - //the last ts matches `now` - correctAlignment *stats.Bool - //all points are sorted and 1 period apart - correctSpacing *stats.Bool + nanCount *stats.Gauge32 // the number of missing values for each series + lag *stats.Gauge32 // time since the last value was recorded + deltaSum *stats.Gauge32 // the total amount of drift between expected value and actual values + nonMatching *stats.Gauge32 // total number of entries where drift occurred + correctNumPoints *stats.Bool // whether the expected number of points were received + correctAlignment *stats.Bool // whether the last ts matches `now` + correctSpacing *stats.Bool // whether all points are sorted and 1 period apart } func monitor() { @@ -133,13 +119,20 @@ func checkSpacing(points []graphite.Point) bool { func initMetricsBySeries() { for p := 0; p < int(partitionCount); p++ { metrics := partitionMetrics{ - nanCount: stats.NewGauge32(fmt.Sprintf("parrot.monitoring.nancount;partition=%d", p)), - lag: stats.NewGauge32(fmt.Sprintf("parrot.monitoring.lag;partition=%d", p)), - deltaSum: stats.NewGauge32(fmt.Sprintf("parrot.monitoring.deltaSum;partition=%d", p)), - nonMatching: stats.NewGauge32(fmt.Sprintf("parrot.monitoring.nonMatching;partition=%d", p)), + // metric parrot.monitoring.nancount is the number of missing values for each series + nanCount: stats.NewGauge32(fmt.Sprintf("parrot.monitoring.nancount;partition=%d", p)), + // metric parrot.monitoring.lag is the time since the last value was recorded + lag: stats.NewGauge32(fmt.Sprintf("parrot.monitoring.lag;partition=%d", p)), + // metric parrot.monitoring.deltaSum is the total amount of drift between expected value and actual values + deltaSum: stats.NewGauge32(fmt.Sprintf("parrot.monitoring.deltaSum;partition=%d", p)), + // metric parrot.monitoring.nonMatching is the total number of entries where drift occurred + nonMatching: stats.NewGauge32(fmt.Sprintf("parrot.monitoring.nonMatching;partition=%d", p)), + // metric parrot.monitoring.correctNumPoints is whether the expected number of points were received correctNumPoints: stats.NewBool(fmt.Sprintf("parrot.monitoring.correctNumPoints;partition=%d", p)), + // metric parrot.monitoring.correctAlignment is whether the last ts matches `now` correctAlignment: stats.NewBool(fmt.Sprintf("parrot.monitoring.correctAlignment;partition=%d", p)), - correctSpacing: stats.NewBool(fmt.Sprintf("parrot.monitoring.correctSpacing;partition=%d", p)), + // metric parrot.monitoring.correctSpacing is whether all points are sorted and 1 period apart + correctSpacing: stats.NewBool(fmt.Sprintf("parrot.monitoring.correctSpacing;partition=%d", p)), } metricsBySeries = append(metricsBySeries, metrics) } From 679eabc714af624fa45c5fea35d3a5956d3a2d39 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Tue, 17 Mar 2020 02:46:40 -0400 Subject: [PATCH 33/60] initialize stats and manually set reporting timestamps --- cmd/mt-parrot/main.go | 22 ++++++++++++++++++++++ cmd/mt-parrot/monitor.go | 1 + docs/tools.md | 4 ++++ 3 files changed, 27 insertions(+) diff --git a/cmd/mt-parrot/main.go b/cmd/mt-parrot/main.go index 0e375cca1c..efee94485e 100644 --- a/cmd/mt-parrot/main.go +++ b/cmd/mt-parrot/main.go @@ -5,9 +5,12 @@ import ( "github.com/grafana/metrictank/cmd/mt-fakemetrics/out/gnet" "github.com/grafana/metrictank/logger" "github.com/grafana/metrictank/schema" + "github.com/grafana/metrictank/stats" "github.com/raintank/met/statsd" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" + "os" + "strings" "time" ) @@ -23,6 +26,12 @@ var ( logLevel string lastPublish int64 + statsGraphite *stats.Graphite + statsPrefix string + statsAddr string + statsBufferSize int + statsTimeout time.Duration + partitionMethod schema.PartitionByMethod publisher out.Out ) @@ -36,6 +45,10 @@ func init() { parrotCmd.Flags().DurationVar(&testMetricsInterval, "test-metrics-interval", 10*time.Second, "interval to send test metrics") parrotCmd.Flags().DurationVar(&queryInterval, "query-interval", 10*time.Second, "interval to query to validate metrics") parrotCmd.Flags().DurationVar(&lookbackPeriod, "lookback-period", 5*time.Minute, "how far to look back when validating metrics") + parrotCmd.Flags().StringVar(&statsPrefix, "stats-prefix", "", "stats prefix (will add trailing dot automatically if needed)") + parrotCmd.Flags().StringVar(&statsAddr, "stats-address", "localhost:2003", "address to send monitoring statistics to") + parrotCmd.Flags().IntVar(&statsBufferSize, "stats-buffer-size", 20000, "how many messages (holding all measurements from one interval) to buffer up in case graphite endpoint is unavailable.") + parrotCmd.Flags().DurationVar(&statsTimeout, "stats-timeout", time.Second*10, "timeout after which a write is considered not successful") parrotCmd.Flags().StringVar(&logLevel, "log-level", "info", "log level. panic|fatal|error|warning|info|debug") @@ -62,9 +75,11 @@ var parrotCmd = &cobra.Command{ if int(lookbackPeriod.Seconds())%int(testMetricsInterval.Seconds()) != 0 { log.Fatal("lookback period must be evenly divisible by test metrics interval") } + log.SetLevel(lvl) parsePartitionMethod() initGateway() + initStats() schemas := generateSchemas(partitionCount) go produceTestMetrics(schemas) @@ -95,3 +110,10 @@ func initGateway() { } log.Info("gateway initialized") } + +func initStats() { + hostname, _ := os.Hostname() + prefix := strings.Replace(statsPrefix, "$hostname", strings.Replace(hostname, ".", "_", -1), -1) + //need to use a negative interval so we can manually set the report timestamps + statsGraphite = stats.NewGraphite(prefix, statsAddr, -1, statsBufferSize, statsTimeout) +} diff --git a/cmd/mt-parrot/monitor.go b/cmd/mt-parrot/monitor.go index 78ed7dbb10..7a027fd1f3 100644 --- a/cmd/mt-parrot/monitor.go +++ b/cmd/mt-parrot/monitor.go @@ -58,6 +58,7 @@ func monitor() { for _, s := range query.Decoded { processPartitionSeries(s, tick) } + statsGraphite.Report(tick) } } diff --git a/docs/tools.md b/docs/tools.md index 01be644a47..038d510827 100644 --- a/docs/tools.md +++ b/docs/tools.md @@ -552,6 +552,10 @@ Flags: --partition-count int32 number of kafka partitions in use (default 8) --partition-method string the partition method in use on the gateway, must be one of bySeries|bySeriesWithTags|bySeriesWithTagsFnv (default "bySeries") --query-interval duration interval to query to validate metrics (default 10s) + --stats-address string address to send monitoring statistics to (default "localhost:2003") + --stats-buffer-size int how many messages (holding all measurements from one interval) to buffer up in case graphite endpoint is unavailable. (default 20000) + --stats-prefix string stats prefix (will add trailing dot automatically if needed) + --stats-timeout duration timeout after which a write is considered not successful (default 10s) --test-metrics-interval duration interval to send test metrics (default 10s) ``` From 26e2b19b85e9632477827a6f43d9cc07a93ef4d6 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Tue, 17 Mar 2020 02:57:06 -0400 Subject: [PATCH 34/60] load atomically --- cmd/mt-parrot/monitor.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cmd/mt-parrot/monitor.go b/cmd/mt-parrot/monitor.go index 7a027fd1f3..37bbe0414b 100644 --- a/cmd/mt-parrot/monitor.go +++ b/cmd/mt-parrot/monitor.go @@ -9,6 +9,7 @@ import ( "math" "net/http" "strconv" + "sync/atomic" "time" ) @@ -96,7 +97,7 @@ func processPartitionSeries(s graphite.Series, now time.Time) { metrics := metricsBySeries[partition] metrics.nanCount.Set(int(serStats.nans)) - lag := lastPublish - int64(serStats.lastSeen) + lag := atomic.LoadInt64(&lastPublish) - int64(serStats.lastSeen) metrics.lag.Set(int(lag)) metrics.deltaSum.Set(int(serStats.deltaSum)) metrics.nonMatching.Set(int(serStats.numNonMatching)) From f9d103b7b354dec2a036e9f40b1f0313afa295bd Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Tue, 17 Mar 2020 02:59:50 -0400 Subject: [PATCH 35/60] update metric name --- cmd/mt-parrot/generate.go | 2 +- cmd/mt-parrot/monitor.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/mt-parrot/generate.go b/cmd/mt-parrot/generate.go index 6504e6fa17..414a8f1315 100644 --- a/cmd/mt-parrot/generate.go +++ b/cmd/mt-parrot/generate.go @@ -39,7 +39,7 @@ func generateSchema(desiredPartition int32) *schema.MetricData { } for i := 1; true; i++ { - metric.Name = fmt.Sprintf("parrot.testdata.%d.generated.%s", desiredPartition, generatePartitionSuffix(i)) + metric.Name = fmt.Sprintf("parrot.testdata.%d.identity.%s", desiredPartition, generatePartitionSuffix(i)) id, err := metric.PartitionID(partitionMethod, partitionCount) if err != nil { log.Fatal(err) diff --git a/cmd/mt-parrot/monitor.go b/cmd/mt-parrot/monitor.go index 37bbe0414b..5379774acd 100644 --- a/cmd/mt-parrot/monitor.go +++ b/cmd/mt-parrot/monitor.go @@ -143,7 +143,7 @@ func initMetricsBySeries() { func buildRequest(now time.Time) *http.Request { req, _ := http.NewRequest("GET", fmt.Sprintf("%s/render", gatewayAddress), nil) q := req.URL.Query() - q.Set("target", "aliasByNode(parrot.testdata.*.generated.*, 2)") + q.Set("target", "aliasByNode(parrot.testdata.*.identity.*, 2)") q.Set("from", strconv.Itoa(int(now.Add(-1*lookbackPeriod).Unix()-1))) q.Set("until", strconv.Itoa(int(now.Unix()))) q.Set("format", "json") From 6ce3660a3a29702bba2035dc91a060c944c6d6cf Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Tue, 17 Mar 2020 03:04:00 -0400 Subject: [PATCH 36/60] update log --- cmd/mt-parrot/generate.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/mt-parrot/generate.go b/cmd/mt-parrot/generate.go index 414a8f1315..20e54cec9c 100644 --- a/cmd/mt-parrot/generate.go +++ b/cmd/mt-parrot/generate.go @@ -16,7 +16,7 @@ func produceTestMetrics(schemas []*schema.MetricData) { } publisher.Flush(schemas) atomic.StoreInt64(&lastPublish, tick.Unix()) - log.Infof("flushed schemas for ts %d", tick.Unix()) + log.Infof("flushed metrics for ts %d", tick.Unix()) } } From 496a4cbbf1d246fa841d60ec61933a6df2a7a7ab Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Tue, 17 Mar 2020 09:53:19 -0400 Subject: [PATCH 37/60] use tag constructor for tag based metrics --- cmd/mt-parrot/monitor.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/cmd/mt-parrot/monitor.go b/cmd/mt-parrot/monitor.go index 5379774acd..b45c8526c8 100644 --- a/cmd/mt-parrot/monitor.go +++ b/cmd/mt-parrot/monitor.go @@ -14,9 +14,9 @@ import ( ) var ( - httpError = stats.NewCounter32("parrot.monitoring.error;error=http") - decodeError = stats.NewCounter32("parrot.monitoring.error;error=decode") - invalidError = stats.NewCounter32("parrot.monitoring.error;error=invalid") + httpError = stats.NewCounter32WithTags("parrot.monitoring.error", ";error=http") + decodeError = stats.NewCounter32WithTags("parrot.monitoring.error", ";error=decode") + invalidError = stats.NewCounter32WithTags("parrot.monitoring.error", ";error=invalid") ) var metricsBySeries []partitionMetrics @@ -122,19 +122,19 @@ func initMetricsBySeries() { for p := 0; p < int(partitionCount); p++ { metrics := partitionMetrics{ // metric parrot.monitoring.nancount is the number of missing values for each series - nanCount: stats.NewGauge32(fmt.Sprintf("parrot.monitoring.nancount;partition=%d", p)), + nanCount: stats.NewGauge32WithTags("parrot.monitoring.nancount", fmt.Sprintf(";partition=%d", p)), // metric parrot.monitoring.lag is the time since the last value was recorded - lag: stats.NewGauge32(fmt.Sprintf("parrot.monitoring.lag;partition=%d", p)), + lag: stats.NewGauge32WithTags("parrot.monitoring.lag", fmt.Sprintf(";partition=%d", p)), // metric parrot.monitoring.deltaSum is the total amount of drift between expected value and actual values - deltaSum: stats.NewGauge32(fmt.Sprintf("parrot.monitoring.deltaSum;partition=%d", p)), + deltaSum: stats.NewGauge32WithTags("parrot.monitoring.deltaSum", fmt.Sprintf(";partition=%d", p)), // metric parrot.monitoring.nonMatching is the total number of entries where drift occurred - nonMatching: stats.NewGauge32(fmt.Sprintf("parrot.monitoring.nonMatching;partition=%d", p)), + nonMatching: stats.NewGauge32WithTags("parrot.monitoring.nonMatching", fmt.Sprintf(";partition=%d", p)), // metric parrot.monitoring.correctNumPoints is whether the expected number of points were received - correctNumPoints: stats.NewBool(fmt.Sprintf("parrot.monitoring.correctNumPoints;partition=%d", p)), + correctNumPoints: stats.NewBoolWithTags("parrot.monitoring.correctNumPoints", fmt.Sprintf(";partition=%d", p)), // metric parrot.monitoring.correctAlignment is whether the last ts matches `now` - correctAlignment: stats.NewBool(fmt.Sprintf("parrot.monitoring.correctAlignment;partition=%d", p)), + correctAlignment: stats.NewBoolWithTags("parrot.monitoring.correctAlignment", fmt.Sprintf(";partition=%d", p)), // metric parrot.monitoring.correctSpacing is whether all points are sorted and 1 period apart - correctSpacing: stats.NewBool(fmt.Sprintf("parrot.monitoring.correctSpacing;partition=%d", p)), + correctSpacing: stats.NewBoolWithTags("parrot.monitoring.correctSpacing", fmt.Sprintf(";partition=%d", p)), } metricsBySeries = append(metricsBySeries, metrics) } From b5771cbe7a9cfe7ba71740015b93ef7ee06bd05f Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Tue, 17 Mar 2020 09:58:22 -0400 Subject: [PATCH 38/60] rename variables schema -> metric --- cmd/mt-parrot/generate.go | 16 ++++++++-------- cmd/mt-parrot/main.go | 4 ++-- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/cmd/mt-parrot/generate.go b/cmd/mt-parrot/generate.go index 20e54cec9c..beb9630240 100644 --- a/cmd/mt-parrot/generate.go +++ b/cmd/mt-parrot/generate.go @@ -8,29 +8,29 @@ import ( "sync/atomic" ) -func produceTestMetrics(schemas []*schema.MetricData) { +func produceTestMetrics(metrics []*schema.MetricData) { for tick := range clock.AlignedTickLossless(testMetricsInterval) { - for _, metric := range schemas { + for _, metric := range metrics { metric.Time = tick.Unix() metric.Value = float64(tick.Unix()) } - publisher.Flush(schemas) + publisher.Flush(metrics) atomic.StoreInt64(&lastPublish, tick.Unix()) log.Infof("flushed metrics for ts %d", tick.Unix()) } } -//generateSchemas generates a MetricData that hashes to each of numPartitions partitions -func generateSchemas(numPartitions int32) []*schema.MetricData { +//generateMetrics generates a MetricData that hashes to each of numPartitions partitions +func generateMetrics(numPartitions int32) []*schema.MetricData { var metrics []*schema.MetricData for i := int32(0); i < numPartitions; i++ { - metrics = append(metrics, generateSchema(i)) + metrics = append(metrics, generateMetric(i)) } return metrics } -//generateSchema generates a single MetricData that hashes to the given partition -func generateSchema(desiredPartition int32) *schema.MetricData { +//generateMetric generates a single MetricData that hashes to the given partition +func generateMetric(desiredPartition int32) *schema.MetricData { metric := schema.MetricData{ OrgId: orgId, Unit: "partyparrots", diff --git a/cmd/mt-parrot/main.go b/cmd/mt-parrot/main.go index efee94485e..18b2c862dd 100644 --- a/cmd/mt-parrot/main.go +++ b/cmd/mt-parrot/main.go @@ -81,8 +81,8 @@ var parrotCmd = &cobra.Command{ initGateway() initStats() - schemas := generateSchemas(partitionCount) - go produceTestMetrics(schemas) + metrics := generateMetrics(partitionCount) + go produceTestMetrics(metrics) monitor() }, From 23a250ba07a5c7448344f9d8fe7ef4e0dc5cf4d8 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Tue, 17 Mar 2020 10:08:04 -0400 Subject: [PATCH 39/60] ensure that durations are multiples of 1s --- cmd/mt-parrot/main.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/cmd/mt-parrot/main.go b/cmd/mt-parrot/main.go index 18b2c862dd..95f226842b 100644 --- a/cmd/mt-parrot/main.go +++ b/cmd/mt-parrot/main.go @@ -72,6 +72,8 @@ var parrotCmd = &cobra.Command{ if err != nil { log.Fatalf("failed to parse log-level, %s", err.Error()) } + + validateDurationsInSeconds() if int(lookbackPeriod.Seconds())%int(testMetricsInterval.Seconds()) != 0 { log.Fatal("lookback period must be evenly divisible by test metrics interval") } @@ -117,3 +119,15 @@ func initStats() { //need to use a negative interval so we can manually set the report timestamps statsGraphite = stats.NewGraphite(prefix, statsAddr, -1, statsBufferSize, statsTimeout) } + +func validateDurationsInSeconds() { + if testMetricsInterval%time.Second != 0 { + log.Fatal("test-metrics-interval must be in seconds") + } + if queryInterval%time.Second != 0 { + log.Fatal("query-interval must be in seconds") + } + if lookbackPeriod%time.Second != 0 { + log.Fatal("lookback-period must be in seconds") + } +} From b2612af719dad0b6b2bac8a32e252f19d3303df7 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Fri, 27 Mar 2020 03:40:21 -0400 Subject: [PATCH 40/60] disable metrics2d0cs for parrot --- cmd/mt-parrot/monitor.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/cmd/mt-parrot/monitor.go b/cmd/mt-parrot/monitor.go index b45c8526c8..7c53d292cc 100644 --- a/cmd/mt-parrot/monitor.go +++ b/cmd/mt-parrot/monitor.go @@ -121,19 +121,20 @@ func checkSpacing(points []graphite.Point) bool { func initMetricsBySeries() { for p := 0; p < int(partitionCount); p++ { metrics := partitionMetrics{ - // metric parrot.monitoring.nancount is the number of missing values for each series + //TODO enable metrics2docs by adding 'metric' prefix to each metric + // parrot.monitoring.nancount is the number of missing values for each series nanCount: stats.NewGauge32WithTags("parrot.monitoring.nancount", fmt.Sprintf(";partition=%d", p)), - // metric parrot.monitoring.lag is the time since the last value was recorded + // parrot.monitoring.lag is the time since the last value was recorded lag: stats.NewGauge32WithTags("parrot.monitoring.lag", fmt.Sprintf(";partition=%d", p)), - // metric parrot.monitoring.deltaSum is the total amount of drift between expected value and actual values + // parrot.monitoring.deltaSum is the total amount of drift between expected value and actual values deltaSum: stats.NewGauge32WithTags("parrot.monitoring.deltaSum", fmt.Sprintf(";partition=%d", p)), - // metric parrot.monitoring.nonMatching is the total number of entries where drift occurred + // parrot.monitoring.nonMatching is the total number of entries where drift occurred nonMatching: stats.NewGauge32WithTags("parrot.monitoring.nonMatching", fmt.Sprintf(";partition=%d", p)), - // metric parrot.monitoring.correctNumPoints is whether the expected number of points were received + // parrot.monitoring.correctNumPoints is whether the expected number of points were received correctNumPoints: stats.NewBoolWithTags("parrot.monitoring.correctNumPoints", fmt.Sprintf(";partition=%d", p)), - // metric parrot.monitoring.correctAlignment is whether the last ts matches `now` + // parrot.monitoring.correctAlignment is whether the last ts matches `now` correctAlignment: stats.NewBoolWithTags("parrot.monitoring.correctAlignment", fmt.Sprintf(";partition=%d", p)), - // metric parrot.monitoring.correctSpacing is whether all points are sorted and 1 period apart + // parrot.monitoring.correctSpacing is whether all points are sorted and 1 period apart correctSpacing: stats.NewBoolWithTags("parrot.monitoring.correctSpacing", fmt.Sprintf(";partition=%d", p)), } metricsBySeries = append(metricsBySeries, metrics) From df72b2a917ff00a34c3bbef99677c7aa98fc2887 Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Thu, 12 Mar 2020 13:47:09 +0200 Subject: [PATCH 41/60] make checkSpacing simpler --- cmd/mt-parrot/monitor.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/cmd/mt-parrot/monitor.go b/cmd/mt-parrot/monitor.go index 7c53d292cc..b2a4b7a57f 100644 --- a/cmd/mt-parrot/monitor.go +++ b/cmd/mt-parrot/monitor.go @@ -107,13 +107,12 @@ func processPartitionSeries(s graphite.Series, now time.Time) { } func checkSpacing(points []graphite.Point) bool { - previous := points[0].Ts for i := 1; i < len(points); i++ { - current := points[i].Ts - if current-previous != uint32(testMetricsInterval.Seconds()) { + prev := points[i-1].Ts + cur := points[i].Ts + if cur-prev != uint32(testMetricsInterval.Seconds()) { return false } - previous = current } return true } From 3cb22c58056f27bc771d05a8e13b29c23ecc93ac Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Mon, 30 Mar 2020 21:46:37 +0300 Subject: [PATCH 42/60] clearer description --- cmd/mt-parrot/main.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/cmd/mt-parrot/main.go b/cmd/mt-parrot/main.go index 95f226842b..33a1bdd45e 100644 --- a/cmd/mt-parrot/main.go +++ b/cmd/mt-parrot/main.go @@ -1,6 +1,10 @@ package main import ( + "os" + "strings" + "time" + "github.com/grafana/metrictank/cmd/mt-fakemetrics/out" "github.com/grafana/metrictank/cmd/mt-fakemetrics/out/gnet" "github.com/grafana/metrictank/logger" @@ -9,9 +13,6 @@ import ( "github.com/raintank/met/statsd" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" - "os" - "strings" - "time" ) var ( @@ -66,7 +67,7 @@ func main() { var parrotCmd = &cobra.Command{ Use: "mt-parrot", - Short: "generate deterministic metrics for each metrictank partition", + Short: "generate deterministic metrics for each metrictank partition, query them back and report on correctness", Run: func(cmd *cobra.Command, args []string) { lvl, err := log.ParseLevel(logLevel) if err != nil { From d8c46dda59e8c3c39c3e9023218a80f7d20b0f73 Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Mon, 30 Mar 2020 22:36:58 +0300 Subject: [PATCH 43/60] remove needless clause --- cmd/mt-parrot/main.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/cmd/mt-parrot/main.go b/cmd/mt-parrot/main.go index 33a1bdd45e..8d9990ba62 100644 --- a/cmd/mt-parrot/main.go +++ b/cmd/mt-parrot/main.go @@ -75,9 +75,6 @@ var parrotCmd = &cobra.Command{ } validateDurationsInSeconds() - if int(lookbackPeriod.Seconds())%int(testMetricsInterval.Seconds()) != 0 { - log.Fatal("lookback period must be evenly divisible by test metrics interval") - } log.SetLevel(lvl) parsePartitionMethod() From a03fda3f34dad975154376e69c7e13deb0406819 Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Mon, 30 Mar 2020 22:37:12 +0300 Subject: [PATCH 44/60] functions that can force quit the program should be called must* --- cmd/mt-parrot/main.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cmd/mt-parrot/main.go b/cmd/mt-parrot/main.go index 8d9990ba62..a1d8d2c8db 100644 --- a/cmd/mt-parrot/main.go +++ b/cmd/mt-parrot/main.go @@ -77,7 +77,7 @@ var parrotCmd = &cobra.Command{ validateDurationsInSeconds() log.SetLevel(lvl) - parsePartitionMethod() + mustParsePartitionMethod() initGateway() initStats() @@ -88,9 +88,9 @@ var parrotCmd = &cobra.Command{ }, } -//parsePartitionMethod parses the partitionScheme cli flag, -//exiting if an invalid partition schema is entered or if org based partitioning is used (not currently allowed by parrot). -func parsePartitionMethod() { +// mustParsePartitionMethod parses the partitionScheme cli flag, +// exiting if an invalid partition schema is entered or if org based partitioning is used (not currently allowed by parrot). +func mustParsePartitionMethod() { var err error partitionMethod, err = schema.PartitonMethodFromString(partitionMethodString) if err != nil { From 08e2d4ba61741c724033f975d90b861b91ecb487 Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Mon, 30 Mar 2020 22:57:31 +0300 Subject: [PATCH 45/60] gofmt import paths --- cmd/mt-parrot/monitor.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/cmd/mt-parrot/monitor.go b/cmd/mt-parrot/monitor.go index b2a4b7a57f..92d887df21 100644 --- a/cmd/mt-parrot/monitor.go +++ b/cmd/mt-parrot/monitor.go @@ -2,15 +2,17 @@ package main import ( "fmt" - "github.com/grafana/metrictank/clock" - "github.com/grafana/metrictank/stacktest/graphite" - "github.com/grafana/metrictank/stats" - log "github.com/sirupsen/logrus" "math" "net/http" "strconv" "sync/atomic" "time" + + "github.com/grafana/metrictank/clock" + "github.com/grafana/metrictank/stacktest/graphite" + "github.com/grafana/metrictank/stats" + "github.com/grafana/metrictank/util/align" + log "github.com/sirupsen/logrus" ) var ( From 07a9d1a1d13f43ff2c0e50390ce0cec688098bdb Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Mon, 30 Mar 2020 22:39:45 +0300 Subject: [PATCH 46/60] consistency between series and partition stats --- cmd/mt-parrot/monitor.go | 34 ++++++++++++++++++---------------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/cmd/mt-parrot/monitor.go b/cmd/mt-parrot/monitor.go index 92d887df21..a9888a4f77 100644 --- a/cmd/mt-parrot/monitor.go +++ b/cmd/mt-parrot/monitor.go @@ -23,22 +23,24 @@ var ( var metricsBySeries []partitionMetrics -type seriesStats struct { - lastTs uint32 - nans int32 // the partition currently being checked - nope? +type seriesInfo struct { + lastTs uint32 // last timestamp seen in the response + + // to generate stats from + lastSeen uint32 // the last seen non-NaN time stamp (to generate lag from) deltaSum float64 // sum of abs(value - ts) across the time series - numNonMatching int32 // number of timestamps where value != ts - lastSeen uint32 // the last seen non-NaN time stamp (useful for lag) + numNans int32 // number of missing values for each series + numNonMatching int32 // number of points where value != ts correctNumPoints bool // whether the expected number of points were received correctAlignment bool // whether the last ts matches `now` correctSpacing bool // whether all points are sorted and 1 period apart } type partitionMetrics struct { - nanCount *stats.Gauge32 // the number of missing values for each series lag *stats.Gauge32 // time since the last value was recorded - deltaSum *stats.Gauge32 // the total amount of drift between expected value and actual values - nonMatching *stats.Gauge32 // total number of entries where drift occurred + deltaSum *stats.Gauge32 // total amount of drift between expected value and actual values + numNans *stats.Gauge32 // number of missing values for each series + numNonMatching *stats.Gauge32 // number of points where value != ts correctNumPoints *stats.Bool // whether the expected number of points were received correctAlignment *stats.Bool // whether the last ts matches `now` correctSpacing *stats.Bool // whether all points are sorted and 1 period apart @@ -78,7 +80,7 @@ func processPartitionSeries(s graphite.Series, now time.Time) { return } - serStats := seriesStats{} + serStats := seriesInfo{} serStats.lastTs = s.Datapoints[len(s.Datapoints)-1].Ts serStats.correctAlignment = int64(serStats.lastTs) == now.Unix() serStats.correctNumPoints = len(s.Datapoints) == int(lookbackPeriod/testMetricsInterval)+1 @@ -86,7 +88,7 @@ func processPartitionSeries(s graphite.Series, now time.Time) { for _, dp := range s.Datapoints { if math.IsNaN(dp.Val) { - serStats.nans += 1 + serStats.numNans += 1 continue } serStats.lastSeen = dp.Ts @@ -98,11 +100,11 @@ func processPartitionSeries(s graphite.Series, now time.Time) { } metrics := metricsBySeries[partition] - metrics.nanCount.Set(int(serStats.nans)) + metrics.numNans.Set(int(serStats.numNans)) lag := atomic.LoadInt64(&lastPublish) - int64(serStats.lastSeen) metrics.lag.Set(int(lag)) metrics.deltaSum.Set(int(serStats.deltaSum)) - metrics.nonMatching.Set(int(serStats.numNonMatching)) + metrics.numNonMatching.Set(int(serStats.numNonMatching)) metrics.correctNumPoints.Set(serStats.correctNumPoints) metrics.correctAlignment.Set(serStats.correctAlignment) metrics.correctSpacing.Set(serStats.correctSpacing) @@ -123,14 +125,14 @@ func initMetricsBySeries() { for p := 0; p < int(partitionCount); p++ { metrics := partitionMetrics{ //TODO enable metrics2docs by adding 'metric' prefix to each metric - // parrot.monitoring.nancount is the number of missing values for each series - nanCount: stats.NewGauge32WithTags("parrot.monitoring.nancount", fmt.Sprintf(";partition=%d", p)), + // parrot.monitoring.nans is the number of missing values for each series + numNans: stats.NewGauge32WithTags("parrot.monitoring.nans", fmt.Sprintf(";partition=%d", p)), // parrot.monitoring.lag is the time since the last value was recorded lag: stats.NewGauge32WithTags("parrot.monitoring.lag", fmt.Sprintf(";partition=%d", p)), // parrot.monitoring.deltaSum is the total amount of drift between expected value and actual values deltaSum: stats.NewGauge32WithTags("parrot.monitoring.deltaSum", fmt.Sprintf(";partition=%d", p)), - // parrot.monitoring.nonMatching is the total number of entries where drift occurred - nonMatching: stats.NewGauge32WithTags("parrot.monitoring.nonMatching", fmt.Sprintf(";partition=%d", p)), + // parrot.monitoring.nonmatching is the total number of entries where drift occurred + numNonMatching: stats.NewGauge32WithTags("parrot.monitoring.nonmatching", fmt.Sprintf(";partition=%d", p)), // parrot.monitoring.correctNumPoints is whether the expected number of points were received correctNumPoints: stats.NewBoolWithTags("parrot.monitoring.correctNumPoints", fmt.Sprintf(";partition=%d", p)), // parrot.monitoring.correctAlignment is whether the last ts matches `now` From 9ab9906fa977e45835b6bcdd355d201ab3538d3a Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Mon, 30 Mar 2020 22:45:42 +0300 Subject: [PATCH 47/60] need a lossy ticker for the checker routine we wouldn't want to process old ticks with a lag as that would produce incorrect outcomes --- cmd/mt-parrot/monitor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/mt-parrot/monitor.go b/cmd/mt-parrot/monitor.go index a9888a4f77..86bb268f7b 100644 --- a/cmd/mt-parrot/monitor.go +++ b/cmd/mt-parrot/monitor.go @@ -48,7 +48,7 @@ type partitionMetrics struct { func monitor() { initMetricsBySeries() - for tick := range clock.AlignedTickLossless(queryInterval) { + for tick := range clock.AlignedTickLossy(queryInterval) { query := graphite.ExecuteRenderQuery(buildRequest(tick)) if query.HTTPErr != nil { From 2095f9f1931d7d9b75edf64654752520d518fe9f Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Mon, 30 Mar 2020 22:46:31 +0300 Subject: [PATCH 48/60] query response is a response --- cmd/mt-parrot/monitor.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cmd/mt-parrot/monitor.go b/cmd/mt-parrot/monitor.go index 86bb268f7b..c10f154e52 100644 --- a/cmd/mt-parrot/monitor.go +++ b/cmd/mt-parrot/monitor.go @@ -50,17 +50,17 @@ func monitor() { initMetricsBySeries() for tick := range clock.AlignedTickLossy(queryInterval) { - query := graphite.ExecuteRenderQuery(buildRequest(tick)) - if query.HTTPErr != nil { + resp := graphite.ExecuteRenderQuery(buildRequest(tick)) + if resp.HTTPErr != nil { httpError.Inc() continue } - if query.DecodeErr != nil { + if resp.DecodeErr != nil { decodeError.Inc() continue } - for _, s := range query.Decoded { + for _, s := range resp.Decoded { processPartitionSeries(s, tick) } statsGraphite.Report(tick) From 9281a5a85ad8148b9a7675c5eac0fd5ef4c8e687 Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Mon, 30 Mar 2020 22:47:23 +0300 Subject: [PATCH 49/60] fix off-by-one's --- cmd/mt-parrot/monitor.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/mt-parrot/monitor.go b/cmd/mt-parrot/monitor.go index c10f154e52..53322ec7e2 100644 --- a/cmd/mt-parrot/monitor.go +++ b/cmd/mt-parrot/monitor.go @@ -83,7 +83,7 @@ func processPartitionSeries(s graphite.Series, now time.Time) { serStats := seriesInfo{} serStats.lastTs = s.Datapoints[len(s.Datapoints)-1].Ts serStats.correctAlignment = int64(serStats.lastTs) == now.Unix() - serStats.correctNumPoints = len(s.Datapoints) == int(lookbackPeriod/testMetricsInterval)+1 + serStats.correctNumPoints = len(s.Datapoints) == int(lookbackPeriod/testMetricsInterval) serStats.correctSpacing = checkSpacing(s.Datapoints) for _, dp := range s.Datapoints { @@ -148,7 +148,7 @@ func buildRequest(now time.Time) *http.Request { req, _ := http.NewRequest("GET", fmt.Sprintf("%s/render", gatewayAddress), nil) q := req.URL.Query() q.Set("target", "aliasByNode(parrot.testdata.*.identity.*, 2)") - q.Set("from", strconv.Itoa(int(now.Add(-1*lookbackPeriod).Unix()-1))) + q.Set("from", strconv.Itoa(int(now.Add(-1*lookbackPeriod).Unix()))) q.Set("until", strconv.Itoa(int(now.Unix()))) q.Set("format", "json") q.Set("X-Org-Id", strconv.Itoa(orgId)) From 2cb97a68ec163c020d7694a74878d17f5aa51a33 Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Mon, 30 Mar 2020 22:53:52 +0300 Subject: [PATCH 50/60] make alignForward/alignBackward functions reusable --- api/dataprocessor.go | 27 ++++++--------------- api/dataprocessor_test.go | 46 ------------------------------------ expr/normalize.go | 14 +++-------- util/align/align.go | 28 ++++++++++++++++++++++ util/align/align_test.go | 49 +++++++++++++++++++++++++++++++++++++++ 5 files changed, 87 insertions(+), 77 deletions(-) create mode 100644 util/align/align.go create mode 100644 util/align/align_test.go diff --git a/api/dataprocessor.go b/api/dataprocessor.go index 2fe9ef42d4..17afb7963c 100644 --- a/api/dataprocessor.go +++ b/api/dataprocessor.go @@ -10,6 +10,7 @@ import ( "time" "github.com/grafana/metrictank/expr" + "github.com/grafana/metrictank/util/align" "github.com/grafana/metrictank/api/models" "github.com/grafana/metrictank/consolidation" @@ -53,20 +54,6 @@ type getTargetsResp struct { err error } -// alignForward aligns ts to the next timestamp that divides by the interval, except if it is already aligned -func alignForward(ts, interval uint32) uint32 { - remain := ts % interval - if remain == 0 { - return ts - } - return ts + interval - remain -} - -// alignBackward aligns the ts to the previous ts that divides by the interval, even if it is already aligned -func alignBackward(ts uint32, interval uint32) uint32 { - return ts - ((ts-1)%interval + 1) -} - // Fix assures a series is in quantized form: // all points are nicely aligned (quantized) and padded with nulls in case there's gaps in data // graphite does this quantization before storing, we may want to do that as well at some point @@ -75,8 +62,8 @@ func alignBackward(ts uint32, interval uint32) uint32 { // values to earlier in time. func Fix(in []schema.Point, from, to, interval uint32) []schema.Point { - first := alignForward(from, interval) - last := alignBackward(to, interval) + first := align.ForwardIfNotAligned(from, interval) + last := align.Backward(to, interval) if last < first { // the requested range is too narrow for the requested interval @@ -740,8 +727,8 @@ func newRequestContext(ctx context.Context, req *models.Req, consolidator consol // so the caller can just compare rc.From and rc.To and if equal, immediately return [] to the client. if req.Archive == 0 { - rc.From = alignBackward(req.From, req.ArchInterval) + 1 - rc.To = alignBackward(req.To, req.ArchInterval) + 1 + rc.From = align.Backward(req.From, req.ArchInterval) + 1 + rc.To = align.Backward(req.To, req.ArchInterval) + 1 rc.AMKey = schema.AMKey{MKey: req.MKey} } else { rc.From = req.From @@ -766,7 +753,7 @@ func newRequestContext(ctx context.Context, req *models.Req, consolidator consol // but because we eventually want to consolidate into a point with ts=60, we also need the points that will be fix-adjusted to 40 and 50. // so From needs to be lowered by 20 to become 35 (or 31 if adjusted). - boundary := alignForward(rc.From, req.OutInterval) + boundary := align.ForwardIfNotAligned(rc.From, req.OutInterval) rewind := req.AggNum * req.ArchInterval if boundary < rewind { panic(fmt.Sprintf("Cannot rewind far back enough (trying to rewind by %d from timestamp %d)", rewind, boundary)) @@ -792,7 +779,7 @@ func newRequestContext(ctx context.Context, req *models.Req, consolidator consol // 240 - 211 - ...,210 - 210 - 210 - 210 //*240->231 - 211 - ...,210 - 210 - 210 - 210 - rc.To = alignBackward(rc.To, req.OutInterval) + 1 + rc.To = align.Backward(rc.To, req.OutInterval) + 1 } return &rc diff --git a/api/dataprocessor_test.go b/api/dataprocessor_test.go index db9e904594..1fdd8d863e 100644 --- a/api/dataprocessor_test.go +++ b/api/dataprocessor_test.go @@ -306,52 +306,6 @@ func TestFix(t *testing.T) { } -type pbCase struct { - ts uint32 - span uint32 - boundary uint32 -} - -func TestAlignForward(t *testing.T) { - cases := []pbCase{ - {1, 60, 60}, - {2, 60, 60}, - {3, 60, 60}, - {57, 60, 60}, - {58, 60, 60}, - {59, 60, 60}, - {60, 60, 60}, - {61, 60, 120}, - {62, 60, 120}, - {63, 60, 120}, - } - for _, c := range cases { - if ret := alignForward(c.ts, c.span); ret != c.boundary { - t.Fatalf("alignBackward for ts %d with span %d should be %d, not %d", c.ts, c.span, c.boundary, ret) - } - } -} - -func TestAlignBackward(t *testing.T) { - cases := []pbCase{ - {1, 60, 0}, - {2, 60, 0}, - {3, 60, 0}, - {57, 60, 0}, - {58, 60, 0}, - {59, 60, 0}, - {60, 60, 0}, - {61, 60, 60}, - {62, 60, 60}, - {63, 60, 60}, - } - for _, c := range cases { - if ret := alignBackward(c.ts, c.span); ret != c.boundary { - t.Fatalf("alignBackward for ts %d with span %d should be %d, not %d", c.ts, c.span, c.boundary, ret) - } - } -} - // TestGetSeriesFixed assures that series data is returned in proper form. // for each case, we generate a new series of 5 points to cover every possible combination of: // * every possible data offset (against its quantized version) e.g. offset between 0 and interval-1 diff --git a/expr/normalize.go b/expr/normalize.go index 68af0e7f03..2a85d19291 100644 --- a/expr/normalize.go +++ b/expr/normalize.go @@ -8,6 +8,7 @@ import ( "github.com/grafana/metrictank/consolidation" "github.com/grafana/metrictank/schema" "github.com/grafana/metrictank/util" + "github.com/grafana/metrictank/util/align" ) // Normalize normalizes series to the same common LCM interval - if they don't already have the same interval @@ -66,7 +67,7 @@ func NormalizeTo(dataMap DataMap, in models.Series, interval uint32) models.Seri // or more generally (you can follow any example vertically): // 5 10 15 20 25 30 35 40 45 50 <-- if any of these timestamps are your first point in `in` // 5 5 5 20 20 20 35 35 35 50 <-- then these are the corresponding timestamps of the first values we want as input for the consolidator - // 15 15 15 30 30 30 45 45 45 60 <-- which, when fed through alignForward(), result in these numbers + // 15 15 15 30 30 30 45 45 45 60 <-- which, when fed through alignForwardIfNotAligned(), result in these numbers // 5 5 5 20 20 20 35 35 35 50 <-- subtract (aggnum-1)* in.interval or equivalent -interval + in.Interval = -15 + 5 = -10. these are our desired numbers! // now, for the final value, it's important to be aware of cases like this: @@ -75,7 +76,7 @@ func NormalizeTo(dataMap DataMap, in models.Series, interval uint32) models.Seri // (it breaches `to`, and may have more points than other series it needs to be combined with) // thus, we also need to potentially trim points from the back until the last point has the same Ts as a canonical series would - for ts := alignForward(in.Datapoints[0].Ts, interval) - interval + in.Interval; ts < in.Datapoints[0].Ts; ts += interval { + for ts := align.ForwardIfNotAligned(in.Datapoints[0].Ts, interval) - interval + in.Interval; ts < in.Datapoints[0].Ts; ts += interval { datapoints = append(datapoints, schema.Point{Val: math.NaN(), Ts: ts}) } @@ -90,12 +91,3 @@ func NormalizeTo(dataMap DataMap, in models.Series, interval uint32) models.Seri dataMap.Add(Req{}, in) return in } - -// alignForward aligns ts to the next timestamp that divides by the interval, except if it is already aligned -func alignForward(ts, interval uint32) uint32 { - remain := ts % interval - if remain == 0 { - return ts - } - return ts + interval - remain -} diff --git a/util/align/align.go b/util/align/align.go new file mode 100644 index 0000000000..fa062a4641 --- /dev/null +++ b/util/align/align.go @@ -0,0 +1,28 @@ +package align + +// Backward aligns the ts to the previous ts that divides by the interval, even if it is already aligned +func Backward(ts uint32, interval uint32) uint32 { + return ts - ((ts-1)%interval + 1) +} + +// BackwardIfNotAligned aligns the ts to the previous ts that divides by the interval, except if it is already aligned +func BackwardIfNotAligned(ts uint32, interval uint32) uint32 { + if ts%interval == 0 { + return ts + } + return Backward(ts, interval) +} + +// Forward aligns ts to the next timestamp that divides by the interval, even if it is already aligned +func Forward(ts, interval uint32) uint32 { + remain := ts % interval + return ts + interval - remain +} + +// ForwardIfNotAligned aligns ts to the next timestamp that divides by the interval, except if it is already aligned +func ForwardIfNotAligned(ts, interval uint32) uint32 { + if ts%interval == 0 { + return ts + } + return Forward(ts, interval) +} diff --git a/util/align/align_test.go b/util/align/align_test.go new file mode 100644 index 0000000000..04a1ab25b5 --- /dev/null +++ b/util/align/align_test.go @@ -0,0 +1,49 @@ +package align + +import "testing" + +type pbCase struct { + ts uint32 + span uint32 + boundary uint32 +} + +func TestForwardIfNotAligned(t *testing.T) { + cases := []pbCase{ + {1, 60, 60}, + {2, 60, 60}, + {3, 60, 60}, + {57, 60, 60}, + {58, 60, 60}, + {59, 60, 60}, + {60, 60, 60}, + {61, 60, 120}, + {62, 60, 120}, + {63, 60, 120}, + } + for _, c := range cases { + if ret := ForwardIfNotAligned(c.ts, c.span); ret != c.boundary { + t.Fatalf("ForwardIfNotAligned for ts %d with span %d should be %d, not %d", c.ts, c.span, c.boundary, ret) + } + } +} + +func TestBackward(t *testing.T) { + cases := []pbCase{ + {1, 60, 0}, + {2, 60, 0}, + {3, 60, 0}, + {57, 60, 0}, + {58, 60, 0}, + {59, 60, 0}, + {60, 60, 0}, + {61, 60, 60}, + {62, 60, 60}, + {63, 60, 60}, + } + for _, c := range cases { + if ret := Backward(c.ts, c.span); ret != c.boundary { + t.Fatalf("Backward for ts %d with span %d should be %d, not %d", c.ts, c.span, c.boundary, ret) + } + } +} From 4f3518fc1c7a2a5d910f7160d47ab7af01d698ed Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Mon, 30 Mar 2020 22:48:41 +0300 Subject: [PATCH 51/60] fix check of last timestamp. needs to be aligned --- cmd/mt-parrot/monitor.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cmd/mt-parrot/monitor.go b/cmd/mt-parrot/monitor.go index 53322ec7e2..60ab924c32 100644 --- a/cmd/mt-parrot/monitor.go +++ b/cmd/mt-parrot/monitor.go @@ -81,8 +81,9 @@ func processPartitionSeries(s graphite.Series, now time.Time) { } serStats := seriesInfo{} + lastTs := align.Forward(uint32(now.Unix()), uint32(testMetricsInterval.Seconds())) serStats.lastTs = s.Datapoints[len(s.Datapoints)-1].Ts - serStats.correctAlignment = int64(serStats.lastTs) == now.Unix() + serStats.correctAlignment = serStats.lastTs == lastTs serStats.correctNumPoints = len(s.Datapoints) == int(lookbackPeriod/testMetricsInterval) serStats.correctSpacing = checkSpacing(s.Datapoints) From bb73e5eb4d3a71b8edd9f1674a6af387fff4da52 Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Mon, 30 Mar 2020 23:14:11 +0300 Subject: [PATCH 52/60] clean up code by using constructor --- cmd/mt-parrot/monitor.go | 49 +++++++++++++++++++++------------------- 1 file changed, 26 insertions(+), 23 deletions(-) diff --git a/cmd/mt-parrot/monitor.go b/cmd/mt-parrot/monitor.go index 60ab924c32..1d46f5b46d 100644 --- a/cmd/mt-parrot/monitor.go +++ b/cmd/mt-parrot/monitor.go @@ -23,6 +23,12 @@ var ( var metricsBySeries []partitionMetrics +func initMetricsBySeries() { + for p := 0; p < int(partitionCount); p++ { + metricsBySeries = append(metricsBySeries, NewPartitionMetrics(p)) + } +} + type seriesInfo struct { lastTs uint32 // last timestamp seen in the response @@ -46,6 +52,26 @@ type partitionMetrics struct { correctSpacing *stats.Bool // whether all points are sorted and 1 period apart } +func NewPartitionMetrics(p int) partitionMetrics { + return partitionMetrics{ + //TODO enable metrics2docs by adding 'metric' prefix to each metric + // parrot.monitoring.lag is the time since the last value was recorded + lag: stats.NewGauge32WithTags("parrot.monitoring.lag", fmt.Sprintf(";partition=%d", p)), + // parrot.monitoring.deltaSum is the total amount of drift between expected value and actual values + deltaSum: stats.NewGauge32WithTags("parrot.monitoring.deltaSum", fmt.Sprintf(";partition=%d", p)), + // parrot.monitoring.nans is the number of missing values for each series + numNans: stats.NewGauge32WithTags("parrot.monitoring.nans", fmt.Sprintf(";partition=%d", p)), + // parrot.monitoring.nonmatching is the total number of entries where drift occurred + numNonMatching: stats.NewGauge32WithTags("parrot.monitoring.nonmatching", fmt.Sprintf(";partition=%d", p)), + // parrot.monitoring.correctNumPoints is whether the expected number of points were received + correctNumPoints: stats.NewBoolWithTags("parrot.monitoring.correctNumPoints", fmt.Sprintf(";partition=%d", p)), + // parrot.monitoring.correctAlignment is whether the last ts matches `now` + correctAlignment: stats.NewBoolWithTags("parrot.monitoring.correctAlignment", fmt.Sprintf(";partition=%d", p)), + // parrot.monitoring.correctSpacing is whether all points are sorted and 1 period apart + correctSpacing: stats.NewBoolWithTags("parrot.monitoring.correctSpacing", fmt.Sprintf(";partition=%d", p)), + } +} + func monitor() { initMetricsBySeries() for tick := range clock.AlignedTickLossy(queryInterval) { @@ -122,29 +148,6 @@ func checkSpacing(points []graphite.Point) bool { return true } -func initMetricsBySeries() { - for p := 0; p < int(partitionCount); p++ { - metrics := partitionMetrics{ - //TODO enable metrics2docs by adding 'metric' prefix to each metric - // parrot.monitoring.nans is the number of missing values for each series - numNans: stats.NewGauge32WithTags("parrot.monitoring.nans", fmt.Sprintf(";partition=%d", p)), - // parrot.monitoring.lag is the time since the last value was recorded - lag: stats.NewGauge32WithTags("parrot.monitoring.lag", fmt.Sprintf(";partition=%d", p)), - // parrot.monitoring.deltaSum is the total amount of drift between expected value and actual values - deltaSum: stats.NewGauge32WithTags("parrot.monitoring.deltaSum", fmt.Sprintf(";partition=%d", p)), - // parrot.monitoring.nonmatching is the total number of entries where drift occurred - numNonMatching: stats.NewGauge32WithTags("parrot.monitoring.nonmatching", fmt.Sprintf(";partition=%d", p)), - // parrot.monitoring.correctNumPoints is whether the expected number of points were received - correctNumPoints: stats.NewBoolWithTags("parrot.monitoring.correctNumPoints", fmt.Sprintf(";partition=%d", p)), - // parrot.monitoring.correctAlignment is whether the last ts matches `now` - correctAlignment: stats.NewBoolWithTags("parrot.monitoring.correctAlignment", fmt.Sprintf(";partition=%d", p)), - // parrot.monitoring.correctSpacing is whether all points are sorted and 1 period apart - correctSpacing: stats.NewBoolWithTags("parrot.monitoring.correctSpacing", fmt.Sprintf(";partition=%d", p)), - } - metricsBySeries = append(metricsBySeries, metrics) - } -} - func buildRequest(now time.Time) *http.Request { req, _ := http.NewRequest("GET", fmt.Sprintf("%s/render", gatewayAddress), nil) q := req.URL.Query() From f88aad6ac2efd5eacf4eb332ecb25a3d05b21cab Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Mon, 30 Mar 2020 23:17:44 +0300 Subject: [PATCH 53/60] remove bogus clause --- cmd/mt-parrot/monitor.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/cmd/mt-parrot/monitor.go b/cmd/mt-parrot/monitor.go index 1d46f5b46d..f4c63495ac 100644 --- a/cmd/mt-parrot/monitor.go +++ b/cmd/mt-parrot/monitor.go @@ -100,11 +100,6 @@ func processPartitionSeries(s graphite.Series, now time.Time) { invalidError.Inc() return } - if len(s.Datapoints) < 2 { - log.Debugf("partition has invalid number of datapoints: %d", len(s.Datapoints)) - invalidError.Inc() - return - } serStats := seriesInfo{} lastTs := align.Forward(uint32(now.Unix()), uint32(testMetricsInterval.Seconds())) From 4a53d2234f411c91b8d850c19ee8e875ff81157c Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Mon, 30 Mar 2020 23:18:01 +0300 Subject: [PATCH 54/60] consistent parrot.monitoring.error metrics: once per response --- cmd/mt-parrot/monitor.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/cmd/mt-parrot/monitor.go b/cmd/mt-parrot/monitor.go index f4c63495ac..422e2d23af 100644 --- a/cmd/mt-parrot/monitor.go +++ b/cmd/mt-parrot/monitor.go @@ -86,19 +86,25 @@ func monitor() { continue } + var invalid bool for _, s := range resp.Decoded { - processPartitionSeries(s, tick) + ok := processPartitionSeries(s, tick) + if !ok { + invalid = true + } + } + if invalid { + invalidError.Inc() } statsGraphite.Report(tick) } } -func processPartitionSeries(s graphite.Series, now time.Time) { +func processPartitionSeries(s graphite.Series, now time.Time) bool { partition, err := strconv.Atoi(s.Target) if err != nil { log.Debug("unable to parse partition", err) - invalidError.Inc() - return + return false } serStats := seriesInfo{} @@ -130,6 +136,8 @@ func processPartitionSeries(s graphite.Series, now time.Time) { metrics.correctNumPoints.Set(serStats.correctNumPoints) metrics.correctAlignment.Set(serStats.correctAlignment) metrics.correctSpacing.Set(serStats.correctSpacing) + + return true } func checkSpacing(points []graphite.Point) bool { From 10a230446eb0353dc2dbf8c6632518a1742d2beb Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Mon, 30 Mar 2020 23:36:36 +0300 Subject: [PATCH 55/60] don't round deltaSum down. we need to know every difference --- cmd/mt-parrot/monitor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/mt-parrot/monitor.go b/cmd/mt-parrot/monitor.go index 422e2d23af..8c35e0def2 100644 --- a/cmd/mt-parrot/monitor.go +++ b/cmd/mt-parrot/monitor.go @@ -131,7 +131,7 @@ func processPartitionSeries(s graphite.Series, now time.Time) bool { metrics.numNans.Set(int(serStats.numNans)) lag := atomic.LoadInt64(&lastPublish) - int64(serStats.lastSeen) metrics.lag.Set(int(lag)) - metrics.deltaSum.Set(int(serStats.deltaSum)) + metrics.deltaSum.Set(int(math.Ceil(serStats.deltaSum))) metrics.numNonMatching.Set(int(serStats.numNonMatching)) metrics.correctNumPoints.Set(serStats.correctNumPoints) metrics.correctAlignment.Set(serStats.correctAlignment) From 1045f17b14a5518b05d9cf81e98dd9f42a082cfb Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Mon, 30 Mar 2020 23:38:02 +0300 Subject: [PATCH 56/60] simplify stats setting --- cmd/mt-parrot/monitor.go | 67 +++++++++++++++------------------------- 1 file changed, 25 insertions(+), 42 deletions(-) diff --git a/cmd/mt-parrot/monitor.go b/cmd/mt-parrot/monitor.go index 8c35e0def2..46a19f2b85 100644 --- a/cmd/mt-parrot/monitor.go +++ b/cmd/mt-parrot/monitor.go @@ -29,19 +29,6 @@ func initMetricsBySeries() { } } -type seriesInfo struct { - lastTs uint32 // last timestamp seen in the response - - // to generate stats from - lastSeen uint32 // the last seen non-NaN time stamp (to generate lag from) - deltaSum float64 // sum of abs(value - ts) across the time series - numNans int32 // number of missing values for each series - numNonMatching int32 // number of points where value != ts - correctNumPoints bool // whether the expected number of points were received - correctAlignment bool // whether the last ts matches `now` - correctSpacing bool // whether all points are sorted and 1 period apart -} - type partitionMetrics struct { lag *stats.Gauge32 // time since the last value was recorded deltaSum *stats.Gauge32 // total amount of drift between expected value and actual values @@ -107,47 +94,43 @@ func processPartitionSeries(s graphite.Series, now time.Time) bool { return false } - serStats := seriesInfo{} lastTs := align.Forward(uint32(now.Unix()), uint32(testMetricsInterval.Seconds())) - serStats.lastTs = s.Datapoints[len(s.Datapoints)-1].Ts - serStats.correctAlignment = serStats.lastTs == lastTs - serStats.correctNumPoints = len(s.Datapoints) == int(lookbackPeriod/testMetricsInterval) - serStats.correctSpacing = checkSpacing(s.Datapoints) - for _, dp := range s.Datapoints { + var nans, nonMatching, lastSeen uint32 + var deltaSum float64 + + goodSpacing := true + + for i, dp := range s.Datapoints { + if i > 0 { + prev := s.Datapoints[i-1] + if dp.Ts-prev.Ts != uint32(testMetricsInterval.Seconds()) { + goodSpacing = false + } + } + if math.IsNaN(dp.Val) { - serStats.numNans += 1 + nans++ continue } - serStats.lastSeen = dp.Ts + lastSeen = dp.Ts if diff := dp.Val - float64(dp.Ts); diff != 0 { log.Debugf("partition=%d dp.Val=%f dp.Ts=%d diff=%f", partition, dp.Val, dp.Ts, diff) - serStats.deltaSum += diff - serStats.numNonMatching += 1 + deltaSum += diff + nonMatching++ } } metrics := metricsBySeries[partition] - metrics.numNans.Set(int(serStats.numNans)) - lag := atomic.LoadInt64(&lastPublish) - int64(serStats.lastSeen) - metrics.lag.Set(int(lag)) - metrics.deltaSum.Set(int(math.Ceil(serStats.deltaSum))) - metrics.numNonMatching.Set(int(serStats.numNonMatching)) - metrics.correctNumPoints.Set(serStats.correctNumPoints) - metrics.correctAlignment.Set(serStats.correctAlignment) - metrics.correctSpacing.Set(serStats.correctSpacing) - - return true -} + metrics.numNans.SetUint32(nans) + lag := uint32(atomic.LoadInt64(&lastPublish)) - lastSeen + metrics.lag.SetUint32(lag) + metrics.deltaSum.Set(int(math.Ceil(deltaSum))) + metrics.numNonMatching.SetUint32(nonMatching) + metrics.correctNumPoints.Set(len(s.Datapoints) == int(lookbackPeriod/testMetricsInterval)) + metrics.correctAlignment.Set(s.Datapoints[len(s.Datapoints)-1].Ts == lastTs) + metrics.correctSpacing.Set(goodSpacing) -func checkSpacing(points []graphite.Point) bool { - for i := 1; i < len(points); i++ { - prev := points[i-1].Ts - cur := points[i].Ts - if cur-prev != uint32(testMetricsInterval.Seconds()) { - return false - } - } return true } From 7a1671e5f0be38266f2e0cc8b55757384c7eab73 Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Tue, 31 Mar 2020 00:18:37 +0300 Subject: [PATCH 57/60] fix invalid check. see https://github.com/grafana/metrictank/pull/1680#discussion_r384721121 --- cmd/mt-parrot/monitor.go | 45 ++++++++++++++++++++++++++-------------- 1 file changed, 30 insertions(+), 15 deletions(-) diff --git a/cmd/mt-parrot/monitor.go b/cmd/mt-parrot/monitor.go index 46a19f2b85..1b2ed08e88 100644 --- a/cmd/mt-parrot/monitor.go +++ b/cmd/mt-parrot/monitor.go @@ -73,13 +73,36 @@ func monitor() { continue } - var invalid bool + seenPartitions := make(map[int]struct{}) + + // the response should contain partitionCount series entries, each of which named by a number, + // covering each partition exactly once. otherwise is invalid. + invalid := len(resp.Decoded) == int(partitionCount) + for _, s := range resp.Decoded { - ok := processPartitionSeries(s, tick) + partition, err := strconv.Atoi(s.Target) + if err != nil { + log.Debug("unable to parse partition", err) + invalid = true + } else { + _, ok := seenPartitions[partition] + if ok { + // should not see same partition twice! + invalid = true + } else { + processPartitionSeries(s.Datapoints, partition, tick) + } + } + } + + // check whether we encountered all partitions we expected + for p := 0; p < int(partitionCount); p++ { + _, ok := seenPartitions[p] if !ok { invalid = true } } + if invalid { invalidError.Inc() } @@ -87,13 +110,7 @@ func monitor() { } } -func processPartitionSeries(s graphite.Series, now time.Time) bool { - partition, err := strconv.Atoi(s.Target) - if err != nil { - log.Debug("unable to parse partition", err) - return false - } - +func processPartitionSeries(points []graphite.Point, partition int, now time.Time) { lastTs := align.Forward(uint32(now.Unix()), uint32(testMetricsInterval.Seconds())) var nans, nonMatching, lastSeen uint32 @@ -101,9 +118,9 @@ func processPartitionSeries(s graphite.Series, now time.Time) bool { goodSpacing := true - for i, dp := range s.Datapoints { + for i, dp := range points { if i > 0 { - prev := s.Datapoints[i-1] + prev := points[i-1] if dp.Ts-prev.Ts != uint32(testMetricsInterval.Seconds()) { goodSpacing = false } @@ -127,11 +144,9 @@ func processPartitionSeries(s graphite.Series, now time.Time) bool { metrics.lag.SetUint32(lag) metrics.deltaSum.Set(int(math.Ceil(deltaSum))) metrics.numNonMatching.SetUint32(nonMatching) - metrics.correctNumPoints.Set(len(s.Datapoints) == int(lookbackPeriod/testMetricsInterval)) - metrics.correctAlignment.Set(s.Datapoints[len(s.Datapoints)-1].Ts == lastTs) + metrics.correctNumPoints.Set(len(points) == int(lookbackPeriod/testMetricsInterval)) + metrics.correctAlignment.Set(points[len(points)-1].Ts == lastTs) metrics.correctSpacing.Set(goodSpacing) - - return true } func buildRequest(now time.Time) *http.Request { From 200b0c0c93c0d45c1242b67a882a733c60d078d5 Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Tue, 31 Mar 2020 00:35:56 +0300 Subject: [PATCH 58/60] make it easy to monitor for correctness introduce "post nan points" concept --- cmd/mt-parrot/main.go | 17 ++++++++++++-- cmd/mt-parrot/monitor.go | 50 ++++++++++++++++++++++++++++++---------- 2 files changed, 53 insertions(+), 14 deletions(-) diff --git a/cmd/mt-parrot/main.go b/cmd/mt-parrot/main.go index a1d8d2c8db..0c8314391e 100644 --- a/cmd/mt-parrot/main.go +++ b/cmd/mt-parrot/main.go @@ -66,8 +66,21 @@ func main() { } var parrotCmd = &cobra.Command{ - Use: "mt-parrot", - Short: "generate deterministic metrics for each metrictank partition, query them back and report on correctness", + Use: "mt-parrot", + Short: `generate deterministic metrics for each metrictank partition, query them back and report on correctness and performance + +Correctness: + Monitor the parrot.monitoring.error series. There's 3 potential issues: + + * parrot.monitoring.error;error=http // could not execute http request + * parrot.monitoring.error;error=decode // could not decode http response + * parrot.monitoring.error;error=invalid // any other problem with the response itself + +Performance: + In addition to these black-and-white measurements above, there are also more subjective measurements + * parrot.monitoring.lag // how far the response is lagging behind + * parrot.monitoring.nans // number of nans included in the response + `, Run: func(cmd *cobra.Command, args []string) { lvl, err := log.ParseLevel(logLevel) if err != nil { diff --git a/cmd/mt-parrot/monitor.go b/cmd/mt-parrot/monitor.go index 1b2ed08e88..94772c098f 100644 --- a/cmd/mt-parrot/monitor.go +++ b/cmd/mt-parrot/monitor.go @@ -16,9 +16,9 @@ import ( ) var ( - httpError = stats.NewCounter32WithTags("parrot.monitoring.error", ";error=http") - decodeError = stats.NewCounter32WithTags("parrot.monitoring.error", ";error=decode") - invalidError = stats.NewCounter32WithTags("parrot.monitoring.error", ";error=invalid") + httpError = stats.NewCounter32WithTags("parrot.monitoring.error", ";error=http") // could not execute http request + decodeError = stats.NewCounter32WithTags("parrot.monitoring.error", ";error=decode") // could not decode http response + invalidError = stats.NewCounter32WithTags("parrot.monitoring.error", ";error=invalid") // any other problem with the response itself ) var metricsBySeries []partitionMetrics @@ -33,6 +33,7 @@ type partitionMetrics struct { lag *stats.Gauge32 // time since the last value was recorded deltaSum *stats.Gauge32 // total amount of drift between expected value and actual values numNans *stats.Gauge32 // number of missing values for each series + numPostNans *stats.Gauge32 // number of non-null values for each series that come after a nan value numNonMatching *stats.Gauge32 // number of points where value != ts correctNumPoints *stats.Bool // whether the expected number of points were received correctAlignment *stats.Bool // whether the last ts matches `now` @@ -48,6 +49,8 @@ func NewPartitionMetrics(p int) partitionMetrics { deltaSum: stats.NewGauge32WithTags("parrot.monitoring.deltaSum", fmt.Sprintf(";partition=%d", p)), // parrot.monitoring.nans is the number of missing values for each series numNans: stats.NewGauge32WithTags("parrot.monitoring.nans", fmt.Sprintf(";partition=%d", p)), + // parrot.monitoring.postnans is the number of non-null values for each series that come after a nan value + numPostNans: stats.NewGauge32WithTags("parrot.monitoring.postnans", fmt.Sprintf(";partition=%d", p)), // parrot.monitoring.nonmatching is the total number of entries where drift occurred numNonMatching: stats.NewGauge32WithTags("parrot.monitoring.nonmatching", fmt.Sprintf(";partition=%d", p)), // parrot.monitoring.correctNumPoints is whether the expected number of points were received @@ -76,7 +79,7 @@ func monitor() { seenPartitions := make(map[int]struct{}) // the response should contain partitionCount series entries, each of which named by a number, - // covering each partition exactly once. otherwise is invalid. + // covering each partition exactly once. and each partition series should be correct. otherwise is invalid. invalid := len(resp.Decoded) == int(partitionCount) for _, s := range resp.Decoded { @@ -90,7 +93,10 @@ func monitor() { // should not see same partition twice! invalid = true } else { - processPartitionSeries(s.Datapoints, partition, tick) + ok := processPartitionSeries(s.Datapoints, partition, tick) + if !ok { + invalid = true + } } } } @@ -110,19 +116,23 @@ func monitor() { } } -func processPartitionSeries(points []graphite.Point, partition int, now time.Time) { +// a,b,c,null,null <-- valid response. just a bit laggy +// a,null,c,d,null <- invalid response. + +func processPartitionSeries(points []graphite.Point, partition int, now time.Time) bool { lastTs := align.Forward(uint32(now.Unix()), uint32(testMetricsInterval.Seconds())) var nans, nonMatching, lastSeen uint32 var deltaSum float64 + var postNanPoints uint32 - goodSpacing := true + correctSpacing := true for i, dp := range points { if i > 0 { prev := points[i-1] if dp.Ts-prev.Ts != uint32(testMetricsInterval.Seconds()) { - goodSpacing = false + correctSpacing = false } } @@ -130,6 +140,11 @@ func processPartitionSeries(points []graphite.Point, partition int, now time.Tim nans++ continue } + + if nans > 0 { + postNanPoints++ + } + lastSeen = dp.Ts if diff := dp.Val - float64(dp.Ts); diff != 0 { log.Debugf("partition=%d dp.Val=%f dp.Ts=%d diff=%f", partition, dp.Val, dp.Ts, diff) @@ -139,14 +154,25 @@ func processPartitionSeries(points []graphite.Point, partition int, now time.Tim } metrics := metricsBySeries[partition] - metrics.numNans.SetUint32(nans) lag := uint32(atomic.LoadInt64(&lastPublish)) - lastSeen + metrics.lag.SetUint32(lag) metrics.deltaSum.Set(int(math.Ceil(deltaSum))) + metrics.numNans.SetUint32(nans) + metrics.numPostNans.SetUint32(postNanPoints) metrics.numNonMatching.SetUint32(nonMatching) - metrics.correctNumPoints.Set(len(points) == int(lookbackPeriod/testMetricsInterval)) - metrics.correctAlignment.Set(points[len(points)-1].Ts == lastTs) - metrics.correctSpacing.Set(goodSpacing) + + correctNumPoints := len(points) == int(lookbackPeriod/testMetricsInterval) + correctAlignment := points[len(points)-1].Ts == lastTs + + metrics.correctNumPoints.Set(correctNumPoints) + metrics.correctAlignment.Set(correctAlignment) + metrics.correctSpacing.Set(correctSpacing) + + // we allow for lag and nans, that's subjective and should be monitored separately. + // but we will say it's invalid if any of the clear-cut signals say so. + return postNanPoints == 0 && nonMatching == 0 && correctNumPoints && correctAlignment && correctSpacing + } func buildRequest(now time.Time) *http.Request { From e8b845c09eefc5ed2166200f454ec429958502dc Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Tue, 31 Mar 2020 01:18:26 +0300 Subject: [PATCH 59/60] fix lastPoint and num points check --- cmd/mt-parrot/monitor.go | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/cmd/mt-parrot/monitor.go b/cmd/mt-parrot/monitor.go index 94772c098f..7ccdc25671 100644 --- a/cmd/mt-parrot/monitor.go +++ b/cmd/mt-parrot/monitor.go @@ -6,7 +6,6 @@ import ( "net/http" "strconv" "sync/atomic" - "time" "github.com/grafana/metrictank/clock" "github.com/grafana/metrictank/stacktest/graphite" @@ -66,7 +65,9 @@ func monitor() { initMetricsBySeries() for tick := range clock.AlignedTickLossy(queryInterval) { - resp := graphite.ExecuteRenderQuery(buildRequest(tick)) + from := uint32(tick.Add(-1 * lookbackPeriod).Unix()) + to := uint32(tick.Unix()) + resp := graphite.ExecuteRenderQuery(buildRequest(from, to)) if resp.HTTPErr != nil { httpError.Inc() continue @@ -93,7 +94,7 @@ func monitor() { // should not see same partition twice! invalid = true } else { - ok := processPartitionSeries(s.Datapoints, partition, tick) + ok := processPartitionSeries(s.Datapoints, partition, from, to) if !ok { invalid = true } @@ -119,8 +120,7 @@ func monitor() { // a,b,c,null,null <-- valid response. just a bit laggy // a,null,c,d,null <- invalid response. -func processPartitionSeries(points []graphite.Point, partition int, now time.Time) bool { - lastTs := align.Forward(uint32(now.Unix()), uint32(testMetricsInterval.Seconds())) +func processPartitionSeries(points []graphite.Point, partition int, from, to uint32) bool { var nans, nonMatching, lastSeen uint32 var deltaSum float64 @@ -162,8 +162,13 @@ func processPartitionSeries(points []graphite.Point, partition int, now time.Tim metrics.numPostNans.SetUint32(postNanPoints) metrics.numNonMatching.SetUint32(nonMatching) - correctNumPoints := len(points) == int(lookbackPeriod/testMetricsInterval) - correctAlignment := points[len(points)-1].Ts == lastTs + // render api has from exclusive, to inclusive. + expFirstTs := align.ForwardIfNotAligned(from+1, uint32(testMetricsInterval.Seconds())) + expLastTs := align.BackwardIfNotAligned(to, uint32(testMetricsInterval.Seconds())) + expNumPoints := int((expLastTs-expFirstTs)/uint32(testMetricsInterval.Seconds()) + 1) + + correctNumPoints := len(points) == expNumPoints + correctAlignment := points[len(points)-1].Ts == expLastTs metrics.correctNumPoints.Set(correctNumPoints) metrics.correctAlignment.Set(correctAlignment) @@ -175,12 +180,12 @@ func processPartitionSeries(points []graphite.Point, partition int, now time.Tim } -func buildRequest(now time.Time) *http.Request { +func buildRequest(from, to uint32) *http.Request { req, _ := http.NewRequest("GET", fmt.Sprintf("%s/render", gatewayAddress), nil) q := req.URL.Query() q.Set("target", "aliasByNode(parrot.testdata.*.identity.*, 2)") - q.Set("from", strconv.Itoa(int(now.Add(-1*lookbackPeriod).Unix()))) - q.Set("until", strconv.Itoa(int(now.Unix()))) + q.Set("from", strconv.Itoa(int(from))) + q.Set("until", strconv.Itoa(int(to))) q.Set("format", "json") q.Set("X-Org-Id", strconv.Itoa(orgId)) req.URL.RawQuery = q.Encode() From 5c3aace0eb78765bc24c17ff2c8754d40032478d Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Mon, 6 Apr 2020 11:48:16 +0300 Subject: [PATCH 60/60] scripts/dev/tools-to-doc.sh > docs/tools.md --- docs/tools.md | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/docs/tools.md b/docs/tools.md index 038d510827..c6b2c13730 100644 --- a/docs/tools.md +++ b/docs/tools.md @@ -537,7 +537,19 @@ It prints the MKey ## mt-parrot ``` -generate deterministic metrics for each metrictank partition +generate deterministic metrics for each metrictank partition, query them back and report on correctness and performance + +Correctness: + Monitor the parrot.monitoring.error series. There's 3 potential issues: + + * parrot.monitoring.error;error=http // could not execute http request + * parrot.monitoring.error;error=decode // could not decode http response + * parrot.monitoring.error;error=invalid // any other problem with the response itself + +Performance: + In addition to these black-and-white measurements above, there are also more subjective measurements + * parrot.monitoring.lag // how far the response is lagging behind + * parrot.monitoring.nans // number of nans included in the response Usage: mt-parrot [flags]