Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

broadcast node's height info into p2p network #3744

Merged
merged 36 commits into from
Feb 6, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
2412217
init monitor message framework
envestcc Dec 29, 2022
0f70388
init broadcast framework
envestcc Dec 30, 2022
5507558
update broadcast framework
envestcc Dec 30, 2022
47e97d5
update broadcast framework
envestcc Dec 30, 2022
935f9cc
add node package to handle node info manager
envestcc Jan 3, 2023
3ecc432
update
envestcc Jan 3, 2023
1090b2a
add node info request and response message
envestcc Jan 6, 2023
6eb404d
remove delegate manager dependency
envestcc Jan 6, 2023
98884a2
add message sign
envestcc Jan 9, 2023
eb56750
add sign for node info message
envestcc Jan 10, 2023
37af551
fix unittest error
envestcc Jan 10, 2023
96d185a
add unittest for node
envestcc Jan 11, 2023
5c0b500
add pubkey in nodeinfo message
envestcc Jan 11, 2023
b15fea7
Merge branch 'master' into monitor_msg
envestcc Jan 11, 2023
6e191c6
fix compile error
envestcc Jan 11, 2023
4660879
update go.mod
envestcc Jan 11, 2023
4284fa8
refactor node unittest
envestcc Jan 12, 2023
fbb6983
refactor unittest
envestcc Jan 12, 2023
19cf088
convert NodeManager.UpdateNode to private
envestcc Jan 13, 2023
b9cd38a
use recovered pubkey from sign to verify
envestcc Jan 13, 2023
fb8f2a9
fix comment
envestcc Jan 13, 2023
5f9bbfd
simplify finding in array
envestcc Jan 13, 2023
1a10a7b
fix lint error
envestcc Jan 13, 2023
8d53cd4
add unicast single request message
envestcc Jan 16, 2023
ebbec69
node push(broadcast) node info message proactively and periodically
envestcc Jan 18, 2023
dc07084
Merge branch 'master' into monitor_msg
envestcc Jan 19, 2023
cbc2a66
fix comment
envestcc Jan 25, 2023
e09792b
chang node to nodeinfo
envestcc Jan 28, 2023
0016e7f
add e2etest for nodeinfo
envestcc Jan 29, 2023
0c6e7ed
fix comment
envestcc Feb 1, 2023
5b4038e
fix comments
envestcc Feb 2, 2023
e9af1cf
Merge branch 'master' into monitor_msg
envestcc Feb 2, 2023
fd70cd1
update mock
envestcc Feb 2, 2023
9947145
move const var _nodeMapSize to config struct
envestcc Feb 6, 2023
63e56ef
Merge branch 'master' into monitor_msg
envestcc Feb 6, 2023
37fdaaa
Update manager.go
dustinxie Feb 6, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/coreservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ type (
chainListener apitypes.Listener
electionCommittee committee.Committee
readCache *ReadCache
delegatesMonitor map[string]*iotextypes.ConsensusMessage
}

// jobDesc provides a struct to get and store logs in core.LogsInRange
Expand Down
7 changes: 7 additions & 0 deletions api/serverV2.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/iotexproject/iotex-core/blocksync"
"github.com/iotexproject/iotex-core/pkg/tracer"
"github.com/iotexproject/iotex-core/state/factory"
"github.com/iotexproject/iotex-proto/golang/iotextypes"
)

// ServerV2 provides api for user to interact with blockchain data
Expand Down Expand Up @@ -134,3 +135,9 @@ func (svr *ServerV2) ReceiveBlock(blk *block.Block) error {
func (svr *ServerV2) CoreService() CoreService {
return svr.core
}

// HandleMonitorMsg handle monitor msg
func (svr *ServerV2) HandleMonitorMsg(msg *iotextypes.ConsensusMessage) error {
// save msg to core.delegatesMonitor
return nil
}
3 changes: 3 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/iotexproject/iotex-core/dispatcher"
"github.com/iotexproject/iotex-core/p2p"
"github.com/iotexproject/iotex-core/pkg/log"
"github.com/iotexproject/iotex-core/server/cronjob"
)

