From 4feab7fbb6399177bcac3f5559a842626eef2825 Mon Sep 17 00:00:00 2001 From: ersonp Date: Mon, 17 Jan 2022 20:32:02 +0530 Subject: [PATCH 1/4] Close stream with defer This commit adds a way to close the stream of dmsg via a defer statement with a go routine in it inside the fun RoundTrip(). The goroutine checks if the response body is closed every one second and if it is closed then we clsoe the stream if not then we keep checking till it's closed or if context is closed. --- dmsgget/dmsgget.go | 2 +- dmsgget/dmsgget_test.go | 5 +++- dmsghttp/examples_test.go | 7 ++++- dmsghttp/http_transport.go | 46 ++++++++++++++++++++++++++++++--- dmsghttp/http_transport_test.go | 10 ++++--- 5 files changed, 60 insertions(+), 10 deletions(-) diff --git a/dmsgget/dmsgget.go b/dmsgget/dmsgget.go index 8df7a9a6f..f37080761 100644 --- a/dmsgget/dmsgget.go +++ b/dmsgget/dmsgget.go @@ -110,7 +110,7 @@ func (dg *DmsgGet) Run(ctx context.Context, log *logging.Logger, skStr string, a } defer closeDmsg() - httpC := http.Client{Transport: dmsghttp.MakeHTTPTransport(dmsgC)} + httpC := http.Client{Transport: dmsghttp.MakeHTTPTransport(ctx, dmsgC)} for i := 0; i < dg.dlF.Tries; i++ { log.Infof("Download attempt %d/%d ...", i, dg.dlF.Tries) diff --git a/dmsgget/dmsgget_test.go b/dmsgget/dmsgget_test.go index 5139f778d..a841b282d 100644 --- a/dmsgget/dmsgget_test.go +++ b/dmsgget/dmsgget_test.go @@ -172,5 +172,8 @@ func newHTTPClient(t *testing.T, dc disc.APIClient) *http.Client { t.Cleanup(func() { assert.NoError(t, dmsgC.Close()) }) <-dmsgC.Ready() - return &http.Client{Transport: dmsghttp.MakeHTTPTransport(dmsgC)} + log := logging.MustGetLogger(fmt.Sprintf("http_client")) + ctx, cancel := cmdutil.SignalContext(context.Background(), log) + defer cancel() + return &http.Client{Transport: dmsghttp.MakeHTTPTransport(ctx, dmsgC)} } diff --git a/dmsghttp/examples_test.go b/dmsghttp/examples_test.go index aed628778..d8c47a9d2 100644 --- a/dmsghttp/examples_test.go +++ b/dmsghttp/examples_test.go @@ -12,8 +12,10 @@ import ( "github.com/skycoin/dmsg" "github.com/skycoin/dmsg/cipher" + "github.com/skycoin/dmsg/cmdutil" "github.com/skycoin/dmsg/disc" "github.com/skycoin/dmsg/dmsghttp" + "github.com/skycoin/skycoin/src/util/logging" ) func ExampleMakeHTTPTransport() { @@ -87,8 +89,11 @@ func ExampleMakeHTTPTransport() { go dmsgC2.Serve(context.Background()) <-dmsgC2.Ready() + log := logging.MustGetLogger(fmt.Sprintf("http_client")) + ctx, cancel := cmdutil.SignalContext(context.Background(), log) + defer cancel() // Run HTTP client. - httpC := http.Client{Transport: dmsghttp.MakeHTTPTransport(dmsgC2)} + httpC := http.Client{Transport: dmsghttp.MakeHTTPTransport(ctx, dmsgC2)} resp, err := httpC.Get(fmt.Sprintf("http://%s:%d/", c1PK.String(), dmsgHTTPPort)) if err != nil { panic(err) diff --git a/dmsghttp/http_transport.go b/dmsghttp/http_transport.go index 4ae90ac98..331a16282 100644 --- a/dmsghttp/http_transport.go +++ b/dmsghttp/http_transport.go @@ -2,8 +2,10 @@ package dmsghttp import ( "bufio" + "context" "fmt" "net/http" + "time" "github.com/skycoin/dmsg" ) @@ -13,12 +15,16 @@ const defaultHTTPPort = uint16(80) // HTTPTransport implements http.RoundTripper // Do not confuse this with a Skywire Transport implementation. type HTTPTransport struct { + ctx context.Context dmsgC *dmsg.Client } // MakeHTTPTransport makes an HTTPTransport. -func MakeHTTPTransport(dmsgC *dmsg.Client) HTTPTransport { - return HTTPTransport{dmsgC: dmsgC} +func MakeHTTPTransport(ctx context.Context, dmsgC *dmsg.Client) HTTPTransport { + return HTTPTransport{ + ctx: ctx, + dmsgC: dmsgC, + } } // RoundTrip implements golang's http package support for alternative HTTP transport protocols. @@ -39,10 +45,42 @@ func (t HTTPTransport) RoundTrip(req *http.Request) (*http.Response, error) { if err != nil { return nil, err } - if err := req.Write(stream); err != nil { return nil, err } bufR := bufio.NewReader(stream) - return http.ReadResponse(bufR, req) + resp, err := http.ReadResponse(bufR, req) + if err != nil { + return nil, err + } + + defer func() { + go test(t.ctx, resp, stream) + }() + + return resp, nil +} + +func test(ctx context.Context, resp *http.Response, stream *dmsg.Stream) { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + _, err := resp.Body.Read(nil) + log := stream.Logger() + log.Errorf("err %v", err) + if err == nil { + // can still read from body so it's not closed + + } else if err != nil && err.Error() == "http: invalid Read on closed Body" { + stream.Close() + return + } + } + } + } diff --git a/dmsghttp/http_transport_test.go b/dmsghttp/http_transport_test.go index 065736f82..e43254005 100644 --- a/dmsghttp/http_transport_test.go +++ b/dmsghttp/http_transport_test.go @@ -15,6 +15,7 @@ import ( "github.com/skycoin/dmsg" "github.com/skycoin/dmsg/cipher" + "github.com/skycoin/dmsg/cmdutil" "github.com/skycoin/dmsg/disc" ) @@ -63,10 +64,13 @@ func TestHTTPTransport_RoundTrip(t *testing.T) { startHTTPServer(t, server0Results, lis) addr := lis.Addr().String() + log := logging.MustGetLogger(fmt.Sprintf("http_client")) + ctx, cancel := cmdutil.SignalContext(context.Background(), log) + defer cancel() // Arrange: create http clients (in which each http client has an underlying dmsg client). - httpC1 := http.Client{Transport: MakeHTTPTransport(newDmsgClient(t, dc, minSessions, "client1"))} - httpC2 := http.Client{Transport: MakeHTTPTransport(newDmsgClient(t, dc, minSessions, "client2"))} - httpC3 := http.Client{Transport: MakeHTTPTransport(newDmsgClient(t, dc, minSessions, "client3"))} + httpC1 := http.Client{Transport: MakeHTTPTransport(ctx, newDmsgClient(t, dc, minSessions, "client1"))} + httpC2 := http.Client{Transport: MakeHTTPTransport(ctx, newDmsgClient(t, dc, minSessions, "client2"))} + httpC3 := http.Client{Transport: MakeHTTPTransport(ctx, newDmsgClient(t, dc, minSessions, "client3"))} httpC1.Timeout = timeout httpC2.Timeout = timeout httpC3.Timeout = timeout From 3f08d42731685fce3387fbb27f88e1f707b9c55e Mon Sep 17 00:00:00 2001 From: ersonp Date: Wed, 19 Jan 2022 18:06:04 +0530 Subject: [PATCH 2/4] Minor fixes and changes This commit contains some minor linting fixes. It also removes a test log and renames the func test used in RoundTrip() to closeStream() and adds a warn log in faliure to close stream and adds a comment detailing how we check if the response body is closed. The todo by evanlinjin is also removed as we won't be reusing the streams as we are now closing them after every use. --- dmsgget/dmsgget_test.go | 2 +- dmsghttp/examples_test.go | 3 ++- dmsghttp/http_transport.go | 20 +++++++++----------- dmsghttp/http_transport_test.go | 2 +- 4 files changed, 13 insertions(+), 14 deletions(-) diff --git a/dmsgget/dmsgget_test.go b/dmsgget/dmsgget_test.go index a841b282d..125c914ff 100644 --- a/dmsgget/dmsgget_test.go +++ b/dmsgget/dmsgget_test.go @@ -172,7 +172,7 @@ func newHTTPClient(t *testing.T, dc disc.APIClient) *http.Client { t.Cleanup(func() { assert.NoError(t, dmsgC.Close()) }) <-dmsgC.Ready() - log := logging.MustGetLogger(fmt.Sprintf("http_client")) + log := logging.MustGetLogger("http_client") ctx, cancel := cmdutil.SignalContext(context.Background(), log) defer cancel() return &http.Client{Transport: dmsghttp.MakeHTTPTransport(ctx, dmsgC)} diff --git a/dmsghttp/examples_test.go b/dmsghttp/examples_test.go index d8c47a9d2..3f242f279 100644 --- a/dmsghttp/examples_test.go +++ b/dmsghttp/examples_test.go @@ -15,6 +15,7 @@ import ( "github.com/skycoin/dmsg/cmdutil" "github.com/skycoin/dmsg/disc" "github.com/skycoin/dmsg/dmsghttp" + "github.com/skycoin/skycoin/src/util/logging" ) @@ -89,7 +90,7 @@ func ExampleMakeHTTPTransport() { go dmsgC2.Serve(context.Background()) <-dmsgC2.Ready() - log := logging.MustGetLogger(fmt.Sprintf("http_client")) + log := logging.MustGetLogger("http_client") ctx, cancel := cmdutil.SignalContext(context.Background(), log) defer cancel() // Run HTTP client. diff --git a/dmsghttp/http_transport.go b/dmsghttp/http_transport.go index 331a16282..cc55e969c 100644 --- a/dmsghttp/http_transport.go +++ b/dmsghttp/http_transport.go @@ -38,9 +38,6 @@ func (t HTTPTransport) RoundTrip(req *http.Request) (*http.Response, error) { hostAddr.Port = defaultHTTPPort } - // TODO(evanlinjin): In the future, we should implement stream reuse to save bandwidth. - // We do not close the stream here as it is the user's responsibility to close the stream after resp.Body is fully - // read. stream, err := t.dmsgC.DialStream(req.Context(), hostAddr) if err != nil { return nil, err @@ -55,13 +52,13 @@ func (t HTTPTransport) RoundTrip(req *http.Request) (*http.Response, error) { } defer func() { - go test(t.ctx, resp, stream) + go closeStream(t.ctx, resp, stream) }() return resp, nil } -func test(ctx context.Context, resp *http.Response, stream *dmsg.Stream) { +func closeStream(ctx context.Context, resp *http.Response, stream *dmsg.Stream) { ticker := time.NewTicker(time.Second) defer ticker.Stop() @@ -72,12 +69,13 @@ func test(ctx context.Context, resp *http.Response, stream *dmsg.Stream) { case <-ticker.C: _, err := resp.Body.Read(nil) log := stream.Logger() - log.Errorf("err %v", err) - if err == nil { - // can still read from body so it's not closed - - } else if err != nil && err.Error() == "http: invalid Read on closed Body" { - stream.Close() + // If error is not nil and is equal to `http: invalid Read on closed Body` + // then it means that the body has been closed so we close the stream + if err != nil && err.Error() == "http: invalid Read on closed Body" { + err := stream.Close() + if err != nil { + log.Warnf("Error closing stream: %v", err) + } return } } diff --git a/dmsghttp/http_transport_test.go b/dmsghttp/http_transport_test.go index e43254005..253436712 100644 --- a/dmsghttp/http_transport_test.go +++ b/dmsghttp/http_transport_test.go @@ -64,7 +64,7 @@ func TestHTTPTransport_RoundTrip(t *testing.T) { startHTTPServer(t, server0Results, lis) addr := lis.Addr().String() - log := logging.MustGetLogger(fmt.Sprintf("http_client")) + log := logging.MustGetLogger("http_client") ctx, cancel := cmdutil.SignalContext(context.Background(), log) defer cancel() // Arrange: create http clients (in which each http client has an underlying dmsg client). From 99786dff2c05a57811d9e207f7b89d9296b0c187 Mon Sep 17 00:00:00 2001 From: ersonp Date: Fri, 21 Jan 2022 13:38:13 +0530 Subject: [PATCH 3/4] Fix closeStream This commit fixes closeStream() in dmsghttp. The error check is changed from using err strings to using errors.IS to check the error http.ErrBodyReadAfterClose. Also a new error is added to the check, io.EOF. Now we check if there is an error and if the error is equal to either http.ErrBodyReadAfterClose or io.EOF. --- dmsghttp/http_transport.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/dmsghttp/http_transport.go b/dmsghttp/http_transport.go index cc55e969c..563a310bd 100644 --- a/dmsghttp/http_transport.go +++ b/dmsghttp/http_transport.go @@ -3,7 +3,9 @@ package dmsghttp import ( "bufio" "context" + "errors" "fmt" + "io" "net/http" "time" @@ -69,9 +71,9 @@ func closeStream(ctx context.Context, resp *http.Response, stream *dmsg.Stream) case <-ticker.C: _, err := resp.Body.Read(nil) log := stream.Logger() - // If error is not nil and is equal to `http: invalid Read on closed Body` + // If error is not nil and is equal to ErrBodyReadAfterClose or EOF // then it means that the body has been closed so we close the stream - if err != nil && err.Error() == "http: invalid Read on closed Body" { + if err != nil && (errors.Is(err, http.ErrBodyReadAfterClose) || errors.Is(err, io.EOF)) { err := stream.Close() if err != nil { log.Warnf("Error closing stream: %v", err) From 0dd2aa98c8d1f345cdc86ab126aaf50933bbc099 Mon Sep 17 00:00:00 2001 From: ersonp Date: Mon, 24 Jan 2022 11:07:19 +0530 Subject: [PATCH 4/4] Fix TestHTTPTransport_RoundTrip This commit fixes the TestHTTPTransport_RoundTrip by removing the timeout from http clients. --- dmsghttp/http_transport_test.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/dmsghttp/http_transport_test.go b/dmsghttp/http_transport_test.go index 253436712..afebd5911 100644 --- a/dmsghttp/http_transport_test.go +++ b/dmsghttp/http_transport_test.go @@ -5,7 +5,6 @@ import ( "fmt" "net/http" "testing" - "time" "github.com/sirupsen/logrus" "github.com/skycoin/skycoin/src/util/logging" @@ -26,7 +25,6 @@ func TestHTTPTransport_RoundTrip(t *testing.T) { nSrvs = 5 minSessions = 3 maxSessions = 20 - timeout = time.Second * 5 ) // Ensure HTTP request/response works. @@ -71,9 +69,6 @@ func TestHTTPTransport_RoundTrip(t *testing.T) { httpC1 := http.Client{Transport: MakeHTTPTransport(ctx, newDmsgClient(t, dc, minSessions, "client1"))} httpC2 := http.Client{Transport: MakeHTTPTransport(ctx, newDmsgClient(t, dc, minSessions, "client2"))} httpC3 := http.Client{Transport: MakeHTTPTransport(ctx, newDmsgClient(t, dc, minSessions, "client3"))} - httpC1.Timeout = timeout - httpC2.Timeout = timeout - httpC3.Timeout = timeout // Act: http clients send requests concurrently. // - client1 sends "/index.html" requests.