Skip to content

Commit

Permalink
swarm: change maps with multiaddress keys to use strings (#2284)
Browse files Browse the repository at this point in the history
* swarm: change maps with multiaddress keys to use strings

* fix test

* fix more flakiness
  • Loading branch information
sukunrt authored and marten-seemann committed May 30, 2023
1 parent 6926113 commit 40978ee
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 17 deletions.
6 changes: 3 additions & 3 deletions p2p/host/routed/routed.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,16 +119,16 @@ func (rh *RoutedHost) Connect(ctx context.Context, pi peer.AddrInfo) error {
}

// Build lookup map
lookup := make(map[ma.Multiaddr]struct{}, len(addrs))
lookup := make(map[string]struct{}, len(addrs))
for _, addr := range addrs {
lookup[addr] = struct{}{}
lookup[string(addr.Bytes())] = struct{}{}
}

// if there's any address that's not in the previous set
// of addresses, try to connect again. If all addresses
// where known previously we return the original error.
for _, newAddr := range newAddrs {
if _, found := lookup[newAddr]; found {
if _, found := lookup[string(newAddr.Bytes())]; found {
continue
}

Expand Down
28 changes: 14 additions & 14 deletions p2p/net/swarm/dial_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ type dialResponse struct {
}

type pendRequest struct {
req dialRequest // the original request
err *DialError // dial error accumulator
addrs map[ma.Multiaddr]struct{} // pending addr dials
req dialRequest // the original request
err *DialError // dial error accumulator
addrs map[string]struct{} // pending address to dial. The key is a multiaddr
}

type addrDial struct {
Expand All @@ -47,7 +47,7 @@ type dialWorker struct {
reqch <-chan dialRequest
reqno int
requests map[int]*pendRequest
pending map[ma.Multiaddr]*addrDial
pending map[string]*addrDial // pending addresses to dial. The key is a multiaddr
resch chan dialResult

connected bool // true when a connection has been successfully established
Expand All @@ -67,7 +67,7 @@ func newDialWorker(s *Swarm, p peer.ID, reqch <-chan dialRequest) *dialWorker {
peer: p,
reqch: reqch,
requests: make(map[int]*pendRequest),
pending: make(map[ma.Multiaddr]*addrDial),
pending: make(map[string]*addrDial),
resch: make(chan dialResult),
}
}
Expand Down Expand Up @@ -109,10 +109,10 @@ loop:
pr := &pendRequest{
req: req,
err: &DialError{Peer: w.peer},
addrs: make(map[ma.Multiaddr]struct{}),
addrs: make(map[string]struct{}),
}
for _, a := range addrs {
pr.addrs[a] = struct{}{}
pr.addrs[string(a.Bytes())] = struct{}{}
}

// check if any of the addrs has been successfully dialed and accumulate
Expand All @@ -121,7 +121,7 @@ loop:
var tojoin []*addrDial

for _, a := range addrs {
ad, ok := w.pending[a]
ad, ok := w.pending[string(a.Bytes())]
if !ok {
todial = append(todial, a)
continue
Expand All @@ -136,7 +136,7 @@ loop:
if ad.err != nil {
// dial to this addr errored, accumulate the error
pr.err.recordErr(a, ad.err)
delete(pr.addrs, a)
delete(pr.addrs, string(a.Bytes()))
continue
}

Expand Down Expand Up @@ -167,7 +167,7 @@ loop:

if len(todial) > 0 {
for _, a := range todial {
w.pending[a] = &addrDial{addr: a, ctx: req.ctx, requests: []int{w.reqno}}
w.pending[string(a.Bytes())] = &addrDial{addr: a, ctx: req.ctx, requests: []int{w.reqno}}
}

w.nextDial = append(w.nextDial, todial...)
Expand All @@ -180,7 +180,7 @@ loop:
case <-w.triggerDial:
for _, addr := range w.nextDial {
// spawn the dial
ad := w.pending[addr]
ad := w.pending[string(addr.Bytes())]
err := w.s.dialNextAddr(ad.ctx, w.peer, addr, w.resch)
if err != nil {
w.dispatchError(ad, err)
Expand All @@ -195,7 +195,7 @@ loop:
w.connected = true
}

ad := w.pending[res.Addr]
ad := w.pending[string(res.Addr.Bytes())]

if res.Conn != nil {
// we got a connection, add it to the swarm
Expand Down Expand Up @@ -250,7 +250,7 @@ func (w *dialWorker) dispatchError(ad *addrDial, err error) {
// accumulate the error
pr.err.recordErr(ad.addr, err)

delete(pr.addrs, ad.addr)
delete(pr.addrs, string(ad.addr.Bytes()))
if len(pr.addrs) == 0 {
// all addrs have erred, dispatch dial error
// but first do a last one check in case an acceptable connection has landed from
Expand All @@ -274,7 +274,7 @@ func (w *dialWorker) dispatchError(ad *addrDial, err error) {
// it is also necessary to preserve consisent behaviour with the old dialer -- TestDialBackoff
// regresses without this.
if err == ErrDialBackoff {
delete(w.pending, ad.addr)
delete(w.pending, string(ad.addr.Bytes()))
}
}

Expand Down
66 changes: 66 additions & 0 deletions p2p/net/swarm/dial_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/libp2p/go-libp2p/p2p/transport/tcp"

ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -342,3 +343,68 @@ func TestDialWorkerLoopConcurrentFailureStress(t *testing.T) {
close(reqch)
worker.wg.Wait()
}

func TestDialWorkerLoopAddrDedup(t *testing.T) {
s1 := makeSwarm(t)
s2 := makeSwarm(t)
defer s1.Close()
defer s2.Close()
t1 := ma.StringCast(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 10000))
t2 := ma.StringCast(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 10000))

// acceptAndClose accepts a connection and closes it
acceptAndClose := func(a ma.Multiaddr, ch chan struct{}, closech chan struct{}) {
list, err := manet.Listen(a)
if err != nil {
t.Error(err)
return
}
go func() {
ch <- struct{}{}
for {
conn, err := list.Accept()
if err != nil {
return
}
ch <- struct{}{}
conn.Close()
}
}()
<-closech
list.Close()
}
ch := make(chan struct{}, 1)
closeCh := make(chan struct{})
go acceptAndClose(t1, ch, closeCh)
defer close(closeCh)
<-ch // the routine has started listening on addr

s1.Peerstore().AddAddrs(s2.LocalPeer(), []ma.Multiaddr{t1}, peerstore.PermanentAddrTTL)

reqch := make(chan dialRequest)
resch := make(chan dialResponse, 2)

worker := newDialWorker(s1, s2.LocalPeer(), reqch)
go worker.loop()
defer worker.wg.Wait()
defer close(reqch)

reqch <- dialRequest{ctx: context.Background(), resch: resch}
<-ch
<-resch
// Need to clear backoff otherwise the dial attempt would not be made
s1.Backoff().Clear(s2.LocalPeer())

s1.Peerstore().ClearAddrs(s2.LocalPeer())
s1.Peerstore().AddAddrs(s2.LocalPeer(), []ma.Multiaddr{t2}, peerstore.PermanentAddrTTL)

reqch <- dialRequest{ctx: context.Background(), resch: resch}
select {
case r := <-resch:
require.Error(t, r.err)
case <-ch:
t.Errorf("didn't expect a connection attempt")
case <-time.After(5 * time.Second):
t.Errorf("expected a fail response")
}
}

0 comments on commit 40978ee

Please sign in to comment.