Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

mt-parrot: continuous validation by sending dummy stats and querying them back #1680

Merged
merged 60 commits into from
Apr 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
68a9aac
Add mt-parrot command to generate deterministic artificial metrics fo…
fitzoh Feb 6, 2020
6468658
only use base url for gatewayAddress so that we can query it
fitzoh Feb 21, 2020
993dcc7
query with explicit time ranges
fitzoh Feb 21, 2020
1ba76d3
publish stats instead of printing them
fitzoh Feb 21, 2020
e36561f
extract partition via regex
fitzoh Feb 21, 2020
8c917ce
more logging
fitzoh Feb 21, 2020
9fccc21
stats for other failure modes
fitzoh Feb 21, 2020
f387547
go fmt
fitzoh Feb 21, 2020
74116e5
update docs
fitzoh Feb 21, 2020
e5f70fb
FormatInt -> Itoa
fitzoh Feb 25, 2020
ec7ebf6
update flags to clarify that parrot doesn't control the number of par…
fitzoh Feb 25, 2020
d18681f
update command name
fitzoh Feb 25, 2020
62a9a73
declare counters once
fitzoh Feb 26, 2020
4b4fb0f
simplify partition parsing logic
fitzoh Feb 26, 2020
1080059
tweak partition suffix generation function
fitzoh Feb 26, 2020
1a498fc
artificial -> test
fitzoh Feb 26, 2020
af6b666
bump default test interval 5s -> 10s + update docs
fitzoh Feb 26, 2020
f330794
gateway -> publisher
fitzoh Feb 26, 2020
69e110b
fix lastSeen tracking
fitzoh Feb 26, 2020
755f474
limit number of error types tracked
fitzoh Feb 26, 2020
d967261
initialize metrics up front
fitzoh Feb 27, 2020
14187eb
use aligned lossless tick
fitzoh Feb 27, 2020
c19b304
bring decode error back
fitzoh Feb 27, 2020
86c7cff
refactor, pull series metrics gathering into a function
fitzoh Feb 28, 2020
6ccddd9
add and validate parameter to control how far back we query
fitzoh Mar 3, 2020
0b0c9ce
break out if we encounter an error
fitzoh Mar 3, 2020
713927e
Add additional validation for correct number of points that are corre…
fitzoh Mar 3, 2020
362f914
set lag based on last publish time
fitzoh Mar 3, 2020
ab338f2
fix validation of lookback parameter
fitzoh Mar 3, 2020
d06aa84
actually export metrics for new stats
fitzoh Mar 3, 2020
c57c5af
update docs
fitzoh Mar 4, 2020
0a7553e
clearer desc
Dieterbe Mar 11, 2020
679eabc
initialize stats and manually set reporting timestamps
fitzoh Mar 17, 2020
26e2b19
load atomically
fitzoh Mar 17, 2020
f9d103b
update metric name
fitzoh Mar 17, 2020
6ce3660
update log
fitzoh Mar 17, 2020
496a4cb
use tag constructor for tag based metrics
fitzoh Mar 17, 2020
b5771cb
rename variables schema -> metric
fitzoh Mar 17, 2020
23a250b
ensure that durations are multiples of 1s
fitzoh Mar 17, 2020
b2612af
disable metrics2docs for parrot
fitzoh Mar 27, 2020
df72b2a
make checkSpacing simpler
Dieterbe Mar 12, 2020
3cb22c5
clearer description
Dieterbe Mar 30, 2020
d8c46dd
remove needless clause
Dieterbe Mar 30, 2020
a03fda3
functions that can force quit the program should be called must*
Dieterbe Mar 30, 2020
08e2d4b
gofmt import paths
Dieterbe Mar 30, 2020
07a9d1a
consistency between series and partition stats
Dieterbe Mar 30, 2020
9ab9906
need a lossy ticker for the checker routine
Dieterbe Mar 30, 2020
2095f9f
query response is a response
Dieterbe Mar 30, 2020
9281a5a
fix off-by-one's
Dieterbe Mar 30, 2020
2cb97a6
make alignForward/alignBackward functions reusable
Dieterbe Mar 30, 2020
4f3518f
fix check of last timestamp. needs to be aligned
Dieterbe Mar 30, 2020
bb73e5e
clean up code by using constructor
Dieterbe Mar 30, 2020
f88aad6
remove bogus clause
Dieterbe Mar 30, 2020
4a53d22
consistent parrot.monitoring.error metrics: once per response
Dieterbe Mar 30, 2020
10a2304
don't round deltaSum down. we need to know every difference
Dieterbe Mar 30, 2020
1045f17
simplify stats setting
Dieterbe Mar 30, 2020
7a1671e
fix invalid check.
Dieterbe Mar 30, 2020
200b0c0
make it easy to monitor for correctness
Dieterbe Mar 30, 2020
e8b845c
fix lastPoint and num points check
Dieterbe Mar 30, 2020
5c3aace
scripts/dev/tools-to-doc.sh > docs/tools.md
Dieterbe Apr 6, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
/cmd/mt-kafka-mdm-sniff/mt-kafka-mdm-sniff
/cmd/mt-kafka-persist-sniff/mt-kafka-persist-sniff
/cmd/mt-keygen/mt-keygen
/cmd/mt-parrot/mt-parrot
/cmd/mt-schemas-explain/mt-schemas-explain
/cmd/mt-split-metrics-by-ttl/mt-split-metrics-by-ttl
/cmd/mt-store-cat/mt-store-cat
Expand Down
1 change: 1 addition & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 7 additions & 20 deletions api/dataprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/grafana/metrictank/expr"
"github.com/grafana/metrictank/util/align"

