Skip to content

Commit

Permalink
[FAB-2198] Introduce envelopes to gossip message
Browse files Browse the repository at this point in the history
The current gossip implementation uses proto.Marshal
and then signs over this output both in the signer
and the verifier, which might be dangerous because protobuf
marshalling isn't guaranteed to be deterministic.

This is the first commit in the series that introduces
an envelope message to the GossipMessage, and does
some refactoring and preperation for next commit(s)
that'll change the stream definition, the signing,
and entity-storage related structures.

The refactoring basically entails moving the ReceivedMessage
from comm to proto, as part of the extensions to the gossip proto
types.

Signed-off-by: Yacov Manevich <yacovm@il.ibm.com>
Change-Id: Ifcedaa87bcf45376a4569ed80b7e7d1b76c87883
  • Loading branch information
yacovm committed Feb 12, 2017
1 parent c341fe5 commit 5dbe29e
Show file tree
Hide file tree
Showing 20 changed files with 232 additions and 176 deletions.
19 changes: 1 addition & 18 deletions gossip/comm/comm.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type Comm interface {

// Accept returns a dedicated read-only channel for messages sent by other nodes that match a certain predicate.
// Each message from the channel can be used to send a reply back to the sender
Accept(common.MessageAcceptor) <-chan ReceivedMessage
Accept(common.MessageAcceptor) <-chan proto.ReceivedMessage

// PresumedDead returns a read-only channel for node endpoints that are suspected to be offline
PresumedDead() <-chan common.PKIidType
Expand All @@ -63,20 +63,3 @@ type RemotePeer struct {
func (p *RemotePeer) String() string {
return fmt.Sprintf("%s, PKIid:%v", p.Endpoint, p.PKIID)
}

// ReceivedMessage is a GossipMessage wrapper that
// enables the user to send a message to the origin from which
// the ReceivedMessage was sent from.
// It also allows to know the identity of the sender
type ReceivedMessage interface {

// Respond sends a GossipMessage to the origin from which this ReceivedMessage was sent from
Respond(msg *proto.GossipMessage)

// GetGossipMessage returns the underlying GossipMessage
GetGossipMessage() *proto.GossipMessage

// GetPKIID returns the PKI-ID of the remote peer
// that sent the message
GetPKIID() common.PKIidType
}
8 changes: 4 additions & 4 deletions gossip/comm/comm_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func NewCommInstanceWithServer(port int, idMapper identity.Mapper, peerIdentity
deadEndpoints: make(chan common.PKIidType, 100),
stopping: int32(0),
exitChan: make(chan struct{}, 1),
subscriptions: make([]chan ReceivedMessage, 0),
subscriptions: make([]chan proto.ReceivedMessage, 0),
blackListedPKIIDs: make([]common.PKIidType, 0),
}
commInst.connStore = newConnStore(commInst, commInst.logger)
Expand Down Expand Up @@ -154,7 +154,7 @@ type commImpl struct {
exitChan chan struct{}
stopping int32
stopWG sync.WaitGroup
subscriptions []chan ReceivedMessage
subscriptions []chan proto.ReceivedMessage
blackListedPKIIDs []common.PKIidType
}

Expand Down Expand Up @@ -290,9 +290,9 @@ func (c *commImpl) Probe(remotePeer *RemotePeer) error {
return err
}

func (c *commImpl) Accept(acceptor common.MessageAcceptor) <-chan ReceivedMessage {
func (c *commImpl) Accept(acceptor common.MessageAcceptor) <-chan proto.ReceivedMessage {
genericChan := c.msgPublisher.AddChannel(acceptor)
specificChan := make(chan ReceivedMessage, 10)
specificChan := make(chan proto.ReceivedMessage, 10)

if c.isStopping() {
c.logger.Warning("Accept() called but comm module is stopping, returning empty channel")
Expand Down
16 changes: 8 additions & 8 deletions gossip/comm/comm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func newCommInstance(port int, sec api.MessageCryptoService) (Comm, error) {
return inst, err
}

func handshaker(endpoint string, comm Comm, t *testing.T, sigMutator func([]byte) []byte, pkiIDmutator func([]byte) []byte) <-chan ReceivedMessage {
func handshaker(endpoint string, comm Comm, t *testing.T, sigMutator func([]byte) []byte, pkiIDmutator func([]byte) []byte) <-chan proto.ReceivedMessage {
c := &commImpl{}
err := generateCertificates("key.pem", "cert.pem")
defer os.Remove("cert.pem")
Expand Down Expand Up @@ -189,7 +189,7 @@ func TestBasic(t *testing.T) {
m1 := comm1.Accept(acceptAll)
m2 := comm2.Accept(acceptAll)
out := make(chan uint64, 2)
reader := func(ch <-chan ReceivedMessage) {
reader := func(ch <-chan proto.ReceivedMessage) {
m := <-ch
out <- m.GetGossipMessage().Nonce
}
Expand Down Expand Up @@ -228,7 +228,7 @@ func TestBlackListPKIid(t *testing.T) {
defer comm3.Stop()
defer comm4.Stop()

reader := func(instance string, out chan uint64, in <-chan ReceivedMessage) {
reader := func(instance string, out chan uint64, in <-chan proto.ReceivedMessage) {
for {
msg := <-in
if msg == nil {
Expand Down Expand Up @@ -328,7 +328,7 @@ func TestResponses(t *testing.T) {
defer comm1.Stop()
defer comm2.Stop()

nonceIncrememter := func(msg ReceivedMessage) ReceivedMessage {
nonceIncrememter := func(msg proto.ReceivedMessage) proto.ReceivedMessage {
msg.GetGossipMessage().Nonce++
return msg
}
Expand Down Expand Up @@ -365,11 +365,11 @@ func TestAccept(t *testing.T) {
comm2, _ := newCommInstance(7612, naiveSec)

evenNONCESelector := func(m interface{}) bool {
return m.(ReceivedMessage).GetGossipMessage().Nonce%2 == 0
return m.(proto.ReceivedMessage).GetGossipMessage().Nonce%2 == 0
}

oddNONCESelector := func(m interface{}) bool {
return m.(ReceivedMessage).GetGossipMessage().Nonce%2 != 0
return m.(proto.ReceivedMessage).GetGossipMessage().Nonce%2 != 0
}

evenNONCES := comm1.Accept(evenNONCESelector)
Expand All @@ -381,7 +381,7 @@ func TestAccept(t *testing.T) {
out := make(chan uint64, defRecvBuffSize)
sem := make(chan struct{}, 0)

readIntoSlice := func(a *[]uint64, ch <-chan ReceivedMessage) {
readIntoSlice := func(a *[]uint64, ch <-chan proto.ReceivedMessage) {
for m := range ch {
*a = append(*a, m.GetGossipMessage().Nonce)
out <- m.GetGossipMessage().Nonce
Expand Down Expand Up @@ -422,7 +422,7 @@ func TestReConnections(t *testing.T) {
comm1, _ := newCommInstance(3611, naiveSec)
comm2, _ := newCommInstance(3612, naiveSec)

reader := func(out chan uint64, in <-chan ReceivedMessage) {
reader := func(out chan uint64, in <-chan proto.ReceivedMessage) {
for {
msg := <-in
if msg == nil {
Expand Down
14 changes: 11 additions & 3 deletions gossip/comm/mock/mock_comm.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type packetMock struct {
type channelMock struct {
accept common.MessageAcceptor

channel chan comm.ReceivedMessage
channel chan proto.ReceivedMessage
}

type commMock struct {
Expand Down Expand Up @@ -91,11 +91,19 @@ func (packet *packetMock) Respond(msg *proto.GossipMessage) {
}
}

// GetSourceMessage Returns the SignedGossipMessage the ReceivedMessage was
// constructed with
func (packet *packetMock) GetSourceMessage() *proto.SignedGossipMessage {
return nil
}

// GetGossipMessage returns the underlying GossipMessage
func (packet *packetMock) GetGossipMessage() *proto.GossipMessage {
return packet.msg.(*proto.GossipMessage)
}

// GetPKIID returns the PKI-ID of the remote peer
// that sent the message
func (packet *packetMock) GetPKIID() common.PKIidType {
return nil
}
Expand Down Expand Up @@ -151,8 +159,8 @@ func (mock *commMock) Probe(peer *comm.RemotePeer) error {

// Accept returns a dedicated read-only channel for messages sent by other nodes that match a certain predicate.
// Each message from the channel can be used to send a reply back to the sender
func (mock *commMock) Accept(accept common.MessageAcceptor) <-chan comm.ReceivedMessage {
ch := make(chan comm.ReceivedMessage)
func (mock *commMock) Accept(accept common.MessageAcceptor) <-chan proto.ReceivedMessage {
ch := make(chan proto.ReceivedMessage)
mock.acceptors = append(mock.acceptors, &channelMock{accept, ch})
return ch
}
Expand Down
4 changes: 2 additions & 2 deletions gossip/comm/mock/mock_comm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ func TestMockComm(t *testing.T) {
defer comm1.Stop()

msgCh := comm1.Accept(func(message interface{}) bool {
return message.(comm.ReceivedMessage).GetGossipMessage().GetStateRequest() != nil ||
message.(comm.ReceivedMessage).GetGossipMessage().GetStateResponse() != nil
return message.(proto.ReceivedMessage).GetGossipMessage().GetStateRequest() != nil ||
message.(proto.ReceivedMessage).GetGossipMessage().GetStateResponse() != nil
})

comm2 := NewCommMock(second.endpoint, members)
Expand Down
6 changes: 6 additions & 0 deletions gossip/comm/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ type ReceivedMessageImpl struct {
conn *connection
}

// GetSourceMessage Returns the SignedGossipMessage the ReceivedMessage was
// constructed with
func (m *ReceivedMessageImpl) GetSourceMessage() *proto.SignedGossipMessage {
return nil
}

// Respond sends a msg to the source that sent the ReceivedMessageImpl
func (m *ReceivedMessageImpl) Respond(msg *proto.GossipMessage) {
m.conn.send(msg, func(e error) {})
Expand Down
3 changes: 1 addition & 2 deletions gossip/election/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"time"

"github.com/hyperledger/fabric/gossip/api"
"github.com/hyperledger/fabric/gossip/comm"
"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/discovery"
proto "github.com/hyperledger/fabric/protos/gossip"
Expand Down Expand Up @@ -61,7 +60,7 @@ type gossip interface {
// If passThrough is false, the messages are processed by the gossip layer beforehand.
// If passThrough is true, the gossip layer doesn't intervene and the messages
// can be used to send a reply back to the sender
Accept(acceptor common.MessageAcceptor, passThrough bool) (<-chan *proto.GossipMessage, <-chan comm.ReceivedMessage)
Accept(acceptor common.MessageAcceptor, passThrough bool) (<-chan *proto.GossipMessage, <-chan proto.ReceivedMessage)

// Gossip sends a message to other peers to the network
Gossip(msg *proto.GossipMessage)
Expand Down
3 changes: 1 addition & 2 deletions gossip/election/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"time"

"github.com/hyperledger/fabric/gossip/api"
"github.com/hyperledger/fabric/gossip/comm"
"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/discovery"
proto "github.com/hyperledger/fabric/protos/gossip"
Expand Down Expand Up @@ -216,7 +215,7 @@ func (g *peerMockGossip) Peers() []discovery.NetworkMember {
return res
}

func (g *peerMockGossip) Accept(acceptor common.MessageAcceptor, passThrough bool) (<-chan *proto.GossipMessage, <-chan comm.ReceivedMessage) {
func (g *peerMockGossip) Accept(acceptor common.MessageAcceptor, passThrough bool) (<-chan *proto.GossipMessage, <-chan proto.ReceivedMessage) {
ch := make(chan *proto.GossipMessage, 100)
g.acceptorLock.Lock()
g.acceptors = append(g.acceptors, &mockAcceptor{
Expand Down
5 changes: 2 additions & 3 deletions gossip/gossip/certstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"sync"

"github.com/hyperledger/fabric/gossip/api"
"github.com/hyperledger/fabric/gossip/comm"
"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/gossip/pull"
"github.com/hyperledger/fabric/gossip/identity"
Expand Down Expand Up @@ -63,7 +62,7 @@ func newCertStore(puller pull.Mediator, idMapper identity.Mapper, selfIdentity a

puller.Add(certStore.createIdentityMessage())

puller.RegisterMsgHook(pull.ResponseMsgType, func(_ []string, msgs []*proto.GossipMessage, _ comm.ReceivedMessage) {
puller.RegisterMsgHook(pull.ResponseMsgType, func(_ []string, msgs []*proto.GossipMessage, _ proto.ReceivedMessage) {
for _, msg := range msgs {
pkiID := common.PKIidType(msg.GetPeerIdentity().PkiID)
cert := api.PeerIdentityType(msg.GetPeerIdentity().Cert)
Expand All @@ -78,7 +77,7 @@ func newCertStore(puller pull.Mediator, idMapper identity.Mapper, selfIdentity a
return certStore
}

func (cs *certStore) handleMessage(msg comm.ReceivedMessage) {
func (cs *certStore) handleMessage(msg proto.ReceivedMessage) {
if update := msg.GetGossipMessage().GetDataUpdate(); update != nil {
for _, m := range update.Data {
if !m.IsIdentityMsg() {
Expand Down
18 changes: 12 additions & 6 deletions gossip/gossip/certstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ type sentMsg struct {
mock.Mock
}

// GetSourceMessage Returns the SignedGossipMessage the ReceivedMessage was
// constructed with
func (s *sentMsg) GetSourceMessage() *proto.SignedGossipMessage {
return nil
}

func (s *sentMsg) Respond(msg *proto.GossipMessage) {
s.Called(msg)
}
Expand Down Expand Up @@ -80,30 +86,30 @@ func (m *membershipSvcMock) GetMembership() []discovery.NetworkMember {
}

func TestCertStoreBadSignature(t *testing.T) {
badSignature := func(nonce uint64) comm.ReceivedMessage {
badSignature := func(nonce uint64) proto.ReceivedMessage {
return createUpdateMessage(nonce, createBadlySignedUpdateMessage())
}

testCertificateUpdate(t, badSignature, false)
}

func TestCertStoreMismatchedIdentity(t *testing.T) {
mismatchedIdentity := func(nonce uint64) comm.ReceivedMessage {
mismatchedIdentity := func(nonce uint64) proto.ReceivedMessage {
return createUpdateMessage(nonce, createMismatchedUpdateMessage())
}

testCertificateUpdate(t, mismatchedIdentity, false)
}

func TestCertStoreShouldSucceed(t *testing.T) {
totallyFineIdentity := func(nonce uint64) comm.ReceivedMessage {
totallyFineIdentity := func(nonce uint64) proto.ReceivedMessage {
return createUpdateMessage(nonce, createValidUpdateMessage())
}

testCertificateUpdate(t, totallyFineIdentity, true)
}

func testCertificateUpdate(t *testing.T, updateFactory func(uint64) comm.ReceivedMessage, shouldSucceed bool) {
func testCertificateUpdate(t *testing.T, updateFactory func(uint64) proto.ReceivedMessage, shouldSucceed bool) {
config := pull.PullConfig{
MsgType: proto.PullMsgType_IdentityMsg,
PeerCountToSelect: 1,
Expand Down Expand Up @@ -257,7 +263,7 @@ func createValidUpdateMessage() *proto.GossipMessage {
return m
}

func createUpdateMessage(nonce uint64, idMsg *proto.GossipMessage) comm.ReceivedMessage {
func createUpdateMessage(nonce uint64, idMsg *proto.GossipMessage) proto.ReceivedMessage {
update := &proto.GossipMessage{
Tag: proto.GossipMessage_EMPTY,
Content: &proto.GossipMessage_DataUpdate{
Expand All @@ -271,7 +277,7 @@ func createUpdateMessage(nonce uint64, idMsg *proto.GossipMessage) comm.Received
return &sentMsg{msg: update}
}

func createDigest(nonce uint64) comm.ReceivedMessage {
func createDigest(nonce uint64) proto.ReceivedMessage {
digest := &proto.GossipMessage{
Tag: proto.GossipMessage_EMPTY,
Content: &proto.GossipMessage_DataDig{
Expand Down
6 changes: 3 additions & 3 deletions gossip/gossip/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type GossipChannel interface {
IsSubscribed(member discovery.NetworkMember) bool

// HandleMessage processes a message sent by a remote peer
HandleMessage(comm.ReceivedMessage)
HandleMessage(proto.ReceivedMessage)

// AddToMsgStore adds a given GossipMessage to the message store
AddToMsgStore(msg *proto.GossipMessage)
Expand Down Expand Up @@ -354,7 +354,7 @@ func (gc *gossipChannel) ConfigureChannel(joinMsg api.JoinChannelMessage) {
}

// HandleMessage processes a message sent by a remote peer
func (gc *gossipChannel) HandleMessage(msg comm.ReceivedMessage) {
func (gc *gossipChannel) HandleMessage(msg proto.ReceivedMessage) {
if !gc.verifyMsg(msg) {
return
}
Expand Down Expand Up @@ -500,7 +500,7 @@ func (gc *gossipChannel) createStateInfoSnapshot() *proto.GossipMessage {
}
}

func (gc *gossipChannel) verifyMsg(msg comm.ReceivedMessage) bool {
func (gc *gossipChannel) verifyMsg(msg proto.ReceivedMessage) bool {
if msg == nil {
gc.logger.Warning("Messsage is nil")
return false
Expand Down
6 changes: 6 additions & 0 deletions gossip/gossip/channel/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ type receivedMsg struct {
mock.Mock
}

// GetSourceMessage Returns the SignedGossipMessage the ReceivedMessage was
// constructed with
func (m *receivedMsg) GetSourceMessage() *proto.SignedGossipMessage {
return nil
}

func (m *receivedMsg) GetGossipMessage() *proto.GossipMessage {
return m.msg
}
Expand Down
2 changes: 1 addition & 1 deletion gossip/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type Gossip interface {
// If passThrough is false, the messages are processed by the gossip layer beforehand.
// If passThrough is true, the gossip layer doesn't intervene and the messages
// can be used to send a reply back to the sender
Accept(acceptor common.MessageAcceptor, passThrough bool) (<-chan *proto.GossipMessage, <-chan comm.ReceivedMessage)
Accept(acceptor common.MessageAcceptor, passThrough bool) (<-chan *proto.GossipMessage, <-chan proto.ReceivedMessage)

// JoinChan makes the Gossip instance join a channel
JoinChan(joinMsg api.JoinChannelMessage, chainID common.ChainID)
Expand Down
Loading

0 comments on commit 5dbe29e

Please sign in to comment.