Skip to content

Commit

Permalink
[FAB-2758] Decouple anchor peers endpoints from orgIDs
Browse files Browse the repository at this point in the history
In the current implementation of the join channel message in the
gossip layer, each anchor peer has its own organization, and the
joinChannel invocation enumerates all anchor peers and builds a
list of organizationIDs from them.

When https://gerrit.hyperledger.org/r/#/c/7105/ will be introduced,
it will be needed to create a channel only with either no anchor peers
or anchor peers of the organization of the channel creator.
This would confuse the gossip layer because it'll derive that each channel
has only 1 organization instead of the original participants.

I decoupled the anchor peers from the organization IDs by changing
the JoinChannelMessage interface in api/channel.go to have
Members() []OrgIdentityType  which would
return the channel members (organizations)
and AnchorPeersOf(org OrgIdentityType) []AnchorPeer which would
return for each organization, its corresponding anchor peers.

Also (of course) updated the tests accordingly.

Change-Id: I90131a0726fc7ca68fd750247f4df558c5cca0fd
Signed-off-by: Yacov Manevich <yacovm@il.ibm.com>
  • Loading branch information
yacovm committed Mar 14, 2017
1 parent 844fe2d commit f1a88db
Show file tree
Hide file tree
Showing 10 changed files with 125 additions and 99 deletions.
13 changes: 7 additions & 6 deletions gossip/api/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,17 @@ type JoinChannelMessage interface {
// the JoinChannelMessage originated from
SequenceNumber() uint64

// AnchorPeers returns all the anchor peers that are in the channel
AnchorPeers() []AnchorPeer
// Members returns the organizations of the channel
Members() []OrgIdentityType

// AnchorPeersOf returns the anchor peers of the given organization
AnchorPeersOf(org OrgIdentityType) []AnchorPeer
}

// AnchorPeer is an anchor peer's certificate and endpoint (host:port)
type AnchorPeer struct {
Host string // Host is the hostname/ip address of the remote peer
Port int // Port is the port the remote peer is listening on
OrgID OrgIdentityType // OrgID is the identity of the organization the anchor peer came from

Host string // Host is the hostname/ip address of the remote peer
Port int // Port is the port the remote peer is listening on
}

// OrgIdentityType defines the identity of an organization
Expand Down
26 changes: 6 additions & 20 deletions gossip/gossip/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ type Config struct {
PullPeerNum int
PullInterval time.Duration
RequestStateInfoInterval time.Duration
Identity api.PeerIdentityType
}

