diff --git a/exchange/bitswap/testnet/peernet.go b/exchange/bitswap/testnet/peernet.go index ef152172e61..93429ef4edd 100644 --- a/exchange/bitswap/testnet/peernet.go +++ b/exchange/bitswap/testnet/peernet.go @@ -1,7 +1,7 @@ package bitswap import ( - context "context" + "context" bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network" mockrouting "github.com/ipfs/go-ipfs/routing/mock" testutil "github.com/ipfs/go-ipfs/thirdparty/testutil" @@ -37,4 +37,4 @@ func (pn *peernet) HasPeer(p peer.ID) bool { return false } -var _ Network = &peernet{} +var _ Network = (*peernet)(nil) diff --git a/exchange/bitswap/testnet/virtual.go b/exchange/bitswap/testnet/virtual.go index c41edb554b4..133ea395da7 100644 --- a/exchange/bitswap/testnet/virtual.go +++ b/exchange/bitswap/testnet/virtual.go @@ -9,16 +9,21 @@ import ( mockrouting "github.com/ipfs/go-ipfs/routing/mock" delay "github.com/ipfs/go-ipfs/thirdparty/delay" testutil "github.com/ipfs/go-ipfs/thirdparty/testutil" + routing "gx/ipfs/QmPjTrrSfE6TzLv6ya6VWhGcCgPrUAdcgrDcQyRDX2VyW1/go-libp2p-routing" + logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" cid "gx/ipfs/QmTprEaAA2A9bst5XH7exuyi5KzNMK3SEDNN8rBDnKWcUS/go-cid" peer "gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer" ) +var log = logging.Logger("bstestnet") + func VirtualNetwork(rs mockrouting.Server, d delay.D) Network { return &network{ clients: make(map[peer.ID]bsnet.Receiver), delay: d, routingserver: rs, + conns: make(map[string]struct{}), } } @@ -26,6 +31,7 @@ type network struct { clients map[peer.ID]bsnet.Receiver routingserver mockrouting.Server delay delay.D + conns map[string]struct{} } func (n *network) Adapter(p testutil.Identity) bsnet.BitSwapNetwork { @@ -149,7 +155,22 @@ func (nc *networkClient) ConnectTo(_ context.Context, p peer.ID) error { if !nc.network.HasPeer(p) { return errors.New("no such peer in network") } + tag := tagForPeers(nc.local, p) + if _, ok := nc.network.conns[tag]; ok { + log.Warning("ALREADY CONNECTED TO PEER (is this a reconnect? test lib needs fixing)") + return nil + } + nc.network.conns[tag] = struct{}{} + // TODO: add handling for disconnects + nc.network.clients[p].PeerConnected(nc.local) nc.Receiver.PeerConnected(p) return nil } + +func tagForPeers(a, b peer.ID) string { + if a < b { + return string(a + b) + } + return string(b + a) +} diff --git a/exchange/bitswap/testutils.go b/exchange/bitswap/testutils.go index 1b19bdd47df..1b1fcf20a13 100644 --- a/exchange/bitswap/testutils.go +++ b/exchange/bitswap/testutils.go @@ -59,7 +59,7 @@ func (g *SessionGenerator) Instances(n int) []Instance { for i, inst := range instances { for j := i + 1; j < len(instances); j++ { oinst := instances[j] - inst.Exchange.PeerConnected(oinst.Peer) + inst.Exchange.network.ConnectTo(context.Background(), oinst.Peer) } } return instances diff --git a/exchange/bitswap/wantmanager.go b/exchange/bitswap/wantmanager.go index 780282a740f..4ae12f4998a 100644 --- a/exchange/bitswap/wantmanager.go +++ b/exchange/bitswap/wantmanager.go @@ -187,15 +187,6 @@ func (mq *msgQueue) runQueue(ctx context.Context) { } func (mq *msgQueue) doWork(ctx context.Context) { - if mq.sender == nil { - err := mq.openSender(ctx) - if err != nil { - log.Infof("cant open message sender to peer %s: %s", mq.p, err) - // TODO: cant connect, what now? - return - } - } - // grab outgoing message mq.outlk.Lock() wlm := mq.out @@ -206,6 +197,16 @@ func (mq *msgQueue) doWork(ctx context.Context) { mq.out = nil mq.outlk.Unlock() + // NB: only open a stream if we actually have data to send + if mq.sender == nil { + err := mq.openSender(ctx) + if err != nil { + log.Infof("cant open message sender to peer %s: %s", mq.p, err) + // TODO: cant connect, what now? + return + } + } + // send wantlist updates for { // try to send this message until we fail. err := mq.sender.SendMsg(ctx, wlm)