Skip to content

Commit

Permalink
node: Modify error handling for CheckPending method in the Governor
Browse files Browse the repository at this point in the history
Previous rollouts of the Flow Cancel feature contained issues when
calculating the Governor usage when usage was near the daily limit. This
caused an invariant to be violated. However, this was propagated to the
processor code and resulted in the processor restarting the entire
process. Instead, the Governor should simply fail-closed and report that
there is no remaining capacity, causing further VAAs to be queued until
the usage diminishes over time.
The circumstances leading to the invariant violations are not addressed
in this commit. Instead this commit reworks the way errors are handled
by the CheckPending, making careful choices about when the process
should or should not be killed.

- Change "invariant" error handling: instead of causing the process to
  die, log an error and skip further for a single chain while allowing
  processing for other chains to continue
- Remove 'invariant error' in TrimAndSumValueForChain as it can occur
  somewhat regularly with the addition of the flow cancel feature
- Return dailyLimit in error condition rather than 0 so that future
  transfers will be queued
- Do not cap the sum returned from TrimAndSumValueForChain: instead
  allow it to exceed the daily limit.
- Modify unit tests to reflect this
- Add unit tests for overflow/underflow scenarios in the TrimAndSumValue
  functions
- Change other less severe error cases to log warnings instead of
  returning errors.
- Generally prevent flow-cancel related issues from affecting normal
  Governor operations. Instead the flow cancel transfers should simply
  not be populated and thus result in "GovernorV1" behavior.
- Add documentation to CheckPendingForTime to explain the dangers of
  returning an error
- Reword error messages to be more precise and include more relevant
  fields. Add documentation explaining when the process should and
  should not die
  • Loading branch information
johnsaigle committed Jul 16, 2024
1 parent be3696a commit 013d79a
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 44 deletions.
97 changes: 63 additions & 34 deletions node/pkg/governor/governor.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"math"
"math/big"
"sort"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -675,19 +676,23 @@ func (gov *ChainGovernor) parseMsgAlreadyLocked(
return true, ce, token, payload, nil
}

// CheckPending is a wrapper method for CheckPendingForTime. It is called by the processor with the purpose of releasing
// queued transfers.
func (gov *ChainGovernor) CheckPending() ([]*common.MessagePublication, error) {
return gov.CheckPendingForTime(time.Now())
}

// Iterates over all pending transfers for all of the chain entries configured for the Governor.
// If a pending message is ready to be released, modifies the chain entry's `pending` and `transfers` slices by
// CheckPendingForTime If a pending message is ready to be released, modifies the chain entry's `pending` and `transfers` slices by
// moving a `dbTransfer` element from `pending` to `transfers`. Returns a slice of Messages that will be published.
// A transfer is ready to be released when one of the following conditions holds:
// - The 'release time' duration has passed since `now` (i.e. the transfer has been queued for 24 hours, regardless of
// the Governor's current capacity)
// - Within the release time duration, other transfers have been processed and have freed up outbound Governor capacity.
// This happens either because other transfers get released after 24 hours or because incoming transfers of
// flow-cancelling assets have freed up outbound capacity.
//
// WARNING: When this function returns an error, it propagates to the `processor` which in turn interprets this as a
// signal to RESTART THE NODE. Therefore, errors returned by this function effectively act as panics.
func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessagePublication, error) {
gov.mutex.Lock()
defer gov.mutex.Unlock()
Expand All @@ -703,16 +708,21 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP
}

// Iterate deterministically by accessing keys from this slice instead of the chainEntry map directly
for _, cid := range gov.chainIds {
ce := gov.chains[cid]
for _, chainId := range gov.chainIds {
ce := gov.chains[chainId]
// Keep going as long as we find something that will fit.
for {
foundOne := false
prevTotalValue, err := gov.TrimAndSumValueForChain(ce, startTime)
if err != nil {
gov.logger.Error("error when attempting to trim and sum transfers", zap.Error(err))
gov.logger.Error("refusing to release transfers for this chain until the sum can be correctly calculated",
zap.Stringer("chainId", chainId),
zap.Uint64("prevTotalValue", prevTotalValue),
zap.Error(err))
gov.msgsToPublish = msgsToPublish
return nil, err
// Skip further processing for this chain entry
break
}

// Keep going until we find something that fits or hit the end.
Expand Down Expand Up @@ -769,7 +779,8 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP
zap.Uint64("value", value),
zap.Uint64("prevTotalValue", prevTotalValue),
zap.Uint64("newTotalValue", newTotalValue),
zap.String("msgID", pe.dbData.Msg.MessageIDString()))
zap.String("msgID", pe.dbData.Msg.MessageIDString()),
zap.String("flowCancels", strconv.FormatBool(pe.token.flowCancels)))
}

