Skip to content

Commit

Permalink
Merge pull request #4 from informalsystems/ph/downtime
Browse files Browse the repository at this point in the history
Add functionality for advancing blocks, advancing time, and causing downtime
  • Loading branch information
p-offtermatt authored Jun 16, 2023
2 parents 031a28e + 9f43139 commit 4c23143
Show file tree
Hide file tree
Showing 8 changed files with 1,269 additions and 330 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ ENV GOFLAGS="-buildvcs=false"

# cache gomodules for cometmock
ADD ./go.mod /go.mod
ADD ./go.sum go.sum
ADD ./go.sum /go.sum
RUN go mod download

# Add CometMock and install it
Expand Down
218 changes: 194 additions & 24 deletions cometmock/abci_client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"
"time"

db "github.com/cometbft/cometbft-db"
abcitypes "github.com/cometbft/cometbft/abci/types"
cryptoenc "github.com/cometbft/cometbft/crypto/encoding"
"github.com/cometbft/cometbft/crypto/merkle"
Expand Down Expand Up @@ -40,7 +41,7 @@ type AbciClient struct {
LastBlock *types.Block
LastCommit *types.Commit
Storage storage.Storage
PrivValidators map[string]types.PrivValidator
PrivValidators map[string]types.PrivValidator // maps validator addresses to their priv validator structs
IndexerService *txindex.IndexerService
TxIndex *indexerkv.TxIndex
BlockIndex *blockindexkv.BlockerIndexer
Expand All @@ -49,6 +50,122 @@ type AbciClient struct {
// can be used to check for nondeterminism in apps, but also slows down execution a bit,
// though performance difference was not measured.
ErrorOnUnequalResponses bool

// validator addresses are mapped to false if they should not be signing, and to true if they should
signingStatus map[string]bool
signingStatusMutex sync.RWMutex

// time offset. whenever we qury the time, we add this offset to it
// this means after modifying this, blocks will have the timestamp offset by this value.
// this will look to the app like one block took a long time to be produced.
timeOffset time.Duration
}

func (a *AbciClient) GetTimeOffset() time.Duration {
return a.timeOffset
}

func (a *AbciClient) IncrementTimeOffset(additionalOffset time.Duration) error {
if additionalOffset < 0 {
a.Logger.Error("time offset cannot be decremented, please provide a positive offset")
return fmt.Errorf("time offset cannot be decremented, please provide a positive offset")
}
a.Logger.Debug("Incrementing time offset", "additionalOffset", additionalOffset.String())
a.timeOffset = a.timeOffset + additionalOffset
return nil
}

// GetSigningStatusMap gets a copy of the signing status map that can be used for reading.
func (a *AbciClient) GetSigningStatusMap() map[string]bool {
a.signingStatusMutex.RLock()
defer a.signingStatusMutex.RUnlock()

statusMap := make(map[string]bool, len(a.signingStatus))
for k, v := range a.signingStatus {
statusMap[k] = v
}
return statusMap
}

// GetSigningStatus gets the signing status of the given address.
func (a *AbciClient) GetSigningStatus(address string) (bool, error) {
a.signingStatusMutex.RLock()
defer a.signingStatusMutex.RUnlock()

status, ok := a.signingStatus[address]
if !ok {
return false, fmt.Errorf("address %s not found in signing status map, please double-check this is the key address of a validator key", address)
}
return status, nil
}

func (a *AbciClient) SetSigningStatus(address string, status bool) error {
a.signingStatusMutex.Lock()
defer a.signingStatusMutex.Unlock()

_, ok := a.signingStatus[address]
if !ok {
return fmt.Errorf("address %s not found in signing status map, please double-check this is the key address of a validator key", address)
}
a.signingStatus[address] = status

a.Logger.Info("Set signing status", "address", address, "status", status)

return nil
}

func CreateAndStartEventBus(logger cometlog.Logger) (*types.EventBus, error) {
eventBus := types.NewEventBus()
eventBus.SetLogger(logger.With("module", "events"))
if err := eventBus.Start(); err != nil {
return nil, err
}
return eventBus, nil
}

func CreateAndStartIndexerService(eventBus *types.EventBus, logger cometlog.Logger) (*txindex.IndexerService, *indexerkv.TxIndex, *blockindexkv.BlockerIndexer, error) {
txIndexer := indexerkv.NewTxIndex(db.NewMemDB())
blockIndexer := blockindexkv.New(db.NewMemDB())

indexerService := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus, false)
indexerService.SetLogger(logger.With("module", "txindex"))

return indexerService, txIndexer, blockIndexer, indexerService.Start()
}

