Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cli): Add rocksdb compact command #1804

Merged
merged 13 commits into from
Jan 19, 2024
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
### Features

- (cli) [#1785] Add `shard` CLI command to support creating partitions of data for standalone nodes
- (cli) [#1804] Add `rocksdb compact` command for manual DB compaction of state or blockstore.

## [v0.25.0]

Expand Down Expand Up @@ -320,6 +321,7 @@ the [changelog](https://github.com/cosmos/cosmos-sdk/blob/v0.38.4/CHANGELOG.md).
- [#257](https://github.com/Kava-Labs/kava/pulls/257) Include scripts to run
large-scale simulations remotely using aws-batch

[#1804]: https://github.com/Kava-Labs/kava/pull/1804
[#1785]: https://github.com/Kava-Labs/kava/pull/1785
[#1784]: https://github.com/Kava-Labs/kava/pull/1784
[#1776]: https://github.com/Kava-Labs/kava/pull/1776
Expand Down
216 changes: 216 additions & 0 deletions cmd/kava/cmd/rocksdb/compact.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
//go:build rocksdb
// +build rocksdb

package rocksdb

import (
"errors"
"fmt"
"os"
"os/signal"
"path/filepath"
"strconv"
"strings"
"syscall"
"time"

"github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/server"
"github.com/kava-labs/kava/cmd/kava/opendb"
"github.com/linxGnu/grocksdb"
"github.com/spf13/cobra"
"golang.org/x/exp/slices"

"github.com/tendermint/tendermint/libs/log"
)

const (
flagPrintStatsInterval = "print-stats-interval"
)

var allowedDBs = []string{"application", "blockstore", "state"}

func CompactRocksDBCmd() *cobra.Command {
cmd := &cobra.Command{
Use: fmt.Sprintf(
"compact <%s>",
strings.Join(allowedDBs, "|"),
),
Short: "force compacts RocksDB",
Long: `This is a utility command that performs a force compaction on the state or
blockstore. This should only be run once the node has stopped.`,
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
logger := log.NewTMLogger(log.NewSyncWriter(os.Stdout))

statsIntervalStr, err := cmd.Flags().GetString(flagPrintStatsInterval)
if err != nil {
return err
}

statsInterval, err := time.ParseDuration(statsIntervalStr)
if err != nil {
return fmt.Errorf("failed to parse duration for --%s: %w", flagPrintStatsInterval, err)
}

clientCtx := client.GetClientContextFromCmd(cmd)
ctx := server.GetServerContextFromCmd(cmd)

if server.GetAppDBBackend(ctx.Viper) != "rocksdb" {
return errors.New("compaction is currently only supported with rocksdb")
}

if !slices.Contains(allowedDBs, args[0]) {
return fmt.Errorf(
"invalid db name, must be one of the following: %s",
strings.Join(allowedDBs, ", "),
)
}

return compactRocksDBs(clientCtx.HomeDir, logger, args[0], statsInterval)
},
}

cmd.Flags().String(flagPrintStatsInterval, "1m", "duration string for how often to print compaction stats")

return cmd
}

// compactRocksDBs performs a manual compaction on the given db.
func compactRocksDBs(
rootDir string,
logger log.Logger,
dbName string,
statsInterval time.Duration,
) error {
dbPath := filepath.Join(rootDir, "data", dbName+".db")

dbOpts, cfOpts, err := opendb.LoadLatestOptions(dbPath)
if err != nil {
return err
}

logger.Info("opening db", "path", dbPath)
db, _, err := grocksdb.OpenDbColumnFamilies(
dbOpts,
dbPath,
[]string{opendb.DefaultColumnFamilyName},
[]*grocksdb.Options{cfOpts},
)
if err != nil {
return err
}

if err != nil {
logger.Error("failed to initialize cometbft db", "path", dbPath, "err", err)
return fmt.Errorf("failed to open db %s %w", dbPath, err)
}
defer db.Close()

logColumnFamilyMetadata(db, logger)

logger.Info("starting compaction...", "db", dbPath)

done := make(chan bool)
registerSignalHandler(db, logger, done)
startCompactionStatsOutput(db, logger, done, statsInterval)

// Actually run the compaction
db.CompactRange(grocksdb.Range{Start: nil, Limit: nil})
logger.Info("done compaction", "db", dbPath)

done <- true
return nil
}

// bytesToMB converts bytes to megabytes.
func bytesToMB(bytes uint64) float64 {
return float64(bytes) / 1024 / 1024
}

// logColumnFamilyMetadata outputs the column family and level metadata.
func logColumnFamilyMetadata(
db *grocksdb.DB,
logger log.Logger,
) {
metadata := db.GetColumnFamilyMetadata()

logger.Info(
"column family metadata",
"name", metadata.Name(),
"sizeMB", bytesToMB(metadata.Size()),
"fileCount", metadata.FileCount(),
"levels", len(metadata.LevelMetas()),
)

for _, level := range metadata.LevelMetas() {
logger.Info(
fmt.Sprintf("level %d metadata", level.Level()),
"sstMetas", strconv.Itoa(len(level.SstMetas())),
"sizeMB", strconv.FormatFloat(bytesToMB(level.Size()), 'f', 2, 64),
)
}
}

// startCompactionStatsOutput starts a goroutine that outputs compaction stats
// every minute.
func startCompactionStatsOutput(
db *grocksdb.DB,
logger log.Logger,
done chan bool,
statsInterval time.Duration,
) {
go func() {
ticker := time.NewTicker(statsInterval)
isClosed := false

for {
select {
// Make sure we don't try reading from the closed db.
// We continue the loop so that we can make sure the done channel
// does not stall indefinitely from repeated writes and no reader.
case <-done:
logger.Debug("stopping compaction stats output")
isClosed = true
case <-ticker.C:
if !isClosed {
compactionStats := db.GetProperty("rocksdb.stats")
fmt.Printf("%s\n", compactionStats)
}
}
}
}()
}

// registerSignalHandler registers a signal handler that will cancel any running
// compaction when the user presses Ctrl+C.
func registerSignalHandler(
db *grocksdb.DB,
logger log.Logger,
done chan bool,
) {
// https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ
// Q: Can I close the DB when a manual compaction is in progress?
//
// A: No, it's not safe to do that. However, you call
// CancelAllBackgroundWork(db, true) in another thread to abort the
// running compactions, so that you can close the DB sooner. Since
// 6.5, you can also speed it up using
// DB::DisableManualCompaction().
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)

go func() {
for sig := range c {
logger.Info(fmt.Sprintf(
"received %s signal, aborting running compaction... Do NOT kill me before compaction is cancelled. I will exit when compaction is cancelled.",
sig,
))
db.DisableManualCompaction()
logger.Info("manual compaction disabled")

// Stop the logging
done <- true
}
}()
}
19 changes: 19 additions & 0 deletions cmd/kava/cmd/rocksdb/rocksdb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
//go:build rocksdb
// +build rocksdb

package rocksdb

import (
"github.com/spf13/cobra"
)

// RocksDBCmd defines the root command containing subcommands that assist in
// rocksdb related tasks such as manual compaction.
var RocksDBCmd = &cobra.Command{
Use: "rocksdb",
Short: "RocksDB util commands",
}

func init() {
RocksDBCmd.AddCommand(CompactRocksDBCmd())
}
14 changes: 14 additions & 0 deletions cmd/kava/cmd/rocksdb/rocksdb_dummy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
//go:build !rocksdb
// +build !rocksdb

package rocksdb

import (
"github.com/spf13/cobra"
)

// RocksDBCmd defines the root command when the rocksdb build tag is not set.
var RocksDBCmd = &cobra.Command{
Use: "rocksdb",
Short: "RocksDB util commands, disabled because rocksdb build tag not set",
}
2 changes: 2 additions & 0 deletions cmd/kava/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/kava-labs/kava/app"
"github.com/kava-labs/kava/app/params"
"github.com/kava-labs/kava/cmd/kava/cmd/rocksdb"
"github.com/kava-labs/kava/cmd/kava/opendb"
)

Expand Down Expand Up @@ -123,6 +124,7 @@ func addSubCmds(rootCmd *cobra.Command, encodingConfig params.EncodingConfig, de
newQueryCmd(),
newTxCmd(),
keyCommands(app.DefaultNodeHome),
rocksdb.RocksDBCmd,
newShardCmd(opts),
)
}
12 changes: 6 additions & 6 deletions cmd/kava/opendb/opendb_rocksdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ const (
// default tm-db block cache size for RocksDB
defaultBlockCacheSize = 1 << 30

defaultColumnFamilyName = "default"
DefaultColumnFamilyName = "default"

enableMetricsOptName = "rocksdb.enable-metrics"
reportMetricsIntervalSecsOptName = "rocksdb.report-metrics-interval-secs"
Expand Down Expand Up @@ -91,7 +91,7 @@ func OpenDB(appOpts types.AppOptions, home string, backendType dbm.BackendType)
// option will be overridden only in case if it explicitly specified in appOpts
func openRocksdb(dir string, appOpts types.AppOptions) (dbm.DB, error) {
optionsPath := filepath.Join(dir, "application.db")
dbOpts, cfOpts, err := loadLatestOptions(optionsPath)
dbOpts, cfOpts, err := LoadLatestOptions(optionsPath)
if err != nil {
return nil, err
}
Expand All @@ -112,10 +112,10 @@ func openRocksdb(dir string, appOpts types.AppOptions) (dbm.DB, error) {
return newRocksDBWithOptions("application", dir, dbOpts, cfOpts, readOpts, enableMetrics, reportMetricsIntervalSecs)
}

// loadLatestOptions loads and returns database and column family options
// LoadLatestOptions loads and returns database and column family options
// if options file not found, it means database isn't created yet, in such case default tm-db options will be returned
// if database exists it should have only one column family named default
func loadLatestOptions(dir string) (*grocksdb.Options, *grocksdb.Options, error) {
func LoadLatestOptions(dir string) (*grocksdb.Options, *grocksdb.Options, error) {
latestOpts, err := grocksdb.LoadLatestOptions(dir, grocksdb.NewDefaultEnv(), true, grocksdb.NewLRUCache(defaultBlockCacheSize))
if err != nil && strings.HasPrefix(err.Error(), "NotFound: ") {
return newDefaultOptions(), newDefaultOptions(), nil
Expand All @@ -127,7 +127,7 @@ func loadLatestOptions(dir string) (*grocksdb.Options, *grocksdb.Options, error)
cfNames := latestOpts.ColumnFamilyNames()
cfOpts := latestOpts.ColumnFamilyOpts()
// db should have only one column family named default
ok := len(cfNames) == 1 && cfNames[0] == defaultColumnFamilyName
ok := len(cfNames) == 1 && cfNames[0] == DefaultColumnFamilyName
if !ok {
return nil, nil, ErrUnexpectedConfiguration
}
Expand Down Expand Up @@ -312,7 +312,7 @@ func newRocksDBWithOptions(
dbOpts.EnableStatistics()
}

db, _, err := grocksdb.OpenDbColumnFamilies(dbOpts, dbPath, []string{defaultColumnFamilyName}, []*grocksdb.Options{cfOpts})
db, _, err := grocksdb.OpenDbColumnFamilies(dbOpts, dbPath, []string{DefaultColumnFamilyName}, []*grocksdb.Options{cfOpts})
if err != nil {
return nil, err
}
Expand Down
10 changes: 5 additions & 5 deletions cmd/kava/opendb/opendb_rocksdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestOpenRocksdb(t *testing.T) {
require.NoError(t, err)
require.NoError(t, db.Close())

dbOpts, cfOpts, err := loadLatestOptions(filepath.Join(dir, "application.db"))
dbOpts, cfOpts, err := LoadLatestOptions(filepath.Join(dir, "application.db"))
require.NoError(t, err)
require.Equal(t, tc.maxOpenFiles, dbOpts.GetMaxOpenFiles())
require.Equal(t, tc.maxFileOpeningThreads, dbOpts.GetMaxFileOpeningThreads())
Expand All @@ -108,7 +108,7 @@ func TestOpenRocksdb(t *testing.T) {
require.NoError(t, err)
require.NoError(t, db.Close())

dbOpts, cfOpts, err := loadLatestOptions(filepath.Join(dir, "application.db"))
dbOpts, cfOpts, err := LoadLatestOptions(filepath.Join(dir, "application.db"))
require.NoError(t, err)
require.Equal(t, defaultOpts.GetMaxOpenFiles(), dbOpts.GetMaxOpenFiles())
require.Equal(t, defaultOpts.GetMaxFileOpeningThreads(), dbOpts.GetMaxFileOpeningThreads())
Expand Down Expand Up @@ -190,7 +190,7 @@ func TestLoadLatestOptions(t *testing.T) {
require.NoError(t, err)
require.NoError(t, db.Close())

dbOpts, cfOpts, err := loadLatestOptions(filepath.Join(dir, "application.db"))
dbOpts, cfOpts, err := LoadLatestOptions(filepath.Join(dir, "application.db"))
require.NoError(t, err)
require.Equal(t, tc.maxOpenFiles, dbOpts.GetMaxOpenFiles())
require.Equal(t, tc.maxFileOpeningThreads, dbOpts.GetMaxFileOpeningThreads())
Expand All @@ -210,7 +210,7 @@ func TestLoadLatestOptions(t *testing.T) {
require.NoError(t, err)
}()

dbOpts, cfOpts, err := loadLatestOptions(filepath.Join(dir, "application.db"))
dbOpts, cfOpts, err := LoadLatestOptions(filepath.Join(dir, "application.db"))
require.NoError(t, err)
require.Equal(t, defaultOpts.GetMaxOpenFiles(), dbOpts.GetMaxOpenFiles())
require.Equal(t, defaultOpts.GetMaxFileOpeningThreads(), dbOpts.GetMaxFileOpeningThreads())
Expand Down Expand Up @@ -368,7 +368,7 @@ func TestNewRocksDBWithOptions(t *testing.T) {
require.NoError(t, err)
require.NoError(t, db.Close())

dbOpts, cfOpts, err = loadLatestOptions(filepath.Join(dir, "application.db"))
dbOpts, cfOpts, err = LoadLatestOptions(filepath.Join(dir, "application.db"))
require.NoError(t, err)
require.Equal(t, 999, dbOpts.GetMaxOpenFiles())
require.Equal(t, defaultOpts.GetMaxFileOpeningThreads(), dbOpts.GetMaxFileOpeningThreads())
Expand Down
Loading