Skip to content

Commit

Permalink
Simplify handleMessage and broadastSignature
Browse files Browse the repository at this point in the history
  • Loading branch information
bruce-riley committed Jul 9, 2024
1 parent 3d8da3c commit c92a2c6
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 119 deletions.
50 changes: 8 additions & 42 deletions node/pkg/processor/broadcast.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
package processor

import (
"encoding/hex"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

ethcommon "github.com/ethereum/go-ethereum/common"
ethCommon "github.com/ethereum/go-ethereum/common"
"google.golang.org/protobuf/proto"

gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
Expand All @@ -21,12 +18,6 @@ var (
Help: "Total number of signed observations queued for broadcast",
})

observationsPostedInternally = promauto.NewCounter(
prometheus.CounterOpts{
Name: "wormhole_observations_posted_internally",
Help: "Total number of our observations posted internally",
})

signedVAAsBroadcast = promauto.NewCounter(
prometheus.CounterOpts{
Name: "wormhole_signed_vaas_queued_for_broadcast",
Expand All @@ -36,17 +27,17 @@ var (

// broadcastSignature broadcasts the observation for something we observed locally.
func (p *Processor) broadcastSignature(
o Observation,
signature []byte,
messageID string,
txhash []byte,
) {
digest := o.SigningDigest()
digest ethCommon.Hash,
signature []byte,
) (*gossipv1.SignedObservation, []byte) {
obsv := gossipv1.SignedObservation{
Addr: p.ourAddr.Bytes(),
Hash: digest.Bytes(),
Signature: signature,
TxHash: txhash,
MessageId: o.MessageID(),
MessageId: messageID,
}

w := gossipv1.GossipMessage{Message: &gossipv1.GossipMessage_SignedObservation{SignedObservation: &obsv}}
Expand All @@ -59,35 +50,10 @@ func (p *Processor) broadcastSignature(
// Broadcast the observation.
p.gossipSendC <- msg
observationsBroadcast.Inc()
observationsPostedInternally.Inc()

hash := hex.EncodeToString(digest.Bytes())

s := p.state.signatures[hash]
if s == nil {
s = &state{
firstObserved: time.Now(),
nextRetry: time.Now().Add(nextRetryDuration(0)),
signatures: map[ethcommon.Address][]byte{},
source: "loopback",
}

p.state.signatures[hash] = s
}

s.ourObservation = o
s.ourMsg = msg
s.txHash = txhash
s.source = o.GetEmitterChain().String()
s.gs = p.gs // guaranteed to match ourObservation - there's no concurrent access to p.gs
s.signatures[p.ourAddr] = signature

// Fast path for our own signature.
start := time.Now()
p.handleObservationAlreadyVerified(&obsv, s, s.gs, hash)
timeToHandleObservation.Observe(float64(time.Since(start).Microseconds()))
return &obsv, msg
}

// broadcastSignedVAA broadcasts a VAA to the gossip network.
func (p *Processor) broadcastSignedVAA(v *vaa.VAA) {
b, err := v.Marshal()
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions node/pkg/processor/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,9 +290,9 @@ func (p *Processor) signedVaaAlreadyInDB(hash string, s *state) (bool, error) {
)
}
return false, nil
} else {
return false, fmt.Errorf(`failed to look up message id "%s" in db: %w`, s.ourObservation.MessageID(), err)
}

return false, fmt.Errorf(`failed to look up message id "%s" in db: %w`, s.ourObservation.MessageID(), err)
}

v, err = vaa.Unmarshal(vb)
Expand Down
62 changes: 37 additions & 25 deletions node/pkg/processor/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package processor

import (
"encoding/hex"
"time"

"github.com/mr-tron/base58"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

ethCommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand All @@ -26,13 +28,6 @@ var (
Help: "Total number of messages observed",
},
[]string{"emitter_chain"})

messagesSignedTotal = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "wormhole_message_observations_signed_total",
Help: "Total number of message observations that were successfully signed",
},
[]string{"emitter_chain"})
)

// handleMessage processes a message received from a chain and instantiates our deterministic copy of the VAA. An
Expand All @@ -48,18 +43,7 @@ func (p *Processor) handleMessage(k *common.MessagePublication) {
return
}

if p.logger.Core().Enabled(zapcore.DebugLevel) {
p.logger.Debug("message publication confirmed",
zap.String("message_id", k.MessageIDString()),
zap.Uint32("nonce", k.Nonce),
zap.Stringer("txhash", k.TxHash),
zap.Time("timestamp", k.Timestamp),
)
}

messagesObservedTotal.With(prometheus.Labels{
"emitter_chain": k.EmitterChain.String(),
}).Add(1)
messagesObservedTotal.WithLabelValues(k.EmitterChain.String()).Inc()