"github.com/grafana/metrictank/api/models"
"github.com/grafana/metrictank/consolidation"
Expand Down Expand Up @@ -53,20 +54,6 @@ type getTargetsResp struct {
err error
}

// alignForward aligns ts to the next timestamp that divides by the interval, except if it is already aligned
func alignForward(ts, interval uint32) uint32 {
remain := ts % interval
if remain == 0 {
return ts
}
return ts + interval - remain
}

// alignBackward aligns the ts to the previous ts that divides by the interval, even if it is already aligned
func alignBackward(ts uint32, interval uint32) uint32 {
return ts - ((ts-1)%interval + 1)
}

// Fix assures a series is in quantized form:
// all points are nicely aligned (quantized) and padded with nulls in case there's gaps in data
// graphite does this quantization before storing, we may want to do that as well at some point
Expand All @@ -75,8 +62,8 @@ func alignBackward(ts uint32, interval uint32) uint32 {
// values to earlier in time.
func Fix(in []schema.Point, from, to, interval uint32) []schema.Point {

first := alignForward(from, interval)
last := alignBackward(to, interval)
first := align.ForwardIfNotAligned(from, interval)
last := align.Backward(to, interval)

if last < first {
// the requested range is too narrow for the requested interval
Expand Down Expand Up @@ -740,8 +727,8 @@ func newRequestContext(ctx context.Context, req *models.Req, consolidator consol
// so the caller can just compare rc.From and rc.To and if equal, immediately return [] to the client.

if req.Archive == 0 {
rc.From = alignBackward(req.From, req.ArchInterval) + 1
rc.To = alignBackward(req.To, req.ArchInterval) + 1
rc.From = align.Backward(req.From, req.ArchInterval) + 1
rc.To = align.Backward(req.To, req.ArchInterval) + 1
rc.AMKey = schema.AMKey{MKey: req.MKey}
} else {
rc.From = req.From
Expand All @@ -766,7 +753,7 @@ func newRequestContext(ctx context.Context, req *models.Req, consolidator consol
// but because we eventually want to consolidate into a point with ts=60, we also need the points that will be fix-adjusted to 40 and 50.
// so From needs to be lowered by 20 to become 35 (or 31 if adjusted).

boundary := alignForward(rc.From, req.OutInterval)
boundary := align.ForwardIfNotAligned(rc.From, req.OutInterval)
rewind := req.AggNum * req.ArchInterval
if boundary < rewind {
panic(fmt.Sprintf("Cannot rewind far back enough (trying to rewind by %d from timestamp %d)", rewind, boundary))
Expand All @@ -792,7 +779,7 @@ func newRequestContext(ctx context.Context, req *models.Req, consolidator consol
// 240 - 211 - ...,210 - 210 - 210 - 210
//*240->231 - 211 - ...,210 - 210 - 210 - 210

rc.To = alignBackward(rc.To, req.OutInterval) + 1
rc.To = align.Backward(rc.To, req.OutInterval) + 1
}

return &rc
Expand Down
46 changes: 0 additions & 46 deletions api/dataprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,52 +306,6 @@ func TestFix(t *testing.T) {

}

type pbCase struct {
ts uint32
span uint32
boundary uint32
}

func TestAlignForward(t *testing.T) {
cases := []pbCase{
{1, 60, 60},
{2, 60, 60},
{3, 60, 60},
{57, 60, 60},
{58, 60, 60},
{59, 60, 60},
{60, 60, 60},
{61, 60, 120},
{62, 60, 120},
{63, 60, 120},
}
for _, c := range cases {
if ret := alignForward(c.ts, c.span); ret != c.boundary {
t.Fatalf("alignBackward for ts %d with span %d should be %d, not %d", c.ts, c.span, c.boundary, ret)
}
}
}

func TestAlignBackward(t *testing.T) {
cases := []pbCase{
{1, 60, 0},
{2, 60, 0},
{3, 60, 0},
{57, 60, 0},
{58, 60, 0},
{59, 60, 0},
{60, 60, 0},
{61, 60, 60},
{62, 60, 60},
{63, 60, 60},
}
for _, c := range cases {
if ret := alignBackward(c.ts, c.span); ret != c.boundary {
t.Fatalf("alignBackward for ts %d with span %d should be %d, not %d", c.ts, c.span, c.boundary, ret)
}
}
}

// TestGetSeriesFixed assures that series data is returned in proper form.
// for each case, we generate a new series of 5 points to cover every possible combination of:
// * every possible data offset (against its quantized version) e.g. offset between 0 and interval-1
Expand Down
63 changes: 63 additions & 0 deletions cmd/mt-parrot/generate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package main

import (
"fmt"
"github.com/grafana/metrictank/clock"
"github.com/grafana/metrictank/schema"
log "github.com/sirupsen/logrus"
"sync/atomic"
)

// produceTestMetrics stamps every metric with the aligned tick timestamp
// (both as Time and as Value, so responses can be validated against the
// timestamp itself), flushes the batch to the publisher, and records the
// publish time for lag tracking. Runs forever on the lossless ticker.
func produceTestMetrics(metrics []*schema.MetricData) {
	for tick := range clock.AlignedTickLossless(testMetricsInterval) {
		ts := tick.Unix()
		for i := range metrics {
			metrics[i].Time = ts
			metrics[i].Value = float64(ts)
		}
		publisher.Flush(metrics)
		// lastPublish is read concurrently by the checker; store atomically.
		atomic.StoreInt64(&lastPublish, ts)
		log.Infof("flushed metrics for ts %d", ts)
	}
}

// generateMetrics generates one MetricData per partition, such that metric i
// hashes to partition i. Returns nil when numPartitions is not positive.
func generateMetrics(numPartitions int32) []*schema.MetricData {
	if numPartitions <= 0 {
		return nil
	}
	// pre-size: we know exactly how many metrics we will produce
	metrics := make([]*schema.MetricData, 0, numPartitions)
	for i := int32(0); i < numPartitions; i++ {
		metrics = append(metrics, generateMetric(i))
	}
	return metrics
}

// generateMetric generates a single MetricData that hashes to the given
// partition, by brute-forcing deterministic name suffixes until one lands
// on desiredPartition. Exits the process if partition hashing fails.
func generateMetric(desiredPartition int32) *schema.MetricData {
	metric := schema.MetricData{
		OrgId:    orgId,
		Unit:     "partyparrots",
		Mtype:    "gauge",
		Interval: int(testMetricsInterval.Seconds()),
	}

	// no condition: the loop only exits via the return below (or log.Fatal),
	// which also removes the unreachable `return nil` the old form required
	for i := 1; ; i++ {
		metric.Name = fmt.Sprintf("parrot.testdata.%d.identity.%s", desiredPartition, generatePartitionSuffix(i))
		id, err := metric.PartitionID(partitionMethod, partitionCount)
		if err != nil {
			log.Fatal(err)
		}
		if id == desiredPartition {
			log.Infof("metric for partition %d: %s", desiredPartition, metric.Name)
			return &metric
		}
	}
}

var alphabet = []rune("abcdefghijklmnopqrstuvwxyz")

// generatePartitionSuffix deterministically maps i to a lowercase suffix
// ("a".."z", then "aa", "ab", ...), bijective-base-26 style. Built
// iteratively, least-significant letter first, prepending as we go.
func generatePartitionSuffix(i int) string {
	suffix := ""
	for {
		suffix = string(alphabet[i%26]) + suffix
		if i <= 25 {
			return suffix
		}
		i = i/26 - 1
	}
}
144 changes: 144 additions & 0 deletions cmd/mt-parrot/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package main

import (
"os"
"strings"
"time"

"github.com/grafana/metrictank/cmd/mt-fakemetrics/out"
"github.com/grafana/metrictank/cmd/mt-fakemetrics/out/gnet"
"github.com/grafana/metrictank/logger"
"github.com/grafana/metrictank/schema"
"github.com/grafana/metrictank/stats"
"github.com/raintank/met/statsd"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)

var (
	// gateway / test-metric configuration, populated from CLI flags in init()
	gatewayAddress        string
	gatewayKey            string
	orgId                 int
	partitionCount        int32
	partitionMethodString string
	testMetricsInterval   time.Duration
	queryInterval         time.Duration
	lookbackPeriod        time.Duration
	logLevel              string
	// lastPublish is the unix timestamp of the most recent metric flush;
	// written with atomic.StoreInt64 by the producer goroutine
	lastPublish int64

	// graphite stats reporting configuration
	statsGraphite   *stats.Graphite
	statsPrefix     string
	statsAddr       string
	statsBufferSize int
	statsTimeout    time.Duration

	// derived at startup from the flag values above
	partitionMethod schema.PartitionByMethod
	publisher       out.Out
)

func init() {
parrotCmd.Flags().StringVar(&gatewayAddress, "gateway-address", "http://localhost:6059", "the url of the metrics gateway")
parrotCmd.Flags().StringVar(&gatewayKey, "gateway-key", "", "the bearer token to include with gateway requests")
parrotCmd.Flags().IntVar(&orgId, "org-id", 1, "org id to publish parrot metrics to")
parrotCmd.Flags().Int32Var(&partitionCount, "partition-count", 8, "number of kafka partitions in use")
parrotCmd.Flags().StringVar(&partitionMethodString, "partition-method", "bySeries", "the partition method in use on the gateway, must be one of bySeries|bySeriesWithTags|bySeriesWithTagsFnv")
parrotCmd.Flags().DurationVar(&testMetricsInterval, "test-metrics-interval", 10*time.Second, "interval to send test metrics")
Dieterbe marked this conversation as resolved.
Show resolved Hide resolved
parrotCmd.Flags().DurationVar(&queryInterval, "query-interval", 10*time.Second, "interval to query to validate metrics")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need to check that these intervals are multiples of time.Second

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

parrotCmd.Flags().DurationVar(&lookbackPeriod, "lookback-period", 5*time.Minute, "how far to look back when validating metrics")
parrotCmd.Flags().StringVar(&statsPrefix, "stats-prefix", "", "stats prefix (will add trailing dot automatically if needed)")
parrotCmd.Flags().StringVar(&statsAddr, "stats-address", "localhost:2003", "address to send monitoring statistics to")
parrotCmd.Flags().IntVar(&statsBufferSize, "stats-buffer-size", 20000, "how many messages (holding all measurements from one interval) to buffer up in case graphite endpoint is unavailable.")
parrotCmd.Flags().DurationVar(&statsTimeout, "stats-timeout", time.Second*10, "timeout after which a write is considered not successful")

parrotCmd.Flags().StringVar(&logLevel, "log-level", "info", "log level. panic|fatal|error|warning|info|debug")

formatter := &logger.TextFormatter{}
formatter.TimestampFormat = "2006-01-02 15:04:05.000"
log.SetFormatter(formatter)
}

// main runs the parrot command and exits non-zero on failure.
func main() {
	if err := parrotCmd.Execute(); err != nil {
		log.Fatal(err)
	}
}

// parrotCmd is the root cobra command: it wires up configuration, starts the
// test-metric producer goroutine, and then blocks in the monitor loop.
var parrotCmd = &cobra.Command{
	Use: "mt-parrot",
	Short: `generate deterministic metrics for each metrictank partition, query them back and report on correctness and performance
Correctness:
Monitor the parrot.monitoring.error series. There's 3 potential issues:
* parrot.monitoring.error;error=http // could not execute http request
* parrot.monitoring.error;error=decode // could not decode http response
* parrot.monitoring.error;error=invalid // any other problem with the response itself
Performance:
In addition to these black-and-white measurements above, there are also more subjective measurements
* parrot.monitoring.lag // how far the response is lagging behind
* parrot.monitoring.nans // number of nans included in the response
`,
	// Run validates flags, initializes the publisher and stats reporter,
	// launches the producer goroutine, then runs the (blocking) monitor loop.
	Run: func(cmd *cobra.Command, args []string) {
		lvl, err := log.ParseLevel(logLevel)
		if err != nil {
			log.Fatalf("failed to parse log-level, %s", err.Error())
		}

		// all intervals must be whole seconds; exits the process otherwise
		validateDurationsInSeconds()

		log.SetLevel(lvl)
		mustParsePartitionMethod()
		initGateway()
		initStats()

		metrics := generateMetrics(partitionCount)
		go produceTestMetrics(metrics)

		monitor()
	},
}

// mustParsePartitionMethod parses the partitionScheme cli flag,
// exiting if an invalid partition schema is entered or if org based
// partitioning is used (not currently allowed by parrot).
// Note: PartitonMethodFromString is the upstream schema package's spelling.
func mustParsePartitionMethod() {
	method, err := schema.PartitonMethodFromString(partitionMethodString)
	if err != nil {
		log.Fatal(err)
	}
	if method == schema.PartitionByOrg {
		log.Fatal("byOrg not supported")
	}
	partitionMethod = method
}

// initGateway constructs the gnet publisher that pushes test metrics to the
// gateway's /metrics endpoint. Exits the process if construction fails.
func initGateway() {
	var err error
	// error deliberately ignored: first arg false presumably creates a
	// disabled/no-op statsd backend for gnet — NOTE(review): confirm
	backend, _ := statsd.New(false, "", "")
	publisher, err = gnet.New(gatewayAddress+"/metrics", gatewayKey, backend)
	if err != nil {
		log.Fatal(err)
	}
	log.Info("gateway initialized")
}

// initStats creates the graphite reporter used for parrot's own monitoring
// series, expanding "$hostname" in the prefix to the sanitized hostname.
func initStats() {
	hostname, _ := os.Hostname()
	sanitized := strings.Replace(hostname, ".", "_", -1)
	prefix := strings.Replace(statsPrefix, "$hostname", sanitized, -1)
	// negative interval: report timestamps are set manually by the checker
	statsGraphite = stats.NewGraphite(prefix, statsAddr, -1, statsBufferSize, statsTimeout)
}

// validateDurationsInSeconds exits the program unless every interval flag
// is a whole multiple of one second.
func validateDurationsInSeconds() {
	checks := []struct {
		flag string
		d    time.Duration
	}{
		{"test-metrics-interval", testMetricsInterval},
		{"query-interval", queryInterval},
		{"lookback-period", lookbackPeriod},
	}
	for _, c := range checks {
		if c.d%time.Second != 0 {
			log.Fatal(c.flag + " must be in seconds")
		}
	}
}
Loading