From f1a88db2b3634dda74da7a21b991bf7fc8656405 Mon Sep 17 00:00:00 2001 From: YACOVM Date: Mon, 13 Mar 2017 14:05:39 +0200 Subject: [PATCH] [FAB-2758] Decouple anchor peers endpoints from orgIDs In the current implementation of the join channel message in the gossip layer, each anchor peer has its own organization, and the joinChannel invocation enumerates all anchor peers and builds a list of organizationIDs from them. When https://gerrit.hyperledger.org/r/#/c/7105/ will be introduced, it will be needed to create a channel only with either no anchor peers or anchor peers of the organization of the channel creator. This would confuse the gossip layer because it'll derive that each channel has only 1 organization instead of the original participants. I decoupled the anchor peers from the organization IDs by changing the JoinChannelMessage interface in api/channel.go to have Members() []OrgIdentityType which would return the channel members (organizations) and AnchorPeersOf(org OrgIdentityType) []AnchorPeer which would return for each organization, its corresponding anchor peers. Also (of course) updated the tests accordingly. Change-Id: I90131a0726fc7ca68fd750247f4df558c5cca0fd Signed-off-by: Yacov Manevich --- gossip/api/channel.go | 13 +++--- gossip/gossip/channel/channel.go | 26 +++--------- gossip/gossip/channel/channel_test.go | 58 +++++++++++++++------------ gossip/gossip/chanstate.go | 6 --- gossip/gossip/gossip_impl.go | 19 +++++---- gossip/gossip/gossip_test.go | 35 +++++++++++----- gossip/gossip/orgs_test.go | 12 ++++-- gossip/service/gossip_service.go | 29 +++++++++----- gossip/service/gossip_service_test.go | 15 +++---- gossip/state/state_test.go | 11 +++-- 10 files changed, 125 insertions(+), 99 deletions(-) diff --git a/gossip/api/channel.go b/gossip/api/channel.go index b3abab18e18..b95a758ac7b 100644 --- a/gossip/api/channel.go +++ b/gossip/api/channel.go @@ -46,16 +46,17 @@ type JoinChannelMessage interface { // the JoinChannelMessage originated from SequenceNumber() uint64 - // AnchorPeers returns all the anchor peers that are in the channel - AnchorPeers() []AnchorPeer + // Members returns the organizations of the channel + Members() []OrgIdentityType + + // AnchorPeersOf returns the anchor peers of the given organization + AnchorPeersOf(org OrgIdentityType) []AnchorPeer } // AnchorPeer is an anchor peer's certificate and endpoint (host:port) type AnchorPeer struct { - Host string // Host is the hostname/ip address of the remote peer - Port int // Port is the port the remote peer is listening on - OrgID OrgIdentityType // OrgID is the identity of the organization the anchor peer came from - + Host string // Host is the hostname/ip address of the remote peer + Port int // Port is the port the remote peer is listening on } // OrgIdentityType defines the identity of an organization diff --git a/gossip/gossip/channel/channel.go b/gossip/gossip/channel/channel.go index 4a037ef6eb1..54753ab2029 100644 --- a/gossip/gossip/channel/channel.go +++ b/gossip/gossip/channel/channel.go @@ -44,7 +44,6 @@ type Config struct { PullPeerNum int PullInterval time.Duration RequestStateInfoInterval time.Duration - Identity api.PeerIdentityType } // GossipChannel defines an object that deals with all channel-related messages @@ -103,9 +102,6 @@ type Adapter interface { // hasn't been signed correctly, nil otherwise. ValidateStateInfoMessage(message *proto.SignedGossipMessage) error - // OrgByPeerIdentity returns the organization ID of a given peer identity - OrgByPeerIdentity(identity api.PeerIdentityType) api.OrgIdentityType - // GetOrgOfPeer returns the organization ID of a given peer PKI-ID GetOrgOfPeer(pkiID common.PKIidType) api.OrgIdentityType @@ -328,6 +324,11 @@ func (gc *gossipChannel) ConfigureChannel(joinMsg api.JoinChannelMessage) { gc.Lock() defer gc.Unlock() + if len(joinMsg.Members()) == 0 { + gc.logger.Warning("Received join channel message with empty set of members") + return + } + if gc.joinMsg == nil { gc.joinMsg = joinMsg } @@ -337,22 +338,7 @@ func (gc *gossipChannel) ConfigureChannel(joinMsg api.JoinChannelMessage) { return } - var orgs []api.OrgIdentityType - existingOrgInJoinChanMsg := make(map[string]struct{}) - // We are in the channel if the joinMsg contains an empty set of anchor peers - selfOrg := gc.OrgByPeerIdentity(gc.GetConf().Identity) - if len(joinMsg.AnchorPeers()) == 0 { - orgs = []api.OrgIdentityType{selfOrg} - } - for _, anchorPeer := range joinMsg.AnchorPeers() { - orgID := anchorPeer.OrgID - if _, exists := existingOrgInJoinChanMsg[string(orgID)]; !exists { - orgs = append(orgs, orgID) - existingOrgInJoinChanMsg[string(orgID)] = struct{}{} - } - } - - gc.orgs = orgs + gc.orgs = joinMsg.Members() gc.joinMsg = joinMsg } diff --git a/gossip/gossip/channel/channel_test.go b/gossip/gossip/channel/channel_test.go index ad8586e07a9..c9b4e1585b5 100644 --- a/gossip/gossip/channel/channel_test.go +++ b/gossip/gossip/channel/channel_test.go @@ -44,7 +44,6 @@ var conf = Config{ PullPeerNum: 3, PullInterval: time.Second, RequestStateInfoInterval: time.Millisecond * 100, - Identity: api.PeerIdentityType("pkiIDInOrg1"), } func init() { @@ -66,8 +65,8 @@ var ( ) type joinChanMsg struct { - getTS func() time.Time - anchorPeers func() []api.AnchorPeer + getTS func() time.Time + members2AnchorPeers map[string][]api.AnchorPeer } // SequenceNumber returns the sequence number of the block @@ -80,12 +79,26 @@ func (jcm *joinChanMsg) SequenceNumber() uint64 { return uint64(time.Now().UnixNano()) } -// AnchorPeers returns all the anchor peers that are in the channel -func (jcm *joinChanMsg) AnchorPeers() []api.AnchorPeer { - if jcm.anchorPeers != nil { - return jcm.anchorPeers() +// Members returns the organizations of the channel +func (jcm *joinChanMsg) Members() []api.OrgIdentityType { + if jcm.members2AnchorPeers == nil { + return []api.OrgIdentityType{orgInChannelA} } - return []api.AnchorPeer{{OrgID: orgInChannelA}} + members := make([]api.OrgIdentityType, len(jcm.members2AnchorPeers)) + i := 0 + for org := range jcm.members2AnchorPeers { + members[i] = api.OrgIdentityType(org) + i++ + } + return members +} + +// AnchorPeersOf returns the anchor peers of the given organization +func (jcm *joinChanMsg) AnchorPeersOf(org api.OrgIdentityType) []api.AnchorPeer { + if jcm.members2AnchorPeers == nil { + return []api.AnchorPeer{} + } + return jcm.members2AnchorPeers[string(org)] } type cryptoService struct { @@ -197,10 +210,6 @@ func (ga *gossipAdapterMock) ValidateStateInfoMessage(msg *proto.SignedGossipMes return args.Get(0).(error) } -func (ga *gossipAdapterMock) OrgByPeerIdentity(identity api.PeerIdentityType) api.OrgIdentityType { - return ga.GetOrgOfPeer(common.PKIidType(identity)) -} - func (ga *gossipAdapterMock) GetOrgOfPeer(PKIIID common.PKIidType) api.OrgIdentityType { args := ga.Called(PKIIID) return args.Get(0).(api.OrgIdentityType) @@ -743,30 +752,30 @@ func TestChannelReconfigureChannel(t *testing.T) { adapter.On("GetOrgOfPeer", pkiIDinOrg2).Return(orgNotInChannelA) outdatedJoinChanMsg := &joinChanMsg{ - anchorPeers: func() []api.AnchorPeer { - return []api.AnchorPeer{{OrgID: orgNotInChannelA}} - }, getTS: func() time.Time { return time.Now() }, + members2AnchorPeers: map[string][]api.AnchorPeer{ + string(orgNotInChannelA): {}, + }, } newJoinChanMsg := &joinChanMsg{ - anchorPeers: func() []api.AnchorPeer { - return []api.AnchorPeer{{OrgID: orgInChannelA}} - }, getTS: func() time.Time { return time.Now().Add(time.Millisecond * 100) }, + members2AnchorPeers: map[string][]api.AnchorPeer{ + string(orgInChannelA): {}, + }, } updatedJoinChanMsg := &joinChanMsg{ - anchorPeers: func() []api.AnchorPeer { - return []api.AnchorPeer{{OrgID: orgNotInChannelA}} - }, getTS: func() time.Time { return time.Now().Add(time.Millisecond * 200) }, + members2AnchorPeers: map[string][]api.AnchorPeer{ + string(orgNotInChannelA): {}, + }, } gc := NewGossipChannel(cs, channelA, adapter, api.JoinChannelMessage(newJoinChanMsg)) @@ -832,11 +841,8 @@ func TestChannelNoAnchorPeers(t *testing.T) { adapter.On("GetOrgOfPeer", pkiIDinOrg2).Return(orgNotInChannelA) jcm := &joinChanMsg{ - anchorPeers: func() []api.AnchorPeer { - return []api.AnchorPeer{} - }, - getTS: func() time.Time { - return time.Now().Add(time.Millisecond * 100) + members2AnchorPeers: map[string][]api.AnchorPeer{ + string(orgInChannelA): {}, }, } diff --git a/gossip/gossip/chanstate.go b/gossip/gossip/chanstate.go index 429b85a7232..083c45d48f3 100644 --- a/gossip/gossip/chanstate.go +++ b/gossip/gossip/chanstate.go @@ -86,7 +86,6 @@ func (ga *gossipAdapterImpl) GetConf() channel.Config { PullInterval: ga.conf.PullInterval, PullPeerNum: ga.conf.PullPeerNum, RequestStateInfoInterval: ga.conf.RequestStateInfoInterval, - Identity: ga.selfIdentity, } } @@ -105,11 +104,6 @@ func (ga *gossipAdapterImpl) ValidateStateInfoMessage(msg *proto.SignedGossipMes return ga.gossipServiceImpl.validateStateInfoMsg(msg) } -// OrgByPeerIdentity extracts the organization identifier from a peer's identity -func (ga *gossipAdapterImpl) OrgByPeerIdentity(identity api.PeerIdentityType) api.OrgIdentityType { - return ga.gossipServiceImpl.secAdvisor.OrgByPeerIdentity(identity) -} - // GetOrgOfPeer returns the organization identifier of a certain peer func (ga *gossipAdapterImpl) GetOrgOfPeer(PKIID common.PKIidType) api.OrgIdentityType { return ga.gossipServiceImpl.getOrgOfPeer(PKIID) diff --git a/gossip/gossip/gossip_impl.go b/gossip/gossip/gossip_impl.go index a79070ba1b9..572c8363ebc 100644 --- a/gossip/gossip/gossip_impl.go +++ b/gossip/gossip/gossip_impl.go @@ -175,7 +175,13 @@ func (g *gossipServiceImpl) JoinChan(joinMsg api.JoinChannelMessage, chainID com // joinMsg is supposed to have been already verified g.chanState.joinChannel(joinMsg, chainID) - for _, ap := range joinMsg.AnchorPeers() { + for _, org := range joinMsg.Members() { + g.learnAnchorPeers(org, joinMsg.AnchorPeersOf(org)) + } +} + +func (g *gossipServiceImpl) learnAnchorPeers(orgOfAnchorPeers api.OrgIdentityType, anchorPeers []api.AnchorPeer) { + for _, ap := range anchorPeers { if ap.Host == "" { g.logger.Warning("Got empty hostname, skipping connecting to anchor peer", ap) continue @@ -191,20 +197,19 @@ func (g *gossipServiceImpl) JoinChan(joinMsg api.JoinChannelMessage, chainID com continue } - inOurOrg := bytes.Equal(g.selfOrg, ap.OrgID) + inOurOrg := bytes.Equal(g.selfOrg, orgOfAnchorPeers) if !inOurOrg && g.selfNetworkMember().Endpoint == "" { - g.logger.Infof("Anchor peer %s:%d isn't in our org(%v) and we have no external endpoint, skipping", ap.Host, ap.Port, string(ap.OrgID)) + g.logger.Infof("Anchor peer %s:%d isn't in our org(%v) and we have no external endpoint, skipping", ap.Host, ap.Port, string(orgOfAnchorPeers)) continue } - anchorPeerOrg := ap.OrgID isInOurOrg := func() bool { - identity, err := g.comm.Handshake(&comm.RemotePeer{Endpoint: endpoint}) + remotePeerIdentity, err := g.comm.Handshake(&comm.RemotePeer{Endpoint: endpoint}) if err != nil { g.logger.Warning("Deep probe of", endpoint, "failed:", err) return false } - isAnchorPeerInMyOrg := bytes.Equal(g.selfOrg, g.secAdvisor.OrgByPeerIdentity(identity)) - if bytes.Equal(anchorPeerOrg, g.selfOrg) && !isAnchorPeerInMyOrg { + isAnchorPeerInMyOrg := bytes.Equal(g.selfOrg, g.secAdvisor.OrgByPeerIdentity(remotePeerIdentity)) + if bytes.Equal(orgOfAnchorPeers, g.selfOrg) && !isAnchorPeerInMyOrg { g.logger.Warning("Anchor peer", endpoint, "isn't in our org, but is claimed to be") } return isAnchorPeerInMyOrg diff --git a/gossip/gossip/gossip_test.go b/gossip/gossip/gossip_test.go index c696ae98d23..f0039f4c3e3 100644 --- a/gossip/gossip/gossip_test.go +++ b/gossip/gossip/gossip_test.go @@ -72,7 +72,7 @@ func acceptLeadershp(message interface{}) bool { } type joinChanMsg struct { - anchorPeers []api.AnchorPeer + members2AnchorPeers map[string][]api.AnchorPeer } // SequenceNumber returns the sequence number of the block this joinChanMsg @@ -81,12 +81,26 @@ func (*joinChanMsg) SequenceNumber() uint64 { return uint64(time.Now().UnixNano()) } -// AnchorPeers returns all the anchor peers that are in the channel -func (jcm *joinChanMsg) AnchorPeers() []api.AnchorPeer { - if len(jcm.anchorPeers) == 0 { - return []api.AnchorPeer{{OrgID: orgInChannelA}} +// Members returns the organizations of the channel +func (jcm *joinChanMsg) Members() []api.OrgIdentityType { + if jcm.members2AnchorPeers == nil { + return []api.OrgIdentityType{orgInChannelA} } - return jcm.anchorPeers + members := make([]api.OrgIdentityType, len(jcm.members2AnchorPeers)) + i := 0 + for org := range jcm.members2AnchorPeers { + members[i] = api.OrgIdentityType(org) + i++ + } + return members +} + +// AnchorPeersOf returns the anchor peers of the given organization +func (jcm *joinChanMsg) AnchorPeersOf(org api.OrgIdentityType) []api.AnchorPeer { + if jcm.members2AnchorPeers == nil { + return []api.AnchorPeer{} + } + return jcm.members2AnchorPeers[string(org)] } type naiveCryptoService struct { @@ -335,14 +349,13 @@ func TestConnectToAnchorPeers(t *testing.T) { n := 10 anchorPeercount := 3 - jcm := &joinChanMsg{anchorPeers: []api.AnchorPeer{}} + jcm := &joinChanMsg{members2AnchorPeers: map[string][]api.AnchorPeer{string(orgInChannelA): {}}} for i := 0; i < anchorPeercount; i++ { ap := api.AnchorPeer{ - Port: portPrefix + i, - Host: "localhost", - OrgID: orgInChannelA, + Port: portPrefix + i, + Host: "localhost", } - jcm.anchorPeers = append(jcm.anchorPeers, ap) + jcm.members2AnchorPeers[string(orgInChannelA)] = append(jcm.members2AnchorPeers[string(orgInChannelA)], ap) } peers := make([]Gossip, n) diff --git a/gossip/gossip/orgs_test.go b/gossip/gossip/orgs_test.go index c748871ca43..3de0647da11 100644 --- a/gossip/gossip/orgs_test.go +++ b/gossip/gossip/orgs_test.go @@ -192,10 +192,14 @@ func TestMultipleOrgEndpointLeakage(t *testing.T) { } jcm := &joinChanMsg{ - anchorPeers: []api.AnchorPeer{ - {Host: "localhost", Port: 11611, OrgID: api.OrgIdentityType(orgA)}, - {Host: "localhost", Port: 11615, OrgID: api.OrgIdentityType(orgB)}, - {Host: "localhost", Port: 11616, OrgID: api.OrgIdentityType(orgA)}, + members2AnchorPeers: map[string][]api.AnchorPeer{ + orgA: { + {Host: "localhost", Port: 11611}, + {Host: "localhost", Port: 11616}, + }, + orgB: { + {Host: "localhost", Port: 11615}, + }, }, } diff --git a/gossip/service/gossip_service.go b/gossip/service/gossip_service.go index 8591a8d37d5..ed153ab8ee5 100644 --- a/gossip/service/gossip_service.go +++ b/gossip/service/gossip_service.go @@ -87,16 +87,28 @@ type gossipServiceImpl struct { // This is an implementation of api.JoinChannelMessage. type joinChannelMessage struct { - seqNum uint64 - anchorPeers []api.AnchorPeer + seqNum uint64 + members2AnchorPeers map[string][]api.AnchorPeer } func (jcm *joinChannelMessage) SequenceNumber() uint64 { return jcm.seqNum } -func (jcm *joinChannelMessage) AnchorPeers() []api.AnchorPeer { - return jcm.anchorPeers +// Members returns the organizations of the channel +func (jcm *joinChannelMessage) Members() []api.OrgIdentityType { + members := make([]api.OrgIdentityType, len(jcm.members2AnchorPeers)) + i := 0 + for org := range jcm.members2AnchorPeers { + members[i] = api.OrgIdentityType(org) + i++ + } + return members +} + +// AnchorPeersOf returns the anchor peers of the given organization +func (jcm *joinChannelMessage) AnchorPeersOf(org api.OrgIdentityType) []api.AnchorPeer { + return jcm.members2AnchorPeers[string(org)] } var logger = util.GetLogger(util.LoggingServiceModule, "") @@ -209,15 +221,14 @@ func (g *gossipServiceImpl) configUpdated(config Config) { "among the orgs of the channel:", orgListFromConfig(config), ", aborting.") return } - jcm := &joinChannelMessage{seqNum: config.Sequence(), anchorPeers: []api.AnchorPeer{}} + jcm := &joinChannelMessage{seqNum: config.Sequence(), members2AnchorPeers: map[string][]api.AnchorPeer{}} for orgID, appOrg := range config.Organizations() { for _, ap := range appOrg.AnchorPeers() { anchorPeer := api.AnchorPeer{ - Host: ap.Host, - Port: int(ap.Port), - OrgID: api.OrgIdentityType(orgID), + Host: ap.Host, + Port: int(ap.Port), } - jcm.anchorPeers = append(jcm.anchorPeers, anchorPeer) + jcm.members2AnchorPeers[orgID] = append(jcm.members2AnchorPeers[orgID], anchorPeer) } } diff --git a/gossip/service/gossip_service_test.go b/gossip/service/gossip_service_test.go index 69368502d12..4b9f5af3bca 100644 --- a/gossip/service/gossip_service_test.go +++ b/gossip/service/gossip_service_test.go @@ -440,7 +440,6 @@ func (es *electionService) callback(isLeader bool) { } type joinChanMsg struct { - anchorPeers []api.AnchorPeer } // SequenceNumber returns the sequence number of the block this joinChanMsg @@ -449,12 +448,14 @@ func (jmc *joinChanMsg) SequenceNumber() uint64 { return uint64(time.Now().UnixNano()) } -// AnchorPeers returns all the anchor peers that are in the channel -func (jcm *joinChanMsg) AnchorPeers() []api.AnchorPeer { - if len(jcm.anchorPeers) == 0 { - return []api.AnchorPeer{{OrgID: orgInChannelA}} - } - return jcm.anchorPeers +// Members returns the organizations of the channel +func (jcm *joinChanMsg) Members() []api.OrgIdentityType { + return []api.OrgIdentityType{orgInChannelA} +} + +// AnchorPeersOf returns the anchor peers of the given organization +func (jcm *joinChanMsg) AnchorPeersOf(org api.OrgIdentityType) []api.AnchorPeer { + return []api.AnchorPeer{} } func waitForFullMembership(t *testing.T, gossips []GossipService, peersNum int, timeout time.Duration, testPollInterval time.Duration) bool { diff --git a/gossip/state/state_test.go b/gossip/state/state_test.go index be3822032af..adf83237bc1 100644 --- a/gossip/state/state_test.go +++ b/gossip/state/state_test.go @@ -65,9 +65,14 @@ func (*joinChanMsg) SequenceNumber() uint64 { return uint64(time.Now().UnixNano()) } -// AnchorPeers returns all the anchor peers that are in the channel -func (*joinChanMsg) AnchorPeers() []api.AnchorPeer { - return []api.AnchorPeer{{OrgID: orgID}} +// Members returns the organizations of the channel +func (jcm *joinChanMsg) Members() []api.OrgIdentityType { + return []api.OrgIdentityType{orgID} +} + +// AnchorPeersOf returns the anchor peers of the given organization +func (jcm *joinChanMsg) AnchorPeersOf(org api.OrgIdentityType) []api.AnchorPeer { + return []api.AnchorPeer{} } type orgCryptoService struct {