// All nodes will create the exact same VAA and sign its digest.
// Consensus is established on this digest.
Expand All @@ -83,9 +67,10 @@ func (p *Processor) handleMessage(k *common.MessagePublication) {

// Generate digest of the unsigned VAA.
digest := v.SigningDigest()
hash := hex.EncodeToString(digest.Bytes())

// Sign the digest using our node's guardian key.
s, err := crypto.Sign(digest.Bytes(), p.gk)
signature, err := crypto.Sign(digest.Bytes(), p.gk)
if err != nil {
panic(err)
}
Expand All @@ -95,16 +80,43 @@ func (p *Processor) handleMessage(k *common.MessagePublication) {
zap.String("message_id", k.MessageIDString()),
zap.Stringer("txhash", k.TxHash),
zap.String("txhash_b58", base58.Encode(k.TxHash.Bytes())),
zap.String("digest", hex.EncodeToString(digest.Bytes())),
zap.String("hash", hash),
zap.Uint32("nonce", k.Nonce),
zap.Time("timestamp", k.Timestamp),
zap.Uint8("consistency_level", k.ConsistencyLevel),
zap.String("signature", hex.EncodeToString(s)),
zap.String("signature", hex.EncodeToString(signature)),
zap.Bool("isReobservation", k.IsReobservation),
)
}

messagesSignedTotal.With(prometheus.Labels{
"emitter_chain": k.EmitterChain.String()}).Add(1)
// Broadcast the signature.
obsv, msg := p.broadcastSignature(v.MessageID(), k.TxHash.Bytes(), digest, signature)

p.broadcastSignature(v, s, k.TxHash.Bytes())
// Get / create our state entry.
s := p.state.signatures[hash]
if s == nil {
s = &state{
firstObserved: time.Now(),
nextRetry: time.Now().Add(nextRetryDuration(0)),
signatures: map[ethCommon.Address][]byte{},
source: "loopback",
}

p.state.signatures[hash] = s
}

// Update our state.
s.ourObservation = v
s.txHash = k.TxHash.Bytes()
s.source = v.GetEmitterChain().String()
s.gs = p.gs // guaranteed to match ourObservation - there's no concurrent access to p.gs
s.signatures[p.ourAddr] = signature
s.ourMsg = msg

// Fast path for our own signature.
if !s.submitted {
start := time.Now()
p.checkForQuorum(obsv, s, s.gs, hash)
timeToHandleObservation.Observe(float64(time.Since(start).Microseconds()))
}
}
100 changes: 51 additions & 49 deletions node/pkg/processor/observation.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,74 +209,76 @@ func (p *Processor) handleObservation(obs *node_common.MsgWithTimeStamp[gossipv1
}

s.signatures[their_addr] = m.Signature
p.handleObservationAlreadyVerified(m, s, gs, hash)
timeToHandleObservation.Observe(float64(time.Since(start).Microseconds()))
observationTotalDelay.Observe(float64(time.Since(obs.Timestamp).Microseconds()))
}

