Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cli): Add rocksdb compact command #1804

Merged
merged 13 commits into from
Jan 19, 2024
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ Ref: https://keepachangelog.com/en/1.0.0/

## [Unreleased]

### Features

- (cli) [#1804] Add `rocksdb compact` command for manual DB compaction of state or blockstore.

## [v0.25.0]

Expand Down Expand Up @@ -317,6 +320,7 @@ the [changelog](https://github.com/cosmos/cosmos-sdk/blob/v0.38.4/CHANGELOG.md).
- [#257](https://github.com/Kava-Labs/kava/pulls/257) Include scripts to run
large-scale simulations remotely using aws-batch

[#1804]: https://github.com/Kava-Labs/kava/pull/1804
[#1784]: https://github.com/Kava-Labs/kava/pull/1784
[#1776]: https://github.com/Kava-Labs/kava/pull/1776
[#1770]: https://github.com/Kava-Labs/kava/pull/1770
Expand Down
194 changes: 194 additions & 0 deletions cmd/kava/cmd/rocksdb/compact.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
//go:build rocksdb
// +build rocksdb

package rocksdb

import (
"errors"
"fmt"
"os"
"os/signal"
"path/filepath"
"strconv"
"strings"
"syscall"
"time"

"github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/server"
"github.com/kava-labs/kava/cmd/kava/opendb"
"github.com/linxGnu/grocksdb"
"github.com/spf13/cobra"
"golang.org/x/exp/slices"

"github.com/tendermint/tendermint/libs/log"
)

var allowedDBs = []string{"application", "blockstore", "state"}

var CompactRocksDBCmd = &cobra.Command{
Use: fmt.Sprintf(
"compact <%s>",
strings.Join(allowedDBs, "|"),
),
Short: "force compacts RocksDB",
Long: `This is a utility command that performs a force compaction on the state or
blockstore. This should only be run once the node has stopped.`,
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
logger := log.NewTMLogger(log.NewSyncWriter(os.Stdout))

clientCtx := client.GetClientContextFromCmd(cmd)
ctx := server.GetServerContextFromCmd(cmd)

if server.GetAppDBBackend(ctx.Viper) != "rocksdb" {
return errors.New("compaction is currently only supported with rocksdb")
}

if !slices.Contains(allowedDBs, args[0]) {
return fmt.Errorf(
"invalid db name, must be one of the following: %s",
strings.Join(allowedDBs, ", "),
)
}

compactRocksDBs(clientCtx.HomeDir, logger, args[0])
return nil
},
}

// compactRocksDBs performs a manual compaction on the given db.
func compactRocksDBs(
rootDir string,
logger log.Logger,
dbName string,
) error {
dbPath := filepath.Join(rootDir, "data", dbName+".db")

dbOpts, cfOpts, err := opendb.LoadLatestOptions(dbPath)
if err != nil {
return err
}

logger.Info("opening db", "path", dbPath)
db, _, err := grocksdb.OpenDbColumnFamilies(
dbOpts,
dbPath,
[]string{opendb.DefaultColumnFamilyName},
[]*grocksdb.Options{cfOpts},
)
if err != nil {
return err
}

if err != nil {
logger.Error("failed to initialize cometbft db", "path", dbPath, "err", err)
return fmt.Errorf("failed to open db %s %w", dbPath, err)
}
defer db.Close()

logColumnFamilyMetadata(db, logger)

logger.Info("starting compaction...", "db", dbPath)

done := make(chan bool, 1)
drklee3 marked this conversation as resolved.
Show resolved Hide resolved
registerSignalHandler(db, logger, done)
startCompactionStatsOutput(db, logger, done)

// Actually run the compaction
db.CompactRange(grocksdb.Range{Start: nil, Limit: nil})
logger.Info("done compaction", "db", dbPath)

done <- true
return nil
}

// bytesToMB converts bytes to megabytes.
func bytesToMB(bytes uint64) float64 {
return float64(bytes) / 1024 / 1024
}

// logColumnFamilyMetadata outputs the column family and level metadata.
func logColumnFamilyMetadata(
store *grocksdb.DB,
logger log.Logger,
) {
metadata := store.GetColumnFamilyMetadata()

logger.Info(
"column family metadata",
"name", metadata.Name(),
"sizeMB", bytesToMB(metadata.Size()),
"fileCount", metadata.FileCount(),
"levels", len(metadata.LevelMetas()),
)

for _, level := range metadata.LevelMetas() {
logger.Info(
fmt.Sprintf("level %d metadata", level.Level()),
"sstMetas", strconv.Itoa(len(level.SstMetas())),
"sizeMB", strconv.FormatFloat(bytesToMB(level.Size()), 'f', 2, 64),
)
}
}

// startCompactionStatsOutput starts a goroutine that outputs compaction stats
// every minute.
func startCompactionStatsOutput(
store *grocksdb.DB,
logger log.Logger,
done chan bool,
) {
go func() {
ticker := time.NewTicker(1 * time.Minute)
drklee3 marked this conversation as resolved.
Show resolved Hide resolved

for {
select {
case <-done:
// Make sure we don't try reading from the closed db
return
case <-ticker.C:
{
compactionStats := store.GetProperty("rocksdb.stats")
fmt.Printf("%s\n", compactionStats)
}
}
}
}()
}

// registerSignalHandler registers a signal handler that will cancel any running
// compaction when the user presses Ctrl+C.
func registerSignalHandler(
store *grocksdb.DB,
logger log.Logger,
done chan bool,
) {
// https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ
// Q: Can I close the DB when a manual compaction is in progress?
//
// A: No, it's not safe to do that. However, you call
// CancelAllBackgroundWork(db, true) in another thread to abort the
// running compactions, so that you can close the DB sooner. Since
// 6.5, you can also speed it up using
// DB::DisableManualCompaction().
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)

go func() {
for sig := range c {
logger.Info(fmt.Sprintf(
"received %s signal, aborting running compaction... Do NOT kill me before compaction is cancelled. I will exit when compaction is cancelled.",
sig,
))
store.DisableManualCompaction()
logger.Info("manual compaction disabled")

// Stop the logging
done <- true

logger.Info("exiting...")
os.Exit(0)
}
}()
}
19 changes: 19 additions & 0 deletions cmd/kava/cmd/rocksdb/rocksdb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
//go:build rocksdb
// +build rocksdb

package rocksdb

import (
"github.com/spf13/cobra"
)

// RocksDBCmd defines the root command containing subcommands that assist in
// rocksdb related tasks such as manual compaction.
var RocksDBCmd = &cobra.Command{
Use: "rocksdb",
Short: "RocksDB util commands",
}

func init() {
RocksDBCmd.AddCommand(CompactRocksDBCmd)
}
14 changes: 14 additions & 0 deletions cmd/kava/cmd/rocksdb/rocksdb_dummy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
//go:build !rocksdb
// +build !rocksdb

package rocksdb

import (
"github.com/spf13/cobra"
)

// RocksDBCmd defines the root command when the rocksdb build tag is not set.
var RocksDBCmd = &cobra.Command{
Use: "rocksdb",
Short: "RocksDB util commands, disabled because rocksdb build tag not set",
}
2 changes: 2 additions & 0 deletions cmd/kava/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/kava-labs/kava/app"
"github.com/kava-labs/kava/app/params"
"github.com/kava-labs/kava/cmd/kava/cmd/rocksdb"
"github.com/kava-labs/kava/cmd/kava/opendb"
)

Expand Down Expand Up @@ -123,5 +124,6 @@ func addSubCmds(rootCmd *cobra.Command, encodingConfig params.EncodingConfig, de
newQueryCmd(),
newTxCmd(),
keyCommands(app.DefaultNodeHome),
rocksdb.RocksDBCmd,
)
}
12 changes: 6 additions & 6 deletions cmd/kava/opendb/opendb_rocksdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ const (
// default tm-db block cache size for RocksDB
defaultBlockCacheSize = 1 << 30

defaultColumnFamilyName = "default"
DefaultColumnFamilyName = "default"

enableMetricsOptName = "rocksdb.enable-metrics"
reportMetricsIntervalSecsOptName = "rocksdb.report-metrics-interval-secs"
Expand Down Expand Up @@ -91,7 +91,7 @@ func OpenDB(appOpts types.AppOptions, home string, backendType dbm.BackendType)
// option will be overridden only in case if it explicitly specified in appOpts
func openRocksdb(dir string, appOpts types.AppOptions) (dbm.DB, error) {
optionsPath := filepath.Join(dir, "application.db")
dbOpts, cfOpts, err := loadLatestOptions(optionsPath)
dbOpts, cfOpts, err := LoadLatestOptions(optionsPath)
if err != nil {
return nil, err
}
Expand All @@ -112,10 +112,10 @@ func openRocksdb(dir string, appOpts types.AppOptions) (dbm.DB, error) {
return newRocksDBWithOptions("application", dir, dbOpts, cfOpts, readOpts, enableMetrics, reportMetricsIntervalSecs)
}

// loadLatestOptions loads and returns database and column family options
// LoadLatestOptions loads and returns database and column family options
// if options file not found, it means database isn't created yet, in such case default tm-db options will be returned
// if database exists it should have only one column family named default
func loadLatestOptions(dir string) (*grocksdb.Options, *grocksdb.Options, error) {
func LoadLatestOptions(dir string) (*grocksdb.Options, *grocksdb.Options, error) {
latestOpts, err := grocksdb.LoadLatestOptions(dir, grocksdb.NewDefaultEnv(), true, grocksdb.NewLRUCache(defaultBlockCacheSize))
if err != nil && strings.HasPrefix(err.Error(), "NotFound: ") {
return newDefaultOptions(), newDefaultOptions(), nil
Expand All @@ -127,7 +127,7 @@ func loadLatestOptions(dir string) (*grocksdb.Options, *grocksdb.Options, error)
cfNames := latestOpts.ColumnFamilyNames()
cfOpts := latestOpts.ColumnFamilyOpts()
// db should have only one column family named default
ok := len(cfNames) == 1 && cfNames[0] == defaultColumnFamilyName
ok := len(cfNames) == 1 && cfNames[0] == DefaultColumnFamilyName
if !ok {
return nil, nil, ErrUnexpectedConfiguration
}
Expand Down Expand Up @@ -312,7 +312,7 @@ func newRocksDBWithOptions(
dbOpts.EnableStatistics()
}

db, _, err := grocksdb.OpenDbColumnFamilies(dbOpts, dbPath, []string{defaultColumnFamilyName}, []*grocksdb.Options{cfOpts})
db, _, err := grocksdb.OpenDbColumnFamilies(dbOpts, dbPath, []string{DefaultColumnFamilyName}, []*grocksdb.Options{cfOpts})
if err != nil {
return nil, err
}
Expand Down
10 changes: 5 additions & 5 deletions cmd/kava/opendb/opendb_rocksdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestOpenRocksdb(t *testing.T) {
require.NoError(t, err)
require.NoError(t, db.Close())

dbOpts, cfOpts, err := loadLatestOptions(filepath.Join(dir, "application.db"))
dbOpts, cfOpts, err := LoadLatestOptions(filepath.Join(dir, "application.db"))
require.NoError(t, err)
require.Equal(t, tc.maxOpenFiles, dbOpts.GetMaxOpenFiles())
require.Equal(t, tc.maxFileOpeningThreads, dbOpts.GetMaxFileOpeningThreads())
Expand All @@ -108,7 +108,7 @@ func TestOpenRocksdb(t *testing.T) {
require.NoError(t, err)
require.NoError(t, db.Close())

dbOpts, cfOpts, err := loadLatestOptions(filepath.Join(dir, "application.db"))
dbOpts, cfOpts, err := LoadLatestOptions(filepath.Join(dir, "application.db"))
require.NoError(t, err)
require.Equal(t, defaultOpts.GetMaxOpenFiles(), dbOpts.GetMaxOpenFiles())
require.Equal(t, defaultOpts.GetMaxFileOpeningThreads(), dbOpts.GetMaxFileOpeningThreads())
Expand Down Expand Up @@ -190,7 +190,7 @@ func TestLoadLatestOptions(t *testing.T) {
require.NoError(t, err)
require.NoError(t, db.Close())

dbOpts, cfOpts, err := loadLatestOptions(filepath.Join(dir, "application.db"))
dbOpts, cfOpts, err := LoadLatestOptions(filepath.Join(dir, "application.db"))
require.NoError(t, err)
require.Equal(t, tc.maxOpenFiles, dbOpts.GetMaxOpenFiles())
require.Equal(t, tc.maxFileOpeningThreads, dbOpts.GetMaxFileOpeningThreads())
Expand All @@ -210,7 +210,7 @@ func TestLoadLatestOptions(t *testing.T) {
require.NoError(t, err)
}()

dbOpts, cfOpts, err := loadLatestOptions(filepath.Join(dir, "application.db"))
dbOpts, cfOpts, err := LoadLatestOptions(filepath.Join(dir, "application.db"))
require.NoError(t, err)
require.Equal(t, defaultOpts.GetMaxOpenFiles(), dbOpts.GetMaxOpenFiles())
require.Equal(t, defaultOpts.GetMaxFileOpeningThreads(), dbOpts.GetMaxFileOpeningThreads())
Expand Down Expand Up @@ -368,7 +368,7 @@ func TestNewRocksDBWithOptions(t *testing.T) {
require.NoError(t, err)
require.NoError(t, db.Close())

dbOpts, cfOpts, err = loadLatestOptions(filepath.Join(dir, "application.db"))
dbOpts, cfOpts, err = LoadLatestOptions(filepath.Join(dir, "application.db"))
require.NoError(t, err)
require.Equal(t, 999, dbOpts.GetMaxOpenFiles())
require.Equal(t, defaultOpts.GetMaxFileOpeningThreads(), dbOpts.GetMaxFileOpeningThreads())
Expand Down
Loading