Skip to content

Commit

Permalink
node: Fix tokenEntry when checking flow cancel for pending transfers
Browse files Browse the repository at this point in the history
(Squash and merge bug fix from PR #4001)
Similar to a previous issue in the function `ProcessMsgForTime`, the
tokenEntry was not being generated properly.
This should result in queued "small transfers" being able to flow cancel
when they are released from the queue.
Also adds a comment on the CheckedInt64 function to indicate what its
error states mean and when they occur.

Add comments and change variable names for governor_monitoring
- Add function comments to explain what they do and what their error
  states mean
- Adds governor logging to error cases
- Change variable names in publishStatus function. `value` was used
  first to indicate the "governor usage" and then reused to indicate the
  remaining available notional value for a chain. This refactor tries to
  make it clear that these are different concepts

Add unit test for flow cancelling when a pending transfer is
released

- Add a unit test to ensure that, when a pending transfer is released,
  it also does flow-cancelling on the TargetChain (previously we had a
  bug here)
- Add documentation for CheckPendingForTime to clarify that it has
  side-effects
  • Loading branch information
johnsaigle committed Jul 16, 2024
1 parent 2cea90b commit be3696a
Show file tree
Hide file tree
Showing 3 changed files with 298 additions and 14 deletions.
16 changes: 14 additions & 2 deletions node/pkg/governor/governor.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ func newTransferFromDbTransfer(dbTransfer *db.Transfer) (tx transfer, err error)
// ensure that the Governor usage cannot be lowered due to malicious or invalid transfers.
// - the Value must be negative (in order to represent an incoming value)
// - the TargetChain must match the chain ID of the Chain Entry
// - the flow cancel value must always be less than the big transfer limit
func (ce *chainEntry) addFlowCancelTransfer(transfer transfer) error {
value := transfer.value
targetChain := transfer.dbTransfer.TargetChain
Expand Down Expand Up @@ -678,6 +679,15 @@ func (gov *ChainGovernor) CheckPending() ([]*common.MessagePublication, error) {
return gov.CheckPendingForTime(time.Now())
}

// Iterates over all pending transfers for all of the chain entries configured for the Governor.
// If a pending message is ready to be released, modifies the chain entry's `pending` and `transfers` slices by
// moving a `dbTransfer` element from `pending` to `transfers`. Returns a slice of Messages that will be published.
// A transfer is ready to be released when one of the following conditions holds:
// - The 'release time' duration has passed since `now` (i.e. the transfer has been queued for 24 hours, regardless of
// the Governor's current capacity)
// - Within the release time duration, other transfers have been processed and have freed up outbound Governor capacity.
// This happens either because other transfers get released after 24 hours or because incoming transfers of
// flow-cancelling assets have freed up outbound capacity.
func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessagePublication, error) {
gov.mutex.Lock()
defer gov.mutex.Unlock()
Expand Down Expand Up @@ -799,12 +809,13 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP
ce.transfers = append(ce.transfers, transfer)

// Add inverse transfer to destination chain entry if this asset can cancel flows.
key := tokenKey{chain: dbTransfer.EmitterChain, addr: dbTransfer.EmitterAddress}
key := tokenKey{chain: pe.token.token.chain, addr: pe.token.token.addr}
tokenEntry := gov.tokens[key]
if tokenEntry != nil {
// Mandatory check to ensure that the token should be able to reduce the Governor limit.
if tokenEntry.flowCancels {
if destinationChainEntry, ok := gov.chains[payload.TargetChain]; ok {

if err := destinationChainEntry.addFlowCancelTransferFromDbTransfer(&dbTransfer); err != nil {
return nil, err
}
Expand Down Expand Up @@ -973,7 +984,8 @@ func CheckedAddUint64(x uint64, y uint64) (uint64, error) {
return sum, nil
}

// CheckedAddInt64 adds two uint64 values with overflow checks
// CheckedAddInt64 adds two uint64 values with overflow checks. Returns an error if the calculation would
// overflow or underflow. In this case, the returned value is 0.
func CheckedAddInt64(x int64, y int64) (int64, error) {
if x == 0 {
return y, nil
Expand Down
33 changes: 21 additions & 12 deletions node/pkg/governor/governor_monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,8 @@ func (gov *ChainGovernor) resetReleaseTimerForTime(vaaId string, now time.Time,
}

// sumValue sums the value of all `transfers`. See also `TrimAndSumValue`.
// Returns an error if the sum of all transfers would overflow the bounds of Int64. In this case, the function
// returns a value of 0.
func sumValue(transfers []transfer, startTime time.Time) (uint64, error) {
if len(transfers) == 0 {
return 0, nil
Expand Down Expand Up @@ -277,18 +279,20 @@ func sumValue(transfers []transfer, startTime time.Time) (uint64, error) {
return uint64(sum), nil
}

// REST query to get the current available notional value per chain.
// REST query to get the current available notional value per chain. This is defined as the sum of all transfers
// subtracted from the chains's dailyLimit.
func (gov *ChainGovernor) GetAvailableNotionalByChain() (resp []*publicrpcv1.GovernorGetAvailableNotionalByChainResponse_Entry) {
gov.mutex.Lock()
defer gov.mutex.Unlock()

startTime := time.Now().Add(-time.Minute * time.Duration(gov.dayLengthInMinutes))
// Iterate deterministically by accessing keys from this slice instead of the chainEntry map directly
for _, cid := range gov.chainIds {
ce := gov.chains[cid]
for _, chainId := range gov.chainIds {
ce := gov.chains[chainId]
value, err := sumValue(ce.transfers, startTime)
if err != nil {
// Don't return an error here, just return 0
// Don't return an error here, just return 0.
gov.logger.Error("GetAvailableNotionalByChain: failed to compute sum of transfers for chain entry", zap.String("chainID", chainId.String()), zap.Error(err))
return make([]*publicrpcv1.GovernorGetAvailableNotionalByChainResponse_Entry, 0)
}
if value >= ce.dailyLimit {
Expand Down Expand Up @@ -448,6 +452,7 @@ func (gov *ChainGovernor) CollectMetrics(hb *gossipv1.Heartbeat, sendC chan<- []
if err != nil {
// Error can occur if the sum overflows. Return 0 in this case rather than returning an
// error.
gov.logger.Error("CollectMetrics: failed to compute sum of transfers for chain entry", zap.String("chain", chain.String()), zap.Error(err))
value = 0
}
if value >= ce.dailyLimit {
Expand Down Expand Up @@ -556,16 +561,20 @@ func (gov *ChainGovernor) publishConfig(hb *gossipv1.Heartbeat, sendC chan<- []b
func (gov *ChainGovernor) publishStatus(hb *gossipv1.Heartbeat, sendC chan<- []byte, startTime time.Time, gk *ecdsa.PrivateKey, ourAddr ethCommon.Address) {
chains := make([]*gossipv1.ChainGovernorStatus_Chain, 0)
numEnqueued := 0
for _, ce := range gov.chains {
value, err := sumValue(ce.transfers, startTime)
for chainId, ce := range gov.chains {
// The capacity for the chain to emit further messages, denoted as USD value.
remainingAvailableNotional := uint64(0)
// A chain's governor usage is the sum of all outgoing transfers and incoming flow-cancelling transfers
governorUsage, err := sumValue(ce.transfers, startTime)

if err != nil || value >= ce.dailyLimit {
// In case of error, set value to 0 rather than returning an error to the caller. An error
if err != nil {
// In case of error, set remainingAvailableNotional to 0 rather than returning an error to the caller. An error
// here means sumValue has encountered an overflow and this should never happen. Even if it did
// we don't want to stop execution here.
value = 0
} else {
value = ce.dailyLimit - value
gov.logger.Error("publishStatus: failed to compute sum of transfers for chain entry", zap.String("chain", chainId.String()), zap.Error(err))
} else if governorUsage < ce.dailyLimit {
// `remainingAvailableNotional` is 0 unless the current usage is strictly less than the limit.
remainingAvailableNotional = ce.dailyLimit - governorUsage
}

enqueuedVaas := make([]*gossipv1.ChainGovernorStatus_EnqueuedVAA, 0)
Expand Down Expand Up @@ -595,7 +604,7 @@ func (gov *ChainGovernor) publishStatus(hb *gossipv1.Heartbeat, sendC chan<- []b

chains = append(chains, &gossipv1.ChainGovernorStatus_Chain{
ChainId: uint32(ce.emitterChainId),
RemainingAvailableNotional: value,
RemainingAvailableNotional: remainingAvailableNotional,
Emitters: []*gossipv1.ChainGovernorStatus_Emitter{&emitter},
})
}
Expand Down
Loading

0 comments on commit be3696a

Please sign in to comment.