Skip to content

Commit

Permalink
SIG: Sessmon metrics (#3329)
Browse files Browse the repository at this point in the history
  • Loading branch information
sustrik authored Nov 8, 2019
1 parent 1cb9d08 commit 7893e90
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 15 deletions.
4 changes: 4 additions & 0 deletions go/sig/egress/iface/sesspathpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ func (spp SessPathPool) GetByKey(key spathmeta.PathKey) *SessPath {
return res.SessPath
}

// PathCount returns the number of entries currently stored in the pool.
func (spp SessPathPool) PathCount() int {
	count := len(spp)
	return count
}

func (spp SessPathPool) Update(aps spathmeta.AppPathSet) {
// Remove any old entries that aren't present in the update.
for key := range spp {
Expand Down
2 changes: 1 addition & 1 deletion go/sig/egress/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ BatchLoop:
if dstRing == nil {
// Release buffer back to free buffer pool
iface.EgressFreePkts.Write(ringbuf.EntryList{buf}, true)
// FIXME(kormat): replace with metric.
metrics.PktUnroutable.Inc()
r.log.Error("EgressReader: unable to find dest IA", "ip", dstIP)
continue
}
Expand Down
55 changes: 42 additions & 13 deletions go/sig/egress/session/sessmon.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,13 @@ type sessMonitor struct {
}

// newSessMonitor builds the monitor for sess. The monitor reuses the
// session's logger and path pool and gets its own SessPathPool from
// iface.NewSessPathPool.
//
// As a side effect, the SessionHealth gauge labelled with the session's IA
// and session ID is set to 0 before the monitor is returned.
func newSessMonitor(sess *Session) *sessMonitor {
	// Session starts as unhealthy.
	metrics.SessionHealth.WithLabelValues(sess.IA().String(),
		sess.SessId.String()).Set(0.0)
	// Each field is keyed exactly once: a struct literal that repeats a
	// field name does not compile.
	return &sessMonitor{
		Logger:       sess.Logger,
		sess:         sess,
		pool:         sess.pool,
		sessPathPool: iface.NewSessPathPool(),
	}
}
Expand Down Expand Up @@ -121,6 +125,8 @@ func (sm *sessMonitor) updatePaths() {
expTime := currPath.PathEntry().Path.ExpTime
mtu := currPath.PathEntry().Path.Mtu
sm.sessPathPool.Update(sm.pool.Paths())
metrics.SessionPaths.WithLabelValues(sm.sess.IA().String(),
sm.sess.SessId.String()).Set(float64(sm.sessPathPool.PathCount()))
// Expiration or MTU of the current path may have changed during the update.
// In such a case we want to push the updated path to the Session.
if currPath.PathEntry().Path.ExpTime != expTime || currPath.PathEntry().Path.Mtu != mtu {
Expand All @@ -143,7 +149,7 @@ func (sm *sessMonitor) updateRemote() {
metrics.SessionTimedOut.WithLabelValues(
sm.sess.IA().String(),
sm.sess.SessId.String()).Inc()
sm.sess.healthy.Store(false)
sm.setHealth(false)
if sm.smRemote.SessPath != nil {
// Update path statistics. This is a bit of a stretch. The path
// may be OK, but the remote SIG may be down. However, we accept
Expand All @@ -153,7 +159,7 @@ func (sm *sessMonitor) updateRemote() {
}
// Start monitoring new path and discover a new SIG.
sm.smRemote.Sig.Host = addr.SvcSIG
sm.smRemote.SessPath = sm.getNewPath(sm.smRemote.SessPath)
sm.smRemote.SessPath = sm.getNewPath(sm.smRemote.SessPath, "timeout")
// XXX(roosd): The session's remote SIG will remain the same until the
// monitor discovers a remote SIG.
sm.updateSessSnap()
Expand All @@ -165,9 +171,9 @@ func (sm *sessMonitor) updateRemote() {
// but also when the pool is empty. Try to get a new path.
if sm.smRemote.SessPath == nil {
sm.Info("sessMonitor: Path not available", "remote", sm.smRemote)
sm.sess.healthy.Store(false)
sm.setHealth(false)
// Start monitoring the new path.
sm.smRemote.SessPath = sm.getNewPath(sm.smRemote.SessPath)
sm.smRemote.SessPath = sm.getNewPath(sm.smRemote.SessPath, "no_path")
sm.updateSessSnap()
sm.Info("sessMonitor: New remote", "remote", sm.smRemote)
return
Expand All @@ -181,7 +187,7 @@ func (sm *sessMonitor) updateRemote() {
if updatedPath == nil {
sm.Info("sessMonitor: Current path was invalidated", "remote", sm.smRemote)
// Start monitoring the new path.
sm.smRemote.SessPath = sm.getNewPath(sm.smRemote.SessPath)
sm.smRemote.SessPath = sm.getNewPath(sm.smRemote.SessPath, "retired")
// Make session use the new path immediately even though we haven't yet checked
// whether it works.
sm.updateSessSnap()
Expand All @@ -195,7 +201,7 @@ func (sm *sessMonitor) updateRemote() {
sm.Info("sessMonitor: Current path is about to expire", "remote", sm.smRemote)
sm.smRemote.SessPath = updatedPath
if sm.smRemote.SessPath.IsCloseToExpiry() {
sm.smRemote.SessPath = sm.getNewPath(sm.smRemote.SessPath)
sm.smRemote.SessPath = sm.getNewPath(sm.smRemote.SessPath, "expired")
}
sm.updateSessSnap()
sm.Info("sessMonitor: New remote", "remote", sm.smRemote)
Expand All @@ -218,9 +224,14 @@ func (sm *sessMonitor) updateSessSnap() {
remote.Sig = old.Sig
}
sm.sess.currRemote.Store(remote)
if remote.SessPath != nil {
mtu := remote.SessPath.PathEntry().Path.Mtu
metrics.SessionMTU.WithLabelValues(sm.sess.IA().String(),
sm.sess.SessId.String()).Set(float64(mtu))
}
}

func (sm *sessMonitor) getNewPath(old *iface.SessPath) *iface.SessPath {
func (sm *sessMonitor) getNewPath(old *iface.SessPath, reason string) *iface.SessPath {
var res *iface.SessPath
if old == nil {
res = sm.sessPathPool.Get("")
Expand All @@ -235,9 +246,8 @@ func (sm *sessMonitor) getNewPath(old *iface.SessPath) *iface.SessPath {
report = old.Key() != res.Key()
}
if report {
metrics.SessionPathSwitched.WithLabelValues(
sm.sess.IA().String(),
sm.sess.SessId.String()).Inc()
metrics.SessionPathSwitched.WithLabelValues(sm.sess.IA().String(),
sm.sess.SessId.String(), reason).Inc()
}
return res
}
Expand Down Expand Up @@ -284,6 +294,7 @@ func (sm *sessMonitor) sendReq() {
if err != nil {
sm.Error("sessMonitor: Error sending signed Ctrl payload", "err", err)
}
metrics.SessionProbes.WithLabelValues(sm.sess.IA().String(), sm.sess.SessId.String()).Inc()
}

func (sm *sessMonitor) handleRep(rpld *disp.RegPld) {
Expand All @@ -298,6 +309,8 @@ func (sm *sessMonitor) handleRep(rpld *disp.RegPld) {
"expected", sm.sess.IA(), "actual", rpld.Addr.IA)
return
}
metrics.SessionProbeReplies.WithLabelValues(sm.sess.IA().String(),
sm.sess.SessId.String()).Inc()

// Inform SessPathPool that a reply has arrived.
if sm.smRemote.SessPath != nil {
Expand All @@ -320,8 +333,14 @@ func (sm *sessMonitor) handleRep(rpld *disp.RegPld) {
if sessRemote == nil || !sm.smRemote.Sig.Equal(sessRemote.Sig) {
sm.updateSessSnap()
sm.Info("sessMonitor: updating remote Info", "msgId", rpld.Id, "remote", sm.smRemote)
metrics.SessionRemoteSwitched.WithLabelValues(sm.sess.IA().String(),
sm.sess.SessId.String()).Inc()
}
sm.sess.healthy.Store(true)
sm.setHealth(true)

latency := time.Now().Sub(rpld.Id.Time())
metrics.SessionProbeRTT.WithLabelValues(sm.sess.IA().String(),
sm.sess.SessId.String()).Observe(latency.Seconds())
} else {
// This is going to happen if latency of the path is greater than the poll ticker period.
sm.Info("Reply to an old request received", "request", sm.updateMsgId, "reply", rpld.Id)
Expand All @@ -330,3 +349,13 @@ func (sm *sessMonitor) handleRep(rpld *disp.RegPld) {
sm.sess.SessId.String()).Inc()
}
}

// setHealth stores the given health flag on the session and mirrors it to
// the SessionHealth gauge labelled with the session's IA and session ID:
// 1 when healthy, 0 otherwise.
func (sm *sessMonitor) setHealth(healthy bool) {
	sm.sess.healthy.Store(healthy)

	gauge := metrics.SessionHealth.WithLabelValues(
		sm.sess.IA().String(),
		sm.sess.SessId.String(),
	)
	if !healthy {
		gauge.Set(0)
		return
	}
	gauge.Set(1)
}
31 changes: 30 additions & 1 deletion go/sig/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const Namespace = "sig"

// Declare prometheus metrics to export.
var (
PktUnroutable prometheus.Counter
PktsRecv *prometheus.CounterVec
PktsSent *prometheus.CounterVec
PktBytesRecv *prometheus.CounterVec
Expand All @@ -49,6 +50,13 @@ var (
SessionTimedOut *prometheus.CounterVec
SessionPathSwitched *prometheus.CounterVec
SessionOldPollReplies *prometheus.CounterVec
SessionProbes *prometheus.CounterVec
SessionProbeReplies *prometheus.CounterVec
SessionProbeRTT *prometheus.HistogramVec
SessionPaths *prometheus.GaugeVec
SessionMTU *prometheus.GaugeVec
SessionHealth *prometheus.GaugeVec
SessionRemoteSwitched *prometheus.CounterVec

EgressRxQueueFull *prometheus.CounterVec
)
Expand All @@ -66,7 +74,17 @@ func init() {
newCVec := func(name, help string, lNames []string) *prometheus.CounterVec {
return prom.NewCounterVec(Namespace, "", name, help, lNames)
}
newHVec := func(name, help string, lNames []string,
buckets []float64) *prometheus.HistogramVec {

return prom.NewHistogramVec(Namespace, "", name, help, lNames, buckets)
}
newGVec := func(name, help string, lNames []string) *prometheus.GaugeVec {
return prom.NewGaugeVec(Namespace, "", name, help, lNames)
}
// FIXME(kormat): these metrics should probably have more informative labels
PktUnroutable = newC("pkt_unroutable",
"Number of egress packets that can't be routed to any remote AS.")
PktsRecv = newCVec("pkts_recv_total", "Number of packets received.", iaLabels)
PktsSent = newCVec("pkts_sent_total", "Number of packets sent.", iaLabels)
PktBytesRecv = newCVec("pkt_bytes_recv_total", "Number of packet bytes received.", iaLabels)
Expand All @@ -80,9 +98,20 @@ func init() {
FramesTooOld = newC("frames_too_old_total", "Number of frames that are too old.")
FramesDuplicated = newC("frames_duplicated_total", "Number of duplicate frames.")
SessionTimedOut = newCVec("session_timeout", "Number of pollreq timeouts", iaLabels)
SessionPathSwitched = newCVec("session_switch_path", "Number of path switches", iaLabels)
SessionPathSwitched = newCVec("session_switch_path", "Number of path switches",
append(iaLabels, "reason"))
SessionOldPollReplies = newCVec("session_old_poll_replies",
"Number of poll replies received after next poll request was sent", iaLabels)
SessionProbes = newCVec("session_probes", "Number of probes sent", iaLabels)
SessionProbeReplies = newCVec("session_probe_replies",
"Number of probe replies received", iaLabels)
SessionProbeRTT = newHVec("session_probe_rtt", "Probe roundtrip time",
iaLabels, prom.DefaultLatencyBuckets)
SessionPaths = newGVec("session_paths", "Number of available paths", iaLabels)
SessionMTU = newGVec("session_mtu", "MTU used by the session", iaLabels)
SessionHealth = newGVec("session_health", "Session health (either 1 or 0)", iaLabels)
SessionRemoteSwitched = newCVec("session_switch_remote",
"Number of times the remote has changed.", iaLabels)

EgressRxQueueFull = newCVec("egress_recv_queue_full_total",
"Egress packets dropped due to full queues.", []string{"IA"})
Expand Down

0 comments on commit 7893e90

Please sign in to comment.