Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

swarm, p2p/protocols: Stream accounting #18337

Merged
merged 15 commits into from
Jan 7, 2019
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 76 additions & 74 deletions p2p/protocols/accounting.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,31 +22,33 @@ import (
"github.com/ethereum/go-ethereum/metrics"
)

//define some metrics
// define some metrics
var (
//All metrics are cumulative
// All metrics are cumulative

//total amount of units credited
// total amount of units credited
mBalanceCredit metrics.Counter
//total amount of units debited
// total amount of units debited
mBalanceDebit metrics.Counter
//total amount of bytes credited
// total amount of bytes credited
mBytesCredit metrics.Counter
//total amount of bytes debited
// total amount of bytes debited
mBytesDebit metrics.Counter
//total amount of credited messages
// total amount of credited messages
mMsgCredit metrics.Counter
//total amount of debited messages
// total amount of debited messages
mMsgDebit metrics.Counter
//how many times local node had to drop remote peers
// how many times local node had to drop remote peers
mPeerDrops metrics.Counter
//how many times local node overdrafted and dropped
// how many times local node overdrafted and dropped
mSelfDrops metrics.Counter

MetricsRegistry metrics.Registry
)

//Prices defines how prices are being passed on to the accounting instance
// Prices defines how prices are being passed on to the accounting instance
type Prices interface {
//Return the Price for a message
// Return the Price for a message
Price(interface{}) *Price
}

Expand All @@ -57,20 +59,20 @@ const (
Receiver = Payer(false)
)

//Price represents the costs of a message
// Price represents the costs of a message
type Price struct {
Value uint64 //
PerByte bool //True if the price is per byte or for unit
Value uint64
PerByte bool // True if the price is per byte or for unit
Payer Payer
}

