Skip to content

Commit

Permalink
Merge pull request #908 from alexadhy/vpn-conn-duration
Browse files Browse the repository at this point in the history
AppStats: Connection Duration Addition to API
  • Loading branch information
jdknives authored Oct 12, 2021
2 parents bb755a4 + 6d015ee commit 244adc2
Show file tree
Hide file tree
Showing 11 changed files with 192 additions and 44 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
- added transport_setup_nodes field to transport section
- added MinHops field to V1Routing section of config
- added `skywire-cli config` subcommand
- added connection_duration field to `/api/visor/{pk}/apps/vpn-client/connections`

## 0.2.1 - 2020.04.07

Expand Down
63 changes: 48 additions & 15 deletions internal/vpn/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"runtime"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -52,6 +53,8 @@ type Client struct {
tunMu sync.Mutex
tun TUNDevice
tunCreated bool

connectedDuration int64
}

// NewClient creates VPN client instance.
Expand Down Expand Up @@ -176,10 +179,13 @@ func (c *Client) Serve() error {
fmt.Printf("Failed to close TUN: %v\n", err)
}

fmt.Println("Closed TUN")
c.log.Info("Closing TUN")
}()

defer c.setAppStatus(ClientStatusShuttingDown)
defer func() {
c.setAppStatus(ClientStatusShuttingDown)
c.resetConnDuration()
}()

c.setAppStatus(ClientStatusConnecting)

