Skip to content

Commit

Permalink
refactor blocked evals metric
Browse files Browse the repository at this point in the history
  • Loading branch information
lgfa29 committed Apr 28, 2021
1 parent fb2372b commit e36120c
Show file tree
Hide file tree
Showing 5 changed files with 665 additions and 437 deletions.
210 changes: 18 additions & 192 deletions nomad/blocked_evals.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,181 +95,6 @@ type wrappedEval struct {
token string
}

// BlockedStats holds all the stats kept by the blocked eval tracker.
type BlockedStats struct {
// TotalEscaped is the total number of blocked evaluations that have escaped
// computed node classes.
TotalEscaped int

// TotalBlocked is the total number of blocked evaluations. It is incremented
// by Block and decremented by Unblock.
TotalBlocked int

// TotalQuotaLimit is the total number of blocked evaluations that are due
// to the quota limit being reached.
TotalQuotaLimit int

// BlockedResources stores the amount of resources requested by blocked
// evaluations. Block adds an evaluation's resource summary to it and Unblock
// subtracts that summary again.
BlockedResources BlockedResourcesStats
}

// NewBlockedStats allocates a BlockedStats with zeroed counters and a
// BlockedResources value produced by NewBlockedResourcesStats, so its maps
// are ready to be written to.
func NewBlockedStats() *BlockedStats {
	stats := new(BlockedStats)
	stats.BlockedResources = NewBlockedResourcesStats()
	return stats
}

// Block records that eval has become blocked: the blocked counter goes up by
// one and the resources summarized from eval are added to BlockedResources.
func (b *BlockedStats) Block(eval *structs.Evaluation) {
	b.TotalBlocked += 1
	b.BlockedResources = b.BlockedResources.Add(generateResourceStats(eval))
}

// Unblock records that eval is no longer blocked: the blocked counter goes
// down by one and the resources summarized from eval are subtracted from
// BlockedResources.
func (b *BlockedStats) Unblock(eval *structs.Evaluation) {
	b.TotalBlocked -= 1
	b.BlockedResources = b.BlockedResources.Subtract(generateResourceStats(eval))
}

// generateResourceStats summarizes the resources requested by eval.
//
// CPU and MemoryMB are summed over every ResourcesExhausted entry of every
// failed task group in eval.FailedTGAllocs. That single total is then recorded
// under the evaluation's job (namespace + job ID), under every key found in
// NodesAvailable (used here as datacenter names), and under every key found in
// ClassExhausted (used here as node class names).
func generateResourceStats(eval *structs.Evaluation) BlockedResourcesStats {
	// First pass: accumulate the total across all failed task groups.
	var total BlockedResourcesSummary
	for _, failed := range eval.FailedTGAllocs {
		for _, exhausted := range failed.ResourcesExhausted {
			total.CPU += exhausted.CPU
			total.MemoryMB += exhausted.MemoryMB
		}
	}

	stats := BlockedResourcesStats{
		ByJob: map[structs.NamespacedID]BlockedResourcesSummary{
			structs.NewNamespacedID(eval.JobID, eval.Namespace): total,
		},
		ByDatacenter: make(map[string]BlockedResourcesSummary),
		ByNodeClass:  make(map[string]BlockedResourcesSummary),
	}

	// Second pass: the total is now final, so attribute it to every
	// datacenter and node class that any failed task group mentions.
	for _, failed := range eval.FailedTGAllocs {
		for datacenter := range failed.NodesAvailable {
			stats.ByDatacenter[datacenter] = total
		}
		for nodeClass := range failed.ClassExhausted {
			stats.ByNodeClass[nodeClass] = total
		}
	}

	return stats
}

// BlockedResourcesStats stores resources requested by blocked evaluations
// split into different dimensions.
type BlockedResourcesStats struct {
// ByJob holds one resource summary per job, keyed by the job's namespaced ID.
ByJob map[structs.NamespacedID]BlockedResourcesSummary
// ByDatacenter holds one resource summary per datacenter name.
ByDatacenter map[string]BlockedResourcesSummary
// ByNodeClass holds one resource summary per node class name.
ByNodeClass map[string]BlockedResourcesSummary
}

// NewBlockedResourcesStats builds a BlockedResourcesStats whose three maps are
// allocated and empty, so callers can insert into them right away.
func NewBlockedResourcesStats() BlockedResourcesStats {
	var stats BlockedResourcesStats
	stats.ByJob = map[structs.NamespacedID]BlockedResourcesSummary{}
	stats.ByDatacenter = map[string]BlockedResourcesSummary{}
	stats.ByNodeClass = map[string]BlockedResourcesSummary{}
	return stats
}

// Copy duplicates the receiver into freshly allocated maps. Every entry of
// ByJob, ByDatacenter and ByNodeClass is carried over, and writes to the
// returned value's maps do not affect the receiver's maps.
func (b BlockedResourcesStats) Copy() BlockedResourcesStats {
	dup := NewBlockedResourcesStats()

	for job, summary := range b.ByJob {
		dup.ByJob[job] = summary
	}

	for datacenter, summary := range b.ByDatacenter {
		dup.ByDatacenter[datacenter] = summary
	}

	for nodeClass, summary := range b.ByNodeClass {
		dup.ByNodeClass[nodeClass] = summary
	}

	return dup
}

// Add produces a new BlockedResourcesStats equal to the receiver plus a,
// dimension by dimension. Neither the receiver's nor a's maps are modified.
// Keys that exist only in a are treated as starting from a zero summary.
func (b BlockedResourcesStats) Add(a BlockedResourcesStats) BlockedResourcesStats {
	sum := b.Copy()

	// sum starts as a copy of b, so sum.X[key] is b's current value (or the
	// zero summary when b has no such key) at the time each key is visited.
	for job, delta := range a.ByJob {
		sum.ByJob[job] = sum.ByJob[job].Add(delta)
	}
	for datacenter, delta := range a.ByDatacenter {
		sum.ByDatacenter[datacenter] = sum.ByDatacenter[datacenter].Add(delta)
	}
	for nodeClass, delta := range a.ByNodeClass {
		sum.ByNodeClass[nodeClass] = sum.ByNodeClass[nodeClass].Add(delta)
	}

	return sum
}

