Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[consensus] Add proposer role #3814

Merged
merged 3 commits into from
Mar 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 34 additions & 31 deletions consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,38 @@ func NewConsensus(
var err error
switch cfg.Scheme {
case RollDPoSScheme:
delegatesByEpochFunc := func(epochNum uint64) ([]string, error) {
re := protocol.NewRegistry()
if err := ops.rp.Register(re); err != nil {
return nil, err
}
ctx := genesis.WithGenesisContext(
protocol.WithRegistry(context.Background(), re),
cfg.Genesis,
)
ctx = protocol.WithFeatureWithHeightCtx(ctx)
tipHeight := bc.TipHeight()
tipEpochNum := ops.rp.GetEpochNum(tipHeight)
var candidatesList state.CandidateList
var err error
switch epochNum {
case tipEpochNum:
candidatesList, err = ops.pp.Delegates(ctx, sf)
case tipEpochNum + 1:
candidatesList, err = ops.pp.NextDelegates(ctx, sf)
default:
err = errors.Errorf("invalid epoch number %d compared to tip epoch number %d", epochNum, tipEpochNum)
}
if err != nil {
return nil, err
}
addrs := []string{}
for _, candidate := range candidatesList {
addrs = append(addrs, candidate.Address)
}
return addrs, nil
}
proposersByEpochFunc := delegatesByEpochFunc
bd := rolldpos.NewRollDPoSBuilder().
SetAddr(cfg.Chain.ProducerAddress().String()).
SetPriKey(cfg.Chain.ProducerPrivateKey()).
Expand All @@ -108,37 +140,8 @@ func NewConsensus(
SetBlockDeserializer(block.NewDeserializer(bc.EvmNetworkID())).
SetClock(clock).
SetBroadcast(ops.broadcastHandler).
SetDelegatesByEpochFunc(func(epochNum uint64) ([]string, error) {
re := protocol.NewRegistry()
if err := ops.rp.Register(re); err != nil {
return nil, err
}
ctx := genesis.WithGenesisContext(
protocol.WithRegistry(context.Background(), re),
cfg.Genesis,
)
ctx = protocol.WithFeatureWithHeightCtx(ctx)
tipHeight := bc.TipHeight()
tipEpochNum := ops.rp.GetEpochNum(tipHeight)
var candidatesList state.CandidateList
var err error
switch epochNum {
case tipEpochNum:
candidatesList, err = ops.pp.Delegates(ctx, sf)
case tipEpochNum + 1:
candidatesList, err = ops.pp.NextDelegates(ctx, sf)
default:
err = errors.Errorf("invalid epoch number %d compared to tip epoch number %d", epochNum, tipEpochNum)
}
if err != nil {
return nil, err
}
addrs := []string{}
for _, candidate := range candidatesList {
addrs = append(addrs, candidate.Address)
}
return addrs, nil
}).
SetDelegatesByEpochFunc(delegatesByEpochFunc).
SetProposersByEpochFunc(proposersByEpochFunc).
RegisterProtocol(ops.rp)
// TODO: explorer dependency deleted here at #1085, need to revive by migrating to api
cs.scheme, err = bd.Build()
Expand Down
4 changes: 3 additions & 1 deletion consensus/consensusfsm/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,11 +408,13 @@ func (m *ConsensusFSM) prepare(evt fsm.Event) (fsm.State, error) {
}

overtime := m.ctx.WaitUntilRoundStart()
if proposal != nil {
Copy link
Member

@dustinxie dustinxie Feb 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be removed? since delegates only receive proposal, don't need to broadcast proposal

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or as phase 1, ENode and CNode still use the same dpos code?
if that's the plan, need to add the IsENode() in the consensus/dpos module? since lots of code/funcs will only used by CNode

m.ctx.Broadcast(proposal)
}
if !m.ctx.IsDelegate() {
return m.BackToPrepare(0)
}
if proposal != nil {
m.ctx.Broadcast(proposal)
m.ProduceReceiveBlockEvent(proposal)
}

Expand Down
14 changes: 12 additions & 2 deletions consensus/scheme/rolldpos/rolldpos.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,8 @@ type (
clock clock.Clock
// TODO: explorer dependency deleted at #1085, need to add api params
rp *rolldpos.Protocol
delegatesByEpochFunc DelegatesByEpochFunc
delegatesByEpochFunc NodesSelectionByEpochFunc
proposersByEpochFunc NodesSelectionByEpochFunc
}
)

Expand Down Expand Up @@ -392,12 +393,20 @@ func (b *Builder) SetClock(clock clock.Clock) *Builder {

// SetDelegatesByEpochFunc sets delegatesByEpochFunc
func (b *Builder) SetDelegatesByEpochFunc(
delegatesByEpochFunc DelegatesByEpochFunc,
delegatesByEpochFunc NodesSelectionByEpochFunc,
) *Builder {
b.delegatesByEpochFunc = delegatesByEpochFunc
return b
}

// SetProposersByEpochFunc sets proposersByEpochFunc
func (b *Builder) SetProposersByEpochFunc(
proposersByEpochFunc NodesSelectionByEpochFunc,
) *Builder {
b.proposersByEpochFunc = proposersByEpochFunc
return b
}

// RegisterProtocol sets the rolldpos protocol
func (b *Builder) RegisterProtocol(rp *rolldpos.Protocol) *Builder {
b.rp = rp
Expand Down Expand Up @@ -427,6 +436,7 @@ func (b *Builder) Build() (*RollDPoS, error) {
b.rp,
b.broadcastHandler,
b.delegatesByEpochFunc,
b.proposersByEpochFunc,
b.encodedAddr,
b.priKey,
b.clock,
Expand Down
41 changes: 25 additions & 16 deletions consensus/scheme/rolldpos/rolldpos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func TestNewRollDPoS(t *testing.T) {
return nil
}).
SetDelegatesByEpochFunc(delegatesByEpoch).
SetProposersByEpochFunc(delegatesByEpoch).
RegisterProtocol(rp).
Build()
assert.NoError(t, err)
Expand All @@ -99,6 +100,7 @@ func TestNewRollDPoS(t *testing.T) {
}).
SetClock(clock.NewMock()).
SetDelegatesByEpochFunc(delegatesByEpoch).
SetProposersByEpochFunc(delegatesByEpoch).
RegisterProtocol(rp).
Build()
assert.NoError(t, err)
Expand All @@ -119,6 +121,7 @@ func TestNewRollDPoS(t *testing.T) {
}).
SetClock(clock.NewMock()).
SetDelegatesByEpochFunc(delegatesByEpoch).
SetProposersByEpochFunc(delegatesByEpoch).
RegisterProtocol(rp).
Build()
assert.NoError(t, err)
Expand All @@ -134,6 +137,7 @@ func TestNewRollDPoS(t *testing.T) {
return nil
}).
SetDelegatesByEpochFunc(delegatesByEpoch).
SetProposersByEpochFunc(delegatesByEpoch).
RegisterProtocol(rp).
Build()
assert.Error(t, err)
Expand Down Expand Up @@ -219,6 +223,14 @@ func TestValidateBlockFooter(t *testing.T) {
g.NumDelegates,
g.NumSubEpochs,
)
delegatesByEpoch := func(uint64) ([]string, error) {
return []string{
candidates[0],
candidates[1],
candidates[2],
candidates[3],
}, nil
}
r, err := NewRollDPoSBuilder().
SetConfig(builderCfg).
SetAddr(identityset.Address(1).String()).
Expand All @@ -227,14 +239,8 @@ func TestValidateBlockFooter(t *testing.T) {
SetBroadcast(func(_ proto.Message) error {
return nil
}).
SetDelegatesByEpochFunc(func(uint64) ([]string, error) {
return []string{
candidates[0],
candidates[1],
candidates[2],
candidates[3],
}, nil
}).
SetDelegatesByEpochFunc(delegatesByEpoch).
SetProposersByEpochFunc(delegatesByEpoch).
SetClock(clock).
RegisterProtocol(rp).
Build()
Expand Down Expand Up @@ -306,6 +312,14 @@ func TestRollDPoS_Metrics(t *testing.T) {
g.NumDelegates,
g.NumSubEpochs,
)
delegatesByEpoch := func(uint64) ([]string, error) {
return []string{
candidates[0],
candidates[1],
candidates[2],
candidates[3],
}, nil
}
r, err := NewRollDPoSBuilder().
SetConfig(builderCfg).
SetAddr(identityset.Address(1).String()).
Expand All @@ -315,14 +329,8 @@ func TestRollDPoS_Metrics(t *testing.T) {
return nil
}).
SetClock(clock).
SetDelegatesByEpochFunc(func(uint64) ([]string, error) {
return []string{
candidates[0],
candidates[1],
candidates[2],
candidates[3],
}, nil
}).
SetDelegatesByEpochFunc(delegatesByEpoch).
SetProposersByEpochFunc(delegatesByEpoch).
RegisterProtocol(rp).
Build()
require.NoError(t, err)
Expand Down Expand Up @@ -479,6 +487,7 @@ func TestRollDPoSConsensus(t *testing.T) {
SetChainManager(NewChainManager(chain)).
SetBroadcast(p2p.Broadcast).
SetDelegatesByEpochFunc(delegatesByEpochFunc).
SetProposersByEpochFunc(delegatesByEpochFunc).
RegisterProtocol(rp).
Build()
require.NoError(t, err)
Expand Down
11 changes: 8 additions & 3 deletions consensus/scheme/rolldpos/rolldposctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ func init() {
}

type (
// DelegatesByEpochFunc defines a function to overwrite candidates
DelegatesByEpochFunc func(uint64) ([]string, error)
// NodesSelectionByEpochFunc defines a function to select nodes
NodesSelectionByEpochFunc func(uint64) ([]string, error)

// RDPoSCtx is the context of RollDPoS
RDPoSCtx interface {
Expand Down Expand Up @@ -114,7 +114,8 @@ func NewRollDPoSCtx(
blockDeserializer *block.Deserializer,
rp *rolldpos.Protocol,
broadcastHandler scheme.Broadcast,
delegatesByEpochFunc DelegatesByEpochFunc,
delegatesByEpochFunc NodesSelectionByEpochFunc,
proposersByEpochFunc NodesSelectionByEpochFunc,
encodedAddr string,
priKey crypto.PrivateKey,
clock clock.Clock,
Expand All @@ -132,6 +133,9 @@ func NewRollDPoSCtx(
if delegatesByEpochFunc == nil {
return nil, errors.New("delegates by epoch function cannot be nil")
}
if proposersByEpochFunc == nil {
return nil, errors.New("proposers by epoch function cannot be nil")
}
if cfg.AcceptBlockTTL(0)+cfg.AcceptProposalEndorsementTTL(0)+cfg.AcceptLockEndorsementTTL(0)+cfg.CommitTTL(0) > cfg.BlockInterval(0) {
return nil, errors.Errorf(
"invalid ttl config, the sum of ttls should be equal to block interval. acceptBlockTTL %d, acceptProposalEndorsementTTL %d, acceptLockEndorsementTTL %d, commitTTL %d, blockInterval %d",
Expand All @@ -148,6 +152,7 @@ func NewRollDPoSCtx(
}
roundCalc := &roundCalculator{
delegatesByEpochFunc: delegatesByEpochFunc,
proposersByEpochFunc: proposersByEpochFunc,
chain: chain,
rp: rp,
timeBasedRotation: timeBasedRotation,
Expand Down
Loading