diff --git a/nomad/blocked_evals.go b/nomad/blocked_evals.go index 96073013612e..02ce6c18ff4b 100644 --- a/nomad/blocked_evals.go +++ b/nomad/blocked_evals.go @@ -727,10 +727,10 @@ func (b *BlockedEvals) EmitStats(period time.Duration, stopCh <-chan struct{}) { metrics.SetGaugeWithLabels([]string{"nomad", "blocked_evals", "job", "memory"}, float32(v.MemoryMB), labels) } - for k, v := range stats.BlockedResources.ByNodeInfo { + for k, v := range stats.BlockedResources.ByNode { labels := []metrics.Label{ - {Name: "datacenter", Value: k.Datacenter}, - {Name: "node_class", Value: k.NodeClass}, + {Name: "datacenter", Value: k.dc}, + {Name: "node_class", Value: k.class}, } metrics.SetGaugeWithLabels([]string{"nomad", "blocked_evals", "cpu"}, float32(v.CPU), labels) metrics.SetGaugeWithLabels([]string{"nomad", "blocked_evals", "memory"}, float32(v.MemoryMB), labels) diff --git a/nomad/blocked_evals_stats.go b/nomad/blocked_evals_stats.go index 171f4f9fdd3f..007559651837 100644 --- a/nomad/blocked_evals_stats.go +++ b/nomad/blocked_evals_stats.go @@ -21,7 +21,13 @@ type BlockedStats struct { // BlockedResources stores the amount of resources requested by blocked // evaluations. - BlockedResources BlockedResourcesStats + BlockedResources *BlockedResourcesStats +} + +// node stores information related to nodes. +type node struct { + dc string + class string } // NewBlockedStats returns a new BlockedStats. @@ -59,16 +65,16 @@ func (b *BlockedStats) prune(cutoff time.Time) { } } - for k, v := range b.BlockedResources.ByNodeInfo { + for k, v := range b.BlockedResources.ByNode { if shouldPrune(v) { - delete(b.BlockedResources.ByNodeInfo, k) + delete(b.BlockedResources.ByNode, k) } } } // generateResourceStats returns a summary of the resources requested by the // input evaluation. -func generateResourceStats(eval *structs.Evaluation) BlockedResourcesStats { +func generateResourceStats(eval *structs.Evaluation) *BlockedResourcesStats { dcs := make(map[string]struct{}) classes := make(map[string]struct{}) @@ -80,11 +86,9 @@ func generateResourceStats(eval *structs.Evaluation) BlockedResourcesStats { for dc := range allocMetrics.NodesAvailable { dcs[dc] = struct{}{} } - for class := range allocMetrics.ClassExhausted { classes[class] = struct{}{} } - for _, r := range allocMetrics.ResourcesExhausted { resources.CPU += r.CPU resources.MemoryMB += r.MemoryMB @@ -92,47 +96,48 @@ func generateResourceStats(eval *structs.Evaluation) BlockedResourcesStats { } byJob := make(map[structs.NamespacedID]BlockedResourcesSummary) - byJob[structs.NewNamespacedID(eval.JobID, eval.Namespace)] = resources + nsID := structs.NewNamespacedID(eval.JobID, eval.Namespace) + byJob[nsID] = resources - byNodeInfo := make(map[NodeInfo]BlockedResourcesSummary) + byNodeInfo := make(map[node]BlockedResourcesSummary) for dc := range dcs { for class := range classes { - k := NodeInfo{dc, class} + k := node{dc: dc, class: class} byNodeInfo[k] = resources } } - return BlockedResourcesStats{ - ByJob: byJob, - ByNodeInfo: byNodeInfo, + return &BlockedResourcesStats{ + ByJob: byJob, + ByNode: byNodeInfo, } } -// BlockedResourcesStats stores resources requested by block evaluations -// split into different dimensions. +// BlockedResourcesStats stores resources requested by blocked evaluations, +// tracked both by job and by node. type BlockedResourcesStats struct { - ByJob map[structs.NamespacedID]BlockedResourcesSummary - ByNodeInfo map[NodeInfo]BlockedResourcesSummary + ByJob map[structs.NamespacedID]BlockedResourcesSummary + ByNode map[node]BlockedResourcesSummary } // NewBlockedResourcesStats returns a new BlockedResourcesStats. -func NewBlockedResourcesStats() BlockedResourcesStats { - return BlockedResourcesStats{ - ByJob: make(map[structs.NamespacedID]BlockedResourcesSummary), - ByNodeInfo: make(map[NodeInfo]BlockedResourcesSummary), +func NewBlockedResourcesStats() *BlockedResourcesStats { + return &BlockedResourcesStats{ + ByJob: make(map[structs.NamespacedID]BlockedResourcesSummary), + ByNode: make(map[node]BlockedResourcesSummary), } } // Copy returns a deep copy of the blocked resource stats. -func (b BlockedResourcesStats) Copy() BlockedResourcesStats { +func (b *BlockedResourcesStats) Copy() *BlockedResourcesStats { result := NewBlockedResourcesStats() for k, v := range b.ByJob { - result.ByJob[k] = v + result.ByJob[k] = v // value copy } - for k, v := range b.ByNodeInfo { - result.ByNodeInfo[k] = v + for k, v := range b.ByNode { + result.ByNode[k] = v // value copy } return result @@ -140,15 +145,15 @@ func (b BlockedResourcesStats) Copy() BlockedResourcesStats { // Add returns a new BlockedResourcesStats with the values set to the current // resource values plus the input. -func (b BlockedResourcesStats) Add(a BlockedResourcesStats) BlockedResourcesStats { +func (b *BlockedResourcesStats) Add(a *BlockedResourcesStats) *BlockedResourcesStats { result := b.Copy() for k, v := range a.ByJob { result.ByJob[k] = b.ByJob[k].Add(v) } - for k, v := range a.ByNodeInfo { - result.ByNodeInfo[k] = b.ByNodeInfo[k].Add(v) + for k, v := range a.ByNode { + result.ByNode[k] = b.ByNode[k].Add(v) } return result @@ -156,26 +161,20 @@ func (b BlockedResourcesStats) Add(a BlockedResourcesStats) BlockedResourcesStat // Subtract returns a new BlockedResourcesStats with the values set to the // current resource values minus the input. -func (b BlockedResourcesStats) Subtract(a BlockedResourcesStats) BlockedResourcesStats { +func (b *BlockedResourcesStats) Subtract(a *BlockedResourcesStats) *BlockedResourcesStats { result := b.Copy() for k, v := range a.ByJob { result.ByJob[k] = b.ByJob[k].Subtract(v) } - for k, v := range a.ByNodeInfo { - result.ByNodeInfo[k] = b.ByNodeInfo[k].Subtract(v) + for k, v := range a.ByNode { + result.ByNode[k] = b.ByNode[k].Subtract(v) } return result } -// NodeInfo stores information related to nodes. -type NodeInfo struct { - Datacenter string - NodeClass string -} - // BlockedResourcesSummary stores resource values for blocked evals. type BlockedResourcesSummary struct { Timestamp time.Time diff --git a/nomad/blocked_evals_stats_test.go b/nomad/blocked_evals_stats_test.go index 7e6fcd52594f..f09f4b9e5984 100644 --- a/nomad/blocked_evals_stats_test.go +++ b/nomad/blocked_evals_stats_test.go @@ -11,8 +11,279 @@ import ( "github.com/hashicorp/nomad/ci" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" + "github.com/stretchr/testify/require" ) +func now(year int) time.Time { + return time.Date(2000+year, 1, 2, 3, 4, 5, 6, time.UTC) +} + +func TestBlockedResourceSummary_Add(t *testing.T) { + now1 := now(1) + now2 := now(2) + a := BlockedResourcesSummary{ + Timestamp: now1, + CPU: 600, + MemoryMB: 256, + } + + b := BlockedResourcesSummary{ + Timestamp: now2, + CPU: 250, + MemoryMB: 128, + } + + result := a.Add(b) + + // a not modified + require.Equal(t, 600, a.CPU) + require.Equal(t, 256, a.MemoryMB) + require.Equal(t, now1, a.Timestamp) + + // b not modified + require.Equal(t, 250, b.CPU) + require.Equal(t, 128, b.MemoryMB) + require.Equal(t, now2, b.Timestamp) + + // result is a + b, using timestamp from b + require.Equal(t, 850, result.CPU) + require.Equal(t, 384, result.MemoryMB) + require.Equal(t, now2, result.Timestamp) +} + +func TestBlockedResourceSummary_Add_nil(t *testing.T) { + now1 := now(1) + b := BlockedResourcesSummary{ + Timestamp: now1, + CPU: 250, + MemoryMB: 128, + } + + // zero + b == b + result := (BlockedResourcesSummary{}).Add(b) + require.Equal(t, now1, result.Timestamp) + require.Equal(t, 250, result.CPU) + require.Equal(t, 128, result.MemoryMB) +} + +func TestBlockedResourceSummary_Subtract(t *testing.T) { + now1 := now(1) + now2 := now(2) + a := BlockedResourcesSummary{ + Timestamp: now1, + CPU: 600, + MemoryMB: 256, + } + + b := BlockedResourcesSummary{ + Timestamp: now2, + CPU: 250, + MemoryMB: 120, + } + + result := a.Subtract(b) + + // a not modified + require.Equal(t, 600, a.CPU) + require.Equal(t, 256, a.MemoryMB) + require.Equal(t, now1, a.Timestamp) + + // b not modified + require.Equal(t, 250, b.CPU) + require.Equal(t, 120, b.MemoryMB) + require.Equal(t, now2, b.Timestamp) + + // result is a + b, using timestamp from b + require.Equal(t, 350, result.CPU) + require.Equal(t, 136, result.MemoryMB) + require.Equal(t, now2, result.Timestamp) +} + +func TestBlockedResourceSummary_IsZero(t *testing.T) { + now1 := now(1) + + // cpu and mem zero, timestamp is ignored + require.True(t, (&BlockedResourcesSummary{ + Timestamp: now1, + CPU: 0, + MemoryMB: 0, + }).IsZero()) + + // cpu non-zero + require.False(t, (&BlockedResourcesSummary{ + Timestamp: now1, + CPU: 1, + MemoryMB: 0, + }).IsZero()) + + // mem non-zero + require.False(t, (&BlockedResourcesSummary{ + Timestamp: now1, + CPU: 0, + MemoryMB: 1, + }).IsZero()) +} + +func TestBlockedResourceStats_New(t *testing.T) { + a := NewBlockedResourcesStats() + require.NotNil(t, a.ByJob) + require.Empty(t, a.ByJob) + require.NotNil(t, a.ByNode) + require.Empty(t, a.ByNode) +} + +var ( + id1 = structs.NamespacedID{ + ID: "1", + Namespace: "one", + } + + id2 = structs.NamespacedID{ + ID: "2", + Namespace: "two", + } + + node1 = node{ + dc: "dc1", + class: "alpha", + } + + node2 = node{ + dc: "dc1", + class: "beta", + } +) + +func TestBlockedResourceStats_Copy(t *testing.T) { + now1 := now(1) + now2 := now(2) + + a := NewBlockedResourcesStats() + a.ByJob = map[structs.NamespacedID]BlockedResourcesSummary{ + id1: { + Timestamp: now1, + CPU: 100, + MemoryMB: 256, + }, + } + a.ByNode = map[node]BlockedResourcesSummary{ + node1: { + Timestamp: now1, + CPU: 300, + MemoryMB: 333, + }, + } + + c := a.Copy() + c.ByJob[id1] = BlockedResourcesSummary{ + Timestamp: now2, + CPU: 888, + MemoryMB: 888, + } + c.ByNode[node1] = BlockedResourcesSummary{ + Timestamp: now2, + CPU: 999, + MemoryMB: 999, + } + + // underlying data should have been deep copied + require.Equal(t, 100, a.ByJob[id1].CPU) + require.Equal(t, 300, a.ByNode[node1].CPU) +} + +func TestBlockedResourcesStats_Add(t *testing.T) { + a := NewBlockedResourcesStats() + a.ByJob = map[structs.NamespacedID]BlockedResourcesSummary{ + id1: {Timestamp: now(1), CPU: 111, MemoryMB: 222}, + } + a.ByNode = map[node]BlockedResourcesSummary{ + node1: {Timestamp: now(2), CPU: 333, MemoryMB: 444}, + } + + b := NewBlockedResourcesStats() + b.ByJob = map[structs.NamespacedID]BlockedResourcesSummary{ + id1: {Timestamp: now(3), CPU: 200, MemoryMB: 300}, + id2: {Timestamp: now(4), CPU: 400, MemoryMB: 500}, + } + b.ByNode = map[node]BlockedResourcesSummary{ + node1: {Timestamp: now(5), CPU: 600, MemoryMB: 700}, + node2: {Timestamp: now(6), CPU: 800, MemoryMB: 900}, + } + + t.Run("a add b", func(t *testing.T) { + result := a.Add(b) + + require.Equal(t, map[structs.NamespacedID]BlockedResourcesSummary{ + id1: {Timestamp: now(3), CPU: 311, MemoryMB: 522}, + id2: {Timestamp: now(4), CPU: 400, MemoryMB: 500}, + }, result.ByJob) + + require.Equal(t, map[node]BlockedResourcesSummary{ + node1: {Timestamp: now(5), CPU: 933, MemoryMB: 1144}, + node2: {Timestamp: now(6), CPU: 800, MemoryMB: 900}, + }, result.ByNode) + }) + + // make sure we handle zeros in both directions + // and timestamps originate from rhs + t.Run("b add a", func(t *testing.T) { + result := b.Add(a) + require.Equal(t, map[structs.NamespacedID]BlockedResourcesSummary{ + id1: {Timestamp: now(1), CPU: 311, MemoryMB: 522}, + id2: {Timestamp: now(4), CPU: 400, MemoryMB: 500}, + }, result.ByJob) + + require.Equal(t, map[node]BlockedResourcesSummary{ + node1: {Timestamp: now(2), CPU: 933, MemoryMB: 1144}, + node2: {Timestamp: now(6), CPU: 800, MemoryMB: 900}, + }, result.ByNode) + }) +} + +func TestBlockedResourcesStats_Subtract(t *testing.T) { + a := NewBlockedResourcesStats() + a.ByJob = map[structs.NamespacedID]BlockedResourcesSummary{ + id1: {Timestamp: now(1), CPU: 100, MemoryMB: 100}, + id2: {Timestamp: now(2), CPU: 200, MemoryMB: 200}, + } + a.ByNode = map[node]BlockedResourcesSummary{ + node1: {Timestamp: now(3), CPU: 300, MemoryMB: 300}, + node2: {Timestamp: now(4), CPU: 400, MemoryMB: 400}, + } + + b := NewBlockedResourcesStats() + b.ByJob = map[structs.NamespacedID]BlockedResourcesSummary{ + id1: {Timestamp: now(5), CPU: 10, MemoryMB: 11}, + id2: {Timestamp: now(6), CPU: 12, MemoryMB: 13}, + } + b.ByNode = map[node]BlockedResourcesSummary{ + node1: {Timestamp: now(7), CPU: 14, MemoryMB: 15}, + node2: {Timestamp: now(8), CPU: 16, MemoryMB: 17}, + } + + result := a.Subtract(b) + + // id1 + require.Equal(t, now(5), result.ByJob[id1].Timestamp) + require.Equal(t, 90, result.ByJob[id1].CPU) + require.Equal(t, 89, result.ByJob[id1].MemoryMB) + + // id2 + require.Equal(t, now(6), result.ByJob[id2].Timestamp) + require.Equal(t, 188, result.ByJob[id2].CPU) + require.Equal(t, 187, result.ByJob[id2].MemoryMB) + + // node1 + require.Equal(t, now(7), result.ByNode[node1].Timestamp) + require.Equal(t, 286, result.ByNode[node1].CPU) + require.Equal(t, 285, result.ByNode[node1].MemoryMB) + + // node2 + require.Equal(t, now(8), result.ByNode[node2].Timestamp) + require.Equal(t, 384, result.ByNode[node2].CPU) + require.Equal(t, 383, result.ByNode[node2].MemoryMB) +} + // testBlockedEvalsRandomBlockedEval wraps an eval that is randomly generated. type testBlockedEvalsRandomBlockedEval struct { eval *structs.Evaluation @@ -90,9 +361,9 @@ func clearTimestampFromBlockedResourceStats(b *BlockedResourcesStats) { v.Timestamp = time.Time{} b.ByJob[k] = v } - for k, v := range b.ByNodeInfo { + for k, v := range b.ByNode { v.Timestamp = time.Time{} - b.ByNodeInfo[k] = v + b.ByNode[k] = v } } @@ -103,14 +374,14 @@ func TestBlockedEvalsStats_BlockedResources(t *testing.T) { blocked, _ := testBlockedEvals(t) // evalHistory stores all evals generated during the test. - evalHistory := []*structs.Evaluation{} + var evalHistory []*structs.Evaluation // blockedEvals keeps track if evals are blocked or unblocked. blockedEvals := map[string]bool{} // blockAndUntrack processes the generated evals in order using a // BlockedEvals instance. - blockAndUntrack := func(testEval testBlockedEvalsRandomBlockedEval, block bool, unblockIdx uint16) BlockedResourcesStats { + blockAndUntrack := func(testEval testBlockedEvalsRandomBlockedEval, block bool, unblockIdx uint16) *BlockedResourcesStats { if block || len(evalHistory) == 0 { blocked.Block(testEval.eval) } else { @@ -123,13 +394,13 @@ func TestBlockedEvalsStats_BlockedResources(t *testing.T) { blocked.pruneStats(time.Now().UTC()) result := blocked.Stats().BlockedResources - clearTimestampFromBlockedResourceStats(&result) + clearTimestampFromBlockedResourceStats(result) return result } // manualCount processes only the blocked evals and generate a // BlockedResourcesStats result directly from the eval history. - manualCount := func(testEval testBlockedEvalsRandomBlockedEval, block bool, unblockIdx uint16) BlockedResourcesStats { + manualCount := func(testEval testBlockedEvalsRandomBlockedEval, block bool, unblockIdx uint16) *BlockedResourcesStats { if block || len(evalHistory) == 0 { evalHistory = append(evalHistory, testEval.eval) blockedEvals[testEval.eval.ID] = true @@ -146,7 +417,7 @@ func TestBlockedEvalsStats_BlockedResources(t *testing.T) { } result = result.Add(generateResourceStats(e)) } - clearTimestampFromBlockedResourceStats(&result) + clearTimestampFromBlockedResourceStats(result) return result } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index b2b8800d5cd1..29dfdc25bed8 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -207,7 +207,7 @@ type RPCInfo interface { IsForwarded() bool SetForwarded() TimeToBlock() time.Duration - // TimeToBlock sets how long this request can block. The requested time may not be possible, + // SetTimeToBlock sets how long this request can block. The requested time may not be possible, // so Callers should readback TimeToBlock. E.g. you cannot set time to block at all on WriteRequests // and it cannot exceed MaxBlockingRPCQueryTime SetTimeToBlock(t time.Duration)