Skip to content

Commit

Permalink
Node: Observation batching with override (#4066)
Browse files Browse the repository at this point in the history
* Node: Observation batching with override

* Add cutover time

* Code review rework
  • Loading branch information
bruce-riley committed Aug 13, 2024
1 parent 7bef236 commit 9bd569d
Show file tree
Hide file tree
Showing 19 changed files with 983 additions and 274 deletions.
2 changes: 1 addition & 1 deletion node/cmd/guardiand/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1592,7 +1592,7 @@ func runNode(cmd *cobra.Command, args []string) {
node.GuardianOptionAdminService(*adminSocketPath, ethRPC, ethContract, rpcMap),
node.GuardianOptionP2P(p2pKey, *p2pNetworkID, *p2pBootstrap, *nodeName, *subscribeToVAAs, *disableHeartbeatVerify, *p2pPort, *ccqP2pBootstrap, *ccqP2pPort, *ccqAllowedPeers, *gossipAdvertiseAddress, ibc.GetFeatures),
node.GuardianOptionStatusServer(*statusAddr),
node.GuardianOptionProcessor(),
node.GuardianOptionProcessor(*p2pNetworkID),
}

if shouldStart(publicGRPCSocketPath) {
Expand Down
8 changes: 8 additions & 0 deletions node/pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ const (
// One observation takes roughly 0.1ms to process on one core, so the whole queue could be processed in 1s
inboundObservationBufferSize = 10000

// inboundBatchObservationBufferSize configures the size of the batchObsvC channel that contains batches of observations from other Guardians.
// Since a batch contains many observations, the guardians should not be publishing too many of these. With 19 guardians, we would expect 19 messages
// per second during normal operations. However, since some messages get published immediately, we need to allow extra room.
inboundBatchObservationBufferSize = 1000

// inboundSignedVaaBufferSize configures the size of the signedInC channel that contains VAAs from other Guardians.
// One VAA takes roughly 0.01ms to process if we already have one in the database and 2ms if we don't.
// So in the worst case the entire queue can be processed in 2s.
Expand Down Expand Up @@ -81,6 +86,8 @@ type G struct {
gossipVaaSendC chan []byte
// Inbound observations. This is read/write because the processor also writes to it as a fast-path when handling locally made observations.
obsvC chan *common.MsgWithTimeStamp[gossipv1.SignedObservation]
// Inbound observation batches.
batchObsvC channelPair[*common.MsgWithTimeStamp[gossipv1.SignedObservationBatch]]
// Finalized guardian observations aggregated across all chains
msgC channelPair[*common.MessagePublication]
// Ethereum incoming guardian set updates
Expand Down Expand Up @@ -121,6 +128,7 @@ func (g *G) initializeBasic(rootCtxCancel context.CancelFunc) {
g.gossipAttestationSendC = make(chan []byte, gossipAttestationSendBufferSize)
g.gossipVaaSendC = make(chan []byte, gossipVaaSendBufferSize)
g.obsvC = make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservation], inboundObservationBufferSize)
g.batchObsvC = makeChannelPair[*common.MsgWithTimeStamp[gossipv1.SignedObservationBatch]](inboundBatchObservationBufferSize)
g.msgC = makeChannelPair[*common.MessagePublication](0)
g.setC = makeChannelPair[*common.GuardianSet](1) // This needs to be a buffered channel because of a circular dependency between processor and accountant during startup.
g.signedInC = makeChannelPair[*gossipv1.SignedVAAWithQuorum](inboundSignedVaaBufferSize)
Expand Down
2 changes: 1 addition & 1 deletion node/pkg/node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func mockGuardianRunnable(t testing.TB, gs []*mockGuardian, mockGuardianIndex ui
GuardianOptionPublicWeb(cfg.publicWeb, cfg.publicSocket, "", false, ""),
GuardianOptionAdminService(cfg.adminSocket, nil, nil, rpcMap),
GuardianOptionStatusServer(fmt.Sprintf("[::]:%d", cfg.statusPort)),
GuardianOptionProcessor(),
GuardianOptionProcessor(networkID),
}

guardianNode := NewGuardianNode(
Expand Down
5 changes: 4 additions & 1 deletion node/pkg/node/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func GuardianOptionP2P(
nodeName,
g.gk,
g.obsvC,
g.batchObsvC.writeC,
signedInC,
g.obsvReqC.writeC,
g.gossipControlSendC,
Expand Down Expand Up @@ -573,7 +574,7 @@ func GuardianOptionDatabase(db *db.Database) *GuardianOption {

// GuardianOptionProcessor enables the default processor, which is required to make consensus on messages.
// Dependencies: db, governor, accountant
func GuardianOptionProcessor() *GuardianOption {
func GuardianOptionProcessor(networkId string) *GuardianOption {
return &GuardianOption{
name: "processor",
// governor and accountant may be set to nil, but that choice needs to be made before the processor is configured
Expand All @@ -588,6 +589,7 @@ func GuardianOptionProcessor() *GuardianOption {
g.gossipAttestationSendC,
g.gossipVaaSendC,
g.obsvC,
g.batchObsvC.readC,
g.obsvReqSendC.writeC,
g.signedInC.readC,
g.gk,
Expand All @@ -596,6 +598,7 @@ func GuardianOptionProcessor() *GuardianOption {
g.acct,
g.acctC.readC,
g.gatewayRelayer,
networkId,
).Run

return nil
Expand Down
40 changes: 31 additions & 9 deletions node/pkg/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ const P2P_SUBSCRIPTION_BUFFER_SIZE = 1024
// TESTNET_BOOTSTRAP_DHI configures how many nodes may connect to the testnet bootstrap node. This number should not exceed HighWaterMark.
const TESTNET_BOOTSTRAP_DHI = 350

// MaxObservationBatchSize is the maximum number of observations that will fit in a single `SignedObservationBatch` message.
const MaxObservationBatchSize = 4000

// MaxObservationBatchDelay is the longest we will wait before publishing any queued up observations.
const MaxObservationBatchDelay = time.Second

var (
p2pHeartbeatsSent = promauto.NewCounter(
prometheus.CounterOpts{
Expand All @@ -74,11 +80,6 @@ var (
Name: "wormhole_p2p_drops",
Help: "Total number of messages that were dropped by libp2p",
})
p2pReject = promauto.NewCounter(
prometheus.CounterOpts{
Name: "wormhole_p2p_rejects",
Help: "Total number of messages rejected by libp2p",
})
)

var heartbeatMessagePrefix = []byte("heartbeat|")
Expand Down Expand Up @@ -173,8 +174,6 @@ func (*traceHandler) Trace(evt *libp2ppb.TraceEvent) {
if evt.Type != nil {
if *evt.Type == libp2ppb.TraceEvent_DROP_RPC {
p2pDrop.Inc()
} else if *evt.Type == libp2ppb.TraceEvent_REJECT_MESSAGE {
p2pReject.Inc()
}
}
}
Expand Down Expand Up @@ -306,6 +305,7 @@ func Run(params *RunParams) func(ctx context.Context) error {
p2pMessagesSent.WithLabelValues("attestation").Add(0)
p2pMessagesSent.WithLabelValues("vaa").Add(0)
p2pReceiveChannelOverflow.WithLabelValues("observation").Add(0)
p2pReceiveChannelOverflow.WithLabelValues("batch_observation").Add(0)
p2pReceiveChannelOverflow.WithLabelValues("signed_vaa_with_quorum").Add(0)
p2pReceiveChannelOverflow.WithLabelValues("signed_observation_request").Add(0)

Expand Down Expand Up @@ -397,7 +397,7 @@ func Run(params *RunParams) func(ctx context.Context) error {
}

// Set up the attestation channel. ////////////////////////////////////////////////////////////////////
if params.gossipAttestationSendC != nil || params.obsvRecvC != nil {
if params.gossipAttestationSendC != nil || params.obsvRecvC != nil || params.batchObsvRecvC != nil {
attestationTopic := fmt.Sprintf("%s/%s", params.networkID, "attestation")
logger.Info("joining the attestation topic", zap.String("topic", attestationTopic))
attestationPubsubTopic, err = ps.Join(attestationTopic)
Expand All @@ -411,7 +411,7 @@ func Run(params *RunParams) func(ctx context.Context) error {
}
}()

if params.obsvRecvC != nil {
if params.obsvRecvC != nil || params.batchObsvRecvC != nil {
logger.Info("subscribing to the attestation topic", zap.String("topic", attestationTopic))
attestationSubscription, err = attestationPubsubTopic.Subscribe(pubsub.WithBufferSize(P2P_SUBSCRIPTION_BUFFER_SIZE))
if err != nil {
Expand Down Expand Up @@ -918,6 +918,17 @@ func Run(params *RunParams) func(ctx context.Context) error {
p2pReceiveChannelOverflow.WithLabelValues("observation").Inc()
}
}
case *gossipv1.GossipMessage_SignedObservationBatch:
if params.batchObsvRecvC != nil {
if err := common.PostMsgWithTimestamp(m.SignedObservationBatch, params.batchObsvRecvC); err == nil {
p2pMessagesReceived.WithLabelValues("batch_observation").Inc()
} else {
if params.components.WarnChannelOverflow {
logger.Warn("Ignoring SignedObservationBatch because batchObsvRecvC is full", zap.String("addr", hex.EncodeToString(m.SignedObservationBatch.Addr)))
}
p2pReceiveChannelOverflow.WithLabelValues("batch_observation").Inc()
}
}
default:
p2pMessagesReceived.WithLabelValues("unknown").Inc()
logger.Warn("received unknown message type on attestation topic (running outdated software?)",
Expand Down Expand Up @@ -1039,6 +1050,17 @@ func Run(params *RunParams) func(ctx context.Context) error {
p2pReceiveChannelOverflow.WithLabelValues("observation").Inc()
}
}
case *gossipv1.GossipMessage_SignedObservationBatch: // TODO: Get rid of this after the cutover.
if params.batchObsvRecvC != nil {
if err := common.PostMsgWithTimestamp(m.SignedObservationBatch, params.batchObsvRecvC); err == nil {
p2pMessagesReceived.WithLabelValues("batch_observation").Inc()
} else {
if params.components.WarnChannelOverflow {
logger.Warn("Ignoring SignedObservationBatch because obsvRecvC is full")
}
p2pReceiveChannelOverflow.WithLabelValues("batch_observation").Inc()
}
}
case *gossipv1.GossipMessage_SignedVaaWithQuorum:
if params.signedIncomingVaaRecvC != nil {
select {
Expand Down
23 changes: 18 additions & 5 deletions node/pkg/p2p/run_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ type (
// obsvRecvC is optional and can be set with `WithSignedObservationListener`.
obsvRecvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation]

// batchObsvRecvC is optional and can be set with `WithSignedObservationBatchListener`.
batchObsvRecvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservationBatch]

// obsvReqRecvC is optional and can be set with `WithObservationRequestListener`.
obsvReqRecvC chan<- *gossipv1.ObservationRequest

Expand Down Expand Up @@ -97,39 +100,47 @@ func NewRunParams(
return p, nil
}

// WithSignedObservationListener is used to set the channel to receive `SignedObservation messages.
// WithSignedObservationListener returns a RunOpt that registers obsvRecvC as the channel on which
// `SignedObservation` messages are delivered. The option itself never fails.
func WithSignedObservationListener(obsvRecvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation]) RunOpt {
	apply := func(params *RunParams) error {
		params.obsvRecvC = obsvRecvC
		return nil
	}
	return apply
}

// WithSignedVAAListener is used to set the channel to receive `SignedVAAWithQuorum messages.
// WithSignedObservationBatchListener returns a RunOpt that registers batchObsvC as the channel on which
// `SignedObservationBatch` messages are delivered. The option itself never fails.
func WithSignedObservationBatchListener(batchObsvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservationBatch]) RunOpt {
	apply := func(params *RunParams) error {
		params.batchObsvRecvC = batchObsvC
		return nil
	}
	return apply
}

// WithSignedVAAListener returns a RunOpt that registers signedIncomingVaaRecvC as the channel on which
// `SignedVAAWithQuorum` messages are delivered. The option itself never fails.
func WithSignedVAAListener(signedIncomingVaaRecvC chan<- *gossipv1.SignedVAAWithQuorum) RunOpt {
	apply := func(params *RunParams) error {
		params.signedIncomingVaaRecvC = signedIncomingVaaRecvC
		return nil
	}
	return apply
}

// WithObservationRequestListener is used to set the channel to receive `ObservationRequest messages.
// WithObservationRequestListener returns a RunOpt that registers obsvReqRecvC as the channel on which
// `ObservationRequest` messages are delivered. The option itself never fails.
func WithObservationRequestListener(obsvReqRecvC chan<- *gossipv1.ObservationRequest) RunOpt {
	apply := func(params *RunParams) error {
		params.obsvReqRecvC = obsvReqRecvC
		return nil
	}
	return apply
}

// WithChainGovernorConfigListener is used to set the channel to receive `SignedChainGovernorConfig messages.
// WithChainGovernorConfigListener returns a RunOpt that registers signedGovCfgRecvC as the channel on which
// `SignedChainGovernorConfig` messages are delivered. The option itself never fails.
func WithChainGovernorConfigListener(signedGovCfgRecvC chan *gossipv1.SignedChainGovernorConfig) RunOpt {
	apply := func(params *RunParams) error {
		params.signedGovCfgRecvC = signedGovCfgRecvC
		return nil
	}
	return apply
}

// WithChainGovernorStatusListener is used to set the channel to receive `SignedChainGovernorStatus messages.
// WithChainGovernorStatusListener is used to set the channel to receive `SignedChainGovernorStatus` messages.
func WithChainGovernorStatusListener(signedGovStatusRecvC chan *gossipv1.SignedChainGovernorStatus) RunOpt {
return func(p *RunParams) error {
p.signedGovStatusRecvC = signedGovStatusRecvC
Expand All @@ -150,6 +161,7 @@ func WithGuardianOptions(
nodeName string,
gk *ecdsa.PrivateKey,
obsvRecvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation],
batchObsvRecvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservationBatch],
signedIncomingVaaRecvC chan<- *gossipv1.SignedVAAWithQuorum,
obsvReqRecvC chan<- *gossipv1.ObservationRequest,
gossipControlSendC chan []byte,
Expand All @@ -173,6 +185,7 @@ func WithGuardianOptions(
p.nodeName = nodeName
p.gk = gk
p.obsvRecvC = obsvRecvC
p.batchObsvRecvC = batchObsvRecvC
p.signedIncomingVaaRecvC = signedIncomingVaaRecvC
p.obsvReqRecvC = obsvReqRecvC
p.gossipControlSendC = gossipControlSendC
Expand Down
2 changes: 2 additions & 0 deletions node/pkg/p2p/run_params_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ func TestRunParamsWithGuardianOptions(t *testing.T) {
require.NotNil(t, gk)

obsvC := make(chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation], 42)
batchObsvC := make(chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservationBatch], 42)
signedInC := make(chan<- *gossipv1.SignedVAAWithQuorum, 42)
obsvReqC := make(chan<- *gossipv1.ObservationRequest, 42)
gossipControlSendC := make(chan []byte, 42)
Expand Down Expand Up @@ -172,6 +173,7 @@ func TestRunParamsWithGuardianOptions(t *testing.T) {
nodeName,
gk,
obsvC,
batchObsvC,
signedInC,
obsvReqC,
gossipControlSendC,
Expand Down
3 changes: 3 additions & 0 deletions node/pkg/p2p/watermark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const LOCAL_P2P_PORTRANGE_START = 11000
type G struct {
// arguments passed to p2p.New
obsvC chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservation]
batchObsvC chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservationBatch]
obsvReqC chan *gossipv1.ObservationRequest
obsvReqSendC chan *gossipv1.ObservationRequest
controlSendC chan []byte
Expand Down Expand Up @@ -67,6 +68,7 @@ func NewG(t *testing.T, nodeName string) *G {

g := &G{
obsvC: make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservation], cs),
batchObsvC: make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservationBatch], cs),
obsvReqC: make(chan *gossipv1.ObservationRequest, cs),
obsvReqSendC: make(chan *gossipv1.ObservationRequest, cs),
controlSendC: make(chan []byte, cs),
Expand Down Expand Up @@ -182,6 +184,7 @@ func startGuardian(t *testing.T, ctx context.Context, g *G) {
g.nodeName,
g.gk,
g.obsvC,
g.batchObsvC,
g.signedInC,
g.obsvReqC,
g.controlSendC,
Expand Down
87 changes: 87 additions & 0 deletions node/pkg/processor/batch_obs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package processor

import (
"context"
"errors"
"fmt"

"github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/p2p"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"google.golang.org/protobuf/proto"
)

// postObservationToBatch hands a single observation to the batch processor by writing it to
// batchObsvPubC. The write never blocks: if the channel cannot accept the observation right now,
// the observation is dropped and the "batchObsvPub" overflow counter is incremented instead.
func (p *Processor) postObservationToBatch(obs *gossipv1.Observation) {
	select {
	case p.batchObsvPubC <- obs:
		// Queued for the next batch; nothing more to do.
		return
	default:
	}

	batchObservationChannelOverflow.WithLabelValues("batchObsvPub").Inc()
}

// batchProcessor is the main loop of the batch processor, which turns individually posted
// observations into batches. It calls handleBatch repeatedly until ctx is done, in which case it
// returns nil, or until handleBatch reports an error, which is returned unchanged.
func (p *Processor) batchProcessor(ctx context.Context) error {
	// ctx.Err() becomes non-nil exactly when ctx.Done() is closed, so this is the loop's stop condition.
	for ctx.Err() == nil {
		if err := p.handleBatch(ctx); err != nil {
			return err
		}
	}

	return nil
}

// handleBatch gathers one batch of observations from batchObsvPubC and publishes it.
// The read is bounded by p2p.MaxObservationBatchDelay (via a derived context) and by
// p2p.MaxObservationBatchSize. Whatever was collected is then sent out as a single
// `SignedObservationBatch` gossip message; nothing is published if no observations were read.
//
// A context.DeadlineExceeded error from the read is not treated as a failure. Any other
// read error is wrapped and returned, and the observations read so far are not published.
func (p *Processor) handleBatch(ctx context.Context) error {
	// Bound how long we wait for the batch to fill.
	readCtx, stop := context.WithTimeout(ctx, p2p.MaxObservationBatchDelay)
	defer stop()

	batch, readErr := common.ReadFromChannelWithTimeout(readCtx, p.batchObsvPubC, p2p.MaxObservationBatchSize)

	// Hitting the deadline is an expected outcome, so only other errors are reported.
	if readErr != nil && !errors.Is(readErr, context.DeadlineExceeded) {
		return fmt.Errorf("failed to read observations from the internal observation batch channel: %w", readErr)
	}

	if len(batch) == 0 {
		return nil
	}

	// The marshaled bytes returned by publishBatch are not needed here.
	_ = p.publishBatch(batch)
	return nil
}

// publishBatch wraps the given observations, together with our address, in a
// `SignedObservationBatch` gossip message, marshals it, and offers the bytes to
// gossipAttestationSendC without blocking. If the channel cannot accept the message, the
// message is not sent and the "gossipSend" overflow counter is incremented.
//
// The marshaled message bytes are returned whether or not the send succeeded.
// It panics if the message cannot be marshaled.
func (p *Processor) publishBatch(observations []*gossipv1.Observation) []byte {
	gossipMsg := &gossipv1.GossipMessage{
		Message: &gossipv1.GossipMessage_SignedObservationBatch{
			SignedObservationBatch: &gossipv1.SignedObservationBatch{
				Addr:         p.ourAddr.Bytes(),
				Observations: observations,
			},
		},
	}

	encoded, err := proto.Marshal(gossipMsg)
	if err != nil {
		panic(err)
	}

	select {
	case p.gossipAttestationSendC <- encoded:
	default:
		// Never block the caller on a full gossip channel; just count the drop.
		batchObservationChannelOverflow.WithLabelValues("gossipSend").Inc()
	}

	return encoded
}

// shouldPublishImmediately reports whether the observation for v should bypass batching and be
// published right away. Currently that is the case only when the emitter chain is PythNet.
func (p *Processor) shouldPublishImmediately(v *vaa.VAA) bool {
	switch v.EmitterChain {
	case vaa.ChainIDPythNet:
		return true
	default:
		return false
	}
}

// publishImmediately publishes a single observation as its own `SignedObservationBatch` gossip
// message, without waiting for the next batch. It then returns the message bytes.
func (p *Processor) publishImmediately(obs *gossipv1.Observation) []byte {
	single := []*gossipv1.Observation{obs}
	return p.publishBatch(single)
}
Loading

0 comments on commit 9bd569d

Please sign in to comment.