Skip to content

Commit

Permalink
node: Use deterministic iteration order over chains when changing Gov…
Browse files Browse the repository at this point in the history
…ernor state

- Adds a field that stores a sorted slice of chain IDs to the governor.
- Use this field to iterate in a determinstic order when performing
  actions that change the state of the Governor
- This should help Guardians reach a more similar view of the Governor
  in scenarios where iteration order might impact whether a transfer is
  queued. (This is relevant especially in the case of Flow Canceling)
- Cases where only a single VAA is being modified were not changed.
  Iteration order should not matter here and determinstic order may
  may worse for performance when searching for a particular element.
  • Loading branch information
johnsaigle committed Jul 16, 2024
1 parent 193fd9e commit 2cea90b
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 9 deletions.
33 changes: 26 additions & 7 deletions node/pkg/governor/governor.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"fmt"
"math"
"math/big"
"sort"
"sync"
"time"

Expand Down Expand Up @@ -179,12 +180,15 @@ func (ce *chainEntry) isBigTransfer(value uint64) bool {
}

type ChainGovernor struct {
db db.GovernorDB // protected by `mutex`
logger *zap.Logger
mutex sync.Mutex
tokens map[tokenKey]*tokenEntry // protected by `mutex`
tokensByCoinGeckoId map[string][]*tokenEntry // protected by `mutex`
chains map[vaa.ChainID]*chainEntry // protected by `mutex`
db db.GovernorDB // protected by `mutex`
logger *zap.Logger
mutex sync.Mutex
tokens map[tokenKey]*tokenEntry // protected by `mutex`
tokensByCoinGeckoId map[string][]*tokenEntry // protected by `mutex`
chains map[vaa.ChainID]*chainEntry // protected by `mutex`
// We maintain a sorted slice of governed chainIds so we can iterate over maps in a deterministic way
// This slice should be sorted in ascending order by (Wormhole) Chain ID.
chainIds []vaa.ChainID
msgsSeen map[string]bool // protected by `mutex` // Key is hash, payload is consts transferComplete and transferEnqueued.
msgsToPublish []*common.MessagePublication // protected by `mutex`
dayLengthInMinutes int
Expand Down Expand Up @@ -381,6 +385,19 @@ func (gov *ChainGovernor) initConfig() error {
return fmt.Errorf("no chains are configured")
}

// Populate a sorted list of chain IDs so that we can iterate over maps in a determinstic way.
// https://go.dev/blog/maps, "Iteration order" section
var governedChainIds []vaa.ChainID
for id := range gov.chains {
governedChainIds = append(governedChainIds, id)
}
// Custom sorting for the vaa.ChainID type
sort.Slice(governedChainIds, func(i, j int) bool {
return governedChainIds[i] < governedChainIds[j]
})

gov.chainIds = governedChainIds

return nil
}

Expand Down Expand Up @@ -675,7 +692,9 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP
gov.msgsToPublish = nil
}

for _, ce := range gov.chains {
// Iterate deterministically by accessing keys from this slice instead of the chainEntry map directly
for _, cid := range gov.chainIds {
ce := gov.chains[cid]
// Keep going as long as we find something that will fit.
for {
foundOne := false
Expand Down
9 changes: 7 additions & 2 deletions node/pkg/governor/governor_monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func (gov *ChainGovernor) Status() (resp string) {
defer gov.mutex.Unlock()

startTime := time.Now().Add(-time.Minute * time.Duration(gov.dayLengthInMinutes))

for _, ce := range gov.chains {
valueTrans, err := sumValue(ce.transfers, startTime)
if err != nil {
Expand Down Expand Up @@ -282,7 +283,9 @@ func (gov *ChainGovernor) GetAvailableNotionalByChain() (resp []*publicrpcv1.Gov
defer gov.mutex.Unlock()

startTime := time.Now().Add(-time.Minute * time.Duration(gov.dayLengthInMinutes))
for _, ce := range gov.chains {
// Iterate deterministically by accessing keys from this slice instead of the chainEntry map directly
for _, cid := range gov.chainIds {
ce := gov.chains[cid]
value, err := sumValue(ce.transfers, startTime)
if err != nil {
// Don't return an error here, just return 0
Expand Down Expand Up @@ -493,7 +496,9 @@ var governorMessagePrefixStatus = []byte("governor_status_000000000000000000|")

func (gov *ChainGovernor) publishConfig(hb *gossipv1.Heartbeat, sendC chan<- []byte, gk *ecdsa.PrivateKey, ourAddr ethCommon.Address) {
chains := make([]*gossipv1.ChainGovernorConfig_Chain, 0)
for _, ce := range gov.chains {
// Iterate deterministically by accessing keys from this slice instead of the chainEntry map directly
for _, cid := range gov.chainIds {
ce := gov.chains[cid]
chains = append(chains, &gossipv1.ChainGovernorConfig_Chain{
ChainId: uint32(ce.emitterChainId),
NotionalLimit: ce.dailyLimit,
Expand Down

0 comments on commit 2cea90b

Please sign in to comment.