Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

broadcast node's height info into p2p network #3744

Merged
merged 36 commits into from
Feb 6, 2023
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
2412217
init monitor message framework
envestcc Dec 29, 2022
0f70388
init broadcast framework
envestcc Dec 30, 2022
5507558
update broadcast framework
envestcc Dec 30, 2022
47e97d5
update broadcast framework
envestcc Dec 30, 2022
935f9cc
add node package to handle node info manager
envestcc Jan 3, 2023
3ecc432
update
envestcc Jan 3, 2023
1090b2a
add node info request and response message
envestcc Jan 6, 2023
6eb404d
remove delegate manager dependency
envestcc Jan 6, 2023
98884a2
add message sign
envestcc Jan 9, 2023
eb56750
add sign for node info message
envestcc Jan 10, 2023
37af551
fix unittest error
envestcc Jan 10, 2023
96d185a
add unittest for node
envestcc Jan 11, 2023
5c0b500
add pubkey in nodeinfo message
envestcc Jan 11, 2023
b15fea7
Merge branch 'master' into monitor_msg
envestcc Jan 11, 2023
6e191c6
fix compile error
envestcc Jan 11, 2023
4660879
update go.mod
envestcc Jan 11, 2023
4284fa8
refactor node unittest
envestcc Jan 12, 2023
fbb6983
refactor unittest
envestcc Jan 12, 2023
19cf088
convert NodeManager.UpdateNode to private
envestcc Jan 13, 2023
b9cd38a
use recovered pubkey from sign to verify
envestcc Jan 13, 2023
fb8f2a9
fix comment
envestcc Jan 13, 2023
5f9bbfd
simplify finding in array
envestcc Jan 13, 2023
1a10a7b
fix lint error
envestcc Jan 13, 2023
8d53cd4
add unicast single request message
envestcc Jan 16, 2023
ebbec69
node push(broadcast) node info message proactively and periodically
envestcc Jan 18, 2023
dc07084
Merge branch 'master' into monitor_msg
envestcc Jan 19, 2023
cbc2a66
fix comment
envestcc Jan 25, 2023
e09792b
chang node to nodeinfo
envestcc Jan 28, 2023
0016e7f
add e2etest for nodeinfo
envestcc Jan 29, 2023
0c6e7ed
fix comment
envestcc Feb 1, 2023
5b4038e
fix comments
envestcc Feb 2, 2023
e9af1cf
Merge branch 'master' into monitor_msg
envestcc Feb 2, 2023
fd70cd1
update mock
envestcc Feb 2, 2023
9947145
move const var _nodeMapSize to config struct
envestcc Feb 6, 2023
63e56ef
Merge branch 'master' into monitor_msg
envestcc Feb 6, 2023
37fdaaa
Update manager.go
dustinxie Feb 6, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions chainservice/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/iotexproject/iotex-core/consensus"
rp "github.com/iotexproject/iotex-core/consensus/scheme/rolldpos"
"github.com/iotexproject/iotex-core/db"
"github.com/iotexproject/iotex-core/nodeinfo"
"github.com/iotexproject/iotex-core/p2p"
"github.com/iotexproject/iotex-core/pkg/log"
"github.com/iotexproject/iotex-core/state/factory"
Expand Down Expand Up @@ -377,6 +378,12 @@ func (builder *Builder) createBlockchain(forSubChain, forTest bool) blockchain.B
return blockchain.NewBlockchain(builder.cfg.Chain, builder.cfg.Genesis, builder.cs.blockdao, factory.NewMinter(builder.cs.factory, builder.cs.actpool), chainOpts...)
}

func (builder *Builder) buildDelegateManager() {
dm := nodeinfo.NewDelegateManager(&builder.cfg.NodeInfo, builder.cs.p2pAgent, builder.cs.chain, builder.cfg.Chain.ProducerPrivateKey())
builder.cs.delegateManager = dm
builder.cs.lifecycle.Add(dm)
}

func (builder *Builder) buildBlockSyncer() error {
if builder.cs.blocksync != nil {
return nil
Expand Down Expand Up @@ -622,6 +629,7 @@ func (builder *Builder) build(forSubChain, forTest bool) (*ChainService, error)
if err := builder.buildBlockSyncer(); err != nil {
return nil, err
}
builder.buildDelegateManager()
cs := builder.cs
builder.cs = nil

Expand Down
29 changes: 29 additions & 0 deletions chainservice/chainservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/exp/slices"
"google.golang.org/protobuf/proto"

"github.com/iotexproject/iotex-address/address"
Expand All @@ -31,6 +32,7 @@ import (
"github.com/iotexproject/iotex-core/blockindex"
"github.com/iotexproject/iotex-core/blocksync"
"github.com/iotexproject/iotex-core/consensus"
"github.com/iotexproject/iotex-core/nodeinfo"
"github.com/iotexproject/iotex-core/p2p"
"github.com/iotexproject/iotex-core/pkg/lifecycle"
"github.com/iotexproject/iotex-core/pkg/log"
Expand Down Expand Up @@ -84,6 +86,7 @@ type ChainService struct {
candidateIndexer *poll.CandidateIndexer
candBucketsIndexer *staking.CandidatesBucketsIndexer
registry *protocol.Registry
delegateManager *nodeinfo.DelegateManager
envestcc marked this conversation as resolved.
Show resolved Hide resolved
}

// Start starts the server
Expand Down Expand Up @@ -163,6 +166,27 @@ func (cs *ChainService) HandleConsensusMsg(msg *iotextypes.ConsensusMessage) err
return cs.consensus.HandleConsensusMsg(msg)
}

// HandleNodeInfoMsg handles nodeinfo message.
func (cs *ChainService) HandleNodeInfoMsg(ctx context.Context, peer string, msg *iotextypes.ResponseNodeInfoMessage) error {
cs.delegateManager.HandleNodeInfo(ctx, peer, msg)
return nil
}

// HandleRequestNodeInfoMsg handles request node info message
func (cs *ChainService) HandleRequestNodeInfoMsg(ctx context.Context, peerID string, msg *iotextypes.RequestNodeInfoMessage) error {
peers, err := cs.p2pAgent.ConnectedPeers()
envestcc marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return errors.Wrap(err, "get connected peers failed")
}
id := slices.IndexFunc(peers, func(p peer.AddrInfo) bool {
return p.ID.Pretty() == peerID
})
if id < 0 {
return errors.Errorf("unicast node info msg failed: target peerID %s is not connected", peerID)
}
return cs.delegateManager.HandleNodeInfoRequest(ctx, peers[id])
}

// ChainID returns ChainID.
func (cs *ChainService) ChainID() uint32 { return cs.chain.ChainID() }

Expand Down Expand Up @@ -196,6 +220,11 @@ func (cs *ChainService) BlockSync() blocksync.BlockSync {
return cs.blocksync
}

// DelegateManager returns the delegate manager
func (cs *ChainService) DelegateManager() *nodeinfo.DelegateManager {
return cs.delegateManager
}

// Registry returns a pointer to the registry
func (cs *ChainService) Registry() *protocol.Registry { return cs.registry }

Expand Down
9 changes: 6 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/iotexproject/iotex-core/consensus/consensusfsm"
"github.com/iotexproject/iotex-core/db"
"github.com/iotexproject/iotex-core/dispatcher"
"github.com/iotexproject/iotex-core/nodeinfo"
"github.com/iotexproject/iotex-core/p2p"
"github.com/iotexproject/iotex-core/pkg/log"
)
Expand Down Expand Up @@ -77,9 +78,10 @@ var (
StartSubChainInterval: 10 * time.Second,
SystemLogDBPath: "/var/log",
},
DB: db.DefaultConfig,
Indexer: blockindex.DefaultConfig,
Genesis: genesis.Default,
DB: db.DefaultConfig,
Indexer: blockindex.DefaultConfig,
Genesis: genesis.Default,
NodeInfo: nodeinfo.DefaultConfig,
}

// ErrInvalidCfg indicates the invalid config value
Expand Down Expand Up @@ -130,6 +132,7 @@ type (
Log log.GlobalConfig `yaml:"log"`
SubLogs map[string]log.GlobalConfig `yaml:"subLogs"`
Genesis genesis.Genesis `yaml:"genesis"`
NodeInfo nodeinfo.Config `yaml:"nodeinfo"`
}

// Validate is the interface of validating the config
Expand Down
45 changes: 39 additions & 6 deletions dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ type Subscriber interface {
HandleBlock(context.Context, string, *iotextypes.Block) error
HandleSyncRequest(context.Context, peer.AddrInfo, *iotexrpc.BlockSync) error
HandleConsensusMsg(*iotextypes.ConsensusMessage) error
HandleRequestNodeInfoMsg(context.Context, string, *iotextypes.RequestNodeInfoMessage) error
envestcc marked this conversation as resolved.
Show resolved Hide resolved
HandleNodeInfoMsg(context.Context, string, *iotextypes.ResponseNodeInfoMessage) error
envestcc marked this conversation as resolved.
Show resolved Hide resolved
}

// Dispatcher is used by peers, handles incoming block and header notifications and relays announcements of new blocks.
Expand All @@ -67,12 +69,14 @@ type Dispatcher interface {
HandleTell(context.Context, uint32, peer.AddrInfo, proto.Message)
}

var requestMtc = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "iotex_dispatch_request",
Help: "Dispatcher request counter.",
},
[]string{"method", "succeed"},
var (
requestMtc = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "iotex_dispatch_request",
Help: "Dispatcher request counter.",
},
[]string{"method", "succeed"},
)
)

func init() {
Expand Down Expand Up @@ -427,6 +431,10 @@ func (d *IotxDispatcher) HandleBroadcast(ctx context.Context, chainID uint32, pe
d.dispatchAction(ctx, chainID, message)
case *iotextypes.Block:
d.dispatchBlock(ctx, chainID, peer, message)
case *iotextypes.ResponseNodeInfoMessage:
envestcc marked this conversation as resolved.
Show resolved Hide resolved
if err := subscriber.HandleNodeInfoMsg(ctx, peer, msg); err != nil {
log.L().Warn("Failed to handle node info message.", zap.Error(err))
}
default:
msgType, _ := goproto.GetTypeFromRPCMsg(message)
log.L().Warn("Unexpected msgType handled by HandleBroadcast.", zap.Any("msgType", msgType))
Expand All @@ -444,11 +452,36 @@ func (d *IotxDispatcher) HandleTell(ctx context.Context, chainID uint32, peer pe
d.dispatchBlockSyncReq(ctx, chainID, peer, message)
case iotexrpc.MessageType_BLOCK:
d.dispatchBlock(ctx, chainID, peer.ID.Pretty(), message)
case iotexrpc.MessageType_NODE_INFO_REQUEST, iotexrpc.MessageType_NODE_INFO:
envestcc marked this conversation as resolved.
Show resolved Hide resolved
d.dispatchNodeInfo(ctx, chainID, peer.ID.Pretty(), message)
default:
log.L().Warn("Unexpected msgType handled by HandleTell.", zap.Any("msgType", msgType))
}
}

func (d *IotxDispatcher) dispatchNodeInfo(ctx context.Context, chainID uint32, peer string, message proto.Message) {
if !d.IsReady() {
return
}
subscriber := d.subscriber(chainID)
if subscriber == nil {
log.L().Debug("no subscriber for this chain id, drop the node info", zap.Uint32("chain id", chainID))
return
}
switch msg := message.(type) {
case *iotextypes.ResponseNodeInfoMessage:
if err := subscriber.HandleNodeInfoMsg(ctx, peer, msg); err != nil {
log.L().Warn("failed to handle node info message", zap.Error(err))
}
case *iotextypes.RequestNodeInfoMessage:
if err := subscriber.HandleRequestNodeInfoMsg(ctx, peer, msg); err != nil {
log.L().Warn("failed to handle request node info message", zap.Error(err))
}
default:
log.L().Warn("Unexpected nodeinfo msgType.", zap.Any("msgType", msg))
}
}

func (d *IotxDispatcher) updateEventAudit(t iotexrpc.MessageType) {
d.eventAuditLock.Lock()
defer d.eventAuditLock.Unlock()
Expand Down
10 changes: 10 additions & 0 deletions dispatcher/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ func setTestCase() []proto.Message {
&iotextypes.Block{},
&iotexrpc.BlockSync{},
&testingpb.TestPayload{},
&iotextypes.RequestNodeInfoMessage{},
&iotextypes.ResponseNodeInfoMessage{},
}
}

Expand Down Expand Up @@ -98,3 +100,11 @@ func (ds *dummySubscriber) HandleSyncRequest(context.Context, peer.AddrInfo, *io
func (ds *dummySubscriber) HandleAction(context.Context, *iotextypes.Action) error { return nil }

func (ds *dummySubscriber) HandleConsensusMsg(*iotextypes.ConsensusMessage) error { return nil }

func (ds *dummySubscriber) HandleRequestNodeInfoMsg(context.Context, string, *iotextypes.RequestNodeInfoMessage) error {
return nil
}

func (ds *dummySubscriber) HandleNodeInfoMsg(context.Context, string, *iotextypes.ResponseNodeInfoMessage) error {
return nil
}
132 changes: 132 additions & 0 deletions e2etest/nodeinfo_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Copyright (c) 2023 IoTeX Foundation
// This source code is provided 'as is' and no warranties are given as to title or non-infringement, merchantability
// or fitness for purpose and, to the extent permitted by law, all liability for your use of the code is disclaimed.
// This source code is governed by Apache License 2.0 that can be found in the LICENSE file.

package e2etest

import (
"context"
"testing"
"time"

"github.com/iotexproject/iotex-core/blockchain/genesis"
"github.com/iotexproject/iotex-core/config"
"github.com/iotexproject/iotex-core/server/itx"
"github.com/iotexproject/iotex-core/testutil"
"github.com/stretchr/testify/require"
)

func newNodeInfoTestConfig(triePath, dBPath, idxDBPath string) (config.Config, func(), error) {
cfg, err := newTestConfig()
if err != nil {
return cfg, nil, err
}
testTriePath, err := testutil.PathOfTempFile(triePath)
if err != nil {
return cfg, nil, err
}
testDBPath, err := testutil.PathOfTempFile(dBPath)
if err != nil {
return cfg, nil, err
}
indexDBPath, err := testutil.PathOfTempFile(idxDBPath)
if err != nil {
return cfg, nil, err
}
cfg.Chain.TrieDBPatchFile = ""
cfg.Chain.TrieDBPath = testTriePath
cfg.Chain.ChainDBPath = testDBPath
cfg.Chain.IndexDBPath = indexDBPath
return cfg, func() {
testutil.CleanupPath(testTriePath)
testutil.CleanupPath(testDBPath)
testutil.CleanupPath(indexDBPath)
}, nil
}

func TestBroadcastNodeInfo(t *testing.T) {
require := require.New(t)

cfgSender, teardown, err := newNodeInfoTestConfig("trie.test", "db.test", "indexdb.test")
require.NoError(err)
defer teardown()
cfgSender.NodeInfo.OnlyDelegateBroadcast = false
cfgSender.NodeInfo.BroadcastNodeInfoInterval = time.Second
cfgSender.Network.ReconnectInterval = 2 * time.Second
srvSender, err := itx.NewServer(cfgSender)
require.NoError(err)
ctxSender := genesis.WithGenesisContext(context.Background(), cfgSender.Genesis)
err = srvSender.Start(ctxSender)
require.NoError(err)
defer func() {
require.NoError(srvSender.Stop(ctxSender))
}()
addrsSender, err := srvSender.P2PAgent().Self()
require.NoError(err)

cfgReciever, teardown2, err := newNodeInfoTestConfig("trie2.test", "db2.test", "indexdb2.test")
require.NoError(err)
defer teardown2()
cfgReciever.Network.BootstrapNodes = []string{validNetworkAddr(addrsSender)}
cfgReciever.Network.ReconnectInterval = 2 * time.Second
srvReciever, err := itx.NewServer(cfgReciever)
require.NoError(err)
ctxReciever := genesis.WithGenesisContext(context.Background(), cfgReciever.Genesis)
err = srvReciever.Start(ctxReciever)
require.NoError(err)
defer func() {
require.NoError(srvReciever.Stop(ctxReciever))
}()

// check if there is sender's info in reciever delegatemanager
time.Sleep(5 * time.Second)
envestcc marked this conversation as resolved.
Show resolved Hide resolved
addrSender := cfgSender.Chain.ProducerAddress().String()
_, ok := srvReciever.ChainService(cfgReciever.Chain.ID).DelegateManager().GetNodeByAddr(addrSender)
require.True(ok)
}

func TestUnicastNodeInfo(t *testing.T) {
require := require.New(t)

cfgReciever, teardown2, err := newNodeInfoTestConfig("trie2.test", "db2.test", "indexdb2.test")
require.NoError(err)
defer teardown2()
cfgReciever.Network.ReconnectInterval = 2 * time.Second
srvReciever, err := itx.NewServer(cfgReciever)
require.NoError(err)
ctxReciever := genesis.WithGenesisContext(context.Background(), cfgReciever.Genesis)
err = srvReciever.Start(ctxReciever)
require.NoError(err)
defer func() {
require.NoError(srvReciever.Stop(ctxReciever))
}()
addrsReciever, err := srvReciever.P2PAgent().Self()
require.NoError(err)

cfgSender, teardown, err := newNodeInfoTestConfig("trie.test", "db.test", "indexdb.test")
require.NoError(err)
defer teardown()
cfgSender.Network.ReconnectInterval = 2 * time.Second
cfgSender.Network.BootstrapNodes = []string{validNetworkAddr(addrsReciever)}
srvSender, err := itx.NewServer(cfgSender)
require.NoError(err)
ctxSender := genesis.WithGenesisContext(context.Background(), cfgSender.Genesis)
err = srvSender.Start(ctxSender)
require.NoError(err)
defer func() {
require.NoError(srvSender.Stop(ctxSender))
}()

// check if there is reciever's info in sender delegatemanager
time.Sleep(5 * time.Second)
envestcc marked this conversation as resolved.
Show resolved Hide resolved
peerReciever, err := srvReciever.P2PAgent().Info()
require.NoError(err)
dmSender := srvSender.ChainService(cfgSender.Chain.ID).DelegateManager()
err = dmSender.RequestSingleNodeInfoAsync(context.Background(), peerReciever)
require.NoError(err)
time.Sleep(5 * time.Second)
addrReciever := cfgReciever.Chain.ProducerAddress().String()
_, ok := dmSender.GetNodeByAddr(addrReciever)
require.True(ok)
}
7 changes: 4 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/iotexproject/iotex-address v0.2.8
github.com/iotexproject/iotex-antenna-go/v2 v2.5.1
github.com/iotexproject/iotex-election v0.3.5-0.20210611041425-20ddf674363d
github.com/iotexproject/iotex-proto v0.5.10
github.com/iotexproject/iotex-proto v0.5.11
github.com/libp2p/go-libp2p-core v0.8.5
github.com/mattn/go-sqlite3 v1.14.8 // indirect
github.com/miguelmota/go-ethereum-hdwallet v0.1.1
Expand Down Expand Up @@ -54,9 +54,11 @@ require (
)

require (
github.com/prometheus/client_model v0.2.0
github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible
github.com/shirou/gopsutil/v3 v3.22.2
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.34.0
golang.org/x/exp v0.0.0-20230111222715-75897c7a292a
golang.org/x/text v0.3.7
)

Expand Down Expand Up @@ -173,7 +175,6 @@ require (
github.com/pierrec/lz4 v2.0.5+incompatible // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.26.0 // indirect
github.com/prometheus/procfs v0.6.0 // indirect
github.com/rjeczalik/notify v0.9.2 // indirect
Expand All @@ -192,7 +193,7 @@ require (
go.opentelemetry.io/otel/metric v0.31.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10 // indirect
golang.org/x/sys v0.1.0 // indirect
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba // indirect
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
Expand Down
Loading