Skip to content

Commit

Permalink
FAB-16286 Split raft Broadcast/Deliver IT tool out
Browse files Browse the repository at this point in the history
The Raft tests utilize direct Broadcast/Deliver to the orderers.  This
is useful for other tests, particularly ones which need to bypass peers
and send directly to ordering (like, the forthcoming instantiation
policy test).

This CR simply splits the core function out of Raft and into nwo.
Further refactoring may be done in the future.

Signed-off-by: Jason Yellick <jyellick@us.ibm.com>
Change-Id: I7aec7028a37570248b612d2ce53f99a27670ea9e
  • Loading branch information
Jason Yellick committed Nov 15, 2019
1 parent 59bad46 commit 7b19faa
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 117 deletions.
113 changes: 113 additions & 0 deletions integration/nwo/orderer_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
Copyright IBM Corp All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package nwo

import (
"context"
"io/ioutil"
"path"
"time"

"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric-protos-go/orderer"
"github.com/hyperledger/fabric/core/comm"
"github.com/pkg/errors"
)

// Broadcast sends given env to Broadcast API of specified orderer.
func Broadcast(n *Network, o *Orderer, env *common.Envelope) (*orderer.BroadcastResponse, error) {
gRPCclient, err := CreateGRPCClient(n, o)
if err != nil {
return nil, err
}

addr := n.OrdererAddress(o, ListenPort)
conn, err := gRPCclient.NewConnection(addr)
if err != nil {
return nil, err
}
defer conn.Close()

broadcaster, err := orderer.NewAtomicBroadcastClient(conn).Broadcast(context.Background())
if err != nil {
return nil, err
}

err = broadcaster.Send(env)
if err != nil {
return nil, err
}

resp, err := broadcaster.Recv()
if err != nil {
return nil, err
}

return resp, nil
}

// Deliver sends given env to Deliver API of specified orderer.
func Deliver(n *Network, o *Orderer, env *common.Envelope) (*common.Block, error) {
gRPCclient, err := CreateGRPCClient(n, o)
if err != nil {
return nil, err
}

addr := n.OrdererAddress(o, ListenPort)
conn, err := gRPCclient.NewConnection(addr)
if err != nil {
return nil, err
}
defer conn.Close()

deliverer, err := orderer.NewAtomicBroadcastClient(conn).Deliver(context.Background())
if err != nil {
return nil, err
}

err = deliverer.Send(env)
if err != nil {
return nil, err
}

resp, err := deliverer.Recv()
if err != nil {
return nil, err
}

blk := resp.GetBlock()
if blk == nil {
return nil, errors.Errorf("block not found")
}

return blk, nil
}

