Skip to content

Commit

Permalink
[nodeinfo] broadcast node's height info into p2p network (#3744)
Browse files Browse the repository at this point in the history
  • Loading branch information
envestcc authored and dustinxie committed Feb 15, 2023
1 parent 23984f0 commit 06e7c74
Show file tree
Hide file tree
Showing 15 changed files with 859 additions and 13 deletions.
8 changes: 8 additions & 0 deletions chainservice/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/iotexproject/iotex-core/consensus"
rp "github.com/iotexproject/iotex-core/consensus/scheme/rolldpos"
"github.com/iotexproject/iotex-core/db"
"github.com/iotexproject/iotex-core/nodeinfo"
"github.com/iotexproject/iotex-core/p2p"
"github.com/iotexproject/iotex-core/pkg/log"
"github.com/iotexproject/iotex-core/state/factory"
Expand Down Expand Up @@ -377,6 +378,12 @@ func (builder *Builder) createBlockchain(forSubChain, forTest bool) blockchain.B
return blockchain.NewBlockchain(builder.cfg.Chain, builder.cfg.Genesis, builder.cs.blockdao, factory.NewMinter(builder.cs.factory, builder.cs.actpool), chainOpts...)
}

func (builder *Builder) buildNodeInfoManager() {
dm := nodeinfo.NewInfoManager(&builder.cfg.NodeInfo, builder.cs.p2pAgent, builder.cs.chain, builder.cfg.Chain.ProducerPrivateKey())
builder.cs.nodeInfoManager = dm
builder.cs.lifecycle.Add(dm)
}

func (builder *Builder) buildBlockSyncer() error {
if builder.cs.blocksync != nil {
return nil
Expand Down Expand Up @@ -622,6 +629,7 @@ func (builder *Builder) build(forSubChain, forTest bool) (*ChainService, error)
if err := builder.buildBlockSyncer(); err != nil {
return nil, err
}
builder.buildNodeInfoManager()
cs := builder.cs
builder.cs = nil

Expand Down
18 changes: 18 additions & 0 deletions chainservice/chainservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/iotexproject/iotex-core/blockindex"
"github.com/iotexproject/iotex-core/blocksync"
"github.com/iotexproject/iotex-core/consensus"
"github.com/iotexproject/iotex-core/nodeinfo"
"github.com/iotexproject/iotex-core/p2p"
"github.com/iotexproject/iotex-core/pkg/lifecycle"
"github.com/iotexproject/iotex-core/pkg/log"
Expand Down Expand Up @@ -84,6 +85,7 @@ type ChainService struct {
candidateIndexer *poll.CandidateIndexer
candBucketsIndexer *staking.CandidatesBucketsIndexer
registry *protocol.Registry
nodeInfoManager *nodeinfo.InfoManager
}

// Start starts the server
Expand Down Expand Up @@ -163,6 +165,17 @@ func (cs *ChainService) HandleConsensusMsg(msg *iotextypes.ConsensusMessage) err
return cs.consensus.HandleConsensusMsg(msg)
}

// HandleNodeInfo handles nodeinfo message.
func (cs *ChainService) HandleNodeInfo(ctx context.Context, peer string, msg *iotextypes.NodeInfo) error {
cs.nodeInfoManager.HandleNodeInfo(ctx, peer, msg)
return nil
}

// HandleNodeInfoRequest handles request node info message
func (cs *ChainService) HandleNodeInfoRequest(ctx context.Context, peer peer.AddrInfo, msg *iotextypes.NodeInfoRequest) error {
return cs.nodeInfoManager.HandleNodeInfoRequest(ctx, peer)
}

// ChainID returns ChainID.
func (cs *ChainService) ChainID() uint32 { return cs.chain.ChainID() }

Expand Down Expand Up @@ -196,6 +209,11 @@ func (cs *ChainService) BlockSync() blocksync.BlockSync {
return cs.blocksync
}

// NodeInfoManager returns the delegate manager
func (cs *ChainService) NodeInfoManager() *nodeinfo.InfoManager {
return cs.nodeInfoManager
}

// Registry returns a pointer to the registry
func (cs *ChainService) Registry() *protocol.Registry { return cs.registry }

Expand Down
9 changes: 6 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/iotexproject/iotex-core/consensus/consensusfsm"
"github.com/iotexproject/iotex-core/db"
"github.com/iotexproject/iotex-core/dispatcher"
"github.com/iotexproject/iotex-core/nodeinfo"
"github.com/iotexproject/iotex-core/p2p"
"github.com/iotexproject/iotex-core/pkg/log"
)
Expand Down Expand Up @@ -77,9 +78,10 @@ var (
StartSubChainInterval: 10 * time.Second,
SystemLogDBPath: "/var/log",
},
DB: db.DefaultConfig,
Indexer: blockindex.DefaultConfig,
Genesis: genesis.Default,
DB: db.DefaultConfig,
Indexer: blockindex.DefaultConfig,
Genesis: genesis.Default,
NodeInfo: nodeinfo.DefaultConfig,
}

