From eb88d2619dc53b0a24180452c8805140e3e6bd14 Mon Sep 17 00:00:00 2001 From: Yoav Tock Date: Wed, 3 Apr 2019 12:54:35 +0300 Subject: [PATCH] FAB-15043 Mig-v1 cleanup #2 In preparation to the alternative migration design (v2), revert some of the commits already merged. Kafka to Raft migration v1 - cleanup #2 Clean Kafka2Raft green path #4 (of v1) Clean commit 050498317d698df7fbb2c46d43df694882ec0c57 Change-Id: Ic7274401fdc58870c997b3204c21f548c7f5d1af Signed-off-by: Yoav Tock --- orderer/common/multichannel/chainsupport.go | 56 -------------------- orderer/consensus/etcdraft/chain.go | 50 +---------------- orderer/consensus/etcdraft/chain_test.go | 7 +-- orderer/consensus/etcdraft/consenter.go | 17 ------ orderer/consensus/etcdraft/consenter_test.go | 15 ++---- orderer/consensus/etcdraft/node.go | 11 ++-- 6 files changed, 8 insertions(+), 148 deletions(-) diff --git a/orderer/common/multichannel/chainsupport.go b/orderer/common/multichannel/chainsupport.go index 0f412520106..305ff6191a3 100644 --- a/orderer/common/multichannel/chainsupport.go +++ b/orderer/common/multichannel/chainsupport.go @@ -15,7 +15,6 @@ import ( "github.com/hyperledger/fabric/orderer/common/msgprocessor" "github.com/hyperledger/fabric/orderer/consensus" cb "github.com/hyperledger/fabric/protos/common" - "github.com/hyperledger/fabric/protos/orderer" "github.com/hyperledger/fabric/protos/utils" "github.com/pkg/errors" ) @@ -69,14 +68,6 @@ func newChainSupport( // Set up the block writer cs.BlockWriter = newBlockWriter(lastBlock, registrar, cs) - // TODO Identify recovery after crash in the middle of consensus-type migration - if cs.detectMigration(lastBlock) { - // We do this because the last block after migration (COMMIT/CONTEXT) carries Kafka metadata. - // This prevents the code down the line from unmarshaling it as Raft, and panicking. - metadata.Value = nil - logger.Debugf("[channel: %s] Consensus-type migration: restart on to Raft, resetting Kafka block metadata", cs.ChainID()) - } - // Set up the consenter consenterType := ledgerResources.SharedConfig().ConsensusType() consenter, ok := consenters[consenterType] @@ -94,53 +85,6 @@ func newChainSupport( return cs } -// detectMigration identifies restart after consensus-type migration was committed (green path). -// Restart after migration is detected by: -// 1. The Kafka2RaftMigration capability in on -// 2. The last block carries a config-tx -// 3. In the config-tx, you have: -// - (system-channel && state=COMMIT), OR -// - (standard-channel && state=CONTEXT) -// This assumes that migration was successful (green path). When migration ends successfully, -// every channel will have a config block as the last block. On the system channel, containing state=COMMIT; -// on standard channels, containing state=CONTEXT. -func (cs *ChainSupport) detectMigration(lastBlock *cb.Block) bool { - isMigration := false - - if !cs.ledgerResources.SharedConfig().Capabilities().Kafka2RaftMigration() { - return isMigration - } - - lastConfigIndex, err := utils.GetLastConfigIndexFromBlock(lastBlock) - if err != nil { - logger.Panicf("Chain did not have appropriately encoded last config in its latest block: %s", err) - } - - logger.Debugf("[channel: %s], sysChan=%v, lastConfigIndex=%d, H=%d, mig-state: %s", - cs.ChainID(), cs.systemChannel, lastConfigIndex, cs.ledgerResources.Height(), - cs.ledgerResources.SharedConfig().ConsensusMigrationState()) - - if lastConfigIndex == lastBlock.Header.Number { //The last block was a config-tx - state := cs.ledgerResources.SharedConfig().ConsensusMigrationState() - if cs.systemChannel { - if state == orderer.ConsensusType_MIG_STATE_COMMIT { - isMigration = true - } - } else { - if state == orderer.ConsensusType_MIG_STATE_CONTEXT { - isMigration = true - } - } - - if isMigration { - logger.Infof("[channel: %s], Restarting after consensus-type migration. New consensus-type is: %s", - cs.ChainID(), cs.ledgerResources.SharedConfig().ConsensusType()) - } - } - - return isMigration -} - // Block returns a block with the following number, // or nil if such a block doesn't exist. func (cs *ChainSupport) Block(number uint64) *cb.Block { diff --git a/orderer/consensus/etcdraft/chain.go b/orderer/consensus/etcdraft/chain.go index f18dbf12979..24e7f35602c 100644 --- a/orderer/consensus/etcdraft/chain.go +++ b/orderer/consensus/etcdraft/chain.go @@ -330,11 +330,7 @@ func (c *Chain) Start() { } isJoin := c.support.Height() > 1 - isMigration := false - if isJoin { - isMigration = c.detectMigration() - } - c.Node.start(c.fresh, isJoin, isMigration) + c.Node.start(c.fresh, isJoin) close(c.startC) close(c.errorC) @@ -358,42 +354,6 @@ func (c *Chain) Start() { c.periodicChecker.Run() } -// detectMigration detects if the orderer restarts right after consensus-type migration, -// in which the Height>1 but previous blocks were created by Kafka. -// If this is the case, Raft should be started like it is joining a new channel. -func (c *Chain) detectMigration() bool { - startOfChain := false - if c.support.SharedConfig().Capabilities().Kafka2RaftMigration() { - lastConfigIndex, err := utils.GetLastConfigIndexFromBlock(c.lastBlock) - if err != nil { - c.logger.Panicf("Chain did not have appropriately encoded last config in its latest block: %s", err) - } - - c.logger.Debugf("Detecting if consensus-type migration, sysChan=%v, lastConfigIndex=%d, Height=%d, mig-state: %s", - c.support.IsSystemChannel(), lastConfigIndex, c.lastBlock.Header.Number+1, c.support.SharedConfig().ConsensusMigrationState().String()) - - if lastConfigIndex != c.lastBlock.Header.Number { // The last block is not a config-tx - return startOfChain - } - - // The last block was a config-tx - if c.support.IsSystemChannel() { - if c.support.SharedConfig().ConsensusMigrationState() == orderer.ConsensusType_MIG_STATE_COMMIT { - startOfChain = true - } - } else { - if c.support.SharedConfig().ConsensusMigrationState() == orderer.ConsensusType_MIG_STATE_CONTEXT { - startOfChain = true - } - } - - if startOfChain { - c.logger.Infof("Restarting after consensus-type migration. Type: %s, just starting the channel.", c.support.SharedConfig().ConsensusType()) - } - } - return startOfChain -} - // Order submits normal type transactions for ordering. func (c *Chain) Order(env *common.Envelope, configSeq uint64) error { c.Metrics.NormalProposalsReceived.Add(1) @@ -1329,14 +1289,6 @@ func (c *Chain) getInFlightConfChange() *raftpb.ConfChange { return nil } - // Detect if it is a restart right after consensus-type migration. If yes, return early in order to avoid using - // the block metadata as etcdraft.BlockMetadata (see below). Right after migration the block metadata will carry - // Kafka metadata. The etcdraft.BlockMetadata should be extracted from the ConsensusType.Metadata, instead. - if c.detectMigration() { - c.logger.Infof("Restarting after consensus-type migration. Type: %s, just starting the chain.", c.support.SharedConfig().ConsensusType()) - return nil - } - // extracting current Raft configuration state confState := c.Node.ApplyConfChange(raftpb.ConfChange{}) diff --git a/orderer/consensus/etcdraft/chain_test.go b/orderer/consensus/etcdraft/chain_test.go index 9781949fe1e..c737f7d8dd5 100644 --- a/orderer/consensus/etcdraft/chain_test.go +++ b/orderer/consensus/etcdraft/chain_test.go @@ -3500,12 +3500,7 @@ func newChain(timeout time.Duration, channel string, dataDir string, id uint64, support := &consensusmocks.FakeConsenterSupport{} support.ChainIDReturns(channel) - support.SharedConfigReturns(&mockconfig.Orderer{ - BatchTimeoutVal: timeout, - CapabilitiesVal: &mockconfig.OrdererCapabilities{ - Kafka2RaftMigVal: false, - }, - }) + support.SharedConfigReturns(&mockconfig.Orderer{BatchTimeoutVal: timeout}) cutter := mockblockcutter.NewReceiver() close(cutter.Block) diff --git a/orderer/consensus/etcdraft/consenter.go b/orderer/consensus/etcdraft/consenter.go index a0151c05df3..7f58d37e44d 100644 --- a/orderer/consensus/etcdraft/consenter.go +++ b/orderer/consensus/etcdraft/consenter.go @@ -8,7 +8,6 @@ package etcdraft import ( "bytes" - "encoding/hex" "path" "reflect" "time" @@ -120,27 +119,11 @@ func (c *Consenter) detectSelfID(consenters map[uint64]*etcdraft.Consenter) (uin // HandleChain returns a new Chain instance or an error upon failure func (c *Consenter) HandleChain(support consensus.ConsenterSupport, metadata *common.Metadata) (consensus.Chain, error) { - - if support.SharedConfig().Capabilities().Kafka2RaftMigration() { - c.Logger.Debugf("SharedConfig.ConsensusType fields: Type=%s, ConsensusMigrationState=%s, ConsensusMigrationContext=%d, ConsensusMetadata length=%d", - support.SharedConfig().ConsensusType(), support.SharedConfig().ConsensusMigrationState(), - support.SharedConfig().ConsensusMigrationContext(), len(support.SharedConfig().ConsensusMetadata())) - if support.SharedConfig().ConsensusMigrationState() != orderer.ConsensusType_MIG_STATE_NONE { - c.Logger.Debugf("SharedConfig.ConsensusType: ConsensusMetadata dump:\n%s", hex.Dump(support.SharedConfig().ConsensusMetadata())) - } - } - m := &etcdraft.ConfigMetadata{} if err := proto.Unmarshal(support.SharedConfig().ConsensusMetadata(), m); err != nil { return nil, errors.Wrap(err, "failed to unmarshal consensus metadata") } - if support.SharedConfig().Capabilities().Kafka2RaftMigration() && - support.SharedConfig().ConsensusMigrationState() != orderer.ConsensusType_MIG_STATE_NONE { - c.Logger.Debugf("SharedConfig().ConsensusMetadata(): %s", m.String()) - c.Logger.Debugf("block metadata.Value dump: \n%s", hex.Dump(metadata.Value)) - } - if m.Options == nil { return nil, errors.New("etcdraft options have not been provided") } diff --git a/orderer/consensus/etcdraft/consenter_test.go b/orderer/consensus/etcdraft/consenter_test.go index 679a9552a48..8d1053bceaf 100644 --- a/orderer/consensus/etcdraft/consenter_test.go +++ b/orderer/consensus/etcdraft/consenter_test.go @@ -156,10 +156,7 @@ var _ = Describe("Consenter", func() { metadata := utils.MarshalOrPanic(m) support.SharedConfigReturns(&mockconfig.Orderer{ ConsensusMetadataVal: metadata, - CapabilitiesVal: &mockconfig.OrdererCapabilities{ - Kafka2RaftMigVal: false, - }, - BatchSizeVal: &orderer.BatchSize{PreferredMaxBytes: 2 * 1024 * 1024}, + BatchSizeVal: &orderer.BatchSize{PreferredMaxBytes: 2 * 1024 * 1024}, }) consenter := newConsenter(chainGetter) @@ -199,10 +196,7 @@ var _ = Describe("Consenter", func() { support := &consensusmocks.FakeConsenterSupport{} support.SharedConfigReturns(&mockconfig.Orderer{ ConsensusMetadataVal: metadata, - CapabilitiesVal: &mockconfig.OrdererCapabilities{ - Kafka2RaftMigVal: false, - }, - BatchSizeVal: &orderer.BatchSize{PreferredMaxBytes: 2 * 1024 * 1024}, + BatchSizeVal: &orderer.BatchSize{PreferredMaxBytes: 2 * 1024 * 1024}, }) support.ChainIDReturns("foo") @@ -224,10 +218,7 @@ var _ = Describe("Consenter", func() { metadata := utils.MarshalOrPanic(m) support.SharedConfigReturns(&mockconfig.Orderer{ ConsensusMetadataVal: metadata, - CapabilitiesVal: &mockconfig.OrdererCapabilities{ - Kafka2RaftMigVal: false, - }, - BatchSizeVal: &orderer.BatchSize{PreferredMaxBytes: 2 * 1024 * 1024}, + BatchSizeVal: &orderer.BatchSize{PreferredMaxBytes: 2 * 1024 * 1024}, }) consenter := newConsenter(chainGetter) diff --git a/orderer/consensus/etcdraft/node.go b/orderer/consensus/etcdraft/node.go index 3c8fe9ea8e3..07211c19cf7 100644 --- a/orderer/consensus/etcdraft/node.go +++ b/orderer/consensus/etcdraft/node.go @@ -46,20 +46,15 @@ type node struct { raft.Node } -func (n *node) start(fresh, join, migration bool) { +func (n *node) start(fresh, join bool) { raftPeers := RaftPeers(n.metadata.ConsenterIds) n.logger.Debugf("Starting raft node: #peers: %v", len(raftPeers)) var campaign bool if fresh { if join { - if !migration { - raftPeers = nil - n.logger.Info("Starting raft node to join an existing channel") - - } else { - n.logger.Info("Starting raft node to join an existing channel, after consensus-type migration") - } + raftPeers = nil + n.logger.Info("Starting raft node to join an existing channel") } else { n.logger.Info("Starting raft node as part of a new channel")