Skip to content

Commit

Permalink
Code review rework
Browse files Browse the repository at this point in the history
  • Loading branch information
bruce-riley committed Jul 9, 2024
1 parent 42ab842 commit 5a3a3b2
Showing 1 changed file with 80 additions and 79 deletions.
159 changes: 80 additions & 79 deletions node/pkg/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ type Components struct {
// is only accessed by a single routine at any given time in a running Guardian.
ProtectedHostByGuardianKeyLock sync.Mutex
// WarnChannelOverflow: If true, errors due to overflowing channels will produce logger.Warn
// WARNING: This should not be enabled in production. It is only used in node tests to watch for overflows.
WarnChannelOverflow bool
// SignedHeartbeatLogLevel is the log level at which SignedHeartbeatReceived events will be logged.
SignedHeartbeatLogLevel zapcore.Level
Expand Down Expand Up @@ -584,6 +585,9 @@ func Run(params *RunParams) func(ctx context.Context) error {
}()

if GossipCutoverComplete() {
if controlPubsubTopic == nil {
panic("controlPubsubTopic should not be nil when nodeName is set")
}
err = controlPubsubTopic.Publish(ctx, b)
p2pMessagesSent.WithLabelValues("control").Inc()
if err != nil {
Expand Down Expand Up @@ -612,99 +616,97 @@ func Run(params *RunParams) func(ctx context.Context) error {
case <-ctx.Done():
return
case msg := <-params.gossipControlSendC:
if controlPubsubTopic != nil {
if GossipCutoverComplete() {
err := controlPubsubTopic.Publish(ctx, msg)
p2pMessagesSent.WithLabelValues("control").Inc()
if err != nil {
logger.Error("failed to publish message from control queue", zap.Error(err))
}
} else if vaaPubsubTopic != nil {
err := vaaPubsubTopic.Publish(ctx, msg)
p2pMessagesSent.WithLabelValues("old_control").Inc()
if err != nil {
logger.Error("failed to publish message from control queue to old topic", zap.Error(err))
}
if GossipCutoverComplete() {
if controlPubsubTopic == nil {
panic("controlPubsubTopic should not be nil when gossipControlSendC is set")
}
} else {
logger.Error("received a message on the control queue when we do not have a control topic")
}
case msg := <-params.gossipAttestationSendC:
if attestationPubsubTopic != nil {
if GossipCutoverComplete() {
err := attestationPubsubTopic.Publish(ctx, msg)
p2pMessagesSent.WithLabelValues("attestation").Inc()
if err != nil {
logger.Error("failed to publish message from attestation queue", zap.Error(err))
}
} else if vaaPubsubTopic != nil {
err := vaaPubsubTopic.Publish(ctx, msg)
p2pMessagesSent.WithLabelValues("old_attestation").Inc()
if err != nil {
logger.Error("failed to publish message from attestation queue to old topic", zap.Error(err))
}
err := controlPubsubTopic.Publish(ctx, msg)
p2pMessagesSent.WithLabelValues("control").Inc()
if err != nil {
logger.Error("failed to publish message from control queue", zap.Error(err))
}
} else {
logger.Error("received a message on the attestation queue when we do not have an attestation topic")
}
case msg := <-params.gossipVaaSendC:
if vaaPubsubTopic != nil {
} else if vaaPubsubTopic != nil {
err := vaaPubsubTopic.Publish(ctx, msg)
p2pMessagesSent.WithLabelValues("vaa").Inc()
p2pMessagesSent.WithLabelValues("old_control").Inc()
if err != nil {
logger.Error("failed to publish message from vaa queue", zap.Error(err))
logger.Error("failed to publish message from control queue to old topic", zap.Error(err))
}
} else {
logger.Error("received a message on the vaa queue when we do not have a vaa topic")
}
case msg := <-params.obsvReqSendC:
if controlPubsubTopic != nil {
b, err := proto.Marshal(msg)
case msg := <-params.gossipAttestationSendC:
if GossipCutoverComplete() {
if attestationPubsubTopic == nil {
panic("attestationPubsubTopic should not be nil when gossipAttestationSendC is set")
}
err := attestationPubsubTopic.Publish(ctx, msg)
p2pMessagesSent.WithLabelValues("attestation").Inc()
if err != nil {
panic(err)
logger.Error("failed to publish message from attestation queue", zap.Error(err))
}

// Sign the observation request using our node's guardian key.
digest := signedObservationRequestDigest(b)
sig, err := ethcrypto.Sign(digest.Bytes(), params.gk)
} else if vaaPubsubTopic != nil {
err := vaaPubsubTopic.Publish(ctx, msg)
p2pMessagesSent.WithLabelValues("old_attestation").Inc()
if err != nil {
panic(err)
logger.Error("failed to publish message from attestation queue to old topic", zap.Error(err))
}
}
case msg := <-params.gossipVaaSendC:
if vaaPubsubTopic == nil {
panic("vaaPubsubTopic should not be nil when gossipVaaSendC is set")
}
err := vaaPubsubTopic.Publish(ctx, msg)
p2pMessagesSent.WithLabelValues("vaa").Inc()
if err != nil {
logger.Error("failed to publish message from vaa queue", zap.Error(err))
}
case msg := <-params.obsvReqSendC:
b, err := proto.Marshal(msg)
if err != nil {
panic(err)
}

sReq := &gossipv1.SignedObservationRequest{
ObservationRequest: b,
Signature: sig,
GuardianAddr: ethcrypto.PubkeyToAddress(params.gk.PublicKey).Bytes(),
}
// Sign the observation request using our node's guardian key.
digest := signedObservationRequestDigest(b)
sig, err := ethcrypto.Sign(digest.Bytes(), params.gk)
if err != nil {
panic(err)
}

envelope := &gossipv1.GossipMessage{
Message: &gossipv1.GossipMessage_SignedObservationRequest{
SignedObservationRequest: sReq}}
sReq := &gossipv1.SignedObservationRequest{
ObservationRequest: b,
Signature: sig,
GuardianAddr: ethcrypto.PubkeyToAddress(params.gk.PublicKey).Bytes(),
}

b, err = proto.Marshal(envelope)
if err != nil {
panic(err)
}
envelope := &gossipv1.GossipMessage{
Message: &gossipv1.GossipMessage_SignedObservationRequest{
SignedObservationRequest: sReq}}

// Send to local observation request queue (the loopback message is ignored)
if params.obsvReqC != nil {
params.obsvReqC <- msg
}
b, err = proto.Marshal(envelope)
if err != nil {
panic(err)
}

if GossipCutoverComplete() {
err = controlPubsubTopic.Publish(ctx, b)
p2pMessagesSent.WithLabelValues("control").Inc()
if err != nil {
logger.Error("failed to publish observation request", zap.Error(err))
} else {
logger.Info("published signed observation request", zap.Any("signed_observation_request", sReq))
}
} else if vaaPubsubTopic != nil {
err = vaaPubsubTopic.Publish(ctx, b)
p2pMessagesSent.WithLabelValues("old_control").Inc()
if err != nil {
logger.Error("failed to publish observation request to old topic", zap.Error(err))
}
// Send to local observation request queue (the loopback message is ignored)
if params.obsvReqC != nil {
params.obsvReqC <- msg
}

if GossipCutoverComplete() {
if controlPubsubTopic == nil {
panic("controlPubsubTopic should not be nil when obsvReqSendC is set")
}
err = controlPubsubTopic.Publish(ctx, b)
p2pMessagesSent.WithLabelValues("control").Inc()
if err != nil {
logger.Error("failed to publish observation request", zap.Error(err))
} else {
logger.Info("published signed observation request", zap.Any("signed_observation_request", sReq))
}
} else if vaaPubsubTopic != nil {
err = vaaPubsubTopic.Publish(ctx, b)
p2pMessagesSent.WithLabelValues("old_control").Inc()
if err != nil {
logger.Error("failed to publish observation request to old topic", zap.Error(err))
}
}
}
Expand Down Expand Up @@ -1040,7 +1042,6 @@ func Run(params *RunParams) func(ctx context.Context) error {
p2pMessagesReceived.WithLabelValues("signed_vaa_with_quorum").Inc()
default:
if params.components.WarnChannelOverflow {
// TODO do not log this in production
var hexStr string
if vaa, err := vaa.Unmarshal(m.SignedVaaWithQuorum.Vaa); err == nil {
hexStr = vaa.HexDigest()
Expand Down

0 comments on commit 5a3a3b2

Please sign in to comment.