Skip to content

Commit

Permalink
exchange signed routing records in identify (#747)
Browse files Browse the repository at this point in the history
*  Exchange signed routing records in identify


Co-authored-by: Aarsh Shah <aarshkshah1992@gmail.com>
  • Loading branch information
yusefnapora and aarshkshah1992 committed Apr 30, 2020
1 parent c833e2c commit 077a818
Show file tree
Hide file tree
Showing 8 changed files with 548 additions and 149 deletions.
94 changes: 70 additions & 24 deletions p2p/host/basic/basic_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,30 @@ package basichost

import (
"context"
"errors"
"io"
"net"
"sync"
"time"

"github.com/libp2p/go-libp2p/p2p/protocol/identify"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"

"github.com/libp2p/go-libp2p-core/connmgr"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p-core/record"

"github.com/libp2p/go-eventbus"
inat "github.com/libp2p/go-libp2p-nat"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"

logging "github.com/ipfs/go-log"

"github.com/multiformats/go-multiaddr"
ma "github.com/multiformats/go-multiaddr"
madns "github.com/multiformats/go-multiaddr-dns"
manet "github.com/multiformats/go-multiaddr-net"
Expand Down Expand Up @@ -93,6 +96,9 @@ type BasicHost struct {
}

addrChangeChan chan struct{}

signKey crypto.PrivKey
caBook peerstore.CertifiedAddrBook
}

var _ host.Host = (*BasicHost)(nil)
Expand Down Expand Up @@ -150,10 +156,21 @@ func NewHost(ctx context.Context, net network.Network, opts *HostOpts) (*BasicHo
if h.emitters.evtLocalProtocolsUpdated, err = h.eventbus.Emitter(&event.EvtLocalProtocolsUpdated{}); err != nil {
return nil, err
}
if h.emitters.evtLocalAddrsUpdated, err = h.eventbus.Emitter(&event.EvtLocalAddressesUpdated{}); err != nil {
if h.emitters.evtLocalAddrsUpdated, err = h.eventbus.Emitter(&event.EvtLocalAddressesUpdated{}, eventbus.Stateful); err != nil {
return nil, err
}

cab, ok := peerstore.GetCertifiedAddrBook(net.Peerstore())
if !ok {
return nil, errors.New("peerstore should also be a certified address book")
}
h.caBook = cab

h.signKey = h.Peerstore().PrivKey(h.ID())
if h.signKey == nil {
return nil, errors.New("unable to access host key")
}

if opts.MultistreamMuxer != nil {
h.mux = opts.MultistreamMuxer
}
Expand Down Expand Up @@ -221,12 +238,12 @@ func New(net network.Network, opts ...interface{}) *BasicHost {
}

h, err := NewHost(context.Background(), net, hostopts)
h.Start()
if err != nil {
// this cannot happen with legacy options
// plus we want to keep the (deprecated) legacy interface unchanged
panic(err)
}
h.Start()

return h
}
Expand Down Expand Up @@ -327,39 +344,68 @@ func makeUpdatedAddrEvent(prev, current []ma.Multiaddr) *event.EvtLocalAddresses
return &evt
}

func (h *BasicHost) makeSignedPeerRecord(evt *event.EvtLocalAddressesUpdated) (*record.Envelope, error) {
current := make([]multiaddr.Multiaddr, 0, len(evt.Current))
for _, a := range evt.Current {
current = append(current, a.Address)
}

rec := peer.PeerRecordFromAddrInfo(peer.AddrInfo{h.ID(), current})
return record.Seal(rec, h.signKey)
}

func (h *BasicHost) background() {
defer h.refCount.Done()
var lastAddrs []ma.Multiaddr

emitAddrChange := func(currentAddrs []ma.Multiaddr, lastAddrs []ma.Multiaddr) {
// nothing to do if both are nil..defensive check
if currentAddrs == nil && lastAddrs == nil {
return
}

changeEvt := makeUpdatedAddrEvent(lastAddrs, currentAddrs)

if changeEvt == nil {
return
}

// add signed peer record to the event
sr, err := h.makeSignedPeerRecord(changeEvt)
if err != nil {
log.Errorf("error creating a signed peer record from the set of current addresses, err=%s", err)
return
}
changeEvt.SignedPeerRecord = *sr

// persist the signed record to the peerstore
if _, err := h.caBook.ConsumePeerRecord(sr, peerstore.PermanentAddrTTL); err != nil {
log.Errorf("failed to persist signed peer record in peer store, err=%s", err)
return
}

// emit addr change event on the bus
if err := h.emitters.evtLocalAddrsUpdated.Emit(*changeEvt); err != nil {
log.Warnf("error emitting event for updated addrs: %s", err)
}
}

// periodically schedules an IdentifyPush to update our peers for changes
// in our address set (if needed)
ticker := time.NewTicker(10 * time.Second)
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()

// initialize lastAddrs
lastAddrs := h.Addrs()

for {
curr := h.Addrs()
emitAddrChange(curr, lastAddrs)
lastAddrs = curr

select {
case <-ticker.C:
case <-h.addrChangeChan:
case <-h.ctx.Done():
return
}

// emit an EvtLocalAddressesUpdatedEvent & a Push Identify if our listen addresses have changed.
addrs := h.Addrs()
changeEvt := makeUpdatedAddrEvent(lastAddrs, addrs)
if changeEvt != nil {
lastAddrs = addrs
}

if changeEvt != nil {
err := h.emitters.evtLocalAddrsUpdated.Emit(*changeEvt)
if err != nil {
log.Warnf("error emitting event for updated addrs: %s", err)
}
h.ids.Push()
}
}
}

Expand Down
131 changes: 102 additions & 29 deletions p2p/host/basic/basic_host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,24 @@ import (
"context"
"io"
"reflect"
"sort"
"sync"
"testing"
"time"

"github.com/libp2p/go-eventbus"
"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/helpers"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p-core/record"
"github.com/libp2p/go-libp2p-core/test"

"github.com/libp2p/go-eventbus"
swarmt "github.com/libp2p/go-libp2p-swarm/testing"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"

ma "github.com/multiformats/go-multiaddr"
madns "github.com/multiformats/go-multiaddr-dns"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -102,16 +105,35 @@ func TestProtocolHandlerEvents(t *testing.T) {
}
defer sub.Close()

assert := func(added, removed []protocol.ID) {
var next event.EvtLocalProtocolsUpdated
select {
case evt := <-sub.Out():
next = evt.(event.EvtLocalProtocolsUpdated)
break
case <-time.After(5 * time.Second):
t.Fatal("event not received in 5 seconds")
// the identify service adds new protocol handlers shortly after the host
// starts. this helps us filter those events out, since they're unrelated
// to the test.
isIdentify := func(evt event.EvtLocalProtocolsUpdated) bool {
for _, p := range evt.Added {
if p == identify.ID || p == identify.IDPush {
return true
}
}
return false
}

nextEvent := func() event.EvtLocalProtocolsUpdated {
for {
select {
case evt := <-sub.Out():
next := evt.(event.EvtLocalProtocolsUpdated)
if isIdentify(next) {
continue
}
return next
case <-time.After(5 * time.Second):
t.Fatal("event not received in 5 seconds")
}
}
}

assert := func(added, removed []protocol.ID) {
next := nextEvent()
if !reflect.DeepEqual(added, next.Added) {
t.Errorf("expected added: %v; received: %v", added, next.Added)
}
Expand Down Expand Up @@ -460,11 +482,10 @@ func TestAddrResolution(t *testing.T) {
_ = h.Connect(tctx, *pi)

addrs := h.Peerstore().Addrs(pi.ID)
sort.Sort(sortedMultiaddrs(addrs))

if len(addrs) != 2 || !addrs[0].Equal(addr1) || !addrs[1].Equal(addr2) {
t.Fatalf("expected [%s %s], got %+v", addr1, addr2, addrs)
}
require.Len(t, addrs, 2)
require.Contains(t, addrs, addr1)
require.Contains(t, addrs, addr2)
}

func TestAddrResolutionRecursive(t *testing.T) {
Expand Down Expand Up @@ -515,11 +536,9 @@ func TestAddrResolutionRecursive(t *testing.T) {
_ = h.Connect(tctx, *pi1)

addrs1 := h.Peerstore().Addrs(pi1.ID)
sort.Sort(sortedMultiaddrs(addrs1))

if len(addrs1) != 2 || !addrs1[0].Equal(addr1) || !addrs1[1].Equal(addr2) {
t.Fatalf("expected [%s %s], got %+v", addr1, addr2, addrs1)
}
require.Len(t, addrs1, 2)
require.Contains(t, addrs1, addr1)
require.Contains(t, addrs1, addr2)

pi2, err := peer.AddrInfoFromP2pAddr(p2paddr2)
if err != nil {
Expand All @@ -529,11 +548,49 @@ func TestAddrResolutionRecursive(t *testing.T) {
_ = h.Connect(tctx, *pi2)

addrs2 := h.Peerstore().Addrs(pi2.ID)
sort.Sort(sortedMultiaddrs(addrs2))
require.Len(t, addrs2, 1)
require.Contains(t, addrs2, addr1)
}

func TestAddrChangeImmediatelyIfAddressNonEmpty(t *testing.T) {
ctx := context.Background()
taddrs := []ma.Multiaddr{ma.StringCast("/ip4/1.2.3.4/tcp/1234")}

if len(addrs2) != 1 || !addrs2[0].Equal(addr1) {
t.Fatalf("expected [%s], got %+v", addr1, addrs2)
h := New(swarmt.GenSwarm(t, ctx), AddrsFactory(func(addrs []ma.Multiaddr) []ma.Multiaddr {
return taddrs
}))
defer h.Close()

sub, err := h.EventBus().Subscribe(&event.EvtLocalAddressesUpdated{})
if err != nil {
t.Error(err)
}
defer sub.Close()
// wait for the host background thread to start
time.Sleep(1 * time.Second)

expected := event.EvtLocalAddressesUpdated{
Diffs: true,
Current: []event.UpdatedAddress{
{Action: event.Added, Address: ma.StringCast("/ip4/1.2.3.4/tcp/1234")},
},
Removed: []event.UpdatedAddress{}}

// assert we get expected event
evt := waitForAddrChangeEvent(ctx, sub, t)
if !updatedAddrEventsEqual(expected, evt) {
t.Errorf("change events not equal: \n\texpected: %v \n\tactual: %v", expected, evt)
}

// assert it's on the signed record
rc := peerRecordFromEnvelope(t, evt.SignedPeerRecord)
require.Equal(t, taddrs, rc.Addrs)

// assert it's in the peerstore
ev := h.Peerstore().(peerstore.CertifiedAddrBook).GetPeerRecord(h.ID())
require.NotNil(t, ev)
rc = peerRecordFromEnvelope(t, *ev)
require.Equal(t, taddrs, rc.Addrs)
}

func TestHostAddrChangeDetection(t *testing.T) {
Expand Down Expand Up @@ -611,9 +668,18 @@ func TestHostAddrChangeDetection(t *testing.T) {
h.SignalAddressChange()
evt := waitForAddrChangeEvent(ctx, sub, t)
if !updatedAddrEventsEqual(expectedEvents[i-1], evt) {
t.Errorf("change events not equal: \n\texpected: %v \n\tactual: %v", expectedEvents[i], evt)
t.Errorf("change events not equal: \n\texpected: %v \n\tactual: %v", expectedEvents[i-1], evt)
}

// assert it's on the signed record
rc := peerRecordFromEnvelope(t, evt.SignedPeerRecord)
require.Equal(t, addrSets[i], rc.Addrs)

// assert it's in the peerstore
ev := h.Peerstore().(peerstore.CertifiedAddrBook).GetPeerRecord(h.ID())
require.NotNil(t, ev)
rc = peerRecordFromEnvelope(t, *ev)
require.Equal(t, addrSets[i], rc.Addrs)
}
}

Expand Down Expand Up @@ -672,10 +738,17 @@ func updatedAddrEventsEqual(a, b event.EvtLocalAddressesUpdated) bool {
updatedAddrsEqual(a.Removed, b.Removed)
}

type sortedMultiaddrs []ma.Multiaddr

func (sma sortedMultiaddrs) Len() int { return len(sma) }
func (sma sortedMultiaddrs) Swap(i, j int) { sma[i], sma[j] = sma[j], sma[i] }
func (sma sortedMultiaddrs) Less(i, j int) bool {
return bytes.Compare(sma[i].Bytes(), sma[j].Bytes()) == 1
func peerRecordFromEnvelope(t *testing.T, ev record.Envelope) *peer.PeerRecord {
t.Helper()
rec, err := ev.Record()
if err != nil {
t.Fatalf("error getting PeerRecord from event: %v", err)
return nil
}
peerRec, ok := rec.(*peer.PeerRecord)
if !ok {
t.Fatalf("wrong type for peer record")
return nil
}
return peerRec
}
Loading

0 comments on commit 077a818

Please sign in to comment.