Skip to content

Commit

Permalink
Add cutover time
Browse files Browse the repository at this point in the history
  • Loading branch information
bruce-riley committed Aug 7, 2024
1 parent 194b608 commit 6594a5e
Show file tree
Hide file tree
Showing 7 changed files with 183 additions and 5 deletions.
2 changes: 1 addition & 1 deletion node/cmd/guardiand/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1592,7 +1592,7 @@ func runNode(cmd *cobra.Command, args []string) {
node.GuardianOptionAdminService(*adminSocketPath, ethRPC, ethContract, rpcMap),
node.GuardianOptionP2P(p2pKey, *p2pNetworkID, *p2pBootstrap, *nodeName, *subscribeToVAAs, *disableHeartbeatVerify, *p2pPort, *ccqP2pBootstrap, *ccqP2pPort, *ccqAllowedPeers, *gossipAdvertiseAddress, ibc.GetFeatures),
node.GuardianOptionStatusServer(*statusAddr),
node.GuardianOptionProcessor(),
node.GuardianOptionProcessor(*p2pNetworkID),
}

if shouldStart(publicGRPCSocketPath) {
Expand Down
2 changes: 1 addition & 1 deletion node/pkg/node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func mockGuardianRunnable(t testing.TB, gs []*mockGuardian, mockGuardianIndex ui
GuardianOptionPublicWeb(cfg.publicWeb, cfg.publicSocket, "", false, ""),
GuardianOptionAdminService(cfg.adminSocket, nil, nil, rpcMap),
GuardianOptionStatusServer(fmt.Sprintf("[::]:%d", cfg.statusPort)),
GuardianOptionProcessor(),
GuardianOptionProcessor(networkID),
}

guardianNode := NewGuardianNode(
Expand Down
3 changes: 2 additions & 1 deletion node/pkg/node/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ func GuardianOptionDatabase(db *db.Database) *GuardianOption {

// GuardianOptionProcessor enables the default processor, which is required to make consensus on messages.
// Dependencies: db, governor, accountant
func GuardianOptionProcessor() *GuardianOption {
func GuardianOptionProcessor(networkId string) *GuardianOption {
return &GuardianOption{
name: "processor",
// governor and accountant may be set to nil, but that choice needs to be made before the processor is configured
Expand All @@ -598,6 +598,7 @@ func GuardianOptionProcessor() *GuardianOption {
g.acct,
g.acctC.readC,
g.gatewayRelayer,
networkId,
).Run

return nil
Expand Down
3 changes: 1 addition & 2 deletions node/pkg/processor/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
ethCommon "github.com/ethereum/go-ethereum/common"
"google.golang.org/protobuf/proto"

"github.com/certusone/wormhole/node/pkg/p2p"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
)
Expand Down Expand Up @@ -42,7 +41,7 @@ func (p *Processor) broadcastSignature(
MessageId: messageID,
}

if p2p.GossipCutoverComplete() {
if batchCutoverComplete() {
if shouldPublishImmediately {
msg = p.publishImmediately(ourObs)
observationsBroadcast.Inc()
Expand Down
87 changes: 87 additions & 0 deletions node/pkg/processor/cutover.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package processor

import (
"fmt"
"strings"
"sync/atomic"
"time"

"go.uber.org/zap"
)

// The format of this time is very picky. Please use the exact format specified by cutOverFmtStr!
const mainnetCutOverTimeStr = ""
const testnetCutOverTimeStr = ""
const devnetCutOverTimeStr = ""
const cutOverFmtStr = "2006-01-02T15:04:05-0700"

// batchCutoverCompleteFlag indicates if the cutover time has passed, meaning we should publish observation batches.
var batchCutoverCompleteFlag atomic.Bool

// batchCutoverComplete returns true if the cutover time has passed, meaning we should publish observation batches.
func batchCutoverComplete() bool {
return batchCutoverCompleteFlag.Load()
}

// evaluateCutOver determines if the cutover time has passed yet and sets the global flag accordingly. If the time has
// not yet passed, it creates a go routine to wait for that time and then sets the flag.
func evaluateBatchCutover(logger *zap.Logger, networkID string) error {
cutOverTimeStr := getCutOverTimeStr(networkID)

sco, delay, err := evaluateBatchCutoverImpl(logger, cutOverTimeStr, time.Now())
if err != nil {
return err
}

batchCutoverCompleteFlag.Store(sco)
logger.Info("evaluated cutover flag", zap.Bool("cutOverFlag", batchCutoverComplete()), zap.String("cutOverTime", cutOverTimeStr), zap.String("component", "batchco"))

if delay != time.Duration(0) {
// Wait for the cut over time and then update the flag.
go func() {
time.Sleep(delay)
logger.Info("time to cut over to batch publishing", zap.String("cutOverTime", cutOverTimeStr), zap.String("component", "batchco"))
batchCutoverCompleteFlag.Store(true)
}()
}

return nil
}

// evaluateBatchCutoverImpl performs the actual cut over check. It is a separate function for testing purposes.
func evaluateBatchCutoverImpl(logger *zap.Logger, cutOverTimeStr string, now time.Time) (bool, time.Duration, error) {
if cutOverTimeStr == "" {
return false, 0, nil
}

cutOverTime, err := time.Parse(cutOverFmtStr, cutOverTimeStr)
if err != nil {
return false, 0, fmt.Errorf(`failed to parse cut over time: %w`, err)
}

if cutOverTime.Before(now) {
logger.Info("cut over time has passed, should publish observation batches", zap.String("cutOverTime", cutOverTime.Format(cutOverFmtStr)), zap.String("now", now.Format(cutOverFmtStr)), zap.String("component", "batchco"))
return true, 0, nil
}

// If we get here, we need to wait for the cutover and then switch the global flag.
delay := cutOverTime.Sub(now)
logger.Info("still waiting for cut over time",
zap.Stringer("cutOverTime", cutOverTime),
zap.String("now", now.Format(cutOverFmtStr)),
zap.Stringer("delay", delay),
zap.String("component", "batchco"))

return false, delay, nil
}

// getCutOverTimeStr returns the cut over time string based on the network ID passed in.
func getCutOverTimeStr(networkID string) string { //nolint:unparam
if strings.Contains(networkID, "/mainnet/") {
return mainnetCutOverTimeStr
}
if strings.Contains(networkID, "/testnet/") {
return testnetCutOverTimeStr
}
return devnetCutOverTimeStr
}
81 changes: 81 additions & 0 deletions node/pkg/processor/cutover_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package processor

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

func TestVerifyCutOverTime(t *testing.T) {
if mainnetCutOverTimeStr != "" {
_, err := time.Parse(cutOverFmtStr, mainnetCutOverTimeStr)
require.NoError(t, err)
}
if testnetCutOverTimeStr != "" {
_, err := time.Parse(cutOverFmtStr, testnetCutOverTimeStr)
require.NoError(t, err)
}
if devnetCutOverTimeStr != "" {
_, err := time.Parse(cutOverFmtStr, devnetCutOverTimeStr)
require.NoError(t, err)
}
}

func TestGetCutOverTimeStr(t *testing.T) {
assert.Equal(t, mainnetCutOverTimeStr, getCutOverTimeStr("blah/blah/mainnet/blah"))
assert.Equal(t, testnetCutOverTimeStr, getCutOverTimeStr("blah/blah/testnet/blah"))
assert.Equal(t, devnetCutOverTimeStr, getCutOverTimeStr("blah/blah/devnet/blah"))
}

func TestCutOverDisabled(t *testing.T) {
logger := zap.NewNop()

cutOverTimeStr := ""
now, err := time.Parse(cutOverFmtStr, "2023-10-06T18:19:00-0000")
require.NoError(t, err)

cuttingOver, delay, err := evaluateBatchCutoverImpl(logger, cutOverTimeStr, now)
require.NoError(t, err)
assert.False(t, cuttingOver)
assert.Equal(t, time.Duration(0), delay)
}

func TestCutOverInvalidTime(t *testing.T) {
logger := zap.NewNop()

cutOverTimeStr := "Hello World"
now, err := time.Parse(cutOverFmtStr, "2023-10-06T18:19:00-0000")
require.NoError(t, err)

_, _, err = evaluateBatchCutoverImpl(logger, cutOverTimeStr, now)
require.EqualError(t, err, `failed to parse cut over time: parsing time "Hello World" as "2006-01-02T15:04:05-0700": cannot parse "Hello World" as "2006"`)
}

func TestCutOverAlreadyHappened(t *testing.T) {
logger := zap.NewNop()

cutOverTimeStr := "2023-10-06T18:18:00-0000"
now, err := time.Parse(cutOverFmtStr, "2023-10-06T18:19:00-0000")
require.NoError(t, err)

cuttingOver, delay, err := evaluateBatchCutoverImpl(logger, cutOverTimeStr, now)
require.NoError(t, err)
assert.True(t, cuttingOver)
assert.Equal(t, time.Duration(0), delay)
}

func TestCutOverDelayRequired(t *testing.T) {
logger := zap.NewNop()

cutOverTimeStr := "2023-10-06T18:18:00-0000"
now, err := time.Parse(cutOverFmtStr, "2023-10-06T17:18:00-0000")
require.NoError(t, err)

cuttingOver, delay, err := evaluateBatchCutoverImpl(logger, cutOverTimeStr, now)
require.NoError(t, err)
assert.False(t, cuttingOver)
assert.Equal(t, time.Duration(60*time.Minute), delay)
}
10 changes: 10 additions & 0 deletions node/pkg/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ type Processor struct {
gatewayRelayer *gwrelayer.GatewayRelayer
updateVAALock sync.Mutex
updatedVAAs map[string]*updateVaaEntry
networkID string

// batchObsvPubC is the internal channel used to publish observations to the batch processor for publishing.
batchObsvPubC chan *gossipv1.Observation
Expand Down Expand Up @@ -234,6 +235,7 @@ func NewProcessor(
acct *accountant.Accountant,
acctReadC <-chan *common.MessagePublication,
gatewayRelayer *gwrelayer.GatewayRelayer,
networkID string,
) *Processor {

return &Processor{
Expand All @@ -259,10 +261,18 @@ func NewProcessor(
gatewayRelayer: gatewayRelayer,
batchObsvPubC: make(chan *gossipv1.Observation, batchObsvPubChanSize),
updatedVAAs: make(map[string]*updateVaaEntry),
networkID: networkID,
}
}

func (p *Processor) Run(ctx context.Context) error {
// Evaluate the batch cutover time. If it has passed, then the flag will be set to make us publish observation batches.
// If not, a routine will be started to wait for that time before starting to publish batches.
cutoverErr := evaluateBatchCutover(p.logger, p.networkID)
if cutoverErr != nil {
panic(cutoverErr)
}

if err := supervisor.Run(ctx, "vaaWriter", common.WrapWithScissors(p.vaaWriter, "vaaWriter")); err != nil {
return fmt.Errorf("failed to start vaa writer: %w", err)
}
Expand Down

0 comments on commit 6594a5e

Please sign in to comment.