-
Notifications
You must be signed in to change notification settings - Fork 370
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(cli): Add
rocksdb compact
command (#1804)
* Add rocksdb compact command * Increase compaction log output to 1 min * Use GetClient/ServerContextFromCmd * Update cmd info * Add doc to logColumnFamilyMetadata * Update RocksDBCmd docs * Add changelog entry * Load latest options from rocksdb * Allow application.db to be compacted * Rename more store -> db * Ensure compaction stats output does not run when db is closed * Add flag for custom stat output interval, return error
- Loading branch information
1 parent
9df1d68
commit 4f86ef9
Showing
7 changed files
with
264 additions
and
11 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,216 @@ | ||
//go:build rocksdb | ||
// +build rocksdb | ||
|
||
package rocksdb | ||
|
||
import ( | ||
"errors" | ||
"fmt" | ||
"os" | ||
"os/signal" | ||
"path/filepath" | ||
"strconv" | ||
"strings" | ||
"syscall" | ||
"time" | ||
|
||
"github.com/cosmos/cosmos-sdk/client" | ||
"github.com/cosmos/cosmos-sdk/server" | ||
"github.com/kava-labs/kava/cmd/kava/opendb" | ||
"github.com/linxGnu/grocksdb" | ||
"github.com/spf13/cobra" | ||
"golang.org/x/exp/slices" | ||
|
||
"github.com/tendermint/tendermint/libs/log" | ||
) | ||
|
||
const ( | ||
flagPrintStatsInterval = "print-stats-interval" | ||
) | ||
|
||
var allowedDBs = []string{"application", "blockstore", "state"} | ||
|
||
func CompactRocksDBCmd() *cobra.Command { | ||
cmd := &cobra.Command{ | ||
Use: fmt.Sprintf( | ||
"compact <%s>", | ||
strings.Join(allowedDBs, "|"), | ||
), | ||
Short: "force compacts RocksDB", | ||
Long: `This is a utility command that performs a force compaction on the state or | ||
blockstore. This should only be run once the node has stopped.`, | ||
Args: cobra.ExactArgs(1), | ||
RunE: func(cmd *cobra.Command, args []string) error { | ||
logger := log.NewTMLogger(log.NewSyncWriter(os.Stdout)) | ||
|
||
statsIntervalStr, err := cmd.Flags().GetString(flagPrintStatsInterval) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
statsInterval, err := time.ParseDuration(statsIntervalStr) | ||
if err != nil { | ||
return fmt.Errorf("failed to parse duration for --%s: %w", flagPrintStatsInterval, err) | ||
} | ||
|
||
clientCtx := client.GetClientContextFromCmd(cmd) | ||
ctx := server.GetServerContextFromCmd(cmd) | ||
|
||
if server.GetAppDBBackend(ctx.Viper) != "rocksdb" { | ||
return errors.New("compaction is currently only supported with rocksdb") | ||
} | ||
|
||
if !slices.Contains(allowedDBs, args[0]) { | ||
return fmt.Errorf( | ||
"invalid db name, must be one of the following: %s", | ||
strings.Join(allowedDBs, ", "), | ||
) | ||
} | ||
|
||
return compactRocksDBs(clientCtx.HomeDir, logger, args[0], statsInterval) | ||
}, | ||
} | ||
|
||
cmd.Flags().String(flagPrintStatsInterval, "1m", "duration string for how often to print compaction stats") | ||
|
||
return cmd | ||
} | ||
|
||
// compactRocksDBs performs a manual compaction on the given db. | ||
func compactRocksDBs( | ||
rootDir string, | ||
logger log.Logger, | ||
dbName string, | ||
statsInterval time.Duration, | ||
) error { | ||
dbPath := filepath.Join(rootDir, "data", dbName+".db") | ||
|
||
dbOpts, cfOpts, err := opendb.LoadLatestOptions(dbPath) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
logger.Info("opening db", "path", dbPath) | ||
db, _, err := grocksdb.OpenDbColumnFamilies( | ||
dbOpts, | ||
dbPath, | ||
[]string{opendb.DefaultColumnFamilyName}, | ||
[]*grocksdb.Options{cfOpts}, | ||
) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
if err != nil { | ||
logger.Error("failed to initialize cometbft db", "path", dbPath, "err", err) | ||
return fmt.Errorf("failed to open db %s %w", dbPath, err) | ||
} | ||
defer db.Close() | ||
|
||
logColumnFamilyMetadata(db, logger) | ||
|
||
logger.Info("starting compaction...", "db", dbPath) | ||
|
||
done := make(chan bool) | ||
registerSignalHandler(db, logger, done) | ||
startCompactionStatsOutput(db, logger, done, statsInterval) | ||
|
||
// Actually run the compaction | ||
db.CompactRange(grocksdb.Range{Start: nil, Limit: nil}) | ||
logger.Info("done compaction", "db", dbPath) | ||
|
||
done <- true | ||
return nil | ||
} | ||
|
||
// bytesToMB converts bytes to megabytes. | ||
func bytesToMB(bytes uint64) float64 { | ||
return float64(bytes) / 1024 / 1024 | ||
} | ||
|
||
// logColumnFamilyMetadata outputs the column family and level metadata. | ||
func logColumnFamilyMetadata( | ||
db *grocksdb.DB, | ||
logger log.Logger, | ||
) { | ||
metadata := db.GetColumnFamilyMetadata() | ||
|
||
logger.Info( | ||
"column family metadata", | ||
"name", metadata.Name(), | ||
"sizeMB", bytesToMB(metadata.Size()), | ||
"fileCount", metadata.FileCount(), | ||
"levels", len(metadata.LevelMetas()), | ||
) | ||
|
||
for _, level := range metadata.LevelMetas() { | ||
logger.Info( | ||
fmt.Sprintf("level %d metadata", level.Level()), | ||
"sstMetas", strconv.Itoa(len(level.SstMetas())), | ||
"sizeMB", strconv.FormatFloat(bytesToMB(level.Size()), 'f', 2, 64), | ||
) | ||
} | ||
} | ||
|
||
// startCompactionStatsOutput starts a goroutine that outputs compaction stats | ||
// every minute. | ||
func startCompactionStatsOutput( | ||
db *grocksdb.DB, | ||
logger log.Logger, | ||
done chan bool, | ||
statsInterval time.Duration, | ||
) { | ||
go func() { | ||
ticker := time.NewTicker(statsInterval) | ||
isClosed := false | ||
|
||
for { | ||
select { | ||
// Make sure we don't try reading from the closed db. | ||
// We continue the loop so that we can make sure the done channel | ||
// does not stall indefinitely from repeated writes and no reader. | ||
case <-done: | ||
logger.Debug("stopping compaction stats output") | ||
isClosed = true | ||
case <-ticker.C: | ||
if !isClosed { | ||
compactionStats := db.GetProperty("rocksdb.stats") | ||
fmt.Printf("%s\n", compactionStats) | ||
} | ||
} | ||
} | ||
}() | ||
} | ||
|
||
// registerSignalHandler registers a signal handler that will cancel any running | ||
// compaction when the user presses Ctrl+C. | ||
func registerSignalHandler( | ||
db *grocksdb.DB, | ||
logger log.Logger, | ||
done chan bool, | ||
) { | ||
// https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ | ||
// Q: Can I close the DB when a manual compaction is in progress? | ||
// | ||
// A: No, it's not safe to do that. However, you call | ||
// CancelAllBackgroundWork(db, true) in another thread to abort the | ||
// running compactions, so that you can close the DB sooner. Since | ||
// 6.5, you can also speed it up using | ||
// DB::DisableManualCompaction(). | ||
c := make(chan os.Signal, 1) | ||
signal.Notify(c, os.Interrupt, syscall.SIGTERM) | ||
|
||
go func() { | ||
for sig := range c { | ||
logger.Info(fmt.Sprintf( | ||
"received %s signal, aborting running compaction... Do NOT kill me before compaction is cancelled. I will exit when compaction is cancelled.", | ||
sig, | ||
)) | ||
db.DisableManualCompaction() | ||
logger.Info("manual compaction disabled") | ||
|
||
// Stop the logging | ||
done <- true | ||
} | ||
}() | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
//go:build rocksdb | ||
// +build rocksdb | ||
|
||
package rocksdb | ||
|
||
import ( | ||
"github.com/spf13/cobra" | ||
) | ||
|
||
// RocksDBCmd defines the root command containing subcommands that assist in | ||
// rocksdb related tasks such as manual compaction. | ||
var RocksDBCmd = &cobra.Command{ | ||
Use: "rocksdb", | ||
Short: "RocksDB util commands", | ||
} | ||
|
||
func init() { | ||
RocksDBCmd.AddCommand(CompactRocksDBCmd()) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
//go:build !rocksdb | ||
// +build !rocksdb | ||
|
||
package rocksdb | ||
|
||
import ( | ||
"github.com/spf13/cobra" | ||
) | ||
|
||
// RocksDBCmd defines the root command when the rocksdb build tag is not set. | ||
var RocksDBCmd = &cobra.Command{ | ||
Use: "rocksdb", | ||
Short: "RocksDB util commands, disabled because rocksdb build tag not set", | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters