From bf1efb48fe90477926724fcb13ae63be0ab74215 Mon Sep 17 00:00:00 2001 From: Bruce Riley Date: Mon, 17 Jun 2024 12:34:34 -0500 Subject: [PATCH 01/11] Node: Processor db write separation --- node/pkg/processor/cleanup.go | 37 ++++++++++++--------- node/pkg/processor/observation.go | 8 +---- node/pkg/processor/processor.go | 55 ++++++++++++++++++++++++++++--- node/pkg/processor/vaa.go | 9 +---- 4 files changed, 74 insertions(+), 35 deletions(-) diff --git a/node/pkg/processor/cleanup.go b/node/pkg/processor/cleanup.go index bcc5e6cd6d..14c72d1019 100644 --- a/node/pkg/processor/cleanup.go +++ b/node/pkg/processor/cleanup.go @@ -271,29 +271,34 @@ func (p *Processor) signedVaaAlreadyInDB(hash string, s *state) (bool, error) { return false, nil } - vaaID, err := db.VaaIDFromString(s.ourObservation.MessageID()) + msgId := s.ourObservation.MessageID() + vaaID, err := db.VaaIDFromString(msgId) if err != nil { return false, fmt.Errorf(`failed to generate VAA ID from message id "%s": %w`, s.ourObservation.MessageID(), err) } - vb, err := p.db.GetSignedVAABytes(*vaaID) - if err != nil { - if err == db.ErrVAANotFound { - if p.logger.Level().Enabled(zapcore.DebugLevel) { - p.logger.Debug("VAA not in DB", - zap.String("message_id", s.ourObservation.MessageID()), - zap.String("digest", hash), - ) + // If the VAA is waiting to be written to the DB, use that version. Otherwise use the DB. + v := p.getVaaFromUpdateMap(msgId) + if v == nil { + vb, err := p.db.GetSignedVAABytes(*vaaID) + if err != nil { + if err == db.ErrVAANotFound { + if p.logger.Level().Enabled(zapcore.DebugLevel) { + p.logger.Debug("VAA not in DB", + zap.String("message_id", s.ourObservation.MessageID()), + zap.String("digest", hash), + ) + } + return false, nil + } else { + return false, fmt.Errorf(`failed to look up message id "%s" in db: %w`, s.ourObservation.MessageID(), err) } - return false, nil - } else { - return false, fmt.Errorf(`failed to look up message id "%s" in db: %w`, s.ourObservation.MessageID(), err) } - } - v, err := vaa.Unmarshal(vb) - if err != nil { - return false, fmt.Errorf("failed to unmarshal VAA: %w", err) + v, err = vaa.Unmarshal(vb) + if err != nil { + return false, fmt.Errorf("failed to unmarshal VAA: %w", err) + } } oldHash := hex.EncodeToString(v.SigningDigest().Bytes()) diff --git a/node/pkg/processor/observation.go b/node/pkg/processor/observation.go index e1545ce49c..2e15ea8fd4 100644 --- a/node/pkg/processor/observation.go +++ b/node/pkg/processor/observation.go @@ -337,11 +337,5 @@ func (p *Processor) handleInboundSignedVAAWithQuorum(ctx context.Context, m *gos ) } - if err := p.storeSignedVAA(v); err != nil { - p.logger.Error("failed to store signed VAA", - zap.String("message_id", v.MessageID()), - zap.Error(err), - ) - return - } + p.storeSignedVAA(v) } diff --git a/node/pkg/processor/processor.go b/node/pkg/processor/processor.go index b1e88268ef..95a9c1cb4f 100644 --- a/node/pkg/processor/processor.go +++ b/node/pkg/processor/processor.go @@ -5,6 +5,7 @@ import ( "crypto/ecdsa" "encoding/hex" "fmt" + "sync" "time" "github.com/certusone/wormhole/node/pkg/db" @@ -139,6 +140,8 @@ type Processor struct { acctReadC <-chan *common.MessagePublication pythnetVaas map[string]PythNetVaaEntry gatewayRelayer *gwrelayer.GatewayRelayer + updateVAALock sync.Mutex + updatedVAAs map[string]*vaa.VAA } var ( @@ -193,10 +196,15 @@ func NewProcessor( acctReadC: acctReadC, pythnetVaas: make(map[string]PythNetVaaEntry), gatewayRelayer: gatewayRelayer, + updatedVAAs: make(map[string]*vaa.VAA), } } func (p *Processor) 
Run(ctx context.Context) error { + if err := supervisor.Run(ctx, "vaaWriter", common.WrapWithScissors(p.vaaWriter, "vaaWriter")); err != nil { + return fmt.Errorf("failed to start vaa writer: %w", err) + } + cleanup := time.NewTicker(CleanupInterval) // Always initialize the timer so don't have a nil pointer in the case below. It won't get rearmed after that. @@ -293,13 +301,16 @@ func (p *Processor) Run(ctx context.Context) error { } } -func (p *Processor) storeSignedVAA(v *vaa.VAA) error { +func (p *Processor) storeSignedVAA(v *vaa.VAA) { if v.EmitterChain == vaa.ChainIDPythNet { key := fmt.Sprintf("%v/%v", v.EmitterAddress, v.Sequence) p.pythnetVaas[key] = PythNetVaaEntry{v: v, updateTime: time.Now()} - return nil + return } - return p.db.StoreSignedVAA(v) + key := fmt.Sprintf("%d/%v/%v", v.EmitterChain, v.EmitterAddress, v.Sequence) + p.updateVAALock.Lock() + p.updatedVAAs[key] = v + p.updateVAALock.Unlock() } // haveSignedVAA returns true if we already have a VAA for the given VAAID @@ -313,12 +324,16 @@ func (p *Processor) haveSignedVAA(id db.VAAID) bool { return exists } + key := fmt.Sprintf("%d/%v/%v", id.EmitterChain, id.EmitterAddress, id.Sequence) + if p.getVaaFromUpdateMap(key) != nil { + return true + } + if p.db == nil { return false } ok, err := p.db.HasVAA(id) - if err != nil { p.logger.Error("failed to look up VAA in database", zap.String("vaaID", string(id.Bytes())), @@ -329,3 +344,35 @@ func (p *Processor) haveSignedVAA(id db.VAAID) bool { return ok } + +func (p *Processor) getVaaFromUpdateMap(key string) *vaa.VAA { + p.updateVAALock.Lock() + v, exists := p.updatedVAAs[key] + p.updateVAALock.Unlock() + if !exists { + return nil + } + return v +} + +func (p *Processor) vaaWriter(ctx context.Context) error { + ticker := time.NewTicker(time.Second) + for { + select { + case <-ctx.Done(): + return nil + case <-ticker.C: + p.updateVAALock.Lock() + updatedVAAs := p.updatedVAAs + p.updatedVAAs = make(map[string]*vaa.VAA) + p.updateVAALock.Unlock() + if len(updatedVAAs) != 0 { + for _, v := range updatedVAAs { + if err := p.db.StoreSignedVAA(v); err != nil { + p.logger.Error("failed to write VAA to database", zap.Error(err)) + } + } + } + } + } +} diff --git a/node/pkg/processor/vaa.go b/node/pkg/processor/vaa.go index d1ce007d81..6f6e8591e6 100644 --- a/node/pkg/processor/vaa.go +++ b/node/pkg/processor/vaa.go @@ -32,14 +32,7 @@ func (v *VAA) HandleQuorum(sigs []*vaa.Signature, hash string, p *Processor) { zap.String("digest", hash), ) - if err := p.storeSignedVAA(signed); err != nil { - p.logger.Error("failed to store signed VAA", - zap.String("message_id", signed.MessageID()), - zap.String("digest", hash), - zap.Error(err), - ) - } - + p.storeSignedVAA(signed) p.broadcastSignedVAA(signed) p.state.signatures[hash].submitted = true } From e1a0c44f8dc672d4a03c6d274f393dfb1bb36a3a Mon Sep 17 00:00:00 2001 From: Bruce Riley Date: Wed, 19 Jun 2024 13:13:57 -0500 Subject: [PATCH 02/11] Handle additional update while writing to db --- node/pkg/processor/observation.go | 2 +- node/pkg/processor/processor.go | 60 +++++++++++++++++++++++++------ node/pkg/processor/vaa.go | 9 +---- 3 files changed, 51 insertions(+), 20 deletions(-) diff --git a/node/pkg/processor/observation.go b/node/pkg/processor/observation.go index 2e15ea8fd4..f0d2312360 100644 --- a/node/pkg/processor/observation.go +++ b/node/pkg/processor/observation.go @@ -337,5 +337,5 @@ func (p *Processor) handleInboundSignedVAAWithQuorum(ctx context.Context, m *gos ) } - p.storeSignedVAA(v) + p.storeSignedVAA(v, "") } diff 
--git a/node/pkg/processor/processor.go b/node/pkg/processor/processor.go index 95a9c1cb4f..0b46ddd688 100644 --- a/node/pkg/processor/processor.go +++ b/node/pkg/processor/processor.go @@ -141,7 +141,14 @@ type Processor struct { pythnetVaas map[string]PythNetVaaEntry gatewayRelayer *gwrelayer.GatewayRelayer updateVAALock sync.Mutex - updatedVAAs map[string]*vaa.VAA + updatedVAAs map[string]*updateVaaEntry +} + +// updateVaaEntry is used to queue up a VAA to be written to the database. +type updateVaaEntry struct { + v *vaa.VAA + hashToLog string + dirty bool } var ( @@ -196,7 +203,7 @@ func NewProcessor( acctReadC: acctReadC, pythnetVaas: make(map[string]PythNetVaaEntry), gatewayRelayer: gatewayRelayer, - updatedVAAs: make(map[string]*vaa.VAA), + updatedVAAs: make(map[string]*updateVaaEntry), } } @@ -301,7 +308,7 @@ func (p *Processor) Run(ctx context.Context) error { } } -func (p *Processor) storeSignedVAA(v *vaa.VAA) { +func (p *Processor) storeSignedVAA(v *vaa.VAA, hash string) { if v.EmitterChain == vaa.ChainIDPythNet { key := fmt.Sprintf("%v/%v", v.EmitterAddress, v.Sequence) p.pythnetVaas[key] = PythNetVaaEntry{v: v, updateTime: time.Now()} @@ -309,7 +316,7 @@ func (p *Processor) storeSignedVAA(v *vaa.VAA) { } key := fmt.Sprintf("%d/%v/%v", v.EmitterChain, v.EmitterAddress, v.Sequence) p.updateVAALock.Lock() - p.updatedVAAs[key] = v + p.updatedVAAs[key] = &updateVaaEntry{v: v, hashToLog: hash, dirty: true} p.updateVAALock.Unlock() } @@ -345,16 +352,23 @@ func (p *Processor) haveSignedVAA(id db.VAAID) bool { return ok } +// getVaaFromUpdateMap gets the VAA from the local map. If it's not there, it returns nil. func (p *Processor) getVaaFromUpdateMap(key string) *vaa.VAA { p.updateVAALock.Lock() - v, exists := p.updatedVAAs[key] + entry, exists := p.updatedVAAs[key] p.updateVAALock.Unlock() if !exists { return nil } - return v + return entry.v } +// vaaWriter is the routine that writes VAAs to the database once per minute. It creates a local copy of the map +// being used by the processor to reduce lock contention. It uses a dirty flag to handle the case where the VAA +// gets updated again while we are in the process of writing it to the database. +// +// This routine also handles writing the "signed VAA with quorum" info log, since doing that inline can take more +// than a millisecond. func (p *Processor) vaaWriter(ctx context.Context) error { ticker := time.NewTicker(time.Second) for { @@ -362,16 +376,40 @@ func (p *Processor) vaaWriter(ctx context.Context) error { case <-ctx.Done(): return nil case <-ticker.C: + var updatedVAAs map[string]*updateVaaEntry p.updateVAALock.Lock() - updatedVAAs := p.updatedVAAs - p.updatedVAAs = make(map[string]*vaa.VAA) + if len(p.updatedVAAs) != 0 { + // There's something to write. Create a local copy of the map so we can release the lock. + updatedVAAs = make(map[string]*updateVaaEntry) + for key, entry := range p.updatedVAAs { + updatedVAAs[key] = entry + entry.dirty = false + } + } p.updateVAALock.Unlock() - if len(updatedVAAs) != 0 { - for _, v := range updatedVAAs { - if err := p.db.StoreSignedVAA(v); err != nil { + if updatedVAAs != nil { + // If there's anything to write, do that. 
+ for _, entry := range updatedVAAs { + if err := p.db.StoreSignedVAA(entry.v); err != nil { p.logger.Error("failed to write VAA to database", zap.Error(err)) } + if entry.hashToLog != "" { + p.logger.Info("signed VAA with quorum", + zap.String("message_id", entry.v.MessageID()), + zap.String("digest", entry.hashToLog), + ) + } + } + + // Go through the map and delete anything we have written that hasn't been updated again. + // If something has been updated again, it will get written next interval. + p.updateVAALock.Lock() + for key, entry := range p.updatedVAAs { + if !entry.dirty { + delete(p.updatedVAAs, key) + } } + p.updateVAALock.Unlock() } } } diff --git a/node/pkg/processor/vaa.go b/node/pkg/processor/vaa.go index 6f6e8591e6..f0a960fe2b 100644 --- a/node/pkg/processor/vaa.go +++ b/node/pkg/processor/vaa.go @@ -2,7 +2,6 @@ package processor import ( "github.com/wormhole-foundation/wormhole/sdk/vaa" - "go.uber.org/zap" ) type VAA struct { @@ -26,13 +25,7 @@ func (v *VAA) HandleQuorum(sigs []*vaa.Signature, hash string, p *Processor) { ConsistencyLevel: v.ConsistencyLevel, } - // Store signed VAA in database. - p.logger.Info("signed VAA with quorum", - zap.String("message_id", signed.MessageID()), - zap.String("digest", hash), - ) - - p.storeSignedVAA(signed) + p.storeSignedVAA(signed, hash) p.broadcastSignedVAA(signed) p.state.signatures[hash].submitted = true } From e203f3781f44a8b3bd383a0815bee02fd4a9d745 Mon Sep 17 00:00:00 2001 From: Bruce Riley Date: Thu, 20 Jun 2024 10:48:50 -0500 Subject: [PATCH 03/11] Move the broadcasting of signed VAA to the worker --- node/pkg/processor/observation.go | 2 +- node/pkg/processor/processor.go | 23 +++++++++++++++++------ node/pkg/processor/vaa.go | 3 +-- 3 files changed, 19 insertions(+), 9 deletions(-) diff --git a/node/pkg/processor/observation.go b/node/pkg/processor/observation.go index f0d2312360..2e15ea8fd4 100644 --- a/node/pkg/processor/observation.go +++ b/node/pkg/processor/observation.go @@ -337,5 +337,5 @@ func (p *Processor) handleInboundSignedVAAWithQuorum(ctx context.Context, m *gos ) } - p.storeSignedVAA(v, "") + p.storeSignedVAA(v) } diff --git a/node/pkg/processor/processor.go b/node/pkg/processor/processor.go index 0b46ddd688..f175fa0f0f 100644 --- a/node/pkg/processor/processor.go +++ b/node/pkg/processor/processor.go @@ -308,7 +308,19 @@ func (p *Processor) Run(ctx context.Context) error { } } -func (p *Processor) storeSignedVAA(v *vaa.VAA, hash string) { +// storeSignedVAA schedules a database update for a VAA. +func (p *Processor) storeSignedVAA(v *vaa.VAA) { + p.postUpdate(v, "") +} + +// postSignedVAA schedules handling of a newly signed VAA. This includes +// writing it to the database, broadcasting it and generating a log message. +func (p *Processor) postSignedVAA(v *vaa.VAA, hash string) { + p.postUpdate(v, hash) +} + +// postUpdate adds a VAA to the map of VAAs to be updated by the vaa writer routine. +func (p *Processor) postUpdate(v *vaa.VAA, hash string) { if v.EmitterChain == vaa.ChainIDPythNet { key := fmt.Sprintf("%v/%v", v.EmitterAddress, v.Sequence) p.pythnetVaas[key] = PythNetVaaEntry{v: v, updateTime: time.Now()} @@ -363,12 +375,10 @@ func (p *Processor) getVaaFromUpdateMap(key string) *vaa.VAA { return entry.v } -// vaaWriter is the routine that writes VAAs to the database once per minute. It creates a local copy of the map +// vaaWriter is the routine that writes VAAs to the database once per second. It creates a local copy of the map // being used by the processor to reduce lock contention. 
It uses a dirty flag to handle the case where the VAA -// gets updated again while we are in the process of writing it to the database. -// -// This routine also handles writing the "signed VAA with quorum" info log, since doing that inline can take more -// than a millisecond. +// gets updated again while we are in the process of writing it to the database. This routine also handles broadcasting +// a newly signed VAA and writing the "signed VAA with quorum" info log. func (p *Processor) vaaWriter(ctx context.Context) error { ticker := time.NewTicker(time.Second) for { @@ -398,6 +408,7 @@ func (p *Processor) vaaWriter(ctx context.Context) error { zap.String("message_id", entry.v.MessageID()), zap.String("digest", entry.hashToLog), ) + p.broadcastSignedVAA(entry.v) } } diff --git a/node/pkg/processor/vaa.go b/node/pkg/processor/vaa.go index f0a960fe2b..1029420931 100644 --- a/node/pkg/processor/vaa.go +++ b/node/pkg/processor/vaa.go @@ -25,8 +25,7 @@ func (v *VAA) HandleQuorum(sigs []*vaa.Signature, hash string, p *Processor) { ConsistencyLevel: v.ConsistencyLevel, } - p.storeSignedVAA(signed, hash) - p.broadcastSignedVAA(signed) + p.postSignedVAA(signed, hash) p.state.signatures[hash].submitted = true } From 51a258bd63861a15ba7c93ca1b2c608e8448ffc3 Mon Sep 17 00:00:00 2001 From: Bruce Riley Date: Thu, 20 Jun 2024 13:11:33 -0500 Subject: [PATCH 04/11] Tweak signaturesToVaaFormat --- node/pkg/processor/observation.go | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/node/pkg/processor/observation.go b/node/pkg/processor/observation.go index 2e15ea8fd4..54d3c7867d 100644 --- a/node/pkg/processor/observation.go +++ b/node/pkg/processor/observation.go @@ -46,11 +46,9 @@ var ( ) // signaturesToVaaFormat converts a map[common.Address][]byte (processor state format) to []*vaa.Signature (VAA format) given a set of keys gsKeys -// It also returns a bool array indicating which key in gsKeys had a signature // The processor state format is used for efficiently storing signatures during aggregation while the VAA format is more efficient for on-chain verification. -func signaturesToVaaFormat(signatures map[common.Address][]byte, gsKeys []common.Address) ([]*vaa.Signature, []bool) { +func signaturesToVaaFormat(signatures map[common.Address][]byte, gsKeys []common.Address) []*vaa.Signature { // Aggregate all valid signatures into a list of vaa.Signature and construct signed VAA. - agg := make([]bool, len(gsKeys)) var sigs []*vaa.Signature for i, a := range gsKeys { sig, ok := signatures[a] @@ -66,10 +64,8 @@ func signaturesToVaaFormat(signatures map[common.Address][]byte, gsKeys []common Signature: bs, }) } - - agg[i] = ok } - return sigs, agg + return sigs } // handleObservation processes a remote VAA observation, verifies it, checks whether the VAA has met quorum, @@ -241,7 +237,7 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW // Now we *may* have quorum, depending on the guardian set in use. // Let's construct the VAA and check if we actually have quorum. 
- sigsVaaFormat, agg := signaturesToVaaFormat(s.signatures, gs.Keys) + sigsVaaFormat := signaturesToVaaFormat(s.signatures, gs.Keys) if p.logger.Level().Enabled(zapcore.DebugLevel) { p.logger.Debug("aggregation state for observation", // 1.3M out of 3M info messages / hour / guardian @@ -249,7 +245,6 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW zap.String("digest", hash), zap.Any("set", gs.KeysAsHexStrings()), zap.Uint32("index", gs.Index), - zap.Bools("aggregation", agg), zap.Int("required_sigs", gs.Quorum()), zap.Int("have_sigs", len(sigsVaaFormat)), zap.Bool("quorum", len(sigsVaaFormat) >= gs.Quorum()), From e5891b25942c9521c951090d23c4842d62d1929b Mon Sep 17 00:00:00 2001 From: Bruce Riley Date: Thu, 20 Jun 2024 13:34:27 -0500 Subject: [PATCH 05/11] Eliminate map look up in HandleQuorum --- node/pkg/processor/observation.go | 1 + node/pkg/processor/vaa.go | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/node/pkg/processor/observation.go b/node/pkg/processor/observation.go index 54d3c7867d..5af84cbe16 100644 --- a/node/pkg/processor/observation.go +++ b/node/pkg/processor/observation.go @@ -254,6 +254,7 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW if len(sigsVaaFormat) >= gs.Quorum() { // we have reached quorum *with the active guardian set* s.ourObservation.HandleQuorum(sigsVaaFormat, hash, p) + s.submitted = true } else { if p.logger.Level().Enabled(zapcore.DebugLevel) { p.logger.Debug("quorum not met, doing nothing", diff --git a/node/pkg/processor/vaa.go b/node/pkg/processor/vaa.go index 1029420931..f4896dad2e 100644 --- a/node/pkg/processor/vaa.go +++ b/node/pkg/processor/vaa.go @@ -26,7 +26,6 @@ func (v *VAA) HandleQuorum(sigs []*vaa.Signature, hash string, p *Processor) { } p.postSignedVAA(signed, hash) - p.state.signatures[hash].submitted = true } func (v *VAA) IsReliable() bool { From e622d66b59866617b7ad32396c312ed6301a384f Mon Sep 17 00:00:00 2001 From: Bruce Riley Date: Thu, 20 Jun 2024 13:40:32 -0500 Subject: [PATCH 06/11] Remove unnecessary check for already submitted --- node/pkg/processor/observation.go | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/node/pkg/processor/observation.go b/node/pkg/processor/observation.go index 5af84cbe16..8b204e8613 100644 --- a/node/pkg/processor/observation.go +++ b/node/pkg/processor/observation.go @@ -209,14 +209,7 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW s.signatures[their_addr] = m.Signature - if s.submitted { - if p.logger.Level().Enabled(zapcore.DebugLevel) { - p.logger.Debug("already submitted, doing nothing", - zap.String("messageId", m.MessageId), - zap.String("digest", hash), - ) - } - } else if s.ourObservation != nil { + if s.ourObservation != nil { // We have made this observation on chain! // Check if we have more signatures than required for quorum. 
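Note on patches 01 and 02: they replace inline StoreSignedVAA calls with a pending map that a worker flushes once per second, and the dirty-flag handoff is easier to see stripped of the VAA specifics. Below is a minimal sketch of that write-behind pattern under hypothetical names (writeBehind, entry, store); the real implementation is storeSignedVAA plus vaaWriter in node/pkg/processor/processor.go.

package sketch

import (
	"context"
	"sync"
	"time"
)

type entry struct {
	value []byte
	dirty bool
}

type writeBehind struct {
	mu      sync.Mutex
	pending map[string]*entry
	store   func(key string, value []byte) error // stands in for the database write
}

// queue records an update without blocking the caller on the database.
// Replacing the map value with a fresh dirty entry is what makes the scheme
// safe: a flush already in flight only clears the flag on the old entry.
func (w *writeBehind) queue(key string, value []byte) {
	w.mu.Lock()
	w.pending[key] = &entry{value: value, dirty: true}
	w.mu.Unlock()
}

// run flushes once per second: copy the map under the lock and clear the
// dirty flags, write outside the lock, then delete only those entries that
// were not updated again while the write was in flight.
func (w *writeBehind) run(ctx context.Context) error {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return nil
		case <-ticker.C:
			w.mu.Lock()
			batch := make(map[string]*entry, len(w.pending))
			for k, e := range w.pending {
				batch[k] = e
				e.dirty = false
			}
			w.mu.Unlock()

			for k, e := range batch {
				_ = w.store(k, e.value) // the real code logs the error and keeps going
			}

			w.mu.Lock()
			for k, e := range w.pending {
				if !e.dirty {
					delete(w.pending, k)
				}
			}
			w.mu.Unlock()
		}
	}
}

The copy-then-delete dance, rather than swapping in an empty map as patch 01 did, is what closes the window in which an update arriving mid-flush would otherwise be lost; that is exactly the case patch 02 ("Handle additional update while writing to db") addresses.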
From 37c47edd0a87e74b72021cbdcbca90589fe22e72 Mon Sep 17 00:00:00 2001 From: Bruce Riley Date: Fri, 28 Jun 2024 14:47:03 -0500 Subject: [PATCH 07/11] Use BadgerDB batch API to store VAAs --- node/pkg/db/db.go | 30 ++++++++++++++ node/pkg/db/db_test.go | 69 ++++++++++++++++++++++++++++++++- node/pkg/processor/processor.go | 9 +++-- 3 files changed, 104 insertions(+), 4 deletions(-) diff --git a/node/pkg/db/db.go b/node/pkg/db/db.go index 66810b30b4..def2cf517c 100644 --- a/node/pkg/db/db.go +++ b/node/pkg/db/db.go @@ -128,6 +128,36 @@ func (d *Database) StoreSignedVAA(v *vaa.VAA) error { return nil } +// StoreSignedVAABatch writes multiple VAAs to the database using the BadgerDB batch API. +// Note that the API takes care of splitting up the slice into the maximum allowed count +// and size so we don't need to worry about that. +func (d *Database) StoreSignedVAABatch(vaaBatch []*vaa.VAA) error { + batchTx := d.db.NewWriteBatch() + defer batchTx.Cancel() + + for _, v := range vaaBatch { + if len(v.Signatures) == 0 { + panic("StoreSignedVAABatch called for unsigned VAA") + } + + b, err := v.Marshal() + if err != nil { + panic("StoreSignedVAABatch failed to marshall VAA") + } + + err = batchTx.Set(VaaIDFromVAA(v).Bytes(), b) + if err != nil { + return err + } + + storedVaaTotal.Inc() + } + + // Wait for the batch to finish. + err := batchTx.Flush() + return err +} + func (d *Database) HasVAA(id VAAID) (bool, error) { err := d.db.View(func(txn *badger.Txn) error { _, err := txn.Get(id.Bytes()) diff --git a/node/pkg/db/db_test.go b/node/pkg/db/db_test.go index ffbdc2f804..5fde4c07d4 100644 --- a/node/pkg/db/db_test.go +++ b/node/pkg/db/db_test.go @@ -1,6 +1,7 @@ package db import ( + "bytes" "crypto/ecdsa" "crypto/rand" "fmt" @@ -22,6 +23,10 @@ import ( ) func getVAA() vaa.VAA { + return getVAAWithSeqNum(1) +} + +func getVAAWithSeqNum(seqNum uint64) vaa.VAA { var payload = []byte{97, 97, 97, 97, 97, 97} var governanceEmitter = vaa.Address{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4} @@ -31,7 +36,7 @@ func getVAA() vaa.VAA { Signatures: nil, Timestamp: time.Unix(0, 0), Nonce: uint32(1), - Sequence: uint64(1), + Sequence: seqNum, ConsistencyLevel: uint8(32), EmitterChain: vaa.ChainIDSolana, EmitterAddress: governanceEmitter, @@ -114,6 +119,68 @@ func TestStoreSignedVAASigned(t *testing.T) { assert.NoError(t, err2) } +func TestStoreSignedVAABatch(t *testing.T) { + dbPath := t.TempDir() + db, err := Open(dbPath) + if err != nil { + t.Error("failed to open database") + } + defer db.Close() + defer os.Remove(dbPath) + + privKey, err := ecdsa.GenerateKey(crypto.S256(), rand.Reader) + require.NoError(t, err) + + require.Less(t, int64(0), db.db.MaxBatchCount()) // In testing this was 104857. + require.Less(t, int64(0), db.db.MaxBatchSize()) // In testing this was 10066329. + + // Make sure we exceed the max batch size. + numVAAs := uint64(db.db.MaxBatchCount() + 1) + + // Build the VAA batch. + vaaBatch := make([]*vaa.VAA, 0, numVAAs) + for seqNum := uint64(0); seqNum < numVAAs; seqNum++ { + v := getVAAWithSeqNum(seqNum) + v.AddSignature(privKey, 0) + vaaBatch = append(vaaBatch, &v) + } + + // Store the batch in the database. + err = db.StoreSignedVAABatch(vaaBatch) + require.NoError(t, err) + + // Verify all the VAAs are in the database. 
+ for _, v := range vaaBatch { + storedBytes, err := db.GetSignedVAABytes(*VaaIDFromVAA(v)) + require.NoError(t, err) + + origBytes, err := v.Marshal() + require.NoError(t, err) + + assert.True(t, bytes.Equal(origBytes, storedBytes)) + } + + // Verify that updates work as well by tweaking the VAAs and rewriting them. + for _, v := range vaaBatch { + v.Nonce += 1 + } + + // Store the updated batch in the database. + err = db.StoreSignedVAABatch(vaaBatch) + require.NoError(t, err) + + // Verify all the updated VAAs are in the database. + for _, v := range vaaBatch { + storedBytes, err := db.GetSignedVAABytes(*VaaIDFromVAA(v)) + require.NoError(t, err) + + origBytes, err := v.Marshal() + require.NoError(t, err) + + assert.True(t, bytes.Equal(origBytes, storedBytes)) + } +} + func TestGetSignedVAABytes(t *testing.T) { dbPath := t.TempDir() db, err := Open(dbPath) diff --git a/node/pkg/processor/processor.go b/node/pkg/processor/processor.go index f175fa0f0f..f5f4496169 100644 --- a/node/pkg/processor/processor.go +++ b/node/pkg/processor/processor.go @@ -399,10 +399,9 @@ func (p *Processor) vaaWriter(ctx context.Context) error { p.updateVAALock.Unlock() if updatedVAAs != nil { // If there's anything to write, do that. + vaaBatch := make([]*vaa.VAA, 0, len(updatedVAAs)) for _, entry := range updatedVAAs { - if err := p.db.StoreSignedVAA(entry.v); err != nil { - p.logger.Error("failed to write VAA to database", zap.Error(err)) - } + vaaBatch = append(vaaBatch, entry.v) if entry.hashToLog != "" { p.logger.Info("signed VAA with quorum", zap.String("message_id", entry.v.MessageID()), @@ -412,6 +411,10 @@ func (p *Processor) vaaWriter(ctx context.Context) error { } } + if err := p.db.StoreSignedVAABatch(vaaBatch); err != nil { + p.logger.Error("failed to write VAAs to database", zap.Int("numVAAs", len(vaaBatch)), zap.Error(err)) + } + // Go through the map and delete anything we have written that hasn't been updated again. // If something has been updated again, it will get written next interval. p.updateVAALock.Lock() From 34f08d42b3797635b1c7c8be8425eda6197a94be Mon Sep 17 00:00:00 2001 From: Bruce Riley Date: Mon, 1 Jul 2024 13:22:44 -0500 Subject: [PATCH 08/11] Don't move broadcasting to worker --- node/pkg/db/db.go | 2 +- node/pkg/processor/processor.go | 28 ++++------------------------ node/pkg/processor/vaa.go | 10 +++++++++- 3 files changed, 14 insertions(+), 26 deletions(-) diff --git a/node/pkg/db/db.go b/node/pkg/db/db.go index def2cf517c..09e60fab0c 100644 --- a/node/pkg/db/db.go +++ b/node/pkg/db/db.go @@ -142,7 +142,7 @@ func (d *Database) StoreSignedVAABatch(vaaBatch []*vaa.VAA) error { b, err := v.Marshal() if err != nil { - panic("StoreSignedVAABatch failed to marshall VAA") + panic("StoreSignedVAABatch failed to marshal VAA") } err = batchTx.Set(VaaIDFromVAA(v).Bytes(), b) diff --git a/node/pkg/processor/processor.go b/node/pkg/processor/processor.go index f5f4496169..bc0588a998 100644 --- a/node/pkg/processor/processor.go +++ b/node/pkg/processor/processor.go @@ -146,9 +146,8 @@ type Processor struct { // updateVaaEntry is used to queue up a VAA to be written to the database. type updateVaaEntry struct { - v *vaa.VAA - hashToLog string - dirty bool + v *vaa.VAA + dirty bool } var ( @@ -310,17 +309,6 @@ func (p *Processor) Run(ctx context.Context) error { // storeSignedVAA schedules a database update for a VAA. func (p *Processor) storeSignedVAA(v *vaa.VAA) { - p.postUpdate(v, "") -} - -// postSignedVAA schedules handling of a newly signed VAA. 
This includes -// writing it to the database, broadcasting it and generating a log message. -func (p *Processor) postSignedVAA(v *vaa.VAA, hash string) { - p.postUpdate(v, hash) -} - -// postUpdate adds a VAA to the map of VAAs to be updated by the vaa writer routine. -func (p *Processor) postUpdate(v *vaa.VAA, hash string) { if v.EmitterChain == vaa.ChainIDPythNet { key := fmt.Sprintf("%v/%v", v.EmitterAddress, v.Sequence) p.pythnetVaas[key] = PythNetVaaEntry{v: v, updateTime: time.Now()} @@ -328,7 +316,7 @@ func (p *Processor) postUpdate(v *vaa.VAA, hash string) { } key := fmt.Sprintf("%d/%v/%v", v.EmitterChain, v.EmitterAddress, v.Sequence) p.updateVAALock.Lock() - p.updatedVAAs[key] = &updateVaaEntry{v: v, hashToLog: hash, dirty: true} + p.updatedVAAs[key] = &updateVaaEntry{v: v, dirty: true} p.updateVAALock.Unlock() } @@ -377,8 +365,7 @@ func (p *Processor) getVaaFromUpdateMap(key string) *vaa.VAA { // vaaWriter is the routine that writes VAAs to the database once per second. It creates a local copy of the map // being used by the processor to reduce lock contention. It uses a dirty flag to handle the case where the VAA -// gets updated again while we are in the process of writing it to the database. This routine also handles broadcasting -// a newly signed VAA and writing the "signed VAA with quorum" info log. +// gets updated again while we are in the process of writing it to the database. func (p *Processor) vaaWriter(ctx context.Context) error { ticker := time.NewTicker(time.Second) for { @@ -402,13 +389,6 @@ func (p *Processor) vaaWriter(ctx context.Context) error { vaaBatch := make([]*vaa.VAA, 0, len(updatedVAAs)) for _, entry := range updatedVAAs { vaaBatch = append(vaaBatch, entry.v) - if entry.hashToLog != "" { - p.logger.Info("signed VAA with quorum", - zap.String("message_id", entry.v.MessageID()), - zap.String("digest", entry.hashToLog), - ) - p.broadcastSignedVAA(entry.v) - } } if err := p.db.StoreSignedVAABatch(vaaBatch); err != nil { diff --git a/node/pkg/processor/vaa.go b/node/pkg/processor/vaa.go index f4896dad2e..7d41bbc244 100644 --- a/node/pkg/processor/vaa.go +++ b/node/pkg/processor/vaa.go @@ -2,6 +2,7 @@ package processor import ( "github.com/wormhole-foundation/wormhole/sdk/vaa" + "go.uber.org/zap" ) type VAA struct { @@ -25,7 +26,14 @@ func (v *VAA) HandleQuorum(sigs []*vaa.Signature, hash string, p *Processor) { ConsistencyLevel: v.ConsistencyLevel, } - p.postSignedVAA(signed, hash) + // Store signed VAA in database. 
+ p.logger.Info("signed VAA with quorum", + zap.String("message_id", signed.MessageID()), + zap.String("digest", hash), + ) + + p.broadcastSignedVAA(signed) + p.storeSignedVAA(signed) } func (v *VAA) IsReliable() bool { From bf1a79c3cc7dce55084dd8958f8585f98833f532 Mon Sep 17 00:00:00 2001 From: Bruce Riley Date: Fri, 5 Jul 2024 15:02:19 -0500 Subject: [PATCH 09/11] Speed up processing our own observation --- node/pkg/processor/broadcast.go | 36 ++++++++++++-------------- node/pkg/processor/observation.go | 21 ++++++++++----- node/pkg/processor/observation_test.go | 7 ++--- node/pkg/processor/processor.go | 18 +++++++++++-- 4 files changed, 50 insertions(+), 32 deletions(-) diff --git a/node/pkg/processor/broadcast.go b/node/pkg/processor/broadcast.go index 641332de33..dd0e551690 100644 --- a/node/pkg/processor/broadcast.go +++ b/node/pkg/processor/broadcast.go @@ -10,7 +10,6 @@ import ( ethcommon "github.com/ethereum/go-ethereum/common" "google.golang.org/protobuf/proto" - node_common "github.com/certusone/wormhole/node/pkg/common" gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1" "github.com/wormhole-foundation/wormhole/sdk/vaa" ) @@ -35,6 +34,7 @@ var ( }) ) +// broadcastSignature broadcasts the observation for something we observed locally. func (p *Processor) broadcastSignature( o Observation, signature []byte, @@ -59,35 +59,33 @@ func (p *Processor) broadcastSignature( // Broadcast the observation. p.gossipSendC <- msg observationsBroadcast.Inc() + observationsPostedInternally.Inc() hash := hex.EncodeToString(digest.Bytes()) - if p.state.signatures[hash] == nil { - p.state.signatures[hash] = &state{ + s := p.state.signatures[hash] + if s == nil { + s = &state{ firstObserved: time.Now(), nextRetry: time.Now().Add(nextRetryDuration(0)), signatures: map[ethcommon.Address][]byte{}, source: "loopback", } - } - p.state.signatures[hash].ourObservation = o - p.state.signatures[hash].ourMsg = msg - p.state.signatures[hash].txHash = txhash - p.state.signatures[hash].source = o.GetEmitterChain().String() - p.state.signatures[hash].gs = p.gs // guaranteed to match ourObservation - there's no concurrent access to p.gs - - // Fast path for our own signature - // send to obsvC directly if there is capacity, otherwise do it in a go routine. - // We can't block here because the same process would be responsible for reading from obsvC. - om := node_common.CreateMsgWithTimestamp[gossipv1.SignedObservation](&obsv) - select { - case p.obsvC <- om: - default: - go func() { p.obsvC <- om }() + p.state.signatures[hash] = s } - observationsPostedInternally.Inc() + s.ourObservation = o + s.ourMsg = msg + s.txHash = txhash + s.source = o.GetEmitterChain().String() + s.gs = p.gs // guaranteed to match ourObservation - there's no concurrent access to p.gs + s.signatures[p.ourAddr] = signature + + // Fast path for our own signature. 
+ start := time.Now() + p.handleObservationAlreadyVerified(&obsv, s, s.gs, hash) + timeToHandleObservation.Observe(float64(time.Since(start).Microseconds())) } func (p *Processor) broadcastSignedVAA(v *vaa.VAA) { diff --git a/node/pkg/processor/observation.go b/node/pkg/processor/observation.go index 8b204e8613..5a647cf758 100644 --- a/node/pkg/processor/observation.go +++ b/node/pkg/processor/observation.go @@ -2,7 +2,6 @@ package processor import ( - "context" "encoding/hex" "fmt" "time" @@ -70,12 +69,13 @@ func signaturesToVaaFormat(signatures map[common.Address][]byte, gsKeys []common // handleObservation processes a remote VAA observation, verifies it, checks whether the VAA has met quorum, // and assembles and submits a valid VAA if possible. -func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgWithTimeStamp[gossipv1.SignedObservation]) { +func (p *Processor) handleObservation(obs *node_common.MsgWithTimeStamp[gossipv1.SignedObservation]) { // SECURITY: at this point, observations received from the p2p network are fully untrusted (all fields!) // // Note that observations are never tied to the (verified) p2p identity key - the p2p network // identity is completely decoupled from the guardian identity, p2p is just transport. + start := time.Now() observationsReceivedTotal.Inc() m := obs.Msg @@ -83,6 +83,7 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW s := p.state.signatures[hash] if s != nil && s.submitted { // already submitted; ignoring additional signatures for it. + timeToHandleObservation.Observe(float64(time.Since(start).Microseconds())) return } @@ -208,7 +209,16 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW } s.signatures[their_addr] = m.Signature + p.handleObservationAlreadyVerified(m, s, gs, hash) + timeToHandleObservation.Observe(float64(time.Since(start).Microseconds())) + observationTotalDelay.Observe(float64(time.Since(obs.Timestamp).Microseconds())) +} +// handleObservationAlreadyVerified handles an observation after it's validity has been confirmed. It is called both for local and external observations. +func (p *Processor) handleObservationAlreadyVerified(m *gossipv1.SignedObservation, s *state, gs *node_common.GuardianSet, hash string) { + if s.submitted { + return + } if s.ourObservation != nil { // We have made this observation on chain! 
@@ -246,8 +256,9 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW if len(sigsVaaFormat) >= gs.Quorum() { // we have reached quorum *with the active guardian set* + start := time.Now() s.ourObservation.HandleQuorum(sigsVaaFormat, hash, p) - s.submitted = true + timeToHandleQuorum.Observe(float64(time.Since(start).Microseconds())) } else { if p.logger.Level().Enabled(zapcore.DebugLevel) { p.logger.Debug("quorum not met, doing nothing", @@ -264,11 +275,9 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW ) } } - - observationTotalDelay.Observe(float64(time.Since(obs.Timestamp).Microseconds())) } -func (p *Processor) handleInboundSignedVAAWithQuorum(ctx context.Context, m *gossipv1.SignedVAAWithQuorum) { +func (p *Processor) handleInboundSignedVAAWithQuorum(m *gossipv1.SignedVAAWithQuorum) { v, err := vaa.Unmarshal(m.Vaa) if err != nil { p.logger.Warn("received invalid VAA in SignedVAAWithQuorum message", diff --git a/node/pkg/processor/observation_test.go b/node/pkg/processor/observation_test.go index c384ffc641..eeef648339 100644 --- a/node/pkg/processor/observation_test.go +++ b/node/pkg/processor/observation_test.go @@ -1,7 +1,6 @@ package processor import ( - "context" "crypto/ecdsa" "crypto/rand" "testing" @@ -45,12 +44,11 @@ func TestHandleInboundSignedVAAWithQuorum_NilGuardianSet(t *testing.T) { observedZapCore, observedLogs := observer.New(zap.InfoLevel) observedLogger := zap.New(observedZapCore) - ctx := context.Background() signedVAAWithQuorum := &gossipv1.SignedVAAWithQuorum{Vaa: marshalVAA} processor := Processor{} processor.logger = observedLogger - processor.handleInboundSignedVAAWithQuorum(ctx, signedVAAWithQuorum) + processor.handleInboundSignedVAAWithQuorum(signedVAAWithQuorum) // Check to see if we got an error, which we should have, // because a `gs` is not defined on processor @@ -108,13 +106,12 @@ func TestHandleInboundSignedVAAWithQuorum(t *testing.T) { observedZapCore, observedLogs := observer.New(zap.InfoLevel) observedLogger := zap.New(observedZapCore) - ctx := context.Background() signedVAAWithQuorum := &gossipv1.SignedVAAWithQuorum{Vaa: marshalVAA} processor := Processor{} processor.gs = &guardianSet processor.logger = observedLogger - processor.handleInboundSignedVAAWithQuorum(ctx, signedVAAWithQuorum) + processor.handleInboundSignedVAAWithQuorum(signedVAAWithQuorum) // Check to see if we got an error, which we should have assert.Equal(t, 1, observedLogs.Len()) diff --git a/node/pkg/processor/processor.go b/node/pkg/processor/processor.go index bc0588a998..e31402ccd2 100644 --- a/node/pkg/processor/processor.go +++ b/node/pkg/processor/processor.go @@ -164,6 +164,20 @@ var ( Help: "Latency histogram for total time to process signed observations", Buckets: []float64{10.0, 20.0, 50.0, 100.0, 1000.0, 5000.0, 10_000.0, 100_000.0, 1_000_000.0, 10_000_000.0, 100_000_000.0, 1_000_000_000.0}, }) + + timeToHandleObservation = promauto.NewHistogram( + prometheus.HistogramOpts{ + Name: "wormhole_time_to_handle_observation_us", + Help: "Latency histogram for total time to handle observation on an observation", + Buckets: []float64{10.0, 20.0, 50.0, 100.0, 1000.0, 5000.0, 10_000.0, 100_000.0, 1_000_000.0, 10_000_000.0, 100_000_000.0, 1_000_000_000.0}, + }) + + timeToHandleQuorum = promauto.NewHistogram( + prometheus.HistogramOpts{ + Name: "wormhole_time_to_handle_quorum_us", + Help: "Latency histogram for total time to handle quorum on an observation", + Buckets: []float64{10.0, 20.0, 50.0, 100.0, 1000.0, 
5000.0, 10_000.0, 100_000.0, 1_000_000.0, 10_000_000.0, 100_000_000.0, 1_000_000_000.0}, }) ) func NewProcessor( @@ -268,9 +282,9 @@ func (p *Processor) Run(ctx context.Context) error { p.handleMessage(k) case m := <-p.obsvC: observationChanDelay.Observe(float64(time.Since(m.Timestamp).Microseconds())) - p.handleObservation(ctx, m) + p.handleObservation(m) case m := <-p.signedInC: - p.handleInboundSignedVAAWithQuorum(ctx, m) + p.handleInboundSignedVAAWithQuorum(m) case <-cleanup.C: p.handleCleanup(ctx) case <-govTimer.C: From 9a050d075517f058439d5730729e126904ced852 Mon Sep 17 00:00:00 2001 From: Bruce Riley Date: Fri, 5 Jul 2024 16:13:36 -0500 Subject: [PATCH 10/11] Simplify handleMessage and broadcastSignature --- node/pkg/processor/broadcast.go | 50 +++------------ node/pkg/processor/cleanup.go | 4 +- node/pkg/processor/message.go | 62 ++++++++++-------- node/pkg/processor/observation.go | 100 +++++++++++++++--------------- node/pkg/processor/vaa.go | 3 +- 5 files changed, 100 insertions(+), 119 deletions(-) diff --git a/node/pkg/processor/broadcast.go b/node/pkg/processor/broadcast.go index dd0e551690..3c5cf859c0 100644 --- a/node/pkg/processor/broadcast.go +++ b/node/pkg/processor/broadcast.go @@ -1,13 +1,10 @@ package processor import ( - "encoding/hex" - "time" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - ethcommon "github.com/ethereum/go-ethereum/common" + ethCommon "github.com/ethereum/go-ethereum/common" "google.golang.org/protobuf/proto" gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1" "github.com/wormhole-foundation/wormhole/sdk/vaa" ) @@ -21,12 +18,6 @@ var ( Help: "Total number of signed observations queued for broadcast", }) - observationsPostedInternally = promauto.NewCounter( - prometheus.CounterOpts{ - Name: "wormhole_observations_posted_internally", - Help: "Total number of our observations posted internally", - }) - signedVAAsBroadcast = promauto.NewCounter( prometheus.CounterOpts{ Name: "wormhole_signed_vaas_queued_for_broadcast", @@ -36,17 +27,17 @@ var ( // broadcastSignature broadcasts the observation for something we observed locally. func (p *Processor) broadcastSignature( - o Observation, - signature []byte, + messageID string, txhash []byte, + digest ethCommon.Hash, + signature []byte, -) { - digest := o.SigningDigest() +) (*gossipv1.SignedObservation, []byte) { obsv := gossipv1.SignedObservation{ Addr: p.ourAddr.Bytes(), Hash: digest.Bytes(), Signature: signature, TxHash: txhash, - MessageId: o.MessageID(), + MessageId: messageID, } w := gossipv1.GossipMessage{Message: &gossipv1.GossipMessage_SignedObservation{SignedObservation: &obsv}} @@ -59,35 +50,10 @@ func (p *Processor) broadcastSignature( // Broadcast the observation. p.gossipSendC <- msg observationsBroadcast.Inc() - observationsPostedInternally.Inc() - - hash := hex.EncodeToString(digest.Bytes()) - - s := p.state.signatures[hash] - if s == nil { - s = &state{ - firstObserved: time.Now(), - nextRetry: time.Now().Add(nextRetryDuration(0)), - signatures: map[ethcommon.Address][]byte{}, - source: "loopback", - } - - p.state.signatures[hash] = s - } - - s.ourObservation = o - s.ourMsg = msg - s.txHash = txhash - s.source = o.GetEmitterChain().String() - s.gs = p.gs // guaranteed to match ourObservation - there's no concurrent access to p.gs - s.signatures[p.ourAddr] = signature - - // Fast path for our own signature.
- start := time.Now() - p.handleObservationAlreadyVerified(&obsv, s, s.gs, hash) - timeToHandleObservation.Observe(float64(time.Since(start).Microseconds())) + return &obsv, msg } +// broadcastSignedVAA broadcasts a VAA to the gossip network. func (p *Processor) broadcastSignedVAA(v *vaa.VAA) { b, err := v.Marshal() if err != nil { diff --git a/node/pkg/processor/cleanup.go b/node/pkg/processor/cleanup.go index 14c72d1019..18843ffe33 100644 --- a/node/pkg/processor/cleanup.go +++ b/node/pkg/processor/cleanup.go @@ -290,9 +290,9 @@ func (p *Processor) signedVaaAlreadyInDB(hash string, s *state) (bool, error) { ) } return false, nil - } else { - return false, fmt.Errorf(`failed to look up message id "%s" in db: %w`, s.ourObservation.MessageID(), err) } + + return false, fmt.Errorf(`failed to look up message id "%s" in db: %w`, s.ourObservation.MessageID(), err) } v, err = vaa.Unmarshal(vb) diff --git a/node/pkg/processor/message.go b/node/pkg/processor/message.go index c5351bf363..c23c0a55e9 100644 --- a/node/pkg/processor/message.go +++ b/node/pkg/processor/message.go @@ -2,12 +2,14 @@ package processor import ( "encoding/hex" + "time" "github.com/mr-tron/base58" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + ethCommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -26,13 +28,6 @@ var ( Help: "Total number of messages observed", }, []string{"emitter_chain"}) - - messagesSignedTotal = promauto.NewCounterVec( - prometheus.CounterOpts{ - Name: "wormhole_message_observations_signed_total", - Help: "Total number of message observations that were successfully signed", - }, - []string{"emitter_chain"}) ) // handleMessage processes a message received from a chain and instantiates our deterministic copy of the VAA. An @@ -48,18 +43,7 @@ func (p *Processor) handleMessage(k *common.MessagePublication) { return } - if p.logger.Core().Enabled(zapcore.DebugLevel) { - p.logger.Debug("message publication confirmed", - zap.String("message_id", k.MessageIDString()), - zap.Uint32("nonce", k.Nonce), - zap.Stringer("txhash", k.TxHash), - zap.Time("timestamp", k.Timestamp), - ) - } - - messagesObservedTotal.With(prometheus.Labels{ - "emitter_chain": k.EmitterChain.String(), - }).Add(1) + messagesObservedTotal.WithLabelValues(k.EmitterChain.String()).Inc() // All nodes will create the exact same VAA and sign its digest. // Consensus is established on this digest. @@ -83,9 +67,10 @@ func (p *Processor) handleMessage(k *common.MessagePublication) { // Generate digest of the unsigned VAA. digest := v.SigningDigest() + hash := hex.EncodeToString(digest.Bytes()) // Sign the digest using our node's guardian key. 
- s, err := crypto.Sign(digest.Bytes(), p.gk) + signature, err := crypto.Sign(digest.Bytes(), p.gk) if err != nil { panic(err) } @@ -95,16 +80,43 @@ func (p *Processor) handleMessage(k *common.MessagePublication) { zap.String("message_id", k.MessageIDString()), zap.Stringer("txhash", k.TxHash), zap.String("txhash_b58", base58.Encode(k.TxHash.Bytes())), - zap.String("digest", hex.EncodeToString(digest.Bytes())), + zap.String("hash", hash), zap.Uint32("nonce", k.Nonce), + zap.Time("timestamp", k.Timestamp), zap.Uint8("consistency_level", k.ConsistencyLevel), - zap.String("signature", hex.EncodeToString(s)), + zap.String("signature", hex.EncodeToString(signature)), zap.Bool("isReobservation", k.IsReobservation), ) } - messagesSignedTotal.With(prometheus.Labels{ - "emitter_chain": k.EmitterChain.String()}).Add(1) + // Broadcast the signature. + obsv, msg := p.broadcastSignature(v.MessageID(), k.TxHash.Bytes(), digest, signature) - p.broadcastSignature(v, s, k.TxHash.Bytes()) + // Get / create our state entry. + s := p.state.signatures[hash] + if s == nil { + s = &state{ + firstObserved: time.Now(), + nextRetry: time.Now().Add(nextRetryDuration(0)), + signatures: map[ethCommon.Address][]byte{}, + source: "loopback", + } + + p.state.signatures[hash] = s + } + + // Update our state. + s.ourObservation = v + s.txHash = k.TxHash.Bytes() + s.source = v.GetEmitterChain().String() + s.gs = p.gs // guaranteed to match ourObservation - there's no concurrent access to p.gs + s.signatures[p.ourAddr] = signature + s.ourMsg = msg + + // Fast path for our own signature. + if !s.submitted { + start := time.Now() + p.checkForQuorum(obsv, s, s.gs, hash) + timeToHandleObservation.Observe(float64(time.Since(start).Microseconds())) + } } diff --git a/node/pkg/processor/observation.go b/node/pkg/processor/observation.go index 5a647cf758..31a34b98c3 100644 --- a/node/pkg/processor/observation.go +++ b/node/pkg/processor/observation.go @@ -209,74 +209,76 @@ func (p *Processor) handleObservation(obs *node_common.MsgWithTimeStamp[gossipv1 } s.signatures[their_addr] = m.Signature - p.handleObservationAlreadyVerified(m, s, gs, hash) - timeToHandleObservation.Observe(float64(time.Since(start).Microseconds())) - observationTotalDelay.Observe(float64(time.Since(obs.Timestamp).Microseconds())) -} -// handleObservationAlreadyVerified handles an observation after it's validity has been confirmed. It is called both for local and external observations. -func (p *Processor) handleObservationAlreadyVerified(m *gossipv1.SignedObservation, s *state, gs *node_common.GuardianSet, hash string) { - if s.submitted { - return - } if s.ourObservation != nil { - // We have made this observation on chain! - - // Check if we have more signatures than required for quorum. - // s.signatures may contain signatures from multiple guardian sets during guardian set updates - // Hence, if len(s.signatures) < quorum, then there is definitely no quorum and we can return early to save additional computation, - // but if len(s.signatures) >= quorum, there is not necessarily quorum for the active guardian set. - // We will later check for quorum again after assembling the VAA for a particular guardian set. 
- if len(s.signatures) < gs.Quorum() { - // no quorum yet, we're done here - if p.logger.Level().Enabled(zapcore.DebugLevel) { - p.logger.Debug("quorum not yet met", - zap.String("messageId", m.MessageId), - zap.String("digest", hash), - ) - } - return + p.checkForQuorum(m, s, gs, hash) + } else { + if p.logger.Level().Enabled(zapcore.DebugLevel) { + p.logger.Debug("we have not yet seen this observation yet", + zap.String("messageId", m.MessageId), + zap.String("digest", hash), + ) } + // Keep going to update metrics. + } - // Now we *may* have quorum, depending on the guardian set in use. - // Let's construct the VAA and check if we actually have quorum. - sigsVaaFormat := signaturesToVaaFormat(s.signatures, gs.Keys) + timeToHandleObservation.Observe(float64(time.Since(start).Microseconds())) + observationTotalDelay.Observe(float64(time.Since(obs.Timestamp).Microseconds())) +} +// checkForQuorum checks for quorum after a valid signature has been added to the observation state. If quorum is met, it broadcasts the signed VAA. This function +// is called both for local and external observations. It assumes we that we have made the observation ourselves but have not already submitted the VAA. +func (p *Processor) checkForQuorum(m *gossipv1.SignedObservation, s *state, gs *node_common.GuardianSet, hash string) { + // Check if we have more signatures than required for quorum. + // s.signatures may contain signatures from multiple guardian sets during guardian set updates + // Hence, if len(s.signatures) < quorum, then there is definitely no quorum and we can return early to save additional computation, + // but if len(s.signatures) >= quorum, there is not necessarily quorum for the active guardian set. + // We will later check for quorum again after assembling the VAA for a particular guardian set. + if len(s.signatures) < gs.Quorum() { + // no quorum yet, we're done here if p.logger.Level().Enabled(zapcore.DebugLevel) { - p.logger.Debug("aggregation state for observation", // 1.3M out of 3M info messages / hour / guardian + p.logger.Debug("quorum not yet met", zap.String("messageId", m.MessageId), zap.String("digest", hash), - zap.Any("set", gs.KeysAsHexStrings()), - zap.Uint32("index", gs.Index), - zap.Int("required_sigs", gs.Quorum()), - zap.Int("have_sigs", len(sigsVaaFormat)), - zap.Bool("quorum", len(sigsVaaFormat) >= gs.Quorum()), ) } + return + } - if len(sigsVaaFormat) >= gs.Quorum() { - // we have reached quorum *with the active guardian set* - start := time.Now() - s.ourObservation.HandleQuorum(sigsVaaFormat, hash, p) - timeToHandleQuorum.Observe(float64(time.Since(start).Microseconds())) - } else { - if p.logger.Level().Enabled(zapcore.DebugLevel) { - p.logger.Debug("quorum not met, doing nothing", - zap.String("messageId", m.MessageId), - zap.String("digest", hash), - ) - } - } - } else { + // Now we *may* have quorum, depending on the guardian set in use. + // Let's construct the VAA and check if we actually have quorum. 
+ sigsVaaFormat := signaturesToVaaFormat(s.signatures, gs.Keys) + + if p.logger.Level().Enabled(zapcore.DebugLevel) { + p.logger.Debug("aggregation state for observation", // 1.3M out of 3M info messages / hour / guardian + zap.String("messageId", m.MessageId), + zap.String("digest", hash), + zap.Any("set", gs.KeysAsHexStrings()), + zap.Uint32("index", gs.Index), + zap.Int("required_sigs", gs.Quorum()), + zap.Int("have_sigs", len(sigsVaaFormat)), + zap.Bool("quorum", len(sigsVaaFormat) >= gs.Quorum()), + ) + } + + if len(sigsVaaFormat) < gs.Quorum() { if p.logger.Level().Enabled(zapcore.DebugLevel) { - p.logger.Debug("we have not yet seen this observation - temporarily storing signature", // 175K out of 3M info messages / hour / guardian + p.logger.Debug("quorum not met, doing nothing", zap.String("messageId", m.MessageId), zap.String("digest", hash), ) } + return } + + // We have reached quorum *with the active guardian set*. + start := time.Now() + s.ourObservation.HandleQuorum(sigsVaaFormat, hash, p) + s.submitted = true + timeToHandleQuorum.Observe(float64(time.Since(start).Microseconds())) } +// handleInboundSignedVAAWithQuorum takes a VAA received from the network. If we have not already seen it and it is valid, we store it in the database. func (p *Processor) handleInboundSignedVAAWithQuorum(m *gossipv1.SignedVAAWithQuorum) { v, err := vaa.Unmarshal(m.Vaa) if err != nil { diff --git a/node/pkg/processor/vaa.go b/node/pkg/processor/vaa.go index 7d41bbc244..1f4a64fc86 100644 --- a/node/pkg/processor/vaa.go +++ b/node/pkg/processor/vaa.go @@ -11,6 +11,7 @@ type VAA struct { Reobservation bool } +// HandleQuorum is called when a VAA reaches quorum. It publishes the VAA to the gossip network and stores it in the database. func (v *VAA) HandleQuorum(sigs []*vaa.Signature, hash string, p *Processor) { // Deep copy the observation and add signatures signed := &vaa.VAA{ @@ -26,12 +27,12 @@ func (v *VAA) HandleQuorum(sigs []*vaa.Signature, hash string, p *Processor) { ConsistencyLevel: v.ConsistencyLevel, } - // Store signed VAA in database. p.logger.Info("signed VAA with quorum", zap.String("message_id", signed.MessageID()), zap.String("digest", hash), ) + // Broadcast the VAA and store it in the database. p.broadcastSignedVAA(signed) p.storeSignedVAA(signed) } From 0ebe68b70306db1cf8e47b74cd19b9ba26293995 Mon Sep 17 00:00:00 2001 From: Bruce Riley Date: Tue, 23 Jul 2024 10:39:07 -0500 Subject: [PATCH 11/11] Code review rework --- node/pkg/db/db.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/node/pkg/db/db.go b/node/pkg/db/db.go index 09e60fab0c..04cc806fc9 100644 --- a/node/pkg/db/db.go +++ b/node/pkg/db/db.go @@ -149,12 +149,11 @@ func (d *Database) StoreSignedVAABatch(vaaBatch []*vaa.VAA) error { if err != nil { return err } - - storedVaaTotal.Inc() } // Wait for the batch to finish. err := batchTx.Flush() + storedVaaTotal.Add(float64(len(vaaBatch))) return err }
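The series ends with StoreSignedVAABatch as the single database write path for non-PythNet VAAs. The badger WriteBatch calls it relies on (NewWriteBatch, Set, Flush, Cancel) reduce to the sketch below; this is a minimal illustration assuming the github.com/dgraph-io/badger/v3 import path (the badger version is not shown in these diffs), and the real function is StoreSignedVAABatch in node/pkg/db/db.go.

package sketch

import (
	badger "github.com/dgraph-io/badger/v3"
)

// storeBatch writes many key/value pairs through a single WriteBatch. Badger
// transparently splits the batch once it exceeds MaxBatchCount()/MaxBatchSize(),
// which is why StoreSignedVAABatch hands over the whole slice without chunking
// it, and why the test above deliberately builds MaxBatchCount()+1 VAAs.
func storeBatch(db *badger.DB, kvs map[string][]byte) error {
	wb := db.NewWriteBatch()
	defer wb.Cancel() // a no-op once Flush below has succeeded

	for k, v := range kvs {
		if err := wb.Set([]byte(k), v); err != nil {
			return err
		}
	}
	// Flush commits all pending entries and blocks until they are written.
	return wb.Flush()
}

Compared with one StoreSignedVAA transaction per VAA, this amortizes badger's commit overhead across the entire once-per-second flush, which is the point of patch 07; patch 11 then moves the storedVaaTotal metric update after the Flush so it counts only batches that were actually committed.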