Skip to content

Commit

Permalink
Merge "Validate state at startup"
Browse files Browse the repository at this point in the history
  • Loading branch information
christo4ferris authored and Gerrit Code Review committed Sep 13, 2016
2 parents 2c890dd + 7b2e488 commit 44e1d30
Show file tree
Hide file tree
Showing 9 changed files with 212 additions and 22 deletions.
1 change: 1 addition & 0 deletions consensus/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ func (co *coordinatorImpl) ProcessEvent(event events.Event) events.Event {
for {
err, recoverable := co.stc.SyncToTarget(info.Height-1, info.CurrentBlockHash, et.peers)
if err == nil {
logger.Debug("State transfer sync completed, returning")
co.skipInProgress = false
co.consumer.StateUpdated(et.tag, info)
return nil
Expand Down
4 changes: 2 additions & 2 deletions consensus/helper/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,16 @@ func NewHelper(mhc peer.MessageHandlerCoordinator) *Helper {
coordinator: mhc,
secOn: viper.GetBool("security.enabled"),
secHelper: mhc.GetSecHelper(),
valid: true, // Assume our state is consistent until we are told otherwise, TODO: revisit
valid: true, // Assume our state is consistent until we are told otherwise, actual consensus (pbft) will invalidate this immediately, but noops will not
}

h.executor = executor.NewImpl(h, h, mhc)
h.executor.Start()
return h
}

// setConsenter stores c as the helper's consenter and only then starts the
// executor; the ordering is deliberate (see the note on the Start call).
func (h *Helper) setConsenter(c consensus.Consenter) {
h.consenter = c
h.executor.Start() // The consenter may be expecting a callback from the executor because of state transfer completing, it will miss this if we start the executor too early
}

// GetNetworkInfo returns the PeerEndpoints of the current validator and the entire validating network
Expand Down
9 changes: 9 additions & 0 deletions consensus/pbft/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,17 @@ func newObcBatch(id uint64, config *viper.Viper, stack consensus.Stack) *obcBatc
etf := events.NewTimerFactoryImpl(op.manager)
op.pbft = newPbftCore(id, config, op, etf)
op.manager.Start()
blockchainInfoBlob := stack.GetBlockchainInfoBlob()
op.externalEventReceiver.manager = op.manager
op.broadcaster = newBroadcaster(id, op.pbft.N, op.pbft.f, op.pbft.broadcastTimeout, stack)
op.manager.Queue() <- workEvent(func() {
op.pbft.stateTransfer(&stateUpdateTarget{
checkpointMessage: checkpointMessage{
seqNo: op.pbft.lastExec,
id: blockchainInfoBlob,
},
})
})

op.batchSize = config.GetInt("general.batchsize")
op.batchStore = nil
Expand Down
61 changes: 47 additions & 14 deletions consensus/pbft/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/hyperledger/fabric/consensus/util/events"
pb "github.com/hyperledger/fabric/protos"

"github.com/golang/protobuf/proto"
"github.com/spf13/viper"
)

Expand Down Expand Up @@ -76,8 +77,31 @@ func TestNetworkBatch(t *testing.T) {
}
}

