Skip to content

Commit

Permalink
#179 Change Bandwidth Price to Average for 24h Sliding Window. Refact…
Browse files Browse the repository at this point in the history
…oring
  • Loading branch information
arturalbov committed Jan 23, 2019
1 parent d715c4a commit 25e12c1
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 112 deletions.
103 changes: 12 additions & 91 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,7 @@ type CyberdApp struct {

latestBlockHeight int64

computeUnit rank.ComputeUnit

// TODO: move to RankState???
rankCalculationFinished bool
cidCount int64

rankCalcChan chan rank.Rank
rankErrChan chan error
}

// NewBasecoinApp returns a reference to a new CyberdApp given a
Expand All @@ -117,12 +110,11 @@ func NewCyberdApp(

// create your application type
var app = &CyberdApp{
cdc: cdc,
txDecoder: txDecoder,
BaseApp: baseapp.NewBaseApp(appName, logger, db, txDecoder, baseAppOptions...),
dbKeys: dbKeys,
mainKeeper: ms,
computeUnit: computeUnit,
cdc: cdc,
txDecoder: txDecoder,
BaseApp: baseapp.NewBaseApp(appName, logger, db, txDecoder, baseAppOptions...),
dbKeys: dbKeys,
mainKeeper: ms,
}

app.feeCollectionKeeper = NoopFeeCollectionKeeper{}
Expand Down Expand Up @@ -164,7 +156,10 @@ func NewCyberdApp(
app.linkIndexedKeeper = keeper.NewLinkIndexedKeeper(keeper.NewBaseLinkKeeper(ms, dbKeys.links))
app.cidNumKeeper = keeper.NewBaseCidNumberKeeper(ms, dbKeys.cidNum, dbKeys.cidNumReverse)
app.stakeIndex = cbdbank.NewIndexedKeeper(&app.bankKeeper, app.accountKeeper)
app.rankState = rank.NewRankState(allowSearch)
app.rankState = rank.NewRankState(
allowSearch, app.mainKeeper, app.stakeIndex,
app.linkIndexedKeeper, app.cidNumKeeper, computeUnit,
)

// register the staking hooks
// NOTE: stakingKeeper above are passed by reference,
Expand Down Expand Up @@ -225,18 +220,7 @@ func NewCyberdApp(
app.curBlockSpentBandwidth = 0

// RANK PARAMS
app.rankState.Load(ctx, app.mainKeeper)
app.rankCalculationFinished = true
app.rankCalcChan = make(chan rank.Rank, 1)
app.cidCount = int64(app.mainKeeper.GetCidsCount(ctx))
app.rankErrChan = make(chan error)

// if we have fallen and need to start new rank calculation
// todo: what if rank didn't changed in new calculation???
if app.latestBlockHeight != 0 && !app.rankState.NextRankReady() {
app.startRankCalculation(ctx)
app.rankCalculationFinished = false
}
app.rankState.Load(ctx, app.latestBlockHeight, app.Logger)

app.Seal()
return app
Expand Down Expand Up @@ -477,78 +461,15 @@ func (app *CyberdApp) EndBlocker(ctx sdk.Context, _ abci.RequestEndBlock) abci.R
app.currentCreditPrice = newPrice
app.curBlockSpentBandwidth = 0

// START RANK CALCULATION

currentCidsCount := app.mainKeeper.GetCidsCount(ctx)
app.linkIndexedKeeper.EndBlocker()
if ctx.BlockHeight()%rank.CalculationPeriod == 0 || ctx.BlockHeight() == 1 {

if !app.rankCalculationFinished {
select {
case newRank := <-app.rankCalcChan:
app.handleNewRank(ctx, newRank)
case err := <-app.rankErrChan:
// DUMB ERROR HANDLING
app.BaseApp.Logger.Error("Error during rank calculation " + err.Error())
panic(err.Error())
}
}

app.rankState.ApplyNextRank()
// Recalculate index
// todo state copied
outLinksCopy := lnk.Links(app.linkIndexedKeeper.GetOutLinks()).Copy()
go app.rankState.BuildCidRankedLinksIndexInParallel(app.cidCount, outLinksCopy)

app.mainKeeper.StoreLastCalculatedRankHash(ctx, app.rankState.GetNetworkRankHash())

// start new calculation
app.rankCalculationFinished = false
app.cidCount = int64(currentCidsCount)
app.linkIndexedKeeper.FixLinks()
app.stakeIndex.FixUserStake()
app.startRankCalculation(ctx)

} else if !app.rankCalculationFinished {

select {
case newRank := <-app.rankCalcChan:
app.handleNewRank(ctx, newRank)
case err := <-app.rankErrChan:
// DUMB ERROR HANDLING
app.BaseApp.Logger.Error("Error during rank calculation " + err.Error())
panic(err.Error())
default:
}

}

//CHECK INDEX BUILDING FOR ERROR:
app.rankState.AddNewCids(currentCidsCount)
app.mainKeeper.StoreLatestMerkleTree(ctx, app.rankState.GetNetworkMerkleTreeAsBytes())
app.rankState.CheckBuildIndexError(app.BaseApp.Logger)

// END RANK CALCULATION
// RANK CALCULATION
app.rankState.EndBlocker(ctx, app.Logger)

return abci.ResponseEndBlock{
ValidatorUpdates: validatorUpdates,
Tags: tags,
}
}

func (app *CyberdApp) startRankCalculation(ctx sdk.Context) {
calcCtx := rank.NewCalcContext(
ctx, app.linkIndexedKeeper, app.cidNumKeeper, app.stakeIndex, app.rankState.SearchAllowed(),
)
go rank.CalculateRankInParallel(calcCtx, app.rankCalcChan, app.rankErrChan, app.computeUnit, app.BaseApp.Logger)
}

func (app *CyberdApp) handleNewRank(ctx sdk.Context, newRank rank.Rank) {
app.rankState.SetNextRank(newRank)
app.mainKeeper.StoreNextMerkleTree(ctx, app.rankState.GetNextMerkleTreeAsBytes())
app.rankCalculationFinished = true
}

// Implements ABCI
func (app *CyberdApp) Commit() (res abci.ResponseCommit) {

Expand Down
128 changes: 107 additions & 21 deletions x/rank/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cybercongress/cyberd/merkle"
"github.com/cybercongress/cyberd/store"
"github.com/cybercongress/cyberd/x/bank"
"github.com/cybercongress/cyberd/x/link/keeper"
. "github.com/cybercongress/cyberd/x/link/types"
"github.com/tendermint/tendermint/libs/log"
"sort"
Expand All @@ -27,22 +29,100 @@ type RankState struct {

lastCalculatedRankHash []byte

rankCalculationFinished bool
cidCount int64

rankCalcChan chan Rank
rankErrChan chan error
allowSearch bool
indexErrChan chan error
computeUnit ComputeUnit

// keepers
mainKeeper store.MainKeeper
stakeIndex *bank.IndexedKeeper
cidNumKeeper keeper.CidNumberKeeper
linkIndexedKeeper keeper.LinkIndexedKeeper
}

func NewRankState(allowSearch bool) *RankState {
func NewRankState(
allowSearch bool, mainKeeper store.MainKeeper, stakeIndex *bank.IndexedKeeper,
linkIndexedKeeper keeper.LinkIndexedKeeper, cidNumKeeper keeper.CidNumberKeeper,
unit ComputeUnit,
) *RankState {
return &RankState{
allowSearch: allowSearch, indexErrChan: make(chan error),
allowSearch: allowSearch, indexErrChan: make(chan error), rankCalcChan: make(chan Rank, 1),
rankErrChan: make(chan error), rankCalculationFinished: true, mainKeeper: mainKeeper,
stakeIndex: stakeIndex, linkIndexedKeeper: linkIndexedKeeper, cidNumKeeper: cidNumKeeper,
computeUnit: unit,
}
}

func (s *RankState) Load(ctx sdk.Context, mainKeeper store.MainKeeper) {
s.lastCalculatedRankHash = mainKeeper.GetLastCalculatedRankHash(ctx)
func (s *RankState) Load(ctx sdk.Context, latestBlockHeight int64, log log.Logger) {
s.lastCalculatedRankHash = s.mainKeeper.GetLastCalculatedRankHash(ctx)
s.networkCidRank = Rank{Values: nil, MerkleTree: merkle.NewTree(sha256.New(), false)}
s.nextCidRank = Rank{Values: nil, MerkleTree: merkle.NewTree(sha256.New(), false)}
s.networkCidRank.MerkleTree.ImportSubtreesRoots(mainKeeper.GetLatestMerkleTree(ctx))
s.nextCidRank.MerkleTree.ImportSubtreesRoots(mainKeeper.GetNextMerkleTree(ctx))
s.networkCidRank.MerkleTree.ImportSubtreesRoots(s.mainKeeper.GetLatestMerkleTree(ctx))
s.nextCidRank.MerkleTree.ImportSubtreesRoots(s.mainKeeper.GetNextMerkleTree(ctx))

s.cidCount = int64(s.mainKeeper.GetCidsCount(ctx))

// if we have fallen and need to start new rank calculation
// todo: what if rank didn't changed in new calculation???
if latestBlockHeight != 0 && !s.nextRankReady() {
s.startRankCalculation(ctx, log)
s.rankCalculationFinished = false
}
}

func (s *RankState) EndBlocker(ctx sdk.Context, log log.Logger) {
currentCidsCount := s.mainKeeper.GetCidsCount(ctx)
s.linkIndexedKeeper.EndBlocker()
if ctx.BlockHeight()%CalculationPeriod == 0 || ctx.BlockHeight() == 1 {

if !s.rankCalculationFinished {
select {
case newRank := <-s.rankCalcChan:
s.handleNewRank(ctx, newRank)
case err := <-s.rankErrChan:
// DUMB ERROR HANDLING
log.Error("Error during rank calculation " + err.Error())
panic(err.Error())
}
}

s.applyNextRank()
// Recalculate index
// todo state copied
outLinksCopy := Links(s.linkIndexedKeeper.GetOutLinks()).Copy()
go s.buildCidRankedLinksIndexInParallel(s.cidCount, outLinksCopy)

s.mainKeeper.StoreLastCalculatedRankHash(ctx, s.GetNetworkRankHash())

// start new calculation
s.rankCalculationFinished = false
s.cidCount = int64(currentCidsCount)
s.linkIndexedKeeper.FixLinks()
s.stakeIndex.FixUserStake()
s.startRankCalculation(ctx, log)

} else if !s.rankCalculationFinished {

select {
case newRank := <-s.rankCalcChan:
s.handleNewRank(ctx, newRank)
case err := <-s.rankErrChan:
// DUMB ERROR HANDLING
log.Error("Error during rank calculation " + err.Error())
panic(err.Error())
default:
}

}
//CHECK INDEX BUILDING FOR ERROR:
s.addNewCids(currentCidsCount)
s.mainKeeper.StoreLatestMerkleTree(ctx, s.getNetworkMerkleTreeAsBytes())
s.checkBuildIndexError(log)
}

func (s *RankState) GetCidRankedLinks(cidNumber CidNumber, page, perPage int) ([]RankedCidNumber, int, error) {
Expand Down Expand Up @@ -76,22 +156,33 @@ func (s *RankState) GetCidRankedLinks(cidNumber CidNumber, page, perPage int) ([
return resultSet, totalSize, nil
}

func (s *RankState) SetNextRank(newRank Rank) {
func (s *RankState) startRankCalculation(ctx sdk.Context, log log.Logger) {
calcCtx := NewCalcContext(ctx, s.linkIndexedKeeper, s.cidNumKeeper, s.stakeIndex, s.allowSearch)
go CalculateRankInParallel(calcCtx, s.rankCalcChan, s.rankErrChan, s.computeUnit, log)
}

func (s *RankState) handleNewRank(ctx sdk.Context, newRank Rank) {
s.setNextRank(newRank)
s.mainKeeper.StoreNextMerkleTree(ctx, s.getNextMerkleTreeAsBytes())
s.rankCalculationFinished = true
}

func (s *RankState) setNextRank(newRank Rank) {
s.nextCidRank = newRank
}

func (s *RankState) ApplyNextRank() {
func (s *RankState) applyNextRank() {
s.networkCidRank = s.nextCidRank
s.lastCalculatedRankHash = s.nextCidRank.MerkleTree.RootHash()
s.nextCidRank.Reset()
}

// add new cids to rank with 0 value
func (s *RankState) AddNewCids(cidCount uint64) {
func (s *RankState) addNewCids(cidCount uint64) {
s.networkCidRank.AddNewCids(cidCount)
}

func (s *RankState) BuildCidRankedLinksIndex(cidsCount int64, outLinks Links) {
func (s *RankState) buildCidRankedLinksIndex(cidsCount int64, outLinks Links) {
// If search on this node is not allowed then we don't need to build index
if !s.allowSearch || s.networkCidRank.Values == nil {
return
Expand All @@ -109,17 +200,17 @@ func (s *RankState) BuildCidRankedLinksIndex(cidsCount int64, outLinks Links) {
}

// Used for building index in parallel
func (s *RankState) BuildCidRankedLinksIndexInParallel(cidsCount int64, outLinks Links) {
func (s *RankState) buildCidRankedLinksIndexInParallel(cidsCount int64, outLinks Links) {
defer func() {
if r := recover(); r != nil {
s.indexErrChan <- r.(error)
}
}()

s.BuildCidRankedLinksIndex(cidsCount, outLinks)
s.buildCidRankedLinksIndex(cidsCount, outLinks)
}

func (s *RankState) CheckBuildIndexError(logger log.Logger) {
func (s *RankState) checkBuildIndexError(logger log.Logger) {
if s.allowSearch {
select {
case err := <-s.indexErrChan:
Expand All @@ -144,24 +235,19 @@ func (s *RankState) getLinksSortedByRank(cidOutLinks CidLinks) cidRankedLinks {
//
// GETTERS

//rank index
func (s *RankState) SearchAllowed() bool {
return s.allowSearch
}

func (s *RankState) GetNetworkRankHash() []byte {
return s.networkCidRank.MerkleTree.RootHash()
}

func (s *RankState) GetNetworkMerkleTreeAsBytes() []byte {
func (s *RankState) getNetworkMerkleTreeAsBytes() []byte {
return s.networkCidRank.MerkleTree.ExportSubtreesRoots()
}

func (s *RankState) GetNextMerkleTreeAsBytes() []byte {
func (s *RankState) getNextMerkleTreeAsBytes() []byte {
return s.nextCidRank.MerkleTree.ExportSubtreesRoots()
}

func (s *RankState) NextRankReady() bool {
func (s *RankState) nextRankReady() bool {
return s.lastCalculatedRankHash != nil && !bytes.Equal(s.nextCidRank.MerkleTree.RootHash(), s.lastCalculatedRankHash)
}

Expand Down

0 comments on commit 25e12c1

Please sign in to comment.