Refactor full node constructor (cosmos#1227)

## Overview

Closes: cosmos#1229 


## Checklist


- [x] New and updated code has appropriate documentation
- [x] New and updated code has new and/or updated testing
- [x] Required CI checks are passing
- [ ] Visual proof for any user facing features like CLI or documentation updates
- [x] Linked issues closed with keywords
Manav-Aggarwal authored Oct 5, 2023
1 parent 6c4a346 commit 3e7ff30
Showing 5 changed files with 122 additions and 76 deletions.
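This PR breaks the monolithic `newFullNode` constructor into small, single-purpose init helpers (`initProxyApp`, `initEventBus`, `initBaseKV`, `initDALC`, `initMempool`, `initHeaderExchangeService`, `initBlockExchangeService`, `initBlockManager`), each returning a value and an error. The sketch below is a minimal, self-contained illustration of that pattern; the types and helper names are stand-ins, not the actual Rollkit APIs.

```go
package main

import (
	"errors"
	"fmt"
)

// Simplified stand-ins for the real node components.
type Config struct{ InMemory bool }
type KV struct{ inMemory bool }
type Mempool struct{ kv *KV }
type Node struct {
	kv      *KV
	mempool *Mempool
}

// Each helper owns one initialization concern and returns (value, error).
func initKV(cfg Config) (*KV, error) {
	return &KV{inMemory: cfg.InMemory}, nil
}

func initMempool(kv *KV) (*Mempool, error) {
	if kv == nil {
		return nil, errors.New("mempool requires a datastore")
	}
	return &Mempool{kv: kv}, nil
}

// The constructor becomes a linear pipeline of fallible steps, mirroring how
// newFullNode now calls its init helpers one after another.
func newNode(cfg Config) (*Node, error) {
	kv, err := initKV(cfg)
	if err != nil {
		return nil, fmt.Errorf("kv init: %w", err)
	}
	mp, err := initMempool(kv)
	if err != nil {
		return nil, fmt.Errorf("mempool init: %w", err)
	}
	return &Node{kv: kv, mempool: mp}, nil
}

func main() {
	n, err := newNode(Config{InMemory: true})
	fmt.Println(n != nil, err)
}
```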
3 changes: 1 addition & 2 deletions block/manager.go
@@ -115,7 +115,6 @@ func NewManager(
dalc da.DataAvailabilityLayerClient,
eventBus *cmtypes.EventBus,
logger log.Logger,
doneBuildingCh chan struct{},
blockStore *goheaderstore.Store[*types.Block],
) (*Manager, error) {
s, err := getInitialState(store, genesis)
@@ -182,7 +181,7 @@ func NewManager(
retrieveCh: make(chan struct{}, 1),
logger: logger,
txsAvailable: txsAvailableCh,
doneBuildingBlock: doneBuildingCh,
doneBuildingBlock: make(chan struct{}),
buildingBlock: false,
pendingBlocks: NewPendingBlocks(),
}
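In block/manager.go, `NewManager` no longer accepts a `doneBuildingCh` parameter; the manager now allocates its own `doneBuildingBlock` channel. A minimal sketch of the resulting shape, using simplified stand-in types rather than the real `block.Manager`:

```go
package main

import "fmt"

// Manager is a simplified stand-in for block.Manager; only the channel
// relevant to this change is shown.
type Manager struct {
	doneBuildingBlock chan struct{}
}

// NewManager allocates the done-building channel itself instead of taking a
// doneBuildingCh argument, so every call site shrinks by one parameter.
func NewManager() *Manager {
	return &Manager{
		doneBuildingBlock: make(chan struct{}),
	}
}

func main() {
	m := NewManager()
	fmt.Println(m.doneBuildingBlock != nil) // prints: true
}
```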
3 changes: 1 addition & 2 deletions block/manager_test.go
@@ -84,8 +84,7 @@ func TestInitialState(t *testing.T) {
defer func() {
require.NoError(t, dalc.Stop())
}()
dumbChan := make(chan struct{})
agg, err := NewManager(key, conf, c.genesis, c.store, nil, nil, dalc, nil, logger, dumbChan, nil)
agg, err := NewManager(key, conf, c.genesis, c.store, nil, nil, dalc, nil, logger, nil)
assert.NoError(err)
assert.NotNil(agg)
agg.lastStateMtx.RLock()
176 changes: 112 additions & 64 deletions node/full.go
@@ -53,115 +53,95 @@ var _ Node = &FullNode{}
// It connects all the components and orchestrates their work.
type FullNode struct {
service.BaseService
eventBus *cmtypes.EventBus
proxyApp proxy.AppConns

genesis *cmtypes.GenesisDoc
// cache of chunked genesis data.
genChunks []string

conf config.NodeConfig
P2P *p2p.Client
nodeConfig config.NodeConfig

proxyApp proxy.AppConns
eventBus *cmtypes.EventBus
dalc da.DataAvailabilityLayerClient
p2pClient *p2p.Client
hExService *block.HeaderExchangeService
bExService *block.BlockExchangeService
// TODO(tzdybal): consider extracting "mempool reactor"
Mempool mempool.Mempool
mempoolIDs *mempoolIDs

Mempool mempool.Mempool
mempoolIDs *mempoolIDs
Store store.Store
blockManager *block.Manager
dalc da.DataAvailabilityLayerClient

// Preserves cometBFT compatibility
TxIndexer txindex.TxIndexer
BlockIndexer indexer.BlockIndexer
IndexerService *txindex.IndexerService

hExService *block.HeaderExchangeService
bExService *block.BlockExchangeService

// keep context here only because of API compatibility
// - it's used in `OnStart` (defined in service.Service interface)
ctx context.Context

ctx context.Context
cancel context.CancelFunc
}

// newFullNode creates a new Rollkit full node.
func newFullNode(
ctx context.Context,
conf config.NodeConfig,
nodeConfig config.NodeConfig,
p2pKey crypto.PrivKey,
signingKey crypto.PrivKey,
clientCreator proxy.ClientCreator,
genesis *cmtypes.GenesisDoc,
logger log.Logger,
) (*FullNode, error) {
proxyApp := proxy.NewAppConns(clientCreator, proxy.NopMetrics())
proxyApp.SetLogger(logger.With("module", "proxy"))
if err := proxyApp.Start(); err != nil {
return nil, fmt.Errorf("error starting proxy app connections: %v", err)
proxyApp, err := initProxyApp(clientCreator, logger)
if err != nil {
return nil, err
}

eventBus := cmtypes.NewEventBus()
eventBus.SetLogger(logger.With("module", "events"))
if err := eventBus.Start(); err != nil {
eventBus, err := initEventBus(logger)
if err != nil {
return nil, err
}

var err error
var baseKV ds.TxnDatastore
if conf.RootDir == "" && conf.DBPath == "" { // this is used for testing
logger.Info("WARNING: working in in-memory mode")
baseKV, err = store.NewDefaultInMemoryKVStore()
} else {
baseKV, err = store.NewDefaultKVStore(conf.RootDir, conf.DBPath, "rollkit")
}
baseKV, err := initBaseKV(nodeConfig, logger)
if err != nil {
return nil, err
}

mainKV := newPrefixKV(baseKV, mainPrefix)
dalcKV := newPrefixKV(baseKV, dalcPrefix)
indexerKV := newPrefixKV(baseKV, indexerPrefix)

client, err := p2p.NewClient(conf.P2P, p2pKey, genesis.ChainID, baseKV, logger.With("module", "p2p"))
dalc, err := initDALC(nodeConfig, dalcKV, logger)
if err != nil {
return nil, err
}
s := store.New(ctx, mainKV)

dalc := registry.GetClient(conf.DALayer)
if dalc == nil {
return nil, fmt.Errorf("couldn't get data availability client named '%s'", conf.DALayer)
}
err = dalc.Init(conf.NamespaceID, []byte(conf.DAConfig), dalcKV, logger.With("module", "da_client"))
p2pClient, err := p2p.NewClient(nodeConfig.P2P, p2pKey, genesis.ChainID, baseKV, logger.With("module", "p2p"))
if err != nil {
return nil, fmt.Errorf("data availability layer client initialization error: %w", err)
return nil, err
}

indexerService, txIndexer, blockIndexer, err := createAndStartIndexerService(ctx, conf, indexerKV, eventBus, logger)
mainKV := newPrefixKV(baseKV, mainPrefix)
headerExchangeService, err := initHeaderExchangeService(ctx, mainKV, nodeConfig, genesis, p2pClient, logger)
if err != nil {
return nil, err
}

mp := mempoolv1.NewTxMempool(logger, llcfg.DefaultMempoolConfig(), proxyApp.Mempool(), 0)
mpIDs := newMempoolIDs()
mp.EnableTxsAvailable()

headerExchangeService, err := block.NewHeaderExchangeService(ctx, mainKV, conf, genesis, client, logger.With("module", "HeaderExchangeService"))
blockExchangeService, err := initBlockExchangeService(ctx, mainKV, nodeConfig, genesis, p2pClient, logger)
if err != nil {
return nil, fmt.Errorf("HeaderExchangeService initialization error: %w", err)
return nil, err
}

blockExchangeService, err := block.NewBlockExchangeService(ctx, mainKV, conf, genesis, client, logger.With("module", "BlockExchangeService"))
mempool := initMempool(logger, proxyApp)

store := store.New(ctx, mainKV)
blockManager, err := initBlockManager(signingKey, nodeConfig, genesis, store, mempool, proxyApp, dalc, eventBus, logger, blockExchangeService)
if err != nil {
return nil, fmt.Errorf("BlockExchangeService initialization error: %w", err)
return nil, err
}

doneBuildingChannel := make(chan struct{})
blockManager, err := block.NewManager(signingKey, conf.BlockManagerConfig, genesis, s, mp, proxyApp.Consensus(), dalc, eventBus, logger.With("module", "BlockManager"), doneBuildingChannel, blockExchangeService.BlockStore())
indexerKV := newPrefixKV(baseKV, indexerPrefix)
indexerService, txIndexer, blockIndexer, err := createAndStartIndexerService(ctx, nodeConfig, indexerKV, eventBus, logger)
if err != nil {
return nil, fmt.Errorf("BlockManager initialization error: %w", err)
return nil, err
}

ctx, cancel := context.WithCancel(ctx)
@@ -170,13 +150,13 @@ func newFullNode(
proxyApp: proxyApp,
eventBus: eventBus,
genesis: genesis,
conf: conf,
P2P: client,
nodeConfig: nodeConfig,
p2pClient: p2pClient,
blockManager: blockManager,
dalc: dalc,
Mempool: mp,
mempoolIDs: mpIDs,
Store: s,
Mempool: mempool,
mempoolIDs: newMempoolIDs(),
Store: store,
TxIndexer: txIndexer,
IndexerService: indexerService,
BlockIndexer: blockIndexer,
@@ -187,12 +167,80 @@ func newFullNode(
}

node.BaseService = *service.NewBaseService(logger, "Node", node)

node.P2P.SetTxValidator(node.newTxValidator())
node.p2pClient.SetTxValidator(node.newTxValidator())

return node, nil
}

func initProxyApp(clientCreator proxy.ClientCreator, logger log.Logger) (proxy.AppConns, error) {
proxyApp := proxy.NewAppConns(clientCreator, proxy.NopMetrics())
proxyApp.SetLogger(logger.With("module", "proxy"))
if err := proxyApp.Start(); err != nil {
return nil, fmt.Errorf("error starting proxy app connections: %v", err)
}
return proxyApp, nil
}

func initEventBus(logger log.Logger) (*cmtypes.EventBus, error) {
eventBus := cmtypes.NewEventBus()
eventBus.SetLogger(logger.With("module", "events"))
if err := eventBus.Start(); err != nil {
return nil, err
}
return eventBus, nil
}

// initBaseKV initializes the base key-value store.
func initBaseKV(nodeConfig config.NodeConfig, logger log.Logger) (ds.TxnDatastore, error) {
if nodeConfig.RootDir == "" && nodeConfig.DBPath == "" { // this is used for testing
logger.Info("WARNING: working in in-memory mode")
return store.NewDefaultInMemoryKVStore()
}
return store.NewDefaultKVStore(nodeConfig.RootDir, nodeConfig.DBPath, "rollkit")
}

func initDALC(nodeConfig config.NodeConfig, dalcKV ds.TxnDatastore, logger log.Logger) (da.DataAvailabilityLayerClient, error) {
dalc := registry.GetClient(nodeConfig.DALayer)
if dalc == nil {
return nil, fmt.Errorf("couldn't get data availability client named '%s'", nodeConfig.DALayer)
}
err := dalc.Init(nodeConfig.NamespaceID, []byte(nodeConfig.DAConfig), dalcKV, logger.With("module", "da_client"))
if err != nil {
return nil, fmt.Errorf("data availability layer client initialization error: %w", err)
}
return dalc, nil
}

func initMempool(logger log.Logger, proxyApp proxy.AppConns) *mempoolv1.TxMempool {
mempool := mempoolv1.NewTxMempool(logger, llcfg.DefaultMempoolConfig(), proxyApp.Mempool(), 0)
mempool.EnableTxsAvailable()
return mempool
}

func initHeaderExchangeService(ctx context.Context, mainKV ds.TxnDatastore, nodeConfig config.NodeConfig, genesis *cmtypes.GenesisDoc, p2pClient *p2p.Client, logger log.Logger) (*block.HeaderExchangeService, error) {
headerExchangeService, err := block.NewHeaderExchangeService(ctx, mainKV, nodeConfig, genesis, p2pClient, logger.With("module", "HeaderExchangeService"))
if err != nil {
return nil, fmt.Errorf("HeaderExchangeService initialization error: %w", err)
}
return headerExchangeService, nil
}

func initBlockExchangeService(ctx context.Context, mainKV ds.TxnDatastore, nodeConfig config.NodeConfig, genesis *cmtypes.GenesisDoc, p2pClient *p2p.Client, logger log.Logger) (*block.BlockExchangeService, error) {
blockExchangeService, err := block.NewBlockExchangeService(ctx, mainKV, nodeConfig, genesis, p2pClient, logger.With("module", "BlockExchangeService"))
if err != nil {
return nil, fmt.Errorf("HeaderExchangeService initialization error: %w", err)
}
return blockExchangeService, nil
}

func initBlockManager(signingKey crypto.PrivKey, nodeConfig config.NodeConfig, genesis *cmtypes.GenesisDoc, store store.Store, mempool mempool.Mempool, proxyApp proxy.AppConns, dalc da.DataAvailabilityLayerClient, eventBus *cmtypes.EventBus, logger log.Logger, blockExchangeService *block.BlockExchangeService) (*block.Manager, error) {
blockManager, err := block.NewManager(signingKey, nodeConfig.BlockManagerConfig, genesis, store, mempool, proxyApp.Consensus(), dalc, eventBus, logger.With("module", "BlockManager"), blockExchangeService.BlockStore())
if err != nil {
return nil, fmt.Errorf("BlockManager initialization error: %w", err)
}
return blockManager, nil
}

// initGenesisChunks creates a chunked format of the genesis document to make it easier to
// iterate through larger genesis structures.
func (n *FullNode) initGenesisChunks() error {
@@ -258,7 +306,7 @@ func (n *FullNode) blockPublishLoop(ctx context.Context) {
func (n *FullNode) OnStart() error {

n.Logger.Info("starting P2P client")
err := n.P2P.Start(n.ctx)
err := n.p2pClient.Start(n.ctx)
if err != nil {
return fmt.Errorf("error while starting P2P client: %w", err)
}
@@ -275,9 +323,9 @@ func (n *FullNode) OnStart() error {
return fmt.Errorf("error while starting data availability layer client: %w", err)
}

if n.conf.Aggregator {
n.Logger.Info("working in aggregator mode", "block time", n.conf.BlockTime)
go n.blockManager.AggregationLoop(n.ctx, n.conf.LazyAggregator)
if n.nodeConfig.Aggregator {
n.Logger.Info("working in aggregator mode", "block time", n.nodeConfig.BlockTime)
go n.blockManager.AggregationLoop(n.ctx, n.nodeConfig.LazyAggregator)
go n.blockManager.BlockSubmissionLoop(n.ctx)
go n.headerPublishLoop(n.ctx)
go n.blockPublishLoop(n.ctx)
Expand Down Expand Up @@ -307,7 +355,7 @@ func (n *FullNode) OnStop() {
n.Logger.Info("halting full node...")
n.cancel()
err := n.dalc.Stop()
err = multierr.Append(err, n.P2P.Close())
err = multierr.Append(err, n.p2pClient.Close())
err = multierr.Append(err, n.hExService.Stop())
err = multierr.Append(err, n.bExService.Stop())
n.Logger.Error("errors while stopping node:", "errors", err)
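One practical benefit of extracting steps such as `initBaseKV` is that each can now be unit-tested without constructing a full node. Below is a hypothetical test sketch for the in-memory fallback (empty `RootDir` and `DBPath`); the import paths and the `log.NewNopLogger` constructor are assumptions about the surrounding cometbft/rollkit packages, not taken from this diff.

```go
package node

import (
	"testing"

	"github.com/cometbft/cometbft/libs/log" // assumed logger package path
	"github.com/rollkit/rollkit/config"     // assumed config package path
)

// TestInitBaseKVInMemory checks the documented fallback: an empty RootDir
// and DBPath should yield an in-memory datastore rather than an error.
func TestInitBaseKVInMemory(t *testing.T) {
	kv, err := initBaseKV(config.NodeConfig{}, log.NewNopLogger())
	if err != nil {
		t.Fatalf("expected in-memory store, got error: %v", err)
	}
	if kv == nil {
		t.Fatal("expected a non-nil datastore")
	}
}
```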
12 changes: 6 additions & 6 deletions node/full_client.go
@@ -143,7 +143,7 @@ func (c *FullClient) BroadcastTxCommit(ctx context.Context, tx cmtypes.Tx) (*cty
}

// broadcast tx
err = c.node.P2P.GossipTx(ctx, tx)
err = c.node.p2pClient.GossipTx(ctx, tx)
if err != nil {
return nil, fmt.Errorf("tx added to local mempool but failure to broadcast: %w", err)
}
@@ -192,7 +192,7 @@ func (c *FullClient) BroadcastTxAsync(ctx context.Context, tx cmtypes.Tx) (*ctyp
return nil, err
}
// gossipTx optimistically
err = c.node.P2P.GossipTx(ctx, tx)
err = c.node.p2pClient.GossipTx(ctx, tx)
if err != nil {
return nil, fmt.Errorf("tx added to local mempool but failed to gossip: %w", err)
}
@@ -217,7 +217,7 @@ func (c *FullClient) BroadcastTxSync(ctx context.Context, tx cmtypes.Tx) (*ctype
// Note: we have to do this here because, unlike the tendermint mempool reactor, there
// is no routine that gossips transactions after they enter the pool
if r.Code == abci.CodeTypeOK {
err = c.node.P2P.GossipTx(ctx, tx)
err = c.node.p2pClient.GossipTx(ctx, tx)
if err != nil {
// the transaction must be removed from the mempool if it cannot be gossiped.
// if this does not occur, then the user will not be able to try again using
@@ -348,10 +348,10 @@ func (c *FullClient) NetInfo(ctx context.Context) (*ctypes.ResultNetInfo, error)
res := ctypes.ResultNetInfo{
Listening: true,
}
for _, ma := range c.node.P2P.Addrs() {
for _, ma := range c.node.p2pClient.Addrs() {
res.Listeners = append(res.Listeners, ma.String())
}
peers := c.node.P2P.Peers()
peers := c.node.p2pClient.Peers()
res.NPeers = len(peers)
for _, peer := range peers {
res.Peers = append(res.Peers, ctypes.Peer{
@@ -716,7 +716,7 @@ func (c *FullClient) Status(ctx context.Context) (*ctypes.ResultStatus, error) {
state.Version.Consensus.Block,
state.Version.Consensus.App,
)
id, addr, network, err := c.node.P2P.Info()
id, addr, network, err := c.node.p2pClient.Info()
if err != nil {
return nil, fmt.Errorf("failed to load node p2p2 info: %w", err)
}
4 changes: 2 additions & 2 deletions node/full_node_integration_test.go
@@ -355,7 +355,7 @@ func testSingleAggregatorSingleFullNodeTrustedHash(t *testing.T, source Source)
// Get the trusted hash from node1 and pass it to node2 config
trustedHash, err := node1.hExService.HeaderStore().GetByHeight(aggCtx, 1)
require.NoError(err)
node2.conf.TrustedHash = trustedHash.Hash().String()
node2.nodeConfig.TrustedHash = trustedHash.Hash().String()
require.NoError(node2.Start())
defer func() {
require.NoError(node2.Stop())
@@ -447,7 +447,7 @@ func startNodes(nodes []*FullNode, apps []*mocks.Application, t *testing.T) {

for i := 1; i < len(nodes); i++ {
data := strconv.Itoa(i) + time.Now().String()
require.NoError(t, nodes[i].P2P.GossipTx(context.TODO(), []byte(data)))
require.NoError(t, nodes[i].p2pClient.GossipTx(context.TODO(), []byte(data)))
}

timeout := time.NewTimer(time.Second * 30)