payload, err := vaa.DecodeTransferPayloadHdr(pe.dbData.Msg.Payload)
Expand All @@ -781,7 +792,9 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP
)
delete(gov.msgsSeen, pe.hash) // Rest of the clean up happens below.
} else {
// If we get here, publish it and remove it from the pending list.
// If we get here, publish it and move it from the pending list to the
// transfers list. Also add a flow-cancel transfer to the destination chain
// if the transfer is sending a flow-canceling asset.
msgsToPublish = append(msgsToPublish, &pe.dbData.Msg)

if countsTowardsTransfers {
Expand All @@ -797,17 +810,32 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP
Hash: pe.hash,
}

if err := gov.db.StoreTransfer(&dbTransfer); err != nil {
gov.msgsToPublish = msgsToPublish
transfer, err := newTransferFromDbTransfer(&dbTransfer)
if err != nil {
// Should never occur unless dbTransfer.Value overflows MaxInt64
gov.logger.Error("could not convert dbTransfer to transfer",
zap.String("msgID", dbTransfer.MsgID),
zap.String("hash", pe.hash),
zap.Error(err),
)
// This causes the process to die. We don't want to process transfers that
// have USD value in excess of MaxInt64 under any circumstances.
// This check should occur before the call to the database so
// that we don't store a problematic transfer.
return nil, err
}

transfer, err := newTransferFromDbTransfer(&dbTransfer)
if err != nil {
if err := gov.db.StoreTransfer(&dbTransfer); err != nil {
gov.msgsToPublish = msgsToPublish
// This causes the process to die. We can't tolerate DB connection
// errors.
return nil, err
}

ce.transfers = append(ce.transfers, transfer)

gov.msgsSeen[pe.hash] = transferComplete

// Add inverse transfer to destination chain entry if this asset can cancel flows.
key := tokenKey{chain: pe.token.token.chain, addr: pe.token.token.addr}
tokenEntry := gov.tokens[key]
Expand All @@ -817,7 +845,13 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP
if destinationChainEntry, ok := gov.chains[payload.TargetChain]; ok {

if err := destinationChainEntry.addFlowCancelTransferFromDbTransfer(&dbTransfer); err != nil {
return nil, err
gov.logger.Warn("could not add flow canceling transfer to destination chain",
zap.String("msgID", dbTransfer.MsgID),
zap.String("hash", pe.hash),
zap.Error(err),
)
// Process the next pending transfer
continue
}
} else {
gov.logger.Warn("tried to cancel flow but chain entry for target chain does not exist",
Expand All @@ -828,7 +862,6 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP
}
}
}
gov.msgsSeen[pe.hash] = transferComplete
} else {
delete(gov.msgsSeen, pe.hash)
}
Expand Down Expand Up @@ -872,46 +905,42 @@ func computeValue(amount *big.Int, token *tokenEntry) (uint64, error) {
return value, nil
}

// TrimAndSumValueForChain calculates the `sum` of `Transfer`s for a given chain `emitter`. In effect, it represents a
// TrimAndSumValueForChain calculates the `sum` of `Transfer`s for a given chain `chainEntry`. In effect, it represents a
// chain's "Governor Usage" for a given 24 hour period.
// This sum may be reduced by the sum of 'flow cancelling' transfers: that is, transfers of an allow-listed token
// that have the `emitter` as their destination chain.
// The resulting `sum` return value therefore represents the net flow across a chain when taking flow-cancelling tokens
// into account. Therefore, this value should never be less than 0 and should never exceed the "Governor limit" for the chain.
// As a side-effect, this function modifies the parameter `emitter`, updating its `transfers` field so that it only includes
// As a side-effect, this function modifies the parameter `chainEntry`, updating its `transfers` field so that it only includes
// filtered `Transfer`s (i.e. outgoing `Transfer`s newer than `startTime`).
// Returns an error if the sum cannot be calculated. The transfers field will still be updated in this case. When
// an error condition occurs, this function returns the chain's `dailyLimit` as the sum. This should result in the
// chain appearing at maximum capacity from the perspective of the Governor, and therefore cause new transfers to be
// queued until space opens up.
// SECURITY Invariant: The `sum` return value should never be less than 0
// SECURITY Invariant: The `sum` return value should never exceed the "Governor limit" for the chain
func (gov *ChainGovernor) TrimAndSumValueForChain(emitter *chainEntry, startTime time.Time) (sum uint64, err error) {
// Sum the value of all outgoing transfers
var sumOutgoing int64
sumOutgoing, emitter.transfers, err = gov.TrimAndSumValue(emitter.transfers, startTime)
func (gov *ChainGovernor) TrimAndSumValueForChain(chainEntry *chainEntry, startTime time.Time) (sum uint64, err error) {
// Sum the value of all transfers for this chain. This sum can be negative if flow-cancelling is enabled
// and the incoming value of flow-cancelling assets exceeds the summed value of all outgoing assets.
var sumValue int64
sumValue, chainEntry.transfers, err = gov.TrimAndSumValue(chainEntry.transfers, startTime)
if err != nil {
return 0, err
// Return the daily limit as the sum so that any further transfers will be queued.
return chainEntry.dailyLimit, err
}

// Return early if the sum is not positive as it cannot exceed the daily limit.
// In this case, return 0 even if the sum is negative.
if sumOutgoing <= 0 {
// Return 0 even if the sum is negative.
if sumValue <= 0 {
return 0, nil
}

sum = uint64(sumOutgoing)
if sum > emitter.dailyLimit {
return 0, fmt.Errorf(
"invariant violation: calculated sum %d exceeds Governor limit %d",
sum,
emitter.dailyLimit,
)
}

return sum, nil
return uint64(sumValue), nil

}

// TrimAndSumValue iterates over a slice of transfer structs. It filters out transfers that have Timestamp values that
// are earlier than the parameter `startTime`. The function then iterates over the remaining transfers, sums their Value,
// and returns the sum and the filtered transfers.
// As a side-effect, this function deletes transfers from the database if their Timestamp is before `startTime`.
// The `transfers` slice must be sorted by Timestamp. We expect this to be the case as transfers are added to the
// Governor in chronological order as they arrive. Note that `Timestamp` is created by the Governor; it is not read
// from the actual on-chain transaction.
Expand Down
10 changes: 7 additions & 3 deletions node/pkg/governor/governor_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,15 +243,19 @@ func (gov *ChainGovernor) reloadTransfer(xfer *db.Transfer) error {
}
ce.transfers = append(ce.transfers, transfer)

// Reload flow-cancel transfers for the TargetChain. This is important when node restarts so that a corresponding,
// inverse transfer is added to the TargetChain. This is already done during the `ProcessMsgForTime` loop but
// that function does not capture flow-cancelling when the node is restarted.
// Reload flow-cancel transfers for the TargetChain. This is important when the node restarts so that a corresponding,
// inverse transfer is added to the TargetChain. This is already done during the `ProcessMsgForTime` and
// `CheckPending` loops but those functions do not capture flow-cancelling when the node is restarted.
tokenEntry := gov.tokens[tk]
if tokenEntry != nil {
// Mandatory check to ensure that the token should be able to reduce the Governor limit.
if tokenEntry.flowCancels {
if destinationChainEntry, ok := gov.chains[xfer.TargetChain]; ok {
if err := destinationChainEntry.addFlowCancelTransferFromDbTransfer(xfer); err != nil {
gov.logger.Warn("could not add flow canceling transfer to destination chain",
zap.String("msgID", xfer.MsgID),
zap.String("hash", xfer.Hash), zap.Error(err),
)
return err
}
} else {
Expand Down
75 changes: 68 additions & 7 deletions node/pkg/governor/governor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,10 +360,12 @@ func TestFlowCancelCannotUnderflow(t *testing.T) {
assert.Zero(t, usage)
}

// Simulate a case where the total sum of transfers for a chain in a 24 hour period exceeds
// the configured Governor limit. This should never happen, so we make sure that an error
// is returned if the system is in this state
func TestInvariantGovernorLimit(t *testing.T) {
// We never expect this to occur when flow-cancelling is disabled. If flow-cancelling is enabled, there
// are some cases where the outgoing value exceeds the daily limit. Example: a large, incoming transfer
// of a flow-cancelling asset increases the Governor capacity beyond the daily limit. After 24h, that
// transfer is trimmed. This reduces the daily limit back to normal, but by this time more outgoing
// transfers have been emitted, causing the sum to exceed the daily limit.
func TestChainEntrySumExceedsDailyLimit(t *testing.T) {
ctx := context.Background()
gov, err := newChainGovernorForTest(ctx)
require.NoError(t, err)
Expand Down Expand Up @@ -406,10 +408,69 @@ func TestInvariantGovernorLimit(t *testing.T) {
assert.Equal(t, expectedNumTransfers, len(transfers))
assert.NotZero(t, sum)

// Make sure we trigger the Invariant
usage, err := gov.TrimAndSumValueForChain(emitter, now.Add(-time.Hour*24))
require.ErrorContains(t, err, "invariant violation: calculated sum")
assert.Zero(t, usage)
require.NoError(t, err)
assert.Equal(t, emitterTransferValue*uint64(expectedNumTransfers), usage)
}

func TestTrimAndSumValueOverflowErrors(t *testing.T) {
ctx := context.Background()
gov, err := newChainGovernorForTest(ctx)
require.NoError(t, err)
assert.NotNil(t, gov)

now, err := time.Parse("2006-Jan-02", "2024-Feb-19")
require.NoError(t, err)

var transfers_from_emitter []transfer
transferTime, err := time.Parse("2006-Jan-02", "2024-Feb-19")
require.NoError(t, err)

emitterChainId := vaa.ChainIDSolana

transfer, err := newTransferFromDbTransfer(&db.Transfer{Value: math.MaxInt64, Timestamp: transferTime})
require.NoError(t, err)
transfer2, err := newTransferFromDbTransfer(&db.Transfer{Value: 1, Timestamp: transferTime})
require.NoError(t, err)
transfers_from_emitter = append(transfers_from_emitter, transfer, transfer2)

// Populate chainEntry and ChainGovernor
emitter := &chainEntry{
transfers: transfers_from_emitter,
emitterChainId: vaa.ChainID(emitterChainId),
dailyLimit: 10000,
}
gov.chains[emitter.emitterChainId] = emitter

sum, _, err := gov.TrimAndSumValue(emitter.transfers, now.Add(-time.Hour*24))
require.ErrorContains(t, err, "integer overflow")
assert.Zero(t, sum)
usage, err := gov.TrimAndSumValueForChain(emitter, now.Add(-time.Hour*24))
require.ErrorContains(t, err, "integer overflow")
assert.Equal(t, uint64(10000), usage)

// overwrite emitter (discard transfer added above)
emitter = &chainEntry{
emitterChainId: vaa.ChainID(emitterChainId),
dailyLimit: 10000,
}
gov.chains[emitter.emitterChainId] = emitter

// Now test underflow
transfer3 := &db.Transfer{Value: math.MaxInt64, Timestamp: transferTime, TargetChain: vaa.ChainIDSolana}

ce := gov.chains[emitter.emitterChainId]
err = ce.addFlowCancelTransferFromDbTransfer(transfer3)
require.NoError(t, err)
err = ce.addFlowCancelTransferFromDbTransfer(transfer3)
require.NoError(t, err)

sum, _, err = gov.TrimAndSumValue(emitter.transfers, now.Add(-time.Hour*24))
require.ErrorContains(t, err, "integer underflow")
assert.Zero(t, sum)
usage, err = gov.TrimAndSumValueForChain(emitter, now.Add(-time.Hour*24))
require.ErrorContains(t, err, "integer underflow")
assert.Equal(t, uint64(10000), usage)
}

func TestTrimOneOfTwoTransfers(t *testing.T) {
Expand Down

0 comments on commit 013d79a

Please sign in to comment.