Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Better mt store cat #590

Merged
merged 8 commits into from
Apr 5, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 20 additions & 77 deletions cmd/mt-store-cat/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,94 +2,37 @@ package main

import (
"fmt"
"sort"
"strconv"
"strings"
"time"

log "github.com/Sirupsen/logrus"
"github.com/raintank/metrictank/mdata"
)

type byTTL []string
func chunkSummary(store *mdata.CassandraStore, tables []string, metrics []Metric, keyspace string, roundTTL int) error {
now := uint32(time.Now().Unix())
end_month := now - (now % mdata.Month_sec)

func (t byTTL) Len() int { return len(t) }
func (t byTTL) Swap(i, j int) { t[i], t[j] = t[j], t[i] }
func (t byTTL) Less(i, j int) bool {
iTTL, _ := strconv.Atoi(strings.Split(t[i], "_")[1])
jTTL, _ := strconv.Atoi(strings.Split(t[j], "_")[1])
return iTTL < jTTL
}

func Dump(keyspace, prefix string, store *mdata.CassandraStore, roundTTL int) error {
targets := make(map[string]string)

if prefix == "" {
fmt.Println("# Looking for ALL metrics")
} else {
fmt.Println("# Looking for these metrics:")
iter := store.Session.Query("select id,metric from metric_idx").Iter()
var id, metric string
for iter.Scan(&id, &metric) {
if strings.HasPrefix(metric, prefix) {
fmt.Println(id, metric)
targets[id] = metric
}
}
err := iter.Close()
if err != nil {
log.Error(3, "cassandra query error. %s", err)
return err
}
}

fmt.Printf("# Keyspace %q contents:\n", keyspace)

meta, err := store.Session.KeyspaceMetadata(keyspace)
if err != nil {
return err
}
var tables []string
for tbl := range meta.Tables {
if tbl == "metric_idx" || !strings.HasPrefix(tbl, "metric_") {
continue
}
tables = append(tables, tbl)
}

sort.Sort(byTTL(tables))
for _, tbl := range tables {
query := fmt.Sprintf("select key, ttl(data) from %s", tbl)
// ret := c.session.Query(query, row_key, t0, data).Exec()
// actual TTL may be up to 2x what's in tablename. see mdata/store_cassandra.go for details
// we query up to 4x so that we also include data that should have been dropped already but still sticks around for whatever reason.
TTLHours, _ := strconv.Atoi(strings.Split(tbl, "_")[1])
start := now - uint32(4*TTLHours)
start_month := start - (start % mdata.Month_sec)
fmt.Println("## Table", tbl)
iter := store.Session.Query(query).Iter()
var row, prevRow string
var ttl, prevTTL, cnt int
for iter.Scan(&row, &ttl) {
rowParts := strings.Split(row, "_")
if prefix != "" {
_, ok := targets[rowParts[0]]
if !ok {
continue
if len(metrics) == 0 {
query := fmt.Sprintf("select key, ttl(data) from %s", tbl)
iter := store.Session.Query(query).Iter()
showKeyTTL(iter, roundTTL)
} else {
for _, metric := range metrics {
for month := start_month; month <= end_month; month++ {
row_key := fmt.Sprintf("%s_%d", metric.id, month/mdata.Month_sec)
query := fmt.Sprintf("select key, ttl(data) from %s where key=?", tbl)
iter := store.Session.Query(query, row_key).Iter()
showKeyTTL(iter, roundTTL)
}
}
ttl = ttl / roundTTL
if ttl == prevTTL && row == prevRow {
cnt += 1
} else {
if prevRow != "" && prevTTL != 0 {
fmt.Println(prevRow, prevTTL, cnt)
}
cnt = 0
prevTTL = ttl
prevRow = row
}
}
if cnt != 0 {
fmt.Println(prevRow, prevTTL, cnt)
}
err := iter.Close()
if err != nil {
log.Error(3, "cassandra query error. %s", err)
}
}
return nil
Expand Down
170 changes: 104 additions & 66 deletions cmd/mt-store-cat/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"fmt"
"os"
"runtime"
"strconv"
"strings"
"time"

log "github.com/Sirupsen/logrus"
Expand Down Expand Up @@ -46,12 +46,12 @@ var (
cassandraPassword = flag.String("cassandra-password", "cassandra", "password for authentication")

// our own flags
from = flag.String("from", "-24h", "get data from (inclusive)")
to = flag.String("to", "now", "get data until (exclusive)")
mdp = flag.Int("mdp", 0, "max data points to return")
fix = flag.Int("fix", 0, "fix data to this interval like metrictank does quantization")
from = flag.String("from", "-24h", "get data from (inclusive). only for points and point-summary format")
to = flag.String("to", "now", "get data until (exclusive). only for points and point-summary format")
fix = flag.Int("fix", 0, "fix data to this interval like metrictank does quantization. only for points and point-summary format")
printTs = flag.Bool("print-ts", false, "print time stamps instead of formatted dates. only for points and point-summary format")
roundTTL = flag.Int("roundTTL", 3600, "group chunks in buckets based on rounded TTL with this modulo. only for chunk-summary format")
windowFactor = flag.Int("window-factor", 20, "the window factor to be used when creating the metric table schema")
printTs = flag.Bool("print-ts", false, "print time stamps instead of formatted dates")
)

func main() {
Expand All @@ -61,75 +61,57 @@ func main() {
fmt.Println("Retrieves timeseries data from the cassandra store. Either raw or with minimal processing")
fmt.Println()
fmt.Println("Usage:")
fmt.Printf(" mt-store-cat [flags] <normal|summary> id <metric-id> <ttl>\n")
fmt.Printf(" mt-store-cat [flags] <normal|summary> query <org-id> <graphite query> (not supported yet)\n")
fmt.Printf(" mt-store-cat [flags] <normal|summary> full [roundTTL. defaults to 3600] [prefix match]\n")
fmt.Println()
fmt.Printf(" mt-store-cat [flags] tables\n")
fmt.Println()
fmt.Printf(" mt-store-cat [flags] <table-selector> <metric-selector> <format>\n")
fmt.Printf(" table-selector: '*' or name of a table. e.g. 'metric_128'\n")
fmt.Printf(" metric-selector: '*' or an id or prefix:<prefix>\n")
fmt.Printf(" format:\n")
fmt.Printf(" - points\n")
fmt.Printf(" - point-summary\n")
fmt.Printf(" - chunk-summary (shows TTL's in seconds, subject to roundTTL)\n")
fmt.Println()
fmt.Println("EXAMPLES:")
fmt.Println("mt-store-cat -cassandra-keyspace metrictank -from='-1min' '*' '1.77c8c77afa22b67ef5b700c2a2b88d5f' points")
fmt.Println("mt-store-cat -cassandra-keyspace metrictank -from='-1month' '*' 'prefix:fake' point-summary")
fmt.Println("mt-store-cat -cassandra-keyspace metrictank '*' 'prefix:fake' chunk-summary")
fmt.Println("mt-store-cat -roundTTL 1000000 -cassandra-keyspace metrictank 'metric_512' '1.37cf8e3731ee4c79063c1d55280d1bbe' chunk-summary")
fmt.Println("Flags:")
flag.PrintDefaults()
fmt.Println("Notes:")
fmt.Println(" * Using `*` as metric-selector may bring down your cassandra. Especially chunk-summary ignores from/to and queries all data.")
fmt.Println(" With great power comes great responsibility")
fmt.Println(" * points that are not in the `from <= ts < to` range, are prefixed with `-`. In range has prefix of `>`")
fmt.Println(" * When using chunk-summary, if there's data that should have been expired by cassandra, but for some reason didn't, we won't see or report it")
}
flag.Parse()

if *showVersion {
fmt.Printf("mt-store-cat (built with %s, git hash %s)\n", runtime.Version(), GitHash)
return
}
if flag.NArg() < 2 {
if flag.NArg() < 1 {
flag.Usage()
os.Exit(-1)
}

selector := flag.Arg(1)
var id string
var ttl uint32
var roundTTL = 3600
var prefix string
// var query string
// var org int

switch selector {
case "id":
if flag.NArg() < 4 {
var tableSelector, metricSelector, format string
tableSelector = flag.Arg(0)
if tableSelector != "tables" {
if flag.NArg() < 3 {
flag.Usage()
os.Exit(-1)
}

id = flag.Arg(2)
ttl = dur.MustParseUNsec("ttl", flag.Arg(3))
case "full":
if flag.NArg() >= 3 {
var err error
roundTTL, err = strconv.Atoi(flag.Arg(2))
if err != nil {
flag.Usage()
os.Exit(-1)
}
}
if flag.NArg() == 4 {
prefix = flag.Arg(3)
}
if flag.NArg() > 4 {
metricSelector = flag.Arg(1)
format = flag.Arg(2)
if format != "points" && format != "point-summary" && format != "chunk-summary" {
flag.Usage()
os.Exit(-1)
}
if metricSelector == "prefix:" {
log.Fatal("prefix cannot be empty")
}

case "query":
// if flag.NArg() < 4 {
// flag.Usage()
// os.Exit(-1)
// }
// org64, err := strconv.ParseInt(flag.Arg(3), 10, 32)
// if err != nil {
// flag.Usage()
// os.Exit(-1)
// }
// org = int(org64)
// query = flag.Arg(4)
panic("sorry, queries not supported yet")
default:
flag.Usage()
os.Exit(-1)
}

// Only try and parse the conf file if it exists
Expand All @@ -153,33 +135,89 @@ func main() {
return
}

mode := flag.Arg(0)
if mode != "normal" && mode != "summary" {
panic("unsupported mode " + mode)
store, err := mdata.NewCassandraStore(*cassandraAddrs, *cassandraKeyspace, *cassandraConsistency, *cassandraCaPath, *cassandraUsername, *cassandraPassword, *cassandraHostSelectionPolicy, *cassandraTimeout, *cassandraReadConcurrency, *cassandraReadConcurrency, *cassandraReadQueueSize, 0, *cassandraRetries, *cqlProtocolVersion, *windowFactor, *cassandraSSL, *cassandraAuth, *cassandraHostVerification, nil)
if err != nil {
log.Fatal(4, "failed to initialize cassandra. %s", err)
}

store, err := mdata.NewCassandraStore(*cassandraAddrs, *cassandraKeyspace, *cassandraConsistency, *cassandraCaPath, *cassandraUsername, *cassandraPassword, *cassandraHostSelectionPolicy, *cassandraTimeout, *cassandraReadConcurrency, *cassandraReadConcurrency, *cassandraReadQueueSize, 0, *cassandraRetries, *cqlProtocolVersion, *windowFactor, *cassandraSSL, *cassandraAuth, *cassandraHostVerification, []uint32{ttl})
if tableSelector == "tables" {
tables, err := getTables(store, *cassandraKeyspace, "")
if err != nil {
log.Fatal(4, "%s", err)
}
for _, tbl := range tables {
fmt.Println(tbl)
}
return
}
tables, err := getTables(store, *cassandraKeyspace, tableSelector)
if err != nil {
log.Fatal(4, "failed to initialize cassandra. %s", err)
log.Fatal(4, "%s", err)
}

switch selector {
case "id":
var fromUnix, toUnix uint32

if format == "points" || format == "point-summary" {
now := time.Now()
defaultFrom := uint32(now.Add(-time.Duration(24) * time.Hour).Unix())
defaultTo := uint32(now.Add(time.Duration(1) * time.Second).Unix())

fromUnix, err := dur.ParseTSpec(*from, now, defaultFrom)
fromUnix, err = dur.ParseTSpec(*from, now, defaultFrom)
if err != nil {
log.Fatal(err)
}

toUnix, err := dur.ParseTSpec(*to, now, defaultTo)
toUnix, err = dur.ParseTSpec(*to, now, defaultTo)
if err != nil {
log.Fatal(err)
}
catId(id, ttl, fromUnix, toUnix, uint32(*fix), mode, store)
case "full":
Dump(*cassandraKeyspace, prefix, store, roundTTL)
}
var metrics []Metric
if metricSelector == "*" {
fmt.Println("# Looking for ALL metrics")
// chunk-summary doesn't need an explicit listing: when metrics is empty, it queries all rows.
// the other two formats do need an explicit listing.
if format == "points" || format == "point-summary" {
metrics, err = getMetrics(store, "")
if err != nil {
log.Error(3, "cassandra query error. %s", err)
return
}
}
} else if strings.HasPrefix(metricSelector, "prefix:") {
fmt.Println("# Looking for these metrics:")
metrics, err = getMetrics(store, strings.Replace(metricSelector, "prefix:", "", 1))
if err != nil {
log.Error(3, "cassandra query error. %s", err)
return
}
for _, m := range metrics {
fmt.Println(m.id, m.name)
}
} else {
fmt.Println("# Looking for this metric:")
metrics, err = getMetric(store, metricSelector)
if err != nil {
log.Error(3, "cassandra query error. %s", err)
return
}
if len(metrics) == 0 {
fmt.Printf("metric id %q not found", metricSelector)
return
}
for _, m := range metrics {
fmt.Println(m.id, m.name)
}
}

fmt.Printf("# Keyspace %q:\n", *cassandraKeyspace)

switch format {
case "points":
points(store, tables, metrics, fromUnix, toUnix, uint32(*fix))
case "point-summary":
pointSummary(store, tables, metrics, fromUnix, toUnix, uint32(*fix))
case "chunk-summary":
chunkSummary(store, tables, metrics, *cassandraKeyspace, *roundTTL)
}
}
Loading