diff --git a/cmd/gossamer/config.go b/cmd/gossamer/config.go
index d172802d57..15eb95ce5b 100644
--- a/cmd/gossamer/config.go
+++ b/cmd/gossamer/config.go
@@ -443,7 +443,9 @@ func setDotGlobalConfigFromToml(tomlCfg *ctoml.Config, cfg *dot.GlobalConfig) {
 		}
 
 		cfg.MetricsPort = tomlCfg.Global.MetricsPort
+		cfg.RetainBlocks = tomlCfg.Global.RetainBlocks
+		cfg.GCMode = tomlCfg.Global.GCMode
 	}
 }
 
@@ -473,9 +475,8 @@ func setDotGlobalConfigFromFlags(ctx *cli.Context, cfg *dot.GlobalConfig) {
 		cfg.MetricsPort = uint32(metricsPort)
 	}
 
-	// check --retain-blocks flag
 	cfg.RetainBlocks = ctx.GlobalInt64(RetainBlockNumberFlag.Name)
-
+	cfg.GCMode = ctx.GlobalString(GCModeFlag.Name)
 	cfg.NoTelemetry = ctx.Bool("no-telemetry")
 }
diff --git a/cmd/gossamer/export.go b/cmd/gossamer/export.go
index 90e7b8ea6a..670fdbf0ab 100644
--- a/cmd/gossamer/export.go
+++ b/cmd/gossamer/export.go
@@ -85,6 +85,7 @@ func dotConfigToToml(dcfg *dot.Config) *ctoml.Config {
 		LogLvl:       dcfg.Global.LogLvl.String(),
 		MetricsPort:  dcfg.Global.MetricsPort,
 		RetainBlocks: dcfg.Global.RetainBlocks,
+		GCMode:       dcfg.Global.GCMode,
 	}
 
 	cfg.Log = ctoml.LogConfig{
diff --git a/cmd/gossamer/flags.go b/cmd/gossamer/flags.go
index ba3559234b..6d59f41f22 100644
--- a/cmd/gossamer/flags.go
+++ b/cmd/gossamer/flags.go
@@ -285,6 +285,13 @@ var (
 		Usage: "Retain number of block from latest block while pruning",
 		Value: 256,
 	}
+
+	// GCModeFlag sets the blockchain GC mode. It's either full or archive.
+	GCModeFlag = cli.StringFlag{
+		Name:  "gcmode",
+		Usage: `Blockchain garbage collection mode ("full", "archive")`,
+		Value: "full",
+	}
 )
 
 // flag sets that are shared by multiple commands
@@ -302,6 +309,7 @@ var (
 		DBPathFlag,
 		BloomFilterSizeFlag,
 		RetainBlockNumberFlag,
+		GCModeFlag,
 	}
 
 	// StartupFlags are flags that are valid for use with the root command and the export subcommand
diff --git a/cmd/gossamer/main.go b/cmd/gossamer/main.go
index 623bbb6c63..58d486a851 100644
--- a/cmd/gossamer/main.go
+++ b/cmd/gossamer/main.go
@@ -234,8 +234,12 @@ func gossamerAction(ctx *cli.Context) error {
 		return err
 	}
 
-	if cfg.Global.RetainBlocks < 256 {
-		return fmt.Errorf("retain blocks cannot be less than 256")
+	if cfg.Global.RetainBlocks < 10 {
+		return fmt.Errorf("--%s cannot be less than 10", RetainBlockNumberFlag.Name)
+	}
+
+	if cfg.Global.GCMode != "full" && cfg.Global.GCMode != "archive" {
+		return fmt.Errorf("--%s must be either 'full' or 'archive'", GCModeFlag.Name)
 	}
 
 	cfg.Global.LogLvl = lvl
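Review note: the two flag checks above are independent of each other and of flag parsing, so they could be exercised in isolation. A minimal standalone sketch of the same validation logic (`validateGlobalConfig` is hypothetical and not part of this diff; flag names are hard-coded for brevity):

```go
package main

import (
	"errors"
	"fmt"
)

// validateGlobalConfig mirrors the two checks gossamerAction performs:
// a floor on --retain-blocks and a closed set of values for --gcmode.
func validateGlobalConfig(retainBlocks int64, gcMode string) error {
	if retainBlocks < 10 {
		return errors.New("--retain-blocks cannot be less than 10")
	}
	if gcMode != "full" && gcMode != "archive" {
		return errors.New("--gcmode must be either 'full' or 'archive'")
	}
	return nil
}

func main() {
	fmt.Println(validateGlobalConfig(256, "full")) // <nil>
	fmt.Println(validateGlobalConfig(256, "fast")) // --gcmode must be either 'full' or 'archive'
}
```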
diff --git a/dot/config.go b/dot/config.go
index 07554245f5..884a63b5fe 100644
--- a/dot/config.go
+++ b/dot/config.go
@@ -53,6 +53,7 @@ type GlobalConfig struct {
 	MetricsPort  uint32
 	NoTelemetry  bool
 	RetainBlocks int64
+	GCMode       string
 }
 
 // LogConfig represents the log levels for individual packages
diff --git a/dot/config/toml/config.go b/dot/config/toml/config.go
index ec84c537a2..7ca982f45f 100644
--- a/dot/config/toml/config.go
+++ b/dot/config/toml/config.go
@@ -35,6 +35,7 @@ type GlobalConfig struct {
 	LogLvl       string `toml:"log,omitempty"`
 	MetricsPort  uint32 `toml:"metrics-port,omitempty"`
 	RetainBlocks int64  `toml:"retain-blocks,omitempty"`
+	GCMode       string `toml:"gc-mode,omitempty"`
 }
 
 // LogConfig represents the log levels for individual packages
diff --git a/dot/node.go b/dot/node.go
index 2ab8dfe0a9..fbe5848ae9 100644
--- a/dot/node.go
+++ b/dot/node.go
@@ -237,9 +237,14 @@ func NewNode(cfg *Config, ks *keystore.GlobalKeystore, stopFunc func()) (*Node,
 	// Network Service
 	var networkSrvc *network.Service
 
-	pruner, err := state.CreatePruner(stateSrvc.DB(), cfg.Global.RetainBlocks)
-	if err != nil {
-		logger.Error("failed to create pruner:", err)
+	var pruner state.Pruner
+	if cfg.Global.GCMode == "full" {
+		pruner, err = state.CreatePruner(stateSrvc.DB(), cfg.Global.RetainBlocks)
+		if err != nil {
+			return nil, err
+		}
+	} else {
+		pruner = &state.ArchivalNodePruner{}
 	}
 
 	// check if network service is enabled
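Review note: NewNode now selects the pruner implementation from GCMode. The branching could equally be expressed as a small constructor; a sketch using the types this diff introduces (`newPruner` itself is hypothetical, not part of the change):

```go
package dot

import (
	"github.com/ChainSafe/chaindb"
	"github.com/ChainSafe/gossamer/dot/state"
)

// newPruner returns the journalling pruner for "full" mode; any other
// value that survived flag validation (i.e. "archive") gets the no-op
// pruner, so every trie node is kept.
func newPruner(gcMode string, db chaindb.Database, retainBlocks int64) (state.Pruner, error) {
	if gcMode == "full" {
		return state.CreatePruner(db, retainBlocks)
	}
	return &state.ArchivalNodePruner{}, nil
}
```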
diff --git a/dot/services.go b/dot/services.go
index b4241f07c4..2b6eac58fe 100644
--- a/dot/services.go
+++ b/dot/services.go
@@ -165,7 +165,7 @@ func createRuntime(cfg *Config, st *state.Service, ks *keystore.GlobalKeystore,
 	return rt, nil
 }
 
-func createBABEService(cfg *Config, rt runtime.Instance, st *state.Service, ks keystore.Keystore, pruner *state.Pruner) (*babe.Service, error) {
+func createBABEService(cfg *Config, rt runtime.Instance, st *state.Service, ks keystore.Keystore, pruner state.Pruner) (*babe.Service, error) {
 	logger.Info(
 		"creating BABE service...",
 		"authority", cfg.Core.BabeAuthority,
@@ -377,7 +377,7 @@ func createBlockVerifier(st *state.Service) (*babe.VerificationManager, error) {
 	return ver, nil
 }
 
-func createSyncService(cfg *Config, st *state.Service, bp sync.BlockProducer, fg sync.FinalityGadget, dh *core.DigestHandler, verifier *babe.VerificationManager, rt runtime.Instance, pruner *state.Pruner) (*sync.Service, error) {
+func createSyncService(cfg *Config, st *state.Service, bp sync.BlockProducer, fg sync.FinalityGadget, dh *core.DigestHandler, verifier *babe.VerificationManager, rt runtime.Instance, pruner state.Pruner) (*sync.Service, error) {
 	syncCfg := &sync.Config{
 		LogLvl:     cfg.Log.SyncLvl,
 		BlockState: st.Block,
diff --git a/dot/services_test.go b/dot/services_test.go
index a727c34f29..6dfbc9efd2 100644
--- a/dot/services_test.go
+++ b/dot/services_test.go
@@ -23,6 +23,7 @@ import (
 	"time"
 
 	"github.com/ChainSafe/gossamer/dot/network"
+	"github.com/ChainSafe/gossamer/dot/state"
 	"github.com/ChainSafe/gossamer/dot/types"
 	"github.com/ChainSafe/gossamer/lib/keystore"
 	"github.com/ChainSafe/gossamer/lib/utils"
@@ -136,7 +137,7 @@ func TestCreateSyncService(t *testing.T) {
 	ver, err := createBlockVerifier(stateSrvc)
 	require.NoError(t, err)
 
-	_, err = createSyncService(cfg, stateSrvc, nil, nil, nil, ver, rt, nil)
+	_, err = createSyncService(cfg, stateSrvc, nil, nil, nil, ver, rt, &state.ArchivalNodePruner{})
 	require.NoError(t, err)
 }
 
@@ -233,7 +234,7 @@ func TestCreateBABEService(t *testing.T) {
 	rt, err := createRuntime(cfg, stateSrvc, ks, &network.Service{})
 	require.NoError(t, err)
 
-	bs, err := createBABEService(cfg, rt, stateSrvc, ks.Babe, nil)
+	bs, err := createBABEService(cfg, rt, stateSrvc, ks.Babe, &state.ArchivalNodePruner{})
 	require.NoError(t, err)
 	require.NotNil(t, bs)
 }
diff --git a/dot/state/pruner.go b/dot/state/pruner.go
index 40ba42ff92..f3ceaa813f 100644
--- a/dot/state/pruner.go
+++ b/dot/state/pruner.go
@@ -13,24 +13,24 @@ import (
 
 const (
 	journalPrefix = "journal"
-	lastPruned    = "last_pruned"
+	lastPrunedKey = "last_pruned"
 )
 
-type journalRecord struct {
-	blockHash *common.Hash
-	// Hash of keys that are inserted into state trie of the block
-	insertedKeys []*common.Hash
-	// Hash of keys that are deleted from state trie of the block
-	deletedKeys []*common.Hash
+// Pruner is implemented by FullNodePruner and ArchivalNodePruner.
+type Pruner interface {
+	StoreJournalRecord(deleted, inserted []*common.Hash, blockHash *common.Hash, blockNum *big.Int) error
 }
 
-type deathRow struct {
-	blockHash   *common.Hash
-	deletedKeys map[*common.Hash]int64 // keys hash that will be deleted from DB
+// ArchivalNodePruner is a no-op since we don't prune nodes in archive mode.
+type ArchivalNodePruner struct{}
+
+// StoreJournalRecord for archive node doesn't do anything.
+func (a *ArchivalNodePruner) StoreJournalRecord(deleted, inserted []*common.Hash, blockHash *common.Hash, blockNum *big.Int) error {
+	return nil
 }
 
-// Pruner stores state trie diff and allows online state trie pruning
-type Pruner struct {
+// FullNodePruner stores state trie diff and allows online state trie pruning
+type FullNodePruner struct {
 	deathList    []*deathRow
 	storageDB    chaindb.Database
 	journalDB    chaindb.Database
@@ -40,6 +40,15 @@ type Pruner struct {
 	sync.RWMutex
 }
 
+type journalRecord struct {
+	// blockHash of the block corresponding to journal record
+	blockHash *common.Hash
+	// Hash of keys that are inserted into state trie of the block
+	insertedKeys []*common.Hash
+	// Hash of keys that are deleted from state trie of the block
+	deletedKeys []*common.Hash
+}
+
 func newJournalRecord(hash *common.Hash, insertedKeys, deletedKeys []*common.Hash) *journalRecord {
 	return &journalRecord{
 		blockHash:    hash,
@@ -48,9 +57,14 @@ func newJournalRecord(hash *common.Hash, insertedKeys, deletedKeys []*common.Has
 	}
 }
 
+type deathRow struct {
+	blockHash   *common.Hash
+	deletedKeys map[*common.Hash]int64 // keys hash that will be deleted from DB
+}
+
 // CreatePruner creates a pruner
-func CreatePruner(db chaindb.Database, retainBlocks int64) (*Pruner, error) {
-	p := &Pruner{
+func CreatePruner(db chaindb.Database, retainBlocks int64) (Pruner, error) {
+	p := &FullNodePruner{
 		deathList:  make([]*deathRow, 0),
 		deathIndex: make(map[*common.Hash]int64),
 		storageDB:  chaindb.NewTable(db, storagePrefix),
@@ -60,12 +74,7 @@ func CreatePruner(db chaindb.Database, retainBlocks int64) (*Pruner, error) {
 
 	blockNum, err := p.getLastPrunedIndex()
 	if err != nil {
-		if err == chaindb.ErrKeyNotFound {
-			blockNum = 0
-		} else {
-			logger.Error("pruner", "failed to get last pruned key", err)
-			return nil, err
-		}
+		return nil, err
 	}
 
 	logger.Info("pruner", "last pruned block", blockNum)
@@ -76,20 +85,15 @@ func CreatePruner(db chaindb.Database, retainBlocks int64) (*Pruner, error) {
 	// load deathList and deathIndex from journalRecord
 	for {
 		record, err := p.getJournalRecord(blockNum)
-		if err != nil {
-			if err == chaindb.ErrKeyNotFound {
-				break
-			}
-			return nil, err
+		if err == chaindb.ErrKeyNotFound {
+			break
 		}
-		jr, err := scale.Decode(record, new(journalRecord))
 		if err != nil {
 			return nil, err
 		}
 
-		j := jr.(journalRecord)
-		err = p.addDeathRow(&j, blockNum)
+		err = p.addDeathRow(record, blockNum)
 		if err != nil {
 			return nil, err
 		}
@@ -97,23 +101,21 @@ func CreatePruner(db chaindb.Database, retainBlocks int64) (*Pruner, error) {
 		blockNum++
 	}
 
+	go p.start()
+
 	return p, nil
 }
 
 // StoreJournalRecord stores journal record into DB and add deathRow into deathList
-func (p *Pruner) StoreJournalRecord(deleted, inserted []*common.Hash, blockHash *common.Hash, blockNum *big.Int) error {
+func (p *FullNodePruner) StoreJournalRecord(deleted, inserted []*common.Hash, blockHash *common.Hash, blockNum *big.Int) error {
 	jr := newJournalRecord(blockHash, inserted, deleted)
-	encRecord, err := scale.Encode(jr)
-	if err != nil {
-		return fmt.Errorf("failed to encode journal record %d: %w", blockNum, err)
-	}
 
-	err = p.storeJournal(blockNum.Int64(), encRecord)
+	err := p.storeJournal(blockNum.Int64(), jr)
 	if err != nil {
 		return fmt.Errorf("failed to store journal record for %d: %w", blockNum, err)
 	}
 
-	logger.Info("journal record stored")
+	logger.Info("journal record stored", "block", blockNum.Int64())
 	err = p.addDeathRow(jr, blockNum.Int64())
 	if err != nil {
 		return err
@@ -122,15 +124,15 @@ func (p *Pruner) StoreJournalRecord(deleted, inserted []*common.Hash, blockHash
 	return nil
 }
 
-func (p *Pruner) addDeathRow(jr *journalRecord, blockNum int64) error {
+func (p *FullNodePruner) addDeathRow(jr *journalRecord, blockNum int64) error {
 	p.Lock()
 	defer p.Unlock()
 
 	// remove re-inserted keys
 	for _, k := range jr.insertedKeys {
 		if num, ok := p.deathIndex[k]; ok {
-			delete(p.deathIndex, k)
 			delete(p.deathList[num-p.pendingNumber].deletedKeys, k)
+			delete(p.deathIndex, k)
 		}
 	}
@@ -154,28 +156,29 @@ func (p *Pruner) addDeathRow(jr *journalRecord, blockNum int64) error {
 	return nil
 }
 
-// PruneOne starts online pruning process
-func (p *Pruner) PruneOne() {
+func (p *FullNodePruner) start() {
 	logger.Info("pruning started")
 
 	for {
+		p.Lock()
 		if int64(len(p.deathList)) <= p.retainBlocks {
+			p.Unlock()
 			time.Sleep(2 * time.Second)
 			continue
 		}
 
 		logger.Info("pruner", "pruning block ", p.pendingNumber)
 
-		p.Lock()
 		// pop first element from death list
-		dr := p.deathList[0]
-		err := p.deleteKeys(dr.deletedKeys)
+		row := p.deathList[0]
+		err := p.deleteKeys(row.deletedKeys)
 		if err != nil {
 			logger.Error("pruner", "failed to delete keys for block", p.pendingNumber)
+			p.Unlock()
 			continue
 		}
 
-		for k := range dr.deletedKeys {
+		for k := range row.deletedKeys {
 			delete(p.deathIndex, k)
 		}
@@ -190,13 +192,18 @@
 	}
 }
 
-func (p *Pruner) storeJournal(num int64, record []byte) error {
-	encNum, err := scale.Encode(num)
+func (p *FullNodePruner) storeJournal(blockNum int64, jr *journalRecord) error {
+	encRecord, err := scale.Encode(jr)
+	if err != nil {
+		return fmt.Errorf("failed to encode journal record %d: %w", blockNum, err)
+	}
+
+	encNum, err := scale.Encode(blockNum)
 	if err != nil {
 		return err
 	}
 
-	err = p.journalDB.Put(encNum, record)
+	err = p.journalDB.Put(encNum, encRecord)
 	if err != nil {
 		return err
 	}
@@ -204,7 +211,7 @@
 	return nil
 }
 
-func (p *Pruner) getJournalRecord(num int64) ([]byte, error) {
+func (p *FullNodePruner) getJournalRecord(num int64) (*journalRecord, error) {
 	encNum, err := scale.Encode(num)
 	if err != nil {
 		return nil, err
@@ -215,16 +222,21 @@
 		return nil, err
 	}
 
-	return val, nil
+	decJR, err := scale.Decode(val, new(journalRecord))
+	if err != nil {
+		return nil, err
+	}
+
+	return decJR.(*journalRecord), nil
 }
 
-func (p *Pruner) storeLastPrunedIndex(blockNum int64) error {
+func (p *FullNodePruner) storeLastPrunedIndex(blockNum int64) error {
 	encNum, err := scale.Encode(blockNum)
 	if err != nil {
 		return err
 	}
 
-	err = p.journalDB.Put([]byte(lastPruned), encNum)
+	err = p.journalDB.Put([]byte(lastPrunedKey), encNum)
 	if err != nil {
 		return err
 	}
@@ -232,8 +244,12 @@
 	return nil
 }
 
-func (p *Pruner) getLastPrunedIndex() (int64, error) {
-	val, err := p.journalDB.Get([]byte(lastPruned))
+func (p *FullNodePruner) getLastPrunedIndex() (int64, error) {
+	val, err := p.journalDB.Get([]byte(lastPrunedKey))
+	if err == chaindb.ErrKeyNotFound {
+		return 0, nil
+	}
+
 	if err != nil {
 		return 0, err
 	}
@@ -246,7 +262,7 @@ func (p *Pruner) getLastPrunedIndex() (int64, error) {
 	return blockNum.(int64), err
 }
 
-func (p *Pruner) deleteKeys(nodesHash map[*common.Hash]int64) error {
+func (p *FullNodePruner) deleteKeys(nodesHash map[*common.Hash]int64) error {
 	for k := range nodesHash {
 		err := p.storageDB.Del(k.ToBytes())
 		if err != nil {
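Review note: since CreatePruner now returns the Pruner interface, compile-time assertions would make the contract explicit. A suggested (not included in this diff) addition to dot/state/pruner.go:

```go
// Compile-time checks that both pruner types satisfy the Pruner interface;
// the build fails if either StoreJournalRecord signature drifts.
var (
	_ Pruner = (*FullNodePruner)(nil)
	_ Pruner = (*ArchivalNodePruner)(nil)
)
```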
diff --git a/dot/state/service.go b/dot/state/service.go
index 1160fdb405..dc3788d9bd 100644
--- a/dot/state/service.go
+++ b/dot/state/service.go
@@ -51,8 +51,6 @@ type Service struct {
 	// Below are for testing only.
 	BabeThresholdNumerator   uint64
 	BabeThresholdDenominator uint64
-
-	pruner *Pruner
 }
 
 // NewService create a new instance of Service
@@ -69,7 +67,6 @@ func NewService(path string, lvl log.Lvl) *Service {
 		Storage: nil,
 		Block:   nil,
 		closeCh: make(chan interface{}),
-		pruner:  nil,
 	}
 }
@@ -180,7 +177,6 @@ func (s *Service) Start() error {
 	logger.Info("created state service", "head", s.Block.BestBlockHash(), "highest number", num)
 	// Start background goroutine to GC pruned keys.
 	go s.Storage.pruneStorage(s.closeCh)
-
 	return nil
 }
diff --git a/dot/sync/syncer.go b/dot/sync/syncer.go
index e48299811d..8e69d2eaeb 100644
--- a/dot/sync/syncer.go
+++ b/dot/sync/syncer.go
@@ -57,7 +57,7 @@ type Service struct {
 	// Consensus digest handling
 	digestHandler DigestHandler
 
-	pruner *state.Pruner
+	pruner state.Pruner
 }
 
 // Config is the configuration for the sync Service.
@@ -71,8 +71,7 @@ type Config struct {
 	Runtime       runtime.Instance
 	Verifier      Verifier
 	DigestHandler DigestHandler
-
-	Pruner *state.Pruner
+	Pruner        state.Pruner
 }
 
 // NewService returns a new *sync.Service
@@ -351,14 +350,14 @@ func (s *Service) handleBlock(block *types.Block) error {
 		return fmt.Errorf("failed to execute block %d: %w", block.Header.Number, err)
 	}
 
-	blockHash := block.Header.Hash()
 	insKeys, err := ts.GetInsertedNodeHashes()
 	if err != nil {
-		logger.Error("failed to get state trie inserted keys: block ", block.Header.Number, err)
+		return fmt.Errorf("failed to get state trie inserted keys for block %s: %w", block.Header.Number, err)
 	}
 
 	delKeys := ts.GetDeletedNodeHashes()
 
+	blockHash := block.Header.Hash()
 	err = s.pruner.StoreJournalRecord(delKeys, insKeys, &blockHash, block.Header.Number)
 	if err != nil {
 		return err
@@ -390,8 +389,6 @@ func (s *Service) handleBlock(block *types.Block) error {
 		s.handleDigests(block.Header)
 	}
 
-	go s.pruner.PruneOne()
-
 	return s.handleRuntimeChanges(ts)
 }
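Review note: with `go s.pruner.PruneOne()` removed here (and in BABE below), the pruning loop runs exactly once, started by CreatePruner, and block handling only appends journal records. A standalone toy sketching that producer/consumer shape (stand-in types only; none of these identifiers come from the codebase):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type pruner struct {
	mu        sync.Mutex
	deathList []int64 // pending block numbers, oldest first
	retain    int64   // how many recent blocks to keep un-pruned
}

// start is the single long-running pruning loop.
func (p *pruner) start() {
	for {
		p.mu.Lock()
		if int64(len(p.deathList)) <= p.retain {
			p.mu.Unlock()
			time.Sleep(10 * time.Millisecond)
			continue
		}
		num := p.deathList[0]
		p.deathList = p.deathList[1:]
		p.mu.Unlock()
		fmt.Println("pruned block", num)
	}
}

// storeJournalRecord is what block handling calls per imported block.
func (p *pruner) storeJournalRecord(blockNum int64) {
	p.mu.Lock()
	p.deathList = append(p.deathList, blockNum)
	p.mu.Unlock()
}

func main() {
	p := &pruner{retain: 2}
	go p.start() // started once, as CreatePruner now does
	for n := int64(1); n <= 5; n++ {
		p.storeJournalRecord(n)
	}
	time.Sleep(100 * time.Millisecond) // let the loop prune blocks 1-3
}
```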
diff --git a/dot/sync/syncer_test.go b/dot/sync/syncer_test.go
index 7400446cfa..7fae639cc8 100644
--- a/dot/sync/syncer_test.go
+++ b/dot/sync/syncer_test.go
@@ -65,7 +65,6 @@ func newTestSyncer(t *testing.T) *Service {
 	cfg := &Config{}
 
 	testDatadirPath, _ := ioutil.TempDir("/tmp", "test-datadir-*")
-
 	stateSrvc := state.NewService(testDatadirPath, log.LvlInfo)
 	stateSrvc.UseMemDB()
diff --git a/lib/babe/babe.go b/lib/babe/babe.go
index 6675d9d0a7..e861ed2317 100644
--- a/lib/babe/babe.go
+++ b/lib/babe/babe.go
@@ -50,7 +50,7 @@ type Service struct {
 	transactionState TransactionState
 	epochState       EpochState
 	epochLength      uint64
-	pruner           *state.Pruner
+	pruner           state.Pruner
 
 	// BABE authority keypair
 	keypair *sr25519.Keypair // TODO: change to BABE keystore
@@ -88,7 +88,7 @@ type ServiceConfig struct {
 	SlotDuration     uint64 // for development purposes; in milliseconds
 	EpochLength      uint64 // for development purposes; in slots
 	Authority        bool
-	Pruner           *state.Pruner
+	Pruner           state.Pruner
 }
 
 // NewService returns a new Babe Service using the provided VRF keys and runtime
@@ -320,7 +320,7 @@ func (b *Service) safeSend(msg types.Block) error {
 	defer b.lock.Unlock()
 
 	if b.IsStopped() {
-		return errors.New("Service has been stopped")
+		return errors.New("service has been stopped")
 	}
 
 	b.blockChan <- msg
@@ -414,8 +414,6 @@ func (b *Service) invokeBlockAuthoring(epoch uint64) {
 		slotDone[i] = time.After(b.getSlotDuration() * time.Duration(i))
 	}
 
-	go b.pruner.PruneOne()
-
 	for i := 0; i < int(b.epochLength-intoEpoch); i++ {
 		select {
 		case <-b.ctx.Done():
@@ -493,7 +491,6 @@ func (b *Service) handleSlot(slotNum uint64) error {
 		return nil
 	}
 
-	blockHash := block.Header.Hash()
 	insKeys, err := ts.GetInsertedNodeHashes()
 	if err != nil {
 		logger.Error("failed to get inserted keys for ", block.Header.Number, err)
@@ -501,6 +498,7 @@ func (b *Service) handleSlot(slotNum uint64) error {
 	}
 
 	delKeys := ts.GetDeletedNodeHashes()
 
+	blockHash := block.Header.Hash()
 	err = b.pruner.StoreJournalRecord(delKeys, insKeys, &blockHash, block.Header.Number)
 	if err != nil {
 		return err
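Review note: the lowercase error string in safeSend above follows the standard Go convention that error strings are not capitalized, because callers usually wrap them mid-sentence:

```go
package main

import (
	"errors"
	"fmt"
)

func main() {
	err := errors.New("service has been stopped")
	// Wrapped errors read as one sentence, which is why the string is lowercase.
	fmt.Println(fmt.Errorf("failed to send block: %w", err))
	// Output: failed to send block: service has been stopped
}
```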
diff --git a/lib/trie/database.go b/lib/trie/database.go
index c66edb9a18..53787a8084 100644
--- a/lib/trie/database.go
+++ b/lib/trie/database.go
@@ -289,6 +289,7 @@ func (t *Trie) writeDirty(db chaindb.Batch, curr node) error {
 }
 
 // GetInsertedNodeHashes returns the hash of nodes that are inserted into state trie since last snapshot is called
+// Since inserted nodes are newly created, we need to compute their hash values.
 func (t *Trie) GetInsertedNodeHashes(curr node) ([]*common.Hash, error) {
 	var nodeHashes []*common.Hash
 	if curr == nil || !curr.isDirty() {
diff --git a/lib/trie/trie.go b/lib/trie/trie.go
index 3fcbada62b..61bfeee002 100644
--- a/lib/trie/trie.go
+++ b/lib/trie/trie.go
@@ -75,8 +75,12 @@ func (t *Trie) maybeUpdateGeneration(n node) node {
 		newNode := n.copy()
 		newNode.setGeneration(t.generation)
 
-		hash := common.BytesToHash(n.getHash())
-		t.deletedKeys = append(t.deletedKeys, &hash)
+		// The hash of an old node should already be computed, since it belongs to an older generation.
+		oldNodeHash := n.getHash()
+		if len(oldNodeHash) > 0 {
+			hash := common.BytesToHash(oldNodeHash)
+			t.deletedKeys = append(t.deletedKeys, &hash)
+		}
 		return newNode
 	}
@@ -248,9 +252,7 @@ func (t *Trie) tryPut(key, value []byte) {
 
 // TryPut attempts to insert a key with value into the trie
 func (t *Trie) insert(parent node, key []byte, value node) node {
-	parent = t.maybeUpdateGeneration(parent)
-
-	switch p := parent.(type) {
+	switch p := t.maybeUpdateGeneration(parent).(type) {
 	case *branch:
 		n := t.updateBranch(p, key, value)
@@ -516,8 +518,8 @@ func (t *Trie) ClearPrefix(prefix []byte) {
 	t.root, _ = t.clearPrefix(t.root, p)
 }
 
-func (t *Trie) clearPrefix(curr node, prefix []byte) (node, bool) {
-	curr = t.maybeUpdateGeneration(curr)
+func (t *Trie) clearPrefix(cn node, prefix []byte) (node, bool) {
+	curr := t.maybeUpdateGeneration(cn)
 	switch c := curr.(type) {
 	case *branch:
 		length := lenCommonPrefix(c.key, prefix)
@@ -574,8 +576,7 @@ func (t *Trie) Delete(key []byte) {
 
 func (t *Trie) delete(parent node, key []byte) (node, bool) {
 	// Store the current node and return it, if the trie is not updated.
-	parent = t.maybeUpdateGeneration(parent)
-	switch p := parent.(type) {
+	switch p := t.maybeUpdateGeneration(parent).(type) {
 	case *branch:
 		length := lenCommonPrefix(p.key, key)
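Review note: the guard added in maybeUpdateGeneration matters because only nodes that were hashed (i.e. persisted by an earlier commit) have anything to prune; a dirty node created and replaced within the same generation has an empty hash. A standalone toy illustrating the rule (stand-in types, not the real trie node):

```go
package main

import "fmt"

type node struct {
	generation uint64
	hash       []byte // empty until the node is encoded and hashed at commit
}

// recordDeleted schedules a replaced node's old hash for pruning, but only
// if the node was ever hashed, mirroring the len(oldNodeHash) > 0 guard.
func recordDeleted(deletedKeys *[][]byte, n *node, currentGen uint64) {
	if n.generation < currentGen && len(n.hash) > 0 {
		*deletedKeys = append(*deletedKeys, n.hash)
	}
}

func main() {
	var deleted [][]byte
	persisted := &node{generation: 1, hash: []byte{0xaa}}
	fresh := &node{generation: 1} // never hashed, nothing stored to delete
	recordDeleted(&deleted, persisted, 2)
	recordDeleted(&deleted, fresh, 2)
	fmt.Println(len(deleted)) // 1 — only the persisted node is scheduled
}
```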
diff --git a/tests/utils/gossamer_utils.go b/tests/utils/gossamer_utils.go
index b1ca3dcfd8..ebfe658e32 100644
--- a/tests/utils/gossamer_utils.go
+++ b/tests/utils/gossamer_utils.go
@@ -422,6 +422,7 @@ func generateDefaultConfig() *ctoml.Config {
 			LogLvl:       "crit",
 			MetricsPort:  9876,
 			RetainBlocks: 256,
+			GCMode:       "archive",
 		},
 		Log: ctoml.LogConfig{
 			CoreLvl: "info",