Skip to content

Commit

Permalink
feat: add Ready method to Node (#1216)
Browse files Browse the repository at this point in the history
  • Loading branch information
gfanton authored Feb 1, 2024
1 parent cfd5d33 commit 50d426a
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 2 deletions.
4 changes: 2 additions & 2 deletions gno.land/pkg/integration/testing_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ func TestingInMemoryNode(t TestingTS, logger *slog.Logger, config *gnoland.InMem
require.NoError(t, err)

select {
case <-gnoland.GetNodeReadiness(node):
case <-time.After(time.Second * 6):
case <-node.Ready():
case <-time.After(time.Second * 10):
require.FailNow(t, "timeout while waiting for the node to start")
}

Expand Down
22 changes: 22 additions & 0 deletions tm2/pkg/bft/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net/http"
_ "net/http/pprof" //nolint:gosec
"strings"
"sync"
"time"

"golang.org/x/exp/slog"
Expand Down Expand Up @@ -168,6 +169,7 @@ type Node struct {
rpcListeners []net.Listener // rpc servers
txEventStore eventstore.TxEventStore
eventStoreService *eventstore.Service
firstBlockSignal <-chan struct{}
}

func initDBs(config *cfg.Config, dbProvider DBProvider) (blockStore *store.BlockStore, stateDB dbm.DB, err error) {
Expand Down Expand Up @@ -439,6 +441,20 @@ func NewNode(config *cfg.Config,
// but before it indexed the txs, or, endblocker panicked)
evsw := events.NewEventSwitch()

// Signal readiness when receiving the first block.
const readinessListenerID = "first_block_listener"

cFirstBlock := make(chan struct{})
var once sync.Once
evsw.AddListener(readinessListenerID, func(ev events.Event) {
if _, ok := ev.(types.EventNewBlock); ok {
once.Do(func() {
close(cFirstBlock)
evsw.RemoveListener(readinessListenerID)
})
}
})

// Transaction event storing
eventStoreService, txEventStore, err := createAndStartEventStoreService(config, evsw, logger)
if err != nil {
Expand Down Expand Up @@ -554,6 +570,7 @@ func NewNode(config *cfg.Config,
proxyApp: proxyApp,
txEventStore: txEventStore,
eventStoreService: eventStoreService,
firstBlockSignal: cFirstBlock,
}
node.BaseService = *service.NewBaseService(logger, "Node", node)

Expand Down Expand Up @@ -653,6 +670,11 @@ func (n *Node) OnStop() {
}
}

// Ready signals that the node is ready by returning a blocking channel. This channel is closed when the node receives its first block.
func (n *Node) Ready() <-chan struct{} {
return n.firstBlockSignal
}

// ConfigureRPC sets all variables in rpccore so they will serve
// rpc calls from this node
func (n *Node) ConfigureRPC() {
Expand Down
33 changes: 33 additions & 0 deletions tm2/pkg/bft/node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,39 @@ func TestNodeDelayedStart(t *testing.T) {
assert.Equal(t, true, startTime.After(n.GenesisDoc().GenesisTime))
}

func TestNodeReady(t *testing.T) {
config := cfg.ResetTestRoot("node_node_test")
defer os.RemoveAll(config.RootDir)

// Create & start node
n, err := DefaultNewNode(config, log.NewTestingLogger(t))
require.NoError(t, err)

// Assert that blockstore has zero block before waiting for the first block
require.Equal(t, int64(0), n.BlockStore().Height())

// Assert that first block signal is not alreay received by calling Ready
select {
case <-n.Ready():
require.FailNow(t, "first block signal should not be close before starting the node")
default: // ok
}

err = n.Start()
require.NoError(t, err)
defer n.Stop()

// Wait until the node is ready or timeout
select {
case <-time.After(time.Second):
require.FailNow(t, "timeout while waiting for first block signal")
case <-n.Ready(): // ready
}

// Check that blockstore have at last one block
require.GreaterOrEqual(t, n.BlockStore().Height(), int64(1))
}

func TestNodeSetAppVersion(t *testing.T) {
config := cfg.ResetTestRoot("node_app_version_test")
defer os.RemoveAll(config.RootDir)
Expand Down

0 comments on commit 50d426a

Please sign in to comment.