// ErrInvalidCfg indicates the invalid config value
Expand Down Expand Up @@ -129,6 +131,7 @@ type (
Log log.GlobalConfig `yaml:"log"`
SubLogs map[string]log.GlobalConfig `yaml:"subLogs"`
Genesis genesis.Genesis `yaml:"genesis"`
NodeInfo nodeinfo.Config `yaml:"nodeinfo"`
}

// Validate is the interface of validating the config
Expand Down
52 changes: 46 additions & 6 deletions dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ type Subscriber interface {
HandleBlock(context.Context, string, *iotextypes.Block) error
HandleSyncRequest(context.Context, peer.AddrInfo, *iotexrpc.BlockSync) error
HandleConsensusMsg(*iotextypes.ConsensusMessage) error
HandleNodeInfoRequest(context.Context, peer.AddrInfo, *iotextypes.NodeInfoRequest) error
HandleNodeInfo(context.Context, string, *iotextypes.NodeInfo) error
}

// Dispatcher is used by peers, handles incoming block and header notifications and relays announcements of new blocks.
Expand All @@ -69,12 +71,14 @@ type Dispatcher interface {
HandleTell(context.Context, uint32, peer.AddrInfo, proto.Message)
}

var requestMtc = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "iotex_dispatch_request",
Help: "Dispatcher request counter.",
},
[]string{"method", "succeed"},
var (
requestMtc = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "iotex_dispatch_request",
Help: "Dispatcher request counter.",
},
[]string{"method", "succeed"},
)
)

func init() {
Expand Down Expand Up @@ -430,6 +434,10 @@ func (d *IotxDispatcher) HandleBroadcast(ctx context.Context, chainID uint32, pe
}
case *iotextypes.Block:
d.dispatchBlock(ctx, chainID, peer, message.(*iotextypes.Block))
case *iotextypes.NodeInfo:
if err := subscriber.HandleNodeInfo(ctx, peer, msg); err != nil {
log.L().Warn("Failed to handle node info message.", zap.Error(err))
}
default:
msgType, _ := goproto.GetTypeFromRPCMsg(message)
log.L().Warn("Unexpected msgType handled by HandleBroadcast.", zap.Any("msgType", msgType))
Expand All @@ -447,11 +455,43 @@ func (d *IotxDispatcher) HandleTell(ctx context.Context, chainID uint32, peer pe
d.dispatchBlockSyncReq(ctx, chainID, peer, message)
case iotexrpc.MessageType_BLOCK:
d.dispatchBlock(ctx, chainID, peer.ID.Pretty(), message.(*iotextypes.Block))
case iotexrpc.MessageType_NODE_INFO_REQUEST:
d.dispatchNodeInfoRequest(ctx, chainID, peer, message.(*iotextypes.NodeInfoRequest))
case iotexrpc.MessageType_NODE_INFO:
d.dispatchNodeInfo(ctx, chainID, peer.ID.Pretty(), message.(*iotextypes.NodeInfo))
default:
log.L().Warn("Unexpected msgType handled by HandleTell.", zap.Any("msgType", msgType))
}
}

func (d *IotxDispatcher) dispatchNodeInfoRequest(ctx context.Context, chainID uint32, peer peer.AddrInfo, message *iotextypes.NodeInfoRequest) {
if atomic.LoadInt32(&d.shutdown) != 0 {
return
}
subscriber := d.subscriber(chainID)
if subscriber == nil {
log.L().Debug("no subscriber for this chain id, drop the node info", zap.Uint32("chain id", chainID))
return
}
if err := subscriber.HandleNodeInfoRequest(ctx, peer, message); err != nil {
log.L().Warn("failed to handle request node info message", zap.Error(err))
}
}

func (d *IotxDispatcher) dispatchNodeInfo(ctx context.Context, chainID uint32, peerID string, message *iotextypes.NodeInfo) {
if atomic.LoadInt32(&d.shutdown) != 0 {
return
}
subscriber := d.subscriber(chainID)
if subscriber == nil {
log.L().Debug("no subscriber for this chain id, drop the node info", zap.Uint32("chain id", chainID))
return
}
if err := subscriber.HandleNodeInfo(ctx, peerID, message); err != nil {
log.L().Warn("failed to handle node info message", zap.Error(err))
}
}

