From 68cbcf98e0d25dc504e0d2ab7d9a9b30cf3a0e8e Mon Sep 17 00:00:00 2001 From: "mark.lin" Date: Mon, 24 Jul 2017 13:50:22 +0800 Subject: [PATCH] consensus/*, eth, ethstats, miner: add block lock and some features * consensus/istanbul: handle future preprepare * consensus/istanbul: handle request timeout in evnet loop * consensus, eth: start/stop core engine while start/stop mining * eth, ethstats: fix crash while reporting to ethstats * consensus/istanbul, miner: add new event to trigger new block creation * eth, consensus/istanbul: improve sending messages * consensus/istanbul: stop future preprepare timer while stop core * consensus/istanbul: add cache in ecrecover() --- consensus/consensus.go | 12 +- consensus/istanbul/backend.go | 22 +++- consensus/istanbul/backend/api.go | 2 +- consensus/istanbul/backend/backend.go | 117 +++++++++++------ consensus/istanbul/backend/backend_test.go | 81 +++++++++--- consensus/istanbul/backend/engine.go | 115 ++++++++++++----- consensus/istanbul/backend/engine_test.go | 25 ++-- consensus/istanbul/backend/snapshot_test.go | 2 +- consensus/istanbul/core/backlog_test.go | 6 +- consensus/istanbul/core/commit.go | 17 ++- consensus/istanbul/core/commit_test.go | 20 +-- consensus/istanbul/core/core.go | 89 +++++++------ consensus/istanbul/core/core_test.go | 12 +- consensus/istanbul/core/events.go | 2 + consensus/istanbul/core/handler.go | 18 +++ consensus/istanbul/core/prepare.go | 7 +- consensus/istanbul/core/prepare_test.go | 9 ++ consensus/istanbul/core/preprepare.go | 55 +++++++- consensus/istanbul/core/preprepare_test.go | 132 +++++++++++++++++++- consensus/istanbul/core/request.go | 16 ++- consensus/istanbul/core/request_test.go | 5 +- consensus/istanbul/core/roundstate.go | 43 ++++++- consensus/istanbul/core/roundstate_test.go | 39 ++++++ consensus/istanbul/core/testbackend_test.go | 38 ++++-- consensus/istanbul/errors.go | 4 + consensus/istanbul/events.go | 13 +- eth/backend.go | 9 +- eth/istanbul_handler.go | 57 +++------ ethstats/ethstats.go | 3 + miner/worker.go | 12 +- 30 files changed, 733 insertions(+), 249 deletions(-) diff --git a/consensus/consensus.go b/consensus/consensus.go index 6eed311f9a38..cd2444ffb193 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -106,15 +106,15 @@ type PoW interface { type Istanbul interface { Engine - // Handle a message from peer + // HandleMsg handles a message from peer HandleMsg(pubKey *ecdsa.PublicKey, data []byte) error - // Receive new chain head block - NewChainHead(block *types.Block) + // NewChainHead is called if a new chain head block comes + NewChainHead(block *types.Block) error - // Start the engine - Start(chain ChainReader, inserter func(block *types.Block) error) error + // Start starts the engine + Start(chain ChainReader, inserter func(types.Blocks) (int, error)) error - // Stop the engine + // Stop stops the engine Stop() error } diff --git a/consensus/istanbul/backend.go b/consensus/istanbul/backend.go index 4f63ec29e769..b7bf0e93bfce 100644 --- a/consensus/istanbul/backend.go +++ b/consensus/istanbul/backend.go @@ -17,6 +17,9 @@ package istanbul import ( + "math/big" + "time" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/event" ) @@ -32,21 +35,19 @@ type Backend interface { // EventMux returns the event mux in backend EventMux() *event.TypeMux - // Send sends a message to specific target - Send(payload []byte, target common.Address) error - // Broadcast sends a message to all validators Broadcast(valSet ValidatorSet, payload []byte) error // Commit delivers an approved proposal to backend. // The delivered proposal will be put into blockchain. - Commit(proposal Proposal, seals []byte) error + Commit(proposal Proposal, seals [][]byte) error // NextRound is called when we want to trigger next Seal() NextRound() error - // Verify verifies the proposal. - Verify(Proposal) error + // Verify verifies the proposal. If a consensus.ErrFutureBlock error is returned, + // the time difference of the proposal and current time is also returned. + Verify(Proposal) (time.Duration, error) // Sign signs input data with the backend's private key Sign([]byte) ([]byte, error) @@ -54,4 +55,13 @@ type Backend interface { // CheckSignature verifies the signature by checking if it's signed by // the given validator CheckSignature(data []byte, addr common.Address, sig []byte) error + + // HasBlock checks if the combination of the given hash and height matches any existing blocks + HasBlock(hash common.Hash, number *big.Int) bool + + // GetProposer returns the proposer of the given block height + GetProposer(number uint64) common.Address + + // ParentValidators returns the validator set of the given proposal's parent block + ParentValidators(proposal Proposal) ValidatorSet } diff --git a/consensus/istanbul/backend/api.go b/consensus/istanbul/backend/api.go index 9879dc57168a..ee2a3e9d94db 100644 --- a/consensus/istanbul/backend/api.go +++ b/consensus/istanbul/backend/api.go @@ -26,7 +26,7 @@ import ( // API is a user facing RPC API to dump Istanbul state type API struct { chain consensus.ChainReader - istanbul *simpleBackend + istanbul *backend } // GetSnapshot retrieves the state snapshot at a given block. diff --git a/consensus/istanbul/backend/backend.go b/consensus/istanbul/backend/backend.go index cf6bd1db934a..2a46e5baa336 100644 --- a/consensus/istanbul/backend/backend.go +++ b/consensus/istanbul/backend/backend.go @@ -18,19 +18,21 @@ package backend import ( "crypto/ecdsa" + "math/big" "sync" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/consensus/istanbul" istanbulCore "github.com/ethereum/go-ethereum/consensus/istanbul/core" "github.com/ethereum/go-ethereum/consensus/istanbul/validator" - "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/miner" lru "github.com/hashicorp/golang-lru" ) @@ -38,24 +40,26 @@ import ( func New(config *istanbul.Config, eventMux *event.TypeMux, privateKey *ecdsa.PrivateKey, db ethdb.Database) consensus.Istanbul { // Allocate the snapshot caches and create the engine recents, _ := lru.NewARC(inmemorySnapshots) - backend := &simpleBackend{ + backend := &backend{ config: config, eventMux: eventMux, istanbulEventMux: new(event.TypeMux), privateKey: privateKey, address: crypto.PubkeyToAddress(privateKey.PublicKey), - logger: log.New("backend", "simple"), + logger: log.New(), db: db, commitCh: make(chan *types.Block, 1), recents: recents, candidates: make(map[common.Address]bool), + coreStarted: false, } + backend.core = istanbulCore.New(backend, backend.config) return backend } // ---------------------------------------------------------------------------- -type simpleBackend struct { +type backend struct { config *istanbul.Config eventMux *event.TypeMux istanbulEventMux *event.TypeMux @@ -63,16 +67,16 @@ type simpleBackend struct { address common.Address core istanbulCore.Engine logger log.Logger - quitSync chan struct{} db ethdb.Database - timeout uint64 chain consensus.ChainReader - inserter func(block *types.Block) error + inserter func(types.Blocks) (int, error) // the channels for istanbul engine notifications commitCh chan *types.Block proposedBlockHash common.Hash sealMu sync.Mutex + coreStarted bool + coreMu sync.Mutex // Current list of candidates we are pushing candidates map[common.Address]bool @@ -83,29 +87,18 @@ type simpleBackend struct { } // Address implements istanbul.Backend.Address -func (sb *simpleBackend) Address() common.Address { +func (sb *backend) Address() common.Address { return sb.address } // Validators implements istanbul.Backend.Validators -func (sb *simpleBackend) Validators(proposal istanbul.Proposal) istanbul.ValidatorSet { - snap, err := sb.snapshot(sb.chain, proposal.Number().Uint64(), proposal.Hash(), nil) - if err != nil { - return validator.NewSet(nil, sb.config.ProposerPolicy) - } - return snap.ValSet -} - -func (sb *simpleBackend) Send(payload []byte, target common.Address) error { - go sb.eventMux.Post(istanbul.ConsensusDataEvent{ - Target: target, - Data: payload, - }) - return nil +func (sb *backend) Validators(proposal istanbul.Proposal) istanbul.ValidatorSet { + return sb.getValidators(proposal.Number().Uint64(), proposal.Hash()) } // Broadcast implements istanbul.Backend.Send -func (sb *simpleBackend) Broadcast(valSet istanbul.ValidatorSet, payload []byte) error { +func (sb *backend) Broadcast(valSet istanbul.ValidatorSet, payload []byte) error { + targets := make(map[common.Address]bool) for _, val := range valSet.List() { if val.Address() == sb.Address() { // send to self @@ -116,14 +109,21 @@ func (sb *simpleBackend) Broadcast(valSet istanbul.ValidatorSet, payload []byte) } else { // send to other peers - sb.Send(payload, val.Address()) + targets[val.Address()] = true } } + + if len(targets) > 0 { + go sb.eventMux.Post(istanbul.ConsensusDataEvent{ + Targets: targets, + Data: payload, + }) + } return nil } // Commit implements istanbul.Backend.Commit -func (sb *simpleBackend) Commit(proposal istanbul.Proposal, seals []byte) error { +func (sb *backend) Commit(proposal istanbul.Proposal, seals [][]byte) error { // Check if the proposal is a valid block block := &types.Block{} block, ok := proposal.(*types.Block) @@ -154,49 +154,58 @@ func (sb *simpleBackend) Commit(proposal istanbul.Proposal, seals []byte) error // TODO: how do we check the block is inserted correctly? return nil } - - return sb.inserter(block) + // if I'm not a proposer, insert the block directly and broadcast NewCommittedEvent + if _, err := sb.inserter(types.Blocks{block}); err != nil { + return err + } + msg := istanbul.NewCommittedEvent{ + Block: block, + } + go sb.eventMux.Post(msg) + return nil } -// NextRound will broadcast ChainHeadEvent to trigger next seal() -func (sb *simpleBackend) NextRound() error { +// NextRound will broadcast NewBlockEvent to trigger next seal() +func (sb *backend) NextRound() error { header := sb.chain.CurrentHeader() sb.logger.Debug("NextRound", "address", sb.Address(), "current_hash", header.Hash(), "current_number", header.Number) - go sb.eventMux.Post(core.ChainHeadEvent{}) + go sb.eventMux.Post(miner.NewBlockEvent{}) return nil } // EventMux implements istanbul.Backend.EventMux -func (sb *simpleBackend) EventMux() *event.TypeMux { +func (sb *backend) EventMux() *event.TypeMux { return sb.istanbulEventMux } // Verify implements istanbul.Backend.Verify -func (sb *simpleBackend) Verify(proposal istanbul.Proposal) error { +func (sb *backend) Verify(proposal istanbul.Proposal) (time.Duration, error) { // Check if the proposal is a valid block block := &types.Block{} block, ok := proposal.(*types.Block) if !ok { sb.logger.Error("Invalid proposal, %v", proposal) - return errInvalidProposal + return 0, errInvalidProposal } // verify the header of proposed block err := sb.VerifyHeader(sb.chain, block.Header(), false) - // Ignore errEmptyCommittedSeals error because we don't have the committed seals yet - if err != nil && err != errEmptyCommittedSeals { - return err + // ignore errEmptyCommittedSeals error because we don't have the committed seals yet + if err == nil || err == errEmptyCommittedSeals { + return 0, nil + } else if err == consensus.ErrFutureBlock { + return time.Unix(block.Header().Time.Int64(), 0).Sub(now()), consensus.ErrFutureBlock } - return nil + return 0, err } // Sign implements istanbul.Backend.Sign -func (sb *simpleBackend) Sign(data []byte) ([]byte, error) { +func (sb *backend) Sign(data []byte) ([]byte, error) { hashData := crypto.Keccak256([]byte(data)) return crypto.Sign(hashData, sb.privateKey) } // CheckSignature implements istanbul.Backend.CheckSignature -func (sb *simpleBackend) CheckSignature(data []byte, address common.Address, sig []byte) error { +func (sb *backend) CheckSignature(data []byte, address common.Address, sig []byte) error { signer, err := istanbul.GetSignatureAddress(data, sig) if err != nil { log.Error("Failed to get signer address", "err", err) @@ -208,3 +217,33 @@ func (sb *simpleBackend) CheckSignature(data []byte, address common.Address, sig } return nil } + +// HasBlock implements istanbul.Backend.HashBlock +func (sb *backend) HasBlock(hash common.Hash, number *big.Int) bool { + return sb.chain.GetHeader(hash, number.Uint64()) != nil +} + +// GetProposer implements istanbul.Backend.GetProposer +func (sb *backend) GetProposer(number uint64) common.Address { + if h := sb.chain.GetHeaderByNumber(number); h != nil { + a, _ := sb.Author(h) + return a + } + return common.Address{} +} + +// ParentValidators implements istanbul.Backend.GetParentValidators +func (sb *backend) ParentValidators(proposal istanbul.Proposal) istanbul.ValidatorSet { + if block, ok := proposal.(*types.Block); ok { + return sb.getValidators(block.Number().Uint64()-1, block.ParentHash()) + } + return validator.NewSet(nil, sb.config.ProposerPolicy) +} + +func (sb *backend) getValidators(number uint64, hash common.Hash) istanbul.ValidatorSet { + snap, err := sb.snapshot(sb.chain, number, hash, nil) + if err != nil { + return validator.NewSet(nil, sb.config.ProposerPolicy) + } + return snap.ValSet +} diff --git a/consensus/istanbul/backend/backend_test.go b/consensus/istanbul/backend/backend_test.go index 92100040ca06..52cbca7ead7a 100644 --- a/consensus/istanbul/backend/backend_test.go +++ b/consensus/istanbul/backend/backend_test.go @@ -33,7 +33,7 @@ import ( ) func TestSign(t *testing.T) { - b, _, _ := newSimpleBackend() + b, _, _ := newBackend() data := []byte("Here is a string....") sig, err := b.Sign(data) if err != nil { @@ -54,7 +54,7 @@ func TestCheckSignature(t *testing.T) { data := []byte("Here is a string....") hashData := crypto.Keccak256([]byte(data)) sig, _ := crypto.Sign(hashData, key) - b, _, _ := newSimpleBackend() + b, _, _ := newBackend() a := getAddress() err := b.CheckSignature(data, a, sig) if err != nil { @@ -68,7 +68,7 @@ func TestCheckSignature(t *testing.T) { } func TestCheckValidatorSignature(t *testing.T) { - _, keys, vset := newSimpleBackend() + _, keys, vset := newBackend() // 1. Positive test: sign with validator's key should succeed data := []byte("dummy data") @@ -113,18 +113,19 @@ func TestCheckValidatorSignature(t *testing.T) { } func TestCommit(t *testing.T) { - backend, _, _ := newSimpleBackend() + backend, _, _ := newBackend() + commitCh := make(chan *types.Block) // Case: it's a proposer, so the backend.commit will receive channel result from backend.Commit function testCases := []struct { expectedErr error - expectedSignature []byte + expectedSignature [][]byte expectedBlock func() *types.Block }{ { // normal case nil, - append([]byte{1}, bytes.Repeat([]byte{0x00}, types.IstanbulExtraSeal-1)...), + [][]byte{append([]byte{1}, bytes.Repeat([]byte{0x00}, types.IstanbulExtraSeal-1)...)}, func() *types.Block { chain, engine := newBlockChain(1) block := makeBlockWithoutSeal(chain, engine, chain.Genesis()) @@ -148,16 +149,10 @@ func TestCommit(t *testing.T) { for _, test := range testCases { expBlock := test.expectedBlock() go func() { - for { - select { - case result := <-backend.commitCh: - if result.Hash() != expBlock.Hash() { - t.Errorf("hash mismatch: have %v, want %v", result.Hash(), expBlock.Hash()) - } - return - case <-time.After(time.Second): - t.Error("unexpected timeout occurs") - } + select { + case result := <-backend.commitCh: + commitCh <- result + return } }() @@ -167,6 +162,56 @@ func TestCommit(t *testing.T) { t.Errorf("error mismatch: have %v, want %v", err, test.expectedErr) } } + + if test.expectedErr == nil { + // to avoid race condition is occurred by goroutine + select { + case result := <-commitCh: + if result.Hash() != expBlock.Hash() { + t.Errorf("hash mismatch: have %v, want %v", result.Hash(), expBlock.Hash()) + } + case <-time.After(10 * time.Second): + t.Fatal("timeout") + } + } + } +} + +func TestHasBlock(t *testing.T) { + chain, engine := newBlockChain(1) + block := makeBlockWithoutSeal(chain, engine, chain.Genesis()) + finalBlock, _ := engine.Seal(chain, block, nil) + chain.InsertChain(types.Blocks{finalBlock}) + if engine.HasBlock(block.Hash(), finalBlock.Number()) { + t.Errorf("error mismatch: have true, want false") + } + if !engine.HasBlock(finalBlock.Hash(), finalBlock.Number()) { + t.Errorf("error mismatch: have false, want true") + } +} + +func TestGetProposer(t *testing.T) { + chain, engine := newBlockChain(1) + block := makeBlock(chain, engine, chain.Genesis()) + chain.InsertChain(types.Blocks{block}) + expected := engine.GetProposer(1) + actual := engine.Address() + if actual != expected { + t.Errorf("proposer mismatch: have %v, want %v", actual.Hex(), expected.Hex()) + } +} + +func TestParentValidators(t *testing.T) { + chain, engine := newBlockChain(1) + block := makeBlock(chain, engine, chain.Genesis()) + chain.InsertChain(types.Blocks{block}) + expected := engine.Validators(block).List() + //Block without seal will make empty validator set + block = makeBlockWithoutSeal(chain, engine, block) + chain.InsertChain(types.Blocks{block}) + actual := engine.ParentValidators(block).List() + if len(expected) != len(actual) || expected[0] != actual[0] { + t.Errorf("validator set mismatch: have %v, want %v", actual, expected) } } @@ -217,10 +262,10 @@ func (slice Keys) Swap(i, j int) { slice[i], slice[j] = slice[j], slice[i] } -func newSimpleBackend() (backend *simpleBackend, validatorKeys Keys, validatorSet istanbul.ValidatorSet) { +func newBackend() (b *backend, validatorKeys Keys, validatorSet istanbul.ValidatorSet) { key, _ := generatePrivateKey() validatorSet, validatorKeys = newTestValidatorSet(5) - backend = &simpleBackend{ + b = &backend{ privateKey: key, logger: log.New("backend", "simple"), commitCh: make(chan *types.Block, 1), diff --git a/consensus/istanbul/backend/engine.go b/consensus/istanbul/backend/engine.go index 87532a69ab9d..f785e16f4cb0 100644 --- a/consensus/istanbul/backend/engine.go +++ b/consensus/istanbul/backend/engine.go @@ -37,6 +37,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" + lru "github.com/hashicorp/golang-lru" ) const ( @@ -88,19 +89,22 @@ var ( nonceAuthVote = hexutil.MustDecode("0xffffffffffffffff") // Magic nonce number to vote on adding a new validator nonceDropVote = hexutil.MustDecode("0x0000000000000000") // Magic nonce number to vote on removing a validator. + + inmemoryAddresses = 20 // Number of recent addresses from ecrecover + recentAddresses, _ = lru.NewARC(inmemoryAddresses) ) // Author retrieves the Ethereum address of the account that minted the given // block, which may be different from the header's coinbase if a consensus // engine is based on signatures. -func (sb *simpleBackend) Author(header *types.Header) (common.Address, error) { +func (sb *backend) Author(header *types.Header) (common.Address, error) { return ecrecover(header) } // VerifyHeader checks whether a header conforms to the consensus rules of a // given engine. Verifying the seal may be done optionally here, or explicitly // via the VerifySeal method. -func (sb *simpleBackend) VerifyHeader(chain consensus.ChainReader, header *types.Header, seal bool) error { +func (sb *backend) VerifyHeader(chain consensus.ChainReader, header *types.Header, seal bool) error { return sb.verifyHeader(chain, header, nil) } @@ -108,7 +112,7 @@ func (sb *simpleBackend) VerifyHeader(chain consensus.ChainReader, header *types // caller may optionally pass in a batch of parents (ascending order) to avoid // looking those up from the database. This is useful for concurrently verifying // a batch of new headers. -func (sb *simpleBackend) verifyHeader(chain consensus.ChainReader, header *types.Header, parents []*types.Header) error { +func (sb *backend) verifyHeader(chain consensus.ChainReader, header *types.Header, parents []*types.Header) error { if header.Number == nil { return errUnknownBlock } @@ -147,7 +151,7 @@ func (sb *simpleBackend) verifyHeader(chain consensus.ChainReader, header *types // rather depend on a batch of previous headers. The caller may optionally pass // in a batch of parents (ascending order) to avoid looking those up from the // database. This is useful for concurrently verifying a batch of new headers. -func (sb *simpleBackend) verifyCascadingFields(chain consensus.ChainReader, header *types.Header, parents []*types.Header) error { +func (sb *backend) verifyCascadingFields(chain consensus.ChainReader, header *types.Header, parents []*types.Header) error { // The genesis block is the always valid dead-end number := header.Number.Uint64() if number == 0 { @@ -186,7 +190,7 @@ func (sb *simpleBackend) verifyCascadingFields(chain consensus.ChainReader, head // concurrently. The method returns a quit channel to abort the operations and // a results channel to retrieve the async verifications (the order is that of // the input slice). -func (sb *simpleBackend) VerifyHeaders(chain consensus.ChainReader, headers []*types.Header, seals []bool) (chan<- struct{}, <-chan error) { +func (sb *backend) VerifyHeaders(chain consensus.ChainReader, headers []*types.Header, seals []bool) (chan<- struct{}, <-chan error) { abort := make(chan struct{}) results := make(chan error, len(headers)) go func() { @@ -205,7 +209,7 @@ func (sb *simpleBackend) VerifyHeaders(chain consensus.ChainReader, headers []*t // VerifyUncles verifies that the given block's uncles conform to the consensus // rules of a given engine. -func (sb *simpleBackend) VerifyUncles(chain consensus.ChainReader, block *types.Block) error { +func (sb *backend) VerifyUncles(chain consensus.ChainReader, block *types.Block) error { if len(block.Uncles()) > 0 { return errInvalidUncleHash } @@ -213,7 +217,7 @@ func (sb *simpleBackend) VerifyUncles(chain consensus.ChainReader, block *types. } // verifySigner checks whether the signer is in parent's validator set -func (sb *simpleBackend) verifySigner(chain consensus.ChainReader, header *types.Header, parents []*types.Header) error { +func (sb *backend) verifySigner(chain consensus.ChainReader, header *types.Header, parents []*types.Header) error { // Verifying the genesis block is not supported number := header.Number.Uint64() if number == 0 { @@ -240,7 +244,7 @@ func (sb *simpleBackend) verifySigner(chain consensus.ChainReader, header *types } // verifyCommittedSeals checks whether every committed seal is signed by one of the parent's validators -func (sb *simpleBackend) verifyCommittedSeals(chain consensus.ChainReader, header *types.Header, parents []*types.Header) error { +func (sb *backend) verifyCommittedSeals(chain consensus.ChainReader, header *types.Header, parents []*types.Header) error { number := header.Number.Uint64() // We don't need to verify committed seals in the genesis block if number == 0 { @@ -293,7 +297,7 @@ func (sb *simpleBackend) verifyCommittedSeals(chain consensus.ChainReader, heade // VerifySeal checks whether the crypto seal on a header is valid according to // the consensus rules of the given engine. -func (sb *simpleBackend) VerifySeal(chain consensus.ChainReader, header *types.Header) error { +func (sb *backend) VerifySeal(chain consensus.ChainReader, header *types.Header) error { // get parent header and ensure the signer is in parent's validator set number := header.Number.Uint64() if number == 0 { @@ -309,7 +313,7 @@ func (sb *simpleBackend) VerifySeal(chain consensus.ChainReader, header *types.H // Prepare initializes the consensus fields of a block header according to the // rules of a particular engine. The changes are executed inline. -func (sb *simpleBackend) Prepare(chain consensus.ChainReader, header *types.Header) error { +func (sb *backend) Prepare(chain consensus.ChainReader, header *types.Header) error { // unused fields, force to set to empty header.Coinbase = common.Address{} header.Nonce = emptyNonce @@ -368,7 +372,7 @@ func (sb *simpleBackend) Prepare(chain consensus.ChainReader, header *types.Head // // Note, the block header and state database might be updated to reflect any // consensus rules that happen at finalization (e.g. block rewards). -func (sb *simpleBackend) Finalize(chain consensus.ChainReader, header *types.Header, state *state.StateDB, txs []*types.Transaction, +func (sb *backend) Finalize(chain consensus.ChainReader, header *types.Header, state *state.StateDB, txs []*types.Transaction, uncles []*types.Header, receipts []*types.Receipt) (*types.Block, error) { // No block rewards in Istanbul, so the state remains as is and uncles are dropped header.Root = state.IntermediateRoot(chain.Config().IsEIP158(header.Number)) @@ -380,7 +384,7 @@ func (sb *simpleBackend) Finalize(chain consensus.ChainReader, header *types.Hea // Seal generates a new block for the given input block with the local miner's // seal place on top. -func (sb *simpleBackend) Seal(chain consensus.ChainReader, block *types.Block, stop <-chan struct{}) (*types.Block, error) { +func (sb *backend) Seal(chain consensus.ChainReader, block *types.Block, stop <-chan struct{}) (*types.Block, error) { // update the block header timestamp and signature and propose the block to core engine header := block.Header() number := header.Number.Uint64() @@ -440,7 +444,7 @@ func (sb *simpleBackend) Seal(chain consensus.ChainReader, block *types.Block, s } // update timestamp and signature of the block based on its number of transactions -func (sb *simpleBackend) updateBlock(parent *types.Header, block *types.Block) (*types.Block, error) { +func (sb *backend) updateBlock(parent *types.Header, block *types.Block) (*types.Block, error) { // set block period based the number of tx var period uint64 if len(block.Transactions()) == 0 { @@ -471,7 +475,7 @@ func (sb *simpleBackend) updateBlock(parent *types.Header, block *types.Block) ( } // APIs returns the RPC APIs this consensus engine provides. -func (sb *simpleBackend) APIs(chain consensus.ChainReader) []rpc.API { +func (sb *backend) APIs(chain consensus.ChainReader) []rpc.API { return []rpc.API{{ Namespace: "istanbul", Version: "1.0", @@ -481,7 +485,12 @@ func (sb *simpleBackend) APIs(chain consensus.ChainReader) []rpc.API { } // HandleMsg implements consensus.Istanbul.HandleMsg -func (sb *simpleBackend) HandleMsg(pubKey *ecdsa.PublicKey, data []byte) error { +func (sb *backend) HandleMsg(pubKey *ecdsa.PublicKey, data []byte) error { + sb.coreMu.Lock() + defer sb.coreMu.Unlock() + if !sb.coreStarted { + return istanbul.ErrStoppedEngine + } addr := crypto.PubkeyToAddress(*pubKey) // get the latest snapshot curHeader := sb.chain.CurrentHeader() @@ -503,23 +512,41 @@ func (sb *simpleBackend) HandleMsg(pubKey *ecdsa.PublicKey, data []byte) error { } // NewChainHead implements consensus.Istanbul.NewChainHead -func (sb *simpleBackend) NewChainHead(block *types.Block) { +func (sb *backend) NewChainHead(block *types.Block) error { + sb.coreMu.Lock() + defer sb.coreMu.Unlock() + if !sb.coreStarted { + return istanbul.ErrStoppedEngine + } p, err := sb.Author(block.Header()) if err != nil { sb.logger.Error("Failed to get block proposer", "err", err) - return + return err } go sb.istanbulEventMux.Post(istanbul.FinalCommittedEvent{ Proposal: block, Proposer: p, }) + return nil } // Start implements consensus.Istanbul.Start -func (sb *simpleBackend) Start(chain consensus.ChainReader, inserter func(block *types.Block) error) error { +func (sb *backend) Start(chain consensus.ChainReader, inserter func(types.Blocks) (int, error)) error { + sb.coreMu.Lock() + defer sb.coreMu.Unlock() + if sb.coreStarted { + return istanbul.ErrStartedEngine + } + + // clear previous data + sb.proposedBlockHash = common.Hash{} + if sb.commitCh != nil { + close(sb.commitCh) + } + sb.commitCh = make(chan *types.Block, 1) + sb.chain = chain sb.inserter = inserter - sb.core = istanbulCore.New(sb, sb.config) curHeader := chain.CurrentHeader() lastSequence := new(big.Int).Set(curHeader.Number) @@ -533,19 +560,29 @@ func (sb *simpleBackend) Start(chain consensus.ChainReader, inserter func(block lastProposer = p } block := chain.GetBlock(curHeader.Hash(), lastSequence.Uint64()) - return sb.core.Start(lastSequence, lastProposer, block) + if err := sb.core.Start(lastSequence, lastProposer, block); err != nil { + return err + } + sb.coreStarted = true + return nil } // Stop implements consensus.Istanbul.Stop -func (sb *simpleBackend) Stop() error { - if sb.core != nil { - return sb.core.Stop() +func (sb *backend) Stop() error { + sb.coreMu.Lock() + defer sb.coreMu.Unlock() + if !sb.coreStarted { + return istanbul.ErrStoppedEngine } + if err := sb.core.Stop(); err != nil { + return err + } + sb.coreStarted = false return nil } // snapshot retrieves the authorization snapshot at a given point in time. -func (sb *simpleBackend) snapshot(chain consensus.ChainReader, number uint64, hash common.Hash, parents []*types.Header) (*Snapshot, error) { +func (sb *backend) snapshot(chain consensus.ChainReader, number uint64, hash common.Hash, parents []*types.Header) (*Snapshot, error) { // Search for a snapshot in memory or on disk for checkpoints var ( headers []*types.Header @@ -640,12 +677,23 @@ func sigHash(header *types.Header) (hash common.Hash) { // ecrecover extracts the Ethereum account address from a signed header. func ecrecover(header *types.Header) (common.Address, error) { + hash := header.Hash() + if addr, ok := recentAddresses.Get(hash); ok { + return addr.(common.Address), nil + } + // Retrieve the signature from the header extra-data istanbulExtra, err := types.ExtractIstanbulExtra(header) if err != nil { return common.Address{}, err } - return istanbul.GetSignatureAddress(sigHash(header).Bytes(), istanbulExtra.Seal) + + addr, err := istanbul.GetSignatureAddress(sigHash(header).Bytes(), istanbulExtra.Seal) + if err != nil { + return addr, err + } + recentAddresses.Add(hash, addr) + return addr, nil } // prepareExtra returns a extra-data of the given header and validators @@ -695,21 +743,24 @@ func writeSeal(h *types.Header, seal []byte) error { } // writeCommittedSeals writes the extra-data field of a block header with given committed seals. -func writeCommittedSeals(h *types.Header, committedSeals []byte) error { - if len(committedSeals)%types.IstanbulExtraSeal != 0 { +func writeCommittedSeals(h *types.Header, committedSeals [][]byte) error { + if len(committedSeals) == 0 { return errInvalidCommittedSeals } + for _, seal := range committedSeals { + if len(seal) != types.IstanbulExtraSeal { + return errInvalidCommittedSeals + } + } + istanbulExtra, err := types.ExtractIstanbulExtra(h) if err != nil { return err } - istanbulExtra.CommittedSeal = make([][]byte, len(committedSeals)/types.IstanbulExtraSeal) - for i := 0; i < len(istanbulExtra.CommittedSeal); i++ { - istanbulExtra.CommittedSeal[i] = make([]byte, types.IstanbulExtraSeal) - copy(istanbulExtra.CommittedSeal[i][:], committedSeals[i*types.IstanbulExtraSeal:]) - } + istanbulExtra.CommittedSeal = make([][]byte, len(committedSeals)) + copy(istanbulExtra.CommittedSeal, committedSeals) payload, err := rlp.EncodeToBytes(&istanbulExtra) if err != nil { diff --git a/consensus/istanbul/backend/engine_test.go b/consensus/istanbul/backend/engine_test.go index 1e71eb1904c3..7b9d6792d892 100644 --- a/consensus/istanbul/backend/engine_test.go +++ b/consensus/istanbul/backend/engine_test.go @@ -41,26 +41,19 @@ import ( // in this test, we can set n to 1, and it means we can process Istanbul and commit a // block by one node. Otherwise, if n is larger than 1, we have to generate // other fake events to process Istanbul. -func newBlockChain(n int) (*core.BlockChain, *simpleBackend) { +func newBlockChain(n int) (*core.BlockChain, *backend) { genesis, nodeKeys := getGenesisAndKeys(n) eventMux := new(event.TypeMux) memDB, _ := ethdb.NewMemDatabase() config := istanbul.DefaultConfig // Use the first key as private key - backend := New(config, eventMux, nodeKeys[0], memDB) + b, _ := New(config, eventMux, nodeKeys[0], memDB).(*backend) genesis.MustCommit(memDB) - blockchain, err := core.NewBlockChain(memDB, genesis.Config, backend, eventMux, vm.Config{}) + blockchain, err := core.NewBlockChain(memDB, genesis.Config, b, eventMux, vm.Config{}) if err != nil { panic(err) } - commitBlock := func(block *types.Block) error { - _, err := blockchain.InsertChain([]*types.Block{block}) - return err - } - backend.Start(blockchain, commitBlock) - - b, _ := backend.(*simpleBackend) - + b.Start(blockchain, blockchain.InsertChain) snap, err := b.snapshot(blockchain, 0, common.Hash{}, nil) if err != nil { panic(err) @@ -138,13 +131,13 @@ func makeHeader(parent *types.Block, config *istanbul.Config) *types.Header { return header } -func makeBlock(chain *core.BlockChain, engine *simpleBackend, parent *types.Block) *types.Block { +func makeBlock(chain *core.BlockChain, engine *backend, parent *types.Block) *types.Block { block := makeBlockWithoutSeal(chain, engine, parent) block, _ = engine.Seal(chain, block, nil) return block } -func makeBlockWithoutSeal(chain *core.BlockChain, engine *simpleBackend, parent *types.Block) *types.Block { +func makeBlockWithoutSeal(chain *core.BlockChain, engine *backend, parent *types.Block) *types.Block { header := makeHeader(parent, engine.config) engine.Prepare(chain, header) state, _ := chain.StateAt(parent.Root()) @@ -235,7 +228,7 @@ func TestSealCommittedOtherHash(t *testing.T) { if !ok { t.Errorf("unexpected event comes: %v", reflect.TypeOf(ev.Data)) } - engine.Commit(otherBlock, []byte{}) + engine.Commit(otherBlock, [][]byte{}) } eventSub.Unsubscribe() } @@ -572,7 +565,7 @@ func TestWriteCommittedSeals(t *testing.T) { } // normal case - err := writeCommittedSeals(h, expectedCommittedSeal) + err := writeCommittedSeals(h, [][]byte{expectedCommittedSeal}) if err != expectedErr { t.Errorf("error mismatch: have %v, want %v", err, expectedErr) } @@ -588,7 +581,7 @@ func TestWriteCommittedSeals(t *testing.T) { // invalid seal unexpectedCommittedSeal := append(expectedCommittedSeal, make([]byte, 1)...) - err = writeCommittedSeals(h, unexpectedCommittedSeal) + err = writeCommittedSeals(h, [][]byte{unexpectedCommittedSeal}) if err != errInvalidCommittedSeals { t.Errorf("error mismatch: have %v, want %v", err, errInvalidCommittedSeals) } diff --git a/consensus/istanbul/backend/snapshot_test.go b/consensus/istanbul/backend/snapshot_test.go index 7a529396e996..ee9b870b18cc 100644 --- a/consensus/istanbul/backend/snapshot_test.go +++ b/consensus/istanbul/backend/snapshot_test.go @@ -346,7 +346,7 @@ func TestVoting(t *testing.T) { if tt.epoch != 0 { config.Epoch = tt.epoch } - engine := New(config, eventMux, accounts.accounts[tt.validators[0]], db).(*simpleBackend) + engine := New(config, eventMux, accounts.accounts[tt.validators[0]], db).(*backend) chain, err := core.NewBlockChain(db, genesis.Config, engine, eventMux, vm.Config{}) // Assemble a chain of headers from the cast votes diff --git a/consensus/istanbul/core/backlog_test.go b/consensus/istanbul/core/backlog_test.go index c34c734bce46..082ae43e3b05 100644 --- a/consensus/istanbul/core/backlog_test.go +++ b/consensus/istanbul/core/backlog_test.go @@ -37,7 +37,7 @@ func TestCheckMessage(t *testing.T) { current: newRoundState(&istanbul.View{ Sequence: big.NewInt(1), Round: big.NewInt(0), - }, newTestValidatorSet(4)), + }, newTestValidatorSet(4), common.Hash{}, nil), } // invalid view format @@ -209,7 +209,7 @@ func TestProcessFutureBacklog(t *testing.T) { current: newRoundState(&istanbul.View{ Sequence: big.NewInt(1), Round: big.NewInt(0), - }, newTestValidatorSet(4)), + }, newTestValidatorSet(4), common.Hash{}, nil), state: StateAcceptRequest, } c.subscribeEvents() @@ -294,7 +294,7 @@ func testProcessBacklog(t *testing.T, msg *message) { current: newRoundState(&istanbul.View{ Sequence: big.NewInt(1), Round: big.NewInt(0), - }, newTestValidatorSet(4)), + }, newTestValidatorSet(4), common.Hash{}, nil), } c.subscribeEvents() defer c.unsubscribeEvents() diff --git a/consensus/istanbul/core/commit.go b/consensus/istanbul/core/commit.go index 86c3b639b668..9ff96e811cf7 100644 --- a/consensus/istanbul/core/commit.go +++ b/consensus/istanbul/core/commit.go @@ -19,13 +19,26 @@ package core import ( "reflect" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus/istanbul" ) func (c *core) sendCommit() { + sub := c.current.Subject() + c.broadcastCommit(sub) +} + +func (c *core) sendCommitForOldBlock(view *istanbul.View, digest common.Hash) { + sub := &istanbul.Subject{ + View: view, + Digest: digest, + } + c.broadcastCommit(sub) +} + +func (c *core) broadcastCommit(sub *istanbul.Subject) { logger := c.logger.New("state", c.state) - sub := c.current.Subject() encodedSubject, err := Encode(sub) if err != nil { logger.Error("Failed to encode", "subject", sub) @@ -60,6 +73,8 @@ func (c *core) handleCommit(msg *message, src istanbul.Validator) error { // If we already have a proposal, we may have chance to speed up the consensus process // by committing the proposal without prepare messages. if c.current.Commits.Size() > 2*c.valSet.F() && c.state.Cmp(StateCommitted) < 0 { + // Still need to call LockBlock here since state can skip Prepared state and jump directly to Committed state. + c.current.LockHash() c.commit() } diff --git a/consensus/istanbul/core/commit_test.go b/consensus/istanbul/core/commit_test.go index 356de6a47bf8..02107dd8136b 100644 --- a/consensus/istanbul/core/commit_test.go +++ b/consensus/istanbul/core/commit_test.go @@ -17,6 +17,7 @@ package core import ( + "bytes" "math/big" "testing" @@ -177,6 +178,9 @@ OUTER: if err != test.expectedErr { t.Errorf("error mismatch: have %v, want %v", err, test.expectedErr) } + if r0.current.IsHashLocked() { + t.Errorf("block should not be locked") + } continue OUTER } } @@ -190,7 +194,9 @@ OUTER: if r0.current.Commits.Size() > 2*r0.valSet.F() { t.Errorf("the size of commit messages should be less than %v", 2*r0.valSet.F()+1) } - + if r0.current.IsHashLocked() { + t.Errorf("block should not be locked") + } continue } @@ -201,13 +207,10 @@ OUTER: // check signatures large than 2F+1 signedCount := 0 - signers := make([]common.Address, len(v0.committedSeals[0])/common.AddressLength) - for i := 0; i < len(signers); i++ { - copy(signers[i][:], v0.committedSeals[0][i*common.AddressLength:]) - } + committedSeals := v0.committedMsgs[0].committedSeals for _, validator := range r0.valSet.List() { - for _, signer := range signers { - if validator.Address() == signer { + for _, seal := range committedSeals { + if bytes.Compare(validator.Address().Bytes(), seal[:common.AddressLength]) == 0 { signedCount++ break } @@ -216,6 +219,9 @@ OUTER: if signedCount <= 2*r0.valSet.F() { t.Errorf("the expected signed count should be larger than %v, but got %v", 2*r0.valSet.F(), signedCount) } + if !r0.current.IsHashLocked() { + t.Errorf("block should be locked") + } } } diff --git a/consensus/istanbul/core/core.go b/consensus/istanbul/core/core.go index bf63717f2f1e..040f6bb5d8f0 100644 --- a/consensus/istanbul/core/core.go +++ b/consensus/istanbul/core/core.go @@ -18,12 +18,14 @@ package core import ( "bytes" + "math" "math/big" "sync" "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus/istanbul" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" metrics "github.com/ethereum/go-ethereum/metrics" @@ -60,8 +62,9 @@ type core struct { state State logger log.Logger - backend istanbul.Backend - events *event.TypeMuxSubscription + backend istanbul.Backend + events *event.TypeMuxSubscription + futurePreprepareTimer *time.Timer lastProposer common.Address lastProposal istanbul.Proposal @@ -124,22 +127,6 @@ func (c *core) finalizeMessage(msg *message) ([]byte, error) { return payload, nil } -func (c *core) send(msg *message, target common.Address) { - logger := c.logger.New("state", c.state) - - payload, err := c.finalizeMessage(msg) - if err != nil { - logger.Error("Failed to finalize message", "msg", msg, "err", err) - return - } - - // Send payload - if err = c.backend.Send(payload, target); err != nil { - logger.Error("Failed to send message", "msg", msg, "err", err) - return - } -} - func (c *core) broadcast(msg *message) { logger := c.logger.New("state", c.state) @@ -176,12 +163,14 @@ func (c *core) commit() { proposal := c.current.Proposal() if proposal != nil { - var signatures []byte - for _, v := range c.current.Commits.Values() { - signatures = append(signatures, v.CommittedSeal...) + committedSeals := make([][]byte, c.current.Commits.Size()) + for i, v := range c.current.Commits.Values() { + committedSeals[i] = make([]byte, types.IstanbulExtraSeal) + copy(committedSeals[i][:], v.CommittedSeal[:]) } - if err := c.backend.Commit(proposal, signatures); err != nil { + if err := c.backend.Commit(proposal, committedSeals); err != nil { + c.current.UnlockHash() //Unlock block when insertion fails c.sendNextRoundChange() return } @@ -200,13 +189,21 @@ func (c *core) startNewRound(newView *istanbul.View, roundChange bool) { // Clear invalid round change messages c.roundChangeSet = newRoundChangeSet(c.valSet) // New snapshot for new round - c.current = newRoundState(newView, c.valSet) + c.updateRoundState(newView, c.valSet, roundChange) // Calculate new proposer c.valSet.CalcProposer(c.lastProposer, newView.Round.Uint64()) c.waitingForRoundChange = false c.setState(StateAcceptRequest) if roundChange && c.isProposer() { - c.backend.NextRound() + // If it is locked, propose the old proposal + if c.current != nil && c.current.IsHashLocked() { + r := &istanbul.Request{ + Proposal: c.current.Proposal(), //c.current.Proposal would be the locked proposal by previous proposer, see updateRoundState + } + c.sendPreprepare(r) + } else { + c.backend.NextRound() + } } c.newRoundChangeTimer() @@ -220,13 +217,25 @@ func (c *core) catchUpRound(view *istanbul.View) { c.roundMeter.Mark(new(big.Int).Sub(view.Round, c.current.Round()).Int64()) } c.waitingForRoundChange = true - c.current = newRoundState(view, c.valSet) + + //Needs to keep block lock for round catching up + c.updateRoundState(view, c.valSet, true) c.roundChangeSet.Clear(view.Round) c.newRoundChangeTimer() logger.Trace("Catch up round", "new_round", view.Round, "new_seq", view.Sequence, "new_proposer", c.valSet) } +// updateRoundState updates round state by checking if locking block is necessary +func (c *core) updateRoundState(view *istanbul.View, validatorSet istanbul.ValidatorSet, roundChange bool) { + // Lock only if both roundChange is true and it is locked + if roundChange && c.current != nil && c.current.IsHashLocked() { + c.current = newRoundState(view, validatorSet, c.current.GetLockedHash(), c.current.Preprepare) + } else { + c.current = newRoundState(view, validatorSet, common.Hash{}, nil) + } +} + func (c *core) setState(state State) { if c.state != state { c.state = state @@ -241,27 +250,27 @@ func (c *core) Address() common.Address { return c.address } -func (c *core) newRoundChangeTimer() { +func (c *core) stopFuturePreprepareTimer() { + if c.futurePreprepareTimer != nil { + c.futurePreprepareTimer.Stop() + } +} + +func (c *core) stopTimer() { + c.stopFuturePreprepareTimer() if c.roundChangeTimer != nil { c.roundChangeTimer.Stop() } +} + +func (c *core) newRoundChangeTimer() { + c.stopTimer() // set timeout based on the round number - timeout := time.Duration(c.config.RequestTimeout)*time.Millisecond + time.Duration(c.current.Round().Uint64()*c.config.BlockPeriod)*time.Second + t := uint64(math.Pow(2, float64(c.current.Round().Uint64()))) * c.config.RequestTimeout + timeout := time.Duration(t) * time.Millisecond c.roundChangeTimer = time.AfterFunc(timeout, func() { - // If we're not waiting for round change yet, we can try to catch up - // the max round with F+1 round change message. We only need to catch up - // if the max round is larger than current round. - if !c.waitingForRoundChange { - maxRound := c.roundChangeSet.MaxRound(c.valSet.F() + 1) - if maxRound != nil && maxRound.Cmp(c.current.Round()) > 0 { - c.sendRoundChange(maxRound) - } else { - c.sendNextRoundChange() - } - } else { - c.sendNextRoundChange() - } + c.sendEvent(timeoutEvent{}) }) } diff --git a/consensus/istanbul/core/core_test.go b/consensus/istanbul/core/core_test.go index 81463fb4a39c..b29653d2d956 100644 --- a/consensus/istanbul/core/core_test.go +++ b/consensus/istanbul/core/core_test.go @@ -69,14 +69,14 @@ func TestNewRequest(t *testing.T) { } for _, backend := range sys.backends { - if len(backend.commitMsgs) != 2 { - t.Errorf("the number of executed requests mismatch: have %v, want 2", len(backend.commitMsgs)) + if len(backend.committedMsgs) != 2 { + t.Errorf("the number of executed requests mismatch: have %v, want 2", len(backend.committedMsgs)) } - if !reflect.DeepEqual(request1.Number(), backend.commitMsgs[0].Number()) { - t.Errorf("the number of requests mismatch: have %v, want %v", request1.Number(), backend.commitMsgs[0].Number()) + if !reflect.DeepEqual(request1.Number(), backend.committedMsgs[0].commitProposal.Number()) { + t.Errorf("the number of requests mismatch: have %v, want %v", request1.Number(), backend.committedMsgs[0].commitProposal.Number()) } - if !reflect.DeepEqual(request2.Number(), backend.commitMsgs[1].Number()) { - t.Errorf("the number of requests mismatch: have %v, want %v", request2.Number(), backend.commitMsgs[1].Number()) + if !reflect.DeepEqual(request2.Number(), backend.committedMsgs[1].commitProposal.Number()) { + t.Errorf("the number of requests mismatch: have %v, want %v", request2.Number(), backend.committedMsgs[1].commitProposal.Number()) } } } diff --git a/consensus/istanbul/core/events.go b/consensus/istanbul/core/events.go index 494e045557a0..c3292fa17460 100644 --- a/consensus/istanbul/core/events.go +++ b/consensus/istanbul/core/events.go @@ -24,3 +24,5 @@ type backlogEvent struct { src istanbul.Validator msg *message } + +type timeoutEvent struct{} diff --git a/consensus/istanbul/core/handler.go b/consensus/istanbul/core/handler.go index 3bdc3c14f20c..8709a704c2af 100644 --- a/consensus/istanbul/core/handler.go +++ b/consensus/istanbul/core/handler.go @@ -46,6 +46,7 @@ func (c *core) Start(lastSequence *big.Int, lastProposer common.Address, lastPro // Stop implements core.Engine.Stop func (c *core) Stop() error { + c.stopTimer() c.unsubscribeEvents() return nil } @@ -61,6 +62,7 @@ func (c *core) subscribeEvents() { istanbul.FinalCommittedEvent{}, // internal events backlogEvent{}, + timeoutEvent{}, ) } @@ -88,6 +90,8 @@ func (c *core) handleEvents() { case backlogEvent: // No need to check signature for internal messages c.handleCheckedMsg(ev.msg, ev.src) + case timeoutEvent: + c.handleTimeoutMsg() } } } @@ -145,3 +149,17 @@ func (c *core) handleCheckedMsg(msg *message, src istanbul.Validator) error { return errInvalidMessage } + +func (c *core) handleTimeoutMsg() { + // If we're not waiting for round change yet, we can try to catch up + // the max round with F+1 round change message. We only need to catch up + // if the max round is larger than current round. + if !c.waitingForRoundChange { + maxRound := c.roundChangeSet.MaxRound(c.valSet.F() + 1) + if maxRound != nil && maxRound.Cmp(c.current.Round()) > 0 { + c.sendRoundChange(maxRound) + return + } + } + c.sendNextRoundChange() +} diff --git a/consensus/istanbul/core/prepare.go b/consensus/istanbul/core/prepare.go index e61d1054ea34..e3a19266ef41 100644 --- a/consensus/istanbul/core/prepare.go +++ b/consensus/istanbul/core/prepare.go @@ -49,15 +49,18 @@ func (c *core) handlePrepare(msg *message, src istanbul.Validator) error { return err } + // If it is locked, it can only process on the locked block. + // Passing verifyPrepare and checkMessage implies it is processing on the locked block since it was verified in the Preprepare step. if err := c.verifyPrepare(prepare, src); err != nil { return err } c.acceptPrepare(msg, src) - // Change to StatePrepared if we've received enough prepare messages + // Change to StatePrepared if we've received enough prepare messages or it is locked // and we are in earlier state before StatePrepared - if c.current.Prepares.Size() > 2*c.valSet.F() && c.state.Cmp(StatePrepared) < 0 { + if (c.current.IsHashLocked() || c.current.Prepares.Size() > 2*c.valSet.F()) && c.state.Cmp(StatePrepared) < 0 { + c.current.LockHash() c.setState(StatePrepared) c.sendCommit() } diff --git a/consensus/istanbul/core/prepare_test.go b/consensus/istanbul/core/prepare_test.go index a41c22d12c72..deb7705a267e 100644 --- a/consensus/istanbul/core/prepare_test.go +++ b/consensus/istanbul/core/prepare_test.go @@ -201,6 +201,9 @@ OUTER: if err != test.expectedErr { t.Errorf("error mismatch: have %v, want %v", err, test.expectedErr) } + if r0.current.IsHashLocked() { + t.Errorf("block should not be locked") + } continue OUTER } } @@ -214,6 +217,9 @@ OUTER: if r0.current.Prepares.Size() > 2*r0.valSet.F() { t.Errorf("the size of prepare messages should be less than %v", 2*r0.valSet.F()+1) } + if r0.current.IsHashLocked() { + t.Errorf("block should not be locked") + } continue } @@ -246,6 +252,9 @@ OUTER: if !reflect.DeepEqual(m, expectedSubject) { t.Errorf("subject mismatch: have %v, want %v", m, expectedSubject) } + if !r0.current.IsHashLocked() { + t.Errorf("block should be locked") + } } } diff --git a/consensus/istanbul/core/preprepare.go b/consensus/istanbul/core/preprepare.go index 0f5356f4a683..b1ceb6c0bf2c 100644 --- a/consensus/istanbul/core/preprepare.go +++ b/consensus/istanbul/core/preprepare.go @@ -19,6 +19,7 @@ package core import ( "time" + "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/consensus/istanbul" ) @@ -55,7 +56,21 @@ func (c *core) handlePreprepare(msg *message, src istanbul.Validator) error { } // Ensure we have the same view with the preprepare message + // If it is old message, see if we need to broadcast COMMIT if err := c.checkMessage(msgPreprepare, preprepare.View); err != nil { + if err == errOldMessage { + // Get validator set for the given proposal + valSet := c.backend.ParentValidators(preprepare.Proposal).Copy() + previousProposer := c.backend.GetProposer(preprepare.Proposal.Number().Uint64() - 1) + valSet.CalcProposer(previousProposer, preprepare.View.Round.Uint64()) + // Broadcast COMMIT if it is an existing block + // 1. The proposer needs to be a proposer matches the given (Sequence + Round) + // 2. The given block must exist + if valSet.IsProposer(src.Address()) && c.backend.HasBlock(preprepare.Proposal.Hash(), preprepare.Proposal.Number()) { + c.sendCommitForOldBlock(preprepare.View, preprepare.Proposal.Hash()) + return nil + } + } return err } @@ -66,16 +81,44 @@ func (c *core) handlePreprepare(msg *message, src istanbul.Validator) error { } // Verify the proposal we received - if err := c.backend.Verify(preprepare.Proposal); err != nil { - logger.Warn("Failed to verify proposal", "err", err) - c.sendNextRoundChange() + if duration, err := c.backend.Verify(preprepare.Proposal); err != nil { + logger.Warn("Failed to verify proposal", "err", err, "duration", duration) + // if it's a future block, we will handle it again after the duration + if err == consensus.ErrFutureBlock { + c.stopFuturePreprepareTimer() + c.futurePreprepareTimer = time.AfterFunc(duration, func() { + c.sendEvent(backlogEvent{ + src: src, + msg: msg, + }) + }) + } else { + c.sendNextRoundChange() + } return err } + // Here is about to accept the preprepare if c.state == StateAcceptRequest { - c.acceptPreprepare(preprepare) - c.setState(StatePreprepared) - c.sendPrepare() + // If it is locked, it can only process on the locked block + // Otherwise, broadcast PREPARE and enter Prepared state + if c.current.IsHashLocked() { + // Broadcast COMMIT directly if the proposal matches the locked block + // Otherwise, send ROUND CHANGE + if preprepare.Proposal.Hash() == c.current.GetLockedHash() { + // Broadcast COMMIT and enters Prepared state directly + c.acceptPreprepare(preprepare) + c.setState(StatePrepared) + c.sendCommit() + } else { + // Send round change + c.sendNextRoundChange() + } + } else { + c.acceptPreprepare(preprepare) + c.setState(StatePreprepared) + c.sendPrepare() + } } return nil diff --git a/consensus/istanbul/core/preprepare_test.go b/consensus/istanbul/core/preprepare_test.go index eb86e550fca8..3abe0069d841 100644 --- a/consensus/istanbul/core/preprepare_test.go +++ b/consensus/istanbul/core/preprepare_test.go @@ -39,6 +39,7 @@ func TestHandlePreprepare(t *testing.T) { system *testSystem expectedRequest istanbul.Proposal expectedErr error + existingBlock bool }{ { // normal case @@ -56,6 +57,7 @@ func TestHandlePreprepare(t *testing.T) { }(), newTestProposal(), nil, + false, }, { // future message @@ -84,6 +86,7 @@ func TestHandlePreprepare(t *testing.T) { }(), makeBlock(1), errFutureMessage, + false, }, { // non-proposer @@ -105,6 +108,7 @@ func TestHandlePreprepare(t *testing.T) { }(), makeBlock(1), errNotFromProposer, + false, }, { // ErrInvalidMessage @@ -124,6 +128,27 @@ func TestHandlePreprepare(t *testing.T) { }(), makeBlock(1), errOldMessage, + false, + }, + { + // ErrInvalidMessage + func() *testSystem { + sys := NewTestSystemWithBackend(N, F) + + for i, backend := range sys.backends { + c := backend.engine.(*core) + c.valSet = backend.peers + if i != 0 { + c.state = StatePreprepared + c.current.SetSequence(big.NewInt(10)) + c.current.SetRound(big.NewInt(10)) + } + } + return sys + }(), + makeBlock(5), //only height 5 will retrun true on backend.HasBlock, see testSystemBackend.HashBlock + nil, + true, }, } @@ -167,7 +192,7 @@ OUTER: t.Errorf("state mismatch: have %v, want %v", c.state, StatePreprepared) } - if !reflect.DeepEqual(c.current.Subject().View, curView) { + if !test.existingBlock && !reflect.DeepEqual(c.current.Subject().View, curView) { t.Errorf("view mismatch: have %v, want %v", c.current.Subject().View, curView) } @@ -178,17 +203,116 @@ OUTER: t.Errorf("error mismatch: have %v, want nil", err) } - if decodedMsg.Code != msgPrepare { - t.Errorf("message code mismatch: have %v, want %v", decodedMsg.Code, msgPrepare) + expectedCode := msgPrepare + if test.existingBlock { + expectedCode = msgCommit + } + if decodedMsg.Code != expectedCode { + t.Errorf("message code mismatch: have %v, want %v", decodedMsg.Code, expectedCode) } + var subject *istanbul.Subject err = decodedMsg.Decode(&subject) if err != nil { t.Errorf("error mismatch: have %v, want nil", err) } - if !reflect.DeepEqual(subject, c.current.Subject()) { + if !test.existingBlock && !reflect.DeepEqual(subject, c.current.Subject()) { t.Errorf("subject mismatch: have %v, want %v", subject, c.current.Subject()) } + + } + } +} + +func TestHandlePreprepareWithLock(t *testing.T) { + N := uint64(4) // replica 0 is primary, it will send messages to others + F := uint64(1) // F does not affect tests + proposal := newTestProposal() + mismatchProposal := makeBlock(10) + newSystem := func() *testSystem { + sys := NewTestSystemWithBackend(N, F) + + for i, backend := range sys.backends { + c := backend.engine.(*core) + c.valSet = backend.peers + if i != 0 { + c.state = StateAcceptRequest + } + c.roundChangeSet = newRoundChangeSet(c.valSet) + } + return sys + } + + testCases := []struct { + system *testSystem + proposal istanbul.Proposal + lockProposal istanbul.Proposal + }{ + { + newSystem(), + proposal, + proposal, + }, + { + newSystem(), + proposal, + mismatchProposal, + }, + } + + for _, test := range testCases { + test.system.Run(false) + v0 := test.system.backends[0] + r0 := v0.engine.(*core) + curView := r0.currentView() + preprepare := &istanbul.Preprepare{ + View: curView, + Proposal: test.proposal, + } + lockPreprepare := &istanbul.Preprepare{ + View: curView, + Proposal: test.lockProposal, + } + + for i, v := range test.system.backends { + // i == 0 is primary backend, it is responsible for send preprepare messages to others. + if i == 0 { + continue + } + + c := v.engine.(*core) + c.current.SetPreprepare(lockPreprepare) + c.current.LockHash() + m, _ := Encode(preprepare) + _, val := r0.valSet.GetByAddress(v0.Address()) + if err := c.handlePreprepare(&message{ + Code: msgPreprepare, + Msg: m, + Address: v0.Address(), + }, val); err != nil { + t.Errorf("error mismatch: have %v, want nil", err) + } + if test.proposal == test.lockProposal { + if c.state != StatePrepared { + t.Errorf("state mismatch: have %v, want %v", c.state, StatePrepared) + } + if !reflect.DeepEqual(curView, c.currentView()) { + t.Errorf("view mismatch: have %v, want %v", c.currentView(), curView) + } + } else { + // Should stay at StateAcceptRequest + if c.state != StateAcceptRequest { + t.Errorf("state mismatch: have %v, want %v", c.state, StateAcceptRequest) + } + // Should have triggered a round change + expectedView := &istanbul.View{ + Sequence: curView.Sequence, + Round: big.NewInt(1), + } + if !reflect.DeepEqual(expectedView, c.currentView()) { + t.Errorf("view mismatch: have %v, want %v", c.currentView(), expectedView) + } + } } } } diff --git a/consensus/istanbul/core/request.go b/consensus/istanbul/core/request.go index f2e652252064..f99b4956ca25 100644 --- a/consensus/istanbul/core/request.go +++ b/consensus/istanbul/core/request.go @@ -22,11 +22,15 @@ func (c *core) handleRequest(request *istanbul.Request) error { logger := c.logger.New("state", c.state, "seq", c.current.sequence) if err := c.checkRequestMsg(request); err != nil { - logger.Warn("unexpected requests", "err", err, "request", request) + if err == errInvalidMessage { + logger.Warn("invalid request") + return err + } + logger.Warn("unexpected request", "err", err, "number", request.Proposal.Number(), "hash", request.Proposal.Hash()) return err } - logger.Trace("handleRequest", "request", request.Proposal.Number()) + logger.Trace("handleRequest", "number", request.Proposal.Number(), "hash", request.Proposal.Hash()) if c.state == StateAcceptRequest { c.sendPreprepare(request) @@ -55,7 +59,7 @@ func (c *core) checkRequestMsg(request *istanbul.Request) error { func (c *core) storeRequestMsg(request *istanbul.Request) { logger := c.logger.New("state", c.state) - logger.Trace("Store future requests", "request", request) + logger.Trace("Store future request", "request", "number", request.Proposal.Number(), "hash", request.Proposal.Hash()) c.pendingRequestsMu.Lock() defer c.pendingRequestsMu.Unlock() @@ -78,14 +82,14 @@ func (c *core) processPendingRequests() { err := c.checkRequestMsg(r) if err != nil { if err == errFutureMessage { - c.logger.Trace("Stop processing request", "request", r) + c.logger.Trace("Stop processing request", "number", r.Proposal.Number(), "hash", r.Proposal.Hash()) c.pendingRequests.Push(m, prio) break } - c.logger.Trace("Skip the pending request", "request", r, "err", err) + c.logger.Trace("Skip the pending request", "number", r.Proposal.Number(), "hash", r.Proposal.Hash(), "err", err) continue } - c.logger.Trace("Post pending request", "request", r) + c.logger.Trace("Post pending request", "number", r.Proposal.Number(), "hash", r.Proposal.Hash()) go c.sendEvent(istanbul.RequestEvent{ Proposal: r.Proposal, diff --git a/consensus/istanbul/core/request_test.go b/consensus/istanbul/core/request_test.go index 81b681eaded1..6c88d06dc08b 100644 --- a/consensus/istanbul/core/request_test.go +++ b/consensus/istanbul/core/request_test.go @@ -23,6 +23,7 @@ import ( "testing" "time" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus/istanbul" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" @@ -35,7 +36,7 @@ func TestCheckRequestMsg(t *testing.T) { current: newRoundState(&istanbul.View{ Sequence: big.NewInt(1), Round: big.NewInt(0), - }, newTestValidatorSet(4)), + }, newTestValidatorSet(4), common.Hash{}, nil), } // invalid request @@ -90,7 +91,7 @@ func TestStoreRequestMsg(t *testing.T) { current: newRoundState(&istanbul.View{ Sequence: big.NewInt(0), Round: big.NewInt(0), - }, newTestValidatorSet(4)), + }, newTestValidatorSet(4), common.Hash{}, nil), pendingRequests: prque.New(), pendingRequestsMu: new(sync.Mutex), } diff --git a/consensus/istanbul/core/roundstate.go b/consensus/istanbul/core/roundstate.go index 01e6fc64586b..bb95512a8533 100644 --- a/consensus/istanbul/core/roundstate.go +++ b/consensus/istanbul/core/roundstate.go @@ -21,18 +21,23 @@ import ( "math/big" "sync" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus/istanbul" "github.com/ethereum/go-ethereum/rlp" ) -func newRoundState(view *istanbul.View, validatorSet istanbul.ValidatorSet) *roundState { +// newRoundState creates a new roundState instance with the given view and validatorSet +// lockedHash and preprepare are for round change when lock exists, +// we need to keep a reference of preprepare in order to propose locked proposal when there is a lock and itself is the proposer +func newRoundState(view *istanbul.View, validatorSet istanbul.ValidatorSet, lockedHash common.Hash, preprepare *istanbul.Preprepare) *roundState { return &roundState{ round: view.Round, sequence: view.Sequence, - Preprepare: nil, + Preprepare: preprepare, Prepares: newMessageSet(validatorSet), Commits: newMessageSet(validatorSet), Checkpoints: newMessageSet(validatorSet), + lockedHash: lockedHash, mu: new(sync.RWMutex), } } @@ -45,6 +50,7 @@ type roundState struct { Prepares *messageSet Commits *messageSet Checkpoints *messageSet + lockedHash common.Hash mu *sync.RWMutex } @@ -112,6 +118,36 @@ func (s *roundState) Sequence() *big.Int { return s.sequence } +func (s *roundState) LockHash() { + s.mu.Lock() + defer s.mu.Unlock() + + if s.Preprepare != nil { + s.lockedHash = s.Preprepare.Proposal.Hash() + } +} + +func (s *roundState) UnlockHash() { + s.mu.Lock() + defer s.mu.Unlock() + + s.lockedHash = common.Hash{} +} + +func (s *roundState) IsHashLocked() bool { + s.mu.RLock() + defer s.mu.RUnlock() + + return s.lockedHash != common.Hash{} +} + +func (s *roundState) GetLockedHash() common.Hash { + s.mu.RLock() + defer s.mu.RUnlock() + + return s.lockedHash +} + // The DecodeRLP method should read one value from the given // Stream. It is not forbidden to read less or more, but it might // be confusing. @@ -123,6 +159,7 @@ func (s *roundState) DecodeRLP(stream *rlp.Stream) error { Prepares *messageSet Commits *messageSet Checkpoints *messageSet + lockedHash common.Hash } if err := stream.Decode(&ss); err != nil { @@ -134,6 +171,7 @@ func (s *roundState) DecodeRLP(stream *rlp.Stream) error { s.Prepares = ss.Prepares s.Commits = ss.Commits s.Checkpoints = ss.Checkpoints + s.lockedHash = ss.lockedHash s.mu = new(sync.RWMutex) return nil @@ -158,5 +196,6 @@ func (s *roundState) EncodeRLP(w io.Writer) error { s.Prepares, s.Commits, s.Checkpoints, + s.lockedHash, }) } diff --git a/consensus/istanbul/core/roundstate_test.go b/consensus/istanbul/core/roundstate_test.go index 0e3ee0ef0f9d..14a2c9c2c797 100644 --- a/consensus/istanbul/core/roundstate_test.go +++ b/consensus/istanbul/core/roundstate_test.go @@ -17,8 +17,11 @@ package core import ( + "math/big" "sync" + "testing" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus/istanbul" ) @@ -33,3 +36,39 @@ func newTestRoundState(view *istanbul.View, validatorSet istanbul.ValidatorSet) mu: new(sync.RWMutex), } } + +func TestLockHash(t *testing.T) { + sys := NewTestSystemWithBackend(1, 0) + rs := newTestRoundState( + &istanbul.View{ + Round: big.NewInt(0), + Sequence: big.NewInt(0), + }, + sys.backends[0].peers, + ) + if !common.EmptyHash(rs.GetLockedHash()) { + t.Errorf("error mismatch: have %v, want empty", rs.GetLockedHash()) + } + if rs.IsHashLocked() { + t.Error("IsHashLocked should return false") + } + + // Lock + expected := rs.Proposal().Hash() + rs.LockHash() + if expected != rs.GetLockedHash() { + t.Errorf("error mismatch: have %v, want %v", rs.GetLockedHash(), expected) + } + if !rs.IsHashLocked() { + t.Error("IsHashLocked should return true") + } + + // Unlock + rs.UnlockHash() + if !common.EmptyHash(rs.GetLockedHash()) { + t.Errorf("error mismatch: have %v, want empty", rs.GetLockedHash()) + } + if rs.IsHashLocked() { + t.Error("IsHashLocked should return false") + } +} diff --git a/consensus/istanbul/core/testbackend_test.go b/consensus/istanbul/core/testbackend_test.go index d53c81b17ed6..a2ed8dd804ec 100644 --- a/consensus/istanbul/core/testbackend_test.go +++ b/consensus/istanbul/core/testbackend_test.go @@ -19,6 +19,7 @@ package core import ( "crypto/ecdsa" "math/big" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus/istanbul" @@ -39,14 +40,18 @@ type testSystemBackend struct { peers istanbul.ValidatorSet events *event.TypeMux - commitMsgs []istanbul.Proposal - committedSeals [][]byte - sentMsgs [][]byte // store the message when Send is called by core + committedMsgs []testCommittedMsgs + sentMsgs [][]byte // store the message when Send is called by core address common.Address db ethdb.Database } +type testCommittedMsgs struct { + commitProposal istanbul.Proposal + committedSeals [][]byte +} + // ============================================== // // define the functions that needs to be provided for Istanbul. @@ -87,10 +92,12 @@ func (self *testSystemBackend) NextRound() error { return nil } -func (self *testSystemBackend) Commit(proposal istanbul.Proposal, seals []byte) error { +func (self *testSystemBackend) Commit(proposal istanbul.Proposal, seals [][]byte) error { testLogger.Info("commit message", "address", self.Address()) - self.commitMsgs = append(self.commitMsgs, proposal) - self.committedSeals = append(self.committedSeals, seals) + self.committedMsgs = append(self.committedMsgs, testCommittedMsgs{ + commitProposal: proposal, + committedSeals: seals, + }) // fake new head events go self.events.Post(istanbul.FinalCommittedEvent{ @@ -99,8 +106,8 @@ func (self *testSystemBackend) Commit(proposal istanbul.Proposal, seals []byte) return nil } -func (self *testSystemBackend) Verify(proposal istanbul.Proposal) error { - return nil +func (self *testSystemBackend) Verify(proposal istanbul.Proposal) (time.Duration, error) { + return 0, nil } func (self *testSystemBackend) Sign(data []byte) ([]byte, error) { @@ -126,6 +133,19 @@ func (self *testSystemBackend) NewRequest(request istanbul.Proposal) { }) } +// Only block height 5 will return true +func (self *testSystemBackend) HasBlock(hash common.Hash, number *big.Int) bool { + return number.Cmp(big.NewInt(5)) == 0 +} + +func (self *testSystemBackend) GetProposer(number uint64) common.Address { + return common.Address{} +} + +func (self *testSystemBackend) ParentValidators(proposal istanbul.Proposal) istanbul.ValidatorSet { + return self.peers +} + // ============================================== // // define the struct that need to be provided for integration tests. @@ -180,7 +200,7 @@ func NewTestSystemWithBackend(n, f uint64) *testSystem { core.current = newRoundState(&istanbul.View{ Round: big.NewInt(0), Sequence: big.NewInt(1), - }, vset) + }, vset, common.Hash{}, nil) core.logger = testLogger core.validateFn = backend.CheckValidatorSignature diff --git a/consensus/istanbul/errors.go b/consensus/istanbul/errors.go index b4fa82bffc96..ed5b62342fb4 100644 --- a/consensus/istanbul/errors.go +++ b/consensus/istanbul/errors.go @@ -22,4 +22,8 @@ var ( // ErrUnauthorizedAddress is returned when given address cannot be found in // current validator set. ErrUnauthorizedAddress = errors.New("unauthorized address") + // ErrStoppedEngine is returned if the engine is stopped + ErrStoppedEngine = errors.New("stopped engine") + // ErrStartedEngine is returned if the engine is already started + ErrStartedEngine = errors.New("started engine") ) diff --git a/consensus/istanbul/events.go b/consensus/istanbul/events.go index 0f341b1f7d14..8e60c416c43b 100644 --- a/consensus/istanbul/events.go +++ b/consensus/istanbul/events.go @@ -18,23 +18,32 @@ package istanbul import ( "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" ) +// ConsensusDataEvent is posted when sending Istanbul consensus data type ConsensusDataEvent struct { - // target to send message - Target common.Address + // targets to send message + Targets map[common.Address]bool // consensus message data Data []byte } +// NewCommittedEvent is posted when I'm not a proposer but +// a block has been committed from Istanbul consensus. +type NewCommittedEvent struct{ Block *types.Block } + +// RequestEvent is posted to propose a proposal type RequestEvent struct { Proposal Proposal } +// MessageEvent is posted for Istanbul engine communication type MessageEvent struct { Payload []byte } +// FinalCommittedEvent is posted when a proposal is committed type FinalCommittedEvent struct { Proposal Proposal Proposer common.Address diff --git a/eth/backend.go b/eth/backend.go index 367619d555f4..4ba94994eebc 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -352,6 +352,8 @@ func (s *Ethereum) StartMining(local bool) error { return fmt.Errorf("singer missing: %v", err) } clique.Authorize(eb, wallet.SignHash) + } else if istanbul, ok := s.engine.(consensus.Istanbul); ok { + istanbul.Start(s.blockchain, s.blockchain.InsertChain) } if local { // If local (CPU) mining is started, we can disable the transaction rejection @@ -364,7 +366,12 @@ func (s *Ethereum) StartMining(local bool) error { return nil } -func (s *Ethereum) StopMining() { s.miner.Stop() } +func (s *Ethereum) StopMining() { + s.miner.Stop() + if istanbul, ok := s.engine.(consensus.Istanbul); ok { + istanbul.Stop() + } +} func (s *Ethereum) IsMining() bool { return s.miner.Mining() } func (s *Ethereum) Miner() *miner.Miner { return s.miner } diff --git a/eth/istanbul_handler.go b/eth/istanbul_handler.go index de69d40486ab..fe9e70fce955 100644 --- a/eth/istanbul_handler.go +++ b/eth/istanbul_handler.go @@ -19,11 +19,9 @@ package eth import ( "fmt" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/consensus/istanbul" "github.com/ethereum/go-ethereum/core" - "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/ethdb" @@ -37,7 +35,7 @@ import ( const ( // istanbul is compatible with eth63 protocol istanbulName = "istanbul" - istanbulVersion = 64 + IstanbulVersion = 64 istanbulProtocolLength = 18 IstanbulMsg = 0x11 @@ -67,10 +65,10 @@ func newIstanbulProtocolManager(config *params.ChainConfig, mode downloader.Sync manager.SubProtocols = []p2p.Protocol{ p2p.Protocol{ Name: istanbulName, - Version: istanbulVersion, + Version: IstanbulVersion, Length: istanbulProtocolLength, Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { - peer := manager.newPeer(int(istanbulVersion), p, rw) + peer := manager.newPeer(int(IstanbulVersion), p, rw) select { case manager.newPeerCh <- peer: manager.wg.Add(1) @@ -97,15 +95,13 @@ func newIstanbulProtocolManager(config *params.ChainConfig, mode downloader.Sync func (pm *istanbulProtocolManager) Start() { // Subscribe required events - pm.eventSub = pm.eventMux.Subscribe(istanbul.ConsensusDataEvent{}, core.ChainHeadEvent{}) + pm.eventSub = pm.eventMux.Subscribe(istanbul.ConsensusDataEvent{}, core.ChainHeadEvent{}, istanbul.NewCommittedEvent{}) go pm.eventLoop() pm.protocolManager.Start() - pm.engine.Start(pm.protocolManager.blockchain, pm.commitBlock) } func (pm *istanbulProtocolManager) Stop() { log.Info("Stopping Ethereum protocol") - pm.engine.Stop() pm.protocolManager.Stop() pm.eventSub.Unsubscribe() // quits eventLoop } @@ -137,6 +133,8 @@ func (pm *istanbulProtocolManager) eventLoop() { switch ev := obj.Data.(type) { case istanbul.ConsensusDataEvent: pm.sendEvent(ev) + case istanbul.NewCommittedEvent: + pm.BroadcastBlock(ev.Block, false) case core.ChainHeadEvent: pm.newHead(ev) } @@ -145,23 +143,20 @@ func (pm *istanbulProtocolManager) eventLoop() { // sendEvent sends a p2p message with given data to a peer func (pm *istanbulProtocolManager) sendEvent(event istanbul.ConsensusDataEvent) { - // FIXME: it's inefficient because it retrieves all peers every time - p := pm.findPeer(event.Target) - if p == nil { - log.Warn("Failed to find peer by address", "addr", event.Target) - return - } - p2p.Send(p.rw, IstanbulMsg, event.Data) -} - -func (pm *istanbulProtocolManager) commitBlock(block *types.Block) error { - if _, err := pm.blockchain.InsertChain(types.Blocks{block}); err != nil { - log.Debug("Failed to insert block", "number", block.Number(), "hash", block.Hash(), "err", err) - return err + for _, p := range pm.peers.Peers() { + pubKey, err := p.ID().Pubkey() + if err != nil { + continue + } + addr := crypto.PubkeyToAddress(*pubKey) + if event.Targets[addr] { + p2p.Send(p.rw, IstanbulMsg, event.Data) + delete(event.Targets, addr) + if len(event.Targets) == 0 { + return + } + } } - // Only announce the block, don't broadcast it - go pm.BroadcastBlock(block, false) - return nil } func (pm *istanbulProtocolManager) newHead(event core.ChainHeadEvent) { @@ -170,17 +165,3 @@ func (pm *istanbulProtocolManager) newHead(event core.ChainHeadEvent) { pm.engine.NewChainHead(block) } } - -// findPeer retrieves a peer by given address -func (pm *istanbulProtocolManager) findPeer(addr common.Address) *peer { - for _, p := range pm.peers.Peers() { - pubKey, err := p.ID().Pubkey() - if err != nil { - continue - } - if crypto.PubkeyToAddress(*pubKey) == addr { - return p - } - } - return nil -} diff --git a/ethstats/ethstats.go b/ethstats/ethstats.go index 333c975c9ac7..30360cc7a109 100644 --- a/ethstats/ethstats.go +++ b/ethstats/ethstats.go @@ -355,6 +355,9 @@ func (s *Service) login(conn *websocket.Conn) error { if info := infos.Protocols["eth"]; info != nil { network = fmt.Sprintf("%d", info.(*eth.EthNodeInfo).Network) protocol = fmt.Sprintf("eth/%d", eth.ProtocolVersions[0]) + } else if info := infos.Protocols["istanbul"]; info != nil { + network = fmt.Sprintf("%d", info.(*eth.EthNodeInfo).Network) + protocol = fmt.Sprintf("istanbul/%d", eth.IstanbulVersion) } else { network = fmt.Sprintf("%d", infos.Protocols["les"].(*eth.EthNodeInfo).Network) protocol = fmt.Sprintf("les/%d", les.ProtocolVersions[0]) diff --git a/miner/worker.go b/miner/worker.go index 411bc4e1b8e3..de67ef803557 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -80,6 +80,9 @@ type Result struct { Block *types.Block } +// NewBlockEvent is posted when a new block is required +type NewBlockEvent struct{} + // worker is the main object which takes care of applying messages to the new state type worker struct { config *params.ChainConfig @@ -138,7 +141,12 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase com unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), 5), fullValidation: false, } - worker.events = worker.mux.Subscribe(core.ChainHeadEvent{}, core.ChainSideEvent{}, core.TxPreEvent{}) + worker.events = worker.mux.Subscribe( + core.ChainHeadEvent{}, + core.ChainSideEvent{}, + core.TxPreEvent{}, + NewBlockEvent{}, + ) go worker.update() go worker.wait() @@ -233,6 +241,8 @@ func (self *worker) update() { for event := range self.events.Chan() { // A real event arrived, process interesting content switch ev := event.Data.(type) { + case NewBlockEvent: + self.commitNewWork() case core.ChainHeadEvent: self.commitNewWork() case core.ChainSideEvent: