Skip to content

Commit

Permalink
Self review.
Browse files Browse the repository at this point in the history
  • Loading branch information
arijitAD committed May 27, 2021
1 parent 2325f98 commit 57f9e73
Show file tree
Hide file tree
Showing 17 changed files with 105 additions and 75 deletions.
5 changes: 3 additions & 2 deletions cmd/gossamer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,9 @@ func setDotGlobalConfigFromToml(tomlCfg *ctoml.Config, cfg *dot.GlobalConfig) {
}

cfg.MetricsPort = tomlCfg.Global.MetricsPort

cfg.RetainBlocks = tomlCfg.Global.RetainBlocks
cfg.GCMode = tomlCfg.Global.GCMode
}
}

Expand Down Expand Up @@ -473,9 +475,8 @@ func setDotGlobalConfigFromFlags(ctx *cli.Context, cfg *dot.GlobalConfig) {
cfg.MetricsPort = uint32(metricsPort)
}

// check --retain-blocks flag
cfg.RetainBlocks = ctx.GlobalInt64(RetainBlockNumberFlag.Name)

cfg.GCMode = ctx.GlobalString(GCModeFlag.Name)
cfg.NoTelemetry = ctx.Bool("no-telemetry")
}

Expand Down
1 change: 1 addition & 0 deletions cmd/gossamer/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func dotConfigToToml(dcfg *dot.Config) *ctoml.Config {
LogLvl: dcfg.Global.LogLvl.String(),
MetricsPort: dcfg.Global.MetricsPort,
RetainBlocks: dcfg.Global.RetainBlocks,
GCMode: dcfg.Global.GCMode,
}

cfg.Log = ctoml.LogConfig{
Expand Down
8 changes: 8 additions & 0 deletions cmd/gossamer/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,13 @@ var (
Usage: "Retain number of block from latest block while pruning",
Value: 256,
}

// GCModeFlag sets the blockchain GC mode. It's either full or archive.
GCModeFlag = cli.StringFlag{
Name: "gcmode",
Usage: `Blockchain garbage collection mode ("full", "archive")`,
Value: "full",
}
)

// flag sets that are shared by multiple commands
Expand All @@ -302,6 +309,7 @@ var (
DBPathFlag,
BloomFilterSizeFlag,
RetainBlockNumberFlag,
GCModeFlag,
}

// StartupFlags are flags that are valid for use with the root command and the export subcommand
Expand Down
6 changes: 5 additions & 1 deletion cmd/gossamer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,11 @@ func gossamerAction(ctx *cli.Context) error {
}

if cfg.Global.RetainBlocks < 256 {
return fmt.Errorf("retain blocks cannot be less than 256")
return fmt.Errorf("--%s cannot be less than 256", RetainBlockNumberFlag.Name)
}

if cfg.Global.GCMode != "full" && cfg.Global.GCMode != "archive" {
return fmt.Errorf("--%s must be either 'full' or 'archive'", GCModeFlag.Name)
}

cfg.Global.LogLvl = lvl
Expand Down
1 change: 1 addition & 0 deletions dot/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type GlobalConfig struct {
MetricsPort uint32
NoTelemetry bool
RetainBlocks int64
GCMode string
}

// LogConfig represents the log levels for individual packages
Expand Down
1 change: 1 addition & 0 deletions dot/config/toml/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type GlobalConfig struct {
LogLvl string `toml:"log,omitempty"`
MetricsPort uint32 `toml:"metrics-port,omitempty"`
RetainBlocks int64 `toml:"retain-blocks,omitempty"`
GCMode string `toml:"gc-mode,omitempty"`
}

