Skip to content

Commit

Permalink
[FAB-15389] Fix private data dissemination
Browse files Browse the repository at this point in the history
Endorsing peer was not honoring maxPeerCount for private data
dissemination due to the addition of the spraying logic for balancing
selected peers for disseminiation. There was a chance that peers chosen
from the remaining count up to maxPeerCount could overlap with the
already selected peers for dissemination from the spray, causing the
dissemination plan to be incomplete. This CR ensures that once a peer
has been selected to disseminate private data to, it can not be selected
again when computing the dissemination plan for a particular tx. Also
added logging around dissemination for debugging purposes.

Change-Id: Ib31c37b036363787718dba74e7e90a7764afe7fb
Signed-off-by: Danny Cao <dcao@us.ibm.com>
  • Loading branch information
caod123 authored and mastersingh24 committed Nov 9, 2019
1 parent ffa3335 commit b97ca73
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 46 deletions.
86 changes: 70 additions & 16 deletions gossip/privdata/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ func (d *distributorImpl) computeDisseminationPlan(txID string,
return nil, errors.WithStack(err)
}

logger.Debugf("Computing dissemination plan for collection [%s]", collectionName)
dPlan, err := d.disseminationPlanForMsg(colAP, colFilter, pvtDataMsg)
if err != nil {
return nil, errors.WithStack(err)
Expand Down Expand Up @@ -215,20 +216,37 @@ func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAcces
return nil, err
}

m := pvtDataMsg.GetPrivateData().Payload

eligiblePeers := d.eligiblePeersOfChannel(routingFilter)
identitySets := d.identitiesOfEligiblePeers(eligiblePeers, colAP)

// Select one representative from each org
peerEndpoints := map[string]string{}
for _, peer := range eligiblePeers {
epToAdd := peer.Endpoint
if epToAdd == "" {
epToAdd = peer.InternalEndpoint
}
peerEndpoints[string(peer.PKIid)] = epToAdd
}

maximumPeerCount := colAP.MaximumPeerCount()
requiredPeerCount := colAP.RequiredPeerCount()

remainingPeers := []api.PeerIdentityInfo{}
selectedPeerEndpoints := []string{}

rand.Seed(time.Now().Unix())
// Select one representative from each org
if maximumPeerCount > 0 {
for _, selectionPeers := range identitySets {
required := 1
if requiredPeerCount == 0 {
required = 0
}
peer2SendPerOrg := selectionPeers[rand.Intn(len(selectionPeers))]
selectedPeerIndex := rand.Intn(len(selectionPeers))
peer2SendPerOrg := selectionPeers[selectedPeerIndex]
selectedPeerEndpoints = append(selectedPeerEndpoints, peerEndpoints[string(peer2SendPerOrg.PKIId)])
sc := gossipgossip.SendCriteria{
Timeout: d.pushAckTimeout,
Channel: gossipCommon.ChannelID(d.chainID),
Expand All @@ -246,34 +264,70 @@ func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAcces
},
})

// Add unselected peers to remainingPeers
for i, peer := range selectionPeers {
if i != selectedPeerIndex {
remainingPeers = append(remainingPeers, peer)
}
}

if requiredPeerCount > 0 {
requiredPeerCount--
}

maximumPeerCount--
if maximumPeerCount == 0 {
logger.Debug("MaximumPeerCount satisfied")
logger.Debugf("Disseminating private RWSet for TxID [%s] namespace [%s] collection [%s] to peers: %v", m.TxId, m.Namespace, m.CollectionName, selectedPeerEndpoints)
return disseminationPlan, nil
}
}
}

// criteria to select remaining peers to satisfy colAP.MaximumPeerCount()
// collection policy parameters
sc := gossipgossip.SendCriteria{
Timeout: d.pushAckTimeout,
Channel: gossipCommon.ChannelID(d.chainID),
MaxPeers: maximumPeerCount,
MinAck: requiredPeerCount,
IsEligible: func(member discovery.NetworkMember) bool {
return routingFilter(member)
},
// criteria to select remaining peers to satisfy colAP.MaximumPeerCount() if there are still
// unselected peers remaining for dissemination
numPeersToSelect := maximumPeerCount
if len(remainingPeers) < maximumPeerCount {
numPeersToSelect = len(remainingPeers)
}
if numPeersToSelect > 0 {
logger.Debugf("MaximumPeerCount not satisfied, selecting %d more peer(s) for dissemination", numPeersToSelect)
}
for maximumPeerCount > 0 && len(remainingPeers) > 0 {
required := 1
if requiredPeerCount == 0 {
required = 0
}
selectedPeerIndex := rand.Intn(len(remainingPeers))
peer2Send := remainingPeers[selectedPeerIndex]
selectedPeerEndpoints = append(selectedPeerEndpoints, peerEndpoints[string(peer2Send.PKIId)])
sc := gossipgossip.SendCriteria{
Timeout: d.pushAckTimeout,
Channel: gossipCommon.ChannelID(d.chainID),
MaxPeers: 1,
MinAck: required,
IsEligible: func(member discovery.NetworkMember) bool {
return bytes.Equal(member.PKIid, peer2Send.PKIId)
},
}
disseminationPlan = append(disseminationPlan, &dissemination{
criteria: sc,
msg: &protoext.SignedGossipMessage{
Envelope: proto.Clone(pvtDataMsg.Envelope).(*protosgossip.Envelope),
GossipMessage: proto.Clone(pvtDataMsg.GossipMessage).(*protosgossip.GossipMessage),
},
})
if requiredPeerCount > 0 {
requiredPeerCount--
}

disseminationPlan = append(disseminationPlan, &dissemination{
criteria: sc,
msg: pvtDataMsg,
})
maximumPeerCount--

// remove the selected peer from remaining peers
remainingPeers = append(remainingPeers[:selectedPeerIndex], remainingPeers[selectedPeerIndex+1:]...)
}

logger.Debugf("Disseminating private RWSet for TxID [%s] namespace [%s] collection [%s] to peers: %v", m.TxId, m.Namespace, m.CollectionName, selectedPeerEndpoints)
return disseminationPlan, nil
}

Expand Down
2 changes: 1 addition & 1 deletion gossip/privdata/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func TestDistributor(t *testing.T) {
},
}, 0)
assert.Error(t, err)
assert.Contains(t, err.Error(), "Failed disseminating 4 out of 4 private dissemination plans")
assert.Contains(t, err.Error(), "Failed disseminating 2 out of 2 private dissemination plans")

