Stream pooling #271

Closed
wants to merge 3 commits
dht.go (12 changes: 6 additions & 6 deletions)
@@ -58,13 +58,13 @@ type IpfsDHT struct {
ctx context.Context
proc goprocess.Process

strmap map[peer.ID]*messageSender
smlk sync.Mutex

streamPoolMu sync.Mutex
streamPool map[peer.ID]map[*poolStream]struct{}

plk sync.Mutex

protocols []protocol.ID // DHT protocols

client bool
}

// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
@@ -145,12 +145,12 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p
self: h.ID(),
peerstore: h.Peerstore(),
host: h,
strmap: make(map[peer.ID]*messageSender),
ctx: ctx,
providers: providers.NewProviderManager(ctx, h.ID(), dstore),
birth: time.Now(),
routingTable: rt,
protocols: protocols,
streamPool: make(map[peer.ID]map[*poolStream]struct{}),
}
}

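Note on the new fields: streamPoolMu and streamPool replace the strmap/smlk pair, but the pool helpers they back (poolStream, getPoolStream, putPoolStream) are added in a file that is not part of the hunks shown in this view. The following is only a minimal, self-contained sketch of the check-out/check-in bookkeeping such fields suggest; peerID and poolStream are stand-ins, and checkOut/checkIn are placeholder names, not the PR's API.

package main

import (
	"fmt"
	"sync"
)

type peerID string       // stand-in for peer.ID
type poolStream struct{} // stand-in for the PR's pooled-stream type

type dhtPool struct {
	streamPoolMu sync.Mutex
	streamPool   map[peerID]map[*poolStream]struct{}
}

// checkOut returns an idle stream parked for p, or nil if none is available.
func (d *dhtPool) checkOut(p peerID) *poolStream {
	d.streamPoolMu.Lock()
	defer d.streamPoolMu.Unlock()
	for ps := range d.streamPool[p] {
		delete(d.streamPool[p], ps)
		return ps
	}
	return nil // caller would open a fresh stream instead
}

// checkIn parks a stream for later reuse with the same peer.
func (d *dhtPool) checkIn(p peerID, ps *poolStream) {
	d.streamPoolMu.Lock()
	defer d.streamPoolMu.Unlock()
	if d.streamPool[p] == nil {
		d.streamPool[p] = make(map[*poolStream]struct{})
	}
	d.streamPool[p][ps] = struct{}{}
}

func main() {
	d := &dhtPool{streamPool: make(map[peerID]map[*poolStream]struct{})}
	p := peerID("QmExamplePeer")
	d.checkIn(p, &poolStream{})
	fmt.Println(d.checkOut(p) != nil) // true: the parked stream is reused
	fmt.Println(d.checkOut(p) != nil) // false: the pool for p is now empty
}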
dht_bootstrap.go (12 changes: 4 additions & 8 deletions)
@@ -138,15 +138,11 @@ func (dht *IpfsDHT) randomWalk(ctx context.Context) error {

// runBootstrap builds up list of peers by requesting random peer IDs
func (dht *IpfsDHT) runBootstrap(ctx context.Context, cfg BootstrapConfig) error {
bslog := func(msg string) {
logger.Debugf("DHT %s dhtRunBootstrap %s -- routing table size: %d", dht.self, msg, dht.routingTable.Size())
}
bslog("start")
defer bslog("end")
defer logger.EventBegin(ctx, "dhtRunBootstrap").Done()

doQuery := func(n int, target string, f func(context.Context) error) error {
logger.Infof("Bootstrapping query (%d/%d) to %s", n, cfg.Queries, target)
logger.Infof("starting bootstrap query (%d/%d) to %s (rt_len=%d)", n, cfg.Queries, target, dht.routingTable.Size())
defer func() {
logger.Infof("finished bootstrap query (%d/%d) to %s (rt_len=%d)", n, cfg.Queries, target, dht.routingTable.Size())
}()
queryCtx, cancel := context.WithTimeout(ctx, cfg.Timeout)
defer cancel()
err := f(queryCtx)
dht_net.go (250 changes: 24 additions & 226 deletions)
@@ -5,7 +5,7 @@ import (
"context"
"fmt"
"io"
"sync"
"log"
"time"

ggio "github.com/gogo/protobuf/io"
@@ -112,40 +112,41 @@ func (dht *IpfsDHT) handleNewMessage(s inet.Stream) bool {

// sendRequest sends out a request, but also makes sure to
// measure the RTT for latency measurements.
func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {

ms, err := dht.messageSenderForPeer(p)
func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, req *pb.Message) (_ *pb.Message, err error) {
defer func(started time.Time) {
log.Printf("time taken to send request: %v: err=%v", time.Since(started), err)
}(time.Now())
ps, err := dht.getPoolStream(ctx, p)
if err != nil {
return nil, err
return
}

defer dht.putPoolStream(ps, p)
start := time.Now()

rpmes, err := ms.SendRequest(ctx, pmes)
reply, err := ps.request(ctx, req)
if err != nil {
return nil, err
return
}

// update the peer (on valid msgs only)
dht.updateFromMessage(ctx, p, rpmes)

dht.updateFromMessage(ctx, p, reply)
dht.peerstore.RecordLatency(p, time.Since(start))
logger.Event(ctx, "dhtReceivedMessage", dht.self, p, rpmes)
return rpmes, nil
return reply, nil
}

func (dht *IpfsDHT) newStream(ctx context.Context, p peer.ID) (inet.Stream, error) {
return dht.host.NewStream(ctx, p, dht.protocols...)
}

// sendMessage sends out a message
func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message) error {
ms, err := dht.messageSenderForPeer(p)
func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message) (err error) {
defer func(started time.Time) {
log.Printf("time taken to send message: %v: err=%v", time.Since(started), err)
}(time.Now())
ps, err := dht.getPoolStream(ctx, p)
if err != nil {
return err
}

if err := ms.SendMessage(ctx, pmes); err != nil {
return err
return
}
logger.Event(ctx, "dhtSentMessage", dht.self, p, pmes)
return nil
defer dht.putPoolStream(ps, p)
return ps.send(pmes)
}

func (dht *IpfsDHT) updateFromMessage(ctx context.Context, p peer.ID, mes *pb.Message) error {
@@ -156,206 +157,3 @@ func (dht *IpfsDHT) updateFromMessage(ctx context.Context, p peer.ID, mes *pb.Me
}
return nil
}

func (dht *IpfsDHT) messageSenderForPeer(p peer.ID) (*messageSender, error) {
dht.smlk.Lock()
ms, ok := dht.strmap[p]
if ok {
dht.smlk.Unlock()
return ms, nil
}
ms = &messageSender{p: p, dht: dht}
dht.strmap[p] = ms
dht.smlk.Unlock()

if err := ms.prepOrInvalidate(); err != nil {
dht.smlk.Lock()
defer dht.smlk.Unlock()

if msCur, ok := dht.strmap[p]; ok {
// Changed. Use the new one, old one is invalid and
// not in the map so we can just throw it away.
if ms != msCur {
return msCur, nil
}
// Not changed, remove the now invalid stream from the
// map.
delete(dht.strmap, p)
}
// Invalid but not in map. Must have been removed by a disconnect.
return nil, err
}
// All ready to go.
return ms, nil
}

type messageSender struct {
s inet.Stream
r ggio.ReadCloser
w bufferedWriteCloser
lk sync.Mutex
p peer.ID
dht *IpfsDHT

invalid bool
singleMes int
}

// invalidate is called before this messageSender is removed from the strmap.
// It prevents the messageSender from being reused/reinitialized and then
// forgotten (leaving the stream open).
func (ms *messageSender) invalidate() {
ms.invalid = true
if ms.s != nil {
ms.s.Reset()
ms.s = nil
}
}

func (ms *messageSender) prepOrInvalidate() error {
ms.lk.Lock()
defer ms.lk.Unlock()
if err := ms.prep(); err != nil {
ms.invalidate()
return err
}
return nil
}

func (ms *messageSender) prep() error {
if ms.invalid {
return fmt.Errorf("message sender has been invalidated")
}
if ms.s != nil {
return nil
}

nstr, err := ms.dht.host.NewStream(ms.dht.ctx, ms.p, ms.dht.protocols...)
if err != nil {
return err
}

ms.r = ggio.NewDelimitedReader(nstr, inet.MessageSizeMax)
ms.w = newBufferedDelimitedWriter(nstr)
ms.s = nstr

return nil
}

// streamReuseTries is the number of times we will try to reuse a stream to a
// given peer before giving up and reverting to the old one-message-per-stream
// behaviour.
const streamReuseTries = 3

func (ms *messageSender) SendMessage(ctx context.Context, pmes *pb.Message) error {
ms.lk.Lock()
defer ms.lk.Unlock()
retry := false
for {
if err := ms.prep(); err != nil {
return err
}

if err := ms.writeMsg(pmes); err != nil {
ms.s.Reset()
ms.s = nil

if retry {
logger.Info("error writing message, bailing: ", err)
return err
} else {
logger.Info("error writing message, trying again: ", err)
retry = true
continue
}
}

logger.Event(ctx, "dhtSentMessage", ms.dht.self, ms.p, pmes)

if ms.singleMes > streamReuseTries {
go inet.FullClose(ms.s)
ms.s = nil
} else if retry {
ms.singleMes++
}

return nil
}
}

func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb.Message, error) {
ms.lk.Lock()
defer ms.lk.Unlock()
retry := false
for {
if err := ms.prep(); err != nil {
return nil, err
}

if err := ms.writeMsg(pmes); err != nil {
ms.s.Reset()
ms.s = nil

if retry {
logger.Info("error writing message, bailing: ", err)
return nil, err
} else {
logger.Info("error writing message, trying again: ", err)
retry = true
continue
}
}

mes := new(pb.Message)
if err := ms.ctxReadMsg(ctx, mes); err != nil {
ms.s.Reset()
ms.s = nil

if retry {
logger.Info("error reading message, bailing: ", err)
return nil, err
} else {
logger.Info("error reading message, trying again: ", err)
retry = true
continue
}
}

logger.Event(ctx, "dhtSentMessage", ms.dht.self, ms.p, pmes)

if ms.singleMes > streamReuseTries {
go inet.FullClose(ms.s)
ms.s = nil
} else if retry {
ms.singleMes++
}

return mes, nil
}
}

func (ms *messageSender) writeMsg(pmes *pb.Message) error {
if err := ms.w.WriteMsg(pmes); err != nil {
return err
}
return ms.w.Flush()
}

func (ms *messageSender) ctxReadMsg(ctx context.Context, mes *pb.Message) error {
errc := make(chan error, 1)
go func(r ggio.ReadCloser) {
errc <- r.ReadMsg(mes)
}(ms.r)

t := time.NewTimer(dhtReadMessageTimeout)
defer t.Stop()

select {
case err := <-errc:
return err
case <-ctx.Done():
return ctx.Err()
case <-t.C:
return ErrReadTimeout
}
}
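The poolStream type used by sendRequest and sendMessage above is added elsewhere in this PR and does not appear in these hunks. As a rough sketch, assuming the surrounding package context (inet, ggio, pb, bufferedWriteCloser, dhtReadMessageTimeout and ErrReadTimeout, all visible in the removed messageSender code), a send/request pair on a pooled stream could look like the fragment below; it is not the PR's implementation, and everything beyond the send and request signatures is an assumption.

// Sketch only: reuses the delimited framing and read-timeout pattern of the
// removed messageSender; the PR's real poolStream may differ.
type poolStream struct {
	s inet.Stream
	r ggio.ReadCloser
	w bufferedWriteCloser
}

// send writes one delimited message and flushes the buffered writer.
func (ps *poolStream) send(m *pb.Message) error {
	if err := ps.w.WriteMsg(m); err != nil {
		return err
	}
	return ps.w.Flush()
}

// request sends a message and waits for a single reply, honouring both the
// caller's context and the package-level read timeout, as ctxReadMsg did.
func (ps *poolStream) request(ctx context.Context, req *pb.Message) (*pb.Message, error) {
	if err := ps.send(req); err != nil {
		return nil, err
	}
	reply := new(pb.Message)
	errc := make(chan error, 1)
	go func() { errc <- ps.r.ReadMsg(reply) }()
	t := time.NewTimer(dhtReadMessageTimeout)
	defer t.Stop()
	select {
	case err := <-errc:
		if err != nil {
			return nil, err
		}
		return reply, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	case <-t.C:
		return nil, ErrReadTimeout
	}
}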
dht_test.go (22 changes: 0 additions & 22 deletions)
@@ -458,28 +458,6 @@ func TestValueGetInvalid(t *testing.T) {
testSetGet("valid", "newer", nil)
}

func TestInvalidMessageSenderTracking(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

dht := setupDHT(ctx, t, false)
defer dht.Close()

foo := peer.ID("asdasd")
_, err := dht.messageSenderForPeer(foo)
if err == nil {
t.Fatal("that shouldnt have succeeded")
}

dht.smlk.Lock()
mscnt := len(dht.strmap)
dht.smlk.Unlock()

if mscnt > 0 {
t.Fatal("should have no message senders in map")
}
}

func TestProvides(t *testing.T) {
// t.Skip("skipping test to debug another")
ctx, cancel := context.WithCancel(context.Background())
notif.go (15 changes: 0 additions & 15 deletions)
@@ -93,21 +93,6 @@ func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) {
}

dht.routingTable.Remove(p)

dht.smlk.Lock()
defer dht.smlk.Unlock()
ms, ok := dht.strmap[p]
if !ok {
return
}
delete(dht.strmap, p)

// Do this asynchronously as ms.lk can block for a while.
go func() {
ms.lk.Lock()
defer ms.lk.Unlock()
ms.invalidate()
}()
}

func (nn *netNotifiee) OpenedStream(n inet.Network, v inet.Stream) {}