Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

broadcast node's height info into p2p network #3744

Merged
merged 36 commits into from
Feb 6, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
2412217
init monitor message framework
envestcc Dec 29, 2022
0f70388
init broadcast framework
envestcc Dec 30, 2022
5507558
update broadcast framework
envestcc Dec 30, 2022
47e97d5
update broadcast framework
envestcc Dec 30, 2022
935f9cc
add node package to handle node info manager
envestcc Jan 3, 2023
3ecc432
update
envestcc Jan 3, 2023
1090b2a
add node info request and response message
envestcc Jan 6, 2023
6eb404d
remove delegate manager dependency
envestcc Jan 6, 2023
98884a2
add message sign
envestcc Jan 9, 2023
eb56750
add sign for node info message
envestcc Jan 10, 2023
37af551
fix unittest error
envestcc Jan 10, 2023
96d185a
add unittest for node
envestcc Jan 11, 2023
5c0b500
add pubkey in nodeinfo message
envestcc Jan 11, 2023
b15fea7
Merge branch 'master' into monitor_msg
envestcc Jan 11, 2023
6e191c6
fix compile error
envestcc Jan 11, 2023
4660879
update go.mod
envestcc Jan 11, 2023
4284fa8
refactor node unittest
envestcc Jan 12, 2023
fbb6983
refactor unittest
envestcc Jan 12, 2023
19cf088
convert NodeManager.UpdateNode to private
envestcc Jan 13, 2023
b9cd38a
use recovered pubkey from sign to verify
envestcc Jan 13, 2023
fb8f2a9
fix comment
envestcc Jan 13, 2023
5f9bbfd
simplify finding in array
envestcc Jan 13, 2023
1a10a7b
fix lint error
envestcc Jan 13, 2023
8d53cd4
add unicast single request message
envestcc Jan 16, 2023
ebbec69
node push(broadcast) node info message proactively and periodically
envestcc Jan 18, 2023
dc07084
Merge branch 'master' into monitor_msg
envestcc Jan 19, 2023
cbc2a66
fix comment
envestcc Jan 25, 2023
e09792b
chang node to nodeinfo
envestcc Jan 28, 2023
0016e7f
add e2etest for nodeinfo
envestcc Jan 29, 2023
0c6e7ed
fix comment
envestcc Feb 1, 2023
5b4038e
fix comments
envestcc Feb 2, 2023
e9af1cf
Merge branch 'master' into monitor_msg
envestcc Feb 2, 2023
fd70cd1
update mock
envestcc Feb 2, 2023
9947145
move const var _nodeMapSize to config struct
envestcc Feb 6, 2023
63e56ef
Merge branch 'master' into monitor_msg
envestcc Feb 6, 2023
37fdaaa
Update manager.go
dustinxie Feb 6, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 16 additions & 21 deletions api/coreservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"github.com/iotexproject/iotex-core/blocksync"
"github.com/iotexproject/iotex-core/db"
"github.com/iotexproject/iotex-core/gasstation"
"github.com/iotexproject/iotex-core/node"
"github.com/iotexproject/iotex-core/pkg/log"
"github.com/iotexproject/iotex-core/pkg/tracer"
"github.com/iotexproject/iotex-core/pkg/version"
Expand Down Expand Up @@ -140,8 +141,6 @@ type (
ReceiveBlock(blk *block.Block) error
// BlockHashByBlockHeight returns block hash by block height
BlockHashByBlockHeight(blkHeight uint64) (hash.Hash256, error)
// HandleMonitorMsg handle monitor msg
HandleMonitorMsg(ctx context.Context, peer string, msg *iotextypes.Monitor) error
}

// coreService implements the CoreService interface
Expand All @@ -160,7 +159,7 @@ type (
chainListener apitypes.Listener
electionCommittee committee.Committee
readCache *ReadCache
delegatesMonitor map[string]*iotextypes.Monitor
delegateManager node.NodeManager
}

