diff --git a/cmd/apps/therealproxy-client/therealproxy-client.go b/cmd/apps/therealproxy-client/therealproxy-client.go index 1469deb68..53cf3ff45 100644 --- a/cmd/apps/therealproxy-client/therealproxy-client.go +++ b/cmd/apps/therealproxy-client/therealproxy-client.go @@ -6,20 +6,15 @@ package main import ( "flag" "net" - "time" - - "github.com/SkycoinProject/skywire-mainnet/internal/skyenv" "github.com/SkycoinProject/dmsg/cipher" - "github.com/SkycoinProject/skywire-mainnet/internal/netutil" + "github.com/SkycoinProject/skywire-mainnet/internal/skyenv" "github.com/SkycoinProject/skywire-mainnet/internal/therealproxy" "github.com/SkycoinProject/skywire-mainnet/pkg/app" "github.com/SkycoinProject/skywire-mainnet/pkg/routing" ) -var r = netutil.NewRetrier(time.Second, 0, 1) - func main() { log := app.NewLogger(skyenv.SkyproxyClientName) therealproxy.Log = log.PackageLogger(skyenv.SkyproxyClientName) @@ -48,23 +43,20 @@ func main() { log.Fatal("Invalid server PubKey: ", err) } - var conn net.Conn - err = r.Do(func() error { - conn, err = socksApp.Dial(routing.Addr{PubKey: pk, Port: routing.Port(skyenv.SkyproxyPort)}) - return err - }) + log.Printf("Serving on %v", *addr) + l, err := net.Listen("tcp", *addr) if err != nil { - log.Fatal("Failed to dial to a server: ", err) + log.Fatalf("Failed to listen on %v: %v", *addr, err) } - log.Printf("Connected to %v\n", pk) + remote := routing.Addr{PubKey: pk, Port: routing.Port(skyenv.SkyproxyPort)} - client, err := therealproxy.NewClient(conn) + client, err := therealproxy.NewClient(l, socksApp, remote) if err != nil { log.Fatal("Failed to create a new client: ", err) } - log.Printf("Serving %v\n", addr) - - log.Fatal(client.ListenAndServe(*addr)) + if err := client.Serve(); err != nil { + log.Warnf("Failed to serve: %v", err) + } } diff --git a/cmd/skywire-visor/commands/root.go b/cmd/skywire-visor/commands/root.go index cc177b7ee..bc0235bcb 100644 --- a/cmd/skywire-visor/commands/root.go +++ b/cmd/skywire-visor/commands/root.go @@ -11,7 +11,7 @@ import ( "log" "log/syslog" "net/http" - _ "net/http/pprof" //nolint:gosec + _ "net/http/pprof" // nolint:gosec // TODO: consider removing for security reasons "os" "os/signal" "path/filepath" diff --git a/internal/therealproxy/client.go b/internal/therealproxy/client.go index 171fcc9ad..b0135dc1f 100644 --- a/internal/therealproxy/client.go +++ b/internal/therealproxy/client.go @@ -4,9 +4,14 @@ import ( "fmt" "io" "net" + "time" "github.com/SkycoinProject/skycoin/src/util/logging" "github.com/hashicorp/yamux" + + "github.com/SkycoinProject/skywire-mainnet/internal/netutil" + "github.com/SkycoinProject/skywire-mainnet/pkg/app" + "github.com/SkycoinProject/skywire-mainnet/pkg/routing" ) // Log is therealproxy package level logger, it can be replaced with a different one from outside the package @@ -16,20 +21,110 @@ var Log = logging.MustGetLogger("therealproxy") type Client struct { session *yamux.Session listener net.Listener + app *app.App + addr routing.Addr } // NewClient constructs a new Client. -func NewClient(conn net.Conn) (*Client, error) { +func NewClient(lis net.Listener, app *app.App, addr routing.Addr) (*Client, error) { + c := &Client{ + listener: lis, + app: app, + addr: addr, + } + if err := c.connect(); err != nil { + return nil, err + } + + return c, nil +} + +func (c *Client) connect() error { + r := netutil.NewRetrier(time.Second, 0, 1) + + var conn net.Conn + err := r.Do(func() error { + var err error + conn, err = c.app.Dial(c.addr) + return err + }) + if err != nil { + return fmt.Errorf("failed to dial to a server: %v", err) + } + session, err := yamux.Client(conn, nil) if err != nil { - return nil, fmt.Errorf("yamux: %s", err) + return fmt.Errorf("failed to create client: %s", err) } - return &Client{session: session}, nil + c.session = session + + return nil +} + +// Serve proxies incoming connection to a remote proxy server. +func (c *Client) Serve() error { + for { + conn, err := c.listener.Accept() + if err != nil { + return fmt.Errorf("accept: %s", err) + } + + stream := c.createStream() + c.handleStream(conn, stream) + } +} + +func (c *Client) createStream() net.Conn { + for { + stream, err := c.session.Open() + if err == nil { + return stream + } + + Log.Warnf("Failed to open yamux session: %v", err) + + delay := 1 * time.Second + Log.Warnf("Restarting in %v", delay) + time.Sleep(delay) + + if err := c.connect(); err != nil { + Log.Warnf("Failed to reconnect, trying again") + } + } +} + +func (c *Client) handleStream(in, out net.Conn) { + go func() { + errCh := make(chan error, 2) + go func() { + _, err := io.Copy(out, in) + errCh <- err + }() + + go func() { + _, err := io.Copy(in, out) + errCh <- err + }() + + for err := range errCh { + if err := in.Close(); err != nil { + Log.WithError(err).Warn("Failed to close connection") + } + if err := out.Close(); err != nil { + Log.WithError(err).Warn("Failed to close stream") + } + + if err != nil { + Log.Error("Copy error:", err) + } + } + }() } -// ListenAndServe start tcp listener on addr and proxies incoming +// ListenAndServe starts tcp listener on addr and proxies incoming // connection to a remote proxy server. +// TODO: get rid of it func (c *Client) ListenAndServe(addr string) error { var stream net.Conn var err error diff --git a/internal/therealproxy/server.go b/internal/therealproxy/server.go index ee1474c55..ac5c4e215 100644 --- a/internal/therealproxy/server.go +++ b/internal/therealproxy/server.go @@ -43,7 +43,7 @@ func (s *Server) Serve(l net.Listener) error { session, err := yamux.Server(conn, nil) if err != nil { - return fmt.Errorf("yamux: %s", err) + return fmt.Errorf("yamux server failure: %s", err) } go func() { diff --git a/internal/therealproxy/server_test.go b/internal/therealproxy/server_test.go index 9ba3dc114..96ad6c5a5 100644 --- a/internal/therealproxy/server_test.go +++ b/internal/therealproxy/server_test.go @@ -11,6 +11,7 @@ import ( "time" "github.com/SkycoinProject/skycoin/src/util/logging" + "github.com/hashicorp/yamux" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/net/nettest" @@ -49,9 +50,13 @@ func TestProxy(t *testing.T) { conn, err := net.Dial("tcp", l.Addr().String()) require.NoError(t, err) - client, err := NewClient(conn) + session, err := yamux.Client(conn, nil) require.NoError(t, err) + client := &Client{ + session: session, + } + errChan2 := make(chan error) go func() { errChan2 <- client.ListenAndServe(":10080")