
initial version of cache pushing for hot metrics
refers to ticket #457
replay committed Jan 10, 2017
1 parent c4bf293 commit 5d37148
Showing 14 changed files with 118 additions and 32 deletions.
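This commit threads a chunk-cache callback through the mdata constructors (NewAggMetrics, NewAggMetric, NewAggregator) so that finished chunks of actively written ("hot") metrics are handed to the chunk cache instead of only being written to the backend store. Judging from the test changes below, the callback type cache.CacheCb has roughly the following shape (a reconstruction for readability, not a line from this diff):

// assumed shape of the callback introduced in mdata/cache
type CacheCb func(metric string, prev uint32, itergen *chunk.IterGen)

The tests all pass a no-op callback; the real consumer is presumably wired up in one of the changed files not shown in this view.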
9 changes: 6 additions & 3 deletions api/dataprocessor_test.go
@@ -545,7 +545,8 @@ func TestPrevBoundary(t *testing.T) {
func TestGetSeriesFixed(t *testing.T) {
cluster.Init("default", "test", time.Now())
store := mdata.NewDevnullStore()
metrics := mdata.NewAggMetrics(store, 600, 10, 0, 0, 0, 0, []mdata.AggSetting{})
cacheCb := func(string, uint32, *chunk.IterGen) {}
metrics := mdata.NewAggMetrics(store, cacheCb, 600, 10, 0, 0, 0, 0, []mdata.AggSetting{})
addr = "localhost:6060"
srv, _ := NewServer()
srv.BindBackendStore(store)
@@ -1188,7 +1189,8 @@ func TestGetSeriesCachedStore(t *testing.T) {
srv, _ := NewServer()
store := mdata.NewMockStore()
srv.BindBackendStore(store)
metrics := mdata.NewAggMetrics(store, 1, 1, 0, 0, 0, 0, []mdata.AggSetting{})
cacheCb := func(string, uint32, *chunk.IterGen) {}
metrics := mdata.NewAggMetrics(store, cacheCb, 1, 1, 0, 0, 0, 0, []mdata.AggSetting{})
srv.BindMemoryStore(metrics)
metric := "metric1"
var c *cache.CCache
@@ -1311,7 +1313,8 @@ func TestGetSeriesAggMetrics(t *testing.T) {
store := mdata.NewMockStore()
chunkSpan := uint32(600)
numChunks := uint32(10)
metrics := mdata.NewAggMetrics(store, chunkSpan, numChunks, 0, 0, 0, 0, []mdata.AggSetting{})
cacheCb := func(string, uint32, *chunk.IterGen) {}
metrics := mdata.NewAggMetrics(store, cacheCb, chunkSpan, numChunks, 0, 0, 0, 0, []mdata.AggSetting{})
addr = "localhost:6060"
srv, _ := NewServer()
srv.BindBackendStore(store)
4 changes: 3 additions & 1 deletion input/carbon/carbon_test.go
@@ -13,14 +13,16 @@ import (
"github.com/raintank/metrictank/cluster"
"github.com/raintank/metrictank/idx/memory"
"github.com/raintank/metrictank/mdata"
"github.com/raintank/metrictank/mdata/chunk"
"github.com/raintank/metrictank/usage"
"gopkg.in/raintank/schema.v1"
)

func Test_HandleMessage(t *testing.T) {
cluster.Init("default", "test", time.Now())
store := mdata.NewDevnullStore()
aggmetrics := mdata.NewAggMetrics(store, 600, 10, 800, 8000, 10000, 0, make([]mdata.AggSetting, 0))
cacheCb := func(string, uint32, *chunk.IterGen) {}
aggmetrics := mdata.NewAggMetrics(store, cacheCb, 600, 10, 800, 8000, 10000, 0, make([]mdata.AggSetting, 0))
metricIndex := memory.New()
metricIndex.Init()
usage := usage.New(300, aggmetrics, metricIndex, clock.New())
7 changes: 5 additions & 2 deletions input/input_test.go
@@ -9,14 +9,16 @@ import (
"github.com/raintank/metrictank/cluster"
"github.com/raintank/metrictank/idx/memory"
"github.com/raintank/metrictank/mdata"
"github.com/raintank/metrictank/mdata/chunk"
"github.com/raintank/metrictank/usage"
"gopkg.in/raintank/schema.v1"
)

func Test_Process(t *testing.T) {
cluster.Init("default", "test", time.Now())
store := mdata.NewDevnullStore()
aggmetrics := mdata.NewAggMetrics(store, 600, 10, 800, 8000, 10000, 0, make([]mdata.AggSetting, 0))
cacheCb := func(string, uint32, *chunk.IterGen) {}
aggmetrics := mdata.NewAggMetrics(store, cacheCb, 600, 10, 800, 8000, 10000, 0, make([]mdata.AggSetting, 0))
metricIndex := memory.New()
metricIndex.Init()
usage := usage.New(300, aggmetrics, metricIndex, clock.New())
@@ -86,8 +88,9 @@ func test_Process(worker int, in *Input, t *testing.T) map[string]int {
func BenchmarkProcess(b *testing.B) {
cluster.Init("default", "test", time.Now())

cacheCb := func(string, uint32, *chunk.IterGen) {}
store := mdata.NewDevnullStore()
aggmetrics := mdata.NewAggMetrics(store, 600, 10, 800, 8000, 10000, 0, make([]mdata.AggSetting, 0))
aggmetrics := mdata.NewAggMetrics(store, cacheCb, 600, 10, 800, 8000, 10000, 0, make([]mdata.AggSetting, 0))
metricIndex := memory.New()
metricIndex.Init()
usage := usage.New(300, aggmetrics, metricIndex, clock.New())
4 changes: 3 additions & 1 deletion input/kafkamdm/kafkamdm_test.go
@@ -10,6 +10,7 @@ import (
"github.com/raintank/metrictank/idx/memory"
"github.com/raintank/metrictank/input"
"github.com/raintank/metrictank/mdata"
"github.com/raintank/metrictank/mdata/chunk"
"github.com/raintank/metrictank/usage"

"gopkg.in/raintank/schema.v1"
@@ -18,7 +19,8 @@ import (
func Test_HandleMessage(t *testing.T) {
cluster.Init("default", "test", time.Now())
store := mdata.NewDevnullStore()
aggmetrics := mdata.NewAggMetrics(store, 600, 10, 800, 8000, 10000, 0, make([]mdata.AggSetting, 0))
cacheCb := func(string, uint32, *chunk.IterGen) {}
aggmetrics := mdata.NewAggMetrics(store, cacheCb, 600, 10, 800, 8000, 10000, 0, make([]mdata.AggSetting, 0))
metricIndex := memory.New()
metricIndex.Init()
usage := usage.New(300, aggmetrics, metricIndex, clock.New())
28 changes: 25 additions & 3 deletions mdata/aggmetric.go
@@ -8,6 +8,7 @@ import (

"github.com/raintank/metrictank/cluster"
"github.com/raintank/metrictank/consolidation"
"github.com/raintank/metrictank/mdata/cache"
"github.com/raintank/metrictank/mdata/chunk"
"github.com/raintank/worldping-api/pkg/log"
)
@@ -21,6 +22,7 @@ import (
// AggMetric is concurrency-safe
type AggMetric struct {
store Store
cache cache.CacheCb
sync.RWMutex
Key string
CurrentChunkPos int // element in []Chunks that is active. All others are either finished or nil.
@@ -34,17 +36,18 @@ type AggMetric struct {

// NewAggMetric creates a metric with given key, it retains the given number of chunks each chunkSpan seconds long
// it optionally also creates aggregations with the given settings
func NewAggMetric(store Store, key string, chunkSpan, numChunks uint32, ttl uint32, aggsetting ...AggSetting) *AggMetric {
func NewAggMetric(store Store, cache cache.CacheCb, key string, chunkSpan, numChunks uint32, ttl uint32, aggsetting ...AggSetting) *AggMetric {
m := AggMetric{
store: store,
cache: cache,
Key: key,
ChunkSpan: chunkSpan,
NumChunks: numChunks,
Chunks: make([]*chunk.Chunk, 0, numChunks),
ttl: ttl,
}
for _, as := range aggsetting {
m.aggregators = append(m.aggregators, NewAggregator(store, key, as.Span, as.ChunkSpan, as.NumChunks, as.Ttl))
m.aggregators = append(m.aggregators, NewAggregator(store, cache, key, as.Span, as.ChunkSpan, as.NumChunks, as.Ttl))
}

return &m
@@ -330,17 +333,35 @@ func (a *AggMetric) addAggregators(ts uint32, val float64) {
}
}

func (a *AggMetric) pushToCache(c *chunk.Chunk) {
// push into cache
go a.cache(
a.Key,
0,
chunk.NewBareIterGen(
c.Bytes(),
c.T0,
a.ChunkSpan,
),
)
}

// write a chunk to persistent storage. This should only be called while holding a.Lock()
func (a *AggMetric) persist(pos int) {
chunk := a.Chunks[pos]

if !cluster.ThisNode.IsPrimary() {
if LogLevel < 2 {
log.Debug("AM persist(): node is not primary, not saving chunk.")
}

// no need to continue persisting, we just push the chunk into the local cache and return
chunk.Finish()
a.pushToCache(chunk)
return
}

pre := time.Now()
chunk := a.Chunks[pos]

if chunk.Saved || chunk.Saving {
// this can happen if chunk was persisted by GC (stale) and then new data triggered another persist call
@@ -405,6 +426,7 @@ func (a *AggMetric) persist(pos int) {
pending[pendingChunk].chunk.Finish()
a.store.Add(pending[pendingChunk])
pending[pendingChunk].chunk.Saving = true
a.pushToCache(pending[pendingChunk].chunk)
pendingChunk--
}
persistDuration.Value(time.Now().Sub(pre))
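In the aggmetric.go hunks above, persist() now pushes every finished chunk to the cache callback: a non-primary node finishes the chunk, pushes it and returns without writing to the store, while a primary pushes each pending chunk right after handing it to the store. pushToCache() wraps the chunk bytes in a BareIterGen and invokes the callback in a goroutine, so implementations must be safe for concurrent use. A minimal sketch of a caller observing those pushes (a stand-in for the real chunk cache, which is not part of this diff; names are illustrative):

package main

import (
	"sync"
	"time"

	"github.com/raintank/metrictank/cluster"
	"github.com/raintank/metrictank/mdata"
	"github.com/raintank/metrictank/mdata/chunk"
)

func main() {
	cluster.Init("default", "test", time.Now())
	store := mdata.NewDevnullStore()

	var mu sync.Mutex
	pushed := map[string]int{}

	// called from a goroutine in pushToCache(), hence the lock; a real
	// implementation would insert the IterGen into the chunk cache instead
	cacheCb := func(key string, prev uint32, itergen *chunk.IterGen) {
		mu.Lock()
		pushed[key]++
		mu.Unlock()
	}

	metrics := mdata.NewAggMetrics(store, cacheCb, 600, 10, 800, 8000, 10000, 0, make([]mdata.AggSetting, 0))
	_ = metrics
}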
15 changes: 9 additions & 6 deletions mdata/aggmetric_test.go
@@ -6,6 +6,8 @@ import (
"time"

"github.com/raintank/metrictank/cluster"
"github.com/raintank/metrictank/mdata/cache"
"github.com/raintank/metrictank/mdata/chunk"
)

var dnstore = NewDevnullStore()
@@ -25,6 +27,8 @@ type Checker struct {
points []point
}

var cacheCb cache.CacheCb = func(string, uint32, *chunk.IterGen) {}

func NewChecker(t *testing.T, agg *AggMetric) *Checker {
return &Checker{t, agg, make([]point, 0)}
}
@@ -72,7 +76,7 @@ func (c *Checker) Verify(primary bool, from, to, first, last uint32) {
func TestAggMetric(t *testing.T) {
cluster.Init("default", "test", time.Now())

c := NewChecker(t, NewAggMetric(dnstore, "foo", 100, 5, 1, []AggSetting{}...))
c := NewChecker(t, NewAggMetric(dnstore, cacheCb, "foo", 100, 5, 1, []AggSetting{}...))

// basic case, single range
c.Add(101, 101)
@@ -170,7 +174,7 @@ func BenchmarkAggMetrics1000Metrics1Day(b *testing.B) {
keys[i] = fmt.Sprintf("hello.this.is.a.test.key.%d", i)
}

metrics := NewAggMetrics(dnstore, chunkSpan, numChunks, chunkMaxStale, metricMaxStale, ttl, 0, aggSettings)
metrics := NewAggMetrics(dnstore, cacheCb, chunkSpan, numChunks, chunkMaxStale, metricMaxStale, ttl, 0, aggSettings)

maxT := 3600 * 24 * uint32(b.N) // b.N in days
for t := uint32(1); t < maxT; t += 10 {
@@ -182,7 +186,6 @@ }
}

func BenchmarkAggMetrics1kSeries2Chunks1kQueueSize(b *testing.B) {

chunkSpan := uint32(600)
numChunks := uint32(5)
chunkMaxStale := uint32(3600)
@@ -204,7 +207,7 @@ func BenchmarkAggMetrics1kSeries2Chunks1kQueueSize(b *testing.B) {
keys[i] = fmt.Sprintf("hello.this.is.a.test.key.%d", i)
}

metrics := NewAggMetrics(dnstore, chunkSpan, numChunks, chunkMaxStale, metricMaxStale, ttl, 0, aggSettings)
metrics := NewAggMetrics(dnstore, cacheCb, chunkSpan, numChunks, chunkMaxStale, metricMaxStale, ttl, 0, aggSettings)

maxT := uint32(1200)
for t := uint32(1); t < maxT; t += 10 {
@@ -237,7 +240,7 @@ func BenchmarkAggMetrics10kSeries2Chunks10kQueueSize(b *testing.B) {
keys[i] = fmt.Sprintf("hello.this.is.a.test.key.%d", i)
}

metrics := NewAggMetrics(dnstore, chunkSpan, numChunks, chunkMaxStale, metricMaxStale, ttl, 0, aggSettings)
metrics := NewAggMetrics(dnstore, cacheCb, chunkSpan, numChunks, chunkMaxStale, metricMaxStale, ttl, 0, aggSettings)

maxT := uint32(1200)
for t := uint32(1); t < maxT; t += 10 {
@@ -270,7 +273,7 @@ func BenchmarkAggMetrics100kSeries2Chunks100kQueueSize(b *testing.B) {
keys[i] = fmt.Sprintf("hello.this.is.a.test.key.%d", i)
}

metrics := NewAggMetrics(dnstore, chunkSpan, numChunks, chunkMaxStale, metricMaxStale, ttl, 0, aggSettings)
metrics := NewAggMetrics(dnstore, cacheCb, chunkSpan, numChunks, chunkMaxStale, metricMaxStale, ttl, 0, aggSettings)

maxT := uint32(1200)
for t := uint32(1); t < maxT; t += 10 {
9 changes: 6 additions & 3 deletions mdata/aggmetrics.go
@@ -5,11 +5,13 @@ import (
"time"

"github.com/raintank/metrictank/cluster"
"github.com/raintank/metrictank/mdata/cache"
"github.com/raintank/worldping-api/pkg/log"
)

type AggMetrics struct {
store Store
store Store
cacheCb cache.CacheCb
sync.RWMutex
Metrics map[string]*AggMetric
chunkSpan uint32
@@ -21,9 +23,10 @@
gcInterval time.Duration
}

func NewAggMetrics(store Store, chunkSpan, numChunks, chunkMaxStale, metricMaxStale uint32, ttl uint32, gcInterval time.Duration, aggSettings []AggSetting) *AggMetrics {
func NewAggMetrics(store Store, cacheCb cache.CacheCb, chunkSpan, numChunks, chunkMaxStale, metricMaxStale uint32, ttl uint32, gcInterval time.Duration, aggSettings []AggSetting) *AggMetrics {
ms := AggMetrics{
store: store,
cacheCb: cacheCb,
Metrics: make(map[string]*AggMetric),
chunkSpan: chunkSpan,
numChunks: numChunks,
@@ -92,7 +95,7 @@ func (ms *AggMetrics) GetOrCreate(key string) Metric {
ms.Lock()
m, ok := ms.Metrics[key]
if !ok {
m = NewAggMetric(ms.store, key, ms.chunkSpan, ms.numChunks, ms.ttl, ms.aggSettings...)
m = NewAggMetric(ms.store, ms.cacheCb, key, ms.chunkSpan, ms.numChunks, ms.ttl, ms.aggSettings...)
ms.Metrics[key] = m
metricsActive.Set(len(ms.Metrics))
}
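GetOrCreate() above forwards the stored callback into every per-key AggMetric, so metrics created through the normal ingest path take part in hot-chunk pushing without any change to callers. Continuing the sketch from the aggmetric.go section, and assuming the Metric interface's Add(ts, value) method, which this diff does not touch:

m := metrics.GetOrCreate("some.metric")
m.Add(uint32(time.Now().Unix()), 1.5) // once the chunk holding this point is finished, it reaches cacheCb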
15 changes: 9 additions & 6 deletions mdata/aggregator.go
@@ -1,6 +1,9 @@
package mdata

import "fmt"
import (
"fmt"
"github.com/raintank/metrictank/mdata/cache"
)

type AggSetting struct {
Span uint32 // in seconds, controls how many input points go into an aggregated point.
@@ -49,15 +52,15 @@ type Aggregator struct {
cntMetric *AggMetric
}

func NewAggregator(store Store, key string, aggSpan, aggChunkSpan, aggNumChunks uint32, ttl uint32) *Aggregator {
func NewAggregator(store Store, cacheCb cache.CacheCb, key string, aggSpan, aggChunkSpan, aggNumChunks uint32, ttl uint32) *Aggregator {
return &Aggregator{
key: key,
span: aggSpan,
agg: NewAggregation(),
minMetric: NewAggMetric(store, fmt.Sprintf("%s_min_%d", key, aggSpan), aggChunkSpan, aggNumChunks, ttl),
maxMetric: NewAggMetric(store, fmt.Sprintf("%s_max_%d", key, aggSpan), aggChunkSpan, aggNumChunks, ttl),
sumMetric: NewAggMetric(store, fmt.Sprintf("%s_sum_%d", key, aggSpan), aggChunkSpan, aggNumChunks, ttl),
cntMetric: NewAggMetric(store, fmt.Sprintf("%s_cnt_%d", key, aggSpan), aggChunkSpan, aggNumChunks, ttl),
minMetric: NewAggMetric(store, cacheCb, fmt.Sprintf("%s_min_%d", key, aggSpan), aggChunkSpan, aggNumChunks, ttl),
maxMetric: NewAggMetric(store, cacheCb, fmt.Sprintf("%s_max_%d", key, aggSpan), aggChunkSpan, aggNumChunks, ttl),
sumMetric: NewAggMetric(store, cacheCb, fmt.Sprintf("%s_sum_%d", key, aggSpan), aggChunkSpan, aggNumChunks, ttl),
cntMetric: NewAggMetric(store, cacheCb, fmt.Sprintf("%s_cnt_%d", key, aggSpan), aggChunkSpan, aggNumChunks, ttl),
}
}
func (agg *Aggregator) flush() {
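NewAggregator() hands the same callback to its min/max/sum/cnt rollup series, so aggregated chunks are pushed to the cache under their own keys (the "%s_min_%d"-style keys built above), just like raw chunks. Mirroring the test usage below:

agg := NewAggregator(dnstore, cacheCb, "test", 60, 120, 10, 86400)
agg.Add(100, 123.4) // feeds test_min_60, test_max_60, test_sum_60 and test_cnt_60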
12 changes: 7 additions & 5 deletions mdata/aggregator_test.go
@@ -2,6 +2,7 @@ package mdata

import (
"github.com/raintank/metrictank/cluster"
"github.com/raintank/metrictank/mdata/chunk"
"gopkg.in/raintank/schema.v1"
"testing"
"time"
@@ -38,6 +39,7 @@ func TestAggBoundary(t *testing.T) {

// note that values don't get "committed" to the metric until the aggregation interval is complete
func TestAggregator(t *testing.T) {
cacheCb := func(string, uint32, *chunk.IterGen) {}
cluster.Init("default", "test", time.Now())
compare := func(key string, metric Metric, expected []schema.Point) {
cluster.ThisNode.SetPrimary(true)
@@ -62,13 +64,13 @@ }
}
cluster.ThisNode.SetPrimary(false)
}
agg := NewAggregator(dnstore, "test", 60, 120, 10, 86400)
agg := NewAggregator(dnstore, cacheCb, "test", 60, 120, 10, 86400)
agg.Add(100, 123.4)
agg.Add(110, 5)
expected := []schema.Point{}
compare("simple-min-unfinished", agg.minMetric, expected)

agg = NewAggregator(dnstore, "test", 60, 120, 10, 86400)
agg = NewAggregator(dnstore, cacheCb, "test", 60, 120, 10, 86400)
agg.Add(100, 123.4)
agg.Add(110, 5)
agg.Add(130, 130)
@@ -77,7 +79,7 @@ }
}
compare("simple-min-one-block", agg.minMetric, expected)

agg = NewAggregator(dnstore, "test", 60, 120, 10, 86400)
agg = NewAggregator(dnstore, cacheCb, "test", 60, 120, 10, 86400)
agg.Add(100, 123.4)
agg.Add(110, 5)
agg.Add(120, 4)
@@ -86,7 +88,7 @@ }
}
compare("simple-min-one-block-done-cause-last-point-just-right", agg.minMetric, expected)

agg = NewAggregator(dnstore, "test", 60, 120, 10, 86400)
agg = NewAggregator(dnstore, cacheCb, "test", 60, 120, 10, 86400)
agg.Add(100, 123.4)
agg.Add(110, 5)
agg.Add(150, 1.123)
@@ -97,7 +99,7 @@ }
}
compare("simple-min-two-blocks-done-cause-last-point-just-right", agg.minMetric, expected)

agg = NewAggregator(dnstore, "test", 60, 120, 10, 86400)
agg = NewAggregator(dnstore, cacheCb, "test", 60, 120, 10, 86400)
agg.Add(100, 123.4)
agg.Add(110, 5)
agg.Add(190, 2451.123)
3 changes: 3 additions & 0 deletions mdata/cache/accnt/stats.go
@@ -22,6 +22,9 @@ var (
// metric cache.ops.chunk.hit is how many chunks were hit
CacheChunkHit = stats.NewCounter32("cache.ops.chunk.hit")

// metric cache.ops.chunk.push-hot is how many chunks have been pushed into the cache because their metric is hot
CacheChunkPushHot = stats.NewCounter32("cache.ops.chunk.push-hot")

// metric cache.ops.chunk.add is how many chunks were added to the cache
cacheChunkAdd = stats.NewCounter32("cache.ops.chunk.add")

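The push-hot counter is only declared in this diff; presumably the cache implementation that receives the pushed chunks increments it. A hypothetical receiver whose method matches the cache.CacheCb signature (the type name, method name and Inc() call are assumptions, not part of this commit; assumes imports of sync, mdata/chunk and mdata/cache/accnt):

type hotCache struct {
	sync.Mutex
	chunks map[string][]*chunk.IterGen
}

func (h *hotCache) AddIfHot(metric string, prev uint32, itergen *chunk.IterGen) {
	h.Lock()
	defer h.Unlock()
	h.chunks[metric] = append(h.chunks[metric], itergen)
	accnt.CacheChunkPushHot.Inc() // assumption: Counter32 exposes Inc(), like the other cache counters
}

A method value such as h.AddIfHot could then be passed straight to mdata.NewAggMetrics as the cache callback.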
(diffs for the remaining 4 changed files were not loaded in this view)