// jobDesc provides a struct to get and store logs in core.LogsInRange
Expand Down Expand Up @@ -210,6 +209,7 @@ func newCoreService(
bfIndexer blockindex.BloomFilterIndexer,
actPool actpool.ActPool,
registry *protocol.Registry,
dm node.NodeManager,
opts ...Option,
) (CoreService, error) {
if cfg == (Config{}) {
Expand All @@ -222,18 +222,19 @@ func newCoreService(
}

core := coreService{
bc: chain,
bs: bs,
sf: sf,
dao: dao,
indexer: indexer,
bfIndexer: bfIndexer,
ap: actPool,
cfg: cfg,
registry: registry,
chainListener: NewChainListener(500),
gs: gasstation.NewGasStation(chain, dao, cfg.GasStation),
readCache: NewReadCache(),
bc: chain,
bs: bs,
sf: sf,
dao: dao,
indexer: indexer,
bfIndexer: bfIndexer,
ap: actPool,
cfg: cfg,
registry: registry,
chainListener: NewChainListener(500),
gs: gasstation.NewGasStation(chain, dao, cfg.GasStation),
readCache: NewReadCache(),
delegateManager: dm,
}

for _, opt := range opts {
Expand Down Expand Up @@ -1567,9 +1568,3 @@ func (core *coreService) SyncingProgress() (uint64, uint64, uint64) {
startingHeight, currentHeight, targetHeight, _ := core.bs.SyncStatus()
return startingHeight, currentHeight, targetHeight
}

// HandleMonitorMsg handle monitor msg
func (core *coreService) HandleMonitorMsg(ctx context.Context, peer string, msg *iotextypes.Monitor) error {
// update delegateMonitor
return nil
}
10 changes: 3 additions & 7 deletions api/serverV2.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ import (
"github.com/iotexproject/iotex-core/blockchain/blockdao"
"github.com/iotexproject/iotex-core/blockindex"
"github.com/iotexproject/iotex-core/blocksync"
"github.com/iotexproject/iotex-core/node"
"github.com/iotexproject/iotex-core/pkg/tracer"
"github.com/iotexproject/iotex-core/state/factory"
"github.com/iotexproject/iotex-proto/golang/iotextypes"
)

// ServerV2 provides api for user to interact with blockchain data
Expand All @@ -44,9 +44,10 @@ func NewServerV2(
bfIndexer blockindex.BloomFilterIndexer,
actPool actpool.ActPool,
registry *protocol.Registry,
dm node.NodeManager,
opts ...Option,
) (*ServerV2, error) {
coreAPI, err := newCoreService(cfg, chain, bs, sf, dao, indexer, bfIndexer, actPool, registry, opts...)
coreAPI, err := newCoreService(cfg, chain, bs, sf, dao, indexer, bfIndexer, actPool, registry, dm, opts...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -135,8 +136,3 @@ func (svr *ServerV2) ReceiveBlock(blk *block.Block) error {
func (svr *ServerV2) CoreService() CoreService {
return svr.core
}

// HandleMonitorMsg handle monitor msg
func (svr *ServerV2) HandleMonitorMsg(ctx context.Context, peer string, msg *iotextypes.Monitor) error {
return svr.core.HandleMonitorMsg(ctx, peer, msg)
}
8 changes: 8 additions & 0 deletions chainservice/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/iotexproject/iotex-core/consensus"
rp "github.com/iotexproject/iotex-core/consensus/scheme/rolldpos"
"github.com/iotexproject/iotex-core/db"
"github.com/iotexproject/iotex-core/node"
"github.com/iotexproject/iotex-core/p2p"
"github.com/iotexproject/iotex-core/pkg/log"
"github.com/iotexproject/iotex-core/state/factory"
Expand Down Expand Up @@ -377,6 +378,12 @@ func (builder *Builder) createBlockchain(forSubChain, forTest bool) blockchain.B
return blockchain.NewBlockchain(builder.cfg.Chain, builder.cfg.Genesis, builder.cs.blockdao, factory.NewMinter(builder.cs.factory, builder.cs.actpool), chainOpts...)
}

func (builder *Builder) buildNodeManager() {
dm := node.NewDelegateManager(&builder.cfg.Node, builder.cs.consensus, builder.cs.p2pAgent, builder.cs.chain)
builder.cs.delegateManager = dm
builder.cs.lifecycle.Add(dm)
}

func (builder *Builder) buildBlockSyncer() error {
if builder.cs.blocksync != nil {
return nil
Expand Down Expand Up @@ -622,6 +629,7 @@ func (builder *Builder) build(forSubChain, forTest bool) (*ChainService, error)
if err := builder.buildBlockSyncer(); err != nil {
return nil, err
}
builder.buildNodeManager()
cs := builder.cs
builder.cs = nil

Expand Down
29 changes: 11 additions & 18 deletions chainservice/chainservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/iotexproject/iotex-core/blockindex"
"github.com/iotexproject/iotex-core/blocksync"
"github.com/iotexproject/iotex-core/consensus"
"github.com/iotexproject/iotex-core/node"
"github.com/iotexproject/iotex-core/p2p"
"github.com/iotexproject/iotex-core/pkg/lifecycle"
"github.com/iotexproject/iotex-core/pkg/log"
Expand Down Expand Up @@ -61,10 +62,6 @@ var (
)
)

type monitorMsgHandler interface {
HandleMonitorMsg(context.Context, string, *iotextypes.Monitor) error
}

func init() {
prometheus.MustRegister(_apiCallWithChainIDMtc)
prometheus.MustRegister(_apiCallWithOutChainIDMtc)
Expand All @@ -88,8 +85,7 @@ type ChainService struct {
candidateIndexer *poll.CandidateIndexer
candBucketsIndexer *staking.CandidatesBucketsIndexer
registry *protocol.Registry
monitorHandler monitorMsgHandler
monitorBroadcaster lifecycle.StartStopper
delegateManager node.NodeManager
}

// Start starts the server
Expand Down Expand Up @@ -169,9 +165,14 @@ func (cs *ChainService) HandleConsensusMsg(msg *iotextypes.ConsensusMessage) err
return cs.consensus.HandleConsensusMsg(msg)
}

// HandleMonitorMsg handles monitor message.
func (cs *ChainService) HandleMonitorMsg(ctx context.Context, peer string, msg *iotextypes.Monitor) error {
return cs.monitorHandler.HandleMonitorMsg(ctx, peer, msg)
// HandleNodeInfoMsg handles nodeinfo message.
func (cs *ChainService) HandleNodeInfoMsg(ctx context.Context, peer string, msg *iotextypes.NodeInfo) error {
cs.delegateManager.UpdateNode(&node.Node{
Addr: peer,
Height: msg.Height,
Version: msg.Version,
})
return nil
}

// ChainID returns ChainID.
Expand Down Expand Up @@ -210,15 +211,6 @@ func (cs *ChainService) BlockSync() blocksync.BlockSync {
// Registry returns a pointer to the registry
func (cs *ChainService) Registry() *protocol.Registry { return cs.registry }

// SetMonitorHandler set the monitor handler
func (cs *ChainService) SetMonitorHandler(handler monitorMsgHandler) { cs.monitorHandler = handler }

// SetMonitorBroadcaster sets the block sync instance
func (cs *ChainService) SetMonitorBroadcaster(mb lifecycle.StartStopper) {
cs.monitorBroadcaster = mb
cs.lifecycle.Add(mb)
}

// NewAPIServer creates a new api server
func (cs *ChainService) NewAPIServer(cfg api.Config, plugins map[int]interface{}) (*api.ServerV2, error) {
if cfg.GRPCPort == 0 && cfg.HTTPPort == 0 {
Expand All @@ -242,6 +234,7 @@ func (cs *ChainService) NewAPIServer(cfg api.Config, plugins map[int]interface{}
cs.bfIndexer,
cs.actpool,
cs.registry,
cs.delegateManager,
apiServerOptions...,
)
if err != nil {
Expand Down
7 changes: 4 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/iotexproject/iotex-core/consensus/consensusfsm"
"github.com/iotexproject/iotex-core/db"
"github.com/iotexproject/iotex-core/dispatcher"
"github.com/iotexproject/iotex-core/node"
"github.com/iotexproject/iotex-core/p2p"
"github.com/iotexproject/iotex-core/pkg/log"
)
Expand Down Expand Up @@ -102,9 +103,8 @@ type (
// System is the system config
System struct {
// Active is the status of the node. True means active and false means stand-by
Active bool `yaml:"active"`
HeartbeatInterval time.Duration `yaml:"heartbeatInterval"`
MonitorBroadcastInterval time.Duration `yaml:"monitorBroadcastInterval"`
Active bool `yaml:"active"`
HeartbeatInterval time.Duration `yaml:"heartbeatInterval"`
// HTTPProfilingPort is the port number to access golang performance profiling data of a blockchain node. It is
// 0 by default, meaning performance profiling has been disabled
HTTPAdminPort int `yaml:"httpAdminPort"`
Expand All @@ -130,6 +130,7 @@ type (
Log log.GlobalConfig `yaml:"log"`
SubLogs map[string]log.GlobalConfig `yaml:"subLogs"`
Genesis genesis.Genesis `yaml:"genesis"`
Node node.Config `yaml:"node"`
}

// Validate is the interface of validating the config
Expand Down
15 changes: 3 additions & 12 deletions dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type Subscriber interface {
HandleBlock(context.Context, string, *iotextypes.Block) error
HandleSyncRequest(context.Context, peer.AddrInfo, *iotexrpc.BlockSync) error
HandleConsensusMsg(*iotextypes.ConsensusMessage) error
HandleMonitorMsg(context.Context, string, *iotextypes.Monitor) error
HandleNodeInfoMsg(context.Context, string, *iotextypes.NodeInfo) error
}

// Dispatcher is used by peers, handles incoming block and header notifications and relays announcements of new blocks.
Expand All @@ -78,18 +78,10 @@ var (
},
[]string{"method", "succeed"},
)
delegateHeightGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "iotex_p2p_delegate_height_gauge",
Help: "delegate height",
},
[]string{"address", "version"},
)
)

func init() {
prometheus.MustRegister(requestMtc)
prometheus.MustRegister(delegateHeightGauge)
}

// blockMsg packages a proto block message.
Expand Down Expand Up @@ -436,11 +428,10 @@ func (d *IotxDispatcher) HandleBroadcast(ctx context.Context, chainID uint32, pe
d.dispatchAction(ctx, chainID, message)
case *iotextypes.Block:
d.dispatchBlock(ctx, chainID, peer, message)
case *iotextypes.Monitor:
if err := subscriber.HandleMonitorMsg(ctx, peer, msg); err != nil {
case *iotextypes.NodeInfo:
if err := subscriber.HandleNodeInfoMsg(ctx, peer, msg); err != nil {
log.L().Debug("Failed to handle monitor message.", zap.Error(err))
}
// update delegateHeightGauge metric
default:
msgType, _ := goproto.GetTypeFromRPCMsg(message)
log.L().Warn("Unexpected msgType handled by HandleBroadcast.", zap.Any("msgType", msgType))
Expand Down
12 changes: 12 additions & 0 deletions node/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Copyright (c) 2022 IoTeX Foundation
// This source code is provided 'as is' and no warranties are given as to title or non-infringement, merchantability
// or fitness for purpose and, to the extent permitted by law, all liability for your use of the code is disclaimed.
// This source code is governed by Apache License 2.0 that can be found in the LICENSE file.

package node

import "time"

type Config struct {
NodeInfoBroadcastInterval time.Duration `yaml:"nodeInfoBroadcastInterval"`
}
20 changes: 20 additions & 0 deletions node/metric.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright (c) 2022 IoTeX Foundation
// This source code is provided 'as is' and no warranties are given as to title or non-infringement, merchantability
// or fitness for purpose and, to the extent permitted by law, all liability for your use of the code is disclaimed.
// This source code is governed by Apache License 2.0 that can be found in the LICENSE file.

package node

import "github.com/prometheus/client_golang/prometheus"

var nodeDelegateHeightGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "iotex_node_delegate_height_gauge",
Help: "delegate height",
},
[]string{"address", "version"},
)

func init() {
prometheus.MustRegister(nodeDelegateHeightGauge)
}
envestcc marked this conversation as resolved.
Show resolved Hide resolved
75 changes: 75 additions & 0 deletions node/node.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright (c) 2022 IoTeX Foundation
// This source code is provided 'as is' and no warranties are given as to title or non-infringement, merchantability
// or fitness for purpose and, to the extent permitted by law, all liability for your use of the code is disclaimed.
// This source code is governed by Apache License 2.0 that can be found in the LICENSE file.

package node

import (
"context"

"github.com/iotexproject/iotex-core/blockchain"
"github.com/iotexproject/iotex-core/consensus"
"github.com/iotexproject/iotex-core/p2p"
"github.com/iotexproject/iotex-core/pkg/lifecycle"
"github.com/iotexproject/iotex-core/pkg/routine"
"github.com/iotexproject/iotex-core/pkg/version"
"github.com/iotexproject/iotex-proto/golang/iotextypes"
)

type Node struct {
Addr string
Height uint64
Version string
}

// NodeManager manage nodes on the blockchain
type NodeManager interface {
envestcc marked this conversation as resolved.
Show resolved Hide resolved
lifecycle.StartStopper
UpdateNode(*Node)
GetNode(addr string) *Node
}

type delegateManager struct {
cfg Config
nodeMap map[string]*Node
consensus consensus.Consensus
envestcc marked this conversation as resolved.
Show resolved Hide resolved
broadcaster lifecycle.StartStopper
p2pAgent p2p.Agent
bc blockchain.Blockchain
envestcc marked this conversation as resolved.
Show resolved Hide resolved
}

func NewDelegateManager(cfg *Config, consensus consensus.Consensus, p2pAgent p2p.Agent, bc blockchain.Blockchain) NodeManager {
dm := &delegateManager{
cfg: *cfg,
nodeMap: make(map[string]*Node),
consensus: consensus,
p2pAgent: p2pAgent,
bc: bc,
}
dm.broadcaster = routine.NewRecurringTask(dm.broadcast, cfg.NodeInfoBroadcastInterval)
return dm
}

func (dm *delegateManager) Start(ctx context.Context) error {
return dm.broadcaster.Start(ctx)
}
func (dm *delegateManager) Stop(ctx context.Context) error {
envestcc marked this conversation as resolved.
Show resolved Hide resolved
return dm.broadcaster.Stop(ctx)
}
func (dm *delegateManager) UpdateNode(node *Node) {
// update dm.nodeMap
n := *node
dm.nodeMap[node.Addr] = &n
// update metric
nodeDelegateHeightGauge.WithLabelValues("address", node.Addr, "version", node.Version).Set(float64(node.Height))
}
func (dm *delegateManager) GetNode(addr string) *Node {
envestcc marked this conversation as resolved.
Show resolved Hide resolved
return dm.nodeMap[addr]
envestcc marked this conversation as resolved.
Show resolved Hide resolved
}
func (dm *delegateManager) broadcast() {
dm.p2pAgent.BroadcastOutbound(context.Background(), &iotextypes.NodeInfo{
Height: dm.bc.TipHeight(),
Version: version.PackageVersion,
})
}
Loading