Skip to content

Commit

Permalink
Restructure to remove the need for callbacks
Browse files Browse the repository at this point in the history
  • Loading branch information
anacrolix committed Mar 13, 2019
1 parent f4fa878 commit 7669029
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 36 deletions.
33 changes: 25 additions & 8 deletions dht_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,20 +108,37 @@ func (dht *IpfsDHT) handleNewMessage(s inet.Stream) bool {

// sendRequest sends out a request, but also makes sure to
// measure the RTT for latency measurements.
func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, req *pb.Message) (_ *pb.Message, err error) {
func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, req *pb.Message) (*pb.Message, error) {
ps, _, err := dht.getPoolStream(ctx, p)
if err != nil {
return
return nil, err
}
start := time.Now()
reply, err := ps.request(ctx, req)
replyChan, err := ps.request(ctx, req)
if err != nil {
return
ps.reset()
return nil, err
}
onReply := func(reply *pb.Message) {
dht.putPoolStream(ps, p)
dht.updateFromMessage(ctx, p, reply)
dht.peerstore.RecordLatency(p, time.Since(start))
}
select {
case reply, ok := <-replyChan:
if !ok {
return nil, ps.err()
}
onReply(reply)
return reply, nil
case <-ctx.Done():
go func() {
if reply, ok := <-replyChan; ok {
onReply(reply)
}
}()
return nil, ctx.Err()
}
// update the peer (on valid msgs only)
dht.updateFromMessage(ctx, p, reply)
dht.peerstore.RecordLatency(p, time.Since(start))
return reply, nil
}

func (dht *IpfsDHT) newStream(ctx context.Context, p peer.ID) (inet.Stream, error) {
Expand Down
42 changes: 14 additions & 28 deletions stream_pooling.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,11 @@ func (dht *IpfsDHT) newPoolStream(ctx context.Context, p peer.ID) (*poolStream,
r: pbio.NewDelimitedReader(s, inet.MessageSizeMax),
m: make(chan chan *pb.Message, 1),
}
ps.onReaderErr = func() {
ps.reset()
go func() {
ps.reader()
dht.deletePoolStream(ps, p)
}
ps.onReady = func() {
dht.putPoolStream(ps, p)
}
go ps.reader()
ps.reset()
}()
return ps, nil
}

Expand All @@ -81,10 +78,6 @@ type poolStream struct {
}
w bufferedWriteCloser
r pbio.ReadCloser
// Called when the reader dies.
onReaderErr func()
// Called after reading an expected message.
onReady func()

// Synchronizes m and readerErr.
mu sync.Mutex
Expand Down Expand Up @@ -112,10 +105,10 @@ func (me *poolStream) send(m *pb.Message) (err error) {
return nil
}

func (me *poolStream) request(ctx context.Context, req *pb.Message) (*pb.Message, error) {
func (me *poolStream) request(ctx context.Context, req *pb.Message) (<-chan *pb.Message, error) {
replyChan := make(chan *pb.Message, 1)
me.mu.Lock()
if err := me.err(); err != nil {
if err := me.errLocked(); err != nil {
me.mu.Unlock()
return nil, err
}
Expand All @@ -127,18 +120,7 @@ func (me *poolStream) request(ctx context.Context, req *pb.Message) (*pb.Message
}
me.mu.Unlock()
err := me.send(req)
if err != nil {
return nil, err
}
select {
case reply, ok := <-replyChan:
if !ok {
return nil, me.err()
}
return reply, nil
case <-ctx.Done():
return nil, xerrors.Errorf("while waiting for reply: %w", ctx.Err())
}
return replyChan, err
}

// Handles the error returned from the read loop.
Expand All @@ -148,7 +130,6 @@ func (me *poolStream) reader() {
me.readerErr = err
close(me.m)
me.mu.Unlock()
me.onReaderErr()
for mc := range me.m {
close(mc)
}
Expand All @@ -165,15 +146,20 @@ func (me *poolStream) readLoop() error {
select {
case mc := <-me.m:
mc <- &m
me.onReady()
default:
return xerrors.New("read superfluous message")
}
}
}

// err reports the stream's terminal error, if any. A stream has gone bad
// when the reader has given up. Safe for concurrent use: it takes the
// mutex before delegating to errLocked.
func (me *poolStream) err() error {
	me.mu.Lock()
	defer me.mu.Unlock()
	return me.errLocked()
}

// A stream has gone bad when the reader has given up.
func (me *poolStream) errLocked() error {
if me.readerErr != nil {
return xerrors.Errorf("reader: %w", me.readerErr)
}
Expand Down

0 comments on commit 7669029

Please sign in to comment.