Skip to content

Commit

Permalink
IBFT: Cleanup AddParentSeal in Prepare (ethereum#778)
Browse files Browse the repository at this point in the history
Extract addParentSeal to its own function.

Clean up error conditions. Now, whenever it can't generate a new ParentSeal, it will fallback to the AggregatedSeal on the parent block. If that fail, we will fail.
  • Loading branch information
Mariano Cortesi authored Jan 3, 2020
1 parent 36357a9 commit b5a7558
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 71 deletions.
152 changes: 84 additions & 68 deletions consensus/istanbul/backend/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,74 +400,7 @@ func (sb *Backend) Prepare(chain consensus.ChainReader, header *types.Header) er
delay := time.Unix(header.Time.Int64(), 0).Sub(now())
time.Sleep(delay)

logger := sb.logger.New("func", "Backend.Prepare()", "number", number)
// modify the block header to include all the ParentCommits
// only do this for blocks which start with block 1 as a parent
if number > 1 {

// In some cases, "Prepare" may be called before sb.core has moved to the next sequence,
// preventing signature aggregation.
// This typically happens in round > 0, since round 0 typically hits the "time.Sleep()"
// above.
// When this happens, loop until sb.core moves to the next sequence, with a limit of 500ms.
waitForSequenceChange := func() *big.Int {
timeout := time.After(500 * time.Millisecond)
ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ticker.C:
seq := sb.core.Sequence()
if seq != nil && seq.Cmp(header.Number) == 0 {
logger.Trace("Current sequence matches header", "cur_seq", seq)
return seq
}
case <-timeout:
// TODO(asa): Why is this logged by full nodes?
log.Trace("Timed out while waiting for core to sequence change, unable to combine commit messages with ParentAggregatedSeal", "cur_seq", sb.core.Sequence())
return nil
}
}
}
seq := waitForSequenceChange()

parentExtra, err := types.ExtractIstanbulExtra(parent)
if err != nil {
return err
}
parentAggregatedSeal := parentExtra.AggregatedSeal
logger = logger.New("parentAggregatedSeal", parentAggregatedSeal.String())
logger.Trace("Got ParentAggregatedSeal")
if seq != nil && seq.Cmp(header.Number) == 0 {
parentCommits := sb.core.ParentCommits()
logger = logger.New("cur_seq", sb.core.Sequence())
if parentCommits != nil && parentCommits.Size() != 0 {
logger = logger.New("numParentCommits", parentCommits.Size())
logger.Trace("Found commit messages from previous sequence to combine with ParentAggregatedSeal")
// if we had any seals gossiped to us, proceed to add them to the
// already aggregated signature
if unionAggregatedSeal, err := istanbulCore.UnionOfSeals(parentExtra.AggregatedSeal, parentCommits); err != nil {
logger.Error("Failed to combine commit messages with ParentAggregatedSeal", "err", err)
} else {
// need to pass the previous block from the parent to get the parent's validators
// (otherwise we'd be getting the validators for the current block)
parentValidators := sb.getValidators(parent.Number.Uint64()-1, parent.ParentHash)
// only update to use the union if we indeed provided a valid aggregate signature for this block
if err := sb.verifyAggregatedSeal(parent.Hash(), parentValidators, unionAggregatedSeal); err != nil {
logger.Error("Failed to verify combined ParentAggregatedSeal", "err", err)
} else {
parentAggregatedSeal = unionAggregatedSeal
logger.Debug("Succeeded in verifying combined ParentAggregatedSeal", "combinedParentAggregatedSeal", parentAggregatedSeal.String())
}
}
} else {
logger.Debug("No additional seals to combine with ParentAggregatedSeal")
}
return writeAggregatedSeal(header, parentAggregatedSeal, true)
}
}

return nil
return sb.addParentSeal(chain, header)
}

// UpdateValSetDiff will update the validator set diff in the header, if the mined header is the last block of the epoch
Expand Down Expand Up @@ -882,6 +815,68 @@ func (sb *Backend) snapshot(chain consensus.ChainReader, number uint64, hash com
return returnSnap, nil
}

