diff --git a/exchange/bitswap/wantmanager.go b/exchange/bitswap/wantmanager.go index 0091724fff32..29f7b9469499 100644 --- a/exchange/bitswap/wantmanager.go +++ b/exchange/bitswap/wantmanager.go @@ -137,36 +137,49 @@ func (mq *msgQueue) runQueue(ctx context.Context) { for { select { case <-mq.work: // there is work to be done - - err := mq.network.ConnectTo(ctx, mq.p) - if err != nil { - log.Noticef("cant connect to peer %s: %s", mq.p, err) - // TODO: cant connect, what now? - continue - } - - // grab outgoing message - mq.outlk.Lock() - wlm := mq.out - if wlm == nil || wlm.Empty() { - mq.outlk.Unlock() - continue - } - mq.out = nil - mq.outlk.Unlock() - - // send wantlist updates - err = mq.network.SendMessage(ctx, mq.p, wlm) - if err != nil { - log.Noticef("bitswap send error: %s", err) - // TODO: what do we do if this fails? - } + mq.doWork(ctx) case <-mq.done: return } } } +func (mq *msgQueue) doWork(ctx context.Context) { + // allow a minute for connections + // this includes looking them up in the dht + // dialing them, and handshaking + conctx, cancel := context.WithTimeout(ctx, time.Minute) + defer cancel() + + err := mq.network.ConnectTo(conctx, mq.p) + if err != nil { + log.Noticef("cant connect to peer %s: %s", mq.p, err) + // TODO: cant connect, what now? + return + } + + // grab outgoing message + mq.outlk.Lock() + wlm := mq.out + mq.out = nil + mq.outlk.Unlock() + + if wlm == nil || wlm.Empty() { + return + } + + sendctx, cancel := context.WithTimeout(ctx, time.Second*30) + defer cancel() + + // send wantlist updates + err = mq.network.SendMessage(sendctx, mq.p, wlm) + if err != nil { + log.Noticef("bitswap send error: %s", err) + // TODO: what do we do if this fails? + return + } +} + func (pm *WantManager) Connected(p peer.ID) { pm.connect <- p }