Skip to content

Commit

Permalink
[consensus] Add proposer role (#3814)
Browse files Browse the repository at this point in the history
* add proposors in consensus

* fix typo proposer

---------

Co-authored-by: CoderZhi <thecoderzhi@gmail.com>
  • Loading branch information
envestcc and CoderZhi authored Mar 6, 2023
1 parent fba0ef9 commit 2fc3ba0
Show file tree
Hide file tree
Showing 9 changed files with 269 additions and 210 deletions.
65 changes: 34 additions & 31 deletions consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,38 @@ func NewConsensus(
var err error
switch cfg.Scheme {
case RollDPoSScheme:
delegatesByEpochFunc := func(epochNum uint64) ([]string, error) {
re := protocol.NewRegistry()
if err := ops.rp.Register(re); err != nil {
return nil, err
}
ctx := genesis.WithGenesisContext(
protocol.WithRegistry(context.Background(), re),
cfg.Genesis,
)
ctx = protocol.WithFeatureWithHeightCtx(ctx)
tipHeight := bc.TipHeight()
tipEpochNum := ops.rp.GetEpochNum(tipHeight)
var candidatesList state.CandidateList
var err error
switch epochNum {
case tipEpochNum:
candidatesList, err = ops.pp.Delegates(ctx, sf)
case tipEpochNum + 1:
candidatesList, err = ops.pp.NextDelegates(ctx, sf)
default:
err = errors.Errorf("invalid epoch number %d compared to tip epoch number %d", epochNum, tipEpochNum)
}
if err != nil {
return nil, err
}
addrs := []string{}
for _, candidate := range candidatesList {
addrs = append(addrs, candidate.Address)
}
return addrs, nil
}
proposersByEpochFunc := delegatesByEpochFunc
bd := rolldpos.NewRollDPoSBuilder().
SetAddr(cfg.Chain.ProducerAddress().String()).
SetPriKey(cfg.Chain.ProducerPrivateKey()).
Expand All @@ -108,37 +140,8 @@ func NewConsensus(
SetBlockDeserializer(block.NewDeserializer(bc.EvmNetworkID())).
SetClock(clock).
SetBroadcast(ops.broadcastHandler).
SetDelegatesByEpochFunc(func(epochNum uint64) ([]string, error) {
re := protocol.NewRegistry()
if err := ops.rp.Register(re); err != nil {
return nil, err
}
ctx := genesis.WithGenesisContext(
protocol.WithRegistry(context.Background(), re),
cfg.Genesis,
)
ctx = protocol.WithFeatureWithHeightCtx(ctx)
tipHeight := bc.TipHeight()
tipEpochNum := ops.rp.GetEpochNum(tipHeight)
var candidatesList state.CandidateList
var err error
switch epochNum {
case tipEpochNum:
candidatesList, err = ops.pp.Delegates(ctx, sf)
case tipEpochNum + 1:
candidatesList, err = ops.pp.NextDelegates(ctx, sf)
default:
err = errors.Errorf("invalid epoch number %d compared to tip epoch number %d", epochNum, tipEpochNum)
}
if err != nil {
return nil, err
}
addrs := []string{}
for _, candidate := range candidatesList {
addrs = append(addrs, candidate.Address)
}
return addrs, nil
}).
SetDelegatesByEpochFunc(delegatesByEpochFunc).
SetProposersByEpochFunc(proposersByEpochFunc).
RegisterProtocol(ops.rp)
// TODO: explorer dependency deleted here at #1085, need to revive by migrating to api
cs.scheme, err = bd.Build()
Expand Down
4 changes: 3 additions & 1 deletion consensus/consensusfsm/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,11 +408,13 @@ func (m *ConsensusFSM) prepare(evt fsm.Event) (fsm.State, error) {
}

overtime := m.ctx.WaitUntilRoundStart()
if proposal != nil {
m.ctx.Broadcast(proposal)
}
if !m.ctx.IsDelegate() {
return m.BackToPrepare(0)
}
if proposal != nil {
m.ctx.Broadcast(proposal)
m.ProduceReceiveBlockEvent(proposal)
}

Expand Down
14 changes: 12 additions & 2 deletions consensus/scheme/rolldpos/rolldpos.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,8 @@ type (
clock clock.Clock
// TODO: explorer dependency deleted at #1085, need to add api params
rp *rolldpos.Protocol
delegatesByEpochFunc DelegatesByEpochFunc
delegatesByEpochFunc NodesSelectionByEpochFunc
proposersByEpochFunc NodesSelectionByEpochFunc
}
)

Expand Down Expand Up @@ -392,12 +393,20 @@ func (b *Builder) SetClock(clock clock.Clock) *Builder {

// SetDelegatesByEpochFunc sets delegatesByEpochFunc
func (b *Builder) SetDelegatesByEpochFunc(
delegatesByEpochFunc DelegatesByEpochFunc,
delegatesByEpochFunc NodesSelectionByEpochFunc,
) *Builder {
b.delegatesByEpochFunc = delegatesByEpochFunc
return b
}

// SetProposersByEpochFunc sets proposersByEpochFunc
func (b *Builder) SetProposersByEpochFunc(
proposersByEpochFunc NodesSelectionByEpochFunc,
) *Builder {
b.proposersByEpochFunc = proposersByEpochFunc
return b
}

// RegisterProtocol sets the rolldpos protocol
func (b *Builder) RegisterProtocol(rp *rolldpos.Protocol) *Builder {
b.rp = rp
Expand Down Expand Up @@ -427,6 +436,7 @@ func (b *Builder) Build() (*RollDPoS, error) {
b.rp,
b.broadcastHandler,
b.delegatesByEpochFunc,
b.proposersByEpochFunc,
b.encodedAddr,
b.priKey,
b.clock,
Expand Down
41 changes: 25 additions & 16 deletions consensus/scheme/rolldpos/rolldpos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func TestNewRollDPoS(t *testing.T) {
return nil
}).
SetDelegatesByEpochFunc(delegatesByEpoch).
SetProposersByEpochFunc(delegatesByEpoch).
RegisterProtocol(rp).
Build()
assert.NoError(t, err)
Expand All @@ -99,6 +100,7 @@ func TestNewRollDPoS(t *testing.T) {
}).
SetClock(clock.NewMock()).
SetDelegatesByEpochFunc(delegatesByEpoch).
SetProposersByEpochFunc(delegatesByEpoch).
RegisterProtocol(rp).
Build()
assert.NoError(t, err)
Expand All @@ -119,6 +121,7 @@ func TestNewRollDPoS(t *testing.T) {
}).
SetClock(clock.NewMock()).
SetDelegatesByEpochFunc(delegatesByEpoch).
SetProposersByEpochFunc(delegatesByEpoch).
RegisterProtocol(rp).
Build()
assert.NoError(t, err)
Expand All @@ -134,6 +137,7 @@ func TestNewRollDPoS(t *testing.T) {
return nil
}).
SetDelegatesByEpochFunc(delegatesByEpoch).
SetProposersByEpochFunc(delegatesByEpoch).
RegisterProtocol(rp).
Build()
assert.Error(t, err)
Expand Down Expand Up @@ -219,6 +223,14 @@ func TestValidateBlockFooter(t *testing.T) {
g.NumDelegates,
g.NumSubEpochs,
)
delegatesByEpoch := func(uint64) ([]string, error) {
return []string{
candidates[0],
candidates[1],
candidates[2],
candidates[3],
}, nil
}
r, err := NewRollDPoSBuilder().
SetConfig(builderCfg).
SetAddr(identityset.Address(1).String()).
Expand All @@ -227,14 +239,8 @@ func TestValidateBlockFooter(t *testing.T) {
SetBroadcast(func(_ proto.Message) error {
return nil
}).
SetDelegatesByEpochFunc(func(uint64) ([]string, error) {
return []string{
candidates[0],
candidates[1],
candidates[2],
candidates[3],
}, nil
}).
SetDelegatesByEpochFunc(delegatesByEpoch).
SetProposersByEpochFunc(delegatesByEpoch).
SetClock(clock).
RegisterProtocol(rp).
Build()
Expand Down Expand Up @@ -306,6 +312,14 @@ func TestRollDPoS_Metrics(t *testing.T) {
g.NumDelegates,
g.NumSubEpochs,
)
delegatesByEpoch := func(uint64) ([]string, error) {
return []string{
candidates[0],
candidates[1],
candidates[2],
candidates[3],
}, nil
}
r, err := NewRollDPoSBuilder().
SetConfig(builderCfg).
SetAddr(identityset.Address(1).String()).
Expand All @@ -315,14 +329,8 @@ func TestRollDPoS_Metrics(t *testing.T) {
return nil
}).
SetClock(clock).
SetDelegatesByEpochFunc(func(uint64) ([]string, error) {
return []string{
candidates[0],
candidates[1],
candidates[2],
candidates[3],
}, nil
}).
SetDelegatesByEpochFunc(delegatesByEpoch).
SetProposersByEpochFunc(delegatesByEpoch).
RegisterProtocol(rp).
Build()
require.NoError(t, err)
Expand Down Expand Up @@ -479,6 +487,7 @@ func TestRollDPoSConsensus(t *testing.T) {
SetChainManager(NewChainManager(chain)).
SetBroadcast(p2p.Broadcast).
SetDelegatesByEpochFunc(delegatesByEpochFunc).
SetProposersByEpochFunc(delegatesByEpochFunc).
RegisterProtocol(rp).
Build()
require.NoError(t, err)
Expand Down
11 changes: 8 additions & 3 deletions consensus/scheme/rolldpos/rolldposctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ func init() {
}

type (
// DelegatesByEpochFunc defines a function to overwrite candidates
DelegatesByEpochFunc func(uint64) ([]string, error)
// NodesSelectionByEpochFunc defines a function to select nodes
NodesSelectionByEpochFunc func(uint64) ([]string, error)

// RDPoSCtx is the context of RollDPoS
RDPoSCtx interface {
Expand Down Expand Up @@ -114,7 +114,8 @@ func NewRollDPoSCtx(
blockDeserializer *block.Deserializer,
rp *rolldpos.Protocol,
broadcastHandler scheme.Broadcast,
delegatesByEpochFunc DelegatesByEpochFunc,
delegatesByEpochFunc NodesSelectionByEpochFunc,
proposersByEpochFunc NodesSelectionByEpochFunc,
encodedAddr string,
priKey crypto.PrivateKey,
clock clock.Clock,
Expand All @@ -132,6 +133,9 @@ func NewRollDPoSCtx(
if delegatesByEpochFunc == nil {
return nil, errors.New("delegates by epoch function cannot be nil")
}
if proposersByEpochFunc == nil {
return nil, errors.New("proposers by epoch function cannot be nil")
}
if cfg.AcceptBlockTTL(0)+cfg.AcceptProposalEndorsementTTL(0)+cfg.AcceptLockEndorsementTTL(0)+cfg.CommitTTL(0) > cfg.BlockInterval(0) {
return nil, errors.Errorf(
"invalid ttl config, the sum of ttls should be equal to block interval. acceptBlockTTL %d, acceptProposalEndorsementTTL %d, acceptLockEndorsementTTL %d, commitTTL %d, blockInterval %d",
Expand All @@ -148,6 +152,7 @@ func NewRollDPoSCtx(
}
roundCalc := &roundCalculator{
delegatesByEpochFunc: delegatesByEpochFunc,
proposersByEpochFunc: proposersByEpochFunc,
chain: chain,
rp: rp,
timeBasedRotation: timeBasedRotation,
Expand Down
Loading

0 comments on commit 2fc3ba0

Please sign in to comment.