diff --git a/node/pkg/db/db.go b/node/pkg/db/db.go index 66810b30b4..04cc806fc9 100644 --- a/node/pkg/db/db.go +++ b/node/pkg/db/db.go @@ -128,6 +128,35 @@ func (d *Database) StoreSignedVAA(v *vaa.VAA) error { return nil } +// StoreSignedVAABatch writes multiple VAAs to the database using the BadgerDB batch API. +// Note that the API takes care of splitting up the slice into the maximum allowed count +// and size so we don't need to worry about that. +func (d *Database) StoreSignedVAABatch(vaaBatch []*vaa.VAA) error { + batchTx := d.db.NewWriteBatch() + defer batchTx.Cancel() + + for _, v := range vaaBatch { + if len(v.Signatures) == 0 { + panic("StoreSignedVAABatch called for unsigned VAA") + } + + b, err := v.Marshal() + if err != nil { + panic("StoreSignedVAABatch failed to marshal VAA") + } + + err = batchTx.Set(VaaIDFromVAA(v).Bytes(), b) + if err != nil { + return err + } + } + + // Wait for the batch to finish. + err := batchTx.Flush() + storedVaaTotal.Add(float64(len(vaaBatch))) + return err +} + func (d *Database) HasVAA(id VAAID) (bool, error) { err := d.db.View(func(txn *badger.Txn) error { _, err := txn.Get(id.Bytes()) diff --git a/node/pkg/db/db_test.go b/node/pkg/db/db_test.go index ffbdc2f804..5fde4c07d4 100644 --- a/node/pkg/db/db_test.go +++ b/node/pkg/db/db_test.go @@ -1,6 +1,7 @@ package db import ( + "bytes" "crypto/ecdsa" "crypto/rand" "fmt" @@ -22,6 +23,10 @@ import ( ) func getVAA() vaa.VAA { + return getVAAWithSeqNum(1) +} + +func getVAAWithSeqNum(seqNum uint64) vaa.VAA { var payload = []byte{97, 97, 97, 97, 97, 97} var governanceEmitter = vaa.Address{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4} @@ -31,7 +36,7 @@ func getVAA() vaa.VAA { Signatures: nil, Timestamp: time.Unix(0, 0), Nonce: uint32(1), - Sequence: uint64(1), + Sequence: seqNum, ConsistencyLevel: uint8(32), EmitterChain: vaa.ChainIDSolana, EmitterAddress: governanceEmitter, @@ -114,6 +119,68 @@ func TestStoreSignedVAASigned(t *testing.T) { assert.NoError(t, err2) } +func TestStoreSignedVAABatch(t *testing.T) { + dbPath := t.TempDir() + db, err := Open(dbPath) + if err != nil { + t.Error("failed to open database") + } + defer db.Close() + defer os.Remove(dbPath) + + privKey, err := ecdsa.GenerateKey(crypto.S256(), rand.Reader) + require.NoError(t, err) + + require.Less(t, int64(0), db.db.MaxBatchCount()) // In testing this was 104857. + require.Less(t, int64(0), db.db.MaxBatchSize()) // In testing this was 10066329. + + // Make sure we exceed the max batch size. + numVAAs := uint64(db.db.MaxBatchCount() + 1) + + // Build the VAA batch. + vaaBatch := make([]*vaa.VAA, 0, numVAAs) + for seqNum := uint64(0); seqNum < numVAAs; seqNum++ { + v := getVAAWithSeqNum(seqNum) + v.AddSignature(privKey, 0) + vaaBatch = append(vaaBatch, &v) + } + + // Store the batch in the database. + err = db.StoreSignedVAABatch(vaaBatch) + require.NoError(t, err) + + // Verify all the VAAs are in the database. + for _, v := range vaaBatch { + storedBytes, err := db.GetSignedVAABytes(*VaaIDFromVAA(v)) + require.NoError(t, err) + + origBytes, err := v.Marshal() + require.NoError(t, err) + + assert.True(t, bytes.Equal(origBytes, storedBytes)) + } + + // Verify that updates work as well by tweaking the VAAs and rewriting them. + for _, v := range vaaBatch { + v.Nonce += 1 + } + + // Store the updated batch in the database. + err = db.StoreSignedVAABatch(vaaBatch) + require.NoError(t, err) + + // Verify all the updated VAAs are in the database. + for _, v := range vaaBatch { + storedBytes, err := db.GetSignedVAABytes(*VaaIDFromVAA(v)) + require.NoError(t, err) + + origBytes, err := v.Marshal() + require.NoError(t, err) + + assert.True(t, bytes.Equal(origBytes, storedBytes)) + } +} + func TestGetSignedVAABytes(t *testing.T) { dbPath := t.TempDir() db, err := Open(dbPath) diff --git a/node/pkg/processor/broadcast.go b/node/pkg/processor/broadcast.go index 641332de33..3c5cf859c0 100644 --- a/node/pkg/processor/broadcast.go +++ b/node/pkg/processor/broadcast.go @@ -1,16 +1,12 @@ package processor import ( - "encoding/hex" - "time" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - ethcommon "github.com/ethereum/go-ethereum/common" + ethCommon "github.com/ethereum/go-ethereum/common" "google.golang.org/protobuf/proto" - node_common "github.com/certusone/wormhole/node/pkg/common" gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1" "github.com/wormhole-foundation/wormhole/sdk/vaa" ) @@ -22,12 +18,6 @@ var ( Help: "Total number of signed observations queued for broadcast", }) - observationsPostedInternally = promauto.NewCounter( - prometheus.CounterOpts{ - Name: "wormhole_observations_posted_internally", - Help: "Total number of our observations posted internally", - }) - signedVAAsBroadcast = promauto.NewCounter( prometheus.CounterOpts{ Name: "wormhole_signed_vaas_queued_for_broadcast", @@ -35,18 +25,19 @@ var ( }) ) +// broadcastSignature broadcasts the observation for something we observed locally. func (p *Processor) broadcastSignature( - o Observation, - signature []byte, + messageID string, txhash []byte, -) { - digest := o.SigningDigest() + digest ethCommon.Hash, + signature []byte, +) (*gossipv1.SignedObservation, []byte) { obsv := gossipv1.SignedObservation{ Addr: p.ourAddr.Bytes(), Hash: digest.Bytes(), Signature: signature, TxHash: txhash, - MessageId: o.MessageID(), + MessageId: messageID, } w := gossipv1.GossipMessage{Message: &gossipv1.GossipMessage_SignedObservation{SignedObservation: &obsv}} @@ -59,37 +50,10 @@ func (p *Processor) broadcastSignature( // Broadcast the observation. p.gossipSendC <- msg observationsBroadcast.Inc() - - hash := hex.EncodeToString(digest.Bytes()) - - if p.state.signatures[hash] == nil { - p.state.signatures[hash] = &state{ - firstObserved: time.Now(), - nextRetry: time.Now().Add(nextRetryDuration(0)), - signatures: map[ethcommon.Address][]byte{}, - source: "loopback", - } - } - - p.state.signatures[hash].ourObservation = o - p.state.signatures[hash].ourMsg = msg - p.state.signatures[hash].txHash = txhash - p.state.signatures[hash].source = o.GetEmitterChain().String() - p.state.signatures[hash].gs = p.gs // guaranteed to match ourObservation - there's no concurrent access to p.gs - - // Fast path for our own signature - // send to obsvC directly if there is capacity, otherwise do it in a go routine. - // We can't block here because the same process would be responsible for reading from obsvC. - om := node_common.CreateMsgWithTimestamp[gossipv1.SignedObservation](&obsv) - select { - case p.obsvC <- om: - default: - go func() { p.obsvC <- om }() - } - - observationsPostedInternally.Inc() + return &obsv, msg } +// broadcastSignedVAA broadcasts a VAA to the gossip network. func (p *Processor) broadcastSignedVAA(v *vaa.VAA) { b, err := v.Marshal() if err != nil { diff --git a/node/pkg/processor/cleanup.go b/node/pkg/processor/cleanup.go index bcc5e6cd6d..18843ffe33 100644 --- a/node/pkg/processor/cleanup.go +++ b/node/pkg/processor/cleanup.go @@ -271,29 +271,34 @@ func (p *Processor) signedVaaAlreadyInDB(hash string, s *state) (bool, error) { return false, nil } - vaaID, err := db.VaaIDFromString(s.ourObservation.MessageID()) + msgId := s.ourObservation.MessageID() + vaaID, err := db.VaaIDFromString(msgId) if err != nil { return false, fmt.Errorf(`failed to generate VAA ID from message id "%s": %w`, s.ourObservation.MessageID(), err) } - vb, err := p.db.GetSignedVAABytes(*vaaID) - if err != nil { - if err == db.ErrVAANotFound { - if p.logger.Level().Enabled(zapcore.DebugLevel) { - p.logger.Debug("VAA not in DB", - zap.String("message_id", s.ourObservation.MessageID()), - zap.String("digest", hash), - ) + // If the VAA is waiting to be written to the DB, use that version. Otherwise use the DB. + v := p.getVaaFromUpdateMap(msgId) + if v == nil { + vb, err := p.db.GetSignedVAABytes(*vaaID) + if err != nil { + if err == db.ErrVAANotFound { + if p.logger.Level().Enabled(zapcore.DebugLevel) { + p.logger.Debug("VAA not in DB", + zap.String("message_id", s.ourObservation.MessageID()), + zap.String("digest", hash), + ) + } + return false, nil } - return false, nil - } else { + return false, fmt.Errorf(`failed to look up message id "%s" in db: %w`, s.ourObservation.MessageID(), err) } - } - v, err := vaa.Unmarshal(vb) - if err != nil { - return false, fmt.Errorf("failed to unmarshal VAA: %w", err) + v, err = vaa.Unmarshal(vb) + if err != nil { + return false, fmt.Errorf("failed to unmarshal VAA: %w", err) + } } oldHash := hex.EncodeToString(v.SigningDigest().Bytes()) diff --git a/node/pkg/processor/message.go b/node/pkg/processor/message.go index c5351bf363..c23c0a55e9 100644 --- a/node/pkg/processor/message.go +++ b/node/pkg/processor/message.go @@ -2,12 +2,14 @@ package processor import ( "encoding/hex" + "time" "github.com/mr-tron/base58" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + ethCommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -26,13 +28,6 @@ var ( Help: "Total number of messages observed", }, []string{"emitter_chain"}) - - messagesSignedTotal = promauto.NewCounterVec( - prometheus.CounterOpts{ - Name: "wormhole_message_observations_signed_total", - Help: "Total number of message observations that were successfully signed", - }, - []string{"emitter_chain"}) ) // handleMessage processes a message received from a chain and instantiates our deterministic copy of the VAA. An @@ -48,18 +43,7 @@ func (p *Processor) handleMessage(k *common.MessagePublication) { return } - if p.logger.Core().Enabled(zapcore.DebugLevel) { - p.logger.Debug("message publication confirmed", - zap.String("message_id", k.MessageIDString()), - zap.Uint32("nonce", k.Nonce), - zap.Stringer("txhash", k.TxHash), - zap.Time("timestamp", k.Timestamp), - ) - } - - messagesObservedTotal.With(prometheus.Labels{ - "emitter_chain": k.EmitterChain.String(), - }).Add(1) + messagesObservedTotal.WithLabelValues(k.EmitterChain.String()).Inc() // All nodes will create the exact same VAA and sign its digest. // Consensus is established on this digest. @@ -83,9 +67,10 @@ func (p *Processor) handleMessage(k *common.MessagePublication) { // Generate digest of the unsigned VAA. digest := v.SigningDigest() + hash := hex.EncodeToString(digest.Bytes()) // Sign the digest using our node's guardian key. - s, err := crypto.Sign(digest.Bytes(), p.gk) + signature, err := crypto.Sign(digest.Bytes(), p.gk) if err != nil { panic(err) } @@ -95,16 +80,43 @@ func (p *Processor) handleMessage(k *common.MessagePublication) { zap.String("message_id", k.MessageIDString()), zap.Stringer("txhash", k.TxHash), zap.String("txhash_b58", base58.Encode(k.TxHash.Bytes())), - zap.String("digest", hex.EncodeToString(digest.Bytes())), + zap.String("hash", hash), zap.Uint32("nonce", k.Nonce), + zap.Time("timestamp", k.Timestamp), zap.Uint8("consistency_level", k.ConsistencyLevel), - zap.String("signature", hex.EncodeToString(s)), + zap.String("signature", hex.EncodeToString(signature)), zap.Bool("isReobservation", k.IsReobservation), ) } - messagesSignedTotal.With(prometheus.Labels{ - "emitter_chain": k.EmitterChain.String()}).Add(1) + // Broadcast the signature. + obsv, msg := p.broadcastSignature(v.MessageID(), k.TxHash.Bytes(), digest, signature) - p.broadcastSignature(v, s, k.TxHash.Bytes()) + // Get / create our state entry. + s := p.state.signatures[hash] + if s == nil { + s = &state{ + firstObserved: time.Now(), + nextRetry: time.Now().Add(nextRetryDuration(0)), + signatures: map[ethCommon.Address][]byte{}, + source: "loopback", + } + + p.state.signatures[hash] = s + } + + // Update our state. + s.ourObservation = v + s.txHash = k.TxHash.Bytes() + s.source = v.GetEmitterChain().String() + s.gs = p.gs // guaranteed to match ourObservation - there's no concurrent access to p.gs + s.signatures[p.ourAddr] = signature + s.ourMsg = msg + + // Fast path for our own signature. + if !s.submitted { + start := time.Now() + p.checkForQuorum(obsv, s, s.gs, hash) + timeToHandleObservation.Observe(float64(time.Since(start).Microseconds())) + } } diff --git a/node/pkg/processor/observation.go b/node/pkg/processor/observation.go index e1545ce49c..31a34b98c3 100644 --- a/node/pkg/processor/observation.go +++ b/node/pkg/processor/observation.go @@ -2,7 +2,6 @@ package processor import ( - "context" "encoding/hex" "fmt" "time" @@ -46,11 +45,9 @@ var ( ) // signaturesToVaaFormat converts a map[common.Address][]byte (processor state format) to []*vaa.Signature (VAA format) given a set of keys gsKeys -// It also returns a bool array indicating which key in gsKeys had a signature // The processor state format is used for efficiently storing signatures during aggregation while the VAA format is more efficient for on-chain verification. -func signaturesToVaaFormat(signatures map[common.Address][]byte, gsKeys []common.Address) ([]*vaa.Signature, []bool) { +func signaturesToVaaFormat(signatures map[common.Address][]byte, gsKeys []common.Address) []*vaa.Signature { // Aggregate all valid signatures into a list of vaa.Signature and construct signed VAA. - agg := make([]bool, len(gsKeys)) var sigs []*vaa.Signature for i, a := range gsKeys { sig, ok := signatures[a] @@ -66,20 +63,19 @@ func signaturesToVaaFormat(signatures map[common.Address][]byte, gsKeys []common Signature: bs, }) } - - agg[i] = ok } - return sigs, agg + return sigs } // handleObservation processes a remote VAA observation, verifies it, checks whether the VAA has met quorum, // and assembles and submits a valid VAA if possible. -func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgWithTimeStamp[gossipv1.SignedObservation]) { +func (p *Processor) handleObservation(obs *node_common.MsgWithTimeStamp[gossipv1.SignedObservation]) { // SECURITY: at this point, observations received from the p2p network are fully untrusted (all fields!) // // Note that observations are never tied to the (verified) p2p identity key - the p2p network // identity is completely decoupled from the guardian identity, p2p is just transport. + start := time.Now() observationsReceivedTotal.Inc() m := obs.Msg @@ -87,6 +83,7 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW s := p.state.signatures[hash] if s != nil && s.submitted { // already submitted; ignoring additional signatures for it. + timeToHandleObservation.Observe(float64(time.Since(start).Microseconds())) return } @@ -213,73 +210,76 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW s.signatures[their_addr] = m.Signature - if s.submitted { + if s.ourObservation != nil { + p.checkForQuorum(m, s, gs, hash) + } else { if p.logger.Level().Enabled(zapcore.DebugLevel) { - p.logger.Debug("already submitted, doing nothing", + p.logger.Debug("we have not yet seen this observation yet", zap.String("messageId", m.MessageId), zap.String("digest", hash), ) } - } else if s.ourObservation != nil { - // We have made this observation on chain! - - // Check if we have more signatures than required for quorum. - // s.signatures may contain signatures from multiple guardian sets during guardian set updates - // Hence, if len(s.signatures) < quorum, then there is definitely no quorum and we can return early to save additional computation, - // but if len(s.signatures) >= quorum, there is not necessarily quorum for the active guardian set. - // We will later check for quorum again after assembling the VAA for a particular guardian set. - if len(s.signatures) < gs.Quorum() { - // no quorum yet, we're done here - if p.logger.Level().Enabled(zapcore.DebugLevel) { - p.logger.Debug("quorum not yet met", - zap.String("messageId", m.MessageId), - zap.String("digest", hash), - ) - } - return - } + // Keep going to update metrics. + } - // Now we *may* have quorum, depending on the guardian set in use. - // Let's construct the VAA and check if we actually have quorum. - sigsVaaFormat, agg := signaturesToVaaFormat(s.signatures, gs.Keys) + timeToHandleObservation.Observe(float64(time.Since(start).Microseconds())) + observationTotalDelay.Observe(float64(time.Since(obs.Timestamp).Microseconds())) +} +// checkForQuorum checks for quorum after a valid signature has been added to the observation state. If quorum is met, it broadcasts the signed VAA. This function +// is called both for local and external observations. It assumes we that we have made the observation ourselves but have not already submitted the VAA. +func (p *Processor) checkForQuorum(m *gossipv1.SignedObservation, s *state, gs *node_common.GuardianSet, hash string) { + // Check if we have more signatures than required for quorum. + // s.signatures may contain signatures from multiple guardian sets during guardian set updates + // Hence, if len(s.signatures) < quorum, then there is definitely no quorum and we can return early to save additional computation, + // but if len(s.signatures) >= quorum, there is not necessarily quorum for the active guardian set. + // We will later check for quorum again after assembling the VAA for a particular guardian set. + if len(s.signatures) < gs.Quorum() { + // no quorum yet, we're done here if p.logger.Level().Enabled(zapcore.DebugLevel) { - p.logger.Debug("aggregation state for observation", // 1.3M out of 3M info messages / hour / guardian + p.logger.Debug("quorum not yet met", zap.String("messageId", m.MessageId), zap.String("digest", hash), - zap.Any("set", gs.KeysAsHexStrings()), - zap.Uint32("index", gs.Index), - zap.Bools("aggregation", agg), - zap.Int("required_sigs", gs.Quorum()), - zap.Int("have_sigs", len(sigsVaaFormat)), - zap.Bool("quorum", len(sigsVaaFormat) >= gs.Quorum()), ) } + return + } - if len(sigsVaaFormat) >= gs.Quorum() { - // we have reached quorum *with the active guardian set* - s.ourObservation.HandleQuorum(sigsVaaFormat, hash, p) - } else { - if p.logger.Level().Enabled(zapcore.DebugLevel) { - p.logger.Debug("quorum not met, doing nothing", - zap.String("messageId", m.MessageId), - zap.String("digest", hash), - ) - } - } - } else { + // Now we *may* have quorum, depending on the guardian set in use. + // Let's construct the VAA and check if we actually have quorum. + sigsVaaFormat := signaturesToVaaFormat(s.signatures, gs.Keys) + + if p.logger.Level().Enabled(zapcore.DebugLevel) { + p.logger.Debug("aggregation state for observation", // 1.3M out of 3M info messages / hour / guardian + zap.String("messageId", m.MessageId), + zap.String("digest", hash), + zap.Any("set", gs.KeysAsHexStrings()), + zap.Uint32("index", gs.Index), + zap.Int("required_sigs", gs.Quorum()), + zap.Int("have_sigs", len(sigsVaaFormat)), + zap.Bool("quorum", len(sigsVaaFormat) >= gs.Quorum()), + ) + } + + if len(sigsVaaFormat) < gs.Quorum() { if p.logger.Level().Enabled(zapcore.DebugLevel) { - p.logger.Debug("we have not yet seen this observation - temporarily storing signature", // 175K out of 3M info messages / hour / guardian + p.logger.Debug("quorum not met, doing nothing", zap.String("messageId", m.MessageId), zap.String("digest", hash), ) } + return } - observationTotalDelay.Observe(float64(time.Since(obs.Timestamp).Microseconds())) + // We have reached quorum *with the active guardian set*. + start := time.Now() + s.ourObservation.HandleQuorum(sigsVaaFormat, hash, p) + s.submitted = true + timeToHandleQuorum.Observe(float64(time.Since(start).Microseconds())) } -func (p *Processor) handleInboundSignedVAAWithQuorum(ctx context.Context, m *gossipv1.SignedVAAWithQuorum) { +// handleInboundSignedVAAWithQuorum takes a VAA received from the network. If we have not already seen it and it is valid, we store it in the database. +func (p *Processor) handleInboundSignedVAAWithQuorum(m *gossipv1.SignedVAAWithQuorum) { v, err := vaa.Unmarshal(m.Vaa) if err != nil { p.logger.Warn("received invalid VAA in SignedVAAWithQuorum message", @@ -337,11 +337,5 @@ func (p *Processor) handleInboundSignedVAAWithQuorum(ctx context.Context, m *gos ) } - if err := p.storeSignedVAA(v); err != nil { - p.logger.Error("failed to store signed VAA", - zap.String("message_id", v.MessageID()), - zap.Error(err), - ) - return - } + p.storeSignedVAA(v) } diff --git a/node/pkg/processor/observation_test.go b/node/pkg/processor/observation_test.go index c384ffc641..eeef648339 100644 --- a/node/pkg/processor/observation_test.go +++ b/node/pkg/processor/observation_test.go @@ -1,7 +1,6 @@ package processor import ( - "context" "crypto/ecdsa" "crypto/rand" "testing" @@ -45,12 +44,11 @@ func TestHandleInboundSignedVAAWithQuorum_NilGuardianSet(t *testing.T) { observedZapCore, observedLogs := observer.New(zap.InfoLevel) observedLogger := zap.New(observedZapCore) - ctx := context.Background() signedVAAWithQuorum := &gossipv1.SignedVAAWithQuorum{Vaa: marshalVAA} processor := Processor{} processor.logger = observedLogger - processor.handleInboundSignedVAAWithQuorum(ctx, signedVAAWithQuorum) + processor.handleInboundSignedVAAWithQuorum(signedVAAWithQuorum) // Check to see if we got an error, which we should have, // because a `gs` is not defined on processor @@ -108,13 +106,12 @@ func TestHandleInboundSignedVAAWithQuorum(t *testing.T) { observedZapCore, observedLogs := observer.New(zap.InfoLevel) observedLogger := zap.New(observedZapCore) - ctx := context.Background() signedVAAWithQuorum := &gossipv1.SignedVAAWithQuorum{Vaa: marshalVAA} processor := Processor{} processor.gs = &guardianSet processor.logger = observedLogger - processor.handleInboundSignedVAAWithQuorum(ctx, signedVAAWithQuorum) + processor.handleInboundSignedVAAWithQuorum(signedVAAWithQuorum) // Check to see if we got an error, which we should have assert.Equal(t, 1, observedLogs.Len()) diff --git a/node/pkg/processor/processor.go b/node/pkg/processor/processor.go index b1e88268ef..e31402ccd2 100644 --- a/node/pkg/processor/processor.go +++ b/node/pkg/processor/processor.go @@ -5,6 +5,7 @@ import ( "crypto/ecdsa" "encoding/hex" "fmt" + "sync" "time" "github.com/certusone/wormhole/node/pkg/db" @@ -139,6 +140,14 @@ type Processor struct { acctReadC <-chan *common.MessagePublication pythnetVaas map[string]PythNetVaaEntry gatewayRelayer *gwrelayer.GatewayRelayer + updateVAALock sync.Mutex + updatedVAAs map[string]*updateVaaEntry +} + +// updateVaaEntry is used to queue up a VAA to be written to the database. +type updateVaaEntry struct { + v *vaa.VAA + dirty bool } var ( @@ -155,6 +164,20 @@ var ( Help: "Latency histogram for total time to process signed observations", Buckets: []float64{10.0, 20.0, 50.0, 100.0, 1000.0, 5000.0, 10_000.0, 100_000.0, 1_000_000.0, 10_000_000.0, 100_000_000.0, 1_000_000_000.0}, }) + + timeToHandleObservation = promauto.NewHistogram( + prometheus.HistogramOpts{ + Name: "wormhole_time_to_handle_observation_us", + Help: "Latency histogram for total time to handle observation on an observation", + Buckets: []float64{10.0, 20.0, 50.0, 100.0, 1000.0, 5000.0, 10_000.0, 100_000.0, 1_000_000.0, 10_000_000.0, 100_000_000.0, 1_000_000_000.0}, + }) + + timeToHandleQuorum = promauto.NewHistogram( + prometheus.HistogramOpts{ + Name: "wormhole_time_to_handle_quorum_us", + Help: "Latency histogram for total time to handle quorum on an observation", + Buckets: []float64{10.0, 20.0, 50.0, 100.0, 1000.0, 5000.0, 10_000.0, 100_000.0, 1_000_000.0, 10_000_000.0, 100_000_000.0, 1_000_000_000.0}, + }) ) func NewProcessor( @@ -193,10 +216,15 @@ func NewProcessor( acctReadC: acctReadC, pythnetVaas: make(map[string]PythNetVaaEntry), gatewayRelayer: gatewayRelayer, + updatedVAAs: make(map[string]*updateVaaEntry), } } func (p *Processor) Run(ctx context.Context) error { + if err := supervisor.Run(ctx, "vaaWriter", common.WrapWithScissors(p.vaaWriter, "vaaWriter")); err != nil { + return fmt.Errorf("failed to start vaa writer: %w", err) + } + cleanup := time.NewTicker(CleanupInterval) // Always initialize the timer so don't have a nil pointer in the case below. It won't get rearmed after that. @@ -254,9 +282,9 @@ func (p *Processor) Run(ctx context.Context) error { p.handleMessage(k) case m := <-p.obsvC: observationChanDelay.Observe(float64(time.Since(m.Timestamp).Microseconds())) - p.handleObservation(ctx, m) + p.handleObservation(m) case m := <-p.signedInC: - p.handleInboundSignedVAAWithQuorum(ctx, m) + p.handleInboundSignedVAAWithQuorum(m) case <-cleanup.C: p.handleCleanup(ctx) case <-govTimer.C: @@ -293,13 +321,17 @@ func (p *Processor) Run(ctx context.Context) error { } } -func (p *Processor) storeSignedVAA(v *vaa.VAA) error { +// storeSignedVAA schedules a database update for a VAA. +func (p *Processor) storeSignedVAA(v *vaa.VAA) { if v.EmitterChain == vaa.ChainIDPythNet { key := fmt.Sprintf("%v/%v", v.EmitterAddress, v.Sequence) p.pythnetVaas[key] = PythNetVaaEntry{v: v, updateTime: time.Now()} - return nil + return } - return p.db.StoreSignedVAA(v) + key := fmt.Sprintf("%d/%v/%v", v.EmitterChain, v.EmitterAddress, v.Sequence) + p.updateVAALock.Lock() + p.updatedVAAs[key] = &updateVaaEntry{v: v, dirty: true} + p.updateVAALock.Unlock() } // haveSignedVAA returns true if we already have a VAA for the given VAAID @@ -313,12 +345,16 @@ func (p *Processor) haveSignedVAA(id db.VAAID) bool { return exists } + key := fmt.Sprintf("%d/%v/%v", id.EmitterChain, id.EmitterAddress, id.Sequence) + if p.getVaaFromUpdateMap(key) != nil { + return true + } + if p.db == nil { return false } ok, err := p.db.HasVAA(id) - if err != nil { p.logger.Error("failed to look up VAA in database", zap.String("vaaID", string(id.Bytes())), @@ -329,3 +365,60 @@ func (p *Processor) haveSignedVAA(id db.VAAID) bool { return ok } + +// getVaaFromUpdateMap gets the VAA from the local map. If it's not there, it returns nil. +func (p *Processor) getVaaFromUpdateMap(key string) *vaa.VAA { + p.updateVAALock.Lock() + entry, exists := p.updatedVAAs[key] + p.updateVAALock.Unlock() + if !exists { + return nil + } + return entry.v +} + +// vaaWriter is the routine that writes VAAs to the database once per second. It creates a local copy of the map +// being used by the processor to reduce lock contention. It uses a dirty flag to handle the case where the VAA +// gets updated again while we are in the process of writing it to the database. +func (p *Processor) vaaWriter(ctx context.Context) error { + ticker := time.NewTicker(time.Second) + for { + select { + case <-ctx.Done(): + return nil + case <-ticker.C: + var updatedVAAs map[string]*updateVaaEntry + p.updateVAALock.Lock() + if len(p.updatedVAAs) != 0 { + // There's something to write. Create a local copy of the map so we can release the lock. + updatedVAAs = make(map[string]*updateVaaEntry) + for key, entry := range p.updatedVAAs { + updatedVAAs[key] = entry + entry.dirty = false + } + } + p.updateVAALock.Unlock() + if updatedVAAs != nil { + // If there's anything to write, do that. + vaaBatch := make([]*vaa.VAA, 0, len(updatedVAAs)) + for _, entry := range updatedVAAs { + vaaBatch = append(vaaBatch, entry.v) + } + + if err := p.db.StoreSignedVAABatch(vaaBatch); err != nil { + p.logger.Error("failed to write VAAs to database", zap.Int("numVAAs", len(vaaBatch)), zap.Error(err)) + } + + // Go through the map and delete anything we have written that hasn't been updated again. + // If something has been updated again, it will get written next interval. + p.updateVAALock.Lock() + for key, entry := range p.updatedVAAs { + if !entry.dirty { + delete(p.updatedVAAs, key) + } + } + p.updateVAALock.Unlock() + } + } + } +} diff --git a/node/pkg/processor/vaa.go b/node/pkg/processor/vaa.go index d1ce007d81..1f4a64fc86 100644 --- a/node/pkg/processor/vaa.go +++ b/node/pkg/processor/vaa.go @@ -11,6 +11,7 @@ type VAA struct { Reobservation bool } +// HandleQuorum is called when a VAA reaches quorum. It publishes the VAA to the gossip network and stores it in the database. func (v *VAA) HandleQuorum(sigs []*vaa.Signature, hash string, p *Processor) { // Deep copy the observation and add signatures signed := &vaa.VAA{ @@ -26,22 +27,14 @@ func (v *VAA) HandleQuorum(sigs []*vaa.Signature, hash string, p *Processor) { ConsistencyLevel: v.ConsistencyLevel, } - // Store signed VAA in database. p.logger.Info("signed VAA with quorum", zap.String("message_id", signed.MessageID()), zap.String("digest", hash), ) - if err := p.storeSignedVAA(signed); err != nil { - p.logger.Error("failed to store signed VAA", - zap.String("message_id", signed.MessageID()), - zap.String("digest", hash), - zap.Error(err), - ) - } - + // Broadcast the VAA and store it in the database. p.broadcastSignedVAA(signed) - p.state.signatures[hash].submitted = true + p.storeSignedVAA(signed) } func (v *VAA) IsReliable() bool {