Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

node: Fix issue where transfers that were loaded from the DB did not add a flow-cancel transfer on the TargetChain #4002

Merged
merged 5 commits into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions node/pkg/governor/governor_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ func (gov *ChainGovernor) loadFromDB() error {
return gov.loadFromDBAlreadyLocked()
}

// loadFromDBAlreadyLocked method loads transfers and pending data from the database and modifies the corresponding fields in the ChainGovernor.
// These fields are slices of transfers or pendingTransfers and will be sorted by their Timestamp property.
// Modifies the state of the database as a side-effect: 'transfers' that are older than 24 hours are deleted.
func (gov *ChainGovernor) loadFromDBAlreadyLocked() error {
xfers, pending, err := gov.db.GetChainGovernorData(gov.logger)
if err != nil {
Expand Down Expand Up @@ -154,10 +157,16 @@ func (gov *ChainGovernor) reloadPendingTransfer(pending *db.PendingTransfer) {
zap.String("Hash", hash),
)

// Note: no flow cancel added here. We only want to add an inverse, flow-cancel transfer when the transfer is
// released from the pending queue, not when it's added.
ce.pending = append(ce.pending, &pendingEntry{token: token, amount: payload.Amount, hash: hash, dbData: *pending})
gov.msgsSeen[hash] = transferEnqueued
}

// reloadTransfer method processes a db.Transfer and validates that it should be loaded into `gov`.
// Modifies `gov` as a side-effect: when a valid transfer is loaded, the properties 'transfers' and 'msgsSeen' are
// updated with information about the loaded transfer. In the case of a loading a transfer of a flow-canceling asset,
johnsaigle marked this conversation as resolved.
Show resolved Hide resolved
// both chain entries (emitter and target) will be updated.
func (gov *ChainGovernor) reloadTransfer(xfer *db.Transfer) error {
ce, exists := gov.chains[xfer.EmitterChain]
if !exists {
Expand Down Expand Up @@ -233,5 +242,27 @@ func (gov *ChainGovernor) reloadTransfer(xfer *db.Transfer) error {
return err
}
ce.transfers = append(ce.transfers, transfer)

// Reload flow-cancel transfers for the TargetChain. This is important when node restarts so that a corresponding,
// inverse transfer is added to the TargetChain. This is already done during the `ProcessMsgForTime` loop but
// that function does not capture flow-cancelling when the node is restarted.
tokenEntry := gov.tokens[tk]
if tokenEntry != nil {
johnsaigle marked this conversation as resolved.
Show resolved Hide resolved
// Mandatory check to ensure that the token should be able to reduce the Governor limit.
if tokenEntry.flowCancels {
if destinationChainEntry, ok := gov.chains[xfer.TargetChain]; ok {
if err := destinationChainEntry.addFlowCancelTransferFromDbTransfer(xfer); err != nil {
return err
}
} else {
gov.logger.Warn("tried to cancel flow but chain entry for target chain does not exist",
zap.String("msgID", xfer.MsgID),
zap.Stringer("token chain", xfer.OriginChain),
zap.Stringer("token address", xfer.OriginAddress),
zap.Stringer("target chain", xfer.TargetChain),
)
}
}
}
return nil
}
199 changes: 199 additions & 0 deletions node/pkg/governor/governor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2337,6 +2337,205 @@ func TestDontReloadDuplicates(t *testing.T) {
assert.Equal(t, uint64(4436), valuePending)
}

// With the addition of the flow-cancel feature, it's possible to in a way "exceed the daily limit" of outflow from a
// Governor as long as there is corresponding inflow of a flow-canceling asset to allow for additional outflow.
// When the node is restarted, it reloads all transfers and pending transfers. If the actual outflow is greater than
// the daily limit (due to flow cancel) ensure that the calculated limit on start-up is correct.
// This test ensures that governor usage limits are correctly calculated when reloading transfers from the database.
func TestReloadTransfersNearCapacity(t *testing.T) {
// Setup
ctx := context.Background()
gov, err := newChainGovernorForTest(ctx)

require.NoError(t, err)
assert.NotNil(t, gov)

// Set-up time
gov.setDayLengthInMinutes(24 * 60)
transferTime := time.Unix(int64(1654543099), 0)
johnsaigle marked this conversation as resolved.
Show resolved Hide resolved

// Solana USDC used as the flow cancelling asset. This ensures that the flow cancel mechanism works
// when the Origin chain of the asset does not match the emitter chain
// NOTE: Replace this Chain:Address pair if the Flow Cancel Token List is modified
var flowCancelTokenOriginAddress vaa.Address
flowCancelTokenOriginAddress, err = vaa.StringToAddress("c6fa7af3bedbad3a3d65f36aabc97431b1bbe4c2d2f6e0e47ca60203452f5d61")
require.NoError(t, err)

var notFlowCancelTokenOriginAddress vaa.Address
notFlowCancelTokenOriginAddress, err = vaa.StringToAddress("77777af3bedbad3a3d65f36aabc97431b1bbe4c2d2f6e0e47ca60203452f7777")
require.NoError(t, err)

// Data for Ethereum
tokenBridgeAddrStrEthereum := "0x0290fb167208af455bb137780163b7b7a9a10c16" //nolint:gosec
tokenBridgeAddrEthereum, err := vaa.StringToAddress(tokenBridgeAddrStrEthereum)
require.NoError(t, err)
// recipientEthereum := "0x707f9118e33a9b8998bea41dd0d46f38bb963fc8" //nolint:gosec

// Data for Sui
tokenBridgeAddrStrSui := "0xc57508ee0d4595e5a8728974a4a93a787d38f339757230d441e895422c07aba9" //nolint:gosec
tokenBridgeAddrSui, err := vaa.StringToAddress(tokenBridgeAddrStrSui)
require.NoError(t, err)
// recipientSui := "0x84a5f374d29fc77e370014dce4fd6a55b58ad608de8074b0be5571701724da31"
johnsaigle marked this conversation as resolved.
Show resolved Hide resolved

// Data for Solana. Only used to represent the flow cancel asset.
// "wormDTUJ6AWPNvk59vGQbDvGJmqbDTdgWgAqcLBCgUb"
tokenBridgeAddrStrSolana := "0x0e0a589e6488147a94dcfa592b90fdd41152bb2ca77bf6016758a6f4df9d21b4" //nolint:gosec

// Add chain entries to `gov`
err = gov.setChainForTesting(vaa.ChainIDEthereum, tokenBridgeAddrStrEthereum, 10000, 50000)
require.NoError(t, err)
err = gov.setChainForTesting(vaa.ChainIDSui, tokenBridgeAddrStrSui, 10000, 0)
require.NoError(t, err)
err = gov.setChainForTesting(vaa.ChainIDSolana, tokenBridgeAddrStrSolana, 10000, 0)
require.NoError(t, err)

// Add flow cancel asset and non-flow cancelable asset to the token entry for `gov`
err = gov.setTokenForTesting(vaa.ChainIDSolana, flowCancelTokenOriginAddress.String(), "USDC", 1.0, true)
require.NoError(t, err)
assert.NotNil(t, gov.tokens[tokenKey{chain: vaa.ChainIDSolana, addr: flowCancelTokenOriginAddress}])
err = gov.setTokenForTesting(vaa.ChainIDEthereum, notFlowCancelTokenOriginAddress.String(), "NOTCANCELABLE", 1.0, false)
require.NoError(t, err)

// This transfer should exhaust the dailyLimit for the emitter chain
xfer1 := &db.Transfer{
Timestamp: transferTime,
Value: uint64(10000),
OriginChain: vaa.ChainIDSolana,
OriginAddress: flowCancelTokenOriginAddress,
EmitterChain: vaa.ChainIDEthereum,
EmitterAddress: tokenBridgeAddrEthereum,
TargetAddress: tokenBridgeAddrSui,
TargetChain: vaa.ChainIDSui,
MsgID: "2/" + tokenBridgeAddrEthereum.String() + "/125",
Hash: "Hash1",
}

// This incoming transfer should free up some of the space on the previous emitter chain
xfer2 := &db.Transfer{
Timestamp: transferTime.Add(1),
Value: uint64(2000),
OriginChain: vaa.ChainIDSolana,
OriginAddress: flowCancelTokenOriginAddress,
EmitterChain: vaa.ChainIDSui,
EmitterAddress: tokenBridgeAddrSui,
TargetAddress: tokenBridgeAddrEthereum,
TargetChain: vaa.ChainIDEthereum,
MsgID: "2/" + tokenBridgeAddrSui.String() + "/126",
Hash: "Hash2",
}

// Send another transfer out from the original emitter chain so that we "exceed the daily limit" if flow
// cancel is not applied
xfer3 := &db.Transfer{
Timestamp: transferTime.Add(2),
Value: uint64(50),
OriginChain: vaa.ChainIDSolana,
OriginAddress: flowCancelTokenOriginAddress,
EmitterChain: vaa.ChainIDEthereum,
EmitterAddress: tokenBridgeAddrEthereum,
TargetAddress: tokenBridgeAddrSui,
TargetChain: vaa.ChainIDSui,
MsgID: "2/" + tokenBridgeAddrEthereum.String() + "/125",
Hash: "Hash3",
}

// Simulate reloading from the database.
// NOTE: The actual execution path we want to test is the following and runs when the node is restarted:
// gov.Run () --> gov.loadFromDb() --> gov.loadFromDBAlreadyLocked() --> gov.reloadTransfer()
// We don't have access to Run() from the test suite and the other functions are mocked to return `nil`.
// Therefore, the remainder of this test proceeds by operating on a list of `transfersLoadedFromDb` which
// simulates loading transfers from the database.
// From here we proceed with the next function we can actually test: `reloadTransfer()`.

// STEP 0: Initial state
assert.Equal(t, len(gov.msgsSeen), 0)
numTrans, netValueTransferred, numPending, valuePending := gov.getStatsForAllChainsCancelFlow()
assert.Equal(t, 0, numTrans)
assert.Equal(t, int64(0), netValueTransferred)
assert.Equal(t, 0, numPending)
assert.Equal(t, uint64(0), valuePending)

chainEntryEth, exists := gov.chains[vaa.ChainIDEthereum]
require.True(t, exists)
chainEntrySui, exists := gov.chains[vaa.ChainIDSui]
require.True(t, exists)

// STEP 1: Load first transfer
err = gov.reloadTransfer(xfer1)
require.NoError(t, err)
assert.Equal(t, len(gov.msgsSeen), 1)
numTrans, netValueTransferred, _, _ = gov.getStatsForAllChainsCancelFlow()
assert.Equal(t, 2, numTrans) // 1 plus transfer the inverse flow transfer on the TargetChain
assert.Equal(t, int64(0), netValueTransferred) // Value cancels out for all transfers

// Sum of absolute value of all transfers, including inverse flow cancel transfers:
// 2 * (10_000) = 20_000
_, valueTransferred, _, _ := gov.getStatsForAllChains()
assert.Equal(t, uint64(20000), valueTransferred)

governorUsageEth, err := gov.TrimAndSumValueForChain(chainEntryEth, time.Unix(int64(transferTime.Unix()-1000), 0))
assert.Equal(t, uint64(10000), governorUsageEth)
assert.Zero(t, governorUsageEth-chainEntryEth.dailyLimit) // Make sure we used the whole capacity
require.NoError(t, err)
governorUsageSui, err := gov.TrimAndSumValueForChain(chainEntrySui, time.Unix(int64(transferTime.Unix()-1000), 0))
bruce-riley marked this conversation as resolved.
Show resolved Hide resolved
assert.Zero(t, governorUsageSui)
require.NoError(t, err)
sumTransfersSui, _, err := gov.TrimAndSumValue(chainEntrySui.transfers, time.Unix(int64(transferTime.Unix()-1000), 0))
assert.Equal(t, int64(-10000), sumTransfersSui)
require.NoError(t, err)

// STEP 2: Load second transfer
err = gov.reloadTransfer(xfer2)
require.NoError(t, err)
assert.Equal(t, len(gov.msgsSeen), 2)
numTrans, netValueTransferred, _, _ = gov.getStatsForAllChainsCancelFlow()
assert.Equal(t, 4, numTrans) // 2 transfers and their inverse flow transfers on the TargetChain
assert.Equal(t, int64(0), netValueTransferred) // Value cancels out for all transfers

// Sum of absolute value of all transfers, including inverse flow cancel transfers:
// 2 * (10_000 + 2_000) = 24_000
_, valueTransferred, _, _ = gov.getStatsForAllChains()
assert.Equal(t, uint64(24000), valueTransferred)

governorUsageEth, err = gov.TrimAndSumValueForChain(chainEntryEth, time.Unix(int64(transferTime.Unix()-1000), 0))
assert.Equal(t, uint64(8000), governorUsageEth)
assert.Equal(t, int(chainEntryEth.dailyLimit-governorUsageEth), 2000) // Remaining capacity
require.NoError(t, err)
governorUsageSui, err = gov.TrimAndSumValueForChain(chainEntrySui, time.Unix(int64(transferTime.Unix()-1000), 0))
assert.Zero(t, governorUsageSui)
require.NoError(t, err)
sumTransfersSui, _, err = gov.TrimAndSumValue(chainEntrySui.transfers, time.Unix(int64(transferTime.Unix()-1000), 0))
assert.Equal(t, int64(-8000), sumTransfersSui)
require.NoError(t, err)

// STEP 3: Load third transfer
err = gov.reloadTransfer(xfer3)
require.NoError(t, err)
// Sum of absolute value of all transfers, including inverse flow cancel transfers:
// 2 * (10_000 + 2_000 + 50) = 24_100
_, valueTransferred, _, _ = gov.getStatsForAllChains()
assert.Equal(t, uint64(24100), valueTransferred)

numTrans, netValueTransferred, numPending, valuePending = gov.getStatsForAllChainsCancelFlow()
assert.Equal(t, 6, numTrans) // 3 transfers and their inverse flow transfers on the TargetChain
assert.Equal(t, int64(0), netValueTransferred) // Value cancels out for all transfers

governorUsageEth, err = gov.TrimAndSumValueForChain(chainEntryEth, time.Unix(int64(transferTime.Unix()-1000), 0))
assert.Equal(t, uint64(8050), governorUsageEth)
assert.Equal(t, int(chainEntryEth.dailyLimit-governorUsageEth), 1950) // Remaining capacity
require.NoError(t, err)
governorUsageSui, err = gov.TrimAndSumValueForChain(chainEntrySui, time.Unix(int64(transferTime.Unix()-1000), 0))
require.NoError(t, err)
assert.Zero(t, governorUsageSui)
sumTransfersSui, _, err = gov.TrimAndSumValue(chainEntrySui.transfers, time.Unix(int64(transferTime.Unix()-1000), 0))
assert.Equal(t, int64(-8050), sumTransfersSui)
require.NoError(t, err)

// Sanity check: make sure these are still empty/zero
assert.Equal(t, 0, numPending)
assert.Equal(t, uint64(0), valuePending)
}

func TestReobservationOfPublishedMsg(t *testing.T) {
ctx := context.Background()
gov, err := newChainGovernorForTest(ctx)
Expand Down
Loading