//For gives back the price for a message
//A protocol provides the message price in absolute value
//This method then returns the correct signed amount,
//depending on who pays, which is identified by the `payer` argument:
//`Send` will pass a `Sender` payer, `Receive` will pass the `Receiver` argument.
//Thus: If Sending and sender pays, amount positive, otherwise negative
//If Receiving, and receiver pays, amount positive, otherwise negative
// For gives back the price for a message
// A protocol provides the message price in absolute value
// This method then returns the correct signed amount,
// depending on who pays, which is identified by the `payer` argument:
// `Send` will pass a `Sender` payer, `Receive` will pass the `Receiver` argument.
// Thus: If Sending and sender pays, amount positive, otherwise negative
// If Receiving, and receiver pays, amount positive, otherwise negative
func (p *Price) For(payer Payer, size uint32) int64 {
price := p.Value
if p.PerByte {
Expand All @@ -82,22 +84,22 @@ func (p *Price) For(payer Payer, size uint32) int64 {
return int64(price)
}

//Balance is the actual accounting instance
//Balance defines the operations needed for accounting
//Implementations internally maintain the balance for every peer
// Balance is the actual accounting instance
// Balance defines the operations needed for accounting
// Implementations internally maintain the balance for every peer
type Balance interface {
//Adds amount to the local balance with remote node `peer`;
//positive amount = credit local node
//negative amount = debit local node
// Adds amount to the local balance with remote node `peer`;
// positive amount = credit local node
// negative amount = debit local node
Add(amount int64, peer *Peer) error
}

//Accounting implements the Hook interface
//It interfaces to the balances through the Balance interface,
//while interfacing with protocols and its prices through the Prices interface
// Accounting implements the Hook interface
// It interfaces to the balances through the Balance interface,
// while interfacing with protocols and its prices through the Prices interface
type Accounting struct {
Balance //interface to accounting logic
Prices //interface to prices logic
Balance // interface to accounting logic
Prices // interface to prices logic
}

func NewAccounting(balance Balance, po Prices) *Accounting {
Expand All @@ -108,87 +110,87 @@ func NewAccounting(balance Balance, po Prices) *Accounting {
return ah
}

//SetupAccountingMetrics creates a separate registry for p2p accounting metrics;
//this registry should be independent of any other metrics as it persists at different endpoints.
//It also instantiates the given metrics and starts the persisting go-routine which
//at the passed interval writes the metrics to a LevelDB
// SetupAccountingMetrics creates a separate registry for p2p accounting metrics;
// this registry should be independent of any other metrics as it persists at different endpoints.
// It also instantiates the given metrics and starts the persisting go-routine which
// at the passed interval writes the metrics to a LevelDB
func SetupAccountingMetrics(reportInterval time.Duration, path string) *AccountingMetrics {
//create an empty registry
registry := metrics.NewRegistry()
//instantiate the metrics
mBalanceCredit = metrics.NewRegisteredCounterForced("account.balance.credit", registry)
mBalanceDebit = metrics.NewRegisteredCounterForced("account.balance.debit", registry)
mBytesCredit = metrics.NewRegisteredCounterForced("account.bytes.credit", registry)
mBytesDebit = metrics.NewRegisteredCounterForced("account.bytes.debit", registry)
mMsgCredit = metrics.NewRegisteredCounterForced("account.msg.credit", registry)
mMsgDebit = metrics.NewRegisteredCounterForced("account.msg.debit", registry)
mPeerDrops = metrics.NewRegisteredCounterForced("account.peerdrops", registry)
mSelfDrops = metrics.NewRegisteredCounterForced("account.selfdrops", registry)
//create the DB and start persisting
return NewAccountingMetrics(registry, reportInterval, path)
// create an empty registry
MetricsRegistry = metrics.NewRegistry()
// instantiate the metrics
mBalanceCredit = metrics.NewRegisteredCounterForced("account.balance.credit", MetricsRegistry)
mBalanceDebit = metrics.NewRegisteredCounterForced("account.balance.debit", MetricsRegistry)
mBytesCredit = metrics.NewRegisteredCounterForced("account.bytes.credit", MetricsRegistry)
mBytesDebit = metrics.NewRegisteredCounterForced("account.bytes.debit", MetricsRegistry)
mMsgCredit = metrics.NewRegisteredCounterForced("account.msg.credit", MetricsRegistry)
mMsgDebit = metrics.NewRegisteredCounterForced("account.msg.debit", MetricsRegistry)
mPeerDrops = metrics.NewRegisteredCounterForced("account.peerdrops", MetricsRegistry)
mSelfDrops = metrics.NewRegisteredCounterForced("account.selfdrops", MetricsRegistry)
// create the DB and start persisting
return NewAccountingMetrics(MetricsRegistry, reportInterval, path)
}

//Implement Hook.Send
// Implement Hook.Send
// Send takes a peer, a size and a msg and
// - calculates the cost for the local node sending a msg of size to peer using the Prices interface
// - credits/debits local node using balance interface
// - calculates the cost for the local node sending a msg of size to peer using the Prices interface
// - credits/debits local node using balance interface
func (ah *Accounting) Send(peer *Peer, size uint32, msg interface{}) error {
//get the price for a message (through the protocol spec)
// get the price for a message (through the protocol spec)
price := ah.Price(msg)
//this message doesn't need accounting
// this message doesn't need accounting
if price == nil {
return nil
}
//evaluate the price for sending messages
// evaluate the price for sending messages
costToLocalNode := price.For(Sender, size)
//do the accounting
// do the accounting
err := ah.Add(costToLocalNode, peer)
//record metrics: just increase counters for user-facing metrics
// record metrics: just increase counters for user-facing metrics
ah.doMetrics(costToLocalNode, size, err)
return err
}

//Implement Hook.Receive
// Implement Hook.Receive
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove the comments that denote that it implements an interface method, comment them on the struct instead saying the Accounting implements the Hook interface. the same goes for the Send method.

// Receive takes a peer, a size and a msg and
// - calculates the cost for the local node receiving a msg of size from peer using the Prices interface
// - credits/debits local node using balance interface
// - calculates the cost for the local node receiving a msg of size from peer using the Prices interface
// - credits/debits local node using balance interface
func (ah *Accounting) Receive(peer *Peer, size uint32, msg interface{}) error {
//get the price for a message (through the protocol spec)
// get the price for a message (through the protocol spec)
price := ah.Price(msg)
//this message doesn't need accounting
// this message doesn't need accounting
if price == nil {
return nil
}
//evaluate the price for receiving messages
// evaluate the price for receiving messages
costToLocalNode := price.For(Receiver, size)
//do the accounting
// do the accounting
err := ah.Add(costToLocalNode, peer)
//record metrics: just increase counters for user-facing metrics
// record metrics: just increase counters for user-facing metrics
ah.doMetrics(costToLocalNode, size, err)
return err
}

//record some metrics
//this is not an error handling. `err` is returned by both `Send` and `Receive`
//`err` will only be non-nil if a limit has been violated (overdraft), in which case the peer has been dropped.
//if the limit has been violated and `err` is thus not nil:
// * if the price is positive, local node has been credited; thus `err` implicitly signals the REMOTE has been dropped
// * if the price is negative, local node has been debited, thus `err` implicitly signals LOCAL node "overdraft"
// record some metrics
// this is not an error handling. `err` is returned by both `Send` and `Receive`
// `err` will only be non-nil if a limit has been violated (overdraft), in which case the peer has been dropped.
// if the limit has been violated and `err` is thus not nil:
// * if the price is positive, local node has been credited; thus `err` implicitly signals the REMOTE has been dropped
// * if the price is negative, local node has been debited, thus `err` implicitly signals LOCAL node "overdraft"
func (ah *Accounting) doMetrics(price int64, size uint32, err error) {
if price > 0 {
mBalanceCredit.Inc(price)
mBytesCredit.Inc(int64(size))
mMsgCredit.Inc(1)
if err != nil {
//increase the number of times a remote node has been dropped due to "overdraft"
// increase the number of times a remote node has been dropped due to "overdraft"
mPeerDrops.Inc(1)
}
} else {
mBalanceDebit.Inc(price)
mBytesDebit.Inc(int64(size))
mMsgDebit.Inc(1)
if err != nil {
//increase the number of times the local node has done an "overdraft" in respect to other nodes
// increase the number of times the local node has done an "overdraft" in respect to other nodes
mSelfDrops.Inc(1)
}
}
Expand Down
Loading