From 96ebc7d27e3ad15bd3fdce48349cc43ca40f2e2c Mon Sep 17 00:00:00 2001 From: woodsaj Date: Wed, 14 Feb 2018 17:56:00 +0800 Subject: [PATCH] move backend stores into their own package As backend stores are pluggable, they should be in their own package. Otherwise, their global variables get initialized when mdata is loaded. --- api/ccache_test.go | 3 +- api/dataprocessor_test.go | 5 +- cmd/mt-schemas-explain/main.go | 4 +- cmd/mt-split-metrics-by-ttl/main.go | 4 +- cmd/mt-store-cat/full.go | 14 ++--- cmd/mt-store-cat/main.go | 4 +- cmd/mt-store-cat/metrics.go | 6 +- cmd/mt-store-cat/series.go | 8 +-- cmd/mt-store-cat/tables.go | 4 +- cmd/mt-view-boundaries/main.go | 4 +- cmd/mt-whisper-importer-writer/main.go | 12 ++-- input/input_test.go | 5 +- mdata/aggmetric_test.go | 57 ++++++++++++------- mdata/aggregator_test.go | 10 ++-- mdata/cwr.go | 11 ---- mdata/ifaces.go | 11 ++++ mdata/store.go | 15 ----- mdata/store_mock.go | 23 ++++++-- metrictank.go | 3 +- {mdata => store/cassandra}/store_cassandra.go | 30 +++++----- .../cassandra}/store_cassandra_test.go | 2 +- {mdata => store}/store_devnull.go | 5 +- 22 files changed, 133 insertions(+), 107 deletions(-) delete mode 100644 mdata/store.go rename {mdata => store/cassandra}/store_cassandra.go (96%) rename {mdata => store/cassandra}/store_cassandra_test.go (99%) rename {mdata => store}/store_devnull.go (82%) diff --git a/api/ccache_test.go b/api/ccache_test.go index 2b0e9a3935..a4dbd8bd70 100644 --- a/api/ccache_test.go +++ b/api/ccache_test.go @@ -25,7 +25,8 @@ func newSrv(delSeries, delArchives int) (*Server, *cache.MockCache) { mdata.SetSingleAgg(conf.Avg, conf.Min, conf.Max) mdata.SetSingleSchema(conf.NewRetentionMT(10, 100, 600, 10, true)) - store := mdata.NewDevnullStore() + store := mdata.NewMockStore() + store.Drop = true srv.BindBackendStore(store) mockCache := cache.NewMockCache() diff --git a/api/dataprocessor_test.go b/api/dataprocessor_test.go index 055cbf9949..baac8cd967 100644 --- 
a/api/dataprocessor_test.go +++ b/api/dataprocessor_test.go @@ -340,7 +340,8 @@ func TestPrevBoundary(t *testing.T) { func TestGetSeriesFixed(t *testing.T) { cluster.Init("default", "test", time.Now(), "http", 6060) - store := mdata.NewDevnullStore() + store := mdata.NewMockStore() + store.Drop = true mdata.SetSingleAgg(conf.Avg, conf.Min, conf.Max) mdata.SetSingleSchema(conf.NewRetentionMT(10, 100, 600, 10, true)) @@ -746,7 +747,7 @@ func TestGetSeriesCachedStore(t *testing.T) { // stop cache go routines before reinstantiating it at the top of the loop c.Stop() - store.ResetMock() + store.Reset() } } } diff --git a/cmd/mt-schemas-explain/main.go b/cmd/mt-schemas-explain/main.go index 93c8b67b08..0324e9dc77 100644 --- a/cmd/mt-schemas-explain/main.go +++ b/cmd/mt-schemas-explain/main.go @@ -9,7 +9,7 @@ import ( log "github.com/Sirupsen/logrus" "github.com/grafana/metrictank/conf" - "github.com/grafana/metrictank/mdata" + "github.com/grafana/metrictank/store/cassandra" ) var ( @@ -65,7 +65,7 @@ func main() { fmt.Printf("retentions:%10s %10s %10s %10s %10s %15s %10s\n", "interval", "retention", "chunkspan", "numchunks", "ready", "tablename", "windowsize") for _, ret := range schema.Retentions { retention := ret.MaxRetention() - table := mdata.GetTTLTable(uint32(retention), *windowFactor, mdata.Table_name_format) + table := cassandra.GetTTLTable(uint32(retention), *windowFactor, cassandra.Table_name_format) retStr := time.Duration(time.Duration(retention) * time.Second).String() if retention%(3600*24) == 0 { retStr = fmt.Sprintf("%dd", retention/3600/24) diff --git a/cmd/mt-split-metrics-by-ttl/main.go b/cmd/mt-split-metrics-by-ttl/main.go index 9f783bada6..2533e24463 100644 --- a/cmd/mt-split-metrics-by-ttl/main.go +++ b/cmd/mt-split-metrics-by-ttl/main.go @@ -8,7 +8,7 @@ import ( "path" "strings" - "github.com/grafana/metrictank/mdata" + "github.com/grafana/metrictank/store/cassandra" "github.com/raintank/dur" ) @@ -69,7 +69,7 @@ func main() { 
panic(fmt.Sprintf("Error creating directory: %s", err)) } - store, err := mdata.NewCassandraStore(*cassandraAddrs, *cassandraKeyspace, *cassandraConsistency, *cassandraCaPath, *cassandraUsername, *cassandraPassword, *cassandraHostSelectionPolicy, *cassandraTimeout, cassandraReadConcurrency, cassandraReadConcurrency, cassandraReadQueueSize, 0, *cassandraRetries, *cqlProtocolVersion, windowFactor, cassandraOmitReadTimeout, *cassandraSSL, *cassandraAuth, *cassandraHostVerification, *cassandraCreateKeyspace, ttls) + store, err := cassandra.NewCassandraStore(*cassandraAddrs, *cassandraKeyspace, *cassandraConsistency, *cassandraCaPath, *cassandraUsername, *cassandraPassword, *cassandraHostSelectionPolicy, *cassandraTimeout, cassandraReadConcurrency, cassandraReadConcurrency, cassandraReadQueueSize, 0, *cassandraRetries, *cqlProtocolVersion, windowFactor, cassandraOmitReadTimeout, *cassandraSSL, *cassandraAuth, *cassandraHostVerification, *cassandraCreateKeyspace, ttls) if err != nil { panic(fmt.Sprintf("Failed to instantiate cassandra: %s", err)) } diff --git a/cmd/mt-store-cat/full.go b/cmd/mt-store-cat/full.go index f1564924be..4295888562 100644 --- a/cmd/mt-store-cat/full.go +++ b/cmd/mt-store-cat/full.go @@ -7,19 +7,19 @@ import ( "strings" "time" - "github.com/grafana/metrictank/mdata" + "github.com/grafana/metrictank/store/cassandra" ) -func chunkSummary(ctx context.Context, store *mdata.CassandraStore, tables []string, metrics []Metric, keyspace, groupTTL string) error { +func chunkSummary(ctx context.Context, store *cassandra.CassandraStore, tables []string, metrics []Metric, keyspace, groupTTL string) error { now := uint32(time.Now().Unix()) - end_month := now - (now % mdata.Month_sec) + end_month := now - (now % cassandra.Month_sec) for _, tbl := range tables { - // actual TTL may be up to 2x what's in tablename. see mdata/store_cassandra.go for details + // actual TTL may be up to 2x what's in tablename. 
see store/cassandra/store_cassandra.go for details // we query up to 4x so that we also include data that should have been dropped already but still sticks around for whatever reason. TTLHours, _ := strconv.Atoi(strings.Split(tbl, "_")[1]) start := now - uint32(4*TTLHours*3600) - start_month := start - (start % mdata.Month_sec) + start_month := start - (start % cassandra.Month_sec) fmt.Println("## Table", tbl) if len(metrics) == 0 { query := fmt.Sprintf("select key, ttl(data) from %s", tbl) @@ -27,8 +27,8 @@ func chunkSummary(ctx context.Context, store *mdata.CassandraStore, tables []str showKeyTTL(iter, groupTTL) } else { for _, metric := range metrics { - for month := start_month; month <= end_month; month += mdata.Month_sec { - row_key := fmt.Sprintf("%s_%d", metric.id, month/mdata.Month_sec) + for month := start_month; month <= end_month; month += cassandra.Month_sec { + row_key := fmt.Sprintf("%s_%d", metric.id, month/cassandra.Month_sec) query := fmt.Sprintf("select key, ttl(data) from %s where key=?", tbl) iter := store.Session.Query(query, row_key).Iter() showKeyTTL(iter, groupTTL) diff --git a/cmd/mt-store-cat/main.go b/cmd/mt-store-cat/main.go index 7565de5989..c0085dcc77 100644 --- a/cmd/mt-store-cat/main.go +++ b/cmd/mt-store-cat/main.go @@ -13,7 +13,7 @@ import ( "github.com/grafana/metrictank/conf" opentracing "github.com/opentracing/opentracing-go" - "github.com/grafana/metrictank/mdata" + "github.com/grafana/metrictank/store/cassandra" "github.com/raintank/dur" "github.com/rakyll/globalconf" ) @@ -160,7 +160,7 @@ func main() { } } - store, err := mdata.NewCassandraStore(*cassandraAddrs, *cassandraKeyspace, *cassandraConsistency, *cassandraCaPath, *cassandraUsername, *cassandraPassword, *cassandraHostSelectionPolicy, *cassandraTimeout, *cassandraReadConcurrency, *cassandraReadConcurrency, *cassandraReadQueueSize, 0, *cassandraRetries, *cqlProtocolVersion, *windowFactor, *cassandraOmitReadTimeout, *cassandraSSL, *cassandraAuth, 
*cassandraHostVerification, *cassandraCreateKeyspace, nil) + store, err := cassandra.NewCassandraStore(*cassandraAddrs, *cassandraKeyspace, *cassandraConsistency, *cassandraCaPath, *cassandraUsername, *cassandraPassword, *cassandraHostSelectionPolicy, *cassandraTimeout, *cassandraReadConcurrency, *cassandraReadConcurrency, *cassandraReadQueueSize, 0, *cassandraRetries, *cqlProtocolVersion, *windowFactor, *cassandraOmitReadTimeout, *cassandraSSL, *cassandraAuth, *cassandraHostVerification, *cassandraCreateKeyspace, nil) if err != nil { log.Fatal(4, "failed to initialize cassandra. %s", err) } diff --git a/cmd/mt-store-cat/metrics.go b/cmd/mt-store-cat/metrics.go index 611ff5cf91..93e873d952 100644 --- a/cmd/mt-store-cat/metrics.go +++ b/cmd/mt-store-cat/metrics.go @@ -5,7 +5,7 @@ import ( "sort" "strings" - "github.com/grafana/metrictank/mdata" + "github.com/grafana/metrictank/store/cassandra" ) type Metric struct { @@ -20,7 +20,7 @@ func (m MetricsByName) Swap(i, j int) { m[i], m[j] = m[j], m[i] } func (m MetricsByName) Less(i, j int) bool { return m[i].name < m[j].name } // prefix is optional -func getMetrics(store *mdata.CassandraStore, prefix string) ([]Metric, error) { +func getMetrics(store *cassandra.CassandraStore, prefix string) ([]Metric, error) { var metrics []Metric iter := store.Session.Query("select id, metric from metric_idx").Iter() var m Metric @@ -37,7 +37,7 @@ func getMetrics(store *mdata.CassandraStore, prefix string) ([]Metric, error) { return metrics, nil } -func getMetric(store *mdata.CassandraStore, id string) ([]Metric, error) { +func getMetric(store *cassandra.CassandraStore, id string) ([]Metric, error) { var metrics []Metric iter := store.Session.Query("select id, metric from metric_idx where id=? 
ALLOW FILTERING", id).Iter() var m Metric diff --git a/cmd/mt-store-cat/series.go b/cmd/mt-store-cat/series.go index e3be2b6d71..2988eefb0f 100644 --- a/cmd/mt-store-cat/series.go +++ b/cmd/mt-store-cat/series.go @@ -8,12 +8,12 @@ import ( "time" "github.com/grafana/metrictank/api" - "github.com/grafana/metrictank/mdata" "github.com/grafana/metrictank/mdata/chunk" + "github.com/grafana/metrictank/store/cassandra" "gopkg.in/raintank/schema.v1" ) -func points(ctx context.Context, store *mdata.CassandraStore, tables []string, metrics []Metric, fromUnix, toUnix, fix uint32) { +func points(ctx context.Context, store *cassandra.CassandraStore, tables []string, metrics []Metric, fromUnix, toUnix, fix uint32) { for _, metric := range metrics { fmt.Println("## Metric", metric) for _, table := range tables { @@ -32,7 +32,7 @@ func points(ctx context.Context, store *mdata.CassandraStore, tables []string, m } } -func pointSummary(ctx context.Context, store *mdata.CassandraStore, tables []string, metrics []Metric, fromUnix, toUnix, fix uint32) { +func pointSummary(ctx context.Context, store *cassandra.CassandraStore, tables []string, metrics []Metric, fromUnix, toUnix, fix uint32) { for _, metric := range metrics { fmt.Println("## Metric", metric) for _, table := range tables { @@ -51,7 +51,7 @@ func pointSummary(ctx context.Context, store *mdata.CassandraStore, tables []str } } -func getSeries(ctx context.Context, store *mdata.CassandraStore, table, id string, fromUnix, toUnix, interval uint32) []schema.Point { +func getSeries(ctx context.Context, store *cassandra.CassandraStore, table, id string, fromUnix, toUnix, interval uint32) []schema.Point { var points []schema.Point itgens, err := store.SearchTable(ctx, id, table, fromUnix, toUnix) if err != nil { diff --git a/cmd/mt-store-cat/tables.go b/cmd/mt-store-cat/tables.go index 9c4aa501b1..f7830e8d61 100644 --- a/cmd/mt-store-cat/tables.go +++ b/cmd/mt-store-cat/tables.go @@ -6,7 +6,7 @@ import ( "strconv" "strings" - 
"github.com/grafana/metrictank/mdata" + "github.com/grafana/metrictank/store/cassandra" ) type TablesByTTL []string @@ -19,7 +19,7 @@ func (t TablesByTTL) Less(i, j int) bool { return iTTL < jTTL } -func getTables(store *mdata.CassandraStore, keyspace string, match string) ([]string, error) { +func getTables(store *cassandra.CassandraStore, keyspace string, match string) ([]string, error) { var tables []string meta, err := store.Session.KeyspaceMetadata(keyspace) if err != nil { diff --git a/cmd/mt-view-boundaries/main.go b/cmd/mt-view-boundaries/main.go index 6dd3c0dede..a40e5c68ae 100644 --- a/cmd/mt-view-boundaries/main.go +++ b/cmd/mt-view-boundaries/main.go @@ -7,8 +7,8 @@ import ( "runtime" "time" - "github.com/grafana/metrictank/mdata" "github.com/grafana/metrictank/mdata/chunk" + "github.com/grafana/metrictank/store/cassandra" "github.com/raintank/dur" ) @@ -54,7 +54,7 @@ func main() { return } - display(int64(mdata.Month_sec), "cassandra rowkey month") + display(int64(cassandra.Month_sec), "cassandra rowkey month") if *spanStr != "" { span := dur.MustParseNDuration("span", *spanStr) diff --git a/cmd/mt-whisper-importer-writer/main.go b/cmd/mt-whisper-importer-writer/main.go index 1f8e744617..d25e9dd27b 100644 --- a/cmd/mt-whisper-importer-writer/main.go +++ b/cmd/mt-whisper-importer-writer/main.go @@ -16,9 +16,9 @@ import ( "github.com/grafana/metrictank/cluster/partitioner" "github.com/grafana/metrictank/idx" "github.com/grafana/metrictank/idx/cassandra" - "github.com/grafana/metrictank/mdata" "github.com/grafana/metrictank/mdata/chunk" "github.com/grafana/metrictank/mdata/chunk/archive" + cassandraStore "github.com/grafana/metrictank/store/cassandra" "github.com/raintank/dur" ) @@ -95,7 +95,7 @@ var ( type Server struct { Session *gocql.Session - TTLTables mdata.TTLTables + TTLTables cassandraStore.TTLTables Partitioner partitioner.Partitioner Index idx.MetricIndex HTTPServer *http.Server @@ -149,7 +149,7 @@ func main() { log.SetLevel(log.InfoLevel) } - 
store, err := mdata.NewCassandraStore(*cassandraAddrs, *cassandraKeyspace, *cassandraConsistency, *cassandraCaPath, *cassandraUsername, *cassandraPassword, *cassandraHostSelectionPolicy, *cassandraTimeout, *cassandraReadConcurrency, *cassandraReadConcurrency, *cassandraReadQueueSize, 0, *cassandraRetries, *cqlProtocolVersion, *windowFactor, 60, *cassandraSSL, *cassandraAuth, *cassandraHostVerification, *cassandraCreateKeyspace, nil) + store, err := cassandraStore.NewCassandraStore(*cassandraAddrs, *cassandraKeyspace, *cassandraConsistency, *cassandraCaPath, *cassandraUsername, *cassandraPassword, *cassandraHostSelectionPolicy, *cassandraTimeout, *cassandraReadConcurrency, *cassandraReadConcurrency, *cassandraReadQueueSize, 0, *cassandraRetries, *cqlProtocolVersion, *windowFactor, 60, *cassandraSSL, *cassandraAuth, *cassandraHostVerification, *cassandraCreateKeyspace, nil) if err != nil { panic(fmt.Sprintf("Failed to initialize cassandra: %q", err)) } @@ -159,7 +159,7 @@ func main() { for _, split := range splits { ttls = append(ttls, dur.MustParseNDuration("ttl", split)) } - ttlTables := mdata.GetTTLTables(ttls, *windowFactor, mdata.Table_name_format) + ttlTables := cassandraStore.GetTTLTables(ttls, *windowFactor, cassandraStore.Table_name_format) p, err := partitioner.NewKafka(*partitionScheme) if err != nil { @@ -258,11 +258,11 @@ func (s *Server) insertChunks(table, id string, ttl uint32, itergens []chunk.Ite } log.Debug(query) for _, ig := range itergens { - rowKey := fmt.Sprintf("%s_%d", id, ig.Ts/mdata.Month_sec) + rowKey := fmt.Sprintf("%s_%d", id, ig.Ts/cassandraStore.Month_sec) success := false attempts := 0 for !success { - err := s.Session.Query(query, rowKey, ig.Ts, mdata.PrepareChunkData(ig.Span, ig.Bytes())).Exec() + err := s.Session.Query(query, rowKey, ig.Ts, cassandraStore.PrepareChunkData(ig.Span, ig.Bytes())).Exec() if err != nil { if (attempts % 20) == 0 { log.Warnf("CS: failed to save chunk to cassandra after %d attempts. 
%s", attempts+1, err) diff --git a/input/input_test.go b/input/input_test.go index 197dc813de..97aa7a48d5 100644 --- a/input/input_test.go +++ b/input/input_test.go @@ -10,13 +10,14 @@ import ( "github.com/grafana/metrictank/idx/memory" "github.com/grafana/metrictank/mdata" "github.com/grafana/metrictank/mdata/cache" + backendStore "github.com/grafana/metrictank/store" "gopkg.in/raintank/schema.v1" ) func BenchmarkProcessUniqueMetrics(b *testing.B) { cluster.Init("default", "test", time.Now(), "http", 6060) - store := mdata.NewDevnullStore() + store := backendStore.NewDevnullStore() mdata.SetSingleSchema(conf.NewRetentionMT(10, 10000, 600, 10, true)) mdata.SetSingleAgg(conf.Avg, conf.Min, conf.Max) @@ -54,7 +55,7 @@ func BenchmarkProcessUniqueMetrics(b *testing.B) { func BenchmarkProcessSameMetric(b *testing.B) { cluster.Init("default", "test", time.Now(), "http", 6060) - store := mdata.NewDevnullStore() + store := backendStore.NewDevnullStore() mdata.SetSingleSchema(conf.NewRetentionMT(10, 10000, 600, 10, true)) mdata.SetSingleAgg(conf.Avg, conf.Min, conf.Max) diff --git a/mdata/aggmetric_test.go b/mdata/aggmetric_test.go index 9c463190fa..56106cc416 100644 --- a/mdata/aggmetric_test.go +++ b/mdata/aggmetric_test.go @@ -13,7 +13,7 @@ import ( "github.com/grafana/metrictank/test" ) -var dnstore = NewDevnullStore() +var mockstore = NewMockStore() type point struct { ts uint32 @@ -109,8 +109,8 @@ func TestMetricPersistBeingSecondary(t *testing.T) { func testMetricPersistOptionalPrimary(t *testing.T, primary bool) { // always reset the counter when entering and leaving the test - dnstore.Reset() - defer dnstore.Reset() + mockstore.Reset() + defer mockstore.Reset() cluster.Init("default", "test", time.Now(), "http", 6060) cluster.Manager.SetPrimary(primary) @@ -123,7 +123,7 @@ func testMetricPersistOptionalPrimary(t *testing.T, primary bool) { numChunks, chunkAddCount, chunkSpan := uint32(5), uint32(10), uint32(300) ret := []conf.Retention{conf.NewRetentionMT(1, 1, 
chunkSpan, numChunks, true)} - agg := NewAggMetric(dnstore, &mockCache, "foo", ret, 0, nil, false) + agg := NewAggMetric(mockstore, &mockCache, "foo", ret, 0, nil, false) for ts := chunkSpan; ts <= chunkSpan*chunkAddCount; ts += chunkSpan { agg.Add(ts, 1) @@ -145,12 +145,12 @@ func testMetricPersistOptionalPrimary(t *testing.T, primary bool) { } if primary { - if dnstore.AddCount != chunkAddCount-1 { - t.Fatalf("there should have been %d chunk adds on store, but go %d", chunkAddCount-1, dnstore.AddCount) + if uint32(mockstore.Items()) != chunkAddCount-1 { + t.Fatalf("there should have been %d chunk adds on store, but got %d", chunkAddCount-1, mockstore.Items()) } } else { - if dnstore.AddCount != 0 { - t.Fatalf("there should have been %d chunk adds on store, but go %d", 0, dnstore.AddCount) + if mockstore.Items() != 0 { + t.Fatalf("there should have been %d chunk adds on store, but go %d", 0, mockstore.Items()) } } } @@ -159,7 +159,7 @@ func TestAggMetric(t *testing.T) { cluster.Init("default", "test", time.Now(), "http", 6060) ret := []conf.Retention{conf.NewRetentionMT(1, 1, 100, 5, true)} - c := NewChecker(t, NewAggMetric(dnstore, &cache.MockCache{}, "foo", ret, 0, nil, false)) + c := NewChecker(t, NewAggMetric(mockstore, &cache.MockCache{}, "foo", ret, 0, nil, false)) // basic case, single range c.Add(101, 101) @@ -237,7 +237,7 @@ func TestAggMetricWithReorderBuffer(t *testing.T) { AggregationMethod: []conf.Method{conf.Avg}, } ret := []conf.Retention{conf.NewRetentionMT(1, 1, 100, 5, true)} - c := NewChecker(t, NewAggMetric(dnstore, &cache.MockCache{}, "foo", ret, 10, &agg, false)) + c := NewChecker(t, NewAggMetric(mockstore, &cache.MockCache{}, "foo", ret, 10, &agg, false)) // basic adds and verifies with test data c.Add(101, 101) @@ -273,11 +273,11 @@ func TestAggMetricWithReorderBuffer(t *testing.T) { func TestAggMetricDropFirstChunk(t *testing.T) { cluster.Init("default", "test", time.Now(), "http", 6060) cluster.Manager.SetPrimary(true) - store := 
NewMockStore() + mockstore.Reset() chunkSpan := uint32(10) numChunks := uint32(5) ret := []conf.Retention{conf.NewRetentionMT(1, 1, chunkSpan, numChunks, true)} - m := NewAggMetric(store, &cache.MockCache{}, "foo", ret, 0, nil, true) + m := NewAggMetric(mockstore, &cache.MockCache{}, "foo", ret, 0, nil, true) m.Add(10, 10) m.Add(11, 11) m.Add(12, 12) @@ -288,7 +288,7 @@ func TestAggMetricDropFirstChunk(t *testing.T) { m.Add(31, 31) m.Add(32, 32) m.Add(40, 40) - itgens, err := store.Search(test.NewContext(), "foo", 0, 0, 1000) + itgens, err := mockstore.Search(test.NewContext(), "foo", 0, 0, 1000) if err != nil { t.Fatal(err) } @@ -307,6 +307,11 @@ func TestAggMetricDropFirstChunk(t *testing.T) { // TODO update once we clean old data, then we should look at numChunks func BenchmarkAggMetrics1000Metrics1Day(b *testing.B) { cluster.Init("default", "test", time.Now(), "http", 6060) + mockstore.Reset() + mockstore.Drop = true + defer func() { + mockstore.Drop = false + }() // we will store 10s metrics in 5 chunks of 2 hours // aggregate them in 5min buckets, stored in 1 chunk of 24hours SetSingleAgg(conf.Avg, conf.Min, conf.Max) @@ -322,7 +327,7 @@ func BenchmarkAggMetrics1000Metrics1Day(b *testing.B) { keys[i] = fmt.Sprintf("hello.this.is.a.test.key.%d", i) } - metrics := NewAggMetrics(dnstore, &cache.MockCache{}, false, chunkMaxStale, metricMaxStale, 0) + metrics := NewAggMetrics(mockstore, &cache.MockCache{}, false, chunkMaxStale, metricMaxStale, 0) maxT := 3600 * 24 * uint32(b.N) // b.N in days for t := uint32(1); t < maxT; t += 10 { @@ -337,7 +342,11 @@ func BenchmarkAggMetrics1000Metrics1Day(b *testing.B) { func BenchmarkAggMetrics1kSeries2Chunks1kQueueSize(b *testing.B) { chunkMaxStale := uint32(3600) metricMaxStale := uint32(21600) - + mockstore.Reset() + mockstore.Drop = true + defer func() { + mockstore.Drop = false + }() SetSingleAgg(conf.Avg, conf.Min, conf.Max) SetSingleSchema( conf.NewRetentionMT(1, 84600, 600, 5, true), @@ -351,7 +360,7 @@ func 
BenchmarkAggMetrics1kSeries2Chunks1kQueueSize(b *testing.B) { keys[i] = fmt.Sprintf("hello.this.is.a.test.key.%d", i) } - metrics := NewAggMetrics(dnstore, &cache.MockCache{}, false, chunkMaxStale, metricMaxStale, 0) + metrics := NewAggMetrics(mockstore, &cache.MockCache{}, false, chunkMaxStale, metricMaxStale, 0) maxT := uint32(1200) for t := uint32(1); t < maxT; t += 10 { @@ -366,7 +375,11 @@ func BenchmarkAggMetrics1kSeries2Chunks1kQueueSize(b *testing.B) { func BenchmarkAggMetrics10kSeries2Chunks10kQueueSize(b *testing.B) { chunkMaxStale := uint32(3600) metricMaxStale := uint32(21600) - + mockstore.Reset() + mockstore.Drop = true + defer func() { + mockstore.Drop = false + }() SetSingleAgg(conf.Avg, conf.Min, conf.Max) SetSingleSchema( conf.NewRetentionMT(1, 84600, 600, 5, true), @@ -380,7 +393,7 @@ func BenchmarkAggMetrics10kSeries2Chunks10kQueueSize(b *testing.B) { keys[i] = fmt.Sprintf("hello.this.is.a.test.key.%d", i) } - metrics := NewAggMetrics(dnstore, &cache.MockCache{}, false, chunkMaxStale, metricMaxStale, 0) + metrics := NewAggMetrics(mockstore, &cache.MockCache{}, false, chunkMaxStale, metricMaxStale, 0) maxT := uint32(1200) for t := uint32(1); t < maxT; t += 10 { @@ -395,7 +408,11 @@ func BenchmarkAggMetrics10kSeries2Chunks10kQueueSize(b *testing.B) { func BenchmarkAggMetrics100kSeries2Chunks100kQueueSize(b *testing.B) { chunkMaxStale := uint32(3600) metricMaxStale := uint32(21600) - + mockstore.Reset() + mockstore.Drop = true + defer func() { + mockstore.Drop = false + }() SetSingleAgg(conf.Avg, conf.Min, conf.Max) SetSingleSchema( conf.NewRetentionMT(1, 84600, 600, 5, true), @@ -409,7 +426,7 @@ func BenchmarkAggMetrics100kSeries2Chunks100kQueueSize(b *testing.B) { keys[i] = fmt.Sprintf("hello.this.is.a.test.key.%d", i) } - metrics := NewAggMetrics(dnstore, &cache.MockCache{}, false, chunkMaxStale, metricMaxStale, 0) + metrics := NewAggMetrics(mockstore, &cache.MockCache{}, false, chunkMaxStale, metricMaxStale, 0) maxT := uint32(1200) for t := 
uint32(1); t < maxT; t += 10 { diff --git a/mdata/aggregator_test.go b/mdata/aggregator_test.go index 275ee24278..cc3b6c57fa 100644 --- a/mdata/aggregator_test.go +++ b/mdata/aggregator_test.go @@ -70,13 +70,13 @@ func TestAggregator(t *testing.T) { AggregationMethod: []conf.Method{conf.Avg, conf.Min, conf.Max, conf.Sum, conf.Lst}, } - agg := NewAggregator(dnstore, &cache.MockCache{}, "test", ret, aggs, false) + agg := NewAggregator(mockstore, &cache.MockCache{}, "test", ret, aggs, false) agg.Add(100, 123.4) agg.Add(110, 5) expected := []schema.Point{} compare("simple-min-unfinished", agg.minMetric, expected) - agg = NewAggregator(dnstore, &cache.MockCache{}, "test", ret, aggs, false) + agg = NewAggregator(mockstore, &cache.MockCache{}, "test", ret, aggs, false) agg.Add(100, 123.4) agg.Add(110, 5) agg.Add(130, 130) @@ -85,7 +85,7 @@ func TestAggregator(t *testing.T) { } compare("simple-min-one-block", agg.minMetric, expected) - agg = NewAggregator(dnstore, &cache.MockCache{}, "test", ret, aggs, false) + agg = NewAggregator(mockstore, &cache.MockCache{}, "test", ret, aggs, false) agg.Add(100, 123.4) agg.Add(110, 5) agg.Add(120, 4) @@ -94,7 +94,7 @@ func TestAggregator(t *testing.T) { } compare("simple-min-one-block-done-cause-last-point-just-right", agg.minMetric, expected) - agg = NewAggregator(dnstore, &cache.MockCache{}, "test", ret, aggs, false) + agg = NewAggregator(mockstore, &cache.MockCache{}, "test", ret, aggs, false) agg.Add(100, 123.4) agg.Add(110, 5) agg.Add(150, 1.123) @@ -105,7 +105,7 @@ func TestAggregator(t *testing.T) { } compare("simple-min-two-blocks-done-cause-last-point-just-right", agg.minMetric, expected) - agg = NewAggregator(dnstore, &cache.MockCache{}, "test", ret, aggs, false) + agg = NewAggregator(mockstore, &cache.MockCache{}, "test", ret, aggs, false) agg.Add(100, 123.4) agg.Add(110, 5) agg.Add(190, 2451.123) diff --git a/mdata/cwr.go b/mdata/cwr.go index 2843847e08..4f96e5264b 100644 --- a/mdata/cwr.go +++ b/mdata/cwr.go @@ -1,22 +1,11 
@@ package mdata import ( - "context" "time" "github.com/grafana/metrictank/mdata/chunk" ) -type ChunkReadRequest struct { - month uint32 - sortKey uint32 - q string - p []interface{} - timestamp time.Time - out chan outcome - ctx context.Context -} - type ChunkWriteRequest struct { Metric *AggMetric Key string diff --git a/mdata/ifaces.go b/mdata/ifaces.go index ab0e667de3..074089c9cc 100644 --- a/mdata/ifaces.go +++ b/mdata/ifaces.go @@ -1,7 +1,11 @@ package mdata import ( + "context" + "github.com/grafana/metrictank/consolidation" + "github.com/grafana/metrictank/mdata/chunk" + opentracing "github.com/opentracing/opentracing-go" ) type Metrics interface { @@ -14,3 +18,10 @@ type Metric interface { Get(from, to uint32) Result GetAggregated(consolidator consolidation.Consolidator, aggSpan, from, to uint32) (Result, error) } + +type Store interface { + Add(cwr *ChunkWriteRequest) + Search(ctx context.Context, key string, ttl, start, end uint32) ([]chunk.IterGen, error) + Stop() + SetTracer(t opentracing.Tracer) +} diff --git a/mdata/store.go b/mdata/store.go deleted file mode 100644 index 62fd9685e9..0000000000 --- a/mdata/store.go +++ /dev/null @@ -1,15 +0,0 @@ -package mdata - -import ( - "context" - - "github.com/grafana/metrictank/mdata/chunk" - opentracing "github.com/opentracing/opentracing-go" -) - -type Store interface { - Add(cwr *ChunkWriteRequest) - Search(ctx context.Context, key string, ttl, start, end uint32) ([]chunk.IterGen, error) - Stop() - SetTracer(t opentracing.Tracer) -} diff --git a/mdata/store_mock.go b/mdata/store_mock.go index 724c756141..4ebfa6f77d 100644 --- a/mdata/store_mock.go +++ b/mdata/store_mock.go @@ -12,20 +12,35 @@ import ( type MockStore struct { // the itgens to be searched and returned, indexed by metric results map[string][]chunk.IterGen + // count of chunks in the store. + items int + // dont save any data. 
+ Drop bool } func NewMockStore() *MockStore { - return &MockStore{make(map[string][]chunk.IterGen)} + return &MockStore{ + results: make(map[string][]chunk.IterGen), + Drop: false, + } } -func (c *MockStore) ResetMock() { +func (c *MockStore) Reset() { c.results = make(map[string][]chunk.IterGen) + c.items = 0 +} + +func (c *MockStore) Items() int { + return c.items } // Add adds a chunk to the store func (c *MockStore) Add(cwr *ChunkWriteRequest) { - itgen := chunk.NewBareIterGen(cwr.Chunk.Series.Bytes(), cwr.Chunk.Series.T0, cwr.Span) - c.results[cwr.Key] = append(c.results[cwr.Key], *itgen) + if !c.Drop { + itgen := chunk.NewBareIterGen(cwr.Chunk.Series.Bytes(), cwr.Chunk.Series.T0, cwr.Span) + c.results[cwr.Key] = append(c.results[cwr.Key], *itgen) + c.items++ + } } // searches through the mock results and returns the right ones according to start / end diff --git a/metrictank.go b/metrictank.go index 720a0e31c5..4c0fe371be 100644 --- a/metrictank.go +++ b/metrictank.go @@ -32,6 +32,7 @@ import ( "github.com/grafana/metrictank/mdata/notifierNsq" "github.com/grafana/metrictank/stats" statsConfig "github.com/grafana/metrictank/stats/config" + cassandraStore "github.com/grafana/metrictank/store/cassandra" "github.com/raintank/dur" "github.com/raintank/worldping-api/pkg/log" "github.com/rakyll/globalconf" @@ -271,7 +272,7 @@ func main() { /*********************************** Initialize our backendStore ***********************************/ - store, err = mdata.NewCassandraStore(*cassandraAddrs, *cassandraKeyspace, *cassandraConsistency, *cassandraCaPath, *cassandraUsername, *cassandraPassword, *cassandraHostSelectionPolicy, *cassandraTimeout, *cassandraReadConcurrency, *cassandraWriteConcurrency, *cassandraReadQueueSize, *cassandraWriteQueueSize, *cassandraRetries, *cqlProtocolVersion, *cassandraWindowFactor, *cassandraOmitReadTimeout, *cassandraSSL, *cassandraAuth, *cassandraHostVerification, *cassandraCreateKeyspace, mdata.TTLs()) + store, err = 
cassandraStore.NewCassandraStore(*cassandraAddrs, *cassandraKeyspace, *cassandraConsistency, *cassandraCaPath, *cassandraUsername, *cassandraPassword, *cassandraHostSelectionPolicy, *cassandraTimeout, *cassandraReadConcurrency, *cassandraWriteConcurrency, *cassandraReadQueueSize, *cassandraWriteQueueSize, *cassandraRetries, *cqlProtocolVersion, *cassandraWindowFactor, *cassandraOmitReadTimeout, *cassandraSSL, *cassandraAuth, *cassandraHostVerification, *cassandraCreateKeyspace, mdata.TTLs()) if err != nil { log.Fatal(4, "failed to initialize cassandra. %s", err) } diff --git a/mdata/store_cassandra.go b/store/cassandra/store_cassandra.go similarity index 96% rename from mdata/store_cassandra.go rename to store/cassandra/store_cassandra.go index 5196f50b05..5fb0b7c11b 100644 --- a/mdata/store_cassandra.go +++ b/store/cassandra/store_cassandra.go @@ -1,4 +1,4 @@ -package mdata +package cassandra import ( "bytes" @@ -13,6 +13,7 @@ import ( "github.com/gocql/gocql" "github.com/grafana/metrictank/cassandra" + "github.com/grafana/metrictank/mdata" "github.com/grafana/metrictank/mdata/chunk" "github.com/grafana/metrictank/stats" "github.com/grafana/metrictank/tracing" @@ -80,12 +81,15 @@ var ( errmetrics = cassandra.NewErrMetrics("store.cassandra") ) -/* -https://godoc.org/github.com/gocql/gocql#Session -Session is the interface used by users to interact with the database. -It's safe for concurrent use by multiple goroutines and a typical usage scenario is to have one global session -object to interact with the whole Cassandra cluster. 
-*/ +type ChunkReadRequest struct { + month uint32 + sortKey uint32 + q string + p []interface{} + timestamp time.Time + out chan outcome + ctx context.Context +} type TTLTables map[uint32]ttlTable type ttlTable struct { @@ -95,7 +99,7 @@ type ttlTable struct { type CassandraStore struct { Session *gocql.Session - writeQueues []chan *ChunkWriteRequest + writeQueues []chan *mdata.ChunkWriteRequest writeQueueMeters []*stats.Range32 readQueue chan *ChunkReadRequest ttlTables TTLTables @@ -285,7 +289,7 @@ func NewCassandraStore(addrs, keyspace, consistency, CaPath, Username, Password, log.Debug("CS: created session to %s keysp %s cons %v with policy %s timeout %d readers %d writers %d readq %d writeq %d retries %d proto %d ssl %t auth %t hostverif %t", addrs, keyspace, consistency, hostSelectionPolicy, timeout, readers, writers, readqsize, writeqsize, retries, protoVer, ssl, auth, hostVerification) c := &CassandraStore{ Session: session, - writeQueues: make([]chan *ChunkWriteRequest, writers), + writeQueues: make([]chan *mdata.ChunkWriteRequest, writers), writeQueueMeters: make([]*stats.Range32, writers), readQueue: make(chan *ChunkReadRequest, readqsize), omitReadTimeout: time.Duration(omitReadTimeout) * time.Second, @@ -295,7 +299,7 @@ func NewCassandraStore(addrs, keyspace, consistency, CaPath, Username, Password, } for i := 0; i < writers; i++ { - c.writeQueues[i] = make(chan *ChunkWriteRequest, writeqsize) + c.writeQueues[i] = make(chan *mdata.ChunkWriteRequest, writeqsize) c.writeQueueMeters[i] = stats.NewRange32(fmt.Sprintf("store.cassandra.write_queue.%d.items", i+1)) go c.processWriteQueue(c.writeQueues[i], c.writeQueueMeters[i]) } @@ -311,7 +315,7 @@ func (c *CassandraStore) SetTracer(t opentracing.Tracer) { c.tracer = t } -func (c *CassandraStore) Add(cwr *ChunkWriteRequest) { +func (c *CassandraStore) Add(cwr *mdata.ChunkWriteRequest) { sum := 0 for _, char := range cwr.Key { sum += int(char) @@ -323,7 +327,7 @@ func (c *CassandraStore) Add(cwr 
*ChunkWriteRequest) { /* process writeQueue. */ -func (c *CassandraStore) processWriteQueue(queue chan *ChunkWriteRequest, meter *stats.Range32) { +func (c *CassandraStore) processWriteQueue(queue chan *mdata.ChunkWriteRequest, meter *stats.Range32) { tick := time.Tick(time.Duration(1) * time.Second) for { select { @@ -344,7 +348,7 @@ func (c *CassandraStore) processWriteQueue(queue chan *ChunkWriteRequest, meter if err == nil { success = true cwr.Metric.SyncChunkSaveState(cwr.Chunk.T0) - SendPersistMessage(cwr.Key, cwr.Chunk.T0) + mdata.SendPersistMessage(cwr.Key, cwr.Chunk.T0) log.Debug("CS: save complete. %s:%d %v", cwr.Key, cwr.Chunk.T0, cwr.Chunk) chunkSaveOk.Inc() } else { diff --git a/mdata/store_cassandra_test.go b/store/cassandra/store_cassandra_test.go similarity index 99% rename from mdata/store_cassandra_test.go rename to store/cassandra/store_cassandra_test.go index a97ccf8094..4a7d38bd5f 100644 --- a/mdata/store_cassandra_test.go +++ b/store/cassandra/store_cassandra_test.go @@ -1,4 +1,4 @@ -package mdata +package cassandra import ( "fmt" diff --git a/mdata/store_devnull.go b/store/store_devnull.go similarity index 82% rename from mdata/store_devnull.go rename to store/store_devnull.go index 812bdb1a57..7dc2880a8c 100644 --- a/mdata/store_devnull.go +++ b/store/store_devnull.go @@ -1,8 +1,9 @@ -package mdata +package store import ( "context" + "github.com/grafana/metrictank/mdata" "github.com/grafana/metrictank/mdata/chunk" opentracing "github.com/opentracing/opentracing-go" ) @@ -16,7 +17,7 @@ func NewDevnullStore() *devnullStore { return d } -func (c *devnullStore) Add(cwr *ChunkWriteRequest) { +func (c *devnullStore) Add(cwr *mdata.ChunkWriteRequest) { c.AddCount++ }