Skip to content

Commit

Permalink
Merge pull request ipfs/go-bitswap#74 from ipfs/feat/differentiate_wa…
Browse files Browse the repository at this point in the history
…ntList

More specific wantlists

This commit was moved from ipfs/go-bitswap@ab7ddf0
  • Loading branch information
Stebalien authored Feb 20, 2019
2 parents 386036d + 81c5613 commit 477e735
Show file tree
Hide file tree
Showing 11 changed files with 186 additions and 167 deletions.
4 changes: 2 additions & 2 deletions bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
return nil
})

peerQueueFactory := func(p peer.ID) bspm.PeerQueue {
return bsmq.New(p, network)
peerQueueFactory := func(ctx context.Context, p peer.ID) bspm.PeerQueue {
return bsmq.New(ctx, p, network)
}

wm := bswm.New(ctx)
Expand Down
21 changes: 13 additions & 8 deletions bitswap/decision/peer_request_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (tl *prq) Push(to peer.ID, entries ...*wantlist.Entry) {
defer partner.activelk.Unlock()

var priority int
newEntries := make([]*wantlist.Entry, 0, len(entries))
newEntries := make([]*peerRequestTaskEntry, 0, len(entries))
for _, entry := range entries {
if partner.activeBlocks.Has(entry.Cid) {
continue
Expand All @@ -75,7 +75,7 @@ func (tl *prq) Push(to peer.ID, entries ...*wantlist.Entry) {
if entry.Priority > priority {
priority = entry.Priority
}
newEntries = append(newEntries, entry)
newEntries = append(newEntries, &peerRequestTaskEntry{entry, false})
}

if len(newEntries) == 0 {
Expand All @@ -86,7 +86,7 @@ func (tl *prq) Push(to peer.ID, entries ...*wantlist.Entry) {
Entries: newEntries,
Target: to,
created: time.Now(),
Done: func(e []*wantlist.Entry) {
Done: func(e []*peerRequestTaskEntry) {
tl.lock.Lock()
for _, entry := range e {
partner.TaskDone(entry.Cid)
Expand Down Expand Up @@ -117,10 +117,10 @@ func (tl *prq) Pop() *peerRequestTask {
for partner.taskQueue.Len() > 0 && partner.freezeVal == 0 {
out = partner.taskQueue.Pop().(*peerRequestTask)

newEntries := make([]*wantlist.Entry, 0, len(out.Entries))
newEntries := make([]*peerRequestTaskEntry, 0, len(out.Entries))
for _, entry := range out.Entries {
delete(tl.taskMap, taskEntryKey{out.Target, entry.Cid})
if entry.Trash {
if entry.trash {
continue
}
partner.requests--
Expand Down Expand Up @@ -150,7 +150,7 @@ func (tl *prq) Remove(k cid.Cid, p peer.ID) {
// remove the task "lazily"
// simply mark it as trash, so it'll be dropped when popped off the
// queue.
entry.Trash = true
entry.trash = true
break
}
}
Expand Down Expand Up @@ -197,13 +197,18 @@ func (tl *prq) thawRound() {
}
}

type peerRequestTaskEntry struct {
*wantlist.Entry
// trash in a book-keeping field
trash bool
}
type peerRequestTask struct {
Entries []*wantlist.Entry
Entries []*peerRequestTaskEntry
Priority int
Target peer.ID

// A callback to signal that this task has been completed
Done func([]*wantlist.Entry)
Done func([]*peerRequestTaskEntry)

// created marks the time that the task was added to the queue
created time.Time
Expand Down
181 changes: 99 additions & 82 deletions bitswap/messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package messagequeue

import (
"context"
"sync"
"time"

bsmsg "github.com/ipfs/go-bitswap/message"
Expand All @@ -23,86 +22,99 @@ type MessageNetwork interface {
NewMessageSender(context.Context, peer.ID) (bsnet.MessageSender, error)
}

type request interface {
handle(mq *MessageQueue)
}

// MessageQueue implements queue of want messages to send to peers.
type MessageQueue struct {
p peer.ID

outlk sync.Mutex
out bsmsg.BitSwapMessage
ctx context.Context
p peer.ID
network MessageNetwork
wl *wantlist.ThreadSafe

sender bsnet.MessageSender
newRequests chan request
outgoingMessages chan bsmsg.BitSwapMessage
done chan struct{}

// do not touch out of run loop
wl *wantlist.SessionTrackedWantlist
nextMessage bsmsg.BitSwapMessage
sender bsnet.MessageSender
}

type messageRequest struct {
entries []*bsmsg.Entry
ses uint64
}

work chan struct{}
done chan struct{}
type wantlistRequest struct {
wl *wantlist.SessionTrackedWantlist
}

// New creats a new MessageQueue.
func New(p peer.ID, network MessageNetwork) *MessageQueue {
func New(ctx context.Context, p peer.ID, network MessageNetwork) *MessageQueue {
return &MessageQueue{
done: make(chan struct{}),
work: make(chan struct{}, 1),
wl: wantlist.NewThreadSafe(),
network: network,
p: p,
ctx: ctx,
wl: wantlist.NewSessionTrackedWantlist(),
network: network,
p: p,
newRequests: make(chan request, 16),
outgoingMessages: make(chan bsmsg.BitSwapMessage),
done: make(chan struct{}),
}
}

// AddMessage adds new entries to an outgoing message for a given session.
func (mq *MessageQueue) AddMessage(entries []*bsmsg.Entry, ses uint64) {
if !mq.addEntries(entries, ses) {
return
}
select {
case mq.work <- struct{}{}:
default:
case mq.newRequests <- &messageRequest{entries, ses}:
case <-mq.ctx.Done():
}
}

// AddWantlist adds a complete session tracked want list to a message queue
func (mq *MessageQueue) AddWantlist(initialEntries []*wantlist.Entry) {
if len(initialEntries) > 0 {
if mq.out == nil {
mq.out = bsmsg.New(false)
}
func (mq *MessageQueue) AddWantlist(initialWants *wantlist.SessionTrackedWantlist) {
wl := wantlist.NewSessionTrackedWantlist()
initialWants.CopyWants(wl)

for _, e := range initialEntries {
for k := range e.SesTrk {
mq.wl.AddEntry(e, k)
}
mq.out.AddEntry(e.Cid, e.Priority)
}

select {
case mq.work <- struct{}{}:
default:
}
select {
case mq.newRequests <- &wantlistRequest{wl}:
case <-mq.ctx.Done():
}
}

// Startup starts the processing of messages, and creates an initial message
// based on the given initial wantlist.
func (mq *MessageQueue) Startup(ctx context.Context) {
go mq.runQueue(ctx)
func (mq *MessageQueue) Startup() {
go mq.runQueue()
go mq.sendMessages()
}

// Shutdown stops the processing of messages for a message queue.
func (mq *MessageQueue) Shutdown() {
close(mq.done)
}

func (mq *MessageQueue) runQueue(ctx context.Context) {
func (mq *MessageQueue) runQueue() {
outgoingMessages := func() chan bsmsg.BitSwapMessage {
if mq.nextMessage == nil {
return nil
}
return mq.outgoingMessages
}

for {
select {
case <-mq.work: // there is work to be done
mq.doWork(ctx)
case newRequest := <-mq.newRequests:
newRequest.handle(mq)
case outgoingMessages() <- mq.nextMessage:
mq.nextMessage = nil
case <-mq.done:
if mq.sender != nil {
mq.sender.Close()
}
return
case <-ctx.Done():
case <-mq.ctx.Done():
if mq.sender != nil {
mq.sender.Reset()
}
Expand All @@ -111,72 +123,86 @@ func (mq *MessageQueue) runQueue(ctx context.Context) {
}
}

func (mq *MessageQueue) addEntries(entries []*bsmsg.Entry, ses uint64) bool {
var work bool
mq.outlk.Lock()
defer mq.outlk.Unlock()
// if we have no message held allocate a new one
if mq.out == nil {
mq.out = bsmsg.New(false)
func (mr *messageRequest) handle(mq *MessageQueue) {
mq.addEntries(mr.entries, mr.ses)
}

func (wr *wantlistRequest) handle(mq *MessageQueue) {
initialWants := wr.wl
initialWants.CopyWants(mq.wl)
if initialWants.Len() > 0 {
if mq.nextMessage == nil {
mq.nextMessage = bsmsg.New(false)
}
for _, e := range initialWants.Entries() {
mq.nextMessage.AddEntry(e.Cid, e.Priority)
}
}
}

// TODO: add a msg.Combine(...) method
// otherwise, combine the one we are holding with the
// one passed in
func (mq *MessageQueue) addEntries(entries []*bsmsg.Entry, ses uint64) {
for _, e := range entries {
if e.Cancel {
if mq.wl.Remove(e.Cid, ses) {
work = true
mq.out.Cancel(e.Cid)
if mq.nextMessage == nil {
mq.nextMessage = bsmsg.New(false)
}
mq.nextMessage.Cancel(e.Cid)
}
} else {
if mq.wl.Add(e.Cid, e.Priority, ses) {
work = true
mq.out.AddEntry(e.Cid, e.Priority)
if mq.nextMessage == nil {
mq.nextMessage = bsmsg.New(false)
}
mq.nextMessage.AddEntry(e.Cid, e.Priority)
}
}
}

return work
}

func (mq *MessageQueue) doWork(ctx context.Context) {

wlm := mq.extractOutgoingMessage()
if wlm == nil || wlm.Empty() {
return
func (mq *MessageQueue) sendMessages() {
for {
select {
case nextMessage := <-mq.outgoingMessages:
mq.sendMessage(nextMessage)
case <-mq.done:
return
case <-mq.ctx.Done():
return
}
}
}

func (mq *MessageQueue) sendMessage(message bsmsg.BitSwapMessage) {

// NB: only open a stream if we actually have data to send
err := mq.initializeSender(ctx)
err := mq.initializeSender()
if err != nil {
log.Infof("cant open message sender to peer %s: %s", mq.p, err)
// TODO: cant connect, what now?
return
}

// send wantlist updates
for i := 0; i < maxRetries; i++ { // try to send this message until we fail.
if mq.attemptSendAndRecovery(ctx, wlm) {
if mq.attemptSendAndRecovery(message) {
return
}
}
}

func (mq *MessageQueue) initializeSender(ctx context.Context) error {
func (mq *MessageQueue) initializeSender() error {
if mq.sender != nil {
return nil
}
nsender, err := openSender(ctx, mq.network, mq.p)
nsender, err := openSender(mq.ctx, mq.network, mq.p)
if err != nil {
return err
}
mq.sender = nsender
return nil
}

func (mq *MessageQueue) attemptSendAndRecovery(ctx context.Context, wlm bsmsg.BitSwapMessage) bool {
err := mq.sender.SendMsg(ctx, wlm)
func (mq *MessageQueue) attemptSendAndRecovery(message bsmsg.BitSwapMessage) bool {
err := mq.sender.SendMsg(mq.ctx, message)
if err == nil {
return true
}
Expand All @@ -188,14 +214,14 @@ func (mq *MessageQueue) attemptSendAndRecovery(ctx context.Context, wlm bsmsg.Bi
select {
case <-mq.done:
return true
case <-ctx.Done():
case <-mq.ctx.Done():
return true
case <-time.After(time.Millisecond * 100):
// wait 100ms in case disconnect notifications are still propogating
log.Warning("SendMsg errored but neither 'done' nor context.Done() were set")
}

err = mq.initializeSender(ctx)
err = mq.initializeSender()
if err != nil {
log.Infof("couldnt open sender again after SendMsg(%s) failed: %s", mq.p, err)
// TODO(why): what do we do now?
Expand All @@ -215,15 +241,6 @@ func (mq *MessageQueue) attemptSendAndRecovery(ctx context.Context, wlm bsmsg.Bi
return false
}

func (mq *MessageQueue) extractOutgoingMessage() bsmsg.BitSwapMessage {
// grab outgoing message
mq.outlk.Lock()
wlm := mq.out
mq.out = nil
mq.outlk.Unlock()
return wlm
}

func openSender(ctx context.Context, network MessageNetwork, p peer.ID) (bsnet.MessageSender, error) {
// allow ten minutes for connections this includes looking them up in the
// dht dialing them, and handshaking
Expand Down
Loading

0 comments on commit 477e735

Please sign in to comment.