func TestClearOustandingReqsOnStateRecovery(t *testing.T) {
b := newObcBatch(0, loadConfig(), &omniProto{})
var inertState = &omniProto{
GetBlockchainInfoImpl: func() *pb.BlockchainInfo {
return &pb.BlockchainInfo{
CurrentBlockHash: []byte("GENESIS"),
Height: 1,
}
},
GetBlockchainInfoBlobImpl: func() []byte {
b, _ := proto.Marshal(&pb.BlockchainInfo{
CurrentBlockHash: []byte("GENESIS"),
Height: 1,
})
return b
},
InvalidateStateImpl: func() {},
ValidateStateImpl: func() {},
UpdateStateImpl: func(id interface{}, target *pb.BlockchainInfo, peers []*pb.PeerID) {},
}

func TestClearOutstandingReqsOnStateRecovery(t *testing.T) {
omni := *inertState
omni.UnicastImpl = func(msg *pb.Message, receiverHandle *pb.PeerID) error { return nil }
b := newObcBatch(0, loadConfig(), &omni)
b.StateUpdated(&checkpointMessage{seqNo: 0, id: inertState.GetBlockchainInfoBlobImpl()}, inertState.GetBlockchainInfoImpl())

defer b.Close()

b.reqStore.storeOutstanding(&Request{})
Expand All @@ -98,10 +122,9 @@ func TestClearOustandingReqsOnStateRecovery(t *testing.T) {
func TestOutstandingReqsIngestion(t *testing.T) {
bs := [3]*obcBatch{}
for i := range bs {
omni := &omniProto{
UnicastImpl: func(ocMsg *pb.Message, peer *pb.PeerID) error { return nil },
}
bs[i] = newObcBatch(uint64(i), loadConfig(), omni)
omni := *inertState
omni.UnicastImpl = func(ocMsg *pb.Message, peer *pb.PeerID) error { return nil }
bs[i] = newObcBatch(uint64(i), loadConfig(), &omni)
defer bs[i].Close()

// Have vp1 only deliver messages
Expand All @@ -115,6 +138,9 @@ func TestOutstandingReqsIngestion(t *testing.T) {
}
}
}
for i := range bs {
bs[i].StateUpdated(&checkpointMessage{seqNo: 0, id: inertState.GetBlockchainInfoBlobImpl()}, inertState.GetBlockchainInfoImpl())
}

err := bs[1].RecvMsg(createTxMsg(1), &pb.PeerID{Name: "vp1"})
if err != nil {
Expand All @@ -137,10 +163,10 @@ func TestOutstandingReqsIngestion(t *testing.T) {
}

func TestOutstandingReqsResubmission(t *testing.T) {
omni := &omniProto{}
config := loadConfig()
config.Set("general.batchsize", 2)
b := newObcBatch(0, config, omni)
omni := *inertState
b := newObcBatch(0, config, &omni)
defer b.Close() // The broadcasting threads only cause problems here... but this test stalls without them

transactionsBroadcast := 0
Expand All @@ -160,6 +186,9 @@ func TestOutstandingReqsResubmission(t *testing.T) {
return nil
}

b.StateUpdated(&checkpointMessage{seqNo: 0, id: inertState.GetBlockchainInfoBlobImpl()}, inertState.GetBlockchainInfoImpl())
b.manager.Queue() <- nil // Make sure the state update finishes first

reqs := make([]*Request, 8)
for i := 0; i < len(reqs); i++ {
reqs[i] = createPbftReq(int64(i), 0)
Expand Down Expand Up @@ -232,11 +261,12 @@ func TestOutstandingReqsResubmission(t *testing.T) {
}

func TestViewChangeOnPrimarySilence(t *testing.T) {
b := newObcBatch(1, loadConfig(), &omniProto{
UnicastImpl: func(ocMsg *pb.Message, peer *pb.PeerID) error { return nil },
SignImpl: func(msg []byte) ([]byte, error) { return msg, nil },
VerifyImpl: func(peerID *pb.PeerID, signature []byte, message []byte) error { return nil },
})
omni := *inertState
omni.UnicastImpl = func(ocMsg *pb.Message, peer *pb.PeerID) error { return nil } // For the checkpoint
omni.SignImpl = func(msg []byte) ([]byte, error) { return msg, nil }
omni.VerifyImpl = func(peerID *pb.PeerID, signature []byte, message []byte) error { return nil }
b := newObcBatch(1, loadConfig(), &omni)
b.StateUpdated(&checkpointMessage{seqNo: 0, id: inertState.GetBlockchainInfoBlobImpl()}, inertState.GetBlockchainInfoImpl())
b.pbft.requestTimeout = 50 * time.Millisecond
defer b.Close()

Expand Down Expand Up @@ -347,7 +377,10 @@ func TestClassicBackToBackStateTransfer(t *testing.T) {
}

func TestClearBatchStoreOnViewChange(t *testing.T) {
b := newObcBatch(1, loadConfig(), &omniProto{})
omni := *inertState
omni.UnicastImpl = func(ocMsg *pb.Message, peer *pb.PeerID) error { return nil } // For the checkpoint
b := newObcBatch(1, loadConfig(), &omni)
b.StateUpdated(&checkpointMessage{seqNo: 0, id: inertState.GetBlockchainInfoBlobImpl()}, inertState.GetBlockchainInfoImpl())
defer b.Close()

b.batchStore = []*Request{&Request{}}
Expand Down
25 changes: 22 additions & 3 deletions consensus/pbft/mock_ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package pbft

import (
"bytes"
"fmt"
"reflect"
"sync"
Expand Down Expand Up @@ -297,6 +298,27 @@ func (mock *MockLedger) GetBlockHeadMetadata() ([]byte, error) {
}

func (mock *MockLedger) simulateStateTransfer(info *protos.BlockchainInfo, peers []*protos.PeerID) {
if mock.blockHeight >= info.Height {
blockCursor := info.Height - 1
validHash := info.CurrentBlockHash
for {
block, ok := mock.blocks[blockCursor]
if !ok {
break
}
hash, _ := mock.HashBlock(block)
if !bytes.Equal(hash, validHash) {
break
}
blockCursor--
validHash = block.PreviousBlockHash
if blockCursor == ^uint64(0) {
return
}
}
panic(fmt.Sprintf("Asked to skip to a block (%d) which is lower than our current height of %d. (Corrupt block at %d with hash %x)", info.Height, mock.blockHeight, blockCursor, validHash))
}

var remoteLedger consensus.ReadOnlyLedger
if len(peers) > 0 {
var ok bool
Expand All @@ -309,9 +331,6 @@ func (mock *MockLedger) simulateStateTransfer(info *protos.BlockchainInfo, peers
}
fmt.Printf("TEST LEDGER skipping to %+v", info)
p := 0
if mock.blockHeight >= info.Height {
panic(fmt.Sprintf("Asked to skip to a block (%d) which is lower than our current height of %d", info.Height, mock.blockHeight))
}
for n := mock.blockHeight; n < info.Height; n++ {
block, err := remoteLedger.GetBlock(n)

Expand Down
2 changes: 1 addition & 1 deletion consensus/pbft/pbft-core.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,11 +376,11 @@ func (instance *pbftCore) ProcessEvent(e events.Event) events.Event {
return nil
}
logger.Infof("Replica %d application caught up via state transfer, lastExec now %d", instance.id, update.seqNo)
// XXX create checkpoint
instance.lastExec = update.seqNo
instance.moveWatermarks(instance.lastExec) // The watermark movement handles moving this to a checkpoint boundary
instance.skipInProgress = false
instance.consumer.validateState()
instance.Checkpoint(update.seqNo, update.id)
instance.executeOutstanding()
case execDoneEvent:
instance.execDoneSync()
Expand Down
25 changes: 25 additions & 0 deletions consensus/pbft/pbft-core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/op/go-logging"

"github.com/hyperledger/fabric/consensus/util/events"
pb "github.com/hyperledger/fabric/protos"
)

func init() {
Expand Down Expand Up @@ -1703,6 +1704,30 @@ func TestViewChangeDuringExecution(t *testing.T) {
}
}

func TestStateTransferCheckpoint(t *testing.T) {
broadcasts := 0
instance := newPbftCore(3, loadConfig(), &omniProto{
broadcastImpl: func(msg []byte) {
broadcasts++
},
validateStateImpl: func() {},
}, &inertTimerFactory{})

id := []byte("My ID")
events.SendEvent(instance, stateUpdatedEvent{
chkpt: &checkpointMessage{
seqNo: 10,
id: id,
},
target: &pb.BlockchainInfo{},
})

if broadcasts != 1 {
t.Fatalf("Should have broadcast a checkpoint after the state transfer finished")
}

}

func TestStateTransferredToOldPoint(t *testing.T) {
skipped := false
instance := newPbftCore(3, loadConfig(), &omniProto{
Expand Down
38 changes: 36 additions & 2 deletions core/peer/statetransfer/statetransfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ type coordinatorImpl struct {
// If the peerIDs are nil, then all peers are assumed to have the given block.
// If the call returns an error, a boolean is included which indicates if the error may be transient and the caller should retry
func (sts *coordinatorImpl) SyncToTarget(blockNumber uint64, blockHash []byte, peerIDs []*pb.PeerID) (error, bool) {
logger.Debugf("Syncing to target %x for block number %d with peers %v", blockHash, blockNumber, peerIDs)
logger.Infof("Syncing to target %x for block number %d with peers %v", blockHash, blockNumber, peerIDs)

if !sts.inProgress {
sts.currentStateBlockNumber = sts.stack.GetBlockchainSize() - 1 // The block height is one more than the latest block number
Expand Down Expand Up @@ -422,8 +422,42 @@ func (sts *coordinatorImpl) syncBlockchainToTarget(blockSyncReq *blockSyncReq) {
panic("Our blockchain is already higher than a sync target, this is unlikely, but unimplemented")
}
} else {
blockCursor := blockSyncReq.blockNumber
validHash := blockSyncReq.firstBlockHash

_, _, err := sts.syncBlocks(blockSyncReq.blockNumber, blockSyncReq.reportOnBlock, blockSyncReq.firstBlockHash, blockSyncReq.peerIDs)
// Don't bother fetching blocks which are already here and valid
// This is especially useful at startup
for {
block, err := sts.stack.GetBlockByNumber(blockCursor)
if err != nil || block == nil {
// Need to fetch this block
break
}
bh, err := sts.stack.HashBlock(block)
if err != nil {
// Something wrong with this block
break
}
if !bytes.Equal(bh, validHash) {
// Block is corrupt
break
}
blockCursor--
validHash = block.PreviousBlockHash
if blockCursor+1 == blockSyncReq.reportOnBlock {
break
}
}

if blockCursor+1 <= blockSyncReq.blockNumber {
logger.Debugf("Skipped remote syncing of block %d through %d because they were already present and valid", blockSyncReq.blockNumber, blockCursor+1)
}

var err error
// Note, this must accommodate blockCursor underflowing
if blockCursor+1 > blockSyncReq.reportOnBlock {
_, _, err = sts.syncBlocks(blockCursor, blockSyncReq.reportOnBlock, validHash, blockSyncReq.peerIDs)
}

if nil != blockSyncReq.replyChan {
logger.Debugf("Replying to blockSyncReq on reply channel with : %s", err)
Expand Down
69 changes: 69 additions & 0 deletions core/peer/statetransfer/statetransfer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,75 @@ func makeSimpleFilter(failureTrigger mockRequest, failureType mockResponse) (fun

}

// TestStartupValidStateGenesis runs executeStateTransfer with both numeric
// arguments set to 0 against a local ledger that holds only block 0, and
// fails the test if an error is returned.
func TestStartupValidStateGenesis(t *testing.T) {
	remotes := createRemoteLedgers(2, 1) // No remote targets available

	// Local ledger at a block height of 1: just the genesis block.
	local := NewMockLedger(remotes, nil, t)
	local.PutBlock(0, SimpleGetBlock(0))

	transfer := newTestStateTransfer(local, remotes)
	defer transfer.Stop()

	err := executeStateTransfer(transfer, local, 0, 0, remotes)
	if err != nil {
		t.Fatalf("Startup failure: %s", err)
	}
}

// TestStartupValidStateExisting fills the local ledger with blocks 0..49, sets
// its state to SimpleGetState(49), and then runs executeStateTransfer with
// both numeric arguments set to 49, failing the test if an error is returned.
func TestStartupValidStateExisting(t *testing.T) {
	const height uint64 = 50

	remotes := createRemoteLedgers(2, 1) // No remote targets available

	// Local ledger at a block height of 50, every block already present.
	local := NewMockLedger(remotes, nil, t)
	for n := uint64(0); n < height; n++ {
		local.PutBlock(n, SimpleGetBlock(n))
	}
	local.state = SimpleGetState(height - 1)

	transfer := newTestStateTransfer(local, remotes)
	defer transfer.Stop()

	err := executeStateTransfer(transfer, local, height-1, height-1, remotes)
	if err != nil {
		t.Fatalf("Startup failure: %s", err)
	}
}

// TestStartupInvalidStateGenesis gives the local ledger block 0 but flips
// every bit of its state so the state is wrong, then runs
// executeStateTransfer with both numeric arguments set to 0 and fails the
// test if an error is returned.
func TestStartupInvalidStateGenesis(t *testing.T) {
	remotes := createRemoteLedgers(1, 3)

	// Local ledger at a block height of 1, with a valid genesis block.
	local := NewMockLedger(remotes, nil, t)
	local.PutBlock(0, SimpleGetBlock(0))
	local.state = ^local.state // Ensure the state is wrong

	transfer := newTestStateTransfer(local, remotes)
	defer transfer.Stop()

	err := executeStateTransfer(transfer, local, 0, 0, remotes)
	if err != nil {
		t.Fatalf("Startup failure: %s", err)
	}
}

// TestStartupInvalidStateExisting fills the local ledger with blocks 0..49
// but sets its state to the bitwise complement of SimpleGetState(49) so the
// state is wrong, then runs executeStateTransfer with both numeric arguments
// set to 49 and fails the test if an error is returned.
func TestStartupInvalidStateExisting(t *testing.T) {
	const height uint64 = 50

	remotes := createRemoteLedgers(1, 3)

	// Local ledger at a block height of 50, every block already present.
	local := NewMockLedger(remotes, nil, t)
	for n := uint64(0); n < height; n++ {
		local.PutBlock(n, SimpleGetBlock(n))
	}
	local.state = ^SimpleGetState(height - 1) // Ensure the state is wrong

	transfer := newTestStateTransfer(local, remotes)
	defer transfer.Stop()

	err := executeStateTransfer(transfer, local, height-1, height-1, remotes)
	if err != nil {
		t.Fatalf("Startup failure: %s", err)
	}
}

func TestCatchupSimple(t *testing.T) {
mrls := createRemoteLedgers(1, 3)

Expand Down

0 comments on commit 44e1d30

Please sign in to comment.