Skip to content

Commit

Permalink
Node: Metric and performance tweaks
Browse files Browse the repository at this point in the history
  • Loading branch information
bruce-riley committed May 14, 2024
1 parent 6294969 commit 4611558
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 32 deletions.
33 changes: 29 additions & 4 deletions node/pkg/common/guardianset.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/ethereum/go-ethereum/common"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/wormhole-foundation/wormhole/sdk/vaa"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand Down Expand Up @@ -49,8 +50,30 @@ const MaxStateAge = 1 * time.Minute
// GuardianSet holds the keys of one guardian set together with its on-chain index
// and values derived from the keys (quorum and an address-to-index lookup map).
// Instances should be created with NewGuardianSet so that the derived fields are populated.
type GuardianSet struct {
	// Guardian's public key hashes truncated by the ETH standard hashing mechanism (20 bytes).
	Keys []common.Address

	// On-chain set index
	Index uint32

	// Quorum value for this set of keys. NewGuardianSet computes it once from the
	// number of keys so that callers do not have to recalculate it on every use.
	Quorum int

	// A map from address to index into Keys, built by NewGuardianSet and used by KeyIndex.
	// KeyIndex panics if this map is nil, i.e. if the struct was built without the constructor.
	//
	// Testing showed that, on average, a map is almost three times faster than a sequential search of the key slice.
	// Testing also showed that the map was twice as fast as using a sorted slice and `slices.BinarySearchFunc`. That being said, on a 4GHz CPU,
	// the sequential search takes an average of 800 nanos and the map look up takes about 260 nanos. Is this worth doing?
	keyMap map[common.Address]int
}

// NewGuardianSet builds a GuardianSet from the given guardian keys and on-chain set index.
// It derives the quorum from the number of keys via vaa.CalculateQuorum and builds the
// address-to-index lookup map consulted by KeyIndex.
//
// If an address occurs more than once in keys, the lowest index is recorded, which is
// the same answer a front-to-back scan of the Keys slice would give.
//
// The keys slice is stored as-is (not copied); modifying it afterwards would leave the
// lookup map and the quorum out of sync with Keys.
func NewGuardianSet(keys []common.Address, index uint32) *GuardianSet {
	// The number of entries is known up front, so size the map once.
	keyMap := make(map[common.Address]int, len(keys))
	for idx, key := range keys {
		// Keep the first occurrence so duplicates resolve like a sequential search.
		if _, exists := keyMap[key]; !exists {
			keyMap[key] = idx
		}
	}

	return &GuardianSet{
		Keys:   keys,
		Index:  index,
		Quorum: vaa.CalculateQuorum(len(keys)),
		keyMap: keyMap,
	}
}