func (d *IotxDispatcher) updateEventAudit(t iotexrpc.MessageType) {
d.eventAuditLock.Lock()
defer d.eventAuditLock.Unlock()
Expand Down
10 changes: 10 additions & 0 deletions dispatcher/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ func setTestCase() []proto.Message {
&iotextypes.Block{},
&iotexrpc.BlockSync{},
&testingpb.TestPayload{},
&iotextypes.NodeInfoRequest{},
&iotextypes.NodeInfo{},
}
}

Expand Down Expand Up @@ -98,3 +100,11 @@ func (ds *dummySubscriber) HandleSyncRequest(context.Context, peer.AddrInfo, *io
func (ds *dummySubscriber) HandleAction(context.Context, *iotextypes.Action) error { return nil }

func (ds *dummySubscriber) HandleConsensusMsg(*iotextypes.ConsensusMessage) error { return nil }

func (ds *dummySubscriber) HandleNodeInfoRequest(context.Context, peer.AddrInfo, *iotextypes.NodeInfoRequest) error {
return nil
}

func (ds *dummySubscriber) HandleNodeInfo(context.Context, string, *iotextypes.NodeInfo) error {
return nil
}
132 changes: 132 additions & 0 deletions e2etest/nodeinfo_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Copyright (c) 2023 IoTeX Foundation
// This source code is provided 'as is' and no warranties are given as to title or non-infringement, merchantability
// or fitness for purpose and, to the extent permitted by law, all liability for your use of the code is disclaimed.
// This source code is governed by Apache License 2.0 that can be found in the LICENSE file.

package e2etest

import (
"context"
"testing"
"time"

"github.com/iotexproject/iotex-core/blockchain/genesis"
"github.com/iotexproject/iotex-core/config"
"github.com/iotexproject/iotex-core/server/itx"
"github.com/iotexproject/iotex-core/testutil"
"github.com/stretchr/testify/require"
)

func newConfigForNodeInfoTest(triePath, dBPath, idxDBPath string) (config.Config, func(), error) {
cfg, err := newTestConfig()
if err != nil {
return cfg, nil, err
}
testTriePath, err := testutil.PathOfTempFile(triePath)
if err != nil {
return cfg, nil, err
}
testDBPath, err := testutil.PathOfTempFile(dBPath)
if err != nil {
return cfg, nil, err
}
indexDBPath, err := testutil.PathOfTempFile(idxDBPath)
if err != nil {
return cfg, nil, err
}
cfg.Chain.TrieDBPatchFile = ""
cfg.Chain.TrieDBPath = testTriePath
cfg.Chain.ChainDBPath = testDBPath
cfg.Chain.IndexDBPath = indexDBPath
return cfg, func() {
testutil.CleanupPath(testTriePath)
testutil.CleanupPath(testDBPath)
testutil.CleanupPath(indexDBPath)
}, nil
}

func TestBroadcastNodeInfo(t *testing.T) {
require := require.New(t)

cfgSender, teardown, err := newConfigForNodeInfoTest("trie.test", "db.test", "indexdb.test")
require.NoError(err)
defer teardown()
cfgSender.NodeInfo.EnableBroadcastNodeInfo = true
cfgSender.NodeInfo.BroadcastNodeInfoInterval = time.Second
cfgSender.Network.ReconnectInterval = 2 * time.Second
srvSender, err := itx.NewServer(cfgSender)
require.NoError(err)
ctxSender := genesis.WithGenesisContext(context.Background(), cfgSender.Genesis)
err = srvSender.Start(ctxSender)
require.NoError(err)
defer func() {
require.NoError(srvSender.Stop(ctxSender))
}()
addrsSender, err := srvSender.P2PAgent().Self()
require.NoError(err)

cfgReciever, teardown2, err := newConfigForNodeInfoTest("trie2.test", "db2.test", "indexdb2.test")
require.NoError(err)
defer teardown2()
cfgReciever.Network.BootstrapNodes = []string{validNetworkAddr(addrsSender)}
cfgReciever.Network.ReconnectInterval = 2 * time.Second
srvReciever, err := itx.NewServer(cfgReciever)
require.NoError(err)
ctxReciever := genesis.WithGenesisContext(context.Background(), cfgReciever.Genesis)
err = srvReciever.Start(ctxReciever)
require.NoError(err)
defer func() {
require.NoError(srvReciever.Stop(ctxReciever))
}()

// check if there is sender's info in reciever delegatemanager
require.NoError(srvSender.ChainService(cfgSender.Chain.ID).NodeInfoManager().BroadcastNodeInfo(context.Background()))
time.Sleep(1 * time.Second)
addrSender := cfgSender.Chain.ProducerAddress().String()
_, ok := srvReciever.ChainService(cfgReciever.Chain.ID).NodeInfoManager().GetNodeByAddr(addrSender)
require.True(ok)
}

func TestUnicastNodeInfo(t *testing.T) {
require := require.New(t)

cfgReciever, teardown2, err := newConfigForNodeInfoTest("trie2.test", "db2.test", "indexdb2.test")
require.NoError(err)
defer teardown2()
cfgReciever.Network.ReconnectInterval = 2 * time.Second
srvReciever, err := itx.NewServer(cfgReciever)
require.NoError(err)
ctxReciever := genesis.WithGenesisContext(context.Background(), cfgReciever.Genesis)
err = srvReciever.Start(ctxReciever)
require.NoError(err)
defer func() {
require.NoError(srvReciever.Stop(ctxReciever))
}()
addrsReciever, err := srvReciever.P2PAgent().Self()
require.NoError(err)

cfgSender, teardown, err := newConfigForNodeInfoTest("trie.test", "db.test", "indexdb.test")
require.NoError(err)
defer teardown()
cfgSender.Network.ReconnectInterval = 2 * time.Second
cfgSender.Network.BootstrapNodes = []string{validNetworkAddr(addrsReciever)}
srvSender, err := itx.NewServer(cfgSender)
require.NoError(err)
ctxSender := genesis.WithGenesisContext(context.Background(), cfgSender.Genesis)
err = srvSender.Start(ctxSender)
require.NoError(err)
defer func() {
require.NoError(srvSender.Stop(ctxSender))
}()

// check if there is reciever's info in sender delegatemanager
peerReciever, err := srvReciever.P2PAgent().Info()
require.NoError(err)
dmSender := srvSender.ChainService(cfgSender.Chain.ID).NodeInfoManager()
err = dmSender.RequestSingleNodeInfoAsync(context.Background(), peerReciever)
require.NoError(err)
time.Sleep(1 * time.Second)
addrReciever := cfgReciever.Chain.ProducerAddress().String()
_, ok := dmSender.GetNodeByAddr(addrReciever)
require.True(ok)
}
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ require (

require (
github.com/cespare/xxhash/v2 v2.1.2
github.com/prometheus/client_model v0.2.0
github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible
github.com/shirou/gopsutil/v3 v3.22.2
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.34.0
Expand Down Expand Up @@ -173,7 +174,6 @@ require (
github.com/pierrec/lz4 v2.0.5+incompatible // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.26.0 // indirect
github.com/prometheus/procfs v0.6.0 // indirect
github.com/rjeczalik/notify v0.9.2 // indirect
Expand All @@ -195,6 +195,7 @@ require (
golang.org/x/sys v0.3.0 // indirect
golang.org/x/term v0.3.0 // indirect
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba // indirect
golang.org/x/tools v0.2.0 // indirect
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
gopkg.in/square/go-jose.v2 v2.5.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
5 changes: 3 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1397,7 +1397,7 @@ golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzB
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 h1:6zppjxzCulZykYSLyVDYbneBfbaBIQPYMevg0bEwv2s=
golang.org/x/mod v0.6.0 h1:b9gGHsz9/HhJ3HF5DHQytPpuwocVTChQJK3AvoLRD5I=
golang.org/x/net v0.0.0-20180719180050-a680a1efc54d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
Expand Down Expand Up @@ -1606,7 +1606,8 @@ golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roY
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU=
golang.org/x/tools v0.2.0 h1:G6AHpWxTMGY1KyEYoAQ5WTtIekUUvDNjan3ugu60JvE=
golang.org/x/tools v0.2.0/go.mod h1:y4OqIKeOV/fWJetJ8bXPU1sEVniLMIyDAZWeHdV+NTA=
golang.org/x/xerrors v0.0.0-20190212162355-a5947ffaace3/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gonum.org/v1/gonum v0.0.0-20180816165407-929014505bf4/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo=
gonum.org/v1/gonum v0.0.0-20181121035319-3f7ecaa7e8ca/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo=
Expand Down
7 changes: 7 additions & 0 deletions misc/scripts/mockgen.sh
Original file line number Diff line number Diff line change
Expand Up @@ -147,3 +147,10 @@ mockgen -destination=./test/mock/mock_web3server/mock_web3server.go \
-source=./api/web3server.go \
-package=mock_web3server \
Web3Handler

mkdir -p ./test/mock/mock_nodeinfo
mockgen -destination=./test/mock/mock_nodeinfo/mock_manager.go \
-source=./nodeinfo/manager.go \
-package=mock_nodeinfo \
transmitter chain

Loading

0 comments on commit 06e7c74

Please sign in to comment.