Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
Merge pull request #1680 from grafana/parrot-init
Browse files Browse the repository at this point in the history
mt-parrot: continuous validation by sending dummy stats and querying them back
  • Loading branch information
Dieterbe authored Apr 6, 2020
2 parents d1c01c2 + 5c3aace commit a8b1091
Show file tree
Hide file tree
Showing 11 changed files with 530 additions and 77 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
/cmd/mt-kafka-mdm-sniff/mt-kafka-mdm-sniff
/cmd/mt-kafka-persist-sniff/mt-kafka-persist-sniff
/cmd/mt-keygen/mt-keygen
/cmd/mt-parrot/mt-parrot
/cmd/mt-schemas-explain/mt-schemas-explain
/cmd/mt-split-metrics-by-ttl/mt-split-metrics-by-ttl
/cmd/mt-store-cat/mt-store-cat
Expand Down
1 change: 1 addition & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 7 additions & 20 deletions api/dataprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/grafana/metrictank/expr"
"github.com/grafana/metrictank/util/align"

"github.com/grafana/metrictank/api/models"
"github.com/grafana/metrictank/cluster"
Expand Down Expand Up @@ -55,20 +56,6 @@ type getTargetsResp struct {
err error
}

// alignForward returns the smallest timestamp >= ts that is a multiple of
// interval. A ts that already sits on an interval boundary is returned as-is.
func alignForward(ts, interval uint32) uint32 {
	if offset := ts % interval; offset != 0 {
		return ts + (interval - offset)
	}
	return ts
}

// alignBackward returns the largest multiple of interval strictly below ts.
// Unlike alignForward, a ts that is already aligned still moves back a full interval.
func alignBackward(ts uint32, interval uint32) uint32 {
	prev := ts - 1
	return prev - prev%interval
}

