diff --git a/gossip/comm/comm_impl.go b/gossip/comm/comm_impl.go index 32465e05448..4bff20f9654 100644 --- a/gossip/comm/comm_impl.go +++ b/gossip/comm/comm_impl.go @@ -254,7 +254,7 @@ func (c *commImpl) isStopping() bool { func (c *commImpl) Probe(peer *RemotePeer) error { if c.isStopping() { - return fmt.Errorf("Stopping!") + return fmt.Errorf("Stopping") } c.logger.Debug("Entering, endpoint:", peer.Endpoint, "PKIID:", peer.PKIID) var err error diff --git a/gossip/comm/comm_test.go b/gossip/comm/comm_test.go index 96cdc0e9776..7269b156c93 100644 --- a/gossip/comm/comm_test.go +++ b/gossip/comm/comm_test.go @@ -173,7 +173,7 @@ func TestBasic(t *testing.T) { m2 := comm2.Accept(acceptAll) out := make(chan uint64, 2) reader := func(ch <-chan ReceivedMessage) { - m := <- ch + m := <-ch out <- m.GetGossipMessage().Nonce } go reader(m1) diff --git a/gossip/gossip/certstore.go b/gossip/gossip/certstore.go new file mode 100644 index 00000000000..94e805ef75e --- /dev/null +++ b/gossip/gossip/certstore.go @@ -0,0 +1,104 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package gossip + +import ( + "sync" + + prot "github.com/golang/protobuf/proto" + "github.com/hyperledger/fabric/gossip/api" + "github.com/hyperledger/fabric/gossip/comm" + "github.com/hyperledger/fabric/gossip/common" + "github.com/hyperledger/fabric/gossip/gossip/pull" + "github.com/hyperledger/fabric/gossip/identity" + "github.com/hyperledger/fabric/gossip/proto" + "github.com/hyperledger/fabric/gossip/util" +) + +// certStore supports pull dissemination of identity messages +type certStore struct { + sync.RWMutex + selfIdentity api.PeerIdentityType + idMapper identity.Mapper + pull pull.Mediator + logger *util.Logger +} + +func newCertStore(mcs api.MessageCryptoService, selfIdentity api.PeerIdentityType, pullMed pull.Mediator) *certStore { + certStore := &certStore{ + idMapper: identity.NewIdentityMapper(mcs), + selfIdentity: selfIdentity, + pull: pullMed, + } + + selfPKIID := certStore.idMapper.GetPKIidOfCert(selfIdentity) + + certStore.logger = util.GetLogger("certStore", string(selfPKIID)) + + if err := certStore.idMapper.Put(selfPKIID, selfIdentity); err != nil { + certStore.logger.Panic("Failed associating self PKIID to cert:", err) + } + + pullMed.Add(certStore.createIdentityMessage()) + + pullMed.RegisterMsgHook(pull.ResponseMsgType, func(_ []string, msgs []*proto.GossipMessage, _ comm.ReceivedMessage) { + for _, msg := range msgs { + pkiID := common.PKIidType(msg.GetPeerIdentity().PkiID) + cert := api.PeerIdentityType(msg.GetPeerIdentity().Cert) + if err := certStore.idMapper.Put(pkiID, cert); err != nil { + certStore.logger.Warning("Failed adding identity", cert, ", reason:", err) + } + } + }) + + return certStore +} + +func (cs *certStore) createIdentityMessage() *proto.GossipMessage { + identity := &proto.PeerIdentity{ + Cert: cs.selfIdentity, + Metadata: nil, + PkiID: cs.idMapper.GetPKIidOfCert(cs.selfIdentity), + Sig: nil, + } + + b, err := prot.Marshal(identity) + if err != nil { + cs.logger.Warning("Failed marshalling identity message:", err) + return nil + } + + sig, err := cs.idMapper.Sign(b) + if err != nil { + cs.logger.Warning("Failed signing identity message:", err) + return nil + } + identity.Sig = sig + + return &proto.GossipMessage{ + Channel: nil, + Nonce: 0, + Tag: proto.GossipMessage_EMPTY, + Content: &proto.GossipMessage_PeerIdentity{ + PeerIdentity: identity, + }, + } +} + +func (cs *certStore) stop() { + cs.pull.Stop() +} diff --git a/gossip/gossip/gossip_impl.go b/gossip/gossip/gossip_impl.go index 98d33b361ad..76f09b091ce 100644 --- a/gossip/gossip/gossip_impl.go +++ b/gossip/gossip/gossip_impl.go @@ -23,10 +23,11 @@ import ( "sync/atomic" "time" + "github.com/hyperledger/fabric/gossip/api" "github.com/hyperledger/fabric/gossip/comm" "github.com/hyperledger/fabric/gossip/common" "github.com/hyperledger/fabric/gossip/discovery" - "github.com/hyperledger/fabric/gossip/gossip/algo" + "github.com/hyperledger/fabric/gossip/gossip/pull" "github.com/hyperledger/fabric/gossip/proto" "github.com/hyperledger/fabric/gossip/util" "github.com/op/go-logging" @@ -38,24 +39,25 @@ const ( ) type gossipServiceImpl struct { + certStore *certStore presumedDead chan common.PKIidType disc discovery.Discovery comm comm.Comm *comm.ChannelDeMultiplexer - logger *util.Logger - stopSignal *sync.WaitGroup - conf *Config - toDieChan chan struct{} - stopFlag int32 - msgStore messageStore - emitter batchingEmitter - pushPull *algo.PullEngine - goRoutines []uint64 - discAdapter *discoveryAdapter + logger *util.Logger + stopSignal *sync.WaitGroup + conf *Config + toDieChan chan struct{} + stopFlag int32 + msgStore messageStore + blocksPuller pull.Mediator + emitter batchingEmitter + goRoutines []uint64 + discAdapter *discoveryAdapter } // NewGossipService creates a new gossip instance -func NewGossipService(conf *Config, c comm.Comm, crypto discovery.CryptoService) Gossip { +func NewGossipService(conf *Config, c comm.Comm, mcs api.MessageCryptoService, crypto discovery.CryptoService, selfID api.PeerIdentityType) Gossip { g := &gossipServiceImpl{ presumedDead: make(chan common.PKIidType, presumedDeadChanSize), disc: nil, @@ -79,14 +81,14 @@ func NewGossipService(conf *Config, c comm.Comm, crypto discovery.CryptoService) Endpoint: conf.SelfEndpoint, PKIid: g.comm.GetPKIid(), Metadata: []byte{}, }, g.discAdapter, crypto) - g.pushPull = algo.NewPullEngine(g, conf.PullInterval) - g.msgStore = newMessageStore(proto.NewGossipMessageComparator(g.conf.MaxMessageCountToStore), func(m interface{}) { - if dataMsg, isDataMsg := m.(*proto.DataMessage); isDataMsg { - g.pushPull.Remove(fmt.Sprintf("%d", dataMsg.Payload.SeqNum)) - } + g.blocksPuller.Remove(m.(*proto.GossipMessage)) }) + g.blocksPuller = g.createPull() + + g.certStore = newCertStore(mcs, selfID, g.createPull()) + g.logger.SetLevel(logging.WARNING) go g.start() @@ -94,6 +96,39 @@ func NewGossipService(conf *Config, c comm.Comm, crypto discovery.CryptoService) return g } +func (g *gossipServiceImpl) createPull() pull.Mediator { + gConf := g.conf + conf := pull.PullConfig{ + MsgType: proto.MsgType_BlockMessage, + Channel: []byte(""), + Id: gConf.SelfEndpoint, + PeerCountToSelect: gConf.PullPeerNum, + PullInterval: gConf.PullInterval, + Tag: proto.GossipMessage_EMPTY, + } + seqNumFromMsg := func(msg *proto.GossipMessage) string { + dataMsg := msg.GetDataMsg() + if dataMsg == nil || dataMsg.Payload == nil { + return "" + } + return fmt.Sprintf("%d", dataMsg.Payload.SeqNum) + } + blockConsumer := func(msg *proto.GossipMessage) { + dataMsg := msg.GetDataMsg() + if dataMsg == nil || dataMsg.Payload == nil { + return + } + added := g.msgStore.add(msg) + // if we can't add the message to the msgStore, + // no point in disseminating it to others... + if !added { + return + } + g.DeMultiplex(msg) + } + return pull.NewPullMediator(conf, g.comm, g.disc, seqNumFromMsg, blockConsumer) +} + func (g *gossipServiceImpl) toDie() bool { return atomic.LoadInt32(&g.stopFlag) == int32(1) } @@ -163,104 +198,14 @@ func (g *gossipServiceImpl) acceptMessages(incMsgs <-chan comm.ReceivedMessage) } } -func (g *gossipServiceImpl) SelectPeers() []string { - if g.disc == nil { - return []string{} - } - peers := selectEndpoints(g.conf.PullPeerNum, g.disc.GetMembership()) - g.logger.Debug("Selected", len(peers), "peers") - return peers -} - -func (g *gossipServiceImpl) Hello(dest string, nonce uint64) { - helloMsg := &proto.GossipMessage{ - Tag: proto.GossipMessage_EMPTY, - Nonce: 0, - Content: &proto.GossipMessage_Hello{ - Hello: &proto.GossipHello{ - Nonce: nonce, - Metadata: nil, - MsgType: proto.MsgType_BlockMessage, - }, - }, - } - - g.logger.Debug("Sending hello to", dest) - g.comm.Send(helloMsg, g.peersWithEndpoints(dest)...) - -} - -func (g *gossipServiceImpl) SendDigest(digest []string, nonce uint64, context interface{}) { - digMsg := &proto.GossipMessage{ - Tag: proto.GossipMessage_EMPTY, - Nonce: 0, - Content: &proto.GossipMessage_DataDig{ - DataDig: &proto.DataDigest{ - MsgType: proto.MsgType_BlockMessage, - Nonce: nonce, - Digests: digest, - }, - }, - } - g.logger.Debug("Sending digest", digMsg.GetDataDig().Digests) - context.(comm.ReceivedMessage).Respond(digMsg) -} - -func (g *gossipServiceImpl) SendReq(dest string, items []string, nonce uint64) { - req := &proto.GossipMessage{ - Tag: proto.GossipMessage_EMPTY, - Nonce: 0, - Content: &proto.GossipMessage_DataReq{ - DataReq: &proto.DataRequest{ - MsgType: proto.MsgType_BlockMessage, - Nonce: nonce, - Digests: items, - }, - }, - } - g.logger.Debug("Sending", req, "to", dest) - g.comm.Send(req, g.peersWithEndpoints(dest)...) -} - -func (g *gossipServiceImpl) SendRes(requestedItems []string, context interface{}, nonce uint64) { - itemMap := make(map[string]*proto.GossipMessage) - for _, msg := range g.msgStore.get() { - if dataMsg := msg.(*proto.GossipMessage).GetDataMsg(); dataMsg != nil { - itemMap[fmt.Sprintf("%d", dataMsg.Payload.SeqNum)] = msg.(*proto.GossipMessage) - } - } - - dataMsgs := []*proto.GossipMessage{} - - for _, item := range requestedItems { - if dataMsg, exists := itemMap[item]; exists { - dataMsgs = append(dataMsgs, dataMsg) - } - } - - returnedUpdate := &proto.GossipMessage{ - Tag: proto.GossipMessage_EMPTY, - Nonce: 0, - Content: &proto.GossipMessage_DataUpdate{ - DataUpdate: &proto.DataUpdate{ - MsgType: proto.MsgType_BlockMessage, - Nonce: nonce, - Data: dataMsgs, - }, - }, - } - - g.logger.Debug("Sending response", returnedUpdate.GetDataUpdate().Data) - context.(comm.ReceivedMessage).Respond(returnedUpdate) -} - func (g *gossipServiceImpl) handleMessage(msg comm.ReceivedMessage) { if g.toDie() { return } g.logger.Info("Entering,", msg) defer g.logger.Info("Exiting") - if msg == nil { + + if msg == nil || msg.GetGossipMessage() == nil { return } @@ -280,15 +225,14 @@ func (g *gossipServiceImpl) handleMessage(msg comm.ReceivedMessage) { if dataMsg := msg.GetGossipMessage().GetDataMsg(); dataMsg != nil { g.DeMultiplex(msg.GetGossipMessage()) - g.pushPull.Add(fmt.Sprintf("%d", dataMsg.Payload.SeqNum)) + g.blocksPuller.Add(msg.GetGossipMessage()) } return } - if msg.GetGossipMessage().GetDataReq() != nil || msg.GetGossipMessage().GetDataUpdate() != nil || - msg.GetGossipMessage().GetHello() != nil || msg.GetGossipMessage().GetDataDig() != nil { - g.handlePushPullMsg(msg) + if msg.GetGossipMessage().IsPullMsg() { + g.blocksPuller.HandleMessage(msg) } } @@ -299,36 +243,6 @@ func (g *gossipServiceImpl) forwardDiscoveryMsg(msg comm.ReceivedMessage) { g.discAdapter.incChan <- msg.GetGossipMessage() } -func (g *gossipServiceImpl) handlePushPullMsg(msg comm.ReceivedMessage) { - g.logger.Debug(msg) - if helloMsg := msg.GetGossipMessage().GetHello(); helloMsg != nil { - if helloMsg.MsgType != proto.MsgType_BlockMessage { - return - } - g.pushPull.OnHello(helloMsg.Nonce, msg) - } - if digest := msg.GetGossipMessage().GetDataDig(); digest != nil { - g.pushPull.OnDigest(digest.Digests, digest.Nonce, msg) - } - if req := msg.GetGossipMessage().GetDataReq(); req != nil { - g.pushPull.OnReq(req.Digests, req.Nonce, msg) - } - if res := msg.GetGossipMessage().GetDataUpdate(); res != nil { - items := make([]string, len(res.Data)) - for i, data := range res.Data { - added := g.msgStore.add(data) - // if we can't add the message to the msgStore, - // no point in disseminating it to others... - if !added { - continue - } - g.DeMultiplex(data) - items[i] = fmt.Sprintf("%d", data.GetDataMsg().Payload.SeqNum) - } - g.pushPull.OnRes(items, res.Nonce) - } -} - func (g *gossipServiceImpl) sendGossipBatch(a []interface{}) { msgs2Gossip := make([]*proto.GossipMessage, len(a)) for i, e := range a { @@ -342,30 +256,17 @@ func (g *gossipServiceImpl) gossipBatch(msgs []*proto.GossipMessage) { g.logger.Error("Discovery has not been initialized yet, aborting!") return } - peers2Send := selectEndpoints(g.conf.PropagatePeerNum, g.disc.GetMembership()) + peers2Send := pull.SelectEndpoints(g.conf.PropagatePeerNum, g.disc.GetMembership()) for _, msg := range msgs { - g.comm.Send(msg, g.peersWithEndpoints(peers2Send...)...) - } -} - -func selectEndpoints(k int, peerPool []discovery.NetworkMember) []string { - if len(peerPool) < k { - k = len(peerPool) - } - - indices := util.GetRandomIndices(k, len(peerPool)-1) - endpoints := make([]string, len(indices)) - for i, j := range indices { - endpoints[i] = peerPool[j].Endpoint + g.comm.Send(msg, peers2Send...) } - return endpoints } func (g *gossipServiceImpl) Gossip(msg *proto.GossipMessage) { g.logger.Info(msg) if dataMsg := msg.GetDataMsg(); dataMsg != nil { g.msgStore.add(msg) - g.pushPull.Add(fmt.Sprintf("%d", dataMsg.Payload.SeqNum)) + g.blocksPuller.Add(msg) } g.emitter.Add(msg) } @@ -392,7 +293,8 @@ func (g *gossipServiceImpl) Stop() { }() g.discAdapter.close() g.disc.Stop() - g.pushPull.Stop() + g.blocksPuller.Stop() + g.certStore.stop() g.toDieChan <- struct{}{} g.emitter.Stop() g.ChannelDeMultiplexer.Close() diff --git a/gossip/gossip/gossip_test.go b/gossip/gossip/gossip_test.go index a5ec4c780a2..49f907e92d0 100644 --- a/gossip/gossip/gossip_test.go +++ b/gossip/gossip/gossip_test.go @@ -28,7 +28,9 @@ import ( "strconv" "strings" + "github.com/hyperledger/fabric/gossip/api" "github.com/hyperledger/fabric/gossip/comm" + "github.com/hyperledger/fabric/gossip/common" "github.com/hyperledger/fabric/gossip/discovery" "github.com/hyperledger/fabric/gossip/gossip/algo" "github.com/hyperledger/fabric/gossip/proto" @@ -57,6 +59,24 @@ func acceptData(m interface{}) bool { return false } +type naiveSecProvider struct { +} + +func (*naiveSecProvider) IsEnabled() bool { + return true +} + +func (*naiveSecProvider) Sign(msg []byte) ([]byte, error) { + return msg, nil +} + +func (*naiveSecProvider) Verify(vkID, signature, message []byte) error { + if bytes.Equal(signature, message) { + return nil + } + return fmt.Errorf("Failed verifying") +} + type naiveCryptoService struct { } @@ -72,15 +92,36 @@ func (*naiveCryptoService) IsEnabled() bool { return true } +func (*naiveCryptoService) ValidateIdentity(peerIdentity api.PeerIdentityType) error { + return nil +} + +// GetPKIidOfCert returns the PKI-ID of a peer's identity +func (*naiveCryptoService) GetPKIidOfCert(peerIdentity api.PeerIdentityType) common.PKIidType { + return common.PKIidType(peerIdentity) +} + +// VerifyBlock returns nil if the block is properly signed, +// else returns error +func (*naiveCryptoService) VerifyBlock(signedBlock api.SignedBlock) error { + return nil +} + +// Sign signs msg with this peer's signing key and outputs +// the signature if no error occurred. func (*naiveCryptoService) Sign(msg []byte) ([]byte, error) { return msg, nil } -func (*naiveCryptoService) Verify(vkID, signature, message []byte) error { - if bytes.Equal(signature, message) { - return nil +// Verify checks that signature is a valid signature of message under a peer's verification key. +// If the verification succeeded, Verify returns nil meaning no error occurred. +// If peerCert is nil, then the signature is verified against this peer's verification key. +func (*naiveCryptoService) Verify(peerIdentity api.PeerIdentityType, signature, message []byte) error { + equal := bytes.Equal(signature, message) + if !equal { + return fmt.Errorf("Wrong signature:%v, %v", signature, message) } - return fmt.Errorf("Failed verifying") + return nil } func bootPeers(ids ...int) []string { @@ -106,11 +147,11 @@ func newGossipInstance(id int, maxMsgCount int, boot ...int) Gossip { PullPeerNum: 5, SelfEndpoint: fmt.Sprintf("localhost:%d", port), } - comm, err := comm.NewCommInstanceWithServer(port, &naiveCryptoService{}, []byte(conf.SelfEndpoint)) + comm, err := comm.NewCommInstanceWithServer(port, &naiveSecProvider{}, []byte(conf.SelfEndpoint)) if err != nil { panic(err) } - return NewGossipService(conf, comm, &naiveCryptoService{}) + return NewGossipService(conf, comm, &naiveCryptoService{}, &naiveCryptoService{}, api.PeerIdentityType(conf.ID)) } func newGossipInstanceWithOnlyPull(id int, maxMsgCount int, boot ...int) Gossip { @@ -128,11 +169,11 @@ func newGossipInstanceWithOnlyPull(id int, maxMsgCount int, boot ...int) Gossip PullPeerNum: 20, SelfEndpoint: fmt.Sprintf("localhost:%d", port), } - comm, err := comm.NewCommInstanceWithServer(port, &naiveCryptoService{}, []byte(conf.SelfEndpoint)) + comm, err := comm.NewCommInstanceWithServer(port, &naiveSecProvider{}, []byte(conf.SelfEndpoint)) if err != nil { panic(err) } - return NewGossipService(conf, comm, &naiveCryptoService{}) + return NewGossipService(conf, comm, &naiveCryptoService{}, &naiveCryptoService{}, api.PeerIdentityType(conf.ID)) } func TestPull(t *testing.T) { diff --git a/gossip/gossip/pull/pullstore.go b/gossip/gossip/pull/pullstore.go new file mode 100644 index 00000000000..042b9e918fc --- /dev/null +++ b/gossip/gossip/pull/pullstore.go @@ -0,0 +1,330 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pull + +import ( + "sync" + "time" + + "github.com/hyperledger/fabric/gossip/comm" + "github.com/hyperledger/fabric/gossip/common" + "github.com/hyperledger/fabric/gossip/discovery" + "github.com/hyperledger/fabric/gossip/gossip/algo" + "github.com/hyperledger/fabric/gossip/proto" + "github.com/hyperledger/fabric/gossip/util" +) + +const ( + HelloMsgType PullMsgType = iota + DigestMsgType + RequestMsgType + ResponseMsgType +) + +// PullMsgType defines the type of a message that is sent to the PullStore +type PullMsgType int + +// MessageHook defines a function that will run after a certain pull message is received +type MessageHook func(itemIds []string, items []*proto.GossipMessage, msg comm.ReceivedMessage) + +type Sender interface { + // Send sends a message to a list of remote peers + Send(msg *proto.GossipMessage, peers ...*comm.RemotePeer) +} + +// MembershipService obtains membership information of alive peers +type MembershipService interface { + // GetMembership returns the membership of + GetMembership() []discovery.NetworkMember +} + +// PullConfig defines the configuration of the pull mediator +type PullConfig struct { + Id string + PullInterval time.Duration // Duration between pull invocations + PeerCountToSelect int // Number of peers to initiate pull with + Tag proto.GossipMessage_Tag + Channel common.ChainID + MsgType proto.MsgType +} + +// Mediator is a component wrap a PullEngine and provides the methods +// it needs to perform pull synchronization.. +// The specialization of a pull mediator to a certain type of message is +// done by the configuration, a IdentifierExtractor, IdentifierExtractor +// given at construction, and also hooks that can be registered for each +// type of pullMsgType (hello, digest, req, res). +type Mediator interface { + // Stop stop the Mediator + Stop() + + // RegisterMsgHook registers a message hook to a specific type of pull message + RegisterMsgHook(PullMsgType, MessageHook) + + // Add adds a GossipMessage to the Mediator + Add(*proto.GossipMessage) + + // Remove removes a GossipMessage from the Mediator + Remove(*proto.GossipMessage) + + // HandleMessage handles a message from some remote peer + HandleMessage(msg comm.ReceivedMessage) +} + +// pullStoreImpl is an implementation of PullStore +type pullMediatorImpl struct { + msgType2Hook map[PullMsgType][]MessageHook + idExtractor proto.IdentifierExtractor + msgCons proto.MsgConsumer + config PullConfig + logger *util.Logger + sync.RWMutex + itemId2msg map[string]*proto.GossipMessage + Sender + memBvc MembershipService + engine *algo.PullEngine +} + +func NewPullMediator(config PullConfig, sndr Sender, memSvc MembershipService, idExtractor proto.IdentifierExtractor, msgCons proto.MsgConsumer) Mediator { + p := &pullMediatorImpl{ + msgCons: msgCons, + msgType2Hook: make(map[PullMsgType][]MessageHook), + idExtractor: idExtractor, + config: config, + logger: util.GetLogger("Pull", config.Id), + itemId2msg: make(map[string]*proto.GossipMessage), + memBvc: memSvc, + Sender: sndr, + } + p.engine = algo.NewPullEngine(p, config.PullInterval) + return p +} + +func (p *pullMediatorImpl) HandleMessage(m comm.ReceivedMessage) { + if m.GetGossipMessage() == nil || !m.GetGossipMessage().IsPullMsg() { + return + } + msg := m.GetGossipMessage() + p.logger.Debug(msg) + + itemIds := []string{} + items := []*proto.GossipMessage{} + var pullMsgType PullMsgType + + if helloMsg := msg.GetHello(); helloMsg != nil { + if helloMsg.MsgType != p.config.MsgType { + return + } + pullMsgType = HelloMsgType + p.engine.OnHello(helloMsg.Nonce, m) + } + if digest := msg.GetDataDig(); digest != nil { + if digest.MsgType != p.config.MsgType { + return + } + itemIds = digest.Digests + pullMsgType = DigestMsgType + p.engine.OnDigest(digest.Digests, digest.Nonce, m) + } + if req := msg.GetDataReq(); req != nil { + if req.MsgType != p.config.MsgType { + return + } + itemIds = req.Digests + pullMsgType = RequestMsgType + p.engine.OnReq(req.Digests, req.Nonce, m) + } + if res := msg.GetDataUpdate(); res != nil { + if res.MsgType != p.config.MsgType { + return + } + itemIds = make([]string, len(res.Data)) + items = make([]*proto.GossipMessage, len(res.Data)) + pullMsgType = ResponseMsgType + for i, pulledMsg := range res.Data { + p.msgCons(pulledMsg) + itemIds[i] = p.idExtractor(pulledMsg) + items[i] = pulledMsg + } + p.engine.OnRes(itemIds, res.Nonce) + } + + // Invoke hooks for relevant message type + for _, h := range p.hooksByMsgType(pullMsgType) { + h(itemIds, items, m) + } +} + +func (p *pullMediatorImpl) Stop() { + p.engine.Stop() +} + +// RegisterMsgHook registers a message hook to a specific type of pull message +func (p *pullMediatorImpl) RegisterMsgHook(pullMsgType PullMsgType, hook MessageHook) { + p.Lock() + defer p.Unlock() + p.msgType2Hook[pullMsgType] = append(p.msgType2Hook[pullMsgType], hook) + +} + +// Add adds a GossipMessage to the store +func (p *pullMediatorImpl) Add(msg *proto.GossipMessage) { + p.Lock() + defer p.Unlock() + itemId := p.idExtractor(msg) + p.itemId2msg[itemId] = msg + p.engine.Add(itemId) +} + +// Remove removes a GossipMessage from the store +func (p *pullMediatorImpl) Remove(msg *proto.GossipMessage) { + p.Lock() + defer p.Unlock() + itemId := p.idExtractor(msg) + delete(p.itemId2msg, itemId) + p.engine.Remove(itemId) +} + +// SelectPeers returns a slice of peers which the engine will initiate the protocol with +func (p *pullMediatorImpl) SelectPeers() []string { + remotePeers := SelectEndpoints(p.config.PeerCountToSelect, p.memBvc.GetMembership()) + endpoints := make([]string, len(remotePeers)) + for i, peer := range remotePeers { + endpoints[i] = peer.Endpoint + } + return endpoints +} + +// Hello sends a hello message to initiate the protocol +// and returns an NONCE that is expected to be returned +// in the digest message. +func (p *pullMediatorImpl) Hello(dest string, nonce uint64) { + helloMsg := &proto.GossipMessage{ + Channel: p.config.Channel, + Tag: p.config.Tag, + Content: &proto.GossipMessage_Hello{ + Hello: &proto.GossipHello{ + Nonce: nonce, + Metadata: nil, + MsgType: p.config.MsgType, + }, + }, + } + + p.logger.Debug("Sending hello to", dest) + p.Send(helloMsg, p.peersWithEndpoints(dest)...) +} + +// SendDigest sends a digest to a remote PullEngine. +// The context parameter specifies the remote engine to send to. +func (p *pullMediatorImpl) SendDigest(digest []string, nonce uint64, context interface{}) { + digMsg := &proto.GossipMessage{ + Channel: p.config.Channel, + Tag: p.config.Tag, + Nonce: 0, + Content: &proto.GossipMessage_DataDig{ + DataDig: &proto.DataDigest{ + MsgType: p.config.MsgType, + Nonce: nonce, + Digests: digest, + }, + }, + } + p.logger.Debug("Sending digest", digMsg.GetDataDig().Digests) + context.(comm.ReceivedMessage).Respond(digMsg) +} + +// SendReq sends an array of items to a certain remote PullEngine identified +// by a string +func (p *pullMediatorImpl) SendReq(dest string, items []string, nonce uint64) { + req := &proto.GossipMessage{ + Channel: p.config.Channel, + Tag: p.config.Tag, + Nonce: 0, + Content: &proto.GossipMessage_DataReq{ + DataReq: &proto.DataRequest{ + MsgType: p.config.MsgType, + Nonce: nonce, + Digests: items, + }, + }, + } + p.logger.Debug("Sending", req, "to", dest) + p.Send(req, p.peersWithEndpoints(dest)...) +} + +// SendRes sends an array of items to a remote PullEngine identified by a context. +func (p *pullMediatorImpl) SendRes(items []string, context interface{}, nonce uint64) { + items2return := []*proto.GossipMessage{} + p.RLock() + defer p.RUnlock() + for _, item := range items { + if msg, exists := p.itemId2msg[item]; exists { + items2return = append(items2return, msg) + } + } + + returnedUpdate := &proto.GossipMessage{ + Channel: p.config.Channel, + Tag: p.config.Tag, + Nonce: 0, + Content: &proto.GossipMessage_DataUpdate{ + DataUpdate: &proto.DataUpdate{ + MsgType: p.config.MsgType, + Nonce: nonce, + Data: items2return, + }, + }, + } + p.logger.Debug("Sending", returnedUpdate, "to") + context.(comm.ReceivedMessage).Respond(returnedUpdate) +} + +func (p *pullMediatorImpl) peersWithEndpoints(endpoints ...string) []*comm.RemotePeer { + peers := []*comm.RemotePeer{} + for _, member := range p.memBvc.GetMembership() { + for _, endpoint := range endpoints { + if member.Endpoint == endpoint { + peers = append(peers, &comm.RemotePeer{Endpoint: member.Endpoint, PKIID: member.PKIid}) + } + } + } + return peers +} + +func (p *pullMediatorImpl) hooksByMsgType(msgType PullMsgType) []MessageHook { + p.RLock() + defer p.RUnlock() + returnedHooks := []MessageHook{} + for _, h := range p.msgType2Hook[msgType] { + returnedHooks = append(returnedHooks, h) + } + return returnedHooks +} + +func SelectEndpoints(k int, peerPool []discovery.NetworkMember) []*comm.RemotePeer { + if len(peerPool) < k { + k = len(peerPool) + } + + indices := util.GetRandomIndices(k, len(peerPool)-1) + endpoints := make([]*comm.RemotePeer, len(indices)) + for i, j := range indices { + endpoints[i] = &comm.RemotePeer{Endpoint: peerPool[j].Endpoint, PKIID: peerPool[j].PKIid} + } + return endpoints +} diff --git a/gossip/gossip/pull/pullstore_test.go b/gossip/gossip/pull/pullstore_test.go new file mode 100644 index 00000000000..1cc650e5f9e --- /dev/null +++ b/gossip/gossip/pull/pullstore_test.go @@ -0,0 +1,306 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pull + +import ( + "fmt" + "sync/atomic" + "testing" + "time" + + "github.com/hyperledger/fabric/gossip/comm" + "github.com/hyperledger/fabric/gossip/discovery" + "github.com/hyperledger/fabric/gossip/gossip/algo" + "github.com/hyperledger/fabric/gossip/proto" + "github.com/hyperledger/fabric/gossip/util" + "github.com/stretchr/testify/assert" +) + +var pullInterval time.Duration +var timeoutInterval = 10 * time.Second + +func init() { + pullInterval = time.Duration(500) * time.Millisecond + algo.SetDigestWaitTime(pullInterval / 5) + algo.SetRequestWaitTime(pullInterval) + algo.SetResponseWaitTime(pullInterval) +} + +type pullMsg struct { + respondChan chan *pullMsg + msg *proto.GossipMessage +} + +func (pm *pullMsg) Respond(msg *proto.GossipMessage) { + pm.respondChan <- &pullMsg{ + msg: msg, + respondChan: pm.respondChan, + } +} + +func (pm *pullMsg) GetGossipMessage() *proto.GossipMessage { + return pm.msg +} + +type pullInstance struct { + self discovery.NetworkMember + mediator Mediator + items *util.Set + msgChan chan *pullMsg + peer2PullInst map[string]*pullInstance + stopChan chan struct{} +} + +func (p *pullInstance) Send(msg *proto.GossipMessage, peers ...*comm.RemotePeer) { + for _, peer := range peers { + m := &pullMsg{ + respondChan: p.msgChan, + msg: msg, + } + p.peer2PullInst[peer.Endpoint].msgChan <- m + } +} + +func (p *pullInstance) GetMembership() []discovery.NetworkMember { + members := []discovery.NetworkMember{} + for _, peer := range p.peer2PullInst { + members = append(members, peer.self) + } + return members +} + +func (p *pullInstance) stop() { + p.mediator.Stop() + p.stopChan <- struct{}{} +} + +func (p *pullInstance) wrapPullMsg(msg *proto.GossipMessage) comm.ReceivedMessage { + return &pullMsg{ + msg: msg, + respondChan: p.msgChan, + } +} + +func createPullInstance(endpoint string, peer2PullInst map[string]*pullInstance) *pullInstance { + inst := &pullInstance{ + items: util.NewSet(), + stopChan: make(chan struct{}), + peer2PullInst: peer2PullInst, + self: discovery.NetworkMember{Endpoint: endpoint, Metadata: []byte{}, PKIid: []byte(endpoint)}, + msgChan: make(chan *pullMsg, 10), + } + + peer2PullInst[endpoint] = inst + + conf := PullConfig{ + MsgType: proto.MsgType_BlockMessage, + Channel: []byte(""), + Id: endpoint, + PeerCountToSelect: 3, + PullInterval: pullInterval, + Tag: proto.GossipMessage_EMPTY, + } + seqNumFromMsg := func(msg *proto.GossipMessage) string { + dataMsg := msg.GetDataMsg() + if dataMsg == nil { + return "" + } + if dataMsg.Payload == nil { + return "" + } + return fmt.Sprintf("%d", dataMsg.Payload.SeqNum) + } + blockConsumer := func(msg *proto.GossipMessage) { + inst.items.Add(msg.GetDataMsg().Payload.SeqNum) + } + inst.mediator = NewPullMediator(conf, inst, inst, seqNumFromMsg, blockConsumer) + go func() { + for { + select { + case <-inst.stopChan: + return + case msg := <-inst.msgChan: + inst.mediator.HandleMessage(msg) + } + } + }() + return inst +} + +func TestCreateAndStop(t *testing.T) { + t.Parallel() + pullInst := createPullInstance("localhost:2000", make(map[string]*pullInstance)) + pullInst.stop() +} + +func TestRegisterMsgHook(t *testing.T) { + t.Parallel() + peer2pullInst := make(map[string]*pullInstance) + inst1 := createPullInstance("localhost:5611", peer2pullInst) + inst2 := createPullInstance("localhost:5612", peer2pullInst) + defer inst1.stop() + defer inst2.stop() + + receivedMsgTypes := util.NewSet() + + for _, msgType := range []PullMsgType{HelloMsgType, DigestMsgType, RequestMsgType, ResponseMsgType} { + mType := msgType + inst1.mediator.RegisterMsgHook(mType, func(_ []string, items []*proto.GossipMessage, _ comm.ReceivedMessage) { + receivedMsgTypes.Add(mType) + }) + } + + inst1.mediator.Add(dataMsg(1)) + inst2.mediator.Add(dataMsg(2)) + + // Ensure all message types are received + waitUntilOrFail(t, func() bool { return len(receivedMsgTypes.ToArray()) == 4 }) + +} + +func TestAddAndRemove(t *testing.T) { + t.Parallel() + peer2pullInst := make(map[string]*pullInstance) + inst1 := createPullInstance("localhost:5611", peer2pullInst) + inst2 := createPullInstance("localhost:5612", peer2pullInst) + defer inst1.stop() + defer inst2.stop() + + msgCount := 3 + + go func() { + for i := 0; i < msgCount; i++ { + time.Sleep(pullInterval) + inst1.mediator.Add(dataMsg(i)) + } + }() + + // Ensure instance 2 got all messages + waitUntilOrFail(t, func() bool { return len(inst2.items.ToArray()) == msgCount }) + + // Remove message 0 from both instances + inst2.mediator.Remove(dataMsg(0)) + inst1.mediator.Remove(dataMsg(0)) + inst2.items.Remove(uint64(0)) + + // Add a message to inst1 + inst1.mediator.Add(dataMsg(10)) + + // Ensure instance 2 got new message + waitUntilOrFail(t, func() bool { return inst2.items.Exists(uint64(10)) }) + + // Ensure instance 2 doesn't have message 0 + assert.False(t, inst2.items.Exists(uint64(0)), "Instance 2 has message 0 but shouldn't have") +} + +func TestHandleMessage(t *testing.T) { + t.Parallel() + inst1 := createPullInstance("localhost:5611", make(map[string]*pullInstance)) + inst2 := createPullInstance("localhost:5612", make(map[string]*pullInstance)) + defer inst1.stop() + defer inst2.stop() + + inst2.mediator.Add(dataMsg(0)) + inst2.mediator.Add(dataMsg(1)) + inst2.mediator.Add(dataMsg(2)) + + inst1ReceivedDigest := int32(0) + inst1ReceivedResponse := int32(0) + + inst1.mediator.RegisterMsgHook(DigestMsgType, func(itemIds []string, _ []*proto.GossipMessage, _ comm.ReceivedMessage) { + atomic.StoreInt32(&inst1ReceivedDigest, int32(1)) + assert.True(t, len(itemIds) == 3) + }) + + inst1.mediator.RegisterMsgHook(ResponseMsgType, func(_ []string, items []*proto.GossipMessage, _ comm.ReceivedMessage) { + atomic.StoreInt32(&inst1ReceivedResponse, int32(1)) + assert.True(t, len(items) == 3) + }) + + // inst1 sends hello to inst2 + inst2.mediator.HandleMessage(inst1.wrapPullMsg(helloMsg())) + + // inst2 is expected to send digest to inst1 + waitUntilOrFail(t, func() bool { return atomic.LoadInt32(&inst1ReceivedDigest) == int32(1) }) + + // inst1 sends request to inst2 + inst2.mediator.HandleMessage(inst1.wrapPullMsg(reqMsg("0", "1", "2"))) + + // inst2 is expected to send response to inst1 + waitUntilOrFail(t, func() bool { return atomic.LoadInt32(&inst1ReceivedResponse) == int32(1) }) + assert.True(t, inst1.items.Exists(uint64(0))) + assert.True(t, inst1.items.Exists(uint64(1))) + assert.True(t, inst1.items.Exists(uint64(2))) +} + +func waitUntilOrFail(t *testing.T, pred func() bool) { + start := time.Now() + limit := start.UnixNano() + timeoutInterval.Nanoseconds() + for time.Now().UnixNano() < limit { + if pred() { + return + } + time.Sleep(timeoutInterval / 60) + } + util.PrintStackTrace() + assert.Fail(t, "Timeout expired!") +} + +func dataMsg(seqNum int) *proto.GossipMessage { + return &proto.GossipMessage{ + Nonce: 0, + Tag: proto.GossipMessage_EMPTY, + Content: &proto.GossipMessage_DataMsg{ + DataMsg: &proto.DataMessage{ + Payload: &proto.Payload{ + Data: []byte{}, + Hash: "", + SeqNum: uint64(seqNum), + }, + }, + }, + } +} + +func helloMsg() *proto.GossipMessage { + return &proto.GossipMessage{ + Channel: []byte(""), + Tag: proto.GossipMessage_EMPTY, + Content: &proto.GossipMessage_Hello{ + Hello: &proto.GossipHello{ + Nonce: 0, + Metadata: nil, + MsgType: proto.MsgType_BlockMessage, + }, + }, + } +} + +func reqMsg(digest ...string) *proto.GossipMessage { + return &proto.GossipMessage{ + Channel: []byte(""), + Tag: proto.GossipMessage_EMPTY, + Nonce: 0, + Content: &proto.GossipMessage_DataReq{ + DataReq: &proto.DataRequest{ + MsgType: proto.MsgType_BlockMessage, + Nonce: 0, + Digests: digest, + }, + }, + } +} diff --git a/gossip/integration/integration.go b/gossip/integration/integration.go index 0f5b91b212e..c771f63c08c 100644 --- a/gossip/integration/integration.go +++ b/gossip/integration/integration.go @@ -27,6 +27,8 @@ import ( "github.com/hyperledger/fabric/gossip/gossip" "github.com/hyperledger/fabric/gossip/proto" "google.golang.org/grpc" + "github.com/hyperledger/fabric/gossip/api" + "github.com/hyperledger/fabric/gossip/common" ) // This file is used to bootstrap a gossip instance for integration/demo purposes ONLY @@ -63,7 +65,7 @@ func newComm(selfEndpoint string, s *grpc.Server, dialOpts ...grpc.DialOption) c func NewGossipComponent(endpoint string, s *grpc.Server, bootPeers ...string) (gossip.Gossip, comm.Comm) { conf := newConfig(endpoint, bootPeers...) comm := newComm(endpoint, s, grpc.WithInsecure()) - return gossip.NewGossipService(conf, comm, NewGossipCryptoService()), comm + return gossip.NewGossipService(conf, comm, &naiveCryptoService{}, NewGossipCryptoService(), api.PeerIdentityType(conf.ID)), comm } // GossipCryptoService is an interface that conforms to both @@ -102,6 +104,13 @@ func (cs *naiveCryptoServiceImpl) ValidateAliveMsg(*proto.AliveMessage) bool { return true } +func (cs *naiveCryptoServiceImpl) Verify(vkID, signature, message []byte) error { + if ! bytes.Equal(signature, message) { + return fmt.Errorf("Wrong signature") + } + return nil +} + // SignMessage signs an AliveMessage and updates its signature field func (cs *naiveCryptoServiceImpl) SignMessage(msg *proto.AliveMessage) *proto.AliveMessage { return msg @@ -117,10 +126,49 @@ func (cs *naiveCryptoServiceImpl) Sign(msg []byte) ([]byte, error) { return msg, nil } -// Verify verifies a signature on a message that came from a peer with a certain vkID -func (cs *naiveCryptoServiceImpl) Verify(vkID, signature, message []byte) error { - if !bytes.Equal(signature, message) { - return fmt.Errorf("Invalid signature!") +// Verify checks that signature is a valid signature of message under a peer's verification key. +// If the verification succeeded, Verify returns nil meaning no error occurred. +// If peerCert is nil, then the signature is verified against this peer's verification key. +func (*naiveCryptoService) Verify(peerIdentity api.PeerIdentityType, signature, message []byte) error { + equal := bytes.Equal(signature, message) + if !equal { + return fmt.Errorf("Wrong signature:%v, %v", signature, message) } return nil } + +type naiveCryptoService struct { +} + +func (*naiveCryptoService) ValidateAliveMsg(am *proto.AliveMessage) bool { + return true +} + +func (*naiveCryptoService) SignMessage(am *proto.AliveMessage) *proto.AliveMessage { + return am +} + +func (*naiveCryptoService) IsEnabled() bool { + return true +} + +func (*naiveCryptoService) ValidateIdentity(peerIdentity api.PeerIdentityType) error { + return nil +} + +// GetPKIidOfCert returns the PKI-ID of a peer's identity +func (*naiveCryptoService) GetPKIidOfCert(peerIdentity api.PeerIdentityType) common.PKIidType { + return common.PKIidType(peerIdentity) +} + +// VerifyBlock returns nil if the block is properly signed, +// else returns error +func (*naiveCryptoService) VerifyBlock(signedBlock api.SignedBlock) error { + return nil +} + +// Sign signs msg with this peer's signing key and outputs +// the signature if no error occurred. +func (*naiveCryptoService) Sign(msg []byte) ([]byte, error) { + return msg, nil +} \ No newline at end of file diff --git a/gossip/proto/compare.go b/gossip/proto/extensions.go similarity index 80% rename from gossip/proto/compare.go rename to gossip/proto/extensions.go index b4024912002..a6ff5400e16 100644 --- a/gossip/proto/compare.go +++ b/gossip/proto/extensions.go @@ -53,6 +53,10 @@ func (mc *msgComparator) invalidationPolicy(this interface{}, that interface{}) return mc.stateInvalidationPolicy(thisMsg.GetStateInfo(), thatMsg.GetStateInfo()) } + if thisMsg.IsIdentityMsg() && thatMsg.IsIdentityMsg() { + return mc.identityInvalidationPolicy(thisMsg.GetPeerIdentity(), thatMsg.GetPeerIdentity()) + } + return common.MessageNoAction } @@ -63,6 +67,14 @@ func (mc *msgComparator) stateInvalidationPolicy(thisStateMsg *StateInfo, thatSt return compareTimestamps(thisStateMsg.Timestamp, thatStateMsg.Timestamp) } +func (mc *msgComparator) identityInvalidationPolicy(thisIdentityMsg *PeerIdentity, thatIdentityMsg *PeerIdentity) common.InvalidationResult { + if bytes.Equal(thisIdentityMsg.PkiID, thatIdentityMsg.PkiID) { + return common.MessageInvalidated + } + + return common.MessageNoAction +} + func (mc *msgComparator) dataInvalidationPolicy(thisDataMsg *DataMessage, thatDataMsg *DataMessage) common.InvalidationResult { if thisDataMsg.Payload.SeqNum == thatDataMsg.Payload.SeqNum { if thisDataMsg.Payload.Hash == thatDataMsg.Payload.Hash { @@ -115,3 +127,18 @@ func (m *GossipMessage) IsDataMsg() bool { func (m *GossipMessage) IsStateInfoMsg() bool { return m.GetStateInfo() != nil } + +func (m *GossipMessage) IsIdentityMsg() bool { + return m.GetPeerIdentity() != nil +} + +func (m *GossipMessage) IsPullMsg() bool { + return m.GetDataReq() != nil || m.GetDataUpdate() != nil || + m.GetHello() != nil || m.GetDataDig() != nil +} + +// MsgConsumer invokes code given a GossipMessage +type MsgConsumer func(*GossipMessage) + +// IdentifierExtractor extracts from a GossipMessage an identifier +type IdentifierExtractor func(*GossipMessage) string diff --git a/gossip/proto/message.pb.go b/gossip/proto/message.pb.go index 6bfa80bcf1d..41297084f0d 100644 --- a/gossip/proto/message.pb.go +++ b/gossip/proto/message.pb.go @@ -13,6 +13,7 @@ It has these top-level messages: StateInfo ChannelCommand ConnEstablish + PeerIdentity DataRequest GossipHello DataUpdate @@ -56,15 +57,18 @@ type MsgType int32 const ( MsgType_UNDEFINED MsgType = 0 MsgType_BlockMessage MsgType = 1 + MsgType_IdentityMsg MsgType = 2 ) var MsgType_name = map[int32]string{ 0: "UNDEFINED", 1: "BlockMessage", + 2: "IdentityMsg", } var MsgType_value = map[string]int32{ "UNDEFINED": 0, "BlockMessage": 1, + "IdentityMsg": 2, } func (x MsgType) String() string { @@ -130,6 +134,7 @@ type GossipMessage struct { // *GossipMessage_StateRequest // *GossipMessage_StateResponse // *GossipMessage_LeadershipMsg + // *GossipMessage_PeerIdentity Content isGossipMessage_Content `protobuf_oneof:"content"` } @@ -187,6 +192,9 @@ type GossipMessage_StateResponse struct { type GossipMessage_LeadershipMsg struct { LeadershipMsg *LeadershipMessage `protobuf:"bytes,18,opt,name=leadershipMsg,oneof"` } +type GossipMessage_PeerIdentity struct { + PeerIdentity *PeerIdentity `protobuf:"bytes,19,opt,name=peerIdentity,oneof"` +} func (*GossipMessage_AliveMsg) isGossipMessage_Content() {} func (*GossipMessage_MemReq) isGossipMessage_Content() {} @@ -203,6 +211,7 @@ func (*GossipMessage_StateInfo) isGossipMessage_Content() {} func (*GossipMessage_StateRequest) isGossipMessage_Content() {} func (*GossipMessage_StateResponse) isGossipMessage_Content() {} func (*GossipMessage_LeadershipMsg) isGossipMessage_Content() {} +func (*GossipMessage_PeerIdentity) isGossipMessage_Content() {} func (m *GossipMessage) GetContent() isGossipMessage_Content { if m != nil { @@ -316,6 +325,13 @@ func (m *GossipMessage) GetLeadershipMsg() *LeadershipMessage { return nil } +func (m *GossipMessage) GetPeerIdentity() *PeerIdentity { + if x, ok := m.GetContent().(*GossipMessage_PeerIdentity); ok { + return x.PeerIdentity + } + return nil +} + // XXX_OneofFuncs is for the internal use of the proto package. func (*GossipMessage) XXX_OneofFuncs() (func(msg proto1.Message, b *proto1.Buffer) error, func(msg proto1.Message, tag, wire int, b *proto1.Buffer) (bool, error), func(msg proto1.Message) (n int), []interface{}) { return _GossipMessage_OneofMarshaler, _GossipMessage_OneofUnmarshaler, _GossipMessage_OneofSizer, []interface{}{ @@ -334,6 +350,7 @@ func (*GossipMessage) XXX_OneofFuncs() (func(msg proto1.Message, b *proto1.Buffe (*GossipMessage_StateRequest)(nil), (*GossipMessage_StateResponse)(nil), (*GossipMessage_LeadershipMsg)(nil), + (*GossipMessage_PeerIdentity)(nil), } } @@ -416,6 +433,11 @@ func _GossipMessage_OneofMarshaler(msg proto1.Message, b *proto1.Buffer) error { if err := b.EncodeMessage(x.LeadershipMsg); err != nil { return err } + case *GossipMessage_PeerIdentity: + b.EncodeVarint(19<<3 | proto1.WireBytes) + if err := b.EncodeMessage(x.PeerIdentity); err != nil { + return err + } case nil: default: return fmt.Errorf("GossipMessage.Content has unexpected type %T", x) @@ -546,6 +568,14 @@ func _GossipMessage_OneofUnmarshaler(msg proto1.Message, tag, wire int, b *proto err := b.DecodeMessage(msg) m.Content = &GossipMessage_LeadershipMsg{msg} return true, err + case 19: // content.peerIdentity + if wire != proto1.WireBytes { + return true, proto1.ErrInternalBadWireType + } + msg := new(PeerIdentity) + err := b.DecodeMessage(msg) + m.Content = &GossipMessage_PeerIdentity{msg} + return true, err default: return false, nil } @@ -630,6 +660,11 @@ func _GossipMessage_OneofSizer(msg proto1.Message) (n int) { n += proto1.SizeVarint(18<<3 | proto1.WireBytes) n += proto1.SizeVarint(uint64(s)) n += s + case *GossipMessage_PeerIdentity: + s := proto1.Size(x.PeerIdentity) + n += proto1.SizeVarint(19<<3 | proto1.WireBytes) + n += proto1.SizeVarint(uint64(s)) + n += s case nil: default: panic(fmt.Sprintf("proto: unexpected type %T in oneof", x)) @@ -675,6 +710,7 @@ func (*ChannelCommand) Descriptor() ([]byte, []int) { return fileDescriptor0, [] type ConnEstablish struct { Sig []byte `protobuf:"bytes,1,opt,name=sig,proto3" json:"sig,omitempty"` PkiID []byte `protobuf:"bytes,2,opt,name=pkiID,proto3" json:"pkiID,omitempty"` + Cert []byte `protobuf:"bytes,3,opt,name=cert,proto3" json:"cert,omitempty"` } func (m *ConnEstablish) Reset() { *m = ConnEstablish{} } @@ -682,6 +718,21 @@ func (m *ConnEstablish) String() string { return proto1.CompactTextSt func (*ConnEstablish) ProtoMessage() {} func (*ConnEstablish) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} } +// PeerIdentity defines the identity of the peer +// Used to make other peers learn of the identity +// of a certain peer +type PeerIdentity struct { + Sig []byte `protobuf:"bytes,1,opt,name=sig,proto3" json:"sig,omitempty"` + PkiID []byte `protobuf:"bytes,2,opt,name=pkiID,proto3" json:"pkiID,omitempty"` + Cert []byte `protobuf:"bytes,3,opt,name=cert,proto3" json:"cert,omitempty"` + Metadata []byte `protobuf:"bytes,4,opt,name=metadata,proto3" json:"metadata,omitempty"` +} + +func (m *PeerIdentity) Reset() { *m = PeerIdentity{} } +func (m *PeerIdentity) String() string { return proto1.CompactTextString(m) } +func (*PeerIdentity) ProtoMessage() {} +func (*PeerIdentity) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} } + // DataRequest is a message used for a peer to request // certain data blocks from a remote peer type DataRequest struct { @@ -693,7 +744,7 @@ type DataRequest struct { func (m *DataRequest) Reset() { *m = DataRequest{} } func (m *DataRequest) String() string { return proto1.CompactTextString(m) } func (*DataRequest) ProtoMessage() {} -func (*DataRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} } +func (*DataRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{5} } // GossipHello is the message that is used for the peer to initiate // a pull round with another peer @@ -706,7 +757,7 @@ type GossipHello struct { func (m *GossipHello) Reset() { *m = GossipHello{} } func (m *GossipHello) String() string { return proto1.CompactTextString(m) } func (*GossipHello) ProtoMessage() {} -func (*GossipHello) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{5} } +func (*GossipHello) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{6} } // DataUpdate is the the final message in the pull phase // sent from the receiver to the initiator @@ -719,7 +770,7 @@ type DataUpdate struct { func (m *DataUpdate) Reset() { *m = DataUpdate{} } func (m *DataUpdate) String() string { return proto1.CompactTextString(m) } func (*DataUpdate) ProtoMessage() {} -func (*DataUpdate) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{6} } +func (*DataUpdate) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{7} } func (m *DataUpdate) GetData() []*GossipMessage { if m != nil { @@ -739,7 +790,7 @@ type DataDigest struct { func (m *DataDigest) Reset() { *m = DataDigest{} } func (m *DataDigest) String() string { return proto1.CompactTextString(m) } func (*DataDigest) ProtoMessage() {} -func (*DataDigest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{7} } +func (*DataDigest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{8} } // DataMessage is the message that contains a block type DataMessage struct { @@ -749,7 +800,7 @@ type DataMessage struct { func (m *DataMessage) Reset() { *m = DataMessage{} } func (m *DataMessage) String() string { return proto1.CompactTextString(m) } func (*DataMessage) ProtoMessage() {} -func (*DataMessage) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{8} } +func (*DataMessage) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{9} } func (m *DataMessage) GetPayload() *Payload { if m != nil { @@ -768,7 +819,7 @@ type Payload struct { func (m *Payload) Reset() { *m = Payload{} } func (m *Payload) String() string { return proto1.CompactTextString(m) } func (*Payload) ProtoMessage() {} -func (*Payload) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{9} } +func (*Payload) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{10} } // AliveMessage is sent to inform remote peers // of a peer's existence and activity @@ -776,12 +827,13 @@ type AliveMessage struct { Membership *Member `protobuf:"bytes,1,opt,name=membership" json:"membership,omitempty"` Timestamp *PeerTime `protobuf:"bytes,2,opt,name=timestamp" json:"timestamp,omitempty"` Signature []byte `protobuf:"bytes,3,opt,name=signature,proto3" json:"signature,omitempty"` + Identity []byte `protobuf:"bytes,4,opt,name=identity,proto3" json:"identity,omitempty"` } func (m *AliveMessage) Reset() { *m = AliveMessage{} } func (m *AliveMessage) String() string { return proto1.CompactTextString(m) } func (*AliveMessage) ProtoMessage() {} -func (*AliveMessage) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{10} } +func (*AliveMessage) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{11} } func (m *AliveMessage) GetMembership() *Member { if m != nil { @@ -808,7 +860,7 @@ type LeadershipMessage struct { func (m *LeadershipMessage) Reset() { *m = LeadershipMessage{} } func (m *LeadershipMessage) String() string { return proto1.CompactTextString(m) } func (*LeadershipMessage) ProtoMessage() {} -func (*LeadershipMessage) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{11} } +func (*LeadershipMessage) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{12} } func (m *LeadershipMessage) GetMembership() *Member { if m != nil { @@ -833,7 +885,7 @@ type PeerTime struct { func (m *PeerTime) Reset() { *m = PeerTime{} } func (m *PeerTime) String() string { return proto1.CompactTextString(m) } func (*PeerTime) ProtoMessage() {} -func (*PeerTime) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{12} } +func (*PeerTime) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{13} } // MembershipRequest is used to ask membership information // from a remote peer @@ -845,7 +897,7 @@ type MembershipRequest struct { func (m *MembershipRequest) Reset() { *m = MembershipRequest{} } func (m *MembershipRequest) String() string { return proto1.CompactTextString(m) } func (*MembershipRequest) ProtoMessage() {} -func (*MembershipRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{13} } +func (*MembershipRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{14} } func (m *MembershipRequest) GetSelfInformation() *AliveMessage { if m != nil { @@ -863,7 +915,7 @@ type MembershipResponse struct { func (m *MembershipResponse) Reset() { *m = MembershipResponse{} } func (m *MembershipResponse) String() string { return proto1.CompactTextString(m) } func (*MembershipResponse) ProtoMessage() {} -func (*MembershipResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{14} } +func (*MembershipResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{15} } func (m *MembershipResponse) GetAlive() []*AliveMessage { if m != nil { @@ -890,7 +942,7 @@ type Member struct { func (m *Member) Reset() { *m = Member{} } func (m *Member) String() string { return proto1.CompactTextString(m) } func (*Member) ProtoMessage() {} -func (*Member) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{15} } +func (*Member) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{16} } // Empty is used for pinging and in tests type Empty struct { @@ -899,7 +951,7 @@ type Empty struct { func (m *Empty) Reset() { *m = Empty{} } func (m *Empty) String() string { return proto1.CompactTextString(m) } func (*Empty) ProtoMessage() {} -func (*Empty) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{16} } +func (*Empty) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{17} } // RemoteStateRequest is used to ask a set of blocks // from a remote peer @@ -910,7 +962,7 @@ type RemoteStateRequest struct { func (m *RemoteStateRequest) Reset() { *m = RemoteStateRequest{} } func (m *RemoteStateRequest) String() string { return proto1.CompactTextString(m) } func (*RemoteStateRequest) ProtoMessage() {} -func (*RemoteStateRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{17} } +func (*RemoteStateRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{18} } // RemoteStateResponse is used to send a set of blocks // to a remote peer @@ -921,7 +973,7 @@ type RemoteStateResponse struct { func (m *RemoteStateResponse) Reset() { *m = RemoteStateResponse{} } func (m *RemoteStateResponse) String() string { return proto1.CompactTextString(m) } func (*RemoteStateResponse) ProtoMessage() {} -func (*RemoteStateResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{18} } +func (*RemoteStateResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{19} } func (m *RemoteStateResponse) GetPayloads() []*Payload { if m != nil { @@ -935,6 +987,7 @@ func init() { proto1.RegisterType((*StateInfo)(nil), "proto.StateInfo") proto1.RegisterType((*ChannelCommand)(nil), "proto.ChannelCommand") proto1.RegisterType((*ConnEstablish)(nil), "proto.ConnEstablish") + proto1.RegisterType((*PeerIdentity)(nil), "proto.PeerIdentity") proto1.RegisterType((*DataRequest)(nil), "proto.DataRequest") proto1.RegisterType((*GossipHello)(nil), "proto.GossipHello") proto1.RegisterType((*DataUpdate)(nil), "proto.DataUpdate") @@ -1099,73 +1152,77 @@ var _Gossip_serviceDesc = grpc.ServiceDesc{ func init() { proto1.RegisterFile("message.proto", fileDescriptor0) } var fileDescriptor0 = []byte{ - // 1073 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xc4, 0x56, 0x5f, 0x4f, 0xe3, 0x46, - 0x10, 0x8f, 0xf3, 0x17, 0x4f, 0x1c, 0x08, 0x73, 0xd7, 0xca, 0x45, 0xad, 0x84, 0xac, 0x93, 0x9a, - 0x46, 0x25, 0xdc, 0xc1, 0xc3, 0x3d, 0x55, 0x3d, 0x20, 0x94, 0x20, 0x1d, 0x01, 0x2d, 0x5c, 0xab, - 0xeb, 0x0b, 0x5a, 0xe2, 0xc5, 0xb1, 0x88, 0xd7, 0x26, 0x6b, 0xae, 0xe2, 0x2b, 0xf4, 0xa5, 0x5f, - 0xad, 0x1f, 0xa9, 0xda, 0x3f, 0x76, 0x6c, 0x12, 0xaa, 0xde, 0x43, 0x75, 0x4f, 0xf1, 0xcc, 0xfe, - 0x7e, 0x3b, 0xb3, 0xbb, 0xf3, 0x9b, 0x09, 0x74, 0x22, 0x26, 0x04, 0x0d, 0xd8, 0x20, 0x99, 0xc7, - 0x69, 0x8c, 0x0d, 0xf5, 0xe3, 0xfd, 0xdd, 0x82, 0xce, 0x49, 0x2c, 0x44, 0x98, 0x9c, 0xe9, 0x65, - 0x7c, 0x09, 0x0d, 0x1e, 0xf3, 0x09, 0x73, 0xad, 0x6d, 0xab, 0x57, 0x27, 0xda, 0x40, 0x17, 0x5a, - 0x93, 0x29, 0xe5, 0x9c, 0xcd, 0xdc, 0xea, 0xb6, 0xd5, 0x73, 0x48, 0x66, 0x62, 0x1f, 0x6a, 0x29, - 0x0d, 0xdc, 0xda, 0xb6, 0xd5, 0x5b, 0xdf, 0x73, 0xf5, 0xee, 0x83, 0xd2, 0x96, 0x83, 0x2b, 0x1a, - 0x10, 0x09, 0xc2, 0x37, 0xb0, 0x46, 0x67, 0xe1, 0x27, 0x76, 0x26, 0x02, 0xb7, 0xbe, 0x6d, 0xf5, - 0xda, 0x7b, 0x2f, 0x0c, 0xe1, 0x40, 0xb9, 0x35, 0x7e, 0x54, 0x21, 0x39, 0x0c, 0xf7, 0xa0, 0x19, - 0xb1, 0x88, 0xb0, 0x7b, 0xb7, 0xa1, 0x08, 0x59, 0x84, 0x33, 0x16, 0xdd, 0xb0, 0xb9, 0x98, 0x86, - 0x09, 0x61, 0xf7, 0x0f, 0x4c, 0xa4, 0xa3, 0x0a, 0x31, 0x48, 0xdc, 0x37, 0x1c, 0xe1, 0x36, 0x15, - 0xe7, 0x9b, 0x15, 0x1c, 0x91, 0xc4, 0x5c, 0xb0, 0x9c, 0x24, 0x70, 0x00, 0x2d, 0x9f, 0xa6, 0x54, - 0xa6, 0xd6, 0x52, 0x2c, 0x34, 0xac, 0xa1, 0xf4, 0xe6, 0x99, 0x65, 0x20, 0xec, 0x43, 0x63, 0xca, - 0x66, 0xb3, 0xd8, 0xfd, 0xad, 0x84, 0xd6, 0x27, 0x1f, 0xc9, 0x95, 0x51, 0x85, 0x68, 0x08, 0xee, - 0xe8, 0xbd, 0x87, 0x61, 0xe0, 0xda, 0x0a, 0xbd, 0x59, 0xd8, 0x7b, 0x18, 0x06, 0x3a, 0xfd, 0x0c, - 0x93, 0xa5, 0x22, 0x0f, 0x0d, 0x4b, 0xa9, 0x2c, 0x8e, 0x9b, 0x81, 0x70, 0x1f, 0x40, 0x7e, 0x7e, - 0x48, 0x7c, 0x9a, 0x32, 0xb7, 0xbd, 0x14, 0x41, 0x2f, 0x8c, 0x2a, 0xa4, 0x00, 0xc3, 0x37, 0xfa, - 0x45, 0x8f, 0x22, 0xdf, 0x75, 0x14, 0xe3, 0x2b, 0xc3, 0x38, 0xd2, 0x0f, 0x7b, 0x14, 0x47, 0x11, - 0xe5, 0xbe, 0x8c, 0x63, 0x70, 0xf8, 0x0a, 0x1a, 0x2c, 0x4a, 0xd2, 0x47, 0xb7, 0xa3, 0x08, 0x8e, - 0x21, 0x1c, 0x4b, 0x9f, 0x3c, 0xac, 0x5a, 0xc4, 0x3e, 0xd4, 0x27, 0x31, 0xe7, 0xee, 0xba, 0x02, - 0xbd, 0xcc, 0x76, 0x8d, 0x39, 0x3f, 0x16, 0x29, 0xbd, 0x99, 0x85, 0x62, 0x3a, 0xaa, 0x10, 0x85, - 0xc1, 0xd7, 0x60, 0x8b, 0x94, 0xa6, 0xec, 0x94, 0xdf, 0xc6, 0xee, 0x86, 0x22, 0x74, 0x0d, 0xe1, - 0x32, 0xf3, 0x8f, 0x2a, 0x64, 0x01, 0xc2, 0x9f, 0xc1, 0x51, 0x86, 0xb9, 0x06, 0xb7, 0x5b, 0x7a, - 0x61, 0xc2, 0xa2, 0x38, 0x65, 0x97, 0x05, 0xc0, 0xa8, 0x42, 0x4a, 0x04, 0x3c, 0x84, 0x8e, 0xb1, - 0x75, 0x09, 0xb8, 0x9b, 0x6a, 0x87, 0xad, 0x55, 0x3b, 0xe4, 0x45, 0x52, 0xa6, 0xe0, 0x3b, 0xe8, - 0xcc, 0x18, 0xf5, 0x75, 0x2d, 0xc9, 0x8a, 0xc1, 0x52, 0x6d, 0xbe, 0x5f, 0xac, 0xe5, 0x75, 0x53, - 0x26, 0x78, 0x63, 0xa8, 0x5d, 0xd1, 0x00, 0x3b, 0x60, 0x7f, 0x18, 0x0f, 0x8f, 0x7f, 0x39, 0x1d, - 0x1f, 0x0f, 0xbb, 0x15, 0xb4, 0xa1, 0x71, 0x7c, 0x76, 0x71, 0xf5, 0xb1, 0x6b, 0xa1, 0x03, 0x6b, - 0xe7, 0xe4, 0xe4, 0xfa, 0x7c, 0xfc, 0xfe, 0x63, 0xb7, 0x2a, 0x71, 0x47, 0xa3, 0x83, 0xb1, 0x36, - 0x6b, 0xd8, 0x05, 0x47, 0x99, 0x07, 0xe3, 0xe1, 0xf5, 0x39, 0x39, 0xe9, 0xd6, 0x0f, 0x6d, 0x68, - 0x4d, 0x62, 0x9e, 0x32, 0x9e, 0x7a, 0x7f, 0x5a, 0x60, 0xe7, 0x97, 0x87, 0x5b, 0xb0, 0x16, 0xb1, - 0x94, 0xca, 0x87, 0x57, 0x8a, 0x76, 0x48, 0x6e, 0xe3, 0x0e, 0xd8, 0x69, 0x18, 0x31, 0x91, 0xd2, - 0x28, 0x51, 0xb2, 0x6e, 0xef, 0x6d, 0x98, 0x23, 0x5c, 0x30, 0x36, 0xbf, 0x0a, 0x23, 0x46, 0x16, - 0x08, 0xd9, 0x19, 0x92, 0xbb, 0xf0, 0x74, 0xa8, 0xb4, 0xee, 0x10, 0x6d, 0xe0, 0xb7, 0x60, 0x8b, - 0x30, 0xe0, 0x34, 0x7d, 0x98, 0x33, 0x25, 0x6a, 0x87, 0x2c, 0x1c, 0x5e, 0x1f, 0xd6, 0xcb, 0xf5, - 0x24, 0x3b, 0x49, 0x42, 0x1f, 0x67, 0x31, 0xf5, 0x4d, 0x3e, 0x99, 0xe9, 0xbd, 0x85, 0x4e, 0xa9, - 0x4a, 0xb0, 0x0b, 0x35, 0x11, 0x06, 0x06, 0x26, 0x3f, 0x17, 0x29, 0x54, 0x0b, 0x29, 0x78, 0x01, - 0xb4, 0x0b, 0xca, 0x78, 0xbe, 0x83, 0xf9, 0x4a, 0x69, 0xc2, 0xad, 0x6e, 0xd7, 0x7a, 0x36, 0xc9, - 0x4c, 0xec, 0x41, 0x2b, 0x12, 0xc1, 0xd5, 0x63, 0xc2, 0x4c, 0x17, 0x5b, 0xcf, 0xfa, 0x85, 0xf6, - 0x92, 0x6c, 0xd9, 0x0b, 0xa1, 0x5d, 0xd0, 0xf7, 0x33, 0x81, 0x8a, 0x37, 0x5e, 0x7d, 0x72, 0xe3, - 0xff, 0x3d, 0xd4, 0x27, 0x80, 0x85, 0x74, 0x9f, 0x89, 0xd4, 0x83, 0xba, 0x89, 0x52, 0x2b, 0x28, - 0xad, 0xd4, 0x7b, 0x49, 0xfd, 0x33, 0xe3, 0xde, 0xea, 0xb8, 0xba, 0x29, 0xfd, 0x8f, 0x57, 0xf9, - 0x56, 0xbf, 0x59, 0x36, 0x75, 0x7a, 0xe5, 0xaa, 0x68, 0xe7, 0xc4, 0x0b, 0xed, 0x5d, 0x54, 0xc9, - 0x29, 0xb4, 0x8c, 0x0f, 0xbf, 0x86, 0xa6, 0x60, 0xf7, 0xe3, 0x87, 0xc8, 0xa4, 0x67, 0x2c, 0x44, - 0xa8, 0x4f, 0xa9, 0x98, 0xaa, 0xdb, 0xb7, 0x89, 0xfa, 0x96, 0x3e, 0x75, 0x57, 0xba, 0x76, 0xd5, - 0xb7, 0x54, 0x8a, 0x53, 0x1c, 0x3c, 0xb8, 0x03, 0x10, 0xe5, 0x33, 0xc2, 0x24, 0xd2, 0x29, 0x0d, - 0x0f, 0x52, 0x00, 0x7c, 0xae, 0x7e, 0x4a, 0x4a, 0xa9, 0x3d, 0x55, 0xca, 0x5f, 0x16, 0x6c, 0x2e, - 0x35, 0x8e, 0x2f, 0x9a, 0xd1, 0x01, 0xac, 0x65, 0x24, 0xfc, 0x0e, 0x20, 0xe4, 0x93, 0x6b, 0xfe, - 0x20, 0x43, 0x99, 0xeb, 0xb6, 0x43, 0x3e, 0x19, 0x2b, 0x47, 0xe1, 0x25, 0xaa, 0xc5, 0x97, 0xf0, - 0xa6, 0xb0, 0xb9, 0x34, 0xa8, 0xf1, 0x27, 0xd8, 0x10, 0x6c, 0x76, 0x2b, 0xdb, 0xd3, 0x3c, 0xa2, - 0x69, 0x18, 0x73, 0x73, 0xb0, 0x55, 0x7f, 0x06, 0xc8, 0x53, 0xac, 0xac, 0xc9, 0x3b, 0x1e, 0xff, - 0xc1, 0x55, 0xed, 0x39, 0x44, 0x1b, 0xde, 0x14, 0x70, 0x79, 0xbc, 0xe3, 0x0f, 0xd0, 0x50, 0xff, - 0x24, 0x5c, 0x4b, 0x49, 0x64, 0x65, 0x00, 0x8d, 0xc0, 0xef, 0xa1, 0xee, 0x33, 0xea, 0x1b, 0x31, - 0xad, 0x44, 0x2a, 0x80, 0xf7, 0x2b, 0x34, 0x75, 0x24, 0xa9, 0x74, 0xc6, 0xfd, 0x24, 0x0e, 0x79, - 0xaa, 0x4e, 0x60, 0x93, 0xdc, 0xfe, 0xd7, 0x2e, 0xb0, 0xb2, 0x91, 0x7a, 0x2d, 0x68, 0xa8, 0x49, - 0xea, 0x0d, 0x00, 0x97, 0xe7, 0x98, 0x14, 0x9d, 0xbe, 0x54, 0xa1, 0x0e, 0x53, 0x27, 0x99, 0xe9, - 0x1d, 0xc0, 0x8b, 0x15, 0x53, 0x0b, 0xfb, 0xb0, 0x66, 0x34, 0x23, 0xcc, 0xf1, 0x9f, 0x6a, 0x2a, - 0x5f, 0xef, 0xf7, 0xa1, 0x65, 0x14, 0xfa, 0x74, 0x24, 0x75, 0xc1, 0x39, 0x9c, 0xc5, 0x93, 0x3b, - 0x73, 0x07, 0x5d, 0x6b, 0x2f, 0x81, 0xa6, 0x6e, 0x31, 0xf8, 0x0e, 0x1c, 0xfd, 0x75, 0x99, 0xce, - 0x19, 0x8d, 0x70, 0x65, 0x07, 0xda, 0x5a, 0xe9, 0xf5, 0x2a, 0x3d, 0xeb, 0xb5, 0x85, 0xaf, 0xa0, - 0x7e, 0x11, 0xf2, 0x00, 0x4b, 0x7f, 0x25, 0xb6, 0x4a, 0x96, 0x57, 0x39, 0xfc, 0xf1, 0xf7, 0x7e, - 0x10, 0xa6, 0xd3, 0x87, 0x9b, 0xc1, 0x24, 0x8e, 0x76, 0xa7, 0x8f, 0x09, 0x9b, 0xcf, 0x98, 0x1f, - 0xb0, 0xf9, 0xee, 0x2d, 0xbd, 0x99, 0x87, 0x93, 0xdd, 0x40, 0x6d, 0xbd, 0xab, 0x58, 0x37, 0x4d, - 0xf5, 0xb3, 0xff, 0x4f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xc9, 0xb3, 0xdc, 0x93, 0xf1, 0x0a, 0x00, - 0x00, + // 1140 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xc4, 0x56, 0x6f, 0x4f, 0xe3, 0xc6, + 0x13, 0x8e, 0xf3, 0xdf, 0x13, 0x07, 0xc2, 0x70, 0xbf, 0x9f, 0x5c, 0xd4, 0x4a, 0xc8, 0x3a, 0xa9, + 0x69, 0x54, 0xc2, 0x1d, 0xbc, 0xa8, 0xaa, 0xaa, 0xea, 0x01, 0xa1, 0x04, 0xf5, 0x08, 0x68, 0xe1, + 0x5a, 0x5d, 0xdf, 0x20, 0x13, 0x2f, 0x8e, 0x45, 0xbc, 0x36, 0xd9, 0xe5, 0x2a, 0xbe, 0x42, 0xfb, + 0xa2, 0xdf, 0xa2, 0x9f, 0xb3, 0xda, 0x3f, 0x4e, 0x6c, 0x12, 0xaa, 0x9e, 0xaa, 0xaa, 0xaf, 0xec, + 0x99, 0x7d, 0x9e, 0x9d, 0xd9, 0xdd, 0x99, 0x67, 0x17, 0xda, 0x31, 0xe5, 0xdc, 0x0f, 0x69, 0x3f, + 0x9d, 0x25, 0x22, 0xc1, 0x9a, 0xfa, 0x78, 0xbf, 0x35, 0xa1, 0x7d, 0x92, 0x70, 0x1e, 0xa5, 0x67, + 0x7a, 0x18, 0x5f, 0x40, 0x8d, 0x25, 0x6c, 0x4c, 0x5d, 0x6b, 0xdb, 0xea, 0x56, 0x89, 0x36, 0xd0, + 0x85, 0xc6, 0x78, 0xe2, 0x33, 0x46, 0xa7, 0x6e, 0x79, 0xdb, 0xea, 0x3a, 0x24, 0x33, 0xb1, 0x07, + 0x15, 0xe1, 0x87, 0x6e, 0x65, 0xdb, 0xea, 0xae, 0xed, 0xb9, 0x7a, 0xf6, 0x7e, 0x61, 0xca, 0xfe, + 0x95, 0x1f, 0x12, 0x09, 0xc2, 0xd7, 0xd0, 0xf4, 0xa7, 0xd1, 0x07, 0x7a, 0xc6, 0x43, 0xb7, 0xba, + 0x6d, 0x75, 0x5b, 0x7b, 0x9b, 0x86, 0x70, 0xa0, 0xdc, 0x1a, 0x3f, 0x2c, 0x91, 0x39, 0x0c, 0xf7, + 0xa0, 0x1e, 0xd3, 0x98, 0xd0, 0x7b, 0xb7, 0xa6, 0x08, 0x59, 0x84, 0x33, 0x1a, 0xdf, 0xd0, 0x19, + 0x9f, 0x44, 0x29, 0xa1, 0xf7, 0x0f, 0x94, 0x8b, 0x61, 0x89, 0x18, 0x24, 0xee, 0x1b, 0x0e, 0x77, + 0xeb, 0x8a, 0xf3, 0xc9, 0x0a, 0x0e, 0x4f, 0x13, 0xc6, 0xe9, 0x9c, 0xc4, 0xb1, 0x0f, 0x8d, 0xc0, + 0x17, 0xbe, 0x4c, 0xad, 0xa1, 0x58, 0x68, 0x58, 0x03, 0xe9, 0x9d, 0x67, 0x96, 0x81, 0xb0, 0x07, + 0xb5, 0x09, 0x9d, 0x4e, 0x13, 0xf7, 0xa7, 0x02, 0x5a, 0xaf, 0x7c, 0x28, 0x47, 0x86, 0x25, 0xa2, + 0x21, 0xb8, 0xa3, 0xe7, 0x1e, 0x44, 0xa1, 0x6b, 0x2b, 0xf4, 0x46, 0x6e, 0xee, 0x41, 0x14, 0xea, + 0xf4, 0x33, 0x4c, 0x96, 0x8a, 0x5c, 0x34, 0x2c, 0xa5, 0xb2, 0x58, 0x6e, 0x06, 0xc2, 0x7d, 0x00, + 0xf9, 0xfb, 0x2e, 0x0d, 0x7c, 0x41, 0xdd, 0xd6, 0x52, 0x04, 0x3d, 0x30, 0x2c, 0x91, 0x1c, 0x0c, + 0x5f, 0xeb, 0x13, 0x3d, 0x8a, 0x03, 0xd7, 0x51, 0x8c, 0xff, 0x19, 0xc6, 0x91, 0x3e, 0xd8, 0xa3, + 0x24, 0x8e, 0x7d, 0x16, 0xc8, 0x38, 0x06, 0x87, 0x2f, 0xa1, 0x46, 0xe3, 0x54, 0x3c, 0xba, 0x6d, + 0x45, 0x70, 0x0c, 0xe1, 0x58, 0xfa, 0xe4, 0x62, 0xd5, 0x20, 0xf6, 0xa0, 0x3a, 0x4e, 0x18, 0x73, + 0xd7, 0x14, 0xe8, 0x45, 0x36, 0x6b, 0xc2, 0xd8, 0x31, 0x17, 0xfe, 0xcd, 0x34, 0xe2, 0x93, 0x61, + 0x89, 0x28, 0x0c, 0xbe, 0x02, 0x9b, 0x0b, 0x5f, 0xd0, 0x53, 0x76, 0x9b, 0xb8, 0xeb, 0x8a, 0xd0, + 0x31, 0x84, 0xcb, 0xcc, 0x3f, 0x2c, 0x91, 0x05, 0x08, 0xbf, 0x03, 0x47, 0x19, 0x66, 0x1b, 0xdc, + 0x4e, 0xe1, 0x84, 0x09, 0x8d, 0x13, 0x41, 0x2f, 0x73, 0x80, 0x61, 0x89, 0x14, 0x08, 0x78, 0x08, + 0x6d, 0x63, 0xeb, 0x12, 0x70, 0x37, 0xd4, 0x0c, 0x5b, 0xab, 0x66, 0x98, 0x17, 0x49, 0x91, 0x82, + 0x6f, 0xa0, 0x3d, 0xa5, 0x7e, 0xa0, 0x6b, 0x49, 0x56, 0x0c, 0x16, 0x6a, 0xf3, 0xed, 0x62, 0x6c, + 0x5e, 0x37, 0x45, 0x02, 0x7e, 0x0d, 0x4e, 0x4a, 0xe9, 0xec, 0x34, 0xa0, 0x4c, 0x44, 0xe2, 0xd1, + 0xdd, 0x2c, 0x74, 0xc3, 0x45, 0x6e, 0x48, 0x2e, 0x20, 0x0f, 0xf5, 0x46, 0x50, 0xb9, 0xf2, 0x43, + 0x6c, 0x83, 0xfd, 0x6e, 0x34, 0x38, 0xfe, 0xfe, 0x74, 0x74, 0x3c, 0xe8, 0x94, 0xd0, 0x86, 0xda, + 0xf1, 0xd9, 0xc5, 0xd5, 0xfb, 0x8e, 0x85, 0x0e, 0x34, 0xcf, 0xc9, 0xc9, 0xf5, 0xf9, 0xe8, 0xed, + 0xfb, 0x4e, 0x59, 0xe2, 0x8e, 0x86, 0x07, 0x23, 0x6d, 0x56, 0xb0, 0x03, 0x8e, 0x32, 0x0f, 0x46, + 0x83, 0xeb, 0x73, 0x72, 0xd2, 0xa9, 0x1e, 0xda, 0xd0, 0x18, 0x27, 0x4c, 0x50, 0x26, 0xbc, 0x5f, + 0x2d, 0xb0, 0xe7, 0xfb, 0x8e, 0x5b, 0xd0, 0x8c, 0xa9, 0xf0, 0x65, 0xcd, 0x28, 0x31, 0x70, 0xc8, + 0xdc, 0xc6, 0x1d, 0xb0, 0x45, 0x14, 0x53, 0x2e, 0xfc, 0x38, 0x55, 0x8a, 0xd0, 0xda, 0x5b, 0xcf, + 0x25, 0x7f, 0x15, 0xc5, 0x94, 0x2c, 0x10, 0x52, 0x54, 0xd2, 0xbb, 0xe8, 0x74, 0xa0, 0x64, 0xc2, + 0x21, 0xda, 0xc0, 0x4f, 0xc1, 0xe6, 0x51, 0xc8, 0x7c, 0xf1, 0x30, 0xa3, 0x4a, 0x0f, 0x1c, 0xb2, + 0x70, 0x78, 0x3d, 0x58, 0x2b, 0x96, 0xa2, 0x14, 0xa1, 0xd4, 0x7f, 0x9c, 0x26, 0x7e, 0x60, 0xf2, + 0xc9, 0x4c, 0xef, 0x07, 0x68, 0x17, 0x0a, 0x0c, 0x3b, 0x50, 0xe1, 0x51, 0x68, 0x60, 0xf2, 0x77, + 0x91, 0x42, 0x39, 0x9f, 0x02, 0x42, 0x75, 0x4c, 0x67, 0xc2, 0xe4, 0xa5, 0xfe, 0xbd, 0x5b, 0x70, + 0xf2, 0x07, 0xf0, 0x4f, 0xe6, 0x2a, 0xec, 0x61, 0xb5, 0xb8, 0x87, 0x5e, 0x08, 0xad, 0x5c, 0x43, + 0x3f, 0x2f, 0xbc, 0x81, 0x12, 0x08, 0xee, 0x96, 0xb7, 0x2b, 0x5d, 0x9b, 0x64, 0x26, 0x76, 0xa1, + 0x11, 0xf3, 0xf0, 0xea, 0x31, 0xa5, 0x46, 0x7c, 0xd7, 0x32, 0x99, 0xd3, 0x5e, 0x92, 0x0d, 0x7b, + 0x11, 0xb4, 0x72, 0xb2, 0xf4, 0x4c, 0xa0, 0x7c, 0xa6, 0xe5, 0x27, 0xa7, 0xfd, 0xf7, 0x43, 0x7d, + 0x00, 0x58, 0x28, 0xce, 0x33, 0x91, 0xba, 0x50, 0x35, 0x51, 0x2a, 0x39, 0x81, 0x28, 0x5c, 0x19, + 0xa4, 0xfa, 0x91, 0x71, 0x6f, 0x75, 0x5c, 0xad, 0xa5, 0xff, 0xe2, 0x56, 0x7e, 0xa5, 0xcf, 0x2c, + 0xbb, 0x2c, 0xbb, 0xc5, 0x8a, 0x6c, 0xcd, 0x89, 0x17, 0xda, 0xbb, 0xa8, 0xd0, 0x53, 0x68, 0x18, + 0x1f, 0xfe, 0x1f, 0xea, 0x9c, 0xde, 0x8f, 0x1e, 0x62, 0x93, 0x9e, 0xb1, 0x64, 0xfd, 0x4c, 0x7c, + 0x3e, 0x51, 0xbb, 0x6f, 0x13, 0xf5, 0x2f, 0x7d, 0x6a, 0xaf, 0x4c, 0x4d, 0xa9, 0xba, 0xf9, 0xc3, + 0x02, 0x27, 0x7f, 0x5f, 0xe2, 0x0e, 0x40, 0x3c, 0xbf, 0xda, 0x4c, 0x22, 0xed, 0xc2, 0x9d, 0x47, + 0x72, 0x80, 0x8f, 0xed, 0xdd, 0x42, 0x97, 0x56, 0x9e, 0x74, 0xa9, 0x2c, 0x9b, 0x28, 0x13, 0x31, + 0x53, 0xe0, 0x99, 0xed, 0xfd, 0x6e, 0xc1, 0xc6, 0x92, 0x16, 0xfe, 0x97, 0xd9, 0x7a, 0x07, 0xd0, + 0xcc, 0x48, 0xf8, 0x19, 0x40, 0xc4, 0xc6, 0xd7, 0xec, 0x41, 0x86, 0x32, 0x47, 0x61, 0x47, 0x6c, + 0x3c, 0x52, 0x8e, 0xdc, 0x29, 0x95, 0xf3, 0xa7, 0xe4, 0x4d, 0x60, 0x63, 0xe9, 0xed, 0x81, 0xdf, + 0xc2, 0x3a, 0xa7, 0xd3, 0x5b, 0x29, 0x9b, 0xb3, 0xd8, 0x17, 0x51, 0xc2, 0xcc, 0xc2, 0x56, 0xbd, + 0x6f, 0xc8, 0x53, 0xac, 0xac, 0xd7, 0x3b, 0x96, 0xfc, 0xc2, 0x54, 0x5d, 0x3a, 0x44, 0x1b, 0xde, + 0x04, 0x70, 0xf9, 0xc5, 0x82, 0x5f, 0x40, 0x4d, 0x3d, 0x8e, 0x5c, 0x4b, 0xb5, 0xcf, 0xca, 0x00, + 0x1a, 0x81, 0x9f, 0x43, 0x35, 0xa0, 0x7e, 0x60, 0x1a, 0x6d, 0x25, 0x52, 0x01, 0xbc, 0x1f, 0xa1, + 0xae, 0x23, 0xc9, 0xe3, 0xa4, 0x2c, 0x48, 0x93, 0x88, 0x09, 0xb5, 0x02, 0x9b, 0xcc, 0xed, 0xbf, + 0x54, 0x88, 0x95, 0x02, 0xef, 0x35, 0xa0, 0xa6, 0x1e, 0x07, 0x5e, 0x1f, 0x70, 0xf9, 0x6a, 0x96, + 0x0d, 0xa9, 0x37, 0x95, 0xab, 0xc5, 0x54, 0x49, 0x66, 0x7a, 0x07, 0xb0, 0xb9, 0xe2, 0x22, 0xc6, + 0x1e, 0x34, 0x4d, 0x3f, 0x71, 0xb3, 0xfc, 0xa7, 0xfd, 0x36, 0x1f, 0xef, 0x7d, 0x03, 0x0d, 0xd3, + 0xbd, 0x4f, 0xaf, 0xca, 0x0e, 0x38, 0x87, 0xd3, 0x64, 0x7c, 0x67, 0xf6, 0xa0, 0x63, 0xe1, 0x3a, + 0xb4, 0x32, 0xb5, 0x3f, 0xe3, 0x61, 0xa7, 0xbc, 0x97, 0x42, 0x5d, 0xeb, 0x11, 0xbe, 0x01, 0x47, + 0xff, 0x5d, 0x8a, 0x19, 0xf5, 0x63, 0x5c, 0x29, 0x57, 0x5b, 0x2b, 0xbd, 0x5e, 0xa9, 0x6b, 0xbd, + 0xb2, 0xf0, 0x25, 0x54, 0x2f, 0x22, 0x16, 0x62, 0xe1, 0xb9, 0xb4, 0x55, 0xb0, 0xbc, 0xd2, 0xe1, + 0x97, 0x3f, 0xf7, 0xc2, 0x48, 0x4c, 0x1e, 0x6e, 0xfa, 0xe3, 0x24, 0xde, 0x9d, 0x3c, 0xa6, 0x74, + 0x36, 0xa5, 0x41, 0x48, 0x67, 0xbb, 0xb7, 0xfe, 0xcd, 0x2c, 0x1a, 0xef, 0x86, 0x6a, 0xea, 0x5d, + 0xc5, 0xba, 0xa9, 0xab, 0xcf, 0xfe, 0x9f, 0x01, 0x00, 0x00, 0xff, 0xff, 0x03, 0xbd, 0x16, 0x54, + 0xd5, 0x0b, 0x00, 0x00, } diff --git a/gossip/proto/message.proto b/gossip/proto/message.proto index 41c35f1c7b1..199d50ea230 100644 --- a/gossip/proto/message.proto +++ b/gossip/proto/message.proto @@ -77,6 +77,9 @@ message GossipMessage { // Used to indicate intent of peer to become leader LeadershipMessage leadershipMsg = 18; + + // Used to learn of a peer's certificate + PeerIdentity peerIdentity = 19; } } @@ -102,13 +105,25 @@ message ChannelCommand { message ConnEstablish { bytes sig = 1; bytes pkiID = 2; + bytes cert = 3; +} + +// PeerIdentity defines the identity of the peer +// Used to make other peers learn of the identity +// of a certain peer +message PeerIdentity { + bytes sig = 1; + bytes pkiID = 2; + bytes cert = 3; + bytes metadata = 4; } // Messages related to pull mechanism enum MsgType { - UNDEFINED = 0; + UNDEFINED = 0; BlockMessage = 1; + IdentityMsg = 2; } // DataRequest is a message used for a peer to request @@ -167,6 +182,7 @@ message AliveMessage { Member membership = 1; PeerTime timestamp = 2; bytes signature = 3; + bytes identity = 4; } // Leadership Message is sent during leader election to inform diff --git a/gossip/state/state_test.go b/gossip/state/state_test.go index 4bd18592799..d0e1bb8d908 100644 --- a/gossip/state/state_test.go +++ b/gossip/state/state_test.go @@ -17,17 +17,20 @@ limitations under the License. package state import ( - "bytes" "fmt" "os" "strconv" "testing" "time" + "bytes" + pb "github.com/golang/protobuf/proto" "github.com/hyperledger/fabric/core/committer" "github.com/hyperledger/fabric/core/ledger/kvledger" + "github.com/hyperledger/fabric/gossip/api" "github.com/hyperledger/fabric/gossip/comm" + "github.com/hyperledger/fabric/gossip/common" "github.com/hyperledger/fabric/gossip/gossip" "github.com/hyperledger/fabric/gossip/proto" pcomm "github.com/hyperledger/fabric/protos/common" @@ -39,6 +42,24 @@ var ( logger, _ = logging.GetLogger("GossipStateProviderTest") ) +type naiveSecProvider struct { +} + +func (*naiveSecProvider) IsEnabled() bool { + return true +} + +func (*naiveSecProvider) Sign(msg []byte) ([]byte, error) { + return msg, nil +} + +func (*naiveSecProvider) Verify(vkID, signature, message []byte) error { + if bytes.Equal(signature, message) { + return nil + } + return fmt.Errorf("Failed verifying") +} + type naiveCryptoService struct { } @@ -54,15 +75,36 @@ func (*naiveCryptoService) IsEnabled() bool { return true } +func (*naiveCryptoService) ValidateIdentity(peerIdentity api.PeerIdentityType) error { + return nil +} + +// GetPKIidOfCert returns the PKI-ID of a peer's identity +func (*naiveCryptoService) GetPKIidOfCert(peerIdentity api.PeerIdentityType) common.PKIidType { + return common.PKIidType(peerIdentity) +} + +// VerifyBlock returns nil if the block is properly signed, +// else returns error +func (*naiveCryptoService) VerifyBlock(signedBlock api.SignedBlock) error { + return nil +} + +// Sign signs msg with this peer's signing key and outputs +// the signature if no error occurred. func (*naiveCryptoService) Sign(msg []byte) ([]byte, error) { return msg, nil } -func (*naiveCryptoService) Verify(vkID, signature, message []byte) error { - if bytes.Equal(signature, message) { - return nil +// Verify checks that signature is a valid signature of message under a peer's verification key. +// If the verification succeeded, Verify returns nil meaning no error occurred. +// If peerCert is nil, then the signature is verified against this peer's verification key. +func (*naiveCryptoService) Verify(peerIdentity api.PeerIdentityType, signature, message []byte) error { + equal := bytes.Equal(signature, message) + if !equal { + return fmt.Errorf("Wrong signature:%v, %v", signature, message) } - return fmt.Errorf("Failed verifying") + return nil } func bootPeers(ids ...int) []string { @@ -110,14 +152,14 @@ func newGossipConfig(id int, maxMsgCount int, boot ...int) *gossip.Config { // Create gossip instance func newGossipInstance(config *gossip.Config, comm comm.Comm) gossip.Gossip { - return gossip.NewGossipService(config, comm, &naiveCryptoService{}) + return gossip.NewGossipService(config, comm, &naiveCryptoService{}, &naiveCryptoService{}, api.PeerIdentityType(config.ID)) } // Setup and create basic communication module // need to be used for peer-to-peer communication // between peers and state transfer func newCommInstance(config *gossip.Config) comm.Comm { - comm, err := comm.NewCommInstanceWithServer(config.BindPort, &naiveCryptoService{}, []byte(config.SelfEndpoint)) + comm, err := comm.NewCommInstanceWithServer(config.BindPort, &naiveSecProvider{}, []byte(config.SelfEndpoint)) if err != nil { panic(err) } diff --git a/gossip/util/misc.go b/gossip/util/misc.go index 3b5e0413f3c..297c5ba2a3e 100644 --- a/gossip/util/misc.go +++ b/gossip/util/misc.go @@ -130,4 +130,4 @@ func PrintStackTrace() { buf := make([]byte, 1<<16) runtime.Stack(buf, true) fmt.Printf("%s", buf) -} +} \ No newline at end of file