Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Node: Gossip Topic Split #4000

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions node/pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,14 @@ import (
)

const (
// gossipSendBufferSize configures the size of the gossip network send buffer
gossipSendBufferSize = 5000
// gossipControlSendBufferSize configures the size of the gossip network send buffer
gossipControlSendBufferSize = 100

// gossipAttestationSendBufferSize configures the size of the gossip network send buffer
gossipAttestationSendBufferSize = 5000

// gossipVaaSendBufferSize configures the size of the gossip network send buffer
gossipVaaSendBufferSize = 5000

// inboundObservationBufferSize configures the size of the obsvC channel that contains observations from other Guardians.
// One observation takes roughly 0.1ms to process on one core, so the whole queue could be processed in 1s
Expand Down Expand Up @@ -69,8 +75,10 @@ type G struct {
runnables map[string]supervisor.Runnable

// various channels
// Outbound gossip message queue (needs to be read/write because p2p needs read/write)
gossipSendC chan []byte
// Outbound gossip message queues (need to be read/write because p2p needs read/write)
gossipControlSendC chan []byte
gossipAttestationSendC chan []byte
gossipVaaSendC chan []byte
// Inbound observations. This is read/write because the processor also writes to it as a fast-path when handling locally made observations.
obsvC chan *common.MsgWithTimeStamp[gossipv1.SignedObservation]
// Finalized guardian observations aggregated across all chains
Expand Down Expand Up @@ -109,7 +117,9 @@ func (g *G) initializeBasic(rootCtxCancel context.CancelFunc) {
g.rootCtxCancel = rootCtxCancel

// Setup various channels...
g.gossipSendC = make(chan []byte, gossipSendBufferSize)
g.gossipControlSendC = make(chan []byte, gossipControlSendBufferSize)
g.gossipAttestationSendC = make(chan []byte, gossipAttestationSendBufferSize)
g.gossipVaaSendC = make(chan []byte, gossipVaaSendBufferSize)
g.obsvC = make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservation], inboundObservationBufferSize)
g.msgC = makeChannelPair[*common.MessagePublication](0)
g.setC = makeChannelPair[*common.GuardianSet](1) // This needs to be a buffered channel because of a circular dependency between processor and accountant during startup.
Expand Down
7 changes: 5 additions & 2 deletions node/pkg/node/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ func GuardianOptionP2P(p2pKey libp2p_crypto.PrivKey, networkId, bootstrapPeers,
g.obsvC,
g.signedInC.writeC,
g.obsvReqC.writeC,
g.gossipSendC,
g.gossipControlSendC,
g.gossipAttestationSendC,
g.gossipVaaSendC,
g.obsvReqSendC.readC,
g.acct,
g.gov,
Expand Down Expand Up @@ -564,7 +566,8 @@ func GuardianOptionProcessor() *GuardianOption {
g.db,
g.msgC.readC,
g.setC.readC,
g.gossipSendC,
g.gossipAttestationSendC,
g.gossipVaaSendC,
g.obsvC,
g.obsvReqSendC.writeC,
g.signedInC.readC,
Expand Down
87 changes: 87 additions & 0 deletions node/pkg/p2p/gossip_cutover.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package p2p

import (
"fmt"
"strings"
"sync/atomic"
"time"

"go.uber.org/zap"
)

// The format of this time is very picky. Please use the exact format specified by cutOverFmtStr!
const mainnetCutOverTimeStr = ""
const testnetCutOverTimeStr = ""
const devnetCutOverTimeStr = ""
const cutOverFmtStr = "2006-01-02T15:04:05-0700"

// gossipCutoverCompleteFlag indicates if the cutover time has passed, meaning we should publish only on the new topics.
var gossipCutoverCompleteFlag atomic.Bool

// GossipCutoverComplete returns true if the cutover time has passed, meaning we should publish on the new topic.
func GossipCutoverComplete() bool {
return gossipCutoverCompleteFlag.Load()
}

// evaluateCutOver determines if the gossip cutover time has passed yet and sets the global flag accordingly. If the time has
// not yet passed, it creates a go routine to wait for that time and then set the flag.
func evaluateGossipCutOver(logger *zap.Logger, networkID string) error {
cutOverTimeStr := getCutOverTimeStr(networkID)

sco, delay, err := evaluateGossipCutOverImpl(logger, cutOverTimeStr, time.Now())
if err != nil {
return err
}

gossipCutoverCompleteFlag.Store(sco)
logger.Info("evaluated cutover flag", zap.Bool("cutOverFlag", GossipCutoverComplete()), zap.String("cutOverTime", cutOverTimeStr), zap.String("component", "p2pco"))

if delay != time.Duration(0) {
// Wait for the cut over time and then update the flag.
go func() {
time.Sleep(delay)
logger.Info("time to cut over to new gossip topics", zap.String("cutOverTime", cutOverTimeStr), zap.String("component", "p2pco"))
gossipCutoverCompleteFlag.Store(true)
}()
}

return nil
}

// evaluateGossipCutOverImpl performs the actual cut over check. It is a separate function for testing purposes.
func evaluateGossipCutOverImpl(logger *zap.Logger, cutOverTimeStr string, now time.Time) (bool, time.Duration, error) {
if cutOverTimeStr == "" {
return false, 0, nil
}

cutOverTime, err := time.Parse(cutOverFmtStr, cutOverTimeStr)
if err != nil {
return false, 0, fmt.Errorf(`failed to parse cut over time: %w`, err)
}

if cutOverTime.Before(now) {
logger.Info("cut over time has passed, should use new gossip topics", zap.String("cutOverTime", cutOverTime.Format(cutOverFmtStr)), zap.String("now", now.Format(cutOverFmtStr)), zap.String("component", "p2pco"))
return true, 0, nil
}

// If we get here, we need to wait for the cutover and then switch the global flag.
delay := cutOverTime.Sub(now)
logger.Info("still waiting for cut over time",
zap.Stringer("cutOverTime", cutOverTime),
zap.String("now", now.Format(cutOverFmtStr)),
zap.Stringer("delay", delay),
zap.String("component", "p2pco"))

return false, delay, nil
}

// getCutOverTimeStr returns the cut over time string based on the network ID passed in.
func getCutOverTimeStr(networkID string) string { //nolint:unparam
if strings.Contains(networkID, "/mainnet/") {
return mainnetCutOverTimeStr
}
if strings.Contains(networkID, "/testnet/") {
return testnetCutOverTimeStr
}
return devnetCutOverTimeStr
}
81 changes: 81 additions & 0 deletions node/pkg/p2p/gossip_cutover_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package p2p

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

func TestVerifyCutOverTime(t *testing.T) {
if mainnetCutOverTimeStr != "" {
_, err := time.Parse(cutOverFmtStr, mainnetCutOverTimeStr)
require.NoError(t, err)
}
if testnetCutOverTimeStr != "" {
_, err := time.Parse(cutOverFmtStr, testnetCutOverTimeStr)
require.NoError(t, err)
}
if devnetCutOverTimeStr != "" {
_, err := time.Parse(cutOverFmtStr, devnetCutOverTimeStr)
require.NoError(t, err)
}
}

func TestGetCutOverTimeStr(t *testing.T) {
assert.Equal(t, mainnetCutOverTimeStr, getCutOverTimeStr("blah/blah/mainnet/blah"))
assert.Equal(t, testnetCutOverTimeStr, getCutOverTimeStr("blah/blah/testnet/blah"))
assert.Equal(t, devnetCutOverTimeStr, getCutOverTimeStr("blah/blah/devnet/blah"))
}

func TestCutOverDisabled(t *testing.T) {
logger := zap.NewNop()

cutOverTimeStr := ""
now, err := time.Parse(cutOverFmtStr, "2023-10-06T18:19:00-0000")
require.NoError(t, err)

cuttingOver, delay, err := evaluateGossipCutOverImpl(logger, cutOverTimeStr, now)
require.NoError(t, err)
assert.False(t, cuttingOver)
assert.Equal(t, time.Duration(0), delay)
}

func TestCutOverInvalidTime(t *testing.T) {
logger := zap.NewNop()

cutOverTimeStr := "Hello World"
now, err := time.Parse(cutOverFmtStr, "2023-10-06T18:19:00-0000")
require.NoError(t, err)

_, _, err = evaluateGossipCutOverImpl(logger, cutOverTimeStr, now)
require.EqualError(t, err, `failed to parse cut over time: parsing time "Hello World" as "2006-01-02T15:04:05-0700": cannot parse "Hello World" as "2006"`)
}

func TestCutOverAlreadyHappened(t *testing.T) {
logger := zap.NewNop()

cutOverTimeStr := "2023-10-06T18:18:00-0000"
now, err := time.Parse(cutOverFmtStr, "2023-10-06T18:19:00-0000")
require.NoError(t, err)

cuttingOver, delay, err := evaluateGossipCutOverImpl(logger, cutOverTimeStr, now)
require.NoError(t, err)
assert.True(t, cuttingOver)
assert.Equal(t, time.Duration(0), delay)
}

func TestCutOverDelayRequired(t *testing.T) {
logger := zap.NewNop()

cutOverTimeStr := "2023-10-06T18:18:00-0000"
now, err := time.Parse(cutOverFmtStr, "2023-10-06T17:18:00-0000")
require.NoError(t, err)

cuttingOver, delay, err := evaluateGossipCutOverImpl(logger, cutOverTimeStr, now)
require.NoError(t, err)
assert.False(t, cuttingOver)
assert.Equal(t, time.Duration(60*time.Minute), delay)
}
Loading
Loading