Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reconnect proxy if yamux session failed #53

Merged
merged 4 commits into from
Nov 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 9 additions & 17 deletions cmd/apps/therealproxy-client/therealproxy-client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,15 @@ package main
import (
"flag"
"net"
"time"

"github.com/SkycoinProject/skywire-mainnet/internal/skyenv"

"github.com/SkycoinProject/dmsg/cipher"

"github.com/SkycoinProject/skywire-mainnet/internal/netutil"
"github.com/SkycoinProject/skywire-mainnet/internal/skyenv"
"github.com/SkycoinProject/skywire-mainnet/internal/therealproxy"
"github.com/SkycoinProject/skywire-mainnet/pkg/app"
"github.com/SkycoinProject/skywire-mainnet/pkg/routing"
)

var r = netutil.NewRetrier(time.Second, 0, 1)

func main() {
log := app.NewLogger(skyenv.SkyproxyClientName)
therealproxy.Log = log.PackageLogger(skyenv.SkyproxyClientName)
Expand Down Expand Up @@ -48,23 +43,20 @@ func main() {
log.Fatal("Invalid server PubKey: ", err)
}

var conn net.Conn
err = r.Do(func() error {
conn, err = socksApp.Dial(routing.Addr{PubKey: pk, Port: routing.Port(skyenv.SkyproxyPort)})
return err
})
log.Printf("Serving on %v", *addr)
l, err := net.Listen("tcp", *addr)
if err != nil {
log.Fatal("Failed to dial to a server: ", err)
log.Fatalf("Failed to listen on %v: %v", *addr, err)
}

log.Printf("Connected to %v\n", pk)
remote := routing.Addr{PubKey: pk, Port: routing.Port(skyenv.SkyproxyPort)}

client, err := therealproxy.NewClient(conn)
client, err := therealproxy.NewClient(l, socksApp, remote)
if err != nil {
log.Fatal("Failed to create a new client: ", err)
}

log.Printf("Serving %v\n", addr)

log.Fatal(client.ListenAndServe(*addr))
if err := client.Serve(); err != nil {
log.Warnf("Failed to serve: %v", err)
}
}
2 changes: 1 addition & 1 deletion cmd/skywire-visor/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"log"
"log/syslog"
"net/http"
_ "net/http/pprof" //nolint:gosec
_ "net/http/pprof" // nolint:gosec // TODO: consider removing for security reasons
"os"
"os/signal"
"path/filepath"
Expand Down
103 changes: 99 additions & 4 deletions internal/therealproxy/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,14 @@ import (
"fmt"
"io"
"net"
"time"

"github.com/SkycoinProject/skycoin/src/util/logging"
"github.com/hashicorp/yamux"

"github.com/SkycoinProject/skywire-mainnet/internal/netutil"
"github.com/SkycoinProject/skywire-mainnet/pkg/app"
"github.com/SkycoinProject/skywire-mainnet/pkg/routing"
)

// Log is therealproxy package level logger, it can be replaced with a different one from outside the package
Expand All @@ -16,20 +21,110 @@ var Log = logging.MustGetLogger("therealproxy")
type Client struct {
session *yamux.Session
listener net.Listener
app *app.App
addr routing.Addr
}

// NewClient constructs a new Client.
func NewClient(conn net.Conn) (*Client, error) {
func NewClient(lis net.Listener, app *app.App, addr routing.Addr) (*Client, error) {
c := &Client{
listener: lis,
app: app,
addr: addr,
}
if err := c.connect(); err != nil {
return nil, err
}

return c, nil
}

func (c *Client) connect() error {
r := netutil.NewRetrier(time.Second, 0, 1)

var conn net.Conn
err := r.Do(func() error {
var err error
conn, err = c.app.Dial(c.addr)
return err
})
if err != nil {
return fmt.Errorf("failed to dial to a server: %v", err)
}

session, err := yamux.Client(conn, nil)
if err != nil {
return nil, fmt.Errorf("yamux: %s", err)
return fmt.Errorf("failed to create client: %s", err)
}

return &Client{session: session}, nil
c.session = session

return nil
}

// Serve proxies incoming connection to a remote proxy server.
func (c *Client) Serve() error {
for {
conn, err := c.listener.Accept()
if err != nil {
return fmt.Errorf("accept: %s", err)
}

stream := c.createStream()
c.handleStream(conn, stream)
}
}

func (c *Client) createStream() net.Conn {
for {
stream, err := c.session.Open()
if err == nil {
return stream
}

Log.Warnf("Failed to open yamux session: %v", err)

delay := 1 * time.Second
Log.Warnf("Restarting in %v", delay)
time.Sleep(delay)

if err := c.connect(); err != nil {
Log.Warnf("Failed to reconnect, trying again")
}
}
}

func (c *Client) handleStream(in, out net.Conn) {
go func() {
errCh := make(chan error, 2)
go func() {
_, err := io.Copy(out, in)
errCh <- err
}()

go func() {
_, err := io.Copy(in, out)
errCh <- err
}()

for err := range errCh {
if err := in.Close(); err != nil {
Log.WithError(err).Warn("Failed to close connection")
}
if err := out.Close(); err != nil {
Log.WithError(err).Warn("Failed to close stream")
}

if err != nil {
Log.Error("Copy error:", err)
}
}
}()
}

// ListenAndServe start tcp listener on addr and proxies incoming
// ListenAndServe starts tcp listener on addr and proxies incoming
// connection to a remote proxy server.
// TODO: get rid of it
func (c *Client) ListenAndServe(addr string) error {
nkryuchkov marked this conversation as resolved.
Show resolved Hide resolved
var stream net.Conn
var err error
Expand Down
2 changes: 1 addition & 1 deletion internal/therealproxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (s *Server) Serve(l net.Listener) error {

session, err := yamux.Server(conn, nil)
if err != nil {
return fmt.Errorf("yamux: %s", err)
return fmt.Errorf("yamux server failure: %s", err)
}

go func() {
Expand Down
7 changes: 6 additions & 1 deletion internal/therealproxy/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/SkycoinProject/skycoin/src/util/logging"
"github.com/hashicorp/yamux"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/net/nettest"
Expand Down Expand Up @@ -49,9 +50,13 @@ func TestProxy(t *testing.T) {
conn, err := net.Dial("tcp", l.Addr().String())
require.NoError(t, err)

client, err := NewClient(conn)
session, err := yamux.Client(conn, nil)
require.NoError(t, err)

client := &Client{
session: session,
}

errChan2 := make(chan error)
go func() {
errChan2 <- client.ListenAndServe(":10080")
Expand Down