From 905d6341f5498341174045ada39faf107e5fbd37 Mon Sep 17 00:00:00 2001 From: Bruce Riley Date: Tue, 14 May 2024 16:04:15 -0500 Subject: [PATCH] Node: Multithreaded processor --- node/pkg/processor/broadcast.go | 25 +- node/pkg/processor/cleanup.go | 316 ++++++++++++++----------- node/pkg/processor/message.go | 5 +- node/pkg/processor/observation.go | 81 ++++--- node/pkg/processor/observation_test.go | 6 +- node/pkg/processor/processor.go | 200 +++++++++++----- 6 files changed, 381 insertions(+), 252 deletions(-) diff --git a/node/pkg/processor/broadcast.go b/node/pkg/processor/broadcast.go index a5cee6a568..279fd4dae4 100644 --- a/node/pkg/processor/broadcast.go +++ b/node/pkg/processor/broadcast.go @@ -2,12 +2,10 @@ package processor import ( "encoding/hex" - "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - ethcommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "google.golang.org/protobuf/proto" @@ -50,20 +48,19 @@ func (p *Processor) broadcastSignature( // Store our VAA in case we're going to submit it to Solana hash := hex.EncodeToString(digest.Bytes()) - if p.state.signatures[hash] == nil { - p.state.signatures[hash] = &state{ - firstObserved: time.Now(), - nextRetry: time.Now().Add(nextRetryDuration(0)), - signatures: map[ethcommon.Address][]byte{}, - source: "loopback", - } + obsState, created := p.state.getOrCreateState(hash) + obsState.lock.Lock() + defer obsState.lock.Unlock() + + if created { + obsState.source = "loopback" } - p.state.signatures[hash].ourObservation = o - p.state.signatures[hash].ourMsg = msg - p.state.signatures[hash].txHash = txhash - p.state.signatures[hash].source = o.GetEmitterChain().String() - p.state.signatures[hash].gs = p.gs // guaranteed to match ourObservation - there's no concurrent access to p.gs + obsState.ourObservation = o + obsState.ourMsg = msg + obsState.txHash = txhash + obsState.source = o.GetEmitterChain().String() + obsState.gs = p.gst.Get() // Fast path for our own signature // send to obsvC directly if there is capacity, otherwise do it in a go routine. diff --git a/node/pkg/processor/cleanup.go b/node/pkg/processor/cleanup.go index d9a9cfc7d9..ae6ba664fd 100644 --- a/node/pkg/processor/cleanup.go +++ b/node/pkg/processor/cleanup.go @@ -70,175 +70,196 @@ var ( // handleCleanup handles periodic retransmissions and cleanup of observations func (p *Processor) handleCleanup(ctx context.Context) { + p.cleanupState() + p.cleanupPythnetVaas() +} + +// cleanupState walks through the aggregation state map and cleans up entries that are no longer needed. It grabs the state lock. +func (p *Processor) cleanupState() { + p.state.signaturesLock.Lock() + defer p.state.signaturesLock.Unlock() + p.logger.Info("aggregation state summary", zap.Int("cached", len(p.state.signatures))) aggregationStateEntries.Set(float64(len(p.state.signatures))) for hash, s := range p.state.signatures { - delta := time.Since(s.firstObserved) + if shouldDelete := p.cleanUpStateEntry(hash, s); shouldDelete { + delete(p.state.signatures, hash) // Can't use p.state.delete() because we're holding the lock. + } + } +} - if !s.submitted && s.ourObservation != nil && delta > settlementTime { - // Expire pending VAAs post settlement time if we have a stored quorum VAA. - // - // This occurs when we observed a message after the cluster has already reached - // consensus on it, causing us to never achieve quorum. 
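// The hunks above replace direct writes to p.state.signatures[hash] in broadcastSignature with
// getOrCreateState plus a per-entry mutex, and cleanup.go is being split so the map-level lock and
// the entry-level lock are taken separately. Below is a minimal, self-contained Go sketch of that
// two-level locking pattern; the entry and store types and their field names are illustrative
// stand-ins, not the package's actual state / aggregationState definitions.

package main

import (
	"fmt"
	"sync"
	"time"
)

// entry mirrors the idea of the per-observation state: its own lock guards its fields.
type entry struct {
	lock          sync.Mutex
	firstObserved time.Time
	source        string
}

// store mirrors the idea of aggregationState: a separate lock guards the map itself.
type store struct {
	mapLock sync.Mutex
	entries map[string]*entry
}

// getOrCreate returns the entry for hash, creating it if needed. Only the map lock is held here.
func (s *store) getOrCreate(hash string) (*entry, bool) {
	s.mapLock.Lock()
	defer s.mapLock.Unlock()
	e, ok := s.entries[hash]
	if !ok {
		e = &entry{firstObserved: time.Now()}
		s.entries[hash] = e
	}
	return e, !ok
}

func main() {
	st := &store{entries: map[string]*entry{}}
	e, created := st.getOrCreate("abc")
	e.lock.Lock() // entry-level lock for field updates, as broadcastSignature now does
	if created {
		e.source = "loopback"
	}
	e.lock.Unlock()
	fmt.Println(created, e.source)
}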
- if ourVaa, ok := s.ourObservation.(*VAA); ok { - if p.haveSignedVAA(*db.VaaIDFromVAA(&ourVaa.VAA)) { - // If we have a stored quorum VAA, we can safely expire the state. - // - // This is a rare case, and we can safely expire the state, since we - // have a quorum VAA. - p.logger.Info("Expiring late VAA", - zap.String("message_id", ourVaa.VAA.MessageID()), - zap.String("digest", hash), - zap.Duration("delta", delta), - ) - aggregationStateLate.Inc() - delete(p.state.signatures, hash) - continue - } +// cleanUpStateEntry cleans up a single aggregation state entry. It grabs the lock for that entry. Returns true if the entry should be deleted. +func (p *Processor) cleanUpStateEntry(hash string, s *state) bool { + s.lock.Lock() + defer s.lock.Unlock() + + delta := time.Since(s.firstObserved) + + if !s.submitted && s.ourObservation != nil && delta > settlementTime { + // Expire pending VAAs post settlement time if we have a stored quorum VAA. + // + // This occurs when we observed a message after the cluster has already reached + // consensus on it, causing us to never achieve quorum. + if ourVaa, ok := s.ourObservation.(*VAA); ok { + if p.haveSignedVAA(*db.VaaIDFromVAA(&ourVaa.VAA)) { + // If we have a stored quorum VAA, we can safely expire the state. + // + // This is a rare case, and we can safely expire the state, since we + // have a quorum VAA. + p.logger.Info("Expiring late VAA", + zap.String("message_id", ourVaa.VAA.MessageID()), + zap.String("digest", hash), + zap.Duration("delta", delta), + ) + aggregationStateLate.Inc() + return true } } + } - switch { - case !s.settled && delta > settlementTime: - // After 30 seconds, the observation is considered settled - it's unlikely that more observations will - // arrive, barring special circumstances. This is a better time to count misses than submission, - // because we submit right when we quorum rather than waiting for all observations to arrive. - s.settled = true + switch { + case !s.settled && delta > settlementTime: + // After 30 seconds, the observation is considered settled - it's unlikely that more observations will + // arrive, barring special circumstances. This is a better time to count misses than submission, + // because we submit right when we quorum rather than waiting for all observations to arrive. + s.settled = true - // Use either the most recent (in case of a observation we haven't seen) or stored gs, if available. - var gs *common.GuardianSet - if s.gs != nil { - gs = s.gs - } else { - gs = p.gs + // Peg the appropriate settlement metric using the current guardian set. If we don't have a guardian set (extremely unlikely), we just won't peg the metric. 
+ gs := s.gs + if gs == nil { + gs = p.gst.Get() + if gs == nil { + return false } + } - hasSigs := len(s.signatures) - wantSigs := vaa.CalculateQuorum(len(gs.Keys)) - quorum := hasSigs >= wantSigs + hasSigs := len(s.signatures) + wantSigs := vaa.CalculateQuorum(len(gs.Keys)) + quorum := hasSigs >= wantSigs - var chain vaa.ChainID - if s.ourObservation != nil { - chain = s.ourObservation.GetEmitterChain() - } + var chain vaa.ChainID + if s.ourObservation != nil { + chain = s.ourObservation.GetEmitterChain() + } - if p.logger.Level().Enabled(zapcore.DebugLevel) { - p.logger.Debug("observation considered settled", - zap.String("message_id", s.LoggingID()), - zap.String("digest", hash), - zap.Duration("delta", delta), - zap.Int("have_sigs", hasSigs), - zap.Int("required_sigs", wantSigs), - zap.Bool("quorum", quorum), - zap.Stringer("emitter_chain", chain), - ) - } + if p.logger.Level().Enabled(zapcore.DebugLevel) { + p.logger.Debug("observation considered settled", + zap.String("message_id", s.LoggingID()), + zap.String("digest", hash), + zap.Duration("delta", delta), + zap.Int("have_sigs", hasSigs), + zap.Int("required_sigs", wantSigs), + zap.Bool("quorum", quorum), + zap.Stringer("emitter_chain", chain), + ) + } - for _, k := range gs.Keys { - if _, ok := s.signatures[k]; ok { - aggregationStateFulfillment.WithLabelValues(k.Hex(), s.source, "present").Inc() - } else { - aggregationStateFulfillment.WithLabelValues(k.Hex(), s.source, "missing").Inc() - } + for _, k := range gs.Keys { + if _, ok := s.signatures[k]; ok { + aggregationStateFulfillment.WithLabelValues(k.Hex(), s.source, "present").Inc() + } else { + aggregationStateFulfillment.WithLabelValues(k.Hex(), s.source, "missing").Inc() } - case s.submitted && delta.Hours() >= 1: - // We could delete submitted observations right away, but then we'd lose context about additional (late) - // observation that come in. Therefore, keep it for a reasonable amount of time. - // If a very late observation arrives after cleanup, a nil aggregation state will be created - // and then expired after a while (as noted in observation.go, this can be abused by a byzantine guardian). - if p.logger.Level().Enabled(zapcore.DebugLevel) { - p.logger.Debug("expiring submitted observation", + } + case s.submitted && delta.Hours() >= 1: + // We could delete submitted observations right away, but then we'd lose context about additional (late) + // observation that come in. Therefore, keep it for a reasonable amount of time. + // If a very late observation arrives after cleanup, a nil aggregation state will be created + // and then expired after a while (as noted in observation.go, this can be abused by a byzantine guardian). + if p.logger.Level().Enabled(zapcore.DebugLevel) { + p.logger.Debug("expiring submitted observation", + zap.String("message_id", s.LoggingID()), + zap.String("digest", hash), + zap.Duration("delta", delta), + ) + } + aggregationStateExpiration.Inc() + return true + case !s.submitted && ((s.ourMsg != nil && delta > retryLimitOurs) || (s.ourMsg == nil && delta > retryLimitNotOurs)): + // Clearly, this horse is dead and continued beatings won't bring it closer to quorum. 
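// The settlement path above compares len(s.signatures) against vaa.CalculateQuorum(len(gs.Keys)).
// The sketch below illustrates the supermajority rule as it is commonly implemented (strictly more
// than two thirds of the guardian set); the local quorum helper is an assumption standing in for
// the SDK function, not its actual source.

package main

import "fmt"

// quorum is an illustrative stand-in for vaa.CalculateQuorum: the smallest number of
// signatures that represents more than two thirds of the guardian set.
func quorum(numGuardians int) int {
	return numGuardians*2/3 + 1
}

func main() {
	for _, n := range []int{1, 3, 19} {
		fmt.Printf("guardians=%d quorum=%d\n", n, quorum(n))
	}
	// A 19-guardian set needs 13 matching signatures before hasSigs >= wantSigs holds.
}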
+ p.logger.Info("expiring unsubmitted observation after exhausting retries", + zap.String("message_id", s.LoggingID()), + zap.String("digest", hash), + zap.Duration("delta", delta), + zap.Bool("weObserved", s.ourMsg != nil), + ) + aggregationStateTimeout.Inc() + return true + case !s.submitted && delta >= FirstRetryMinWait && time.Since(s.nextRetry) >= 0: + // Poor observation has been unsubmitted for five minutes - clearly, something went wrong. + // If we have previously submitted an observation, and it was reliable, we can make another attempt to get + // it over the finish line by sending a re-observation request to the network and rebroadcasting our + // sig. If we do not have an observation, it means we either never observed it, or it got + // revived by a malfunctioning guardian node, in which case, we can't do anything about it + // and just delete it to keep our state nice and lean. + if s.ourMsg != nil { + // Unreliable observations cannot be resubmitted and can be considered failed after 5 minutes + if !s.ourObservation.IsReliable() { + p.logger.Info("expiring unsubmitted unreliable observation", zap.String("message_id", s.LoggingID()), zap.String("digest", hash), zap.Duration("delta", delta), ) + aggregationStateTimeout.Inc() + return true } - delete(p.state.signatures, hash) - aggregationStateExpiration.Inc() - case !s.submitted && ((s.ourMsg != nil && delta > retryLimitOurs) || (s.ourMsg == nil && delta > retryLimitNotOurs)): - // Clearly, this horse is dead and continued beatings won't bring it closer to quorum. - p.logger.Info("expiring unsubmitted observation after exhausting retries", - zap.String("message_id", s.LoggingID()), - zap.String("digest", hash), - zap.Duration("delta", delta), - zap.Bool("weObserved", s.ourMsg != nil), - ) - delete(p.state.signatures, hash) - aggregationStateTimeout.Inc() - case !s.submitted && delta >= FirstRetryMinWait && time.Since(s.nextRetry) >= 0: - // Poor observation has been unsubmitted for five minutes - clearly, something went wrong. - // If we have previously submitted an observation, and it was reliable, we can make another attempt to get - // it over the finish line by sending a re-observation request to the network and rebroadcasting our - // sig. If we do not have an observation, it means we either never observed it, or it got - // revived by a malfunctioning guardian node, in which case, we can't do anything about it - // and just delete it to keep our state nice and lean. - if s.ourMsg != nil { - // Unreliable observations cannot be resubmitted and can be considered failed after 5 minutes - if !s.ourObservation.IsReliable() { - p.logger.Info("expiring unsubmitted unreliable observation", + + // Reobservation requests should not be resubmitted but we will keep waiting for more observations. + if s.ourObservation.IsReobservation() { + if p.logger.Level().Enabled(zapcore.DebugLevel) { + p.logger.Debug("not submitting reobservation request for reobservation", zap.String("message_id", s.LoggingID()), zap.String("digest", hash), zap.Duration("delta", delta), ) - delete(p.state.signatures, hash) - aggregationStateTimeout.Inc() - break - } - - // Reobservation requests should not be resubmitted but we will keep waiting for more observations. 
- if s.ourObservation.IsReobservation() { - if p.logger.Level().Enabled(zapcore.DebugLevel) { - p.logger.Debug("not submitting reobservation request for reobservation", - zap.String("message_id", s.LoggingID()), - zap.String("digest", hash), - zap.Duration("delta", delta), - ) - } - break } + return false + } - // If we have already stored this VAA, there is no reason for us to request reobservation. - alreadyInDB, err := p.signedVaaAlreadyInDB(hash, s) - if err != nil { - p.logger.Error("failed to check if observation is already in DB, requesting reobservation", - zap.String("message_id", s.LoggingID()), - zap.String("hash", hash), - zap.Error(err)) - } + // If we have already stored this VAA, there is no reason for us to request reobservation. + alreadyInDB, err := p.signedVaaAlreadyInDB(hash, s) + if err != nil { + p.logger.Error("failed to check if observation is already in DB, requesting reobservation", + zap.String("message_id", s.LoggingID()), + zap.String("hash", hash), + zap.Error(err)) + } - if alreadyInDB { - if p.logger.Level().Enabled(zapcore.DebugLevel) { - p.logger.Debug("observation already in DB, not requesting reobservation", - zap.String("message_id", s.LoggingID()), - zap.String("digest", hash), - ) - } - } else { - p.logger.Info("resubmitting observation", + if alreadyInDB { + if p.logger.Level().Enabled(zapcore.DebugLevel) { + p.logger.Debug("observation already in DB, not requesting reobservation", zap.String("message_id", s.LoggingID()), zap.String("digest", hash), - zap.Duration("delta", delta), - zap.String("firstObserved", s.firstObserved.String()), ) - req := &gossipv1.ObservationRequest{ - ChainId: uint32(s.ourObservation.GetEmitterChain()), - TxHash: s.txHash, - } - if err := common.PostObservationRequest(p.obsvReqSendC, req); err != nil { - p.logger.Warn("failed to broadcast re-observation request", zap.String("message_id", s.LoggingID()), zap.Error(err)) - } - p.gossipSendC <- s.ourMsg - s.retryCtr++ - s.nextRetry = time.Now().Add(nextRetryDuration(s.retryCtr)) - aggregationStateRetries.Inc() } } else { - // For nil state entries, we log the quorum to determine whether the - // network reached consensus without us. We don't know the correct guardian - // set, so we simply use the most recent one. - hasSigs := len(s.signatures) - wantSigs := vaa.CalculateQuorum(len(p.gs.Keys)) + p.logger.Info("resubmitting observation", + zap.String("message_id", s.LoggingID()), + zap.String("digest", hash), + zap.Duration("delta", delta), + zap.String("firstObserved", s.firstObserved.String()), + ) + req := &gossipv1.ObservationRequest{ + ChainId: uint32(s.ourObservation.GetEmitterChain()), + TxHash: s.txHash, + } + if err := common.PostObservationRequest(p.obsvReqSendC, req); err != nil { + p.logger.Warn("failed to broadcast re-observation request", zap.String("message_id", s.LoggingID()), zap.Error(err)) + } + p.gossipSendC <- s.ourMsg + s.retryCtr++ + s.nextRetry = time.Now().Add(nextRetryDuration(s.retryCtr)) + aggregationStateRetries.Inc() + } + } else { + // For nil state entries, we log the quorum to determine whether the + // network reached consensus without us. We don't know the correct guardian + // set, so we simply use the most recent one. 
+ hasSigs := len(s.signatures) + gs := p.gst.Get() + if gs != nil { + wantSigs := vaa.CalculateQuorum(len(gs.Keys)) if p.logger.Level().Enabled(zapcore.DebugLevel) { p.logger.Debug("expiring unsubmitted nil observation", @@ -250,13 +271,28 @@ func (p *Processor) handleCleanup(ctx context.Context) { zap.Bool("quorum", hasSigs >= wantSigs), ) } - delete(p.state.signatures, hash) - aggregationStateUnobserved.Inc() + } else { + if p.logger.Level().Enabled(zapcore.DebugLevel) { + p.logger.Debug("expiring unsubmitted nil observation, gs is nil", + zap.String("message_id", s.LoggingID()), + zap.String("digest", hash), + zap.Duration("delta", delta), + zap.Int("have_sigs", hasSigs), + ) + } } + aggregationStateUnobserved.Inc() + return true } } - // Clean up old pythnet VAAs. + return false +} + +// cleanupPythnetVaas deletes expired pythnet vaas. +func (p *Processor) cleanupPythnetVaas() { + p.pythnetVaaLock.Lock() + defer p.pythnetVaaLock.Unlock() oldestTime := time.Now().Add(-time.Hour) for key, pe := range p.pythnetVaas { if pe.updateTime.Before(oldestTime) { diff --git a/node/pkg/processor/message.go b/node/pkg/processor/message.go index c5351bf363..dcf30e0a20 100644 --- a/node/pkg/processor/message.go +++ b/node/pkg/processor/message.go @@ -38,7 +38,8 @@ var ( // handleMessage processes a message received from a chain and instantiates our deterministic copy of the VAA. An // event may be received multiple times and must be handled in an idempotent fashion. func (p *Processor) handleMessage(k *common.MessagePublication) { - if p.gs == nil { + gs := p.gst.Get() + if gs == nil { p.logger.Warn("dropping observation since we haven't initialized our guardian set yet", zap.String("message_id", k.MessageIDString()), zap.Uint32("nonce", k.Nonce), @@ -67,7 +68,7 @@ func (p *Processor) handleMessage(k *common.MessagePublication) { v := &VAA{ VAA: vaa.VAA{ Version: vaa.SupportedVAAVersion, - GuardianSetIndex: p.gs.Index, + GuardianSetIndex: gs.Index, Signatures: nil, Timestamp: k.Timestamp, Nonce: k.Nonce, diff --git a/node/pkg/processor/observation.go b/node/pkg/processor/observation.go index 0c92e761df..e723e9ac15 100644 --- a/node/pkg/processor/observation.go +++ b/node/pkg/processor/observation.go @@ -82,10 +82,18 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW m := obs.Msg hash := hex.EncodeToString(m.Hash) - s := p.state.signatures[hash] - if s != nil && s.submitted { - // already submitted; ignoring additional signatures for it. - return + + obsState, created := p.state.getOrCreateState(hash) + obsState.lock.Lock() + defer obsState.lock.Unlock() + + if !created { + if obsState.submitted { + // already submitted; ignoring additional signatures for it. 
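// With the p.gs field removed, the cleanup code above and handleMessage both read the guardian set
// through p.gst.Get() and pin the returned pointer for the rest of the operation. The sketch below
// is an illustrative RWMutex-guarded holder showing why concurrent Get/Set from the housekeeper
// and the workers is safe; it is not the actual common.GuardianSetState implementation, and the
// trimmed guardianSet type is an assumption made for the example.

package main

import (
	"fmt"
	"sync"
)

// guardianSet is a trimmed stand-in for common.GuardianSet.
type guardianSet struct {
	Keys  []string
	Index uint32
}

// setHolder is an illustrative analogue of common.GuardianSetState: a mutex-guarded
// pointer that the housekeeper Sets and every worker Gets.
type setHolder struct {
	mu sync.RWMutex
	gs *guardianSet
}

func (h *setHolder) Set(gs *guardianSet) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.gs = gs
}

func (h *setHolder) Get() *guardianSet {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.gs
}

func main() {
	h := &setHolder{}
	h.Set(&guardianSet{Index: 4})
	// Pin the pointer once, like handleMessage does with gs := p.gst.Get(), so the index
	// and keys used for one VAA cannot change mid-operation.
	if gs := h.Get(); gs != nil {
		fmt.Println("building VAA against guardian set index", gs.Index)
	}
}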
+ return + } + } else { + obsState.source = "unknown" } if p.logger.Core().Enabled(zapcore.DebugLevel) { @@ -112,6 +120,9 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW zap.String("addr", hex.EncodeToString(m.Addr)), zap.Error(err)) observationsFailedTotal.WithLabelValues("invalid_signature").Inc() + if created { + p.state.delete(hash) + } return } @@ -127,6 +138,9 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW zap.String("addr", hex.EncodeToString(m.Addr)), zap.String("pk", signer_pk.Hex())) observationsFailedTotal.WithLabelValues("pubkey_mismatch").Inc() + if created { + p.state.delete(hash) + } return } @@ -146,39 +160,42 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW // // During an update, vaaState.signatures can contain signatures from *both* guardian sets. // - var gs *node_common.GuardianSet - if s != nil && s.gs != nil { - gs = s.gs - } else { - gs = p.gs + if obsState.gs == nil { + obsState.gs = p.gst.Get() } // We haven't yet observed the trusted guardian set on Ethereum, and therefore, it's impossible to verify it. // May as well not have received it/been offline - drop it and wait for the guardian set. - if gs == nil { + if obsState.gs == nil { p.logger.Warn("dropping observations since we haven't initialized our guardian set yet", zap.String("messageId", m.MessageId), zap.String("digest", hash), zap.String("their_addr", their_addr.Hex()), ) observationsFailedTotal.WithLabelValues("uninitialized_guardian_set").Inc() + if created { + p.state.delete(hash) + } return } // Verify that m.Addr is included in the guardian set. If it's not, drop the message. In case it's us // who have the outdated guardian set, we'll just wait for the message to be retransmitted eventually. - _, ok := gs.KeyIndex(their_addr) + _, ok := obsState.gs.KeyIndex(their_addr) if !ok { if p.logger.Level().Enabled(zapcore.DebugLevel) { p.logger.Debug("received observation by unknown guardian - is our guardian set outdated?", zap.String("messageId", m.MessageId), zap.String("digest", hash), zap.String("their_addr", their_addr.Hex()), - zap.Uint32("index", gs.Index), + zap.Uint32("index", obsState.gs.Index), //zap.Any("keys", gs.KeysAsHexStrings()), ) } observationsFailedTotal.WithLabelValues("unknown_guardian").Inc() + if created { + p.state.delete(hash) + } return } @@ -190,7 +207,7 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW observationsReceivedByGuardianAddressTotal.WithLabelValues(their_addr.Hex()).Inc() // []byte isn't hashable in a map. Paying a small extra cost for encoding for easier debugging. - if s == nil { + if created { // We haven't yet seen this event ourselves, and therefore do not know what the VAA looks like. // However, we have established that a valid guardian has signed it, and therefore we can // already start aggregating signatures for it. @@ -200,30 +217,21 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW // signatures - such byzantine behavior would be plainly visible and would be dealt with by kicking them. 
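// handleObservation recovers the signer from the signature over the digest and compares it with
// the address the peer claims before the signature is counted (the invalid_signature and
// pubkey_mismatch paths above). Below is a minimal sketch of that recover-and-compare step using
// go-ethereum's crypto package; the key and payload are generated locally purely for illustration.

package main

import (
	"fmt"

	ethcommon "github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/crypto"
)

func main() {
	// A locally generated key stands in for a guardian key; the digest stands in for the VAA signing digest.
	key, err := crypto.GenerateKey()
	if err != nil {
		panic(err)
	}
	digest := crypto.Keccak256Hash([]byte("observation body"))

	// Guardians sign the 32-byte digest, producing a 65-byte [R || S || V] signature.
	sig, err := crypto.Sign(digest.Bytes(), key)
	if err != nil {
		panic(err)
	}

	// Recover the uncompressed public key from the signature, then derive the address,
	// mirroring the Ecrecover / Keccak256 check in handleObservation.
	pk, err := crypto.Ecrecover(digest.Bytes(), sig)
	if err != nil {
		panic(err)
	}
	recovered := ethcommon.BytesToAddress(crypto.Keccak256(pk[1:])[12:])
	claimed := crypto.PubkeyToAddress(key.PublicKey)

	fmt.Println("signature attributed correctly:", recovered == claimed)
}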
observationsUnknownTotal.Inc() - - s = &state{ - firstObserved: time.Now(), - nextRetry: time.Now().Add(nextRetryDuration(0)), - signatures: map[common.Address][]byte{}, - source: "unknown", - } - - p.state.signatures[hash] = s } - s.signatures[their_addr] = m.Signature + obsState.signatures[their_addr] = m.Signature - if s.ourObservation != nil { + if obsState.ourObservation != nil { // We have made this observation on chain! - quorum := vaa.CalculateQuorum(len(gs.Keys)) + quorum := vaa.CalculateQuorum(len(obsState.gs.Keys)) // Check if we have more signatures than required for quorum. // s.signatures may contain signatures from multiple guardian sets during guardian set updates // Hence, if len(s.signatures) < quorum, then there is definitely no quorum and we can return early to save additional computation, // but if len(s.signatures) >= quorum, there is not necessarily quorum for the active guardian set. // We will later check for quorum again after assembling the VAA for a particular guardian set. - if len(s.signatures) < quorum { + if len(obsState.signatures) < quorum { // no quorum yet, we're done here if p.logger.Level().Enabled(zapcore.DebugLevel) { p.logger.Debug("quorum not yet met", @@ -236,14 +244,16 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW // Now we *may* have quorum, depending on the guardian set in use. // Let's construct the VAA and check if we actually have quorum. - sigsVaaFormat, agg := signaturesToVaaFormat(s.signatures, gs.Keys) + sigsVaaFormat, agg := signaturesToVaaFormat(obsState.signatures, obsState.gs.Keys) if p.logger.Level().Enabled(zapcore.DebugLevel) { p.logger.Debug("aggregation state for observation", // 1.3M out of 3M info messages / hour / guardian zap.String("messageId", m.MessageId), zap.String("digest", hash), - zap.Any("set", gs.KeysAsHexStrings()), - zap.Uint32("index", gs.Index), + zap.Any("set", obsState.gs.KeysAsHexStrings()), + zap.Uint32("index", obsState.gs.Index), + zap.Any("set", obsState.gs.KeysAsHexStrings()), + zap.Uint32("index", obsState.gs.Index), zap.Bools("aggregation", agg), zap.Int("required_sigs", quorum), zap.Int("have_sigs", len(sigsVaaFormat)), @@ -251,9 +261,9 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW ) } - if len(sigsVaaFormat) >= quorum && !s.submitted { + if len(sigsVaaFormat) >= quorum && !obsState.submitted { // we have reached quorum *with the active guardian set* - s.ourObservation.HandleQuorum(sigsVaaFormat, hash, p) + obsState.ourObservation.HandleQuorum(sigsVaaFormat, hash, p) } else { if p.logger.Level().Enabled(zapcore.DebugLevel) { p.logger.Debug("quorum not met or already submitted, doing nothing", // 1.2M out of 3M info messages / hour / guardian @@ -288,12 +298,14 @@ func (p *Processor) handleInboundSignedVAAWithQuorum(ctx context.Context, m *gos if p.logger.Level().Enabled(zapcore.DebugLevel) { p.logger.Debug("ignored SignedVAAWithQuorum message for VAA we already stored", zap.String("message_id", v.MessageID()), + zap.String("vaaID", string(db.VaaIDFromVAA(v).Bytes())), ) } return } - if p.gs == nil { + gs := p.gst.Get() + if gs == nil { p.logger.Warn("dropping SignedVAAWithQuorum message since we haven't initialized our guardian set yet", zap.String("message_id", v.MessageID()), zap.String("digest", hex.EncodeToString(v.SigningDigest().Bytes())), @@ -303,7 +315,7 @@ func (p *Processor) handleInboundSignedVAAWithQuorum(ctx context.Context, m *gos } // Check if guardianSet doesn't have any keys - if len(p.gs.Keys) == 0 { + if 
len(gs.Keys) == 0 { p.logger.Warn("dropping SignedVAAWithQuorum message since we have a guardian set without keys", zap.String("message_id", v.MessageID()), zap.String("digest", hex.EncodeToString(v.SigningDigest().Bytes())), @@ -312,8 +324,7 @@ func (p *Processor) handleInboundSignedVAAWithQuorum(ctx context.Context, m *gos return } - if err := v.Verify(p.gs.Keys); err != nil { - // We format the error as part of the message so the tests can check for it. + if err := v.Verify(gs.Keys); err != nil { p.logger.Warn("dropping SignedVAAWithQuorum message because it failed verification: "+err.Error(), zap.String("message_id", v.MessageID())) return } diff --git a/node/pkg/processor/observation_test.go b/node/pkg/processor/observation_test.go index c384ffc641..0f126073b4 100644 --- a/node/pkg/processor/observation_test.go +++ b/node/pkg/processor/observation_test.go @@ -47,7 +47,7 @@ func TestHandleInboundSignedVAAWithQuorum_NilGuardianSet(t *testing.T) { ctx := context.Background() signedVAAWithQuorum := &gossipv1.SignedVAAWithQuorum{Vaa: marshalVAA} - processor := Processor{} + processor := Processor{gst: common.NewGuardianSetState(nil)} processor.logger = observedLogger processor.handleInboundSignedVAAWithQuorum(ctx, signedVAAWithQuorum) @@ -110,8 +110,8 @@ func TestHandleInboundSignedVAAWithQuorum(t *testing.T) { ctx := context.Background() signedVAAWithQuorum := &gossipv1.SignedVAAWithQuorum{Vaa: marshalVAA} - processor := Processor{} - processor.gs = &guardianSet + processor := Processor{gst: common.NewGuardianSetState(nil)} + processor.gst.Set(&guardianSet) processor.logger = observedLogger processor.handleInboundSignedVAAWithQuorum(ctx, signedVAAWithQuorum) diff --git a/node/pkg/processor/processor.go b/node/pkg/processor/processor.go index 3a1fdfaab9..44a8d5b702 100644 --- a/node/pkg/processor/processor.go +++ b/node/pkg/processor/processor.go @@ -5,6 +5,9 @@ import ( "crypto/ecdsa" "encoding/hex" "fmt" + "math" + "runtime" + "sync" "time" "github.com/certusone/wormhole/node/pkg/db" @@ -29,6 +32,10 @@ import ( var GovInterval = time.Minute var CleanupInterval = time.Second * 30 +// WorkerFactor is used to compute the number of workers to process requests. A value of 0.0 means single threaded mode. Anything else +// is multiplied by the number of cores to give the number of workers. In both cases, there is an additional house keeping routine. +const WorkerFactor = 2.0 + type ( // Observation defines the interface for any events observed by the guardian. Observation interface { @@ -50,6 +57,9 @@ type ( // state represents the local view of a given observation state struct { + // Mutex protects this particular state entry. + lock sync.Mutex + // First time this digest was seen (possibly even before we observed it ourselves). firstObserved time.Time // A re-observation request shall not be sent before this time. @@ -79,7 +89,9 @@ type ( // aggregationState represents the node's aggregation of guardian signatures. aggregationState struct { - signatures observationMap + // signaturesLock should be held when inserting / deleting / iterating over the map, but not when working with a single entry. + signaturesLock sync.Mutex + signatures observationMap } ) @@ -89,10 +101,34 @@ func (s *state) LoggingID() string { if s.ourObservation != nil { return s.ourObservation.MessageID() } - return hex.EncodeToString(s.txHash) } +// getOrCreateState returns the state for a given hash, creating it if it doesn't exist. It grabs the lock. 
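// As the comments in handleObservation note, obsState.signatures can hold signatures from two
// guardian sets during a set update, so len(obsState.signatures) >= quorum is only a cheap lower
// bound and the real check happens after the signatures are laid out for the active set. The
// helper below is an illustrative version of that filtering step (the package's actual logic
// lives in signaturesToVaaFormat); the addresses and the function name are made up for the example.

package main

import (
	"fmt"

	ethcommon "github.com/ethereum/go-ethereum/common"
)

// sigsInSet counts how many collected signatures were produced by members of the active
// guardian set, which is the number that actually counts toward quorum.
func sigsInSet(sigs map[ethcommon.Address][]byte, keys []ethcommon.Address) int {
	n := 0
	for _, k := range keys {
		if _, ok := sigs[k]; ok {
			n++
		}
	}
	return n
}

func main() {
	oldGuardian := ethcommon.HexToAddress("0x0000000000000000000000000000000000000001")
	newGuardian := ethcommon.HexToAddress("0x0000000000000000000000000000000000000002")
	sigs := map[ethcommon.Address][]byte{oldGuardian: {0x01}, newGuardian: {0x02}}
	activeKeys := []ethcommon.Address{newGuardian}

	// Two signatures collected, but only one counts toward quorum in the active set.
	fmt.Println(len(sigs), sigsInSet(sigs, activeKeys))
}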
+func (s *aggregationState) getOrCreateState(hash string) (*state, bool) { + s.signaturesLock.Lock() + defer s.signaturesLock.Unlock() + + created := false + if _, ok := s.signatures[hash]; !ok { + created = true + s.signatures[hash] = &state{ + firstObserved: time.Now(), + nextRetry: time.Now().Add(nextRetryDuration(0)), + signatures: make(map[ethcommon.Address][]byte), + } + } + + return s.signatures[hash], created +} + +// delete removes a state entry from the map. It grabs the lock. +func (s *aggregationState) delete(hash string) { + s.signaturesLock.Lock() + delete(s.signatures, hash) + s.signaturesLock.Unlock() +} + type PythNetVaaEntry struct { v *vaa.VAA updateTime time.Time // Used for determining when to delete entries @@ -123,8 +159,6 @@ type Processor struct { // Runtime state - // gs is the currently valid guardian set - gs *common.GuardianSet // gst is managed by the processor and allows concurrent access to the // guardian set by other components. gst *common.GuardianSetState @@ -137,6 +171,7 @@ type Processor struct { governor *governor.ChainGovernor acct *accountant.Accountant acctReadC <-chan *common.MessagePublication + pythnetVaaLock sync.Mutex pythnetVaas map[string]PythNetVaaEntry gatewayRelayer *gwrelayer.GatewayRelayer } @@ -185,8 +220,8 @@ func NewProcessor( gst: gst, db: db, - logger: supervisor.Logger(ctx), - state: &aggregationState{observationMap{}}, + logger: supervisor.Logger(ctx).With(zap.String("component", "processor")), + state: &aggregationState{signatures: observationMap{}}, ourAddr: crypto.PubkeyToAddress(gk.PublicKey), governor: g, acct: acct, @@ -197,33 +232,109 @@ func NewProcessor( } func (p *Processor) Run(ctx context.Context) error { - cleanup := time.NewTicker(CleanupInterval) + errC := make(chan error) + + if WorkerFactor < 0.0 { + return fmt.Errorf("workerFactor must be positive or zero") + } + + var numWorkers int + if WorkerFactor == 0.0 { + numWorkers = 1 + p.logger.Info("processor running in single worker mode") + } else { + numWorkers = int(math.Ceil(float64(runtime.NumCPU()) * WorkerFactor)) + p.logger.Info("processor configured to use workers", zap.Int("numWorkers", numWorkers), zap.Float64("workerFactor", WorkerFactor)) + } + + // Start the routine to do housekeeping tasks that don't need to be distributed to the workers. + common.RunWithScissors(ctx, errC, "processor_housekeeper", p.runHousekeeper) + + // Start the workers. + for workerId := 1; workerId <= numWorkers; workerId++ { + workerId := workerId + common.RunWithScissors(ctx, errC, fmt.Sprintf("processor_worker_%d", workerId), p.runWorker) + } + + var err error + select { + case <-ctx.Done(): + err = ctx.Err() + case e := <-errC: + err = e + } + + // Log these as warnings so they show up in the benchmark logs. + metric := &dto.Metric{} + _ = observationChanDelay.Write(metric) + p.logger.Warn("PROCESSOR_METRICS", zap.Any("observationChannelDelay", metric.String())) + + metric = &dto.Metric{} + _ = observationTotalDelay.Write(metric) + p.logger.Warn("PROCESSOR_METRICS", zap.Any("observationProcessingDelay", metric.String())) + + return err +} + +// runHousekeeper performs general tasks that do not need to be distributed to the workers. There will always be exactly one instance of this. +func (p *Processor) runHousekeeper(ctx context.Context) error { + // Always start the timers to avoid nil pointer dereferences below. They will only be rearmed on worker 1. 
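// Run derives the worker count from WorkerFactor: 0.0 selects single-worker mode, anything else
// scales runtime.NumCPU() and rounds up, and the housekeeper runs in addition to the workers.
// The sketch below extracts that calculation so it can be exercised with a few sample values;
// the workerCount function name is ours, not the package's.

package main

import (
	"fmt"
	"math"
	"runtime"
)

// workerCount mirrors the calculation in Processor.Run: a factor of 0.0 selects single-worker
// mode, otherwise the factor scales the number of CPUs (rounded up).
func workerCount(workerFactor float64, numCPU int) (int, error) {
	if workerFactor < 0.0 {
		return 0, fmt.Errorf("workerFactor must be positive or zero")
	}
	if workerFactor == 0.0 {
		return 1, nil
	}
	return int(math.Ceil(float64(numCPU) * workerFactor)), nil
}

func main() {
	for _, f := range []float64{0.0, 0.5, 2.0} {
		n, err := workerCount(f, runtime.NumCPU())
		fmt.Println("factor", f, "workers", n, "err", err)
	}
}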
+ cleanup := time.NewTimer(CleanupInterval) + defer cleanup.Stop() - // Always initialize the timer so don't have a nil pointer in the case below. It won't get rearmed after that. govTimer := time.NewTimer(GovInterval) + defer cleanup.Stop() for { select { case <-ctx.Done(): - if p.acct != nil { - p.acct.Close() + return ctx.Err() + case gs := <-p.setC: + p.logger.Info("guardian set updated", + zap.Strings("set", gs.KeysAsHexStrings()), + zap.Uint32("index", gs.Index)) + p.gst.Set(gs) + case <-cleanup.C: + cleanup.Reset(CleanupInterval) + p.handleCleanup(ctx) + case <-govTimer.C: + if p.governor != nil { + toBePublished, err := p.governor.CheckPending() + if err != nil { + return err + } + if len(toBePublished) != 0 { + for _, k := range toBePublished { + // SECURITY defense-in-depth: Make sure the governor did not generate an unexpected message. + if msgIsGoverned, err := p.governor.IsGovernedMsg(k); err != nil { + return fmt.Errorf("governor failed to determine if message should be governed: `%s`: %w", k.MessageIDString(), err) + } else if !msgIsGoverned { + return fmt.Errorf("governor published a message that should not be governed: `%s`", k.MessageIDString()) + } + if p.acct != nil { + shouldPub, err := p.acct.SubmitObservation(k) + if err != nil { + return fmt.Errorf("failed to process message released by governor `%s`: %w", k.MessageIDString(), err) + } + if !shouldPub { + continue + } + } + p.handleMessage(k) + } + } + govTimer.Reset(GovInterval) } + } + } +} - // Log these as warnings so they show up in the benchmark logs. - metric := &dto.Metric{} - _ = observationChanDelay.Write(metric) - p.logger.Warn("PROCESSOR_METRICS", zap.Any("observationChannelDelay", metric.String())) - - metric = &dto.Metric{} - _ = observationTotalDelay.Write(metric) - p.logger.Warn("PROCESSOR_METRICS", zap.Any("observationProcessingDelay", metric.String())) - +// runWorker performs the per-observation tasks that can be distributed to the workers. There will be at least one of these. +func (p *Processor) runWorker(ctx context.Context) error { + for { + select { + case <-ctx.Done(): return ctx.Err() - case p.gs = <-p.setC: - p.logger.Info("guardian set updated", - zap.Strings("set", p.gs.KeysAsHexStrings()), - zap.Uint32("index", p.gs.Index)) - p.gst.Set(p.gs) case k := <-p.msgC: if p.governor != nil { if !p.governor.ProcessMsg(k) { @@ -240,7 +351,6 @@ func (p *Processor) Run(ctx context.Context) error { } } p.handleMessage(k) - case k := <-p.acctReadC: if p.acct == nil { return fmt.Errorf("received an accountant event when accountant is not configured") @@ -255,38 +365,6 @@ func (p *Processor) Run(ctx context.Context) error { p.handleObservation(ctx, m) case m := <-p.signedInC: p.handleInboundSignedVAAWithQuorum(ctx, m) - case <-cleanup.C: - p.handleCleanup(ctx) - case <-govTimer.C: - if p.governor != nil { - toBePublished, err := p.governor.CheckPending() - if err != nil { - return err - } - if len(toBePublished) != 0 { - for _, k := range toBePublished { - // SECURITY defense-in-depth: Make sure the governor did not generate an unexpected message. 
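// runWorker is started numWorkers times and every instance selects on the same msgC / acctReadC /
// obsvC / signedInC channels; Go delivers each message to exactly one ready receiver, which is
// what spreads observations across cores without any extra dispatch code. Below is a minimal
// sketch of that fan-out property with a plain channel; the channel, worker IDs, and counters
// are illustrative.

package main

import (
	"fmt"
	"sync"
)

func main() {
	msgC := make(chan int, 100)
	var wg sync.WaitGroup
	var mu sync.Mutex
	perWorker := map[int]int{}

	// Several workers receive from the same channel; each message goes to exactly one of them.
	for workerID := 1; workerID <= 4; workerID++ {
		workerID := workerID
		wg.Add(1)
		go func() {
			defer wg.Done()
			for range msgC {
				mu.Lock()
				perWorker[workerID]++
				mu.Unlock()
			}
		}()
	}

	for i := 0; i < 1000; i++ {
		msgC <- i
	}
	close(msgC)
	wg.Wait()
	fmt.Println("messages handled per worker:", perWorker)
}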
- if msgIsGoverned, err := p.governor.IsGovernedMsg(k); err != nil { - return fmt.Errorf("governor failed to determine if message should be governed: `%s`: %w", k.MessageIDString(), err) - } else if !msgIsGoverned { - return fmt.Errorf("governor published a message that should not be governed: `%s`", k.MessageIDString()) - } - if p.acct != nil { - shouldPub, err := p.acct.SubmitObservation(k) - if err != nil { - return fmt.Errorf("failed to process message released by governor `%s`: %w", k.MessageIDString(), err) - } - if !shouldPub { - continue - } - } - p.handleMessage(k) - } - } - } - if (p.governor != nil) || (p.acct != nil) { - govTimer.Reset(GovInterval) - } } } } @@ -294,7 +372,11 @@ func (p *Processor) Run(ctx context.Context) error { func (p *Processor) storeSignedVAA(v *vaa.VAA) error { if v.EmitterChain == vaa.ChainIDPythNet { key := fmt.Sprintf("%v/%v", v.EmitterAddress, v.Sequence) - p.pythnetVaas[key] = PythNetVaaEntry{v: v, updateTime: time.Now()} + p.pythnetVaaLock.Lock() + defer p.pythnetVaaLock.Unlock() + if p.pythnetVaas != nil { + p.pythnetVaas[key] = PythNetVaaEntry{v: v, updateTime: time.Now()} + } return nil } return p.db.StoreSignedVAA(v) @@ -303,6 +385,8 @@ func (p *Processor) storeSignedVAA(v *vaa.VAA) error { // haveSignedVAA returns true if we already have a VAA for the given VAAID func (p *Processor) haveSignedVAA(id db.VAAID) bool { if id.EmitterChain == vaa.ChainIDPythNet { + p.pythnetVaaLock.Lock() + defer p.pythnetVaaLock.Unlock() if p.pythnetVaas == nil { return false }
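// PythNet VAAs stay in an in-memory map rather than the database, so storeSignedVAA,
// haveSignedVAA, and cleanupPythnetVaas all take pythnetVaaLock around the map. The sketch below
// shows the same guarded-map-with-expiry idea in isolation; the vaaCache type and its methods
// are illustrative, not the processor's actual fields.

package main

import (
	"fmt"
	"sync"
	"time"
)

// vaaCache is an illustrative analogue of the pythnetVaas map plus pythnetVaaLock.
type vaaCache struct {
	mu      sync.Mutex
	entries map[string]time.Time // key -> updateTime
}

func (c *vaaCache) put(key string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.entries[key] = time.Now()
}

func (c *vaaCache) has(key string) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	_, ok := c.entries[key]
	return ok
}

// expire drops entries older than an hour, like cleanupPythnetVaas.
func (c *vaaCache) expire() {
	c.mu.Lock()
	defer c.mu.Unlock()
	oldest := time.Now().Add(-time.Hour)
	for k, t := range c.entries {
		if t.Before(oldest) {
			delete(c.entries, k)
		}
	}
}

func main() {
	c := &vaaCache{entries: map[string]time.Time{}}
	c.put("emitter/1")
	c.expire()
	fmt.Println(c.has("emitter/1"))
}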