// IMPORTANT: to define a config, add a field or a new config type to the existing config types. In addition, provide
Expand Down Expand Up @@ -80,6 +81,7 @@ var (
DB: db.DefaultConfig,
Indexer: blockindex.DefaultConfig,
Genesis: genesis.Default,
Cronjob: cronjob.DefaultConfig,
}

// ErrInvalidCfg indicates the invalid config value
Expand Down Expand Up @@ -129,6 +131,7 @@ type (
Log log.GlobalConfig `yaml:"log"`
SubLogs map[string]log.GlobalConfig `yaml:"subLogs"`
Genesis genesis.Genesis `yaml:"genesis"`
Cronjob cronjob.Config `yaml:"cronjob"`
}

// Validate is the interface of validating the config
Expand Down
92 changes: 70 additions & 22 deletions dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ type Subscriber interface {
HandleConsensusMsg(*iotextypes.ConsensusMessage) error
}

// MsgSubscriber is the dispatcher msg subscriber interface
type MsgSubscriber interface {
HandleMonitorMsg(*iotextypes.ConsensusMessage) error
}

// Dispatcher is used by peers, handles incoming block and header notifications and relays announcements of new blocks.
type Dispatcher interface {
lifecycle.StartStopper
Expand All @@ -67,18 +72,30 @@ type Dispatcher interface {
// HandleTell handles the incoming tell message. The transportation layer semantics is exact once. The sender is
// given for the sake of replying the message
HandleTell(context.Context, uint32, peer.AddrInfo, proto.Message)
// AddMsgSubscriber add msg subscriber
AddMsgSubscriber(MsgSubscriber)
}

var requestMtc = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "iotex_dispatch_request",
Help: "Dispatcher request counter.",
},
[]string{"method", "succeed"},
var (
requestMtc = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "iotex_dispatch_request",
Help: "Dispatcher request counter.",
},
[]string{"method", "succeed"},
)
delegateHeightGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "iotex_p2p_delegate_height_gauge",
Help: "delegate height",
},
[]string{"address", "version"},
)
)

func init() {
prometheus.MustRegister(requestMtc)
prometheus.MustRegister(delegateHeightGauge)
}