// handleObservationAlreadyVerified handles an observation after it's validity has been confirmed. It is called both for local and external observations.
func (p *Processor) handleObservationAlreadyVerified(m *gossipv1.SignedObservation, s *state, gs *node_common.GuardianSet, hash string) {
if s.submitted {
return
}
if s.ourObservation != nil {
// We have made this observation on chain!

// Check if we have more signatures than required for quorum.
// s.signatures may contain signatures from multiple guardian sets during guardian set updates
// Hence, if len(s.signatures) < quorum, then there is definitely no quorum and we can return early to save additional computation,
// but if len(s.signatures) >= quorum, there is not necessarily quorum for the active guardian set.
// We will later check for quorum again after assembling the VAA for a particular guardian set.
if len(s.signatures) < gs.Quorum() {
// no quorum yet, we're done here
if p.logger.Level().Enabled(zapcore.DebugLevel) {
p.logger.Debug("quorum not yet met",
zap.String("messageId", m.MessageId),
zap.String("digest", hash),
)
}
return
p.checkForQuorum(m, s, gs, hash)
} else {
if p.logger.Level().Enabled(zapcore.DebugLevel) {
p.logger.Debug("we have not yet seen this observation yet",
zap.String("messageId", m.MessageId),
zap.String("digest", hash),
)
}
// Keep going to update metrics.
}

// Now we *may* have quorum, depending on the guardian set in use.
// Let's construct the VAA and check if we actually have quorum.
sigsVaaFormat := signaturesToVaaFormat(s.signatures, gs.Keys)
timeToHandleObservation.Observe(float64(time.Since(start).Microseconds()))
observationTotalDelay.Observe(float64(time.Since(obs.Timestamp).Microseconds()))
}

// checkForQuorum checks for quorum after a valid signature has been added to the observation state. If quorum is met, it broadcasts the signed VAA. This function
// is called both for local and external observations. It assumes we that we have made the observation ourselves but have not already submitted the VAA.
func (p *Processor) checkForQuorum(m *gossipv1.SignedObservation, s *state, gs *node_common.GuardianSet, hash string) {
// Check if we have more signatures than required for quorum.
// s.signatures may contain signatures from multiple guardian sets during guardian set updates
// Hence, if len(s.signatures) < quorum, then there is definitely no quorum and we can return early to save additional computation,
// but if len(s.signatures) >= quorum, there is not necessarily quorum for the active guardian set.
// We will later check for quorum again after assembling the VAA for a particular guardian set.
if len(s.signatures) < gs.Quorum() {
// no quorum yet, we're done here
if p.logger.Level().Enabled(zapcore.DebugLevel) {
p.logger.Debug("aggregation state for observation", // 1.3M out of 3M info messages / hour / guardian
p.logger.Debug("quorum not yet met",
zap.String("messageId", m.MessageId),
zap.String("digest", hash),
zap.Any("set", gs.KeysAsHexStrings()),
zap.Uint32("index", gs.Index),
zap.Int("required_sigs", gs.Quorum()),
zap.Int("have_sigs", len(sigsVaaFormat)),
zap.Bool("quorum", len(sigsVaaFormat) >= gs.Quorum()),
)
}
return
}

if len(sigsVaaFormat) >= gs.Quorum() {
// we have reached quorum *with the active guardian set*
start := time.Now()
s.ourObservation.HandleQuorum(sigsVaaFormat, hash, p)
timeToHandleQuorum.Observe(float64(time.Since(start).Microseconds()))
} else {
if p.logger.Level().Enabled(zapcore.DebugLevel) {
p.logger.Debug("quorum not met, doing nothing",
zap.String("messageId", m.MessageId),
zap.String("digest", hash),
)
}
}
} else {
// Now we *may* have quorum, depending on the guardian set in use.
// Let's construct the VAA and check if we actually have quorum.
sigsVaaFormat := signaturesToVaaFormat(s.signatures, gs.Keys)

if p.logger.Level().Enabled(zapcore.DebugLevel) {
p.logger.Debug("aggregation state for observation", // 1.3M out of 3M info messages / hour / guardian
zap.String("messageId", m.MessageId),
zap.String("digest", hash),
zap.Any("set", gs.KeysAsHexStrings()),
zap.Uint32("index", gs.Index),
zap.Int("required_sigs", gs.Quorum()),
zap.Int("have_sigs", len(sigsVaaFormat)),
zap.Bool("quorum", len(sigsVaaFormat) >= gs.Quorum()),
)
}

if len(sigsVaaFormat) < gs.Quorum() {
if p.logger.Level().Enabled(zapcore.DebugLevel) {
p.logger.Debug("we have not yet seen this observation - temporarily storing signature", // 175K out of 3M info messages / hour / guardian
p.logger.Debug("quorum not met, doing nothing",
zap.String("messageId", m.MessageId),
zap.String("digest", hash),
)
}
return
}

// We have reached quorum *with the active guardian set*.
start := time.Now()
s.ourObservation.HandleQuorum(sigsVaaFormat, hash, p)
s.submitted = true
timeToHandleQuorum.Observe(float64(time.Since(start).Microseconds()))
}

// handleInboundSignedVAAWithQuorum takes a VAA received from the network. If we have not already seen it and it is valid, we store it in the database.
func (p *Processor) handleInboundSignedVAAWithQuorum(m *gossipv1.SignedVAAWithQuorum) {
v, err := vaa.Unmarshal(m.Vaa)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion node/pkg/processor/vaa.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type VAA struct {
Reobservation bool
}

// HandleQuorum is called when a VAA reaches quorum. It publishes the VAA to the gossip network and stores it in the database.
func (v *VAA) HandleQuorum(sigs []*vaa.Signature, hash string, p *Processor) {
// Deep copy the observation and add signatures
signed := &vaa.VAA{
Expand All @@ -26,12 +27,12 @@ func (v *VAA) HandleQuorum(sigs []*vaa.Signature, hash string, p *Processor) {
ConsistencyLevel: v.ConsistencyLevel,
}

// Store signed VAA in database.
p.logger.Info("signed VAA with quorum",
zap.String("message_id", signed.MessageID()),
zap.String("digest", hash),
)

// Broadcast the VAA and store it in the database.
p.broadcastSignedVAA(signed)
p.storeSignedVAA(signed)
}
Expand Down

0 comments on commit c92a2c6

Please sign in to comment.