Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

broadcast node's height info into p2p network #3744

Merged
merged 36 commits into from
Feb 6, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
2412217
init monitor message framework
envestcc Dec 29, 2022
0f70388
init broadcast framework
envestcc Dec 30, 2022
5507558
update broadcast framework
envestcc Dec 30, 2022
47e97d5
update broadcast framework
envestcc Dec 30, 2022
935f9cc
add node package to handle node info manager
envestcc Jan 3, 2023
3ecc432
update
envestcc Jan 3, 2023
1090b2a
add node info request and response message
envestcc Jan 6, 2023
6eb404d
remove delegate manager dependency
envestcc Jan 6, 2023
98884a2
add message sign
envestcc Jan 9, 2023
eb56750
add sign for node info message
envestcc Jan 10, 2023
37af551
fix unittest error
envestcc Jan 10, 2023
96d185a
add unittest for node
envestcc Jan 11, 2023
5c0b500
add pubkey in nodeinfo message
envestcc Jan 11, 2023
b15fea7
Merge branch 'master' into monitor_msg
envestcc Jan 11, 2023
6e191c6
fix compile error
envestcc Jan 11, 2023
4660879
update go.mod
envestcc Jan 11, 2023
4284fa8
refactor node unittest
envestcc Jan 12, 2023
fbb6983
refactor unittest
envestcc Jan 12, 2023
19cf088
convert NodeManager.UpdateNode to private
envestcc Jan 13, 2023
b9cd38a
use recovered pubkey from sign to verify
envestcc Jan 13, 2023
fb8f2a9
fix comment
envestcc Jan 13, 2023
5f9bbfd
simplify finding in array
envestcc Jan 13, 2023
1a10a7b
fix lint error
envestcc Jan 13, 2023
8d53cd4
add unicast single request message
envestcc Jan 16, 2023
ebbec69
node push(broadcast) node info message proactively and periodically
envestcc Jan 18, 2023
dc07084
Merge branch 'master' into monitor_msg
envestcc Jan 19, 2023
cbc2a66
fix comment
envestcc Jan 25, 2023
e09792b
chang node to nodeinfo
envestcc Jan 28, 2023
0016e7f
add e2etest for nodeinfo
envestcc Jan 29, 2023
0c6e7ed
fix comment
envestcc Feb 1, 2023
5b4038e
fix comments
envestcc Feb 2, 2023
e9af1cf
Merge branch 'master' into monitor_msg
envestcc Feb 2, 2023
fd70cd1
update mock
envestcc Feb 2, 2023
9947145
move const var _nodeMapSize to config struct
envestcc Feb 6, 2023
63e56ef
Merge branch 'master' into monitor_msg
envestcc Feb 6, 2023
37fdaaa
Update manager.go
dustinxie Feb 6, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions chainservice/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,9 +378,9 @@ func (builder *Builder) createBlockchain(forSubChain, forTest bool) blockchain.B
return blockchain.NewBlockchain(builder.cfg.Chain, builder.cfg.Genesis, builder.cs.blockdao, factory.NewMinter(builder.cs.factory, builder.cs.actpool), chainOpts...)
}

func (builder *Builder) buildDelegateManager() {
dm := nodeinfo.NewDelegateManager(&builder.cfg.NodeInfo, builder.cs.p2pAgent, builder.cs.chain, builder.cfg.Chain.ProducerPrivateKey())
builder.cs.delegateManager = dm
func (builder *Builder) buildNodeInfoManager() {
dm := nodeinfo.NewInfoManager(&builder.cfg.NodeInfo, builder.cs.p2pAgent, builder.cs.chain, builder.cfg.Chain.ProducerPrivateKey())
builder.cs.nodeInfoManager = dm
builder.cs.lifecycle.Add(dm)
}

Expand Down Expand Up @@ -629,7 +629,7 @@ func (builder *Builder) build(forSubChain, forTest bool) (*ChainService, error)
if err := builder.buildBlockSyncer(); err != nil {
return nil, err
}
builder.buildDelegateManager()
builder.buildNodeInfoManager()
cs := builder.cs
builder.cs = nil

Expand Down
31 changes: 10 additions & 21 deletions chainservice/chainservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/exp/slices"
"google.golang.org/protobuf/proto"

"github.com/iotexproject/iotex-address/address"
Expand Down Expand Up @@ -86,7 +85,7 @@ type ChainService struct {
candidateIndexer *poll.CandidateIndexer
candBucketsIndexer *staking.CandidatesBucketsIndexer
registry *protocol.Registry
delegateManager *nodeinfo.DelegateManager
nodeInfoManager *nodeinfo.InfoManager
}

// Start starts the server
Expand Down Expand Up @@ -166,25 +165,15 @@ func (cs *ChainService) HandleConsensusMsg(msg *iotextypes.ConsensusMessage) err
return cs.consensus.HandleConsensusMsg(msg)
}

