From 4fbe182372c2a8e70229178b9e4c27d25c38eccf Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Thu, 9 Mar 2017 12:37:41 -0800 Subject: [PATCH 1/2] Add metrics to show allocations on the client This PR adds the following metrics to the client: client.allocations.migrating client.allocations.blocked client.allocations.pending client.allocations.running client.allocations.terminal Also adds some missing fields to the API version of the evaluation. --- api/evaluations.go | 40 ++++++++++++++++++---------------- client/alloc_runner.go | 2 ++ client/client.go | 46 ++++++++++++++++++++++++++++++++++------ nomad/structs/structs.go | 8 +++---- 4 files changed, 68 insertions(+), 28 deletions(-) diff --git a/api/evaluations.go b/api/evaluations.go index 1940e3c30a6b..527f844ead60 100644 --- a/api/evaluations.go +++ b/api/evaluations.go @@ -54,24 +54,28 @@ func (e *Evaluations) Allocations(evalID string, q *QueryOptions) ([]*Allocation // Evaluation is used to serialize an evaluation. type Evaluation struct { - ID string - Priority int - Type string - TriggeredBy string - JobID string - JobModifyIndex uint64 - NodeID string - NodeModifyIndex uint64 - Status string - StatusDescription string - Wait time.Duration - NextEval string - PreviousEval string - BlockedEval string - FailedTGAllocs map[string]*AllocationMetric - QueuedAllocations map[string]int - CreateIndex uint64 - ModifyIndex uint64 + ID string + Priority int + Type string + TriggeredBy string + JobID string + JobModifyIndex uint64 + NodeID string + NodeModifyIndex uint64 + Status string + StatusDescription string + Wait time.Duration + NextEval string + PreviousEval string + BlockedEval string + FailedTGAllocs map[string]*AllocationMetric + ClassEligibility map[string]bool + EscapedComputedClass bool + AnnotatePlan bool + QueuedAllocations map[string]int + SnapshotIndex uint64 + CreateIndex uint64 + ModifyIndex uint64 } // EvalIndexSort is a wrapper to sort evaluations by CreateIndex. diff --git a/client/alloc_runner.go b/client/alloc_runner.go index c3ffabdb0a18..09155b44ed18 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -584,6 +584,8 @@ func (r *AllocRunner) handleDestroy() { r.logger.Printf("[ERR] client: failed to destroy state for alloc '%s': %v", r.alloc.ID, err) } + case <-r.updateCh: + r.logger.Printf("[ERR] client: dropping update to terminal alloc '%s'", r.alloc.ID) } } diff --git a/client/client.go b/client/client.go index ada0e57ce4dc..e5329e2a875e 100644 --- a/client/client.go +++ b/client/client.go @@ -310,7 +310,7 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg go c.run() // Start collecting stats - go c.collectHostStats() + go c.emitStats() c.logger.Printf("[INFO] client: Node ID %q", c.Node().ID) return c, nil @@ -2170,8 +2170,8 @@ func (c *Client) consulReaperImpl() error { return c.consulSyncer.ReapUnmatched(domains) } -// collectHostStats collects host resource usage stats periodically -func (c *Client) collectHostStats() { +// emitStats collects host resource usage stats periodically +func (c *Client) emitStats() { // Start collecting host stats right away and then keep collecting every // collection interval next := time.NewTimer(0) @@ -2188,16 +2188,18 @@ func (c *Client) collectHostStats() { // Publish Node metrics if operator has opted in if c.config.PublishNodeMetrics { - c.emitStats(c.hostStatsCollector.Stats()) + c.emitHostStats(c.hostStatsCollector.Stats()) } + + c.emitClientMetrics() case <-c.shutdownCh: return } } } -// emitStats pushes host resource usage stats to remote metrics collection sinks -func (c *Client) emitStats(hStats *stats.HostStats) { +// emitHostStats pushes host resource usage stats to remote metrics collection sinks +func (c *Client) emitHostStats(hStats *stats.HostStats) { nodeID := c.Node().ID metrics.SetGauge([]string{"client", "host", "memory", nodeID, "total"}, float32(hStats.Memory.Total)) metrics.SetGauge([]string{"client", "host", "memory", nodeID, "available"}, float32(hStats.Memory.Available)) @@ -2263,6 +2265,38 @@ func (c *Client) emitStats(hStats *stats.HostStats) { } } +// emitClientMetrics emits lower volume client metrics +func (c *Client) emitClientMetrics() { + nodeID := c.Node().ID + + // Emit allocation metrics + c.migratingAllocsLock.Lock() + migrating := len(c.migratingAllocs) + c.migratingAllocsLock.Unlock() + + c.blockedAllocsLock.Lock() + blocked := len(c.blockedAllocations) + c.blockedAllocsLock.Unlock() + + pending, running, terminal := 0, 0, 0 + for _, ar := range c.getAllocRunners() { + switch ar.Alloc().ClientStatus { + case structs.AllocClientStatusPending: + pending++ + case structs.AllocClientStatusRunning: + running++ + case structs.AllocClientStatusComplete, structs.AllocClientStatusFailed: + terminal++ + } + } + + metrics.SetGauge([]string{"client", "allocations", "migrating", nodeID}, float32(migrating)) + metrics.SetGauge([]string{"client", "allocations", "blocked", nodeID}, float32(blocked)) + metrics.SetGauge([]string{"client", "allocations", "pending", nodeID}, float32(pending)) + metrics.SetGauge([]string{"client", "allocations", "running", nodeID}, float32(running)) + metrics.SetGauge([]string{"client", "allocations", "terminal", nodeID}, float32(terminal)) +} + func (c *Client) getAllocatedResources(selfNode *structs.Node) *structs.Resources { // Unfortunately the allocs only have IP so we need to match them to the // device diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 83f05ae0c28d..4c67f922b98f 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -3827,15 +3827,15 @@ type Evaluation struct { // during the evaluation. This should not be set during normal operations. AnnotatePlan bool + // QueuedAllocations is the number of unplaced allocations at the time the + // evaluation was processed. The map is keyed by Task Group names. + QueuedAllocations map[string]int + // SnapshotIndex is the Raft index of the snapshot used to process the // evaluation. As such it will only be set once it has gone through the // scheduler. SnapshotIndex uint64 - // QueuedAllocations is the number of unplaced allocations at the time the - // evaluation was processed. The map is keyed by Task Group names. - QueuedAllocations map[string]int - // Raft Indexes CreateIndex uint64 ModifyIndex uint64 From 07973793b816202cc35a75f4e5aafd2889cdca67 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Thu, 9 Mar 2017 21:05:34 -0800 Subject: [PATCH 2/2] Address comment --- client/alloc_runner.go | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/client/alloc_runner.go b/client/alloc_runner.go index 09155b44ed18..43071f47b505 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -574,18 +574,22 @@ func (r *AllocRunner) destroyTaskRunners(destroyEvent *structs.TaskEvent) { // handleDestroy blocks till the AllocRunner should be destroyed and does the // necessary cleanup. func (r *AllocRunner) handleDestroy() { - select { - case <-r.destroyCh: - if err := r.DestroyContext(); err != nil { - r.logger.Printf("[ERR] client: failed to destroy context for alloc '%s': %v", - r.alloc.ID, err) - } - if err := r.DestroyState(); err != nil { - r.logger.Printf("[ERR] client: failed to destroy state for alloc '%s': %v", - r.alloc.ID, err) + for { + select { + case <-r.destroyCh: + if err := r.DestroyContext(); err != nil { + r.logger.Printf("[ERR] client: failed to destroy context for alloc '%s': %v", + r.alloc.ID, err) + } + if err := r.DestroyState(); err != nil { + r.logger.Printf("[ERR] client: failed to destroy state for alloc '%s': %v", + r.alloc.ID, err) + } + + return + case <-r.updateCh: + r.logger.Printf("[ERR] client: dropping update to terminal alloc '%s'", r.alloc.ID) } - case <-r.updateCh: - r.logger.Printf("[ERR] client: dropping update to terminal alloc '%s'", r.alloc.ID) } }