Add metrics for blocked eval resources (#10454)
* add metrics for blocked eval resources

* docs: add new blocked_evals metrics

* fix to call `pruneStats` instead of `stats.prune` directly
lgfa29 committed Apr 29, 2021
1 parent fc5056b commit c711492
Showing 11 changed files with 962 additions and 452 deletions.
1 change: 1 addition & 0 deletions api/allocations.go
@@ -374,6 +374,7 @@ type AllocationMetric struct {
ClassExhausted map[string]int
DimensionExhausted map[string]int
QuotaExhausted []string
ResourcesExhausted map[string]*Resources
// Deprecated, replaced with ScoreMetaData
Scores map[string]float64
AllocationTime time.Duration
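The new `ResourcesExhausted` field added here exposes, per exhausted entry, the resources the scheduler could not satisfy when an allocation was blocked. A minimal sketch of reading it through the Go API client, assuming a reachable Nomad agent; the allocation ID is a placeholder and the map key is assumed to be the task name:

```go
package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/nomad/api"
)

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	// "d5b320b8" is a placeholder allocation ID; substitute a real one.
	alloc, _, err := client.Allocations().Info("d5b320b8", nil)
	if err != nil {
		log.Fatal(err)
	}
	if alloc.Metrics == nil {
		return
	}

	// ResourcesExhausted maps each entry (assumed here to be the task name)
	// to the resources that could not be satisfied during placement.
	for task, res := range alloc.Metrics.ResourcesExhausted {
		if res == nil || res.CPU == nil || res.MemoryMB == nil {
			continue
		}
		fmt.Printf("%s blocked on cpu=%d MHz, memory=%d MB\n", task, *res.CPU, *res.MemoryMB)
	}
}
```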
82 changes: 51 additions & 31 deletions nomad/blocked_evals.go
@@ -95,20 +95,6 @@ type wrappedEval struct {
token string
}

// BlockedStats returns all the stats about the blocked eval tracker.
type BlockedStats struct {
// TotalEscaped is the total number of blocked evaluations that have escaped
// computed node classes.
TotalEscaped int

// TotalBlocked is the total number of blocked evaluations.
TotalBlocked int

// TotalQuotaLimit is the total number of blocked evaluations that are due
// to the quota limit being reached.
TotalQuotaLimit int
}

// NewBlockedEvals creates a new blocked eval tracker that will enqueue
// unblocked evals into the passed broker.
func NewBlockedEvals(evalBroker *EvalBroker, logger log.Logger) *BlockedEvals {
@@ -123,7 +109,7 @@ func NewBlockedEvals(evalBroker *EvalBroker, logger log.Logger) *BlockedEvals {
capacityChangeCh: make(chan *capacityUpdate, unblockBuffer),
duplicateCh: make(chan struct{}, 1),
stopCh: make(chan struct{}),
stats: new(BlockedStats),
stats: NewBlockedStats(),
}
}

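The `BlockedStats` struct removed above moves out of this file, and `NewBlockedStats()` replaces `new(BlockedStats)` so the tracker starts with its resource maps allocated. A rough sketch of the shape this implies, pieced together from the `Block`/`Unblock` calls and the `EmitStats` loops further down; the exact field names, the `Timestamp` field, and the location of the new file are assumptions, not copied from the commit's new source:

```go
// Sketch only: the commit moves these definitions into a separate stats file;
// names below are inferred from how this diff uses them.
package nomad

import (
	"time"

	"github.com/hashicorp/nomad/nomad/structs"
)

// NodeInfo groups blocked resources by placement target.
type NodeInfo struct {
	Datacenter string
	NodeClass  string
}

// BlockedResourcesSummary is a running total of the CPU and memory that
// blocked evaluations are waiting on, stamped with its last update time.
type BlockedResourcesSummary struct {
	Timestamp time.Time
	CPU       int
	MemoryMB  int
}

// BlockedResourcesStats aggregates blocked resources by job and by node info,
// matching the two EmitStats loops added below.
type BlockedResourcesStats struct {
	ByJob      map[structs.NamespacedID]BlockedResourcesSummary
	ByNodeInfo map[NodeInfo]BlockedResourcesSummary
}

func NewBlockedResourcesStats() *BlockedResourcesStats {
	return &BlockedResourcesStats{
		ByJob:      make(map[structs.NamespacedID]BlockedResourcesSummary),
		ByNodeInfo: make(map[NodeInfo]BlockedResourcesSummary),
	}
}

// BlockedStats keeps the counters used by the blocked eval tracker.
type BlockedStats struct {
	TotalEscaped    int
	TotalBlocked    int
	TotalQuotaLimit int

	// BlockedResources is the new per-job / per-node accounting.
	BlockedResources *BlockedResourcesStats
}

// NewBlockedStats allocates the nested maps, which is why it replaces
// new(BlockedStats) in the constructor above.
func NewBlockedStats() *BlockedStats {
	return &BlockedStats{BlockedResources: NewBlockedResourcesStats()}
}

// Block bumps TotalBlocked and folds the eval's exhausted resources into the
// per-job and per-node summaries; the resource accounting itself is omitted
// in this sketch.
func (b *BlockedStats) Block(eval *structs.Evaluation) {
	b.TotalBlocked++
}

// Unblock is the inverse of Block.
func (b *BlockedStats) Unblock(eval *structs.Evaluation) {
	b.TotalBlocked--
}
```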
@@ -209,7 +195,7 @@ func (b *BlockedEvals) processBlock(eval *structs.Evaluation, token string) {

// Mark the job as tracked.
b.jobs[structs.NewNamespacedID(eval.JobID, eval.Namespace)] = eval.ID
b.stats.TotalBlocked++
b.stats.Block(eval)

// Track that the evaluation is being added due to reaching the quota limit
if eval.QuotaLimitReached != "" {
@@ -263,7 +249,7 @@ func (b *BlockedEvals) processBlockJobDuplicate(eval *structs.Evaluation) (newCa
if ok {
if latestEvalIndex(existingW.eval) <= latestEvalIndex(eval) {
delete(b.captured, existingID)
b.stats.TotalBlocked--
b.stats.Unblock(eval)
dup = existingW.eval
} else {
dup = eval
@@ -379,7 +365,7 @@ func (b *BlockedEvals) Untrack(jobID, namespace string) {
if evals, ok := b.system.JobEvals(nsID); ok {
for _, e := range evals {
b.system.Remove(e)
b.stats.TotalBlocked--
b.stats.Unblock(e)
}
return
}
@@ -395,7 +381,7 @@ func (b *BlockedEvals) Untrack(jobID, namespace string) {
if w, ok := b.captured[evalID]; ok {
delete(b.jobs, nsID)
delete(b.captured, evalID)
b.stats.TotalBlocked--
b.stats.Unblock(w.eval)
if w.eval.QuotaLimitReached != "" {
b.stats.TotalQuotaLimit--
}
@@ -405,7 +391,7 @@ func (b *BlockedEvals) Untrack(jobID, namespace string) {
delete(b.jobs, nsID)
delete(b.escaped, evalID)
b.stats.TotalEscaped--
b.stats.TotalBlocked--
b.stats.Unblock(w.eval)
if w.eval.QuotaLimitReached != "" {
b.stats.TotalQuotaLimit--
}
@@ -511,7 +497,7 @@ func (b *BlockedEvals) UnblockNode(nodeID string, index uint64) {

for e := range evals {
b.system.Remove(e)
b.stats.TotalBlocked--
b.stats.Unblock(e)
}

b.evalBroker.EnqueueAll(evals)
@@ -583,11 +569,13 @@ func (b *BlockedEvals) unblock(computedClass, quota string, index uint64) {
}
}

if l := len(unblocked); l != 0 {
if len(unblocked) != 0 {
// Update the counters
b.stats.TotalEscaped = 0
b.stats.TotalBlocked -= l
b.stats.TotalQuotaLimit -= numQuotaLimit
for eval := range unblocked {
b.stats.Unblock(eval)
}

// Enqueue all the unblocked evals into the broker.
b.evalBroker.EnqueueAll(unblocked)
@@ -630,9 +618,12 @@ func (b *BlockedEvals) UnblockFailed() {
}
}

if l := len(unblocked); l > 0 {
b.stats.TotalBlocked -= l
if len(unblocked) > 0 {
b.stats.TotalQuotaLimit -= quotaLimit
for eval := range unblocked {
b.stats.Unblock(eval)
}

b.evalBroker.EnqueueAll(unblocked)
}
}
@@ -683,6 +674,7 @@ func (b *BlockedEvals) Flush() {
b.stats.TotalEscaped = 0
b.stats.TotalBlocked = 0
b.stats.TotalQuotaLimit = 0
b.stats.BlockedResources = NewBlockedResourcesStats()
b.captured = make(map[string]wrappedEval)
b.escaped = make(map[string]wrappedEval)
b.jobs = make(map[structs.NamespacedID]string)
@@ -698,7 +690,7 @@
// Stats is used to query the state of the blocked eval tracker.
func (b *BlockedEvals) Stats() *BlockedStats {
// Allocate a new stats struct
stats := new(BlockedStats)
stats := NewBlockedStats()

b.l.RLock()
defer b.l.RUnlock()
@@ -707,6 +699,8 @@ func (b *BlockedEvals) Stats() *BlockedStats {
stats.TotalEscaped = b.stats.TotalEscaped
stats.TotalBlocked = b.stats.TotalBlocked
stats.TotalQuotaLimit = b.stats.TotalQuotaLimit
stats.BlockedResources = b.stats.BlockedResources.Copy()

return stats
}

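`Stats()` hands callers a snapshot, so `BlockedResources.Copy()` has to duplicate the maps rather than share them with the live tracker. A sketch under the same assumed field names as above:

```go
// Copy returns a snapshot of the per-job and per-node summaries so Stats()
// callers do not share maps with the live tracker (continues the sketch above).
func (b *BlockedResourcesStats) Copy() *BlockedResourcesStats {
	out := NewBlockedResourcesStats()
	for k, v := range b.ByJob {
		out.ByJob[k] = v // summaries are plain values, so assignment copies them
	}
	for k, v := range b.ByNodeInfo {
		out.ByNodeInfo[k] = v
	}
	return out
}
```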
@@ -719,6 +713,24 @@ func (b *BlockedEvals) EmitStats(period time.Duration, stopCh <-chan struct{}) {
metrics.SetGauge([]string{"nomad", "blocked_evals", "total_quota_limit"}, float32(stats.TotalQuotaLimit))
metrics.SetGauge([]string{"nomad", "blocked_evals", "total_blocked"}, float32(stats.TotalBlocked))
metrics.SetGauge([]string{"nomad", "blocked_evals", "total_escaped"}, float32(stats.TotalEscaped))

for k, v := range stats.BlockedResources.ByJob {
labels := []metrics.Label{
{Name: "namespace", Value: k.Namespace},
{Name: "job", Value: k.ID},
}
metrics.SetGaugeWithLabels([]string{"nomad", "blocked_evals", "job", "cpu"}, float32(v.CPU), labels)
metrics.SetGaugeWithLabels([]string{"nomad", "blocked_evals", "job", "memory"}, float32(v.MemoryMB), labels)
}

for k, v := range stats.BlockedResources.ByNodeInfo {
labels := []metrics.Label{
{Name: "datacenter", Value: k.Datacenter},
{Name: "node_class", Value: k.NodeClass},
}
metrics.SetGaugeWithLabels([]string{"nomad", "blocked_evals", "cpu"}, float32(v.CPU), labels)
metrics.SetGaugeWithLabels([]string{"nomad", "blocked_evals", "memory"}, float32(v.MemoryMB), labels)
}
case <-stopCh:
return
}
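Per the `SetGaugeWithLabels` calls above, telemetry consumers should now see `nomad.blocked_evals.job.cpu` and `nomad.blocked_evals.job.memory` labeled by `namespace` and `job`, plus `nomad.blocked_evals.cpu` and `nomad.blocked_evals.memory` labeled by `datacenter` and `node_class`, alongside the existing `total_blocked`, `total_escaped`, and `total_quota_limit` gauges.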
@@ -734,28 +746,36 @@ func (b *BlockedEvals) prune(stopCh <-chan struct{}) {
select {
case <-stopCh:
return
case <-ticker.C:
b.pruneUnblockIndexes()
case t := <-ticker.C:
cutoff := t.UTC().Add(-1 * pruneThreshold)
b.pruneUnblockIndexes(cutoff)
b.pruneStats(cutoff)
}
}
}

// pruneUnblockIndexes is used to prune any tracked entry that is excessively
// old. This protects against unbounded growth of the map.
func (b *BlockedEvals) pruneUnblockIndexes() {
func (b *BlockedEvals) pruneUnblockIndexes(cutoff time.Time) {
b.l.Lock()
defer b.l.Unlock()

if b.timetable == nil {
return
}

cutoff := time.Now().UTC().Add(-1 * pruneThreshold)
oldThreshold := b.timetable.NearestIndex(cutoff)

for key, index := range b.unblockIndexes {
if index < oldThreshold {
delete(b.unblockIndexes, key)
}
}
}

// pruneStats is used to prune any zero value stats that are excessively old.
func (b *BlockedEvals) pruneStats(cutoff time.Time) {
b.l.Lock()
defer b.l.Unlock()

b.stats.prune(cutoff)
}
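The new `pruneStats` wrapper takes the tracker's lock and then calls `stats.prune` with the cutoff (the fix called out in the commit message). A sketch of what that pruning might look like, assuming the `Timestamp` field from the sketch above and the behavior described in the comment, dropping zero-valued summaries that are older than the cutoff:

```go
// prune drops per-job and per-node summaries that have returned to zero and
// have not been touched since the cutoff, keeping the maps bounded
// (sketch; field names are assumed above).
func (b *BlockedStats) prune(cutoff time.Time) {
	for k, v := range b.BlockedResources.ByJob {
		if v.CPU == 0 && v.MemoryMB == 0 && v.Timestamp.Before(cutoff) {
			delete(b.BlockedResources.ByJob, k)
		}
	}
	for k, v := range b.BlockedResources.ByNodeInfo {
		if v.CPU == 0 && v.MemoryMB == 0 && v.Timestamp.Before(cutoff) {
			delete(b.BlockedResources.ByNodeInfo, k)
		}
	}
}
```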