Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metrics for blocked eval resources #10454

Merged
merged 3 commits into from
Apr 29, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/allocations.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,7 @@ type AllocationMetric struct {
ClassExhausted map[string]int
DimensionExhausted map[string]int
QuotaExhausted []string
ResourcesExhausted map[string]*Resources
// Deprecated: replaced with ScoreMetaData.
Scores map[string]float64
AllocationTime time.Duration
Expand Down
82 changes: 51 additions & 31 deletions nomad/blocked_evals.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,20 +95,6 @@ type wrappedEval struct {
token string
}

// BlockedStats holds the counters kept by the blocked eval tracker.
type BlockedStats struct {
// TotalEscaped is the total number of blocked evaluations that have escaped
// computed node classes.
TotalEscaped int

// TotalBlocked is the total number of blocked evaluations.
TotalBlocked int

// TotalQuotaLimit is the total number of blocked evaluations that are due
// to the quota limit being reached.
TotalQuotaLimit int
}

// NewBlockedEvals creates a new blocked eval tracker that will enqueue
// unblocked evals into the passed broker.
func NewBlockedEvals(evalBroker *EvalBroker, logger log.Logger) *BlockedEvals {
Expand All @@ -123,7 +109,7 @@ func NewBlockedEvals(evalBroker *EvalBroker, logger log.Logger) *BlockedEvals {
capacityChangeCh: make(chan *capacityUpdate, unblockBuffer),
duplicateCh: make(chan struct{}, 1),
stopCh: make(chan struct{}),
stats: new(BlockedStats),
stats: NewBlockedStats(),
}
}

Expand Down Expand Up @@ -209,7 +195,7 @@ func (b *BlockedEvals) processBlock(eval *structs.Evaluation, token string) {

// Mark the job as tracked.
b.jobs[structs.NewNamespacedID(eval.JobID, eval.Namespace)] = eval.ID
b.stats.TotalBlocked++
b.stats.Block(eval)

// Track that the evaluation is being added due to reaching the quota limit
if eval.QuotaLimitReached != "" {
Expand Down Expand Up @@ -263,7 +249,7 @@ func (b *BlockedEvals) processBlockJobDuplicate(eval *structs.Evaluation) (newCa
if ok {
if latestEvalIndex(existingW.eval) <= latestEvalIndex(eval) {
delete(b.captured, existingID)
b.stats.TotalBlocked--
b.stats.Unblock(eval)
dup = existingW.eval
} else {
dup = eval
Expand Down Expand Up @@ -379,7 +365,7 @@ func (b *BlockedEvals) Untrack(jobID, namespace string) {
if evals, ok := b.system.JobEvals(nsID); ok {
for _, e := range evals {
b.system.Remove(e)
b.stats.TotalBlocked--
b.stats.Unblock(e)
}
return
}
Expand All @@ -395,7 +381,7 @@ func (b *BlockedEvals) Untrack(jobID, namespace string) {
if w, ok := b.captured[evalID]; ok {
delete(b.jobs, nsID)
delete(b.captured, evalID)
b.stats.TotalBlocked--
b.stats.Unblock(w.eval)
if w.eval.QuotaLimitReached != "" {
b.stats.TotalQuotaLimit--
}
Expand All @@ -405,7 +391,7 @@ func (b *BlockedEvals) Untrack(jobID, namespace string) {
delete(b.jobs, nsID)
delete(b.escaped, evalID)
b.stats.TotalEscaped--
b.stats.TotalBlocked--
b.stats.Unblock(w.eval)
if w.eval.QuotaLimitReached != "" {
b.stats.TotalQuotaLimit--
}
Expand Down Expand Up @@ -511,7 +497,7 @@ func (b *BlockedEvals) UnblockNode(nodeID string, index uint64) {

for e := range evals {
b.system.Remove(e)
b.stats.TotalBlocked--
b.stats.Unblock(e)
}

b.evalBroker.EnqueueAll(evals)
Expand Down Expand Up @@ -583,11 +569,13 @@ func (b *BlockedEvals) unblock(computedClass, quota string, index uint64) {
}
}

if l := len(unblocked); l != 0 {
if len(unblocked) != 0 {
// Update the counters
b.stats.TotalEscaped = 0
b.stats.TotalBlocked -= l
b.stats.TotalQuotaLimit -= numQuotaLimit
for eval := range unblocked {
b.stats.Unblock(eval)
}

// Enqueue all the unblocked evals into the broker.
b.evalBroker.EnqueueAll(unblocked)
Expand Down Expand Up @@ -630,9 +618,12 @@ func (b *BlockedEvals) UnblockFailed() {
}
}

if l := len(unblocked); l > 0 {
b.stats.TotalBlocked -= l
if len(unblocked) > 0 {
b.stats.TotalQuotaLimit -= quotaLimit
for eval := range unblocked {
b.stats.Unblock(eval)
}

b.evalBroker.EnqueueAll(unblocked)
}
}
Expand Down Expand Up @@ -683,6 +674,7 @@ func (b *BlockedEvals) Flush() {
b.stats.TotalEscaped = 0
b.stats.TotalBlocked = 0
b.stats.TotalQuotaLimit = 0
b.stats.BlockedResources = NewBlockedResourcesStats()
b.captured = make(map[string]wrappedEval)
b.escaped = make(map[string]wrappedEval)
b.jobs = make(map[structs.NamespacedID]string)
Expand All @@ -698,7 +690,7 @@ func (b *BlockedEvals) Flush() {
// Stats is used to query the state of the blocked eval tracker.
func (b *BlockedEvals) Stats() *BlockedStats {
// Allocate a new stats struct
stats := new(BlockedStats)
stats := NewBlockedStats()

b.l.RLock()
defer b.l.RUnlock()
Expand All @@ -707,6 +699,8 @@ func (b *BlockedEvals) Stats() *BlockedStats {
stats.TotalEscaped = b.stats.TotalEscaped
stats.TotalBlocked = b.stats.TotalBlocked
stats.TotalQuotaLimit = b.stats.TotalQuotaLimit
stats.BlockedResources = b.stats.BlockedResources.Copy()

return stats
}

Expand All @@ -719,6 +713,24 @@ func (b *BlockedEvals) EmitStats(period time.Duration, stopCh <-chan struct{}) {
metrics.SetGauge([]string{"nomad", "blocked_evals", "total_quota_limit"}, float32(stats.TotalQuotaLimit))
metrics.SetGauge([]string{"nomad", "blocked_evals", "total_blocked"}, float32(stats.TotalBlocked))
metrics.SetGauge([]string{"nomad", "blocked_evals", "total_escaped"}, float32(stats.TotalEscaped))

for k, v := range stats.BlockedResources.ByJob {
labels := []metrics.Label{
{Name: "namespace", Value: k.Namespace},
{Name: "job", Value: k.ID},
}
metrics.SetGaugeWithLabels([]string{"nomad", "blocked_evals", "job", "cpu"}, float32(v.CPU), labels)
metrics.SetGaugeWithLabels([]string{"nomad", "blocked_evals", "job", "memory"}, float32(v.MemoryMB), labels)
}

for k, v := range stats.BlockedResources.ByNodeInfo {
labels := []metrics.Label{
{Name: "datacenter", Value: k.Datacenter},
{Name: "node_class", Value: k.NodeClass},
}
metrics.SetGaugeWithLabels([]string{"nomad", "blocked_evals", "cpu"}, float32(v.CPU), labels)
metrics.SetGaugeWithLabels([]string{"nomad", "blocked_evals", "memory"}, float32(v.MemoryMB), labels)
}
case <-stopCh:
return
}
Expand All @@ -734,28 +746,36 @@ func (b *BlockedEvals) prune(stopCh <-chan struct{}) {
select {
case <-stopCh:
return
case <-ticker.C:
b.pruneUnblockIndexes()
case t := <-ticker.C:
cutoff := t.UTC().Add(-1 * pruneThreshold)
b.pruneUnblockIndexes(cutoff)
b.stats.prune(cutoff)
}
}
}

// pruneUnblockIndexes is used to prune any tracked entry that is excessively
// old. This protects against unbounded growth of the map.
func (b *BlockedEvals) pruneUnblockIndexes() {
func (b *BlockedEvals) pruneUnblockIndexes(cutoff time.Time) {
b.l.Lock()
defer b.l.Unlock()

if b.timetable == nil {
return
}

cutoff := time.Now().UTC().Add(-1 * pruneThreshold)
oldThreshold := b.timetable.NearestIndex(cutoff)

for key, index := range b.unblockIndexes {
if index < oldThreshold {
delete(b.unblockIndexes, key)
}
}
}

// pruneStats is used to prune any zero value stats that are excessively old.
func (b *BlockedEvals) pruneStats(cutoff time.Time) {
lgfa29 marked this conversation as resolved.
Show resolved Hide resolved
b.l.Lock()
defer b.l.Unlock()

b.stats.prune(cutoff)
}
Loading