Skip to content

Commit

Permalink
feat: remaining storage metrics from OSS engine (#22938)
Browse files Browse the repository at this point in the history
* fix: simplify disk size tracking

* refactor: EngineTags in tsdb package

* fix: fewer compaction buckets and dead code removal

* feat: shard metrics

* chore: formatting

* feat: tsdb store metrics

* feat: retention check metrics

* chore: fix go vet

* fix: review comments
  • Loading branch information
lesam authored Dec 2, 2021
1 parent 3460f1c commit b970e35
Show file tree
Hide file tree
Showing 21 changed files with 420 additions and 248 deletions.
2 changes: 1 addition & 1 deletion cmd/influxd/inspect/build_tsi/build_tsi.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ func IndexShard(sfile *tsdb.SeriesFile, dataDir, walDir string, maxLogFileSize i
}
} else {
log.Debug("Building cache from wal files")
cache := tsm1.NewCache(maxCacheSize, tsm1.EngineTags{}) // tags are for metrics only
cache := tsm1.NewCache(maxCacheSize, tsdb.EngineTags{}) // tags are for metrics only
loader := tsm1.NewCacheLoader(walPaths)
loader.WithLogger(log)
if err := loader.Load(cache); err != nil {
Expand Down
3 changes: 2 additions & 1 deletion cmd/influxd/inspect/verify_wal/verify_wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"testing"

"github.com/influxdata/influxdb/v2/tsdb"
"github.com/influxdata/influxdb/v2/tsdb/engine/tsm1"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -110,7 +111,7 @@ func newTempWALValid(t *testing.T) string {
dir, err := os.MkdirTemp("", "verify-wal")
require.NoError(t, err)

w := tsm1.NewWAL(dir, 0, 0, tsm1.EngineTags{})
w := tsm1.NewWAL(dir, 0, 0, tsdb.EngineTags{})
defer w.Close()
require.NoError(t, w.Open())

Expand Down
1 change: 1 addition & 0 deletions cmd/influxd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {
m.engine = storage.NewEngine(
opts.EnginePath,
opts.StorageConfig,
storage.WithMetricsDisabled(opts.MetricsDisabled),
storage.WithMetaClient(metaClient),
)
}
Expand Down
4 changes: 0 additions & 4 deletions internal/tsdb_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ type TSDBStoreMock struct {
DiskSizeFn func() (int64, error)
ExpandSourcesFn func(sources influxql.Sources) (influxql.Sources, error)
ImportShardFn func(id uint64, r io.Reader) error
MeasurementSeriesCountsFn func(database string) (measurements int, series int)
MeasurementsCardinalityFn func(database string) (int64, error)
MeasurementNamesFn func(ctx context.Context, auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error)
OpenFn func() error
Expand Down Expand Up @@ -95,9 +94,6 @@ func (s *TSDBStoreMock) ImportShard(id uint64, r io.Reader) error {
func (s *TSDBStoreMock) MeasurementNames(ctx context.Context, auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) {
return s.MeasurementNamesFn(ctx, auth, database, cond)
}
func (s *TSDBStoreMock) MeasurementSeriesCounts(database string) (measurements int, series int) {
return s.MeasurementSeriesCountsFn(database)
}
func (s *TSDBStoreMock) MeasurementsCardinality(database string) (int64, error) {
return s.MeasurementsCardinalityFn(database)
}
Expand Down
24 changes: 16 additions & 8 deletions storage/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,10 @@ type Engine struct {
retentionService *retention.Service
precreatorService *precreator.Service

defaultMetricLabels prometheus.Labels

writePointsValidationEnabled bool

logger *zap.Logger
logger *zap.Logger
metricsDisabled bool
}

// Option provides a set
Expand All @@ -70,6 +69,12 @@ func WithMetaClient(c MetaClient) Option {
}
}

func WithMetricsDisabled(m bool) Option {
return func(e *Engine) {
e.metricsDisabled = m
}
}

type MetaClient interface {
CreateDatabaseWithRetentionPolicy(name string, spec *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error)
CreateShardGroup(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error)
Expand Down Expand Up @@ -109,11 +114,10 @@ func NewEngine(path string, c Config, options ...Option) *Engine {
c.Data.WALDir = filepath.Join(path, "wal")

e := &Engine{
config: c,
path: path,
defaultMetricLabels: prometheus.Labels{},
tsdbStore: tsdb.NewStore(c.Data.Dir),
logger: zap.NewNop(),
config: c,
path: path,
tsdbStore: tsdb.NewStore(c.Data.Dir),
logger: zap.NewNop(),

writePointsValidationEnabled: true,
}
Expand All @@ -127,6 +131,7 @@ func NewEngine(path string, c Config, options ...Option) *Engine {
// Copy TSDB configuration.
e.tsdbStore.EngineOptions.EngineVersion = c.Data.Engine
e.tsdbStore.EngineOptions.IndexVersion = c.Data.Index
e.tsdbStore.EngineOptions.MetricsDisabled = e.metricsDisabled

pw := coordinator.NewPointsWriter(c.WriteTimeout, path)
pw.TSDBStore = e.tsdbStore
Expand Down Expand Up @@ -167,6 +172,9 @@ func (e *Engine) PrometheusCollectors() []prometheus.Collector {
var metrics []prometheus.Collector
metrics = append(metrics, tsm1.PrometheusCollectors()...)
metrics = append(metrics, coordinator.PrometheusCollectors()...)
metrics = append(metrics, tsdb.ShardCollectors()...)
metrics = append(metrics, tsdb.BucketCollectors()...)
metrics = append(metrics, retention.PrometheusCollectors()...)
return metrics
}

Expand Down
1 change: 1 addition & 0 deletions tsdb/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ type EngineOptions struct {
OnNewEngine func(Engine)

FileStoreObserver FileStoreObserver
MetricsDisabled bool
}

// NewEngineOptions constructs an EngineOptions object with safe default values.
Expand Down
13 changes: 7 additions & 6 deletions tsdb/engine/tsm1/array_cursor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb/v2/pkg/fs"
"github.com/influxdata/influxdb/v2/tsdb"
"github.com/influxdata/influxdb/v2/tsdb/cursors"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -73,7 +74,7 @@ func TestDescendingCursor_SinglePointStartTime(t *testing.T) {
t.Run("cache", func(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := NewFileStore(dir, EngineTags{})
fs := NewFileStore(dir, tsdb.EngineTags{})

const START, END = 10, 1
kc := fs.KeyCursor(context.Background(), []byte("m,_field=v#!~#v"), START, false)
Expand All @@ -96,7 +97,7 @@ func TestDescendingCursor_SinglePointStartTime(t *testing.T) {
t.Run("tsm", func(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := NewFileStore(dir, EngineTags{})
fs := NewFileStore(dir, tsdb.EngineTags{})

const START, END = 10, 1

Expand Down Expand Up @@ -133,7 +134,7 @@ func TestDescendingCursor_SinglePointStartTime(t *testing.T) {
func TestFileStore_DuplicatePoints(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := NewFileStore(dir, EngineTags{})
fs := NewFileStore(dir, tsdb.EngineTags{})

makeVals := func(ts ...int64) []Value {
vals := make([]Value, len(ts))
Expand Down Expand Up @@ -218,7 +219,7 @@ func (p Int64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
func TestFileStore_MergeBlocksLargerThat1000_SecondEntirelyContained(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := NewFileStore(dir, EngineTags{})
fs := NewFileStore(dir, tsdb.EngineTags{})

// makeVals creates count points starting at ts and incrementing by step
makeVals := func(ts, count, step int64) []Value {
Expand Down Expand Up @@ -320,7 +321,7 @@ func (a *FloatArray) Swap(i, j int) {
func TestFileStore_MergeBlocksLargerThat1000_MultipleBlocksInEachFile(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := NewFileStore(dir, EngineTags{})
fs := NewFileStore(dir, tsdb.EngineTags{})

// makeVals creates count points starting at ts and incrementing by step
makeVals := func(ts, count, step int64, v float64) []Value {
Expand Down Expand Up @@ -414,7 +415,7 @@ func TestFileStore_MergeBlocksLargerThat1000_MultipleBlocksInEachFile(t *testing
func TestFileStore_SeekBoundaries(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
fs := NewFileStore(dir, EngineTags{})
fs := NewFileStore(dir, tsdb.EngineTags{})

// makeVals creates count points starting at ts and incrementing by step
makeVals := func(ts, count, step int64, v float64) []Value {
Expand Down
6 changes: 3 additions & 3 deletions tsdb/engine/tsm1/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ type Cache struct {
// NewCache returns an instance of a cache which will use a maximum of maxSize bytes of memory.
// Only used for engine caches, never for snapshots.
// Note tags are for metrics only, so if metrics are not desired tags do not have to be set.
func NewCache(maxSize uint64, tags EngineTags) *Cache {
func NewCache(maxSize uint64, tags tsdb.EngineTags) *Cache {
c := &Cache{
maxSize: maxSize,
store: emptyStore{},
Expand Down Expand Up @@ -224,7 +224,7 @@ type cacheMetrics struct {
}

func newAllCacheMetrics() *allCacheMetrics {
labels := EngineLabelNames()
labels := tsdb.EngineLabelNames()
return &allCacheMetrics{
MemBytes: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: storageNamespace,
Expand Down Expand Up @@ -276,7 +276,7 @@ func CacheCollectors() []prometheus.Collector {
}
}

func newCacheMetrics(tags EngineTags) *cacheMetrics {
func newCacheMetrics(tags tsdb.EngineTags) *cacheMetrics {
labels := tags.GetLabels()
return &cacheMetrics{
MemBytes: globalCacheMetrics.MemBytes.With(labels),
Expand Down
7 changes: 4 additions & 3 deletions tsdb/engine/tsm1/cache_race_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"
"testing"

"github.com/influxdata/influxdb/v2/tsdb"
"github.com/influxdata/influxdb/v2/tsdb/engine/tsm1"
)

Expand All @@ -26,7 +27,7 @@ func TestCacheCheckConcurrentReadsAreSafe(t *testing.T) {
}

wg := sync.WaitGroup{}
c := tsm1.NewCache(1000000, tsm1.EngineTags{})
c := tsm1.NewCache(1000000, tsdb.EngineTags{})

ch := make(chan struct{})
for _, s := range series {
Expand Down Expand Up @@ -71,7 +72,7 @@ func TestCacheRace(t *testing.T) {
}

wg := sync.WaitGroup{}
c := tsm1.NewCache(1000000, tsm1.EngineTags{})
c := tsm1.NewCache(1000000, tsdb.EngineTags{})

ch := make(chan struct{})
for _, s := range series {
Expand Down Expand Up @@ -136,7 +137,7 @@ func TestCacheRace2Compacters(t *testing.T) {
}

wg := sync.WaitGroup{}
c := tsm1.NewCache(1000000, tsm1.EngineTags{})
c := tsm1.NewCache(1000000, tsdb.EngineTags{})

ch := make(chan struct{})
for _, s := range series {
Expand Down
Loading

0 comments on commit b970e35

Please sign in to comment.