func (sb *Backend) addParentSeal(chain consensus.ChainReader, header *types.Header) error {
number := header.Number.Uint64()
logger := sb.logger.New("func", "Backend.addParentSeal()", "number", number)

// only do this for blocks which start with block 1 as a parent
if number <= 1 {
return nil
}

// Get parent's extra to fetch it's AggregatedSeal
parent := chain.GetHeader(header.ParentHash, number-1)
parentExtra, err := types.ExtractIstanbulExtra(parent)
if err != nil {
return err
}

createParentSeal := func() types.IstanbulAggregatedSeal {
// In some cases, "addParentSeal" may be called before sb.core has moved to the next sequence,
// preventing signature aggregation.
// This typically happens in round > 0, since round 0 typically hits the "time.Sleep()"
// above.
// When this happens, loop until sb.core moves to the next sequence, with a limit of 500ms.
seq := waitCoreToReachSequence(sb.core, header.Number)
if seq == nil {
return parentExtra.AggregatedSeal
}

logger = logger.New("parentAggregatedSeal", parentExtra.AggregatedSeal.String(), "cur_seq", seq)

parentCommits := sb.core.ParentCommits()
if parentCommits == nil || parentCommits.Size() == 0 {
logger.Debug("No additional seals to combine with ParentAggregatedSeal")
return parentExtra.AggregatedSeal
}

logger = logger.New("numParentCommits", parentCommits.Size())
logger.Trace("Found commit messages from previous sequence to combine with ParentAggregatedSeal")

// if we had any seals gossiped to us, proceed to add them to the
// already aggregated signature
unionAggregatedSeal, err := istanbulCore.UnionOfSeals(parentExtra.AggregatedSeal, parentCommits)
if err != nil {
logger.Error("Failed to combine commit messages with ParentAggregatedSeal", "err", err)
return parentExtra.AggregatedSeal
}

// need to pass the previous block from the parent to get the parent's validators
// (otherwise we'd be getting the validators for the current block)
parentValidators := sb.getValidators(parent.Number.Uint64()-1, parent.ParentHash)
// only update to use the union if we indeed provided a valid aggregate signature for this block
if err := sb.verifyAggregatedSeal(parent.Hash(), parentValidators, unionAggregatedSeal); err != nil {
logger.Error("Failed to verify combined ParentAggregatedSeal", "err", err)
return parentExtra.AggregatedSeal
}

logger.Debug("Succeeded in verifying combined ParentAggregatedSeal", "combinedParentAggregatedSeal", unionAggregatedSeal.String())
return unionAggregatedSeal
}

return writeAggregatedSeal(header, createParentSeal(), true)
}

// FIXME: Need to update this for Istanbul
// sigHash returns the hash which is used as input for the Istanbul
// signing. It is the hash of the entire header apart from the 65 byte signature
Expand Down Expand Up @@ -1033,3 +1028,24 @@ func writeAggregatedSeal(h *types.Header, aggregatedSeal types.IstanbulAggregate
h.Extra = append(h.Extra[:types.IstanbulExtraVanity], payload...)
return nil
}

func waitCoreToReachSequence(core istanbulCore.Engine, expectedSequence *big.Int) *big.Int {
logger := log.New("func", "waitCoreToReachSequence")
timeout := time.After(500 * time.Millisecond)
ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ticker.C:
seq := core.Sequence()
if seq != nil && seq.Cmp(expectedSequence) == 0 {
logger.Trace("Current sequence matches header", "cur_seq", seq)
return seq
}
case <-timeout:
// TODO(asa): Why is this logged by full nodes?
log.Trace("Timed out while waiting for core to sequence change, unable to combine commit messages with ParentAggregatedSeal", "cur_seq", core.Sequence())
return nil
}
}
}
18 changes: 15 additions & 3 deletions consensus/istanbul/backend/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package backend
import (
"bytes"
"crypto/ecdsa"
"fmt"
"math/big"
"reflect"
"testing"
Expand Down Expand Up @@ -278,8 +279,14 @@ func makeBlock(chain *core.BlockChain, engine *Backend, parent *types.Block) *ty
func makeBlockWithoutSeal(chain *core.BlockChain, engine *Backend, parent *types.Block) *types.Block {
header := makeHeader(parent, engine.config)
engine.Prepare(chain, header)
state, _ := chain.StateAt(parent.Root())
block, _ := engine.Finalize(chain, header, state, nil, nil, nil, nil)
state, err := chain.StateAt(parent.Root())
if err != nil {
fmt.Printf("Error!! %v\n", err)
}
block, err := engine.Finalize(chain, header, state, nil, nil, nil, nil)
if err != nil {
fmt.Printf("Error!! %v\n", err)
}
return block
}

Expand Down Expand Up @@ -347,10 +354,14 @@ func TestSealStopChannel(t *testing.T) {
}
}

// TestSealCommittedOtherHash checks that when Seal() ask for a commit, if we send a
// different block hash, it will abort
func TestSealCommittedOtherHash(t *testing.T) {
chain, engine := newBlockChain(4, true)
block := makeBlockWithoutSeal(chain, engine, chain.Genesis())
otherBlock := makeBlockWithoutSeal(chain, engine, block)

// create a second block which will have a different hash
otherBlock := makeBlockWithoutSeal(chain, engine, chain.Genesis())
eventSub := engine.EventMux().Subscribe(istanbul.RequestEvent{})
eventLoop := func() {
ev := <-eventSub.Chan()
Expand Down Expand Up @@ -541,6 +552,7 @@ func TestVerifyHeaders(t *testing.T) {
b = makeBlockWithoutSeal(chain, engine, blocks[i-1])
b, _ = engine.updateBlock(blocks[i-1].Header(), b)
}

blocks = append(blocks, b)
headers = append(headers, blocks[i].Header())
}
Expand Down

0 comments on commit b5a7558

Please sign in to comment.