From 90b4c7217dbddb79c5b63a76e4e42c79ed763763 Mon Sep 17 00:00:00 2001 From: YACOVM Date: Sun, 16 Apr 2017 01:35:04 +0300 Subject: [PATCH] [FAB-2061] Gossip inter-org confidentiality - P2 In the previous commit, I adjusted the membership routing so that membership would only be disseminated between pairs of organizations. The only thing left is to deal with identity messages, which are disseminated using the pull mechanism. This commit adds a filtering capability to the pull engine, and to its enclosing object- the pull mediator. The filter acts on a context of a message (that contains info about the peer's credentials, hence it's organization) sent from the remote peer (helloMsg and requestMsg) and return a function that declares for each item (digest or item in the response) whether it should be sent to the remote peer. In the next commit, I shall write filtering logic to make it so that peers would gossip identities with destination orgs only if they are their own identities. Change-Id: I7996bafe5ce2962279c01e59c57868057cfd1d7e Signed-off-by: Yacov Manevich --- gossip/gossip/algo/pull.go | 35 +++++++++++++++---- gossip/gossip/algo/pull_test.go | 51 ++++++++++++++++++++++++++-- gossip/gossip/certstore_test.go | 15 +++++--- gossip/gossip/channel/channel.go | 8 ++++- gossip/gossip/gossip_impl.go | 8 ++++- gossip/gossip/pull/pullstore.go | 48 ++++++++++++++++++++++---- gossip/gossip/pull/pullstore_test.go | 49 +++++++++++++++++++++++++- 7 files changed, 191 insertions(+), 23 deletions(-) diff --git a/gossip/gossip/algo/pull.go b/gossip/gossip/algo/pull.go index 6ccca81ae49..b60a4864fb1 100644 --- a/gossip/gossip/algo/pull.go +++ b/gossip/gossip/algo/pull.go @@ -69,6 +69,10 @@ func SetResponseWaitTime(time time.Duration) { viper.Set("peer.gossip.responseWaitTime", time) } +// DigestFilter filters digests to be sent to a remote peer that +// sent a hello or a request, based on its messages's context +type DigestFilter func(context interface{}) func(digestItem string) bool + // PullAdapter is needed by the PullEngine in order to // send messages to the remote PullEngine instances. // The PullEngine expects to be invoked with @@ -109,11 +113,12 @@ type PullEngine struct { lock sync.Mutex outgoingNONCES *util.Set incomingNONCES *util.Set + digFilter DigestFilter } -// NewPullEngine creates an instance of a PullEngine with a certain sleep time -// between pull initiations -func NewPullEngine(participant PullAdapter, sleepTime time.Duration) *PullEngine { +// NewPullEngineWithFilter creates an instance of a PullEngine with a certain sleep time +// between pull initiations, and uses the given filters when sending digests and responses +func NewPullEngineWithFilter(participant PullAdapter, sleepTime time.Duration, df DigestFilter) *PullEngine { engine := &PullEngine{ PullAdapter: participant, stopFlag: int32(0), @@ -125,6 +130,7 @@ func NewPullEngine(participant PullAdapter, sleepTime time.Duration) *PullEngine acceptingResponses: int32(0), incomingNONCES: util.NewSet(), outgoingNONCES: util.NewSet(), + digFilter: df, } go func() { @@ -140,8 +146,19 @@ func NewPullEngine(participant PullAdapter, sleepTime time.Duration) *PullEngine return engine } +// NewPullEngine creates an instance of a PullEngine with a certain sleep time +// between pull initiations +func NewPullEngine(participant PullAdapter, sleepTime time.Duration) *PullEngine { + acceptAllFilter := func(_ interface{}) func(string) bool { + return func(_ string) bool { + return true + } + } + return NewPullEngineWithFilter(participant, sleepTime, acceptAllFilter) +} + func (engine *PullEngine) toDie() bool { - return (atomic.LoadInt32(&(engine.stopFlag)) == int32(1)) + return atomic.LoadInt32(&(engine.stopFlag)) == int32(1) } func (engine *PullEngine) acceptResponses() { @@ -275,8 +292,13 @@ func (engine *PullEngine) OnHello(nonce uint64, context interface{}) { a := engine.state.ToArray() digest := make([]string, len(a)) + filter := engine.digFilter(context) for i, item := range a { - digest[i] = item.(string) + dig := item.(string) + if !filter(dig) { + continue + } + digest[i] = dig } engine.SendDigest(digest, nonce, context) } @@ -288,9 +310,10 @@ func (engine *PullEngine) OnReq(items []string, nonce uint64, context interface{ } engine.lock.Lock() + filter := engine.digFilter(context) var items2Send []string for _, item := range items { - if engine.state.Exists(item) { + if engine.state.Exists(item) && filter(item) { items2Send = append(items2Send, item) } } diff --git a/gossip/gossip/algo/pull_test.go b/gossip/gossip/algo/pull_test.go index cc17a8448ef..90eb1ef527a 100644 --- a/gossip/gossip/algo/pull_test.go +++ b/gossip/gossip/algo/pull_test.go @@ -17,14 +17,14 @@ limitations under the License. package algo import ( + "fmt" + "strconv" "strings" "sync" + "sync/atomic" "testing" "time" - "fmt" - "sync/atomic" - "github.com/hyperledger/fabric/gossip/util" "github.com/spf13/viper" "github.com/stretchr/testify/assert" @@ -501,6 +501,51 @@ func TestSpread(t *testing.T) { } } lock.Unlock() +} + +func TestFilter(t *testing.T) { + t.Parallel() + // Scenario: 3 instances, items [0-5] are found only in the first instance, the other 2 have none. + // and also the first instance only gives the 2nd instance even items, and odd items to the 3rd. + // also, instances 2 and 3 don't know each other. + // Expected outcome: inst2 has only even items, and inst3 has only odd items + peers := make(map[string]*pullTestInstance) + inst1 := newPushPullTestInstance("p1", peers) + inst2 := newPushPullTestInstance("p2", peers) + inst3 := newPushPullTestInstance("p3", peers) + defer inst1.stop() + defer inst2.stop() + defer inst3.stop() + + inst1.PullEngine.digFilter = func(context interface{}) func(digestItem string) bool { + return func(digestItem string) bool { + n, _ := strconv.ParseInt(digestItem, 10, 64) + if context == "p2" { + return n%2 == 0 + } + return n%2 == 1 + } + } + + inst1.Add("0", "1", "2", "3", "4", "5") + inst2.setNextPeerSelection([]string{"p1"}) + inst3.setNextPeerSelection([]string{"p1"}) + + time.Sleep(time.Second * 2) + + assert.True(t, util.IndexInSlice(inst2.state.ToArray(), "0", Strcmp) != -1) + assert.True(t, util.IndexInSlice(inst2.state.ToArray(), "1", Strcmp) == -1) + assert.True(t, util.IndexInSlice(inst2.state.ToArray(), "2", Strcmp) != -1) + assert.True(t, util.IndexInSlice(inst2.state.ToArray(), "3", Strcmp) == -1) + assert.True(t, util.IndexInSlice(inst2.state.ToArray(), "4", Strcmp) != -1) + assert.True(t, util.IndexInSlice(inst2.state.ToArray(), "5", Strcmp) == -1) + + assert.True(t, util.IndexInSlice(inst3.state.ToArray(), "0", Strcmp) == -1) + assert.True(t, util.IndexInSlice(inst3.state.ToArray(), "1", Strcmp) != -1) + assert.True(t, util.IndexInSlice(inst3.state.ToArray(), "2", Strcmp) == -1) + assert.True(t, util.IndexInSlice(inst3.state.ToArray(), "3", Strcmp) != -1) + assert.True(t, util.IndexInSlice(inst3.state.ToArray(), "4", Strcmp) == -1) + assert.True(t, util.IndexInSlice(inst3.state.ToArray(), "5", Strcmp) != -1) } diff --git a/gossip/gossip/certstore_test.go b/gossip/gossip/certstore_test.go index 0b06b345e8f..c105cf2ab7f 100644 --- a/gossip/gossip/certstore_test.go +++ b/gossip/gossip/certstore_test.go @@ -120,12 +120,17 @@ func testCertificateUpdate(t *testing.T, updateFactory func(uint64) proto.Receiv sender := &senderMock{} memberSvc := &membershipSvcMock{} memberSvc.On("GetMembership").Return([]discovery.NetworkMember{{PKIid: []byte("bla bla"), Endpoint: "localhost:5611"}}) + adapter := pull.PullAdapter{ + Sndr: sender, + MemSvc: memberSvc, + IdExtractor: func(msg *proto.SignedGossipMessage) string { + return string(msg.GetPeerIdentity().PkiId) + }, + MsgCons: func(msg *proto.SignedGossipMessage) { - pullMediator := pull.NewPullMediator(config, - sender, - memberSvc, - func(msg *proto.SignedGossipMessage) string { return string(msg.GetPeerIdentity().PkiId) }, - func(msg *proto.SignedGossipMessage) {}) + }, + } + pullMediator := pull.NewPullMediator(config, adapter) certStore := newCertStore(&pullerMock{ Mediator: pullMediator, }, identity.NewIdentityMapper(&naiveCryptoService{}), api.PeerIdentityType("SELF"), &naiveCryptoService{}) diff --git a/gossip/gossip/channel/channel.go b/gossip/gossip/channel/channel.go index cd37ae91829..772d95b5201 100644 --- a/gossip/gossip/channel/channel.go +++ b/gossip/gossip/channel/channel.go @@ -268,7 +268,13 @@ func (gc *gossipChannel) createBlockPuller() pull.Mediator { } gc.DeMultiplex(msg) } - return pull.NewPullMediator(conf, gc, gc.memFilter, seqNumFromMsg, blockConsumer) + adapter := pull.PullAdapter{ + Sndr: gc, + MemSvc: gc.memFilter, + IdExtractor: seqNumFromMsg, + MsgCons: blockConsumer, + } + return pull.NewPullMediator(conf, adapter) } // IsMemberInChan checks whether the given member is eligible to be in the channel diff --git a/gossip/gossip/gossip_impl.go b/gossip/gossip/gossip_impl.go index d58607eec7b..36c2a4727f4 100644 --- a/gossip/gossip/gossip_impl.go +++ b/gossip/gossip/gossip_impl.go @@ -929,7 +929,13 @@ func (g *gossipServiceImpl) createCertStorePuller() pull.Mediator { g.logger.Info("Learned of a new certificate:", idMsg.Cert) } - return pull.NewPullMediator(conf, g.comm, g.disc, pkiIDFromMsg, certConsumer) + adapter := pull.PullAdapter{ + Sndr: g.comm, + MemSvc: g.disc, + IdExtractor: pkiIDFromMsg, + MsgCons: certConsumer, + } + return pull.NewPullMediator(conf, adapter) } func (g *gossipServiceImpl) createStateInfoMsg(metadata []byte, chainID common.ChainID) (*proto.SignedGossipMessage, error) { diff --git a/gossip/gossip/pull/pullstore.go b/gossip/gossip/pull/pullstore.go index 612202a1c5d..07db1f793e5 100644 --- a/gossip/gossip/pull/pullstore.go +++ b/gossip/gossip/pull/pullstore.go @@ -55,6 +55,19 @@ type MembershipService interface { GetMembership() []discovery.NetworkMember } +// DigestFilter filters digests to be sent to a remote peer, that +// sent a hello with the following message +type DigestFilter func(helloMsg proto.ReceivedMessage) func(digestItem string) bool + +// byContext converts this DigestFilter to an algo.DigestFilter +func (df DigestFilter) byContext() algo.DigestFilter { + return func(context interface{}) func(digestItem string) bool { + return func(digestItem string) bool { + return df(context.(proto.ReceivedMessage))(digestItem) + } + } +} + // PullConfig defines the configuration of the pull mediator type PullConfig struct { ID string @@ -65,6 +78,16 @@ type PullConfig struct { MsgType proto.PullMsgType } +// PullAdapter defines methods of the pullStore to interact +// with various modules of gossip +type PullAdapter struct { + Sndr Sender + MemSvc MembershipService + IdExtractor proto.IdentifierExtractor + MsgCons proto.MsgConsumer + DigFilter DigestFilter +} + // Mediator is a component wrap a PullEngine and provides the methods // it needs to perform pull synchronization. // The specialization of a pull mediator to a certain type of message is @@ -103,18 +126,31 @@ type pullMediatorImpl struct { } // NewPullMediator returns a new Mediator -func NewPullMediator(config PullConfig, sndr Sender, memSvc MembershipService, idExtractor proto.IdentifierExtractor, msgCons proto.MsgConsumer) Mediator { +func NewPullMediator(config PullConfig, adapter PullAdapter) Mediator { + digFilter := adapter.DigFilter + + acceptAllFilter := func(_ proto.ReceivedMessage) func(string) bool { + return func(_ string) bool { + return true + } + } + + if digFilter == nil { + digFilter = acceptAllFilter + } + p := &pullMediatorImpl{ - msgCons: msgCons, + msgCons: adapter.MsgCons, msgType2Hook: make(map[PullMsgType][]MessageHook), - idExtractor: idExtractor, + idExtractor: adapter.IdExtractor, config: config, logger: util.GetLogger(util.LoggingPullModule, config.ID), itemID2Msg: make(map[string]*proto.SignedGossipMessage), - memBvc: memSvc, - Sender: sndr, + memBvc: adapter.MemSvc, + Sender: adapter.Sndr, } - p.engine = algo.NewPullEngine(p, config.PullInterval) + + p.engine = algo.NewPullEngineWithFilter(p, config.PullInterval, digFilter.byContext()) return p } diff --git a/gossip/gossip/pull/pullstore_test.go b/gossip/gossip/pull/pullstore_test.go index cf382fd8e7b..e6007d366e8 100644 --- a/gossip/gossip/pull/pullstore_test.go +++ b/gossip/gossip/pull/pullstore_test.go @@ -18,6 +18,7 @@ package pull import ( "fmt" + "strconv" "sync/atomic" "testing" "time" @@ -106,6 +107,10 @@ func (p *pullInstance) wrapPullMsg(msg *proto.SignedGossipMessage) proto.Receive } func createPullInstance(endpoint string, peer2PullInst map[string]*pullInstance) *pullInstance { + return createPullInstanceWithFilters(endpoint, peer2PullInst, nil) +} + +func createPullInstanceWithFilters(endpoint string, peer2PullInst map[string]*pullInstance, df DigestFilter) *pullInstance { inst := &pullInstance{ items: util.NewSet(), stopChan: make(chan struct{}), @@ -137,7 +142,14 @@ func createPullInstance(endpoint string, peer2PullInst map[string]*pullInstance) blockConsumer := func(msg *proto.SignedGossipMessage) { inst.items.Add(msg.GetDataMsg().Payload.SeqNum) } - inst.mediator = NewPullMediator(conf, inst, inst, seqNumFromMsg, blockConsumer) + adapter := PullAdapter{ + Sndr: inst, + MemSvc: inst, + IdExtractor: seqNumFromMsg, + MsgCons: blockConsumer, + DigFilter: df, + } + inst.mediator = NewPullMediator(conf, adapter) go func() { for { select { @@ -182,6 +194,41 @@ func TestRegisterMsgHook(t *testing.T) { } +func TestFilter(t *testing.T) { + t.Parallel() + peer2pullInst := make(map[string]*pullInstance) + + eq := func(a interface{}, b interface{}) bool { + return a == b + } + df := func(msg proto.ReceivedMessage) func(string) bool { + if msg.GetGossipMessage().IsDataReq() { + req := msg.GetGossipMessage().GetDataReq() + return func(item string) bool { + return util.IndexInSlice(req.Digests, item, eq) != -1 + } + } + return func(digestItem string) bool { + n, _ := strconv.ParseInt(digestItem, 10, 64) + return n%2 == 0 + } + } + inst1 := createPullInstanceWithFilters("localhost:5611", peer2pullInst, df) + inst2 := createPullInstance("localhost:5612", peer2pullInst) + defer inst1.stop() + defer inst2.stop() + + inst1.mediator.Add(dataMsg(0)) + inst1.mediator.Add(dataMsg(1)) + inst1.mediator.Add(dataMsg(2)) + inst1.mediator.Add(dataMsg(3)) + + waitUntilOrFail(t, func() bool { return inst2.items.Exists(uint64(0)) }) + waitUntilOrFail(t, func() bool { return inst2.items.Exists(uint64(2)) }) + assert.False(t, inst2.items.Exists(uint64(1))) + assert.False(t, inst2.items.Exists(uint64(3))) +} + func TestAddAndRemove(t *testing.T) { t.Parallel() peer2pullInst := make(map[string]*pullInstance)