// GossipChannel defines an object that deals with all channel-related messages
Expand Down Expand Up @@ -103,9 +102,6 @@ type Adapter interface {
// hasn't been signed correctly, nil otherwise.
ValidateStateInfoMessage(message *proto.SignedGossipMessage) error

// OrgByPeerIdentity returns the organization ID of a given peer identity
OrgByPeerIdentity(identity api.PeerIdentityType) api.OrgIdentityType

// GetOrgOfPeer returns the organization ID of a given peer PKI-ID
GetOrgOfPeer(pkiID common.PKIidType) api.OrgIdentityType

Expand Down Expand Up @@ -328,6 +324,11 @@ func (gc *gossipChannel) ConfigureChannel(joinMsg api.JoinChannelMessage) {
gc.Lock()
defer gc.Unlock()

if len(joinMsg.Members()) == 0 {
gc.logger.Warning("Received join channel message with empty set of members")
return
}

if gc.joinMsg == nil {
gc.joinMsg = joinMsg
}
Expand All @@ -337,22 +338,7 @@ func (gc *gossipChannel) ConfigureChannel(joinMsg api.JoinChannelMessage) {
return
}

var orgs []api.OrgIdentityType
existingOrgInJoinChanMsg := make(map[string]struct{})
// We are in the channel if the joinMsg contains an empty set of anchor peers
selfOrg := gc.OrgByPeerIdentity(gc.GetConf().Identity)
if len(joinMsg.AnchorPeers()) == 0 {
orgs = []api.OrgIdentityType{selfOrg}
}
for _, anchorPeer := range joinMsg.AnchorPeers() {
orgID := anchorPeer.OrgID
if _, exists := existingOrgInJoinChanMsg[string(orgID)]; !exists {
orgs = append(orgs, orgID)
existingOrgInJoinChanMsg[string(orgID)] = struct{}{}
}
}

gc.orgs = orgs
gc.orgs = joinMsg.Members()
gc.joinMsg = joinMsg
}

Expand Down
58 changes: 32 additions & 26 deletions gossip/gossip/channel/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ var conf = Config{
PullPeerNum: 3,
PullInterval: time.Second,
RequestStateInfoInterval: time.Millisecond * 100,
Identity: api.PeerIdentityType("pkiIDInOrg1"),
}

func init() {
Expand All @@ -66,8 +65,8 @@ var (
)

type joinChanMsg struct {
getTS func() time.Time
anchorPeers func() []api.AnchorPeer
getTS func() time.Time
members2AnchorPeers map[string][]api.AnchorPeer
}

// SequenceNumber returns the sequence number of the block
Expand All @@ -80,12 +79,26 @@ func (jcm *joinChanMsg) SequenceNumber() uint64 {
return uint64(time.Now().UnixNano())
}

// AnchorPeers returns all the anchor peers that are in the channel
func (jcm *joinChanMsg) AnchorPeers() []api.AnchorPeer {
if jcm.anchorPeers != nil {
return jcm.anchorPeers()
// Members returns the organizations of the channel
func (jcm *joinChanMsg) Members() []api.OrgIdentityType {
if jcm.members2AnchorPeers == nil {
return []api.OrgIdentityType{orgInChannelA}
}
return []api.AnchorPeer{{OrgID: orgInChannelA}}
members := make([]api.OrgIdentityType, len(jcm.members2AnchorPeers))
i := 0
for org := range jcm.members2AnchorPeers {
members[i] = api.OrgIdentityType(org)
i++
}
return members
}

// AnchorPeersOf returns the anchor peers of the given organization
func (jcm *joinChanMsg) AnchorPeersOf(org api.OrgIdentityType) []api.AnchorPeer {
if jcm.members2AnchorPeers == nil {
return []api.AnchorPeer{}
}
return jcm.members2AnchorPeers[string(org)]
}

type cryptoService struct {
Expand Down Expand Up @@ -197,10 +210,6 @@ func (ga *gossipAdapterMock) ValidateStateInfoMessage(msg *proto.SignedGossipMes
return args.Get(0).(error)
}

func (ga *gossipAdapterMock) OrgByPeerIdentity(identity api.PeerIdentityType) api.OrgIdentityType {
return ga.GetOrgOfPeer(common.PKIidType(identity))
}

func (ga *gossipAdapterMock) GetOrgOfPeer(PKIIID common.PKIidType) api.OrgIdentityType {
args := ga.Called(PKIIID)
return args.Get(0).(api.OrgIdentityType)
Expand Down Expand Up @@ -743,30 +752,30 @@ func TestChannelReconfigureChannel(t *testing.T) {
adapter.On("GetOrgOfPeer", pkiIDinOrg2).Return(orgNotInChannelA)

outdatedJoinChanMsg := &joinChanMsg{
anchorPeers: func() []api.AnchorPeer {
return []api.AnchorPeer{{OrgID: orgNotInChannelA}}
},
getTS: func() time.Time {
return time.Now()
},
members2AnchorPeers: map[string][]api.AnchorPeer{
string(orgNotInChannelA): {},
},
}

newJoinChanMsg := &joinChanMsg{
anchorPeers: func() []api.AnchorPeer {
return []api.AnchorPeer{{OrgID: orgInChannelA}}
},
getTS: func() time.Time {
return time.Now().Add(time.Millisecond * 100)
},
members2AnchorPeers: map[string][]api.AnchorPeer{
string(orgInChannelA): {},
},
}

updatedJoinChanMsg := &joinChanMsg{
anchorPeers: func() []api.AnchorPeer {
return []api.AnchorPeer{{OrgID: orgNotInChannelA}}
},
getTS: func() time.Time {
return time.Now().Add(time.Millisecond * 200)
},
members2AnchorPeers: map[string][]api.AnchorPeer{
string(orgNotInChannelA): {},
},
}

gc := NewGossipChannel(cs, channelA, adapter, api.JoinChannelMessage(newJoinChanMsg))
Expand Down Expand Up @@ -832,11 +841,8 @@ func TestChannelNoAnchorPeers(t *testing.T) {
adapter.On("GetOrgOfPeer", pkiIDinOrg2).Return(orgNotInChannelA)

jcm := &joinChanMsg{
anchorPeers: func() []api.AnchorPeer {
return []api.AnchorPeer{}
},
getTS: func() time.Time {
return time.Now().Add(time.Millisecond * 100)
members2AnchorPeers: map[string][]api.AnchorPeer{
string(orgInChannelA): {},
},
}

Expand Down
6 changes: 0 additions & 6 deletions gossip/gossip/chanstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ func (ga *gossipAdapterImpl) GetConf() channel.Config {
PullInterval: ga.conf.PullInterval,
PullPeerNum: ga.conf.PullPeerNum,
RequestStateInfoInterval: ga.conf.RequestStateInfoInterval,
Identity: ga.selfIdentity,
}
}

Expand All @@ -105,11 +104,6 @@ func (ga *gossipAdapterImpl) ValidateStateInfoMessage(msg *proto.SignedGossipMes
return ga.gossipServiceImpl.validateStateInfoMsg(msg)
}

// OrgByPeerIdentity extracts the organization identifier from a peer's identity
func (ga *gossipAdapterImpl) OrgByPeerIdentity(identity api.PeerIdentityType) api.OrgIdentityType {
return ga.gossipServiceImpl.secAdvisor.OrgByPeerIdentity(identity)
}

// GetOrgOfPeer returns the organization identifier of a certain peer
func (ga *gossipAdapterImpl) GetOrgOfPeer(PKIID common.PKIidType) api.OrgIdentityType {
return ga.gossipServiceImpl.getOrgOfPeer(PKIID)
Expand Down
19 changes: 12 additions & 7 deletions gossip/gossip/gossip_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,13 @@ func (g *gossipServiceImpl) JoinChan(joinMsg api.JoinChannelMessage, chainID com
// joinMsg is supposed to have been already verified
g.chanState.joinChannel(joinMsg, chainID)

for _, ap := range joinMsg.AnchorPeers() {
for _, org := range joinMsg.Members() {
g.learnAnchorPeers(org, joinMsg.AnchorPeersOf(org))
}
}

func (g *gossipServiceImpl) learnAnchorPeers(orgOfAnchorPeers api.OrgIdentityType, anchorPeers []api.AnchorPeer) {
for _, ap := range anchorPeers {
if ap.Host == "" {
g.logger.Warning("Got empty hostname, skipping connecting to anchor peer", ap)
continue
Expand All @@ -191,20 +197,19 @@ func (g *gossipServiceImpl) JoinChan(joinMsg api.JoinChannelMessage, chainID com
continue
}

inOurOrg := bytes.Equal(g.selfOrg, ap.OrgID)
inOurOrg := bytes.Equal(g.selfOrg, orgOfAnchorPeers)
if !inOurOrg && g.selfNetworkMember().Endpoint == "" {
g.logger.Infof("Anchor peer %s:%d isn't in our org(%v) and we have no external endpoint, skipping", ap.Host, ap.Port, string(ap.OrgID))
g.logger.Infof("Anchor peer %s:%d isn't in our org(%v) and we have no external endpoint, skipping", ap.Host, ap.Port, string(orgOfAnchorPeers))
continue
}
anchorPeerOrg := ap.OrgID
isInOurOrg := func() bool {
identity, err := g.comm.Handshake(&comm.RemotePeer{Endpoint: endpoint})
remotePeerIdentity, err := g.comm.Handshake(&comm.RemotePeer{Endpoint: endpoint})
if err != nil {
g.logger.Warning("Deep probe of", endpoint, "failed:", err)
return false
}
isAnchorPeerInMyOrg := bytes.Equal(g.selfOrg, g.secAdvisor.OrgByPeerIdentity(identity))
if bytes.Equal(anchorPeerOrg, g.selfOrg) && !isAnchorPeerInMyOrg {
isAnchorPeerInMyOrg := bytes.Equal(g.selfOrg, g.secAdvisor.OrgByPeerIdentity(remotePeerIdentity))
if bytes.Equal(orgOfAnchorPeers, g.selfOrg) && !isAnchorPeerInMyOrg {
g.logger.Warning("Anchor peer", endpoint, "isn't in our org, but is claimed to be")
}
return isAnchorPeerInMyOrg
Expand Down
35 changes: 24 additions & 11 deletions gossip/gossip/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func acceptLeadershp(message interface{}) bool {
}

type joinChanMsg struct {
anchorPeers []api.AnchorPeer
members2AnchorPeers map[string][]api.AnchorPeer
}

// SequenceNumber returns the sequence number of the block this joinChanMsg
Expand All @@ -81,12 +81,26 @@ func (*joinChanMsg) SequenceNumber() uint64 {
return uint64(time.Now().UnixNano())
}

// AnchorPeers returns all the anchor peers that are in the channel
func (jcm *joinChanMsg) AnchorPeers() []api.AnchorPeer {
if len(jcm.anchorPeers) == 0 {
return []api.AnchorPeer{{OrgID: orgInChannelA}}
// Members returns the organizations of the channel
func (jcm *joinChanMsg) Members() []api.OrgIdentityType {
if jcm.members2AnchorPeers == nil {
return []api.OrgIdentityType{orgInChannelA}
}
return jcm.anchorPeers
members := make([]api.OrgIdentityType, len(jcm.members2AnchorPeers))
i := 0
for org := range jcm.members2AnchorPeers {
members[i] = api.OrgIdentityType(org)
i++
}
return members
}

// AnchorPeersOf returns the anchor peers of the given organization
func (jcm *joinChanMsg) AnchorPeersOf(org api.OrgIdentityType) []api.AnchorPeer {
if jcm.members2AnchorPeers == nil {
return []api.AnchorPeer{}
}
return jcm.members2AnchorPeers[string(org)]
}

type naiveCryptoService struct {
Expand Down Expand Up @@ -335,14 +349,13 @@ func TestConnectToAnchorPeers(t *testing.T) {
n := 10
anchorPeercount := 3

jcm := &joinChanMsg{anchorPeers: []api.AnchorPeer{}}
jcm := &joinChanMsg{members2AnchorPeers: map[string][]api.AnchorPeer{string(orgInChannelA): {}}}
for i := 0; i < anchorPeercount; i++ {
ap := api.AnchorPeer{
Port: portPrefix + i,
Host: "localhost",
OrgID: orgInChannelA,
Port: portPrefix + i,
Host: "localhost",
}
jcm.anchorPeers = append(jcm.anchorPeers, ap)
jcm.members2AnchorPeers[string(orgInChannelA)] = append(jcm.members2AnchorPeers[string(orgInChannelA)], ap)
}

peers := make([]Gossip, n)
Expand Down
12 changes: 8 additions & 4 deletions gossip/gossip/orgs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,14 @@ func TestMultipleOrgEndpointLeakage(t *testing.T) {
}

jcm := &joinChanMsg{
anchorPeers: []api.AnchorPeer{
{Host: "localhost", Port: 11611, OrgID: api.OrgIdentityType(orgA)},
{Host: "localhost", Port: 11615, OrgID: api.OrgIdentityType(orgB)},
{Host: "localhost", Port: 11616, OrgID: api.OrgIdentityType(orgA)},
members2AnchorPeers: map[string][]api.AnchorPeer{
orgA: {
{Host: "localhost", Port: 11611},
{Host: "localhost", Port: 11616},
},
orgB: {
{Host: "localhost", Port: 11615},
},
},
}

Expand Down
29 changes: 20 additions & 9 deletions gossip/service/gossip_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,28 @@ type gossipServiceImpl struct {

// This is an implementation of api.JoinChannelMessage.
type joinChannelMessage struct {
seqNum uint64
anchorPeers []api.AnchorPeer
seqNum uint64
members2AnchorPeers map[string][]api.AnchorPeer
}

func (jcm *joinChannelMessage) SequenceNumber() uint64 {
return jcm.seqNum
}

func (jcm *joinChannelMessage) AnchorPeers() []api.AnchorPeer {
return jcm.anchorPeers
// Members returns the organizations of the channel
func (jcm *joinChannelMessage) Members() []api.OrgIdentityType {
members := make([]api.OrgIdentityType, len(jcm.members2AnchorPeers))
i := 0
for org := range jcm.members2AnchorPeers {
members[i] = api.OrgIdentityType(org)
i++
}
return members
}

// AnchorPeersOf returns the anchor peers of the given organization
func (jcm *joinChannelMessage) AnchorPeersOf(org api.OrgIdentityType) []api.AnchorPeer {
return jcm.members2AnchorPeers[string(org)]
}

var logger = util.GetLogger(util.LoggingServiceModule, "")
Expand Down Expand Up @@ -209,15 +221,14 @@ func (g *gossipServiceImpl) configUpdated(config Config) {
"among the orgs of the channel:", orgListFromConfig(config), ", aborting.")
return
}
jcm := &joinChannelMessage{seqNum: config.Sequence(), anchorPeers: []api.AnchorPeer{}}
jcm := &joinChannelMessage{seqNum: config.Sequence(), members2AnchorPeers: map[string][]api.AnchorPeer{}}
for orgID, appOrg := range config.Organizations() {
for _, ap := range appOrg.AnchorPeers() {
anchorPeer := api.AnchorPeer{
Host: ap.Host,
Port: int(ap.Port),
OrgID: api.OrgIdentityType(orgID),
Host: ap.Host,
Port: int(ap.Port),
}
jcm.anchorPeers = append(jcm.anchorPeers, anchorPeer)
jcm.members2AnchorPeers[orgID] = append(jcm.members2AnchorPeers[orgID], anchorPeer)
}
}

Expand Down
Loading

0 comments on commit f1a88db

Please sign in to comment.