Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

457 experiment #487

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions api/dataprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,8 +545,7 @@ func TestPrevBoundary(t *testing.T) {
func TestGetSeriesFixed(t *testing.T) {
cluster.Init("default", "test", time.Now(), "http", 6060)
store := mdata.NewDevnullStore()
metrics := mdata.NewAggMetrics(store, 600, 10, 0, 0, 0, 0, []mdata.AggSetting{})
Addr = "localhost:6060"
metrics := mdata.NewAggMetrics(store, &cache.MockCache{}, 600, 10, 0, 0, 0, 0, []mdata.AggSetting{})
srv, _ := NewServer()
srv.BindBackendStore(store)
srv.BindMemoryStore(metrics)
Expand Down Expand Up @@ -1231,7 +1230,7 @@ func TestGetSeriesCachedStore(t *testing.T) {
srv, _ := NewServer()
store := mdata.NewMockStore()
srv.BindBackendStore(store)
metrics := mdata.NewAggMetrics(store, 1, 1, 0, 0, 0, 0, []mdata.AggSetting{})
metrics := mdata.NewAggMetrics(store, &cache.MockCache{}, 1, 1, 0, 0, 0, 0, []mdata.AggSetting{})
srv.BindMemoryStore(metrics)
metric := "metric1"
var c *cache.CCache
Expand Down Expand Up @@ -1407,8 +1406,7 @@ func TestGetSeriesAggMetrics(t *testing.T) {
store := mdata.NewMockStore()
chunkSpan := uint32(600)
numChunks := uint32(10)
metrics := mdata.NewAggMetrics(store, chunkSpan, numChunks, 0, 0, 0, 0, []mdata.AggSetting{})
Addr = "localhost:6060"
metrics := mdata.NewAggMetrics(store, &cache.MockCache{}, chunkSpan, numChunks, 0, 0, 0, 0, []mdata.AggSetting{})
srv, _ := NewServer()
srv.BindBackendStore(store)
srv.BindMemoryStore(metrics)
Expand Down
3 changes: 2 additions & 1 deletion input/carbon/carbon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@ import (
"github.com/raintank/metrictank/cluster"
"github.com/raintank/metrictank/idx/memory"
"github.com/raintank/metrictank/mdata"
"github.com/raintank/metrictank/mdata/cache"
"github.com/raintank/metrictank/usage"
"gopkg.in/raintank/schema.v1"
)

func Test_HandleMessage(t *testing.T) {
cluster.Init("default", "test", time.Now(), "http", 6060)
store := mdata.NewDevnullStore()
aggmetrics := mdata.NewAggMetrics(store, 600, 10, 800, 8000, 10000, 0, make([]mdata.AggSetting, 0))
aggmetrics := mdata.NewAggMetrics(store, &cache.MockCache{}, 600, 10, 800, 8000, 10000, 0, make([]mdata.AggSetting, 0))
metricIndex := memory.New()
metricIndex.Init()
usage := usage.New(300, aggmetrics, metricIndex, clock.New())
Expand Down
5 changes: 3 additions & 2 deletions input/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@ import (
"github.com/raintank/metrictank/cluster"
"github.com/raintank/metrictank/idx/memory"
"github.com/raintank/metrictank/mdata"
"github.com/raintank/metrictank/mdata/cache"
"github.com/raintank/metrictank/usage"
"gopkg.in/raintank/schema.v1"
)

func Test_Process(t *testing.T) {
cluster.Init("default", "test", time.Now(), "http", 6060)
store := mdata.NewDevnullStore()
aggmetrics := mdata.NewAggMetrics(store, 600, 10, 800, 8000, 10000, 0, make([]mdata.AggSetting, 0))
aggmetrics := mdata.NewAggMetrics(store, &cache.MockCache{}, 600, 10, 800, 8000, 10000, 0, make([]mdata.AggSetting, 0))
metricIndex := memory.New()
metricIndex.Init()
usage := usage.New(300, aggmetrics, metricIndex, clock.New())
Expand Down Expand Up @@ -87,7 +88,7 @@ func BenchmarkProcess(b *testing.B) {
cluster.Init("default", "test", time.Now(), "http", 6060)

store := mdata.NewDevnullStore()
aggmetrics := mdata.NewAggMetrics(store, 600, 10, 800, 8000, 10000, 0, make([]mdata.AggSetting, 0))
aggmetrics := mdata.NewAggMetrics(store, &cache.MockCache{}, 600, 10, 800, 8000, 10000, 0, make([]mdata.AggSetting, 0))
metricIndex := memory.New()
metricIndex.Init()
usage := usage.New(300, aggmetrics, metricIndex, clock.New())
Expand Down
3 changes: 2 additions & 1 deletion input/kafkamdm/kafkamdm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/raintank/metrictank/idx/memory"
"github.com/raintank/metrictank/input"
"github.com/raintank/metrictank/mdata"
"github.com/raintank/metrictank/mdata/cache"
"github.com/raintank/metrictank/usage"

"gopkg.in/raintank/schema.v1"
Expand All @@ -18,7 +19,7 @@ import (
func Test_HandleMessage(t *testing.T) {
cluster.Init("default", "test", time.Now(), "http", 6060)
store := mdata.NewDevnullStore()
aggmetrics := mdata.NewAggMetrics(store, 600, 10, 800, 8000, 10000, 0, make([]mdata.AggSetting, 0))
aggmetrics := mdata.NewAggMetrics(store, &cache.MockCache{}, 600, 10, 800, 8000, 10000, 0, make([]mdata.AggSetting, 0))
metricIndex := memory.New()
metricIndex.Init()
usage := usage.New(300, aggmetrics, metricIndex, clock.New())
Expand Down
42 changes: 32 additions & 10 deletions mdata/aggmetric.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/raintank/metrictank/cluster"
"github.com/raintank/metrictank/consolidation"
"github.com/raintank/metrictank/mdata/cache"
"github.com/raintank/metrictank/mdata/chunk"
"github.com/raintank/worldping-api/pkg/log"
)
Expand All @@ -20,7 +21,8 @@ import (
// in addition, keep in mind that the last chunk is always a work in progress and not useable for aggregation
// AggMetric is concurrency-safe
type AggMetric struct {
store Store
store Store
cachePusher cache.CachePusher
sync.RWMutex
Key string
CurrentChunkPos int // element in []Chunks that is active. All others are either finished or nil.
Expand All @@ -34,17 +36,18 @@ type AggMetric struct {

// NewAggMetric creates a metric with given key, it retains the given number of chunks each chunkSpan seconds long
// it optionally also creates aggregations with the given settings
func NewAggMetric(store Store, key string, chunkSpan, numChunks uint32, ttl uint32, aggsetting ...AggSetting) *AggMetric {
func NewAggMetric(store Store, cachePusher cache.CachePusher, key string, chunkSpan, numChunks uint32, ttl uint32, aggsetting ...AggSetting) *AggMetric {
m := AggMetric{
store: store,
Key: key,
ChunkSpan: chunkSpan,
NumChunks: numChunks,
Chunks: make([]*chunk.Chunk, 0, numChunks),
ttl: ttl,
store: store,
cachePusher: cachePusher,
Key: key,
ChunkSpan: chunkSpan,
NumChunks: numChunks,
Chunks: make([]*chunk.Chunk, 0, numChunks),
ttl: ttl,
}
for _, as := range aggsetting {
m.aggregators = append(m.aggregators, NewAggregator(store, key, as.Span, as.ChunkSpan, as.NumChunks, as.Ttl))
m.aggregators = append(m.aggregators, NewAggregator(store, cachePusher, key, as.Span, as.ChunkSpan, as.NumChunks, as.Ttl))
}

return &m
Expand Down Expand Up @@ -330,17 +333,35 @@ func (a *AggMetric) addAggregators(ts uint32, val float64) {
}
}

// pushToCache asynchronously hands a finished chunk over to the cache pusher,
// which decides on its own whether the chunk is "hot" enough to keep.
func (a *AggMetric) pushToCache(c *chunk.Chunk) {
	// build the iterator generator up front; the goroutine only performs the hand-off.
	// NOTE(review): the second argument to CacheIfHot is 0 here — presumably a
	// "previous ts" style parameter; confirm against the cache.CachePusher contract.
	itergen := chunk.NewBareIterGen(c.Bytes(), c.T0, a.ChunkSpan)
	go a.cachePusher.CacheIfHot(a.Key, 0, *itergen)
}

// write a chunk to persistent storage. This should only be called while holding a.Lock()
func (a *AggMetric) persist(pos int) {
chunk := a.Chunks[pos]

if !cluster.Manager.IsPrimary() {
if LogLevel < 2 {
log.Debug("AM persist(): node is not primary, not saving chunk.")
}

// no need to continue persisting, we just push the chunk into the local cache and return
chunk.Finish()
a.pushToCache(chunk)
return
}

pre := time.Now()
chunk := a.Chunks[pos]

if chunk.Saved || chunk.Saving {
// this can happen if chunk was persisted by GC (stale) and then new data triggered another persist call
Expand Down Expand Up @@ -405,6 +426,7 @@ func (a *AggMetric) persist(pos int) {
pending[pendingChunk].chunk.Finish()
a.store.Add(pending[pendingChunk])
pending[pendingChunk].chunk.Saving = true
a.pushToCache(pending[pendingChunk].chunk)
pendingChunk--
}
persistDuration.Value(time.Now().Sub(pre))
Expand Down
58 changes: 52 additions & 6 deletions mdata/aggmetric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/raintank/metrictank/cluster"
"github.com/raintank/metrictank/mdata/cache"
)

var dnstore = NewDevnullStore()
Expand Down Expand Up @@ -69,10 +70,56 @@ func (c *Checker) Verify(primary bool, from, to, first, last uint32) {
cluster.Manager.SetPrimary(currentClusterStatus)
}

// TestMetricPersistBeingPrimary runs the persist scenario with the node acting as primary.
func TestMetricPersistBeingPrimary(t *testing.T) {
	testMetricPersistOptionalPrimary(t, true)
}

// TestMetricPersistBeingSecondary runs the persist scenario with the node acting as secondary (non-primary).
func TestMetricPersistBeingSecondary(t *testing.T) {
	testMetricPersistOptionalPrimary(t, false)
}

// testMetricPersistOptionalPrimary feeds enough points into an AggMetric to
// finish several chunks, then verifies that:
//   - every finished chunk is pushed to the (mock) chunk cache, regardless of
//     primary status
//   - chunks are only added to the backend store when the node is primary
func testMetricPersistOptionalPrimary(t *testing.T, primary bool) {
	// always reset the counter when entering and leaving the test
	dnstore.Reset()
	defer dnstore.Reset()

	cluster.Init("default", "test", time.Now(), "http", 6060)
	cluster.Manager.SetPrimary(primary)

	mockCache := cache.NewMockCache()

	numChunks, chunkAddCount, chunkSpan := uint32(5), uint32(10), uint32(300)
	agg := NewAggMetric(dnstore, mockCache, "foo", chunkSpan, numChunks, 1, []AggSetting{}...)

	for ts := chunkSpan; ts <= chunkSpan*chunkAddCount; ts += chunkSpan {
		agg.Add(ts, 1)
	}

	// the last chunk is still a work in progress, so only chunkAddCount-1
	// chunks can have been finished and pushed to the cache
	expectedHotAdds := int(chunkAddCount - 1)

	select {
	case <-time.After(1 * time.Second):
		adds := mockCache.GetAddsIfHot()
		// %v: the concrete type returned by GetAddsIfHot is declared elsewhere;
		// %v renders it safely either way (the previous %s was only correct for strings)
		t.Fatalf("timed out waiting for %d chunk pushes. only saw %v", expectedHotAdds, adds)
	case <-mockCache.AfterAddsIfHot(expectedHotAdds):
		// got all the expected cache pushes in time
	}

	if primary {
		if dnstore.AddCount != chunkAddCount-1 {
			t.Fatalf("there should have been %d chunk adds on store, but got %d", chunkAddCount-1, dnstore.AddCount)
		}
	} else {
		if dnstore.AddCount != 0 {
			t.Fatalf("there should have been %d chunk adds on store, but got %d", 0, dnstore.AddCount)
		}
	}
}

func TestAggMetric(t *testing.T) {
cluster.Init("default", "test", time.Now(), "http", 6060)

c := NewChecker(t, NewAggMetric(dnstore, "foo", 100, 5, 1, []AggSetting{}...))
c := NewChecker(t, NewAggMetric(dnstore, &cache.MockCache{}, "foo", 100, 5, 1, []AggSetting{}...))

// basic case, single range
c.Add(101, 101)
Expand Down Expand Up @@ -170,7 +217,7 @@ func BenchmarkAggMetrics1000Metrics1Day(b *testing.B) {
keys[i] = fmt.Sprintf("hello.this.is.a.test.key.%d", i)
}

metrics := NewAggMetrics(dnstore, chunkSpan, numChunks, chunkMaxStale, metricMaxStale, ttl, 0, aggSettings)
metrics := NewAggMetrics(dnstore, &cache.MockCache{}, chunkSpan, numChunks, chunkMaxStale, metricMaxStale, ttl, 0, aggSettings)

maxT := 3600 * 24 * uint32(b.N) // b.N in days
for t := uint32(1); t < maxT; t += 10 {
Expand All @@ -182,7 +229,6 @@ func BenchmarkAggMetrics1000Metrics1Day(b *testing.B) {
}

func BenchmarkAggMetrics1kSeries2Chunks1kQueueSize(b *testing.B) {

chunkSpan := uint32(600)
numChunks := uint32(5)
chunkMaxStale := uint32(3600)
Expand All @@ -204,7 +250,7 @@ func BenchmarkAggMetrics1kSeries2Chunks1kQueueSize(b *testing.B) {
keys[i] = fmt.Sprintf("hello.this.is.a.test.key.%d", i)
}

metrics := NewAggMetrics(dnstore, chunkSpan, numChunks, chunkMaxStale, metricMaxStale, ttl, 0, aggSettings)
metrics := NewAggMetrics(dnstore, &cache.MockCache{}, chunkSpan, numChunks, chunkMaxStale, metricMaxStale, ttl, 0, aggSettings)

maxT := uint32(1200)
for t := uint32(1); t < maxT; t += 10 {
Expand Down Expand Up @@ -237,7 +283,7 @@ func BenchmarkAggMetrics10kSeries2Chunks10kQueueSize(b *testing.B) {
keys[i] = fmt.Sprintf("hello.this.is.a.test.key.%d", i)
}

metrics := NewAggMetrics(dnstore, chunkSpan, numChunks, chunkMaxStale, metricMaxStale, ttl, 0, aggSettings)
metrics := NewAggMetrics(dnstore, &cache.MockCache{}, chunkSpan, numChunks, chunkMaxStale, metricMaxStale, ttl, 0, aggSettings)

maxT := uint32(1200)
for t := uint32(1); t < maxT; t += 10 {
Expand Down Expand Up @@ -270,7 +316,7 @@ func BenchmarkAggMetrics100kSeries2Chunks100kQueueSize(b *testing.B) {
keys[i] = fmt.Sprintf("hello.this.is.a.test.key.%d", i)
}

metrics := NewAggMetrics(dnstore, chunkSpan, numChunks, chunkMaxStale, metricMaxStale, ttl, 0, aggSettings)
metrics := NewAggMetrics(dnstore, &cache.MockCache{}, chunkSpan, numChunks, chunkMaxStale, metricMaxStale, ttl, 0, aggSettings)

maxT := uint32(1200)
for t := uint32(1); t < maxT; t += 10 {
Expand Down
9 changes: 6 additions & 3 deletions mdata/aggmetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import (
"time"

"github.com/raintank/metrictank/cluster"
"github.com/raintank/metrictank/mdata/cache"
"github.com/raintank/worldping-api/pkg/log"
)

type AggMetrics struct {
store Store
store Store
cachePusher cache.CachePusher
sync.RWMutex
Metrics map[string]*AggMetric
chunkSpan uint32
Expand All @@ -21,9 +23,10 @@ type AggMetrics struct {
gcInterval time.Duration
}

func NewAggMetrics(store Store, chunkSpan, numChunks, chunkMaxStale, metricMaxStale uint32, ttl uint32, gcInterval time.Duration, aggSettings []AggSetting) *AggMetrics {
func NewAggMetrics(store Store, cachePusher cache.CachePusher, chunkSpan, numChunks, chunkMaxStale, metricMaxStale uint32, ttl uint32, gcInterval time.Duration, aggSettings []AggSetting) *AggMetrics {
ms := AggMetrics{
store: store,
cachePusher: cachePusher,
Metrics: make(map[string]*AggMetric),
chunkSpan: chunkSpan,
numChunks: numChunks,
Expand Down Expand Up @@ -92,7 +95,7 @@ func (ms *AggMetrics) GetOrCreate(key string) Metric {
ms.Lock()
m, ok := ms.Metrics[key]
if !ok {
m = NewAggMetric(ms.store, key, ms.chunkSpan, ms.numChunks, ms.ttl, ms.aggSettings...)
m = NewAggMetric(ms.store, ms.cachePusher, key, ms.chunkSpan, ms.numChunks, ms.ttl, ms.aggSettings...)
ms.Metrics[key] = m
metricsActive.Set(len(ms.Metrics))
}
Expand Down
15 changes: 9 additions & 6 deletions mdata/aggregator.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package mdata

import "fmt"
import (
"fmt"
"github.com/raintank/metrictank/mdata/cache"
)

type AggSetting struct {
Span uint32 // in seconds, controls how many input points go into an aggregated point.
Expand Down Expand Up @@ -49,15 +52,15 @@ type Aggregator struct {
cntMetric *AggMetric
}

func NewAggregator(store Store, key string, aggSpan, aggChunkSpan, aggNumChunks uint32, ttl uint32) *Aggregator {
func NewAggregator(store Store, cachePusher cache.CachePusher, key string, aggSpan, aggChunkSpan, aggNumChunks uint32, ttl uint32) *Aggregator {
return &Aggregator{
key: key,
span: aggSpan,
agg: NewAggregation(),
minMetric: NewAggMetric(store, fmt.Sprintf("%s_min_%d", key, aggSpan), aggChunkSpan, aggNumChunks, ttl),
maxMetric: NewAggMetric(store, fmt.Sprintf("%s_max_%d", key, aggSpan), aggChunkSpan, aggNumChunks, ttl),
sumMetric: NewAggMetric(store, fmt.Sprintf("%s_sum_%d", key, aggSpan), aggChunkSpan, aggNumChunks, ttl),
cntMetric: NewAggMetric(store, fmt.Sprintf("%s_cnt_%d", key, aggSpan), aggChunkSpan, aggNumChunks, ttl),
minMetric: NewAggMetric(store, cachePusher, fmt.Sprintf("%s_min_%d", key, aggSpan), aggChunkSpan, aggNumChunks, ttl),
maxMetric: NewAggMetric(store, cachePusher, fmt.Sprintf("%s_max_%d", key, aggSpan), aggChunkSpan, aggNumChunks, ttl),
sumMetric: NewAggMetric(store, cachePusher, fmt.Sprintf("%s_sum_%d", key, aggSpan), aggChunkSpan, aggNumChunks, ttl),
cntMetric: NewAggMetric(store, cachePusher, fmt.Sprintf("%s_cnt_%d", key, aggSpan), aggChunkSpan, aggNumChunks, ttl),
}
}
func (agg *Aggregator) flush() {
Expand Down
Loading