From e66a08340d70ea3a7c6ac1f83c0b19a218c03f37 Mon Sep 17 00:00:00 2001 From: ersonp Date: Wed, 19 Jan 2022 17:47:41 +0530 Subject: [PATCH 1/3] Fix MakeHTTPTransport This commit adds ctx to the MakeHTTPTransport func. --- pkg/visor/init.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/visor/init.go b/pkg/visor/init.go index aa5a3a88c2..bf74f305fd 100644 --- a/pkg/visor/init.go +++ b/pkg/visor/init.go @@ -186,7 +186,7 @@ func initDmsgHTTP(ctx context.Context, v *Visor, log *logging.Logger) error { return fmt.Errorf("failed to start dmsg: %w", err) } - dmsgHTTP := http.Client{Transport: dmsghttp.MakeHTTPTransport(dmsgDC)} + dmsgHTTP := http.Client{Transport: dmsghttp.MakeHTTPTransport(ctx, dmsgDC)} v.pushCloseStack("dmsg_http", func() error { closeDmsgDC() From 2757eab332fa755a049d73e696ccb5152162fe03 Mon Sep 17 00:00:00 2001 From: ersonp Date: Fri, 21 Jan 2022 13:17:20 +0530 Subject: [PATCH 2/3] Fix isNonceValid This commit fixes the func isNonceValid() in httpauth. The func isNonceValid() used to replace the original body of the response with the body from ioutil.NopCloser() which caused the res.Body.Close() to not work and so the stream in dmsghttp did not close. Now instead of replacing the body of the response we instead create a new response var auxResp and send it back. Since it is a copy of the original response and not a pointer, the changes to the body of auxResp does not affect the original response. --- internal/httpauth/client.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/internal/httpauth/client.go b/internal/httpauth/client.go index 745058d976..4fb184781d 100644 --- a/internal/httpauth/client.go +++ b/internal/httpauth/client.go @@ -135,7 +135,7 @@ func (c *Client) do(client *http.Client, req *http.Request) (*http.Response, err return nil, err } - isNonceValid, err := isNonceValid(resp) + resp, isNonceValid, err := isNonceValid(resp, c.log) if err != nil { return nil, err } @@ -259,26 +259,28 @@ func (c *Client) IncrementNonce() { // isNonceValid checks if `res` contains an invalid nonce error. // The error is occurred if status code equals to `http.StatusUnauthorized` // and body contains `invalidNonceErrorMessage`. -func isNonceValid(res *http.Response) (bool, error) { +func isNonceValid(res *http.Response, log *logging.Logger) (*http.Response, bool, error) { var serverResponse HTTPResponse + var auxResp http.Response auxRespBody, err := ioutil.ReadAll(res.Body) if err != nil { - return false, err + return nil, false, err } if err := res.Body.Close(); err != nil { - return false, err + return nil, false, err } - res.Body = ioutil.NopCloser(bytes.NewBuffer(auxRespBody)) + auxResp = *res + auxResp.Body = ioutil.NopCloser(bytes.NewBuffer(auxRespBody)) if err := json.Unmarshal(auxRespBody, &serverResponse); err != nil || serverResponse.Error == nil { - return true, nil + return &auxResp, true, nil } isAuthorized := serverResponse.Error.Code != http.StatusUnauthorized hasValidNonce := serverResponse.Error.Message != invalidNonceErrorMessage - return isAuthorized && hasValidNonce, nil + return &auxResp, isAuthorized && hasValidNonce, nil } func sanitizedAddr(addr string) string { From 4ffdb9583a50c0db8d331c5aec30e77678d312c1 Mon Sep 17 00:00:00 2001 From: ersonp Date: Tue, 25 Jan 2022 16:59:14 +0530 Subject: [PATCH 3/3] Update dmsg This commit updates the dmsg@develop and fixes linting. --- go.mod | 2 +- go.sum | 4 +- internal/httpauth/client.go | 4 +- vendor/github.com/skycoin/dmsg/client.go | 1 + .../skycoin/dmsg/dmsgget/dmsgget.go | 2 +- .../skycoin/dmsg/dmsghttp/http_transport.go | 52 ++++++++++++++++--- .../github.com/skycoin/dmsg/dmsghttp/util.go | 10 ++-- vendor/modules.txt | 2 +- 8 files changed, 60 insertions(+), 17 deletions(-) diff --git a/go.mod b/go.mod index a257d1ba2c..c0d932999e 100644 --- a/go.mod +++ b/go.mod @@ -28,7 +28,7 @@ require ( github.com/schollz/progressbar/v2 v2.15.0 github.com/shirou/gopsutil/v3 v3.21.4 github.com/sirupsen/logrus v1.8.1 - github.com/skycoin/dmsg v0.0.0-20211229130221-70e9ab64c1be + github.com/skycoin/dmsg v0.0.0-20220125112430-1dfce6ea8ef3 github.com/skycoin/skycoin v0.27.1 github.com/skycoin/yamux v0.0.0-20200803175205-571ceb89da9f github.com/songgao/water v0.0.0-20200317203138-2b4b6d7c09d8 diff --git a/go.sum b/go.sum index b8d1625b5b..bf8c88af12 100644 --- a/go.sum +++ b/go.sum @@ -435,8 +435,8 @@ github.com/shurcooL/webdavfs v0.0.0-20170829043945-18c3829fa133/go.mod h1:hKmq5k github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= -github.com/skycoin/dmsg v0.0.0-20211229130221-70e9ab64c1be h1:M+f0VZ7I7Yt/sI4tcKFIpU0nPWdRKz5e0+xLjvjm6iI= -github.com/skycoin/dmsg v0.0.0-20211229130221-70e9ab64c1be/go.mod h1:EgRg8fy5RjF67OJlh9w+vhq3+Phyn6AXKSedkzhf1ww= +github.com/skycoin/dmsg v0.0.0-20220125112430-1dfce6ea8ef3 h1:Ja0OyTBk00akywqJbPpPV6wyBX9NtjpyQD64G7oH4RQ= +github.com/skycoin/dmsg v0.0.0-20220125112430-1dfce6ea8ef3/go.mod h1:EgRg8fy5RjF67OJlh9w+vhq3+Phyn6AXKSedkzhf1ww= github.com/skycoin/noise v0.0.0-20180327030543-2492fe189ae6 h1:1Nc5EBY6pjfw1kwW0duwyG+7WliWz5u9kgk1h5MnLuA= github.com/skycoin/noise v0.0.0-20180327030543-2492fe189ae6/go.mod h1:UXghlricA7J3aRD/k7p/zBObQfmBawwCxIVPVjz2Q3o= github.com/skycoin/skycoin v0.26.0/go.mod h1:78nHjQzd8KG0jJJVL/j0xMmrihXi70ti63fh8vXScJw= diff --git a/internal/httpauth/client.go b/internal/httpauth/client.go index 4fb184781d..e10082f1cf 100644 --- a/internal/httpauth/client.go +++ b/internal/httpauth/client.go @@ -135,7 +135,7 @@ func (c *Client) do(client *http.Client, req *http.Request) (*http.Response, err return nil, err } - resp, isNonceValid, err := isNonceValid(resp, c.log) + resp, isNonceValid, err := isNonceValid(resp) if err != nil { return nil, err } @@ -259,7 +259,7 @@ func (c *Client) IncrementNonce() { // isNonceValid checks if `res` contains an invalid nonce error. // The error is occurred if status code equals to `http.StatusUnauthorized` // and body contains `invalidNonceErrorMessage`. -func isNonceValid(res *http.Response, log *logging.Logger) (*http.Response, bool, error) { +func isNonceValid(res *http.Response) (*http.Response, bool, error) { var serverResponse HTTPResponse var auxResp http.Response diff --git a/vendor/github.com/skycoin/dmsg/client.go b/vendor/github.com/skycoin/dmsg/client.go index dcf9940e9b..dd7b85bed6 100644 --- a/vendor/github.com/skycoin/dmsg/client.go +++ b/vendor/github.com/skycoin/dmsg/client.go @@ -339,6 +339,7 @@ func (ce *Client) EnsureSession(ctx context.Context, entry *disc.Entry) error { // If session with server of pk already exists, skip. if _, ok := ce.clientSession(ce.porter, entry.Static); ok { + ce.log.WithField("remote_pk", entry.Static).Info("Session already exists...") return nil } diff --git a/vendor/github.com/skycoin/dmsg/dmsgget/dmsgget.go b/vendor/github.com/skycoin/dmsg/dmsgget/dmsgget.go index 8df7a9a6f9..f370807611 100644 --- a/vendor/github.com/skycoin/dmsg/dmsgget/dmsgget.go +++ b/vendor/github.com/skycoin/dmsg/dmsgget/dmsgget.go @@ -110,7 +110,7 @@ func (dg *DmsgGet) Run(ctx context.Context, log *logging.Logger, skStr string, a } defer closeDmsg() - httpC := http.Client{Transport: dmsghttp.MakeHTTPTransport(dmsgC)} + httpC := http.Client{Transport: dmsghttp.MakeHTTPTransport(ctx, dmsgC)} for i := 0; i < dg.dlF.Tries; i++ { log.Infof("Download attempt %d/%d ...", i, dg.dlF.Tries) diff --git a/vendor/github.com/skycoin/dmsg/dmsghttp/http_transport.go b/vendor/github.com/skycoin/dmsg/dmsghttp/http_transport.go index 4ae90ac981..563a310bdf 100644 --- a/vendor/github.com/skycoin/dmsg/dmsghttp/http_transport.go +++ b/vendor/github.com/skycoin/dmsg/dmsghttp/http_transport.go @@ -2,8 +2,12 @@ package dmsghttp import ( "bufio" + "context" + "errors" "fmt" + "io" "net/http" + "time" "github.com/skycoin/dmsg" ) @@ -13,12 +17,16 @@ const defaultHTTPPort = uint16(80) // HTTPTransport implements http.RoundTripper // Do not confuse this with a Skywire Transport implementation. type HTTPTransport struct { + ctx context.Context dmsgC *dmsg.Client } // MakeHTTPTransport makes an HTTPTransport. -func MakeHTTPTransport(dmsgC *dmsg.Client) HTTPTransport { - return HTTPTransport{dmsgC: dmsgC} +func MakeHTTPTransport(ctx context.Context, dmsgC *dmsg.Client) HTTPTransport { + return HTTPTransport{ + ctx: ctx, + dmsgC: dmsgC, + } } // RoundTrip implements golang's http package support for alternative HTTP transport protocols. @@ -32,17 +40,47 @@ func (t HTTPTransport) RoundTrip(req *http.Request) (*http.Response, error) { hostAddr.Port = defaultHTTPPort } - // TODO(evanlinjin): In the future, we should implement stream reuse to save bandwidth. - // We do not close the stream here as it is the user's responsibility to close the stream after resp.Body is fully - // read. stream, err := t.dmsgC.DialStream(req.Context(), hostAddr) if err != nil { return nil, err } - if err := req.Write(stream); err != nil { return nil, err } bufR := bufio.NewReader(stream) - return http.ReadResponse(bufR, req) + resp, err := http.ReadResponse(bufR, req) + if err != nil { + return nil, err + } + + defer func() { + go closeStream(t.ctx, resp, stream) + }() + + return resp, nil +} + +func closeStream(ctx context.Context, resp *http.Response, stream *dmsg.Stream) { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + _, err := resp.Body.Read(nil) + log := stream.Logger() + // If error is not nil and is equal to ErrBodyReadAfterClose or EOF + // then it means that the body has been closed so we close the stream + if err != nil && (errors.Is(err, http.ErrBodyReadAfterClose) || errors.Is(err, io.EOF)) { + err := stream.Close() + if err != nil { + log.Warnf("Error closing stream: %v", err) + } + return + } + } + } + } diff --git a/vendor/github.com/skycoin/dmsg/dmsghttp/util.go b/vendor/github.com/skycoin/dmsg/dmsghttp/util.go index 63b9226ef9..160a17ae7b 100644 --- a/vendor/github.com/skycoin/dmsg/dmsghttp/util.go +++ b/vendor/github.com/skycoin/dmsg/dmsghttp/util.go @@ -37,7 +37,7 @@ func GetServers(ctx context.Context, dmsgDisc string, log *logging.Logger) (entr // UpdateServers is used to update the servers in the direct client. func UpdateServers(ctx context.Context, dClient disc.APIClient, dmsgDisc string, dmsgC *dmsg.Client, log *logging.Logger) (entries []*disc.Entry) { dmsgclient := disc.NewHTTP(dmsgDisc, &http.Client{}, log) - ticker := time.NewTicker(time.Second * 10) + ticker := time.NewTicker(time.Minute * 10) defer ticker.Stop() for { select { @@ -49,9 +49,13 @@ func UpdateServers(ctx context.Context, dClient disc.APIClient, dmsgDisc string, log.WithError(err).Error("Error getting dmsg-servers.") break } + log.Infof("Servers found : %v.", len(servers)) for _, server := range servers { - dClient.PostEntry(ctx, server) //nolint - dmsgC.EnsureSession(ctx, server) //nolint + dClient.PostEntry(ctx, server) //nolint + err := dmsgC.EnsureSession(ctx, server) + if err != nil { + log.WithField("remote_pk", server.Static).WithError(err).Warn("Failed to establish session.") + } } } } diff --git a/vendor/modules.txt b/vendor/modules.txt index 0677e280da..d6729ffc0b 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -230,7 +230,7 @@ github.com/shirou/gopsutil/v3/process ## explicit; go 1.13 github.com/sirupsen/logrus github.com/sirupsen/logrus/hooks/syslog -# github.com/skycoin/dmsg v0.0.0-20211229130221-70e9ab64c1be +# github.com/skycoin/dmsg v0.0.0-20220125112430-1dfce6ea8ef3 ## explicit; go 1.16 github.com/skycoin/dmsg github.com/skycoin/dmsg/buildinfo