Skip to content

Commit

Permalink
Cleanup structure of autonat.
Browse files Browse the repository at this point in the history
* incoming connections post a channel event - fix libp2p#40
* inbound connections reduce the frequency of probes - address libp2p#35

waiting on libp2p/go-libp2p#747 for detecting local address changes
  • Loading branch information
willscott committed Feb 17, 2020
1 parent 6f8dd31 commit 1f8ed1d
Show file tree
Hide file tree
Showing 6 changed files with 183 additions and 165 deletions.
266 changes: 136 additions & 130 deletions autonat.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,29 @@ import (
"context"
"errors"
"math/rand"
"sync"
"sync/atomic"
"time"

"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
peerstore "github.com/libp2p/go-libp2p-peerstore"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
)

// NATStatus is the state of NAT as detected by the ambient service.
type NATStatus int

const (
// NAT status is unknown; this means that the ambient service has not been
// NATStatusUnknown means that the ambient service has not been
// able to decide the presence of NAT in the most recent attempt to test
// dial through known autonat peers. initial state.
NATStatusUnknown NATStatus = iota
// NAT status is publicly dialable
// NATStatusPublic means this node believes it is externally dialable
NATStatusPublic
// NAT status is private network
// NATStatusPrivate means this node believes it is behind a NAT
NATStatusPrivate
)

Expand All @@ -50,62 +51,61 @@ type AmbientAutoNAT struct {
ctx context.Context
host host.Host

getAddrs GetAddrs

mx sync.Mutex
peers map[peer.ID][]ma.Multiaddr
status NATStatus
addr ma.Multiaddr
candidatePeers chan network.Conn
observations chan autoNATResult
status atomic.Value
// Reflects the confidence in the NATStatus being private, as a single
// dialback may fail for reasons unrelated to NAT.
// If it is <3, then multiple autoNAT peers may be contacted for dialback
// If only a single autoNAT peer is known, then the confidence increases
// for each failure until it reaches 3.
confidence int
confidence int
lastInbound time.Time
lastProbe time.Time

emitUnknown event.Emitter
emitPublic event.Emitter
emitPrivate event.Emitter
}

