Skip to content

Commit

Permalink
[FAB-2061] Gossip inter-org confidentiality - P2
Browse files Browse the repository at this point in the history
In the previous commit, I adjusted the membership routing so that
membership would only be disseminated between pairs of organizations.

The only thing left is to deal with identity messages, which are
disseminated using the pull mechanism.

This commit adds a filtering capability to the pull engine,
and to its enclosing object- the pull mediator.

The filter acts on a context of a message (that contains info about the
peer's credentials, hence it's organization) sent from the remote peer
(helloMsg and requestMsg) and return a function that declares for each
item (digest or item in the response) whether it should be sent to the remote peer.

In the next commit, I shall write filtering logic to make it so
that peers would gossip identities with destination orgs only
if they are their own identities.

Change-Id: I7996bafe5ce2962279c01e59c57868057cfd1d7e
Signed-off-by: Yacov Manevich <yacovm@il.ibm.com>
  • Loading branch information
yacovm committed Apr 18, 2017
1 parent 077126e commit 90b4c72
Show file tree
Hide file tree
Showing 7 changed files with 191 additions and 23 deletions.
35 changes: 29 additions & 6 deletions gossip/gossip/algo/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ func SetResponseWaitTime(time time.Duration) {
viper.Set("peer.gossip.responseWaitTime", time)
}

// DigestFilter filters digests to be sent to a remote peer that
// sent a hello or a request, based on its messages's context
type DigestFilter func(context interface{}) func(digestItem string) bool

// PullAdapter is needed by the PullEngine in order to
// send messages to the remote PullEngine instances.
// The PullEngine expects to be invoked with
Expand Down Expand Up @@ -109,11 +113,12 @@ type PullEngine struct {
lock sync.Mutex
outgoingNONCES *util.Set
incomingNONCES *util.Set
digFilter DigestFilter
}

// NewPullEngine creates an instance of a PullEngine with a certain sleep time
// between pull initiations
func NewPullEngine(participant PullAdapter, sleepTime time.Duration) *PullEngine {
// NewPullEngineWithFilter creates an instance of a PullEngine with a certain sleep time
// between pull initiations, and uses the given filters when sending digests and responses
func NewPullEngineWithFilter(participant PullAdapter, sleepTime time.Duration, df DigestFilter) *PullEngine {
engine := &PullEngine{
PullAdapter: participant,
stopFlag: int32(0),
Expand All @@ -125,6 +130,7 @@ func NewPullEngine(participant PullAdapter, sleepTime time.Duration) *PullEngine
acceptingResponses: int32(0),
incomingNONCES: util.NewSet(),
outgoingNONCES: util.NewSet(),
digFilter: df,
}

go func() {
Expand All @@ -140,8 +146,19 @@ func NewPullEngine(participant PullAdapter, sleepTime time.Duration) *PullEngine
return engine
}

// NewPullEngine creates an instance of a PullEngine with a certain sleep time
// between pull initiations
func NewPullEngine(participant PullAdapter, sleepTime time.Duration) *PullEngine {
acceptAllFilter := func(_ interface{}) func(string) bool {
return func(_ string) bool {
return true
}
}
return NewPullEngineWithFilter(participant, sleepTime, acceptAllFilter)
}

func (engine *PullEngine) toDie() bool {
return (atomic.LoadInt32(&(engine.stopFlag)) == int32(1))
return atomic.LoadInt32(&(engine.stopFlag)) == int32(1)
}

func (engine *PullEngine) acceptResponses() {
Expand Down Expand Up @@ -275,8 +292,13 @@ func (engine *PullEngine) OnHello(nonce uint64, context interface{}) {

a := engine.state.ToArray()
digest := make([]string, len(a))
filter := engine.digFilter(context)
for i, item := range a {
digest[i] = item.(string)
dig := item.(string)
if !filter(dig) {
continue
}
digest[i] = dig
}
engine.SendDigest(digest, nonce, context)
}
Expand All @@ -288,9 +310,10 @@ func (engine *PullEngine) OnReq(items []string, nonce uint64, context interface{
}
engine.lock.Lock()

filter := engine.digFilter(context)
var items2Send []string
for _, item := range items {
if engine.state.Exists(item) {
if engine.state.Exists(item) && filter(item) {
items2Send = append(items2Send, item)
}
}
Expand Down
51 changes: 48 additions & 3 deletions gossip/gossip/algo/pull_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ limitations under the License.
package algo

import (
"fmt"
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

"fmt"
"sync/atomic"

"github.com/hyperledger/fabric/gossip/util"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -501,6 +501,51 @@ func TestSpread(t *testing.T) {
}
}
lock.Unlock()
}

func TestFilter(t *testing.T) {
t.Parallel()
// Scenario: 3 instances, items [0-5] are found only in the first instance, the other 2 have none.
// and also the first instance only gives the 2nd instance even items, and odd items to the 3rd.
// also, instances 2 and 3 don't know each other.
// Expected outcome: inst2 has only even items, and inst3 has only odd items
peers := make(map[string]*pullTestInstance)
inst1 := newPushPullTestInstance("p1", peers)
inst2 := newPushPullTestInstance("p2", peers)
inst3 := newPushPullTestInstance("p3", peers)
defer inst1.stop()
defer inst2.stop()
defer inst3.stop()

inst1.PullEngine.digFilter = func(context interface{}) func(digestItem string) bool {
return func(digestItem string) bool {
n, _ := strconv.ParseInt(digestItem, 10, 64)
if context == "p2" {
return n%2 == 0
}
return n%2 == 1
}
}

inst1.Add("0", "1", "2", "3", "4", "5")
inst2.setNextPeerSelection([]string{"p1"})
inst3.setNextPeerSelection([]string{"p1"})

time.Sleep(time.Second * 2)

assert.True(t, util.IndexInSlice(inst2.state.ToArray(), "0", Strcmp) != -1)
assert.True(t, util.IndexInSlice(inst2.state.ToArray(), "1", Strcmp) == -1)
assert.True(t, util.IndexInSlice(inst2.state.ToArray(), "2", Strcmp) != -1)
assert.True(t, util.IndexInSlice(inst2.state.ToArray(), "3", Strcmp) == -1)
assert.True(t, util.IndexInSlice(inst2.state.ToArray(), "4", Strcmp) != -1)
assert.True(t, util.IndexInSlice(inst2.state.ToArray(), "5", Strcmp) == -1)

assert.True(t, util.IndexInSlice(inst3.state.ToArray(), "0", Strcmp) == -1)
assert.True(t, util.IndexInSlice(inst3.state.ToArray(), "1", Strcmp) != -1)
assert.True(t, util.IndexInSlice(inst3.state.ToArray(), "2", Strcmp) == -1)
assert.True(t, util.IndexInSlice(inst3.state.ToArray(), "3", Strcmp) != -1)
assert.True(t, util.IndexInSlice(inst3.state.ToArray(), "4", Strcmp) == -1)
assert.True(t, util.IndexInSlice(inst3.state.ToArray(), "5", Strcmp) != -1)

}

Expand Down
15 changes: 10 additions & 5 deletions gossip/gossip/certstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,17 @@ func testCertificateUpdate(t *testing.T, updateFactory func(uint64) proto.Receiv
sender := &senderMock{}
memberSvc := &membershipSvcMock{}
memberSvc.On("GetMembership").Return([]discovery.NetworkMember{{PKIid: []byte("bla bla"), Endpoint: "localhost:5611"}})
adapter := pull.PullAdapter{
Sndr: sender,
MemSvc: memberSvc,
IdExtractor: func(msg *proto.SignedGossipMessage) string {
return string(msg.GetPeerIdentity().PkiId)
},
MsgCons: func(msg *proto.SignedGossipMessage) {

pullMediator := pull.NewPullMediator(config,
sender,
memberSvc,
func(msg *proto.SignedGossipMessage) string { return string(msg.GetPeerIdentity().PkiId) },
func(msg *proto.SignedGossipMessage) {})
},
}
pullMediator := pull.NewPullMediator(config, adapter)
certStore := newCertStore(&pullerMock{
Mediator: pullMediator,
}, identity.NewIdentityMapper(&naiveCryptoService{}), api.PeerIdentityType("SELF"), &naiveCryptoService{})
Expand Down
8 changes: 7 additions & 1 deletion gossip/gossip/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,13 @@ func (gc *gossipChannel) createBlockPuller() pull.Mediator {
}
gc.DeMultiplex(msg)
}
return pull.NewPullMediator(conf, gc, gc.memFilter, seqNumFromMsg, blockConsumer)
adapter := pull.PullAdapter{
Sndr: gc,
MemSvc: gc.memFilter,
IdExtractor: seqNumFromMsg,
MsgCons: blockConsumer,
}
return pull.NewPullMediator(conf, adapter)
}

// IsMemberInChan checks whether the given member is eligible to be in the channel
Expand Down
8 changes: 7 additions & 1 deletion gossip/gossip/gossip_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -929,7 +929,13 @@ func (g *gossipServiceImpl) createCertStorePuller() pull.Mediator {
g.logger.Info("Learned of a new certificate:", idMsg.Cert)

}
return pull.NewPullMediator(conf, g.comm, g.disc, pkiIDFromMsg, certConsumer)
adapter := pull.PullAdapter{
Sndr: g.comm,
MemSvc: g.disc,
IdExtractor: pkiIDFromMsg,
MsgCons: certConsumer,
}
return pull.NewPullMediator(conf, adapter)
}

func (g *gossipServiceImpl) createStateInfoMsg(metadata []byte, chainID common.ChainID) (*proto.SignedGossipMessage, error) {
Expand Down
48 changes: 42 additions & 6 deletions gossip/gossip/pull/pullstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,19 @@ type MembershipService interface {
GetMembership() []discovery.NetworkMember
}

// DigestFilter filters digests to be sent to a remote peer, that
// sent a hello with the following message
type DigestFilter func(helloMsg proto.ReceivedMessage) func(digestItem string) bool

// byContext converts this DigestFilter to an algo.DigestFilter
func (df DigestFilter) byContext() algo.DigestFilter {
return func(context interface{}) func(digestItem string) bool {
return func(digestItem string) bool {
return df(context.(proto.ReceivedMessage))(digestItem)
}
}
}

// PullConfig defines the configuration of the pull mediator
type PullConfig struct {
ID string
Expand All @@ -65,6 +78,16 @@ type PullConfig struct {
MsgType proto.PullMsgType
}

// PullAdapter defines methods of the pullStore to interact
// with various modules of gossip
type PullAdapter struct {
Sndr Sender
MemSvc MembershipService
IdExtractor proto.IdentifierExtractor
MsgCons proto.MsgConsumer
DigFilter DigestFilter
}

// Mediator is a component wrap a PullEngine and provides the methods
// it needs to perform pull synchronization.
// The specialization of a pull mediator to a certain type of message is
Expand Down Expand Up @@ -103,18 +126,31 @@ type pullMediatorImpl struct {
}

// NewPullMediator returns a new Mediator
func NewPullMediator(config PullConfig, sndr Sender, memSvc MembershipService, idExtractor proto.IdentifierExtractor, msgCons proto.MsgConsumer) Mediator {
func NewPullMediator(config PullConfig, adapter PullAdapter) Mediator {
digFilter := adapter.DigFilter

acceptAllFilter := func(_ proto.ReceivedMessage) func(string) bool {
return func(_ string) bool {
return true
}
}

if digFilter == nil {
digFilter = acceptAllFilter
}

p := &pullMediatorImpl{
msgCons: msgCons,
msgCons: adapter.MsgCons,
msgType2Hook: make(map[PullMsgType][]MessageHook),
idExtractor: idExtractor,
idExtractor: adapter.IdExtractor,
config: config,
logger: util.GetLogger(util.LoggingPullModule, config.ID),
itemID2Msg: make(map[string]*proto.SignedGossipMessage),
memBvc: memSvc,
Sender: sndr,
memBvc: adapter.MemSvc,
Sender: adapter.Sndr,
}
p.engine = algo.NewPullEngine(p, config.PullInterval)

p.engine = algo.NewPullEngineWithFilter(p, config.PullInterval, digFilter.byContext())
return p
}

Expand Down
49 changes: 48 additions & 1 deletion gossip/gossip/pull/pullstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package pull

import (
"fmt"
"strconv"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -106,6 +107,10 @@ func (p *pullInstance) wrapPullMsg(msg *proto.SignedGossipMessage) proto.Receive
}

func createPullInstance(endpoint string, peer2PullInst map[string]*pullInstance) *pullInstance {
return createPullInstanceWithFilters(endpoint, peer2PullInst, nil)
}

func createPullInstanceWithFilters(endpoint string, peer2PullInst map[string]*pullInstance, df DigestFilter) *pullInstance {
inst := &pullInstance{
items: util.NewSet(),
stopChan: make(chan struct{}),
Expand Down Expand Up @@ -137,7 +142,14 @@ func createPullInstance(endpoint string, peer2PullInst map[string]*pullInstance)
blockConsumer := func(msg *proto.SignedGossipMessage) {
inst.items.Add(msg.GetDataMsg().Payload.SeqNum)
}
inst.mediator = NewPullMediator(conf, inst, inst, seqNumFromMsg, blockConsumer)
adapter := PullAdapter{
Sndr: inst,
MemSvc: inst,
IdExtractor: seqNumFromMsg,
MsgCons: blockConsumer,
DigFilter: df,
}
inst.mediator = NewPullMediator(conf, adapter)
go func() {
for {
select {
Expand Down Expand Up @@ -182,6 +194,41 @@ func TestRegisterMsgHook(t *testing.T) {

}

func TestFilter(t *testing.T) {
t.Parallel()
peer2pullInst := make(map[string]*pullInstance)

eq := func(a interface{}, b interface{}) bool {
return a == b
}
df := func(msg proto.ReceivedMessage) func(string) bool {
if msg.GetGossipMessage().IsDataReq() {
req := msg.GetGossipMessage().GetDataReq()
return func(item string) bool {
return util.IndexInSlice(req.Digests, item, eq) != -1
}
}
return func(digestItem string) bool {
n, _ := strconv.ParseInt(digestItem, 10, 64)
return n%2 == 0
}
}
inst1 := createPullInstanceWithFilters("localhost:5611", peer2pullInst, df)
inst2 := createPullInstance("localhost:5612", peer2pullInst)
defer inst1.stop()
defer inst2.stop()

inst1.mediator.Add(dataMsg(0))
inst1.mediator.Add(dataMsg(1))
inst1.mediator.Add(dataMsg(2))
inst1.mediator.Add(dataMsg(3))

waitUntilOrFail(t, func() bool { return inst2.items.Exists(uint64(0)) })
waitUntilOrFail(t, func() bool { return inst2.items.Exists(uint64(2)) })
assert.False(t, inst2.items.Exists(uint64(1)))
assert.False(t, inst2.items.Exists(uint64(3)))
}

func TestAddAndRemove(t *testing.T) {
t.Parallel()
peer2pullInst := make(map[string]*pullInstance)
Expand Down

0 comments on commit 90b4c72

Please sign in to comment.