Skip to content

Commit

Permalink
use event bus to trigger identify/push
Browse files Browse the repository at this point in the history
This changes BasicHost to emit EvtLocalAddressesUpdated events
when it detects changes to the local listen addresses instead of
directly calling IDService.Push.

These events are consumed by a new routingStateManager component,
which build a new SignedRoutingState record and sends it on the
event bus in an EvtLocalRoutingStateUpdated event.

Finally, the identify service listens for the updated routing
state events and triggers an identify/push.
  • Loading branch information
yusefnapora committed Dec 20, 2019
1 parent 3ebe3ad commit b4f30c2
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 36 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ require (
github.com/ipfs/go-cid v0.0.4
github.com/ipfs/go-detect-race v0.0.1
github.com/ipfs/go-ipfs-util v0.0.1
github.com/ipfs/go-log v0.0.1
github.com/ipfs/go-log v1.0.0
github.com/jbenet/go-cienv v0.1.0
github.com/jbenet/goprocess v0.1.3
github.com/libp2p/go-conn-security-multistream v0.1.0
github.com/libp2p/go-eventbus v0.1.0
github.com/libp2p/go-libp2p-autonat v0.1.1
github.com/libp2p/go-libp2p-blankhost v0.1.4
github.com/libp2p/go-libp2p-circuit v0.1.4
github.com/libp2p/go-libp2p-core v0.3.1-0.20191220211501-15623e878c25
github.com/libp2p/go-libp2p-core v0.3.1-0.20191220211544-b57c5023fbf9
github.com/libp2p/go-libp2p-discovery v0.2.0
github.com/libp2p/go-libp2p-loggables v0.1.0
github.com/libp2p/go-libp2p-mplex v0.2.1
Expand Down
19 changes: 12 additions & 7 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,9 @@ github.com/ipfs/go-ds-leveldb v0.1.0/go.mod h1:hqAW8y4bwX5LWcCtku2rFNX3vjDZCy5LZ
github.com/ipfs/go-ipfs-delay v0.0.0-20181109222059-70721b86a9a8/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw=
github.com/ipfs/go-ipfs-util v0.0.1 h1:Wz9bL2wB2YBJqggkA4dD7oSmqB4cAnpNbGrlHJulv50=
github.com/ipfs/go-ipfs-util v0.0.1/go.mod h1:spsl5z8KUnrve+73pOhSVZND1SIxPW5RyBCNzQxlJBc=
github.com/ipfs/go-log v0.0.1 h1:9XTUN/rW64BCG1YhPK9Hoy3q8nr4gOmHHBpgFdfw6Lc=
github.com/ipfs/go-log v0.0.1/go.mod h1:kL1d2/hzSpI0thNYjiKfjanbVNU+IIGA/WnNESY9leM=
github.com/ipfs/go-log v1.0.0 h1:BW3LQIiZzpNyolt84yvKNCd3FU+AK4VDw1hnHR+1aiI=
github.com/ipfs/go-log v1.0.0/go.mod h1:JO7RzlMK6rA+CIxFMLOuB6Wf5b81GDiKElL7UPSIKjA=
github.com/jackpal/gateway v1.0.5 h1:qzXWUJfuMdlLMtt0a3Dgt+xkWQiA5itDEITVJtuSwMc=
github.com/jackpal/gateway v1.0.5/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA=
github.com/jackpal/go-nat-pmp v1.0.1 h1:i0LektDkO1QlrTm/cSuP+PyBCDnYvjPLGl4LdWEMiaA=
Expand Down Expand Up @@ -133,8 +134,8 @@ github.com/libp2p/go-libp2p-core v0.2.2/go.mod h1:8fcwTbsG2B+lTgRJ1ICZtiM5GWCWZV
github.com/libp2p/go-libp2p-core v0.2.4/go.mod h1:STh4fdfa5vDYr0/SzYYeqnt+E6KfEV5VxfIrm0bcI0g=
github.com/libp2p/go-libp2p-core v0.2.6-0.20191121175514-5fa975301271/go.mod h1:xDyprN8hkMpX27XQ1bBnYtuSuaCUFvKm+Q6gltHZCHE=
github.com/libp2p/go-libp2p-core v0.3.0/go.mod h1:ACp3DmS3/N64c2jDzcV429ukDpicbL6+TrrxANBjPGw=
github.com/libp2p/go-libp2p-core v0.3.1-0.20191220211501-15623e878c25 h1:Yzu98yHyavbHX1udrzM0xMN/06osM+6OKtKlzTuBMVQ=
github.com/libp2p/go-libp2p-core v0.3.1-0.20191220211501-15623e878c25/go.mod h1:3PDCxmuTTy+gyLxQz7Tf7lOf36n7WUECOJ9CPWx0S6A=
github.com/libp2p/go-libp2p-core v0.3.1-0.20191220211544-b57c5023fbf9 h1:Doq/tZH8j+OGxNwRmYJoLkyLE/WrhoU67vcV1YLO2WA=
github.com/libp2p/go-libp2p-core v0.3.1-0.20191220211544-b57c5023fbf9/go.mod h1:KJbT3ekTPccG9QUSa5vc/0QVZXPE9+3TVlyqESRQic0=
github.com/libp2p/go-libp2p-crypto v0.1.0/go.mod h1:sPUokVISZiy+nNuTTH/TY+leRSxnFj/2GLjtOTW90hI=
github.com/libp2p/go-libp2p-discovery v0.2.0 h1:1p3YSOq7VsgaL+xVHPi8XAmtGyas6D2J6rWBEfz/aiY=
github.com/libp2p/go-libp2p-discovery v0.2.0/go.mod h1:s4VGaxYMbw4+4+tsoQTqh7wfxg97AEdo4GYBt6BadWg=
Expand Down Expand Up @@ -203,10 +204,8 @@ github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czP
github.com/mailru/easyjson v0.0.0-20180823135443-60711f1a8329/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mailru/easyjson v0.7.0/go.mod h1:KAzv3t3aY1NaHWoQz1+4F1ccyAH66Jk7yos7ldAVICs=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
github.com/mattn/go-colorable v0.1.1 h1:G1f5SKeVxmagw/IyvzvtZE4Gybcc4Tr1tf7I8z0XgOg=
github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ=
github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-isatty v0.0.5 h1:tHXDdz1cpzGaovsTB+TVB8q90WEokoVmfMqoVcrLUgw=
github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE=
github.com/miekg/dns v1.1.12 h1:WMhc1ik4LNkTg8U9l3hI1LvxKmIL+f1+WV/SZtCbDDA=
Expand Down Expand Up @@ -263,8 +262,9 @@ github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+W
github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/opentracing/opentracing-go v1.0.2 h1:3jA2P6O1F9UOrWVpwrIo17pu01KWvNWg4X946/Y5Zwg=
github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down Expand Up @@ -292,7 +292,6 @@ github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpP
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1/go.mod h1:8UvriyWtv5Q5EOgjHaSseUEdkQfvwFv1I/In/O2M9gc=
github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc/go.mod h1:bopw91TMyo8J3tvftk8xmU2kPmlrt4nScJQZU2hE5EM=
github.com/whyrusleeping/go-logging v0.0.1 h1:fwpzlmT0kRC/Fmd0MdmGgJG/CXIZ6gFq46FQZjprUcc=
github.com/whyrusleeping/go-logging v0.0.1/go.mod h1:lDPYj54zutzG1XYfHAhcc7oNXEburHQBn+Iqd4yS4vE=
github.com/whyrusleeping/mafmt v1.2.8 h1:TCghSl5kkwEE0j+sU/gudyhVMRlpBin8fMBBHg59EbA=
github.com/whyrusleeping/mafmt v1.2.8/go.mod h1:faQJFPbLSxzD9xpA02ttW/tS9vZykNvXwGvqIpk20FA=
Expand All @@ -306,6 +305,12 @@ go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.opencensus.io v0.22.1/go.mod h1:Ap50jQcDJrx6rB6VgeeFPtuPIf3wMRvRfrfYDO6+BmA=
go.opencensus.io v0.22.2 h1:75k/FF0Q2YM8QYo07VPddOLBslDt1MZOdEslOHvmzAs=
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
Expand Down
54 changes: 33 additions & 21 deletions p2p/host/basic/basic_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ type BasicHost struct {
lastAddrs []ma.Multiaddr
emitters struct {
evtLocalProtocolsUpdated event.Emitter
evtLocalAddrsUpdated event.Emitter
}
}

Expand Down Expand Up @@ -141,6 +142,9 @@ func NewHost(ctx context.Context, net network.Network, opts *HostOpts) (*BasicHo
if h.emitters.evtLocalProtocolsUpdated, err = h.eventbus.Emitter(&event.EvtLocalProtocolsUpdated{}); err != nil {
return nil, err
}
if h.emitters.evtLocalAddrsUpdated, err = h.eventbus.Emitter(&event.EvtLocalAddressesUpdated{}); err != nil {
return nil, err
}

h.proc = goprocessctx.WithContextAndTeardown(ctx, func() error {
if h.natmgr != nil {
Expand All @@ -150,6 +154,7 @@ func NewHost(ctx context.Context, net network.Network, opts *HostOpts) (*BasicHo
h.cmgr.Close()
}
_ = h.emitters.evtLocalProtocolsUpdated.Close()
_ = h.emitters.evtLocalAddrsUpdated.Close()
return h.Network().Close()
})

Expand Down Expand Up @@ -295,21 +300,29 @@ func (h *BasicHost) newStreamHandler(s network.Stream) {
go handle(protoID, s)
}

// PushIdentify pushes an identify update through the identify push protocol
// CheckForAddressChanges determines whether our listen addresses have recently
// changed and emits an EvtLocalAddressesUpdatedEvent if so.
// Warning: this interface is unstable and may disappear in the future.
func (h *BasicHost) PushIdentify() {
push := false
func (h *BasicHost) CheckForAddressChanges() {
changed := false

h.mx.Lock()
addrs := h.Addrs()
if !sameAddrs(addrs, h.lastAddrs) {
push = true
added, removed := addrDiff(h.lastAddrs, addrs)
if len(added) != 0 || len(removed) != 0 {
changed = true
h.lastAddrs = addrs
}
h.mx.Unlock()

if push {
h.ids.Push()
if changed {
err := h.emitters.evtLocalAddrsUpdated.Emit(event.EvtLocalAddressesUpdated{
Added: added,
Removed: removed,
})
if err != nil {
log.Warnf("error emitting event for updated addrs: %s", err)
}
}
}

Expand All @@ -329,32 +342,31 @@ func (h *BasicHost) background(p goprocess.Process) {
for {
select {
case <-ticker.C:
h.PushIdentify()
h.CheckForAddressChanges()

case <-p.Closing():
return
}
}
}

func sameAddrs(a, b []ma.Multiaddr) bool {
if len(a) != len(b) {
return false
}
func addrDiff(old, new []ma.Multiaddr) (added, removed []ma.Multiaddr) {
oldmap := make(map[string]ma.Multiaddr, len(old))

bmap := make(map[string]struct{}, len(b))
for _, addr := range b {
bmap[string(addr.Bytes())] = struct{}{}
for _, addr := range old {
oldmap[string(addr.Bytes())] = addr
}

for _, addr := range a {
_, ok := bmap[string(addr.Bytes())]
for _, addr := range new {
_, ok := oldmap[string(addr.Bytes())]
if !ok {
return false
added = append(added, addr)
}
delete(oldmap, string(addr.Bytes()))
}

return true
for _, addr := range oldmap {
removed = append(removed, addr)
}
return added, removed
}

// ID returns the (local) peer.ID associated with this Host
Expand Down
2 changes: 1 addition & 1 deletion p2p/host/relay/autorelay.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (ar *AutoRelay) background(ctx context.Context) {
ar.cachedAddrs = nil
ar.mx.Unlock()
push = false
ar.host.PushIdentify()
ar.host.CheckForAddressChanges()
}

select {
Expand Down
25 changes: 20 additions & 5 deletions p2p/protocol/identify/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ type IDService struct {
useSignedAddrs bool

subscriptions struct {
localProtocolsUpdated event.Subscription
localProtocolsUpdated event.Subscription
localRoutingStateUpdated event.Subscription
}
emitters struct {
evtPeerProtocolsUpdated event.Emitter
Expand Down Expand Up @@ -124,7 +125,13 @@ func NewIDService(ctx context.Context, h host.Host, opts ...Option) *IDService {
if err != nil {
log.Warningf("identify service not subscribed to local protocol handlers updates; err: %s", err)
} else {
go s.handleEvents()
go s.handleEvents(s.subscriptions.localProtocolsUpdated, s.handleProtosChanged)
}
s.subscriptions.localRoutingStateUpdated, err = h.EventBus().Subscribe(&event.EvtLocalPeerRoutingStateUpdated{}, eventbus.BufSize(128))
if err != nil {
log.Warnf("identify service not subscribed to local routing state changes; err: %s", err)
} else {
go s.handleEvents(s.subscriptions.localRoutingStateUpdated, s.handleRoutingStateUpdated)
}

s.emitters.evtPeerProtocolsUpdated, err = h.EventBus().Emitter(&event.EvtPeerProtocolsUpdated{})
Expand All @@ -144,8 +151,7 @@ func NewIDService(ctx context.Context, h host.Host, opts ...Option) *IDService {
return s
}

func (ids *IDService) handleEvents() {
sub := ids.subscriptions.localProtocolsUpdated
func (ids *IDService) handleEvents(sub event.Subscription, handler func(interface{})) {
defer func() {
_ = sub.Close()
// drain the channel.
Expand All @@ -159,13 +165,22 @@ func (ids *IDService) handleEvents() {
if !more {
return
}
ids.fireProtocolDelta(evt.(event.EvtLocalProtocolsUpdated))
handler(evt)
case <-ids.ctx.Done():
return
}
}
}

func (ids *IDService) handleProtosChanged(evt interface{}) {
ids.fireProtocolDelta(evt.(event.EvtLocalProtocolsUpdated))
}

func (ids *IDService) handleRoutingStateUpdated(evt interface{}) {
log.Debug("triggering push based on routing state updated event")
ids.Push()
}

// OwnObservedAddrs returns the addresses peers have reported we've dialed from
func (ids *IDService) OwnObservedAddrs() []ma.Multiaddr {
return ids.observedAddrs.Addrs()
Expand Down

0 comments on commit b4f30c2

Please sign in to comment.