Skip to content

Commit

Permalink
fix(cmd): data prune-compact repeatable execution
Browse files Browse the repository at this point in the history
  • Loading branch information
fx0x55 authored and zakir-code committed Aug 12, 2024
1 parent b5cc6d5 commit 7460f60
Showing 1 changed file with 53 additions and 56 deletions.
109 changes: 53 additions & 56 deletions server/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package server

import (
"fmt"
"log"
"sync"
"time"

Expand All @@ -12,7 +11,6 @@ import (
"github.com/cometbft/cometbft/store"
"github.com/cosmos/cosmos-sdk/server"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/syndtr/goleveldb/leveldb/util"
)

Expand Down Expand Up @@ -86,7 +84,7 @@ func dataPruningCmd() *cobra.Command {
pruneBlockCmd(),
)

cmd.PersistentFlags().Int64P(flagHeight, "r", 0, "Removes block or state up to (but not including) a height")
cmd.PersistentFlags().Int64P(flagHeight, "r", 100800, "Removes block or state up to (but not including) a height")
cmd.PersistentFlags().BoolP(flagPruning, "p", false, "Enable pruning")
return cmd
}
Expand All @@ -111,24 +109,18 @@ func pruneAllCmd() *cobra.Command {
return err
}

if viper.GetBool(flagPruning) {
baseHeight, retainHeight := getPruneBlockParams(blockStoreDB)
var wg sync.WaitGroup
log.Println("--------- pruning start... ---------")
wg.Add(2)
go pruneBlocks(blockStoreDB, baseHeight, retainHeight, &wg)
go pruneStates(stateDB, baseHeight, retainHeight, &wg)
wg.Wait()
log.Println("--------- pruning end!!! ---------")
if err = pruneBlockData(cmd, blockStoreDB, stateDB); err != nil {
return err
}
log.Println("--------- compact start... ---------")

fmt.Println("--------- compact start... ---------")
var wg sync.WaitGroup
wg.Add(3)
go compactDB(blockStoreDB, BlockDBName, &wg)
go compactDB(stateDB, StateDBName, &wg)
go compactDB(appDB, AppDBName, &wg)
wg.Wait()
log.Println("--------- compact end!!! ---------")
fmt.Println("--------- compact end!!! ---------")

return nil
},
Expand All @@ -148,11 +140,11 @@ func pruneAppCmd() *cobra.Command {
if err != nil {
return err
}
log.Println("--------- compact start ---------")
fmt.Println("--------- compact start ---------")
var wg sync.WaitGroup
wg.Add(1)
compactDB(appDB, AppDBName, &wg)
log.Println("--------- compact end ---------")
fmt.Println("--------- compact end ---------")

return nil
},
Expand All @@ -177,25 +169,17 @@ func pruneBlockCmd() *cobra.Command {
return err
}

if viper.GetBool(flagPruning) {
baseHeight, retainHeight := getPruneBlockParams(blockStoreDB)
//
log.Println("--------- pruning start... ---------")
var wg sync.WaitGroup
wg.Add(2)
go pruneBlocks(blockStoreDB, baseHeight, retainHeight, &wg)
go pruneStates(stateDB, baseHeight, retainHeight, &wg)
wg.Wait()
log.Println("--------- pruning end!!! ---------")
if err = pruneBlockData(cmd, blockStoreDB, stateDB); err != nil {
return err
}

log.Println("--------- compact start... ---------")
fmt.Println("--------- compact start... ---------")
var wg sync.WaitGroup
wg.Add(2)
go compactDB(blockStoreDB, BlockDBName, &wg)
go compactDB(stateDB, StateDBName, &wg)
wg.Wait()
log.Println("--------- compact end!!! ---------")
fmt.Println("--------- compact end!!! ---------")

return nil
},
Expand All @@ -204,46 +188,61 @@ func pruneBlockCmd() *cobra.Command {
return cmd
}