// NewAutoNAT creates a new ambient NAT autodiscovery instance attached to a host
// If getAddrs is nil, h.Addrs will be used
func NewAutoNAT(ctx context.Context, h host.Host, getAddrs GetAddrs) AutoNAT {
if getAddrs == nil {
getAddrs = h.Addrs
}
// autoNATResult pairs a NAT status with the address it applies to. Values of
// this type are sent on the observations channel by probe and are what is
// stored in AmbientAutoNAT.status.
type autoNATResult struct {
// NATStatus is the reachability verdict; it is embedded, so callers read it
// as result.NATStatus.
NATStatus
// address is the address returned by a successful dialback. The visible
// senders set it only for NATStatusPublic results and leave it nil otherwise.
address ma.Multiaddr
}

// NewAutoNAT creates a new ambient NAT autodiscovery instance attached to a host
func NewAutoNAT(ctx context.Context, h host.Host) AutoNAT {
emitUnknown, _ := h.EventBus().Emitter(new(event.EvtLocalRoutabilityUnknown))
emitPublic, _ := h.EventBus().Emitter(new(event.EvtLocalRoutabilityPublic))
emitPrivate, _ := h.EventBus().Emitter(new(event.EvtLocalRoutabilityPrivate))

as := &AmbientAutoNAT{
ctx: ctx,
host: h,
getAddrs: getAddrs,
peers: make(map[peer.ID][]ma.Multiaddr),
status: NATStatusUnknown,
ctx: ctx,
host: h,
candidatePeers: make(chan network.Conn, 5),
observations: make(chan autoNATResult, 1),

emitUnknown: emitUnknown,
emitPublic: emitPublic,
emitPrivate: emitPrivate,
}
as.status.Store(autoNATResult{NATStatusUnknown, nil})

h.Network().Notify(as)
go as.background()

return as
}

// Status returns the AutoNAT observed reachability status.
func (as *AmbientAutoNAT) Status() NATStatus {
as.mx.Lock()
defer as.mx.Unlock()
return as.status
s := as.status.Load().(autoNATResult)
return s.NATStatus
}

func (as *AmbientAutoNAT) updateStatus(s NATStatus) {
as.status = s
switch s {
func (as *AmbientAutoNAT) emitStatus() {
status := as.status.Load().(autoNATResult)
switch status.NATStatus {
case NATStatusUnknown:
as.emitUnknown.Emit(event.EvtLocalRoutabilityUnknown{})
case NATStatusPublic:
Expand All @@ -115,158 +115,164 @@ func (as *AmbientAutoNAT) updateStatus(s NATStatus) {
}
}

// PublicAddr returns the publicly connectable Multiaddr of this node if one is known.
func (as *AmbientAutoNAT) PublicAddr() (ma.Multiaddr, error) {
as.mx.Lock()
defer as.mx.Unlock()

if as.status != NATStatusPublic {
s := as.status.Load().(autoNATResult)
if s.NATStatus != NATStatusPublic {
return nil, errors.New("NAT Status is not public")
}

return as.addr, nil
return s.address, nil
}

func (as *AmbientAutoNAT) background() {
// wait a bit for the node to come online and establish some connections
// before starting autodetection
select {
case <-time.After(AutoNATBootDelay):
case <-as.ctx.Done():
return
}

delay := AutoNATBootDelay
for {
as.autodetect()

delay := AutoNATRefreshInterval
if as.status == NATStatusUnknown {
delay = AutoNATRetryInterval
}

select {
// new connection occurred.
case conn := <-as.candidatePeers:
if conn.Stat().Direction == network.DirInbound && manet.IsPublicAddr(conn.RemoteMultiaddr()) {
as.lastInbound = time.Now()
}
// TODO: network changed.

// probe finished.
case result := <-as.observations:
as.recordObservation(result)
case <-time.After(delay):
case <-as.ctx.Done():
return
}
}
}

func (as *AmbientAutoNAT) autodetect() {
peers := as.getPeers()

if len(peers) == 0 {
log.Debugf("skipping NAT auto detection; no autonat peers")
return
}

cli := NewAutoNATClient(as.host, as.getAddrs)
ctx, cancel := context.WithTimeout(as.ctx, AutoNATRequestTimeout)
defer cancel()

var result struct {
sync.Mutex
private int
public int
pubaddr ma.Multiaddr
delay = as.scheduleProbe()
}
}

probe := 3 - as.confidence
if probe == 0 {
probe = 1
}
if probe > len(peers) {
probe = len(peers)
// scheduleProbe calculates when the next probe should be scheduled for,
// and launches it if that time is now.
func (as *AmbientAutoNAT) scheduleProbe() time.Duration {
// Our baseline is a probe every 'AutoNATRefreshInterval'
// This is modulated by:
// * recent inbound connections make us willing to wait up to 2x longer between probes.
// * low confidence makes us speed up between probes.
fixedNow := time.Now()
currentStatus := as.status.Load().(autoNATResult)

nextProbe := fixedNow
if !as.lastProbe.IsZero() {
untilNext := AutoNATRefreshInterval
if currentStatus.NATStatus == NATStatusUnknown {
untilNext = AutoNATRetryInterval
} else if currentStatus.NATStatus == NATStatusPublic && as.lastInbound.After(as.lastProbe) {
untilNext *= 2
} else if as.confidence < 3 {
untilNext = AutoNATRetryInterval
}
nextProbe = as.lastProbe.Add(untilNext)
}

var wg sync.WaitGroup

for _, pi := range peers[:probe] {
wg.Add(1)
go func(pi peer.AddrInfo) {
defer wg.Done()

as.host.Peerstore().AddAddrs(pi.ID, pi.Addrs, peerstore.TempAddrTTL)
a, err := cli.DialBack(ctx, pi.ID)

switch {
case err == nil:
log.Debugf("Dialback through %s successful; public address is %s", pi.ID.Pretty(), a.String())
result.Lock()
result.public++
result.pubaddr = a
result.Unlock()

case IsDialError(err):
log.Debugf("Dialback through %s failed", pi.ID.Pretty())
result.Lock()
result.private++
result.Unlock()

default:
log.Debugf("Dialback error through %s: %s", pi.ID.Pretty(), err)
}
}(pi)
if fixedNow.After(nextProbe) || fixedNow == nextProbe {
as.lastProbe = fixedNow
go as.probeNextPeer()
return AutoNATRetryInterval
}
return nextProbe.Sub(fixedNow)
}

wg.Wait()

as.mx.Lock()
if result.public > 0 {
// Update the current status based on an observed result.
func (as *AmbientAutoNAT) recordObservation(observation autoNATResult) {
currentStatus := as.status.Load().(autoNATResult)
if observation.NATStatus == NATStatusPublic {
log.Debugf("NAT status is public")
if as.status == NATStatusPrivate {
if currentStatus.NATStatus == NATStatusPrivate {
// we are flipping our NATStatus, so confidence drops to 0
as.confidence = 0
} else if as.confidence < 3 {
as.confidence++
}
as.addr = result.pubaddr
as.updateStatus(NATStatusPublic)
} else if result.private > 0 {
if observation.address != nil {
if currentStatus.address != nil && !observation.address.Equal(currentStatus.address) {
as.confidence--
}
as.status.Store(observation)
}
if currentStatus.address != nil || observation.address != nil {
as.emitStatus()
}
} else if observation.NATStatus == NATStatusPrivate {
log.Debugf("NAT status is private")
if as.status == NATStatusPublic {
// we are flipping our NATStatus, so confidence drops to 0
as.confidence = 0
if currentStatus.NATStatus == NATStatusPublic {
if as.confidence < 1 {
as.confidence--
} else {
// we are flipping our NATStatus, so confidence drops to 0
as.confidence = 0
as.status.Store(observation)
as.emitStatus()
}
} else if as.confidence < 3 {
as.confidence++
as.status.Store(observation)
as.emitStatus()
}
as.addr = nil
as.updateStatus(NATStatusPrivate)
} else if as.confidence > 0 {
// don't just flip to unknown, reduce confidence first
as.confidence--
} else {
log.Debugf("NAT status is unknown")
as.addr = nil
as.updateStatus(NATStatusUnknown)
as.status.Store(autoNATResult{NATStatusUnknown, nil})
as.emitStatus()
}
as.mx.Unlock()
}

func (as *AmbientAutoNAT) getPeers() []peer.AddrInfo {
as.mx.Lock()
defer as.mx.Unlock()
func (as *AmbientAutoNAT) probe(pi *peer.AddrInfo) {
cli := NewAutoNATClient(as.host)
ctx, cancel := context.WithTimeout(as.ctx, AutoNATRequestTimeout)
defer cancel()

if len(as.peers) == 0 {
return nil
as.host.Peerstore().AddAddrs(pi.ID, pi.Addrs, peerstore.TempAddrTTL)
a, err := cli.DialBack(ctx, pi.ID)

switch {
case err == nil:
log.Debugf("Dialback through %s successful; public address is %s", pi.ID.Pretty(), a.String())
as.observations <- autoNATResult{NATStatusPublic, a}
case IsDialError(err):
log.Debugf("Dialback through %s failed", pi.ID.Pretty())
as.observations <- autoNATResult{NATStatusPrivate, nil}
default:
as.observations <- autoNATResult{NATStatusUnknown, nil}
}
}

var connected, others []peer.AddrInfo
func (as *AmbientAutoNAT) probeNextPeer() {
peers := as.host.Network().Peers()
if len(peers) == 0 {
return
}

for p, addrs := range as.peers {
connected := make([]peer.AddrInfo, 0, len(peers))
others := make([]peer.AddrInfo, 0, len(peers))

for _, p := range peers {
info := as.host.Peerstore().PeerInfo(p)
if as.host.Network().Connectedness(p) == network.Connected {
connected = append(connected, peer.AddrInfo{ID: p, Addrs: addrs})
connected = append(connected, info)
} else {
others = append(others, peer.AddrInfo{ID: p, Addrs: addrs})
others = append(others, info)
}
}
// TODO: track and exclude recently probed peers.

shufflePeers(connected)

if len(connected) < 3 {
if len(connected) > 0 {
as.probe(&connected[0])
return
} else if len(others) > 0 {
shufflePeers(others)
return append(connected, others...)
} else {
return connected
as.probe(&others[0])
}
}

Expand Down
6 changes: 2 additions & 4 deletions autonat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,8 @@ func newDialResponseError(status pb.Message_ResponseStatus, text string) *pb.Mes

func makeAutoNAT(ctx context.Context, t *testing.T, ash host.Host) (host.Host, AutoNAT) {
h := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
a := NewAutoNAT(ctx, h, nil)
a.(*AmbientAutoNAT).mx.Lock()
a.(*AmbientAutoNAT).peers[ash.ID()] = ash.Addrs()
a.(*AmbientAutoNAT).mx.Unlock()
h.Peerstore().AddAddrs(ash.ID(), ash.Addrs(), time.Minute)
a := NewAutoNAT(ctx, h)
return h, a
}

Expand Down
Loading

0 comments on commit 1f8ed1d

Please sign in to comment.