diff --git a/go.mod b/go.mod index 935783167..430f93165 100644 --- a/go.mod +++ b/go.mod @@ -28,7 +28,7 @@ require ( github.com/schollz/progressbar/v2 v2.15.0 github.com/shirou/gopsutil/v3 v3.21.4 github.com/sirupsen/logrus v1.8.1 - github.com/skycoin/dmsg v0.0.0-20211229130221-70e9ab64c1be + github.com/skycoin/dmsg v0.0.0-20220125112430-1dfce6ea8ef3 github.com/skycoin/skycoin v0.27.1 github.com/skycoin/yamux v0.0.0-20200803175205-571ceb89da9f github.com/songgao/water v0.0.0-20200317203138-2b4b6d7c09d8 diff --git a/go.sum b/go.sum index 248035c4d..82286543c 100644 --- a/go.sum +++ b/go.sum @@ -427,8 +427,8 @@ github.com/shurcooL/webdavfs v0.0.0-20170829043945-18c3829fa133/go.mod h1:hKmq5k github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= -github.com/skycoin/dmsg v0.0.0-20211229130221-70e9ab64c1be h1:M+f0VZ7I7Yt/sI4tcKFIpU0nPWdRKz5e0+xLjvjm6iI= -github.com/skycoin/dmsg v0.0.0-20211229130221-70e9ab64c1be/go.mod h1:EgRg8fy5RjF67OJlh9w+vhq3+Phyn6AXKSedkzhf1ww= +github.com/skycoin/dmsg v0.0.0-20220125112430-1dfce6ea8ef3 h1:Ja0OyTBk00akywqJbPpPV6wyBX9NtjpyQD64G7oH4RQ= +github.com/skycoin/dmsg v0.0.0-20220125112430-1dfce6ea8ef3/go.mod h1:EgRg8fy5RjF67OJlh9w+vhq3+Phyn6AXKSedkzhf1ww= github.com/skycoin/noise v0.0.0-20180327030543-2492fe189ae6 h1:1Nc5EBY6pjfw1kwW0duwyG+7WliWz5u9kgk1h5MnLuA= github.com/skycoin/noise v0.0.0-20180327030543-2492fe189ae6/go.mod h1:UXghlricA7J3aRD/k7p/zBObQfmBawwCxIVPVjz2Q3o= github.com/skycoin/skycoin v0.26.0/go.mod h1:78nHjQzd8KG0jJJVL/j0xMmrihXi70ti63fh8vXScJw= diff --git a/internal/httpauth/client.go b/internal/httpauth/client.go index 745058d97..e10082f1c 100644 --- a/internal/httpauth/client.go +++ b/internal/httpauth/client.go @@ -135,7 +135,7 @@ func (c *Client) do(client *http.Client, req *http.Request) (*http.Response, err return nil, err } - isNonceValid, err := isNonceValid(resp) + resp, isNonceValid, err := isNonceValid(resp) if err != nil { return nil, err } @@ -259,26 +259,28 @@ func (c *Client) IncrementNonce() { // isNonceValid checks if `res` contains an invalid nonce error. // The error is occurred if status code equals to `http.StatusUnauthorized` // and body contains `invalidNonceErrorMessage`. -func isNonceValid(res *http.Response) (bool, error) { +func isNonceValid(res *http.Response) (*http.Response, bool, error) { var serverResponse HTTPResponse + var auxResp http.Response auxRespBody, err := ioutil.ReadAll(res.Body) if err != nil { - return false, err + return nil, false, err } if err := res.Body.Close(); err != nil { - return false, err + return nil, false, err } - res.Body = ioutil.NopCloser(bytes.NewBuffer(auxRespBody)) + auxResp = *res + auxResp.Body = ioutil.NopCloser(bytes.NewBuffer(auxRespBody)) if err := json.Unmarshal(auxRespBody, &serverResponse); err != nil || serverResponse.Error == nil { - return true, nil + return &auxResp, true, nil } isAuthorized := serverResponse.Error.Code != http.StatusUnauthorized hasValidNonce := serverResponse.Error.Message != invalidNonceErrorMessage - return isAuthorized && hasValidNonce, nil + return &auxResp, isAuthorized && hasValidNonce, nil } func sanitizedAddr(addr string) string { diff --git a/pkg/visor/init.go b/pkg/visor/init.go index aa5a3a88c..bf74f305f 100644 --- a/pkg/visor/init.go +++ b/pkg/visor/init.go @@ -186,7 +186,7 @@ func initDmsgHTTP(ctx context.Context, v *Visor, log *logging.Logger) error { return fmt.Errorf("failed to start dmsg: %w", err) } - dmsgHTTP := http.Client{Transport: dmsghttp.MakeHTTPTransport(dmsgDC)} + dmsgHTTP := http.Client{Transport: dmsghttp.MakeHTTPTransport(ctx, dmsgDC)} v.pushCloseStack("dmsg_http", func() error { closeDmsgDC() diff --git a/vendor/github.com/skycoin/dmsg/client.go b/vendor/github.com/skycoin/dmsg/client.go index dcf9940e9..dd7b85bed 100644 --- a/vendor/github.com/skycoin/dmsg/client.go +++ b/vendor/github.com/skycoin/dmsg/client.go @@ -339,6 +339,7 @@ func (ce *Client) EnsureSession(ctx context.Context, entry *disc.Entry) error { // If session with server of pk already exists, skip. if _, ok := ce.clientSession(ce.porter, entry.Static); ok { + ce.log.WithField("remote_pk", entry.Static).Info("Session already exists...") return nil } diff --git a/vendor/github.com/skycoin/dmsg/dmsgget/dmsgget.go b/vendor/github.com/skycoin/dmsg/dmsgget/dmsgget.go index 8df7a9a6f..f37080761 100644 --- a/vendor/github.com/skycoin/dmsg/dmsgget/dmsgget.go +++ b/vendor/github.com/skycoin/dmsg/dmsgget/dmsgget.go @@ -110,7 +110,7 @@ func (dg *DmsgGet) Run(ctx context.Context, log *logging.Logger, skStr string, a } defer closeDmsg() - httpC := http.Client{Transport: dmsghttp.MakeHTTPTransport(dmsgC)} + httpC := http.Client{Transport: dmsghttp.MakeHTTPTransport(ctx, dmsgC)} for i := 0; i < dg.dlF.Tries; i++ { log.Infof("Download attempt %d/%d ...", i, dg.dlF.Tries) diff --git a/vendor/github.com/skycoin/dmsg/dmsghttp/http_transport.go b/vendor/github.com/skycoin/dmsg/dmsghttp/http_transport.go index 4ae90ac98..563a310bd 100644 --- a/vendor/github.com/skycoin/dmsg/dmsghttp/http_transport.go +++ b/vendor/github.com/skycoin/dmsg/dmsghttp/http_transport.go @@ -2,8 +2,12 @@ package dmsghttp import ( "bufio" + "context" + "errors" "fmt" + "io" "net/http" + "time" "github.com/skycoin/dmsg" ) @@ -13,12 +17,16 @@ const defaultHTTPPort = uint16(80) // HTTPTransport implements http.RoundTripper // Do not confuse this with a Skywire Transport implementation. type HTTPTransport struct { + ctx context.Context dmsgC *dmsg.Client } // MakeHTTPTransport makes an HTTPTransport. -func MakeHTTPTransport(dmsgC *dmsg.Client) HTTPTransport { - return HTTPTransport{dmsgC: dmsgC} +func MakeHTTPTransport(ctx context.Context, dmsgC *dmsg.Client) HTTPTransport { + return HTTPTransport{ + ctx: ctx, + dmsgC: dmsgC, + } } // RoundTrip implements golang's http package support for alternative HTTP transport protocols. @@ -32,17 +40,47 @@ func (t HTTPTransport) RoundTrip(req *http.Request) (*http.Response, error) { hostAddr.Port = defaultHTTPPort } - // TODO(evanlinjin): In the future, we should implement stream reuse to save bandwidth. - // We do not close the stream here as it is the user's responsibility to close the stream after resp.Body is fully - // read. stream, err := t.dmsgC.DialStream(req.Context(), hostAddr) if err != nil { return nil, err } - if err := req.Write(stream); err != nil { return nil, err } bufR := bufio.NewReader(stream) - return http.ReadResponse(bufR, req) + resp, err := http.ReadResponse(bufR, req) + if err != nil { + return nil, err + } + + defer func() { + go closeStream(t.ctx, resp, stream) + }() + + return resp, nil +} + +func closeStream(ctx context.Context, resp *http.Response, stream *dmsg.Stream) { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + _, err := resp.Body.Read(nil) + log := stream.Logger() + // If error is not nil and is equal to ErrBodyReadAfterClose or EOF + // then it means that the body has been closed so we close the stream + if err != nil && (errors.Is(err, http.ErrBodyReadAfterClose) || errors.Is(err, io.EOF)) { + err := stream.Close() + if err != nil { + log.Warnf("Error closing stream: %v", err) + } + return + } + } + } + } diff --git a/vendor/github.com/skycoin/dmsg/dmsghttp/util.go b/vendor/github.com/skycoin/dmsg/dmsghttp/util.go index 63b9226ef..160a17ae7 100644 --- a/vendor/github.com/skycoin/dmsg/dmsghttp/util.go +++ b/vendor/github.com/skycoin/dmsg/dmsghttp/util.go @@ -37,7 +37,7 @@ func GetServers(ctx context.Context, dmsgDisc string, log *logging.Logger) (entr // UpdateServers is used to update the servers in the direct client. func UpdateServers(ctx context.Context, dClient disc.APIClient, dmsgDisc string, dmsgC *dmsg.Client, log *logging.Logger) (entries []*disc.Entry) { dmsgclient := disc.NewHTTP(dmsgDisc, &http.Client{}, log) - ticker := time.NewTicker(time.Second * 10) + ticker := time.NewTicker(time.Minute * 10) defer ticker.Stop() for { select { @@ -49,9 +49,13 @@ func UpdateServers(ctx context.Context, dClient disc.APIClient, dmsgDisc string, log.WithError(err).Error("Error getting dmsg-servers.") break } + log.Infof("Servers found : %v.", len(servers)) for _, server := range servers { - dClient.PostEntry(ctx, server) //nolint - dmsgC.EnsureSession(ctx, server) //nolint + dClient.PostEntry(ctx, server) //nolint + err := dmsgC.EnsureSession(ctx, server) + if err != nil { + log.WithField("remote_pk", server.Static).WithError(err).Warn("Failed to establish session.") + } } } } diff --git a/vendor/modules.txt b/vendor/modules.txt index a9872be9c..02928c65c 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -198,7 +198,7 @@ github.com/shirou/gopsutil/v3/process ## explicit; go 1.13 github.com/sirupsen/logrus github.com/sirupsen/logrus/hooks/syslog -# github.com/skycoin/dmsg v0.0.0-20211229130221-70e9ab64c1be +# github.com/skycoin/dmsg v0.0.0-20220125112430-1dfce6ea8ef3 ## explicit; go 1.16 github.com/skycoin/dmsg github.com/skycoin/dmsg/buildinfo