func CreateGRPCClient(n *Network, o *Orderer) (*comm.GRPCClient, error) {
config := comm.ClientConfig{}
config.Timeout = 5 * time.Second

secOpts := comm.SecureOptions{
UseTLS: true,
RequireClientCert: false,
}

caPEM, err := ioutil.ReadFile(path.Join(n.OrdererLocalTLSDir(o), "ca.crt"))
if err != nil {
return nil, err
}

secOpts.ServerRootCAs = [][]byte{caPEM}
config.SecOpts = secOpts

grpcClient, err := comm.NewGRPCClient(config)
if err != nil {
return nil, err
}

return grpcClient, nil
}
18 changes: 9 additions & 9 deletions integration/raft/cft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ var _ = Describe("EndToEnd Crash Fault Tolerance", func() {

By("performing operation with orderer1")
env := CreateBroadcastEnvelope(network, o1, network.SystemChannel.Name, []byte("foo"))
resp, err := Broadcast(network, o1, env)
resp, err := nwo.Broadcast(network, o1, env)
Expect(err).NotTo(HaveOccurred())
Expect(resp.Status).To(Equal(common.Status_SUCCESS))

Expand All @@ -116,7 +116,7 @@ var _ = Describe("EndToEnd Crash Fault Tolerance", func() {
Eventually(o1Proc.Wait(), network.EventuallyTimeout).Should(Receive(MatchError("exit status 137")))

By("broadcasting envelope to running orderer")
resp, err = Broadcast(network, o2, env)
resp, err = nwo.Broadcast(network, o2, env)
Expect(err).NotTo(HaveOccurred())
Expect(resp.Status).To(Equal(common.Status_SUCCESS))

Expand All @@ -130,7 +130,7 @@ var _ = Describe("EndToEnd Crash Fault Tolerance", func() {
findLeader([]*ginkgomon.Runner{o1Runner})

By("broadcasting envelope to restarted orderer")
resp, err = Broadcast(network, o1, env)
resp, err = nwo.Broadcast(network, o1, env)
Expect(err).NotTo(HaveOccurred())
Expect(resp.Status).To(Equal(common.Status_SUCCESS))

Expand Down Expand Up @@ -176,7 +176,7 @@ var _ = Describe("EndToEnd Crash Fault Tolerance", func() {
env := CreateBroadcastEnvelope(network, o2, channelID, make([]byte, 2000))
for i := 1; i <= 4; i++ { // 4 < MaxSnapshotFiles(5), so that no snapshot is pruned
// Note that MaxMessageCount is 1 be default, so every tx results in a new block
resp, err := Broadcast(network, o2, env)
resp, err := nwo.Broadcast(network, o2, env)
Expect(err).NotTo(HaveOccurred())
Expect(resp.Status).To(Equal(common.Status_SUCCESS))

Expand Down Expand Up @@ -222,7 +222,7 @@ var _ = Describe("EndToEnd Crash Fault Tolerance", func() {

By("Asserting cluster is still functional")
env = CreateBroadcastEnvelope(network, o1, channelID, make([]byte, 1000))
resp, err := Broadcast(network, o1, env)
resp, err := nwo.Broadcast(network, o1, env)
Expect(err).NotTo(HaveOccurred())
Expect(resp.Status).To(Equal(common.Status_SUCCESS))

Expand Down Expand Up @@ -334,7 +334,7 @@ var _ = Describe("EndToEnd Crash Fault Tolerance", func() {
// This should fail because current leader steps down
// and there is no leader at this point of time
env := CreateBroadcastEnvelope(network, leader, network.SystemChannel.Name, []byte("foo"))
resp, err := Broadcast(network, leader, env)
resp, err := nwo.Broadcast(network, leader, env)
Expect(err).NotTo(HaveOccurred())
Expect(resp.Status).To(Equal(common.Status_SERVICE_UNAVAILABLE))
})
Expand Down Expand Up @@ -522,7 +522,7 @@ var _ = Describe("EndToEnd Crash Fault Tolerance", func() {
channelCreateTxn := createConfigTx(updateTransaction, network.SystemChannel.Name, network, orderer, peer)

By("Updating channel config and failing")
p, err := Broadcast(network, orderer, channelCreateTxn)
p, err := nwo.Broadcast(network, orderer, channelCreateTxn)
Expect(err).NotTo(HaveOccurred())
Expect(p.Status).To(Equal(common.Status_BAD_REQUEST))
Expect(p.Info).To(ContainSubstring("identity expired"))
Expand All @@ -531,7 +531,7 @@ var _ = Describe("EndToEnd Crash Fault Tolerance", func() {
denv := CreateDeliverEnvelope(network, orderer, 0, network.SystemChannel.Name)
Expect(denv).NotTo(BeNil())

block, err := Deliver(network, orderer, denv)
block, err := nwo.Deliver(network, orderer, denv)
Expect(denv).NotTo(BeNil())
Expect(block).To(BeNil())
Eventually(runner.Err(), time.Minute, time.Second).Should(gbytes.Say("client identity expired"))
Expand All @@ -548,7 +548,7 @@ var _ = Describe("EndToEnd Crash Fault Tolerance", func() {
findLeader([]*ginkgomon.Runner{runner})

By("Updating channel config and succeeding")
p, err = Broadcast(network, orderer, channelCreateTxn)
p, err = nwo.Broadcast(network, orderer, channelCreateTxn)
Expect(err).NotTo(HaveOccurred())
Expect(p.Status).To(Equal(common.Status_SUCCESS))

Expand Down
103 changes: 1 addition & 102 deletions integration/raft/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,129 +7,28 @@ SPDX-License-Identifier: Apache-2.0
package raft

import (
"context"
"io/ioutil"
"path"
"time"

"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric-protos-go/orderer"
"github.com/hyperledger/fabric/cmd/common/signer"
"github.com/hyperledger/fabric/core/comm"
"github.com/hyperledger/fabric/integration/nwo"
"github.com/hyperledger/fabric/protoutil"
. "github.com/onsi/gomega"
"github.com/pkg/errors"
)

// Broadcast sends given env to Broadcast API of specified orderer.
func Broadcast(n *nwo.Network, o *nwo.Orderer, env *common.Envelope) (*orderer.BroadcastResponse, error) {
gRPCclient, err := CreateGRPCClient(n, o)
if err != nil {
return nil, err
}

addr := n.OrdererAddress(o, nwo.ListenPort)
conn, err := gRPCclient.NewConnection(addr)
if err != nil {
return nil, err
}
defer conn.Close()

broadcaster, err := orderer.NewAtomicBroadcastClient(conn).Broadcast(context.Background())
if err != nil {
return nil, err
}

err = broadcaster.Send(env)
if err != nil {
return nil, err
}

resp, err := broadcaster.Recv()
if err != nil {
return nil, err
}

return resp, nil
}

// Deliver sends given env to Deliver API of specified orderer.
func Deliver(n *nwo.Network, o *nwo.Orderer, env *common.Envelope) (*common.Block, error) {
gRPCclient, err := CreateGRPCClient(n, o)
if err != nil {
return nil, err
}

addr := n.OrdererAddress(o, nwo.ListenPort)
conn, err := gRPCclient.NewConnection(addr)
if err != nil {
return nil, err
}
defer conn.Close()

deliverer, err := orderer.NewAtomicBroadcastClient(conn).Deliver(context.Background())
if err != nil {
return nil, err
}

err = deliverer.Send(env)
if err != nil {
return nil, err
}

resp, err := deliverer.Recv()
if err != nil {
return nil, err
}

blk := resp.GetBlock()
if blk == nil {
return nil, errors.Errorf("block not found")
}

return blk, nil
}

func FetchBlock(n *nwo.Network, o *nwo.Orderer, seq uint64, channel string) *common.Block {
denv := CreateDeliverEnvelope(n, o, seq, channel)
Expect(denv).NotTo(BeNil())

var blk *common.Block
Eventually(func() error {
var err error
blk, err = Deliver(n, o, denv)
blk, err = nwo.Deliver(n, o, denv)
return err
}, n.EventuallyTimeout).ShouldNot(HaveOccurred())

return blk
}

func CreateGRPCClient(n *nwo.Network, o *nwo.Orderer) (*comm.GRPCClient, error) {
config := comm.ClientConfig{}
config.Timeout = 5 * time.Second

secOpts := comm.SecureOptions{
UseTLS: true,
RequireClientCert: false,
}

caPEM, err := ioutil.ReadFile(path.Join(n.OrdererLocalTLSDir(o), "ca.crt"))
if err != nil {
return nil, err
}

secOpts.ServerRootCAs = [][]byte{caPEM}
config.SecOpts = secOpts

grpcClient, err := comm.NewGRPCClient(config)
if err != nil {
return nil, err
}

return grpcClient, nil
}

func CreateBroadcastEnvelope(n *nwo.Network, signer interface{}, channel string, data []byte) *common.Envelope {
env, err := protoutil.CreateSignedEnvelope(
common.HeaderType_MESSAGE,
Expand Down
12 changes: 6 additions & 6 deletions integration/raft/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ var _ = Describe("EndToEnd reconfiguration and onboarding", func() {

By("Broadcasting envelope to testchannel")
env := CreateBroadcastEnvelope(network, peer, "testchannel", []byte("hello"))
resp, err := Broadcast(network, o1, env)
resp, err := nwo.Broadcast(network, o1, env)
Expect(err).NotTo(HaveOccurred())
Expect(resp.Status).To(Equal(common.Status_SUCCESS))

Expand Down Expand Up @@ -643,12 +643,12 @@ var _ = Describe("EndToEnd reconfiguration and onboarding", func() {

By("Ensuring orderer4 doesn't serve testchannel2 and testchannel3")
env = CreateBroadcastEnvelope(network, peer, "testchannel2", []byte("hello"))
resp, err = Broadcast(network, o4, env)
resp, err = nwo.Broadcast(network, o4, env)
Expect(err).NotTo(HaveOccurred())
Expect(resp.Status).To(Equal(common.Status_SERVICE_UNAVAILABLE))

env = CreateBroadcastEnvelope(network, peer, "testchannel3", []byte("hello"))
resp, err = Broadcast(network, o4, env)
resp, err = nwo.Broadcast(network, o4, env)
Expect(err).NotTo(HaveOccurred())
Expect(resp.Status).To(Equal(common.Status_SERVICE_UNAVAILABLE))

Expand Down Expand Up @@ -676,7 +676,7 @@ var _ = Describe("EndToEnd reconfiguration and onboarding", func() {

By("Submitting a transaction through orderer4")
env = CreateBroadcastEnvelope(network, peer, "testchannel2", []byte("hello"))
resp, err = Broadcast(network, o4, env)
resp, err = nwo.Broadcast(network, o4, env)
Expect(err).NotTo(HaveOccurred())
Expect(resp.Status).To(Equal(common.Status_SUCCESS))

Expand Down Expand Up @@ -940,7 +940,7 @@ var _ = Describe("EndToEnd reconfiguration and onboarding", func() {
}, []*nwo.Orderer{orderers[firstEvictedNode]}, peer, network)

env := CreateBroadcastEnvelope(network, orderers[secondEvictedNode], network.SystemChannel.Name, []byte("foo"))
resp, err := Broadcast(network, orderers[surviver], env)
resp, err := nwo.Broadcast(network, orderers[surviver], env)
Expect(err).NotTo(HaveOccurred())
Expect(resp.Status).To(Equal(common.Status_SUCCESS))
})
Expand Down Expand Up @@ -1165,7 +1165,7 @@ var _ = Describe("EndToEnd reconfiguration and onboarding", func() {
}

env := CreateBroadcastEnvelope(network, orderers[4], network.SystemChannel.Name, []byte("hello"))
resp, err := Broadcast(network, orderers[4], env)
resp, err := nwo.Broadcast(network, orderers[4], env)
Expect(err).NotTo(HaveOccurred())
Expect(resp.Status).To(Equal(common.Status_SUCCESS))
blockSeq++
Expand Down

0 comments on commit 7b19faa

Please sign in to comment.