From da5effe4c7f1f90fb91efa88d8318af19cc4d9ef Mon Sep 17 00:00:00 2001 From: YACOVM Date: Wed, 30 Nov 2016 22:25:36 +0200 Subject: [PATCH] FAB-1292 Gossip pull refactoring: Mediator This commit introduces: 1) A pull.Mediator, an object that offloads handling of messages and implementation of algo.PullAdapter from the rest of the gossip code, so that it can be used for any type of message we want to sync using the pull mechanism. 2) A certStore object, that holds certificates that are going to be replicated using the pull mechanism. 3) An addition of an identity message which is a certificate that's replicated among peers. Change-Id: Ia7682c3c9874ee40fa3588706239f914f1e334fb Signed-off-by: Yacov Manevich --- gossip/comm/comm_impl.go | 2 +- gossip/comm/comm_test.go | 2 +- gossip/gossip/certstore.go | 104 +++++++ gossip/gossip/gossip_impl.go | 222 ++++---------- gossip/gossip/gossip_test.go | 57 +++- gossip/gossip/pull/pullstore.go | 330 +++++++++++++++++++++ gossip/gossip/pull/pullstore_test.go | 306 +++++++++++++++++++ gossip/integration/integration.go | 58 +++- gossip/proto/{compare.go => extensions.go} | 27 ++ gossip/proto/message.pb.go | 225 ++++++++------ gossip/proto/message.proto | 18 +- gossip/state/state_test.go | 56 +++- gossip/util/misc.go | 2 +- 13 files changed, 1141 insertions(+), 268 deletions(-) create mode 100644 gossip/gossip/certstore.go create mode 100644 gossip/gossip/pull/pullstore.go create mode 100644 gossip/gossip/pull/pullstore_test.go rename gossip/proto/{compare.go => extensions.go} (80%) diff --git a/gossip/comm/comm_impl.go b/gossip/comm/comm_impl.go index 32465e05448..4bff20f9654 100644 --- a/gossip/comm/comm_impl.go +++ b/gossip/comm/comm_impl.go @@ -254,7 +254,7 @@ func (c *commImpl) isStopping() bool { func (c *commImpl) Probe(peer *RemotePeer) error { if c.isStopping() { - return fmt.Errorf("Stopping!") + return fmt.Errorf("Stopping") } c.logger.Debug("Entering, endpoint:", peer.Endpoint, "PKIID:", peer.PKIID) var err error diff --git a/gossip/comm/comm_test.go b/gossip/comm/comm_test.go index 96cdc0e9776..7269b156c93 100644 --- a/gossip/comm/comm_test.go +++ b/gossip/comm/comm_test.go @@ -173,7 +173,7 @@ func TestBasic(t *testing.T) { m2 := comm2.Accept(acceptAll) out := make(chan uint64, 2) reader := func(ch <-chan ReceivedMessage) { - m := <- ch + m := <-ch out <- m.GetGossipMessage().Nonce } go reader(m1) diff --git a/gossip/gossip/certstore.go b/gossip/gossip/certstore.go new file mode 100644 index 00000000000..94e805ef75e --- /dev/null +++ b/gossip/gossip/certstore.go @@ -0,0 +1,104 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package gossip + +import ( + "sync" + + prot "github.com/golang/protobuf/proto" + "github.com/hyperledger/fabric/gossip/api" + "github.com/hyperledger/fabric/gossip/comm" + "github.com/hyperledger/fabric/gossip/common" + "github.com/hyperledger/fabric/gossip/gossip/pull" + "github.com/hyperledger/fabric/gossip/identity" + "github.com/hyperledger/fabric/gossip/proto" + "github.com/hyperledger/fabric/gossip/util" +) + +// certStore supports pull dissemination of identity messages +type certStore struct { + sync.RWMutex + selfIdentity api.PeerIdentityType + idMapper identity.Mapper + pull pull.Mediator + logger *util.Logger +} + +func newCertStore(mcs api.MessageCryptoService, selfIdentity api.PeerIdentityType, pullMed pull.Mediator) *certStore { + certStore := &certStore{ + idMapper: identity.NewIdentityMapper(mcs), + selfIdentity: selfIdentity, + pull: pullMed, + } + + selfPKIID := certStore.idMapper.GetPKIidOfCert(selfIdentity) + + certStore.logger = util.GetLogger("certStore", string(selfPKIID)) + + if err := certStore.idMapper.Put(selfPKIID, selfIdentity); err != nil { + certStore.logger.Panic("Failed associating self PKIID to cert:", err) + } + + pullMed.Add(certStore.createIdentityMessage()) + + pullMed.RegisterMsgHook(pull.ResponseMsgType, func(_ []string, msgs []*proto.GossipMessage, _ comm.ReceivedMessage) { + for _, msg := range msgs { + pkiID := common.PKIidType(msg.GetPeerIdentity().PkiID) + cert := api.PeerIdentityType(msg.GetPeerIdentity().Cert) + if err := certStore.idMapper.Put(pkiID, cert); err != nil { + certStore.logger.Warning("Failed adding identity", cert, ", reason:", err) + } + } + }) + + return certStore +} + +func (cs *certStore) createIdentityMessage() *proto.GossipMessage { + identity := &proto.PeerIdentity{ + Cert: cs.selfIdentity, + Metadata: nil, + PkiID: cs.idMapper.GetPKIidOfCert(cs.selfIdentity), + Sig: nil, + } + + b, err := prot.Marshal(identity) + if err != nil { + cs.logger.Warning("Failed marshalling identity message:", err) + return nil + } + + sig, err := cs.idMapper.Sign(b) + if err != nil { + cs.logger.Warning("Failed signing identity message:", err) + return nil + } + identity.Sig = sig + + return &proto.GossipMessage{ + Channel: nil, + Nonce: 0, + Tag: proto.GossipMessage_EMPTY, + Content: &proto.GossipMessage_PeerIdentity{ + PeerIdentity: identity, + }, + } +} + +func (cs *certStore) stop() { + cs.pull.Stop() +} diff --git a/gossip/gossip/gossip_impl.go b/gossip/gossip/gossip_impl.go index 98d33b361ad..76f09b091ce 100644 --- a/gossip/gossip/gossip_impl.go +++ b/gossip/gossip/gossip_impl.go @@ -23,10 +23,11 @@ import ( "sync/atomic" "time" + "github.com/hyperledger/fabric/gossip/api" "github.com/hyperledger/fabric/gossip/comm" "github.com/hyperledger/fabric/gossip/common" "github.com/hyperledger/fabric/gossip/discovery" - "github.com/hyperledger/fabric/gossip/gossip/algo" + "github.com/hyperledger/fabric/gossip/gossip/pull" "github.com/hyperledger/fabric/gossip/proto" "github.com/hyperledger/fabric/gossip/util" "github.com/op/go-logging" @@ -38,24 +39,25 @@ const ( ) type gossipServiceImpl struct { + certStore *certStore presumedDead chan common.PKIidType disc discovery.Discovery comm comm.Comm *comm.ChannelDeMultiplexer - logger *util.Logger - stopSignal *sync.WaitGroup - conf *Config - toDieChan chan struct{} - stopFlag int32 - msgStore messageStore - emitter batchingEmitter - pushPull *algo.PullEngine - goRoutines []uint64 - discAdapter *discoveryAdapter + logger *util.Logger + stopSignal *sync.WaitGroup + conf *Config + toDieChan chan struct{} + stopFlag int32 + msgStore messageStore + blocksPuller pull.Mediator + emitter batchingEmitter + goRoutines []uint64 + discAdapter *discoveryAdapter } // NewGossipService creates a new gossip instance -func NewGossipService(conf *Config, c comm.Comm, crypto discovery.CryptoService) Gossip { +func NewGossipService(conf *Config, c comm.Comm, mcs api.MessageCryptoService, crypto discovery.CryptoService, selfID api.PeerIdentityType) Gossip { g := &gossipServiceImpl{ presumedDead: make(chan common.PKIidType, presumedDeadChanSize), disc: nil, @@ -79,14 +81,14 @@ func NewGossipService(conf *Config, c comm.Comm, crypto discovery.CryptoService) Endpoint: conf.SelfEndpoint, PKIid: g.comm.GetPKIid(), Metadata: []byte{}, }, g.discAdapter, crypto) - g.pushPull = algo.NewPullEngine(g, conf.PullInterval) - g.msgStore = newMessageStore(proto.NewGossipMessageComparator(g.conf.MaxMessageCountToStore), func(m interface{}) { - if dataMsg, isDataMsg := m.(*proto.DataMessage); isDataMsg { - g.pushPull.Remove(fmt.Sprintf("%d", dataMsg.Payload.SeqNum)) - } + g.blocksPuller.Remove(m.(*proto.GossipMessage)) }) + g.blocksPuller = g.createPull() + + g.certStore = newCertStore(mcs, selfID, g.createPull()) + g.logger.SetLevel(logging.WARNING) go g.start() @@ -94,6 +96,39 @@ func NewGossipService(conf *Config, c comm.Comm, crypto discovery.CryptoService) return g } +func (g *gossipServiceImpl) createPull() pull.Mediator { + gConf := g.conf + conf := pull.PullConfig{ + MsgType: proto.MsgType_BlockMessage, + Channel: []byte(""), + Id: gConf.SelfEndpoint, + PeerCountToSelect: gConf.PullPeerNum, + PullInterval: gConf.PullInterval, + Tag: proto.GossipMessage_EMPTY, + } + seqNumFromMsg := func(msg *proto.GossipMessage) string { + dataMsg := msg.GetDataMsg() + if dataMsg == nil || dataMsg.Payload == nil { + return "" + } + return fmt.Sprintf("%d", dataMsg.Payload.SeqNum) + } + blockConsumer := func(msg *proto.GossipMessage) { + dataMsg := msg.GetDataMsg() + if dataMsg == nil || dataMsg.Payload == nil { + return + } + added := g.msgStore.add(msg) + // if we can't add the message to the msgStore, + // no point in disseminating it to others... + if !added { + return + } + g.DeMultiplex(msg) + } + return pull.NewPullMediator(conf, g.comm, g.disc, seqNumFromMsg, blockConsumer) +} + func (g *gossipServiceImpl) toDie() bool { return atomic.LoadInt32(&g.stopFlag) == int32(1) } @@ -163,104 +198,14 @@ func (g *gossipServiceImpl) acceptMessages(incMsgs <-chan comm.ReceivedMessage) } } -func (g *gossipServiceImpl) SelectPeers() []string { - if g.disc == nil { - return []string{} - } - peers := selectEndpoints(g.conf.PullPeerNum, g.disc.GetMembership()) - g.logger.Debug("Selected", len(peers), "peers") - return peers -} - -func (g *gossipServiceImpl) Hello(dest string, nonce uint64) { - helloMsg := &proto.GossipMessage{ - Tag: proto.GossipMessage_EMPTY, - Nonce: 0, - Content: &proto.GossipMessage_Hello{ - Hello: &proto.GossipHello{ - Nonce: nonce, - Metadata: nil, - MsgType: proto.MsgType_BlockMessage, - }, - }, - } - - g.logger.Debug("Sending hello to", dest) - g.comm.Send(helloMsg, g.peersWithEndpoints(dest)...) - -} - -func (g *gossipServiceImpl) SendDigest(digest []string, nonce uint64, context interface{}) { - digMsg := &proto.GossipMessage{ - Tag: proto.GossipMessage_EMPTY, - Nonce: 0, - Content: &proto.GossipMessage_DataDig{ - DataDig: &proto.DataDigest{ - MsgType: proto.MsgType_BlockMessage, - Nonce: nonce, - Digests: digest, - }, - }, - } - g.logger.Debug("Sending digest", digMsg.GetDataDig().Digests) - context.(comm.ReceivedMessage).Respond(digMsg) -} - -func (g *gossipServiceImpl) SendReq(dest string, items []string, nonce uint64) { - req := &proto.GossipMessage{ - Tag: proto.GossipMessage_EMPTY, - Nonce: 0, - Content: &proto.GossipMessage_DataReq{ - DataReq: &proto.DataRequest{ - MsgType: proto.MsgType_BlockMessage, - Nonce: nonce, - Digests: items, - }, - }, - } - g.logger.Debug("Sending", req, "to", dest) - g.comm.Send(req, g.peersWithEndpoints(dest)...) -} - -func (g *gossipServiceImpl) SendRes(requestedItems []string, context interface{}, nonce uint64) { - itemMap := make(map[string]*proto.GossipMessage) - for _, msg := range g.msgStore.get() { - if dataMsg := msg.(*proto.GossipMessage).GetDataMsg(); dataMsg != nil { - itemMap[fmt.Sprintf("%d", dataMsg.Payload.SeqNum)] = msg.(*proto.GossipMessage) - } - } - - dataMsgs := []*proto.GossipMessage{} - - for _, item := range requestedItems { - if dataMsg, exists := itemMap[item]; exists { - dataMsgs = append(dataMsgs, dataMsg) - } - } - - returnedUpdate := &proto.GossipMessage{ - Tag: proto.GossipMessage_EMPTY, - Nonce: 0, - Content: &proto.GossipMessage_DataUpdate{ - DataUpdate: &proto.DataUpdate{ - MsgType: proto.MsgType_BlockMessage, - Nonce: nonce, - Data: dataMsgs, - }, - }, - } - - g.logger.Debug("Sending response", returnedUpdate.GetDataUpdate().Data) - context.(comm.ReceivedMessage).Respond(returnedUpdate) -} - func (g *gossipServiceImpl) handleMessage(msg comm.ReceivedMessage) { if g.toDie() { return } g.logger.Info("Entering,", msg) defer g.logger.Info("Exiting") - if msg == nil { + + if msg == nil || msg.GetGossipMessage() == nil { return } @@ -280,15 +225,14 @@ func (g *gossipServiceImpl) handleMessage(msg comm.ReceivedMessage) { if dataMsg := msg.GetGossipMessage().GetDataMsg(); dataMsg != nil { g.DeMultiplex(msg.GetGossipMessage()) - g.pushPull.Add(fmt.Sprintf("%d", dataMsg.Payload.SeqNum)) + g.blocksPuller.Add(msg.GetGossipMessage()) } return } - if msg.GetGossipMessage().GetDataReq() != nil || msg.GetGossipMessage().GetDataUpdate() != nil || - msg.GetGossipMessage().GetHello() != nil || msg.GetGossipMessage().GetDataDig() != nil { - g.handlePushPullMsg(msg) + if msg.GetGossipMessage().IsPullMsg() { + g.blocksPuller.HandleMessage(msg) } } @@ -299,36 +243,6 @@ func (g *gossipServiceImpl) forwardDiscoveryMsg(msg comm.ReceivedMessage) { g.discAdapter.incChan <- msg.GetGossipMessage() } -func (g *gossipServiceImpl) handlePushPullMsg(msg comm.ReceivedMessage) { - g.logger.Debug(msg) - if helloMsg := msg.GetGossipMessage().GetHello(); helloMsg != nil { - if helloMsg.MsgType != proto.MsgType_BlockMessage { - return - } - g.pushPull.OnHello(helloMsg.Nonce, msg) - } - if digest := msg.GetGossipMessage().GetDataDig(); digest != nil { - g.pushPull.OnDigest(digest.Digests, digest.Nonce, msg) - } - if req := msg.GetGossipMessage().GetDataReq(); req != nil { - g.pushPull.OnReq(req.Digests, req.Nonce, msg) - } - if res := msg.GetGossipMessage().GetDataUpdate(); res != nil { - items := make([]string, len(res.Data)) - for i, data := range res.Data { - added := g.msgStore.add(data) - // if we can't add the message to the msgStore, - // no point in disseminating it to others... - if !added { - continue - } - g.DeMultiplex(data) - items[i] = fmt.Sprintf("%d", data.GetDataMsg().Payload.SeqNum) - } - g.pushPull.OnRes(items, res.Nonce) - } -} - func (g *gossipServiceImpl) sendGossipBatch(a []interface{}) { msgs2Gossip := make([]*proto.GossipMessage, len(a)) for i, e := range a { @@ -342,30 +256,17 @@ func (g *gossipServiceImpl) gossipBatch(msgs []*proto.GossipMessage) { g.logger.Error("Discovery has not been initialized yet, aborting!") return } - peers2Send := selectEndpoints(g.conf.PropagatePeerNum, g.disc.GetMembership()) + peers2Send := pull.SelectEndpoints(g.conf.PropagatePeerNum, g.disc.GetMembership()) for _, msg := range msgs { - g.comm.Send(msg, g.peersWithEndpoints(peers2Send...)...) - } -} - -func selectEndpoints(k int, peerPool []discovery.NetworkMember) []string { - if len(peerPool) < k { - k = len(peerPool) - } - - indices := util.GetRandomIndices(k, len(peerPool)-1) - endpoints := make([]string, len(indices)) - for i, j := range indices { - endpoints[i] = peerPool[j].Endpoint + g.comm.Send(msg, peers2Send...) } - return endpoints } func (g *gossipServiceImpl) Gossip(msg *proto.GossipMessage) { g.logger.Info(msg) if dataMsg := msg.GetDataMsg(); dataMsg != nil { g.msgStore.add(msg) - g.pushPull.Add(fmt.Sprintf("%d", dataMsg.Payload.SeqNum)) + g.blocksPuller.Add(msg) } g.emitter.Add(msg) } @@ -392,7 +293,8 @@ func (g *gossipServiceImpl) Stop() { }() g.discAdapter.close() g.disc.Stop() - g.pushPull.Stop() + g.blocksPuller.Stop() + g.certStore.stop() g.toDieChan <- struct{}{} g.emitter.Stop() g.ChannelDeMultiplexer.Close() diff --git a/gossip/gossip/gossip_test.go b/gossip/gossip/gossip_test.go index a5ec4c780a2..49f907e92d0 100644 --- a/gossip/gossip/gossip_test.go +++ b/gossip/gossip/gossip_test.go @@ -28,7 +28,9 @@ import ( "strconv" "strings" + "github.com/hyperledger/fabric/gossip/api" "github.com/hyperledger/fabric/gossip/comm" + "github.com/hyperledger/fabric/gossip/common" "github.com/hyperledger/fabric/gossip/discovery" "github.com/hyperledger/fabric/gossip/gossip/algo" "github.com/hyperledger/fabric/gossip/proto" @@ -57,6 +59,24 @@ func acceptData(m interface{}) bool { return false } +type naiveSecProvider struct { +} + +func (*naiveSecProvider) IsEnabled() bool { + return true +} + +func (*naiveSecProvider) Sign(msg []byte) ([]byte, error) { + return msg, nil +} + +func (*naiveSecProvider) Verify(vkID, signature, message []byte) error { + if bytes.Equal(signature, message) { + return nil + } + return fmt.Errorf("Failed verifying") +} + type naiveCryptoService struct { } @@ -72,15 +92,36 @@ func (*naiveCryptoService) IsEnabled() bool { return true } +func (*naiveCryptoService) ValidateIdentity(peerIdentity api.PeerIdentityType) error { + return nil +} + +// GetPKIidOfCert returns the PKI-ID of a peer's identity +func (*naiveCryptoService) GetPKIidOfCert(peerIdentity api.PeerIdentityType) common.PKIidType { + return common.PKIidType(peerIdentity) +} + +// VerifyBlock returns nil if the block is properly signed, +// else returns error +func (*naiveCryptoService) VerifyBlock(signedBlock api.SignedBlock) error { + return nil +} + +// Sign signs msg with this peer's signing key and outputs +// the signature if no error occurred. func (*naiveCryptoService) Sign(msg []byte) ([]byte, error) { return msg, nil } -func (*naiveCryptoService) Verify(vkID, signature, message []byte) error { - if bytes.Equal(signature, message) { - return nil +// Verify checks that signature is a valid signature of message under a peer's verification key. +// If the verification succeeded, Verify returns nil meaning no error occurred. +// If peerCert is nil, then the signature is verified against this peer's verification key. +func (*naiveCryptoService) Verify(peerIdentity api.PeerIdentityType, signature, message []byte) error { + equal := bytes.Equal(signature, message) + if !equal { + return fmt.Errorf("Wrong signature:%v, %v", signature, message) } - return fmt.Errorf("Failed verifying") + return nil } func bootPeers(ids ...int) []string { @@ -106,11 +147,11 @@ func newGossipInstance(id int, maxMsgCount int, boot ...int) Gossip { PullPeerNum: 5, SelfEndpoint: fmt.Sprintf("localhost:%d", port), } - comm, err := comm.NewCommInstanceWithServer(port, &naiveCryptoService{}, []byte(conf.SelfEndpoint)) + comm, err := comm.NewCommInstanceWithServer(port, &naiveSecProvider{}, []byte(conf.SelfEndpoint)) if err != nil { panic(err) } - return NewGossipService(conf, comm, &naiveCryptoService{}) + return NewGossipService(conf, comm, &naiveCryptoService{}, &naiveCryptoService{}, api.PeerIdentityType(conf.ID)) } func newGossipInstanceWithOnlyPull(id int, maxMsgCount int, boot ...int) Gossip { @@ -128,11 +169,11 @@ func newGossipInstanceWithOnlyPull(id int, maxMsgCount int, boot ...int) Gossip PullPeerNum: 20, SelfEndpoint: fmt.Sprintf("localhost:%d", port), } - comm, err := comm.NewCommInstanceWithServer(port, &naiveCryptoService{}, []byte(conf.SelfEndpoint)) + comm, err := comm.NewCommInstanceWithServer(port, &naiveSecProvider{}, []byte(conf.SelfEndpoint)) if err != nil { panic(err) } - return NewGossipService(conf, comm, &naiveCryptoService{}) + return NewGossipService(conf, comm, &naiveCryptoService{}, &naiveCryptoService{}, api.PeerIdentityType(conf.ID)) } func TestPull(t *testing.T) { diff --git a/gossip/gossip/pull/pullstore.go b/gossip/gossip/pull/pullstore.go new file mode 100644 index 00000000000..042b9e918fc --- /dev/null +++ b/gossip/gossip/pull/pullstore.go @@ -0,0 +1,330 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pull + +import ( + "sync" + "time" + + "github.com/hyperledger/fabric/gossip/comm" + "github.com/hyperledger/fabric/gossip/common" + "github.com/hyperledger/fabric/gossip/discovery" + "github.com/hyperledger/fabric/gossip/gossip/algo" + "github.com/hyperledger/fabric/gossip/proto" + "github.com/hyperledger/fabric/gossip/util" +) + +const ( + HelloMsgType PullMsgType = iota + DigestMsgType + RequestMsgType + ResponseMsgType +) + +// PullMsgType defines the type of a message that is sent to the PullStore +type PullMsgType int + +// MessageHook defines a function that will run after a certain pull message is received +type MessageHook func(itemIds []string, items []*proto.GossipMessage, msg comm.ReceivedMessage) + +type Sender interface { + // Send sends a message to a list of remote peers + Send(msg *proto.GossipMessage, peers ...*comm.RemotePeer) +} + +// MembershipService obtains membership information of alive peers +type MembershipService interface { + // GetMembership returns the membership of + GetMembership() []discovery.NetworkMember +} + +// PullConfig defines the configuration of the pull mediator +type PullConfig struct { + Id string + PullInterval time.Duration // Duration between pull invocations + PeerCountToSelect int // Number of peers to initiate pull with + Tag proto.GossipMessage_Tag + Channel common.ChainID + MsgType proto.MsgType +} + +// Mediator is a component wrap a PullEngine and provides the methods +// it needs to perform pull synchronization.. +// The specialization of a pull mediator to a certain type of message is +// done by the configuration, a IdentifierExtractor, IdentifierExtractor +// given at construction, and also hooks that can be registered for each +// type of pullMsgType (hello, digest, req, res). +type Mediator interface { + // Stop stop the Mediator + Stop() + + // RegisterMsgHook registers a message hook to a specific type of pull message + RegisterMsgHook(PullMsgType, MessageHook) + + // Add adds a GossipMessage to the Mediator + Add(*proto.GossipMessage) + + // Remove removes a GossipMessage from the Mediator + Remove(*proto.GossipMessage) + + // HandleMessage handles a message from some remote peer + HandleMessage(msg comm.ReceivedMessage) +} + +// pullStoreImpl is an implementation of PullStore +type pullMediatorImpl struct { + msgType2Hook map[PullMsgType][]MessageHook + idExtractor proto.IdentifierExtractor + msgCons proto.MsgConsumer + config PullConfig + logger *util.Logger + sync.RWMutex + itemId2msg map[string]*proto.GossipMessage + Sender + memBvc MembershipService + engine *algo.PullEngine +} + +func NewPullMediator(config PullConfig, sndr Sender, memSvc MembershipService, idExtractor proto.IdentifierExtractor, msgCons proto.MsgConsumer) Mediator { + p := &pullMediatorImpl{ + msgCons: msgCons, + msgType2Hook: make(map[PullMsgType][]MessageHook), + idExtractor: idExtractor, + config: config, + logger: util.GetLogger("Pull", config.Id), + itemId2msg: make(map[string]*proto.GossipMessage), + memBvc: memSvc, + Sender: sndr, + } + p.engine = algo.NewPullEngine(p, config.PullInterval) + return p +} + +func (p *pullMediatorImpl) HandleMessage(m comm.ReceivedMessage) { + if m.GetGossipMessage() == nil || !m.GetGossipMessage().IsPullMsg() { + return + } + msg := m.GetGossipMessage() + p.logger.Debug(msg) + + itemIds := []string{} + items := []*proto.GossipMessage{} + var pullMsgType PullMsgType + + if helloMsg := msg.GetHello(); helloMsg != nil { + if helloMsg.MsgType != p.config.MsgType { + return + } + pullMsgType = HelloMsgType + p.engine.OnHello(helloMsg.Nonce, m) + } + if digest := msg.GetDataDig(); digest != nil { + if digest.MsgType != p.config.MsgType { + return + } + itemIds = digest.Digests + pullMsgType = DigestMsgType + p.engine.OnDigest(digest.Digests, digest.Nonce, m) + } + if req := msg.GetDataReq(); req != nil { + if req.MsgType != p.config.MsgType { + return + } + itemIds = req.Digests + pullMsgType = RequestMsgType + p.engine.OnReq(req.Digests, req.Nonce, m) + } + if res := msg.GetDataUpdate(); res != nil { + if res.MsgType != p.config.MsgType { + return + } + itemIds = make([]string, len(res.Data)) + items = make([]*proto.GossipMessage, len(res.Data)) + pullMsgType = ResponseMsgType + for i, pulledMsg := range res.Data { + p.msgCons(pulledMsg) + itemIds[i] = p.idExtractor(pulledMsg) + items[i] = pulledMsg + } + p.engine.OnRes(itemIds, res.Nonce) + } + + // Invoke hooks for relevant message type + for _, h := range p.hooksByMsgType(pullMsgType) { + h(itemIds, items, m) + } +} + +func (p *pullMediatorImpl) Stop() { + p.engine.Stop() +} + +// RegisterMsgHook registers a message hook to a specific type of pull message +func (p *pullMediatorImpl) RegisterMsgHook(pullMsgType PullMsgType, hook MessageHook) { + p.Lock() + defer p.Unlock() + p.msgType2Hook[pullMsgType] = append(p.msgType2Hook[pullMsgType], hook) + +} + +// Add adds a GossipMessage to the store +func (p *pullMediatorImpl) Add(msg *proto.GossipMessage) { + p.Lock() + defer p.Unlock() + itemId := p.idExtractor(msg) + p.itemId2msg[itemId] = msg + p.engine.Add(itemId) +} + +// Remove removes a GossipMessage from the store +func (p *pullMediatorImpl) Remove(msg *proto.GossipMessage) { + p.Lock() + defer p.Unlock() + itemId := p.idExtractor(msg) + delete(p.itemId2msg, itemId) + p.engine.Remove(itemId) +} + +// SelectPeers returns a slice of peers which the engine will initiate the protocol with +func (p *pullMediatorImpl) SelectPeers() []string { + remotePeers := SelectEndpoints(p.config.PeerCountToSelect, p.memBvc.GetMembership()) + endpoints := make([]string, len(remotePeers)) + for i, peer := range remotePeers { + endpoints[i] = peer.Endpoint + } + return endpoints +} + +// Hello sends a hello message to initiate the protocol +// and returns an NONCE that is expected to be returned +// in the digest message. +func (p *pullMediatorImpl) Hello(dest string, nonce uint64) { + helloMsg := &proto.GossipMessage{ + Channel: p.config.Channel, + Tag: p.config.Tag, + Content: &proto.GossipMessage_Hello{ + Hello: &proto.GossipHello{ + Nonce: nonce, + Metadata: nil, + MsgType: p.config.MsgType, + }, + }, + } + + p.logger.Debug("Sending hello to", dest) + p.Send(helloMsg, p.peersWithEndpoints(dest)...) +} + +// SendDigest sends a digest to a remote PullEngine. +// The context parameter specifies the remote engine to send to. +func (p *pullMediatorImpl) SendDigest(digest []string, nonce uint64, context interface{}) { + digMsg := &proto.GossipMessage{ + Channel: p.config.Channel, + Tag: p.config.Tag, + Nonce: 0, + Content: &proto.GossipMessage_DataDig{ + DataDig: &proto.DataDigest{ + MsgType: p.config.MsgType, + Nonce: nonce, + Digests: digest, + }, + }, + } + p.logger.Debug("Sending digest", digMsg.GetDataDig().Digests) + context.(comm.ReceivedMessage).Respond(digMsg) +} + +// SendReq sends an array of items to a certain remote PullEngine identified +// by a string +func (p *pullMediatorImpl) SendReq(dest string, items []string, nonce uint64) { + req := &proto.GossipMessage{ + Channel: p.config.Channel, + Tag: p.config.Tag, + Nonce: 0, + Content: &proto.GossipMessage_DataReq{ + DataReq: &proto.DataRequest{ + MsgType: p.config.MsgType, + Nonce: nonce, + Digests: items, + }, + }, + } + p.logger.Debug("Sending", req, "to", dest) + p.Send(req, p.peersWithEndpoints(dest)...) +} + +// SendRes sends an array of items to a remote PullEngine identified by a context. +func (p *pullMediatorImpl) SendRes(items []string, context interface{}, nonce uint64) { + items2return := []*proto.GossipMessage{} + p.RLock() + defer p.RUnlock() + for _, item := range items { + if msg, exists := p.itemId2msg[item]; exists { + items2return = append(items2return, msg) + } + } + + returnedUpdate := &proto.GossipMessage{ + Channel: p.config.Channel, + Tag: p.config.Tag, + Nonce: 0, + Content: &proto.GossipMessage_DataUpdate{ + DataUpdate: &proto.DataUpdate{ + MsgType: p.config.MsgType, + Nonce: nonce, + Data: items2return, + }, + }, + } + p.logger.Debug("Sending", returnedUpdate, "to") + context.(comm.ReceivedMessage).Respond(returnedUpdate) +} + +func (p *pullMediatorImpl) peersWithEndpoints(endpoints ...string) []*comm.RemotePeer { + peers := []*comm.RemotePeer{} + for _, member := range p.memBvc.GetMembership() { + for _, endpoint := range endpoints { + if member.Endpoint == endpoint { + peers = append(peers, &comm.RemotePeer{Endpoint: member.Endpoint, PKIID: member.PKIid}) + } + } + } + return peers +} + +func (p *pullMediatorImpl) hooksByMsgType(msgType PullMsgType) []MessageHook { + p.RLock() + defer p.RUnlock() + returnedHooks := []MessageHook{} + for _, h := range p.msgType2Hook[msgType] { + returnedHooks = append(returnedHooks, h) + } + return returnedHooks +} + +func SelectEndpoints(k int, peerPool []discovery.NetworkMember) []*comm.RemotePeer { + if len(peerPool) < k { + k = len(peerPool) + } + + indices := util.GetRandomIndices(k, len(peerPool)-1) + endpoints := make([]*comm.RemotePeer, len(indices)) + for i, j := range indices { + endpoints[i] = &comm.RemotePeer{Endpoint: peerPool[j].Endpoint, PKIID: peerPool[j].PKIid} + } + return endpoints +} diff --git a/gossip/gossip/pull/pullstore_test.go b/gossip/gossip/pull/pullstore_test.go new file mode 100644 index 00000000000..1cc650e5f9e --- /dev/null +++ b/gossip/gossip/pull/pullstore_test.go @@ -0,0 +1,306 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pull + +import ( + "fmt" + "sync/atomic" + "testing" + "time" + + "github.com/hyperledger/fabric/gossip/comm" + "github.com/hyperledger/fabric/gossip/discovery" + "github.com/hyperledger/fabric/gossip/gossip/algo" + "github.com/hyperledger/fabric/gossip/proto" + "github.com/hyperledger/fabric/gossip/util" + "github.com/stretchr/testify/assert" +) + +var pullInterval time.Duration +var timeoutInterval = 10 * time.Second + +func init() { + pullInterval = time.Duration(500) * time.Millisecond + algo.SetDigestWaitTime(pullInterval / 5) + algo.SetRequestWaitTime(pullInterval) + algo.SetResponseWaitTime(pullInterval) +} + +type pullMsg struct { + respondChan chan *pullMsg + msg *proto.GossipMessage +} + +func (pm *pullMsg) Respond(msg *proto.GossipMessage) { + pm.respondChan <- &pullMsg{ + msg: msg, + respondChan: pm.respondChan, + } +} + +func (pm *pullMsg) GetGossipMessage() *proto.GossipMessage { + return pm.msg +} + +type pullInstance struct { + self discovery.NetworkMember + mediator Mediator + items *util.Set + msgChan chan *pullMsg + peer2PullInst map[string]*pullInstance + stopChan chan struct{} +} + +func (p *pullInstance) Send(msg *proto.GossipMessage, peers ...*comm.RemotePeer) { + for _, peer := range peers { + m := &pullMsg{ + respondChan: p.msgChan, + msg: msg, + } + p.peer2PullInst[peer.Endpoint].msgChan <- m + } +} + +func (p *pullInstance) GetMembership() []discovery.NetworkMember { + members := []discovery.NetworkMember{} + for _, peer := range p.peer2PullInst { + members = append(members, peer.self) + } + return members +} + +func (p *pullInstance) stop() { + p.mediator.Stop() + p.stopChan <- struct{}{} +} + +func (p *pullInstance) wrapPullMsg(msg *proto.GossipMessage) comm.ReceivedMessage { + return &pullMsg{ + msg: msg, + respondChan: p.msgChan, + } +} + +func createPullInstance(endpoint string, peer2PullInst map[string]*pullInstance) *pullInstance { + inst := &pullInstance{ + items: util.NewSet(), + stopChan: make(chan struct{}), + peer2PullInst: peer2PullInst, + self: discovery.NetworkMember{Endpoint: endpoint, Metadata: []byte{}, PKIid: []byte(endpoint)}, + msgChan: make(chan *pullMsg, 10), + } + + peer2PullInst[endpoint] = inst + + conf := PullConfig{ + MsgType: proto.MsgType_BlockMessage, + Channel: []byte(""), + Id: endpoint, + PeerCountToSelect: 3, + PullInterval: pullInterval, + Tag: proto.GossipMessage_EMPTY, + } + seqNumFromMsg := func(msg *proto.GossipMessage) string { + dataMsg := msg.GetDataMsg() + if dataMsg == nil { + return "" + } + if dataMsg.Payload == nil { + return "" + } + return fmt.Sprintf("%d", dataMsg.Payload.SeqNum) + } + blockConsumer := func(msg *proto.GossipMessage) { + inst.items.Add(msg.GetDataMsg().Payload.SeqNum) + } + inst.mediator = NewPullMediator(conf, inst, inst, seqNumFromMsg, blockConsumer) + go func() { + for { + select { + case <-inst.stopChan: + return + case msg := <-inst.msgChan: + inst.mediator.HandleMessage(msg) + } + } + }() + return inst +} + +func TestCreateAndStop(t *testing.T) { + t.Parallel() + pullInst := createPullInstance("localhost:2000", make(map[string]*pullInstance)) + pullInst.stop() +} + +func TestRegisterMsgHook(t *testing.T) { + t.Parallel() + peer2pullInst := make(map[string]*pullInstance) + inst1 := createPullInstance("localhost:5611", peer2pullInst) + inst2 := createPullInstance("localhost:5612", peer2pullInst) + defer inst1.stop() + defer inst2.stop() + + receivedMsgTypes := util.NewSet() + + for _, msgType := range []PullMsgType{HelloMsgType, DigestMsgType, RequestMsgType, ResponseMsgType} { + mType := msgType + inst1.mediator.RegisterMsgHook(mType, func(_ []string, items []*proto.GossipMessage, _ comm.ReceivedMessage) { + receivedMsgTypes.Add(mType) + }) + } + + inst1.mediator.Add(dataMsg(1)) + inst2.mediator.Add(dataMsg(2)) + + // Ensure all message types are received + waitUntilOrFail(t, func() bool { return len(receivedMsgTypes.ToArray()) == 4 }) + +} + +func TestAddAndRemove(t *testing.T) { + t.Parallel() + peer2pullInst := make(map[string]*pullInstance) + inst1 := createPullInstance("localhost:5611", peer2pullInst) + inst2 := createPullInstance("localhost:5612", peer2pullInst) + defer inst1.stop() + defer inst2.stop() + + msgCount := 3 + + go func() { + for i := 0; i < msgCount; i++ { + time.Sleep(pullInterval) + inst1.mediator.Add(dataMsg(i)) + } + }() + + // Ensure instance 2 got all messages + waitUntilOrFail(t, func() bool { return len(inst2.items.ToArray()) == msgCount }) + + // Remove message 0 from both instances + inst2.mediator.Remove(dataMsg(0)) + inst1.mediator.Remove(dataMsg(0)) + inst2.items.Remove(uint64(0)) + + // Add a message to inst1 + inst1.mediator.Add(dataMsg(10)) + + // Ensure instance 2 got new message + waitUntilOrFail(t, func() bool { return inst2.items.Exists(uint64(10)) }) + + // Ensure instance 2 doesn't have message 0 + assert.False(t, inst2.items.Exists(uint64(0)), "Instance 2 has message 0 but shouldn't have") +} + +func TestHandleMessage(t *testing.T) { + t.Parallel() + inst1 := createPullInstance("localhost:5611", make(map[string]*pullInstance)) + inst2 := createPullInstance("localhost:5612", make(map[string]*pullInstance)) + defer inst1.stop() + defer inst2.stop() + + inst2.mediator.Add(dataMsg(0)) + inst2.mediator.Add(dataMsg(1)) + inst2.mediator.Add(dataMsg(2)) + + inst1ReceivedDigest := int32(0) + inst1ReceivedResponse := int32(0) + + inst1.mediator.RegisterMsgHook(DigestMsgType, func(itemIds []string, _ []*proto.GossipMessage, _ comm.ReceivedMessage) { + atomic.StoreInt32(&inst1ReceivedDigest, int32(1)) + assert.True(t, len(itemIds) == 3) + }) + + inst1.mediator.RegisterMsgHook(ResponseMsgType, func(_ []string, items []*proto.GossipMessage, _ comm.ReceivedMessage) { + atomic.StoreInt32(&inst1ReceivedResponse, int32(1)) + assert.True(t, len(items) == 3) + }) + + // inst1 sends hello to inst2 + inst2.mediator.HandleMessage(inst1.wrapPullMsg(helloMsg())) + + // inst2 is expected to send digest to inst1 + waitUntilOrFail(t, func() bool { return atomic.LoadInt32(&inst1ReceivedDigest) == int32(1) }) + + // inst1 sends request to inst2 + inst2.mediator.HandleMessage(inst1.wrapPullMsg(reqMsg("0", "1", "2"))) + + // inst2 is expected to send response to inst1 + waitUntilOrFail(t, func() bool { return atomic.LoadInt32(&inst1ReceivedResponse) == int32(1) }) + assert.True(t, inst1.items.Exists(uint64(0))) + assert.True(t, inst1.items.Exists(uint64(1))) + assert.True(t, inst1.items.Exists(uint64(2))) +} + +func waitUntilOrFail(t *testing.T, pred func() bool) { + start := time.Now() + limit := start.UnixNano() + timeoutInterval.Nanoseconds() + for time.Now().UnixNano() < limit { + if pred() { + return + } + time.Sleep(timeoutInterval / 60) + } + util.PrintStackTrace() + assert.Fail(t, "Timeout expired!") +} + +func dataMsg(seqNum int) *proto.GossipMessage { + return &proto.GossipMessage{ + Nonce: 0, + Tag: proto.GossipMessage_EMPTY, + Content: &proto.GossipMessage_DataMsg{ + DataMsg: &proto.DataMessage{ + Payload: &proto.Payload{ + Data: []byte{}, + Hash: "", + SeqNum: uint64(seqNum), + }, + }, + }, + } +} + +func helloMsg() *proto.GossipMessage { + return &proto.GossipMessage{ + Channel: []byte(""), + Tag: proto.GossipMessage_EMPTY, + Content: &proto.GossipMessage_Hello{ + Hello: &proto.GossipHello{ + Nonce: 0, + Metadata: nil, + MsgType: proto.MsgType_BlockMessage, + }, + }, + } +} + +func reqMsg(digest ...string) *proto.GossipMessage { + return &proto.GossipMessage{ + Channel: []byte(""), + Tag: proto.GossipMessage_EMPTY, + Nonce: 0, + Content: &proto.GossipMessage_DataReq{ + DataReq: &proto.DataRequest{ + MsgType: proto.MsgType_BlockMessage, + Nonce: 0, + Digests: digest, + }, + }, + } +} diff --git a/gossip/integration/integration.go b/gossip/integration/integration.go index 0f5b91b212e..c771f63c08c 100644 --- a/gossip/integration/integration.go +++ b/gossip/integration/integration.go @@ -27,6 +27,8 @@ import ( "github.com/hyperledger/fabric/gossip/gossip" "github.com/hyperledger/fabric/gossip/proto" "google.golang.org/grpc" + "github.com/hyperledger/fabric/gossip/api" + "github.com/hyperledger/fabric/gossip/common" ) // This file is used to bootstrap a gossip instance for integration/demo purposes ONLY @@ -63,7 +65,7 @@ func newComm(selfEndpoint string, s *grpc.Server, dialOpts ...grpc.DialOption) c func NewGossipComponent(endpoint string, s *grpc.Server, bootPeers ...string) (gossip.Gossip, comm.Comm) { conf := newConfig(endpoint, bootPeers...) comm := newComm(endpoint, s, grpc.WithInsecure()) - return gossip.NewGossipService(conf, comm, NewGossipCryptoService()), comm + return gossip.NewGossipService(conf, comm, &naiveCryptoService{}, NewGossipCryptoService(), api.PeerIdentityType(conf.ID)), comm } // GossipCryptoService is an interface that conforms to both @@ -102,6 +104,13 @@ func (cs *naiveCryptoServiceImpl) ValidateAliveMsg(*proto.AliveMessage) bool { return true } +func (cs *naiveCryptoServiceImpl) Verify(vkID, signature, message []byte) error { + if ! bytes.Equal(signature, message) { + return fmt.Errorf("Wrong signature") + } + return nil +} + // SignMessage signs an AliveMessage and updates its signature field func (cs *naiveCryptoServiceImpl) SignMessage(msg *proto.AliveMessage) *proto.AliveMessage { return msg @@ -117,10 +126,49 @@ func (cs *naiveCryptoServiceImpl) Sign(msg []byte) ([]byte, error) { return msg, nil } -// Verify verifies a signature on a message that came from a peer with a certain vkID -func (cs *naiveCryptoServiceImpl) Verify(vkID, signature, message []byte) error { - if !bytes.Equal(signature, message) { - return fmt.Errorf("Invalid signature!") +// Verify checks that signature is a valid signature of message under a peer's verification key. +// If the verification succeeded, Verify returns nil meaning no error occurred. +// If peerCert is nil, then the signature is verified against this peer's verification key. +func (*naiveCryptoService) Verify(peerIdentity api.PeerIdentityType, signature, message []byte) error { + equal := bytes.Equal(signature, message) + if !equal { + return fmt.Errorf("Wrong signature:%v, %v", signature, message) } return nil } + +type naiveCryptoService struct { +} + +func (*naiveCryptoService) ValidateAliveMsg(am *proto.AliveMessage) bool { + return true +} + +func (*naiveCryptoService) SignMessage(am *proto.AliveMessage) *proto.AliveMessage { + return am +} + +func (*naiveCryptoService) IsEnabled() bool { + return true +} + +func (*naiveCryptoService) ValidateIdentity(peerIdentity api.PeerIdentityType) error { + return nil +} + +// GetPKIidOfCert returns the PKI-ID of a peer's identity +func (*naiveCryptoService) GetPKIidOfCert(peerIdentity api.PeerIdentityType) common.PKIidType { + return common.PKIidType(peerIdentity) +} + +// VerifyBlock returns nil if the block is properly signed, +// else returns error +func (*naiveCryptoService) VerifyBlock(signedBlock api.SignedBlock) error { + return nil +} + +// Sign signs msg with this peer's signing key and outputs +// the signature if no error occurred. +func (*naiveCryptoService) Sign(msg []byte) ([]byte, error) { + return msg, nil +} \ No newline at end of file diff --git a/gossip/proto/compare.go b/gossip/proto/extensions.go similarity index 80% rename from gossip/proto/compare.go rename to gossip/proto/extensions.go index b4024912002..a6ff5400e16 100644 --- a/gossip/proto/compare.go +++ b/gossip/proto/extensions.go @@ -53,6 +53,10 @@ func (mc *msgComparator) invalidationPolicy(this interface{}, that interface{}) return mc.stateInvalidationPolicy(thisMsg.GetStateInfo(), thatMsg.GetStateInfo()) } + if thisMsg.IsIdentityMsg() && thatMsg.IsIdentityMsg() { + return mc.identityInvalidationPolicy(thisMsg.GetPeerIdentity(), thatMsg.GetPeerIdentity()) + } + return common.MessageNoAction } @@ -63,6 +67,14 @@ func (mc *msgComparator) stateInvalidationPolicy(thisStateMsg *StateInfo, thatSt return compareTimestamps(thisStateMsg.Timestamp, thatStateMsg.Timestamp) } +func (mc *msgComparator) identityInvalidationPolicy(thisIdentityMsg *PeerIdentity, thatIdentityMsg *PeerIdentity) common.InvalidationResult { + if bytes.Equal(thisIdentityMsg.PkiID, thatIdentityMsg.PkiID) { + return common.MessageInvalidated + } + + return common.MessageNoAction +} + func (mc *msgComparator) dataInvalidationPolicy(thisDataMsg *DataMessage, thatDataMsg *DataMessage) common.InvalidationResult { if thisDataMsg.Payload.SeqNum == thatDataMsg.Payload.SeqNum { if thisDataMsg.Payload.Hash == thatDataMsg.Payload.Hash { @@ -115,3 +127,18 @@ func (m *GossipMessage) IsDataMsg() bool { func (m *GossipMessage) IsStateInfoMsg() bool { return m.GetStateInfo() != nil } + +func (m *GossipMessage) IsIdentityMsg() bool { + return m.GetPeerIdentity() != nil +} + +func (m *GossipMessage) IsPullMsg() bool { + return m.GetDataReq() != nil || m.GetDataUpdate() != nil || + m.GetHello() != nil || m.GetDataDig() != nil +} + +// MsgConsumer invokes code given a GossipMessage +type MsgConsumer func(*GossipMessage) + +// IdentifierExtractor extracts from a GossipMessage an identifier +type IdentifierExtractor func(*GossipMessage) string diff --git a/gossip/proto/message.pb.go b/gossip/proto/message.pb.go index 6bfa80bcf1d..41297084f0d 100644 --- a/gossip/proto/message.pb.go +++ b/gossip/proto/message.pb.go @@ -13,6 +13,7 @@ It has these top-level messages: StateInfo ChannelCommand ConnEstablish + PeerIdentity DataRequest GossipHello DataUpdate @@ -56,15 +57,18 @@ type MsgType int32 const ( MsgType_UNDEFINED MsgType = 0 MsgType_BlockMessage MsgType = 1 + MsgType_IdentityMsg MsgType = 2 ) var MsgType_name = map[int32]string{ 0: "UNDEFINED", 1: "BlockMessage", + 2: "IdentityMsg", } var MsgType_value = map[string]int32{ "UNDEFINED": 0, "BlockMessage": 1, + "IdentityMsg": 2, } func (x MsgType) String() string { @@ -130,6 +134,7 @@ type GossipMessage struct { // *GossipMessage_StateRequest // *GossipMessage_StateResponse // *GossipMessage_LeadershipMsg + // *GossipMessage_PeerIdentity Content isGossipMessage_Content `protobuf_oneof:"content"` } @@ -187,6 +192,9 @@ type GossipMessage_StateResponse struct { type GossipMessage_LeadershipMsg struct { LeadershipMsg *LeadershipMessage `protobuf:"bytes,18,opt,name=leadershipMsg,oneof"` } +type GossipMessage_PeerIdentity struct { + PeerIdentity *PeerIdentity `protobuf:"bytes,19,opt,name=peerIdentity,oneof"` +} func (*GossipMessage_AliveMsg) isGossipMessage_Content() {} func (*GossipMessage_MemReq) isGossipMessage_Content() {} @@ -203,6 +211,7 @@ func (*GossipMessage_StateInfo) isGossipMessage_Content() {} func (*GossipMessage_StateRequest) isGossipMessage_Content() {} func (*GossipMessage_StateResponse) isGossipMessage_Content() {} func (*GossipMessage_LeadershipMsg) isGossipMessage_Content() {} +func (*GossipMessage_PeerIdentity) isGossipMessage_Content() {} func (m *GossipMessage) GetContent() isGossipMessage_Content { if m != nil { @@ -316,6 +325,13 @@ func (m *GossipMessage) GetLeadershipMsg() *LeadershipMessage { return nil } +func (m *GossipMessage) GetPeerIdentity() *PeerIdentity { + if x, ok := m.GetContent().(*GossipMessage_PeerIdentity); ok { + return x.PeerIdentity + } + return nil +} + // XXX_OneofFuncs is for the internal use of the proto package. func (*GossipMessage) XXX_OneofFuncs() (func(msg proto1.Message, b *proto1.Buffer) error, func(msg proto1.Message, tag, wire int, b *proto1.Buffer) (bool, error), func(msg proto1.Message) (n int), []interface{}) { return _GossipMessage_OneofMarshaler, _GossipMessage_OneofUnmarshaler, _GossipMessage_OneofSizer, []interface{}{ @@ -334,6 +350,7 @@ func (*GossipMessage) XXX_OneofFuncs() (func(msg proto1.Message, b *proto1.Buffe (*GossipMessage_StateRequest)(nil), (*GossipMessage_StateResponse)(nil), (*GossipMessage_LeadershipMsg)(nil), + (*GossipMessage_PeerIdentity)(nil), } } @@ -416,6 +433,11 @@ func _GossipMessage_OneofMarshaler(msg proto1.Message, b *proto1.Buffer) error { if err := b.EncodeMessage(x.LeadershipMsg); err != nil { return err } + case *GossipMessage_PeerIdentity: + b.EncodeVarint(19<<3 | proto1.WireBytes) + if err := b.EncodeMessage(x.PeerIdentity); err != nil { + return err + } case nil: default: return fmt.Errorf("GossipMessage.Content has unexpected type %T", x) @@ -546,6 +568,14 @@ func _GossipMessage_OneofUnmarshaler(msg proto1.Message, tag, wire int, b *proto err := b.DecodeMessage(msg) m.Content = &GossipMessage_LeadershipMsg{msg} return true, err + case 19: // content.peerIdentity + if wire != proto1.WireBytes { + return true, proto1.ErrInternalBadWireType + } + msg := new(PeerIdentity) + err := b.DecodeMessage(msg) + m.Content = &GossipMessage_PeerIdentity{msg} + return true, err default: return false, nil } @@ -630,6 +660,11 @@ func _GossipMessage_OneofSizer(msg proto1.Message) (n int) { n += proto1.SizeVarint(18<<3 | proto1.WireBytes) n += proto1.SizeVarint(uint64(s)) n += s + case *GossipMessage_PeerIdentity: + s := proto1.Size(x.PeerIdentity) + n += proto1.SizeVarint(19<<3 | proto1.WireBytes) + n += proto1.SizeVarint(uint64(s)) + n += s case nil: default: panic(fmt.Sprintf("proto: unexpected type %T in oneof", x)) @@ -675,6 +710,7 @@ func (*ChannelCommand) Descriptor() ([]byte, []int) { return fileDescriptor0, [] type ConnEstablish struct { Sig []byte `protobuf:"bytes,1,opt,name=sig,proto3" json:"sig,omitempty"` PkiID []byte `protobuf:"bytes,2,opt,name=pkiID,proto3" json:"pkiID,omitempty"` + Cert []byte `protobuf:"bytes,3,opt,name=cert,proto3" json:"cert,omitempty"` } func (m *ConnEstablish) Reset() { *m = ConnEstablish{} } @@ -682,6 +718,21 @@ func (m *ConnEstablish) String() string { return proto1.CompactTextSt func (*ConnEstablish) ProtoMessage() {} func (*ConnEstablish) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} } +// PeerIdentity defines the identity of the peer +// Used to make other peers learn of the identity +// of a certain peer +type PeerIdentity struct { + Sig []byte `protobuf:"bytes,1,opt,name=sig,proto3" json:"sig,omitempty"` + PkiID []byte `protobuf:"bytes,2,opt,name=pkiID,proto3" json:"pkiID,omitempty"` + Cert []byte `protobuf:"bytes,3,opt,name=cert,proto3" json:"cert,omitempty"` + Metadata []byte `protobuf:"bytes,4,opt,name=metadata,proto3" json:"metadata,omitempty"` +} + +func (m *PeerIdentity) Reset() { *m = PeerIdentity{} } +func (m *PeerIdentity) String() string { return proto1.CompactTextString(m) } +func (*PeerIdentity) ProtoMessage() {} +func (*PeerIdentity) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} } + // DataRequest is a message used for a peer to request // certain data blocks from a remote peer type DataRequest struct { @@ -693,7 +744,7 @@ type DataRequest struct { func (m *DataRequest) Reset() { *m = DataRequest{} } func (m *DataRequest) String() string { return proto1.CompactTextString(m) } func (*DataRequest) ProtoMessage() {} -func (*DataRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} } +func (*DataRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{5} } // GossipHello is the message that is used for the peer to initiate // a pull round with another peer @@ -706,7 +757,7 @@ type GossipHello struct { func (m *GossipHello) Reset() { *m = GossipHello{} } func (m *GossipHello) String() string { return proto1.CompactTextString(m) } func (*GossipHello) ProtoMessage() {} -func (*GossipHello) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{5} } +func (*GossipHello) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{6} } // DataUpdate is the the final message in the pull phase // sent from the receiver to the initiator @@ -719,7 +770,7 @@ type DataUpdate struct { func (m *DataUpdate) Reset() { *m = DataUpdate{} } func (m *DataUpdate) String() string { return proto1.CompactTextString(m) } func (*DataUpdate) ProtoMessage() {} -func (*DataUpdate) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{6} } +func (*DataUpdate) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{7} } func (m *DataUpdate) GetData() []*GossipMessage { if m != nil { @@ -739,7 +790,7 @@ type DataDigest struct { func (m *DataDigest) Reset() { *m = DataDigest{} } func (m *DataDigest) String() string { return proto1.CompactTextString(m) } func (*DataDigest) ProtoMessage() {} -func (*DataDigest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{7} } +func (*DataDigest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{8} } // DataMessage is the message that contains a block type DataMessage struct { @@ -749,7 +800,7 @@ type DataMessage struct { func (m *DataMessage) Reset() { *m = DataMessage{} } func (m *DataMessage) String() string { return proto1.CompactTextString(m) } func (*DataMessage) ProtoMessage() {} -func (*DataMessage) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{8} } +func (*DataMessage) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{9} } func (m *DataMessage) GetPayload() *Payload { if m != nil { @@ -768,7 +819,7 @@ type Payload struct { func (m *Payload) Reset() { *m = Payload{} } func (m *Payload) String() string { return proto1.CompactTextString(m) } func (*Payload) ProtoMessage() {} -func (*Payload) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{9} } +func (*Payload) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{10} } // AliveMessage is sent to inform remote peers // of a peer's existence and activity @@ -776,12 +827,13 @@ type AliveMessage struct { Membership *Member `protobuf:"bytes,1,opt,name=membership" json:"membership,omitempty"` Timestamp *PeerTime `protobuf:"bytes,2,opt,name=timestamp" json:"timestamp,omitempty"` Signature []byte `protobuf:"bytes,3,opt,name=signature,proto3" json:"signature,omitempty"` + Identity []byte `protobuf:"bytes,4,opt,name=identity,proto3" json:"identity,omitempty"` } func (m *AliveMessage) Reset() { *m = AliveMessage{} } func (m *AliveMessage) String() string { return proto1.CompactTextString(m) } func (*AliveMessage) ProtoMessage() {} -func (*AliveMessage) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{10} } +func (*AliveMessage) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{11} } func (m *AliveMessage) GetMembership() *Member { if m != nil { @@ -808,7 +860,7 @@ type LeadershipMessage struct { func (m *LeadershipMessage) Reset() { *m = LeadershipMessage{} } func (m *LeadershipMessage) String() string { return proto1.CompactTextString(m) } func (*LeadershipMessage) ProtoMessage() {} -func (*LeadershipMessage) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{11} } +func (*LeadershipMessage) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{12} } func (m *LeadershipMessage) GetMembership() *Member { if m != nil { @@ -833,7 +885,7 @@ type PeerTime struct { func (m *PeerTime) Reset() { *m = PeerTime{} } func (m *PeerTime) String() string { return proto1.CompactTextString(m) } func (*PeerTime) ProtoMessage() {} -func (*PeerTime) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{12} } +func (*PeerTime) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{13} } // MembershipRequest is used to ask membership information // from a remote peer @@ -845,7 +897,7 @@ type MembershipRequest struct { func (m *MembershipRequest) Reset() { *m = MembershipRequest{} } func (m *MembershipRequest) String() string { return proto1.CompactTextString(m) } func (*MembershipRequest) ProtoMessage() {} -func (*MembershipRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{13} } +func (*MembershipRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{14} } func (m *MembershipRequest) GetSelfInformation() *AliveMessage { if m != nil { @@ -863,7 +915,7 @@ type MembershipResponse struct { func (m *MembershipResponse) Reset() { *m = MembershipResponse{} } func (m *MembershipResponse) String() string { return proto1.CompactTextString(m) } func (*MembershipResponse) ProtoMessage() {} -func (*MembershipResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{14} } +func (*MembershipResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{15} } func (m *MembershipResponse) GetAlive() []*AliveMessage { if m != nil { @@ -890,7 +942,7 @@ type Member struct { func (m *Member) Reset() { *m = Member{} } func (m *Member) String() string { return proto1.CompactTextString(m) } func (*Member) ProtoMessage() {} -func (*Member) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{15} } +func (*Member) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{16} } // Empty is used for pinging and in tests type Empty struct { @@ -899,7 +951,7 @@ type Empty struct { func (m *Empty) Reset() { *m = Empty{} } func (m *Empty) String() string { return proto1.CompactTextString(m) } func (*Empty) ProtoMessage() {} -func (*Empty) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{16} } +func (*Empty) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{17} } // RemoteStateRequest is used to ask a set of blocks // from a remote peer @@ -910,7 +962,7 @@ type RemoteStateRequest struct { func (m *RemoteStateRequest) Reset() { *m = RemoteStateRequest{} } func (m *RemoteStateRequest) String() string { return proto1.CompactTextString(m) } func (*RemoteStateRequest) ProtoMessage() {} -func (*RemoteStateRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{17} } +func (*RemoteStateRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{18} } // RemoteStateResponse is used to send a set of blocks // to a remote peer @@ -921,7 +973,7 @@ type RemoteStateResponse struct { func (m *RemoteStateResponse) Reset() { *m = RemoteStateResponse{} } func (m *RemoteStateResponse) String() string { return proto1.CompactTextString(m) } func (*RemoteStateResponse) ProtoMessage() {} -func (*RemoteStateResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{18} } +func (*RemoteStateResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{19} } func (m *RemoteStateResponse) GetPayloads() []*Payload { if m != nil { @@ -935,6 +987,7 @@ func init() { proto1.RegisterType((*StateInfo)(nil), "proto.StateInfo") proto1.RegisterType((*ChannelCommand)(nil), "proto.ChannelCommand") proto1.RegisterType((*ConnEstablish)(nil), "proto.ConnEstablish") + proto1.RegisterType((*PeerIdentity)(nil), "proto.PeerIdentity") proto1.RegisterType((*DataRequest)(nil), "proto.DataRequest") proto1.RegisterType((*GossipHello)(nil), "proto.GossipHello") proto1.RegisterType((*DataUpdate)(nil), "proto.DataUpdate") @@ -1099,73 +1152,77 @@ var _Gossip_serviceDesc = grpc.ServiceDesc{ func init() { proto1.RegisterFile("message.proto", fileDescriptor0) } var fileDescriptor0 = []byte{ - // 1073 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xc4, 0x56, 0x5f, 0x4f, 0xe3, 0x46, - 0x10, 0x8f, 0xf3, 0x17, 0x4f, 0x1c, 0x08, 0x73, 0xd7, 0xca, 0x45, 0xad, 0x84, 0xac, 0x93, 0x9a, - 0x46, 0x25, 0xdc, 0xc1, 0xc3, 0x3d, 0x55, 0x3d, 0x20, 0x94, 0x20, 0x1d, 0x01, 0x2d, 0x5c, 0xab, - 0xeb, 0x0b, 0x5a, 0xe2, 0xc5, 0xb1, 0x88, 0xd7, 0x26, 0x6b, 0xae, 0xe2, 0x2b, 0xf4, 0xa5, 0x5f, - 0xad, 0x1f, 0xa9, 0xda, 0x3f, 0x76, 0x6c, 0x12, 0xaa, 0xde, 0x43, 0x75, 0x4f, 0xf1, 0xcc, 0xfe, - 0x7e, 0x3b, 0xb3, 0xbb, 0xf3, 0x9b, 0x09, 0x74, 0x22, 0x26, 0x04, 0x0d, 0xd8, 0x20, 0x99, 0xc7, - 0x69, 0x8c, 0x0d, 0xf5, 0xe3, 0xfd, 0xdd, 0x82, 0xce, 0x49, 0x2c, 0x44, 0x98, 0x9c, 0xe9, 0x65, - 0x7c, 0x09, 0x0d, 0x1e, 0xf3, 0x09, 0x73, 0xad, 0x6d, 0xab, 0x57, 0x27, 0xda, 0x40, 0x17, 0x5a, - 0x93, 0x29, 0xe5, 0x9c, 0xcd, 0xdc, 0xea, 0xb6, 0xd5, 0x73, 0x48, 0x66, 0x62, 0x1f, 0x6a, 0x29, - 0x0d, 0xdc, 0xda, 0xb6, 0xd5, 0x5b, 0xdf, 0x73, 0xf5, 0xee, 0x83, 0xd2, 0x96, 0x83, 0x2b, 0x1a, - 0x10, 0x09, 0xc2, 0x37, 0xb0, 0x46, 0x67, 0xe1, 0x27, 0x76, 0x26, 0x02, 0xb7, 0xbe, 0x6d, 0xf5, - 0xda, 0x7b, 0x2f, 0x0c, 0xe1, 0x40, 0xb9, 0x35, 0x7e, 0x54, 0x21, 0x39, 0x0c, 0xf7, 0xa0, 0x19, - 0xb1, 0x88, 0xb0, 0x7b, 0xb7, 0xa1, 0x08, 0x59, 0x84, 0x33, 0x16, 0xdd, 0xb0, 0xb9, 0x98, 0x86, - 0x09, 0x61, 0xf7, 0x0f, 0x4c, 0xa4, 0xa3, 0x0a, 0x31, 0x48, 0xdc, 0x37, 0x1c, 0xe1, 0x36, 0x15, - 0xe7, 0x9b, 0x15, 0x1c, 0x91, 0xc4, 0x5c, 0xb0, 0x9c, 0x24, 0x70, 0x00, 0x2d, 0x9f, 0xa6, 0x54, - 0xa6, 0xd6, 0x52, 0x2c, 0x34, 0xac, 0xa1, 0xf4, 0xe6, 0x99, 0x65, 0x20, 0xec, 0x43, 0x63, 0xca, - 0x66, 0xb3, 0xd8, 0xfd, 0xad, 0x84, 0xd6, 0x27, 0x1f, 0xc9, 0x95, 0x51, 0x85, 0x68, 0x08, 0xee, - 0xe8, 0xbd, 0x87, 0x61, 0xe0, 0xda, 0x0a, 0xbd, 0x59, 0xd8, 0x7b, 0x18, 0x06, 0x3a, 0xfd, 0x0c, - 0x93, 0xa5, 0x22, 0x0f, 0x0d, 0x4b, 0xa9, 0x2c, 0x8e, 0x9b, 0x81, 0x70, 0x1f, 0x40, 0x7e, 0x7e, - 0x48, 0x7c, 0x9a, 0x32, 0xb7, 0xbd, 0x14, 0x41, 0x2f, 0x8c, 0x2a, 0xa4, 0x00, 0xc3, 0x37, 0xfa, - 0x45, 0x8f, 0x22, 0xdf, 0x75, 0x14, 0xe3, 0x2b, 0xc3, 0x38, 0xd2, 0x0f, 0x7b, 0x14, 0x47, 0x11, - 0xe5, 0xbe, 0x8c, 0x63, 0x70, 0xf8, 0x0a, 0x1a, 0x2c, 0x4a, 0xd2, 0x47, 0xb7, 0xa3, 0x08, 0x8e, - 0x21, 0x1c, 0x4b, 0x9f, 0x3c, 0xac, 0x5a, 0xc4, 0x3e, 0xd4, 0x27, 0x31, 0xe7, 0xee, 0xba, 0x02, - 0xbd, 0xcc, 0x76, 0x8d, 0x39, 0x3f, 0x16, 0x29, 0xbd, 0x99, 0x85, 0x62, 0x3a, 0xaa, 0x10, 0x85, - 0xc1, 0xd7, 0x60, 0x8b, 0x94, 0xa6, 0xec, 0x94, 0xdf, 0xc6, 0xee, 0x86, 0x22, 0x74, 0x0d, 0xe1, - 0x32, 0xf3, 0x8f, 0x2a, 0x64, 0x01, 0xc2, 0x9f, 0xc1, 0x51, 0x86, 0xb9, 0x06, 0xb7, 0x5b, 0x7a, - 0x61, 0xc2, 0xa2, 0x38, 0x65, 0x97, 0x05, 0xc0, 0xa8, 0x42, 0x4a, 0x04, 0x3c, 0x84, 0x8e, 0xb1, - 0x75, 0x09, 0xb8, 0x9b, 0x6a, 0x87, 0xad, 0x55, 0x3b, 0xe4, 0x45, 0x52, 0xa6, 0xe0, 0x3b, 0xe8, - 0xcc, 0x18, 0xf5, 0x75, 0x2d, 0xc9, 0x8a, 0xc1, 0x52, 0x6d, 0xbe, 0x5f, 0xac, 0xe5, 0x75, 0x53, - 0x26, 0x78, 0x63, 0xa8, 0x5d, 0xd1, 0x00, 0x3b, 0x60, 0x7f, 0x18, 0x0f, 0x8f, 0x7f, 0x39, 0x1d, - 0x1f, 0x0f, 0xbb, 0x15, 0xb4, 0xa1, 0x71, 0x7c, 0x76, 0x71, 0xf5, 0xb1, 0x6b, 0xa1, 0x03, 0x6b, - 0xe7, 0xe4, 0xe4, 0xfa, 0x7c, 0xfc, 0xfe, 0x63, 0xb7, 0x2a, 0x71, 0x47, 0xa3, 0x83, 0xb1, 0x36, - 0x6b, 0xd8, 0x05, 0x47, 0x99, 0x07, 0xe3, 0xe1, 0xf5, 0x39, 0x39, 0xe9, 0xd6, 0x0f, 0x6d, 0x68, - 0x4d, 0x62, 0x9e, 0x32, 0x9e, 0x7a, 0x7f, 0x5a, 0x60, 0xe7, 0x97, 0x87, 0x5b, 0xb0, 0x16, 0xb1, - 0x94, 0xca, 0x87, 0x57, 0x8a, 0x76, 0x48, 0x6e, 0xe3, 0x0e, 0xd8, 0x69, 0x18, 0x31, 0x91, 0xd2, - 0x28, 0x51, 0xb2, 0x6e, 0xef, 0x6d, 0x98, 0x23, 0x5c, 0x30, 0x36, 0xbf, 0x0a, 0x23, 0x46, 0x16, - 0x08, 0xd9, 0x19, 0x92, 0xbb, 0xf0, 0x74, 0xa8, 0xb4, 0xee, 0x10, 0x6d, 0xe0, 0xb7, 0x60, 0x8b, - 0x30, 0xe0, 0x34, 0x7d, 0x98, 0x33, 0x25, 0x6a, 0x87, 0x2c, 0x1c, 0x5e, 0x1f, 0xd6, 0xcb, 0xf5, - 0x24, 0x3b, 0x49, 0x42, 0x1f, 0x67, 0x31, 0xf5, 0x4d, 0x3e, 0x99, 0xe9, 0xbd, 0x85, 0x4e, 0xa9, - 0x4a, 0xb0, 0x0b, 0x35, 0x11, 0x06, 0x06, 0x26, 0x3f, 0x17, 0x29, 0x54, 0x0b, 0x29, 0x78, 0x01, - 0xb4, 0x0b, 0xca, 0x78, 0xbe, 0x83, 0xf9, 0x4a, 0x69, 0xc2, 0xad, 0x6e, 0xd7, 0x7a, 0x36, 0xc9, - 0x4c, 0xec, 0x41, 0x2b, 0x12, 0xc1, 0xd5, 0x63, 0xc2, 0x4c, 0x17, 0x5b, 0xcf, 0xfa, 0x85, 0xf6, - 0x92, 0x6c, 0xd9, 0x0b, 0xa1, 0x5d, 0xd0, 0xf7, 0x33, 0x81, 0x8a, 0x37, 0x5e, 0x7d, 0x72, 0xe3, - 0xff, 0x3d, 0xd4, 0x27, 0x80, 0x85, 0x74, 0x9f, 0x89, 0xd4, 0x83, 0xba, 0x89, 0x52, 0x2b, 0x28, - 0xad, 0xd4, 0x7b, 0x49, 0xfd, 0x33, 0xe3, 0xde, 0xea, 0xb8, 0xba, 0x29, 0xfd, 0x8f, 0x57, 0xf9, - 0x56, 0xbf, 0x59, 0x36, 0x75, 0x7a, 0xe5, 0xaa, 0x68, 0xe7, 0xc4, 0x0b, 0xed, 0x5d, 0x54, 0xc9, - 0x29, 0xb4, 0x8c, 0x0f, 0xbf, 0x86, 0xa6, 0x60, 0xf7, 0xe3, 0x87, 0xc8, 0xa4, 0x67, 0x2c, 0x44, - 0xa8, 0x4f, 0xa9, 0x98, 0xaa, 0xdb, 0xb7, 0x89, 0xfa, 0x96, 0x3e, 0x75, 0x57, 0xba, 0x76, 0xd5, - 0xb7, 0x54, 0x8a, 0x53, 0x1c, 0x3c, 0xb8, 0x03, 0x10, 0xe5, 0x33, 0xc2, 0x24, 0xd2, 0x29, 0x0d, - 0x0f, 0x52, 0x00, 0x7c, 0xae, 0x7e, 0x4a, 0x4a, 0xa9, 0x3d, 0x55, 0xca, 0x5f, 0x16, 0x6c, 0x2e, - 0x35, 0x8e, 0x2f, 0x9a, 0xd1, 0x01, 0xac, 0x65, 0x24, 0xfc, 0x0e, 0x20, 0xe4, 0x93, 0x6b, 0xfe, - 0x20, 0x43, 0x99, 0xeb, 0xb6, 0x43, 0x3e, 0x19, 0x2b, 0x47, 0xe1, 0x25, 0xaa, 0xc5, 0x97, 0xf0, - 0xa6, 0xb0, 0xb9, 0x34, 0xa8, 0xf1, 0x27, 0xd8, 0x10, 0x6c, 0x76, 0x2b, 0xdb, 0xd3, 0x3c, 0xa2, - 0x69, 0x18, 0x73, 0x73, 0xb0, 0x55, 0x7f, 0x06, 0xc8, 0x53, 0xac, 0xac, 0xc9, 0x3b, 0x1e, 0xff, - 0xc1, 0x55, 0xed, 0x39, 0x44, 0x1b, 0xde, 0x14, 0x70, 0x79, 0xbc, 0xe3, 0x0f, 0xd0, 0x50, 0xff, - 0x24, 0x5c, 0x4b, 0x49, 0x64, 0x65, 0x00, 0x8d, 0xc0, 0xef, 0xa1, 0xee, 0x33, 0xea, 0x1b, 0x31, - 0xad, 0x44, 0x2a, 0x80, 0xf7, 0x2b, 0x34, 0x75, 0x24, 0xa9, 0x74, 0xc6, 0xfd, 0x24, 0x0e, 0x79, - 0xaa, 0x4e, 0x60, 0x93, 0xdc, 0xfe, 0xd7, 0x2e, 0xb0, 0xb2, 0x91, 0x7a, 0x2d, 0x68, 0xa8, 0x49, - 0xea, 0x0d, 0x00, 0x97, 0xe7, 0x98, 0x14, 0x9d, 0xbe, 0x54, 0xa1, 0x0e, 0x53, 0x27, 0x99, 0xe9, - 0x1d, 0xc0, 0x8b, 0x15, 0x53, 0x0b, 0xfb, 0xb0, 0x66, 0x34, 0x23, 0xcc, 0xf1, 0x9f, 0x6a, 0x2a, - 0x5f, 0xef, 0xf7, 0xa1, 0x65, 0x14, 0xfa, 0x74, 0x24, 0x75, 0xc1, 0x39, 0x9c, 0xc5, 0x93, 0x3b, - 0x73, 0x07, 0x5d, 0x6b, 0x2f, 0x81, 0xa6, 0x6e, 0x31, 0xf8, 0x0e, 0x1c, 0xfd, 0x75, 0x99, 0xce, - 0x19, 0x8d, 0x70, 0x65, 0x07, 0xda, 0x5a, 0xe9, 0xf5, 0x2a, 0x3d, 0xeb, 0xb5, 0x85, 0xaf, 0xa0, - 0x7e, 0x11, 0xf2, 0x00, 0x4b, 0x7f, 0x25, 0xb6, 0x4a, 0x96, 0x57, 0x39, 0xfc, 0xf1, 0xf7, 0x7e, - 0x10, 0xa6, 0xd3, 0x87, 0x9b, 0xc1, 0x24, 0x8e, 0x76, 0xa7, 0x8f, 0x09, 0x9b, 0xcf, 0x98, 0x1f, - 0xb0, 0xf9, 0xee, 0x2d, 0xbd, 0x99, 0x87, 0x93, 0xdd, 0x40, 0x6d, 0xbd, 0xab, 0x58, 0x37, 0x4d, - 0xf5, 0xb3, 0xff, 0x4f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xc9, 0xb3, 0xdc, 0x93, 0xf1, 0x0a, 0x00, - 0x00, + // 1140 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xc4, 0x56, 0x6f, 0x4f, 0xe3, 0xc6, + 0x13, 0x8e, 0xf3, 0xdf, 0x13, 0x07, 0xc2, 0x70, 0xbf, 0x9f, 0x5c, 0xd4, 0x4a, 0xc8, 0x3a, 0xa9, + 0x69, 0x54, 0xc2, 0x1d, 0xbc, 0xa8, 0xaa, 0xaa, 0xea, 0x01, 0xa1, 0x04, 0xf5, 0x08, 0x68, 0xe1, + 0x5a, 0x5d, 0xdf, 0x20, 0x13, 0x2f, 0x8e, 0x45, 0xbc, 0x36, 0xd9, 0xe5, 0x2a, 0xbe, 0x42, 0xfb, + 0xa2, 0xdf, 0xa2, 0x9f, 0xb3, 0xda, 0x3f, 0x4e, 0x6c, 0x12, 0xaa, 0x9e, 0xaa, 0xaa, 0xaf, 0xec, + 0x99, 0x7d, 0x9e, 0x9d, 0xd9, 0xdd, 0x99, 0x67, 0x17, 0xda, 0x31, 0xe5, 0xdc, 0x0f, 0x69, 0x3f, + 0x9d, 0x25, 0x22, 0xc1, 0x9a, 0xfa, 0x78, 0xbf, 0x35, 0xa1, 0x7d, 0x92, 0x70, 0x1e, 0xa5, 0x67, + 0x7a, 0x18, 0x5f, 0x40, 0x8d, 0x25, 0x6c, 0x4c, 0x5d, 0x6b, 0xdb, 0xea, 0x56, 0x89, 0x36, 0xd0, + 0x85, 0xc6, 0x78, 0xe2, 0x33, 0x46, 0xa7, 0x6e, 0x79, 0xdb, 0xea, 0x3a, 0x24, 0x33, 0xb1, 0x07, + 0x15, 0xe1, 0x87, 0x6e, 0x65, 0xdb, 0xea, 0xae, 0xed, 0xb9, 0x7a, 0xf6, 0x7e, 0x61, 0xca, 0xfe, + 0x95, 0x1f, 0x12, 0x09, 0xc2, 0xd7, 0xd0, 0xf4, 0xa7, 0xd1, 0x07, 0x7a, 0xc6, 0x43, 0xb7, 0xba, + 0x6d, 0x75, 0x5b, 0x7b, 0x9b, 0x86, 0x70, 0xa0, 0xdc, 0x1a, 0x3f, 0x2c, 0x91, 0x39, 0x0c, 0xf7, + 0xa0, 0x1e, 0xd3, 0x98, 0xd0, 0x7b, 0xb7, 0xa6, 0x08, 0x59, 0x84, 0x33, 0x1a, 0xdf, 0xd0, 0x19, + 0x9f, 0x44, 0x29, 0xa1, 0xf7, 0x0f, 0x94, 0x8b, 0x61, 0x89, 0x18, 0x24, 0xee, 0x1b, 0x0e, 0x77, + 0xeb, 0x8a, 0xf3, 0xc9, 0x0a, 0x0e, 0x4f, 0x13, 0xc6, 0xe9, 0x9c, 0xc4, 0xb1, 0x0f, 0x8d, 0xc0, + 0x17, 0xbe, 0x4c, 0xad, 0xa1, 0x58, 0x68, 0x58, 0x03, 0xe9, 0x9d, 0x67, 0x96, 0x81, 0xb0, 0x07, + 0xb5, 0x09, 0x9d, 0x4e, 0x13, 0xf7, 0xa7, 0x02, 0x5a, 0xaf, 0x7c, 0x28, 0x47, 0x86, 0x25, 0xa2, + 0x21, 0xb8, 0xa3, 0xe7, 0x1e, 0x44, 0xa1, 0x6b, 0x2b, 0xf4, 0x46, 0x6e, 0xee, 0x41, 0x14, 0xea, + 0xf4, 0x33, 0x4c, 0x96, 0x8a, 0x5c, 0x34, 0x2c, 0xa5, 0xb2, 0x58, 0x6e, 0x06, 0xc2, 0x7d, 0x00, + 0xf9, 0xfb, 0x2e, 0x0d, 0x7c, 0x41, 0xdd, 0xd6, 0x52, 0x04, 0x3d, 0x30, 0x2c, 0x91, 0x1c, 0x0c, + 0x5f, 0xeb, 0x13, 0x3d, 0x8a, 0x03, 0xd7, 0x51, 0x8c, 0xff, 0x19, 0xc6, 0x91, 0x3e, 0xd8, 0xa3, + 0x24, 0x8e, 0x7d, 0x16, 0xc8, 0x38, 0x06, 0x87, 0x2f, 0xa1, 0x46, 0xe3, 0x54, 0x3c, 0xba, 0x6d, + 0x45, 0x70, 0x0c, 0xe1, 0x58, 0xfa, 0xe4, 0x62, 0xd5, 0x20, 0xf6, 0xa0, 0x3a, 0x4e, 0x18, 0x73, + 0xd7, 0x14, 0xe8, 0x45, 0x36, 0x6b, 0xc2, 0xd8, 0x31, 0x17, 0xfe, 0xcd, 0x34, 0xe2, 0x93, 0x61, + 0x89, 0x28, 0x0c, 0xbe, 0x02, 0x9b, 0x0b, 0x5f, 0xd0, 0x53, 0x76, 0x9b, 0xb8, 0xeb, 0x8a, 0xd0, + 0x31, 0x84, 0xcb, 0xcc, 0x3f, 0x2c, 0x91, 0x05, 0x08, 0xbf, 0x03, 0x47, 0x19, 0x66, 0x1b, 0xdc, + 0x4e, 0xe1, 0x84, 0x09, 0x8d, 0x13, 0x41, 0x2f, 0x73, 0x80, 0x61, 0x89, 0x14, 0x08, 0x78, 0x08, + 0x6d, 0x63, 0xeb, 0x12, 0x70, 0x37, 0xd4, 0x0c, 0x5b, 0xab, 0x66, 0x98, 0x17, 0x49, 0x91, 0x82, + 0x6f, 0xa0, 0x3d, 0xa5, 0x7e, 0xa0, 0x6b, 0x49, 0x56, 0x0c, 0x16, 0x6a, 0xf3, 0xed, 0x62, 0x6c, + 0x5e, 0x37, 0x45, 0x02, 0x7e, 0x0d, 0x4e, 0x4a, 0xe9, 0xec, 0x34, 0xa0, 0x4c, 0x44, 0xe2, 0xd1, + 0xdd, 0x2c, 0x74, 0xc3, 0x45, 0x6e, 0x48, 0x2e, 0x20, 0x0f, 0xf5, 0x46, 0x50, 0xb9, 0xf2, 0x43, + 0x6c, 0x83, 0xfd, 0x6e, 0x34, 0x38, 0xfe, 0xfe, 0x74, 0x74, 0x3c, 0xe8, 0x94, 0xd0, 0x86, 0xda, + 0xf1, 0xd9, 0xc5, 0xd5, 0xfb, 0x8e, 0x85, 0x0e, 0x34, 0xcf, 0xc9, 0xc9, 0xf5, 0xf9, 0xe8, 0xed, + 0xfb, 0x4e, 0x59, 0xe2, 0x8e, 0x86, 0x07, 0x23, 0x6d, 0x56, 0xb0, 0x03, 0x8e, 0x32, 0x0f, 0x46, + 0x83, 0xeb, 0x73, 0x72, 0xd2, 0xa9, 0x1e, 0xda, 0xd0, 0x18, 0x27, 0x4c, 0x50, 0x26, 0xbc, 0x5f, + 0x2d, 0xb0, 0xe7, 0xfb, 0x8e, 0x5b, 0xd0, 0x8c, 0xa9, 0xf0, 0x65, 0xcd, 0x28, 0x31, 0x70, 0xc8, + 0xdc, 0xc6, 0x1d, 0xb0, 0x45, 0x14, 0x53, 0x2e, 0xfc, 0x38, 0x55, 0x8a, 0xd0, 0xda, 0x5b, 0xcf, + 0x25, 0x7f, 0x15, 0xc5, 0x94, 0x2c, 0x10, 0x52, 0x54, 0xd2, 0xbb, 0xe8, 0x74, 0xa0, 0x64, 0xc2, + 0x21, 0xda, 0xc0, 0x4f, 0xc1, 0xe6, 0x51, 0xc8, 0x7c, 0xf1, 0x30, 0xa3, 0x4a, 0x0f, 0x1c, 0xb2, + 0x70, 0x78, 0x3d, 0x58, 0x2b, 0x96, 0xa2, 0x14, 0xa1, 0xd4, 0x7f, 0x9c, 0x26, 0x7e, 0x60, 0xf2, + 0xc9, 0x4c, 0xef, 0x07, 0x68, 0x17, 0x0a, 0x0c, 0x3b, 0x50, 0xe1, 0x51, 0x68, 0x60, 0xf2, 0x77, + 0x91, 0x42, 0x39, 0x9f, 0x02, 0x42, 0x75, 0x4c, 0x67, 0xc2, 0xe4, 0xa5, 0xfe, 0xbd, 0x5b, 0x70, + 0xf2, 0x07, 0xf0, 0x4f, 0xe6, 0x2a, 0xec, 0x61, 0xb5, 0xb8, 0x87, 0x5e, 0x08, 0xad, 0x5c, 0x43, + 0x3f, 0x2f, 0xbc, 0x81, 0x12, 0x08, 0xee, 0x96, 0xb7, 0x2b, 0x5d, 0x9b, 0x64, 0x26, 0x76, 0xa1, + 0x11, 0xf3, 0xf0, 0xea, 0x31, 0xa5, 0x46, 0x7c, 0xd7, 0x32, 0x99, 0xd3, 0x5e, 0x92, 0x0d, 0x7b, + 0x11, 0xb4, 0x72, 0xb2, 0xf4, 0x4c, 0xa0, 0x7c, 0xa6, 0xe5, 0x27, 0xa7, 0xfd, 0xf7, 0x43, 0x7d, + 0x00, 0x58, 0x28, 0xce, 0x33, 0x91, 0xba, 0x50, 0x35, 0x51, 0x2a, 0x39, 0x81, 0x28, 0x5c, 0x19, + 0xa4, 0xfa, 0x91, 0x71, 0x6f, 0x75, 0x5c, 0xad, 0xa5, 0xff, 0xe2, 0x56, 0x7e, 0xa5, 0xcf, 0x2c, + 0xbb, 0x2c, 0xbb, 0xc5, 0x8a, 0x6c, 0xcd, 0x89, 0x17, 0xda, 0xbb, 0xa8, 0xd0, 0x53, 0x68, 0x18, + 0x1f, 0xfe, 0x1f, 0xea, 0x9c, 0xde, 0x8f, 0x1e, 0x62, 0x93, 0x9e, 0xb1, 0x64, 0xfd, 0x4c, 0x7c, + 0x3e, 0x51, 0xbb, 0x6f, 0x13, 0xf5, 0x2f, 0x7d, 0x6a, 0xaf, 0x4c, 0x4d, 0xa9, 0xba, 0xf9, 0xc3, + 0x02, 0x27, 0x7f, 0x5f, 0xe2, 0x0e, 0x40, 0x3c, 0xbf, 0xda, 0x4c, 0x22, 0xed, 0xc2, 0x9d, 0x47, + 0x72, 0x80, 0x8f, 0xed, 0xdd, 0x42, 0x97, 0x56, 0x9e, 0x74, 0xa9, 0x2c, 0x9b, 0x28, 0x13, 0x31, + 0x53, 0xe0, 0x99, 0xed, 0xfd, 0x6e, 0xc1, 0xc6, 0x92, 0x16, 0xfe, 0x97, 0xd9, 0x7a, 0x07, 0xd0, + 0xcc, 0x48, 0xf8, 0x19, 0x40, 0xc4, 0xc6, 0xd7, 0xec, 0x41, 0x86, 0x32, 0x47, 0x61, 0x47, 0x6c, + 0x3c, 0x52, 0x8e, 0xdc, 0x29, 0x95, 0xf3, 0xa7, 0xe4, 0x4d, 0x60, 0x63, 0xe9, 0xed, 0x81, 0xdf, + 0xc2, 0x3a, 0xa7, 0xd3, 0x5b, 0x29, 0x9b, 0xb3, 0xd8, 0x17, 0x51, 0xc2, 0xcc, 0xc2, 0x56, 0xbd, + 0x6f, 0xc8, 0x53, 0xac, 0xac, 0xd7, 0x3b, 0x96, 0xfc, 0xc2, 0x54, 0x5d, 0x3a, 0x44, 0x1b, 0xde, + 0x04, 0x70, 0xf9, 0xc5, 0x82, 0x5f, 0x40, 0x4d, 0x3d, 0x8e, 0x5c, 0x4b, 0xb5, 0xcf, 0xca, 0x00, + 0x1a, 0x81, 0x9f, 0x43, 0x35, 0xa0, 0x7e, 0x60, 0x1a, 0x6d, 0x25, 0x52, 0x01, 0xbc, 0x1f, 0xa1, + 0xae, 0x23, 0xc9, 0xe3, 0xa4, 0x2c, 0x48, 0x93, 0x88, 0x09, 0xb5, 0x02, 0x9b, 0xcc, 0xed, 0xbf, + 0x54, 0x88, 0x95, 0x02, 0xef, 0x35, 0xa0, 0xa6, 0x1e, 0x07, 0x5e, 0x1f, 0x70, 0xf9, 0x6a, 0x96, + 0x0d, 0xa9, 0x37, 0x95, 0xab, 0xc5, 0x54, 0x49, 0x66, 0x7a, 0x07, 0xb0, 0xb9, 0xe2, 0x22, 0xc6, + 0x1e, 0x34, 0x4d, 0x3f, 0x71, 0xb3, 0xfc, 0xa7, 0xfd, 0x36, 0x1f, 0xef, 0x7d, 0x03, 0x0d, 0xd3, + 0xbd, 0x4f, 0xaf, 0xca, 0x0e, 0x38, 0x87, 0xd3, 0x64, 0x7c, 0x67, 0xf6, 0xa0, 0x63, 0xe1, 0x3a, + 0xb4, 0x32, 0xb5, 0x3f, 0xe3, 0x61, 0xa7, 0xbc, 0x97, 0x42, 0x5d, 0xeb, 0x11, 0xbe, 0x01, 0x47, + 0xff, 0x5d, 0x8a, 0x19, 0xf5, 0x63, 0x5c, 0x29, 0x57, 0x5b, 0x2b, 0xbd, 0x5e, 0xa9, 0x6b, 0xbd, + 0xb2, 0xf0, 0x25, 0x54, 0x2f, 0x22, 0x16, 0x62, 0xe1, 0xb9, 0xb4, 0x55, 0xb0, 0xbc, 0xd2, 0xe1, + 0x97, 0x3f, 0xf7, 0xc2, 0x48, 0x4c, 0x1e, 0x6e, 0xfa, 0xe3, 0x24, 0xde, 0x9d, 0x3c, 0xa6, 0x74, + 0x36, 0xa5, 0x41, 0x48, 0x67, 0xbb, 0xb7, 0xfe, 0xcd, 0x2c, 0x1a, 0xef, 0x86, 0x6a, 0xea, 0x5d, + 0xc5, 0xba, 0xa9, 0xab, 0xcf, 0xfe, 0x9f, 0x01, 0x00, 0x00, 0xff, 0xff, 0x03, 0xbd, 0x16, 0x54, + 0xd5, 0x0b, 0x00, 0x00, } diff --git a/gossip/proto/message.proto b/gossip/proto/message.proto index 41c35f1c7b1..199d50ea230 100644 --- a/gossip/proto/message.proto +++ b/gossip/proto/message.proto @@ -77,6 +77,9 @@ message GossipMessage { // Used to indicate intent of peer to become leader LeadershipMessage leadershipMsg = 18; + + // Used to learn of a peer's certificate + PeerIdentity peerIdentity = 19; } } @@ -102,13 +105,25 @@ message ChannelCommand { message ConnEstablish { bytes sig = 1; bytes pkiID = 2; + bytes cert = 3; +} + +// PeerIdentity defines the identity of the peer +// Used to make other peers learn of the identity +// of a certain peer +message PeerIdentity { + bytes sig = 1; + bytes pkiID = 2; + bytes cert = 3; + bytes metadata = 4; } // Messages related to pull mechanism enum MsgType { - UNDEFINED = 0; + UNDEFINED = 0; BlockMessage = 1; + IdentityMsg = 2; } // DataRequest is a message used for a peer to request @@ -167,6 +182,7 @@ message AliveMessage { Member membership = 1; PeerTime timestamp = 2; bytes signature = 3; + bytes identity = 4; } // Leadership Message is sent during leader election to inform diff --git a/gossip/state/state_test.go b/gossip/state/state_test.go index 4bd18592799..d0e1bb8d908 100644 --- a/gossip/state/state_test.go +++ b/gossip/state/state_test.go @@ -17,17 +17,20 @@ limitations under the License. package state import ( - "bytes" "fmt" "os" "strconv" "testing" "time" + "bytes" + pb "github.com/golang/protobuf/proto" "github.com/hyperledger/fabric/core/committer" "github.com/hyperledger/fabric/core/ledger/kvledger" + "github.com/hyperledger/fabric/gossip/api" "github.com/hyperledger/fabric/gossip/comm" + "github.com/hyperledger/fabric/gossip/common" "github.com/hyperledger/fabric/gossip/gossip" "github.com/hyperledger/fabric/gossip/proto" pcomm "github.com/hyperledger/fabric/protos/common" @@ -39,6 +42,24 @@ var ( logger, _ = logging.GetLogger("GossipStateProviderTest") ) +type naiveSecProvider struct { +} + +func (*naiveSecProvider) IsEnabled() bool { + return true +} + +func (*naiveSecProvider) Sign(msg []byte) ([]byte, error) { + return msg, nil +} + +func (*naiveSecProvider) Verify(vkID, signature, message []byte) error { + if bytes.Equal(signature, message) { + return nil + } + return fmt.Errorf("Failed verifying") +} + type naiveCryptoService struct { } @@ -54,15 +75,36 @@ func (*naiveCryptoService) IsEnabled() bool { return true } +func (*naiveCryptoService) ValidateIdentity(peerIdentity api.PeerIdentityType) error { + return nil +} + +// GetPKIidOfCert returns the PKI-ID of a peer's identity +func (*naiveCryptoService) GetPKIidOfCert(peerIdentity api.PeerIdentityType) common.PKIidType { + return common.PKIidType(peerIdentity) +} + +// VerifyBlock returns nil if the block is properly signed, +// else returns error +func (*naiveCryptoService) VerifyBlock(signedBlock api.SignedBlock) error { + return nil +} + +// Sign signs msg with this peer's signing key and outputs +// the signature if no error occurred. func (*naiveCryptoService) Sign(msg []byte) ([]byte, error) { return msg, nil } -func (*naiveCryptoService) Verify(vkID, signature, message []byte) error { - if bytes.Equal(signature, message) { - return nil +// Verify checks that signature is a valid signature of message under a peer's verification key. +// If the verification succeeded, Verify returns nil meaning no error occurred. +// If peerCert is nil, then the signature is verified against this peer's verification key. +func (*naiveCryptoService) Verify(peerIdentity api.PeerIdentityType, signature, message []byte) error { + equal := bytes.Equal(signature, message) + if !equal { + return fmt.Errorf("Wrong signature:%v, %v", signature, message) } - return fmt.Errorf("Failed verifying") + return nil } func bootPeers(ids ...int) []string { @@ -110,14 +152,14 @@ func newGossipConfig(id int, maxMsgCount int, boot ...int) *gossip.Config { // Create gossip instance func newGossipInstance(config *gossip.Config, comm comm.Comm) gossip.Gossip { - return gossip.NewGossipService(config, comm, &naiveCryptoService{}) + return gossip.NewGossipService(config, comm, &naiveCryptoService{}, &naiveCryptoService{}, api.PeerIdentityType(config.ID)) } // Setup and create basic communication module // need to be used for peer-to-peer communication // between peers and state transfer func newCommInstance(config *gossip.Config) comm.Comm { - comm, err := comm.NewCommInstanceWithServer(config.BindPort, &naiveCryptoService{}, []byte(config.SelfEndpoint)) + comm, err := comm.NewCommInstanceWithServer(config.BindPort, &naiveSecProvider{}, []byte(config.SelfEndpoint)) if err != nil { panic(err) } diff --git a/gossip/util/misc.go b/gossip/util/misc.go index 3b5e0413f3c..297c5ba2a3e 100644 --- a/gossip/util/misc.go +++ b/gossip/util/misc.go @@ -130,4 +130,4 @@ func PrintStackTrace() { buf := make([]byte, 1<<16) runtime.Stack(buf, true) fmt.Printf("%s", buf) -} +} \ No newline at end of file