Skip to content

Commit

Permalink
#99 Rework bandwidth handler.
Browse files Browse the repository at this point in the history
  • Loading branch information
hleb-albau committed Dec 17, 2018
1 parent 0cbae91 commit edee772
Showing 10 changed files with 177 additions and 116 deletions.
91 changes: 60 additions & 31 deletions app/app.go
Original file line number Diff line number Diff line change
@@ -73,16 +73,15 @@ type CyberdApp struct {
txDecoder sdk.TxDecoder

// bandwidth
bandwidthHandler bw.BandwidthHandler
msgBandwidthCost bw.MsgBandwidthCost
bandwidthHandler bw.Handler
curBlockSpentBandwidth uint64 //resets every block
lastTotalSpentBandwidth uint64 //resets every bandwidth price adjustment interval
currentCreditPrice float64

// keys to access the multistore
dbKeys CyberdAppDbKeys

// manage getting and setting accounts
// manage getting and setting app data
mainStorage MainStorage
accountKeeper auth.AccountKeeper
feeCollectionKeeper auth.FeeCollectionKeeper
@@ -91,7 +90,7 @@ type CyberdApp struct {
slashingKeeper slashing.Keeper
distrKeeper distr.Keeper
paramsKeeper params.Keeper
accBandwidthKeeper bandwidth.AccountBandwidthKeeper
accBandwidthKeeper bw.Keeper

//inflation
minter mint.Minter
@@ -145,14 +144,13 @@ func NewCyberdApp(

// create your application type
var app = &CyberdApp{
cdc: cdc,
txDecoder: txDecoder,
BaseApp: baseapp.NewBaseApp(appName, logger, db, txDecoder, baseAppOptions...),
dbKeys: dbKeys,
persistStorages: storages,
mainStorage: ms,
computeUnit: computeUnit,
msgBandwidthCost: bandwidth.MsgBandwidthCost,
cdc: cdc,
txDecoder: txDecoder,
BaseApp: baseapp.NewBaseApp(appName, logger, db, txDecoder, baseAppOptions...),
dbKeys: dbKeys,
persistStorages: storages,
mainStorage: ms,
computeUnit: computeUnit,
}

// define and attach the mappers and keepers
@@ -162,7 +160,7 @@ func NewCyberdApp(

var stakeKeeper *stake.Keeper
app.bankKeeper = cbdbank.NewBankKeeper(app.accountKeeper, stakeKeeper, nil)
app.accBandwidthKeeper = bandwidth.NewAccountBandwidthKeeper(dbKeys.accBandwidth, app.bankKeeper)
app.accBandwidthKeeper = bandwidth.NewAccBandwidthKeeper(dbKeys.accBandwidth)
*stakeKeeper = stake.NewKeeper(
app.cdc, dbKeys.stake,
dbKeys.tStake, app.bankKeeper,
@@ -189,7 +187,9 @@ func NewCyberdApp(
// so that it can be modified like below:
app.stakeKeeper = *stakeKeeper.SetHooks(NewHooks(app.slashingKeeper.Hooks()))

app.bandwidthHandler = bandwidth.NewBandwidthHandler(app.accountKeeper, app.accBandwidthKeeper, app.msgBandwidthCost)
app.bandwidthHandler = bandwidth.NewHandler(
app.accountKeeper, app.bankKeeper, app.accBandwidthKeeper, bandwidth.MsgBandwidthCosts,
)

// register message routes
app.Router().
@@ -307,8 +307,7 @@ func (app *CyberdApp) initChainer(ctx sdk.Context, req abci.RequestInitChain) ab
}
}

bandwidth.InitGenesis(ctx, app.accBandwidthKeeper, genesisState.Accounts)

bandwidth.InitGenesis(ctx, app.bandwidthHandler, app.accBandwidthKeeper, genesisState.Accounts)
return abci.ResponseInitChain{
Validators: validators,
}
@@ -327,25 +326,27 @@ func (app *CyberdApp) BeginBlocker(ctx sdk.Context, req abci.RequestBeginBlock)

func (app *CyberdApp) CheckTx(txBytes []byte) (res abci.ResponseCheckTx) {

cachedCtx, writeCache := app.NewContext(true, abci.Header{Height: app.latestBlockHeight}).CacheContext()
ctx := app.NewContext(true, abci.Header{Height: app.latestBlockHeight})
tx, acc, err := app.decodeTxAndAcc(ctx, txBytes)

var tx, err = app.txDecoder(txBytes)
if err == nil {

_, err = app.bandwidthHandler(cachedCtx, app.currentCreditPrice, tx)
if err == nil {
txCost := app.bandwidthHandler.GetTxCost(ctx, app.currentCreditPrice, tx)
accBw := app.bandwidthHandler.GetCurrentAccBandwidth(ctx, acc)

if !accBw.HasEnoughRemained(txCost) {
err = cbd.ErrNotEnoughBandwidth()
} else {

resp := app.BaseApp.CheckTx(txBytes)
if resp.Code == 0 {
writeCache()
app.bandwidthHandler.ConsumeAccBandwidth(ctx, accBw, txCost)
}

return resp
}
}

result := err.Result()

return abci.ResponseCheckTx{
Code: uint32(result.Code),
Data: result.Data,
@@ -354,23 +355,26 @@ func (app *CyberdApp) CheckTx(txBytes []byte) (res abci.ResponseCheckTx) {
GasUsed: int64(result.GasUsed),
Tags: result.Tags,
}

}

func (app *CyberdApp) DeliverTx(txBytes []byte) (res abci.ResponseDeliverTx) {

cachedCtx, writeCache := app.NewContext(false, abci.Header{Height: app.latestBlockHeight}).CacheContext()
ctx := app.NewContext(true, abci.Header{Height: app.latestBlockHeight})
tx, acc, err := app.decodeTxAndAcc(ctx, txBytes)

var tx, err = app.txDecoder(txBytes)
if err == nil {

spent, err := app.bandwidthHandler(cachedCtx, app.currentCreditPrice, tx)
if err == nil {
txCost := app.bandwidthHandler.GetTxCost(ctx, app.currentCreditPrice, tx)
accBw := app.bandwidthHandler.GetCurrentAccBandwidth(ctx, acc)

if !accBw.HasEnoughRemained(txCost) {
err = cbd.ErrNotEnoughBandwidth()
} else {

resp := app.BaseApp.DeliverTx(txBytes)
if resp.Code == 0 {
writeCache()
app.curBlockSpentBandwidth = app.curBlockSpentBandwidth + uint64(spent)
app.bandwidthHandler.ConsumeAccBandwidth(ctx, accBw, txCost)
app.curBlockSpentBandwidth = app.curBlockSpentBandwidth + uint64(txCost)
}

return abci.ResponseDeliverTx{
@@ -386,7 +390,6 @@ func (app *CyberdApp) DeliverTx(txBytes []byte) (res abci.ResponseDeliverTx) {
}

result := err.Result()

return abci.ResponseDeliverTx{
Code: uint32(result.Code),
Codespace: string(result.Codespace),
@@ -398,6 +401,32 @@ func (app *CyberdApp) DeliverTx(txBytes []byte) (res abci.ResponseDeliverTx) {
}
}

func (app *CyberdApp) decodeTxAndAcc(ctx sdk.Context, txBytes []byte) (auth.StdTx, sdk.AccAddress, sdk.Error) {

decoded, err := app.txDecoder(txBytes)
if err != nil {
return auth.StdTx{}, nil, err
}

tx := decoded.(auth.StdTx)
if tx.GetMsgs() == nil || len(tx.GetMsgs()) == 0 {
return tx, nil, sdk.ErrInternal("Tx.GetMsgs() must return at least one message in list")
}

if err := tx.ValidateBasic(); err != nil {
return tx, nil, err
}

// signers acc [0] bandwidth will be consumed
account := tx.GetSigners()[0]
acc := app.accountKeeper.GetAccount(ctx, account)
if acc == nil {
return tx, nil, sdk.ErrUnknownAddress(account.String())
}

return tx, account, nil
}

func getSignersTags(tx sdk.Tx) sdk.Tags {

signers := make(map[string]struct{})
2 changes: 1 addition & 1 deletion app/rpc.go
Original file line number Diff line number Diff line change
@@ -22,7 +22,7 @@ func (app *CyberdApp) Account(address sdk.AccAddress) auth.Account {
}

func (app *CyberdApp) AccountBandwidth(address sdk.AccAddress) bdwth.AcсBandwidth {
return app.accBandwidthKeeper.GetCurrentAccBandwidth(app.RpcContext(), address)
return app.bandwidthHandler.GetCurrentAccBandwidth(app.RpcContext(), address)
}

func (app *CyberdApp) IsLinkExist(from cbd.Cid, to cbd.Cid, address sdk.AccAddress) bool {
15 changes: 11 additions & 4 deletions app/types/errors.go
Original file line number Diff line number Diff line change
@@ -7,10 +7,11 @@ import (

const (
// Base error codes
CodeOK sdk.CodeType = 0
CodeLinkAlreadyExist sdk.CodeType = 1
CodeInvalidCid sdk.CodeType = 2
CodeCidNotFound sdk.CodeType = 3
CodeOK sdk.CodeType = 0
CodeLinkAlreadyExist sdk.CodeType = 1
CodeInvalidCid sdk.CodeType = 2
CodeCidNotFound sdk.CodeType = 3
CodeNotEnoughBandwidth sdk.CodeType = 4

// Code space
CodespaceCbd sdk.CodespaceType = "cyberd"
@@ -24,6 +25,8 @@ func codeToDefaultMsg(code sdk.CodeType) string {
return "cid not found"
case CodeLinkAlreadyExist:
return "link already exists"
case CodeNotEnoughBandwidth:
return "not enough bandwidth to make transaction"
default:
return fmt.Sprintf("unknown error: code %d", code)
}
@@ -36,6 +39,10 @@ func ErrInvalidCid() sdk.Error {
return newError(CodespaceCbd, CodeInvalidCid)
}

func ErrNotEnoughBandwidth() sdk.Error {
return newError(CodespaceCbd, CodeNotEnoughBandwidth)
}

func ErrCidNotFound() sdk.Error {
return newError(CodespaceCbd, CodeCidNotFound)
}
2 changes: 1 addition & 1 deletion x/bandwidth/cost.go
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@ import (
"github.com/cybercongress/cyberd/x/link"
)

func MsgBandwidthCost(msg sdk.Msg) int64 {
func MsgBandwidthCosts(msg sdk.Msg) int64 {
switch msg.(type) {
case link.Msg:
return LinkMsgCost
4 changes: 2 additions & 2 deletions x/bandwidth/genesis.go
Original file line number Diff line number Diff line change
@@ -7,10 +7,10 @@ import (
)

// Genesis accounts should contains fully restored bandwidth on block 0
func InitGenesis(ctx sdk.Context, bwKeeper AccountBandwidthKeeper, accs []genesis.GenesisAccount) {
func InitGenesis(ctx sdk.Context, bwHandler types.Handler, bwKeeper types.Keeper, accs []genesis.GenesisAccount) {

for _, acc := range accs {
accMaxBw := bwKeeper.GetAccMaxBandwidth(ctx, acc.Address)
accMaxBw := bwHandler.GetAccMaxBandwidth(ctx, acc.Address)
bwKeeper.SetAccBandwidth(ctx, types.NewGenesisAccBandwidth(acc.Address, accMaxBw))
}
}
80 changes: 40 additions & 40 deletions x/bandwidth/handler.go
Original file line number Diff line number Diff line change
@@ -6,55 +6,55 @@ import (
"github.com/cybercongress/cyberd/x/bandwidth/types"
)

func NewBandwidthHandler(
accKeeper auth.AccountKeeper, bwKeeper AccountBandwidthKeeper, msgCost types.MsgBandwidthCost,
) types.BandwidthHandler {
var _ types.Handler = BaseHandler{}

return func(ctx sdk.Context, price float64, tx sdk.Tx) (int64, sdk.Error) {
type BaseHandler struct {
// data providers
accKeeper auth.AccountKeeper
stakeProvider types.AccStakeProvider
bwKeeper types.Keeper

account, sdkErr := getAccount(ctx, accKeeper, tx.(auth.StdTx))
if sdkErr != nil {
return 0, sdkErr
}

// We should call this function cause total stake could be changed since last update
// and currently we can't intercept all AccountKeeper interactions.
// This method calls bandwidth.`Recover()` under the hood, so everything should work fine.
accountBandwidth := bwKeeper.GetCurrentAccBandwidth(ctx, account)

bandwidthForTx := TxCost
for _, msg := range tx.GetMsgs() {
bandwidthForTx = bandwidthForTx + msgCost(msg)
}

if !accountBandwidth.HasEnoughRemained(int64(float64(bandwidthForTx) * price)) {
return 0, sdk.ErrInternal("Not enough bandwidth to make transaction! ")
}

accountBandwidth.Consume(int64(float64(bandwidthForTx) * price))
bwKeeper.SetAccBandwidth(ctx, accountBandwidth)

return bandwidthForTx, nil
}
// bw configuration
msgCost types.MsgBandwidthCost
}

func getAccount(ctx sdk.Context, accKeeper auth.AccountKeeper, tx auth.StdTx) (sdk.AccAddress, sdk.Error) {
func NewHandler(
ak auth.AccountKeeper, sp types.AccStakeProvider, bwKeeper types.Keeper, msgCost types.MsgBandwidthCost,
) BaseHandler {

if tx.GetMsgs() == nil || len(tx.GetMsgs()) == 0 {
return nil, sdk.ErrInternal("Tx.GetMsgs() must return at least one message in list")
return BaseHandler{
accKeeper: ak,
stakeProvider: sp,
bwKeeper: bwKeeper,
msgCost: msgCost,
}
}

if err := tx.ValidateBasic(); err != nil {
return nil, err
func (h BaseHandler) GetTxCost(ctx sdk.Context, price float64, tx sdk.Tx) int64 {
bandwidthForTx := TxCost
for _, msg := range tx.GetMsgs() {
bandwidthForTx = bandwidthForTx + h.msgCost(msg)
}
return bandwidthForTx
}

// signers acc [0] bandwidth will be consumed
account := tx.GetSigners()[0]
func (h BaseHandler) GetAccMaxBandwidth(ctx sdk.Context, addr sdk.AccAddress) int64 {
accStakePercentage := h.stakeProvider.GetAccStakePercentage(ctx, addr)
return int64(accStakePercentage * float64(MaxNetworkBandwidth) / 2)
}

acc := accKeeper.GetAccount(ctx, account)
if acc == nil {
return nil, sdk.ErrUnknownAddress(account.String())
}
func (h BaseHandler) GetCurrentAccBandwidth(ctx sdk.Context, address sdk.AccAddress) types.AcсBandwidth {
accBw := h.bwKeeper.GetAccBandwidth(ctx, address)
accMaxBw := h.GetAccMaxBandwidth(ctx, address)
accBw.UpdateMax(accMaxBw, ctx.BlockHeight(), RecoveryPeriod)
return accBw
}

return account, nil
// Double save for case:
// When acc send coins, we should consume bw before cutting max bw.
func (h BaseHandler) ConsumeAccBandwidth(ctx sdk.Context, bw types.AcсBandwidth, amt int64) {
bw.Consume(amt)
h.bwKeeper.SetAccBandwidth(ctx, bw)
bw = h.GetCurrentAccBandwidth(ctx, bw.Address)
h.bwKeeper.SetAccBandwidth(ctx, bw)
}
Loading

0 comments on commit edee772

Please sign in to comment.