diff --git a/node/pkg/governor/governor.go b/node/pkg/governor/governor.go index c1bf0efb8b..98b67e6fe4 100644 --- a/node/pkg/governor/governor.go +++ b/node/pkg/governor/governor.go @@ -133,6 +133,7 @@ func newTransferFromDbTransfer(dbTransfer *db.Transfer) (tx transfer, err error) // ensure that the Governor usage cannot be lowered due to malicious or invalid transfers. // - the Value must be negative (in order to represent an incoming value) // - the TargetChain must match the chain ID of the Chain Entry +// - the flow cancel value must always be less than the big transfer limit func (ce *chainEntry) addFlowCancelTransfer(transfer transfer) error { value := transfer.value targetChain := transfer.dbTransfer.TargetChain @@ -678,6 +679,15 @@ func (gov *ChainGovernor) CheckPending() ([]*common.MessagePublication, error) { return gov.CheckPendingForTime(time.Now()) } +// Iterates over all pending transfers for all of the chain entries configured for the Governor. +// If a pending message is ready to be released, modifies the chain entry's `pending` and `transfers` slices by +// moving a `dbTransfer` element from `pending` to `transfers`. Returns a slice of Messages that will be published. +// A transfer is ready to be released when one of the following conditions holds: +// - The 'release time' duration has passed since `now` (i.e. the transfer has been queued for 24 hours, regardless of +// the Governor's current capacity) +// - Within the release time duration, other transfers have been processed and have freed up outbound Governor capacity. +// This happens either because other transfers get released after 24 hours or because incoming transfers of +// flow-cancelling assets have freed up outbound capacity. 
func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessagePublication, error) { gov.mutex.Lock() defer gov.mutex.Unlock() @@ -799,12 +809,13 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP ce.transfers = append(ce.transfers, transfer) // Add inverse transfer to destination chain entry if this asset can cancel flows. - key := tokenKey{chain: dbTransfer.EmitterChain, addr: dbTransfer.EmitterAddress} + key := tokenKey{chain: pe.token.token.chain, addr: pe.token.token.addr} tokenEntry := gov.tokens[key] if tokenEntry != nil { // Mandatory check to ensure that the token should be able to reduce the Governor limit. if tokenEntry.flowCancels { if destinationChainEntry, ok := gov.chains[payload.TargetChain]; ok { + if err := destinationChainEntry.addFlowCancelTransferFromDbTransfer(&dbTransfer); err != nil { return nil, err } @@ -973,7 +984,8 @@ func CheckedAddUint64(x uint64, y uint64) (uint64, error) { return sum, nil } -// CheckedAddInt64 adds two uint64 values with overflow checks +// CheckedAddInt64 adds two int64 values with overflow checks. Returns an error if the calculation would +// overflow or underflow. In this case, the returned value is 0. func CheckedAddInt64(x int64, y int64) (int64, error) { if x == 0 { return y, nil diff --git a/node/pkg/governor/governor_monitoring.go b/node/pkg/governor/governor_monitoring.go index 6c7fb0b5b4..00458a0bf0 100644 --- a/node/pkg/governor/governor_monitoring.go +++ b/node/pkg/governor/governor_monitoring.go @@ -248,6 +248,8 @@ func (gov *ChainGovernor) resetReleaseTimerForTime(vaaId string, now time.Time, } // sumValue sums the value of all `transfers`. See also `TrimAndSumValue`. +// Returns an error if the sum of all transfers would overflow the bounds of Int64. In this case, the function +// returns a value of 0.
func sumValue(transfers []transfer, startTime time.Time) (uint64, error) { if len(transfers) == 0 { return 0, nil @@ -277,18 +279,20 @@ func sumValue(transfers []transfer, startTime time.Time) (uint64, error) { return uint64(sum), nil } -// REST query to get the current available notional value per chain. +// REST query to get the current available notional value per chain. This is defined as the sum of all transfers +// subtracted from the chain's dailyLimit. func (gov *ChainGovernor) GetAvailableNotionalByChain() (resp []*publicrpcv1.GovernorGetAvailableNotionalByChainResponse_Entry) { gov.mutex.Lock() defer gov.mutex.Unlock() startTime := time.Now().Add(-time.Minute * time.Duration(gov.dayLengthInMinutes)) // Iterate deterministically by accessing keys from this slice instead of the chainEntry map directly - for _, cid := range gov.chainIds { - ce := gov.chains[cid] + for _, chainId := range gov.chainIds { + ce := gov.chains[chainId] value, err := sumValue(ce.transfers, startTime) if err != nil { - // Don't return an error here, just return 0 + // Don't return an error here, just return an empty response. + gov.logger.Error("GetAvailableNotionalByChain: failed to compute sum of transfers for chain entry", zap.String("chainID", chainId.String()), zap.Error(err)) return make([]*publicrpcv1.GovernorGetAvailableNotionalByChainResponse_Entry, 0) } if value >= ce.dailyLimit { @@ -448,6 +452,7 @@ func (gov *ChainGovernor) CollectMetrics(hb *gossipv1.Heartbeat, sendC chan<- [] if err != nil { // Error can occur if the sum overflows. Return 0 in this case rather than returning an // error.
+ gov.logger.Error("CollectMetrics: failed to compute sum of transfers for chain entry", zap.String("chain", chain.String()), zap.Error(err)) value = 0 } if value >= ce.dailyLimit { @@ -556,16 +561,20 @@ func (gov *ChainGovernor) publishConfig(hb *gossipv1.Heartbeat, sendC chan<- []b func (gov *ChainGovernor) publishStatus(hb *gossipv1.Heartbeat, sendC chan<- []byte, startTime time.Time, gk *ecdsa.PrivateKey, ourAddr ethCommon.Address) { chains := make([]*gossipv1.ChainGovernorStatus_Chain, 0) numEnqueued := 0 - for _, ce := range gov.chains { - value, err := sumValue(ce.transfers, startTime) + for chainId, ce := range gov.chains { + // The capacity for the chain to emit further messages, denoted as USD value. + remainingAvailableNotional := uint64(0) + // A chain's governor usage is the sum of all outgoing transfers and incoming flow-cancelling transfers + governorUsage, err := sumValue(ce.transfers, startTime) - if err != nil || value >= ce.dailyLimit { - // In case of error, set value to 0 rather than returning an error to the caller. An error + if err != nil { + // In case of error, set remainingAvailableNotional to 0 rather than returning an error to the caller. An error // here means sumValue has encountered an overflow and this should never happen. Even if it did // we don't want to stop execution here. - value = 0 - } else { - value = ce.dailyLimit - value + gov.logger.Error("publishStatus: failed to compute sum of transfers for chain entry", zap.String("chain", chainId.String()), zap.Error(err)) + } else if governorUsage < ce.dailyLimit { + // `remainingAvailableNotional` is 0 unless the current usage is strictly less than the limit. 
+ remainingAvailableNotional = ce.dailyLimit - governorUsage } enqueuedVaas := make([]*gossipv1.ChainGovernorStatus_EnqueuedVAA, 0) @@ -595,7 +604,7 @@ func (gov *ChainGovernor) publishStatus(hb *gossipv1.Heartbeat, sendC chan<- []b chains = append(chains, &gossipv1.ChainGovernorStatus_Chain{ ChainId: uint32(ce.emitterChainId), - RemainingAvailableNotional: value, + RemainingAvailableNotional: remainingAvailableNotional, Emitters: []*gossipv1.ChainGovernorStatus_Emitter{&emitter}, }) } diff --git a/node/pkg/governor/governor_test.go b/node/pkg/governor/governor_test.go index 7229718a2f..3f5d868f5e 100644 --- a/node/pkg/governor/governor_test.go +++ b/node/pkg/governor/governor_test.go @@ -1544,6 +1544,269 @@ func TestPendingTransferBeingReleased(t *testing.T) { assert.Equal(t, 3, len(gov.msgsSeen)) } +// Test that, when a small transfer (under the 'big tx limit') of a flow-cancelling asset is queued and +// later released, it causes a reduction in the Governor usage for the destination chain. +func TestPendingTransferFlowCancelsWhenReleased(t *testing.T) { + + ctx := context.Background() + gov, err := newChainGovernorForTest(ctx) + + require.NoError(t, err) + assert.NotNil(t, gov) + + // Set-up time + gov.setDayLengthInMinutes(24 * 60) + transferTime := time.Unix(int64(1654543099), 0) + + // Solana USDC used as the flow cancelling asset. 
This ensures that the flow cancel mechanism works + // when the Origin chain of the asset does not match the emitter chain + // NOTE: Replace this Chain:Address pair if the Flow Cancel Token List is modified + var flowCancelTokenOriginAddress vaa.Address + flowCancelTokenOriginAddress, err = vaa.StringToAddress("c6fa7af3bedbad3a3d65f36aabc97431b1bbe4c2d2f6e0e47ca60203452f5d61") + require.NoError(t, err) + + require.NoError(t, err) + + // Data for Ethereum + tokenBridgeAddrStrEthereum := "0x0290fb167208af455bb137780163b7b7a9a10c16" //nolint:gosec + tokenBridgeAddrEthereum, err := vaa.StringToAddress(tokenBridgeAddrStrEthereum) + require.NoError(t, err) + recipientEthereum := "0x707f9118e33a9b8998bea41dd0d46f38bb963fc8" //nolint:gosec + + // Data for Sui + tokenBridgeAddrStrSui := "0xc57508ee0d4595e5a8728974a4a93a787d38f339757230d441e895422c07aba9" //nolint:gosec + tokenBridgeAddrSui, err := vaa.StringToAddress(tokenBridgeAddrStrSui) + require.NoError(t, err) + recipientSui := "0x84a5f374d29fc77e370014dce4fd6a55b58ad608de8074b0be5571701724da31" + + // Data for Solana. Only used to represent the flow cancel asset. 
+ // "wormDTUJ6AWPNvk59vGQbDvGJmqbDTdgWgAqcLBCgUb" + tokenBridgeAddrStrSolana := "0x0e0a589e6488147a94dcfa592b90fdd41152bb2ca77bf6016758a6f4df9d21b4" //nolint:gosec + + // Add chain entries to `gov` + dailyLimit := uint64(10000) + err = gov.setChainForTesting(vaa.ChainIDEthereum, tokenBridgeAddrStrEthereum, dailyLimit, 0) + require.NoError(t, err) + err = gov.setChainForTesting(vaa.ChainIDSui, tokenBridgeAddrStrSui, dailyLimit, 0) + require.NoError(t, err) + err = gov.setChainForTesting(vaa.ChainIDSolana, tokenBridgeAddrStrSolana, dailyLimit, 0) + require.NoError(t, err) + + // Add flow cancel asset and non-flow cancelable asset to the token entry for `gov` + err = gov.setTokenForTesting(vaa.ChainIDSolana, flowCancelTokenOriginAddress.String(), "USDC", 1.0, true) + require.NoError(t, err) + assert.NotNil(t, gov.tokens[tokenKey{chain: vaa.ChainIDSolana, addr: flowCancelTokenOriginAddress}]) + + // First message: consume most of the dailyLimit for the emitter chain + msg1 := common.MessagePublication{ + TxHash: hashFromString("0x888888f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a8888"), + Timestamp: time.Unix(int64(transferTime.Unix()+1), 0), + Nonce: uint32(1), + Sequence: uint64(1), + EmitterChain: vaa.ChainIDEthereum, + EmitterAddress: tokenBridgeAddrEthereum, + ConsistencyLevel: uint8(32), + Payload: buildMockTransferPayloadBytes(1, + vaa.ChainIDSolana, // The origin asset for the token being transferred + flowCancelTokenOriginAddress.String(), + vaa.ChainIDSui, + recipientSui, + 10000, + ), + } + + // Second message: This transfer gets queued because the limit is exhausted + msg2 := common.MessagePublication{ + TxHash: hashFromString("0x888888f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a8888"), + Timestamp: time.Unix(int64(transferTime.Unix()+2), 0), + Nonce: uint32(2), + Sequence: uint64(2), + EmitterChain: vaa.ChainIDEthereum, + EmitterAddress: tokenBridgeAddrEthereum, + ConsistencyLevel: uint8(32), + Payload: buildMockTransferPayloadBytes(1, 
+ vaa.ChainIDSolana, + flowCancelTokenOriginAddress.String(), + vaa.ChainIDSui, + recipientSui, + 500, + ), + } + + // Third message: Incoming flow cancelling transfer to the emitter chain for the previous messages. This + // reduces the Governor usage for that chain. + msg3 := common.MessagePublication{ + TxHash: hashFromString("0x888888f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a8888"), + Timestamp: time.Unix(int64(transferTime.Unix()+3), 0), + Nonce: uint32(3), + Sequence: uint64(3), + EmitterChain: vaa.ChainIDSui, + EmitterAddress: tokenBridgeAddrSui, + ConsistencyLevel: uint8(0), // Sui has a consistency level of 0 (instant) + Payload: buildMockTransferPayloadBytes(1, + vaa.ChainIDSolana, + flowCancelTokenOriginAddress.String(), + vaa.ChainIDEthereum, + recipientEthereum, + 1000, + ), + } + + // Stage 0: No transfers sent + chainEntryEthereum, exists := gov.chains[vaa.ChainIDEthereum] + assert.True(t, exists) + assert.NotNil(t, chainEntryEthereum) + chainEntrySui, exists := gov.chains[vaa.ChainIDSui] + assert.True(t, exists) + assert.NotNil(t, chainEntrySui) + sumEth, ethTransfers, err := gov.TrimAndSumValue(chainEntryEthereum.transfers, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Zero(t, len(ethTransfers)) + assert.Zero(t, len(chainEntryEthereum.pending)) + assert.Zero(t, sumEth) + require.NoError(t, err) + sumSui, suiTransfers, err := gov.TrimAndSumValue(chainEntrySui.transfers, time.Unix(int64(1654543099), 0)) + assert.Zero(t, len(suiTransfers)) + assert.Zero(t, sumSui) + require.NoError(t, err) + + // Perform a FIRST transfer (Ethereum --> Sui) + result, err := gov.ProcessMsgForTime(&msg1, time.Now()) + assert.True(t, result) + require.NoError(t, err) + + numTrans, netValueTrans, numPending, valuePending := gov.getStatsForAllChainsCancelFlow() + assert.Equal(t, 2, numTrans) // One for the positive and one for the negative + assert.Equal(t, int64(0), netValueTrans) // Zero, because the asset flow cancels + assert.Equal(t, 0, 
numPending) + assert.Equal(t, uint64(0), valuePending) + assert.Equal(t, 1, len(gov.msgsSeen)) + + // Check the state of the governor + chainEntryEthereum = gov.chains[vaa.ChainIDEthereum] + chainEntrySui = gov.chains[vaa.ChainIDSui] + assert.Equal(t, int(1), len(chainEntryEthereum.transfers)) + assert.Equal(t, int(0), len(chainEntryEthereum.pending)) // Nothing has been queued yet + assert.Equal(t, int(1), len(chainEntrySui.transfers)) + sumEth, ethTransfers, err = gov.TrimAndSumValue(chainEntryEthereum.transfers, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Equal(t, int64(10000), sumEth) // Equal to total dailyLimit + assert.Equal(t, int(1), len(ethTransfers)) + require.NoError(t, err) + + // Outbound check: + // - ensure that the sum of the transfers is equal to the value of the inverse transfer + // - ensure the actual governor usage is Zero (any negative value is converted to zero by TrimAndSumValueForChain) + sumSui, suiTransfers, err = gov.TrimAndSumValue(chainEntrySui.transfers, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Equal(t, 1, len(suiTransfers)) // A single NEGATIVE transfer + assert.Equal(t, int64(-10000), sumSui) // Ensure the inverse (negative) transfer is in the Sui chain Entry + require.NoError(t, err) + suiGovernorUsage, err := gov.TrimAndSumValueForChain(chainEntrySui, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Zero(t, suiGovernorUsage) // Actual governor usage must not be negative. + require.NoError(t, err) + + // Perform a SECOND transfer (Ethereum --> Sui again) + // When a transfer is queued, ProcessMsgForTime should return false.
+ result, err = gov.ProcessMsgForTime(&msg2, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.False(t, result) + require.NoError(t, err) + + // Stage 2: Transfer sent from Ethereum to Sui gets queued + numTrans, netValueTrans, numPending, valuePending = gov.getStatsForAllChainsCancelFlow() + assert.Equal(t, 2, len(gov.msgsSeen)) // Two messages observed + assert.Equal(t, 2, numTrans) // Two transfers (same as previous step) + assert.Equal(t, int64(0), netValueTrans) // The transfer and its inverse cancel each other out. + assert.Equal(t, 1, numPending) // Second transfer is queued because the limit is exhausted + assert.Equal(t, uint64(500), valuePending) + + // Check the state of the governor. + chainEntryEthereum = gov.chains[vaa.ChainIDEthereum] + chainEntrySui = gov.chains[vaa.ChainIDSui] + assert.Equal(t, int(1), len(chainEntryEthereum.transfers)) // One from previous step + assert.Equal(t, int(1), len(chainEntryEthereum.pending)) // The second transfer is queued + assert.Equal(t, int(1), len(chainEntrySui.transfers)) // One inverse transfer. Inverse from pending not added yet + sumEth, ethTransfers, err = gov.TrimAndSumValue(chainEntryEthereum.transfers, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Equal(t, int64(10000), sumEth) // Same as before: full dailyLimit + assert.Equal(t, int(1), len(ethTransfers)) // Same as before + require.NoError(t, err) + sumSui, suiTransfers, err = gov.TrimAndSumValue(chainEntrySui.transfers, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Equal(t, int(1), len(suiTransfers)) // just the inverse from before + assert.Equal(t, int64(-10000), sumSui) // Unchanged. + require.NoError(t, err) + suiGovernorUsage, err = gov.TrimAndSumValueForChain(chainEntrySui, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Zero(t, suiGovernorUsage) // Actual governor usage must not be negative.
+ require.NoError(t, err) + + // Stage 3: Message that reduces Governor usage for Ethereum (Sui --> Ethereum) + result, err = gov.ProcessMsgForTime(&msg3, time.Now()) + assert.True(t, result) + require.NoError(t, err) + + // Stage 3: Governor usage reduced on Ethereum due to incoming from Sui + numTrans, netValueTrans, numPending, valuePending = gov.getStatsForAllChainsCancelFlow() + assert.Equal(t, 3, len(gov.msgsSeen)) + assert.Equal(t, 4, numTrans) // Two transfers and their inverses + assert.Equal(t, int64(0), netValueTrans) // Still zero because everything flow cancels + assert.Equal(t, 1, numPending) // Not released yet + assert.Equal(t, uint64(500), valuePending) + + // Check the state of the governor + chainEntryEthereum = gov.chains[vaa.ChainIDEthereum] + chainEntrySui = gov.chains[vaa.ChainIDSui] + assert.Equal(t, int(2), len(chainEntryEthereum.transfers)) + assert.Equal(t, int(1), len(chainEntryEthereum.pending)) // We have not yet released the pending transfer + assert.Equal(t, int(2), len(chainEntrySui.transfers)) + sumEth, ethTransfers, err = gov.TrimAndSumValue(chainEntryEthereum.transfers, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Equal(t, int64(9000), sumEth) // We freed up room because of Sui incoming + assert.Equal(t, int(2), len(ethTransfers)) // Two transfers cancel each other out + require.NoError(t, err) + sumSui, suiTransfers, err = gov.TrimAndSumValue(chainEntrySui.transfers, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Equal(t, int(2), len(suiTransfers)) + assert.Equal(t, int64(-9000), sumSui) // We consumed some outbound capacity + require.NoError(t, err) + suiGovernorUsage, err = gov.TrimAndSumValueForChain(chainEntrySui, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Equal(t, uint64(0), suiGovernorUsage) // Still zero because it's still negative + require.NoError(t, err) + + // Stage 4: Release the pending transfer. 
We deliberately do not advance the time here because we are relying + // on the pending transfer being released as a result of flow-cancelling and not because 24 hours have passed. + // NOTE that even though the function says "Checked..." it modifies `gov` as a side-effect when a pending + // transfer is ready to be released + toBePublished, err := gov.CheckPendingForTime(time.Unix(int64(transferTime.Unix()-1000), 0)) + require.NoError(t, err) + assert.Equal(t, 1, len(toBePublished)) + + // Stage 4: Pending transfer released. This increases the Ethereum Governor usage again and reduces Sui. + numTrans, netValueTrans, numPending, valuePending = gov.getStatsForAllChainsCancelFlow() + assert.Equal(t, 3, len(gov.msgsSeen)) + assert.Equal(t, 6, numTrans) // Two new transfers created from previous pending transfer + assert.Equal(t, int64(0), netValueTrans) // Still zero because everything flow cancels + assert.Equal(t, 0, numPending) // Pending transfer has been released + assert.Equal(t, uint64(0), valuePending) + + // Verify the stats that are non flow-cancelling. + // In practice this is the sum of the absolute value of all the transfers, including the inverses. 
+ // 2 * (10000 + 1000 + 500) = 23000 + _, absValueTrans, _, _ := gov.getStatsForAllChains() + assert.Equal(t, uint64(23000), absValueTrans) + + // Check the state of the governor + chainEntryEthereum = gov.chains[vaa.ChainIDEthereum] + chainEntrySui = gov.chains[vaa.ChainIDSui] + assert.Equal(t, int(3), len(chainEntryEthereum.transfers)) // Two outbound, one inverse from Sui + assert.Equal(t, int(0), len(chainEntryEthereum.pending)) // Released + assert.Equal(t, int(3), len(chainEntrySui.transfers)) // One outbound, two inverses from Ethereum + sumEth, ethTransfers, err = gov.TrimAndSumValue(chainEntryEthereum.transfers, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Equal(t, int64(9500), sumEth) + assert.Equal(t, int(3), len(ethTransfers)) + require.NoError(t, err) + sumSui, suiTransfers, err = gov.TrimAndSumValue(chainEntrySui.transfers, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Equal(t, int(3), len(suiTransfers)) // New inverse transfer added after pending transfer was released + assert.Equal(t, int64(-9500), sumSui) // Flow-cancelling inverse transfer added to Sui after released + require.NoError(t, err) + suiGovernorUsage, err = gov.TrimAndSumValueForChain(chainEntrySui, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Equal(t, uint64(0), suiGovernorUsage) // Still zero + require.NoError(t, err) +} + func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) { ctx := context.Background() gov, err := newChainGovernorForTest(ctx)