diff --git a/node/pkg/governor/governor.go b/node/pkg/governor/governor.go index 98b67e6fe4..685a297bfc 100644 --- a/node/pkg/governor/governor.go +++ b/node/pkg/governor/governor.go @@ -32,6 +32,7 @@ import ( "math" "math/big" "sort" + "strconv" "sync" "time" @@ -675,12 +676,13 @@ func (gov *ChainGovernor) parseMsgAlreadyLocked( return true, ce, token, payload, nil } +// CheckPending is a wrapper method for CheckPendingForTime. It is called by the processor with the purpose of releasing +// queued transfers. func (gov *ChainGovernor) CheckPending() ([]*common.MessagePublication, error) { return gov.CheckPendingForTime(time.Now()) } -// Iterates over all pending transfers for all of the chain entries configured for the Governor. -// If a pending message is ready to be released, modifies the chain entry's `pending` and `transfers` slices by +// CheckPendingForTime If a pending message is ready to be released, modifies the chain entry's `pending` and `transfers` slices by // moving a `dbTransfer` element from `pending` to `transfers`. Returns a slice of Messages that will be published. // A transfer is ready to be released when one of the following conditions holds: // - The 'release time' duration has passed since `now` (i.e. the transfer has been queued for 24 hours, regardless of @@ -688,6 +690,9 @@ func (gov *ChainGovernor) CheckPending() ([]*common.MessagePublication, error) { // - Within the release time duration, other transfers have been processed and have freed up outbound Governor capacity. // This happens either because other transfers get released after 24 hours or because incoming transfers of // flow-cancelling assets have freed up outbound capacity. +// +// WARNING: When this function returns an error, it propagates to the `processor` which in turn interprets this as a +// signal to RESTART THE NODE. Therefore, errors returned by this function effectively act as panics. func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessagePublication, error) { gov.mutex.Lock() defer gov.mutex.Unlock() @@ -703,16 +708,21 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP } // Iterate deterministically by accessing keys from this slice instead of the chainEntry map directly - for _, cid := range gov.chainIds { - ce := gov.chains[cid] + for _, chainId := range gov.chainIds { + ce := gov.chains[chainId] // Keep going as long as we find something that will fit. for { foundOne := false prevTotalValue, err := gov.TrimAndSumValueForChain(ce, startTime) if err != nil { gov.logger.Error("error when attempting to trim and sum transfers", zap.Error(err)) + gov.logger.Error("refusing to release transfers for this chain until the sum can be correctly calculated", + zap.Stringer("chainId", chainId), + zap.Uint64("prevTotalValue", prevTotalValue), + zap.Error(err)) gov.msgsToPublish = msgsToPublish - return nil, err + // Skip further processing for this chain entry + break } // Keep going until we find something that fits or hit the end. @@ -769,7 +779,8 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP zap.Uint64("value", value), zap.Uint64("prevTotalValue", prevTotalValue), zap.Uint64("newTotalValue", newTotalValue), - zap.String("msgID", pe.dbData.Msg.MessageIDString())) + zap.String("msgID", pe.dbData.Msg.MessageIDString()), + zap.String("flowCancels", strconv.FormatBool(pe.token.flowCancels))) } payload, err := vaa.DecodeTransferPayloadHdr(pe.dbData.Msg.Payload) @@ -781,7 +792,9 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP ) delete(gov.msgsSeen, pe.hash) // Rest of the clean up happens below. } else { - // If we get here, publish it and remove it from the pending list. + // If we get here, publish it and move it from the pending list to the + // transfers list. Also add a flow-cancel transfer to the destination chain + // if the transfer is sending a flow-canceling asset. msgsToPublish = append(msgsToPublish, &pe.dbData.Msg) if countsTowardsTransfers { @@ -797,17 +810,32 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP Hash: pe.hash, } - if err := gov.db.StoreTransfer(&dbTransfer); err != nil { - gov.msgsToPublish = msgsToPublish + transfer, err := newTransferFromDbTransfer(&dbTransfer) + if err != nil { + // Should never occur unless dbTransfer.Value overflows MaxInt64 + gov.logger.Error("could not convert dbTransfer to transfer", + zap.String("msgID", dbTransfer.MsgID), + zap.String("hash", pe.hash), + zap.Error(err), + ) + // This causes the process to die. We don't want to process transfers that + // have USD value in excess of MaxInt64 under any circumstances. + // This check should occur before the call to the database so + // that we don't store a problematic transfer. return nil, err } - transfer, err := newTransferFromDbTransfer(&dbTransfer) - if err != nil { + if err := gov.db.StoreTransfer(&dbTransfer); err != nil { + gov.msgsToPublish = msgsToPublish + // This causes the process to die. We can't tolerate DB connection + // errors. return nil, err } + ce.transfers = append(ce.transfers, transfer) + gov.msgsSeen[pe.hash] = transferComplete + // Add inverse transfer to destination chain entry if this asset can cancel flows. key := tokenKey{chain: pe.token.token.chain, addr: pe.token.token.addr} tokenEntry := gov.tokens[key] @@ -817,7 +845,13 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP if destinationChainEntry, ok := gov.chains[payload.TargetChain]; ok { if err := destinationChainEntry.addFlowCancelTransferFromDbTransfer(&dbTransfer); err != nil { - return nil, err + gov.logger.Warn("could not add flow canceling transfer to destination chain", + zap.String("msgID", dbTransfer.MsgID), + zap.String("hash", pe.hash), + zap.Error(err), + ) + // Process the next pending transfer + continue } } else { gov.logger.Warn("tried to cancel flow but chain entry for target chain does not exist", @@ -828,7 +862,6 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP } } } - gov.msgsSeen[pe.hash] = transferComplete } else { delete(gov.msgsSeen, pe.hash) } @@ -872,46 +905,42 @@ func computeValue(amount *big.Int, token *tokenEntry) (uint64, error) { return value, nil } -// TrimAndSumValueForChain calculates the `sum` of `Transfer`s for a given chain `emitter`. In effect, it represents a +// TrimAndSumValueForChain calculates the `sum` of `Transfer`s for a given chain `chainEntry`. In effect, it represents a // chain's "Governor Usage" for a given 24 hour period. // This sum may be reduced by the sum of 'flow cancelling' transfers: that is, transfers of an allow-listed token // that have the `emitter` as their destination chain. // The resulting `sum` return value therefore represents the net flow across a chain when taking flow-cancelling tokens // into account. Therefore, this value should never be less than 0 and should never exceed the "Governor limit" for the chain. -// As a side-effect, this function modifies the parameter `emitter`, updating its `transfers` field so that it only includes +// As a side-effect, this function modifies the parameter `chainEntry`, updating its `transfers` field so that it only includes // filtered `Transfer`s (i.e. outgoing `Transfer`s newer than `startTime`). +// Returns an error if the sum cannot be calculated. The transfers field will still be updated in this case. When +// an error condition occurs, this function returns the chain's `dailyLimit` as the sum. This should result in the +// chain appearing at maximum capacity from the perspective of the Governor, and therefore cause new transfers to be +// queued until space opens up. // SECURITY Invariant: The `sum` return value should never be less than 0 -// SECURITY Invariant: The `sum` return value should never exceed the "Governor limit" for the chain -func (gov *ChainGovernor) TrimAndSumValueForChain(emitter *chainEntry, startTime time.Time) (sum uint64, err error) { - // Sum the value of all outgoing transfers - var sumOutgoing int64 - sumOutgoing, emitter.transfers, err = gov.TrimAndSumValue(emitter.transfers, startTime) +func (gov *ChainGovernor) TrimAndSumValueForChain(chainEntry *chainEntry, startTime time.Time) (sum uint64, err error) { + // Sum the value of all transfers for this chain. This sum can be negative if flow-cancelling is enabled + // and the incoming value of flow-cancelling assets exceeds the summed value of all outgoing assets. + var sumValue int64 + sumValue, chainEntry.transfers, err = gov.TrimAndSumValue(chainEntry.transfers, startTime) if err != nil { - return 0, err + // Return the daily limit as the sum so that any further transfers will be queued. + return chainEntry.dailyLimit, err } - // Return early if the sum is not positive as it cannot exceed the daily limit. - // In this case, return 0 even if the sum is negative. - if sumOutgoing <= 0 { + // Return 0 even if the sum is negative. + if sumValue <= 0 { return 0, nil } - sum = uint64(sumOutgoing) - if sum > emitter.dailyLimit { - return 0, fmt.Errorf( - "invariant violation: calculated sum %d exceeds Governor limit %d", - sum, - emitter.dailyLimit, - ) - } - - return sum, nil + return uint64(sumValue), nil } // TrimAndSumValue iterates over a slice of transfer structs. It filters out transfers that have Timestamp values that // are earlier than the parameter `startTime`. The function then iterates over the remaining transfers, sums their Value, // and returns the sum and the filtered transfers. +// As a side-effect, this function deletes transfers from the database if their Timestamp is before `startTime`. // The `transfers` slice must be sorted by Timestamp. We expect this to be the case as transfers are added to the // Governor in chronological order as they arrive. Note that `Timestamp` is created by the Governor; it is not read // from the actual on-chain transaction. diff --git a/node/pkg/governor/governor_db.go b/node/pkg/governor/governor_db.go index a71fb194fa..9eb4f5b1de 100644 --- a/node/pkg/governor/governor_db.go +++ b/node/pkg/governor/governor_db.go @@ -243,15 +243,19 @@ func (gov *ChainGovernor) reloadTransfer(xfer *db.Transfer) error { } ce.transfers = append(ce.transfers, transfer) - // Reload flow-cancel transfers for the TargetChain. This is important when node restarts so that a corresponding, - // inverse transfer is added to the TargetChain. This is already done during the `ProcessMsgForTime` loop but - // that function does not capture flow-cancelling when the node is restarted. + // Reload flow-cancel transfers for the TargetChain. This is important when the node restarts so that a corresponding, + // inverse transfer is added to the TargetChain. This is already done during the `ProcessMsgForTime` and + // `CheckPending` loops but those functions do not capture flow-cancelling when the node is restarted. tokenEntry := gov.tokens[tk] if tokenEntry != nil { // Mandatory check to ensure that the token should be able to reduce the Governor limit. if tokenEntry.flowCancels { if destinationChainEntry, ok := gov.chains[xfer.TargetChain]; ok { if err := destinationChainEntry.addFlowCancelTransferFromDbTransfer(xfer); err != nil { + gov.logger.Warn("could not add flow canceling transfer to destination chain", + zap.String("msgID", xfer.MsgID), + zap.String("hash", xfer.Hash), zap.Error(err), + ) return err } } else { diff --git a/node/pkg/governor/governor_test.go b/node/pkg/governor/governor_test.go index 3f5d868f5e..1e4e56fe29 100644 --- a/node/pkg/governor/governor_test.go +++ b/node/pkg/governor/governor_test.go @@ -360,10 +360,12 @@ func TestFlowCancelCannotUnderflow(t *testing.T) { assert.Zero(t, usage) } -// Simulate a case where the total sum of transfers for a chain in a 24 hour period exceeds -// the configured Governor limit. This should never happen, so we make sure that an error -// is returned if the system is in this state -func TestInvariantGovernorLimit(t *testing.T) { +// We never expect this to occur when flow-cancelling is disabled. If flow-cancelling is enabled, there +// are some cases where the outgoing value exceeds the daily limit. Example: a large, incoming transfer +// of a flow-cancelling asset increases the Governor capacity beyond the daily limit. After 24h, that +// transfer is trimmed. This reduces the daily limit back to normal, but by this time more outgoing +// transfers have been emitted, causing the sum to exceed the daily limit. +func TestChainEntrySumExceedsDailyLimit(t *testing.T) { ctx := context.Background() gov, err := newChainGovernorForTest(ctx) require.NoError(t, err) @@ -406,10 +408,69 @@ func TestInvariantGovernorLimit(t *testing.T) { assert.Equal(t, expectedNumTransfers, len(transfers)) assert.NotZero(t, sum) - // Make sure we trigger the Invariant usage, err := gov.TrimAndSumValueForChain(emitter, now.Add(-time.Hour*24)) - require.ErrorContains(t, err, "invariant violation: calculated sum") - assert.Zero(t, usage) + require.NoError(t, err) + assert.Equal(t, emitterTransferValue*uint64(expectedNumTransfers), usage) +} + +func TestTrimAndSumValueOverflowErrors(t *testing.T) { + ctx := context.Background() + gov, err := newChainGovernorForTest(ctx) + require.NoError(t, err) + assert.NotNil(t, gov) + + now, err := time.Parse("2006-Jan-02", "2024-Feb-19") + require.NoError(t, err) + + var transfers_from_emitter []transfer + transferTime, err := time.Parse("2006-Jan-02", "2024-Feb-19") + require.NoError(t, err) + + emitterChainId := vaa.ChainIDSolana + + transfer, err := newTransferFromDbTransfer(&db.Transfer{Value: math.MaxInt64, Timestamp: transferTime}) + require.NoError(t, err) + transfer2, err := newTransferFromDbTransfer(&db.Transfer{Value: 1, Timestamp: transferTime}) + require.NoError(t, err) + transfers_from_emitter = append(transfers_from_emitter, transfer, transfer2) + + // Populate chainEntry and ChainGovernor + emitter := &chainEntry{ + transfers: transfers_from_emitter, + emitterChainId: vaa.ChainID(emitterChainId), + dailyLimit: 10000, + } + gov.chains[emitter.emitterChainId] = emitter + + sum, _, err := gov.TrimAndSumValue(emitter.transfers, now.Add(-time.Hour*24)) + require.ErrorContains(t, err, "integer overflow") + assert.Zero(t, sum) + usage, err := gov.TrimAndSumValueForChain(emitter, now.Add(-time.Hour*24)) + require.ErrorContains(t, err, "integer overflow") + assert.Equal(t, uint64(10000), usage) + + // overwrite emitter (discard transfer added above) + emitter = &chainEntry{ + emitterChainId: vaa.ChainID(emitterChainId), + dailyLimit: 10000, + } + gov.chains[emitter.emitterChainId] = emitter + + // Now test underflow + transfer3 := &db.Transfer{Value: math.MaxInt64, Timestamp: transferTime, TargetChain: vaa.ChainIDSolana} + + ce := gov.chains[emitter.emitterChainId] + err = ce.addFlowCancelTransferFromDbTransfer(transfer3) + require.NoError(t, err) + err = ce.addFlowCancelTransferFromDbTransfer(transfer3) + require.NoError(t, err) + + sum, _, err = gov.TrimAndSumValue(emitter.transfers, now.Add(-time.Hour*24)) + require.ErrorContains(t, err, "integer underflow") + assert.Zero(t, sum) + usage, err = gov.TrimAndSumValueForChain(emitter, now.Add(-time.Hour*24)) + require.ErrorContains(t, err, "integer underflow") + assert.Equal(t, uint64(10000), usage) } func TestTrimOneOfTwoTransfers(t *testing.T) {