// LogConfig represents the log levels for individual packages
Expand Down
11 changes: 8 additions & 3 deletions dot/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,9 +237,14 @@ func NewNode(cfg *Config, ks *keystore.GlobalKeystore, stopFunc func()) (*Node,
// Network Service
var networkSrvc *network.Service

pruner, err := state.CreatePruner(stateSrvc.DB(), cfg.Global.RetainBlocks)
if err != nil {
logger.Error("failed to create pruner:", err)
var pruner state.Pruner
if cfg.Global.GCMode == "full" {
pruner, err = state.CreatePruner(stateSrvc.DB(), cfg.Global.RetainBlocks)
if err != nil {
return nil, err
}
} else {
pruner = &state.ArchivalNodePruner{}
}

// check if network service is enabled
Expand Down
4 changes: 2 additions & 2 deletions dot/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func createRuntime(cfg *Config, st *state.Service, ks *keystore.GlobalKeystore,
return rt, nil
}

func createBABEService(cfg *Config, rt runtime.Instance, st *state.Service, ks keystore.Keystore, pruner *state.Pruner) (*babe.Service, error) {
func createBABEService(cfg *Config, rt runtime.Instance, st *state.Service, ks keystore.Keystore, pruner state.Pruner) (*babe.Service, error) {
logger.Info(
"creating BABE service...",
"authority", cfg.Core.BabeAuthority,
Expand Down Expand Up @@ -377,7 +377,7 @@ func createBlockVerifier(st *state.Service) (*babe.VerificationManager, error) {
return ver, nil
}

func createSyncService(cfg *Config, st *state.Service, bp sync.BlockProducer, fg sync.FinalityGadget, dh *core.DigestHandler, verifier *babe.VerificationManager, rt runtime.Instance, pruner *state.Pruner) (*sync.Service, error) {
func createSyncService(cfg *Config, st *state.Service, bp sync.BlockProducer, fg sync.FinalityGadget, dh *core.DigestHandler, verifier *babe.VerificationManager, rt runtime.Instance, pruner state.Pruner) (*sync.Service, error) {
syncCfg := &sync.Config{
LogLvl: cfg.Log.SyncLvl,
BlockState: st.Block,
Expand Down
5 changes: 3 additions & 2 deletions dot/services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/ChainSafe/gossamer/dot/network"
"github.com/ChainSafe/gossamer/dot/state"
"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/keystore"
"github.com/ChainSafe/gossamer/lib/utils"
Expand Down Expand Up @@ -136,7 +137,7 @@ func TestCreateSyncService(t *testing.T) {
ver, err := createBlockVerifier(stateSrvc)
require.NoError(t, err)

_, err = createSyncService(cfg, stateSrvc, nil, nil, nil, ver, rt, nil)
_, err = createSyncService(cfg, stateSrvc, nil, nil, nil, ver, rt, &state.ArchivalNodePruner{})
require.NoError(t, err)
}

Expand Down Expand Up @@ -233,7 +234,7 @@ func TestCreateBABEService(t *testing.T) {
rt, err := createRuntime(cfg, stateSrvc, ks, &network.Service{})
require.NoError(t, err)

bs, err := createBABEService(cfg, rt, stateSrvc, ks.Babe, nil)
bs, err := createBABEService(cfg, rt, stateSrvc, ks.Babe, &state.ArchivalNodePruner{})
require.NoError(t, err)
require.NotNil(t, bs)
}
Expand Down
91 changes: 53 additions & 38 deletions dot/state/pruner.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,24 @@ import (

const (
journalPrefix = "journal"
lastPruned = "last_pruned"
lastPrunedKey = "last_pruned"
)

type journalRecord struct {
blockHash *common.Hash
// Hash of keys that are inserted into state trie of the block
insertedKeys []*common.Hash
// Hash of keys that are deleted from state trie of the block
deletedKeys []*common.Hash
// Pruner is implemented by FullNodePruner and ArchivalNodePruner.
type Pruner interface {
StoreJournalRecord(deleted, inserted []*common.Hash, blockHash *common.Hash, blockNum *big.Int) error
}

type deathRow struct {
blockHash *common.Hash
deletedKeys map[*common.Hash]int64 // keys hash that will be deleted from DB
// ArchivalNodePruner is a no-op since we don't prune nodes in archive mode.
type ArchivalNodePruner struct{}

// StoreJournalRecord for archive node doesn't do anything.
func (a *ArchivalNodePruner) StoreJournalRecord(deleted, inserted []*common.Hash, blockHash *common.Hash, blockNum *big.Int) error {
return nil
}

// Pruner stores state trie diff and allows online state trie pruning
type Pruner struct {
// FullNodePruner stores state trie diff and allows online state trie pruning
type FullNodePruner struct {
deathList []*deathRow
storageDB chaindb.Database
journalDB chaindb.Database
Expand All @@ -40,6 +40,15 @@ type Pruner struct {
sync.RWMutex
}

type journalRecord struct {
// blockHash of the block corresponding to journal record
blockHash *common.Hash
// Hash of keys that are inserted into state trie of the block
insertedKeys []*common.Hash
// Hash of keys that are deleted from state trie of the block
deletedKeys []*common.Hash
}

func newJournalRecord(hash *common.Hash, insertedKeys, deletedKeys []*common.Hash) *journalRecord {
return &journalRecord{
blockHash: hash,
Expand All @@ -48,9 +57,14 @@ func newJournalRecord(hash *common.Hash, insertedKeys, deletedKeys []*common.Has
}
}

type deathRow struct {
blockHash *common.Hash
deletedKeys map[*common.Hash]int64 // keys hash that will be deleted from DB
}

// CreatePruner creates a pruner
func CreatePruner(db chaindb.Database, retainBlocks int64) (*Pruner, error) {
p := &Pruner{
func CreatePruner(db chaindb.Database, retainBlocks int64) (Pruner, error) {
p := &FullNodePruner{
deathList: make([]*deathRow, 0),
deathIndex: make(map[*common.Hash]int64),
storageDB: chaindb.NewTable(db, storagePrefix),
Expand All @@ -60,12 +74,7 @@ func CreatePruner(db chaindb.Database, retainBlocks int64) (*Pruner, error) {

blockNum, err := p.getLastPrunedIndex()
if err != nil {
if err == chaindb.ErrKeyNotFound {
blockNum = 0
} else {
logger.Error("pruner", "failed to get last pruned key", err)
return nil, err
}
return nil, err
}

logger.Info("pruner", "last pruned block", blockNum)
Expand All @@ -76,10 +85,11 @@ func CreatePruner(db chaindb.Database, retainBlocks int64) (*Pruner, error) {
// load deathList and deathIndex from journalRecord
for {
record, err := p.getJournalRecord(blockNum)
if err == chaindb.ErrKeyNotFound {
break
}

if err != nil {
if err == chaindb.ErrKeyNotFound {
break
}
return nil, err
}

Expand All @@ -97,11 +107,13 @@ func CreatePruner(db chaindb.Database, retainBlocks int64) (*Pruner, error) {
blockNum++
}

go p.start()

return p, nil
}

// StoreJournalRecord stores journal record into DB and add deathRow into deathList
func (p *Pruner) StoreJournalRecord(deleted, inserted []*common.Hash, blockHash *common.Hash, blockNum *big.Int) error {
func (p *FullNodePruner) StoreJournalRecord(deleted, inserted []*common.Hash, blockHash *common.Hash, blockNum *big.Int) error {
jr := newJournalRecord(blockHash, inserted, deleted)
encRecord, err := scale.Encode(jr)
if err != nil {
Expand All @@ -122,15 +134,15 @@ func (p *Pruner) StoreJournalRecord(deleted, inserted []*common.Hash, blockHash
return nil
}

func (p *Pruner) addDeathRow(jr *journalRecord, blockNum int64) error {
func (p *FullNodePruner) addDeathRow(jr *journalRecord, blockNum int64) error {
p.Lock()
defer p.Unlock()

// remove re-inserted keys
for _, k := range jr.insertedKeys {
if num, ok := p.deathIndex[k]; ok {
delete(p.deathIndex, k)
delete(p.deathList[num-p.pendingNumber].deletedKeys, k)
delete(p.deathIndex, k)
}
}

Expand All @@ -154,8 +166,7 @@ func (p *Pruner) addDeathRow(jr *journalRecord, blockNum int64) error {
return nil
}

// PruneOne starts online pruning process
func (p *Pruner) PruneOne() {
func (p *FullNodePruner) start() {
logger.Info("pruning started")

for {
Expand All @@ -168,14 +179,14 @@ func (p *Pruner) PruneOne() {
p.Lock()

// pop first element from death list
dr := p.deathList[0]
err := p.deleteKeys(dr.deletedKeys)
row := p.deathList[0]
err := p.deleteKeys(row.deletedKeys)
if err != nil {
logger.Error("pruner", "failed to delete keys for block", p.pendingNumber)
continue
}

for k := range dr.deletedKeys {
for k := range row.deletedKeys {
delete(p.deathIndex, k)
}

Expand All @@ -190,7 +201,7 @@ func (p *Pruner) PruneOne() {
}
}

func (p *Pruner) storeJournal(num int64, record []byte) error {
func (p *FullNodePruner) storeJournal(num int64, record []byte) error {
encNum, err := scale.Encode(num)
if err != nil {
return err
Expand All @@ -204,7 +215,7 @@ func (p *Pruner) storeJournal(num int64, record []byte) error {
return nil
}

func (p *Pruner) getJournalRecord(num int64) ([]byte, error) {
func (p *FullNodePruner) getJournalRecord(num int64) ([]byte, error) {
encNum, err := scale.Encode(num)
if err != nil {
return nil, err
Expand All @@ -218,22 +229,26 @@ func (p *Pruner) getJournalRecord(num int64) ([]byte, error) {
return val, nil
}

func (p *Pruner) storeLastPrunedIndex(blockNum int64) error {
func (p *FullNodePruner) storeLastPrunedIndex(blockNum int64) error {
encNum, err := scale.Encode(blockNum)
if err != nil {
return err
}

err = p.journalDB.Put([]byte(lastPruned), encNum)
err = p.journalDB.Put([]byte(lastPrunedKey), encNum)
if err != nil {
return err
}

return nil
}

func (p *Pruner) getLastPrunedIndex() (int64, error) {
val, err := p.journalDB.Get([]byte(lastPruned))
func (p *FullNodePruner) getLastPrunedIndex() (int64, error) {
val, err := p.journalDB.Get([]byte(lastPrunedKey))
if err != chaindb.ErrKeyNotFound {
return 0, nil
}

if err != nil {
return 0, err
}
Expand All @@ -246,7 +261,7 @@ func (p *Pruner) getLastPrunedIndex() (int64, error) {
return blockNum.(int64), err
}

func (p *Pruner) deleteKeys(nodesHash map[*common.Hash]int64) error {
func (p *FullNodePruner) deleteKeys(nodesHash map[*common.Hash]int64) error {
for k := range nodesHash {
err := p.storageDB.Del(k.ToBytes())
if err != nil {
Expand Down
4 changes: 0 additions & 4 deletions dot/state/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ type Service struct {
// Below are for testing only.
BabeThresholdNumerator uint64
BabeThresholdDenominator uint64

pruner *Pruner
}

// NewService create a new instance of Service
Expand All @@ -69,7 +67,6 @@ func NewService(path string, lvl log.Lvl) *Service {
Storage: nil,
Block: nil,
closeCh: make(chan interface{}),
pruner: nil,
}
}

Expand Down Expand Up @@ -180,7 +177,6 @@ func (s *Service) Start() error {
logger.Info("created state service", "head", s.Block.BestBlockHash(), "highest number", num)
// Start background goroutine to GC pruned keys.
go s.Storage.pruneStorage(s.closeCh)

return nil
}

Expand Down
Loading

0 comments on commit 57f9e73

Please sign in to comment.