Skip to content

Commit

Permalink
Merge pull request #125 from libp2p/filters
Browse files Browse the repository at this point in the history
add support for multiaddr filtering
  • Loading branch information
marten-seemann committed Mar 17, 2020
2 parents 28e61ca + 29c56d8 commit 43458b9
Show file tree
Hide file tree
Showing 10 changed files with 123 additions and 32 deletions.
57 changes: 46 additions & 11 deletions p2p/transport/quic/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ import (
"fmt"
"io/ioutil"
mrand "math/rand"
"net"
"time"

ic "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/peer"
tpt "github.com/libp2p/go-libp2p-core/transport"
filter "github.com/libp2p/go-maddr-filter"
ma "github.com/multiformats/go-multiaddr"

. "github.com/onsi/ginkgo"
Expand Down Expand Up @@ -62,12 +64,12 @@ var _ = Describe("Connection", func() {
})

It("handshakes on IPv4", func() {
serverTransport, err := NewTransport(serverKey, nil)
serverTransport, err := NewTransport(serverKey, nil, nil)
Expect(err).ToNot(HaveOccurred())
ln := runServer(serverTransport, "/ip4/127.0.0.1/udp/0/quic")
defer ln.Close()

clientTransport, err := NewTransport(clientKey, nil)
clientTransport, err := NewTransport(clientKey, nil, nil)
Expect(err).ToNot(HaveOccurred())
conn, err := clientTransport.Dial(context.Background(), ln.Multiaddr(), serverID)
Expect(err).ToNot(HaveOccurred())
Expand All @@ -86,12 +88,12 @@ var _ = Describe("Connection", func() {
})

It("handshakes on IPv6", func() {
serverTransport, err := NewTransport(serverKey, nil)
serverTransport, err := NewTransport(serverKey, nil, nil)
Expect(err).ToNot(HaveOccurred())
ln := runServer(serverTransport, "/ip6/::1/udp/0/quic")
defer ln.Close()

clientTransport, err := NewTransport(clientKey, nil)
clientTransport, err := NewTransport(clientKey, nil, nil)
Expect(err).ToNot(HaveOccurred())
conn, err := clientTransport.Dial(context.Background(), ln.Multiaddr(), serverID)
Expect(err).ToNot(HaveOccurred())
Expand All @@ -110,12 +112,12 @@ var _ = Describe("Connection", func() {
})

It("opens and accepts streams", func() {
serverTransport, err := NewTransport(serverKey, nil)
serverTransport, err := NewTransport(serverKey, nil, nil)
Expect(err).ToNot(HaveOccurred())
ln := runServer(serverTransport, "/ip4/127.0.0.1/udp/0/quic")
defer ln.Close()

clientTransport, err := NewTransport(clientKey, nil)
clientTransport, err := NewTransport(clientKey, nil, nil)
Expect(err).ToNot(HaveOccurred())
conn, err := clientTransport.Dial(context.Background(), ln.Multiaddr(), serverID)
Expect(err).ToNot(HaveOccurred())
Expand All @@ -139,11 +141,11 @@ var _ = Describe("Connection", func() {
It("fails if the peer ID doesn't match", func() {
thirdPartyID, _ := createPeer()

serverTransport, err := NewTransport(serverKey, nil)
serverTransport, err := NewTransport(serverKey, nil, nil)
Expect(err).ToNot(HaveOccurred())
ln := runServer(serverTransport, "/ip4/127.0.0.1/udp/0/quic")

clientTransport, err := NewTransport(clientKey, nil)
clientTransport, err := NewTransport(clientKey, nil, nil)
Expect(err).ToNot(HaveOccurred())
// dial, but expect the wrong peer ID
_, err = clientTransport.Dial(context.Background(), ln.Multiaddr(), thirdPartyID)
Expand All @@ -161,14 +163,47 @@ var _ = Describe("Connection", func() {
Eventually(done).Should(BeClosed())
})

It("filters addresses", func() {
filters := filter.NewFilters()
ipNet := net.IPNet{
IP: net.IPv4(127, 0, 0, 1),
Mask: net.IPv4Mask(255, 255, 255, 255),
}
filters.AddFilter(ipNet, filter.ActionDeny)
testMA, err := ma.NewMultiaddr("/ip4/127.0.0.1/udp/1234/quic")
Expect(err).ToNot(HaveOccurred())
Expect(filters.AddrBlocked(testMA)).To(BeTrue())

serverTransport, err := NewTransport(serverKey, nil, filters)
Expect(err).ToNot(HaveOccurred())
ln := runServer(serverTransport, "/ip4/127.0.0.1/udp/0/quic")
defer ln.Close()

clientTransport, err := NewTransport(clientKey, nil, nil)
Expect(err).ToNot(HaveOccurred())

// make sure that connection attempts fails
quicConfig.HandshakeTimeout = 250 * time.Millisecond
_, err = clientTransport.Dial(context.Background(), ln.Multiaddr(), serverID)
Expect(err).To(HaveOccurred())
Expect(err.(net.Error).Timeout()).To(BeTrue())

// now allow the address and make sure the connection goes through
quicConfig.HandshakeTimeout = 2 * time.Second
filters.AddFilter(ipNet, filter.ActionAccept)
conn, err := clientTransport.Dial(context.Background(), ln.Multiaddr(), serverID)
Expect(err).ToNot(HaveOccurred())
conn.Close()
})

It("dials to two servers at the same time", func() {
serverID2, serverKey2 := createPeer()

serverTransport, err := NewTransport(serverKey, nil)
serverTransport, err := NewTransport(serverKey, nil, nil)
Expect(err).ToNot(HaveOccurred())
ln1 := runServer(serverTransport, "/ip4/127.0.0.1/udp/0/quic")
serverTransport2, err := NewTransport(serverKey2, nil)
defer ln1.Close()
serverTransport2, err := NewTransport(serverKey2, nil, nil)
Expect(err).ToNot(HaveOccurred())
ln2 := runServer(serverTransport2, "/ip4/127.0.0.1/udp/0/quic")
defer ln2.Close()
Expand All @@ -194,7 +229,7 @@ var _ = Describe("Connection", func() {
}
}()

clientTransport, err := NewTransport(clientKey, nil)
clientTransport, err := NewTransport(clientKey, nil, nil)
Expect(err).ToNot(HaveOccurred())
c1, err := clientTransport.Dial(context.Background(), ln1.Multiaddr(), serverID)
Expect(err).ToNot(HaveOccurred())
Expand Down
34 changes: 34 additions & 0 deletions p2p/transport/quic/filtered_conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package libp2pquic

import (
"net"

filter "github.com/libp2p/go-maddr-filter"
)

type filteredConn struct {
net.PacketConn

filters *filter.Filters
}

func newFilteredConn(c net.PacketConn, filters *filter.Filters) net.PacketConn {
return &filteredConn{PacketConn: c, filters: filters}
}

func (c *filteredConn) ReadFrom(b []byte) (n int, addr net.Addr, rerr error) {
for {
n, addr, rerr = c.PacketConn.ReadFrom(b)
// Short Header packet, see https://tools.ietf.org/html/draft-ietf-quic-invariants-07#section-4.2.
if n < 1 || b[0]&0x80 == 0 {
return
}
maddr, err := toQuicMultiaddr(addr)
if err != nil {
panic(err)
}
if !c.filters.AddrBlocked(maddr) {
return
}
}
}
15 changes: 11 additions & 4 deletions p2p/transport/quic/libp2pquic_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ import (
mrand "math/rand"
"runtime/pprof"
"strings"
"testing"
"time"

"github.com/lucas-clemente/quic-go"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

"testing"
)

func TestLibp2pQuicTransport(t *testing.T) {
Expand All @@ -22,8 +23,11 @@ var _ = BeforeSuite(func() {
mrand.Seed(GinkgoRandomSeed())
})

var garbageCollectIntervalOrig time.Duration
var maxUnusedDurationOrig time.Duration
var (
garbageCollectIntervalOrig time.Duration
maxUnusedDurationOrig time.Duration
origQuicConfig *quic.Config
)

func isGarbageCollectorRunning() bool {
var b bytes.Buffer
Expand All @@ -37,10 +41,13 @@ var _ = BeforeEach(func() {
maxUnusedDurationOrig = maxUnusedDuration
garbageCollectInterval = 50 * time.Millisecond
maxUnusedDuration = 0
origQuicConfig = quicConfig
quicConfig = quicConfig.Clone()
})

var _ = AfterEach(func() {
Eventually(isGarbageCollectorRunning).Should(BeFalse())
garbageCollectInterval = garbageCollectIntervalOrig
maxUnusedDuration = maxUnusedDurationOrig
quicConfig = origQuicConfig
})
2 changes: 1 addition & 1 deletion p2p/transport/quic/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ var _ = Describe("Listener", func() {
Expect(err).ToNot(HaveOccurred())
key, err := ic.UnmarshalRsaPrivateKey(x509.MarshalPKCS1PrivateKey(rsaKey))
Expect(err).ToNot(HaveOccurred())
t, err = NewTransport(key, nil)
t, err = NewTransport(key, nil, nil)
Expect(err).ToNot(HaveOccurred())
})

Expand Down
16 changes: 12 additions & 4 deletions p2p/transport/quic/reuse_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"net"
"sync"
"time"

filter "github.com/libp2p/go-maddr-filter"
)

// Constant. Defined as variables to simplify testing.
Expand All @@ -20,7 +22,10 @@ type reuseConn struct {
unusedSince time.Time
}

func newReuseConn(conn net.PacketConn) *reuseConn {
func newReuseConn(conn net.PacketConn, filters *filter.Filters) *reuseConn {
if filters != nil {
conn = newFilteredConn(conn, filters)
}
return &reuseConn{PacketConn: conn}
}

Expand Down Expand Up @@ -49,15 +54,18 @@ func (c *reuseConn) ShouldGarbageCollect(now time.Time) bool {
type reuseBase struct {
mutex sync.Mutex

filters *filter.Filters

garbageCollectorRunning bool

unicast map[string] /* IP.String() */ map[int] /* port */ *reuseConn
// global contains connections that are listening on 0.0.0.0 / ::
global map[int]*reuseConn
}

func newReuseBase() reuseBase {
func newReuseBase(filters *filter.Filters) reuseBase {
return reuseBase{
filters: filters,
unicast: make(map[string]map[int]*reuseConn),
global: make(map[int]*reuseConn),
}
Expand Down Expand Up @@ -139,7 +147,7 @@ func (r *reuseBase) dialLocked(network string, raddr *net.UDPAddr, ips []net.IP)
if err != nil {
return nil, err
}
rconn := newReuseConn(conn)
rconn := newReuseConn(conn, r.filters)
r.global[conn.LocalAddr().(*net.UDPAddr).Port] = rconn
return rconn, nil
}
Expand All @@ -151,7 +159,7 @@ func (r *reuseBase) Listen(network string, laddr *net.UDPAddr) (*reuseConn, erro
}
localAddr := conn.LocalAddr().(*net.UDPAddr)

rconn := newReuseConn(conn)
rconn := newReuseConn(conn, r.filters)
rconn.IncreaseCount()

r.mutex.Lock()
Expand Down
2 changes: 1 addition & 1 deletion p2p/transport/quic/reuse_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ var _ = Describe("Reuse (on Linux)", func() {

BeforeEach(func() {
var err error
reuse, err = newReuse()
reuse, err = newReuse(nil)
Expect(err).ToNot(HaveOccurred())
})

Expand Down
6 changes: 4 additions & 2 deletions p2p/transport/quic/reuse_not_win.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ package libp2pquic
import (
"net"

filter "github.com/libp2p/go-maddr-filter"

"github.com/vishvananda/netlink"
)

Expand All @@ -14,15 +16,15 @@ type reuse struct {
handle *netlink.Handle // Only set on Linux. nil on other systems.
}

func newReuse() (*reuse, error) {
func newReuse(filters *filter.Filters) (*reuse, error) {
handle, err := netlink.NewHandle(SupportedNlFamilies...)
if err == netlink.ErrNotImplemented {
handle = nil
} else if err != nil {
return nil, err
}
return &reuse{
reuseBase: newReuseBase(),
reuseBase: newReuseBase(filters),
handle: handle,
}, nil
}
Expand Down
2 changes: 1 addition & 1 deletion p2p/transport/quic/reuse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ var _ = Describe("Reuse", func() {

BeforeEach(func() {
var err error
reuse, err = newReuse()
reuse, err = newReuse(nil)
Expect(err).ToNot(HaveOccurred())
})

Expand Down
10 changes: 7 additions & 3 deletions p2p/transport/quic/reuse_win.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,18 @@

package libp2pquic

import "net"
import (
"net"

filter "github.com/libp2p/go-maddr-filter"
)

type reuse struct {
reuseBase
}

func newReuse() (*reuse, error) {
return &reuse{reuseBase: newReuseBase()}, nil
func newReuse(filters *filter.Filters) (*reuse, error) {
return &reuse{reuseBase: newReuseBase(filters)}, nil
}

func (r *reuse) Dial(network string, raddr *net.UDPAddr) (*reuseConn, error) {
Expand Down
11 changes: 6 additions & 5 deletions p2p/transport/quic/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/libp2p/go-libp2p-core/pnet"
tpt "github.com/libp2p/go-libp2p-core/transport"
p2ptls "github.com/libp2p/go-libp2p-tls"
filter "github.com/libp2p/go-maddr-filter"
quic "github.com/lucas-clemente/quic-go"
ma "github.com/multiformats/go-multiaddr"
mafmt "github.com/multiformats/go-multiaddr-fmt"
Expand All @@ -36,12 +37,12 @@ type connManager struct {
reuseUDP6 *reuse
}

func newConnManager() (*connManager, error) {
reuseUDP4, err := newReuse()
func newConnManager(filters *filter.Filters) (*connManager, error) {
reuseUDP4, err := newReuse(filters)
if err != nil {
return nil, err
}
reuseUDP6, err := newReuse()
reuseUDP6, err := newReuse(filters)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -89,7 +90,7 @@ type transport struct {
var _ tpt.Transport = &transport{}

// NewTransport creates a new QUIC transport
func NewTransport(key ic.PrivKey, psk pnet.PSK) (tpt.Transport, error) {
func NewTransport(key ic.PrivKey, psk pnet.PSK, filters *filter.Filters) (tpt.Transport, error) {
if len(psk) > 0 {
log.Error("QUIC doesn't support private networks yet.")
return nil, errors.New("QUIC doesn't support private networks yet")
Expand All @@ -102,7 +103,7 @@ func NewTransport(key ic.PrivKey, psk pnet.PSK) (tpt.Transport, error) {
if err != nil {
return nil, err
}
connManager, err := newConnManager()
connManager, err := newConnManager(filters)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 43458b9

Please sign in to comment.