// Subtract produces a new BlockedResourcesStats equal to the receiver minus a,
// dimension by dimension. Neither the receiver's nor a's maps are modified.
// Keys that exist only in a are subtracted from a zero summary, and entries
// that reach zero stay in the result rather than being removed.
func (b BlockedResourcesStats) Subtract(a BlockedResourcesStats) BlockedResourcesStats {
	diff := b.Copy()

	// diff starts as a copy of b, so diff.X[key] is b's current value (or the
	// zero summary when b has no such key) at the time each key is visited.
	for job, delta := range a.ByJob {
		diff.ByJob[job] = diff.ByJob[job].Subtract(delta)
	}
	for datacenter, delta := range a.ByDatacenter {
		diff.ByDatacenter[datacenter] = diff.ByDatacenter[datacenter].Subtract(delta)
	}
	for nodeClass, delta := range a.ByNodeClass {
		diff.ByNodeClass[nodeClass] = diff.ByNodeClass[nodeClass].Subtract(delta)
	}

	return diff
}

// BlockedResourcesSummary is a pair of resource totals tracked for blocked
// evals. It is a plain value type: Add and Subtract return new values and
// never modify their operands.
type BlockedResourcesSummary struct {
	CPU      int
	MemoryMB int
}

// Add yields the field-wise sum of the receiver and a.
func (b BlockedResourcesSummary) Add(a BlockedResourcesSummary) BlockedResourcesSummary {
	// b is a value receiver, so updating it only changes this local copy.
	b.CPU += a.CPU
	b.MemoryMB += a.MemoryMB
	return b
}

// Subtract yields the field-wise difference of the receiver and a. Results
// may be negative when a is larger than the receiver.
func (b BlockedResourcesSummary) Subtract(a BlockedResourcesSummary) BlockedResourcesSummary {
	// b is a value receiver, so updating it only changes this local copy.
	b.CPU -= a.CPU
	b.MemoryMB -= a.MemoryMB
	return b
}

// NewBlockedEvals creates a new blocked eval tracker that will enqueue
// unblocked evals into the passed broker.
func NewBlockedEvals(evalBroker *EvalBroker, logger log.Logger) *BlockedEvals {
Expand Down Expand Up @@ -898,20 +723,13 @@ func (b *BlockedEvals) EmitStats(period time.Duration, stopCh <-chan struct{}) {
metrics.SetGaugeWithLabels([]string{"nomad", "blocked_evals", "job", "memory"}, float32(v.MemoryMB), labels)
}

for k, v := range stats.BlockedResources.ByDatacenter {
labels := []metrics.Label{
{Name: "datacenter", Value: k},
}
metrics.SetGaugeWithLabels([]string{"nomad", "blocked_evals", "datacenter", "cpu"}, float32(v.CPU), labels)
metrics.SetGaugeWithLabels([]string{"nomad", "blocked_evals", "datacenter", "memory"}, float32(v.MemoryMB), labels)
}

for k, v := range stats.BlockedResources.ByNodeClass {
for k, v := range stats.BlockedResources.ByNodeInfo {
labels := []metrics.Label{
{Name: "node_class", Value: k},
{Name: "datacenter", Value: k.Datacenter},
{Name: "node_class", Value: k.NodeClass},
}
metrics.SetGaugeWithLabels([]string{"nomad", "blocked_evals", "node_class", "cpu"}, float32(v.CPU), labels)
metrics.SetGaugeWithLabels([]string{"nomad", "blocked_evals", "node_class", "memory"}, float32(v.MemoryMB), labels)
metrics.SetGaugeWithLabels([]string{"nomad", "blocked_evals", "cpu"}, float32(v.CPU), labels)
metrics.SetGaugeWithLabels([]string{"nomad", "blocked_evals", "memory"}, float32(v.MemoryMB), labels)
}
case <-stopCh:
return
Expand All @@ -928,28 +746,36 @@ func (b *BlockedEvals) prune(stopCh <-chan struct{}) {
select {
case <-stopCh:
return
case <-ticker.C:
b.pruneUnblockIndexes()
case t := <-ticker.C:
cutoff := t.UTC().Add(-1 * pruneThreshold)
b.pruneUnblockIndexes(cutoff)
b.stats.prune(cutoff)
}
}
}

// pruneUnblockIndexes is used to prune any tracked entry that is excessively
// old. This protects against unbounded growth of the map.
func (b *BlockedEvals) pruneUnblockIndexes() {
func (b *BlockedEvals) pruneUnblockIndexes(cutoff time.Time) {
b.l.Lock()
defer b.l.Unlock()

if b.timetable == nil {
return
}

cutoff := time.Now().UTC().Add(-1 * pruneThreshold)
oldThreshold := b.timetable.NearestIndex(cutoff)

for key, index := range b.unblockIndexes {
if index < oldThreshold {
delete(b.unblockIndexes, key)
}
}
}

// pruneStats is used to prune any zero value stats that are excessively old.
// It holds b.l for the duration of the call to b.stats.prune, passing cutoff
// through unchanged.
func (b *BlockedEvals) pruneStats(cutoff time.Time) {
b.l.Lock()
// Deferred so the lock is released even if prune panics.
defer b.l.Unlock()

b.stats.prune(cutoff)
}
Loading

0 comments on commit e36120c

Please sign in to comment.