Skip to content

Commit

Permalink
Rocksdb Metrics (#1692)
Browse files Browse the repository at this point in the history
* Rocksdb Metrics

* Add rocksdb namespace for options

* Adding help to the metrics

* CR's fixes

* CR's fixes

* CR's fixes
  • Loading branch information
evgeniy-scherbina committed Sep 7, 2023
1 parent a98bdfe commit c992269
Show file tree
Hide file tree
Showing 9 changed files with 1,103 additions and 8 deletions.
166 changes: 166 additions & 0 deletions cmd/kava/opendb/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
//go:build rocksdb
// +build rocksdb

package opendb

import (
"github.com/go-kit/kit/metrics"
"github.com/go-kit/kit/metrics/prometheus"
stdprometheus "github.com/prometheus/client_golang/prometheus"
)

// rocksdbMetrics will be initialized in registerMetrics() if enableRocksdbMetrics flag set to true
var rocksdbMetrics *Metrics

// Metrics contains all rocksdb metrics which will be reported to prometheus.
// The gauges are created by registerMetrics and overwritten by report on every
// reporting tick; each group below notes where report takes its values from.
type Metrics struct {
// Keys: the Number* gauges are fed from stats, EstimateNumKeys from props
NumberKeysWritten metrics.Gauge
NumberKeysRead metrics.Gauge
NumberKeysUpdated metrics.Gauge
EstimateNumKeys metrics.Gauge

// Files: fed from stats
NumberFileOpens metrics.Gauge
NumberFileErrors metrics.Gauge

// Memory: fed from props
BlockCacheUsage metrics.Gauge
EstimateTableReadersMem metrics.Gauge
CurSizeAllMemTables metrics.Gauge
BlockCachePinnedUsage metrics.Gauge

// Cache: fed from stats
BlockCacheMiss metrics.Gauge
BlockCacheHit metrics.Gauge
BlockCacheAdd metrics.Gauge
BlockCacheAddFailures metrics.Gauge
}

// registerMetrics registers the rocksdb gauges in prometheus and initializes
// the rocksdbMetrics variable.
//
// The call is a no-op when rocksdbMetrics is already set, so the gauges are
// created at most once no matter how many times it is invoked.
// NOTE: the nil check is not synchronized; callers must not invoke it
// concurrently.
func registerMetrics() {
	if rocksdbMetrics != nil {
		// metrics already registered
		return
	}

	// none of the gauges carries labels
	labels := make([]string, 0)

	// newGauge builds the gauge rocksdb_<subsystem>_<name>. Routing every
	// metric through one constructor keeps the namespace consistent and makes
	// a missing help text obvious at the call site.
	newGauge := func(subsystem, name, help string) *prometheus.Gauge {
		return prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
			Namespace: "rocksdb",
			Subsystem: subsystem,
			Name:      name,
			Help:      help,
		}, labels)
	}

	rocksdbMetrics = &Metrics{
		// Keys
		NumberKeysWritten: newGauge("key", "number_keys_written", "number of keys written"),
		NumberKeysRead:    newGauge("key", "number_keys_read", "number of keys read"),
		NumberKeysUpdated: newGauge("key", "number_keys_updated", "number of keys updated"),
		EstimateNumKeys:   newGauge("key", "estimate_num_keys", "estimated number of total keys in the active and unflushed immutable memtables and storage"),

		// Files
		NumberFileOpens:  newGauge("file", "number_file_opens", "number of file opens"),
		NumberFileErrors: newGauge("file", "number_file_errors", "number of file errors"),

		// Memory
		BlockCacheUsage:         newGauge("memory", "block_cache_usage", "memory size for the entries residing in block cache"),
		EstimateTableReadersMem: newGauge("memory", "estimate_table_readers_mem", "estimated memory used for reading SST tables, excluding memory used in block cache (e.g., filter and index blocks)"),
		CurSizeAllMemTables:     newGauge("memory", "cur_size_all_mem_tables", "approximate size of active and unflushed immutable memtables (bytes)"),
		BlockCachePinnedUsage:   newGauge("memory", "block_cache_pinned_usage", "returns the memory size for the entries being pinned"),

		// Cache
		BlockCacheMiss:        newGauge("cache", "block_cache_miss", "block_cache_miss == block_cache_index_miss + block_cache_filter_miss + block_cache_data_miss"),
		BlockCacheHit:         newGauge("cache", "block_cache_hit", "block_cache_hit == block_cache_index_hit + block_cache_filter_hit + block_cache_data_hit"),
		BlockCacheAdd:         newGauge("cache", "block_cache_add", "number of blocks added to block cache"),
		BlockCacheAddFailures: newGauge("cache", "block_cache_add_failures", "number of failures when adding blocks to block cache"),
	}
}

// report pushes the latest rocksdb readings to prometheus: every gauge in m is
// overwritten with the matching field of stats or props, converted to float64.
func (m *Metrics) report(props *properties, stats *stats) {
	// update pairs a gauge setter with the value it should receive
	type update struct {
		set   func(value float64)
		value float64
	}

	updates := []update{
		// Keys
		{m.NumberKeysWritten.Set, float64(stats.NumberKeysWritten)},
		{m.NumberKeysRead.Set, float64(stats.NumberKeysRead)},
		{m.NumberKeysUpdated.Set, float64(stats.NumberKeysUpdated)},
		{m.EstimateNumKeys.Set, float64(props.EstimateNumKeys)},

		// Files
		{m.NumberFileOpens.Set, float64(stats.NumberFileOpens)},
		{m.NumberFileErrors.Set, float64(stats.NumberFileErrors)},

		// Memory
		{m.BlockCacheUsage.Set, float64(props.BlockCacheUsage)},
		{m.EstimateTableReadersMem.Set, float64(props.EstimateTableReadersMem)},
		{m.CurSizeAllMemTables.Set, float64(props.CurSizeAllMemTables)},
		{m.BlockCachePinnedUsage.Set, float64(props.BlockCachePinnedUsage)},

		// Cache
		{m.BlockCacheMiss.Set, float64(stats.BlockCacheMiss)},
		{m.BlockCacheHit.Set, float64(stats.BlockCacheHit)},
		{m.BlockCacheAdd.Set, float64(stats.BlockCacheAdd)},
		{m.BlockCacheAddFailures.Set, float64(stats.BlockCacheAddFailures)},
	}

	for _, u := range updates {
		u.set(u.value)
	}
}
80 changes: 74 additions & 6 deletions cmd/kava/opendb/opendb_rocksdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"path/filepath"
"runtime"
"strings"
"time"

"github.com/cosmos/cosmos-sdk/server/types"
"github.com/linxGnu/grocksdb"
Expand All @@ -42,11 +43,15 @@ const (

defaultColumnFamilyName = "default"

maxOpenFilesDBOptName = "max_open_files"
maxFileOpeningThreadsDBOptName = "max_file_opening_threads"
enableMetricsOptName = "rocksdb.enable-metrics"
reportMetricsIntervalSecsOptName = "rocksdb.report-metrics-interval-secs"
defaultReportMetricsIntervalSecs = 15

writeBufferSizeCFOptName = "write_buffer_size"
numLevelsCFOptName = "num_levels"
maxOpenFilesDBOptName = "rocksdb.max-open-files"
maxFileOpeningThreadsDBOptName = "rocksdb.max-file-opening-threads"

writeBufferSizeCFOptName = "rocksdb.write-buffer-size"
numLevelsCFOptName = "rocksdb.num-levels"
)

