Skip to content

Commit

Permalink
Merge pull request #62 from blukat29/supply-api-reacc
Browse files Browse the repository at this point in the history
reward: SupplyManager reaccumulates from nearest persisted AccReward
  • Loading branch information
blukat29 authored Aug 9, 2024
2 parents 1afbbfb + c140137 commit a8b7a99
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 10 deletions.
2 changes: 2 additions & 0 deletions node/cn/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,8 @@ func New(ctx *node.ServiceContext, config *Config) (*CN, error) {
// NewStakingManager is called with proper non-nil parameters
reward.NewStakingManager(cn.blockchain, governance, cn.chainDB)
}
// Note: archive nodes might have TrieBlockInterval == 128, then SupplyManager will store checkpoints every 128 blocks.
// Still it is not a problem since SupplyManager can re-accumulate from the nearest checkpoint.
cn.supplyManager = reward.NewSupplyManager(cn.blockchain, cn.governance, cn.chainDB, config.TrieBlockInterval)

// Governance states which are not yet applied to the db remains at in-memory storage
Expand Down
51 changes: 44 additions & 7 deletions reward/supply_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
var (
supplyCacheSize = 86400 // A day; Some total supply consumers might want daily supply.
supplyLogInterval = uint64(102400) // Periodic total supply log.
supplyReaccLimit = uint64(1024) // Re-accumulate from the last accumulated block.
zeroBurnAddress = common.HexToAddress("0x0")
deadBurnAddress = common.HexToAddress("0xdead")

Expand Down Expand Up @@ -143,9 +144,9 @@ func (sm *supplyManager) GetAccReward(num uint64) (*database.AccReward, error) {
return nil, errNoAccReward
}

accReward := sm.db.ReadAccReward(num)
if accReward == nil {
return nil, errNoAccReward
accReward, err := sm.getAccRewardUncached(num)
if err != nil {
return nil, err
}

sm.accRewardCache.Add(num, accReward.Copy())
Expand Down Expand Up @@ -318,7 +319,7 @@ func (sm *supplyManager) catchup() {
for lastNum < headNum {
logger.Info("Total supply big step catchup", "last", lastNum, "head", headNum, "minted", lastAccReward.Minted.String(), "burntFee", lastAccReward.BurntFee.String())

accReward, err := sm.accumulateReward(lastNum, headNum, lastAccReward)
accReward, err := sm.accumulateReward(lastNum, headNum, lastAccReward, true)
if err != nil {
if err != errSupplyManagerQuit {
logger.Error("Total supply accumulate failed", "from", lastNum, "to", headNum, "err", err)
Expand All @@ -341,7 +342,7 @@ func (sm *supplyManager) catchup() {
case head := <-sm.chainHeadChan:
headNum = head.Block.NumberU64()

supply, err := sm.accumulateReward(lastNum, headNum, lastAccReward)
supply, err := sm.accumulateReward(lastNum, headNum, lastAccReward, true)
if err != nil {
if err != errSupplyManagerQuit {
logger.Error("Total supply accumulate failed", "from", lastNum, "to", headNum, "err", err)
Expand Down Expand Up @@ -379,9 +380,45 @@ func (sm *supplyManager) totalSupplyFromState(num uint64) (*big.Int, error) {
return totalSupply, nil
}

func (sm *supplyManager) getAccRewardUncached(num uint64) (*database.AccReward, error) {
accReward := sm.db.ReadAccReward(num)
if accReward != nil {
return accReward, nil
}

// Trace back to the last stored accumulated reward.
var fromNum uint64
var fromAcc *database.AccReward

// Fast path using checkpointInterval
if accReward := sm.db.ReadAccReward(num - num%sm.checkpointInterval); accReward != nil {
fromNum = num - num%sm.checkpointInterval
fromAcc = accReward
} else {
// Slow path in case the checkpoint has changed or checkpoint is missing.
for i := uint64(1); i < supplyReaccLimit; i++ {
accReward = sm.db.ReadAccReward(num - i)
if accReward != nil {
fromNum = num - i
fromAcc = accReward
break
}
}
}
if fromAcc == nil {
return nil, errNoAccReward
}

logger.Trace("on-demand reaccumulating rewards", "from", fromNum, "to", num)
return sm.accumulateReward(fromNum, num, fromAcc, false)
}

// accumulateReward calculates the total supply from the last block to the current block.
// Given supply at `from` is `fromSupply`, calculate the supply until `to`, inclusive.
func (sm *supplyManager) accumulateReward(from, to uint64, fromAcc *database.AccReward) (*database.AccReward, error) {
// If `write` is true, the result will be written to the database.
// If `write` is false, the result will not be written to the database,
// to prevent overwriting LastAccRewardBlockNumber (essentially rollback) and to keep the disk size small (only store at checkpointInterval).
func (sm *supplyManager) accumulateReward(from, to uint64, fromAcc *database.AccReward, write bool) (*database.AccReward, error) {
accReward := fromAcc.Copy() // make a copy because we're updating it in-place.

for num := from + 1; num <= to; num++ {
Expand Down Expand Up @@ -410,7 +447,7 @@ func (sm *supplyManager) accumulateReward(from, to uint64, fromAcc *database.Acc

// Store to database, print progress log.
sm.accRewardCache.Add(num, accReward.Copy())
if (num % sm.checkpointInterval) == 0 {
if write && (num%sm.checkpointInterval) == 0 {
sm.db.WriteAccReward(num, accReward)
sm.db.WriteLastAccRewardBlockNumber(num)
}
Expand Down
45 changes: 42 additions & 3 deletions reward/supply_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,44 @@ func (s *SupplyTestSuite) TestTotalSupplyPartialInfo() {
assert.Nil(t, ts)
}

// Test that when db.AccReward are missing, GetTotalSupply will re-accumulate from the nearest stored AccReward.
func (s *SupplyTestSuite) TestTotalSupplyReaccumulate() {
t := s.T()
s.setupHistory()
s.sm.Start()
defer s.sm.Stop()
s.waitAccReward()

// Delete AccRewards not on the default block interval (128).
// This happens on full nodes, and archive nodes with default BlockInterval config.
// Note that archive nodes ars allowed to have BlockInterval > 1, still tries are committed every block.
for num := uint64(0); num <= 400; num++ {
if num%128 != 0 {
// Because it's for testing, we do not add db.DeleteAccReward method.
s.db.GetMiscDB().Delete(append([]byte("accReward"), common.Int64ToByteBigEndian(num)...))
}
}

// Still, all block data must be available.
testcases := s.testcases()
for _, tc := range testcases {
s.sm.accRewardCache.Purge()
ts, err := s.sm.GetTotalSupply(tc.number)
require.NoError(t, err)

expected := tc.expectTotalSupply
actual := ts
bigEqual(t, expected.TotalSupply, actual.TotalSupply, tc.number)
bigEqual(t, expected.TotalMinted, actual.TotalMinted, tc.number)
bigEqual(t, expected.TotalBurnt, actual.TotalBurnt, tc.number)
bigEqual(t, expected.BurntFee, actual.BurntFee, tc.number)
bigEqual(t, expected.ZeroBurn, actual.ZeroBurn, tc.number)
bigEqual(t, expected.DeadBurn, actual.DeadBurn, tc.number)
bigEqual(t, expected.Kip103Burn, actual.Kip103Burn, tc.number)
bigEqual(t, expected.Kip160Burn, actual.Kip160Burn, tc.number)
}
}

func (s *SupplyTestSuite) waitAccReward() {
for i := 0; i < 1000; i++ { // wait 10 seconds until catchup complete
if s.db.ReadLastAccRewardBlockNumber() >= 400 {
Expand Down Expand Up @@ -421,8 +459,9 @@ func (s *SupplyTestSuite) SetupTest() {
t := s.T()

s.db = database.NewMemoryDBManager()
chainConfig := s.config.Copy() // to avoid some tests (i.e. PartialInfo) breaking other tests
genesis := &blockchain.Genesis{
Config: s.config,
Config: chainConfig,
Timestamp: uint64(time.Now().Unix()),
BlockScore: common.Big1,
Alloc: blockchain.GenesisAlloc{
Expand All @@ -443,11 +482,11 @@ func (s *SupplyTestSuite) SetupTest() {
TriesInMemory: 128,
TrieNodeCacheConfig: statedb.GetEmptyTrieNodeCacheConfig(),
}
chain, err := blockchain.NewBlockChain(s.db, cacheConfig, s.config, s.engine, vm.Config{})
chain, err := blockchain.NewBlockChain(s.db, cacheConfig, chainConfig, s.engine, vm.Config{})
require.NoError(t, err)
s.chain = chain

s.sm = NewSupplyManager(s.chain, s.gov, s.db, 1)
s.sm = NewSupplyManager(s.chain, s.gov, s.db, 1) // 1 interval for testing
}

func (s *SupplyTestSuite) setupHistory() {
Expand Down

0 comments on commit a8b7a99

Please sign in to comment.