Skip to content

Commit

Permalink
chore(cometBFTService): simplified CometBFTService (#2026)
Browse files Browse the repository at this point in the history
  • Loading branch information
abi87 authored Oct 1, 2024
1 parent 320256c commit ed6800f
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 143 deletions.
190 changes: 81 additions & 109 deletions mod/consensus/pkg/cometbft/service/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@ import (
"github.com/sourcegraph/conc/iter"
)

//nolint:gocognit // todo fix.
var (
errInvalidHeight = errors.New("invalid height")
errNilFinalizeBlockState = errors.New("finalizeBlockState is nil")
)

func (s *Service[LoggerT]) InitChain(
_ context.Context,
req *cmtabci.InitChainRequest,
Expand All @@ -56,9 +60,6 @@ func (s *Service[LoggerT]) InitChain(
)
}

// On a new chain, we consider the init chain block height as 0, even though
// req.InitialHeight is 1 by default.
initHeader := cmtproto.Header{ChainID: req.ChainId, Time: req.Time}
s.logger.Info(
"InitChain",
"initialHeight",
Expand All @@ -67,99 +68,92 @@ func (s *Service[LoggerT]) InitChain(
req.ChainId,
)

// Set the initial height, which will be used to determine if we are
// proposing
// Set the initial height, which will be used to determine if we are proposing
// or processing the first block or not.
s.initialHeight = req.InitialHeight
if s.initialHeight == 0 { // If initial height is 0, set it to 1
if s.initialHeight == 0 {
s.initialHeight = 1
}

// if req.InitialHeight is > 1, then we set the initial version on all
// stores
// if req.InitialHeight is > 1, then we set the initial version on all stores
if req.InitialHeight > 1 {
initHeader.Height = req.InitialHeight
if err := s.sm.CommitMultiStore().
SetInitialVersion(req.InitialHeight); err != nil {
return nil, err
}
}

s.setState(execModeFinalize)
s.finalizeBlockState = s.resetState()

defer func() {
// InitChain represents the state of the application BEFORE the first
// block, i.e. the genesis block. This means that when processing the
// app's InitChain handler, the block height is zero by default.
// However, after Commit is called
// the height needs to reflect the true block height.
initHeader.Height = req.InitialHeight
// However, after genesis is handled the height needs to reflect
// the true block height.
initHeader := cmtproto.Header{
ChainID: req.ChainId,
Time: req.Time,
Height: req.InitialHeight,
}
s.finalizeBlockState.SetContext(
s.finalizeBlockState.Context().WithBlockHeader(initHeader),
)
}()

if s.finalizeBlockState == nil {
return nil, errors.New("finalizeBlockState is nil")
}

res, err := s.initChainer(s.finalizeBlockState.Context(), req)
resValidators, err := s.initChainer(
s.finalizeBlockState.Context(),
req.AppStateBytes,
)
if err != nil {
return nil, err
}

if res == nil {
return nil, errors.New(
"application init chain handler returned a nil response",
)
}

// check validators
if len(req.Validators) > 0 {
if len(req.Validators) != len(res.Validators) {
if len(req.Validators) != len(resValidators) {
return nil, fmt.Errorf(
"len(RequestInitChain.Validators) != len(GenesisValidators) (%d != %d)",
len(req.Validators),
len(res.Validators),
len(resValidators),
)
}

sort.Sort(cmtabci.ValidatorUpdates(req.Validators))

for i := range res.Validators {
if req.Validators[i].Power != res.Validators[i].Power {
for i := range resValidators {
if req.Validators[i].Power != resValidators[i].Power {
return nil, errors.New("mismatched power")
}
if !bytes.Equal(
req.Validators[i].PubKeyBytes, res.Validators[i].
req.Validators[i].PubKeyBytes, resValidators[i].
PubKeyBytes) {
return nil, errors.New("mismatched pubkey bytes")
}

if req.
Validators[i].PubKeyType != res.
Validators[i].PubKeyType {
if req.Validators[i].PubKeyType !=
resValidators[i].PubKeyType {
return nil, errors.New("mismatched pubkey types")
}
}
}

// NOTE: We don't commit, but FinalizeBlock for block InitialHeight starts
// from
// NOTE: We don't commit, but FinalizeBlock for block InitialHeight starts from
// this FinalizeBlockState.
return &cmtabci.InitChainResponse{
ConsensusParams: res.ConsensusParams,
Validators: res.Validators,
ConsensusParams: req.ConsensusParams,
Validators: resValidators,
AppHash: s.sm.CommitMultiStore().LastCommitID().Hash,
}, nil
}

// InitChainer initializes the chain.
func (s *Service[LoggerT]) initChainer(
ctx sdk.Context,
req *cmtabci.InitChainRequest,
) (*cmtabci.InitChainResponse, error) {
appStateBytes []byte,
) ([]cmtabci.ValidatorUpdate, error) {
var genesisState map[string]json.RawMessage
if err := json.Unmarshal(req.AppStateBytes, &genesisState); err != nil {
if err := json.Unmarshal(appStateBytes, &genesisState); err != nil {
return nil, err
}
valUpdates, err := s.Middleware.InitGenesis(
Expand All @@ -170,17 +164,10 @@ func (s *Service[LoggerT]) initChainer(
return nil, err
}

convertedValUpdates, err := iter.MapErr(
return iter.MapErr(
valUpdates,
convertValidatorUpdate[cmtabci.ValidatorUpdate],
)
if err != nil {
return nil, err
}

return &cmtabci.InitChainResponse{
Validators: convertedValUpdates,
}, nil
}

func (s *Service[LoggerT]) Info(
Expand Down Expand Up @@ -212,13 +199,18 @@ func (s *Service[LoggerT]) PrepareProposal(
_ context.Context,
req *cmtabci.PrepareProposalRequest,
) (*cmtabci.PrepareProposalResponse, error) {
s.setState(execModePrepareProposal)

// CometBFT must never call PrepareProposal with a height of 0.
if req.Height < 1 {
return nil, errors.New("PrepareProposal called with invalid height")
return nil, fmt.Errorf(
"prepareProposal at height %v: %w",
req.Height,
errInvalidHeight,
)
}

// Always reset state given that PrepareProposal can timeout
// and be called again in a subsequent round.
s.prepareProposalState = s.resetState()
s.prepareProposalState.SetContext(
s.getContextForProposal(
s.prepareProposalState.Context(),
Expand Down Expand Up @@ -260,20 +252,21 @@ func (s *Service[LoggerT]) ProcessProposal(
) (*cmtabci.ProcessProposalResponse, error) {
// CometBFT must never call ProcessProposal with a height of 0.
if req.Height < 1 {
return nil, errors.New("ProcessProposal called with invalid height")
return nil, fmt.Errorf(
"processProposal at height %v: %w",
req.Height,
errInvalidHeight,
)
}

s.setState(execModeProcessProposal)

// Since the application can get access to FinalizeBlock state and write to
// it, we must be sure to reset it in case ProcessProposal timeouts and is
// called
// Since the application can get access to FinalizeBlock state and write to it,
// we must be sure to reset it in case ProcessProposal timeouts and is called
// again in a subsequent round. However, we only want to do this after we've
// processed the first block, as we want to avoid overwriting the
// finalizeState
// processed the first block, as we want to avoid overwriting the finalizeState
// after state changes during InitChain.
s.processProposalState = s.resetState()
if req.Height > s.initialHeight {
s.setState(execModeFinalize)
s.finalizeBlockState = s.resetState()
}

s.processProposalState.SetContext(
Expand Down Expand Up @@ -308,27 +301,17 @@ func (s *Service[LoggerT]) ProcessProposal(
}

func (s *Service[LoggerT]) internalFinalizeBlock(
ctx context.Context,
req *cmtabci.FinalizeBlockRequest,
) (*cmtabci.FinalizeBlockResponse, error) {
if err := s.validateFinalizeBlockHeight(req); err != nil {
return nil, err
}

// finalizeBlockState should be set on InitChain or ProcessProposal. If it is
// nil, it means we are replaying this block and we need to set the state here
// given that during block replay ProcessProposal is not executed by CometBFT.
if s.finalizeBlockState == nil {
s.setState(execModeFinalize)
}
if s.finalizeBlockState == nil {
return nil, errors.New("finalizeBlockState is nil")
}

// First check for an abort signal after beginBlock, as it's the first place
// we spend any significant amount of time.
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
// continue
s.finalizeBlockState = s.resetState()
}

// Iterate over all raw transactions in the proposal and attempt to execute
Expand All @@ -338,14 +321,6 @@ func (s *Service[LoggerT]) internalFinalizeBlock(
// vote extensions, so skip those.
txResults := make([]*cmtabci.ExecTxResult, 0, len(req.Txs))
for range req.Txs {
// check after every tx if we should abort
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
// continue
}

//nolint:mnd // its okay for now.
txResults = append(txResults, &cmtabci.ExecTxResult{
Codespace: "sdk",
Expand All @@ -372,15 +347,6 @@ func (s *Service[LoggerT]) internalFinalizeBlock(
return nil, err
}

// check after finalizeBlock if we should abort, to avoid propagating the
// result
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
// continue
}

return &cmtabci.FinalizeBlockResponse{
TxResults: txResults,
ValidatorUpdates: valUpdates,
Expand All @@ -392,7 +358,11 @@ func (s *Service[_]) validateFinalizeBlockHeight(
req *cmtabci.FinalizeBlockRequest,
) error {
if req.Height < 1 {
return fmt.Errorf("invalid height: %d", req.Height)
return fmt.Errorf(
"finalizeBlock at height %v: %w",
req.Height,
errInvalidHeight,
)
}

lastBlockHeight := s.LastBlockHeight()
Expand Down Expand Up @@ -428,7 +398,7 @@ func (s *Service[_]) FinalizeBlock(
_ context.Context,
req *cmtabci.FinalizeBlockRequest,
) (*cmtabci.FinalizeBlockResponse, error) {
res, err := s.internalFinalizeBlock(context.Background(), req)
res, err := s.internalFinalizeBlock(req)
if res != nil {
res.AppHash = s.workingHash()
}
Expand All @@ -447,7 +417,9 @@ func (s *Service[LoggerT]) Commit(
context.Context, *cmtabci.CommitRequest,
) (*cmtabci.CommitResponse, error) {
if s.finalizeBlockState == nil {
return nil, errors.New("finalizeBlockState is nil")
// This is unexpected since CometBFT should call Commit only
// after FinalizeBlock has been called. Panic appeases nilaway.
panic(fmt.Errorf("commit: %w", errNilFinalizeBlockState))
}
header := s.finalizeBlockState.Context().BlockHeader()
retainHeight := s.GetBlockRetentionHeight(header.Height)
Expand All @@ -456,16 +428,13 @@ func (s *Service[LoggerT]) Commit(
if ok {
rms.SetCommitHeader(header)
}

s.sm.CommitMultiStore().Commit()

resp := &cmtabci.CommitResponse{
RetainHeight: retainHeight,
}

s.finalizeBlockState = nil

return resp, nil
return &cmtabci.CommitResponse{
RetainHeight: retainHeight,
}, nil
}

// workingHash gets the apphash that will be finalized in commit.
Expand All @@ -478,11 +447,12 @@ func (s *Service[LoggerT]) Commit(
func (s *Service[LoggerT]) workingHash() []byte {
// Write the FinalizeBlock state into branched storage and commit the
// MultiStore. The write to the FinalizeBlock state writes all state
// transitions to the root
// MultiStore (s.sm.CommitMultiStore()) so when Commit() is called it
// persists those values.
// transitions to the root MultiStore (s.sm.CommitMultiStore())
// so when Commit() is called it persists those values.
if s.finalizeBlockState == nil {
panic("workingHash() called before FinalizeBlock()")
// this is unexpected since workingHash is called only after
// internalFinalizeBlock. Panic appeases nilaway.
panic(fmt.Errorf("workingHash: %w", errNilFinalizeBlockState))
}
s.finalizeBlockState.ms.Write()

Expand All @@ -505,14 +475,16 @@ func (s *Service[LoggerT]) getContextForProposal(
ctx sdk.Context,
height int64,
) sdk.Context {
if height == s.initialHeight {
if s.finalizeBlockState == nil {
return ctx
}
ctx, _ = s.finalizeBlockState.Context().CacheContext()
if height != s.initialHeight {
return ctx
}

if s.finalizeBlockState == nil {
// this is unexpected since cometBFT won't call PrepareProposal
// on initialHeight. Panic appeases nilaway.
panic(fmt.Errorf("getContextForProposal: %w", errNilFinalizeBlockState))
}
ctx, _ = s.finalizeBlockState.Context().CacheContext()
return ctx
}

Expand Down
Loading

0 comments on commit ed6800f

Please sign in to comment.