diff --git a/node/cmd/guardiand/node.go b/node/cmd/guardiand/node.go index 1d99e4c684..5e498ea778 100644 --- a/node/cmd/guardiand/node.go +++ b/node/cmd/guardiand/node.go @@ -240,6 +240,8 @@ var ( chainGovernorEnabled *bool governorFlowCancelEnabled *bool + processorWorkerFactor *float64 + ccqEnabled *bool ccqAllowedRequesters *string ccqP2pPort *uint @@ -450,6 +452,7 @@ func init() { chainGovernorEnabled = NodeCmd.Flags().Bool("chainGovernorEnabled", false, "Run the chain governor") governorFlowCancelEnabled = NodeCmd.Flags().Bool("governorFlowCancelEnabled", false, "Enable flow cancel on the governor") + processorWorkerFactor = NodeCmd.Flags().Float64("processorWorkerFactor", 0.0, "Multiplied by the number of available CPUs on the system to determine the number of workers that the processor uses. 0.0 means single worker") ccqEnabled = NodeCmd.Flags().Bool("ccqEnabled", false, "Enable cross chain query support") ccqAllowedRequesters = NodeCmd.Flags().String("ccqAllowedRequesters", "", "Comma separated list of signers allowed to submit cross chain queries") @@ -1639,7 +1642,7 @@ func runNode(cmd *cobra.Command, args []string) { node.GuardianOptionAdminService(*adminSocketPath, ethRPC, ethContract, rpcMap), node.GuardianOptionP2P(p2pKey, *p2pNetworkID, *p2pBootstrap, *nodeName, *subscribeToVAAs, *disableHeartbeatVerify, *p2pPort, *ccqP2pBootstrap, *ccqP2pPort, *ccqAllowedPeers, *gossipAdvertiseAddress, ibc.GetFeatures), node.GuardianOptionStatusServer(*statusAddr), - node.GuardianOptionProcessor(*p2pNetworkID), + node.GuardianOptionProcessor(*p2pNetworkID, *processorWorkerFactor), } if shouldStart(publicGRPCSocketPath) { diff --git a/node/pkg/node/node_test.go b/node/pkg/node/node_test.go index 378f2acc0f..bfa295ccca 100644 --- a/node/pkg/node/node_test.go +++ b/node/pkg/node/node_test.go @@ -14,6 +14,7 @@ import ( "net/http" "os" "regexp" + "runtime" "strconv" "strings" "testing" @@ -196,7 +197,7 @@ func mockGuardianRunnable(t testing.TB, gs []*mockGuardian, mockGuardianIndex ui GuardianOptionPublicWeb(cfg.publicWeb, cfg.publicSocket, "", false, ""), GuardianOptionAdminService(cfg.adminSocket, nil, nil, rpcMap), GuardianOptionStatusServer(fmt.Sprintf("[::]:%d", cfg.statusPort)), - GuardianOptionProcessor(networkID), + GuardianOptionProcessor(networkID, 3.0/float64(runtime.NumCPU())), // Create three workers. } guardianNode := NewGuardianNode( diff --git a/node/pkg/node/options.go b/node/pkg/node/options.go index c5c5b014b7..86dbedbd45 100644 --- a/node/pkg/node/options.go +++ b/node/pkg/node/options.go @@ -575,7 +575,7 @@ func GuardianOptionDatabase(db *db.Database) *GuardianOption { // GuardianOptionProcessor enables the default processor, which is required to make consensus on messages. 
// Dependencies: db, governor, accountant -func GuardianOptionProcessor(networkId string) *GuardianOption { +func GuardianOptionProcessor(networkId string, workerFactor float64) *GuardianOption { return &GuardianOption{ name: "processor", // governor and accountant may be set to nil, but that choice needs to be made before the processor is configured @@ -600,6 +600,7 @@ func GuardianOptionProcessor(networkId string) *GuardianOption { g.acctC.readC, g.gatewayRelayer, networkId, + workerFactor, ).Run return nil diff --git a/node/pkg/processor/benchmark_test.go b/node/pkg/processor/benchmark_test.go index 0995317940..c844c266b9 100644 --- a/node/pkg/processor/benchmark_test.go +++ b/node/pkg/processor/benchmark_test.go @@ -174,11 +174,10 @@ func createProcessorForTest(b *testing.B, numVAAs int, ctx context.Context, db * gossipAttestationSendC: pd.gossipAttestationSendC, gossipVaaSendC: pd.gossipVaaSendC, guardianSigner: ourSigner, - gs: gs, gst: gst, db: db, logger: logger, - state: &aggregationState{observationMap{}}, + state: &aggregationState{signatures: observationMap{}}, ourAddr: crypto.PubkeyToAddress(ourSigner.PublicKey()), pythnetVaas: make(map[string]PythNetVaaEntry), updatedVAAs: make(map[string]*updateVaaEntry), diff --git a/node/pkg/processor/cleanup.go b/node/pkg/processor/cleanup.go index 496d5e9952..3ffba766e4 100644 --- a/node/pkg/processor/cleanup.go +++ b/node/pkg/processor/cleanup.go @@ -2,7 +2,6 @@ package processor import ( - "context" "encoding/hex" "fmt" "time" @@ -69,66 +68,85 @@ var ( ) // handleCleanup handles periodic retransmissions and cleanup of observations -func (p *Processor) handleCleanup(ctx context.Context) { +func (p *Processor) handleCleanup() { + p.cleanupState() + p.cleanupPythnetVaas() +} + +// cleanupState walks through the aggregation state map and cleans up entries that are no longer needed. It grabs the state lock. +func (p *Processor) cleanupState() { + p.state.signaturesLock.Lock() + defer p.state.signaturesLock.Unlock() + p.logger.Info("aggregation state summary", zap.Int("cached", len(p.state.signatures))) aggregationStateEntries.Set(float64(len(p.state.signatures))) for hash, s := range p.state.signatures { - delta := time.Since(s.firstObserved) - - if !s.submitted && s.ourObservation != nil && delta > settlementTime { - // Expire pending VAAs post settlement time if we have a stored quorum VAA. - // - // This occurs when we observed a message after the cluster has already reached - // consensus on it, causing us to never achieve quorum. - if ourVaa, ok := s.ourObservation.(*VAA); ok { - if p.haveSignedVAA(*db.VaaIDFromVAA(&ourVaa.VAA)) { - // If we have a stored quorum VAA, we can safely expire the state. - // - // This is a rare case, and we can safely expire the state, since we - // have a quorum VAA. - p.logger.Info("Expiring late VAA", - zap.String("message_id", ourVaa.VAA.MessageID()), - zap.String("digest", hash), - zap.Duration("delta", delta), - ) - aggregationStateLate.Inc() - delete(p.state.signatures, hash) - continue - } - } + if shouldDelete := p.cleanUpStateEntry(hash, s); shouldDelete { + delete(p.state.signatures, hash) // Can't use p.state.delete() because we're holding the lock. } + } +} - switch { - case !s.settled && delta > settlementTime: - // After 30 seconds, the observation is considered settled - it's unlikely that more observations will - // arrive, barring special circumstances. 
This is a better time to count misses than submission, - // because we submit right when we quorum rather than waiting for all observations to arrive. - s.settled = true +// cleanUpStateEntry cleans up a single aggregation state entry. It grabs the lock for that entry. Returns true if the entry should be deleted. +func (p *Processor) cleanUpStateEntry(hash string, s *state) bool { + s.lock.Lock() + defer s.lock.Unlock() - // Use either the most recent (in case of a observation we haven't seen) or stored gs, if available. - var gs *common.GuardianSet - if s.gs != nil { - gs = s.gs - } else { - gs = p.gs + delta := time.Since(s.firstObserved) + + if !s.submitted && s.ourObservation != nil && delta > settlementTime { + // Expire pending VAAs post settlement time if we have a stored quorum VAA. + // + // This occurs when we observed a message after the cluster has already reached + // consensus on it, causing us to never achieve quorum. + if ourVaa, ok := s.ourObservation.(*VAA); ok { + if p.haveSignedVAA(*db.VaaIDFromVAA(&ourVaa.VAA)) { + // If we have a stored quorum VAA, we can safely expire the state. + // + // This is a rare case, and we can safely expire the state, since we + // have a quorum VAA. + p.logger.Info("Expiring late VAA", + zap.String("message_id", ourVaa.VAA.MessageID()), + zap.String("digest", hash), + zap.Duration("delta", delta), + ) + aggregationStateLate.Inc() + return true } + } + } - hasSigs := len(s.signatures) - quorum := hasSigs >= gs.Quorum() + switch { + case !s.settled && delta > settlementTime: + // After 30 seconds, the observation is considered settled - it's unlikely that more observations will + // arrive, barring special circumstances. This is a better time to count misses than submission, + // because we submit right when we quorum rather than waiting for all observations to arrive. + s.settled = true - var chain vaa.ChainID - if s.ourObservation != nil { - chain = s.ourObservation.GetEmitterChain() - } + // Peg the appropriate settlement metric using the current guardian set. If we don't have a guardian set (extremely unlikely), we just won't peg the metric. + gs := s.gs + if gs == nil { + gs = p.gst.Get() + } + if gs != nil { if p.logger.Level().Enabled(zapcore.DebugLevel) { + hasSigs := len(s.signatures) + wantSigs := vaa.CalculateQuorum(len(gs.Keys)) + quorum := hasSigs >= wantSigs + + var chain vaa.ChainID + if s.ourObservation != nil { + chain = s.ourObservation.GetEmitterChain() + } + p.logger.Debug("observation considered settled", zap.String("message_id", s.LoggingID()), zap.String("digest", hash), zap.Duration("delta", delta), zap.Int("have_sigs", hasSigs), - zap.Int("required_sigs", gs.Quorum()), + zap.Int("required_sigs", wantSigs), zap.Bool("quorum", quorum), zap.Stringer("emitter_chain", chain), ) @@ -141,126 +159,139 @@ func (p *Processor) handleCleanup(ctx context.Context) { aggregationStateFulfillment.WithLabelValues(k.Hex(), s.source, "missing").Inc() } } - case s.submitted && delta.Hours() >= 1: - // We could delete submitted observations right away, but then we'd lose context about additional (late) - // observation that come in. Therefore, keep it for a reasonable amount of time. - // If a very late observation arrives after cleanup, a nil aggregation state will be created - // and then expired after a while (as noted in observation.go, this can be abused by a byzantine guardian). 
- if p.logger.Level().Enabled(zapcore.DebugLevel) { - p.logger.Debug("expiring submitted observation", + } + case s.submitted && delta.Hours() >= 1: + // We could delete submitted observations right away, but then we'd lose context about additional (late) + // observation that come in. Therefore, keep it for a reasonable amount of time. + // If a very late observation arrives after cleanup, a nil aggregation state will be created + // and then expired after a while (as noted in observation.go, this can be abused by a byzantine guardian). + if p.logger.Level().Enabled(zapcore.DebugLevel) { + p.logger.Debug("expiring submitted observation", + zap.String("message_id", s.LoggingID()), + zap.String("digest", hash), + zap.Duration("delta", delta), + ) + } + aggregationStateExpiration.Inc() + return true + case !s.submitted && ((s.ourObs != nil && delta > retryLimitOurs) || (s.ourObs == nil && delta > retryLimitNotOurs)): + // Clearly, this horse is dead and continued beatings won't bring it closer to quorum. + p.logger.Info("expiring unsubmitted observation after exhausting retries", + zap.String("message_id", s.LoggingID()), + zap.String("digest", hash), + zap.Duration("delta", delta), + zap.Bool("weObserved", s.ourObs != nil), + ) + aggregationStateTimeout.Inc() + return true + case !s.submitted && delta >= FirstRetryMinWait && time.Since(s.nextRetry) >= 0: + // Poor observation has been unsubmitted for five minutes - clearly, something went wrong. + // If we have previously submitted an observation, and it was reliable, we can make another attempt to get + // it over the finish line by sending a re-observation request to the network and rebroadcasting our + // sig. If we do not have an observation, it means we either never observed it, or it got + // revived by a malfunctioning guardian node, in which case, we can't do anything about it + // and just delete it to keep our state nice and lean. + if s.ourObs != nil { + // Unreliable observations cannot be resubmitted and can be considered failed after 5 minutes + if !s.ourObservation.IsReliable() { + p.logger.Info("expiring unsubmitted unreliable observation", zap.String("message_id", s.LoggingID()), zap.String("digest", hash), zap.Duration("delta", delta), ) + aggregationStateTimeout.Inc() + return true } - delete(p.state.signatures, hash) - aggregationStateExpiration.Inc() - case !s.submitted && ((s.ourObs != nil && delta > retryLimitOurs) || (s.ourObs == nil && delta > retryLimitNotOurs)): - // Clearly, this horse is dead and continued beatings won't bring it closer to quorum. - p.logger.Info("expiring unsubmitted observation after exhausting retries", - zap.String("message_id", s.LoggingID()), - zap.String("digest", hash), - zap.Duration("delta", delta), - zap.Bool("weObserved", s.ourObs != nil), - ) - delete(p.state.signatures, hash) - aggregationStateTimeout.Inc() - case !s.submitted && delta >= FirstRetryMinWait && time.Since(s.nextRetry) >= 0: - // Poor observation has been unsubmitted for five minutes - clearly, something went wrong. - // If we have previously submitted an observation, and it was reliable, we can make another attempt to get - // it over the finish line by sending a re-observation request to the network and rebroadcasting our - // sig. If we do not have an observation, it means we either never observed it, or it got - // revived by a malfunctioning guardian node, in which case, we can't do anything about it - // and just delete it to keep our state nice and lean. 
- if s.ourObs != nil { - // Unreliable observations cannot be resubmitted and can be considered failed after 5 minutes - if !s.ourObservation.IsReliable() { - p.logger.Info("expiring unsubmitted unreliable observation", + + // Reobservation requests should not be resubmitted but we will keep waiting for more observations. + if s.ourObservation.IsReobservation() { + if p.logger.Level().Enabled(zapcore.DebugLevel) { + p.logger.Debug("not submitting reobservation request for reobservation", zap.String("message_id", s.LoggingID()), zap.String("digest", hash), zap.Duration("delta", delta), ) - delete(p.state.signatures, hash) - aggregationStateTimeout.Inc() - break - } - - // Reobservation requests should not be resubmitted but we will keep waiting for more observations. - if s.ourObservation.IsReobservation() { - if p.logger.Level().Enabled(zapcore.DebugLevel) { - p.logger.Debug("not submitting reobservation request for reobservation", - zap.String("message_id", s.LoggingID()), - zap.String("digest", hash), - zap.Duration("delta", delta), - ) - } - break } + return false + } - // If we have already stored this VAA, there is no reason for us to request reobservation. - alreadyInDB, err := p.signedVaaAlreadyInDB(hash, s) - if err != nil { - p.logger.Error("failed to check if observation is already in DB, requesting reobservation", - zap.String("message_id", s.LoggingID()), - zap.String("hash", hash), - zap.Error(err)) - } + // If we have already stored this VAA, there is no reason for us to request reobservation. + alreadyInDB, err := p.signedVaaAlreadyInDB(hash, s) + if err != nil { + p.logger.Error("failed to check if observation is already in DB, requesting reobservation", + zap.String("message_id", s.LoggingID()), + zap.String("hash", hash), + zap.Error(err)) + } - if alreadyInDB { - if p.logger.Level().Enabled(zapcore.DebugLevel) { - p.logger.Debug("observation already in DB, not requesting reobservation", - zap.String("message_id", s.LoggingID()), - zap.String("digest", hash), - ) - } - } else { - p.logger.Info("resubmitting observation", + if alreadyInDB { + if p.logger.Level().Enabled(zapcore.DebugLevel) { + p.logger.Debug("observation already in DB, not requesting reobservation", zap.String("message_id", s.LoggingID()), zap.String("digest", hash), - zap.Duration("delta", delta), - zap.String("firstObserved", s.firstObserved.String()), - zap.Int("numSignatures", len(s.signatures)), ) - req := &gossipv1.ObservationRequest{ - ChainId: uint32(s.ourObservation.GetEmitterChain()), - TxHash: s.txHash, - } - if err := common.PostObservationRequest(p.obsvReqSendC, req); err != nil { - p.logger.Warn("failed to broadcast re-observation request", zap.String("message_id", s.LoggingID()), zap.Error(err)) - } - if s.ourMsg != nil { - // This is the case for immediately published messages (as well as anything still pending from before the cutover). - p.gossipAttestationSendC <- s.ourMsg - } else { - p.postObservationToBatch(s.ourObs) - } - s.retryCtr++ - s.nextRetry = time.Now().Add(nextRetryDuration(s.retryCtr)) - aggregationStateRetries.Inc() } } else { - // For nil state entries, we log the quorum to determine whether the - // network reached consensus without us. We don't know the correct guardian - // set, so we simply use the most recent one. 
+ p.logger.Info("resubmitting observation", + zap.String("message_id", s.LoggingID()), + zap.String("digest", hash), + zap.Duration("delta", delta), + zap.String("firstObserved", s.firstObserved.String()), + ) + req := &gossipv1.ObservationRequest{ + ChainId: uint32(s.ourObservation.GetEmitterChain()), + TxHash: s.txHash, + } + if err := common.PostObservationRequest(p.obsvReqSendC, req); err != nil { + p.logger.Warn("failed to broadcast re-observation request", zap.String("message_id", s.LoggingID()), zap.Error(err)) + } + if s.ourMsg != nil { + // This is the case for immediately published messages (as well as anything still pending from before the cutover). + p.gossipAttestationSendC <- s.ourMsg + } else { + p.postObservationToBatch(s.ourObs) + } + s.retryCtr++ + s.nextRetry = time.Now().Add(nextRetryDuration(s.retryCtr)) + aggregationStateRetries.Inc() + } + } else { + // For nil state entries, we log the quorum to determine whether the + // network reached consensus without us. We don't know the correct guardian + // set, so we simply use the most recent one. + if p.logger.Level().Enabled(zapcore.DebugLevel) { hasSigs := len(s.signatures) - - if p.logger.Level().Enabled(zapcore.DebugLevel) { + gs := p.gst.Get() + if gs != nil { p.logger.Debug("expiring unsubmitted nil observation", zap.String("message_id", s.LoggingID()), zap.String("digest", hash), zap.Duration("delta", delta), zap.Int("have_sigs", hasSigs), - zap.Int("required_sigs", p.gs.Quorum()), - zap.Bool("quorum", hasSigs >= p.gs.Quorum()), + zap.Int("required_sigs", gs.Quorum()), + zap.Bool("quorum", hasSigs >= gs.Quorum()), + ) + } else { + p.logger.Debug("expiring unsubmitted nil observation, gs is nil", + zap.String("message_id", s.LoggingID()), + zap.String("digest", hash), + zap.Duration("delta", delta), + zap.Int("have_sigs", hasSigs), ) } - delete(p.state.signatures, hash) - aggregationStateUnobserved.Inc() } + aggregationStateUnobserved.Inc() + return true } } - // Clean up old pythnet VAAs. + return false +} + +// cleanupPythnetVaas deletes expired pythnet vaas. +func (p *Processor) cleanupPythnetVaas() { + p.pythnetVaaLock.Lock() + defer p.pythnetVaaLock.Unlock() oldestTime := time.Now().Add(-time.Hour) for key, pe := range p.pythnetVaas { if pe.updateTime.Before(oldestTime) { diff --git a/node/pkg/processor/message.go b/node/pkg/processor/message.go index ed5946a330..2a391a0e90 100644 --- a/node/pkg/processor/message.go +++ b/node/pkg/processor/message.go @@ -9,7 +9,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - ethCommon "github.com/ethereum/go-ethereum/common" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -32,7 +31,8 @@ var ( // handleMessage processes a message received from a chain and instantiates our deterministic copy of the VAA. An // event may be received multiple times and must be handled in an idempotent fashion. 
func (p *Processor) handleMessage(k *common.MessagePublication) { - if p.gs == nil { + gs := p.gst.Get() + if gs == nil { p.logger.Warn("dropping observation since we haven't initialized our guardian set yet", zap.String("message_id", k.MessageIDString()), zap.Uint32("nonce", k.Nonce), @@ -50,7 +50,7 @@ func (p *Processor) handleMessage(k *common.MessagePublication) { v := &VAA{ VAA: vaa.VAA{ Version: vaa.SupportedVAAVersion, - GuardianSetIndex: p.gs.Index, + GuardianSetIndex: gs.Index, Signatures: nil, Timestamp: k.Timestamp, Nonce: k.Nonce, @@ -99,23 +99,19 @@ func (p *Processor) handleMessage(k *common.MessagePublication) { observationsReceivedByGuardianAddressTotal.WithLabelValues(p.ourAddr.Hex()).Inc() // Get / create our state entry. - s := p.state.signatures[hash] - if s == nil { - s = &state{ - firstObserved: time.Now(), - nextRetry: time.Now().Add(nextRetryDuration(0)), - signatures: map[ethCommon.Address][]byte{}, - source: "loopback", - } - - p.state.signatures[hash] = s + s, created := p.state.getOrCreateState(hash) + s.lock.Lock() + defer s.lock.Unlock() + + if created { + s.source = "loopback" } // Update our state. s.ourObservation = v s.txHash = k.TxHash.Bytes() s.source = v.GetEmitterChain().String() - s.gs = p.gs // guaranteed to match ourObservation - there's no concurrent access to p.gs + s.gs = gs // guaranteed to match ourObservation - reuse the guardian set snapshot read at the top of this function so it cannot differ from the GuardianSetIndex set above s.signatures[p.ourAddr] = signature s.ourObs = ourObs s.ourMsg = msg @@ -123,7 +119,7 @@ func (p *Processor) handleMessage(k *common.MessagePublication) { // Fast path for our own signature. if !s.submitted { start := time.Now() - p.checkForQuorum(ourObs, s, s.gs, hash) + p.checkForQuorumAlreadyLocked(ourObs, s, hash) timeToHandleObservation.Observe(float64(time.Since(start).Microseconds())) } } diff --git a/node/pkg/processor/observation.go b/node/pkg/processor/observation.go index 3fcea6318d..66e2300b2b 100644 --- a/node/pkg/processor/observation.go +++ b/node/pkg/processor/observation.go @@ -99,11 +99,19 @@ func (p *Processor) handleSingleObservation(addr []byte, m *gossipv1.Observation their_addr := common.BytesToAddress(addr) hash := hex.EncodeToString(m.Hash) - s := p.state.signatures[hash] - if s != nil && s.submitted { - // already submitted; ignoring additional signatures for it. - timeToHandleObservation.Observe(float64(time.Since(start).Microseconds())) - return + + s, created := p.state.getOrCreateState(hash) + s.lock.Lock() + defer s.lock.Unlock() + + if !created { + if s.submitted { + // already submitted; ignoring additional signatures for it. + timeToHandleObservation.Observe(float64(time.Since(start).Microseconds())) + return + } + } else { + s.source = "unknown" } if p.logger.Core().Enabled(zapcore.DebugLevel) { @@ -133,35 +141,35 @@ func (p *Processor) handleSingleObservation(addr []byte, m *gossipv1.Observation // // During an update, vaaState.signatures can contain signatures from *both* guardian sets. // - var gs *node_common.GuardianSet - if s != nil && s.gs != nil { - gs = s.gs - } else { - gs = p.gs + if s.gs == nil { + s.gs = p.gst.Get() } // We haven't yet observed the trusted guardian set on Ethereum, and therefore, it's impossible to verify it. // May as well not have received it/been offline - drop it and wait for the guardian set.
- if gs == nil { + if s.gs == nil { p.logger.Warn("dropping observations since we haven't initialized our guardian set yet", zap.String("messageId", m.MessageId), zap.String("digest", hash), zap.String("their_addr", their_addr.Hex()), ) observationsFailedTotal.WithLabelValues("uninitialized_guardian_set").Inc() + if created { + p.state.delete(hash) + } return } // Verify that addr is included in the guardian set. If it's not, drop the message. In case it's us // who have the outdated guardian set, we'll just wait for the message to be retransmitted eventually. - _, ok := gs.KeyIndex(their_addr) + _, ok := s.gs.KeyIndex(their_addr) if !ok { if p.logger.Level().Enabled(zapcore.DebugLevel) { p.logger.Debug("received observation by unknown guardian - is our guardian set outdated?", zap.String("messageId", m.MessageId), zap.String("digest", hash), zap.String("their_addr", their_addr.Hex()), - zap.Uint32("index", gs.Index), + zap.Uint32("index", s.gs.Index), //zap.Any("keys", gs.KeysAsHexStrings()), ) } @@ -205,7 +213,7 @@ func (p *Processor) handleSingleObservation(addr []byte, m *gossipv1.Observation observationsReceivedByGuardianAddressTotal.WithLabelValues(their_addr.Hex()).Inc() // []byte isn't hashable in a map. Paying a small extra cost for encoding for easier debugging. - if s == nil { + if created { // We haven't yet seen this event ourselves, and therefore do not know what the VAA looks like. // However, we have established that a valid guardian has signed it, and therefore we can // already start aggregating signatures for it. @@ -215,21 +223,12 @@ func (p *Processor) handleSingleObservation(addr []byte, m *gossipv1.Observation // signatures - such byzantine behavior would be plainly visible and would be dealt with by kicking them. observationsUnknownTotal.Inc() - - s = &state{ - firstObserved: time.Now(), - nextRetry: time.Now().Add(nextRetryDuration(0)), - signatures: map[common.Address][]byte{}, - source: "unknown", - } - - p.state.signatures[hash] = s } s.signatures[their_addr] = m.Signature if s.ourObservation != nil { - p.checkForQuorum(m, s, gs, hash) + p.checkForQuorumAlreadyLocked(m, s, hash) } else { if p.logger.Level().Enabled(zapcore.DebugLevel) { p.logger.Debug("we have not yet seen this observation yet", @@ -243,14 +242,16 @@ func (p *Processor) handleSingleObservation(addr []byte, m *gossipv1.Observation timeToHandleObservation.Observe(float64(time.Since(start).Microseconds())) } -// checkForQuorum checks for quorum after a valid signature has been added to the observation state. If quorum is met, it broadcasts the signed VAA. This function +// checkForQuorumAlreadyLocked checks for quorum after a valid signature has been added to the observation state. If quorum is met, it broadcasts the signed VAA. This function // is called both for local and external observations. It assumes we that we have made the observation ourselves but have not already submitted the VAA. -func (p *Processor) checkForQuorum(m *gossipv1.Observation, s *state, gs *node_common.GuardianSet, hash string) { +// NOTE: This function assumes the caller holds the state lock for this observation. +func (p *Processor) checkForQuorumAlreadyLocked(m *gossipv1.Observation, s *state, hash string) { // Check if we have more signatures than required for quorum. 
// s.signatures may contain signatures from multiple guardian sets during guardian set updates // Hence, if len(s.signatures) < quorum, then there is definitely no quorum and we can return early to save additional computation, // but if len(s.signatures) >= quorum, there is not necessarily quorum for the active guardian set. // We will later check for quorum again after assembling the VAA for a particular guardian set. + gs := s.gs if len(s.signatures) < gs.Quorum() { // no quorum yet, we're done here if p.logger.Level().Enabled(zapcore.DebugLevel) { @@ -314,7 +315,8 @@ func (p *Processor) handleInboundSignedVAAWithQuorum(m *gossipv1.SignedVAAWithQu return } - if p.gs == nil { + gs := p.gst.Get() + if gs == nil { p.logger.Warn("dropping SignedVAAWithQuorum message since we haven't initialized our guardian set yet", zap.String("message_id", v.MessageID()), zap.String("digest", hex.EncodeToString(v.SigningDigest().Bytes())), @@ -324,7 +326,7 @@ func (p *Processor) handleInboundSignedVAAWithQuorum(m *gossipv1.SignedVAAWithQu } // Check if guardianSet doesn't have any keys - if len(p.gs.Keys) == 0 { + if len(gs.Keys) == 0 { p.logger.Warn("dropping SignedVAAWithQuorum message since we have a guardian set without keys", zap.String("message_id", v.MessageID()), zap.String("digest", hex.EncodeToString(v.SigningDigest().Bytes())), @@ -333,8 +335,7 @@ func (p *Processor) handleInboundSignedVAAWithQuorum(m *gossipv1.SignedVAAWithQu return } - if err := v.Verify(p.gs.Keys); err != nil { - // We format the error as part of the message so the tests can check for it. + if err := v.Verify(gs.Keys); err != nil { p.logger.Warn("dropping SignedVAAWithQuorum message because it failed verification: "+err.Error(), zap.String("message_id", v.MessageID())) return } diff --git a/node/pkg/processor/observation_test.go b/node/pkg/processor/observation_test.go index eeef648339..ff8cc346f6 100644 --- a/node/pkg/processor/observation_test.go +++ b/node/pkg/processor/observation_test.go @@ -45,7 +45,7 @@ func TestHandleInboundSignedVAAWithQuorum_NilGuardianSet(t *testing.T) { observedLogger := zap.New(observedZapCore) signedVAAWithQuorum := &gossipv1.SignedVAAWithQuorum{Vaa: marshalVAA} - processor := Processor{} + processor := Processor{gst: common.NewGuardianSetState(nil)} processor.logger = observedLogger processor.handleInboundSignedVAAWithQuorum(signedVAAWithQuorum) @@ -107,8 +107,8 @@ func TestHandleInboundSignedVAAWithQuorum(t *testing.T) { observedLogger := zap.New(observedZapCore) signedVAAWithQuorum := &gossipv1.SignedVAAWithQuorum{Vaa: marshalVAA} - processor := Processor{} - processor.gs = &guardianSet + processor := Processor{gst: common.NewGuardianSetState(nil)} + processor.gst.Set(&guardianSet) processor.logger = observedLogger processor.handleInboundSignedVAAWithQuorum(signedVAAWithQuorum) diff --git a/node/pkg/processor/processor.go b/node/pkg/processor/processor.go index e483996505..f4712bda33 100644 --- a/node/pkg/processor/processor.go +++ b/node/pkg/processor/processor.go @@ -4,6 +4,8 @@ import ( "context" "encoding/hex" "fmt" + "math" + "runtime" "sync" "time" @@ -51,6 +53,9 @@ type ( // state represents the local view of a given observation state struct { + // Mutex protects this particular state entry. + lock sync.Mutex + // First time this digest was seen (possibly even before we observed it ourselves). firstObserved time.Time // A re-observation request shall not be sent before this time. @@ -82,7 +87,9 @@ type ( // aggregationState represents the node's aggregation of guardian signatures. 
aggregationState struct { - signatures observationMap + // signaturesLock should be held when inserting / deleting / iterating over the map, but not when working with a single entry. + signaturesLock sync.Mutex + signatures observationMap } ) @@ -92,10 +99,34 @@ func (s *state) LoggingID() string { if s.ourObservation != nil { return s.ourObservation.MessageID() } - return hex.EncodeToString(s.txHash) } +// getOrCreateState returns the state for a given hash, creating it if it doesn't exist. It grabs the lock. +func (s *aggregationState) getOrCreateState(hash string) (*state, bool) { + s.signaturesLock.Lock() + defer s.signaturesLock.Unlock() + + created := false + if _, ok := s.signatures[hash]; !ok { + created = true + s.signatures[hash] = &state{ + firstObserved: time.Now(), + nextRetry: time.Now().Add(nextRetryDuration(0)), + signatures: make(map[ethcommon.Address][]byte), + } + } + + return s.signatures[hash], created +} + +// delete removes a state entry from the map. It grabs the lock. +func (s *aggregationState) delete(hash string) { + s.signaturesLock.Lock() + delete(s.signatures, hash) + s.signaturesLock.Unlock() +} + type PythNetVaaEntry struct { v *vaa.VAA updateTime time.Time // Used for determining when to delete entries @@ -135,8 +166,6 @@ type Processor struct { // Runtime state - // gs is the currently valid guardian set - gs *common.GuardianSet // gst is managed by the processor and allows concurrent access to the // guardian set by other components. gst *common.GuardianSetState @@ -149,11 +178,13 @@ type Processor struct { governor *governor.ChainGovernor acct *accountant.Accountant acctReadC <-chan *common.MessagePublication + pythnetVaaLock sync.Mutex pythnetVaas map[string]PythNetVaaEntry gatewayRelayer *gwrelayer.GatewayRelayer updateVAALock sync.Mutex updatedVAAs map[string]*updateVaaEntry networkID string + workerFactor float64 // batchObsvPubC is the internal channel used to publish observations to the batch processor for publishing. batchObsvPubC chan *gossipv1.Observation @@ -236,6 +267,7 @@ func NewProcessor( acctReadC <-chan *common.MessagePublication, gatewayRelayer *gwrelayer.GatewayRelayer, networkID string, + workerFactor float64, ) *Processor { return &Processor{ @@ -252,7 +284,7 @@ func NewProcessor( db: db, logger: supervisor.Logger(ctx), - state: &aggregationState{observationMap{}}, + state: &aggregationState{signatures: observationMap{}}, ourAddr: crypto.PubkeyToAddress(guardianSigner.PublicKey()), governor: g, acct: acct, @@ -264,7 +296,6 @@ func NewProcessor( networkID: networkID, } } - func (p *Processor) Run(ctx context.Context) error { // Evaluate the batch cutover time. If it has passed, then the flag will be set to make us publish observation batches. // If not, a routine will be started to wait for that time before starting to publish batches. 
@@ -281,25 +312,103 @@ func (p *Processor) Run(ctx context.Context) error { return fmt.Errorf("failed to start batch processor: %w", err) } + errC := make(chan error) + + if p.workerFactor < 0.0 { + return fmt.Errorf("workerFactor must be positive or zero") + } + + var numWorkers int + if p.workerFactor == 0.0 { + numWorkers = 1 + p.logger.Info("processor running in single worker mode", zap.Int("numWorkers", numWorkers), zap.Float64("workerFactor", p.workerFactor)) + } else { + numWorkers = int(math.Ceil(float64(runtime.NumCPU()) * p.workerFactor)) + p.logger.Info("processor configured to use workers", zap.Int("numWorkers", numWorkers), zap.Float64("workerFactor", p.workerFactor)) + } + + // Start the routine to do housekeeping tasks that don't need to be distributed to the workers. + common.RunWithScissors(ctx, errC, "processor_housekeeper", p.runHousekeeper) + + // Start the workers. + for workerId := 1; workerId <= numWorkers; workerId++ { + workerId := workerId + common.RunWithScissors(ctx, errC, fmt.Sprintf("processor_worker_%d", workerId), p.runWorker) + } + + var err error + select { + case <-ctx.Done(): + err = ctx.Err() + case e := <-errC: + err = e + } + + if p.acct != nil { + p.acct.Close() + } + return err +} + +// runHousekeeper performs general tasks that do not need to be distributed to the workers. There will always be exactly one instance of this. +func (p *Processor) runHousekeeper(ctx context.Context) error { + // Always start the timers to avoid nil pointer dereferences below. They will only be rearmed on worker 1. cleanup := time.NewTicker(CleanupInterval) + defer cleanup.Stop() // Always initialize the timer so don't have a nil pointer in the case below. It won't get rearmed after that. govTimer := time.NewTimer(GovInterval) + defer govTimer.Stop() for { select { case <-ctx.Done(): - if p.acct != nil { - p.acct.Close() - } return ctx.Err() - case p.gs = <-p.setC: + case gs := <-p.setC: p.logger.Info("guardian set updated", - zap.Strings("set", p.gs.KeysAsHexStrings()), - zap.Uint32("index", p.gs.Index), - zap.Int("quorum", p.gs.Quorum()), - ) - p.gst.Set(p.gs) + zap.Strings("set", gs.KeysAsHexStrings()), + zap.Uint32("index", gs.Index)) + p.gst.Set(gs) + case <-cleanup.C: + p.handleCleanup() + case <-govTimer.C: + if p.governor != nil { + toBePublished, err := p.governor.CheckPending() + if err != nil { + return err + } + if len(toBePublished) != 0 { + for _, k := range toBePublished { + // SECURITY defense-in-depth: Make sure the governor did not generate an unexpected message. + if msgIsGoverned, err := p.governor.IsGovernedMsg(k); err != nil { + return fmt.Errorf("governor failed to determine if message should be governed: `%s`: %w", k.MessageIDString(), err) + } else if !msgIsGoverned { + return fmt.Errorf("governor published a message that should not be governed: `%s`", k.MessageIDString()) + } + if p.acct != nil { + shouldPub, err := p.acct.SubmitObservation(k) + if err != nil { + return fmt.Errorf("failed to process message released by governor `%s`: %w", k.MessageIDString(), err) + } + if !shouldPub { + continue + } + } + p.handleMessage(k) + } + } + govTimer.Reset(GovInterval) + } + } + } +} + +// runWorker performs the per-observation tasks that can be distributed to the workers. There will be at least one of these. 
+func (p *Processor) runWorker(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() case k := <-p.msgC: if p.governor != nil { if !p.governor.ProcessMsg(k) { @@ -316,7 +425,6 @@ func (p *Processor) Run(ctx context.Context) error { } } p.handleMessage(k) - case k := <-p.acctReadC: if p.acct == nil { return fmt.Errorf("received an accountant event when accountant is not configured") @@ -334,38 +442,6 @@ func (p *Processor) Run(ctx context.Context) error { p.handleBatchObservation(m) case m := <-p.signedInC: p.handleInboundSignedVAAWithQuorum(m) - case <-cleanup.C: - p.handleCleanup(ctx) - case <-govTimer.C: - if p.governor != nil { - toBePublished, err := p.governor.CheckPending() - if err != nil { - return err - } - if len(toBePublished) != 0 { - for _, k := range toBePublished { - // SECURITY defense-in-depth: Make sure the governor did not generate an unexpected message. - if msgIsGoverned, err := p.governor.IsGovernedMsg(k); err != nil { - return fmt.Errorf("governor failed to determine if message should be governed: `%s`: %w", k.MessageIDString(), err) - } else if !msgIsGoverned { - return fmt.Errorf("governor published a message that should not be governed: `%s`", k.MessageIDString()) - } - if p.acct != nil { - shouldPub, err := p.acct.SubmitObservation(k) - if err != nil { - return fmt.Errorf("failed to process message released by governor `%s`: %w", k.MessageIDString(), err) - } - if !shouldPub { - continue - } - } - p.handleMessage(k) - } - } - } - if (p.governor != nil) || (p.acct != nil) { - govTimer.Reset(GovInterval) - } } } } @@ -374,7 +450,9 @@ func (p *Processor) Run(ctx context.Context) error { func (p *Processor) storeSignedVAA(v *vaa.VAA) { if v.EmitterChain == vaa.ChainIDPythNet { key := fmt.Sprintf("%v/%v", v.EmitterAddress, v.Sequence) + p.pythnetVaaLock.Lock() p.pythnetVaas[key] = PythNetVaaEntry{v: v, updateTime: time.Now()} + p.pythnetVaaLock.Unlock() return } key := fmt.Sprintf("%d/%v/%v", v.EmitterChain, v.EmitterAddress, v.Sequence) @@ -386,11 +464,10 @@ func (p *Processor) storeSignedVAA(v *vaa.VAA) { // haveSignedVAA returns true if we already have a VAA for the given VAAID func (p *Processor) haveSignedVAA(id db.VAAID) bool { if id.EmitterChain == vaa.ChainIDPythNet { - if p.pythnetVaas == nil { - return false - } key := fmt.Sprintf("%v/%v", id.EmitterAddress, id.Sequence) + p.pythnetVaaLock.Lock() _, exists := p.pythnetVaas[key] + p.pythnetVaaLock.Unlock() return exists } @@ -429,7 +506,7 @@ func (p *Processor) getVaaFromUpdateMap(key string) *vaa.VAA { // vaaWriter is the routine that writes VAAs to the database once per second. It creates a local copy of the map // being used by the processor to reduce lock contention. It uses a dirty flag to handle the case where the VAA // gets updated again while we are in the process of writing it to the database. -func (p *Processor) vaaWriter(ctx context.Context) error { +func (p *Processor) vaaWriter(ctx context.Context) error { //nolint: unparam ticker := time.NewTicker(time.Second) for { select {
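The new processorWorkerFactor flag is mapped to a worker count inside Processor.Run: a negative value is rejected, 0.0 selects the old single-worker behaviour, and anything else is multiplied by the machine's CPU count and rounded up. A stand-alone sketch of that mapping (the workerCount helper is illustrative, not part of the change):

package main

import (
	"fmt"
	"math"
	"runtime"
)

// workerCount mirrors the calculation in Processor.Run: 0.0 means a single
// worker, negative factors are invalid, anything else scales with NumCPU.
func workerCount(workerFactor float64) (int, error) {
	if workerFactor < 0.0 {
		return 0, fmt.Errorf("workerFactor must not be negative")
	}
	if workerFactor == 0.0 {
		return 1, nil
	}
	return int(math.Ceil(float64(runtime.NumCPU()) * workerFactor)), nil
}

func main() {
	// node_test.go passes 3.0/NumCPU so that each mock guardian runs three workers.
	n, err := workerCount(3.0 / float64(runtime.NumCPU()))
	fmt.Println(n, err)
}

On the command line this corresponds to something like guardiand node --processorWorkerFactor 0.5, which on a 16-CPU host yields eight workers; omitting the flag keeps the previous single-threaded processor loop.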
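Run itself now splits into one housekeeper plus N workers, each started through common.RunWithScissors. The shape of that split, reduced to the standard library (channel names and intervals are placeholders): a single goroutine owns the periodic cleanup work, while every worker receives from the same message channel, so Go's channel semantics spread observations across whichever worker is free.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()

	msgC := make(chan int)
	var wg sync.WaitGroup

	// Housekeeper: exactly one instance owns the timers (cleanup, governor).
	wg.Add(1)
	go func() {
		defer wg.Done()
		tick := time.NewTicker(50 * time.Millisecond)
		defer tick.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-tick.C:
				fmt.Println("housekeeper: cleanup pass")
			}
		}
	}()

	// Workers: all receive loops compete for messages on the shared channel.
	for id := 1; id <= 3; id++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case m := <-msgC:
					fmt.Printf("worker %d handled message %d\n", id, m)
				}
			}
		}(id)
	}

	for i := 0; i < 5; i++ {
		select {
		case msgC <- i:
		case <-ctx.Done():
		}
	}
	wg.Wait()
}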
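With several workers touching the aggregation state concurrently, the diff introduces two levels of locking: aggregationState.signaturesLock protects the map itself (insert, delete, iterate), and every state entry carries its own lock for per-observation work. A cut-down sketch of the pattern, using simplified stand-in types rather than the real processor structs:

package main

import "sync"

type entry struct {
	lock sync.Mutex        // guards the fields of this one observation
	sigs map[string][]byte // guardian address (hex) -> signature
}

type aggState struct {
	mapLock sync.Mutex // guards the map, not the entries
	entries map[string]*entry
}

// getOrCreate returns the entry for hash, creating it under the map lock,
// and reports whether it was newly created.
func (a *aggState) getOrCreate(hash string) (*entry, bool) {
	a.mapLock.Lock()
	defer a.mapLock.Unlock()
	e, ok := a.entries[hash]
	if !ok {
		e = &entry{sigs: make(map[string][]byte)}
		a.entries[hash] = e
	}
	return e, !ok
}

// delete removes an entry. Cleanup code that already holds mapLock while
// iterating deletes from the map directly instead, as the diff notes.
func (a *aggState) delete(hash string) {
	a.mapLock.Lock()
	delete(a.entries, hash)
	a.mapLock.Unlock()
}

func main() {
	st := &aggState{entries: make(map[string]*entry)}
	e, created := st.getOrCreate("deadbeef")
	// Callers hold the entry lock, not the map lock, while mutating one observation.
	e.lock.Lock()
	e.sigs["guardian0"] = []byte{0x01}
	e.lock.Unlock()
	_ = created
	st.delete("deadbeef")
}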
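The processor's private gs field is removed and every read goes through p.gst.Get() instead: once observation handling runs on multiple workers, the current guardian set has to be read and replaced through a synchronized accessor. A rough stand-in for the Get/Set shape used here (the real common.GuardianSetState also tracks per-guardian heartbeat state and is shared with other components):

package main

import (
	"fmt"
	"sync"
)

type GuardianSet struct {
	Index uint32
	Keys  []string
}

// GuardianSetState is a simplified stand-in: the current set behind a
// read/write mutex so workers can read it concurrently with updates.
type GuardianSetState struct {
	mu      sync.RWMutex
	current *GuardianSet
}

func (s *GuardianSetState) Set(gs *GuardianSet) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.current = gs
}

func (s *GuardianSetState) Get() *GuardianSet {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.current
}

func main() {
	gst := &GuardianSetState{}
	gst.Set(&GuardianSet{Index: 4, Keys: []string{"0x0000000000000000000000000000000000000001"}}) // placeholder key
	if gs := gst.Get(); gs != nil {
		fmt.Println("guardian set index:", gs.Index, "keys:", len(gs.Keys))
	}
}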
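Several cleanup and consensus paths compare the collected signature count against quorum, via gs.Quorum() or vaa.CalculateQuorum(len(gs.Keys)). Assuming the usual Wormhole rule of strictly more than two thirds of the guardian set, a quick worked check (the helper below is an assumption mirroring what CalculateQuorum is expected to return, not a copy of it):

package main

import "fmt"

// calculateQuorum returns the signatures needed under the assumed
// two-thirds-plus-one rule, e.g. 13 of 19 mainnet guardians.
func calculateQuorum(numGuardians int) int {
	return numGuardians*2/3 + 1
}

func main() {
	haveSigs := 13
	want := calculateQuorum(19)
	fmt.Printf("have %d, need %d, quorum reached: %v\n", haveSigs, want, haveSigs >= want)
}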