Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

report MetricPoint and MetricPointWithoutOrg separately #891

Merged
merged 3 commits into from
Apr 17, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/mt-kafka-mdm-sniff-out-of-order/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/raintank/worldping-api/pkg/log"
"github.com/rakyll/globalconf"
"gopkg.in/raintank/schema.v1"
"gopkg.in/raintank/schema.v1/msg"
)

var (
Expand Down Expand Up @@ -111,7 +112,7 @@ func (ip *inputOOOFinder) ProcessMetricData(metric *schema.MetricData, partition
ip.lock.Unlock()
}

func (ip *inputOOOFinder) ProcessMetricPoint(mp schema.MetricPoint, partition int32) {
func (ip *inputOOOFinder) ProcessMetricPoint(mp schema.MetricPoint, format msg.Format, partition int32) {
now := Msg{
Part: partition,
Seen: time.Now(),
Expand Down
3 changes: 2 additions & 1 deletion cmd/mt-kafka-mdm-sniff/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/raintank/worldping-api/pkg/log"
"github.com/rakyll/globalconf"
"gopkg.in/raintank/schema.v1"
"gopkg.in/raintank/schema.v1/msg"
)

var (
Expand Down Expand Up @@ -72,7 +73,7 @@ func (ip inputPrinter) ProcessMetricData(metric *schema.MetricData, partition in
}
}

func (ip inputPrinter) ProcessMetricPoint(point schema.MetricPoint, partition int32) {
func (ip inputPrinter) ProcessMetricPoint(point schema.MetricPoint, format msg.Format, partition int32) {
stdoutLock.Lock()
err := ip.tplP.Execute(os.Stdout, DataP{
partition,
Expand Down
17 changes: 13 additions & 4 deletions dashboard.json
Original file line number Diff line number Diff line change
Expand Up @@ -98,16 +98,25 @@
"yaxis": 2
},
{
"alias": "/received/",
"alias": "/metricdata.received/",
"lines": true,
"points": false,
"color": "#3f6833"
"color": "#2a4422",
"stack": "A"
},
{
"alias": "/metricpoint.received/",
"lines": true,
"points": false,
"color": "#7eb26d"
"color": "#3f6833",
"stack": "A"
},
{
"alias": "/metricpoint_no_org.received/",
"lines": true,
"points": false,
"color": "#7eb26d",
"stack": "A"
},
{
"alias": "/invalid/",
Expand Down Expand Up @@ -180,7 +189,7 @@
"msResolution": false,
"shared": true,
"sort": 0,
"value_type": "cumulative"
"value_type": "individual"
},
"type": "graph",
"xaxis": {
Expand Down
9 changes: 9 additions & 0 deletions docker/docker-dev-custom-cfg-kafka/gw/tsdb-gw.ini
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,15 @@ metrics-buffer-max-msgs = 100000
metrics-buffer-max-ms = 100
batch-num-messages = 10000

# enable optimized MetricPoint payload
v2 = true
# encode org-id in messages
v2-org = true
# expire keys (and resend MetricData if seen again) if not seen for this much time
v2-stale-thresh = 6h
# check interval for expiring keys
v2-prune-interval = 1h

graphite-url = http://metrictank:8080
metrictank-url = http://metrictank:6060

Expand Down
2 changes: 1 addition & 1 deletion docker/docker-dev-custom-cfg-kafka/metrictank.ini
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ partition = 0
[kafka-mdm-in]
enabled = true
# For incoming MetricPoint messages without org-id, assume this org id
org-id = 0
org-id = 1
# tcp address (may be given multiple times as a comma-separated list)
brokers = kafka:9092
# kafka topic (may be given multiple times as a comma-separated list)
Expand Down
29 changes: 24 additions & 5 deletions docker/extra/dashboards/tsdb-gw.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,7 @@
"panels": [
{
"alerting": {},
"aliasColors": {
"published.metricdata": "#2f575e",
"published.metricpoint": "#7eb26d"
},
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
Expand Down Expand Up @@ -87,7 +84,29 @@
"pointradius": 5,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"seriesOverrides": [
{
"alias": "/metricdata/",
"lines": true,
"points": false,
"color": "#2a4422",
"stack": "A"
},
{
"alias": "/metricpoint/",
"lines": true,
"points": false,
"color": "#3f6833",
"stack": "A"
},
{
"alias": "/metricpoint_no_org/",
"lines": true,
"points": false,
"color": "#7eb26d",
"stack": "A"
}
],
"spaceLength": 10,
"stack": true,
"steppedLine": false,
Expand Down
15 changes: 11 additions & 4 deletions input/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import (
"fmt"
"time"

schema "gopkg.in/raintank/schema.v1"
"gopkg.in/raintank/schema.v1"
"gopkg.in/raintank/schema.v1/msg"

"github.com/grafana/metrictank/idx"
"github.com/grafana/metrictank/mdata"
Expand All @@ -16,7 +17,7 @@ import (

type Handler interface {
ProcessMetricData(md *schema.MetricData, partition int32)
ProcessMetricPoint(point schema.MetricPoint, partition int32)
ProcessMetricPoint(point schema.MetricPoint, format msg.Format, partition int32)
}

// TODO: clever way to document all metrics for all different inputs
Expand All @@ -25,6 +26,7 @@ type Handler interface {
type DefaultHandler struct {
receivedMD *stats.Counter32
receivedMP *stats.Counter32
receivedMPNO *stats.Counter32
invalidMD *stats.Counter32
invalidMP *stats.Counter32
unknownMP *stats.Counter32
Expand All @@ -39,6 +41,7 @@ func NewDefaultHandler(metrics mdata.Metrics, metricIndex idx.MetricIndex, input
return DefaultHandler{
receivedMD: stats.NewCounter32(fmt.Sprintf("input.%s.metricdata.received", input)),
receivedMP: stats.NewCounter32(fmt.Sprintf("input.%s.metricpoint.received", input)),
receivedMPNO: stats.NewCounter32(fmt.Sprintf("input.%s.metricpoint_no_org.received", input)),
invalidMD: stats.NewCounter32(fmt.Sprintf("input.%s.metricdata.invalid", input)),
invalidMP: stats.NewCounter32(fmt.Sprintf("input.%s.metricpoint.invalid", input)),
unknownMP: stats.NewCounter32(fmt.Sprintf("input.%s.metricpoint.unknown", input)),
Expand All @@ -52,8 +55,12 @@ func NewDefaultHandler(metrics mdata.Metrics, metricIndex idx.MetricIndex, input

// ProcessMetricPoint updates the index if possible, and stores the data if we have an index entry
// concurrency-safe.
func (in DefaultHandler) ProcessMetricPoint(point schema.MetricPoint, partition int32) {
in.receivedMP.Inc()
func (in DefaultHandler) ProcessMetricPoint(point schema.MetricPoint, format msg.Format, partition int32) {
if format == msg.FormatMetricPoint {
in.receivedMP.Inc()
} else {
in.receivedMPNO.Inc()
}
if !point.Valid() {
in.invalidMP.Inc()
log.Debug("in: Invalid metric %v", point)
Expand Down
5 changes: 3 additions & 2 deletions input/kafkamdm/kafkamdm.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,14 +353,15 @@ func (k *KafkaMdm) consumePartition(topic string, partition int32, currentOffset
}

func (k *KafkaMdm) handleMsg(data []byte, partition int32) {
if msg.IsPointMsg(data) {
format, isPointMsg := msg.IsPointMsg(data)
if isPointMsg {
_, point, err := msg.ReadPointMsg(data, uint32(orgId))
if err != nil {
metricsDecodeErr.Inc()
log.Error(3, "kafka-mdm decode error, skipping message. %s", err)
return
}
k.Handler.ProcessMetricPoint(point, partition)
k.Handler.ProcessMetricPoint(point, format, partition)
return
}

Expand Down
12 changes: 9 additions & 3 deletions vendor/gopkg.in/raintank/schema.v1/msg/msg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.