Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

move backend stores into their own package #851

Merged
merged 1 commit into from
Feb 16, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion api/ccache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ func newSrv(delSeries, delArchives int) (*Server, *cache.MockCache) {
mdata.SetSingleAgg(conf.Avg, conf.Min, conf.Max)
mdata.SetSingleSchema(conf.NewRetentionMT(10, 100, 600, 10, true))

store := mdata.NewDevnullStore()
store := mdata.NewMockStore()
store.Drop = true
srv.BindBackendStore(store)

mockCache := cache.NewMockCache()
Expand Down
5 changes: 3 additions & 2 deletions api/dataprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,8 @@ func TestPrevBoundary(t *testing.T) {

func TestGetSeriesFixed(t *testing.T) {
cluster.Init("default", "test", time.Now(), "http", 6060)
store := mdata.NewDevnullStore()
store := mdata.NewMockStore()
store.Drop = true

mdata.SetSingleAgg(conf.Avg, conf.Min, conf.Max)
mdata.SetSingleSchema(conf.NewRetentionMT(10, 100, 600, 10, true))
Expand Down Expand Up @@ -746,7 +747,7 @@ func TestGetSeriesCachedStore(t *testing.T) {

// stop cache go routines before reinstantiating it at the top of the loop
c.Stop()
store.ResetMock()
store.Reset()
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/mt-schemas-explain/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

log "github.com/Sirupsen/logrus"
"github.com/grafana/metrictank/conf"
"github.com/grafana/metrictank/mdata"
"github.com/grafana/metrictank/store/cassandra"
)

var (
Expand Down Expand Up @@ -65,7 +65,7 @@ func main() {
fmt.Printf("retentions:%10s %10s %10s %10s %10s %15s %10s\n", "interval", "retention", "chunkspan", "numchunks", "ready", "tablename", "windowsize")
for _, ret := range schema.Retentions {
retention := ret.MaxRetention()
table := mdata.GetTTLTable(uint32(retention), *windowFactor, mdata.Table_name_format)
table := cassandra.GetTTLTable(uint32(retention), *windowFactor, cassandra.Table_name_format)
retStr := time.Duration(time.Duration(retention) * time.Second).String()
if retention%(3600*24) == 0 {
retStr = fmt.Sprintf("%dd", retention/3600/24)
Expand Down
4 changes: 2 additions & 2 deletions cmd/mt-split-metrics-by-ttl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"path"
"strings"

"github.com/grafana/metrictank/mdata"
"github.com/grafana/metrictank/store/cassandra"
"github.com/raintank/dur"
)

Expand Down Expand Up @@ -69,7 +69,7 @@ func main() {
panic(fmt.Sprintf("Error creating directory: %s", err))
}

store, err := mdata.NewCassandraStore(*cassandraAddrs, *cassandraKeyspace, *cassandraConsistency, *cassandraCaPath, *cassandraUsername, *cassandraPassword, *cassandraHostSelectionPolicy, *cassandraTimeout, cassandraReadConcurrency, cassandraReadConcurrency, cassandraReadQueueSize, 0, *cassandraRetries, *cqlProtocolVersion, windowFactor, cassandraOmitReadTimeout, *cassandraSSL, *cassandraAuth, *cassandraHostVerification, *cassandraCreateKeyspace, ttls)
store, err := cassandra.NewCassandraStore(*cassandraAddrs, *cassandraKeyspace, *cassandraConsistency, *cassandraCaPath, *cassandraUsername, *cassandraPassword, *cassandraHostSelectionPolicy, *cassandraTimeout, cassandraReadConcurrency, cassandraReadConcurrency, cassandraReadQueueSize, 0, *cassandraRetries, *cqlProtocolVersion, windowFactor, cassandraOmitReadTimeout, *cassandraSSL, *cassandraAuth, *cassandraHostVerification, *cassandraCreateKeyspace, ttls)
if err != nil {
panic(fmt.Sprintf("Failed to instantiate cassandra: %s", err))
}
Expand Down
14 changes: 7 additions & 7 deletions cmd/mt-store-cat/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,28 @@ import (
"strings"
"time"

"github.com/grafana/metrictank/mdata"
"github.com/grafana/metrictank/store/cassandra"
)

func chunkSummary(ctx context.Context, store *mdata.CassandraStore, tables []string, metrics []Metric, keyspace, groupTTL string) error {
func chunkSummary(ctx context.Context, store *cassandra.CassandraStore, tables []string, metrics []Metric, keyspace, groupTTL string) error {
now := uint32(time.Now().Unix())
end_month := now - (now % mdata.Month_sec)
end_month := now - (now % cassandra.Month_sec)

for _, tbl := range tables {
// actual TTL may be up to 2x what's in tablename. see mdata/store_cassandra.go for details
// actual TTL may be up to 2x what's in tablename. see store/cassandra/store_cassandra.go for details
// we query up to 4x so that we also include data that should have been dropped already but still sticks around for whatever reason.
TTLHours, _ := strconv.Atoi(strings.Split(tbl, "_")[1])
start := now - uint32(4*TTLHours*3600)
start_month := start - (start % mdata.Month_sec)
start_month := start - (start % cassandra.Month_sec)
fmt.Println("## Table", tbl)
if len(metrics) == 0 {
query := fmt.Sprintf("select key, ttl(data) from %s", tbl)
iter := store.Session.Query(query).Iter()
showKeyTTL(iter, groupTTL)
} else {
for _, metric := range metrics {
for month := start_month; month <= end_month; month += mdata.Month_sec {
row_key := fmt.Sprintf("%s_%d", metric.id, month/mdata.Month_sec)
for month := start_month; month <= end_month; month += cassandra.Month_sec {
row_key := fmt.Sprintf("%s_%d", metric.id, month/cassandra.Month_sec)
query := fmt.Sprintf("select key, ttl(data) from %s where key=?", tbl)
iter := store.Session.Query(query, row_key).Iter()
showKeyTTL(iter, groupTTL)
Expand Down
4 changes: 2 additions & 2 deletions cmd/mt-store-cat/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/grafana/metrictank/conf"
opentracing "github.com/opentracing/opentracing-go"

"github.com/grafana/metrictank/mdata"
"github.com/grafana/metrictank/store/cassandra"
"github.com/raintank/dur"
"github.com/rakyll/globalconf"
)
Expand Down Expand Up @@ -160,7 +160,7 @@ func main() {
}
}

store, err := mdata.NewCassandraStore(*cassandraAddrs, *cassandraKeyspace, *cassandraConsistency, *cassandraCaPath, *cassandraUsername, *cassandraPassword, *cassandraHostSelectionPolicy, *cassandraTimeout, *cassandraReadConcurrency, *cassandraReadConcurrency, *cassandraReadQueueSize, 0, *cassandraRetries, *cqlProtocolVersion, *windowFactor, *cassandraOmitReadTimeout, *cassandraSSL, *cassandraAuth, *cassandraHostVerification, *cassandraCreateKeyspace, nil)
store, err := cassandra.NewCassandraStore(*cassandraAddrs, *cassandraKeyspace, *cassandraConsistency, *cassandraCaPath, *cassandraUsername, *cassandraPassword, *cassandraHostSelectionPolicy, *cassandraTimeout, *cassandraReadConcurrency, *cassandraReadConcurrency, *cassandraReadQueueSize, 0, *cassandraRetries, *cqlProtocolVersion, *windowFactor, *cassandraOmitReadTimeout, *cassandraSSL, *cassandraAuth, *cassandraHostVerification, *cassandraCreateKeyspace, nil)
if err != nil {
log.Fatal(4, "failed to initialize cassandra. %s", err)
}
Expand Down
6 changes: 3 additions & 3 deletions cmd/mt-store-cat/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"sort"
"strings"

"github.com/grafana/metrictank/mdata"
"github.com/grafana/metrictank/store/cassandra"
)

type Metric struct {
Expand All @@ -20,7 +20,7 @@ func (m MetricsByName) Swap(i, j int) { m[i], m[j] = m[j], m[i] }
func (m MetricsByName) Less(i, j int) bool { return m[i].name < m[j].name }

// prefix is optional
func getMetrics(store *mdata.CassandraStore, prefix string) ([]Metric, error) {
func getMetrics(store *cassandra.CassandraStore, prefix string) ([]Metric, error) {
var metrics []Metric
iter := store.Session.Query("select id, metric from metric_idx").Iter()
var m Metric
Expand All @@ -37,7 +37,7 @@ func getMetrics(store *mdata.CassandraStore, prefix string) ([]Metric, error) {
return metrics, nil
}

func getMetric(store *mdata.CassandraStore, id string) ([]Metric, error) {
func getMetric(store *cassandra.CassandraStore, id string) ([]Metric, error) {
var metrics []Metric
iter := store.Session.Query("select id, metric from metric_idx where id=? ALLOW FILTERING", id).Iter()
var m Metric
Expand Down
8 changes: 4 additions & 4 deletions cmd/mt-store-cat/series.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ import (
"time"

"github.com/grafana/metrictank/api"
"github.com/grafana/metrictank/mdata"
"github.com/grafana/metrictank/mdata/chunk"
"github.com/grafana/metrictank/store/cassandra"
"gopkg.in/raintank/schema.v1"
)

func points(ctx context.Context, store *mdata.CassandraStore, tables []string, metrics []Metric, fromUnix, toUnix, fix uint32) {
func points(ctx context.Context, store *cassandra.CassandraStore, tables []string, metrics []Metric, fromUnix, toUnix, fix uint32) {
for _, metric := range metrics {
fmt.Println("## Metric", metric)
for _, table := range tables {
Expand All @@ -32,7 +32,7 @@ func points(ctx context.Context, store *mdata.CassandraStore, tables []string, m
}
}

func pointSummary(ctx context.Context, store *mdata.CassandraStore, tables []string, metrics []Metric, fromUnix, toUnix, fix uint32) {
func pointSummary(ctx context.Context, store *cassandra.CassandraStore, tables []string, metrics []Metric, fromUnix, toUnix, fix uint32) {
for _, metric := range metrics {
fmt.Println("## Metric", metric)
for _, table := range tables {
Expand All @@ -51,7 +51,7 @@ func pointSummary(ctx context.Context, store *mdata.CassandraStore, tables []str
}
}

func getSeries(ctx context.Context, store *mdata.CassandraStore, table, id string, fromUnix, toUnix, interval uint32) []schema.Point {
func getSeries(ctx context.Context, store *cassandra.CassandraStore, table, id string, fromUnix, toUnix, interval uint32) []schema.Point {
var points []schema.Point
itgens, err := store.SearchTable(ctx, id, table, fromUnix, toUnix)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions cmd/mt-store-cat/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"strconv"
"strings"

"github.com/grafana/metrictank/mdata"
"github.com/grafana/metrictank/store/cassandra"
)

type TablesByTTL []string
Expand All @@ -19,7 +19,7 @@ func (t TablesByTTL) Less(i, j int) bool {
return iTTL < jTTL
}

func getTables(store *mdata.CassandraStore, keyspace string, match string) ([]string, error) {
func getTables(store *cassandra.CassandraStore, keyspace string, match string) ([]string, error) {
var tables []string
meta, err := store.Session.KeyspaceMetadata(keyspace)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions cmd/mt-view-boundaries/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
"runtime"
"time"

"github.com/grafana/metrictank/mdata"
"github.com/grafana/metrictank/mdata/chunk"
"github.com/grafana/metrictank/store/cassandra"
"github.com/raintank/dur"
)

Expand Down Expand Up @@ -54,7 +54,7 @@ func main() {
return
}

display(int64(mdata.Month_sec), "cassandra rowkey month")
display(int64(cassandra.Month_sec), "cassandra rowkey month")

if *spanStr != "" {
span := dur.MustParseNDuration("span", *spanStr)
Expand Down
12 changes: 6 additions & 6 deletions cmd/mt-whisper-importer-writer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ import (
"github.com/grafana/metrictank/cluster/partitioner"
"github.com/grafana/metrictank/idx"
"github.com/grafana/metrictank/idx/cassandra"
"github.com/grafana/metrictank/mdata"
"github.com/grafana/metrictank/mdata/chunk"
"github.com/grafana/metrictank/mdata/chunk/archive"
cassandraStore "github.com/grafana/metrictank/store/cassandra"
"github.com/raintank/dur"
)

Expand Down Expand Up @@ -95,7 +95,7 @@ var (

type Server struct {
Session *gocql.Session
TTLTables mdata.TTLTables
TTLTables cassandraStore.TTLTables
Partitioner partitioner.Partitioner
Index idx.MetricIndex
HTTPServer *http.Server
Expand Down Expand Up @@ -149,7 +149,7 @@ func main() {
log.SetLevel(log.InfoLevel)
}

store, err := mdata.NewCassandraStore(*cassandraAddrs, *cassandraKeyspace, *cassandraConsistency, *cassandraCaPath, *cassandraUsername, *cassandraPassword, *cassandraHostSelectionPolicy, *cassandraTimeout, *cassandraReadConcurrency, *cassandraReadConcurrency, *cassandraReadQueueSize, 0, *cassandraRetries, *cqlProtocolVersion, *windowFactor, 60, *cassandraSSL, *cassandraAuth, *cassandraHostVerification, *cassandraCreateKeyspace, nil)
store, err := cassandraStore.NewCassandraStore(*cassandraAddrs, *cassandraKeyspace, *cassandraConsistency, *cassandraCaPath, *cassandraUsername, *cassandraPassword, *cassandraHostSelectionPolicy, *cassandraTimeout, *cassandraReadConcurrency, *cassandraReadConcurrency, *cassandraReadQueueSize, 0, *cassandraRetries, *cqlProtocolVersion, *windowFactor, 60, *cassandraSSL, *cassandraAuth, *cassandraHostVerification, *cassandraCreateKeyspace, nil)
if err != nil {
panic(fmt.Sprintf("Failed to initialize cassandra: %q", err))
}
Expand All @@ -159,7 +159,7 @@ func main() {
for _, split := range splits {
ttls = append(ttls, dur.MustParseNDuration("ttl", split))
}
ttlTables := mdata.GetTTLTables(ttls, *windowFactor, mdata.Table_name_format)
ttlTables := cassandraStore.GetTTLTables(ttls, *windowFactor, cassandraStore.Table_name_format)

p, err := partitioner.NewKafka(*partitionScheme)
if err != nil {
Expand Down Expand Up @@ -258,11 +258,11 @@ func (s *Server) insertChunks(table, id string, ttl uint32, itergens []chunk.Ite
}
log.Debug(query)
for _, ig := range itergens {
rowKey := fmt.Sprintf("%s_%d", id, ig.Ts/mdata.Month_sec)
rowKey := fmt.Sprintf("%s_%d", id, ig.Ts/cassandraStore.Month_sec)
success := false
attempts := 0
for !success {
err := s.Session.Query(query, rowKey, ig.Ts, mdata.PrepareChunkData(ig.Span, ig.Bytes())).Exec()
err := s.Session.Query(query, rowKey, ig.Ts, cassandraStore.PrepareChunkData(ig.Span, ig.Bytes())).Exec()
if err != nil {
if (attempts % 20) == 0 {
log.Warnf("CS: failed to save chunk to cassandra after %d attempts. %s", attempts+1, err)
Expand Down
5 changes: 3 additions & 2 deletions input/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@ import (
"github.com/grafana/metrictank/idx/memory"
"github.com/grafana/metrictank/mdata"
"github.com/grafana/metrictank/mdata/cache"
backendStore "github.com/grafana/metrictank/store"
"gopkg.in/raintank/schema.v1"
)

func BenchmarkProcessUniqueMetrics(b *testing.B) {
cluster.Init("default", "test", time.Now(), "http", 6060)

store := mdata.NewDevnullStore()
store := backendStore.NewDevnullStore()

mdata.SetSingleSchema(conf.NewRetentionMT(10, 10000, 600, 10, true))
mdata.SetSingleAgg(conf.Avg, conf.Min, conf.Max)
Expand Down Expand Up @@ -54,7 +55,7 @@ func BenchmarkProcessUniqueMetrics(b *testing.B) {
func BenchmarkProcessSameMetric(b *testing.B) {
cluster.Init("default", "test", time.Now(), "http", 6060)

store := mdata.NewDevnullStore()
store := backendStore.NewDevnullStore()

mdata.SetSingleSchema(conf.NewRetentionMT(10, 10000, 600, 10, true))
mdata.SetSingleAgg(conf.Avg, conf.Min, conf.Max)
Expand Down
Loading