Skip to content

Commit

Permalink
discovery: swap mdns lib for grandcat/zeroconf
Browse files Browse the repository at this point in the history
This commit makes the minimal amount of changes to switch us from
whyrusleeping/mdns to grandcat/zeroconf. Of note, rather than asking the
host.Host which addrs we're available to listen on, we push this to the
zeroconf library (which runs across available ifaces).
  • Loading branch information
rargulati committed Feb 27, 2018
1 parent bd3e85c commit a238ddb
Showing 1 changed file with 30 additions and 57 deletions.
87 changes: 30 additions & 57 deletions p2p/discovery/mdns.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,21 @@ package discovery

import (
"context"
"errors"
"io"
"io/ioutil"
golog "log"
"net"
"strings"
"sync"
"time"

mdns "github.com/grandcat/zeroconf"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-host"
"github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
"github.com/whyrusleeping/mdns"
)

var log = logging.Logger("mdns")
Expand All @@ -34,72 +34,36 @@ type Notifee interface {
}

type mdnsService struct {
server *mdns.Server
service *mdns.MDNSService
host host.Host
tag string
server *mdns.Server
host host.Host
tag string

lk sync.Mutex
notifees []Notifee
interval time.Duration
}

func getDialableListenAddrs(ph host.Host) ([]*net.TCPAddr, error) {
var out []*net.TCPAddr
for _, addr := range ph.Addrs() {
na, err := manet.ToNetAddr(addr)
if err != nil {
continue
}
tcp, ok := na.(*net.TCPAddr)
if ok {
out = append(out, tcp)
}
}
if len(out) == 0 {
return nil, errors.New("failed to find good external addr from peerhost")
}
return out, nil
}

func NewMdnsService(ctx context.Context, peerhost host.Host, interval time.Duration, serviceTag string) (Service, error) {

// TODO: dont let mdns use logging...
golog.SetOutput(ioutil.Discard)

var ipaddrs []net.IP
port := 4001

addrs, err := getDialableListenAddrs(peerhost)
if err != nil {
log.Warning(err)
} else {
port = addrs[0].Port
for _, a := range addrs {
ipaddrs = append(ipaddrs, a.IP)
}
}

myid := peerhost.ID().Pretty()

info := []string{myid}
if serviceTag == "" {
serviceTag = ServiceTag
}
service, err := mdns.NewMDNSService(myid, serviceTag, "", "", port, ipaddrs, info)
if err != nil {
return nil, err
}

// Create the mDNS server, defer shutdown
server, err := mdns.NewServer(&mdns.Config{Zone: service})
server, err := mdns.Register(myid, serviceTag, "", port, info, nil)
if err != nil {
return nil, err
}

s := &mdnsService{
server: server,
service: service,
host: peerhost,
interval: interval,
tag: serviceTag,
Expand All @@ -111,34 +75,38 @@ func NewMdnsService(ctx context.Context, peerhost host.Host, interval time.Durat
}

func (m *mdnsService) Close() error {
return m.server.Shutdown()
m.server.Shutdown()
// grandcat/zerconf swallows error, satisfy interface
return nil
}

func (m *mdnsService) pollForEntries(ctx context.Context) {

ticker := time.NewTicker(m.interval)
for {
resolver, err := mdns.NewResolver(nil)
if err != nil {
log.Error("Failed to initialize resolver:", err)
}

//execute mdns query right away at method call and then with every tick
entriesCh := make(chan *mdns.ServiceEntry, 16)
go func() {
for entry := range entriesCh {
go func(results <-chan *mdns.ServiceEntry) {
for entry := range results {
m.handleEntry(entry)
}
}()
}(entriesCh)

log.Debug("starting mdns query")
qp := &mdns.QueryParam{
Domain: "local",
Entries: entriesCh,
Service: m.tag,
Timeout: time.Second * 5,
}

err := mdns.Query(qp)
if err != nil {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

if err := resolver.Browse(ctx, m.tag, "local", entriesCh); err != nil {
log.Error("mdns lookup error: ", err)
}
close(entriesCh)

log.Debug("mdns query complete")

select {
Expand All @@ -152,8 +120,13 @@ func (m *mdnsService) pollForEntries(ctx context.Context) {
}

func (m *mdnsService) handleEntry(e *mdns.ServiceEntry) {
log.Debugf("Handling MDNS entry: %s:%d %s", e.AddrV4, e.Port, e.Info)
mpeer, err := peer.IDB58Decode(e.Info)
// pull out the txt
info := strings.Join(e.Text, "|")

// addripv4 potential issue
log.Debugf("Handling MDNS entry: %s:%d %s", e.AddrIPv4[0], e.Port, info)

mpeer, err := peer.IDB58Decode(info)
if err != nil {
log.Warning("Error parsing peer ID from mdns entry: ", err)
return
Expand All @@ -165,7 +138,7 @@ func (m *mdnsService) handleEntry(e *mdns.ServiceEntry) {
}

maddr, err := manet.FromNetAddr(&net.TCPAddr{
IP: e.AddrV4,
IP: e.AddrIPv4[0],
Port: e.Port,
})
if err != nil {
Expand Down

0 comments on commit a238ddb

Please sign in to comment.