diff --git a/consensus/executor/executor.go b/consensus/executor/executor.go
index bab2fa2f8e4..9cadbabd5db 100644
--- a/consensus/executor/executor.go
+++ b/consensus/executor/executor.go
@@ -131,6 +131,7 @@ func (co *coordinatorImpl) ProcessEvent(event events.Event) events.Event {
 		for {
 			err, recoverable := co.stc.SyncToTarget(info.Height-1, info.CurrentBlockHash, et.peers)
 			if err == nil {
+				logger.Debug("State transfer sync completed, returning")
 				co.skipInProgress = false
 				co.consumer.StateUpdated(et.tag, info)
 				return nil
diff --git a/consensus/helper/helper.go b/consensus/helper/helper.go
index e28d9c22097..e8581ade2a0 100644
--- a/consensus/helper/helper.go
+++ b/consensus/helper/helper.go
@@ -53,16 +53,16 @@ func NewHelper(mhc peer.MessageHandlerCoordinator) *Helper {
 		coordinator: mhc,
 		secOn:       viper.GetBool("security.enabled"),
 		secHelper:   mhc.GetSecHelper(),
-		valid:       true, // Assume our state is consistent until we are told otherwise, TODO: revisit
+		valid:       true, // Assume our state is consistent until we are told otherwise; actual consensus (pbft) will invalidate this immediately, but noops will not
 	}

 	h.executor = executor.NewImpl(h, h, mhc)
-	h.executor.Start()
 	return h
 }

 func (h *Helper) setConsenter(c consensus.Consenter) {
 	h.consenter = c
+	h.executor.Start() // The consenter may be expecting a callback from the executor when state transfer completes; it would miss that callback if the executor were started any earlier
 }

 // GetNetworkInfo returns the PeerEndpoints of the current validator and the entire validating network
diff --git a/consensus/pbft/batch.go b/consensus/pbft/batch.go
index 142ebf473d9..212ebf9939a 100644
--- a/consensus/pbft/batch.go
+++ b/consensus/pbft/batch.go
@@ -83,8 +83,17 @@ func newObcBatch(id uint64, config *viper.Viper, stack consensus.Stack) *obcBatc
 	etf := events.NewTimerFactoryImpl(op.manager)
 	op.pbft = newPbftCore(id, config, op, etf)
 	op.manager.Start()
+	blockchainInfoBlob := stack.GetBlockchainInfoBlob()
 	op.externalEventReceiver.manager = op.manager
 	op.broadcaster = newBroadcaster(id, op.pbft.N, op.pbft.f, op.pbft.broadcastTimeout, stack)
+	op.manager.Queue() <- workEvent(func() {
+		op.pbft.stateTransfer(&stateUpdateTarget{
+			checkpointMessage: checkpointMessage{
+				seqNo: op.pbft.lastExec,
+				id:    blockchainInfoBlob,
+			},
+		})
+	})

 	op.batchSize = config.GetInt("general.batchsize")
 	op.batchStore = nil
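Taken together, the helper.go and batch.go hunks above form a startup handshake: newObcBatch queues a stateTransfer work event against the node's current blockchain info, and the executor's event loop is now started only once the consenter has been registered, so the StateUpdated callback produced by that bootstrap transfer cannot fire before anyone is listening. Below is a minimal, self-contained sketch of the ordering this enforces; it is illustrative only, and none of its names come from the patch.

    package main

    import "fmt"

    // executorStub stands in for the executor's event loop: events queued
    // before anyone listens must still reach the consumer eventually.
    type executorStub struct {
        events   chan string
        consumer func(string)
    }

    func (e *executorStub) start() {
        go func() {
            for ev := range e.events {
                e.consumer(ev) // would be lost (or panic) if started before registration
            }
        }()
    }

    func main() {
        e := &executorStub{events: make(chan string, 1)}
        e.events <- "StateUpdated" // queued before any consumer exists

        done := make(chan struct{})
        // setConsenter equivalent: register first, then start the loop
        e.consumer = func(ev string) { fmt.Println("consenter got", ev); close(done) }
        e.start()
        <-done
    }

Starting the loop inside setConsenter turns "register, then start" into a structural guarantee rather than a timing assumption.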
diff --git a/consensus/pbft/batch_test.go b/consensus/pbft/batch_test.go
index b5a3fdacfbd..8ce2e164487 100644
--- a/consensus/pbft/batch_test.go
+++ b/consensus/pbft/batch_test.go
@@ -24,6 +24,7 @@ import (
 	"github.com/hyperledger/fabric/consensus/util/events"
 	pb "github.com/hyperledger/fabric/protos"

+	"github.com/golang/protobuf/proto"
 	"github.com/spf13/viper"
 )

@@ -76,8 +77,31 @@ func TestNetworkBatch(t *testing.T) {
 	}
 }

-func TestClearOustandingReqsOnStateRecovery(t *testing.T) {
-	b := newObcBatch(0, loadConfig(), &omniProto{})
+var inertState = &omniProto{
+	GetBlockchainInfoImpl: func() *pb.BlockchainInfo {
+		return &pb.BlockchainInfo{
+			CurrentBlockHash: []byte("GENESIS"),
+			Height:           1,
+		}
+	},
+	GetBlockchainInfoBlobImpl: func() []byte {
+		b, _ := proto.Marshal(&pb.BlockchainInfo{
+			CurrentBlockHash: []byte("GENESIS"),
+			Height:           1,
+		})
+		return b
+	},
+	InvalidateStateImpl: func() {},
+	ValidateStateImpl:   func() {},
+	UpdateStateImpl:     func(id interface{}, target *pb.BlockchainInfo, peers []*pb.PeerID) {},
+}
+
+func TestClearOutstandingReqsOnStateRecovery(t *testing.T) {
+	omni := *inertState
+	omni.UnicastImpl = func(msg *pb.Message, receiverHandle *pb.PeerID) error { return nil }
+	b := newObcBatch(0, loadConfig(), &omni)
+	b.StateUpdated(&checkpointMessage{seqNo: 0, id: inertState.GetBlockchainInfoBlobImpl()}, inertState.GetBlockchainInfoImpl())
+
 	defer b.Close()

 	b.reqStore.storeOutstanding(&Request{})

@@ -98,10 +122,9 @@ func TestClearOustandingReqsOnStateRecovery(t *testing.T) {
 func TestOutstandingReqsIngestion(t *testing.T) {
 	bs := [3]*obcBatch{}
 	for i := range bs {
-		omni := &omniProto{
-			UnicastImpl: func(ocMsg *pb.Message, peer *pb.PeerID) error { return nil },
-		}
-		bs[i] = newObcBatch(uint64(i), loadConfig(), omni)
+		omni := *inertState
+		omni.UnicastImpl = func(ocMsg *pb.Message, peer *pb.PeerID) error { return nil }
+		bs[i] = newObcBatch(uint64(i), loadConfig(), &omni)
 		defer bs[i].Close()

 		// Have vp1 only deliver messages
@@ -115,6 +138,9 @@ func TestOutstandingReqsIngestion(t *testing.T) {
 			}
 		}
 	}
+	for i := range bs {
+		bs[i].StateUpdated(&checkpointMessage{seqNo: 0, id: inertState.GetBlockchainInfoBlobImpl()}, inertState.GetBlockchainInfoImpl())
+	}

 	err := bs[1].RecvMsg(createTxMsg(1), &pb.PeerID{Name: "vp1"})
 	if err != nil {
@@ -137,10 +163,10 @@ func TestOutstandingReqsIngestion(t *testing.T) {
 }

 func TestOutstandingReqsResubmission(t *testing.T) {
-	omni := &omniProto{}
 	config := loadConfig()
 	config.Set("general.batchsize", 2)
-	b := newObcBatch(0, config, omni)
+	omni := *inertState
+	b := newObcBatch(0, config, &omni)
 	defer b.Close()

 	// The broadcasting threads only cause problems here... but this test stalls without them
 	transactionsBroadcast := 0
@@ -160,6 +186,9 @@ func TestOutstandingReqsResubmission(t *testing.T) {
 		return nil
 	}

+	b.StateUpdated(&checkpointMessage{seqNo: 0, id: inertState.GetBlockchainInfoBlobImpl()}, inertState.GetBlockchainInfoImpl())
+	b.manager.Queue() <- nil // Make sure the state update finishes first
+
 	reqs := make([]*Request, 8)
 	for i := 0; i < len(reqs); i++ {
 		reqs[i] = createPbftReq(int64(i), 0)
@@ -232,11 +261,12 @@ func TestOutstandingReqsResubmission(t *testing.T) {
 }

 func TestViewChangeOnPrimarySilence(t *testing.T) {
-	b := newObcBatch(1, loadConfig(), &omniProto{
-		UnicastImpl: func(ocMsg *pb.Message, peer *pb.PeerID) error { return nil },
-		SignImpl:    func(msg []byte) ([]byte, error) { return msg, nil },
-		VerifyImpl:  func(peerID *pb.PeerID, signature []byte, message []byte) error { return nil },
-	})
+	omni := *inertState
+	omni.UnicastImpl = func(ocMsg *pb.Message, peer *pb.PeerID) error { return nil } // For the checkpoint
+	omni.SignImpl = func(msg []byte) ([]byte, error) { return msg, nil }
+	omni.VerifyImpl = func(peerID *pb.PeerID, signature []byte, message []byte) error { return nil }
+	b := newObcBatch(1, loadConfig(), &omni)
+	b.StateUpdated(&checkpointMessage{seqNo: 0, id: inertState.GetBlockchainInfoBlobImpl()}, inertState.GetBlockchainInfoImpl())
 	b.pbft.requestTimeout = 50 * time.Millisecond
 	defer b.Close()

@@ -347,7 +377,10 @@ func TestClassicBackToBackStateTransfer(t *testing.T) {
 }

 func TestClearBatchStoreOnViewChange(t *testing.T) {
-	b := newObcBatch(1, loadConfig(), &omniProto{})
+	omni := *inertState
+	omni.UnicastImpl = func(ocMsg *pb.Message, peer *pb.PeerID) error { return nil } // For the checkpoint
+	b := newObcBatch(1, loadConfig(), &omni)
+	b.StateUpdated(&checkpointMessage{seqNo: 0, id: inertState.GetBlockchainInfoBlobImpl()}, inertState.GetBlockchainInfoImpl())
 	defer b.Close()

 	b.batchStore = []*Request{&Request{}}
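A pattern used throughout the updated tests: inertState is a template omniProto whose stubbed hooks make a batch instance harmlessly bootable (it can report and marshal a genesis BlockchainInfo and tolerates state validation calls), and each test takes a shallow value copy and overrides only the hooks it cares about, leaving the shared template untouched. A hedged sketch of the idiom, using illustrative types rather than the real omniProto:

    package main

    import "fmt"

    // mock mirrors the omniProto style: a struct of function-valued fields.
    type mock struct {
        Unicast func(msg string) error
        Sign    func(b []byte) ([]byte, error)
    }

    // template holds harmless defaults shared by many tests.
    var template = &mock{
        Unicast: func(string) error { return nil },
        Sign:    func(b []byte) ([]byte, error) { return b, nil },
    }

    func main() {
        m := *template // shallow value copy; the template is not mutated
        m.Unicast = func(msg string) error { // per-test override
            fmt.Println("unicast:", msg)
            return nil
        }
        _ = m.Unicast("hello")
    }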
diff --git a/consensus/pbft/mock_ledger_test.go b/consensus/pbft/mock_ledger_test.go
index 14b75750f2c..1a5d127b96e 100644
--- a/consensus/pbft/mock_ledger_test.go
+++ b/consensus/pbft/mock_ledger_test.go
@@ -17,6 +17,7 @@ limitations under the License.
 package pbft

 import (
+	"bytes"
 	"fmt"
 	"reflect"
 	"sync"
@@ -297,6 +298,27 @@ func (mock *MockLedger) GetBlockHeadMetadata() ([]byte, error) {
 }

 func (mock *MockLedger) simulateStateTransfer(info *protos.BlockchainInfo, peers []*protos.PeerID) {
+	if mock.blockHeight >= info.Height {
+		blockCursor := info.Height - 1
+		validHash := info.CurrentBlockHash
+		for {
+			block, ok := mock.blocks[blockCursor]
+			if !ok {
+				break
+			}
+			hash, _ := mock.HashBlock(block)
+			if !bytes.Equal(hash, validHash) {
+				break
+			}
+			blockCursor--
+			validHash = block.PreviousBlockHash
+			if blockCursor == ^uint64(0) {
+				return
+			}
+		}
+		panic(fmt.Sprintf("Asked to skip to a block (%d) which is lower than our current height of %d. (Corrupt block at %d with hash %x)", info.Height, mock.blockHeight, blockCursor, validHash))
+	}
+
 	var remoteLedger consensus.ReadOnlyLedger
 	if len(peers) > 0 {
 		var ok bool
@@ -309,9 +331,6 @@ func (mock *MockLedger) simulateStateTransfer(info *protos.BlockchainInfo, peers
 	}
 	fmt.Printf("TEST LEDGER skipping to %+v", info)
 	p := 0
-	if mock.blockHeight >= info.Height {
-		panic(fmt.Sprintf("Asked to skip to a block (%d) which is lower than our current height of %d", info.Height, mock.blockHeight))
-	}

 	for n := mock.blockHeight; n < info.Height; n++ {
 		block, err := remoteLedger.GetBlock(n)
diff --git a/consensus/pbft/pbft-core.go b/consensus/pbft/pbft-core.go
index 545db80729b..943572a5afb 100644
--- a/consensus/pbft/pbft-core.go
+++ b/consensus/pbft/pbft-core.go
@@ -376,11 +376,11 @@ func (instance *pbftCore) ProcessEvent(e events.Event) events.Event {
 			return nil
 		}
 		logger.Infof("Replica %d application caught up via state transfer, lastExec now %d", instance.id, update.seqNo)
-		// XXX create checkpoint
 		instance.lastExec = update.seqNo
 		instance.moveWatermarks(instance.lastExec) // The watermark movement handles moving this to a checkpoint boundary
 		instance.skipInProgress = false
 		instance.consumer.validateState()
+		instance.Checkpoint(update.seqNo, update.id)
 		instance.executeOutstanding()
 	case execDoneEvent:
 		instance.execDoneSync()
diff --git a/consensus/pbft/pbft-core_test.go b/consensus/pbft/pbft-core_test.go
index 52e8a6a32f2..5b3d5d71d5e 100644
--- a/consensus/pbft/pbft-core_test.go
+++ b/consensus/pbft/pbft-core_test.go
@@ -31,6 +31,7 @@ import (
 	"github.com/op/go-logging"

 	"github.com/hyperledger/fabric/consensus/util/events"
+	pb "github.com/hyperledger/fabric/protos"
 )

 func init() {
@@ -1703,6 +1704,30 @@ func TestViewChangeDuringExecution(t *testing.T) {
 	}
 }

+func TestStateTransferCheckpoint(t *testing.T) {
+	broadcasts := 0
+	instance := newPbftCore(3, loadConfig(), &omniProto{
+		broadcastImpl: func(msg []byte) {
+			broadcasts++
+		},
+		validateStateImpl: func() {},
+	}, &inertTimerFactory{})
+
+	id := []byte("My ID")
+	events.SendEvent(instance, stateUpdatedEvent{
+		chkpt: &checkpointMessage{
+			seqNo: 10,
+			id:    id,
+		},
+		target: &pb.BlockchainInfo{},
+	})
+
+	if broadcasts != 1 {
+		t.Fatalf("Should have broadcast a checkpoint after the state transfer finished")
+	}
+}
+
 func TestStateTransferredToOldPoint(t *testing.T) {
 	skipped := false
 	instance := newPbftCore(3, loadConfig(), &omniProto{
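Both the mock ledger loop above and the coordinator change below walk block numbers downward through a uint64 cursor, so "below genesis" cannot be expressed as a negative number; decrementing past zero instead wraps around to 2^64-1, which is exactly ^uint64(0), and that wrap is what terminates a fully successful walk. A tiny runnable illustration (the three-block chain here is an assumption for the example, not taken from the patch):

    package main

    import "fmt"

    func main() {
        cursor := uint64(2) // verify blocks 2, 1, 0 in descending order
        for {
            fmt.Println("verify block", cursor)
            cursor--
            if cursor == ^uint64(0) { // wrapped past 0: genesis was verified too
                fmt.Println("reached genesis; entire chain is valid")
                return
            }
        }
    }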
diff --git a/core/peer/statetransfer/statetransfer.go b/core/peer/statetransfer/statetransfer.go
index fe77d743606..62258cf5937 100644
--- a/core/peer/statetransfer/statetransfer.go
+++ b/core/peer/statetransfer/statetransfer.go
@@ -97,7 +97,7 @@ type coordinatorImpl struct {
 // If the peerIDs are nil, then all peers are assumed to have the given block.
 // If the call returns an error, a boolean is included which indicates if the error may be transient and the caller should retry
 func (sts *coordinatorImpl) SyncToTarget(blockNumber uint64, blockHash []byte, peerIDs []*pb.PeerID) (error, bool) {
-	logger.Debugf("Syncing to target %x for block number %d with peers %v", blockHash, blockNumber, peerIDs)
+	logger.Infof("Syncing to target %x for block number %d with peers %v", blockHash, blockNumber, peerIDs)

 	if !sts.inProgress {
 		sts.currentStateBlockNumber = sts.stack.GetBlockchainSize() - 1 // The block height is one more than the latest block number
@@ -422,8 +422,42 @@ func (sts *coordinatorImpl) syncBlockchainToTarget(blockSyncReq *blockSyncReq) {
 			panic("Our blockchain is already higher than a sync target, this is unlikely, but unimplemented")
 		}
 	} else {
+		blockCursor := blockSyncReq.blockNumber
+		validHash := blockSyncReq.firstBlockHash

-		_, _, err := sts.syncBlocks(blockSyncReq.blockNumber, blockSyncReq.reportOnBlock, blockSyncReq.firstBlockHash, blockSyncReq.peerIDs)
+		// Don't bother fetching blocks which are already here and valid;
+		// this is especially useful at startup
+		for {
+			block, err := sts.stack.GetBlockByNumber(blockCursor)
+			if err != nil || block == nil {
+				// Need to fetch this block
+				break
+			}
+			bh, err := sts.stack.HashBlock(block)
+			if err != nil {
+				// Something wrong with this block
+				break
+			}
+			if !bytes.Equal(bh, validHash) {
+				// Block is corrupt
+				break
+			}
+			blockCursor--
+			validHash = block.PreviousBlockHash
+			if blockCursor+1 == blockSyncReq.reportOnBlock {
+				break
+			}
+		}
+
+		if blockCursor+1 <= blockSyncReq.blockNumber {
+			logger.Debugf("Skipped remote syncing of blocks %d through %d because they were already present and valid", blockSyncReq.blockNumber, blockCursor+1)
+		}
+
+		var err error
+		// Note, this must accommodate blockCursor underflowing
+		if blockCursor+1 > blockSyncReq.reportOnBlock {
+			_, _, err = sts.syncBlocks(blockCursor, blockSyncReq.reportOnBlock, validHash, blockSyncReq.peerIDs)
+		}

 		if nil != blockSyncReq.replyChan {
 			logger.Debugf("Replying to blockSyncReq on reply channel with : %s", err)
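The syncBlockchainToTarget hunk above avoids refetching blocks that are already on disk and hash-chain correctly to the sync target. A self-contained sketch of that verification walk follows; the block type, hash function, and map-backed chain are illustrative stand-ins for the coordinator's real stack interface:

    package main

    import (
        "bytes"
        "crypto/sha256"
        "encoding/binary"
        "fmt"
    )

    type block struct {
        prevHash []byte
        number   uint64
    }

    func hashBlock(b *block) []byte {
        h := sha256.New()
        h.Write(b.prevHash)
        var n [8]byte
        binary.BigEndian.PutUint64(n[:], b.number)
        h.Write(n[:])
        return h.Sum(nil)
    }

    // localValidThrough mimics the new loop: walk from the sync target back
    // toward genesis, stopping at the first block that is missing or whose
    // hash breaks the chain. Blocks cursor+1 .. target need no remote fetch.
    func localValidThrough(chain map[uint64]*block, target uint64, targetHash []byte) uint64 {
        cursor, validHash := target, targetHash
        for {
            b, ok := chain[cursor]
            if !ok || !bytes.Equal(hashBlock(b), validHash) {
                break // this block (and everything below it) must be fetched
            }
            cursor--
            validHash = b.prevHash
            if cursor == ^uint64(0) { // wrapped past genesis: all blocks valid
                break
            }
        }
        return cursor
    }

    func main() {
        chain := map[uint64]*block{}
        prev := []byte("GENESIS")
        for i := uint64(0); i < 5; i++ {
            chain[i] = &block{prevHash: prev, number: i}
            prev = hashBlock(chain[i])
        }
        // Prints 18446744073709551615 (^uint64(0)): nothing needs fetching.
        fmt.Println(localValidThrough(chain, 4, hashBlock(chain[4])))
    }

This is also why the patch notes that the caller must accommodate blockCursor underflowing: a fully valid local chain leaves the cursor wrapped at ^uint64(0).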
diff --git a/core/peer/statetransfer/statetransfer_test.go b/core/peer/statetransfer/statetransfer_test.go
index 5e72d7b94fc..df8dce75afa 100644
--- a/core/peer/statetransfer/statetransfer_test.go
+++ b/core/peer/statetransfer/statetransfer_test.go
@@ -164,6 +164,75 @@ func makeSimpleFilter(failureTrigger mockRequest, failureType mockResponse) (fun
 }

+func TestStartupValidStateGenesis(t *testing.T) {
+	mrls := createRemoteLedgers(2, 1) // No remote targets available
+
+	// Test from a block height of 1, with a valid genesis block
+	ml := NewMockLedger(mrls, nil, t)
+	ml.PutBlock(0, SimpleGetBlock(0))
+
+	sts := newTestStateTransfer(ml, mrls)
+	defer sts.Stop()
+	if err := executeStateTransfer(sts, ml, 0, 0, mrls); nil != err {
+		t.Fatalf("Startup failure: %s", err)
+	}
+}
+
+func TestStartupValidStateExisting(t *testing.T) {
+	mrls := createRemoteLedgers(2, 1) // No remote targets available
+
+	// Test from a block height of 50, with a valid existing chain and state
+	ml := NewMockLedger(mrls, nil, t)
+	height := uint64(50)
+	for i := uint64(0); i < height; i++ {
+		ml.PutBlock(i, SimpleGetBlock(i))
+	}
+	ml.state = SimpleGetState(height - 1)
+
+	sts := newTestStateTransfer(ml, mrls)
+	defer sts.Stop()
+	if err := executeStateTransfer(sts, ml, height-1, height-1, mrls); nil != err {
+		t.Fatalf("Startup failure: %s", err)
+	}
+}
+
+func TestStartupInvalidStateGenesis(t *testing.T) {
+	mrls := createRemoteLedgers(1, 3)
+
+	// Test from a block height of 1, with a valid genesis block but corrupted state
+	ml := NewMockLedger(mrls, nil, t)
+	ml.PutBlock(0, SimpleGetBlock(0))
+	ml.state = ^ml.state // Ensure the state is wrong
+
+	sts := newTestStateTransfer(ml, mrls)
+	defer sts.Stop()
+	if err := executeStateTransfer(sts, ml, 0, 0, mrls); nil != err {
+		t.Fatalf("Startup failure: %s", err)
+	}
+}
+
+func TestStartupInvalidStateExisting(t *testing.T) {
+	mrls := createRemoteLedgers(1, 3)
+
+	// Test from a block height of 50, with a valid chain but corrupted state
+	ml := NewMockLedger(mrls, nil, t)
+	height := uint64(50)
+	for i := uint64(0); i < height; i++ {
+		ml.PutBlock(i, SimpleGetBlock(i))
+	}
+	ml.state = ^SimpleGetState(height - 1) // Ensure the state is wrong
+
+	sts := newTestStateTransfer(ml, mrls)
+	defer sts.Stop()
+	if err := executeStateTransfer(sts, ml, height-1, height-1, mrls); nil != err {
+		t.Fatalf("Startup failure: %s", err)
+	}
+}
+
 func TestCatchupSimple(t *testing.T) {
 	mrls := createRemoteLedgers(1, 3)