
Commit e40a6a0

use storage-schemas.conf and storage-aggregation.conf
to configure numchunks, chunkspans, ttls and aggregations.
For carbon it is also used to find out the raw interval.
For other input plugins we only honor the interval specified in the data.
Dieterbe committed Feb 28, 2017
1 parent f8e9991 commit e40a6a0
Showing 27 changed files with 153 additions and 289 deletions.
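
For reference, both files follow the Graphite storage-schemas.conf / storage-aggregation.conf conventions; the removed carbon-in code further down parsed the schemas file with go-carbon's persister.ReadWhisperSchemas. The sketch below only illustrates that base format — the section names, patterns and values are made up, and the exact retention syntax metrictank derives chunkspans, numchunks and ttls from may carry extra fields not shown here.

```
# storage-schemas.conf — illustrative sketch, not taken from this commit
[default]
pattern = .*
retentions = 10s:35d,5m:1y

# storage-aggregation.conf — illustrative sketch, not taken from this commit
[default]
pattern = .*
xFilesFactor = 0.5
aggregationMethod = average
```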
2 changes: 1 addition & 1 deletion api/dataprocessor_test.go
@@ -566,7 +566,7 @@ func TestGetSeriesFixed(t *testing.T) {
for to := uint32(31); to <= 40; to++ { // should always yield result with last point at 30 (because to is exclusive)
name := fmt.Sprintf("case.data.offset.%d.query:%d-%d", offset, from, to)

metric := metrics.GetOrCreate(name)
metric := metrics.GetOrCreate(name, name)
metric.Add(offset, 10) // this point will always be quantized to 10
metric.Add(10+offset, 20) // this point will always be quantized to 20, so it should be selected
metric.Add(20+offset, 30) // this point will always be quantized to 30, so it should be selected
2 changes: 1 addition & 1 deletion api/graphite.go
@@ -279,7 +279,7 @@ func (s *Server) renderMetrics(ctx *middleware.Context, request models.GraphiteR
ctx.Req.Method, from, to, len(request.Targets), request.MaxDataPoints)
}

reqs, err = alignRequests(uint32(time.Now().Unix()), reqs, s.MemoryStore.AggSettings())
reqs, err = alignRequests(uint32(time.Now().Unix()), reqs, s.MemoryStore.Schemas())
if err != nil {
log.Error(3, "HTTP Render alignReq error: %s", err)
response.Write(ctx, response.WrapError(err))
10 changes: 7 additions & 3 deletions api/query_engine.go
@@ -1,8 +1,8 @@
package api

import (
"github.com/lomik/go-carbon/persister"
"github.com/raintank/metrictank/api/models"
"github.com/raintank/metrictank/mdata"
"github.com/raintank/metrictank/stats"
"github.com/raintank/metrictank/util"
)
@@ -18,10 +18,10 @@ var (
)

// alignRequests updates the requests with all details for fetching, making sure all metrics are in the same, optimal interval
// luckily, all metrics still use the same aggSettings, making this a bit simpler
// note: it is assumed that all requests have the same from & to.
// also takes a "now" value which we compare the TTL against
func alignRequests(now uint32, reqs []models.Req, s mdata.AggSettings) ([]models.Req, error) {
func alignRequests(now uint32, reqs []models.Req, s persister.WhisperSchemas) ([]models.Req, error) {
// TODO figure out how to use schemas here

tsRange := (reqs[0].To - reqs[0].From)

@@ -65,6 +65,10 @@ func alignRequests(now uint32, reqs []models.Req, s mdata.AggSettings) ([]models
interval = s.Aggs[archive-1].Span
}

// then, make sure all series are at the same step, runtime consolidating as necessary
// note: we could have been able to just fetch a rollup to get the correct data.
// maybe for v2 of this.

/* we now just need to update the following properties for each req:
archive int // 0 means original data, 1 means first agg level, 2 means 2nd, etc.
archInterval uint32 // the interval corresponding to the archive we'll fetch
25 changes: 0 additions & 25 deletions docker/docker-cluster/metrictank.ini
@@ -10,16 +10,6 @@ accounting-period = 5min

# see https://github.com/raintank/metrictank/blob/master/docs/memory-server.md for more details

# duration of raw chunks. e.g. 10min, 30min, 1h, 90min...
# must be valid value as described here https://github.com/raintank/metrictank/blob/master/docs/memory-server.md#valid-chunk-spans
chunkspan = 10min
# number of raw chunks to keep in in-memory ring buffer
# See https://github.com/raintank/metrictank/blob/master/docs/memory-server.md for details and trade-offs, especially when compared to chunk-cache
# (settings further down) which may be a more effective method to cache data and alleviate workload for cassandra.
numchunks = 7
# minimum wait before raw metrics are removed from storage
ttl = 35d

# max age for a chunk before to be considered stale and to be persisted to Cassandra
chunk-max-stale = 1h
# max age for a metric before to be considered stale and to be purged from memory
@@ -32,18 +22,6 @@ gc-interval = 1h
# in clusters, best to assure the primary has saved all the data that a newly warmup instance will need to query, to prevent gaps in charts
warm-up-period = 1h

# settings for rollups (aggregation for archives)
# comma-separated list of archive specifications.
# archive specification is of the form: aggSpan:chunkSpan:numChunks:TTL[:ready as bool. default true]
# with these aggregation rules: 5min:1h:2:3mon,1h:6h:2:1y:false you get:
# - 5 min of data, store in a chunk that lasts 1hour, keep 2 chunks in in-memory ring buffer, keep for 3months in cassandra
# - 1hr worth of data, in chunks of 6 hours, 2 chunks in in-memory ring buffer, keep for 1 year, but this series is not ready yet for querying.
# When running a cluster of metrictank instances, all instances should have the same agg-settings.
# Note:
# * chunk spans must be valid values as described here https://github.com/raintank/metrictank/blob/master/docs/memory-server.md#valid-chunk-spans
# * numchunks -like the global setting- has nuanced use compared to chunk cache. see https://github.com/raintank/metrictank/blob/master/docs/memory-server.md
agg-settings =

## metric data storage in cassandra ##

# see https://github.com/raintank/metrictank/blob/master/docs/cassandra.md for more details
@@ -159,9 +137,6 @@ enabled = true
addr = :2003
# represents the "partition" of your data if you decide to partition your data.
partition = 0
# needed to know your raw resolution for your metrics. see http://graphite.readthedocs.io/en/latest/config-carbon.html#storage-schemas-conf
# NOTE: does NOT use aggregation and retention settings from this file. We use agg-settings and ttl for that.
schemas-file = /etc/raintank/storage-schemas.conf

### kafka-mdm input (optional, recommended)
[kafka-mdm-in]
2 changes: 2 additions & 0 deletions docker/docker-dev-custom-cfg-kafka/docker-compose.yml
@@ -10,6 +10,8 @@ services:
volumes:
- ../../build/metrictank:/usr/bin/metrictank
- ./metrictank.ini:/etc/raintank/metrictank.ini
- ./storage-schemas.conf:/etc/raintank/storage-schemas.conf
- ./storage-aggregation.conf:/etc/raintank/storage-aggregation.conf
environment:
WAIT_HOSTS: cassandra:9042,kafka:9092
WAIT_TIMEOUT: 60
25 changes: 0 additions & 25 deletions docker/docker-dev-custom-cfg-kafka/metrictank.ini
@@ -10,16 +10,6 @@ accounting-period = 5min

# see https://github.com/raintank/metrictank/blob/master/docs/memory-server.md for more details

# duration of raw chunks. e.g. 10min, 30min, 1h, 90min...
# must be valid value as described here https://github.com/raintank/metrictank/blob/master/docs/memory-server.md#valid-chunk-spans
chunkspan = 2min
# number of raw chunks to keep in in-memory ring buffer
# See https://github.com/raintank/metrictank/blob/master/docs/memory-server.md for details and trade-offs, especially when compared to chunk-cache
# (settings further down) which may be a more effective method to cache data and alleviate workload for cassandra.
numchunks = 2
# minimum wait before raw metrics are removed from storage
ttl = 35d

# max age for a chunk before to be considered stale and to be persisted to Cassandra
chunk-max-stale = 1h
# max age for a metric before to be considered stale and to be purged from memory
@@ -32,18 +22,6 @@ gc-interval = 1h
# in clusters, best to assure the primary has saved all the data that a newly warmup instance will need to query, to prevent gaps in charts
warm-up-period = 1h

# settings for rollups (aggregation for archives)
# comma-separated list of archive specifications.
# archive specification is of the form: aggSpan:chunkSpan:numChunks:TTL[:ready as bool. default true]
# with these aggregation rules: 5min:1h:2:3mon,1h:6h:2:1y:false you get:
# - 5 min of data, store in a chunk that lasts 1hour, keep 2 chunks in in-memory ring buffer, keep for 3months in cassandra
# - 1hr worth of data, in chunks of 6 hours, 2 chunks in in-memory ring buffer, keep for 1 year, but this series is not ready yet for querying.
# When running a cluster of metrictank instances, all instances should have the same agg-settings.
# Note:
# * chunk spans must be valid values as described here https://github.com/raintank/metrictank/blob/master/docs/memory-server.md#valid-chunk-spans
# * numchunks -like the global setting- has nuanced use compared to chunk cache. see https://github.com/raintank/metrictank/blob/master/docs/memory-server.md
agg-settings =

## metric data storage in cassandra ##

# see https://github.com/raintank/metrictank/blob/master/docs/cassandra.md for more details
@@ -159,9 +137,6 @@ enabled = true
addr = :2003
# represents the "partition" of your data if you decide to partition your data.
partition = 0
# needed to know your raw resolution for your metrics. see http://graphite.readthedocs.io/en/latest/config-carbon.html#storage-schemas-conf
# NOTE: does NOT use aggregation and retention settings from this file. We use agg-settings and ttl for that.
schemas-file = /etc/raintank/storage-schemas.conf

### kafka-mdm input (optional, recommended)
[kafka-mdm-in]
23 changes: 0 additions & 23 deletions docs/config.md
@@ -35,15 +35,6 @@ accounting-period = 5min

```
# see https://github.com/raintank/metrictank/blob/master/docs/memory-server.md for more details
# duration of raw chunks. e.g. 10min, 30min, 1h, 90min...
# must be valid value as described here https://github.com/raintank/metrictank/blob/master/docs/memory-server.md#valid-chunk-spans
chunkspan = 10min
# number of raw chunks to keep in in-memory ring buffer
# See https://github.com/raintank/metrictank/blob/master/docs/memory-server.md for details and trade-offs, especially when compared to chunk-cache
# (settings further down) which may be a more effective method to cache data and alleviate workload for cassandra.
numchunks = 7
# minimum wait before raw metrics are removed from storage
ttl = 35d
# max age for a chunk before to be considered stale and to be persisted to Cassandra
chunk-max-stale = 1h
# max age for a metric before to be considered stale and to be purged from in-memory ring buffer.
@@ -54,17 +45,6 @@ gc-interval = 1h
# shorter warmup means metrictank will need to query cassandra more if it doesn't have requested data yet.
# in clusters, best to assure the primary has saved all the data that a newly warmup instance will need to query, to prevent gaps in charts
warm-up-period = 1h
# settings for rollups (aggregation for archives)
# comma-separated list of archive specifications.
# archive specification is of the form: aggSpan:chunkSpan:numChunks:TTL[:ready as bool. default true]
# with these aggregation rules: 5min:1h:2:3mon,1h:6h:2:1y:false you get:
# - 5 min of data, store in a chunk that lasts 1hour, keep 2 chunks in in-memory ring buffer, keep for 3months in cassandra
# - 1hr worth of data, in chunks of 6 hours, 2 chunks in in-memory ring buffer, keep for 1 year, but this series is not ready yet for querying.
# When running a cluster of metrictank instances, all instances should have the same agg-settings.
# Note:
# * chunk spans must be valid values as described here https://github.com/raintank/metrictank/blob/master/docs/memory-server.md#valid-chunk-spans
# * numchunks -like the global setting- has nuanced use compared to chunk cache. see https://github.com/raintank/metrictank/blob/master/docs/memory-server.md
agg-settings =
```

## metric data storage in cassandra ##
@@ -189,9 +169,6 @@ enabled = false
addr = :2003
# represents the "partition" of your data if you decide to partition your data.
partition = 0
# needed to know your raw resolution for your metrics. see http://graphite.readthedocs.io/en/latest/config-carbon.html#storage-schemas-conf
# NOTE: does NOT use aggregation and retention settings from this file. We use agg-settings and ttl for that.
schemas-file = /path/to/your/schemas-file
```

### kafka-mdm input (optional, recommended)
31 changes: 2 additions & 29 deletions input/carbon/carbon.go
@@ -9,10 +9,10 @@ import (
"net"
"sync"

"github.com/lomik/go-carbon/persister"
"github.com/metrics20/go-metrics20/carbon20"
"github.com/raintank/metrictank/cluster"
"github.com/raintank/metrictank/input"
"github.com/raintank/metrictank/mdata"
"github.com/raintank/metrictank/stats"
"github.com/raintank/worldping-api/pkg/log"
"github.com/rakyll/globalconf"
@@ -29,7 +29,6 @@ type Carbon struct {
input.Handler
addrStr string
addr *net.TCPAddr
schemas persister.WhisperSchemas
listener *net.TCPListener
handlerWaitGroup sync.WaitGroup
quit chan struct{}
@@ -73,42 +72,20 @@ func (c *Carbon) Name() string {

var Enabled bool
var addr string
var schemasFile string
var schemas persister.WhisperSchemas
var partitionId int

func ConfigSetup() {
inCarbon := flag.NewFlagSet("carbon-in", flag.ExitOnError)
inCarbon.BoolVar(&Enabled, "enabled", false, "")
inCarbon.StringVar(&addr, "addr", ":2003", "tcp listen address")
inCarbon.IntVar(&partitionId, "partition", 0, "partition Id.")
inCarbon.StringVar(&schemasFile, "schemas-file", "/path/to/your/schemas-file", "see http://graphite.readthedocs.io/en/latest/config-carbon.html#storage-schemas-conf")
globalconf.Register("carbon-in", inCarbon)
}

func ConfigProcess() {
if !Enabled {
return
}
var err error
schemas, err = persister.ReadWhisperSchemas(schemasFile)
if err != nil {
log.Fatal(4, "carbon-in: can't read schemas file %q: %s", schemasFile, err.Error())
}
var defaultFound bool
for _, schema := range schemas {
if schema.Pattern.String() == ".*" {
defaultFound = true
}
if len(schema.Retentions) == 0 {
log.Fatal(4, "carbon-in: retention setting cannot be empty")
}
}
if !defaultFound {
// good graphite health (not sure what graphite does if there's no .*)
// but we definitely need to always be able to determine which interval to use
log.Fatal(4, "carbon-in: storage-conf does not have a default '.*' pattern")
}
cluster.Manager.SetPartitions([]int32{int32(partitionId)})
}

@@ -120,7 +97,6 @@ func New() *Carbon {
return &Carbon{
addrStr: addr,
addr: addrT,
schemas: schemas,
connTrack: NewConnTrack(),
}
}
@@ -196,10 +172,7 @@ func (c *Carbon) handle(conn net.Conn) {
continue
}
name := string(key)
s, ok := c.schemas.Match(name)
if !ok {
log.Fatal(4, "carbon-in: couldn't find a schema for %q - this is impossible since we asserted there was a default with patt .*", name)
}
s := mdata.Match(name)
interval := s.Retentions[0].SecondsPerPoint()
md := &schema.MetricData{
Name: name,
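
The net effect for carbon-in: rather than reading and validating its own schemas file, it now asks a shared helper in mdata for the matching schema and takes the raw interval from the first retention. A minimal sketch of how such a helper might look, assuming the schemas are loaded centrally into go-carbon's persister.WhisperSchemas at startup — the actual mdata code is not part of the hunks shown here.

```
// Hypothetical sketch of an mdata.Match helper — not the code from this commit.
package mdata

import "github.com/lomik/go-carbon/persister"

// Schemas is assumed to be populated from storage-schemas.conf during config processing.
var Schemas persister.WhisperSchemas

// Match returns the first schema whose pattern matches the metric name.
// Startup validation is assumed to guarantee a ".*" default section,
// much like the removed carbon-in ConfigProcess code used to enforce.
func Match(name string) persister.Schema {
	s, ok := Schemas.Match(name)
	if !ok {
		panic("no storage-schemas.conf section matches " + name)
	}
	return s
}
```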
2 changes: 1 addition & 1 deletion input/input.go
@@ -75,7 +75,7 @@ func (in DefaultHandler) Process(metric *schema.MetricData, partition int32) {
}

pre = time.Now()
m := in.metrics.GetOrCreate(metric.Id)
m := in.metrics.GetOrCreate(metric.Id, metric.Name)
m.Add(uint32(metric.Time), metric.Value)
if in.usage != nil {
in.usage.Add(metric.OrgId, metric.Id)
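
GetOrCreate now receives the metric name next to the id, presumably so the in-memory store can match the name against the storage-schemas.conf patterns when it creates the AggMetric. A hedged sketch of the interface shape these call sites imply — the real definitions live in mdata and are not shown in this commit, so all names here are assumptions.

```
// Hypothetical interface sketch implied by the call sites above — names are assumptions.
type Metric interface {
	Add(ts uint32, val float64)
}

type Metrics interface {
	// The name is passed alongside the key so the store can match it against
	// storage-schemas.conf patterns and pick the right retentions.
	GetOrCreate(key, name string) Metric
}
```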
15 changes: 8 additions & 7 deletions mdata/aggmetric.go
@@ -6,6 +6,7 @@ import (
"sync"
"time"

whisper "github.com/lomik/go-whisper"
"github.com/raintank/metrictank/cluster"
"github.com/raintank/metrictank/consolidation"
"github.com/raintank/metrictank/mdata/cache"
@@ -39,21 +40,21 @@ type AggMetric struct {

// NewAggMetric creates a metric with given key, it retains the given number of chunks each chunkSpan seconds long
// it optionally also creates aggregations with the given settings
func NewAggMetric(store Store, cachePusher cache.CachePusher, key string, chunkSpan, numChunks uint32, ttl uint32, aggsetting ...AggSetting) *AggMetric {
func NewAggMetric(store Store, cachePusher cache.CachePusher, key string, retentions whisper.Retentions) *AggMetric {
m := AggMetric{
cachePusher: cachePusher,
store: store,
Key: key,
ChunkSpan: chunkSpan,
NumChunks: numChunks,
Chunks: make([]*chunk.Chunk, 0, numChunks),
ttl: ttl,
ChunkSpan: retentions[0].ChunkSpan,
NumChunks: retentions[0].NumChunks,
Chunks: make([]*chunk.Chunk, 0, retentions[0].NumChunks), // note: during parsing of retentions, we assure there's at least 1.
ttl: uint32(retentions[0].MaxRetention()),
// we set LastWrite here to make sure a new Chunk doesn't get immediately
// garbage collected right after creating it, before we can push to it.
lastWrite: uint32(time.Now().Unix()),
}
for _, as := range aggsetting {
m.aggregators = append(m.aggregators, NewAggregator(store, cachePusher, key, as.Span, as.ChunkSpan, as.NumChunks, as.TTL))
for _, ret := range retentions[1:] {
m.aggregators = append(m.aggregators, NewAggregator(store, cachePusher, key, *ret))
}

return &m
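
With this constructor change a single parsed retentions list drives the whole AggMetric: retention 0 configures the raw series (chunk span, ring-buffer size, TTL via MaxRetention) and every further retention spawns a rollup aggregator. A hedged usage sketch — store, cachePusher and schema are placeholders assumed to be set up elsewhere, not identifiers from this commit.

```
// Hypothetical usage sketch; schema is assumed to be the storage-schemas.conf
// match for this metric, with at least one parsed retention.
rets := schema.Retentions                              // rets[0] = raw resolution, rets[1:] = rollups
m := NewAggMetric(store, cachePusher, "org1.some.key", rets)
m.Add(uint32(time.Now().Unix()), 42.0)                 // raw point; aggregators derive rollups from it
```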
3 changes: 2 additions & 1 deletion mdata/aggmetric_test.go
@@ -231,7 +231,7 @@ func BenchmarkAggMetrics1000Metrics1Day(b *testing.B) {
maxT := 3600 * 24 * uint32(b.N) // b.N in days
for t := uint32(1); t < maxT; t += 10 {
for metricI := 0; metricI < 1000; metricI++ {
m := metrics.GetOrCreate(keys[metricI])
k := keys[metricI]
m := metrics.GetOrCreate(k, k)
m.Add(t, float64(t))
}
}