Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
Merge pull request #851 from grafana/moveCassandraStore
Browse files Browse the repository at this point in the history
move backend stores into their own package
  • Loading branch information
Anthony Woods authored Feb 16, 2018
2 parents 0ee6e80 + 96ebc7d commit 82d0f4d
Show file tree
Hide file tree
Showing 22 changed files with 133 additions and 107 deletions.
3 changes: 2 additions & 1 deletion api/ccache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ func newSrv(delSeries, delArchives int) (*Server, *cache.MockCache) {
mdata.SetSingleAgg(conf.Avg, conf.Min, conf.Max)
mdata.SetSingleSchema(conf.NewRetentionMT(10, 100, 600, 10, true))

store := mdata.NewDevnullStore()
store := mdata.NewMockStore()
store.Drop = true
srv.BindBackendStore(store)

mockCache := cache.NewMockCache()
Expand Down
5 changes: 3 additions & 2 deletions api/dataprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,8 @@ func TestPrevBoundary(t *testing.T) {

func TestGetSeriesFixed(t *testing.T) {
cluster.Init("default", "test", time.Now(), "http", 6060)
store := mdata.NewDevnullStore()
store := mdata.NewMockStore()
store.Drop = true

mdata.SetSingleAgg(conf.Avg, conf.Min, conf.Max)
mdata.SetSingleSchema(conf.NewRetentionMT(10, 100, 600, 10, true))
Expand Down Expand Up @@ -746,7 +747,7 @@ func TestGetSeriesCachedStore(t *testing.T) {

// stop cache go routines before reinstantiating it at the top of the loop
c.Stop()
store.ResetMock()
store.Reset()
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/mt-schemas-explain/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

log "github.com/Sirupsen/logrus"
"github.com/grafana/metrictank/conf"
"github.com/grafana/metrictank/mdata"
"github.com/grafana/metrictank/store/cassandra"
)

var (
Expand Down Expand Up @@ -65,7 +65,7 @@ func main() {
fmt.Printf("retentions:%10s %10s %10s %10s %10s %15s %10s\n", "interval", "retention", "chunkspan", "numchunks", "ready", "tablename", "windowsize")
for _, ret := range schema.Retentions {
retention := ret.MaxRetention()
table := mdata.GetTTLTable(uint32(retention), *windowFactor, mdata.Table_name_format)
table := cassandra.GetTTLTable(uint32(retention), *windowFactor, cassandra.Table_name_format)
retStr := time.Duration(time.Duration(retention) * time.Second).String()
if retention%(3600*24) == 0 {
retStr = fmt.Sprintf("%dd", retention/3600/24)
Expand Down
4 changes: 2 additions & 2 deletions cmd/mt-split-metrics-by-ttl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"path"
"strings"

"github.com/grafana/metrictank/mdata"
"github.com/grafana/metrictank/store/cassandra"
"github.com/raintank/dur"
)

Expand Down Expand Up @@ -69,7 +69,7 @@ func main() {
panic(fmt.Sprintf("Error creating directory: %s", err))
}

store, err := mdata.NewCassandraStore(*cassandraAddrs, *cassandraKeyspace, *cassandraConsistency, *cassandraCaPath, *cassandraUsername, *cassandraPassword, *cassandraHostSelectionPolicy, *cassandraTimeout, cassandraReadConcurrency, cassandraReadConcurrency, cassandraReadQueueSize, 0, *cassandraRetries, *cqlProtocolVersion, windowFactor, cassandraOmitReadTimeout, *cassandraSSL, *cassandraAuth, *cassandraHostVerification, *cassandraCreateKeyspace, ttls)
store, err := cassandra.NewCassandraStore(*cassandraAddrs, *cassandraKeyspace, *cassandraConsistency, *cassandraCaPath, *cassandraUsername, *cassandraPassword, *cassandraHostSelectionPolicy, *cassandraTimeout, cassandraReadConcurrency, cassandraReadConcurrency, cassandraReadQueueSize, 0, *cassandraRetries, *cqlProtocolVersion, windowFactor, cassandraOmitReadTimeout, *cassandraSSL, *cassandraAuth, *cassandraHostVerification, *cassandraCreateKeyspace, ttls)
if err != nil {
panic(fmt.Sprintf("Failed to instantiate cassandra: %s", err))
}
Expand Down
14 changes: 7 additions & 7 deletions cmd/mt-store-cat/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,28 @@ import (
"strings"
"time"

"github.com/grafana/metrictank/mdata"
"github.com/grafana/metrictank/store/cassandra"
)

func chunkSummary(ctx context.Context, store *mdata.CassandraStore, tables []string, metrics []Metric, keyspace, groupTTL string) error {
func chunkSummary(ctx context.Context, store *cassandra.CassandraStore, tables []string, metrics []Metric, keyspace, groupTTL string) error {
now := uint32(time.Now().Unix())
end_month := now - (now % mdata.Month_sec)
end_month := now - (now % cassandra.Month_sec)

for _, tbl := range tables {
// actual TTL may be up to 2x what's in tablename. see mdata/store_cassandra.go for details
// actual TTL may be up to 2x what's in tablename. see store/cassandra/store_cassandra.go for details
// we query up to 4x so that we also include data that should have been dropped already but still sticks around for whatever reason.
TTLHours, _ := strconv.Atoi(strings.Split(tbl, "_")[1])
start := now - uint32(4*TTLHours*3600)
start_month := start - (start % mdata.Month_sec)
start_month := start - (start % cassandra.Month_sec)
fmt.Println("## Table", tbl)
if len(metrics) == 0 {
query := fmt.Sprintf("select key, ttl(data) from %s", tbl)
iter := store.Session.Query(query).Iter()
showKeyTTL(iter, groupTTL)
} else {
for _, metric := range metrics {
for month := start_month; month <= end_month; month += mdata.Month_sec {
row_key := fmt.Sprintf("%s_%d", metric.id, month/mdata.Month_sec)
for month := start_month; month <= end_month; month += cassandra.Month_sec {
row_key := fmt.Sprintf("%s_%d", metric.id, month/cassandra.Month_sec)
query := fmt.Sprintf("select key, ttl(data) from %s where key=?", tbl)
iter := store.Session.Query(query, row_key).Iter()
showKeyTTL(iter, groupTTL)
Expand Down
4 changes: 2 additions & 2 deletions cmd/mt-store-cat/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/grafana/metrictank/conf"
opentracing "github.com/opentracing/opentracing-go"

"github.com/grafana/metrictank/mdata"
"github.com/grafana/metrictank/store/cassandra"
"github.com/raintank/dur"
"github.com/rakyll/globalconf"
)
Expand Down Expand Up @@ -160,7 +160,7 @@ func main() {
}
}

store, err := mdata.NewCassandraStore(*cassandraAddrs, *cassandraKeyspace, *cassandraConsistency, *cassandraCaPath, *cassandraUsername, *cassandraPassword, *cassandraHostSelectionPolicy, *cassandraTimeout, *cassandraReadConcurrency, *cassandraReadConcurrency, *cassandraReadQueueSize, 0, *cassandraRetries, *cqlProtocolVersion, *windowFactor, *cassandraOmitReadTimeout, *cassandraSSL, *cassandraAuth, *cassandraHostVerification, *cassandraCreateKeyspace, nil)
store, err := cassandra.NewCassandraStore(*cassandraAddrs, *cassandraKeyspace, *cassandraConsistency, *cassandraCaPath, *cassandraUsername, *cassandraPassword, *cassandraHostSelectionPolicy, *cassandraTimeout, *cassandraReadConcurrency, *cassandraReadConcurrency, *cassandraReadQueueSize, 0, *cassandraRetries, *cqlProtocolVersion, *windowFactor, *cassandraOmitReadTimeout, *cassandraSSL, *cassandraAuth, *cassandraHostVerification, *cassandraCreateKeyspace, nil)
if err != nil {
log.Fatal(4, "failed to initialize cassandra. %s", err)
}
Expand Down
6 changes: 3 additions & 3 deletions cmd/mt-store-cat/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"sort"
"strings"

"github.com/grafana/metrictank/mdata"
"github.com/grafana/metrictank/store/cassandra"
)

type Metric struct {
Expand All @@ -20,7 +20,7 @@ func (m MetricsByName) Swap(i, j int) { m[i], m[j] = m[j], m[i] }
func (m MetricsByName) Less(i, j int) bool { return m[i].name < m[j].name }

// prefix is optional
func getMetrics(store *mdata.CassandraStore, prefix string) ([]Metric, error) {
func getMetrics(store *cassandra.CassandraStore, prefix string) ([]Metric, error) {
var metrics []Metric
iter := store.Session.Query("select id, metric from metric_idx").Iter()
var m Metric
Expand All @@ -37,7 +37,7 @@ func getMetrics(store *mdata.CassandraStore, prefix string) ([]Metric, error) {
return metrics, nil
}

func getMetric(store *mdata.CassandraStore, id string) ([]Metric, error) {
func getMetric(store *cassandra.CassandraStore, id string) ([]Metric, error) {
var metrics []Metric
iter := store.Session.Query("select id, metric from metric_idx where id=? ALLOW FILTERING", id).Iter()
var m Metric
Expand Down
8 changes: 4 additions & 4 deletions cmd/mt-store-cat/series.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ import (
"time"

"github.com/grafana/metrictank/api"
"github.com/grafana/metrictank/mdata"
"github.com/grafana/metrictank/mdata/chunk"
"github.com/grafana/metrictank/store/cassandra"
"gopkg.in/raintank/schema.v1"
)

func points(ctx context.Context, store *mdata.CassandraStore, tables []string, metrics []Metric, fromUnix, toUnix, fix uint32) {
func points(ctx context.Context, store *cassandra.CassandraStore, tables []string, metrics []Metric, fromUnix, toUnix, fix uint32) {
for _, metric := range metrics {
fmt.Println("## Metric", metric)
for _, table := range tables {
Expand All @@ -32,7 +32,7 @@ func points(ctx context.Context, store *mdata.CassandraStore, tables []string, m
}
}

func pointSummary(ctx context.Context, store *mdata.CassandraStore, tables []string, metrics []Metric, fromUnix, toUnix, fix uint32) {
func pointSummary(ctx context.Context, store *cassandra.CassandraStore, tables []string, metrics []Metric, fromUnix, toUnix, fix uint32) {
for _, metric := range metrics {
fmt.Println("## Metric", metric)
for _, table := range tables {
Expand All @@ -51,7 +51,7 @@ func pointSummary(ctx context.Context, store *mdata.CassandraStore, tables []str
}
}

func getSeries(ctx context.Context, store *mdata.CassandraStore, table, id string, fromUnix, toUnix, interval uint32) []schema.Point {
func getSeries(ctx context.Context, store *cassandra.CassandraStore, table, id string, fromUnix, toUnix, interval uint32) []schema.Point {
var points []schema.Point
itgens, err := store.SearchTable(ctx, id, table, fromUnix, toUnix)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions cmd/mt-store-cat/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"strconv"
"strings"

"github.com/grafana/metrictank/mdata"
"github.com/grafana/metrictank/store/cassandra"
)

type TablesByTTL []string
Expand All @@ -19,7 +19,7 @@ func (t TablesByTTL) Less(i, j int) bool {
return iTTL < jTTL
}

func getTables(store *mdata.CassandraStore, keyspace string, match string) ([]string, error) {
func getTables(store *cassandra.CassandraStore, keyspace string, match string) ([]string, error) {
var tables []string
meta, err := store.Session.KeyspaceMetadata(keyspace)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions cmd/mt-view-boundaries/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
"runtime"
"time"

"github.com/grafana/metrictank/mdata"
"github.com/grafana/metrictank/mdata/chunk"
"github.com/grafana/metrictank/store/cassandra"
"github.com/raintank/dur"
)

Expand Down Expand Up @@ -54,7 +54,7 @@ func main() {
return
}

display(int64(mdata.Month_sec), "cassandra rowkey month")
display(int64(cassandra.Month_sec), "cassandra rowkey month")

if *spanStr != "" {
span := dur.MustParseNDuration("span", *spanStr)
Expand Down
12 changes: 6 additions & 6 deletions cmd/mt-whisper-importer-writer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ import (
"github.com/grafana/metrictank/cluster/partitioner"
"github.com/grafana/metrictank/idx"
"github.com/grafana/metrictank/idx/cassandra"
"github.com/grafana/metrictank/mdata"
"github.com/grafana/metrictank/mdata/chunk"
"github.com/grafana/metrictank/mdata/chunk/archive"
cassandraStore "github.com/grafana/metrictank/store/cassandra"
"github.com/raintank/dur"
)

Expand Down Expand Up @@ -95,7 +95,7 @@ var (

type Server struct {
Session *gocql.Session
TTLTables mdata.TTLTables
TTLTables cassandraStore.TTLTables
Partitioner partitioner.Partitioner
Index idx.MetricIndex
HTTPServer *http.Server
Expand Down Expand Up @@ -149,7 +149,7 @@ func main() {
log.SetLevel(log.InfoLevel)
}

store, err := mdata.NewCassandraStore(*cassandraAddrs, *cassandraKeyspace, *cassandraConsistency, *cassandraCaPath, *cassandraUsername, *cassandraPassword, *cassandraHostSelectionPolicy, *cassandraTimeout, *cassandraReadConcurrency, *cassandraReadConcurrency, *cassandraReadQueueSize, 0, *cassandraRetries, *cqlProtocolVersion, *windowFactor, 60, *cassandraSSL, *cassandraAuth, *cassandraHostVerification, *cassandraCreateKeyspace, nil)
store, err := cassandraStore.NewCassandraStore(*cassandraAddrs, *cassandraKeyspace, *cassandraConsistency, *cassandraCaPath, *cassandraUsername, *cassandraPassword, *cassandraHostSelectionPolicy, *cassandraTimeout, *cassandraReadConcurrency, *cassandraReadConcurrency, *cassandraReadQueueSize, 0, *cassandraRetries, *cqlProtocolVersion, *windowFactor, 60, *cassandraSSL, *cassandraAuth, *cassandraHostVerification, *cassandraCreateKeyspace, nil)
if err != nil {
panic(fmt.Sprintf("Failed to initialize cassandra: %q", err))
}
Expand All @@ -159,7 +159,7 @@ func main() {
for _, split := range splits {
ttls = append(ttls, dur.MustParseNDuration("ttl", split))
}
ttlTables := mdata.GetTTLTables(ttls, *windowFactor, mdata.Table_name_format)
ttlTables := cassandraStore.GetTTLTables(ttls, *windowFactor, cassandraStore.Table_name_format)

p, err := partitioner.NewKafka(*partitionScheme)
if err != nil {
Expand Down Expand Up @@ -258,11 +258,11 @@ func (s *Server) insertChunks(table, id string, ttl uint32, itergens []chunk.Ite
}
log.Debug(query)
for _, ig := range itergens {
rowKey := fmt.Sprintf("%s_%d", id, ig.Ts/mdata.Month_sec)
rowKey := fmt.Sprintf("%s_%d", id, ig.Ts/cassandraStore.Month_sec)
success := false
attempts := 0
for !success {
err := s.Session.Query(query, rowKey, ig.Ts, mdata.PrepareChunkData(ig.Span, ig.Bytes())).Exec()
err := s.Session.Query(query, rowKey, ig.Ts, cassandraStore.PrepareChunkData(ig.Span, ig.Bytes())).Exec()
if err != nil {
if (attempts % 20) == 0 {
log.Warnf("CS: failed to save chunk to cassandra after %d attempts. %s", attempts+1, err)
Expand Down
5 changes: 3 additions & 2 deletions input/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@ import (
"github.com/grafana/metrictank/idx/memory"
"github.com/grafana/metrictank/mdata"
"github.com/grafana/metrictank/mdata/cache"
backendStore "github.com/grafana/metrictank/store"
"gopkg.in/raintank/schema.v1"
)

func BenchmarkProcessUniqueMetrics(b *testing.B) {
cluster.Init("default", "test", time.Now(), "http", 6060)

store := mdata.NewDevnullStore()
store := backendStore.NewDevnullStore()

mdata.SetSingleSchema(conf.NewRetentionMT(10, 10000, 600, 10, true))
mdata.SetSingleAgg(conf.Avg, conf.Min, conf.Max)
Expand Down Expand Up @@ -54,7 +55,7 @@ func BenchmarkProcessUniqueMetrics(b *testing.B) {
func BenchmarkProcessSameMetric(b *testing.B) {
cluster.Init("default", "test", time.Now(), "http", 6060)

store := mdata.NewDevnullStore()
store := backendStore.NewDevnullStore()

mdata.SetSingleSchema(conf.NewRetentionMT(10, 10000, 600, 10, true))
mdata.SetSingleAgg(conf.Avg, conf.Min, conf.Max)
Expand Down
Loading

0 comments on commit 82d0f4d

Please sign in to comment.