diff --git a/balloon/hyper/tree_test.go b/balloon/hyper/tree_test.go index 48368e6e2..f69c333e7 100644 --- a/balloon/hyper/tree_test.go +++ b/balloon/hyper/tree_test.go @@ -17,15 +17,18 @@ package hyper import ( + "context" "encoding/binary" "net/http" "testing" + "time" "github.com/bbva/qed/balloon/cache" "github.com/bbva/qed/hashing" "github.com/bbva/qed/log" "github.com/bbva/qed/metrics" "github.com/bbva/qed/storage" + "github.com/bbva/qed/storage/rocks" "github.com/bbva/qed/testutils/rand" storage_utils "github.com/bbva/qed/testutils/storage" "github.com/bbva/qed/util" @@ -272,9 +275,8 @@ func BenchmarkAdd(b *testing.B) { tree := NewHyperTree(hashing.NewSha256Hasher, store, freeCache) - prometheus.MustRegister(metrics.QedHyperAddTotal) - http.Handle("/metrics", promhttp.Handler()) - go http.ListenAndServe(":2112", nil) + srvCloseF := startMetricsServer(store) + defer srvCloseF() b.ResetTimer() b.N = 1000000 @@ -283,9 +285,27 @@ func BenchmarkAdd(b *testing.B) { binary.LittleEndian.PutUint64(index, uint64(i)) elem := append(rand.Bytes(32), index...) 
_, mutations, err := tree.Add(hasher.Do(elem), uint64(i)) - if err != nil { - b.Fatal(err) + require.NoError(b, err) + require.NoError(b, store.Mutate(mutations)) + metrics.QedHyperAddTotal.Inc() + } + +} + +func startMetricsServer(store *rocks.RocksDBStore) func() { + reg := prometheus.NewRegistry() + reg.Register(metrics.QedHyperAddTotal) + store.RegisterMetrics(reg) + mux := http.NewServeMux() + mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{})) + srv := &http.Server{Addr: ":2112", Handler: mux} + go srv.ListenAndServe() + closeF := func() { + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + if err := srv.Shutdown(ctx); err != nil { + log.Fatal(err) } - store.Mutate(mutations) } + return closeF } diff --git a/storage/rocks/metrics.go b/storage/rocks/metrics.go index 8db84cba8..9ac2d5daa 100644 --- a/storage/rocks/metrics.go +++ b/storage/rocks/metrics.go @@ -17,7 +17,10 @@ package rocks import ( + "fmt" + "github.com/bbva/qed/rocksdb" + "github.com/bbva/qed/storage" "github.com/prometheus/client_golang/prometheus" ) @@ -38,16 +41,22 @@ type rocksDBMetrics struct { *getsMetrics *ioMetrics *compressMetrics + tables []*perTableMetrics } -func newRocksDBMetrics(stats *rocksdb.Statistics, cache *rocksdb.Cache) *rocksDBMetrics { +func newRocksDBMetrics(store *RocksDBStore) *rocksDBMetrics { + tables := make([]*perTableMetrics, 0) + tables = append(tables, newPerTableMetrics(storage.HyperCacheTable, store)) + tables = append(tables, newPerTableMetrics(storage.HistoryCacheTable, store)) + tables = append(tables, newPerTableMetrics(storage.FSMStateTable, store)) return &rocksDBMetrics{ - blockCacheMetrics: newBlockCacheMetrics(stats, cache), - bloomFilterMetrics: newBloomFilterMetrics(stats), - memtableMetrics: newMemtableMetrics(stats), - getsMetrics: newGetsMetrics(stats), - ioMetrics: newIOMetrics(stats), - compressMetrics: newCompressMetrics(stats), + blockCacheMetrics: newBlockCacheMetrics(store.stats, 
store.blockCache), + bloomFilterMetrics: newBloomFilterMetrics(store.stats), + memtableMetrics: newMemtableMetrics(store.stats), + getsMetrics: newGetsMetrics(store.stats), + ioMetrics: newIOMetrics(store.stats), + compressMetrics: newCompressMetrics(store.stats), + tables: tables, } } @@ -60,6 +69,9 @@ func (m *rocksDBMetrics) collectors() []prometheus.Collector { collectors = append(collectors, m.getsMetrics.collectors()...) collectors = append(collectors, m.ioMetrics.collectors()...) collectors = append(collectors, m.compressMetrics.collectors()...) + for _, table := range m.tables { + collectors = append(collectors, table.collectors()...) + } return collectors } @@ -637,6 +649,291 @@ func (m *compressMetrics) collectors() []prometheus.Collector { } } +type perTableMetrics struct { + NumFilesAtLevelN []prometheus.GaugeFunc + NumImmutableMemtables prometheus.GaugeFunc + NumImmutableMemtablesFlushed prometheus.GaugeFunc + NumRunningFlushes prometheus.GaugeFunc + NumRunningCompactions prometheus.GaugeFunc + CurrentSizeActiveMemtable prometheus.GaugeFunc + CurrentSizeAllMemtables prometheus.GaugeFunc + SizeAllMemtables prometheus.GaugeFunc + NumEntriesActiveMemtable prometheus.GaugeFunc + NumEntriesImmutableMemtables prometheus.GaugeFunc + EstimatedNumKeys prometheus.GaugeFunc + EstimateTableReadersMem prometheus.GaugeFunc + NumLiveVersions prometheus.GaugeFunc + EstimatedLiveDataSize prometheus.GaugeFunc + TotalSSTFilesSize prometheus.GaugeFunc + TotalLiveSSTFilesSize prometheus.GaugeFunc + EstimatedPendingCompactionBytes prometheus.GaugeFunc + ActualDelayedWriteRate prometheus.GaugeFunc + BlockCacheUsage prometheus.GaugeFunc + BlockCachePinnedUsage prometheus.GaugeFunc +} + +func newPerTableMetrics(table storage.Table, store *RocksDBStore) *perTableMetrics { + m := &perTableMetrics{ + NumImmutableMemtables: prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: "cf_" + table.String(), + Name: "num_immutable_memtables", + Help: 
"Number of immutable memtables that have not yet been flushed.", + }, + func() float64 { + return float64(store.db.GetUint64PropertyCF("rocksdb.num-immutable-mem-table", store.cfHandles[table])) + }, + ), + NumImmutableMemtablesFlushed: prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: "cf_" + table.String(), + Name: "num_immutable_memtables_flushed", + Help: "Number of immutable memtables that have already been flushed.", + }, + func() float64 { + return float64(store.db.GetUint64PropertyCF("rocksdb.num-immutable-mem-table-flushed", store.cfHandles[table])) + }, + ), + NumRunningFlushes: prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: "cf_" + table.String(), + Name: "num_running_flushes", + Help: "Number of currently running flushes.", + }, + func() float64 { + return float64(store.db.GetUint64PropertyCF("rocksdb.num-running-flushes", store.cfHandles[table])) + }, + ), + NumRunningCompactions: prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: "cf_" + table.String(), + Name: "num_running_compactions", + Help: "Number of currently running compactions.", + }, + func() float64 { + return float64(store.db.GetUint64PropertyCF("rocksdb.num-running-compactions", store.cfHandles[table])) + }, + ), + CurrentSizeActiveMemtable: prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: "cf_" + table.String(), + Name: "cur_size_active_memtable", + Help: "Approximate size of active memtable (bytes).", + }, + func() float64 { + return float64(store.db.GetUint64PropertyCF("rocksdb.cur-size-active-mem-table", store.cfHandles[table])) + }, + ), + CurrentSizeAllMemtables: prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: "cf_" + table.String(), + Name: "cur_size_all_memtables", + Help: "Approximate size of active and unflushed immutable memtables (bytes).", + }, + func() float64 { + return 
float64(store.db.GetUint64PropertyCF("rocksdb.cur-size-all-mem-tables", store.cfHandles[table])) + }, + ), + SizeAllMemtables: prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: "cf_" + table.String(), + Name: "size_all_memtables", + Help: "Approximate size of active, unflushed immutable, and pinned immutable memtables (bytes).", + }, + func() float64 { + return float64(store.db.GetUint64PropertyCF("rocksdb.size-all-mem-tables", store.cfHandles[table])) + }, + ), + NumEntriesActiveMemtable: prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: "cf_" + table.String(), + Name: "num_entries_active_memtable", + Help: "Total number of entries in the active memtable.", + }, + func() float64 { + return float64(store.db.GetUint64PropertyCF("rocksdb.num-entries-active-mem-table", store.cfHandles[table])) + }, + ), + NumEntriesImmutableMemtables: prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: "cf_" + table.String(), + Name: "num_entries_imm_memtables", + Help: "Total number of entries in the unflushed immutable memtables.", + }, + func() float64 { + return float64(store.db.GetUint64PropertyCF("rocksdb.num-entries-imm-mem-tables", store.cfHandles[table])) + }, + ), + EstimatedNumKeys: prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: "cf_" + table.String(), + Name: "estimated_num_keys", + Help: "Estimated number of total keys in the active and unflushed immutable memtables and storage.", + }, + func() float64 { + return float64(store.db.GetUint64PropertyCF("rocksdb.estimate-num-keys", store.cfHandles[table])) + }, + ), + EstimateTableReadersMem: prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: "cf_" + table.String(), + Name: "estimated_table_readers_mem", + Help: "Estimated memory used for reading SST tables, excluding memory used in block cache (e.g., filter and index blocks).", + }, + func() 
float64 { + return float64(store.db.GetUint64PropertyCF("rocksdb.estimate-table-readers-mem", store.cfHandles[table])) + }, + ), + NumLiveVersions: prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: "cf_" + table.String(), + Name: "num_live_versions", + Help: "Number of live versions.", + // A `Version` is an internal data structure. More live versions often mean more SST files + // are held from being deleted, by iterators or unfinished compactions. + }, + func() float64 { + return float64(store.db.GetUint64PropertyCF("rocksdb.num-live-versions", store.cfHandles[table])) + }, + ), + EstimatedLiveDataSize: prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: "cf_" + table.String(), + Name: "estimated_live_data_size", + Help: "Estimate of the amount of live data (bytes).", + }, + func() float64 { + return float64(store.db.GetUint64PropertyCF("rocksdb.estimate-live-data-size", store.cfHandles[table])) + }, + ), + TotalSSTFilesSize: prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: "cf_" + table.String(), + Name: "total_sst_files_size", + Help: "Total size (bytes) of all SST files.", + }, + func() float64 { + return float64(store.db.GetUint64PropertyCF("rocksdb.total-sst-files-size", store.cfHandles[table])) + }, + ), + TotalLiveSSTFilesSize: prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: "cf_" + table.String(), + Name: "total_live_sst_files_size", + Help: "Total size (bytes) of all live SST files that belong to the last LSM tree.", + }, + func() float64 { + return float64(store.db.GetUint64PropertyCF("rocksdb.live-sst-files-size", store.cfHandles[table])) + }, + ), + EstimatedPendingCompactionBytes: prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: "cf_" + table.String(), + Name: "estimated_pending_compaction_bytes", + Help: "Estimated total number of bytes compaction needs to rewrite to
get all levels down to under target size.", + }, + func() float64 { + return float64(store.db.GetUint64PropertyCF("rocksdb.estimate-pending-compaction-bytes", store.cfHandles[table])) + }, + ), + ActualDelayedWriteRate: prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: "cf_" + table.String(), + Name: "actual_delayed_write_rate", + Help: "Current actual delayed write rate. 0 means no delay.", + }, + func() float64 { + return float64(store.db.GetUint64PropertyCF("rocksdb.actual-delayed-write-rate", store.cfHandles[table])) + }, + ), + BlockCacheUsage: prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: "cf_" + table.String(), + Name: "block_cache_usage", + Help: "Memory size (bytes) for the entries residing in block cache.", + }, + func() float64 { + return float64(store.db.GetUint64PropertyCF("rocksdb.block-cache-usage", store.cfHandles[table])) + }, + ), + BlockCachePinnedUsage: prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: "cf_" + table.String(), + Name: "block_cache_pinned_usage", + Help: "Memory size (bytes) for the entries being pinned in block cache.", + }, + func() float64 { + return float64(store.db.GetUint64PropertyCF("rocksdb.block-cache-pinned-usage", store.cfHandles[table])) + }, + ), + } + numFileAtLevels := make([]prometheus.GaugeFunc, 0) + for i := 0; i <= 5; i++ { + propName := fmt.Sprintf("rocksdb.num-files-at-level%d", i) + numFileAtLevels = append(numFileAtLevels, prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: "cf_" + table.String(), + Name: fmt.Sprintf("num_files_at_level_%d", i), + Help: fmt.Sprintf("Number of files at level %d.", i), + }, + func() float64 { + return float64(store.db.GetUint64PropertyCF(propName, store.cfHandles[table])) + }, + )) + } + m.NumFilesAtLevelN = numFileAtLevels + return m +} + +// collectors satisfies the prom.PrometheusCollector interface. 
+func (m *perTableMetrics) collectors() []prometheus.Collector { + c := []prometheus.Collector{ + m.NumImmutableMemtables, + m.NumImmutableMemtablesFlushed, + m.NumRunningFlushes, + m.NumRunningCompactions, + m.CurrentSizeActiveMemtable, + m.CurrentSizeAllMemtables, + m.SizeAllMemtables, + m.NumEntriesActiveMemtable, + m.NumEntriesImmutableMemtables, + m.EstimatedNumKeys, + m.EstimateTableReadersMem, + m.NumLiveVersions, + m.EstimatedLiveDataSize, + m.TotalSSTFilesSize, + m.TotalLiveSSTFilesSize, + m.EstimatedPendingCompactionBytes, + m.ActualDelayedWriteRate, + m.BlockCacheUsage, + m.BlockCachePinnedUsage, + } + for _, metric := range m.NumFilesAtLevelN { + c = append(c, metric) + } + return c +} + func extractMetric(stats *rocksdb.Statistics, ticker rocksdb.TickerType) func() float64 { return func() float64 { return float64(stats.GetAndResetTickerCount(ticker)) diff --git a/storage/rocks/rocksdb_store.go b/storage/rocks/rocksdb_store.go index 1532dc70e..e13820b21 100644 --- a/storage/rocks/rocksdb_store.go +++ b/storage/rocks/rocksdb_store.go @@ -133,7 +133,7 @@ func NewRocksDBStoreOpts(opts *Options) (*RocksDBStore, error) { } if stats != nil { - store.metrics = newRocksDBMetrics(stats, blockCache) + store.metrics = newRocksDBMetrics(store) } return store, nil