diff --git a/api/dataprocessor_test.go b/api/dataprocessor_test.go index bf24d2597c..e1fb5f39d8 100644 --- a/api/dataprocessor_test.go +++ b/api/dataprocessor_test.go @@ -545,8 +545,7 @@ func TestPrevBoundary(t *testing.T) { func TestGetSeriesFixed(t *testing.T) { cluster.Init("default", "test", time.Now(), "http", 6060) store := mdata.NewDevnullStore() - metrics := mdata.NewAggMetrics(store, 600, 10, 0, 0, 0, 0, []mdata.AggSetting{}) - Addr = "localhost:6060" + metrics := mdata.NewAggMetrics(store, &cache.MockCache{}, 600, 10, 0, 0, 0, 0, []mdata.AggSetting{}) srv, _ := NewServer() srv.BindBackendStore(store) srv.BindMemoryStore(metrics) @@ -1231,7 +1230,7 @@ func TestGetSeriesCachedStore(t *testing.T) { srv, _ := NewServer() store := mdata.NewMockStore() srv.BindBackendStore(store) - metrics := mdata.NewAggMetrics(store, 1, 1, 0, 0, 0, 0, []mdata.AggSetting{}) + metrics := mdata.NewAggMetrics(store, &cache.MockCache{}, 1, 1, 0, 0, 0, 0, []mdata.AggSetting{}) srv.BindMemoryStore(metrics) metric := "metric1" var c *cache.CCache @@ -1407,8 +1406,7 @@ func TestGetSeriesAggMetrics(t *testing.T) { store := mdata.NewMockStore() chunkSpan := uint32(600) numChunks := uint32(10) - metrics := mdata.NewAggMetrics(store, chunkSpan, numChunks, 0, 0, 0, 0, []mdata.AggSetting{}) - Addr = "localhost:6060" + metrics := mdata.NewAggMetrics(store, &cache.MockCache{}, chunkSpan, numChunks, 0, 0, 0, 0, []mdata.AggSetting{}) srv, _ := NewServer() srv.BindBackendStore(store) srv.BindMemoryStore(metrics) diff --git a/input/carbon/carbon_test.go b/input/carbon/carbon_test.go index 6973fae405..1f4d57221f 100644 --- a/input/carbon/carbon_test.go +++ b/input/carbon/carbon_test.go @@ -13,6 +13,7 @@ import ( "github.com/raintank/metrictank/cluster" "github.com/raintank/metrictank/idx/memory" "github.com/raintank/metrictank/mdata" + "github.com/raintank/metrictank/mdata/cache" "github.com/raintank/metrictank/usage" "gopkg.in/raintank/schema.v1" ) @@ -20,7 +21,7 @@ import ( func 
Test_HandleMessage(t *testing.T) { cluster.Init("default", "test", time.Now(), "http", 6060) store := mdata.NewDevnullStore() - aggmetrics := mdata.NewAggMetrics(store, 600, 10, 800, 8000, 10000, 0, make([]mdata.AggSetting, 0)) + aggmetrics := mdata.NewAggMetrics(store, &cache.MockCache{}, 600, 10, 800, 8000, 10000, 0, make([]mdata.AggSetting, 0)) metricIndex := memory.New() metricIndex.Init() usage := usage.New(300, aggmetrics, metricIndex, clock.New()) diff --git a/input/input_test.go b/input/input_test.go index 0d80512a59..d40f54b09d 100644 --- a/input/input_test.go +++ b/input/input_test.go @@ -9,6 +9,7 @@ import ( "github.com/raintank/metrictank/cluster" "github.com/raintank/metrictank/idx/memory" "github.com/raintank/metrictank/mdata" + "github.com/raintank/metrictank/mdata/cache" "github.com/raintank/metrictank/usage" "gopkg.in/raintank/schema.v1" ) @@ -16,7 +17,7 @@ import ( func Test_Process(t *testing.T) { cluster.Init("default", "test", time.Now(), "http", 6060) store := mdata.NewDevnullStore() - aggmetrics := mdata.NewAggMetrics(store, 600, 10, 800, 8000, 10000, 0, make([]mdata.AggSetting, 0)) + aggmetrics := mdata.NewAggMetrics(store, &cache.MockCache{}, 600, 10, 800, 8000, 10000, 0, make([]mdata.AggSetting, 0)) metricIndex := memory.New() metricIndex.Init() usage := usage.New(300, aggmetrics, metricIndex, clock.New()) @@ -87,7 +88,7 @@ func BenchmarkProcess(b *testing.B) { cluster.Init("default", "test", time.Now(), "http", 6060) store := mdata.NewDevnullStore() - aggmetrics := mdata.NewAggMetrics(store, 600, 10, 800, 8000, 10000, 0, make([]mdata.AggSetting, 0)) + aggmetrics := mdata.NewAggMetrics(store, &cache.MockCache{}, 600, 10, 800, 8000, 10000, 0, make([]mdata.AggSetting, 0)) metricIndex := memory.New() metricIndex.Init() usage := usage.New(300, aggmetrics, metricIndex, clock.New()) diff --git a/input/kafkamdm/kafkamdm_test.go b/input/kafkamdm/kafkamdm_test.go index 1a35abe0c1..456415468d 100644 --- a/input/kafkamdm/kafkamdm_test.go +++ 
b/input/kafkamdm/kafkamdm_test.go @@ -10,6 +10,7 @@ import ( "github.com/raintank/metrictank/idx/memory" "github.com/raintank/metrictank/input" "github.com/raintank/metrictank/mdata" + "github.com/raintank/metrictank/mdata/cache" "github.com/raintank/metrictank/usage" "gopkg.in/raintank/schema.v1" @@ -18,7 +19,7 @@ import ( func Test_HandleMessage(t *testing.T) { cluster.Init("default", "test", time.Now(), "http", 6060) store := mdata.NewDevnullStore() - aggmetrics := mdata.NewAggMetrics(store, 600, 10, 800, 8000, 10000, 0, make([]mdata.AggSetting, 0)) + aggmetrics := mdata.NewAggMetrics(store, &cache.MockCache{}, 600, 10, 800, 8000, 10000, 0, make([]mdata.AggSetting, 0)) metricIndex := memory.New() metricIndex.Init() usage := usage.New(300, aggmetrics, metricIndex, clock.New()) diff --git a/mdata/aggmetric.go b/mdata/aggmetric.go index a8032f9daa..86135040dc 100644 --- a/mdata/aggmetric.go +++ b/mdata/aggmetric.go @@ -8,6 +8,7 @@ import ( "github.com/raintank/metrictank/cluster" "github.com/raintank/metrictank/consolidation" + "github.com/raintank/metrictank/mdata/cache" "github.com/raintank/metrictank/mdata/chunk" "github.com/raintank/worldping-api/pkg/log" ) @@ -20,7 +21,8 @@ import ( // in addition, keep in mind that the last chunk is always a work in progress and not useable for aggregation // AggMetric is concurrency-safe type AggMetric struct { - store Store + store Store + cachePusher cache.CachePusher sync.RWMutex Key string CurrentChunkPos int // element in []Chunks that is active. All others are either finished or nil. 
@@ -34,17 +36,18 @@ type AggMetric struct { // NewAggMetric creates a metric with given key, it retains the given number of chunks each chunkSpan seconds long // it optionally also creates aggregations with the given settings -func NewAggMetric(store Store, key string, chunkSpan, numChunks uint32, ttl uint32, aggsetting ...AggSetting) *AggMetric { +func NewAggMetric(store Store, cachePusher cache.CachePusher, key string, chunkSpan, numChunks uint32, ttl uint32, aggsetting ...AggSetting) *AggMetric { m := AggMetric{ - store: store, - Key: key, - ChunkSpan: chunkSpan, - NumChunks: numChunks, - Chunks: make([]*chunk.Chunk, 0, numChunks), - ttl: ttl, + store: store, + cachePusher: cachePusher, + Key: key, + ChunkSpan: chunkSpan, + NumChunks: numChunks, + Chunks: make([]*chunk.Chunk, 0, numChunks), + ttl: ttl, } for _, as := range aggsetting { - m.aggregators = append(m.aggregators, NewAggregator(store, key, as.Span, as.ChunkSpan, as.NumChunks, as.Ttl)) + m.aggregators = append(m.aggregators, NewAggregator(store, cachePusher, key, as.Span, as.ChunkSpan, as.NumChunks, as.Ttl)) } return &m @@ -330,17 +333,35 @@ func (a *AggMetric) addAggregators(ts uint32, val float64) { } } +func (a *AggMetric) pushToCache(c *chunk.Chunk) { + // push into cache + go a.cachePusher.CacheIfHot( + a.Key, + 0, + *chunk.NewBareIterGen( + c.Bytes(), + c.T0, + a.ChunkSpan, + ), + ) +} + // write a chunk to persistent storage. 
This should only be called while holding a.Lock() func (a *AggMetric) persist(pos int) { + chunk := a.Chunks[pos] + if !cluster.Manager.IsPrimary() { if LogLevel < 2 { log.Debug("AM persist(): node is not primary, not saving chunk.") } + + // no need to continue persisting, we just push the chunk into the local cache and return + chunk.Finish() + a.pushToCache(chunk) return } pre := time.Now() - chunk := a.Chunks[pos] if chunk.Saved || chunk.Saving { // this can happen if chunk was persisted by GC (stale) and then new data triggered another persist call @@ -405,6 +426,7 @@ func (a *AggMetric) persist(pos int) { pending[pendingChunk].chunk.Finish() a.store.Add(pending[pendingChunk]) pending[pendingChunk].chunk.Saving = true + a.pushToCache(pending[pendingChunk].chunk) pendingChunk-- } persistDuration.Value(time.Now().Sub(pre)) diff --git a/mdata/aggmetric_test.go b/mdata/aggmetric_test.go index a38fa1cac2..3e9e1fae2f 100644 --- a/mdata/aggmetric_test.go +++ b/mdata/aggmetric_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/raintank/metrictank/cluster" + "github.com/raintank/metrictank/mdata/cache" ) var dnstore = NewDevnullStore() @@ -69,10 +70,56 @@ func (c *Checker) Verify(primary bool, from, to, first, last uint32) { cluster.Manager.SetPrimary(currentClusterStatus) } +func TestMetricPersistBeingPrimary(t *testing.T) { + testMetricPersistOptionalPrimary(t, true) +} + +func TestMetricPersistBeingSecondary(t *testing.T) { + testMetricPersistOptionalPrimary(t, false) +} + +func testMetricPersistOptionalPrimary(t *testing.T, primary bool) { + // always reset the counter when entering and leaving the test + dnstore.Reset() + defer dnstore.Reset() + + cluster.Init("default", "test", time.Now(), "http", 6060) + cluster.Manager.SetPrimary(primary) + + mockCache := cache.NewMockCache() + + numChunks, chunkAddCount, chunkSpan := uint32(5), uint32(10), uint32(300) + agg := NewAggMetric(dnstore, mockCache, "foo", chunkSpan, numChunks, 1, []AggSetting{}...) 
+ + for ts := chunkSpan; ts <= chunkSpan*chunkAddCount; ts += chunkSpan { + agg.Add(ts, 1) + } + + expectedHotAdds := int(chunkAddCount - 1) + + select { + case <-time.After(1 * time.Second): + adds := mockCache.GetAddsIfHot() + t.Fatalf("timed out waiting for %d chunk pushes. only saw %d", expectedHotAdds, adds) + case <-mockCache.AfterAddsIfHot(expectedHotAdds): + break + } + + if primary { + if dnstore.AddCount != chunkAddCount-1 { + t.Fatalf("there should have been %d chunk adds on store, but got %d", chunkAddCount-1, dnstore.AddCount) + } + } else { + if dnstore.AddCount != 0 { + t.Fatalf("there should have been %d chunk adds on store, but got %d", 0, dnstore.AddCount) + } + } +} + func TestAggMetric(t *testing.T) { cluster.Init("default", "test", time.Now(), "http", 6060) - c := NewChecker(t, NewAggMetric(dnstore, "foo", 100, 5, 1, []AggSetting{}...)) + c := NewChecker(t, NewAggMetric(dnstore, &cache.MockCache{}, "foo", 100, 5, 1, []AggSetting{}...)) // basic case, single range c.Add(101, 101) @@ -170,7 +217,7 @@ func BenchmarkAggMetrics1000Metrics1Day(b *testing.B) { keys[i] = fmt.Sprintf("hello.this.is.a.test.key.%d", i) } - metrics := NewAggMetrics(dnstore, chunkSpan, numChunks, chunkMaxStale, metricMaxStale, ttl, 0, aggSettings) + metrics := NewAggMetrics(dnstore, &cache.MockCache{}, chunkSpan, numChunks, chunkMaxStale, metricMaxStale, ttl, 0, aggSettings) maxT := 3600 * 24 * uint32(b.N) // b.N in days for t := uint32(1); t < maxT; t += 10 { @@ -182,7 +229,6 @@ func BenchmarkAggMetrics1000Metrics1Day(b *testing.B) { } func BenchmarkAggMetrics1kSeries2Chunks1kQueueSize(b *testing.B) { - chunkSpan := uint32(600) numChunks := uint32(5) chunkMaxStale := uint32(3600) @@ -204,7 +250,7 @@ func BenchmarkAggMetrics1kSeries2Chunks1kQueueSize(b *testing.B) { keys[i] = fmt.Sprintf("hello.this.is.a.test.key.%d", i) } - metrics := NewAggMetrics(dnstore, 
&cache.MockCache{}, chunkSpan, numChunks, chunkMaxStale, metricMaxStale, ttl, 0, aggSettings) maxT := uint32(1200) for t := uint32(1); t < maxT; t += 10 { @@ -237,7 +283,7 @@ func BenchmarkAggMetrics10kSeries2Chunks10kQueueSize(b *testing.B) { keys[i] = fmt.Sprintf("hello.this.is.a.test.key.%d", i) } - metrics := NewAggMetrics(dnstore, chunkSpan, numChunks, chunkMaxStale, metricMaxStale, ttl, 0, aggSettings) + metrics := NewAggMetrics(dnstore, &cache.MockCache{}, chunkSpan, numChunks, chunkMaxStale, metricMaxStale, ttl, 0, aggSettings) maxT := uint32(1200) for t := uint32(1); t < maxT; t += 10 { @@ -270,7 +316,7 @@ func BenchmarkAggMetrics100kSeries2Chunks100kQueueSize(b *testing.B) { keys[i] = fmt.Sprintf("hello.this.is.a.test.key.%d", i) } - metrics := NewAggMetrics(dnstore, chunkSpan, numChunks, chunkMaxStale, metricMaxStale, ttl, 0, aggSettings) + metrics := NewAggMetrics(dnstore, &cache.MockCache{}, chunkSpan, numChunks, chunkMaxStale, metricMaxStale, ttl, 0, aggSettings) maxT := uint32(1200) for t := uint32(1); t < maxT; t += 10 { diff --git a/mdata/aggmetrics.go b/mdata/aggmetrics.go index 0f1e4d4376..6dcafaaac1 100644 --- a/mdata/aggmetrics.go +++ b/mdata/aggmetrics.go @@ -5,11 +5,13 @@ import ( "time" "github.com/raintank/metrictank/cluster" + "github.com/raintank/metrictank/mdata/cache" "github.com/raintank/worldping-api/pkg/log" ) type AggMetrics struct { - store Store + store Store + cachePusher cache.CachePusher sync.RWMutex Metrics map[string]*AggMetric chunkSpan uint32 @@ -21,9 +23,10 @@ type AggMetrics struct { gcInterval time.Duration } -func NewAggMetrics(store Store, chunkSpan, numChunks, chunkMaxStale, metricMaxStale uint32, ttl uint32, gcInterval time.Duration, aggSettings []AggSetting) *AggMetrics { +func NewAggMetrics(store Store, cachePusher cache.CachePusher, chunkSpan, numChunks, chunkMaxStale, metricMaxStale uint32, ttl uint32, gcInterval time.Duration, aggSettings []AggSetting) *AggMetrics { ms := AggMetrics{ store: store, + cachePusher: 
cachePusher, Metrics: make(map[string]*AggMetric), chunkSpan: chunkSpan, numChunks: numChunks, @@ -92,7 +95,7 @@ func (ms *AggMetrics) GetOrCreate(key string) Metric { ms.Lock() m, ok := ms.Metrics[key] if !ok { - m = NewAggMetric(ms.store, key, ms.chunkSpan, ms.numChunks, ms.ttl, ms.aggSettings...) + m = NewAggMetric(ms.store, ms.cachePusher, key, ms.chunkSpan, ms.numChunks, ms.ttl, ms.aggSettings...) ms.Metrics[key] = m metricsActive.Set(len(ms.Metrics)) } diff --git a/mdata/aggregator.go b/mdata/aggregator.go index c6bebf98d5..b57f60a5ea 100644 --- a/mdata/aggregator.go +++ b/mdata/aggregator.go @@ -1,6 +1,9 @@ package mdata -import "fmt" +import ( + "fmt" + "github.com/raintank/metrictank/mdata/cache" +) type AggSetting struct { Span uint32 // in seconds, controls how many input points go into an aggregated point. @@ -49,15 +52,15 @@ type Aggregator struct { cntMetric *AggMetric } -func NewAggregator(store Store, key string, aggSpan, aggChunkSpan, aggNumChunks uint32, ttl uint32) *Aggregator { +func NewAggregator(store Store, cachePusher cache.CachePusher, key string, aggSpan, aggChunkSpan, aggNumChunks uint32, ttl uint32) *Aggregator { return &Aggregator{ key: key, span: aggSpan, agg: NewAggregation(), - minMetric: NewAggMetric(store, fmt.Sprintf("%s_min_%d", key, aggSpan), aggChunkSpan, aggNumChunks, ttl), - maxMetric: NewAggMetric(store, fmt.Sprintf("%s_max_%d", key, aggSpan), aggChunkSpan, aggNumChunks, ttl), - sumMetric: NewAggMetric(store, fmt.Sprintf("%s_sum_%d", key, aggSpan), aggChunkSpan, aggNumChunks, ttl), - cntMetric: NewAggMetric(store, fmt.Sprintf("%s_cnt_%d", key, aggSpan), aggChunkSpan, aggNumChunks, ttl), + minMetric: NewAggMetric(store, cachePusher, fmt.Sprintf("%s_min_%d", key, aggSpan), aggChunkSpan, aggNumChunks, ttl), + maxMetric: NewAggMetric(store, cachePusher, fmt.Sprintf("%s_max_%d", key, aggSpan), aggChunkSpan, aggNumChunks, ttl), + sumMetric: NewAggMetric(store, cachePusher, fmt.Sprintf("%s_sum_%d", key, aggSpan), aggChunkSpan, 
aggNumChunks, ttl), + cntMetric: NewAggMetric(store, cachePusher, fmt.Sprintf("%s_cnt_%d", key, aggSpan), aggChunkSpan, aggNumChunks, ttl), } } func (agg *Aggregator) flush() { diff --git a/mdata/aggregator_test.go b/mdata/aggregator_test.go index aa06f40a12..e432d3a647 100644 --- a/mdata/aggregator_test.go +++ b/mdata/aggregator_test.go @@ -2,6 +2,7 @@ package mdata import ( "github.com/raintank/metrictank/cluster" + "github.com/raintank/metrictank/mdata/cache" "gopkg.in/raintank/schema.v1" "testing" "time" @@ -62,13 +63,13 @@ func TestAggregator(t *testing.T) { } cluster.Manager.SetPrimary(false) } - agg := NewAggregator(dnstore, "test", 60, 120, 10, 86400) + agg := NewAggregator(dnstore, &cache.MockCache{}, "test", 60, 120, 10, 86400) agg.Add(100, 123.4) agg.Add(110, 5) expected := []schema.Point{} compare("simple-min-unfinished", agg.minMetric, expected) - agg = NewAggregator(dnstore, "test", 60, 120, 10, 86400) + agg = NewAggregator(dnstore, &cache.MockCache{}, "test", 60, 120, 10, 86400) agg.Add(100, 123.4) agg.Add(110, 5) agg.Add(130, 130) @@ -77,7 +78,7 @@ func TestAggregator(t *testing.T) { } compare("simple-min-one-block", agg.minMetric, expected) - agg = NewAggregator(dnstore, "test", 60, 120, 10, 86400) + agg = NewAggregator(dnstore, &cache.MockCache{}, "test", 60, 120, 10, 86400) agg.Add(100, 123.4) agg.Add(110, 5) agg.Add(120, 4) @@ -86,7 +87,7 @@ func TestAggregator(t *testing.T) { } compare("simple-min-one-block-done-cause-last-point-just-right", agg.minMetric, expected) - agg = NewAggregator(dnstore, "test", 60, 120, 10, 86400) + agg = NewAggregator(dnstore, &cache.MockCache{}, "test", 60, 120, 10, 86400) agg.Add(100, 123.4) agg.Add(110, 5) agg.Add(150, 1.123) @@ -97,7 +98,7 @@ func TestAggregator(t *testing.T) { } compare("simple-min-two-blocks-done-cause-last-point-just-right", agg.minMetric, expected) - agg = NewAggregator(dnstore, "test", 60, 120, 10, 86400) + agg = NewAggregator(dnstore, &cache.MockCache{}, "test", 60, 120, 10, 86400) 
agg.Add(100, 123.4) agg.Add(110, 5) agg.Add(190, 2451.123) diff --git a/mdata/cache/accnt/stats.go b/mdata/cache/accnt/stats.go index ed792e43a4..64f9886d91 100644 --- a/mdata/cache/accnt/stats.go +++ b/mdata/cache/accnt/stats.go @@ -22,6 +22,9 @@ var ( // metric cache.ops.chunk.hit is how many chunks were hit CacheChunkHit = stats.NewCounter32("cache.ops.chunk.hit") + // metric cache.ops.chunk.push-hot is how many chunks have been pushed into the cache because their metric is hot + CacheChunkPushHot = stats.NewCounter32("cache.ops.chunk.push-hot") + // metric cache.ops.chunk.add is how many chunks were added to the cache cacheChunkAdd = stats.NewCounter32("cache.ops.chunk.add") diff --git a/mdata/cache/cache_mock.go b/mdata/cache/cache_mock.go new file mode 100644 index 0000000000..eae9358398 --- /dev/null +++ b/mdata/cache/cache_mock.go @@ -0,0 +1,75 @@ +package cache + +import ( + "github.com/raintank/metrictank/mdata/chunk" +) + +type MockCache struct { + adds int + addsIfHot int + stops int + searches int + ops chan func(mc *MockCache) // operations to execute serially + hooks []func() bool // hooks to execute after each operation, e.g. after each potential state change +} + +func NewMockCache() *MockCache { + mc := &MockCache{ + ops: make(chan func(mc *MockCache), 0), + } + go mc.loop() + return mc +} + +func (mc *MockCache) loop() { + for op := range mc.ops { + op(mc) + // execute hooks. when they return true they are "done" and can be removed; iterate in reverse so removing index i can't skip entries + for i := len(mc.hooks) - 1; i >= 0; i-- { + if mc.hooks[i]() { + mc.hooks = append(mc.hooks[:i], mc.hooks[i+1:]...)
+ } + } + } +} + +func (mc *MockCache) Add(m string, t uint32, i chunk.IterGen) { + mc.ops <- func(mc *MockCache) { mc.adds++ } +} + +func (mc *MockCache) CacheIfHot(m string, t uint32, i chunk.IterGen) { + mc.ops <- func(mc *MockCache) { mc.addsIfHot++ } +} + +func (mc *MockCache) GetAddsIfHot() int { + c := make(chan int) + mc.ops <- func(mc *MockCache) { c <- mc.addsIfHot } + return <-c +} + +func (mc *MockCache) Stop() { + mc.ops <- func(mc *MockCache) { + close(mc.ops) + mc.hooks = make([]func() bool, 0) + mc.stops++ + } +} + +func (mc *MockCache) Search(m string, f uint32, u uint32) *CCSearchResult { + mc.ops <- func(mc *MockCache) { mc.searches++ } + return nil +} + +func (mc *MockCache) AfterAddsIfHot(addsIfHot int) chan struct{} { + ret := make(chan struct{}) + mc.ops <- func(mc *MockCache) { + mc.hooks = append(mc.hooks, func() bool { + if mc.addsIfHot >= addsIfHot { + ret <- struct{}{} + return true + } + return false + }) + } + return ret +} diff --git a/mdata/cache/ccache.go b/mdata/cache/ccache.go index 593e84b702..9b7bb35ae3 100644 --- a/mdata/cache/ccache.go +++ b/mdata/cache/ccache.go @@ -58,6 +58,33 @@ func (c *CCache) evictLoop() { } } +// adds the given chunk to the cache, but only if the metric is sufficiently hot +func (c *CCache) CacheIfHot(metric string, prev uint32, itergen chunk.IterGen) { + c.RLock() + + var met *CCacheMetric + var ok bool + + // if this metric is not cached at all it is not hot + if met, ok = c.metricCache[metric]; !ok { + c.RUnlock() + return + } + + // if the previous chunk is not cached we consider the metric not hot enough to cache this chunk + // only works reliably if the last chunk of that metric is span aware, otherwise lastTs() will be guessed + // conservatively which means that the returned value will probably be lower than the real last ts + if met.lastTs() < itergen.Ts() { + c.RUnlock() + return + } + + accnt.CacheChunkPushHot.Inc() + + c.RUnlock() + c.Add(metric, prev, itergen) +} + func (c *CCache) 
Add(metric string, prev uint32, itergen chunk.IterGen) { c.Lock() defer c.Unlock() diff --git a/mdata/cache/ccache_metric.go b/mdata/cache/ccache_metric.go index 2c9134439d..9380f998fc 100644 --- a/mdata/cache/ccache_metric.go +++ b/mdata/cache/ccache_metric.go @@ -143,6 +143,14 @@ func (mc *CCacheMetric) nextTs(ts uint32) uint32 { } } +// returns the last Ts of this metric cache +// since ranges are exclusive at the end this is actually the first Ts that is not cached +func (mc *CCacheMetric) lastTs() uint32 { + mc.RLock() + defer mc.RUnlock() + return mc.nextTs(mc.keys[len(mc.keys)-1]) +} + // seekAsc finds the t0 of the chunk that contains ts, by searching from old to recent // if not found or can't be sure returns 0, false // assumes we already have at least a read lock diff --git a/mdata/cache/ccache_test.go b/mdata/cache/ccache_test.go index fed87a9bdd..dca56b35b6 100644 --- a/mdata/cache/ccache_test.go +++ b/mdata/cache/ccache_test.go @@ -8,7 +8,7 @@ import ( "github.com/raintank/metrictank/mdata/chunk" ) -func getItgen(t *testing.T, values []uint32, ts uint32, spanaware bool) *chunk.IterGen { +func getItgen(t *testing.T, values []uint32, ts uint32, spanaware bool) chunk.IterGen { var b []byte buf := new(bytes.Buffer) if spanaware { @@ -28,7 +28,7 @@ func getItgen(t *testing.T, values []uint32, ts uint32, spanaware bool) *chunk.I itgen, _ := chunk.NewGen(buf.Bytes(), ts) - return itgen + return *itgen } func getConnectedChunks(t *testing.T, metric string) *CCache { @@ -41,15 +41,135 @@ func getConnectedChunks(t *testing.T, metric string) *CCache { itgen4 := getItgen(t, values, 1015, false) itgen5 := getItgen(t, values, 1020, false) - cc.Add(metric, 0, *itgen1) - cc.Add(metric, 1000, *itgen2) - cc.Add(metric, 1005, *itgen3) - cc.Add(metric, 1010, *itgen4) - cc.Add(metric, 1015, *itgen5) + cc.Add(metric, 0, itgen1) + cc.Add(metric, 1000, itgen2) + cc.Add(metric, 1005, itgen3) + cc.Add(metric, 1010, itgen4) + cc.Add(metric, 1015, itgen5) return cc } +// test 
AddIfHot method without passing a previous timestamp on a hot metric +func TestAddIfHotWithoutPrevTsOnHotMetric(t *testing.T) { + metric := "metric1" + cc := NewCCache() + + values := []uint32{1, 2, 3, 4, 5} + itgen1 := getItgen(t, values, 1000, false) + itgen2 := getItgen(t, values, 1005, false) + itgen3 := getItgen(t, values, 1010, false) + + cc.Add(metric, 0, itgen1) + cc.Add(metric, 1000, itgen2) + + cc.CacheIfHot(metric, 0, itgen3) + + mc := cc.metricCache[metric] + + chunk, ok := mc.chunks[1010] + if !ok { + t.Fatalf("expected cache chunk to have been cached") + } + + if itgen3.Ts() != chunk.Ts { + t.Fatalf("cached chunk wasn't the expected one") + } + + if chunk.Prev != 1005 { + t.Fatalf("expected cache chunk's previous ts to be 1005, but got %d", chunk.Prev) + } + + if mc.chunks[chunk.Prev].Next != chunk.Ts { + t.Fatalf("previous cache chunk didn't point at this one as it's next, got %d", mc.chunks[chunk.Prev].Next) + } +} + +// test AddIfHot method without passing a previous timestamp on a cold metric +func TestAddIfHotWithoutPrevTsOnColdMetric(t *testing.T) { + metric := "metric1" + cc := NewCCache() + + values := []uint32{1, 2, 3, 4, 5} + itgen1 := getItgen(t, values, 1000, false) + itgen3 := getItgen(t, values, 1010, false) + + cc.Add(metric, 0, itgen1) + + cc.CacheIfHot(metric, 0, itgen3) + + mc := cc.metricCache[metric] + + _, ok := mc.chunks[1010] + if ok { + t.Fatalf("expected cache chunk to not have been cached") + } + + if mc.chunks[1000].Next != 0 { + t.Fatalf("previous cache chunk got wrongly connected with a following one, got %d", mc.chunks[1000].Next) + } +} + +// test AddIfHot method on a hot metric +func TestAddIfHotWithPrevTsOnHotMetric(t *testing.T) { + metric := "metric1" + cc := NewCCache() + + values := []uint32{1, 2, 3, 4, 5} + itgen1 := getItgen(t, values, 1000, false) + itgen2 := getItgen(t, values, 1005, false) + itgen3 := getItgen(t, values, 1010, false) + + cc.Add(metric, 0, itgen1) + cc.Add(metric, 1000, itgen2) + + 
cc.CacheIfHot(metric, 1005, itgen3) + + mc := cc.metricCache[metric] + + chunk, ok := mc.chunks[1010] + if !ok { + t.Fatalf("expected cache chunk to have been cached") + } + + if itgen3.Ts() != chunk.Ts { + t.Fatalf("cached chunk wasn't the expected one") + } + + if chunk.Prev != 1005 { + t.Fatalf("expected cache chunk's previous ts to be 1005, but got %d", chunk.Prev) + } + + if mc.chunks[chunk.Prev].Next != chunk.Ts { + t.Fatalf("previous cache chunk didn't point at this one as it's next, got %d", mc.chunks[chunk.Prev].Next) + } +} + +// test AddIfHot method on a cold metric +func TestAddIfHotWithPrevTsOnColdMetric(t *testing.T) { + metric := "metric1" + cc := NewCCache() + + values := []uint32{1, 2, 3, 4, 5} + itgen1 := getItgen(t, values, 1000, false) + itgen3 := getItgen(t, values, 1010, false) + + cc.Add(metric, 0, itgen1) + + cc.CacheIfHot(metric, 1005, itgen3) + + mc := cc.metricCache[metric] + + _, ok := mc.chunks[1010] + if ok { + t.Fatalf("expected cache chunk to not have been cached") + } + + if mc.chunks[1000].Next != 0 { + t.Fatalf("previous cache chunk got wrongly connected with a following one, got %d", mc.chunks[1000].Next) + } +} + func TestConsecutiveAdding(t *testing.T) { metric := "metric1" cc := NewCCache() @@ -58,8 +178,8 @@ func TestConsecutiveAdding(t *testing.T) { itgen1 := getItgen(t, values, 1000, false) itgen2 := getItgen(t, values, 1005, false) - cc.Add(metric, 0, *itgen1) - cc.Add(metric, 1000, *itgen2) + cc.Add(metric, 0, itgen1) + cc.Add(metric, 1000, itgen2) mc := cc.metricCache[metric] chunk1, ok := mc.chunks[1000] @@ -95,9 +215,9 @@ func TestDisconnectedAdding(t *testing.T) { itgen2 := getItgen(t, values, 1005, true) itgen3 := getItgen(t, values, 1010, true) - cc.Add(metric, 0, *itgen1) - cc.Add(metric, 0, *itgen2) - cc.Add(metric, 0, *itgen3) + cc.Add(metric, 0, itgen1) + cc.Add(metric, 0, itgen2) + cc.Add(metric, 0, itgen3) res := cc.Search(metric, 900, 1015) @@ -129,9 +249,9 @@ func TestDisconnectedAddingByGuessing(t 
*testing.T) { itgen2 := getItgen(t, values, 1005, false) itgen3 := getItgen(t, values, 1010, false) - cc.Add(metric, 0, *itgen1) - cc.Add(metric, 1000, *itgen2) - cc.Add(metric, 0, *itgen3) + cc.Add(metric, 0, itgen1) + cc.Add(metric, 1000, itgen2) + cc.Add(metric, 0, itgen3) res := cc.Search(metric, 900, 1015) @@ -259,19 +379,19 @@ func testSearchDisconnectedStartEnd(t *testing.T, spanaware, ascending bool) { cc.Reset() if ascending { - cc.Add(metric, 0, *itgen1) - cc.Add(metric, 1000, *itgen2) - cc.Add(metric, 1010, *itgen3) - cc.Add(metric, 0, *itgen4) - cc.Add(metric, 1030, *itgen5) - cc.Add(metric, 1040, *itgen6) + cc.Add(metric, 0, itgen1) + cc.Add(metric, 1000, itgen2) + cc.Add(metric, 1010, itgen3) + cc.Add(metric, 0, itgen4) + cc.Add(metric, 1030, itgen5) + cc.Add(metric, 1040, itgen6) } else { - cc.Add(metric, 0, *itgen6) - cc.Add(metric, 0, *itgen5) - cc.Add(metric, 0, *itgen4) - cc.Add(metric, 0, *itgen3) - cc.Add(metric, 0, *itgen2) - cc.Add(metric, 0, *itgen1) + cc.Add(metric, 0, itgen6) + cc.Add(metric, 0, itgen5) + cc.Add(metric, 0, itgen4) + cc.Add(metric, 0, itgen3) + cc.Add(metric, 0, itgen2) + cc.Add(metric, 0, itgen1) } res = cc.Search(metric, from, until) @@ -335,19 +455,19 @@ func testSearchDisconnectedWithGapStartEnd(t *testing.T, spanaware, ascending bo cc.Reset() if ascending { - cc.Add(metric, 0, *itgen1) - cc.Add(metric, 1000, *itgen2) - cc.Add(metric, 1010, *itgen3) - cc.Add(metric, 0, *itgen4) - cc.Add(metric, 1040, *itgen5) - cc.Add(metric, 1050, *itgen6) + cc.Add(metric, 0, itgen1) + cc.Add(metric, 1000, itgen2) + cc.Add(metric, 1010, itgen3) + cc.Add(metric, 0, itgen4) + cc.Add(metric, 1040, itgen5) + cc.Add(metric, 1050, itgen6) } else { - cc.Add(metric, 0, *itgen6) - cc.Add(metric, 0, *itgen5) - cc.Add(metric, 0, *itgen4) - cc.Add(metric, 0, *itgen3) - cc.Add(metric, 0, *itgen2) - cc.Add(metric, 0, *itgen1) + cc.Add(metric, 0, itgen6) + cc.Add(metric, 0, itgen5) + cc.Add(metric, 0, itgen4) + cc.Add(metric, 0, itgen3) + 
cc.Add(metric, 0, itgen2) + cc.Add(metric, 0, itgen1) } res = cc.Search(metric, from, until) diff --git a/mdata/cache/if.go b/mdata/cache/if.go index 637f7fe9c6..665255ca85 100644 --- a/mdata/cache/if.go +++ b/mdata/cache/if.go @@ -6,10 +6,15 @@ import ( type Cache interface { Add(string, uint32, chunk.IterGen) + CacheIfHot(string, uint32, chunk.IterGen) Stop() Search(string, uint32, uint32) *CCSearchResult } +type CachePusher interface { + CacheIfHot(string, uint32, chunk.IterGen) +} + type CCSearchResult struct { // if this result is Complete == false, then the following cassandra query // will need to use this value as from to fill in the missing data diff --git a/mdata/store_devnull.go b/mdata/store_devnull.go index b591167c79..ceeb5931fd 100644 --- a/mdata/store_devnull.go +++ b/mdata/store_devnull.go @@ -3,6 +3,7 @@ package mdata import "github.com/raintank/metrictank/mdata/chunk" type devnullStore struct { + AddCount uint32 } func NewDevnullStore() *devnullStore { @@ -11,6 +12,11 @@ func NewDevnullStore() *devnullStore { } func (c *devnullStore) Add(cwr *ChunkWriteRequest) { + c.AddCount++ +} + +func (c *devnullStore) Reset() { + c.AddCount = 0 } func (c *devnullStore) Search(key string, start, end uint32) ([]chunk.IterGen, error) { diff --git a/metrictank.go b/metrictank.go index 82e92f9f04..d9ef132c66 100644 --- a/metrictank.go +++ b/metrictank.go @@ -307,10 +307,15 @@ func main() { log.Fatal(4, "failed to initialize cassandra. 
%s", err) } + /*********************************** + Initialize the Chunk Cache + ***********************************/ + ccache := cache.NewCCache() + /*********************************** Initialize our MemoryStore ***********************************/ - metrics = mdata.NewAggMetrics(store, chunkSpan, numChunks, chunkMaxStale, metricMaxStale, ttl, gcInterval, finalSettings) + metrics = mdata.NewAggMetrics(store, ccache, chunkSpan, numChunks, chunkMaxStale, metricMaxStale, ttl, gcInterval, finalSettings) /*********************************** Initialize our Inputs @@ -352,7 +357,6 @@ func main() { /*********************************** Initialize our MetricIdx ***********************************/ - ccache := cache.NewCCache() pre := time.Now() if memory.Enabled {