func getPruneBlockParams(blockStoreDB cmtdbm.DB) (baseHeight, retainHeight int64) {
baseHeight, size := getBlockInfo(blockStoreDB)
func pruneBlockData(cmd *cobra.Command, blockStoreDB cmtdbm.DB, stateDB cmtdbm.DB) error {
enablePruning, err := cmd.Flags().GetBool(flagPruning)
if err != nil {
return err
}
if !enablePruning {
return nil
}

retainHeight = viper.GetInt64(flagHeight)
if retainHeight >= baseHeight+size-1 || retainHeight <= baseHeight {
retainHeight = baseHeight + size - 2
fullHeight, err := cmd.Flags().GetInt64(flagHeight)
if err != nil {
return err
}
if fullHeight == 0 {
return nil
}

baseHeight, currentHeight := getBlockInfo(blockStoreDB)

return
toHeight := currentHeight - fullHeight
if toHeight <= baseHeight {
fmt.Printf("base height greater than equal to full height, skip pruning!baseHeight:%d,currentHeight:%d,toHeight:%d\n", baseHeight, currentHeight, toHeight)
return nil
}
fmt.Printf("--------- pruning start... from:%d,to:%d,sieze:%d---------\n", baseHeight, toHeight, fullHeight)
var wg sync.WaitGroup
wg.Add(2)
start := time.Now()

Check warning

Code scanning / CodeQL

Calling the system time Warning

Calling the system time may be a possible source of non-determinism
go pruneBlocks(blockStoreDB, baseHeight, toHeight, &wg)
go pruneStates(stateDB, baseHeight, toHeight, &wg)
wg.Wait()
fmt.Printf("--------- pruning end!!! cast:%v---------\n", time.Since(start))
return nil
}

// pruneBlocks deletes blocks between the given heights (including from, excluding to).
func pruneBlocks(blockStoreDB cmtdbm.DB, baseHeight, retainHeight int64, wg *sync.WaitGroup) {
func pruneBlocks(blockStoreDB cmtdbm.DB, baseHeight, toHeight int64, wg *sync.WaitGroup) {
defer wg.Done()

log.Printf("Prune blocks [%d,%d)...", baseHeight, retainHeight)
if retainHeight <= baseHeight {
return
}

baseHeightBefore, sizeBefore := getBlockInfo(blockStoreDB)
fmt.Printf("Prune blocks start from %d to %d, pruneSize: %d\n", baseHeight, toHeight, toHeight-baseHeight)
start := time.Now()
_, err := store.NewBlockStore(blockStoreDB).PruneBlocks(retainHeight)
_, err := store.NewBlockStore(blockStoreDB).PruneBlocks(toHeight)
if err != nil {
panic(fmt.Errorf("failed to prune block store: %w", err))
}

baseHeightAfter, sizeAfter := getBlockInfo(blockStoreDB)
log.Printf("Block db info [baseHeight,size]: [%d,%d] --> [%d,%d]\n", baseHeightBefore, sizeBefore, baseHeightAfter, sizeAfter)
log.Printf("Prune blocks done in %v \n", time.Since(start))
baseHeightAfter, currentHeightAfter := getBlockInfo(blockStoreDB)
fmt.Printf("Prune blocks done new base: %d, height:%d, cost:%v\n", baseHeightAfter, currentHeightAfter, time.Since(start))
}

// pruneStates deletes states between the given heights (including from, excluding to).
func pruneStates(stateDB cmtdbm.DB, from, to int64, wg *sync.WaitGroup) {
defer wg.Done()

log.Printf("Prune states [%d,%d)...", from, to)
if to <= from {
return
}
fmt.Printf("Prune states start from %d to %d, pruneSize: %d\n", from, to, to-from)

start := time.Now()
stateStore := sm.NewStore(stateDB, sm.StoreOptions{
Expand All @@ -253,13 +252,13 @@ func pruneStates(stateDB cmtdbm.DB, from, to int64, wg *sync.WaitGroup) {
panic(fmt.Errorf("failed to prune state database: %w", err))
}

log.Printf("Prune states done in %v \n", time.Since(start))
fmt.Printf("Prune states done in %v \n", time.Since(start))
}

func compactDB(db cmtdbm.DB, name string, wg *sync.WaitGroup) {
defer wg.Done()

log.Printf("Compact %s... \n", name)
fmt.Printf("Compact %s... \n", name)
start := time.Now()

if ldb, ok := db.(*cmtdbm.GoLevelDB); ok {
Expand All @@ -268,12 +267,10 @@ func compactDB(db cmtdbm.DB, name string, wg *sync.WaitGroup) {
}
}

log.Printf("Compact %s done in %v \n", name, time.Since(start))
fmt.Printf("Compact %s done in %v \n", name, time.Since(start))
}

func getBlockInfo(blockStoreDB cmtdbm.DB) (baseHeight, size int64) {
func getBlockInfo(blockStoreDB cmtdbm.DB) (baseHeight, height int64) {
blockStore := store.NewBlockStore(blockStoreDB)
baseHeight = blockStore.Base()
size = blockStore.Size()
return
return blockStore.Base(), blockStore.Height()
}

0 comments on commit 7460f60

Please sign in to comment.