// HandleNodeInfoMsg handles nodeinfo message.
func (cs *ChainService) HandleNodeInfoMsg(ctx context.Context, peer string, msg *iotextypes.ResponseNodeInfoMessage) error {
cs.delegateManager.HandleNodeInfo(ctx, peer, msg)
// HandleNodeInfo handles nodeinfo message.
func (cs *ChainService) HandleNodeInfo(ctx context.Context, peer string, msg *iotextypes.NodeInfo) error {
cs.nodeInfoManager.HandleNodeInfo(ctx, peer, msg)
return nil
}

// HandleRequestNodeInfoMsg handles request node info message
func (cs *ChainService) HandleRequestNodeInfoMsg(ctx context.Context, peerID string, msg *iotextypes.RequestNodeInfoMessage) error {
peers, err := cs.p2pAgent.ConnectedPeers()
if err != nil {
return errors.Wrap(err, "get connected peers failed")
}
id := slices.IndexFunc(peers, func(p peer.AddrInfo) bool {
return p.ID.Pretty() == peerID
})
if id < 0 {
return errors.Errorf("unicast node info msg failed: target peerID %s is not connected", peerID)
}
return cs.delegateManager.HandleNodeInfoRequest(ctx, peers[id])
// HandleNodeInfoRequest handles request node info message
func (cs *ChainService) HandleNodeInfoRequest(ctx context.Context, peer peer.AddrInfo, msg *iotextypes.NodeInfoRequest) error {
return cs.nodeInfoManager.HandleNodeInfoRequest(ctx, peer)
}

// ChainID returns ChainID.
Expand Down Expand Up @@ -220,9 +209,9 @@ func (cs *ChainService) BlockSync() blocksync.BlockSync {
return cs.blocksync
}

// DelegateManager returns the delegate manager
func (cs *ChainService) DelegateManager() *nodeinfo.DelegateManager {
return cs.delegateManager
// NodeInfoManager returns the delegate manager
func (cs *ChainService) NodeInfoManager() *nodeinfo.InfoManager {
return cs.nodeInfoManager
}

// Registry returns a pointer to the registry
Expand Down
43 changes: 25 additions & 18 deletions dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ type Subscriber interface {
HandleBlock(context.Context, string, *iotextypes.Block) error
HandleSyncRequest(context.Context, peer.AddrInfo, *iotexrpc.BlockSync) error
HandleConsensusMsg(*iotextypes.ConsensusMessage) error
HandleRequestNodeInfoMsg(context.Context, string, *iotextypes.RequestNodeInfoMessage) error
HandleNodeInfoMsg(context.Context, string, *iotextypes.ResponseNodeInfoMessage) error
HandleNodeInfoRequest(context.Context, peer.AddrInfo, *iotextypes.NodeInfoRequest) error
HandleNodeInfo(context.Context, string, *iotextypes.NodeInfo) error
}

// Dispatcher is used by peers, handles incoming block and header notifications and relays announcements of new blocks.
Expand Down Expand Up @@ -431,8 +431,8 @@ func (d *IotxDispatcher) HandleBroadcast(ctx context.Context, chainID uint32, pe
d.dispatchAction(ctx, chainID, message)
case *iotextypes.Block:
d.dispatchBlock(ctx, chainID, peer, message)
case *iotextypes.ResponseNodeInfoMessage:
if err := subscriber.HandleNodeInfoMsg(ctx, peer, msg); err != nil {
case *iotextypes.NodeInfo:
if err := subscriber.HandleNodeInfo(ctx, peer, msg); err != nil {
log.L().Warn("Failed to handle node info message.", zap.Error(err))
}
default:
Expand All @@ -452,14 +452,16 @@ func (d *IotxDispatcher) HandleTell(ctx context.Context, chainID uint32, peer pe
d.dispatchBlockSyncReq(ctx, chainID, peer, message)
case iotexrpc.MessageType_BLOCK:
d.dispatchBlock(ctx, chainID, peer.ID.Pretty(), message)
case iotexrpc.MessageType_NODE_INFO_REQUEST, iotexrpc.MessageType_NODE_INFO:
d.dispatchNodeInfo(ctx, chainID, peer.ID.Pretty(), message)
case iotexrpc.MessageType_NODE_INFO_REQUEST:
d.dispatchNodeInfoRequest(ctx, chainID, peer, message.(*iotextypes.NodeInfoRequest))
case iotexrpc.MessageType_NODE_INFO:
d.dispatchNodeInfo(ctx, chainID, peer.ID.Pretty(), message.(*iotextypes.NodeInfo))
default:
log.L().Warn("Unexpected msgType handled by HandleTell.", zap.Any("msgType", msgType))
}
}

func (d *IotxDispatcher) dispatchNodeInfo(ctx context.Context, chainID uint32, peer string, message proto.Message) {
func (d *IotxDispatcher) dispatchNodeInfoRequest(ctx context.Context, chainID uint32, peer peer.AddrInfo, message *iotextypes.NodeInfoRequest) {
if !d.IsReady() {
return
}
Expand All @@ -468,17 +470,22 @@ func (d *IotxDispatcher) dispatchNodeInfo(ctx context.Context, chainID uint32, p
log.L().Debug("no subscriber for this chain id, drop the node info", zap.Uint32("chain id", chainID))
return
}
switch msg := message.(type) {
case *iotextypes.ResponseNodeInfoMessage:
if err := subscriber.HandleNodeInfoMsg(ctx, peer, msg); err != nil {
log.L().Warn("failed to handle node info message", zap.Error(err))
}
case *iotextypes.RequestNodeInfoMessage:
if err := subscriber.HandleRequestNodeInfoMsg(ctx, peer, msg); err != nil {
log.L().Warn("failed to handle request node info message", zap.Error(err))
}
default:
log.L().Warn("Unexpected nodeinfo msgType.", zap.Any("msgType", msg))
if err := subscriber.HandleNodeInfoRequest(ctx, peer, message); err != nil {
log.L().Warn("failed to handle request node info message", zap.Error(err))
}
}

func (d *IotxDispatcher) dispatchNodeInfo(ctx context.Context, chainID uint32, peerID string, message *iotextypes.NodeInfo) {
if !d.IsReady() {
return
}
subscriber := d.subscriber(chainID)
if subscriber == nil {
log.L().Debug("no subscriber for this chain id, drop the node info", zap.Uint32("chain id", chainID))
return
}
if err := subscriber.HandleNodeInfo(ctx, peerID, message); err != nil {
log.L().Warn("failed to handle node info message", zap.Error(err))
}
}

Expand Down
8 changes: 4 additions & 4 deletions dispatcher/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ func setTestCase() []proto.Message {
&iotextypes.Block{},
&iotexrpc.BlockSync{},
&testingpb.TestPayload{},
&iotextypes.RequestNodeInfoMessage{},
&iotextypes.ResponseNodeInfoMessage{},
&iotextypes.NodeInfoRequest{},
&iotextypes.NodeInfo{},
}
}

Expand Down Expand Up @@ -101,10 +101,10 @@ func (ds *dummySubscriber) HandleAction(context.Context, *iotextypes.Action) err

func (ds *dummySubscriber) HandleConsensusMsg(*iotextypes.ConsensusMessage) error { return nil }

func (ds *dummySubscriber) HandleRequestNodeInfoMsg(context.Context, string, *iotextypes.RequestNodeInfoMessage) error {
func (ds *dummySubscriber) HandleNodeInfoRequest(context.Context, peer.AddrInfo, *iotextypes.NodeInfoRequest) error {
return nil
}

func (ds *dummySubscriber) HandleNodeInfoMsg(context.Context, string, *iotextypes.ResponseNodeInfoMessage) error {
func (ds *dummySubscriber) HandleNodeInfo(context.Context, string, *iotextypes.NodeInfo) error {
return nil
}
22 changes: 11 additions & 11 deletions e2etest/nodeinfo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/stretchr/testify/require"
)

func newNodeInfoTestConfig(triePath, dBPath, idxDBPath string) (config.Config, func(), error) {
func newConfigForNodeInfoTest(triePath, dBPath, idxDBPath string) (config.Config, func(), error) {
cfg, err := newTestConfig()
if err != nil {
return cfg, nil, err
Expand Down Expand Up @@ -48,10 +48,10 @@ func newNodeInfoTestConfig(triePath, dBPath, idxDBPath string) (config.Config, f
func TestBroadcastNodeInfo(t *testing.T) {
require := require.New(t)

cfgSender, teardown, err := newNodeInfoTestConfig("trie.test", "db.test", "indexdb.test")
cfgSender, teardown, err := newConfigForNodeInfoTest("trie.test", "db.test", "indexdb.test")
require.NoError(err)
defer teardown()
cfgSender.NodeInfo.OnlyDelegateBroadcast = false
cfgSender.NodeInfo.EnableBroadcastNodeInfo = true
cfgSender.NodeInfo.BroadcastNodeInfoInterval = time.Second
cfgSender.Network.ReconnectInterval = 2 * time.Second
srvSender, err := itx.NewServer(cfgSender)
Expand All @@ -65,7 +65,7 @@ func TestBroadcastNodeInfo(t *testing.T) {
addrsSender, err := srvSender.P2PAgent().Self()
require.NoError(err)

cfgReciever, teardown2, err := newNodeInfoTestConfig("trie2.test", "db2.test", "indexdb2.test")
cfgReciever, teardown2, err := newConfigForNodeInfoTest("trie2.test", "db2.test", "indexdb2.test")
require.NoError(err)
defer teardown2()
cfgReciever.Network.BootstrapNodes = []string{validNetworkAddr(addrsSender)}
Expand All @@ -80,16 +80,17 @@ func TestBroadcastNodeInfo(t *testing.T) {
}()

// check if there is sender's info in reciever delegatemanager
time.Sleep(5 * time.Second)
require.NoError(srvSender.ChainService(cfgSender.Chain.ID).NodeInfoManager().BroadcastNodeInfo(context.Background()))
time.Sleep(1 * time.Second)
addrSender := cfgSender.Chain.ProducerAddress().String()
_, ok := srvReciever.ChainService(cfgReciever.Chain.ID).DelegateManager().GetNodeByAddr(addrSender)
_, ok := srvReciever.ChainService(cfgReciever.Chain.ID).NodeInfoManager().GetNodeByAddr(addrSender)
require.True(ok)
}

func TestUnicastNodeInfo(t *testing.T) {
require := require.New(t)

cfgReciever, teardown2, err := newNodeInfoTestConfig("trie2.test", "db2.test", "indexdb2.test")
cfgReciever, teardown2, err := newConfigForNodeInfoTest("trie2.test", "db2.test", "indexdb2.test")
require.NoError(err)
defer teardown2()
cfgReciever.Network.ReconnectInterval = 2 * time.Second
Expand All @@ -104,7 +105,7 @@ func TestUnicastNodeInfo(t *testing.T) {
addrsReciever, err := srvReciever.P2PAgent().Self()
require.NoError(err)

cfgSender, teardown, err := newNodeInfoTestConfig("trie.test", "db.test", "indexdb.test")
cfgSender, teardown, err := newConfigForNodeInfoTest("trie.test", "db.test", "indexdb.test")
require.NoError(err)
defer teardown()
cfgSender.Network.ReconnectInterval = 2 * time.Second
Expand All @@ -119,13 +120,12 @@ func TestUnicastNodeInfo(t *testing.T) {
}()

// check if there is reciever's info in sender delegatemanager
time.Sleep(5 * time.Second)
peerReciever, err := srvReciever.P2PAgent().Info()
require.NoError(err)
dmSender := srvSender.ChainService(cfgSender.Chain.ID).DelegateManager()
dmSender := srvSender.ChainService(cfgSender.Chain.ID).NodeInfoManager()
err = dmSender.RequestSingleNodeInfoAsync(context.Background(), peerReciever)
require.NoError(err)
time.Sleep(5 * time.Second)
time.Sleep(1 * time.Second)
addrReciever := cfgReciever.Chain.ProducerAddress().String()
_, ok := dmSender.GetNodeByAddr(addrReciever)
require.True(ok)
Expand Down
20 changes: 10 additions & 10 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/ethereum/go-ethereum v1.10.21
github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a
github.com/go-redis/redis/v8 v8.11.4
github.com/golang/mock v1.4.4
github.com/golang/mock v1.6.0
github.com/golang/protobuf v1.5.2
github.com/golang/snappy v0.0.4
github.com/gorilla/websocket v1.4.2
Expand All @@ -20,7 +20,7 @@ require (
github.com/iotexproject/iotex-address v0.2.8
github.com/iotexproject/iotex-antenna-go/v2 v2.5.1
github.com/iotexproject/iotex-election v0.3.5-0.20210611041425-20ddf674363d
github.com/iotexproject/iotex-proto v0.5.11
github.com/iotexproject/iotex-proto v0.5.12
envestcc marked this conversation as resolved.
Show resolved Hide resolved
github.com/libp2p/go-libp2p-core v0.8.5
github.com/mattn/go-sqlite3 v1.14.8 // indirect
github.com/miguelmota/go-ethereum-hdwallet v0.1.1
Expand All @@ -45,10 +45,10 @@ require (
go.uber.org/config v1.3.1
go.uber.org/zap v1.16.0
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519
golang.org/x/net v0.0.0-20220607020251-c690dde0001d
golang.org/x/net v0.3.0
golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde
google.golang.org/genproto v0.0.0-20211223182754-3ac035c7e7cb
google.golang.org/grpc v1.43.0
google.golang.org/genproto v0.0.0-20230127162408-596548ed4efa
google.golang.org/grpc v1.51.0
google.golang.org/protobuf v1.28.1
gopkg.in/yaml.v2 v2.4.0
)
Expand All @@ -58,11 +58,11 @@ require (
github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible
github.com/shirou/gopsutil/v3 v3.22.2
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.34.0
golang.org/x/exp v0.0.0-20230111222715-75897c7a292a
golang.org/x/text v0.3.7
golang.org/x/text v0.5.0
)

require (
cloud.google.com/go/compute/metadata v0.2.3 // indirect
github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6 // indirect
github.com/benbjohnson/clock v1.0.3 // indirect
github.com/beorn7/perks v1.0.1 // indirect
Expand All @@ -86,7 +86,6 @@ require (
github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
github.com/google/gopacket v1.1.19 // indirect
github.com/google/uuid v1.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.1 // indirect
github.com/hashicorp/go-multierror v1.1.0 // indirect
Expand Down Expand Up @@ -193,9 +192,10 @@ require (
go.opentelemetry.io/otel/metric v0.31.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/sys v0.1.0 // indirect
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
golang.org/x/sys v0.3.0 // indirect
golang.org/x/term v0.3.0 // indirect
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba // indirect
golang.org/x/tools v0.2.0 // indirect
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
gopkg.in/square/go-jose.v2 v2.5.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
Loading