diff --git a/go/sig/egress/iface/sesspathpool.go b/go/sig/egress/iface/sesspathpool.go index 65ad5cdff8..a7c8bf26a4 100644 --- a/go/sig/egress/iface/sesspathpool.go +++ b/go/sig/egress/iface/sesspathpool.go @@ -74,6 +74,10 @@ func (spp SessPathPool) GetByKey(key spathmeta.PathKey) *SessPath { return res.SessPath } +func (spp SessPathPool) PathCount() int { + return len(spp) +} + func (spp SessPathPool) Update(aps spathmeta.AppPathSet) { // Remove any old entries that aren't present in the update. for key := range spp { diff --git a/go/sig/egress/reader/reader.go b/go/sig/egress/reader/reader.go index abbae9e792..ea739d2401 100644 --- a/go/sig/egress/reader/reader.go +++ b/go/sig/egress/reader/reader.go @@ -89,7 +89,7 @@ BatchLoop: if dstRing == nil { // Release buffer back to free buffer pool iface.EgressFreePkts.Write(ringbuf.EntryList{buf}, true) - // FIXME(kormat): replace with metric. + metrics.PktUnroutable.Inc() r.log.Error("EgressReader: unable to find dest IA", "ip", dstIP) continue } diff --git a/go/sig/egress/session/sessmon.go b/go/sig/egress/session/sessmon.go index 27f69132d4..2fe1ff65c9 100644 --- a/go/sig/egress/session/sessmon.go +++ b/go/sig/egress/session/sessmon.go @@ -63,9 +63,13 @@ type sessMonitor struct { } func newSessMonitor(sess *Session) *sessMonitor { + // Session starts as unhealthy. + metrics.SessionHealth.WithLabelValues(sess.IA().String(), + sess.SessId.String()).Set(0.0) return &sessMonitor{ - Logger: sess.Logger, - sess: sess, pool: sess.pool, + Logger: sess.Logger, + sess: sess, + pool: sess.pool, sessPathPool: iface.NewSessPathPool(), } } @@ -121,6 +125,8 @@ func (sm *sessMonitor) updatePaths() { expTime := currPath.PathEntry().Path.ExpTime mtu := currPath.PathEntry().Path.Mtu sm.sessPathPool.Update(sm.pool.Paths()) + metrics.SessionPaths.WithLabelValues(sm.sess.IA().String(), + sm.sess.SessId.String()).Set(float64(sm.sessPathPool.PathCount())) // Expiration or MTU of the current path may have changed during the update. // In such a case we want to push the updated path to the Session. if currPath.PathEntry().Path.ExpTime != expTime || currPath.PathEntry().Path.Mtu != mtu { @@ -143,7 +149,7 @@ func (sm *sessMonitor) updateRemote() { metrics.SessionTimedOut.WithLabelValues( sm.sess.IA().String(), sm.sess.SessId.String()).Inc() - sm.sess.healthy.Store(false) + sm.setHealth(false) if sm.smRemote.SessPath != nil { // Update path statistics. This is a bit of a stretch. The path // may be OK, but the remote SIG may be down. However, we accept @@ -153,7 +159,7 @@ func (sm *sessMonitor) updateRemote() { } // Start monitoring new path and discover a new SIG. sm.smRemote.Sig.Host = addr.SvcSIG - sm.smRemote.SessPath = sm.getNewPath(sm.smRemote.SessPath) + sm.smRemote.SessPath = sm.getNewPath(sm.smRemote.SessPath, "timeout") // XXX(roosd): The session's remote SIG will remain the same until the // monitor discovers a remote SIG. sm.updateSessSnap() @@ -165,9 +171,9 @@ func (sm *sessMonitor) updateRemote() { // but also when the pool is empty. Try to get a new path. if sm.smRemote.SessPath == nil { sm.Info("sessMonitor: Path not available", "remote", sm.smRemote) - sm.sess.healthy.Store(false) + sm.setHealth(false) // Start monitoring the new path. - sm.smRemote.SessPath = sm.getNewPath(sm.smRemote.SessPath) + sm.smRemote.SessPath = sm.getNewPath(sm.smRemote.SessPath, "no_path") sm.updateSessSnap() sm.Info("sessMonitor: New remote", "remote", sm.smRemote) return @@ -181,7 +187,7 @@ func (sm *sessMonitor) updateRemote() { if updatedPath == nil { sm.Info("sessMonitor: Current path was invalidated", "remote", sm.smRemote) // Start monitoring the new path. - sm.smRemote.SessPath = sm.getNewPath(sm.smRemote.SessPath) + sm.smRemote.SessPath = sm.getNewPath(sm.smRemote.SessPath, "retired") // Make session use the new path immediately even though we haven't yet checked // whether it works. sm.updateSessSnap() @@ -195,7 +201,7 @@ func (sm *sessMonitor) updateRemote() { sm.Info("sessMonitor: Current path is about to expire", "remote", sm.smRemote) sm.smRemote.SessPath = updatedPath if sm.smRemote.SessPath.IsCloseToExpiry() { - sm.smRemote.SessPath = sm.getNewPath(sm.smRemote.SessPath) + sm.smRemote.SessPath = sm.getNewPath(sm.smRemote.SessPath, "expired") } sm.updateSessSnap() sm.Info("sessMonitor: New remote", "remote", sm.smRemote) @@ -218,9 +224,14 @@ func (sm *sessMonitor) updateSessSnap() { remote.Sig = old.Sig } sm.sess.currRemote.Store(remote) + if remote.SessPath != nil { + mtu := remote.SessPath.PathEntry().Path.Mtu + metrics.SessionMTU.WithLabelValues(sm.sess.IA().String(), + sm.sess.SessId.String()).Set(float64(mtu)) + } } -func (sm *sessMonitor) getNewPath(old *iface.SessPath) *iface.SessPath { +func (sm *sessMonitor) getNewPath(old *iface.SessPath, reason string) *iface.SessPath { var res *iface.SessPath if old == nil { res = sm.sessPathPool.Get("") @@ -235,9 +246,8 @@ func (sm *sessMonitor) getNewPath(old *iface.SessPath) *iface.SessPath { report = old.Key() != res.Key() } if report { - metrics.SessionPathSwitched.WithLabelValues( - sm.sess.IA().String(), - sm.sess.SessId.String()).Inc() + metrics.SessionPathSwitched.WithLabelValues(sm.sess.IA().String(), + sm.sess.SessId.String(), reason).Inc() } return res } @@ -284,6 +294,7 @@ func (sm *sessMonitor) sendReq() { if err != nil { sm.Error("sessMonitor: Error sending signed Ctrl payload", "err", err) } + metrics.SessionProbes.WithLabelValues(sm.sess.IA().String(), sm.sess.SessId.String()).Inc() } func (sm *sessMonitor) handleRep(rpld *disp.RegPld) { @@ -298,6 +309,8 @@ func (sm *sessMonitor) handleRep(rpld *disp.RegPld) { "expected", sm.sess.IA(), "actual", rpld.Addr.IA) return } + metrics.SessionProbeReplies.WithLabelValues(sm.sess.IA().String(), + sm.sess.SessId.String()).Inc() // Inform SessPathPool that a reply has arrived. if sm.smRemote.SessPath != nil { @@ -320,8 +333,14 @@ func (sm *sessMonitor) handleRep(rpld *disp.RegPld) { if sessRemote == nil || !sm.smRemote.Sig.Equal(sessRemote.Sig) { sm.updateSessSnap() sm.Info("sessMonitor: updating remote Info", "msgId", rpld.Id, "remote", sm.smRemote) + metrics.SessionRemoteSwitched.WithLabelValues(sm.sess.IA().String(), + sm.sess.SessId.String()).Inc() } - sm.sess.healthy.Store(true) + sm.setHealth(true) + + latency := time.Now().Sub(rpld.Id.Time()) + metrics.SessionProbeRTT.WithLabelValues(sm.sess.IA().String(), + sm.sess.SessId.String()).Observe(latency.Seconds()) } else { // This is going to happen if latency of the path is greater than the poll ticker period. sm.Info("Reply to an old request received", "request", sm.updateMsgId, "reply", rpld.Id) @@ -330,3 +349,13 @@ func (sm *sessMonitor) handleRep(rpld *disp.RegPld) { sm.sess.SessId.String()).Inc() } } + +func (sm *sessMonitor) setHealth(healthy bool) { + sm.sess.healthy.Store(healthy) + var healthVal float64 + if healthy { + healthVal = 1 + } + metrics.SessionHealth.WithLabelValues(sm.sess.IA().String(), + sm.sess.SessId.String()).Set(healthVal) +} diff --git a/go/sig/metrics/metrics.go b/go/sig/metrics/metrics.go index 6c752f92ce..123fde8882 100644 --- a/go/sig/metrics/metrics.go +++ b/go/sig/metrics/metrics.go @@ -34,6 +34,7 @@ const Namespace = "sig" // Declare prometheus metrics to export. var ( + PktUnroutable prometheus.Counter PktsRecv *prometheus.CounterVec PktsSent *prometheus.CounterVec PktBytesRecv *prometheus.CounterVec @@ -49,6 +50,13 @@ var ( SessionTimedOut *prometheus.CounterVec SessionPathSwitched *prometheus.CounterVec SessionOldPollReplies *prometheus.CounterVec + SessionProbes *prometheus.CounterVec + SessionProbeReplies *prometheus.CounterVec + SessionProbeRTT *prometheus.HistogramVec + SessionPaths *prometheus.GaugeVec + SessionMTU *prometheus.GaugeVec + SessionHealth *prometheus.GaugeVec + SessionRemoteSwitched *prometheus.CounterVec EgressRxQueueFull *prometheus.CounterVec ) @@ -66,7 +74,17 @@ func init() { newCVec := func(name, help string, lNames []string) *prometheus.CounterVec { return prom.NewCounterVec(Namespace, "", name, help, lNames) } + newHVec := func(name, help string, lNames []string, + buckets []float64) *prometheus.HistogramVec { + + return prom.NewHistogramVec(Namespace, "", name, help, lNames, buckets) + } + newGVec := func(name, help string, lNames []string) *prometheus.GaugeVec { + return prom.NewGaugeVec(Namespace, "", name, help, lNames) + } // FIXME(kormat): these metrics should probably have more informative labels + PktUnroutable = newC("pkt_unroutable", + "Number of egress packets that can't be routed to any remote AS.") PktsRecv = newCVec("pkts_recv_total", "Number of packets received.", iaLabels) PktsSent = newCVec("pkts_sent_total", "Number of packets sent.", iaLabels) PktBytesRecv = newCVec("pkt_bytes_recv_total", "Number of packet bytes received.", iaLabels) @@ -80,9 +98,20 @@ func init() { FramesTooOld = newC("frames_too_old_total", "Number of frames that are too old.") FramesDuplicated = newC("frames_duplicated_total", "Number of duplicate frames.") SessionTimedOut = newCVec("session_timeout", "Number of pollreq timeouts", iaLabels) - SessionPathSwitched = newCVec("session_switch_path", "Number of path switches", iaLabels) + SessionPathSwitched = newCVec("session_switch_path", "Number of path switches", + append(iaLabels, "reason")) SessionOldPollReplies = newCVec("session_old_poll_replies", "Number of poll replies received after next poll request was sent", iaLabels) + SessionProbes = newCVec("session_probes", "Number of probes sent", iaLabels) + SessionProbeReplies = newCVec("session_probe_replies", + "Number of probe replies received", iaLabels) + SessionProbeRTT = newHVec("session_probe_rtt", "Probe roundtrip time", + iaLabels, prom.DefaultLatencyBuckets) + SessionPaths = newGVec("session_paths", "Number of available paths", iaLabels) + SessionMTU = newGVec("session_mtu", "MTU used by the session", iaLabels) + SessionHealth = newGVec("session_health", "Session health (either 1 or 0)", iaLabels) + SessionRemoteSwitched = newCVec("session_switch_remote", + "Number of times the remote has changed.", iaLabels) EgressRxQueueFull = newCVec("egress_recv_queue_full_total", "Egress packets dropped due to full queues.", []string{"IA"})