func OpenDB(appOpts types.AppOptions, home string, backendType dbm.BackendType) (dbm.DB, error) {
Expand All @@ -69,7 +74,13 @@ func openRocksdb(dir string, appOpts types.AppOptions) (dbm.DB, error) {
dbOpts = overrideDBOpts(dbOpts, appOpts)
cfOpts = overrideCFOpts(cfOpts, appOpts)

return newRocksDBWithOptions("application", dir, dbOpts, cfOpts)
enableMetrics := cast.ToBool(appOpts.Get(enableMetricsOptName))
reportMetricsIntervalSecs := cast.ToInt64(appOpts.Get(reportMetricsIntervalSecsOptName))
if reportMetricsIntervalSecs == 0 {
reportMetricsIntervalSecs = defaultReportMetricsIntervalSecs
}

return newRocksDBWithOptions("application", dir, dbOpts, cfOpts, enableMetrics, reportMetricsIntervalSecs)
}

// loadLatestOptions loads and returns database and column family options
Expand Down Expand Up @@ -128,18 +139,36 @@ func overrideCFOpts(cfOpts *grocksdb.Options, appOpts types.AppOptions) *grocksd

// newRocksDBWithOptions opens rocksdb with provided database and column family options
// newRocksDBWithOptions expects that db has only one column family named default
func newRocksDBWithOptions(name string, dir string, dbOpts, cfOpts *grocksdb.Options) (*dbm.RocksDB, error) {
func newRocksDBWithOptions(
name string,
dir string,
dbOpts *grocksdb.Options,
cfOpts *grocksdb.Options,
enableMetrics bool,
reportMetricsIntervalSecs int64,
) (*dbm.RocksDB, error) {
dbPath := filepath.Join(dir, name+".db")

// Ensure path exists
if err := os.MkdirAll(dbPath, 0755); err != nil {
return nil, fmt.Errorf("failed to create db path: %w", err)
}

// EnableStatistics adds overhead so shouldn't be enabled in production
if enableMetrics {
dbOpts.EnableStatistics()
}

db, _, err := grocksdb.OpenDbColumnFamilies(dbOpts, dbPath, []string{defaultColumnFamilyName}, []*grocksdb.Options{cfOpts})
if err != nil {
return nil, err
}

if enableMetrics {
registerMetrics()
go reportMetrics(db, time.Second*time.Duration(reportMetricsIntervalSecs))
}

ro := grocksdb.NewDefaultReadOptions()
wo := grocksdb.NewDefaultWriteOptions()
woSync := grocksdb.NewDefaultWriteOptions()
Expand Down Expand Up @@ -168,3 +197,42 @@ func newDefaultOptions() *grocksdb.Options {

return opts
}

// reportMetrics periodically requests stats from rocksdb and reports them to
// prometheus through rocksdbMetrics.
// NOTE: it blocks for as long as the ticker runs, so it should be launched as
// a goroutine.
//
// A non-positive interval disables reporting: time.NewTicker panics on such a
// value, and a panic inside this goroutine would bring the whole process down.
//
// NOTE(review): the loop has no stop signal and keeps querying db on every
// tick — confirm db is never closed while the process keeps running.
func reportMetrics(db *grocksdb.DB, interval time.Duration) {
	if interval <= 0 {
		return
	}

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for range ticker.C {
		props, stats, err := getPropsAndStats(db)
		if err != nil {
			// best effort: skip this tick and try again on the next one
			continue
		}

		rocksdbMetrics.report(props, stats)
	}
}

// getPropsAndStats reads the rocksdb properties from db and derives the
// statistics from the serialized OptionsStatistics property.
// The first failing step aborts the call; its error is returned unchanged
// together with nil results.
func getPropsAndStats(db *grocksdb.DB) (*properties, *stats, error) {
	dbProps, err := newPropsLoader(db).load()
	if err != nil {
		return nil, nil, err
	}

	// statistics arrive as one serialized string inside the properties
	rawStats, err := parseSerializedStats(dbProps.OptionsStatistics)
	if err != nil {
		return nil, nil, err
	}

	dbStats, err := newStatLoader(rawStats).load()
	if err != nil {
		return nil, nil, err
	}

	return dbProps, dbStats, nil
}
4 changes: 2 additions & 2 deletions cmd/kava/opendb/opendb_rocksdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func TestLoadLatestOptions(t *testing.T) {
require.NoError(t, err)
}()

db, err := newRocksDBWithOptions(name, dir, tc.dbOpts, tc.cfOpts)
db, err := newRocksDBWithOptions(name, dir, tc.dbOpts, tc.cfOpts, true, defaultReportMetricsIntervalSecs)
require.NoError(t, err)
require.NoError(t, db.Close())

Expand Down Expand Up @@ -337,7 +337,7 @@ func TestNewRocksDBWithOptions(t *testing.T) {
cfOpts := newDefaultOptions()
cfOpts.SetWriteBufferSize(999_999)

db, err := newRocksDBWithOptions(name, dir, dbOpts, cfOpts)
db, err := newRocksDBWithOptions(name, dir, dbOpts, cfOpts, true, defaultReportMetricsIntervalSecs)
require.NoError(t, err)
require.NoError(t, db.Close())

Expand Down
87 changes: 87 additions & 0 deletions cmd/kava/opendb/props_loader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
//go:build rocksdb
// +build rocksdb

package opendb

import (
"fmt"
"strings"

"errors"
)

// propsGetter is the subset of a rocksdb handle that propsLoader depends on.
// Keeping it an interface lets the loader be driven by a fake in tests.
type propsGetter interface {
// GetProperty returns the named property as a string; propsLoader treats
// an empty string as a failed lookup.
GetProperty(propName string) (value string)
// GetIntProperty returns the named numeric property; success is false when
// the value could not be obtained.
GetIntProperty(propName string) (value uint64, success bool)
}

// propsLoader reads rocksdb properties through db and accumulates a message
// for every lookup that fails, so load can report all failures at once.
type propsLoader struct {
// db is the source the properties are read from
db propsGetter
// errorMsgs collects one message per failed lookup, in lookup order
errorMsgs []string
}

// newPropsLoader returns a loader that reads properties from db and starts
// with an empty (non-nil) list of error messages.
func newPropsLoader(db propsGetter) *propsLoader {
	loader := new(propsLoader)
	loader.db = db
	loader.errorMsgs = []string{}

	return loader
}

// load reads every rocksdb property listed below through l.db and returns them
// as a properties value.
//
// Individual lookup failures are collected instead of aborting early: if any
// lookup failed, load returns nil and a single error whose message joins all
// failure messages with ";".
//
// Messages left over from a previous call are discarded first, so a loader can
// be reused and each call reports only its own failures.
func (l *propsLoader) load() (*properties, error) {
	// drop stale failures; keeps the backing array for reuse
	l.errorMsgs = l.errorMsgs[:0]

	props := &properties{
		BaseLevel:               l.getIntProperty("rocksdb.base-level"),
		BlockCacheCapacity:      l.getIntProperty("rocksdb.block-cache-capacity"),
		BlockCachePinnedUsage:   l.getIntProperty("rocksdb.block-cache-pinned-usage"),
		BlockCacheUsage:         l.getIntProperty("rocksdb.block-cache-usage"),
		CurSizeActiveMemTable:   l.getIntProperty("rocksdb.cur-size-active-mem-table"),
		CurSizeAllMemTables:     l.getIntProperty("rocksdb.cur-size-all-mem-tables"),
		EstimateLiveDataSize:    l.getIntProperty("rocksdb.estimate-live-data-size"),
		EstimateNumKeys:         l.getIntProperty("rocksdb.estimate-num-keys"),
		EstimateTableReadersMem: l.getIntProperty("rocksdb.estimate-table-readers-mem"),
		LiveSSTFilesSize:        l.getIntProperty("rocksdb.live-sst-files-size"),
		SizeAllMemTables:        l.getIntProperty("rocksdb.size-all-mem-tables"),
		OptionsStatistics:       l.getProperty("rocksdb.options-statistics"),
	}

	if len(l.errorMsgs) != 0 {
		errorMsg := strings.Join(l.errorMsgs, ";")
		return nil, errors.New(errorMsg)
	}

	return props, nil
}

// getProperty returns the string property propName read from l.db.
// An empty value counts as a failure: a message is appended to l.errorMsgs and
// "" is returned.
func (l *propsLoader) getProperty(propName string) string {
	if value := l.db.GetProperty(propName); value != "" {
		return value
	}

	msg := fmt.Sprintf("property %v is empty", propName)
	l.errorMsgs = append(l.errorMsgs, msg)

	return ""
}

// getIntProperty returns the numeric property propName read from l.db.
// When the lookup reports failure, a message is appended to l.errorMsgs and 0
// is returned regardless of the value the lookup produced.
func (l *propsLoader) getIntProperty(propName string) uint64 {
	if value, ok := l.db.GetIntProperty(propName); ok {
		return value
	}

	msg := fmt.Sprintf("can't get %v int property", propName)
	l.errorMsgs = append(l.errorMsgs, msg)

	return 0
}

// properties holds the rocksdb property values read by propsLoader.load.
// The trailing comment on each field names the rocksdb property it is read from.
type properties struct {
BaseLevel uint64 // rocksdb.base-level
BlockCacheCapacity uint64 // rocksdb.block-cache-capacity
BlockCachePinnedUsage uint64 // rocksdb.block-cache-pinned-usage
BlockCacheUsage uint64 // rocksdb.block-cache-usage
CurSizeActiveMemTable uint64 // rocksdb.cur-size-active-mem-table
CurSizeAllMemTables uint64 // rocksdb.cur-size-all-mem-tables
EstimateLiveDataSize uint64 // rocksdb.estimate-live-data-size
EstimateNumKeys uint64 // rocksdb.estimate-num-keys
EstimateTableReadersMem uint64 // rocksdb.estimate-table-readers-mem
LiveSSTFilesSize uint64 // rocksdb.live-sst-files-size
SizeAllMemTables uint64 // rocksdb.size-all-mem-tables
OptionsStatistics string // rocksdb.options-statistics (serialized statistics)
}
Loading

0 comments on commit c992269

Please sign in to comment.