From ca16d36b38649bc77b53bd11e7d5a24aa2a75fc1 Mon Sep 17 00:00:00 2001 From: Bruce Riley Date: Thu, 8 Aug 2024 08:57:34 -0500 Subject: [PATCH] Code review rework --- node/pkg/node/node.go | 6 +++--- node/pkg/p2p/p2p.go | 2 +- node/pkg/processor/broadcast.go | 7 +++++++ node/pkg/processor/cleanup.go | 1 + 4 files changed, 12 insertions(+), 4 deletions(-) diff --git a/node/pkg/node/node.go b/node/pkg/node/node.go index 9dddc4a01e..3bb934d2de 100644 --- a/node/pkg/node/node.go +++ b/node/pkg/node/node.go @@ -34,9 +34,9 @@ const ( inboundObservationBufferSize = 10000 // inboundBatchObservationBufferSize configures the size of the batchObsvC channel that contains batches of observations from other Guardians. - // Since a batch contains many observations, the guardians should not be publishing too many of these, so we can keep the channel small. With - // 19 guardians, we would expect 19 messages per second during normal operations. This gives us plenty of extra room. - inboundBatchObservationBufferSize = 100 + // Since a batch contains many observations, the guardians should not be publishing too many of these. With 19 guardians, we would expect 19 messages + // per second during normal operations. However, since some messages get published immediately, we need to allow extra room. + inboundBatchObservationBufferSize = 1000 // inboundSignedVaaBufferSize configures the size of the signedInC channel that contains VAAs from other Guardians. // One VAA takes roughly 0.01ms to process if we already have one in the database and 2ms if we don't. diff --git a/node/pkg/p2p/p2p.go b/node/pkg/p2p/p2p.go index 7f24d6515a..3ea025f7ab 100644 --- a/node/pkg/p2p/p2p.go +++ b/node/pkg/p2p/p2p.go @@ -411,7 +411,7 @@ func Run(params *RunParams) func(ctx context.Context) error { } }() - if params.obsvRecvC != nil { + if params.obsvRecvC != nil || params.batchObsvRecvC != nil { logger.Info("subscribing to the attestation topic", zap.String("topic", attestationTopic)) attestationSubscription, err = attestationPubsubTopic.Subscribe(pubsub.WithBufferSize(P2P_SUBSCRIPTION_BUFFER_SIZE)) if err != nil { diff --git a/node/pkg/processor/broadcast.go b/node/pkg/processor/broadcast.go index 02840b2ff9..81d09bb1c9 100644 --- a/node/pkg/processor/broadcast.go +++ b/node/pkg/processor/broadcast.go @@ -18,6 +18,12 @@ var ( Help: "Total number of signed observations queued for broadcast", }) + batchObservationsBroadcast = promauto.NewCounter( + prometheus.CounterOpts{ + Name: "wormhole_batch_observations_queued_for_broadcast", + Help: "Total number of signed batched observations queued for broadcast", + }) + signedVAAsBroadcast = promauto.NewCounter( prometheus.CounterOpts{ Name: "wormhole_signed_vaas_queued_for_broadcast", @@ -46,6 +52,7 @@ func (p *Processor) broadcastSignature( msg = p.publishImmediately(ourObs) observationsBroadcast.Inc() } else { + batchObservationsBroadcast.Inc() p.postObservationToBatch(ourObs) } } else { diff --git a/node/pkg/processor/cleanup.go b/node/pkg/processor/cleanup.go index b93b49aa8d..496d5e9952 100644 --- a/node/pkg/processor/cleanup.go +++ b/node/pkg/processor/cleanup.go @@ -229,6 +229,7 @@ func (p *Processor) handleCleanup(ctx context.Context) { p.logger.Warn("failed to broadcast re-observation request", zap.String("message_id", s.LoggingID()), zap.Error(err)) } if s.ourMsg != nil { + // This is the case for immediately published messages (as well as anything still pending from before the cutover). p.gossipAttestationSendC <- s.ourMsg } else { p.postObservationToBatch(s.ourObs)