Skip to content

Commit

Permalink
Merge pull request #149 from ersonp/fix/close-dmsghttp-stream-defer
Browse files Browse the repository at this point in the history
Close stream with defer
  • Loading branch information
jdknives authored Jan 25, 2022
2 parents f62bd46 + 0dd2aa9 commit 1dfce6e
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 18 deletions.
2 changes: 1 addition & 1 deletion dmsgget/dmsgget.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (dg *DmsgGet) Run(ctx context.Context, log *logging.Logger, skStr string, a
}
defer closeDmsg()

httpC := http.Client{Transport: dmsghttp.MakeHTTPTransport(dmsgC)}
httpC := http.Client{Transport: dmsghttp.MakeHTTPTransport(ctx, dmsgC)}

for i := 0; i < dg.dlF.Tries; i++ {
log.Infof("Download attempt %d/%d ...", i, dg.dlF.Tries)
Expand Down
5 changes: 4 additions & 1 deletion dmsgget/dmsgget_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,5 +172,8 @@ func newHTTPClient(t *testing.T, dc disc.APIClient) *http.Client {
t.Cleanup(func() { assert.NoError(t, dmsgC.Close()) })
<-dmsgC.Ready()

return &http.Client{Transport: dmsghttp.MakeHTTPTransport(dmsgC)}
log := logging.MustGetLogger("http_client")
ctx, cancel := cmdutil.SignalContext(context.Background(), log)
defer cancel()
return &http.Client{Transport: dmsghttp.MakeHTTPTransport(ctx, dmsgC)}
}
8 changes: 7 additions & 1 deletion dmsghttp/examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@ import (

"github.com/skycoin/dmsg"
"github.com/skycoin/dmsg/cipher"
"github.com/skycoin/dmsg/cmdutil"
"github.com/skycoin/dmsg/disc"
"github.com/skycoin/dmsg/dmsghttp"

"github.com/skycoin/skycoin/src/util/logging"
)

func ExampleMakeHTTPTransport() {
Expand Down Expand Up @@ -87,8 +90,11 @@ func ExampleMakeHTTPTransport() {
go dmsgC2.Serve(context.Background())
<-dmsgC2.Ready()

log := logging.MustGetLogger("http_client")
ctx, cancel := cmdutil.SignalContext(context.Background(), log)
defer cancel()
// Run HTTP client.
httpC := http.Client{Transport: dmsghttp.MakeHTTPTransport(dmsgC2)}
httpC := http.Client{Transport: dmsghttp.MakeHTTPTransport(ctx, dmsgC2)}
resp, err := httpC.Get(fmt.Sprintf("http://%s:%d/", c1PK.String(), dmsgHTTPPort))
if err != nil {
panic(err)
Expand Down
52 changes: 45 additions & 7 deletions dmsghttp/http_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@ package dmsghttp

import (
"bufio"
"context"
"errors"
"fmt"
"io"
"net/http"
"time"

"github.com/skycoin/dmsg"
)
Expand All @@ -13,12 +17,16 @@ const defaultHTTPPort = uint16(80)
// HTTPTransport implements http.RoundTripper
// Do not confuse this with a Skywire Transport implementation.
type HTTPTransport struct {
ctx context.Context
dmsgC *dmsg.Client
}

// MakeHTTPTransport makes an HTTPTransport.
func MakeHTTPTransport(dmsgC *dmsg.Client) HTTPTransport {
return HTTPTransport{dmsgC: dmsgC}
func MakeHTTPTransport(ctx context.Context, dmsgC *dmsg.Client) HTTPTransport {
return HTTPTransport{
ctx: ctx,
dmsgC: dmsgC,
}
}

// RoundTrip implements golang's http package support for alternative HTTP transport protocols.
Expand All @@ -32,17 +40,47 @@ func (t HTTPTransport) RoundTrip(req *http.Request) (*http.Response, error) {
hostAddr.Port = defaultHTTPPort
}

// TODO(evanlinjin): In the future, we should implement stream reuse to save bandwidth.
// We do not close the stream here as it is the user's responsibility to close the stream after resp.Body is fully
// read.
stream, err := t.dmsgC.DialStream(req.Context(), hostAddr)
if err != nil {
return nil, err
}

if err := req.Write(stream); err != nil {
return nil, err
}
bufR := bufio.NewReader(stream)
return http.ReadResponse(bufR, req)
resp, err := http.ReadResponse(bufR, req)
if err != nil {
return nil, err
}

defer func() {
go closeStream(t.ctx, resp, stream)
}()

return resp, nil
}

func closeStream(ctx context.Context, resp *http.Response, stream *dmsg.Stream) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
_, err := resp.Body.Read(nil)
log := stream.Logger()
// If error is not nil and is equal to ErrBodyReadAfterClose or EOF
// then it means that the body has been closed so we close the stream
if err != nil && (errors.Is(err, http.ErrBodyReadAfterClose) || errors.Is(err, io.EOF)) {
err := stream.Close()
if err != nil {
log.Warnf("Error closing stream: %v", err)
}
return
}
}
}

}
15 changes: 7 additions & 8 deletions dmsghttp/http_transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"net/http"
"testing"
"time"

"github.com/sirupsen/logrus"
"github.com/skycoin/skycoin/src/util/logging"
Expand All @@ -15,6 +14,7 @@ import (

"github.com/skycoin/dmsg"
"github.com/skycoin/dmsg/cipher"
"github.com/skycoin/dmsg/cmdutil"
"github.com/skycoin/dmsg/disc"
)

Expand All @@ -25,7 +25,6 @@ func TestHTTPTransport_RoundTrip(t *testing.T) {
nSrvs = 5
minSessions = 3
maxSessions = 20
timeout = time.Second * 5
)

// Ensure HTTP request/response works.
Expand Down Expand Up @@ -63,13 +62,13 @@ func TestHTTPTransport_RoundTrip(t *testing.T) {
startHTTPServer(t, server0Results, lis)
addr := lis.Addr().String()

log := logging.MustGetLogger("http_client")
ctx, cancel := cmdutil.SignalContext(context.Background(), log)
defer cancel()
// Arrange: create http clients (in which each http client has an underlying dmsg client).
httpC1 := http.Client{Transport: MakeHTTPTransport(newDmsgClient(t, dc, minSessions, "client1"))}
httpC2 := http.Client{Transport: MakeHTTPTransport(newDmsgClient(t, dc, minSessions, "client2"))}
httpC3 := http.Client{Transport: MakeHTTPTransport(newDmsgClient(t, dc, minSessions, "client3"))}
httpC1.Timeout = timeout
httpC2.Timeout = timeout
httpC3.Timeout = timeout
httpC1 := http.Client{Transport: MakeHTTPTransport(ctx, newDmsgClient(t, dc, minSessions, "client1"))}
httpC2 := http.Client{Transport: MakeHTTPTransport(ctx, newDmsgClient(t, dc, minSessions, "client2"))}
httpC3 := http.Client{Transport: MakeHTTPTransport(ctx, newDmsgClient(t, dc, minSessions, "client3"))}

// Act: http clients send requests concurrently.
// - client1 sends "/index.html" requests.
Expand Down

0 comments on commit 1dfce6e

Please sign in to comment.