Skip to content

Commit

Permalink
Code review rework
Browse files Browse the repository at this point in the history
  • Loading branch information
bruce-riley committed Aug 8, 2024
1 parent 6594a5e commit ca16d36
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 4 deletions.
6 changes: 3 additions & 3 deletions node/pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ const (
inboundObservationBufferSize = 10000

// inboundBatchObservationBufferSize configures the size of the batchObsvC channel that contains batches of observations from other Guardians.
// Since a batch contains many observations, the guardians should not be publishing too many of these, so we can keep the channel small. With
// 19 guardians, we would expect 19 messages per second during normal operations. This gives us plenty of extra room.
inboundBatchObservationBufferSize = 100
// Since a batch contains many observations, the guardians should not be publishing too many of these. With 19 guardians, we would expect 19 messages
// per second during normal operations. However, since some messages get published immediately, we need to allow extra room.
inboundBatchObservationBufferSize = 1000

// inboundSignedVaaBufferSize configures the size of the signedInC channel that contains VAAs from other Guardians.
// One VAA takes roughly 0.01ms to process if we already have one in the database and 2ms if we don't.
Expand Down
2 changes: 1 addition & 1 deletion node/pkg/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ func Run(params *RunParams) func(ctx context.Context) error {
}
}()

if params.obsvRecvC != nil {
if params.obsvRecvC != nil || params.batchObsvRecvC != nil {
logger.Info("subscribing to the attestation topic", zap.String("topic", attestationTopic))
attestationSubscription, err = attestationPubsubTopic.Subscribe(pubsub.WithBufferSize(P2P_SUBSCRIPTION_BUFFER_SIZE))
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions node/pkg/processor/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ var (
Help: "Total number of signed observations queued for broadcast",
})

batchObservationsBroadcast = promauto.NewCounter(
prometheus.CounterOpts{
Name: "wormhole_batch_observations_queued_for_broadcast",
Help: "Total number of signed batched observations queued for broadcast",
})

signedVAAsBroadcast = promauto.NewCounter(
prometheus.CounterOpts{
Name: "wormhole_signed_vaas_queued_for_broadcast",
Expand Down Expand Up @@ -46,6 +52,7 @@ func (p *Processor) broadcastSignature(
msg = p.publishImmediately(ourObs)
observationsBroadcast.Inc()
} else {
batchObservationsBroadcast.Inc()
p.postObservationToBatch(ourObs)
}
} else {
Expand Down
1 change: 1 addition & 0 deletions node/pkg/processor/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ func (p *Processor) handleCleanup(ctx context.Context) {
p.logger.Warn("failed to broadcast re-observation request", zap.String("message_id", s.LoggingID()), zap.Error(err))
}
if s.ourMsg != nil {
// This is the case for immediately published messages (as well as anything still pending from before the cutover).
p.gossipAttestationSendC <- s.ourMsg
} else {
p.postObservationToBatch(s.ourObs)
Expand Down

0 comments on commit ca16d36

Please sign in to comment.