diff --git a/cmd/kava/opendb/metrics.go b/cmd/kava/opendb/metrics.go
new file mode 100644
index 0000000000..f9ee175c24
--- /dev/null
+++ b/cmd/kava/opendb/metrics.go
@@ -0,0 +1,156 @@
+//go:build rocksdb
+// +build rocksdb
+
+package opendb
+
+import (
+	"github.com/go-kit/kit/metrics"
+	"github.com/go-kit/kit/metrics/prometheus"
+	stdprometheus "github.com/prometheus/client_golang/prometheus"
+)
+
+// Metrics groups the rocksdb gauges exported to prometheus.
+type Metrics struct {
+	// Keys
+	NumberKeysWritten metrics.Gauge
+	NumberKeysRead    metrics.Gauge
+	NumberKeysUpdated metrics.Gauge
+	EstimateNumKeys   metrics.Gauge
+
+	// Files
+	NumberFileOpens  metrics.Gauge
+	NumberFileErrors metrics.Gauge
+
+	// Memory
+	BlockCacheUsage         metrics.Gauge
+	EstimateTableReadersMem metrics.Gauge
+	CurSizeAllMemTables     metrics.Gauge
+	BlockCachePinnedUsage   metrics.Gauge
+
+	// Cache
+	BlockCacheMiss        metrics.Gauge
+	BlockCacheHit         metrics.Gauge
+	BlockCacheAdd         metrics.Gauge
+	BlockCacheAddFailures metrics.Gauge
+}
+
+// send publishes the latest snapshot of rocksdb properties and statistics
+// to the registered prometheus gauges.
+func (m *Metrics) send(props *properties, stats *stats) {
+	// Keys
+	m.NumberKeysWritten.Set(float64(stats.NumberKeysWritten))
+	m.NumberKeysRead.Set(float64(stats.NumberKeysRead))
+	m.NumberKeysUpdated.Set(float64(stats.NumberKeysUpdated))
+	m.EstimateNumKeys.Set(float64(props.EstimateNumKeys))
+
+	// Files
+	m.NumberFileOpens.Set(float64(stats.NumberFileOpens))
+	m.NumberFileErrors.Set(float64(stats.NumberFileErrors))
+
+	// Memory
+	m.BlockCacheUsage.Set(float64(props.BlockCacheUsage))
+	m.EstimateTableReadersMem.Set(float64(props.EstimateTableReadersMem))
+	m.CurSizeAllMemTables.Set(float64(props.CurSizeAllMemTables))
+	m.BlockCachePinnedUsage.Set(float64(props.BlockCachePinnedUsage))
+
+	// Cache
+	m.BlockCacheMiss.Set(float64(stats.BlockCacheMiss))
+	m.BlockCacheHit.Set(float64(stats.BlockCacheHit))
+	m.BlockCacheAdd.Set(float64(stats.BlockCacheAdd))
+	m.BlockCacheAddFailures.Set(float64(stats.BlockCacheAddFailures))
+}
+
+// newMetrics registers and returns the rocksdb prometheus gauges.
+// Empty Help strings are a promlint violation, so each gauge documents
+// what it exposes.
+func newMetrics() *Metrics {
+	labels := make([]string, 0)
+
+	return &Metrics{
+		// Keys
+		NumberKeysWritten: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
+			Namespace: "rocksdb",
+			Subsystem: "key",
+			Name:      "number_keys_written",
+			Help:      "Total number of keys written.",
+		}, labels),
+		NumberKeysRead: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
+			Namespace: "rocksdb",
+			Subsystem: "key",
+			Name:      "number_keys_read",
+			Help:      "Total number of keys read.",
+		}, labels),
+		NumberKeysUpdated: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
+			Namespace: "rocksdb",
+			Subsystem: "key",
+			Name:      "number_keys_updated",
+			Help:      "Total number of keys updated.",
+		}, labels),
+		EstimateNumKeys: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
+			Namespace: "rocksdb",
+			Subsystem: "key",
+			Name:      "estimate_num_keys",
+			Help:      "Estimated number of keys in active/unflushed memtables and storage.",
+		}, labels),
+
+		// Files
+		NumberFileOpens: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
+			Namespace: "rocksdb",
+			Subsystem: "file",
+			Name:      "number_file_opens",
+			Help:      "Total number of file opens.",
+		}, labels),
+		NumberFileErrors: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
+			Namespace: "rocksdb",
+			Subsystem: "file",
+			Name:      "number_file_errors",
+			Help:      "Total number of file errors.",
+		}, labels),
+
+		// Memory
+		BlockCacheUsage: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
+			Namespace: "rocksdb",
+			Subsystem: "memory",
+			Name:      "block_cache_usage",
+			Help:      "Memory size (bytes) of the entries residing in block cache.",
+		}, labels),
+		EstimateTableReadersMem: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
+			Namespace: "rocksdb",
+			Subsystem: "memory",
+			Name:      "estimate_table_readers_mem",
+			Help:      "Estimated memory (bytes) used for reading SST tables, excluding block cache.",
+		}, labels),
+		CurSizeAllMemTables: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
+			Namespace: "rocksdb",
+			Subsystem: "memory",
+			Name:      "cur_size_all_mem_tables",
+			Help:      "Approximate size (bytes) of active and unflushed immutable memtables.",
+		}, labels),
+		BlockCachePinnedUsage: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
+			Namespace: "rocksdb",
+			Subsystem: "memory",
+			Name:      "block_cache_pinned_usage",
+			Help:      "Memory size (bytes) of the entries pinned in block cache.",
+		}, labels),
+
+		// Cache
+		BlockCacheMiss: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
+			Namespace: "rocksdb",
+			Subsystem: "cache",
+			Name:      "block_cache_miss",
+			Help:      "Total block cache misses (index + filter + data).",
+		}, labels),
+		BlockCacheHit: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
+			Namespace: "rocksdb",
+			Subsystem: "cache",
+			Name:      "block_cache_hit",
+			Help:      "Total block cache hits (index + filter + data).",
+		}, labels),
+		BlockCacheAdd: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
+			Namespace: "rocksdb",
+			Subsystem: "cache",
+			Name:      "block_cache_add",
+			Help:      "Number of blocks added to block cache.",
+		}, labels),
+		BlockCacheAddFailures: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
+			Namespace: "rocksdb",
+			Subsystem: "cache",
+			Name:      "block_cache_add_failures",
+			Help:      "Number of failures when adding blocks to block cache.",
+		}, labels),
+	}
+}
diff --git a/cmd/kava/opendb/opendb_rocksdb.go b/cmd/kava/opendb/opendb_rocksdb.go
index d8042e1ec9..613dacc305 100644
--- a/cmd/kava/opendb/opendb_rocksdb.go
+++ b/cmd/kava/opendb/opendb_rocksdb.go
@@ -23,10 +23,12 @@ package opendb
 import (
 	"errors"
 	"fmt"
+	"log"
 	"os"
 	"path/filepath"
 	"runtime"
 	"strings"
+	"time"
 
 	"github.com/cosmos/cosmos-sdk/server/types"
 	"github.com/linxGnu/grocksdb"
@@ -136,10 +138,31 @@ func newRocksDBWithOptions(name string, dir string, dbOpts, cfOpts *grocksdb.Opt
 		return nil, fmt.Errorf("failed to create db path: %w", err)
 	}
 
+	// Statistics must be enabled before open for rocksdb.options-statistics
+	// to be populated.
+	dbOpts.EnableStatistics()
+	//cfOpts.EnableStatistics()
 	db, _, err := grocksdb.OpenDbColumnFamilies(dbOpts, dbPath, []string{defaultColumnFamilyName}, []*grocksdb.Options{cfOpts})
 	if err != nil {
 		return nil, err
 	}
+
+	keyMetrics := newMetrics()
+
+	// Poll rocksdb stats in the background for the lifetime of the process.
+	// NOTE(review): the ticker is never stopped and the goroutine has no
+	// shutdown signal; acceptable while the db lives as long as the process,
+	// but should be tied to db close if that ever changes.
+	go func() {
+		ticker := time.NewTicker(time.Second * 15)
+		for range ticker.C {
+			props, stats, err := getPropsAndStats(db)
+			if err != nil {
+				// A failed stats poll must not crash the node. The original
+				// log.Fatal here would os.Exit the whole process on a
+				// transient error and made the continue unreachable.
+				log.Printf("failed to get rocksdb props and stats: %v", err)
+				continue
+			}
+
+			keyMetrics.send(props, stats)
+		}
+	}()
+
 	ro := grocksdb.NewDefaultReadOptions()
 	wo := grocksdb.NewDefaultWriteOptions()
 	woSync := grocksdb.NewDefaultWriteOptions()
@@ -168,3 +191,24 @@ func newDefaultOptions() *grocksdb.Options {
 
 	return opts
 }
+
+// getPropsAndStats loads rocksdb int properties and parses the serialized
+// options-statistics dump into a structured stats object.
+func getPropsAndStats(db *grocksdb.DB) (*properties, *stats, error) {
+	propsLoader := newPropsLoader(db)
+	props, err := propsLoader.load()
+	if err != nil {
+		return nil, nil, err
+	}
+
+	metricMap, err :=
parseSerializedStats(props.OptionsStatistics)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	statLoader := newStatLoader(metricMap)
+	stats, err := statLoader.load()
+	if err != nil {
+		return nil, nil, err
+	}
+
+	return props, stats, nil
+}
diff --git a/cmd/kava/opendb/props_loader.go b/cmd/kava/opendb/props_loader.go
new file mode 100644
index 0000000000..b1e5871eab
--- /dev/null
+++ b/cmd/kava/opendb/props_loader.go
@@ -0,0 +1,73 @@
+//go:build rocksdb
+// +build rocksdb
+
+package opendb
+
+import (
+	"fmt"
+	"strings"
+
+	"github.com/linxGnu/grocksdb"
+	"github.com/pkg/errors"
+)
+
+// propsLoader collects rocksdb int properties, accumulating lookup failures
+// so the caller gets a single combined error.
+type propsLoader struct {
+	db        *grocksdb.DB
+	errorMsgs []string
+}
+
+func newPropsLoader(db *grocksdb.DB) *propsLoader {
+	return &propsLoader{
+		db:        db,
+		errorMsgs: make([]string, 0),
+	}
+}
+
+// load reads every property of interest; it returns an error if any single
+// property could not be fetched.
+func (l *propsLoader) load() (*properties, error) {
+	props := &properties{
+		BaseLevel:               l.getIntProperty("rocksdb.base-level"),
+		BlockCacheCapacity:      l.getIntProperty("rocksdb.block-cache-capacity"),
+		BlockCachePinnedUsage:   l.getIntProperty("rocksdb.block-cache-pinned-usage"),
+		BlockCacheUsage:         l.getIntProperty("rocksdb.block-cache-usage"),
+		CurSizeActiveMemTable:   l.getIntProperty("rocksdb.cur-size-active-mem-table"),
+		CurSizeAllMemTables:     l.getIntProperty("rocksdb.cur-size-all-mem-tables"),
+		EstimateLiveDataSize:    l.getIntProperty("rocksdb.estimate-live-data-size"),
+		EstimateNumKeys:         l.getIntProperty("rocksdb.estimate-num-keys"),
+		EstimateTableReadersMem: l.getIntProperty("rocksdb.estimate-table-readers-mem"),
+		LiveSSTFilesSize:        l.getIntProperty("rocksdb.live-sst-files-size"),
+		SizeAllMemTables:        l.getIntProperty("rocksdb.size-all-mem-tables"),
+		OptionsStatistics:       l.db.GetProperty("rocksdb.options-statistics"),
+	}
+
+	if len(l.errorMsgs) != 0 {
+		errorMsg := strings.Join(l.errorMsgs, ";")
+		return nil, errors.New(errorMsg)
+	}
+
+	return props, nil
+}
+
+// getIntProperty returns the property value, recording an error message and
+// returning 0 when the property is unavailable.
+func (l *propsLoader) getIntProperty(propName string) uint64 {
+	value, ok := l.db.GetIntProperty(propName)
+	if !ok {
+		l.errorMsgs = append(l.errorMsgs, fmt.Sprintf("can't get %v int property", propName))
+		return 0
+	}
+
+	return value
+}
+
+type properties struct {
+	BaseLevel               uint64
+	BlockCacheCapacity      uint64
+	BlockCachePinnedUsage   uint64
+	BlockCacheUsage         uint64
+	CurSizeActiveMemTable   uint64
+	CurSizeAllMemTables     uint64
+	EstimateLiveDataSize    uint64
+	EstimateNumKeys         uint64
+	EstimateTableReadersMem uint64
+	LiveSSTFilesSize        uint64
+	SizeAllMemTables        uint64
+	OptionsStatistics       string
+}
diff --git a/cmd/kava/opendb/stat_parser.go b/cmd/kava/opendb/stat_parser.go
new file mode 100644
index 0000000000..3bcfeee0b9
--- /dev/null
+++ b/cmd/kava/opendb/stat_parser.go
@@ -0,0 +1,87 @@
+//go:build rocksdb
+// +build rocksdb
+
+package opendb
+
+import (
+	"strings"
+
+	"github.com/pkg/errors"
+)
+
+// stat is one parsed statistic line: its name plus property key/value pairs
+// (e.g. COUNT, SUM, P50 ...).
+type stat struct {
+	name  string
+	props map[string]string
+}
+
+// parseSerializedStats parses the multi-line statistics dump produced by the
+// rocksdb.options-statistics property into a map keyed by stat name.
+func parseSerializedStats(serializedStats string) (map[string]*stat, error) {
+	stats := make(map[string]*stat)
+
+	serializedStatList := strings.Split(serializedStats, "\n")
+	for _, serializedStat := range serializedStatList {
+		// Skip blank lines (most commonly the artifact of a trailing
+		// newline) instead of unconditionally dropping the last element,
+		// which silently lost a stat when the dump had no trailing newline.
+		if serializedStat == "" {
+			continue
+		}
+
+		stat, err := parseSerializedStat(serializedStat)
+		if err != nil {
+			return nil, err
+		}
+
+		stats[stat.name] = stat
+	}
+
+	return stats, nil
+}
+
+// parseSerializedStat parses a single "name KEY : value ..." line.
+func parseSerializedStat(serializedStat string) (*stat, error) {
+	tokens := strings.Split(serializedStat, " ")
+	tokensNum := len(tokens)
+	if err := validateTokens(tokens); err != nil {
+		return nil, errors.Wrap(err, "tokens are invalid")
+	}
+
+	props := make(map[string]string)
+	for idx := 1; idx < tokensNum; idx += 3 {
+		key := tokens[idx]
+		sep := tokens[idx+1]
+		value := tokens[idx+2]
+
+		if err := validateStatProperty(key, value, sep); err != nil {
+			return nil, errors.Wrap(err, "invalid stat property")
+		}
+
+		props[key] = value
+	}
+
+	return &stat{
+		name:  tokens[0],
+		props: props,
+	}, nil
+}
+
+// validateTokens checks that a stat line has a name followed by complete
+// (key, separator, value) triples.
+func validateTokens(tokens []string) error {
+	tokensNum := len(tokens)
+	if tokensNum < 4 {
+		return errors.Errorf("invalid number of tokens: %v, tokens: %v", tokensNum, tokens)
+	}
+	if (tokensNum-1)%3 != 0 {
+		return errors.Errorf("invalid number of tokens: %v, tokens: %v", tokensNum, tokens)
+	}
+	if tokens[0] == "" {
+		return errors.Errorf("stat name shouldn't be empty")
+	}
+
+	return nil
+}
+
+// validateStatProperty checks one (key, separator, value) triple of a stat line.
+func validateStatProperty(key, value, sep string) error {
+	if key == "" {
+		return errors.Errorf("key shouldn't be empty")
+	}
+	if sep != ":" {
+		return errors.Errorf("separator should be :")
+	}
+	if value == "" {
+		return errors.Errorf("value shouldn't be empty")
+	}
+
+	return nil
+}
diff --git a/cmd/kava/opendb/stats_loader.go b/cmd/kava/opendb/stats_loader.go
new file mode 100644
index 0000000000..236cc5350d
--- /dev/null
+++ b/cmd/kava/opendb/stats_loader.go
@@ -0,0 +1,253 @@
+//go:build rocksdb
+// +build rocksdb
+
+package opendb
+
+import (
+	"strconv"
+
+	"github.com/pkg/errors"
+)
+
+// Property names used inside a serialized stat line.
+const (
+	sum   = "SUM"
+	count = "COUNT"
+	p50   = "P50"
+	p95   = "P95"
+	p99   = "P99"
+	p100  = "P100"
+)
+
+// statLoader converts the parsed stat map into a typed stats struct,
+// accumulating every lookup/parse failure instead of failing fast.
+type statLoader struct {
+	statMap map[string]*stat
+
+	errors []error
+}
+
+func newStatLoader(statMap map[string]*stat) *statLoader {
+	return &statLoader{
+		statMap: statMap,
+		errors:  make([]error, 0),
+	}
+}
+
+type stats struct {
+	NumberKeysWritten int64
+	NumberKeysRead    int64
+	NumberKeysUpdated int64
+
+	// total block cache misses
+	// BLOCK_CACHE_MISS == BLOCK_CACHE_INDEX_MISS +
+	// BLOCK_CACHE_FILTER_MISS +
+	// BLOCK_CACHE_DATA_MISS;
+	// BLOCK_CACHE_INDEX_MISS: # of times cache miss when accessing index block from block cache.
+	// BLOCK_CACHE_FILTER_MISS: # of times cache miss when accessing filter block from block cache.
+	// BLOCK_CACHE_DATA_MISS: # of times cache miss when accessing data block from block cache.
+	BlockCacheMiss int64
+
+	// total block cache hit
+	// BLOCK_CACHE_HIT == BLOCK_CACHE_INDEX_HIT +
+	// BLOCK_CACHE_FILTER_HIT +
+	// BLOCK_CACHE_DATA_HIT;
+	// BLOCK_CACHE_INDEX_HIT: # of times cache hit when accessing index block from block cache.
+	// BLOCK_CACHE_FILTER_HIT: # of times cache hit when accessing filter block from block cache.
+	// BLOCK_CACHE_DATA_HIT: # of times cache hit when accessing data block from block cache.
+	BlockCacheHit int64
+
+	// # of blocks added to block cache.
+	BlockCacheAdd int64
+	// # of failures when adding blocks to block cache.
+	BlockCacheAddFailures int64
+
+	CompactReadBytes  int64 // Bytes read during compaction
+	CompactWriteBytes int64 // Bytes written during compaction
+
+	CompactionTimesMicros      *float64Histogram
+	CompactionTimesCPUMicros   *float64Histogram
+	NumFilesInSingleCompaction *float64Histogram
+
+	// Read amplification statistics.
+	// Read amplification can be calculated using this formula
+	// (READ_AMP_TOTAL_READ_BYTES / READ_AMP_ESTIMATE_USEFUL_BYTES)
+	//
+	// REQUIRES: ReadOptions::read_amp_bytes_per_bit to be enabled
+	// TODO(yevhenii): seems not working?
+	ReadAmpEstimateUsefulBytes int64 // Estimate of total bytes actually used.
+	ReadAmpTotalReadBytes      int64 // Total size of loaded data blocks.
+
+	NumberFileOpens  int64
+	NumberFileErrors int64
+
+	// # of times bloom filter has avoided file reads, i.e., negatives.
+	BloomFilterUseful int64
+	// # of times bloom FullFilter has not avoided the reads.
+	BloomFilterFullPositive int64
+	// # of times bloom FullFilter has not avoided the reads and data actually
+	// exist.
+	BloomFilterFullTruePositive int64
+
+	// # of memtable hits.
+	MemtableHit int64
+	// # of memtable misses.
+	MemtableMiss int64
+
+	// # of Get() queries served by L0
+	GetHitL0 int64
+	// # of Get() queries served by L1
+	GetHitL1 int64
+	// # of Get() queries served by L2 and up
+	GetHitL2AndUp int64
+
+	// The number of uncompressed bytes issued by DB::Put(), DB::Delete(),
+	// DB::Merge(), and DB::Write().
+	BytesWritten int64
+	// The number of uncompressed bytes read from DB::Get(). It could be
+	// either from memtables, cache, or table files.
+	// For the number of logical bytes read from DB::MultiGet(),
+	// please use NUMBER_MULTIGET_BYTES_READ.
+	BytesRead int64
+
+	// Writer has to wait for compaction or flush to finish.
+	StallMicros int64
+
+	// Last level and non-last level read statistics
+	LastLevelReadBytes    int64
+	LastLevelReadCount    int64
+	NonLastLevelReadBytes int64
+	NonLastLevelReadCount int64
+
+	DBGetMicros   *float64Histogram
+	DBWriteMicros *float64Histogram
+
+	// Value size distribution in each operation
+	BytesPerRead     *float64Histogram
+	BytesPerWrite    *float64Histogram
+	BytesPerMultiget *float64Histogram
+
+	// Time spent flushing memtable to disk
+	FlushMicros *float64Histogram
+}
+
+// float64Histogram holds the standard rocksdb histogram aggregates.
+type float64Histogram struct {
+	Sum   float64
+	Count float64
+	P50   float64
+	P95   float64
+	P99   float64
+	P100  float64
+}
+
+// error combines every accumulated loader error into one, or returns nil.
+func (l *statLoader) error() error {
+	if len(l.errors) != 0 {
+		return errors.Errorf("%v", l.errors)
+	}
+
+	return nil
+}
+
+// load resolves every stat of interest from the stat map; it returns an
+// error if any lookup or parse failed.
+func (l *statLoader) load() (*stats, error) {
+	stats := &stats{
+		NumberKeysWritten:     l.getInt64StatValue("rocksdb.number.keys.written", count),
+		NumberKeysRead:        l.getInt64StatValue("rocksdb.number.keys.read", count),
+		NumberKeysUpdated:     l.getInt64StatValue("rocksdb.number.keys.updated", count),
+		BlockCacheMiss:        l.getInt64StatValue("rocksdb.block.cache.miss", count),
+		BlockCacheHit:         l.getInt64StatValue("rocksdb.block.cache.hit", count),
+		BlockCacheAdd:         l.getInt64StatValue("rocksdb.block.cache.add", count),
+		BlockCacheAddFailures: l.getInt64StatValue("rocksdb.block.cache.add.failures", count),
+		CompactReadBytes:            l.getInt64StatValue("rocksdb.compact.read.bytes", count),
+		CompactWriteBytes:           l.getInt64StatValue("rocksdb.compact.write.bytes", count),
+		CompactionTimesMicros:       l.getFloat64HistogramStatValue("rocksdb.compaction.times.micros"),
+		CompactionTimesCPUMicros:    l.getFloat64HistogramStatValue("rocksdb.compaction.times.cpu_micros"),
+		NumFilesInSingleCompaction:  l.getFloat64HistogramStatValue("rocksdb.numfiles.in.singlecompaction"),
+		ReadAmpEstimateUsefulBytes:  l.getInt64StatValue("rocksdb.read.amp.estimate.useful.bytes", count),
+		ReadAmpTotalReadBytes:       l.getInt64StatValue("rocksdb.read.amp.total.read.bytes", count),
+		NumberFileOpens:             l.getInt64StatValue("rocksdb.no.file.opens", count),
+		NumberFileErrors:            l.getInt64StatValue("rocksdb.no.file.errors", count),
+		BloomFilterUseful:           l.getInt64StatValue("rocksdb.bloom.filter.useful", count),
+		BloomFilterFullPositive:     l.getInt64StatValue("rocksdb.bloom.filter.full.positive", count),
+		BloomFilterFullTruePositive: l.getInt64StatValue("rocksdb.bloom.filter.full.true.positive", count),
+		MemtableHit:                 l.getInt64StatValue("rocksdb.memtable.hit", count),
+		MemtableMiss:                l.getInt64StatValue("rocksdb.memtable.miss", count),
+		GetHitL0:                    l.getInt64StatValue("rocksdb.l0.hit", count),
+		GetHitL1:                    l.getInt64StatValue("rocksdb.l1.hit", count),
+		GetHitL2AndUp:               l.getInt64StatValue("rocksdb.l2andup.hit", count),
+		BytesWritten:                l.getInt64StatValue("rocksdb.bytes.written", count),
+		BytesRead:                   l.getInt64StatValue("rocksdb.bytes.read", count),
+		StallMicros:                 l.getInt64StatValue("rocksdb.stall.micros", count),
+		LastLevelReadBytes:          l.getInt64StatValue("rocksdb.last.level.read.bytes", count),
+		LastLevelReadCount:          l.getInt64StatValue("rocksdb.last.level.read.count", count),
+		NonLastLevelReadBytes:       l.getInt64StatValue("rocksdb.non.last.level.read.bytes", count),
+		NonLastLevelReadCount:       l.getInt64StatValue("rocksdb.non.last.level.read.count", count),
+		DBGetMicros:                 l.getFloat64HistogramStatValue("rocksdb.db.get.micros"),
+		DBWriteMicros:               l.getFloat64HistogramStatValue("rocksdb.db.write.micros"),
+		BytesPerRead:                l.getFloat64HistogramStatValue("rocksdb.bytes.per.read"),
+		BytesPerWrite:               l.getFloat64HistogramStatValue("rocksdb.bytes.per.write"),
+		BytesPerMultiget:            l.getFloat64HistogramStatValue("rocksdb.bytes.per.multiget"),
+		FlushMicros:                 l.getFloat64HistogramStatValue("rocksdb.db.flush.micros"),
+	}
+
+	err := l.error()
+	if err != nil {
+		return nil, err
+	}
+
+	return stats, nil
+}
+
+// getFloat64HistogramStatValue assembles a histogram from the standard
+// rocksdb histogram properties (SUM/COUNT/P50/P95/P99/P100).
+func (l *statLoader) getFloat64HistogramStatValue(statName string) *float64Histogram {
+	return &float64Histogram{
+		Sum:   l.getFloat64StatValue(statName, sum),
+		Count: l.getFloat64StatValue(statName, count),
+		P50:   l.getFloat64StatValue(statName, p50),
+		P95:   l.getFloat64StatValue(statName, p95),
+		P99:   l.getFloat64StatValue(statName, p99),
+		P100:  l.getFloat64StatValue(statName, p100),
+	}
+}
+
+// getInt64StatValue parses the named stat property as int64, recording an
+// error (and returning 0) on failure.
+func (l *statLoader) getInt64StatValue(statName, propName string) int64 {
+	stringVal := l.getStatValue(statName, propName)
+	if stringVal == "" {
+		// getStatValue already recorded the precise cause (missing stat or
+		// missing property); don't append a second, duplicate error here.
+		return 0
+	}
+
+	intVal, err := strconv.ParseInt(stringVal, 10, 64)
+	if err != nil {
+		l.errors = append(l.errors, errors.Wrap(err, "can't parse int"))
+		return 0
+	}
+
+	return intVal
+}
+
+// getFloat64StatValue parses the named stat property as float64, recording
+// an error (and returning 0) on failure.
+func (l *statLoader) getFloat64StatValue(statName, propName string) float64 {
+	stringVal := l.getStatValue(statName, propName)
+	if stringVal == "" {
+		// getStatValue already recorded the precise cause (missing stat or
+		// missing property); don't append a second, duplicate error here.
+		return 0
+	}
+
+	floatVal, err := strconv.ParseFloat(stringVal, 64)
+	if err != nil {
+		l.errors = append(l.errors, errors.Wrap(err, "can't parse float"))
+		return 0
+	}
+
+	return floatVal
+}
+
+// getStatValue fetches one property of one stat, recording an error and
+// returning "" when either the stat or the property is missing.
+func (l *statLoader) getStatValue(statName, propName string) string {
+	stat, ok := l.statMap[statName]
+	if !ok {
+		l.errors = append(l.errors, errors.Errorf("stat %v doesn't exist", statName))
+		return ""
+	}
+	prop, ok := stat.props[propName]
+	if !ok {
+		l.errors = append(l.errors, errors.Errorf("stat %v doesn't have %v property", statName, propName))
+		return ""
+	}
+
+	return prop
+}
diff --git a/go.mod b/go.mod
index a872a2e7e3..d0d9a69436 100644
--- a/go.mod
+++ b/go.mod
@@ -19,6 +19,7 @@ require (
 	github.com/grpc-ecosystem/grpc-gateway v1.16.0
 	github.com/linxGnu/grocksdb v1.8.0
 	github.com/pelletier/go-toml/v2 v2.0.6
+	github.com/pkg/errors v0.9.1
 	github.com/prometheus/client_golang v1.14.0
 	github.com/spf13/cast v1.5.0
 	github.com/spf13/cobra v1.6.1
@@ -147,7 +148,6 @@ require (
 	github.com/mtibben/percent v0.2.1 // indirect
 	github.com/olekukonko/tablewriter v0.0.5 // indirect
 	github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 // indirect
-	github.com/pkg/errors v0.9.1 // indirect
 	github.com/pmezard/go-difflib v1.0.0 // indirect
 	github.com/prometheus/client_model v0.3.0 // indirect
 	github.com/prometheus/common v0.40.0 // indirect