Skip to content
This repository has been archived by the owner on Mar 29, 2024. It is now read-only.

Commit

Permalink
Improve connect op metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
dhaavi committed Oct 5, 2023
1 parent f903805 commit 29fe5d8
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 8 deletions.
75 changes: 72 additions & 3 deletions crew/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,14 @@ import (
)

var (
newConnectOp *metrics.Counter
connectOpCnt *metrics.Counter
connectOpCntError *metrics.Counter
connectOpCntBadRequest *metrics.Counter
connectOpCntCanceled *metrics.Counter
connectOpCntFailed *metrics.Counter
connectOpCntConnected *metrics.Counter
connectOpCntRateLimited *metrics.Counter

connectOpIncomingBytes *metrics.Counter
connectOpOutgoingBytes *metrics.Counter

Expand All @@ -29,9 +36,9 @@ func registerMetrics() (err error) {
return nil
}

// Connect Op Stats.
// Connect Op Stats on client.

newConnectOp, err = metrics.NewCounter(
connectOpCnt, err = metrics.NewCounter(
"spn/op/connect/total",
nil,
&metrics.Options{
Expand All @@ -45,6 +52,68 @@ func registerMetrics() (err error) {
return err
}

// Connect Op Stats on server.

connectOpCntOptions := &metrics.Options{
Name: "SPN Total Connect Operations",
Permission: api.PermitUser,
Persist: true,
}

connectOpCntError, err = metrics.NewCounter(
"spn/op/connect/total",
map[string]string{"result": "error"},
connectOpCntOptions,
)
if err != nil {
return err
}

connectOpCntBadRequest, err = metrics.NewCounter(
"spn/op/connect/total",
map[string]string{"result": "bad_request"},
connectOpCntOptions,
)
if err != nil {
return err
}

connectOpCntCanceled, err = metrics.NewCounter(
"spn/op/connect/total",
map[string]string{"result": "canceled"},
connectOpCntOptions,
)
if err != nil {
return err
}

connectOpCntFailed, err = metrics.NewCounter(
"spn/op/connect/total",
map[string]string{"result": "failed"},
connectOpCntOptions,
)
if err != nil {
return err
}

connectOpCntConnected, err = metrics.NewCounter(
"spn/op/connect/total",
map[string]string{"result": "connected"},
connectOpCntOptions,
)
if err != nil {
return err
}

connectOpCntRateLimited, err = metrics.NewCounter(
"spn/op/connect/total",
map[string]string{"result": "rate_limited"},
connectOpCntOptions,
)
if err != nil {
return err
}

_, err = metrics.NewGauge(
"spn/op/connect/active",
nil,
Expand Down
23 changes: 18 additions & 5 deletions crew/op_connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func init() {
// NewConnectOp starts a new connect operation.
func NewConnectOp(tunnel *Tunnel) (*ConnectOp, *terminal.Error) {
// Submit metrics.
newConnectOp.Inc()
connectOpCnt.Inc()

// Create request.
request := &ConnectRequest{
Expand Down Expand Up @@ -168,9 +168,6 @@ func NewConnectOp(tunnel *Tunnel) (*ConnectOp, *terminal.Error) {
}

func startConnectOp(t terminal.Terminal, opID uint32, data *container.Container) (terminal.Operation, *terminal.Error) {
// Submit metrics.
newConnectOp.Inc()

// Check if we are running a public hub.
if !conf.PublicHub() {
return nil, terminal.ErrPermissionDenied.With("connecting is only allowed on public hubs")
Expand All @@ -180,14 +177,17 @@ func startConnectOp(t terminal.Terminal, opID uint32, data *container.Container)
request := &ConnectRequest{}
_, err := dsd.Load(data.CompileData(), request)
if err != nil {
connectOpCntError.Inc() // More like a protocol/system error than a bad request.
return nil, terminal.ErrMalformedData.With("failed to parse connect request: %w", err)
}
if request.QueueSize == 0 || request.QueueSize > terminal.MaxQueueSize {
connectOpCntError.Inc() // More like a protocol/system error than a bad request.
return nil, terminal.ErrInvalidOptions.With("invalid queue size of %d", request.QueueSize)
}

// Check if IP seems valid.
if len(request.IP) != net.IPv4len && len(request.IP) != net.IPv6len {
connectOpCntError.Inc() // More like a protocol/system error than a bad request.
return nil, terminal.ErrInvalidOptions.With("ip address is not valid")
}

Expand All @@ -213,6 +213,7 @@ func (op *ConnectOp) handleSetup(_ context.Context) error {
if sessionTerm, ok := op.t.(terminal.SessionTerminal); ok {
session = sessionTerm.GetSession()
} else {
connectOpCntError.Inc()
log.Errorf("spn/crew: %T is not a session terminal, aborting op %s#%d", op.t, op.t.FmtID(), op.ID())
op.Stop(op, terminal.ErrInternalError.With("no session available"))
return nil
Expand All @@ -225,6 +226,7 @@ func (op *ConnectOp) handleSetup(_ context.Context) error {

// If context was canceled, stop operation.
if cancelErr != nil {
connectOpCntCanceled.Inc()
op.Stop(op, terminal.ErrCanceled.With(cancelErr.Error()))
}

Expand All @@ -235,11 +237,14 @@ func (op *ConnectOp) handleSetup(_ context.Context) error {
func (op *ConnectOp) setup(session *terminal.Session) {
// Rate limit before connecting.
if tErr := session.RateLimit(); tErr != nil {
// Fake connection error when rate limited.
// Add rate limit info to error.
if tErr.Is(terminal.ErrRateLimited) {
connectOpCntRateLimited.Inc()
op.Stop(op, tErr.With(session.RateLimitInfo()))
return
}

connectOpCntError.Inc()
op.Stop(op, tErr)
return
}
Expand All @@ -248,27 +253,31 @@ func (op *ConnectOp) setup(session *terminal.Session) {
ipScope := netutils.GetIPScope(op.request.IP)
if ipScope != netutils.Global {
session.ReportSuspiciousActivity(terminal.SusFactorQuiteUnusual)
connectOpCntBadRequest.Inc()
op.Stop(op, terminal.ErrPermissionDenied.With("denied request to connect to non-global IP %s", op.request.IP))
return
}

// Check exit policy.
if tErr := checkExitPolicy(op.request); tErr != nil {
session.ReportSuspiciousActivity(terminal.SusFactorQuiteUnusual)
connectOpCntBadRequest.Inc()
op.Stop(op, tErr)
return
}

// Check one last time before connecting if operation was not canceled.
if op.Ctx().Err() != nil {
op.Stop(op, terminal.ErrCanceled.With(op.Ctx().Err().Error()))
connectOpCntCanceled.Inc()
return
}

// Connect to destination.
dialNet := op.request.DialNetwork()
if dialNet == "" {
session.ReportSuspiciousActivity(terminal.SusFactorCommon)
connectOpCntBadRequest.Inc()
op.Stop(op, terminal.ErrIncorrectUsage.With("protocol %s is not supported", op.request.Protocol))
return
}
Expand All @@ -285,10 +294,13 @@ func (op *ConnectOp) setup(session *terminal.Session) {
switch {
case errors.As(err, &netError) && netError.Timeout():
session.ReportSuspiciousActivity(terminal.SusFactorCommon)
connectOpCntFailed.Inc()
case errors.Is(err, context.Canceled):
session.ReportSuspiciousActivity(terminal.SusFactorCommon)
connectOpCntCanceled.Inc()
default:
session.ReportSuspiciousActivity(terminal.SusFactorWeirdButOK)
connectOpCntFailed.Inc()
}

op.Stop(op, terminal.ErrConnectionError.With("failed to connect to %s: %w", op.request, err))
Expand All @@ -301,6 +313,7 @@ func (op *ConnectOp) setup(session *terminal.Session) {
module.StartWorker("connect op conn writer", op.connWriter)
module.StartWorker("connect op flow handler", op.dfq.FlowHandler)

connectOpCntConnected.Inc()
log.Infof("spn/crew: connected op %s#%d to %s", op.t.FmtID(), op.ID(), op.request)
}

Expand Down

0 comments on commit 29fe5d8

Please sign in to comment.