Skip to content

Commit

Permalink
Do not require reboot when re-adding consenter
Browse files Browse the repository at this point in the history
FAB-13552

Change-Id: I5d77dd613db3ad4d1296f2e8b960859a785f04ec
Signed-off-by: Jay Guo <guojiannan1101@gmail.com>
  • Loading branch information
guoger committed Nov 13, 2019
1 parent 85aebd6 commit cce2247
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 19 deletions.
26 changes: 22 additions & 4 deletions integration/raft/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -882,10 +882,10 @@ var _ = Describe("EndToEnd reconfiguration and onboarding", func() {
firstEvictedNode := findLeader(ordererRunners) - 1

By("Removing the leader from 3-node channel")
serverCertBytes, err := ioutil.ReadFile(filepath.Join(network.OrdererLocalTLSDir(orderers[firstEvictedNode]), "server.crt"))
server1CertBytes, err := ioutil.ReadFile(filepath.Join(network.OrdererLocalTLSDir(orderers[firstEvictedNode]), "server.crt"))
Expect(err).To(Not(HaveOccurred()))

removeConsenter(network, peer, network.Orderers[(firstEvictedNode+1)%3], "systemchannel", serverCertBytes)
removeConsenter(network, peer, network.Orderers[(firstEvictedNode+1)%3], "systemchannel", server1CertBytes)

var survivedOrdererRunners []*ginkgomon.Runner
for i := range orderers {
Expand Down Expand Up @@ -914,17 +914,35 @@ var _ = Describe("EndToEnd reconfiguration and onboarding", func() {
ensureEvicted(orderers[firstEvictedNode], peer, network, "systemchannel")

By("Removing the leader from 2-node channel")
serverCertBytes, err = ioutil.ReadFile(filepath.Join(network.OrdererLocalTLSDir(orderers[secondEvictedNode]), "server.crt"))
server2CertBytes, err := ioutil.ReadFile(filepath.Join(network.OrdererLocalTLSDir(orderers[secondEvictedNode]), "server.crt"))
Expect(err).To(Not(HaveOccurred()))

removeConsenter(network, peer, orderers[surviver], "systemchannel", serverCertBytes)
removeConsenter(network, peer, orderers[surviver], "systemchannel", server2CertBytes)
findLeader([]*ginkgomon.Runner{ordererRunners[surviver]})

fmt.Fprintln(GinkgoWriter, "Ensuring the other orderer detect the eviction of the node on channel systemchannel")
Eventually(ordererRunners[secondEvictedNode].Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say(stopMsg))

By("Ensuring the evicted orderer now doesn't serve clients")
ensureEvicted(orderers[secondEvictedNode], peer, network, "systemchannel")

By("Re-adding first evicted orderer")
addConsenter(network, peer, network.Orderers[surviver], "systemchannel", etcdraft.Consenter{
Host: "127.0.0.1",
Port: uint32(network.OrdererPort(orderers[firstEvictedNode], nwo.ClusterPort)),
ClientTlsCert: server1CertBytes,
ServerTlsCert: server1CertBytes,
})

By("Ensuring re-added orderer starts serving system channel")
assertBlockReception(map[string]int{
"systemchannel": 3,
}, []*nwo.Orderer{orderers[firstEvictedNode]}, peer, network)

env := CreateBroadcastEnvelope(network, orderers[secondEvictedNode], network.SystemChannel.Name, []byte("foo"))
resp, err := Broadcast(network, orderers[surviver], env)
Expect(err).NotTo(HaveOccurred())
Expect(resp.Status).To(Equal(common.Status_SUCCESS))
})

It("notices it even if it is down at the time of its eviction", func() {
Expand Down
9 changes: 9 additions & 0 deletions orderer/common/cluster/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,15 @@ func (r *Replicator) ReplicateChains() []string {
if err != nil {
r.Logger.Panicf("Failed to create a ledger for channel %s: %v", channel.ChannelName, err)
}

if channel.GenesisBlock == nil {
if ledger.Height() == 0 {
r.Logger.Panicf("Expecting channel %s to at least contain genesis block, but it doesn't", channel.ChannelName)
}

continue
}

gb, err := ChannelCreationBlockToGenesisBlock(channel.GenesisBlock)
if err != nil {
r.Logger.Panicf("Failed converting channel creation block for channel %s to genesis block: %v",
Expand Down
4 changes: 1 addition & 3 deletions orderer/common/server/onboarding.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,11 +189,9 @@ type chainCreation struct {
// TrackChain tracks a chain with the given name, and calls the given callback
// when this chain should be activated.
func (dc *inactiveChainReplicator) TrackChain(chain string, genesisBlock *common.Block, createChainCallback etcdraft.CreateChainCallback) {
if genesisBlock == nil {
dc.logger.Panicf("Called with a nil genesis block")
}
dc.lock.Lock()
defer dc.lock.Unlock()

dc.logger.Infof("Adding %s to the set of chains to track", chain)
dc.chains2CreationCallbacks[chain] = chainCreation{
genesisBlock: genesisBlock,
Expand Down
14 changes: 4 additions & 10 deletions orderer/common/server/onboarding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -871,18 +871,12 @@ func TestInactiveChainReplicatorChannels(t *testing.T) {
chains2CreationCallbacks: make(map[string]chainCreation),
}
icr.TrackChain("foo", &common.Block{}, func() {})
assert.Contains(t, icr.Channels(), cluster.ChannelGenesisBlock{ChannelName: "foo", GenesisBlock: &common.Block{}})

assert.Equal(t, []cluster.ChannelGenesisBlock{{ChannelName: "foo", GenesisBlock: &common.Block{}}}, icr.Channels())
icr.Close()
}
icr.TrackChain("bar", nil, func() {})
assert.Contains(t, icr.Channels(), cluster.ChannelGenesisBlock{ChannelName: "bar", GenesisBlock: nil})

func TestTrackChainNilGenesisBlock(t *testing.T) {
icr := &inactiveChainReplicator{
logger: flogging.MustGetLogger("test"),
}
assert.PanicsWithValue(t, "Called with a nil genesis block", func() {
icr.TrackChain("foo", nil, func() {})
})
icr.Close()
}

func injectConsenterCertificate(t *testing.T, block *common.Block, tlsCert []byte) {
Expand Down
7 changes: 7 additions & 0 deletions orderer/consensus/etcdraft/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ type Chain struct {

periodicChecker *PeriodicCheck

haltCallback func()
// BCCSP instane
CryptoProvider bccsp.BCCSP
}
Expand All @@ -205,6 +206,7 @@ func NewChain(
rpc RPC,
cryptoProvider bccsp.BCCSP,
f CreateBlockPuller,
haltCallback func(),
observeC chan<- raft.SoftState,
) (*Chain, error) {

Expand Down Expand Up @@ -264,6 +266,7 @@ func NewChain(
confState: cc,
createPuller: f,
clock: opts.Clock,
haltCallback: haltCallback,
Metrics: &Metrics{
ClusterSize: opts.Metrics.ClusterSize.With("channel", support.ChannelID()),
IsLeader: opts.Metrics.IsLeader.With("channel", support.ChannelID()),
Expand Down Expand Up @@ -423,6 +426,10 @@ func (c *Chain) Halt() {
return
}
<-c.doneC

if c.haltCallback != nil {
c.haltCallback()
}
}

func (c *Chain) isRunning() error {
Expand Down
8 changes: 6 additions & 2 deletions orderer/consensus/etcdraft/chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ var _ = Describe("Chain", func() {
}

JustBeforeEach(func() {
chain, err = etcdraft.NewChain(support, opts, configurator, nil, cryptoProvider, noOpBlockPuller, observeC)
chain, err = etcdraft.NewChain(support, opts, configurator, nil, cryptoProvider, noOpBlockPuller, nil, observeC)
Expect(err).NotTo(HaveOccurred())

chain.Start()
Expand Down Expand Up @@ -799,7 +799,7 @@ var _ = Describe("Chain", func() {
os.Chmod(path.Join(walDir, f.Name()), 0300)
}

c, err := etcdraft.NewChain(support, opts, configurator, nil, cryptoProvider, noOpBlockPuller, observeC)
c, err := etcdraft.NewChain(support, opts, configurator, nil, cryptoProvider, noOpBlockPuller, nil, observeC)
Expect(c).To(BeNil())
Expect(err).To(MatchError(ContainSubstring("permission denied")))
})
Expand Down Expand Up @@ -1216,6 +1216,7 @@ var _ = Describe("Chain", func() {
nil,
cryptoProvider,
nil,
nil,
observeC)
Expect(chain).NotTo(BeNil())
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -1249,6 +1250,7 @@ var _ = Describe("Chain", func() {
nil,
cryptoProvider,
noOpBlockPuller,
nil,
nil)
Expect(chain).NotTo(BeNil())
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -1278,6 +1280,7 @@ var _ = Describe("Chain", func() {
nil,
cryptoProvider,
noOpBlockPuller,
nil,
nil)
Expect(chain).To(BeNil())
Expect(err).To(MatchError(ContainSubstring("failed to initialize WAL: mkdir")))
Expand Down Expand Up @@ -3386,6 +3389,7 @@ func (c *chain) init() {
c.rpc,
c.cryptoProvider,
func() (etcdraft.BlockPuller, error) { return c.puller, nil },
nil,
c.observe,
)
Expect(err).NotTo(HaveOccurred())
Expand Down
3 changes: 3 additions & 0 deletions orderer/consensus/etcdraft/consenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,9 @@ func (c *Consenter) HandleChain(support consensus.ConsenterSupport, metadata *co
func() (BlockPuller, error) {
return NewBlockPuller(support, c.Dialer, c.OrdererConfig.General.Cluster, c.BCCSP)
},
func() {
c.InactiveChainRegistry.TrackChain(support.ChannelID(), nil, func() { c.CreateChain(support.ChannelID()) })
},
nil,
)
}
Expand Down

0 comments on commit cce2247

Please sign in to comment.