assert.Equal(t,
[]string{"channel", channelID},
Expand Down
105 changes: 76 additions & 29 deletions integration/pvtdata/pvtdata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,7 @@ var _ bool = Describe("PrivateData", func() {
)

BeforeEach(func() {
testDir, network, process, orderer, allPeers = initThreeOrgsSetup()
helper = &testHelper{
networkHelper: &networkHelper{
Network: network,
orderer: orderer,
peers: allPeers,
testDir: testDir,
channelID: "testchannel",
},
}

testDir, network = initThreeOrgsSetup()
legacyChaincode = nwo.Chaincode{
Name: "marblesp",
Version: "1.0",
Expand All @@ -101,12 +91,61 @@ var _ bool = Describe("PrivateData", func() {
}
})

JustBeforeEach(func() {
process, orderer, allPeers = startNetwork(network)
helper = &testHelper{
networkHelper: &networkHelper{
Network: network,
orderer: orderer,
peers: allPeers,
testDir: testDir,
channelID: "testchannel",
},
}
})

AfterEach(func() {
testCleanup(testDir, network, process)
})

Describe("Reconciliation", func() {
BeforeEach(func() {
Describe("Dissemination", func() {
When("pulling is disabled by setting the pull retry threshold to 0", func() {
BeforeEach(func() {
// set pull retry threshold to 0
peers := []*nwo.Peer{
network.Peer("org1", "peer0"),
network.Peer("org2", "peer0"),
network.Peer("org3", "peer0"),
}
for _, p := range peers {
core := network.ReadPeerConfig(p)
core.Peer.Gossip.PvtData.PullRetryThreshold = 0
network.WritePeerConfig(p, core)
}
})

JustBeforeEach(func() {
By("deploying legacy chaincode and adding marble1")
testChaincode = chaincode{
Chaincode: legacyChaincode,
isLegacy: true,
}
helper.deployChaincode(testChaincode)
helper.addMarble(testChaincode.Name,
`{"name":"marble1", "color":"blue", "size":35, "owner":"tom", "price":99}`,
network.Peer("org1", "peer0"),
)
})

It("disseminates private data per collections_config1", func() {
helper.assertPvtdataPresencePerCollectionConfig1(testChaincode.Name, "marble1")
})
})

})

Describe("Reconciliation and pulling", func() {
JustBeforeEach(func() {
By("deploying legacy chaincode and adding marble1")
testChaincode = chaincode{
Chaincode: legacyChaincode,
Expand All @@ -125,7 +164,7 @@ var _ bool = Describe("PrivateData", func() {
})

When("org3 is added to collectionMarbles via chaincode upgrade with collections_config2", func() {
BeforeEach(func() {
JustBeforeEach(func() {
// collections_config2.json defines the access as follows:
// 1. collectionMarbles - Org1, Org2 and Org3 have access to this collection
// 2. collectionMarblePrivateDetails - Org2 and Org3 have access to this collection
Expand All @@ -151,22 +190,22 @@ var _ bool = Describe("PrivateData", func() {
var (
newPeer *nwo.Peer
)
BeforeEach(func() {
JustBeforeEach(func() {
newPeer = network.Peer("org1", "peer1")
helper.addPeer(newPeer)
allPeers = append(allPeers, newPeer)
helper.installChaincode(testChaincode, newPeer)
network.VerifyMembership(allPeers, "testchannel", "marblesp")
})

It("causes the new peer to receive the existing private data only for collectionMarbles", func() {
It("causes the new peer to pull the existing private data only for collectionMarbles", func() {
helper.assertPvtdataPresencePerCollectionConfig1(testChaincode.Name, "marble1", newPeer)
})
})
}

Context("chaincode in legacy lifecycle", func() {
BeforeEach(func() {
JustBeforeEach(func() {
testChaincode = chaincode{
Chaincode: legacyChaincode,
isLegacy: true,
Expand All @@ -176,7 +215,7 @@ var _ bool = Describe("PrivateData", func() {
})

Context("chaincode is migrated from legacy to new lifecycle with same collection config", func() {
BeforeEach(func() {
JustBeforeEach(func() {
testChaincode = chaincode{
Chaincode: newLifecycleChaincode,
isLegacy: false,
Expand Down Expand Up @@ -228,7 +267,7 @@ var _ bool = Describe("PrivateData", func() {
}

Context("chaincode in legacy lifecycle", func() {
BeforeEach(func() {
JustBeforeEach(func() {
testChaincode = chaincode{
Chaincode: legacyChaincode,
isLegacy: true,
Expand All @@ -238,7 +277,7 @@ var _ bool = Describe("PrivateData", func() {
})

Context("chaincode in new lifecycle", func() {
BeforeEach(func() {
JustBeforeEach(func() {
testChaincode = chaincode{
Chaincode: newLifecycleChaincode,
isLegacy: false,
Expand Down Expand Up @@ -270,7 +309,7 @@ var _ bool = Describe("PrivateData", func() {
}

Context("chaincode in legacy lifecycle", func() {
BeforeEach(func() {
JustBeforeEach(func() {
testChaincode = chaincode{
Chaincode: legacyChaincode,
isLegacy: true,
Expand All @@ -280,7 +319,7 @@ var _ bool = Describe("PrivateData", func() {
})

Context("chaincode in new lifecycle", func() {
BeforeEach(func() {
JustBeforeEach(func() {
testChaincode = chaincode{
Chaincode: newLifecycleChaincode,
isLegacy: false,
Expand All @@ -292,7 +331,7 @@ var _ bool = Describe("PrivateData", func() {
})

Describe("Collection Config Updates", func() {
BeforeEach(func() {
JustBeforeEach(func() {
By("deploying legacy chaincode")
testChaincode = chaincode{
Chaincode: legacyChaincode,
Expand All @@ -302,7 +341,7 @@ var _ bool = Describe("PrivateData", func() {
})

When("migrating a chaincode from legacy lifecycle to new lifecycle", func() {
BeforeEach(func() {
JustBeforeEach(func() {
nwo.EnableCapabilities(network, "testchannel", "Application", "V2_0", orderer, allPeers...)
newLifecycleChaincode.CollectionsConfig = collectionConfig("short_btl_config.json")
newLifecycleChaincode.PackageID = "test-package-id"
Expand Down Expand Up @@ -433,7 +472,7 @@ var _ bool = Describe("PrivateData", func() {
}

Context("chaincode in legacy lifecycle", func() {
BeforeEach(func() {
JustBeforeEach(func() {
testChaincode = chaincode{
Chaincode: legacyChaincode,
isLegacy: true,
Expand All @@ -449,7 +488,7 @@ var _ bool = Describe("PrivateData", func() {
})

Context("chaincode in new lifecycle", func() {
BeforeEach(func() {
JustBeforeEach(func() {
testChaincode = chaincode{
Chaincode: newLifecycleChaincode,
isLegacy: false,
Expand All @@ -468,7 +507,7 @@ var _ bool = Describe("PrivateData", func() {

})

func initThreeOrgsSetup() (string, *nwo.Network, ifrit.Process, *nwo.Orderer, []*nwo.Peer) {
func initThreeOrgsSetup() (string, *nwo.Network) {
var err error
testDir, err := ioutil.TempDir("", "e2e-pvtdata")
Expect(err).NotTo(HaveOccurred())
Expand All @@ -485,8 +524,12 @@ func initThreeOrgsSetup() (string, *nwo.Network, ifrit.Process, *nwo.Orderer, []

n := nwo.New(networkConfig, testDir, client, StartPort(), components)
n.GenerateConfigTree()
n.Bootstrap()

return testDir, n
}

func startNetwork(n *nwo.Network) (ifrit.Process, *nwo.Orderer, []*nwo.Peer) {
n.Bootstrap()
networkRunner := n.NetworkGroupRunner()
process := ifrit.Invoke(networkRunner)
Eventually(process.Ready(), n.EventuallyTimeout).Should(BeClosed())
Expand All @@ -504,7 +547,7 @@ func initThreeOrgsSetup() (string, *nwo.Network, ifrit.Process, *nwo.Orderer, []
By("verifying membership")
n.VerifyMembership(expectedPeers, "testchannel")

return testDir, n, process, orderer, expectedPeers
return process, orderer, expectedPeers
}

func testCleanup(testDir string, network *nwo.Network, process ifrit.Process) {
Expand Down Expand Up @@ -734,6 +777,10 @@ func (th *testHelper) assertPvtdataPresencePerCollectionConfig2(chaincodeName, m
}
}

func (th *testHelper) assertPvtdataPresencePerCollectionConfig5(chaincodeName, marbleName string, peers ...*nwo.Peer) {
th.assertPvtdataPresencePerCollectionConfig1(chaincodeName, marbleName, peers...)
}

// assertPresentInCollectionM asserts that the private data for given marble is present in collection
// 'readMarble' at the given peers
func (th *testHelper) assertPresentInCollectionM(chaincodeName, marbleName string, peerList ...*nwo.Peer) {
Expand Down
Loading

0 comments on commit b97ca73

Please sign in to comment.