// Fix assures a series is in quantized form:
// all points are nicely aligned (quantized) and padded with nulls in case there's gaps in data
// graphite does this quantization before storing, we may want to do that as well at some point
Expand All @@ -77,8 +64,8 @@ func alignBackward(ts uint32, interval uint32) uint32 {
// values to earlier in time.
func Fix(in []schema.Point, from, to, interval uint32) []schema.Point {

first := alignForward(from, interval)
last := alignBackward(to, interval)
first := align.ForwardIfNotAligned(from, interval)
last := align.Backward(to, interval)

if last < first {
// the requested range is too narrow for the requested interval
Expand Down Expand Up @@ -741,8 +728,8 @@ func newRequestContext(ctx context.Context, req *models.Req, consolidator consol
// so the caller can just compare rc.From and rc.To and if equal, immediately return [] to the client.

if req.Archive == 0 {
rc.From = alignBackward(req.From, req.ArchInterval) + 1
rc.To = alignBackward(req.To, req.ArchInterval) + 1
rc.From = align.Backward(req.From, req.ArchInterval) + 1
rc.To = align.Backward(req.To, req.ArchInterval) + 1
rc.AMKey = schema.AMKey{MKey: req.MKey}
} else {
rc.From = req.From
Expand All @@ -767,7 +754,7 @@ func newRequestContext(ctx context.Context, req *models.Req, consolidator consol
// but because we eventually want to consolidate into a point with ts=60, we also need the points that will be fix-adjusted to 40 and 50.
// so From needs to be lowered by 20 to become 35 (or 31 if adjusted).

boundary := alignForward(rc.From, req.OutInterval)
boundary := align.ForwardIfNotAligned(rc.From, req.OutInterval)
rewind := req.AggNum * req.ArchInterval
if boundary < rewind {
panic(fmt.Sprintf("Cannot rewind far back enough (trying to rewind by %d from timestamp %d)", rewind, boundary))
Expand All @@ -793,7 +780,7 @@ func newRequestContext(ctx context.Context, req *models.Req, consolidator consol
// 240 - 211 - ...,210 - 210 - 210 - 210
//*240->231 - 211 - ...,210 - 210 - 210 - 210

rc.To = alignBackward(rc.To, req.OutInterval) + 1
rc.To = align.Backward(rc.To, req.OutInterval) + 1
}

return &rc
Expand Down
46 changes: 0 additions & 46 deletions api/dataprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,52 +306,6 @@ func TestFix(t *testing.T) {

}

// pbCase is a table-test fixture for the boundary-alignment helpers:
// for a timestamp ts and an interval span, boundary is the expected
// aligned timestamp.
type pbCase struct {
	ts       uint32
	span     uint32
	boundary uint32
}

// TestAlignForward verifies that alignForward snaps timestamps up to the
// next interval boundary and leaves already-aligned timestamps untouched.
// Fix: the failure message previously said "alignBackward" (copy-paste
// error from TestAlignBackward), which would misdirect debugging.
func TestAlignForward(t *testing.T) {
	cases := []pbCase{
		{1, 60, 60},
		{2, 60, 60},
		{3, 60, 60},
		{57, 60, 60},
		{58, 60, 60},
		{59, 60, 60},
		{60, 60, 60},
		{61, 60, 120},
		{62, 60, 120},
		{63, 60, 120},
	}
	for _, c := range cases {
		if ret := alignForward(c.ts, c.span); ret != c.boundary {
			t.Fatalf("alignForward for ts %d with span %d should be %d, not %d", c.ts, c.span, c.boundary, ret)
		}
	}
}

// TestAlignBackward checks that alignBackward always moves to the previous
// interval boundary, including when the timestamp is already aligned.
func TestAlignBackward(t *testing.T) {
	for _, tc := range []pbCase{
		{1, 60, 0},
		{2, 60, 0},
		{3, 60, 0},
		{57, 60, 0},
		{58, 60, 0},
		{59, 60, 0},
		{60, 60, 0},
		{61, 60, 60},
		{62, 60, 60},
		{63, 60, 60},
	} {
		got := alignBackward(tc.ts, tc.span)
		if got != tc.boundary {
			t.Fatalf("alignBackward for ts %d with span %d should be %d, not %d", tc.ts, tc.span, tc.boundary, got)
		}
	}
}

// TestGetSeriesFixed assures that series data is returned in proper form.
// for each case, we generate a new series of 5 points to cover every possible combination of:
// * every possible data offset (against its quantized version) e.g. offset between 0 and interval-1
Expand Down
63 changes: 63 additions & 0 deletions cmd/mt-parrot/generate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package main

import (
"fmt"
"github.com/grafana/metrictank/clock"
"github.com/grafana/metrictank/schema"
log "github.com/sirupsen/logrus"
"sync/atomic"
)

// produceTestMetrics stamps every metric with each aligned tick's unix time
// (both as Time and as Value) and flushes the batch to the publisher,
// recording the publish timestamp for the monitor's lag measurement.
// It blocks forever, draining the lossless aligned ticker.
func produceTestMetrics(metrics []*schema.MetricData) {
	for tick := range clock.AlignedTickLossless(testMetricsInterval) {
		ts := tick.Unix()
		for _, m := range metrics {
			m.Time = ts
			m.Value = float64(ts)
		}
		publisher.Flush(metrics)
		atomic.StoreInt64(&lastPublish, ts)
		log.Infof("flushed metrics for ts %d", ts)
	}
}

// generateMetrics generates one MetricData per partition, each crafted so
// that it hashes onto its own partition (0..numPartitions-1).
func generateMetrics(numPartitions int32) []*schema.MetricData {
	// pre-size the slice: the final length is known up front, which avoids
	// repeated growth copies from append
	metrics := make([]*schema.MetricData, 0, numPartitions)
	for i := int32(0); i < numPartitions; i++ {
		metrics = append(metrics, generateMetric(i))
	}
	return metrics
}

// generateMetric brute-forces a metric name whose configured partitioning
// scheme maps onto desiredPartition, and returns the resulting MetricData.
// It exits the process if a partition id cannot be computed.
func generateMetric(desiredPartition int32) *schema.MetricData {
	metric := schema.MetricData{
		OrgId:    orgId,
		Unit:     "partyparrots",
		Mtype:    "gauge",
		Interval: int(testMetricsInterval.Seconds()),
	}

	// try deterministic candidate suffixes until one hashes to the target;
	// the condition-less loop only exits via the return below
	for attempt := 1; ; attempt++ {
		metric.Name = fmt.Sprintf("parrot.testdata.%d.identity.%s", desiredPartition, generatePartitionSuffix(attempt))
		id, err := metric.PartitionID(partitionMethod, partitionCount)
		if err != nil {
			log.Fatal(err)
		}
		if id == desiredPartition {
			log.Infof("metric for partition %d: %s", desiredPartition, metric.Name)
			return &metric
		}
	}
}

var alphabet = []rune("abcdefghijklmnopqrstuvwxyz")

// generatePartitionSuffix deterministically maps i to a lowercase string in
// bijective base-26 style: 0->"a", 25->"z", 26->"aa", 27->"ab", ...
func generatePartitionSuffix(i int) string {
	var out []rune
	for {
		out = append([]rune{alphabet[i%26]}, out...)
		if i <= 25 {
			break
		}
		i = i/26 - 1
	}
	return string(out)
}
144 changes: 144 additions & 0 deletions cmd/mt-parrot/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package main

import (
"os"
"strings"
"time"

"github.com/grafana/metrictank/cmd/mt-fakemetrics/out"
"github.com/grafana/metrictank/cmd/mt-fakemetrics/out/gnet"
"github.com/grafana/metrictank/logger"
"github.com/grafana/metrictank/schema"
"github.com/grafana/metrictank/stats"
"github.com/raintank/met/statsd"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)

// Package-level configuration (bound to cli flags in init) and shared
// runtime state for mt-parrot.
var (
	gatewayAddress        string
	gatewayKey            string
	orgId                 int
	partitionCount        int32
	partitionMethodString string
	testMetricsInterval   time.Duration
	queryInterval         time.Duration
	lookbackPeriod        time.Duration
	logLevel              string
	// lastPublish is the unix timestamp of the most recent metric flush;
	// written atomically by the producer goroutine, read by the monitor.
	lastPublish int64

	// reporting of parrot's own monitoring series to graphite
	statsGraphite   *stats.Graphite
	statsPrefix     string
	statsAddr       string
	statsBufferSize int
	statsTimeout    time.Duration

	// derived from the cli flags during startup
	partitionMethod schema.PartitionByMethod
	publisher       out.Out
)

// init registers all cli flags on the root command and configures the log
// formatter. Flag values are populated when parrotCmd.Execute parses argv.
func init() {
	parrotCmd.Flags().StringVar(&gatewayAddress, "gateway-address", "http://localhost:6059", "the url of the metrics gateway")
	parrotCmd.Flags().StringVar(&gatewayKey, "gateway-key", "", "the bearer token to include with gateway requests")
	parrotCmd.Flags().IntVar(&orgId, "org-id", 1, "org id to publish parrot metrics to")
	parrotCmd.Flags().Int32Var(&partitionCount, "partition-count", 8, "number of kafka partitions in use")
	parrotCmd.Flags().StringVar(&partitionMethodString, "partition-method", "bySeries", "the partition method in use on the gateway, must be one of bySeries|bySeriesWithTags|bySeriesWithTagsFnv")
	parrotCmd.Flags().DurationVar(&testMetricsInterval, "test-metrics-interval", 10*time.Second, "interval to send test metrics")
	parrotCmd.Flags().DurationVar(&queryInterval, "query-interval", 10*time.Second, "interval to query to validate metrics")
	parrotCmd.Flags().DurationVar(&lookbackPeriod, "lookback-period", 5*time.Minute, "how far to look back when validating metrics")
	parrotCmd.Flags().StringVar(&statsPrefix, "stats-prefix", "", "stats prefix (will add trailing dot automatically if needed)")
	parrotCmd.Flags().StringVar(&statsAddr, "stats-address", "localhost:2003", "address to send monitoring statistics to")
	parrotCmd.Flags().IntVar(&statsBufferSize, "stats-buffer-size", 20000, "how many messages (holding all measurements from one interval) to buffer up in case graphite endpoint is unavailable.")
	parrotCmd.Flags().DurationVar(&statsTimeout, "stats-timeout", time.Second*10, "timeout after which a write is considered not successful")

	parrotCmd.Flags().StringVar(&logLevel, "log-level", "info", "log level. panic|fatal|error|warning|info|debug")

	formatter := &logger.TextFormatter{}
	formatter.TimestampFormat = "2006-01-02 15:04:05.000"
	log.SetFormatter(formatter)
}

// main executes the parrot root command and exits fatally on error.
func main() {
	if err := parrotCmd.Execute(); err != nil {
		log.Fatal(err)
	}
}

// parrotCmd is the root cobra command. Run wires up partitioning, the
// gateway publisher and stats reporting, starts the background producer,
// and then blocks in the monitor loop.
var parrotCmd = &cobra.Command{
	Use: "mt-parrot",
	Short: `generate deterministic metrics for each metrictank partition, query them back and report on correctness and performance
Correctness:
Monitor the parrot.monitoring.error series. There's 3 potential issues:
* parrot.monitoring.error;error=http // could not execute http request
* parrot.monitoring.error;error=decode // could not decode http response
* parrot.monitoring.error;error=invalid // any other problem with the response itself
Performance:
In addition to these black-and-white measurements above, there are also more subjective measurements
* parrot.monitoring.lag // how far the response is lagging behind
* parrot.monitoring.nans // number of nans included in the response
`,
	Run: func(cmd *cobra.Command, args []string) {
		// validate the log level before anything else so misconfiguration fails fast
		lvl, err := log.ParseLevel(logLevel)
		if err != nil {
			log.Fatalf("failed to parse log-level, %s", err.Error())
		}

		validateDurationsInSeconds()

		log.SetLevel(lvl)
		mustParsePartitionMethod()
		initGateway()
		initStats()

		// publish test metrics in the background; monitor() blocks forever
		metrics := generateMetrics(partitionCount)
		go produceTestMetrics(metrics)

		monitor()
	},
}

// mustParsePartitionMethod parses the partition-method cli flag into the
// package-level partitionMethod, exiting the process on an invalid scheme
// or on org-based partitioning (which parrot does not support).
// Note: PartitonMethodFromString is the schema package's actual (misspelled) API name.
func mustParsePartitionMethod() {
	method, err := schema.PartitonMethodFromString(partitionMethodString)
	if err != nil {
		log.Fatal(err)
	}
	if method == schema.PartitionByOrg {
		log.Fatal("byOrg not supported")
	}
	partitionMethod = method
}

// initGateway builds the gnet publisher that sends the test metrics to the
// configured metrics gateway, exiting the process if the client cannot be
// constructed.
func initGateway() {
	var err error
	// NOTE(review): the statsd error is silently discarded; with enabled=false
	// this presumably cannot fail, but confirm against raintank/met/statsd.
	backend, _ := statsd.New(false, "", "")
	publisher, err = gnet.New(gatewayAddress+"/metrics", gatewayKey, backend)
	if err != nil {
		log.Fatal(err)
	}
	log.Info("gateway initialized")
}

// initStats creates the graphite reporter used for parrot's own monitoring
// series, substituting "$hostname" in the configured prefix (dots in the
// hostname are replaced with underscores to keep the graphite path intact).
func initStats() {
	// hostname error deliberately ignored: an empty hostname just yields an empty substitution
	hostname, _ := os.Hostname()
	prefix := strings.Replace(statsPrefix, "$hostname", strings.Replace(hostname, ".", "_", -1), -1)
	//need to use a negative interval so we can manually set the report timestamps
	statsGraphite = stats.NewGraphite(prefix, statsAddr, -1, statsBufferSize, statsTimeout)
}

// validateDurationsInSeconds exits the process if any configured duration
// has sub-second precision, which the tool does not support.
func validateDurationsInSeconds() {
	checks := []struct {
		d    time.Duration
		name string
	}{
		{testMetricsInterval, "test-metrics-interval"},
		{queryInterval, "query-interval"},
		{lookbackPeriod, "lookback-period"},
	}
	for _, c := range checks {
		if c.d%time.Second != 0 {
			log.Fatal(c.name + " must be in seconds")
		}
	}
}
Loading

0 comments on commit a8b1091

Please sign in to comment.