Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Commit

Permalink
feat(wantlist): differentiate types
Browse files Browse the repository at this point in the history
Seperate want list into differentiated types - session tracking and regular

fix #13
  • Loading branch information
hannahhoward committed Feb 20, 2019
1 parent a273438 commit 0d996be
Show file tree
Hide file tree
Showing 10 changed files with 178 additions and 161 deletions.
4 changes: 2 additions & 2 deletions bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
return nil
})

peerQueueFactory := func(p peer.ID) bspm.PeerQueue {
return bsmq.New(p, network)
peerQueueFactory := func(ctx context.Context, p peer.ID) bspm.PeerQueue {
return bsmq.New(ctx, p, network)
}

wm := bswm.New(ctx)
Expand Down
185 changes: 101 additions & 84 deletions messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package messagequeue

import (
"context"
"sync"
"time"

bsmsg "github.com/ipfs/go-bitswap/message"
Expand All @@ -23,32 +22,48 @@ type MessageNetwork interface {
NewMessageSender(context.Context, peer.ID) (bsnet.MessageSender, error)
}

type request interface {
handle(mq *MessageQueue)
}

// MessageQueue implements queue of want messages to send to peers.
type MessageQueue struct {
p peer.ID

outlk sync.Mutex
out bsmsg.BitSwapMessage
ctx context.Context
p peer.ID
network MessageNetwork
wl *wantlist.ThreadSafe

sender bsnet.MessageSender

refcnt int

work chan struct{}
done chan struct{}
newRequests chan request
outgoingMessages chan bsmsg.BitSwapMessage
done chan struct{}

// do not touch out of run loop
wl *wantlist.SessionTrackedWantlist
nextMessage bsmsg.BitSwapMessage
sender bsnet.MessageSender
}

type messageRequest struct {
entries []*bsmsg.Entry
ses uint64
}

type wantlistRequest struct {
wl *wantlist.SessionTrackedWantlist
}

// New creats a new MessageQueue.
func New(p peer.ID, network MessageNetwork) *MessageQueue {
func New(ctx context.Context, p peer.ID, network MessageNetwork) *MessageQueue {
return &MessageQueue{
done: make(chan struct{}),
work: make(chan struct{}, 1),
wl: wantlist.NewThreadSafe(),
network: network,
p: p,
refcnt: 1,
ctx: ctx,
wl: wantlist.NewSessionTrackedWantlist(),
network: network,
p: p,
refcnt: 1,
newRequests: make(chan request, 16),
outgoingMessages: make(chan bsmsg.BitSwapMessage),
done: make(chan struct{}),
}
}

Expand All @@ -71,58 +86,55 @@ func (mq *MessageQueue) RefDecrement() bool {

// AddMessage adds new entries to an outgoing message for a given session.
func (mq *MessageQueue) AddMessage(entries []*bsmsg.Entry, ses uint64) {
if !mq.addEntries(entries, ses) {
return
}
select {
case mq.work <- struct{}{}:
default:
case mq.newRequests <- &messageRequest{entries, ses}:
case <-mq.ctx.Done():
}
}

// AddWantlist adds a complete session tracked want list to a message queue
func (mq *MessageQueue) AddWantlist(initialEntries []*wantlist.Entry) {
if len(initialEntries) > 0 {
if mq.out == nil {
mq.out = bsmsg.New(false)
}

for _, e := range initialEntries {
for k := range e.SesTrk {
mq.wl.AddEntry(e, k)
}
mq.out.AddEntry(e.Cid, e.Priority)
}
func (mq *MessageQueue) AddWantlist(initialWants *wantlist.SessionTrackedWantlist) {
wl := wantlist.NewSessionTrackedWantlist()
initialWants.CopyWants(wl)

select {
case mq.work <- struct{}{}:
default:
}
select {
case mq.newRequests <- &wantlistRequest{wl}:
case <-mq.ctx.Done():
}
}

// Startup starts the processing of messages, and creates an initial message
// based on the given initial wantlist.
func (mq *MessageQueue) Startup(ctx context.Context) {
go mq.runQueue(ctx)
func (mq *MessageQueue) Startup() {
go mq.runQueue()
go mq.sendMessages()
}

// Shutdown stops the processing of messages for a message queue.
func (mq *MessageQueue) Shutdown() {
close(mq.done)
}

func (mq *MessageQueue) runQueue(ctx context.Context) {
func (mq *MessageQueue) runQueue() {
outgoingMessages := func() chan bsmsg.BitSwapMessage {
if mq.nextMessage == nil {
return nil
}
return mq.outgoingMessages
}

for {
select {
case <-mq.work: // there is work to be done
mq.doWork(ctx)
case newRequest := <-mq.newRequests:
newRequest.handle(mq)
case outgoingMessages() <- mq.nextMessage:
mq.nextMessage = nil
case <-mq.done:
if mq.sender != nil {
mq.sender.Close()
}
return
case <-ctx.Done():
case <-mq.ctx.Done():
if mq.sender != nil {
mq.sender.Reset()
}
Expand All @@ -131,72 +143,86 @@ func (mq *MessageQueue) runQueue(ctx context.Context) {
}
}

func (mq *MessageQueue) addEntries(entries []*bsmsg.Entry, ses uint64) bool {
var work bool
mq.outlk.Lock()
defer mq.outlk.Unlock()
// if we have no message held allocate a new one
if mq.out == nil {
mq.out = bsmsg.New(false)
func (mr *messageRequest) handle(mq *MessageQueue) {
mq.addEntries(mr.entries, mr.ses)
}

func (wr *wantlistRequest) handle(mq *MessageQueue) {
initialWants := wr.wl
initialWants.CopyWants(mq.wl)
if initialWants.Len() > 0 {
if mq.nextMessage == nil {
mq.nextMessage = bsmsg.New(false)
}
for _, e := range initialWants.Entries() {
mq.nextMessage.AddEntry(e.Cid, e.Priority)
}
}
}

// TODO: add a msg.Combine(...) method
// otherwise, combine the one we are holding with the
// one passed in
func (mq *MessageQueue) addEntries(entries []*bsmsg.Entry, ses uint64) {
for _, e := range entries {
if e.Cancel {
if mq.wl.Remove(e.Cid, ses) {
work = true
mq.out.Cancel(e.Cid)
if mq.nextMessage == nil {
mq.nextMessage = bsmsg.New(false)
}
mq.nextMessage.Cancel(e.Cid)
}
} else {
if mq.wl.Add(e.Cid, e.Priority, ses) {
work = true
mq.out.AddEntry(e.Cid, e.Priority)
if mq.nextMessage == nil {
mq.nextMessage = bsmsg.New(false)
}
mq.nextMessage.AddEntry(e.Cid, e.Priority)
}
}
}

return work
}

func (mq *MessageQueue) doWork(ctx context.Context) {

wlm := mq.extractOutgoingMessage()
if wlm == nil || wlm.Empty() {
return
func (mq *MessageQueue) sendMessages() {
for {
select {
case nextMessage := <-mq.outgoingMessages:
mq.sendMessage(nextMessage)
case <-mq.done:
return
case <-mq.ctx.Done():
return
}
}
}

func (mq *MessageQueue) sendMessage(message bsmsg.BitSwapMessage) {

// NB: only open a stream if we actually have data to send
err := mq.initializeSender(ctx)
err := mq.initializeSender()
if err != nil {
log.Infof("cant open message sender to peer %s: %s", mq.p, err)
// TODO: cant connect, what now?
return
}

// send wantlist updates
for i := 0; i < maxRetries; i++ { // try to send this message until we fail.
if mq.attemptSendAndRecovery(ctx, wlm) {
if mq.attemptSendAndRecovery(message) {
return
}
}
}

func (mq *MessageQueue) initializeSender(ctx context.Context) error {
func (mq *MessageQueue) initializeSender() error {
if mq.sender != nil {
return nil
}
nsender, err := openSender(ctx, mq.network, mq.p)
nsender, err := openSender(mq.ctx, mq.network, mq.p)
if err != nil {
return err
}
mq.sender = nsender
return nil
}

func (mq *MessageQueue) attemptSendAndRecovery(ctx context.Context, wlm bsmsg.BitSwapMessage) bool {
err := mq.sender.SendMsg(ctx, wlm)
func (mq *MessageQueue) attemptSendAndRecovery(message bsmsg.BitSwapMessage) bool {
err := mq.sender.SendMsg(mq.ctx, message)
if err == nil {
return true
}
Expand All @@ -208,14 +234,14 @@ func (mq *MessageQueue) attemptSendAndRecovery(ctx context.Context, wlm bsmsg.Bi
select {
case <-mq.done:
return true
case <-ctx.Done():
case <-mq.ctx.Done():
return true
case <-time.After(time.Millisecond * 100):
// wait 100ms in case disconnect notifications are still propogating
log.Warning("SendMsg errored but neither 'done' nor context.Done() were set")
}

err = mq.initializeSender(ctx)
err = mq.initializeSender()
if err != nil {
log.Infof("couldnt open sender again after SendMsg(%s) failed: %s", mq.p, err)
// TODO(why): what do we do now?
Expand All @@ -235,15 +261,6 @@ func (mq *MessageQueue) attemptSendAndRecovery(ctx context.Context, wlm bsmsg.Bi
return false
}

func (mq *MessageQueue) extractOutgoingMessage() bsmsg.BitSwapMessage {
// grab outgoing message
mq.outlk.Lock()
wlm := mq.out
mq.out = nil
mq.outlk.Unlock()
return wlm
}

func openSender(ctx context.Context, network MessageNetwork, p peer.ID) (bsnet.MessageSender, error) {
// allow ten minutes for connections this includes looking them up in the
// dht dialing them, and handshaking
Expand Down
15 changes: 7 additions & 8 deletions messagequeue/messagequeue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ func (fmn *fakeMessageNetwork) NewMessageSender(context.Context, peer.ID) (bsnet
return fmn.messageSender, nil
}
return nil, fmn.messageSenderError

}

type fakeMessageSender struct {
Expand Down Expand Up @@ -77,12 +76,12 @@ func TestStartupAndShutdown(t *testing.T) {
fakeSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent}
fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
peerID := testutil.GeneratePeers(1)[0]
messageQueue := New(peerID, fakenet)
messageQueue := New(ctx, peerID, fakenet)
ses := testutil.GenerateSessionID()
wl := testutil.GenerateWantlist(10, ses)

messageQueue.Startup(ctx)
messageQueue.AddWantlist(wl.Entries())
messageQueue.Startup()
messageQueue.AddWantlist(wl)
messages := collectMessages(ctx, t, messagesSent, 10*time.Millisecond)
if len(messages) != 1 {
t.Fatal("wrong number of messages were sent for initial wants")
Expand Down Expand Up @@ -119,11 +118,11 @@ func TestSendingMessagesDeduped(t *testing.T) {
fakeSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent}
fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
peerID := testutil.GeneratePeers(1)[0]
messageQueue := New(peerID, fakenet)
messageQueue := New(ctx, peerID, fakenet)
ses1 := testutil.GenerateSessionID()
ses2 := testutil.GenerateSessionID()
entries := testutil.GenerateMessageEntries(10, false)
messageQueue.Startup(ctx)
messageQueue.Startup()

messageQueue.AddMessage(entries, ses1)
messageQueue.AddMessage(entries, ses2)
Expand All @@ -142,13 +141,13 @@ func TestSendingMessagesPartialDupe(t *testing.T) {
fakeSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent}
fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
peerID := testutil.GeneratePeers(1)[0]
messageQueue := New(peerID, fakenet)
messageQueue := New(ctx, peerID, fakenet)
ses1 := testutil.GenerateSessionID()
ses2 := testutil.GenerateSessionID()
entries := testutil.GenerateMessageEntries(10, false)
moreEntries := testutil.GenerateMessageEntries(5, false)
secondEntries := append(entries[5:], moreEntries...)
messageQueue.Startup(ctx)
messageQueue.Startup()

messageQueue.AddMessage(entries, ses1)
messageQueue.AddMessage(secondEntries, ses2)
Expand Down
Loading

0 comments on commit 0d996be

Please sign in to comment.