func (g *GuardianSet) KeysAsHexStrings() []string {
Expand All @@ -66,10 +89,12 @@ func (g *GuardianSet) KeysAsHexStrings() []string {
// KeyIndex returns the index of the given address in the guardian set. Returns (-1, false)
// if the address wasn't found and (index, true) otherwise.
func (g *GuardianSet) KeyIndex(addr common.Address) (int, bool) {
for n, k := range g.Keys {
if k == addr {
return n, true
}
if g.keyMap == nil {
panic("guardian set key map not initialized")
}

if idx, found := g.keyMap[addr]; found {
return idx, true
}

return -1, false
Expand Down
38 changes: 34 additions & 4 deletions node/pkg/common/guardianset_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,42 @@
package common

import (
"reflect"
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/assert"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
)

// TestNewGuardianSet verifies that the constructor stores the keys and index it is given,
// derives the quorum from the number of keys, and builds a lookup map through which
// KeyIndex resolves every key to its position in the slice.
func TestNewGuardianSet(t *testing.T) {
	keys := []common.Address{
		common.HexToAddress("0xbeFA429d57cD18b7F8A4d91A2da9AB4AF05d0FBe"),
		common.HexToAddress("0x88D7D8B32a9105d228100E72dFFe2Fae0705D31c"),
		common.HexToAddress("0x58076F561CC62A47087B567C86f986426dFCD000"),
		common.HexToAddress("0xBd6e9833490F8fA87c733A183CD076a6cBD29074"),
		common.HexToAddress("0xb853FCF0a5C78C1b56D15fCE7a154e6ebe9ED7a2"),
		common.HexToAddress("0xAF3503dBD2E37518ab04D7CE78b630F98b15b78a"),
		common.HexToAddress("0x785632deA5609064803B1c8EA8bB2c77a6004Bd1"),
		common.HexToAddress("0x09a281a698C0F5BA31f158585B41F4f33659e54D"),
		common.HexToAddress("0x3178443AB76a60E21690DBfB17f7F59F09Ae3Ea1"),
		common.HexToAddress("0x647ec26ae49b14060660504f4DA1c2059E1C5Ab6"),
		common.HexToAddress("0x810AC3D8E1258Bd2F004a94Ca0cd4c68Fc1C0611"),
		common.HexToAddress("0x80610e96d645b12f47ae5cf4546b18538739e90F"),
		common.HexToAddress("0x2edb0D8530E31A218E72B9480202AcBaeB06178d"),
		common.HexToAddress("0xa78858e5e5c4705CdD4B668FFe3Be5bae4867c9D"),
		common.HexToAddress("0x5Efe3A05Efc62D60e1D19fAeB56A80223CDd3472"),
		common.HexToAddress("0xD791b7D32C05aBB1cc00b6381FA0c4928f0c56fC"),
		common.HexToAddress("0x14Bc029B8809069093D712A3fd4DfAb31963597e"),
		common.HexToAddress("0x246Ab29FC6EBeDf2D392a51ab2Dc5C59d0902A03"),
		common.HexToAddress("0x132A84dFD920b35a3D0BA5f7A0635dF298F9033e"),
	}
	gs := NewGuardianSet(keys, 1)
	assert.True(t, reflect.DeepEqual(keys, gs.Keys))
	assert.Equal(t, uint32(1), gs.Index)
	assert.Equal(t, vaa.CalculateQuorum(len(keys)), gs.Quorum)

	// The lookup map is the point of the constructor: every key must resolve
	// to its own position in the slice.
	for idx, key := range keys {
		got, found := gs.KeyIndex(key)
		assert.True(t, found)
		assert.Equal(t, idx, got)
	}

	// An address that is not part of the set must not be found.
	got, found := gs.KeyIndex(common.Address{})
	assert.False(t, found)
	assert.Equal(t, -1, got)
}

func TestKeyIndex(t *testing.T) {
type test struct {
guardianSet GuardianSet
Expand All @@ -15,13 +45,13 @@ func TestKeyIndex(t *testing.T) {
keyIndex int
}

guardianSet := GuardianSet{
Keys: []common.Address{
guardianSet := *NewGuardianSet(
[]common.Address{
common.HexToAddress("0x5aaeb6053f3e94c9b9a09f33669435e7ef1beaed"),
common.HexToAddress("0x5aaeb6053f3e94c9b9a09f33669435e7ef1beaee"),
},
Index: 1,
}
1,
)

tests := []test{
{guardianSet: guardianSet, address: "0x5aaeb6053f3e94c9b9a09f33669435e7ef1beaed", result: true, keyIndex: 0},
Expand Down
23 changes: 19 additions & 4 deletions node/pkg/processor/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,23 @@ import (
)

var (
observationsBroadcastTotal = promauto.NewCounter(
observationsBroadcast = promauto.NewCounter(
prometheus.CounterOpts{
Name: "wormhole_observations_broadcast_total",
Name: "wormhole_observations_queued_for_broadcast",
Help: "Total number of signed observations queued for broadcast",
})

observationsPostedInternally = promauto.NewCounter(
prometheus.CounterOpts{
Name: "wormhole_observations_posted_internally",
Help: "Total number of our observations posted internally",
})

signedVAAsBroadcast = promauto.NewCounter(
prometheus.CounterOpts{
Name: "wormhole_signed_vaas_queued_for_broadcast",
Help: "Total number of signed vaas queued for broadcast",
})
)

func (p *Processor) broadcastSignature(
Expand All @@ -45,9 +57,10 @@ func (p *Processor) broadcastSignature(
panic(err)
}

// Broadcast the observation.
p.gossipSendC <- msg
observationsBroadcast.Inc()

// Store our VAA in case we're going to submit it to Solana
hash := hex.EncodeToString(digest.Bytes())

if p.state.signatures[hash] == nil {
Expand Down Expand Up @@ -75,7 +88,7 @@ func (p *Processor) broadcastSignature(
go func() { p.obsvC <- om }()
}

observationsBroadcastTotal.Inc()
observationsPostedInternally.Inc()
}

func (p *Processor) broadcastSignedVAA(v *vaa.VAA) {
Expand All @@ -93,7 +106,9 @@ func (p *Processor) broadcastSignedVAA(v *vaa.VAA) {
panic(err)
}

// Broadcast the signed VAA.
p.gossipSendC <- msg
signedVAAsBroadcast.Inc()

if p.gatewayRelayer != nil {
p.gatewayRelayer.SubmitVAA(v)
Expand Down
11 changes: 5 additions & 6 deletions node/pkg/processor/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,7 @@ func (p *Processor) handleCleanup(ctx context.Context) {
}

hasSigs := len(s.signatures)
wantSigs := vaa.CalculateQuorum(len(gs.Keys))
quorum := hasSigs >= wantSigs
quorum := hasSigs >= gs.Quorum

var chain vaa.ChainID
if s.ourObservation != nil {
Expand All @@ -129,7 +128,7 @@ func (p *Processor) handleCleanup(ctx context.Context) {
zap.String("digest", hash),
zap.Duration("delta", delta),
zap.Int("have_sigs", hasSigs),
zap.Int("required_sigs", wantSigs),
zap.Int("required_sigs", gs.Quorum),
zap.Bool("quorum", quorum),
zap.Stringer("emitter_chain", chain),
)
Expand Down Expand Up @@ -220,6 +219,7 @@ func (p *Processor) handleCleanup(ctx context.Context) {
zap.String("digest", hash),
zap.Duration("delta", delta),
zap.String("firstObserved", s.firstObserved.String()),
zap.Int("numSignatures", len(s.signatures)),
)
req := &gossipv1.ObservationRequest{
ChainId: uint32(s.ourObservation.GetEmitterChain()),
Expand All @@ -238,16 +238,15 @@ func (p *Processor) handleCleanup(ctx context.Context) {
// network reached consensus without us. We don't know the correct guardian
// set, so we simply use the most recent one.
hasSigs := len(s.signatures)
wantSigs := vaa.CalculateQuorum(len(p.gs.Keys))

if p.logger.Level().Enabled(zapcore.DebugLevel) {
p.logger.Debug("expiring unsubmitted nil observation",
zap.String("message_id", s.LoggingID()),
zap.String("digest", hash),
zap.Duration("delta", delta),
zap.Int("have_sigs", hasSigs),
zap.Int("required_sigs", wantSigs),
zap.Bool("quorum", hasSigs >= wantSigs),
zap.Int("required_sigs", p.gs.Quorum),
zap.Bool("quorum", hasSigs >= p.gs.Quorum),
)
}
delete(p.state.signatures, hash)
Expand Down
22 changes: 13 additions & 9 deletions node/pkg/processor/observation.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,17 +213,22 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW

s.signatures[their_addr] = m.Signature

if s.ourObservation != nil {
if s.submitted {
if p.logger.Level().Enabled(zapcore.DebugLevel) {
p.logger.Debug("already submitted, doing nothing",
zap.String("messageId", m.MessageId),
zap.String("digest", hash),
)
}
} else if s.ourObservation != nil {
// We have made this observation on chain!

quorum := vaa.CalculateQuorum(len(gs.Keys))

// Check if we have more signatures than required for quorum.
// s.signatures may contain signatures from multiple guardian sets during guardian set updates
// Hence, if len(s.signatures) < quorum, then there is definitely no quorum and we can return early to save additional computation,
// but if len(s.signatures) >= quorum, there is not necessarily quorum for the active guardian set.
// We will later check for quorum again after assembling the VAA for a particular guardian set.
if len(s.signatures) < quorum {
if len(s.signatures) < gs.Quorum {
// no quorum yet, we're done here
if p.logger.Level().Enabled(zapcore.DebugLevel) {
p.logger.Debug("quorum not yet met",
Expand All @@ -245,18 +250,18 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW
zap.Any("set", gs.KeysAsHexStrings()),
zap.Uint32("index", gs.Index),
zap.Bools("aggregation", agg),
zap.Int("required_sigs", quorum),
zap.Int("required_sigs", gs.Quorum),
zap.Int("have_sigs", len(sigsVaaFormat)),
zap.Bool("quorum", len(sigsVaaFormat) >= quorum),
zap.Bool("quorum", len(sigsVaaFormat) >= gs.Quorum),
)
}

if len(sigsVaaFormat) >= quorum && !s.submitted {
if len(sigsVaaFormat) >= gs.Quorum {
// we have reached quorum *with the active guardian set*
s.ourObservation.HandleQuorum(sigsVaaFormat, hash, p)
} else {
if p.logger.Level().Enabled(zapcore.DebugLevel) {
p.logger.Debug("quorum not met or already submitted, doing nothing", // 1.2M out of 3M info messages / hour / guardian
p.logger.Debug("quorum not met, doing nothing",
zap.String("messageId", m.MessageId),
zap.String("digest", hash),
)
Expand All @@ -269,7 +274,6 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW
zap.String("digest", hash),
)
}

}

observationTotalDelay.Observe(float64(time.Since(obs.Timestamp).Microseconds()))
Expand Down
4 changes: 3 additions & 1 deletion node/pkg/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,9 @@ func (p *Processor) Run(ctx context.Context) error {
case p.gs = <-p.setC:
p.logger.Info("guardian set updated",
zap.Strings("set", p.gs.KeysAsHexStrings()),
zap.Uint32("index", p.gs.Index))
zap.Uint32("index", p.gs.Index),
zap.Int("quorum", p.gs.Quorum),
)
p.gst.Set(p.gs)
case k := <-p.msgC:
if p.governor != nil {
Expand Down
7 changes: 3 additions & 4 deletions node/pkg/watchers/evm/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,8 +553,10 @@ func (w *Watcher) Run(parentCtx context.Context) error {

// Transaction is now ready
if pLock.height <= blockNumberU {
msm := time.Now()
timeout, cancel := context.WithTimeout(ctx, 5*time.Second)
tx, err := w.ethConn.TransactionReceipt(timeout, pLock.message.TxHash)
queryLatency.WithLabelValues(w.networkName, "transaction_receipt").Observe(time.Since(msm).Seconds())
cancel()

// If the node returns an error after waiting expectedConfirmation blocks,
Expand Down Expand Up @@ -690,10 +692,7 @@ func (w *Watcher) fetchAndUpdateGuardianSet(
w.currentGuardianSet = &idx

if w.setC != nil {
w.setC <- &common.GuardianSet{
Keys: gs.Keys,
Index: idx,
}
w.setC <- common.NewGuardianSet(gs.Keys, idx)
}

return nil
Expand Down

0 comments on commit 4611558

Please sign in to comment.