Expand All @@ -205,8 +211,10 @@ func (c *Client) Serve() error {
case errHandshakeStatusForbidden, errHandshakeStatusInternalError, errHandshakeNoFreeIPs,
errHandshakeStatusBadRequest, errNoTransportFound, errTransportNotFound:
c.setAppError(err)
c.resetConnDuration()
return err
default:
c.resetConnDuration()
c.setAppStatus(ClientStatusReconnecting)
c.setAppError(errTimeout)
fmt.Println("\nConnection broke, reconnecting...")
Expand Down Expand Up @@ -252,6 +260,17 @@ func (c *Client) AddDirectRoute(ip net.IP) error {
return c.setupDirectRoute(ip)
}

func (c *Client) removeDirectRouteFn(ip net.IP, i int) error {
c.directIPs = append(c.directIPs[:i], c.directIPs[i+1:]...)

if err := c.setSysPrivileges(); err != nil {
return fmt.Errorf("failed to setup system privileges: %w", err)
}
defer c.releaseSysPrivileges()

return c.removeDirectRoute(ip)
}

// RemoveDirectRoute removes direct route. Packets destined to `ip` will
// go through VPN.
func (c *Client) RemoveDirectRoute(ip net.IP) error {
Expand All @@ -260,17 +279,9 @@ func (c *Client) RemoveDirectRoute(ip net.IP) error {

for i, storedIP := range c.directIPs {
if ip.Equal(storedIP) {
c.directIPs = append(c.directIPs[:i], c.directIPs[i+1:]...)

if err := c.setSysPrivileges(); err != nil {
return fmt.Errorf("failed to setup system privileges: %w", err)
}
defer c.releaseSysPrivileges()

if err := c.removeDirectRoute(ip); err != nil {
if err := c.removeDirectRouteFn(ip, i); err != nil {
return err
}

break
}
}
Expand Down Expand Up @@ -397,6 +408,8 @@ func (c *Client) serveConn(conn net.Conn) error {
}

c.setAppStatus(ClientStatusRunning)
c.resetConnDuration()
t := time.NewTicker(time.Second)

defer func() {
if !c.cfg.Killswitch {
Expand Down Expand Up @@ -427,10 +440,19 @@ func (c *Client) serveConn(conn net.Conn) error {
}()

// only one side may fail here, so we wait till at least one fails
select {
case <-connToTunDoneCh:
case <-tunToConnCh:
case <-c.closeC:
serveLoop:
for {
select {
case <-connToTunDoneCh:
break serveLoop
case <-tunToConnCh:
break serveLoop
case <-c.closeC:
break serveLoop
case <-t.C:
atomic.AddInt64(&c.connectedDuration, 1)
c.setConnectionDuration()
}
}

// here we setup system privileges again, so deferred calls may be done safely
Expand Down Expand Up @@ -727,6 +749,12 @@ func (c *Client) setAppStatus(status ClientStatus) {
}
}

func (c *Client) setConnectionDuration() {
if err := c.appCl.SetConnectionDuration(atomic.LoadInt64(&c.connectedDuration)); err != nil {
fmt.Printf("Failed to set connection duration: %v\n", err)
}
}

func (c *Client) setAppError(appErr error) {
if err := c.appCl.SetError(appErr.Error()); err != nil {
fmt.Printf("Failed to set error %v: %v\n", appErr, err)
Expand All @@ -743,6 +771,11 @@ func (c *Client) isClosed() bool {
return false
}

func (c *Client) resetConnDuration() {
atomic.StoreInt64(&c.connectedDuration, 0)
c.setConnectionDuration()
}

func ipFromEnv(key string) (net.IP, error) {
ip, ok, err := IPFromEnv(key)
if err != nil {
Expand Down
21 changes: 21 additions & 0 deletions pkg/app/appserver/mock_proc_manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions pkg/app/appserver/mock_rpc_ingress_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

50 changes: 35 additions & 15 deletions pkg/app/appserver/proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,12 @@ type Proc struct {

statusMx sync.RWMutex
status string
errMx sync.RWMutex
err string
// connection duration (i.e. when vpn client is connected, the app will set the connection duration)
connDuration int64
connDurationMu sync.RWMutex

errMx sync.RWMutex
err string
}

// NewProc constructs `Proc`.
Expand Down Expand Up @@ -280,6 +284,20 @@ func (p *Proc) SetDetailedStatus(status string) {
p.status = status
}

// SetConnectionDuration sets the proc's connection duration
func (p *Proc) SetConnectionDuration(dur int64) {
p.connDurationMu.Lock()
defer p.connDurationMu.Unlock()
p.connDuration = dur
}

// ConnectionDuration gets proc's connection duration
func (p *Proc) ConnectionDuration() int64 {
p.connDurationMu.RLock()
defer p.connDurationMu.RUnlock()
return p.connDuration
}

// DetailedStatus gets proc's detailed status.
func (p *Proc) DetailedStatus() string {
p.statusMx.RLock()
Expand All @@ -306,13 +324,14 @@ func (p *Proc) Error() string {

// ConnectionSummary sums up the connection stats.
type ConnectionSummary struct {
IsAlive bool `json:"is_alive"`
Latency time.Duration `json:"latency"`
UploadSpeed uint32 `json:"upload_speed"`
DownloadSpeed uint32 `json:"download_speed"`
BandwidthSent uint64 `json:"bandwidth_sent"`
BandwidthReceived uint64 `json:"bandwidth_received"`
Error string `json:"error"`
IsAlive bool `json:"is_alive"`
Latency time.Duration `json:"latency"`
UploadSpeed uint32 `json:"upload_speed"`
DownloadSpeed uint32 `json:"download_speed"`
BandwidthSent uint64 `json:"bandwidth_sent"`
BandwidthReceived uint64 `json:"bandwidth_received"`
Error string `json:"error"`
ConnectionDuration int64 `json:"connection_duration,omitempty"`
}

// ConnectionsSummary returns all of the proc's connections stats.
Expand Down Expand Up @@ -348,12 +367,13 @@ func (p *Proc) ConnectionsSummary() []ConnectionSummary {
}

summaries = append(summaries, ConnectionSummary{
IsAlive: skywireConn.IsAlive(),
Latency: skywireConn.Latency(),
UploadSpeed: skywireConn.UploadSpeed(),
DownloadSpeed: skywireConn.DownloadSpeed(),
BandwidthSent: skywireConn.BandwidthSent(),
BandwidthReceived: skywireConn.BandwidthReceived(),
IsAlive: skywireConn.IsAlive(),
Latency: skywireConn.Latency(),
UploadSpeed: skywireConn.UploadSpeed(),
DownloadSpeed: skywireConn.DownloadSpeed(),
BandwidthSent: skywireConn.BandwidthSent(),
BandwidthReceived: skywireConn.BandwidthReceived(),
ConnectionDuration: p.ConnectionDuration(),
})

return true
Expand Down
6 changes: 6 additions & 0 deletions pkg/app/appserver/rpc_ingress_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
// RPCIngressClient describes RPC interface to communicate with the server.
type RPCIngressClient interface {
SetDetailedStatus(status string) error
SetConnectionDuration(dur int64) error
SetError(appErr string) error
Dial(remote appnet.Addr) (connID uint16, localPort routing.Port, err error)
Listen(local appnet.Addr) (uint16, error)
Expand Down Expand Up @@ -47,6 +48,11 @@ func (c *rpcIngressClient) SetDetailedStatus(status string) error {
return c.rpc.Call(c.formatMethod("SetDetailedStatus"), &status, nil)
}

// SetConnectionDuration sets the connection duration for an app
func (c *rpcIngressClient) SetConnectionDuration(dur int64) error {
return c.rpc.Call(c.formatMethod("SetConnectionDuration"), dur, nil)
}

// SetError sets error of an app.
func (c *rpcIngressClient) SetError(appErr string) error {
return c.rpc.Call(c.formatMethod("SetError"), &appErr, nil)
Expand Down
9 changes: 7 additions & 2 deletions pkg/app/appserver/rpc_ingress_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,17 @@ func (r *RPCIngressGateway) SetDetailedStatus(status *string, _ *struct{}) (err
return nil
}

// SetConnectionDuration sets the connection duration of an app (vpn-client in this instance)
func (r *RPCIngressGateway) SetConnectionDuration(dur int64, _ *struct{}) (err error) {
defer rpcutil.LogCall(r.log, "SetConnectionDuration", dur)(nil, &err)
r.proc.SetConnectionDuration(dur)
return nil
}

// SetError sets error of an app.
func (r *RPCIngressGateway) SetError(appErr *string, _ *struct{}) (err error) {
defer rpcutil.LogCall(r.log, "SetError", appErr)(nil, &err)

r.proc.SetError(*appErr)

return nil
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/app/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ func (c *Client) SetDetailedStatus(status string) error {
return c.rpcC.SetDetailedStatus(status)
}

// SetConnectionDuration sets the detailed app connection duration within the visor.
func (c *Client) SetConnectionDuration(dur int64) error {
return c.rpcC.SetConnectionDuration(dur)
}

// SetError sets app error within the visor.
func (c *Client) SetError(appErr string) error {
return c.rpcC.SetError(appErr)
Expand Down
5 changes: 4 additions & 1 deletion pkg/app/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,10 @@ func (l *Launcher) AppStates() []*AppState {
}
if proc, ok := l.procM.ProcByName(app.Name); ok {
state.DetailedStatus = proc.DetailedStatus()
state.Status = AppStatusRunning
connSummary := proc.ConnectionsSummary()
if connSummary != nil {
state.Status = AppStatusRunning
}
}
states = append(states, state)
}
Expand Down
40 changes: 40 additions & 0 deletions pkg/setup/setupclient/mock_route_group_dialer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 244adc2

Please sign in to comment.