func NewAbciClient(clients []AbciCounterpartyClient, logger cometlog.Logger, curState state.State, lastBlock *types.Block, lastCommit *types.Commit, storage storage.Storage, privValidators map[string]types.PrivValidator, errorOnUnequalResponses bool) *AbciClient {
signingStatus := make(map[string]bool)
for addr := range privValidators {
signingStatus[addr] = true
}

eventBus, err := CreateAndStartEventBus(logger)
if err != nil {
logger.Error(err.Error())
panic(err)
}

indexerService, txIndex, blockIndex, err := CreateAndStartIndexerService(eventBus, logger)
if err != nil {
logger.Error(err.Error())
panic(err)
}

return &AbciClient{
Clients: clients,
Logger: logger,
CurState: curState,
EventBus: *eventBus,
LastBlock: lastBlock,
LastCommit: lastCommit,
Storage: storage,
PrivValidators: privValidators,
IndexerService: indexerService,
TxIndex: txIndex,
BlockIndex: blockIndex,
ErrorOnUnequalResponses: errorOnUnequalResponses,
signingStatus: signingStatus,
}
}

func (a *AbciClient) RetryDisconnectedClients() {
Expand Down Expand Up @@ -146,7 +263,7 @@ func (a *AbciClient) SendBeginBlock(block *types.Block) (*abcitypes.ResponseBegi
a.Logger.Info("Sending BeginBlock to clients")
}
// build the BeginBlock request
beginBlockRequest := CreateBeginBlockRequest(&block.Header, block.LastCommit)
beginBlockRequest := a.CreateBeginBlockRequest(&block.Header, block.LastCommit)

// send BeginBlock to all clients and collect the responses
f := func(client AbciCounterpartyClient) (interface{}, error) {
Expand All @@ -169,10 +286,33 @@ func (a *AbciClient) SendBeginBlock(block *types.Block) (*abcitypes.ResponseBegi
return responses[0].(*abcitypes.ResponseBeginBlock), nil
}

func CreateBeginBlockRequest(header *types.Header, lastCommit *types.Commit) *abcitypes.RequestBeginBlock {
func (a *AbciClient) CreateBeginBlockRequest(header *types.Header, lastCommit *types.Commit) *abcitypes.RequestBeginBlock {
commitSigs := lastCommit.Signatures

// if this is the first block, LastCommitInfo.Votes will be empty, see https://github.com/cometbft/cometbft/blob/release/v0.34.24/state/execution.go#L342
voteInfos := make([]abcitypes.VoteInfo, len(commitSigs))
if lastCommit.Height != 0 {
for i := range commitSigs {
val := a.CurState.LastValidators.Validators[i]
byteAddress := val.Address.Bytes()

abciVal := abcitypes.Validator{
Address: byteAddress,
Power: val.VotingPower,
}

signedLastBlock := !commitSigs[i].Absent()

voteInfos[i] = abcitypes.VoteInfo{
Validator: abciVal,
SignedLastBlock: signedLastBlock,
}
}
}

return &abcitypes.RequestBeginBlock{
// TODO: fill in Votes
LastCommitInfo: abcitypes.LastCommitInfo{Round: lastCommit.Round, Votes: []abcitypes.VoteInfo{}},
LastCommitInfo: abcitypes.LastCommitInfo{Round: lastCommit.Round, Votes: voteInfos},
Header: *header.ToProto(),
}
}
Expand Down Expand Up @@ -402,14 +542,35 @@ func (a *AbciClient) SendAbciQuery(data []byte, path string, height int64, prove
return response.(*abcitypes.ResponseQuery), nil
}

// RunEmptyBlocks runs a specified number of empty blocks through ABCI.
func (a *AbciClient) RunEmptyBlocks(numBlocks int) error {
for i := 0; i < numBlocks; i++ {
_, _, _, _, _, err := a.RunBlock(nil)
if err != nil {
return err
}
}
return nil
}

// RunBlock runs a block with a specified transaction through the ABCI application.
// It calls RunBlockWithTimeAndProposer with the current time and the LastValidators.Proposer.
func (a *AbciClient) RunBlock(tx *[]byte) (*abcitypes.ResponseBeginBlock, *abcitypes.ResponseCheckTx, *abcitypes.ResponseDeliverTx, *abcitypes.ResponseEndBlock, *abcitypes.ResponseCommit, error) {
return a.RunBlockWithTimeAndProposer(tx, time.Now().Add(a.timeOffset), a.CurState.LastValidators.Proposer)
}

// RunBlock runs a block with a specified transaction through the ABCI application.
// It calls BeginBlock, DeliverTx, EndBlock, Commit and then
// updates the state.
// RunBlock is safe for use by multiple goroutines simultaneously.
func (a *AbciClient) RunBlock(tx *[]byte, blockTime time.Time, proposer *types.Validator) (*abcitypes.ResponseBeginBlock, *abcitypes.ResponseCheckTx, *abcitypes.ResponseDeliverTx, *abcitypes.ResponseEndBlock, *abcitypes.ResponseCommit, error) {
func (a *AbciClient) RunBlockWithTimeAndProposer(tx *[]byte, blockTime time.Time, proposer *types.Validator) (*abcitypes.ResponseBeginBlock, *abcitypes.ResponseCheckTx, *abcitypes.ResponseDeliverTx, *abcitypes.ResponseEndBlock, *abcitypes.ResponseCommit, error) {
// lock mutex to avoid running two blocks at the same time
a.Logger.Debug("Locking mutex")
blockMutex.Lock()

defer blockMutex.Unlock()
defer a.Logger.Debug("Unlocking mutex")

a.Logger.Info("Running block")
if verbose {
a.Logger.Info("State at start of block", "state", a.CurState)
Expand Down Expand Up @@ -452,27 +613,39 @@ func (a *AbciClient) RunBlock(tx *[]byte, blockTime time.Time, proposer *types.V
for index, val := range a.CurState.Validators.Validators {
privVal := a.PrivValidators[val.Address.String()]

// create and sign a precommit
vote := &cmttypes.Vote{
ValidatorAddress: val.Address,
ValidatorIndex: int32(index),
Height: block.Height,
Round: 1,
Timestamp: time.Now(),
Type: cmttypes.PrecommitType,
BlockID: blockId.ToProto(),
}

privVal.SignVote(a.CurState.ChainID, vote)

convertedVote, err := types.VoteFromProto(vote)
shouldSign, err := a.GetSigningStatus(val.Address.String())
if err != nil {
return nil, nil, nil, nil, nil, err
}

commitSig := convertedVote.CommitSig()
if shouldSign {
// create and sign a precommit
vote := &cmttypes.Vote{
ValidatorAddress: val.Address,
ValidatorIndex: int32(index),
Height: block.Height,
Round: 1,
Timestamp: time.Now().Add(a.timeOffset),
Type: cmttypes.PrecommitType,
BlockID: blockId.ToProto(),
}

err = privVal.SignVote(a.CurState.ChainID, vote)
if err != nil {
return nil, nil, nil, nil, nil, err
}

commitSigs = append(commitSigs, commitSig)
convertedVote, err := types.VoteFromProto(vote)
if err != nil {
return nil, nil, nil, nil, nil, err
}

commitSig := convertedVote.CommitSig()

commitSigs = append(commitSigs, commitSig)
} else {
commitSigs = append(commitSigs, types.NewCommitSigAbsent())
}
}

a.LastCommit = types.NewCommit(
Expand Down Expand Up @@ -573,9 +746,6 @@ func (a *AbciClient) RunBlock(tx *[]byte, blockTime time.Time, proposer *types.V
}
a.CurState.AppHash = resCommit.Data

// unlock mutex
blockMutex.Unlock()

return resBeginBlock, resCheckTx, resDeliverTx, resEndBlock, resCommit, nil
}

Expand Down
Loading

0 comments on commit 4c23143

Please sign in to comment.