diff --git a/node/pkg/common/guardianset.go b/node/pkg/common/guardianset.go index b1a06b3c21..2fedf90fa6 100644 --- a/node/pkg/common/guardianset.go +++ b/node/pkg/common/guardianset.go @@ -8,6 +8,7 @@ import ( gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1" "github.com/ethereum/go-ethereum/common" "github.com/libp2p/go-libp2p/core/peer" + "github.com/wormhole-foundation/wormhole/sdk/vaa" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -49,8 +50,30 @@ const MaxStateAge = 1 * time.Minute type GuardianSet struct { // Guardian's public key hashes truncated by the ETH standard hashing mechanism (20 bytes). Keys []common.Address + // On-chain set index Index uint32 + + // Quorum value for this set of keys + Quorum int + + // A map from address to index. Testing showed that, on average, a map is almost three times faster than a sequential search of the key slice. + // Testing also showed that the map was twice as fast as using a sorted slice and `slices.BinarySearchFunc`. That being said, on a 4GHz CPU, + // the sequential search takes an average of 800 nanos and the map look up takes about 260 nanos. Is this worth doing? + keyMap map[common.Address]int +} + +func NewGuardianSet(keys []common.Address, index uint32) *GuardianSet { + keyMap := map[common.Address]int{} + for idx, key := range keys { + keyMap[key] = idx + } + return &GuardianSet{ + Keys: keys, + Index: index, + Quorum: vaa.CalculateQuorum(len(keys)), + keyMap: keyMap, + } } func (g *GuardianSet) KeysAsHexStrings() []string { @@ -66,10 +89,12 @@ func (g *GuardianSet) KeysAsHexStrings() []string { // KeyIndex returns a given address index from the guardian set. Returns (-1, false) // if the address wasn't found and (addr, true) otherwise. func (g *GuardianSet) KeyIndex(addr common.Address) (int, bool) { - for n, k := range g.Keys { - if k == addr { - return n, true - } + if g.keyMap == nil { + panic("guardian set key map not initialized") + } + + if idx, found := g.keyMap[addr]; found { + return idx, true } return -1, false diff --git a/node/pkg/common/guardianset_test.go b/node/pkg/common/guardianset_test.go index c172515699..de936630b0 100644 --- a/node/pkg/common/guardianset_test.go +++ b/node/pkg/common/guardianset_test.go @@ -1,12 +1,42 @@ package common import ( + "reflect" "testing" "github.com/ethereum/go-ethereum/common" "github.com/stretchr/testify/assert" + "github.com/wormhole-foundation/wormhole/sdk/vaa" ) +func TestNewGuardianSet(t *testing.T) { + keys := []common.Address{ + common.HexToAddress("0xbeFA429d57cD18b7F8A4d91A2da9AB4AF05d0FBe"), + common.HexToAddress("0x88D7D8B32a9105d228100E72dFFe2Fae0705D31c"), + common.HexToAddress("0x58076F561CC62A47087B567C86f986426dFCD000"), + common.HexToAddress("0xBd6e9833490F8fA87c733A183CD076a6cBD29074"), + common.HexToAddress("0xb853FCF0a5C78C1b56D15fCE7a154e6ebe9ED7a2"), + common.HexToAddress("0xAF3503dBD2E37518ab04D7CE78b630F98b15b78a"), + common.HexToAddress("0x785632deA5609064803B1c8EA8bB2c77a6004Bd1"), + common.HexToAddress("0x09a281a698C0F5BA31f158585B41F4f33659e54D"), + common.HexToAddress("0x3178443AB76a60E21690DBfB17f7F59F09Ae3Ea1"), + common.HexToAddress("0x647ec26ae49b14060660504f4DA1c2059E1C5Ab6"), + common.HexToAddress("0x810AC3D8E1258Bd2F004a94Ca0cd4c68Fc1C0611"), + common.HexToAddress("0x80610e96d645b12f47ae5cf4546b18538739e90F"), + common.HexToAddress("0x2edb0D8530E31A218E72B9480202AcBaeB06178d"), + common.HexToAddress("0xa78858e5e5c4705CdD4B668FFe3Be5bae4867c9D"), + common.HexToAddress("0x5Efe3A05Efc62D60e1D19fAeB56A80223CDd3472"), + common.HexToAddress("0xD791b7D32C05aBB1cc00b6381FA0c4928f0c56fC"), + common.HexToAddress("0x14Bc029B8809069093D712A3fd4DfAb31963597e"), + common.HexToAddress("0x246Ab29FC6EBeDf2D392a51ab2Dc5C59d0902A03"), + common.HexToAddress("0x132A84dFD920b35a3D0BA5f7A0635dF298F9033e"), + } + gs := NewGuardianSet(keys, 1) + assert.True(t, reflect.DeepEqual(keys, gs.Keys)) + assert.Equal(t, uint32(1), gs.Index) + assert.Equal(t, vaa.CalculateQuorum(len(keys)), gs.Quorum) +} + func TestKeyIndex(t *testing.T) { type test struct { guardianSet GuardianSet @@ -15,13 +45,13 @@ func TestKeyIndex(t *testing.T) { keyIndex int } - guardianSet := GuardianSet{ - Keys: []common.Address{ + guardianSet := *NewGuardianSet( + []common.Address{ common.HexToAddress("0x5aaeb6053f3e94c9b9a09f33669435e7ef1beaed"), common.HexToAddress("0x5aaeb6053f3e94c9b9a09f33669435e7ef1beaee"), }, - Index: 1, - } + 1, + ) tests := []test{ {guardianSet: guardianSet, address: "0x5aaeb6053f3e94c9b9a09f33669435e7ef1beaed", result: true, keyIndex: 0}, diff --git a/node/pkg/processor/broadcast.go b/node/pkg/processor/broadcast.go index a5cee6a568..40ecfa665a 100644 --- a/node/pkg/processor/broadcast.go +++ b/node/pkg/processor/broadcast.go @@ -17,11 +17,23 @@ import ( ) var ( - observationsBroadcastTotal = promauto.NewCounter( + observationsBroadcast = promauto.NewCounter( prometheus.CounterOpts{ - Name: "wormhole_observations_broadcast_total", + Name: "wormhole_observations_queued_for_broadcast", Help: "Total number of signed observations queued for broadcast", }) + + observationsPostedInternally = promauto.NewCounter( + prometheus.CounterOpts{ + Name: "wormhole_observations_posted_internally", + Help: "Total number of our observations posted internally", + }) + + signedVAAsBroadcast = promauto.NewCounter( + prometheus.CounterOpts{ + Name: "wormhole_signed_vaas_queued_for_broadcast", + Help: "Total number of signed vaas queued for broadcast", + }) ) func (p *Processor) broadcastSignature( @@ -45,9 +57,10 @@ func (p *Processor) broadcastSignature( panic(err) } + // Broadcast the observation. p.gossipSendC <- msg + observationsBroadcast.Inc() - // Store our VAA in case we're going to submit it to Solana hash := hex.EncodeToString(digest.Bytes()) if p.state.signatures[hash] == nil { @@ -75,7 +88,7 @@ func (p *Processor) broadcastSignature( go func() { p.obsvC <- om }() } - observationsBroadcastTotal.Inc() + observationsPostedInternally.Inc() } func (p *Processor) broadcastSignedVAA(v *vaa.VAA) { @@ -93,7 +106,9 @@ func (p *Processor) broadcastSignedVAA(v *vaa.VAA) { panic(err) } + // Broadcast the signed VAA. p.gossipSendC <- msg + signedVAAsBroadcast.Inc() if p.gatewayRelayer != nil { p.gatewayRelayer.SubmitVAA(v) diff --git a/node/pkg/processor/cleanup.go b/node/pkg/processor/cleanup.go index d9a9cfc7d9..31120409dc 100644 --- a/node/pkg/processor/cleanup.go +++ b/node/pkg/processor/cleanup.go @@ -115,8 +115,7 @@ func (p *Processor) handleCleanup(ctx context.Context) { } hasSigs := len(s.signatures) - wantSigs := vaa.CalculateQuorum(len(gs.Keys)) - quorum := hasSigs >= wantSigs + quorum := hasSigs >= gs.Quorum var chain vaa.ChainID if s.ourObservation != nil { @@ -129,7 +128,7 @@ func (p *Processor) handleCleanup(ctx context.Context) { zap.String("digest", hash), zap.Duration("delta", delta), zap.Int("have_sigs", hasSigs), - zap.Int("required_sigs", wantSigs), + zap.Int("required_sigs", gs.Quorum), zap.Bool("quorum", quorum), zap.Stringer("emitter_chain", chain), ) @@ -220,6 +219,7 @@ func (p *Processor) handleCleanup(ctx context.Context) { zap.String("digest", hash), zap.Duration("delta", delta), zap.String("firstObserved", s.firstObserved.String()), + zap.Int("numSignatures", len(s.signatures)), ) req := &gossipv1.ObservationRequest{ ChainId: uint32(s.ourObservation.GetEmitterChain()), @@ -238,7 +238,6 @@ func (p *Processor) handleCleanup(ctx context.Context) { // network reached consensus without us. We don't know the correct guardian // set, so we simply use the most recent one. hasSigs := len(s.signatures) - wantSigs := vaa.CalculateQuorum(len(p.gs.Keys)) if p.logger.Level().Enabled(zapcore.DebugLevel) { p.logger.Debug("expiring unsubmitted nil observation", @@ -246,8 +245,8 @@ func (p *Processor) handleCleanup(ctx context.Context) { zap.String("digest", hash), zap.Duration("delta", delta), zap.Int("have_sigs", hasSigs), - zap.Int("required_sigs", wantSigs), - zap.Bool("quorum", hasSigs >= wantSigs), + zap.Int("required_sigs", p.gs.Quorum), + zap.Bool("quorum", hasSigs >= p.gs.Quorum), ) } delete(p.state.signatures, hash) diff --git a/node/pkg/processor/observation.go b/node/pkg/processor/observation.go index 0c92e761df..48e258e812 100644 --- a/node/pkg/processor/observation.go +++ b/node/pkg/processor/observation.go @@ -213,17 +213,22 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW s.signatures[their_addr] = m.Signature - if s.ourObservation != nil { + if s.submitted { + if p.logger.Level().Enabled(zapcore.DebugLevel) { + p.logger.Debug("already submitted, doing nothing", + zap.String("messageId", m.MessageId), + zap.String("digest", hash), + ) + } + } else if s.ourObservation != nil { // We have made this observation on chain! - quorum := vaa.CalculateQuorum(len(gs.Keys)) - // Check if we have more signatures than required for quorum. // s.signatures may contain signatures from multiple guardian sets during guardian set updates // Hence, if len(s.signatures) < quorum, then there is definitely no quorum and we can return early to save additional computation, // but if len(s.signatures) >= quorum, there is not necessarily quorum for the active guardian set. // We will later check for quorum again after assembling the VAA for a particular guardian set. - if len(s.signatures) < quorum { + if len(s.signatures) < gs.Quorum { // no quorum yet, we're done here if p.logger.Level().Enabled(zapcore.DebugLevel) { p.logger.Debug("quorum not yet met", @@ -245,18 +250,18 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW zap.Any("set", gs.KeysAsHexStrings()), zap.Uint32("index", gs.Index), zap.Bools("aggregation", agg), - zap.Int("required_sigs", quorum), + zap.Int("required_sigs", gs.Quorum), zap.Int("have_sigs", len(sigsVaaFormat)), - zap.Bool("quorum", len(sigsVaaFormat) >= quorum), + zap.Bool("quorum", len(sigsVaaFormat) >= gs.Quorum), ) } - if len(sigsVaaFormat) >= quorum && !s.submitted { + if len(sigsVaaFormat) >= gs.Quorum { // we have reached quorum *with the active guardian set* s.ourObservation.HandleQuorum(sigsVaaFormat, hash, p) } else { if p.logger.Level().Enabled(zapcore.DebugLevel) { - p.logger.Debug("quorum not met or already submitted, doing nothing", // 1.2M out of 3M info messages / hour / guardian + p.logger.Debug("quorum not met, doing nothing", zap.String("messageId", m.MessageId), zap.String("digest", hash), ) @@ -269,7 +274,6 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW zap.String("digest", hash), ) } - } observationTotalDelay.Observe(float64(time.Since(obs.Timestamp).Microseconds())) diff --git a/node/pkg/processor/processor.go b/node/pkg/processor/processor.go index 3a1fdfaab9..4dab22f7fb 100644 --- a/node/pkg/processor/processor.go +++ b/node/pkg/processor/processor.go @@ -222,7 +222,9 @@ func (p *Processor) Run(ctx context.Context) error { case p.gs = <-p.setC: p.logger.Info("guardian set updated", zap.Strings("set", p.gs.KeysAsHexStrings()), - zap.Uint32("index", p.gs.Index)) + zap.Uint32("index", p.gs.Index), + zap.Int("quorum", p.gs.Quorum), + ) p.gst.Set(p.gs) case k := <-p.msgC: if p.governor != nil { diff --git a/node/pkg/watchers/evm/watcher.go b/node/pkg/watchers/evm/watcher.go index 2cdb6122e0..787a948c7b 100644 --- a/node/pkg/watchers/evm/watcher.go +++ b/node/pkg/watchers/evm/watcher.go @@ -553,8 +553,10 @@ func (w *Watcher) Run(parentCtx context.Context) error { // Transaction is now ready if pLock.height <= blockNumberU { + msm := time.Now() timeout, cancel := context.WithTimeout(ctx, 5*time.Second) tx, err := w.ethConn.TransactionReceipt(timeout, pLock.message.TxHash) + queryLatency.WithLabelValues(w.networkName, "transaction_receipt").Observe(time.Since(msm).Seconds()) cancel() // If the node returns an error after waiting expectedConfirmation blocks, @@ -690,10 +692,7 @@ func (w *Watcher) fetchAndUpdateGuardianSet( w.currentGuardianSet = &idx if w.setC != nil { - w.setC <- &common.GuardianSet{ - Keys: gs.Keys, - Index: idx, - } + w.setC <- common.NewGuardianSet(gs.Keys, idx) } return nil