diff --git a/CHANGELOG.md b/CHANGELOG.md index 3e3c862977..60cb0ce82e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. - added transport_setup_nodes field to transport section - added MinHops field to V1Routing section of config - added `skywire-cli config` subcommand +- added connection_duration field to `/api/visor/{pk}/apps/vpn-client/connections` ## 0.2.1 - 2020.04.07 diff --git a/internal/vpn/client.go b/internal/vpn/client.go index 0bd0b3e2b2..381742be35 100644 --- a/internal/vpn/client.go +++ b/internal/vpn/client.go @@ -10,6 +10,7 @@ import ( "runtime" "strconv" "sync" + "sync/atomic" "time" "github.com/sirupsen/logrus" @@ -52,6 +53,8 @@ type Client struct { tunMu sync.Mutex tun TUNDevice tunCreated bool + + connectedDuration int64 } // NewClient creates VPN client instance. @@ -176,10 +179,13 @@ func (c *Client) Serve() error { fmt.Printf("Failed to close TUN: %v\n", err) } - fmt.Println("Closed TUN") + c.log.Info("Closing TUN") }() - defer c.setAppStatus(ClientStatusShuttingDown) + defer func() { + c.setAppStatus(ClientStatusShuttingDown) + c.resetConnDuration() + }() c.setAppStatus(ClientStatusConnecting) @@ -205,8 +211,10 @@ func (c *Client) Serve() error { case errHandshakeStatusForbidden, errHandshakeStatusInternalError, errHandshakeNoFreeIPs, errHandshakeStatusBadRequest, errNoTransportFound, errTransportNotFound: c.setAppError(err) + c.resetConnDuration() return err default: + c.resetConnDuration() c.setAppStatus(ClientStatusReconnecting) c.setAppError(errTimeout) fmt.Println("\nConnection broke, reconnecting...") @@ -252,6 +260,17 @@ func (c *Client) AddDirectRoute(ip net.IP) error { return c.setupDirectRoute(ip) } +func (c *Client) removeDirectRouteFn(ip net.IP, i int) error { + c.directIPs = append(c.directIPs[:i], c.directIPs[i+1:]...) + + if err := c.setSysPrivileges(); err != nil { + return fmt.Errorf("failed to setup system privileges: %w", err) + } + defer c.releaseSysPrivileges() + + return c.removeDirectRoute(ip) +} + // RemoveDirectRoute removes direct route. Packets destined to `ip` will // go through VPN. func (c *Client) RemoveDirectRoute(ip net.IP) error { @@ -260,17 +279,9 @@ func (c *Client) RemoveDirectRoute(ip net.IP) error { for i, storedIP := range c.directIPs { if ip.Equal(storedIP) { - c.directIPs = append(c.directIPs[:i], c.directIPs[i+1:]...) - - if err := c.setSysPrivileges(); err != nil { - return fmt.Errorf("failed to setup system privileges: %w", err) - } - defer c.releaseSysPrivileges() - - if err := c.removeDirectRoute(ip); err != nil { + if err := c.removeDirectRouteFn(ip, i); err != nil { return err } - break } } @@ -397,6 +408,8 @@ func (c *Client) serveConn(conn net.Conn) error { } c.setAppStatus(ClientStatusRunning) + c.resetConnDuration() + t := time.NewTicker(time.Second) defer func() { if !c.cfg.Killswitch { @@ -427,10 +440,19 @@ func (c *Client) serveConn(conn net.Conn) error { }() // only one side may fail here, so we wait till at least one fails - select { - case <-connToTunDoneCh: - case <-tunToConnCh: - case <-c.closeC: +serveLoop: + for { + select { + case <-connToTunDoneCh: + break serveLoop + case <-tunToConnCh: + break serveLoop + case <-c.closeC: + break serveLoop + case <-t.C: + atomic.AddInt64(&c.connectedDuration, 1) + c.setConnectionDuration() + } } // here we setup system privileges again, so deferred calls may be done safely @@ -727,6 +749,12 @@ func (c *Client) setAppStatus(status ClientStatus) { } } +func (c *Client) setConnectionDuration() { + if err := c.appCl.SetConnectionDuration(atomic.LoadInt64(&c.connectedDuration)); err != nil { + fmt.Printf("Failed to set connection duration: %v\n", err) + } +} + func (c *Client) setAppError(appErr error) { if err := c.appCl.SetError(appErr.Error()); err != nil { fmt.Printf("Failed to set error %v: %v\n", appErr, err) @@ -743,6 +771,11 @@ func (c *Client) isClosed() bool { return false } +func (c *Client) resetConnDuration() { + atomic.StoreInt64(&c.connectedDuration, 0) + c.setConnectionDuration() +} + func ipFromEnv(key string) (net.IP, error) { ip, ok, err := IPFromEnv(key) if err != nil { diff --git a/pkg/app/appserver/mock_proc_manager.go b/pkg/app/appserver/mock_proc_manager.go index b4c6cc568d..66135ebce5 100644 --- a/pkg/app/appserver/mock_proc_manager.go +++ b/pkg/app/appserver/mock_proc_manager.go @@ -152,6 +152,27 @@ func (_m *MockProcManager) Start(conf appcommon.ProcConfig) (appcommon.ProcID, e return r0, r1 } +// Stats provides a mock function with given fields: appName +func (_m *MockProcManager) Stats(appName string) (AppStats, error) { + ret := _m.Called(appName) + + var r0 AppStats + if rf, ok := ret.Get(0).(func(string) AppStats); ok { + r0 = rf(appName) + } else { + r0 = ret.Get(0).(AppStats) + } + + var r1 error + if rf, ok := ret.Get(1).(func(string) error); ok { + r1 = rf(appName) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // Stop provides a mock function with given fields: appName func (_m *MockProcManager) Stop(appName string) error { ret := _m.Called(appName) diff --git a/pkg/app/appserver/mock_rpc_ingress_client.go b/pkg/app/appserver/mock_rpc_ingress_client.go index 7293180e01..5d42d2aa58 100644 --- a/pkg/app/appserver/mock_rpc_ingress_client.go +++ b/pkg/app/appserver/mock_rpc_ingress_client.go @@ -142,6 +142,20 @@ func (_m *MockRPCIngressClient) Read(connID uint16, b []byte) (int, error) { return r0, r1 } +// SetConnectionDuration provides a mock function with given fields: dur +func (_m *MockRPCIngressClient) SetConnectionDuration(dur int64) error { + ret := _m.Called(dur) + + var r0 error + if rf, ok := ret.Get(0).(func(int64) error); ok { + r0 = rf(dur) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // SetDeadline provides a mock function with given fields: connID, d func (_m *MockRPCIngressClient) SetDeadline(connID uint16, d time.Time) error { ret := _m.Called(connID, d) diff --git a/pkg/app/appserver/proc.go b/pkg/app/appserver/proc.go index c57a01af65..9a0cab4a13 100644 --- a/pkg/app/appserver/proc.go +++ b/pkg/app/appserver/proc.go @@ -53,8 +53,12 @@ type Proc struct { statusMx sync.RWMutex status string - errMx sync.RWMutex - err string + // connection duration (i.e. when vpn client is connected, the app will set the connection duration) + connDuration int64 + connDurationMu sync.RWMutex + + errMx sync.RWMutex + err string } // NewProc constructs `Proc`. @@ -280,6 +284,20 @@ func (p *Proc) SetDetailedStatus(status string) { p.status = status } +// SetConnectionDuration sets the proc's connection duration +func (p *Proc) SetConnectionDuration(dur int64) { + p.connDurationMu.Lock() + defer p.connDurationMu.Unlock() + p.connDuration = dur +} + +// ConnectionDuration gets proc's connection duration +func (p *Proc) ConnectionDuration() int64 { + p.connDurationMu.RLock() + defer p.connDurationMu.RUnlock() + return p.connDuration +} + // DetailedStatus gets proc's detailed status. func (p *Proc) DetailedStatus() string { p.statusMx.RLock() @@ -306,13 +324,14 @@ func (p *Proc) Error() string { // ConnectionSummary sums up the connection stats. type ConnectionSummary struct { - IsAlive bool `json:"is_alive"` - Latency time.Duration `json:"latency"` - UploadSpeed uint32 `json:"upload_speed"` - DownloadSpeed uint32 `json:"download_speed"` - BandwidthSent uint64 `json:"bandwidth_sent"` - BandwidthReceived uint64 `json:"bandwidth_received"` - Error string `json:"error"` + IsAlive bool `json:"is_alive"` + Latency time.Duration `json:"latency"` + UploadSpeed uint32 `json:"upload_speed"` + DownloadSpeed uint32 `json:"download_speed"` + BandwidthSent uint64 `json:"bandwidth_sent"` + BandwidthReceived uint64 `json:"bandwidth_received"` + Error string `json:"error"` + ConnectionDuration int64 `json:"connection_duration,omitempty"` } // ConnectionsSummary returns all of the proc's connections stats. @@ -348,12 +367,13 @@ func (p *Proc) ConnectionsSummary() []ConnectionSummary { } summaries = append(summaries, ConnectionSummary{ - IsAlive: skywireConn.IsAlive(), - Latency: skywireConn.Latency(), - UploadSpeed: skywireConn.UploadSpeed(), - DownloadSpeed: skywireConn.DownloadSpeed(), - BandwidthSent: skywireConn.BandwidthSent(), - BandwidthReceived: skywireConn.BandwidthReceived(), + IsAlive: skywireConn.IsAlive(), + Latency: skywireConn.Latency(), + UploadSpeed: skywireConn.UploadSpeed(), + DownloadSpeed: skywireConn.DownloadSpeed(), + BandwidthSent: skywireConn.BandwidthSent(), + BandwidthReceived: skywireConn.BandwidthReceived(), + ConnectionDuration: p.ConnectionDuration(), }) return true diff --git a/pkg/app/appserver/rpc_ingress_client.go b/pkg/app/appserver/rpc_ingress_client.go index acfe42d021..f0ef354b5b 100644 --- a/pkg/app/appserver/rpc_ingress_client.go +++ b/pkg/app/appserver/rpc_ingress_client.go @@ -15,6 +15,7 @@ import ( // RPCIngressClient describes RPC interface to communicate with the server. type RPCIngressClient interface { SetDetailedStatus(status string) error + SetConnectionDuration(dur int64) error SetError(appErr string) error Dial(remote appnet.Addr) (connID uint16, localPort routing.Port, err error) Listen(local appnet.Addr) (uint16, error) @@ -47,6 +48,11 @@ func (c *rpcIngressClient) SetDetailedStatus(status string) error { return c.rpc.Call(c.formatMethod("SetDetailedStatus"), &status, nil) } +// SetConnectionDuration sets the connection duration for an app +func (c *rpcIngressClient) SetConnectionDuration(dur int64) error { + return c.rpc.Call(c.formatMethod("SetConnectionDuration"), dur, nil) +} + // SetError sets error of an app. func (c *rpcIngressClient) SetError(appErr string) error { return c.rpc.Call(c.formatMethod("SetError"), &appErr, nil) diff --git a/pkg/app/appserver/rpc_ingress_gateway.go b/pkg/app/appserver/rpc_ingress_gateway.go index c95e584163..f162257cb7 100644 --- a/pkg/app/appserver/rpc_ingress_gateway.go +++ b/pkg/app/appserver/rpc_ingress_gateway.go @@ -85,12 +85,17 @@ func (r *RPCIngressGateway) SetDetailedStatus(status *string, _ *struct{}) (err return nil } +// SetConnectionDuration sets the connection duration of an app (vpn-client in this instance) +func (r *RPCIngressGateway) SetConnectionDuration(dur int64, _ *struct{}) (err error) { + defer rpcutil.LogCall(r.log, "SetConnectionDuration", dur)(nil, &err) + r.proc.SetConnectionDuration(dur) + return nil +} + // SetError sets error of an app. func (r *RPCIngressGateway) SetError(appErr *string, _ *struct{}) (err error) { defer rpcutil.LogCall(r.log, "SetError", appErr)(nil, &err) - r.proc.SetError(*appErr) - return nil } diff --git a/pkg/app/client.go b/pkg/app/client.go index 6ce8756c43..1a99e4fdb4 100644 --- a/pkg/app/client.go +++ b/pkg/app/client.go @@ -68,6 +68,11 @@ func (c *Client) SetDetailedStatus(status string) error { return c.rpcC.SetDetailedStatus(status) } +// SetConnectionDuration sets the detailed app connection duration within the visor. +func (c *Client) SetConnectionDuration(dur int64) error { + return c.rpcC.SetConnectionDuration(dur) +} + // SetError sets app error within the visor. func (c *Client) SetError(appErr string) error { return c.rpcC.SetError(appErr) diff --git a/pkg/app/launcher/launcher.go b/pkg/app/launcher/launcher.go index 3f7496f353..304337a6fb 100644 --- a/pkg/app/launcher/launcher.go +++ b/pkg/app/launcher/launcher.go @@ -196,7 +196,10 @@ func (l *Launcher) AppStates() []*AppState { } if proc, ok := l.procM.ProcByName(app.Name); ok { state.DetailedStatus = proc.DetailedStatus() - state.Status = AppStatusRunning + connSummary := proc.ConnectionsSummary() + if connSummary != nil { + state.Status = AppStatusRunning + } } states = append(states, state) } diff --git a/pkg/setup/setupclient/mock_route_group_dialer.go b/pkg/setup/setupclient/mock_route_group_dialer.go new file mode 100644 index 0000000000..8e38eff154 --- /dev/null +++ b/pkg/setup/setupclient/mock_route_group_dialer.go @@ -0,0 +1,40 @@ +// Code generated by mockery v1.0.0. DO NOT EDIT. + +package setupclient + +import ( + context "context" + + dmsg "github.com/skycoin/dmsg" + cipher "github.com/skycoin/dmsg/cipher" + logging "github.com/skycoin/skycoin/src/util/logging" + mock "github.com/stretchr/testify/mock" + + routing "github.com/skycoin/skywire/pkg/routing" +) + +// MockRouteGroupDialer is an autogenerated mock type for the RouteGroupDialer type +type MockRouteGroupDialer struct { + mock.Mock +} + +// Dial provides a mock function with given fields: ctx, log, dmsgC, setupNodes, req +func (_m *MockRouteGroupDialer) Dial(ctx context.Context, log *logging.Logger, dmsgC *dmsg.Client, setupNodes []cipher.PubKey, req routing.BidirectionalRoute) (routing.EdgeRules, error) { + ret := _m.Called(ctx, log, dmsgC, setupNodes, req) + + var r0 routing.EdgeRules + if rf, ok := ret.Get(0).(func(context.Context, *logging.Logger, *dmsg.Client, []cipher.PubKey, routing.BidirectionalRoute) routing.EdgeRules); ok { + r0 = rf(ctx, log, dmsgC, setupNodes, req) + } else { + r0 = ret.Get(0).(routing.EdgeRules) + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *logging.Logger, *dmsg.Client, []cipher.PubKey, routing.BidirectionalRoute) error); ok { + r1 = rf(ctx, log, dmsgC, setupNodes, req) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} diff --git a/pkg/transport/network/addrresolver/mock_api_client.go b/pkg/transport/network/addrresolver/mock_api_client.go index a6cd7281d6..c078b497f4 100644 --- a/pkg/transport/network/addrresolver/mock_api_client.go +++ b/pkg/transport/network/addrresolver/mock_api_client.go @@ -29,13 +29,13 @@ func (_m *MockAPIClient) BindSTCPR(ctx context.Context, port string) error { return r0 } -// BindSUDPH provides a mock function with given fields: filter +// BindSUDPH provides a mock function with given fields: filter, handshake func (_m *MockAPIClient) BindSUDPH(filter *pfilter.PacketFilter, handshake Handshake) (<-chan RemoteVisor, error) { - ret := _m.Called(filter) + ret := _m.Called(filter, handshake) var r0 <-chan RemoteVisor - if rf, ok := ret.Get(0).(func(*pfilter.PacketFilter) <-chan RemoteVisor); ok { - r0 = rf(filter) + if rf, ok := ret.Get(0).(func(*pfilter.PacketFilter, Handshake) <-chan RemoteVisor); ok { + r0 = rf(filter, handshake) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(<-chan RemoteVisor) @@ -43,8 +43,8 @@ func (_m *MockAPIClient) BindSUDPH(filter *pfilter.PacketFilter, handshake Hands } var r1 error - if rf, ok := ret.Get(1).(func(*pfilter.PacketFilter) error); ok { - r1 = rf(filter) + if rf, ok := ret.Get(1).(func(*pfilter.PacketFilter, Handshake) error); ok { + r1 = rf(filter, handshake) } else { r1 = ret.Error(1) } @@ -87,20 +87,20 @@ func (_m *MockAPIClient) Health(ctx context.Context) (int, error) { return r0, r1 } -// Resolve provides a mock function with given fields: ctx, tType, pk -func (_m *MockAPIClient) Resolve(ctx context.Context, tType string, pk cipher.PubKey) (VisorData, error) { - ret := _m.Called(ctx, tType, pk) +// Resolve provides a mock function with given fields: ctx, netType, pk +func (_m *MockAPIClient) Resolve(ctx context.Context, netType string, pk cipher.PubKey) (VisorData, error) { + ret := _m.Called(ctx, netType, pk) var r0 VisorData if rf, ok := ret.Get(0).(func(context.Context, string, cipher.PubKey) VisorData); ok { - r0 = rf(ctx, tType, pk) + r0 = rf(ctx, netType, pk) } else { r0 = ret.Get(0).(VisorData) } var r1 error if rf, ok := ret.Get(1).(func(context.Context, string, cipher.PubKey) error); ok { - r1 = rf(ctx, tType, pk) + r1 = rf(ctx, netType, pk) } else { r1 = ret.Error(1) }