// blockMsg packages a proto block message.
Expand Down Expand Up @@ -118,22 +135,24 @@ func (m actionMsg) ChainID() uint32 {

// IotxDispatcher is the request and event dispatcher for iotx node.
type IotxDispatcher struct {
started int32
shutdown int32
actionChanLock sync.RWMutex
blockChanLock sync.RWMutex
syncChanLock sync.RWMutex
actionChan chan *actionMsg
blockChan chan *blockMsg
syncChan chan *blockSyncMsg
eventAudit map[iotexrpc.MessageType]int
eventAuditLock sync.RWMutex
wg sync.WaitGroup
quit chan struct{}
subscribers map[uint32]Subscriber
subscribersMU sync.RWMutex
peerLastSync map[string]time.Time
syncInterval time.Duration
started int32
shutdown int32
actionChanLock sync.RWMutex
blockChanLock sync.RWMutex
syncChanLock sync.RWMutex
actionChan chan *actionMsg
blockChan chan *blockMsg
syncChan chan *blockSyncMsg
eventAudit map[iotexrpc.MessageType]int
eventAuditLock sync.RWMutex
wg sync.WaitGroup
quit chan struct{}
subscribers map[uint32]Subscriber
subscribersMU sync.RWMutex
peerLastSync map[string]time.Time
syncInterval time.Duration
msgSubscribers []MsgSubscriber
msgSubscribersMU sync.RWMutex
}

// NewDispatcher creates a new Dispatcher
Expand Down Expand Up @@ -161,6 +180,15 @@ func (d *IotxDispatcher) AddSubscriber(
d.subscribersMU.Unlock()
}

// AddMsgSubscriber adds a subscriber to dispatcher
func (d *IotxDispatcher) AddMsgSubscriber(
envestcc marked this conversation as resolved.
Show resolved Hide resolved
subscriber MsgSubscriber,
) {
d.msgSubscribersMU.Lock()
d.msgSubscribers = append(d.msgSubscribers, subscriber)
d.msgSubscribersMU.Unlock()
}

// Start starts the dispatcher.
func (d *IotxDispatcher) Start(ctx context.Context) error {
if atomic.AddInt32(&d.started, 1) != 1 {
Expand Down Expand Up @@ -345,6 +373,10 @@ func (d *IotxDispatcher) dispatchAction(ctx context.Context, chainID uint32, msg
subscriber.ReportFullness(ctx, iotexrpc.MessageType_ACTION, float32(l)/float32(c))
}

func (d *IotxDispatcher) dispatchMonitorMessage(ctx context.Context, peer string, msg proto.Message) {
// save metrics
}

// dispatchBlock adds the passed block message to the news handling queue.
func (d *IotxDispatcher) dispatchBlock(ctx context.Context, chainID uint32, peer string, msg proto.Message) {
if atomic.LoadInt32(&d.shutdown) != 0 {
Expand Down Expand Up @@ -408,8 +440,22 @@ func (d *IotxDispatcher) dispatchBlockSyncReq(ctx context.Context, chainID uint3
subscriber.ReportFullness(ctx, iotexrpc.MessageType_BLOCK_REQUEST, float32(l)/float32(c))
}

func (d *IotxDispatcher) handleBroadcastMsg(ctx context.Context, chainID uint32, peer string, message proto.Message) {
envestcc marked this conversation as resolved.
Show resolved Hide resolved
for _, sub := range d.msgSubscribers {
switch msg := message.(type) {
case *iotextypes.ConsensusMessage:
if err := sub.HandleMonitorMsg(msg); err != nil {
log.L().Debug("Failed to handle consensus message", zap.Error(err))
}
default:
}
}
}

// HandleBroadcast handles incoming broadcast message
func (d *IotxDispatcher) HandleBroadcast(ctx context.Context, chainID uint32, peer string, message proto.Message) {
d.handleBroadcastMsg(ctx, chainID, peer, message)
envestcc marked this conversation as resolved.
Show resolved Hide resolved

subscriber := d.subscriber(chainID)
if subscriber == nil {
log.L().Warn("chainID has not been registered in dispatcher.", zap.Uint32("chainID", chainID))
Expand All @@ -425,6 +471,8 @@ func (d *IotxDispatcher) HandleBroadcast(ctx context.Context, chainID uint32, pe
d.dispatchAction(ctx, chainID, message)
case *iotextypes.Block:
d.dispatchBlock(ctx, chainID, peer, message)
case *iotextypes.ConsensusVote:
d.dispatchMonitorMessage(ctx, peer, msg)
envestcc marked this conversation as resolved.
Show resolved Hide resolved
default:
msgType, _ := goproto.GetTypeFromRPCMsg(message)
log.L().Warn("Unexpected msgType handled by HandleBroadcast.", zap.Any("msgType", msgType))
Expand Down
18 changes: 18 additions & 0 deletions server/cronjob/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (c) 2022 IoTeX Foundation
// This source code is provided 'as is' and no warranties are given as to title or non-infringement, merchantability
// or fitness for purpose and, to the extent permitted by law, all liability for your use of the code is disclaimed.
// This source code is governed by Apache License 2.0 that can be found in the LICENSE file.

package cronjob

import "time"

// Config is the cronjob config
type Config struct {
MonitorInterval time.Duration `yaml:"monitorInterval"`
}

// DefaultConfig is the default config
var DefaultConfig = Config{
MonitorInterval: 5 * time.Minute,
}
envestcc marked this conversation as resolved.
Show resolved Hide resolved
58 changes: 58 additions & 0 deletions server/cronjob/monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright (c) 2022 IoTeX Foundation
// This source code is provided 'as is' and no warranties are given as to title or non-infringement, merchantability
// or fitness for purpose and, to the extent permitted by law, all liability for your use of the code is disclaimed.
// This source code is governed by Apache License 2.0 that can be found in the LICENSE file.

package cronjob

import (
"context"
"time"

"github.com/iotexproject/iotex-core/blockchain"
"github.com/iotexproject/iotex-core/p2p"
"github.com/iotexproject/iotex-proto/golang/iotextypes"
)

// CronJob interface
type CronJob interface {
Run()
Interval() time.Duration
}

envestcc marked this conversation as resolved.
Show resolved Hide resolved
type cronInterval time.Duration

func (i cronInterval) Interval() time.Duration {
return time.Duration(i)
}

type monitorJob struct {
cronInterval
agent p2p.Agent
bc blockchain.Blockchain
}

// NewMonitorCronjob new monitor cronjob
func NewMonitorCronjob(agent p2p.Agent, bc blockchain.Blockchain, interval time.Duration) CronJob {
return &monitorJob{
cronInterval: cronInterval(interval),
agent: agent,
bc: bc,
}
}

func (j *monitorJob) Run() {
// ver := version.PackageVersion
j.agent.BroadcastOutbound(context.Background(), &iotextypes.ConsensusMessage{
Height: j.bc.TipHeight(),
})
}

// NewCronJobs create cronjobs
func NewCronJobs(cfg Config, agent p2p.Agent, bc blockchain.Blockchain) []CronJob {
res := []CronJob{}
if cfg.MonitorInterval > 0 {
res = append(res, NewMonitorCronjob(agent, bc, cfg.MonitorInterval))
}
return res
}
23 changes: 22 additions & 1 deletion server/itx/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ import (
"github.com/iotexproject/iotex-core/dispatcher"
"github.com/iotexproject/iotex-core/p2p"
"github.com/iotexproject/iotex-core/pkg/ha"
"github.com/iotexproject/iotex-core/pkg/lifecycle"
"github.com/iotexproject/iotex-core/pkg/log"
"github.com/iotexproject/iotex-core/pkg/probe"
"github.com/iotexproject/iotex-core/pkg/routine"
"github.com/iotexproject/iotex-core/pkg/util/httputil"
"github.com/iotexproject/iotex-core/server/cronjob"
)

// Server is the iotex server instance containing all components.
Expand All @@ -39,6 +41,7 @@ type Server struct {
initializedSubChains map[uint32]bool
mutex sync.RWMutex
subModuleCancel context.CancelFunc
tasks []lifecycle.StartStopper
}

// NewServer creates a new server
Expand Down Expand Up @@ -88,9 +91,17 @@ func newServer(cfg config.Config, testing bool) (*Server, error) {
return nil, errors.Wrap(err, "failed to add api server as subscriber")
}
}

tasks := []lifecycle.StartStopper{}
jobs := cronjob.NewCronJobs(cfg.Cronjob, p2pAgent, cs.Blockchain())
for i := range jobs {
tasks = append(tasks, routine.NewRecurringTask(jobs[i].Run, jobs[i].Interval()))
}

// TODO: explorer dependency deleted here at #1085, need to revive by migrating to api
chains[cs.ChainID()] = cs
dispatcher.AddSubscriber(cs.ChainID(), cs)
dispatcher.AddMsgSubscriber(apiServer)
envestcc marked this conversation as resolved.
Show resolved Hide resolved
svr := Server{
cfg: cfg,
p2pAgent: p2pAgent,
Expand All @@ -99,6 +110,7 @@ func newServer(cfg config.Config, testing bool) (*Server, error) {
chainservices: chains,
apiServers: apiServers,
initializedSubChains: map[uint32]bool{},
tasks: tasks,
}
// Setup sub-chain starter
// TODO: sub-chain infra should use main-chain API instead of protocol directly
Expand All @@ -125,13 +137,22 @@ func (s *Server) Start(ctx context.Context) error {
if err := s.dispatcher.Start(cctx); err != nil {
return errors.Wrap(err, "error when starting dispatcher")
}

for i := range s.tasks {
if err := s.tasks[i].Start(cctx); err != nil {
return errors.Wrapf(err, "error when starting task %v", i)
}
}
return nil
}

// Stop stops the server
func (s *Server) Stop(ctx context.Context) error {
defer s.subModuleCancel()
for i := range s.tasks {
if err := s.tasks[i].Stop(ctx); err != nil {
return errors.Wrapf(err, "error when stopping task %v", i)
}
}
if err := s.p2pAgent.Stop(ctx); err != nil {
// notest
return errors.Wrap(err, "error when stopping P2P agent")
Expand Down