Skip to content

Commit

Permalink
Don't move broadcasting to worker
Browse files Browse the repository at this point in the history
  • Loading branch information
bruce-riley committed Jul 2, 2024
1 parent ce730f8 commit dfa81ea
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 26 deletions.
2 changes: 1 addition & 1 deletion node/pkg/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (d *Database) StoreSignedVAABatch(vaaBatch []*vaa.VAA) error {

b, err := v.Marshal()
if err != nil {
panic("StoreSignedVAABatch failed to marshall VAA")
panic("StoreSignedVAABatch failed to marshal VAA")
}

err = batchTx.Set(VaaIDFromVAA(v).Bytes(), b)
Expand Down
28 changes: 4 additions & 24 deletions node/pkg/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,8 @@ type Processor struct {

// updateVaaEntry is used to queue up a VAA to be written to the database.
type updateVaaEntry struct {
v *vaa.VAA
hashToLog string
dirty bool
v *vaa.VAA
dirty bool
}

var (
Expand Down Expand Up @@ -310,25 +309,14 @@ func (p *Processor) Run(ctx context.Context) error {

// storeSignedVAA schedules a database update for a VAA.
func (p *Processor) storeSignedVAA(v *vaa.VAA) {
p.postUpdate(v, "")
}

// postSignedVAA schedules handling of a newly signed VAA. This includes
// writing it to the database, broadcasting it and generating a log message.
func (p *Processor) postSignedVAA(v *vaa.VAA, hash string) {
p.postUpdate(v, hash)
}

// postUpdate adds a VAA to the map of VAAs to be updated by the vaa writer routine.
func (p *Processor) postUpdate(v *vaa.VAA, hash string) {
if v.EmitterChain == vaa.ChainIDPythNet {
key := fmt.Sprintf("%v/%v", v.EmitterAddress, v.Sequence)
p.pythnetVaas[key] = PythNetVaaEntry{v: v, updateTime: time.Now()}
return
}
key := fmt.Sprintf("%d/%v/%v", v.EmitterChain, v.EmitterAddress, v.Sequence)
p.updateVAALock.Lock()
p.updatedVAAs[key] = &updateVaaEntry{v: v, hashToLog: hash, dirty: true}
p.updatedVAAs[key] = &updateVaaEntry{v: v, dirty: true}
p.updateVAALock.Unlock()
}

Expand Down Expand Up @@ -377,8 +365,7 @@ func (p *Processor) getVaaFromUpdateMap(key string) *vaa.VAA {

// vaaWriter is the routine that writes VAAs to the database once per second. It creates a local copy of the map
// being used by the processor to reduce lock contention. It uses a dirty flag to handle the case where the VAA
// gets updated again while we are in the process of writing it to the database. This routine also handles broadcasting
// a newly signed VAA and writing the "signed VAA with quorum" info log.
// gets updated again while we are in the process of writing it to the database.
func (p *Processor) vaaWriter(ctx context.Context) error {
ticker := time.NewTicker(time.Second)
for {
Expand All @@ -402,13 +389,6 @@ func (p *Processor) vaaWriter(ctx context.Context) error {
vaaBatch := make([]*vaa.VAA, 0, len(updatedVAAs))
for _, entry := range updatedVAAs {
vaaBatch = append(vaaBatch, entry.v)
if entry.hashToLog != "" {
p.logger.Info("signed VAA with quorum",
zap.String("message_id", entry.v.MessageID()),
zap.String("digest", entry.hashToLog),
)
p.broadcastSignedVAA(entry.v)
}
}

if err := p.db.StoreSignedVAABatch(vaaBatch); err != nil {
Expand Down
10 changes: 9 additions & 1 deletion node/pkg/processor/vaa.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package processor

import (
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
)

type VAA struct {
Expand All @@ -25,7 +26,14 @@ func (v *VAA) HandleQuorum(sigs []*vaa.Signature, hash string, p *Processor) {
ConsistencyLevel: v.ConsistencyLevel,
}

p.postSignedVAA(signed, hash)
// Store signed VAA in database.
p.logger.Info("signed VAA with quorum",
zap.String("message_id", signed.MessageID()),
zap.String("digest", hash),
)

p.broadcastSignedVAA(signed)
p.storeSignedVAA(signed)
}

func (v *VAA) IsReliable() bool {
Expand Down

0 comments on commit dfa81ea

Please sign in to comment.