diff --git a/node/cmd/spy/spy.go b/node/cmd/spy/spy.go index e39a3d3f99..260488dc7b 100644 --- a/node/cmd/spy/spy.go +++ b/node/cmd/spy/spy.go @@ -337,9 +337,6 @@ func runSpy(cmd *cobra.Command, args []string) { rootCtx, rootCtxCancel = context.WithCancel(context.Background()) defer rootCtxCancel() - // Outbound gossip message queue - sendC := make(chan []byte) - // Inbound signed VAAs signedInC := make(chan *gossipv1.SignedVAAWithQuorum, 1024) @@ -391,35 +388,21 @@ func runSpy(cmd *cobra.Command, args []string) { supervisor.New(rootCtx, logger, func(ctx context.Context) error { components := p2p.DefaultComponents() components.Port = *p2pPort + params, err := p2p.NewRunParams( + *p2pBootstrap, + *p2pNetworkID, + priv, + gst, + rootCtxCancel, + p2p.WithSignedVAAListener(signedInC), + ) + if err != nil { + return err + } + if err := supervisor.Run(ctx, "p2p", - p2p.Run(nil, // Ignore incoming observations. - nil, // Ignore observation requests. - nil, - sendC, - signedInC, - priv, - nil, - gst, - *p2pNetworkID, - *p2pBootstrap, - "", - false, - rootCtxCancel, - nil, - nil, - nil, - nil, - components, - nil, // ibc feature string - false, // gateway relayer enabled - false, // ccqEnabled - nil, // query requests - nil, // query responses - "", // query bootstrap peers - 0, // query port - "", // query allow list - )); err != nil { + p2p.Run(params)); err != nil { return err } diff --git a/node/pkg/node/options.go b/node/pkg/node/options.go index c84147bf32..8ccd9e3073 100644 --- a/node/pkg/node/options.go +++ b/node/pkg/node/options.go @@ -55,33 +55,39 @@ func GuardianOptionP2P(p2pKey libp2p_crypto.PrivKey, networkId, bootstrapPeers, // Add the gossip advertisement address components.GossipAdvertiseAddress = gossipAdvertiseAddress - g.runnables["p2p"] = p2p.Run( - g.obsvC, - g.obsvReqC.writeC, - g.obsvReqSendC.readC, - g.gossipSendC, - g.signedInC.writeC, + params, err := p2p.NewRunParams( + bootstrapPeers, + networkId, p2pKey, - g.gk, g.gst, - networkId, - bootstrapPeers, - nodeName, - disableHeartbeatVerify, g.rootCtxCancel, - g.acct, - g.gov, - nil, - nil, - components, - ibcFeaturesFunc, - (g.gatewayRelayer != nil), - (g.queryHandler != nil), - g.signedQueryReqC.writeC, - g.queryResponsePublicationC.readC, - ccqBootstrapPeers, - ccqPort, - ccqAllowedPeers, + p2p.WithGuardianOptions( + nodeName, + g.gk, + g.obsvC, + g.signedInC.writeC, + g.obsvReqC.writeC, + g.gossipSendC, + g.obsvReqSendC.readC, + g.acct, + g.gov, + disableHeartbeatVerify, + components, + ibcFeaturesFunc, + (g.gatewayRelayer != nil), // gatewayRelayerEnabled, + (g.queryHandler != nil), // ccqEnabled, + g.signedQueryReqC.writeC, + g.queryResponsePublicationC.readC, + ccqBootstrapPeers, + ccqPort, + ccqAllowedPeers), + ) + if err != nil { + return err + } + + g.runnables["p2p"] = p2p.Run( + params, ) return nil diff --git a/node/pkg/p2p/p2p.go b/node/pkg/p2p/p2p.go index e28d1b41eb..b4ffded1d7 100644 --- a/node/pkg/p2p/p2p.go +++ b/node/pkg/p2p/p2p.go @@ -10,10 +10,7 @@ import ( "sync" "time" - "github.com/certusone/wormhole/node/pkg/accountant" "github.com/certusone/wormhole/node/pkg/common" - "github.com/certusone/wormhole/node/pkg/governor" - "github.com/certusone/wormhole/node/pkg/query" "github.com/certusone/wormhole/node/pkg/version" eth_common "github.com/ethereum/go-ethereum/common" ethcrypto "github.com/ethereum/go-ethereum/crypto" @@ -293,36 +290,14 @@ func NewHost(logger *zap.Logger, ctx context.Context, networkID string, bootstra return h, err } -func Run( - obsvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation], - obsvReqC chan<- *gossipv1.ObservationRequest, - obsvReqSendC <-chan *gossipv1.ObservationRequest, - gossipSendC chan []byte, - signedInC chan<- *gossipv1.SignedVAAWithQuorum, - priv crypto.PrivKey, - gk *ecdsa.PrivateKey, - gst *common.GuardianSetState, - networkID string, - bootstrapPeers string, - nodeName string, - disableHeartbeatVerify bool, - rootCtxCancel context.CancelFunc, - acct *accountant.Accountant, - gov *governor.ChainGovernor, - signedGovCfg chan *gossipv1.SignedChainGovernorConfig, - signedGovSt chan *gossipv1.SignedChainGovernorStatus, - components *Components, - ibcFeaturesFunc func() string, - gatewayRelayerEnabled bool, - ccqEnabled bool, - signedQueryReqC chan<- *gossipv1.SignedQueryRequest, - queryResponseReadC <-chan *query.QueryResponsePublication, - ccqBootstrapPeers string, - ccqPort uint, - ccqAllowedPeers string, -) func(ctx context.Context) error { - if components == nil { - components = DefaultComponents() +func Run(params *RunParams) func(ctx context.Context) error { + if params == nil { + return func(ctx context.Context) error { + return errors.New("params may not be nil") + } + } + if params.components == nil { + params.components = DefaultComponents() } return func(ctx context.Context) error { @@ -336,10 +311,10 @@ func Run( // TODO: Right now we're canceling the root context because it used to be the case that libp2p cannot be cleanly restarted. // But that seems to no longer be the case. We may want to revisit this. See (https://github.com/libp2p/go-libp2p/issues/992) for background. logger.Warn("p2p routine has exited, cancelling root context...") - rootCtxCancel() + params.rootCtxCancel() }() - h, err := NewHost(logger, ctx, networkID, bootstrapPeers, components, priv) + h, err := NewHost(logger, ctx, params.networkID, params.bootstrapPeers, params.components, params.priv) if err != nil { panic(err) } @@ -355,15 +330,15 @@ func Run( panic(err) } - topic := fmt.Sprintf("%s/%s", networkID, "broadcast") + topic := fmt.Sprintf("%s/%s", params.networkID, "broadcast") - bootstrappers, bootstrapNode := BootstrapAddrs(logger, bootstrapPeers, h.ID()) + bootstrappers, bootstrapNode := BootstrapAddrs(logger, params.bootstrapPeers, h.ID()) if bootstrapNode { logger.Info("We are a bootstrap node.") - if networkID == "/wormhole/testnet/2/1" { - components.GossipParams.Dhi = TESTNET_BOOTSTRAP_DHI - logger.Info("We are a bootstrap node in Testnet. Setting gossipParams.Dhi.", zap.Int("gossipParams.Dhi", components.GossipParams.Dhi)) + if params.networkID == "/wormhole/testnet/2/1" { + params.components.GossipParams.Dhi = TESTNET_BOOTSTRAP_DHI + logger.Info("We are a bootstrap node in Testnet. Setting gossipParams.Dhi.", zap.Int("gossipParams.Dhi", params.components.GossipParams.Dhi)) } } @@ -371,7 +346,7 @@ func Run( ourTracer := &traceHandler{} ps, err := pubsub.NewGossipSub(ctx, h, pubsub.WithValidateQueueSize(P2P_VALIDATE_QUEUE_SIZE), - pubsub.WithGossipSubParams(components.GossipParams), + pubsub.WithGossipSubParams(params.components.GossipParams), pubsub.WithEventTracer(ourTracer), // TODO: Investigate making this change. May need to use LaxSign until everyone has upgraded to that. // pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign), @@ -416,10 +391,10 @@ func Run( bootTime := time.Now() - if ccqEnabled { + if params.ccqEnabled { ccqErrC := make(chan error) - ccq := newCcqRunP2p(logger, ccqAllowedPeers, components) - if err := ccq.run(ctx, priv, gk, networkID, ccqBootstrapPeers, ccqPort, signedQueryReqC, queryResponseReadC, ccqErrC); err != nil { + ccq := newCcqRunP2p(logger, params.ccqAllowedPeers, params.components) + if err := ccq.run(ctx, params.priv, params.gk, params.networkID, params.ccqBootstrapPeers, params.ccqPort, params.signedQueryReqC, params.queryResponseReadC, ccqErrC); err != nil { return fmt.Errorf("failed to start p2p for CCQ: %w", err) } defer ccq.close() @@ -430,7 +405,7 @@ func Run( return case ccqErr := <-ccqErrC: logger.Error("ccqp2p returned an error", zap.Error(ccqErr), zap.String("component", "ccqp2p")) - rootCtxCancel() + params.rootCtxCancel() return } } @@ -444,7 +419,7 @@ func Run( for { select { case <-ticker.C: - gst.Cleanup() + params.gst.Cleanup() case <-ctx.Done(): return } @@ -453,10 +428,10 @@ func Run( go func() { // Disable heartbeat when no node name is provided (spy mode) - if nodeName == "" { + if params.nodeName == "" { return } - ourAddr := ethcrypto.PubkeyToAddress(gk.PublicKey) + ourAddr := ethcrypto.PubkeyToAddress(params.gk.PublicKey) ctr := int64(0) // Guardians should send out their first heartbeat immediately to speed up test runs. @@ -483,27 +458,27 @@ func Run( } features := make([]string, 0) - if gov != nil { + if params.gov != nil { features = append(features, "governor") } - if acct != nil { - features = append(features, acct.FeatureString()) + if params.acct != nil { + features = append(features, params.acct.FeatureString()) } - if ibcFeaturesFunc != nil { - ibcFlags := ibcFeaturesFunc() + if params.ibcFeaturesFunc != nil { + ibcFlags := params.ibcFeaturesFunc() if ibcFlags != "" { features = append(features, ibcFlags) } } - if gatewayRelayerEnabled { + if params.gatewayRelayerEnabled { features = append(features, "gwrelayer") } - if ccqEnabled { + if params.ccqEnabled { features = append(features, "ccq") } heartbeat := &gossipv1.Heartbeat{ - NodeName: nodeName, + NodeName: params.nodeName, Counter: ctr, Timestamp: time.Now().UnixNano(), Networks: networks, @@ -513,22 +488,22 @@ func Run( Features: features, } - if components.P2PIDInHeartbeat { + if params.components.P2PIDInHeartbeat { heartbeat.P2PNodeId = nodeIdBytes } - if err := gst.SetHeartbeat(ourAddr, h.ID(), heartbeat); err != nil { + if err := params.gst.SetHeartbeat(ourAddr, h.ID(), heartbeat); err != nil { panic(err) } collectNodeMetrics(ourAddr, h.ID(), heartbeat) - if gov != nil { - gov.CollectMetrics(heartbeat, gossipSendC, gk, ourAddr) + if params.gov != nil { + params.gov.CollectMetrics(heartbeat, params.gossipSendC, params.gk, ourAddr) } msg := gossipv1.GossipMessage{ Message: &gossipv1.GossipMessage_SignedHeartbeat{ - SignedHeartbeat: createSignedHeartbeat(gk, heartbeat), + SignedHeartbeat: createSignedHeartbeat(params.gk, heartbeat), }, } @@ -555,13 +530,13 @@ func Run( select { case <-ctx.Done(): return - case msg := <-gossipSendC: + case msg := <-params.gossipSendC: err := th.Publish(ctx, msg) p2pMessagesSent.Inc() if err != nil { logger.Error("failed to publish message from queue", zap.Error(err)) } - case msg := <-obsvReqSendC: + case msg := <-params.obsvReqSendC: b, err := proto.Marshal(msg) if err != nil { panic(err) @@ -569,7 +544,7 @@ func Run( // Sign the observation request using our node's guardian key. digest := signedObservationRequestDigest(b) - sig, err := ethcrypto.Sign(digest.Bytes(), gk) + sig, err := ethcrypto.Sign(digest.Bytes(), params.gk) if err != nil { panic(err) } @@ -577,7 +552,7 @@ func Run( sReq := &gossipv1.SignedObservationRequest{ ObservationRequest: b, Signature: sig, - GuardianAddr: ethcrypto.PubkeyToAddress(gk.PublicKey).Bytes(), + GuardianAddr: ethcrypto.PubkeyToAddress(params.gk.PublicKey).Bytes(), } envelope := &gossipv1.GossipMessage{ @@ -590,8 +565,8 @@ func Run( } // Send to local observation request queue (the loopback message is ignored) - if obsvReqC != nil { - obsvReqC <- msg + if params.obsvReqC != nil { + params.obsvReqC <- msg } err = th.Publish(ctx, b) @@ -639,17 +614,17 @@ func Run( switch m := msg.Message.(type) { case *gossipv1.GossipMessage_SignedHeartbeat: s := m.SignedHeartbeat - gs := gst.Get() + gs := params.gst.Get() if gs == nil { // No valid guardian set yet - dropping heartbeat - logger.Log(components.SignedHeartbeatLogLevel, "skipping heartbeat - no guardian set", + logger.Log(params.components.SignedHeartbeatLogLevel, "skipping heartbeat - no guardian set", zap.Any("value", s), zap.String("from", envelope.GetFrom().String())) break } - if heartbeat, err := processSignedHeartbeat(envelope.GetFrom(), s, gs, gst, disableHeartbeatVerify); err != nil { + if heartbeat, err := processSignedHeartbeat(envelope.GetFrom(), s, gs, params.gst, params.disableHeartbeatVerify); err != nil { p2pMessagesReceived.WithLabelValues("invalid_heartbeat").Inc() - logger.Log(components.SignedHeartbeatLogLevel, "invalid signed heartbeat received", + logger.Log(params.components.SignedHeartbeatLogLevel, "invalid signed heartbeat received", zap.Error(err), zap.Any("payload", msg.Message), zap.Any("value", s), @@ -657,14 +632,14 @@ func Run( zap.String("from", envelope.GetFrom().String())) } else { p2pMessagesReceived.WithLabelValues("valid_heartbeat").Inc() - logger.Log(components.SignedHeartbeatLogLevel, "valid signed heartbeat received", + logger.Log(params.components.SignedHeartbeatLogLevel, "valid signed heartbeat received", zap.Any("value", heartbeat), zap.String("from", envelope.GetFrom().String())) func() { if len(heartbeat.P2PNodeId) != 0 { - components.ProtectedHostByGuardianKeyLock.Lock() - defer components.ProtectedHostByGuardianKeyLock.Unlock() + params.components.ProtectedHostByGuardianKeyLock.Lock() + defer params.components.ProtectedHostByGuardianKeyLock.Unlock() var peerId peer.ID if err = peerId.Unmarshal(heartbeat.P2PNodeId); err != nil { logger.Error("p2p_node_id_in_heartbeat_invalid", @@ -674,8 +649,8 @@ func Run( zap.String("from", envelope.GetFrom().String())) } else { guardianAddr := eth_common.BytesToAddress(s.GuardianAddr) - if gk == nil || guardianAddr != ethcrypto.PubkeyToAddress(gk.PublicKey) { - prevPeerId, ok := components.ProtectedHostByGuardianKey[guardianAddr] + if params.gk == nil || guardianAddr != ethcrypto.PubkeyToAddress(params.gk.PublicKey) { + prevPeerId, ok := params.components.ProtectedHostByGuardianKey[guardianAddr] if ok { if prevPeerId != peerId { logger.Info("p2p_guardian_peer_changed", @@ -683,13 +658,13 @@ func Run( zap.String("prevPeerId", prevPeerId.String()), zap.String("newPeerId", peerId.String()), ) - components.ConnMgr.Unprotect(prevPeerId, "heartbeat") - components.ConnMgr.Protect(peerId, "heartbeat") - components.ProtectedHostByGuardianKey[guardianAddr] = peerId + params.components.ConnMgr.Unprotect(prevPeerId, "heartbeat") + params.components.ConnMgr.Protect(peerId, "heartbeat") + params.components.ProtectedHostByGuardianKey[guardianAddr] = peerId } } else { - components.ConnMgr.Protect(peerId, "heartbeat") - components.ProtectedHostByGuardianKey[guardianAddr] = peerId + params.components.ConnMgr.Protect(peerId, "heartbeat") + params.components.ProtectedHostByGuardianKey[guardianAddr] = peerId } } } @@ -701,23 +676,23 @@ func Run( }() } case *gossipv1.GossipMessage_SignedObservation: - if obsvC != nil { - if err := common.PostMsgWithTimestamp[gossipv1.SignedObservation](m.SignedObservation, obsvC); err == nil { + if params.obsvC != nil { + if err := common.PostMsgWithTimestamp[gossipv1.SignedObservation](m.SignedObservation, params.obsvC); err == nil { p2pMessagesReceived.WithLabelValues("observation").Inc() } else { - if components.WarnChannelOverflow { + if params.components.WarnChannelOverflow { logger.Warn("Ignoring SignedObservation because obsvC full", zap.String("hash", hex.EncodeToString(m.SignedObservation.Hash))) } p2pReceiveChannelOverflow.WithLabelValues("observation").Inc() } } case *gossipv1.GossipMessage_SignedVaaWithQuorum: - if signedInC != nil { + if params.signedInC != nil { select { - case signedInC <- m.SignedVaaWithQuorum: + case params.signedInC <- m.SignedVaaWithQuorum: p2pMessagesReceived.WithLabelValues("signed_vaa_with_quorum").Inc() default: - if components.WarnChannelOverflow { + if params.components.WarnChannelOverflow { // TODO do not log this in production var hexStr string if vaa, err := vaa.Unmarshal(m.SignedVaaWithQuorum.Vaa); err == nil { @@ -729,9 +704,9 @@ func Run( } } case *gossipv1.GossipMessage_SignedObservationRequest: - if obsvReqC != nil { + if params.obsvReqC != nil { s := m.SignedObservationRequest - gs := gst.Get() + gs := params.gst.Get() if gs == nil { if logger.Level().Enabled(zapcore.DebugLevel) { logger.Debug("dropping SignedObservationRequest - no guardian set", zap.Any("value", s), zap.String("from", envelope.GetFrom().String())) @@ -755,7 +730,7 @@ func Run( } select { - case obsvReqC <- r: + case params.obsvReqC <- r: p2pMessagesReceived.WithLabelValues("signed_observation_request").Inc() default: p2pReceiveChannelOverflow.WithLabelValues("signed_observation_request").Inc() @@ -763,12 +738,12 @@ func Run( } } case *gossipv1.GossipMessage_SignedChainGovernorConfig: - if signedGovCfg != nil { - signedGovCfg <- m.SignedChainGovernorConfig + if params.signedGovCfg != nil { + params.signedGovCfg <- m.SignedChainGovernorConfig } case *gossipv1.GossipMessage_SignedChainGovernorStatus: - if signedGovSt != nil { - signedGovSt <- m.SignedChainGovernorStatus + if params.signedGovSt != nil { + params.signedGovSt <- m.SignedChainGovernorStatus } default: p2pMessagesReceived.WithLabelValues("unknown").Inc() diff --git a/node/pkg/p2p/run_params.go b/node/pkg/p2p/run_params.go new file mode 100644 index 0000000000..59652f21ca --- /dev/null +++ b/node/pkg/p2p/run_params.go @@ -0,0 +1,220 @@ +package p2p + +import ( + "context" + "crypto/ecdsa" + "errors" + + "github.com/certusone/wormhole/node/pkg/accountant" + "github.com/certusone/wormhole/node/pkg/common" + "github.com/certusone/wormhole/node/pkg/governor" + gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1" + "github.com/certusone/wormhole/node/pkg/query" + "github.com/libp2p/go-libp2p/core/crypto" +) + +type ( + // RunParams is used to pass parameters into `p2p.Run()`. It allows applications to specify only what they need. + RunParams struct { + // These parameters are always required. + bootstrapPeers string + networkID string + priv crypto.PrivKey + gst *common.GuardianSetState + rootCtxCancel context.CancelFunc + + // obsvC is optional and can be set with `WithSignedObservationListener`. + obsvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation] + + // obsvReqC is optional and can be set with `WithObservationRequestListener`. + obsvReqC chan<- *gossipv1.ObservationRequest + + // signedInC is optional and can be set with `WithSignedVAAListener`. + signedInC chan<- *gossipv1.SignedVAAWithQuorum + + // signedGovCfg is optional and can be set with `WithChainGovernorConfigListener`. + signedGovCfg chan *gossipv1.SignedChainGovernorConfig + + // WithChainGovernorStatusListener is optional and can be set with `WithChainGovernorStatusListener`. + signedGovSt chan *gossipv1.SignedChainGovernorStatus + + // disableHeartbeatVerify is optional and can be set with `WithDisableHeartbeatVerify` or `WithGuardianOptions`. + disableHeartbeatVerify bool + + // The following options are guardian specific. Set with `WithGuardianOptions`. + nodeName string + gk *ecdsa.PrivateKey + gossipSendC chan []byte + obsvReqSendC <-chan *gossipv1.ObservationRequest + acct *accountant.Accountant + gov *governor.ChainGovernor + components *Components + ibcFeaturesFunc func() string + gatewayRelayerEnabled bool + ccqEnabled bool + signedQueryReqC chan<- *gossipv1.SignedQueryRequest + queryResponseReadC <-chan *query.QueryResponsePublication + ccqBootstrapPeers string + ccqPort uint + ccqAllowedPeers string + } + + // RunOpt is used to specify optional parameters. + RunOpt func(p *RunParams) error +) + +// NewRunParams is used to create the `RunParams` which gets passed into `p2p.Run()`. It takes the required parameters, +// plus any desired optional ones, which can be set using the various `With` functions defined below. +func NewRunParams( + bootstrapPeers string, + networkID string, + priv crypto.PrivKey, + gst *common.GuardianSetState, + rootCtxCancel context.CancelFunc, + opts ...RunOpt, +) (*RunParams, error) { + p := &RunParams{ + bootstrapPeers: bootstrapPeers, + networkID: networkID, + priv: priv, + gst: gst, + rootCtxCancel: rootCtxCancel, + } + + for _, opt := range opts { + err := opt(p) + if err != nil { + return nil, err + } + } + + if err := p.verify(); err != nil { + return nil, err + } + + return p, nil +} + +// WithSignedObservationListener is used to set the channel to receive `SignedObservation“ messages. +func WithSignedObservationListener(obsvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation]) RunOpt { + return func(p *RunParams) error { + p.obsvC = obsvC + return nil + } +} + +// WithSignedVAAListener is used to set the channel to receive `SignedVAAWithQuorum messages. +func WithSignedVAAListener(signedInC chan<- *gossipv1.SignedVAAWithQuorum) RunOpt { + return func(p *RunParams) error { + p.signedInC = signedInC + return nil + } +} + +// WithObservationRequestListener is used to set the channel to receive `ObservationRequest messages. +func WithObservationRequestListener(obsvReqC chan<- *gossipv1.ObservationRequest) RunOpt { + return func(p *RunParams) error { + p.obsvReqC = obsvReqC + return nil + } +} + +// WithChainGovernorConfigListener is used to set the channel to receive `SignedChainGovernorConfig messages. +func WithChainGovernorConfigListener(signedGovCfg chan *gossipv1.SignedChainGovernorConfig) RunOpt { + return func(p *RunParams) error { + p.signedGovCfg = signedGovCfg + return nil + } +} + +// WithChainGovernorStatusListener is used to set the channel to receive `SignedChainGovernorStatus messages. +func WithChainGovernorStatusListener(signedGovSt chan *gossipv1.SignedChainGovernorStatus) RunOpt { + return func(p *RunParams) error { + p.signedGovSt = signedGovSt + return nil + } +} + +// WithDisableHeartbeatVerify is used to set disableHeartbeatVerify. +func WithDisableHeartbeatVerify(disableHeartbeatVerify bool) RunOpt { + return func(p *RunParams) error { + p.disableHeartbeatVerify = disableHeartbeatVerify + return nil + } +} + +// WithGuardianOptions is used to set options that are only meaningful to the guardian. +func WithGuardianOptions( + nodeName string, + gk *ecdsa.PrivateKey, + obsvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation], + signedInC chan<- *gossipv1.SignedVAAWithQuorum, + obsvReqC chan<- *gossipv1.ObservationRequest, + gossipSendC chan []byte, + obsvReqSendC <-chan *gossipv1.ObservationRequest, + acct *accountant.Accountant, + gov *governor.ChainGovernor, + disableHeartbeatVerify bool, + components *Components, + ibcFeaturesFunc func() string, + gatewayRelayerEnabled bool, + ccqEnabled bool, + signedQueryReqC chan<- *gossipv1.SignedQueryRequest, + queryResponseReadC <-chan *query.QueryResponsePublication, + ccqBootstrapPeers string, + ccqPort uint, + ccqAllowedPeers string, +) RunOpt { + return func(p *RunParams) error { + p.nodeName = nodeName + p.gk = gk + p.obsvC = obsvC + p.signedInC = signedInC + p.obsvReqC = obsvReqC + p.gossipSendC = gossipSendC + p.obsvReqSendC = obsvReqSendC + p.acct = acct + p.gov = gov + p.disableHeartbeatVerify = disableHeartbeatVerify + p.components = components + p.ibcFeaturesFunc = ibcFeaturesFunc + p.gatewayRelayerEnabled = gatewayRelayerEnabled + p.ccqEnabled = ccqEnabled + p.signedQueryReqC = signedQueryReqC + p.queryResponseReadC = queryResponseReadC + p.ccqBootstrapPeers = ccqBootstrapPeers + p.ccqPort = ccqPort + p.ccqAllowedPeers = ccqAllowedPeers + return nil + } +} + +// verify is used to verify the RunParams object. +func (p *RunParams) verify() error { + if p.bootstrapPeers == "" { + return errors.New("bootstrapPeers may not be nil") + } + if p.networkID == "" { + return errors.New("networkID may not be nil") + } + if p.priv == nil { + return errors.New("priv may not be nil") + } + if p.gst == nil { + return errors.New("gst may not be nil") + } + if p.rootCtxCancel == nil { + return errors.New("rootCtxCancel may not be nil") + } + if p.nodeName != "" { // Heartbeating is enabled. + if p.gk == nil { + return errors.New("if heart beating is enabled, gk may not be nil") + } + } + if p.obsvReqSendC != nil { + if p.gk == nil { + return errors.New("if obsvReqSendC is not nil, gk may not be nil") + } + } + return nil +} diff --git a/node/pkg/p2p/run_params_test.go b/node/pkg/p2p/run_params_test.go new file mode 100644 index 0000000000..aebebd952c --- /dev/null +++ b/node/pkg/p2p/run_params_test.go @@ -0,0 +1,210 @@ +package p2p + +import ( + "context" + "crypto/ecdsa" + "crypto/rand" + "testing" + + "github.com/certusone/wormhole/node/pkg/accountant" + "github.com/certusone/wormhole/node/pkg/common" + "github.com/certusone/wormhole/node/pkg/governor" + gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1" + "github.com/certusone/wormhole/node/pkg/query" + "github.com/ethereum/go-ethereum/crypto" + p2pcrypto "github.com/libp2p/go-libp2p/core/crypto" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const bootstrapPeers = "/dns4/guardian-0.guardian/udp/8999/quic/p2p/12D3KooWHHzSeKaY8xuZVzkLbKFfvNgPPeKhFBGrMbNzbm5akpqu" +const networkId = "/wormhole/dev" +const nodeName = "guardian-0" + +func TestRunParamsBootstrapPeersRequired(t *testing.T) { + priv, _, err := p2pcrypto.GenerateKeyPair(p2pcrypto.Ed25519, -1) + require.NoError(t, err) + gst := common.NewGuardianSetState(nil) + _, rootCtxCancel := context.WithCancel(context.Background()) + defer rootCtxCancel() + + params, err := NewRunParams( + "", // bootstrapPeers, + networkId, + priv, + gst, + rootCtxCancel, + ) + require.ErrorContains(t, err, "bootstrapPeers may not be nil") + require.Nil(t, params) +} + +func TestRunParamsNetworkIdRequired(t *testing.T) { + priv, _, err := p2pcrypto.GenerateKeyPair(p2pcrypto.Ed25519, -1) + require.NoError(t, err) + gst := common.NewGuardianSetState(nil) + _, rootCtxCancel := context.WithCancel(context.Background()) + defer rootCtxCancel() + + params, err := NewRunParams( + bootstrapPeers, + "", // networkId, + priv, + gst, + rootCtxCancel, + ) + require.ErrorContains(t, err, "networkID may not be nil") + require.Nil(t, params) +} + +func TestRunParamsPrivRequired(t *testing.T) { + gst := common.NewGuardianSetState(nil) + _, rootCtxCancel := context.WithCancel(context.Background()) + defer rootCtxCancel() + + params, err := NewRunParams( + bootstrapPeers, + networkId, + nil, // priv, + gst, + rootCtxCancel, + ) + require.ErrorContains(t, err, "priv may not be nil") + require.Nil(t, params) +} + +func TestRunParamsGstRequired(t *testing.T) { + priv, _, err := p2pcrypto.GenerateKeyPair(p2pcrypto.Ed25519, -1) + require.NoError(t, err) + _, rootCtxCancel := context.WithCancel(context.Background()) + defer rootCtxCancel() + + params, err := NewRunParams( + bootstrapPeers, + networkId, + priv, + nil, // gst, + rootCtxCancel, + ) + require.ErrorContains(t, err, "gst may not be nil") + require.Nil(t, params) +} + +func TestRunParamsRootCtxCancelRequired(t *testing.T) { + priv, _, err := p2pcrypto.GenerateKeyPair(p2pcrypto.Ed25519, -1) + require.NoError(t, err) + gst := common.NewGuardianSetState(nil) + _, rootCtxCancel := context.WithCancel(context.Background()) + defer rootCtxCancel() + + params, err := NewRunParams( + bootstrapPeers, + networkId, + priv, + gst, + nil, // rootCtxCancel, + ) + require.ErrorContains(t, err, "rootCtxCancel may not be nil") + require.Nil(t, params) +} + +func TestRunParamsWithDisableHeartbeatVerify(t *testing.T) { + priv, _, err := p2pcrypto.GenerateKeyPair(p2pcrypto.Ed25519, -1) + require.NoError(t, err) + gst := common.NewGuardianSetState(nil) + _, rootCtxCancel := context.WithCancel(context.Background()) + defer rootCtxCancel() + + params, err := NewRunParams( + bootstrapPeers, + networkId, + priv, + gst, + rootCtxCancel, + WithDisableHeartbeatVerify(true), + ) + + require.NoError(t, err) + require.NotNil(t, params) + assert.True(t, params.disableHeartbeatVerify) +} + +func TestRunParamsWithGuardianOptions(t *testing.T) { + priv, _, err := p2pcrypto.GenerateKeyPair(p2pcrypto.Ed25519, -1) + require.NoError(t, err) + gst := common.NewGuardianSetState(nil) + _, rootCtxCancel := context.WithCancel(context.Background()) + defer rootCtxCancel() + + gk, err := ecdsa.GenerateKey(crypto.S256(), rand.Reader) + require.NoError(t, err) + require.NotNil(t, gk) + + obsvC := make(chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation], 42) + signedInC := make(chan<- *gossipv1.SignedVAAWithQuorum, 42) + obsvReqC := make(chan<- *gossipv1.ObservationRequest, 42) + gossipSendC := make(chan []byte, 42) + obsvReqSendC := make(<-chan *gossipv1.ObservationRequest, 42) + + acct := &accountant.Accountant{} + gov := &governor.ChainGovernor{} + disableHeartbeatVerify := false + components := &Components{} + ibcFeaturesFunc := func() string { return "Hello, World!" } + gatewayRelayerEnabled := true + + ccqEnabled := true + signedQueryReqC := make(chan<- *gossipv1.SignedQueryRequest, 42) + queryResponseReadC := make(<-chan *query.QueryResponsePublication, 42) + ccqBootstrapPeers := "some bootstrap string" + ccqPort := uint(4242) + ccqAllowedPeers := "some allowed peers" + + params, err := NewRunParams( + bootstrapPeers, + networkId, + priv, + gst, + rootCtxCancel, + WithGuardianOptions( + nodeName, + gk, + obsvC, + signedInC, + obsvReqC, + gossipSendC, + obsvReqSendC, + acct, + gov, + disableHeartbeatVerify, + components, + ibcFeaturesFunc, + gatewayRelayerEnabled, + ccqEnabled, + signedQueryReqC, + queryResponseReadC, + ccqBootstrapPeers, + ccqPort, + ccqAllowedPeers), + ) + + require.NoError(t, err) + require.NotNil(t, params) + assert.Equal(t, nodeName, params.nodeName) + assert.Equal(t, obsvC, params.obsvC) + assert.Equal(t, signedInC, params.signedInC) + assert.Equal(t, obsvReqC, params.obsvReqC) + assert.Equal(t, gossipSendC, params.gossipSendC) + assert.Equal(t, obsvReqSendC, params.obsvReqSendC) + assert.Equal(t, acct, params.acct) + assert.Equal(t, gov, params.gov) + assert.Equal(t, components, params.components) + assert.NotNil(t, params.ibcFeaturesFunc) // Can't compare function pointers, so just verify it's set. + assert.True(t, params.gatewayRelayerEnabled) + assert.True(t, params.ccqEnabled) + assert.Equal(t, signedQueryReqC, params.signedQueryReqC) + assert.Equal(t, queryResponseReadC, params.queryResponseReadC) + assert.Equal(t, ccqBootstrapPeers, params.ccqBootstrapPeers) + assert.Equal(t, ccqPort, params.ccqPort) + assert.Equal(t, ccqAllowedPeers, params.ccqAllowedPeers) +} diff --git a/node/pkg/p2p/watermark_test.go b/node/pkg/p2p/watermark_test.go index 3e1373b282..661decf30e 100644 --- a/node/pkg/p2p/watermark_test.go +++ b/node/pkg/p2p/watermark_test.go @@ -61,6 +61,8 @@ func NewG(t *testing.T, nodeName string) *G { panic(err) } + _, rootCtxCancel := context.WithCancel(context.Background()) + g := &G{ obsvC: make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservation], cs), obsvReqC: make(chan *gossipv1.ObservationRequest, cs), @@ -72,7 +74,7 @@ func NewG(t *testing.T, nodeName string) *G { gst: node_common.NewGuardianSetState(nil), nodeName: nodeName, disableHeartbeatVerify: false, - rootCtxCancel: nil, + rootCtxCancel: rootCtxCancel, gov: nil, signedGovCfg: make(chan *gossipv1.SignedChainGovernorConfig, cs), signedGovSt: make(chan *gossipv1.SignedChainGovernorStatus, cs), @@ -164,32 +166,35 @@ func TestWatermark(t *testing.T) { func startGuardian(t *testing.T, ctx context.Context, g *G) { t.Helper() - supervisor.New(ctx, zap.L(), - Run(g.obsvC, + params, err := NewRunParams( + g.bootstrapPeers, + g.networkID, + g.priv, + g.gst, + g.rootCtxCancel, + WithGuardianOptions( + g.nodeName, + g.gk, + g.obsvC, + g.signedInC, g.obsvReqC, - g.obsvReqSendC, g.sendC, - g.signedInC, - g.priv, - g.gk, - g.gst, - g.networkID, - g.bootstrapPeers, - g.nodeName, - g.disableHeartbeatVerify, - g.rootCtxCancel, + g.obsvReqSendC, g.acct, g.gov, - g.signedGovCfg, - g.signedGovSt, + g.disableHeartbeatVerify, g.components, - nil, // ibc feature string + nil, //g.ibcFeaturesFunc, false, // gateway relayer enabled false, // ccqEnabled nil, // signed query request channel nil, // query response channel "", // query bootstrap peers 0, // query port - "", // query allowed peers + "", // query allowed peers), )) + require.NoError(t, err) + + supervisor.New(ctx, zap.L(), + Run(params)) }