Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AppStats: Connection Duration Addition to API #908

Merged
merged 10 commits into from
Oct 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
- added transport_setup_nodes field to transport section
- added MinHops field to V1Routing section of config
- added `skywire-cli config` subcommand
- added connection_duration field to `/api/visor/{pk}/apps/vpn-client/connections`

## 0.2.1 - 2020.04.07

Expand Down
63 changes: 48 additions & 15 deletions internal/vpn/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"runtime"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -52,6 +53,8 @@ type Client struct {
tunMu sync.Mutex
tun TUNDevice
tunCreated bool

connectedDuration int64
}

// NewClient creates VPN client instance.
Expand Down Expand Up @@ -176,10 +179,13 @@ func (c *Client) Serve() error {
fmt.Printf("Failed to close TUN: %v\n", err)
}

fmt.Println("Closed TUN")
c.log.Info("Closing TUN")
}()

defer c.setAppStatus(ClientStatusShuttingDown)
defer func() {
c.setAppStatus(ClientStatusShuttingDown)
c.resetConnDuration()
}()

c.setAppStatus(ClientStatusConnecting)

Expand All @@ -205,8 +211,10 @@ func (c *Client) Serve() error {
case errHandshakeStatusForbidden, errHandshakeStatusInternalError, errHandshakeNoFreeIPs,
errHandshakeStatusBadRequest, errNoTransportFound, errTransportNotFound:
c.setAppError(err)
c.resetConnDuration()
return err
default:
c.resetConnDuration()
c.setAppStatus(ClientStatusReconnecting)
c.setAppError(errTimeout)
fmt.Println("\nConnection broke, reconnecting...")
Expand Down Expand Up @@ -252,6 +260,17 @@ func (c *Client) AddDirectRoute(ip net.IP) error {
return c.setupDirectRoute(ip)
}

func (c *Client) removeDirectRouteFn(ip net.IP, i int) error {
c.directIPs = append(c.directIPs[:i], c.directIPs[i+1:]...)

if err := c.setSysPrivileges(); err != nil {
return fmt.Errorf("failed to setup system privileges: %w", err)
}
defer c.releaseSysPrivileges()

return c.removeDirectRoute(ip)
}

// RemoveDirectRoute removes direct route. Packets destined to `ip` will
// go through VPN.
func (c *Client) RemoveDirectRoute(ip net.IP) error {
Expand All @@ -260,17 +279,9 @@ func (c *Client) RemoveDirectRoute(ip net.IP) error {

for i, storedIP := range c.directIPs {
if ip.Equal(storedIP) {
c.directIPs = append(c.directIPs[:i], c.directIPs[i+1:]...)

if err := c.setSysPrivileges(); err != nil {
return fmt.Errorf("failed to setup system privileges: %w", err)
}
defer c.releaseSysPrivileges()

if err := c.removeDirectRoute(ip); err != nil {
if err := c.removeDirectRouteFn(ip, i); err != nil {
return err
}

break
}
}
Expand Down Expand Up @@ -397,6 +408,8 @@ func (c *Client) serveConn(conn net.Conn) error {
}

c.setAppStatus(ClientStatusRunning)
c.resetConnDuration()
t := time.NewTicker(time.Second)

defer func() {
if !c.cfg.Killswitch {
Expand Down Expand Up @@ -427,10 +440,19 @@ func (c *Client) serveConn(conn net.Conn) error {
}()

// only one side may fail here, so we wait till at least one fails
select {
case <-connToTunDoneCh:
case <-tunToConnCh:
case <-c.closeC:
serveLoop:
for {
select {
case <-connToTunDoneCh:
break serveLoop
case <-tunToConnCh:
break serveLoop
case <-c.closeC:
break serveLoop
case <-t.C:
atomic.AddInt64(&c.connectedDuration, 1)
c.setConnectionDuration()
}
}

// here we setup system privileges again, so deferred calls may be done safely
Expand Down Expand Up @@ -727,6 +749,12 @@ func (c *Client) setAppStatus(status ClientStatus) {
}
}

func (c *Client) setConnectionDuration() {
if err := c.appCl.SetConnectionDuration(atomic.LoadInt64(&c.connectedDuration)); err != nil {
fmt.Printf("Failed to set connection duration: %v\n", err)
}
}

func (c *Client) setAppError(appErr error) {
if err := c.appCl.SetError(appErr.Error()); err != nil {
fmt.Printf("Failed to set error %v: %v\n", appErr, err)
Expand All @@ -743,6 +771,11 @@ func (c *Client) isClosed() bool {
return false
}

func (c *Client) resetConnDuration() {
atomic.StoreInt64(&c.connectedDuration, 0)
c.setConnectionDuration()
}

func ipFromEnv(key string) (net.IP, error) {
ip, ok, err := IPFromEnv(key)
if err != nil {
Expand Down
21 changes: 21 additions & 0 deletions pkg/app/appserver/mock_proc_manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions pkg/app/appserver/mock_rpc_ingress_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

50 changes: 35 additions & 15 deletions pkg/app/appserver/proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,12 @@ type Proc struct {

statusMx sync.RWMutex
status string
errMx sync.RWMutex
err string
// connection duration (i.e. when vpn client is connected, the app will set the connection duration)
connDuration int64
connDurationMu sync.RWMutex

errMx sync.RWMutex
err string
}

// NewProc constructs `Proc`.
Expand Down Expand Up @@ -280,6 +284,20 @@ func (p *Proc) SetDetailedStatus(status string) {
p.status = status
}

// SetConnectionDuration sets the proc's connection duration
func (p *Proc) SetConnectionDuration(dur int64) {
p.connDurationMu.Lock()
defer p.connDurationMu.Unlock()
p.connDuration = dur
}

// ConnectionDuration gets proc's connection duration
func (p *Proc) ConnectionDuration() int64 {
p.connDurationMu.RLock()
defer p.connDurationMu.RUnlock()
return p.connDuration
}

// DetailedStatus gets proc's detailed status.
func (p *Proc) DetailedStatus() string {
p.statusMx.RLock()
Expand All @@ -306,13 +324,14 @@ func (p *Proc) Error() string {

// ConnectionSummary sums up the connection stats.
type ConnectionSummary struct {
IsAlive bool `json:"is_alive"`
Latency time.Duration `json:"latency"`
UploadSpeed uint32 `json:"upload_speed"`
DownloadSpeed uint32 `json:"download_speed"`
BandwidthSent uint64 `json:"bandwidth_sent"`
BandwidthReceived uint64 `json:"bandwidth_received"`
Error string `json:"error"`
IsAlive bool `json:"is_alive"`
Latency time.Duration `json:"latency"`
UploadSpeed uint32 `json:"upload_speed"`
DownloadSpeed uint32 `json:"download_speed"`
BandwidthSent uint64 `json:"bandwidth_sent"`
BandwidthReceived uint64 `json:"bandwidth_received"`
Error string `json:"error"`
ConnectionDuration int64 `json:"connection_duration,omitempty"`
}

// ConnectionsSummary returns all of the proc's connections stats.
Expand Down Expand Up @@ -348,12 +367,13 @@ func (p *Proc) ConnectionsSummary() []ConnectionSummary {
}

summaries = append(summaries, ConnectionSummary{
IsAlive: skywireConn.IsAlive(),
Latency: skywireConn.Latency(),
UploadSpeed: skywireConn.UploadSpeed(),
DownloadSpeed: skywireConn.DownloadSpeed(),
BandwidthSent: skywireConn.BandwidthSent(),
BandwidthReceived: skywireConn.BandwidthReceived(),
IsAlive: skywireConn.IsAlive(),
Latency: skywireConn.Latency(),
UploadSpeed: skywireConn.UploadSpeed(),
DownloadSpeed: skywireConn.DownloadSpeed(),
BandwidthSent: skywireConn.BandwidthSent(),
BandwidthReceived: skywireConn.BandwidthReceived(),
ConnectionDuration: p.ConnectionDuration(),
})

return true
Expand Down
6 changes: 6 additions & 0 deletions pkg/app/appserver/rpc_ingress_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
// RPCIngressClient describes RPC interface to communicate with the server.
type RPCIngressClient interface {
SetDetailedStatus(status string) error
SetConnectionDuration(dur int64) error
SetError(appErr string) error
Dial(remote appnet.Addr) (connID uint16, localPort routing.Port, err error)
Listen(local appnet.Addr) (uint16, error)
Expand Down Expand Up @@ -47,6 +48,11 @@ func (c *rpcIngressClient) SetDetailedStatus(status string) error {
return c.rpc.Call(c.formatMethod("SetDetailedStatus"), &status, nil)
}

// SetConnectionDuration sets the connection duration for an app
func (c *rpcIngressClient) SetConnectionDuration(dur int64) error {
return c.rpc.Call(c.formatMethod("SetConnectionDuration"), dur, nil)
}

// SetError sets error of an app.
func (c *rpcIngressClient) SetError(appErr string) error {
return c.rpc.Call(c.formatMethod("SetError"), &appErr, nil)
Expand Down
9 changes: 7 additions & 2 deletions pkg/app/appserver/rpc_ingress_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,17 @@ func (r *RPCIngressGateway) SetDetailedStatus(status *string, _ *struct{}) (err
return nil
}

// SetConnectionDuration sets the connection duration of an app (vpn-client in this instance)
func (r *RPCIngressGateway) SetConnectionDuration(dur int64, _ *struct{}) (err error) {
defer rpcutil.LogCall(r.log, "SetConnectionDuration", dur)(nil, &err)
r.proc.SetConnectionDuration(dur)
return nil
}

// SetError sets error of an app.
func (r *RPCIngressGateway) SetError(appErr *string, _ *struct{}) (err error) {
defer rpcutil.LogCall(r.log, "SetError", appErr)(nil, &err)

r.proc.SetError(*appErr)

return nil
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/app/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ func (c *Client) SetDetailedStatus(status string) error {
return c.rpcC.SetDetailedStatus(status)
}

// SetConnectionDuration sets the detailed app connection duration within the visor.
func (c *Client) SetConnectionDuration(dur int64) error {
return c.rpcC.SetConnectionDuration(dur)
}

// SetError sets app error within the visor.
func (c *Client) SetError(appErr string) error {
return c.rpcC.SetError(appErr)
Expand Down
5 changes: 4 additions & 1 deletion pkg/app/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,10 @@ func (l *Launcher) AppStates() []*AppState {
}
if proc, ok := l.procM.ProcByName(app.Name); ok {
state.DetailedStatus = proc.DetailedStatus()
state.Status = AppStatusRunning
connSummary := proc.ConnectionsSummary()
if connSummary != nil {
state.Status = AppStatusRunning
}
}
states = append(states, state)
}
Expand Down
40 changes: 40 additions & 0 deletions pkg/setup/setupclient/mock_route_group_dialer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading