diff --git a/p2p/net/swarm/addr/addr.go b/p2p/net/swarm/addr/addr.go index facd5715bd8..0f8593d0743 100644 --- a/p2p/net/swarm/addr/addr.go +++ b/p2p/net/swarm/addr/addr.go @@ -18,8 +18,8 @@ var log = logging.Logger("p2p/net/swarm/addr") var SupportedTransportStrings = []string{ "/ip4/tcp", "/ip6/tcp", - // "/ip4/udp/utp", disabled because the lib is broken - // "/ip6/udp/utp", disabled because the lib is broken + "/ip4/udp/utp", + "/ip6/udp/utp", // "/ip4/udp/udt", disabled because the lib doesnt work on arm // "/ip6/udp/udt", disabled because the lib doesnt work on arm } diff --git a/p2p/net/swarm/addr/addr_test.go b/p2p/net/swarm/addr/addr_test.go index eb843ffc097..91b89067061 100644 --- a/p2p/net/swarm/addr/addr_test.go +++ b/p2p/net/swarm/addr/addr_test.go @@ -20,7 +20,6 @@ func TestFilterAddrs(t *testing.T) { bad := []ma.Multiaddr{ newMultiaddr(t, "/ip4/1.2.3.4/udp/1234"), // unreliable newMultiaddr(t, "/ip4/1.2.3.4/udp/1234/sctp/1234"), // not in manet - newMultiaddr(t, "/ip4/1.2.3.4/udp/1234/utp"), // utp is broken newMultiaddr(t, "/ip4/1.2.3.4/udp/1234/udt"), // udt is broken on arm newMultiaddr(t, "/ip6/fe80::1/tcp/1234"), // link local newMultiaddr(t, "/ip6/fe80::100/tcp/1234"), // link local @@ -29,6 +28,7 @@ func TestFilterAddrs(t *testing.T) { good := []ma.Multiaddr{ newMultiaddr(t, "/ip4/127.0.0.1/tcp/1234"), newMultiaddr(t, "/ip6/::1/tcp/1234"), + newMultiaddr(t, "/ip4/1.2.3.4/udp/1234/utp"), } goodAndBad := append(good, bad...) @@ -39,18 +39,12 @@ func TestFilterAddrs(t *testing.T) { if AddrUsable(a, false) { t.Errorf("addr %s should be unusable", a) } - if AddrUsable(a, true) { - t.Errorf("addr %s should be unusable", a) - } } for _, a := range good { if !AddrUsable(a, false) { t.Errorf("addr %s should be usable", a) } - if !AddrUsable(a, true) { - t.Errorf("addr %s should be usable", a) - } } subtestAddrsEqual(t, FilterUsableAddrs(bad), []ma.Multiaddr{}) diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index 0c6271fc10e..f12ca1a5380 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -91,13 +91,16 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr, } s := &Swarm{ - swarm: ps.NewSwarm(PSTransport), - local: local, - peers: peers, - ctx: ctx, - dialT: DialTimeout, - notifs: make(map[inet.Notifiee]ps.Notifiee), - transports: []transport.Transport{transport.NewTCPTransport()}, + swarm: ps.NewSwarm(PSTransport), + local: local, + peers: peers, + ctx: ctx, + dialT: DialTimeout, + notifs: make(map[inet.Notifiee]ps.Notifiee), + transports: []transport.Transport{ + transport.NewTCPTransport(), + transport.NewUtpTransport(), + }, bwc: bwc, fdRateLimit: make(chan struct{}, concurrentFdDials), Filters: filter.NewFilters(), diff --git a/p2p/net/swarm/swarm_addr_test.go b/p2p/net/swarm/swarm_addr_test.go index b75b491c42b..95b6e66e072 100644 --- a/p2p/net/swarm/swarm_addr_test.go +++ b/p2p/net/swarm/swarm_addr_test.go @@ -25,7 +25,6 @@ func TestFilterAddrs(t *testing.T) { bad := []ma.Multiaddr{ m("/ip4/1.2.3.4/udp/1234"), // unreliable m("/ip4/1.2.3.4/udp/1234/sctp/1234"), // not in manet - m("/ip4/1.2.3.4/udp/1234/utp"), // utp is broken m("/ip4/1.2.3.4/udp/1234/udt"), // udt is broken on arm m("/ip6/fe80::1/tcp/0"), // link local m("/ip6/fe80::100/tcp/1234"), // link local @@ -34,6 +33,7 @@ func TestFilterAddrs(t *testing.T) { good := []ma.Multiaddr{ m("/ip4/127.0.0.1/tcp/0"), m("/ip6/::1/tcp/0"), + m("/ip4/1.2.3.4/udp/1234/utp"), } goodAndBad := append(good, bad...) @@ -41,13 +41,13 @@ func TestFilterAddrs(t *testing.T) { // test filters for _, a := range bad { - if addrutil.AddrUsable(a, true) { + if addrutil.AddrUsable(a, false) { t.Errorf("addr %s should be unusable", a) } } for _, a := range good { - if !addrutil.AddrUsable(a, true) { + if !addrutil.AddrUsable(a, false) { t.Errorf("addr %s should be usable", a) } } diff --git a/p2p/net/transport/utp.go b/p2p/net/transport/utp.go new file mode 100644 index 00000000000..162816fecac --- /dev/null +++ b/p2p/net/transport/utp.go @@ -0,0 +1,148 @@ +package transport + +import ( + "net" + "sync" + + utp "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/anacrolix/utp" + ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" + manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net" + mautp "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net/utp" +) + +type UtpTransport struct { + sockLock sync.Mutex + sockets map[string]*UtpSocket +} + +func NewUtpTransport() *UtpTransport { + return &UtpTransport{ + sockets: make(map[string]*UtpSocket), + } +} + +func (d *UtpTransport) Matches(a ma.Multiaddr) bool { + p := a.Protocols() + return len(p) == 3 && p[2].Name == "utp" +} + +type UtpSocket struct { + s *utp.Socket + laddr ma.Multiaddr + transport Transport +} + +func (t *UtpTransport) Listen(laddr ma.Multiaddr) (Listener, error) { + t.sockLock.Lock() + defer t.sockLock.Unlock() + s, ok := t.sockets[laddr.String()] + if ok { + return s, nil + } + + ns, err := t.newConn(laddr) + if err != nil { + return nil, err + } + + t.sockets[laddr.String()] = ns + return ns, nil +} + +func (t *UtpTransport) Dialer(laddr ma.Multiaddr, opts ...DialOpt) (Dialer, error) { + t.sockLock.Lock() + defer t.sockLock.Unlock() + s, ok := t.sockets[laddr.String()] + if ok { + return s, nil + } + + ns, err := t.newConn(laddr, opts...) + if err != nil { + return nil, err + } + + t.sockets[laddr.String()] = ns + return ns, nil +} + +func (t *UtpTransport) newConn(addr ma.Multiaddr, opts ...DialOpt) (*UtpSocket, error) { + network, netaddr, err := manet.DialArgs(addr) + if err != nil { + return nil, err + } + + s, err := utp.NewSocket("udp"+network[3:], netaddr) + if err != nil { + return nil, err + } + + laddr, err := manet.FromNetAddr(mautp.MakeAddr(s.LocalAddr())) + if err != nil { + return nil, err + } + + return &UtpSocket{ + s: s, + laddr: laddr, + transport: t, + }, nil +} + +func (s *UtpSocket) Dial(raddr ma.Multiaddr) (Conn, error) { + _, addr, err := manet.DialArgs(raddr) + if err != nil { + return nil, err + } + + con, err := s.s.Dial(addr) + if err != nil { + return nil, err + } + + mnc, err := manet.WrapNetConn(&mautp.Conn{Conn: con}) + if err != nil { + return nil, err + } + + return &connWrap{ + Conn: mnc, + transport: s.transport, + }, nil +} + +func (s *UtpSocket) Accept() (Conn, error) { + c, err := s.s.Accept() + if err != nil { + return nil, err + } + + mnc, err := manet.WrapNetConn(&mautp.Conn{Conn: c}) + if err != nil { + return nil, err + } + + return &connWrap{ + Conn: mnc, + transport: s.transport, + }, nil +} + +func (s *UtpSocket) Matches(a ma.Multiaddr) bool { + p := a.Protocols() + return len(p) == 3 && p[2].Name == "utp" +} + +func (t *UtpSocket) Close() error { + return t.s.Close() +} + +func (t *UtpSocket) Addr() net.Addr { + return t.s.Addr() +} + +func (t *UtpSocket) Multiaddr() ma.Multiaddr { + return t.laddr +} + +var _ Transport = (*UtpTransport)(nil) diff --git a/test/sharness/t0130-multinode.sh b/test/sharness/t0130-multinode.sh index 7ba364ec5bd..fa9b9d91c15 100755 --- a/test/sharness/t0130-multinode.sh +++ b/test/sharness/t0130-multinode.sh @@ -78,4 +78,10 @@ test_expect_success "set up tcp testbed" ' run_basic_test +test_expect_success "set up utp testbed" ' + iptb init -n 5 -p 0 -f --bootstrap=none --utp +' + +run_basic_test + test_done