From 65202be69a8be07b382a967f8c13eb257bcf137a Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Thu, 7 Nov 2019 16:19:38 +0300 Subject: [PATCH 1/3] Reconnect proxy if yamux session failed --- .../therealproxy-client.go | 26 ++--- internal/therealproxy/client.go | 94 ++++++++++++++++++- internal/therealproxy/server.go | 2 +- 3 files changed, 101 insertions(+), 21 deletions(-) diff --git a/cmd/apps/therealproxy-client/therealproxy-client.go b/cmd/apps/therealproxy-client/therealproxy-client.go index 1469deb687..a9191f6259 100644 --- a/cmd/apps/therealproxy-client/therealproxy-client.go +++ b/cmd/apps/therealproxy-client/therealproxy-client.go @@ -6,20 +6,15 @@ package main import ( "flag" "net" - "time" - - "github.com/SkycoinProject/skywire-mainnet/internal/skyenv" "github.com/SkycoinProject/dmsg/cipher" - "github.com/SkycoinProject/skywire-mainnet/internal/netutil" + "github.com/SkycoinProject/skywire-mainnet/internal/skyenv" "github.com/SkycoinProject/skywire-mainnet/internal/therealproxy" "github.com/SkycoinProject/skywire-mainnet/pkg/app" "github.com/SkycoinProject/skywire-mainnet/pkg/routing" ) -var r = netutil.NewRetrier(time.Second, 0, 1) - func main() { log := app.NewLogger(skyenv.SkyproxyClientName) therealproxy.Log = log.PackageLogger(skyenv.SkyproxyClientName) @@ -48,23 +43,20 @@ func main() { log.Fatal("Invalid server PubKey: ", err) } - var conn net.Conn - err = r.Do(func() error { - conn, err = socksApp.Dial(routing.Addr{PubKey: pk, Port: routing.Port(skyenv.SkyproxyPort)}) - return err - }) + log.Printf("Serving on %v", *addr) + l, err := net.Listen("tcp", *addr) if err != nil { - log.Fatal("Failed to dial to a server: ", err) + log.Fatal("Failed to listen on %v: %v", *addr, err) } - log.Printf("Connected to %v\n", pk) + remote := routing.Addr{PubKey: pk, Port: routing.Port(skyenv.SkyproxyPort)} - client, err := therealproxy.NewClient(conn) + client, err := therealproxy.NewClient(l, socksApp, remote) if err != nil { log.Fatal("Failed to create a new client: ", err) } - log.Printf("Serving %v\n", addr) - - log.Fatal(client.ListenAndServe(*addr)) + if err := client.Serve(); err != nil { + log.Warnf("Failed to serve: %v", err) + } } diff --git a/internal/therealproxy/client.go b/internal/therealproxy/client.go index 171fcc9ade..60efcf18a8 100644 --- a/internal/therealproxy/client.go +++ b/internal/therealproxy/client.go @@ -4,9 +4,14 @@ import ( "fmt" "io" "net" + "time" "github.com/SkycoinProject/skycoin/src/util/logging" "github.com/hashicorp/yamux" + + "github.com/SkycoinProject/skywire-mainnet/internal/netutil" + "github.com/SkycoinProject/skywire-mainnet/pkg/app" + "github.com/SkycoinProject/skywire-mainnet/pkg/routing" ) // Log is therealproxy package level logger, it can be replaced with a different one from outside the package @@ -16,16 +21,99 @@ var Log = logging.MustGetLogger("therealproxy") type Client struct { session *yamux.Session listener net.Listener + app *app.App + addr routing.Addr } // NewClient constructs a new Client. -func NewClient(conn net.Conn) (*Client, error) { +func NewClient(lis net.Listener, app *app.App, addr routing.Addr) (*Client, error) { + c := &Client{ + listener: lis, + app: app, + addr: addr, + } + if err := c.connect(); err != nil { + return nil, err + } + + return c, nil +} + +func (c *Client) connect() error { + r := netutil.NewRetrier(time.Second, 0, 1) + + var conn net.Conn + err := r.Do(func() error { + var err error + conn, err = c.app.Dial(c.addr) + return err + }) + if err != nil { + return fmt.Errorf("failed to dial to a server: %v", err) + } + session, err := yamux.Client(conn, nil) if err != nil { - return nil, fmt.Errorf("yamux: %s", err) + return fmt.Errorf("failed to create client: %s", err) } - return &Client{session: session}, nil + c.session = session + + return nil +} + +func (c *Client) Serve() error { + var stream net.Conn + + for { + conn, err := c.listener.Accept() + if err != nil { + return fmt.Errorf("accept: %s", err) + } + + for { + stream, err = c.session.Open() + if err == nil { + break + } + + Log.Warnf("Failed to open yamux session: %v", err) + + delay := 1 * time.Second + Log.Warnf("Restarting in %v", delay) + time.Sleep(delay) + + if err := c.connect(); err != nil { + Log.Warnf("Failed to reconnect, trying again") + } + } + + go func() { + errCh := make(chan error, 2) + go func() { + _, err := io.Copy(stream, conn) + errCh <- err + }() + + go func() { + _, err := io.Copy(conn, stream) + errCh <- err + }() + + for err := range errCh { + if err := conn.Close(); err != nil { + Log.WithError(err).Warn("Failed to close connection") + } + if err := stream.Close(); err != nil { + Log.WithError(err).Warn("Failed to close stream") + } + + if err != nil { + Log.Error("Copy error:", err) + } + } + }() + } } // ListenAndServe start tcp listener on addr and proxies incoming diff --git a/internal/therealproxy/server.go b/internal/therealproxy/server.go index ee1474c55e..ac5c4e2157 100644 --- a/internal/therealproxy/server.go +++ b/internal/therealproxy/server.go @@ -43,7 +43,7 @@ func (s *Server) Serve(l net.Listener) error { session, err := yamux.Server(conn, nil) if err != nil { - return fmt.Errorf("yamux: %s", err) + return fmt.Errorf("yamux server failure: %s", err) } go func() { From 0576ed09054a8759018d71b8706ea1004ac24a06 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Thu, 7 Nov 2019 16:54:17 +0300 Subject: [PATCH 2/3] Fix tests and linter errors --- cmd/apps/therealproxy-client/therealproxy-client.go | 2 +- cmd/skywire-visor/commands/root.go | 2 +- internal/therealproxy/client.go | 4 +++- internal/therealproxy/server_test.go | 7 ++++++- 4 files changed, 11 insertions(+), 4 deletions(-) diff --git a/cmd/apps/therealproxy-client/therealproxy-client.go b/cmd/apps/therealproxy-client/therealproxy-client.go index a9191f6259..53cf3ff453 100644 --- a/cmd/apps/therealproxy-client/therealproxy-client.go +++ b/cmd/apps/therealproxy-client/therealproxy-client.go @@ -46,7 +46,7 @@ func main() { log.Printf("Serving on %v", *addr) l, err := net.Listen("tcp", *addr) if err != nil { - log.Fatal("Failed to listen on %v: %v", *addr, err) + log.Fatalf("Failed to listen on %v: %v", *addr, err) } remote := routing.Addr{PubKey: pk, Port: routing.Port(skyenv.SkyproxyPort)} diff --git a/cmd/skywire-visor/commands/root.go b/cmd/skywire-visor/commands/root.go index 49e35b2a27..97f50c0c65 100644 --- a/cmd/skywire-visor/commands/root.go +++ b/cmd/skywire-visor/commands/root.go @@ -10,7 +10,7 @@ import ( "log" "log/syslog" "net/http" - _ "net/http/pprof" // used for HTTP profiling + _ "net/http/pprof" // nolint:gosec // TODO: consider removing for security reasons "os" "os/signal" "path/filepath" diff --git a/internal/therealproxy/client.go b/internal/therealproxy/client.go index 60efcf18a8..f97416bcf5 100644 --- a/internal/therealproxy/client.go +++ b/internal/therealproxy/client.go @@ -62,6 +62,7 @@ func (c *Client) connect() error { return nil } +// Serve proxies incoming connection to a remote proxy server. func (c *Client) Serve() error { var stream net.Conn @@ -116,8 +117,9 @@ func (c *Client) Serve() error { } } -// ListenAndServe start tcp listener on addr and proxies incoming +// ListenAndServe starts tcp listener on addr and proxies incoming // connection to a remote proxy server. +// TODO: get rid of it func (c *Client) ListenAndServe(addr string) error { var stream net.Conn var err error diff --git a/internal/therealproxy/server_test.go b/internal/therealproxy/server_test.go index 9ba3dc1148..96ad6c5a51 100644 --- a/internal/therealproxy/server_test.go +++ b/internal/therealproxy/server_test.go @@ -11,6 +11,7 @@ import ( "time" "github.com/SkycoinProject/skycoin/src/util/logging" + "github.com/hashicorp/yamux" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/net/nettest" @@ -49,9 +50,13 @@ func TestProxy(t *testing.T) { conn, err := net.Dial("tcp", l.Addr().String()) require.NoError(t, err) - client, err := NewClient(conn) + session, err := yamux.Client(conn, nil) require.NoError(t, err) + client := &Client{ + session: session, + } + errChan2 := make(chan error) go func() { errChan2 <- client.ListenAndServe(":10080") From f0ec69975d35fbfa47c4bd4064301825069f91c7 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Thu, 7 Nov 2019 17:14:03 +0300 Subject: [PATCH 3/3] Refactoring --- internal/therealproxy/client.go | 75 ++++++++++++++++++--------------- 1 file changed, 40 insertions(+), 35 deletions(-) diff --git a/internal/therealproxy/client.go b/internal/therealproxy/client.go index f97416bcf5..b0135dc1f5 100644 --- a/internal/therealproxy/client.go +++ b/internal/therealproxy/client.go @@ -64,57 +64,62 @@ func (c *Client) connect() error { // Serve proxies incoming connection to a remote proxy server. func (c *Client) Serve() error { - var stream net.Conn - for { conn, err := c.listener.Accept() if err != nil { return fmt.Errorf("accept: %s", err) } - for { - stream, err = c.session.Open() - if err == nil { - break - } + stream := c.createStream() + c.handleStream(conn, stream) + } +} - Log.Warnf("Failed to open yamux session: %v", err) +func (c *Client) createStream() net.Conn { + for { + stream, err := c.session.Open() + if err == nil { + return stream + } - delay := 1 * time.Second - Log.Warnf("Restarting in %v", delay) - time.Sleep(delay) + Log.Warnf("Failed to open yamux session: %v", err) - if err := c.connect(); err != nil { - Log.Warnf("Failed to reconnect, trying again") - } + delay := 1 * time.Second + Log.Warnf("Restarting in %v", delay) + time.Sleep(delay) + + if err := c.connect(); err != nil { + Log.Warnf("Failed to reconnect, trying again") } + } +} +func (c *Client) handleStream(in, out net.Conn) { + go func() { + errCh := make(chan error, 2) go func() { - errCh := make(chan error, 2) - go func() { - _, err := io.Copy(stream, conn) - errCh <- err - }() + _, err := io.Copy(out, in) + errCh <- err + }() - go func() { - _, err := io.Copy(conn, stream) - errCh <- err - }() + go func() { + _, err := io.Copy(in, out) + errCh <- err + }() - for err := range errCh { - if err := conn.Close(); err != nil { - Log.WithError(err).Warn("Failed to close connection") - } - if err := stream.Close(); err != nil { - Log.WithError(err).Warn("Failed to close stream") - } + for err := range errCh { + if err := in.Close(); err != nil { + Log.WithError(err).Warn("Failed to close connection") + } + if err := out.Close(); err != nil { + Log.WithError(err).Warn("Failed to close stream") + } - if err != nil { - Log.Error("Copy error:", err) - } + if err != nil { + Log.Error("Copy error:", err) } - }() - } + } + }() } // ListenAndServe starts tcp listener on addr and proxies incoming