diff --git a/api/allocations.go b/api/allocations.go
index 6badd3f69b5c..1da3a856f34e 100644
--- a/api/allocations.go
+++ b/api/allocations.go
@@ -1,8 +1,11 @@
 package api
 
 import (
+	"fmt"
 	"sort"
 	"time"
+
+	"github.com/hashicorp/go-cleanhttp"
 )
 
 // Allocations is used to query the alloc-related endpoints.
@@ -40,6 +43,30 @@ func (a *Allocations) Info(allocID string, q *QueryOptions) (*Allocation, *QueryMeta, error) {
 	return &resp, qm, nil
 }
 
+func (a *Allocations) Stats(alloc *Allocation, q *QueryOptions) (map[string]*TaskResourceUsage, error) {
+	node, _, err := a.client.Nodes().Info(alloc.NodeID, q)
+	if err != nil {
+		return nil, err
+	}
+	if node.HTTPAddr == "" {
+		return nil, fmt.Errorf("http addr of the node where alloc %q is running is not advertised", alloc.ID)
+	}
+	client, err := NewClient(&Config{
+		Address:    fmt.Sprintf("http://%s", node.HTTPAddr),
+		HttpClient: cleanhttp.DefaultClient(),
+	})
+	if err != nil {
+		return nil, err
+	}
+	resp := make(map[string][]*TaskResourceUsage)
+	if _, err := client.query("/v1/client/allocation/"+alloc.ID+"/stats", &resp, nil); err != nil {
+		return nil, err
+	}
+	res := make(map[string]*TaskResourceUsage)
+	for task, ru := range resp {
+		if len(ru) > 0 {
+			res[task] = ru[0]
+		}
+	}
+	return res, nil
+}
+
 // Allocation is used for serialization of allocations.
 type Allocation struct {
 	ID                 string
diff --git a/api/nodes.go b/api/nodes.go
index 7d4e6dba0fb8..5ed88d6b0405 100644
--- a/api/nodes.go
+++ b/api/nodes.go
@@ -1,8 +1,11 @@
 package api
 
 import (
+	"fmt"
 	"sort"
 	"strconv"
+
+	"github.com/hashicorp/go-cleanhttp"
 )
 
 // Nodes is used to query node-related API endpoints
@@ -71,6 +74,29 @@ func (n *Nodes) ForceEvaluate(nodeID string, q *WriteOptions) (string, *WriteMeta, error) {
 	return resp.EvalID, wm, nil
 }
 
+func (n *Nodes) Stats(nodeID string, q *QueryOptions) (*HostStats, error) {
+	node, _, err := n.client.Nodes().Info(nodeID, q)
+	if err != nil {
+		return nil, err
+	}
+	if node.HTTPAddr == "" {
+		return nil, fmt.Errorf("http addr of node %q is not advertised", nodeID)
+	}
+	client, err := NewClient(&Config{
+		Address:    fmt.Sprintf("http://%s", node.HTTPAddr),
+		HttpClient: cleanhttp.DefaultClient(),
+	})
+	if err != nil {
+		return nil, err
+	}
+	var resp []HostStats
+	if _, err := client.query("/v1/client/stats/", &resp, nil); err != nil {
+		return nil, err
+	}
+	if len(resp) == 0 {
+		return nil, fmt.Errorf("no stats returned for node %q", nodeID)
+	}
+	return &resp[0], nil
+}
+
 // Node is used to deserialize a node entry.
 type Node struct {
 	ID                string
@@ -90,6 +116,38 @@ type Node struct {
 	ModifyIndex       uint64
 }
 
+// HostStats represents resource usage stats of the host running a Nomad client
+type HostStats struct {
+	Memory    *HostMemoryStats
+	CPU       []*HostCPUStats
+	DiskStats []*HostDiskStats
+	Uptime    uint64
+}
+
+type HostMemoryStats struct {
+	Total     uint64
+	Available uint64
+	Used      uint64
+	Free      uint64
+}
+
+type HostCPUStats struct {
+	CPU    string
+	User   float64
+	System float64
+	Idle   float64
+}
+
+type HostDiskStats struct {
+	Device            string
+	Mountpoint        string
+	Size              uint64
+	Used              uint64
+	Available         uint64
+	UsedPercent       float64
+	InodesUsedPercent float64
+}
+
 // NodeListStub is a subset of information returned during
 // node list operations.
 type NodeListStub struct {
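Aside: a minimal sketch (not part of the diff) of how a consumer might call the new api stats helpers, assuming a local agent at the default address and at least one registered node; everything else here is illustrative.

package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/nomad/api"
)

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}
	nodes, _, err := client.Nodes().List(nil)
	if err != nil || len(nodes) == 0 {
		log.Fatal("no nodes available")
	}
	// Fetch host-level stats straight from the node's client HTTP endpoint.
	hostStats, err := client.Nodes().Stats(nodes[0].ID, nil)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("uptime: %ds, memory used: %d bytes\n", hostStats.Uptime, hostStats.Memory.Used)
}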
diff --git a/api/tasks.go b/api/tasks.go
index e0fd67bac4f9..c82db5ba2663 100644
--- a/api/tasks.go
+++ b/api/tasks.go
@@ -4,6 +4,39 @@ import (
 	"time"
 )
 
+// MemoryStats holds memory usage related stats
+type MemoryStats struct {
+	RSS            uint64
+	Cache          uint64
+	Swap           uint64
+	MaxUsage       uint64
+	KernelUsage    uint64
+	KernelMaxUsage uint64
+}
+
+// CpuStats holds cpu usage related stats
+type CpuStats struct {
+	SystemMode       float64
+	UserMode         float64
+	ThrottledPeriods uint64
+	ThrottledTime    uint64
+	Percent          float64
+}
+
+// ResourceUsage holds information related to cpu and memory stats
+type ResourceUsage struct {
+	MemoryStats *MemoryStats
+	CpuStats    *CpuStats
+}
+
+// TaskResourceUsage holds aggregated resource usage of all processes in a Task
+// and the resource usage of the individual pids
+type TaskResourceUsage struct {
+	ResourceUsage *ResourceUsage
+	Timestamp     int64
+	Pids          map[string]*ResourceUsage
+}
+
 // RestartPolicy defines how the Nomad client restarts
 // tasks in a taskgroup when they fail
 type RestartPolicy struct {
diff --git a/client/alloc_runner.go b/client/alloc_runner.go
index 3a0300b11aea..b796ccbe4bb9 100644
--- a/client/alloc_runner.go
+++ b/client/alloc_runner.go
@@ -28,6 +28,11 @@ const (
 // AllocStateUpdater is used to update the status of an allocation
 type AllocStateUpdater func(alloc *structs.Allocation)
 
+// AllocStatsReporter exposes stats related APIs of an allocation runner
+type AllocStatsReporter interface {
+	AllocStats() map[string]TaskStatsReporter
+}
+
 // AllocRunner is used to wrap an allocation and provide the execution context.
 type AllocRunner struct {
 	config  *config.Config
@@ -471,6 +476,35 @@ func (r *AllocRunner) Update(update *structs.Allocation) {
 	}
 }
 
+// StatsReporter returns an interface to query resource usage statistics of an
+// allocation
+func (r *AllocRunner) StatsReporter() AllocStatsReporter {
+	return r
+}
+
+// AllocStats returns the stats reporter of all the tasks running in the
+// allocation
+func (r *AllocRunner) AllocStats() map[string]TaskStatsReporter {
+	r.taskLock.RLock()
+	defer r.taskLock.RUnlock()
+	res := make(map[string]TaskStatsReporter)
+	for task, tr := range r.tasks {
+		res[task] = tr.StatsReporter()
+	}
+	return res
+}
+
+// TaskStats returns the stats reporter for a specific task running in the
+// allocation
+func (r *AllocRunner) TaskStats(task string) (TaskStatsReporter, error) {
+	r.taskLock.RLock()
+	defer r.taskLock.RUnlock()
+	tr, ok := r.tasks[task]
+	if !ok {
+		return nil, fmt.Errorf("task %q not running in allocation %v", task, r.alloc.ID)
+	}
+
+	return tr.StatsReporter(), nil
+}
+
 // shouldUpdate takes the AllocModifyIndex of an allocation sent from the server and
 // checks if the current running allocation is behind and should be updated.
 func (r *AllocRunner) shouldUpdate(serverIndex uint64) bool {
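Aside: the reporter chain introduced by this diff is Client -> AllocRunner -> TaskRunner. A hedged sketch (not part of the diff, compiles only against the patched tree) of walking one allocation's reporters:

package main

import (
	"fmt"

	"github.com/hashicorp/nomad/client"
)

// printAllocUsage prints the latest collected data point for every task in an
// allocation; the reporter would come from AllocRunner.StatsReporter().
func printAllocUsage(ar client.AllocStatsReporter) {
	for task, tr := range ar.AllocStats() {
		for _, ru := range tr.ResourceUsage() {
			if ru == nil || ru.ResourceUsage == nil {
				continue
			}
			fmt.Printf("%s: rss=%d bytes, cpu=%.2f%%\n",
				task, ru.ResourceUsage.MemoryStats.RSS, ru.ResourceUsage.CpuStats.Percent)
		}
	}
}

func main() {}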
diff --git a/client/client.go b/client/client.go
index 8b753573b732..9a62b209b448 100644
--- a/client/client.go
+++ b/client/client.go
@@ -18,6 +18,7 @@ import (
 	"github.com/hashicorp/nomad/client/consul"
 	"github.com/hashicorp/nomad/client/driver"
 	"github.com/hashicorp/nomad/client/fingerprint"
+	"github.com/hashicorp/nomad/client/stats"
 	"github.com/hashicorp/nomad/nomad"
 	"github.com/hashicorp/nomad/nomad/structs"
 	"github.com/mitchellh/hashstructure"
@@ -76,11 +77,27 @@ const (
 // DefaultConfig returns the default configuration
 func DefaultConfig() *config.Config {
 	return &config.Config{
-		LogOutput: os.Stderr,
-		Region:    "global",
+		LogOutput:               os.Stderr,
+		Region:                  "global",
+		StatsDataPoints:         60,
+		StatsCollectionInterval: 1 * time.Second,
 	}
 }
 
+// ClientStatsReporter exposes all the APIs related to resource usage of a Nomad
+// Client
+type ClientStatsReporter interface {
+	// AllocStats returns a map of alloc ids and their corresponding stats
+	// collector
+	AllocStats() map[string]AllocStatsReporter
+
+	// HostStats returns resource usage stats for the host
+	HostStats() []*stats.HostStats
+
+	// HostStatsTS returns a time series of host resource usage stats
+	HostStatsTS(since int64) []*stats.HostStats
+}
+
 // Client is used to implement the client interaction with Nomad. Clients
 // are expected to register as a schedulable node to the servers, and to
 // run allocations as determined by the servers.
@@ -116,6 +133,11 @@ type Client struct {
 
 	consulService *consul.ConsulService
 
+	// HostStatsCollector collects host resource usage stats
+	hostStatsCollector *stats.HostStatsCollector
+	resourceUsage      *stats.RingBuff
+	resourceUsageLock  sync.RWMutex
+
 	shutdown     bool
 	shutdownCh   chan struct{}
 	shutdownLock sync.Mutex
@@ -126,15 +148,22 @@ func NewClient(cfg *config.Config) (*Client, error) {
 	// Create a logger
 	logger := log.New(cfg.LogOutput, "", log.LstdFlags)
 
+	resourceUsage, err := stats.NewRingBuff(cfg.StatsDataPoints)
+	if err != nil {
+		return nil, err
+	}
+
 	// Create the client
 	c := &Client{
-		config:       cfg,
-		start:        time.Now(),
-		connPool:     nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, nil),
-		logger:       logger,
-		allocs:       make(map[string]*AllocRunner),
-		allocUpdates: make(chan *structs.Allocation, 64),
-		shutdownCh:   make(chan struct{}),
+		config:             cfg,
+		start:              time.Now(),
+		connPool:           nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, nil),
+		logger:             logger,
+		hostStatsCollector: stats.NewHostStatsCollector(),
+		resourceUsage:      resourceUsage,
+		allocs:             make(map[string]*AllocRunner),
+		allocUpdates:       make(chan *structs.Allocation, 64),
+		shutdownCh:         make(chan struct{}),
 	}
 
 	// Initialize the client
@@ -189,6 +218,9 @@ func NewClient(cfg *config.Config) (*Client, error) {
 	// Start the client!
 	go c.run()
 
+	// Start collecting stats
+	go c.collectHostStats()
+
 	// Start the consul sync
 	go c.syncConsul()
 
@@ -394,6 +426,67 @@ func (c *Client) Node() *structs.Node {
 	return c.config.Node
 }
 
+// StatsReporter exposes the various APIs related to resource usage of a Nomad
+// client
+func (c *Client) StatsReporter() ClientStatsReporter {
+	return c
+}
+
+// AllocStats returns the stats reporters of all the allocations running on a
+// Nomad client
+func (c *Client) AllocStats() map[string]AllocStatsReporter {
+	res := make(map[string]AllocStatsReporter)
+	allocRunners := c.getAllocRunners()
+	for alloc, ar := range allocRunners {
+		res[alloc] = ar
+	}
+	return res
+}
+
+// HostStats returns all the stats related to a Nomad client
+func (c *Client) HostStats() []*stats.HostStats {
+	c.resourceUsageLock.RLock()
+	defer c.resourceUsageLock.RUnlock()
+	val := c.resourceUsage.Peek()
+	ru, _ := val.(*stats.HostStats)
+	return []*stats.HostStats{ru}
+}
+
+// HostStatsTS returns the host resource usage data points collected at or
+// after the given timestamp
+func (c *Client) HostStatsTS(since int64) []*stats.HostStats {
+	c.resourceUsageLock.RLock()
+	defer c.resourceUsageLock.RUnlock()
+
+	values := c.resourceUsage.Values()
+	if len(values) == 0 {
+		return nil
+	}
+
+	// Binary search for the first data point with a timestamp >= since
+	low := 0
+	high := len(values) - 1
+	var idx int
+
+	for {
+		mid := (low + high) >> 1
+		midVal, _ := values[mid].(*stats.HostStats)
+		if midVal.Timestamp < since {
+			low = mid + 1
+		} else if midVal.Timestamp > since {
+			high = mid - 1
+		} else if midVal.Timestamp == since {
+			idx = mid
+			break
+		}
+		if low > high {
+			idx = low
+			break
+		}
+	}
+	values = values[idx:]
+	ts := make([]*stats.HostStats, len(values))
+	for index, val := range values {
+		ru, _ := val.(*stats.HostStats)
+		ts[index] = ru
+	}
+	return ts
+}
+
 // GetAllocFS returns the AllocFS interface for the alloc dir of an allocation
 func (c *Client) GetAllocFS(allocID string) (allocdir.AllocDirFS, error) {
 	ar, ok := c.allocs[allocID]
@@ -1227,3 +1320,27 @@ func (c *Client) syncConsul() {
 	}
 
 }
+
+// collectHostStats collects host resource usage stats periodically
+func (c *Client) collectHostStats() {
+	// Start collecting host stats right away and then keep collecting every
+	// collection interval
+	next := time.NewTimer(0)
+	defer next.Stop()
+	for {
+		select {
+		case <-next.C:
+			ru, err := c.hostStatsCollector.Collect()
+			if err != nil {
+				c.logger.Printf("[DEBUG] client: error fetching host resource usage stats: %v", err)
+				next.Reset(c.config.StatsCollectionInterval)
+				continue
+			}
+			c.resourceUsageLock.Lock()
+			c.resourceUsage.Enqueue(ru)
+			c.resourceUsageLock.Unlock()
+			next.Reset(c.config.StatsCollectionInterval)
+		case <-c.shutdownCh:
+			return
+		}
+	}
+}
diff --git a/client/config/config.go b/client/config/config.go
index e3e54547cc8e..4d606ba2439b 100644
--- a/client/config/config.go
+++ b/client/config/config.go
@@ -112,6 +112,14 @@ type Config struct {
 
 	// ConsulConfig is the configuration to connect with Consul Agent
 	ConsulConfig *consul.ConsulConfig
+
+	// StatsDataPoints is the number of resource usage data points the Nomad
+	// client keeps in memory
+	StatsDataPoints int
+
+	// StatsCollectionInterval is the interval at which the Nomad client
+	// collects resource usage stats
+	StatsCollectionInterval time.Duration
 }
 
 func (c *Config) Copy() *Config {
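Aside: the loop in HostStatsTS above (and the identical one in TaskRunner.ResourceUsageTS later in this diff) is a binary search for the first data point with Timestamp >= since. A standalone sketch of the same lookup via the standard library; the point type is hypothetical:

package main

import (
	"fmt"
	"sort"
)

type point struct{ Timestamp int64 }

func main() {
	values := []point{{10}, {20}, {30}, {40}}
	since := int64(25)
	// sort.Search returns the smallest index for which the predicate holds,
	// i.e. the first data point at or after `since`.
	idx := sort.Search(len(values), func(i int) bool {
		return values[i].Timestamp >= since
	})
	fmt.Println(values[idx:]) // [{30} {40}]
}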
diff --git a/client/driver/docker.go b/client/driver/docker.go
index b101dbcbd5dd..0cac50480284 100644
--- a/client/driver/docker.go
+++ b/client/driver/docker.go
@@ -113,18 +113,20 @@ type dockerPID struct {
 }
 
 type DockerHandle struct {
-	pluginClient   *plugin.Client
-	executor       executor.Executor
-	client         *docker.Client
-	logger         *log.Logger
-	cleanupImage   bool
-	imageID        string
-	containerID    string
-	version        string
-	killTimeout    time.Duration
-	maxKillTimeout time.Duration
-	waitCh         chan *cstructs.WaitResult
-	doneCh         chan struct{}
+	pluginClient      *plugin.Client
+	executor          executor.Executor
+	client            *docker.Client
+	logger            *log.Logger
+	cleanupImage      bool
+	imageID           string
+	containerID       string
+	version           string
+	killTimeout       time.Duration
+	maxKillTimeout    time.Duration
+	resourceUsageLock sync.RWMutex
+	resourceUsage     *cstructs.TaskResourceUsage
+	waitCh            chan *cstructs.WaitResult
+	doneCh            chan bool
 }
 
 func NewDockerDriver(ctx *DriverContext) Driver {
@@ -768,12 +770,13 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {
 		version:        d.config.Version,
 		killTimeout:    GetKillTimeout(task.KillTimeout, maxKill),
 		maxKillTimeout: maxKill,
-		doneCh:         make(chan struct{}),
+		doneCh:         make(chan bool),
 		waitCh:         make(chan *cstructs.WaitResult, 1),
 	}
 	if err := exec.SyncServices(consulContext(d.config, container.ID)); err != nil {
 		d.logger.Printf("[ERR] driver.docker: error registering services with consul for task: %q: %v", task.Name, err)
 	}
+	go h.collectStats()
 	go h.run()
 	return h, nil
 }
@@ -841,7 +844,7 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) {
 		version:        pid.Version,
 		killTimeout:    pid.KillTimeout,
 		maxKillTimeout: pid.MaxKillTimeout,
-		doneCh:         make(chan struct{}),
+		doneCh:         make(chan bool),
 		waitCh:         make(chan *cstructs.WaitResult, 1),
 	}
 	if err := exec.SyncServices(consulContext(d.config, pid.ContainerID)); err != nil {
@@ -908,6 +911,12 @@ func (h *DockerHandle) Kill() error {
 	return nil
 }
 
+func (h *DockerHandle) Stats() (*cstructs.TaskResourceUsage, error) {
+	h.resourceUsageLock.RLock()
+	defer h.resourceUsageLock.RUnlock()
+	return h.resourceUsage, nil
+}
+
 func (h *DockerHandle) run() {
 	// Wait for it...
 	exitCode, err := h.client.WaitContainer(h.containerID)
@@ -956,3 +965,53 @@ func (h *DockerHandle) run() {
 		}
 	}
 }
+
+// collectStats starts collecting resource usage stats of a docker container
+func (h *DockerHandle) collectStats() {
+	statsCh := make(chan *docker.Stats)
+	statsOpts := docker.StatsOptions{ID: h.containerID, Done: h.doneCh, Stats: statsCh, Stream: true}
+	go func() {
+		//TODO handle Stats error
+		if err := h.client.Stats(statsOpts); err != nil {
+			h.logger.Printf("[DEBUG] driver.docker: error collecting stats from container %s: %v", h.containerID, err)
+		}
+	}()
+	for {
+		select {
+		case s := <-statsCh:
+			if s != nil {
+				ms := &cstructs.MemoryStats{
+					RSS:      s.MemoryStats.Stats.Rss,
+					Cache:    s.MemoryStats.Stats.Cache,
+					Swap:     s.MemoryStats.Stats.Swap,
+					MaxUsage: s.MemoryStats.MaxUsage,
+				}
+
+				cs := &cstructs.CpuStats{
+					SystemMode:       float64(s.CPUStats.CPUUsage.UsageInKernelmode),
+					UserMode:         float64(s.CPUStats.CPUUsage.UsageInUsermode),
+					ThrottledPeriods: s.CPUStats.ThrottlingData.ThrottledPeriods,
+					ThrottledTime:    s.CPUStats.ThrottlingData.ThrottledTime,
+				}
+				// Calculate percentage
+				cs.Percent = 0.0
+				cpuDelta := float64(s.CPUStats.CPUUsage.TotalUsage) - float64(s.PreCPUStats.CPUUsage.TotalUsage)
+				systemDelta := float64(s.CPUStats.SystemCPUUsage) - float64(s.PreCPUStats.SystemCPUUsage)
+				if cpuDelta > 0.0 && systemDelta > 0.0 {
+					cs.Percent = (cpuDelta / systemDelta) * float64(len(s.CPUStats.CPUUsage.PercpuUsage)) * 100.0
+				}
+				h.resourceUsageLock.Lock()
+				h.resourceUsage = &cstructs.TaskResourceUsage{
+					ResourceUsage: &cstructs.ResourceUsage{
+						MemoryStats: ms,
+						CpuStats:    cs,
+					},
+					Timestamp: s.Read.UTC().UnixNano(),
+				}
+				h.resourceUsageLock.Unlock()
+			}
+		case <-h.doneCh:
+			return
+		}
+	}
+}
diff --git a/client/driver/docker_test.go b/client/driver/docker_test.go
index 3fb972bdcbe6..ab97fc9cd33b 100644
--- a/client/driver/docker_test.go
+++ b/client/driver/docker_test.go
@@ -143,7 +143,7 @@ func TestDockerDriver_Handle(t *testing.T) {
 		containerID:    "containerid",
 		killTimeout:    5 * time.Nanosecond,
 		maxKillTimeout: 15 * time.Nanosecond,
-		doneCh:         make(chan struct{}),
+		doneCh:         make(chan bool),
 		waitCh:         make(chan *cstructs.WaitResult, 1),
 	}
 
@@ -816,3 +816,49 @@ func TestDockerDriver_CleanupContainer(t *testing.T) {
 	}
 
 }
+
+func TestDockerDriver_Stats(t *testing.T) {
+	t.Parallel()
+	task := &structs.Task{
+		Name: "sleep",
+		Config: map[string]interface{}{
+			"image":   "busybox",
+			"command": "/bin/sleep",
+			"args":    []string{"100"},
+		},
+		LogConfig: &structs.LogConfig{
+			MaxFiles:      10,
+			MaxFileSizeMB: 10,
+		},
+		Resources: basicResources,
+	}
+
+	_, handle, cleanup := dockerSetup(t, task)
+	defer cleanup()
+
+	go func() {
+		time.Sleep(3 * time.Second)
+		ru, err := handle.Stats()
+		if err != nil {
+			t.Fatalf("err: %v", err)
+		}
+		if ru.ResourceUsage == nil {
+			handle.Kill()
+			t.Fatalf("expected resource usage")
+		}
+		err = handle.Kill()
+		if err != nil {
+			t.Fatalf("err: %v", err)
+		}
+	}()
+
+	select {
+	case res := <-handle.WaitCh():
+		if res.Successful() {
+			t.Fatalf("should err: %v", res)
+		}
+	case <-time.After(time.Duration(tu.TestMultiplier()*10) * time.Second):
+		t.Fatalf("timeout")
+	}
+}
diff --git a/client/driver/driver.go b/client/driver/driver.go
index 68b3b46b9faf..9e8513073fd5 100644
--- a/client/driver/driver.go
+++ b/client/driver/driver.go
@@ -113,6 +113,9 @@ type DriverHandle interface {
 
 	// Kill is used to stop the task
 	Kill() error
+
+	// Stats returns aggregated stats of the driver
+	Stats() (*cstructs.TaskResourceUsage, error)
 }
 
 // ExecContext is shared between drivers within an allocation
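Aside: the docker percentage above follows docker's own convention — the container's share of total system CPU time between two samples, scaled by core count, so a container saturating every core reads roughly NumCores*100. A worked sketch with hypothetical numbers:

package main

import "fmt"

func main() {
	var (
		cpuDelta    = 500000000.0  // container consumed 0.5s of CPU time between samples
		systemDelta = 2000000000.0 // 2s of total CPU time elapsed across all cores (0.5s wall * 4 cores)
		numCores    = 4.0
	)
	percent := (cpuDelta / systemDelta) * numCores * 100.0
	fmt.Printf("%.1f%%\n", percent) // 100.0% — one core fully busy
}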
diff --git a/client/driver/exec.go b/client/driver/exec.go
index e5264c6fcd18..77456b028a5c 100644
--- a/client/driver/exec.go
+++ b/client/driver/exec.go
@@ -280,6 +280,10 @@ func (h *execHandle) Kill() error {
 	}
 }
 
+func (h *execHandle) Stats() (*cstructs.TaskResourceUsage, error) {
+	return h.executor.Stats()
+}
+
 func (h *execHandle) run() {
 	ps, err := h.executor.Wait()
 	close(h.doneCh)
diff --git a/client/driver/executor/executor.go b/client/driver/executor/executor.go
index c95437dcac75..604ee50bd6d2 100644
--- a/client/driver/executor/executor.go
+++ b/client/driver/executor/executor.go
@@ -9,22 +9,33 @@ import (
 	"os/exec"
 	"path/filepath"
 	"runtime"
+	"strconv"
 	"strings"
 	"sync"
 	"syscall"
 	"time"
 
 	"github.com/hashicorp/go-multierror"
+	"github.com/mitchellh/go-ps"
 	cgroupConfig "github.com/opencontainers/runc/libcontainer/configs"
+	"github.com/shirou/gopsutil/process"
 
 	"github.com/hashicorp/nomad/client/allocdir"
 	"github.com/hashicorp/nomad/client/consul"
 	"github.com/hashicorp/nomad/client/driver/env"
 	"github.com/hashicorp/nomad/client/driver/logging"
 	cstructs "github.com/hashicorp/nomad/client/driver/structs"
+	"github.com/hashicorp/nomad/client/stats"
 	"github.com/hashicorp/nomad/nomad/structs"
 )
 
+const (
+	// pidScanInterval is the interval at which the executor scans the process
+	// tree to find the pids that the executor and its child processes have
+	// forked
+	pidScanInterval = 5 * time.Second
+)
+
 // Executor is the interface which allows a driver to launch and supervise
 // a process
 type Executor interface {
@@ -38,6 +49,7 @@ type Executor interface {
 	SyncServices(ctx *ConsulContext) error
 	DeregisterServices() error
 	Version() (*ExecutorVersion, error)
+	Stats() (*cstructs.TaskResourceUsage, error)
 }
 
 // ConsulContext holds context to configure the consul client and run checks
@@ -121,6 +133,12 @@ type ProcessState struct {
 	Time            time.Time
 }
 
+// nomadPid holds a pid and its cpu percentage calculator
+type nomadPid struct {
+	pid      int
+	cpuStats *stats.CpuStats
+}
+
 // SyslogServerState holds the address and islation information of a launched
 // syslog server
 type SyslogServerState struct {
@@ -145,6 +163,8 @@ type UniversalExecutor struct {
 	ctx     *ExecutorContext
 	command *ExecCommand
 
+	pids          []*nomadPid
+	pidLock       sync.RWMutex
 	taskDir       string
 	exitState     *ProcessState
 	processExited chan interface{}
@@ -162,15 +182,19 @@ type UniversalExecutor struct {
 	consulService *consul.ConsulService
 	consulCtx     *ConsulContext
+	cpuStats      *stats.CpuStats
 	logger        *log.Logger
 }
 
 // NewExecutor returns an Executor
 func NewExecutor(logger *log.Logger) Executor {
-	return &UniversalExecutor{
+	exec := &UniversalExecutor{
 		logger:        logger,
 		processExited: make(chan interface{}),
+		cpuStats:      stats.NewCpuStats(),
 	}
+
+	return exec
 }
 
 // Version returns the api version of the executor
@@ -250,6 +274,7 @@ func (e *UniversalExecutor) LaunchCmd(command *ExecCommand, ctx *ExecutorContext) (*ProcessState, error) {
 	if err := e.cmd.Start(); err != nil {
 		return nil, err
 	}
+	go e.collectPids()
 	go e.wait()
 	ic := &cstructs.IsolationConfig{Cgroup: e.groups, CgroupPaths: e.cgPaths}
 	return &ProcessState{Pid: e.cmd.Process.Pid, ExitCode: -1, IsolationConfig: ic, Time: time.Now()}, nil
@@ -427,6 +452,8 @@ func (e *UniversalExecutor) ShutDown() error {
 	return nil
 }
 
+// SyncServices syncs the services of the task that the executor is running
+// with Consul
 func (e *UniversalExecutor) SyncServices(ctx *ConsulContext) error {
 	e.logger.Printf("[INFO] executor: registering services")
 	e.consulCtx = ctx
@@ -448,6 +475,8 @@ func (e *UniversalExecutor) SyncServices(ctx *ConsulContext) error {
 	return err
 }
 
+// DeregisterServices removes the services of the task that the executor is
+// running from Consul
 func (e *UniversalExecutor) DeregisterServices() error {
 	e.logger.Printf("[INFO] executor: de-registering services and shutting down consul service")
 	if e.consulService != nil {
@@ -456,6 +485,39 @@ func (e *UniversalExecutor) DeregisterServices() error {
 	return nil
 }
 
+// pidStats returns the resource usage stats per pid
+func (e *UniversalExecutor) pidStats() (map[string]*cstructs.ResourceUsage, error) {
+	stats := make(map[string]*cstructs.ResourceUsage)
+	e.pidLock.RLock()
+	pids := make([]*nomadPid, len(e.pids))
+	copy(pids, e.pids)
+	e.pidLock.RUnlock()
+	for _, pid := range pids {
+		p, err := process.NewProcess(int32(pid.pid))
+		if err != nil {
+			e.logger.Printf("[DEBUG] executor: unable to create new process with pid: %v", pid.pid)
+			continue
+		}
+		ms := &cstructs.MemoryStats{}
+		if memInfo, err := p.MemoryInfo(); err == nil {
+			ms.RSS = memInfo.RSS
+			ms.Swap = memInfo.Swap
+		}
+
+		cs := &cstructs.CpuStats{}
+		if cpuStats, err := p.Times(); err == nil {
+			cs.SystemMode = cpuStats.System
+			cs.UserMode = cpuStats.User
+
+			// calculate cpu usage percent
+			cs.Percent = pid.cpuStats.Percent(cpuStats.Total())
+		}
+		stats[strconv.Itoa(pid.pid)] = &cstructs.ResourceUsage{MemoryStats: ms, CpuStats: cs}
+	}
+
+	return stats, nil
+}
+
 // configureTaskDir sets the task dir in the executor
 func (e *UniversalExecutor) configureTaskDir() error {
 	taskDir, ok := e.ctx.AllocDir.TaskDirs[e.ctx.Task.Name]
@@ -618,3 +680,106 @@ func (e *UniversalExecutor) interpolateServices(task *structs.Task) {
 		service.Tags = e.ctx.TaskEnv.ParseAndReplace(service.Tags)
 	}
 }
+
+// collectPids collects the pids of the child processes that the executor is
+// running every pidScanInterval
+func (e *UniversalExecutor) collectPids() {
+	// Fire the timer right away when the executor starts; from there on the
+	// pids are collected every scan interval
+	timer := time.NewTimer(0)
+	defer timer.Stop()
+	for {
+		select {
+		case <-timer.C:
+			pids, err := e.getAllPids()
+			if err != nil {
+				e.logger.Printf("[DEBUG] executor: error collecting pids: %v", err)
+			} else {
+				e.pidLock.Lock()
+				e.pids = pids
+				e.pidLock.Unlock()
+			}
+			timer.Reset(pidScanInterval)
+		case <-e.processExited:
+			return
+		}
+	}
+}
+
+// scanPids scans all the pids on the machine running the current executor and
+// returns the child processes of the executor.
+func (e *UniversalExecutor) scanPids(parentPid int, allPids []ps.Process) ([]*nomadPid, error) {
+	processFamily := make(map[int]struct{})
+	processFamily[parentPid] = struct{}{}
+
+	for {
+		// flag to indicate if we have found a match
+		foundNewPid := false
+
+		// a buffer for holding pids which haven't matched with any parent pid
+		var pidsRemaining []ps.Process
+		for _, pid := range allPids {
+			_, childPid := processFamily[pid.PPid()]
+
+			// checking if the pid is a child of any of the parents
+			if childPid {
+				processFamily[pid.Pid()] = struct{}{}
+				foundNewPid = true
+			} else {
+				// if it is not, then we add the pid to the buffer
+				pidsRemaining = append(pidsRemaining, pid)
+			}
+		}
+
+		// scan only the pids which are left in the buffer
+		allPids = pidsRemaining
+
+		// not scanning anymore if we couldn't find a single match
+		if !foundNewPid {
+			break
+		}
+	}
+	res := make([]*nomadPid, 0, len(processFamily))
+	for pid := range processFamily {
+		res = append(res, &nomadPid{pid, stats.NewCpuStats()})
+	}
+	return res, nil
+}
+
+// aggregatedResourceUsage aggregates the resource usage of all the pids and
+// returns a TaskResourceUsage data point
+func (e *UniversalExecutor) aggregatedResourceUsage(pidStats map[string]*cstructs.ResourceUsage) *cstructs.TaskResourceUsage {
+	ts := time.Now().UTC().UnixNano()
+	var (
+		systemModeCPU, userModeCPU, percent float64
+		totalRSS, totalSwap                 uint64
+	)
+
+	for _, pidStat := range pidStats {
+		systemModeCPU += pidStat.CpuStats.SystemMode
+		userModeCPU += pidStat.CpuStats.UserMode
+		percent += pidStat.CpuStats.Percent
+
+		totalRSS += pidStat.MemoryStats.RSS
+		totalSwap += pidStat.MemoryStats.Swap
+	}
+
+	totalCPU := &cstructs.CpuStats{
+		SystemMode: systemModeCPU,
+		UserMode:   userModeCPU,
+		Percent:    percent,
+	}
+
+	totalMemory := &cstructs.MemoryStats{
+		RSS:  totalRSS,
+		Swap: totalSwap,
+	}
+
+	resourceUsage := cstructs.ResourceUsage{
+		MemoryStats: totalMemory,
+		CpuStats:    totalCPU,
+	}
+	return &cstructs.TaskResourceUsage{
+		ResourceUsage: &resourceUsage,
+		Timestamp:     ts,
+		Pids:          pidStats,
+	}
+}
diff --git a/client/driver/executor/executor_basic.go b/client/driver/executor/executor_basic.go
index db2479d82cbb..d66cfb078ac2 100644
--- a/client/driver/executor/executor_basic.go
+++ b/client/driver/executor/executor_basic.go
@@ -2,7 +2,14 @@
 
 package executor
 
-import cgroupConfig "github.com/opencontainers/runc/libcontainer/configs"
+import (
+	"os"
+
+	cstructs "github.com/hashicorp/nomad/client/driver/structs"
+
+	"github.com/mitchellh/go-ps"
+	cgroupConfig "github.com/opencontainers/runc/libcontainer/configs"
+)
 
 func (e *UniversalExecutor) configureChroot() error {
 	return nil
@@ -27,3 +34,19 @@ func (e *UniversalExecutor) applyLimits(pid int) error {
 func (e *UniversalExecutor) configureIsolation() error {
 	return nil
 }
+
+func (e *UniversalExecutor) Stats() (*cstructs.TaskResourceUsage, error) {
+	pidStats, err := e.pidStats()
+	if err != nil {
+		return nil, err
+	}
+	return e.aggregatedResourceUsage(pidStats), nil
+}
+
+func (e *UniversalExecutor) getAllPids() ([]*nomadPid, error) {
+	allProcesses, err := ps.Processes()
+	if err != nil {
+		return nil, err
+	}
+	return e.scanPids(os.Getpid(), allProcesses)
+}
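Aside: scanPids is a fixed-point iteration over the process table — every pass adopts any process whose parent is already in the family, and the loop stops on the first pass with no new match. A worked trace using the same (pid, ppid) pairs as TestScanPids later in this diff, with parent pid 5 and processes (2,5), (10,2), (15,6), (3,10), (20,18): pass 1 adopts 2 (child of 5); pass 2 adopts 10 (child of 2); pass 3 adopts 3 (child of 10); pass 4 finds nothing new and stops, leaving the family {5, 2, 10, 3} — four members, which is exactly what the test asserts.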
"github.com/mitchellh/go-ps" "github.com/opencontainers/runc/libcontainer/cgroups" cgroupFs "github.com/opencontainers/runc/libcontainer/cgroups/fs" cgroupConfig "github.com/opencontainers/runc/libcontainer/configs" + "github.com/opencontainers/runc/libcontainer/system" "github.com/hashicorp/nomad/client/allocdir" + cstructs "github.com/hashicorp/nomad/client/driver/structs" + "github.com/hashicorp/nomad/client/stats" "github.com/hashicorp/nomad/nomad/structs" ) var ( + // A mapping of directories on the host OS to attempt to embed inside each // task's chroot. chrootEnv = map[string]string{ @@ -31,6 +37,10 @@ var ( "/sbin": "/sbin", "/usr": "/usr", } + + clockTicks = uint64(system.GetClockTicks()) + + nanosecondsInSecond = uint64(1000000000) ) // configureIsolation configures chroot and creates cgroups @@ -116,6 +126,68 @@ func (e *UniversalExecutor) configureCgroups(resources *structs.Resources) error return nil } +// Stats reports the resource utilization of the cgroup. If there is no resource +// isolation we aggregate the resource utilization of all the pids launched by +// the executor. +func (e *UniversalExecutor) Stats() (*cstructs.TaskResourceUsage, error) { + if !e.command.ResourceLimits { + pidStats, err := e.pidStats() + if err != nil { + return nil, err + } + return e.aggregatedResourceUsage(pidStats), nil + } + ts := time.Now() + manager := getCgroupManager(e.groups, e.cgPaths) + stats, err := manager.GetStats() + if err != nil { + return nil, err + } + + // Memory Related Stats + swap := stats.MemoryStats.SwapUsage + maxUsage := stats.MemoryStats.Usage.MaxUsage + rss := stats.MemoryStats.Stats["rss"] + cache := stats.MemoryStats.Stats["cache"] + ms := &cstructs.MemoryStats{ + RSS: rss, + Cache: cache, + Swap: swap.Usage, + MaxUsage: maxUsage, + KernelUsage: stats.MemoryStats.KernelUsage.Usage, + KernelMaxUsage: stats.MemoryStats.KernelUsage.MaxUsage, + } + + // CPU Related Stats + totalProcessCPUUsage := stats.CpuStats.CpuUsage.TotalUsage + userModeTime := stats.CpuStats.CpuUsage.UsageInUsermode + kernelModeTime := stats.CpuStats.CpuUsage.UsageInKernelmode + + umTicks := (userModeTime * clockTicks) / nanosecondsInSecond + kmTicks := (kernelModeTime * clockTicks) / nanosecondsInSecond + + cs := &cstructs.CpuStats{ + SystemMode: float64(kmTicks), + UserMode: float64(umTicks), + ThrottledPeriods: stats.CpuStats.ThrottlingData.ThrottledPeriods, + ThrottledTime: stats.CpuStats.ThrottlingData.ThrottledTime, + } + if e.cpuStats != nil { + cs.Percent = e.cpuStats.Percent(float64(totalProcessCPUUsage / nanosecondsInSecond)) + } + taskResUsage := cstructs.TaskResourceUsage{ + ResourceUsage: &cstructs.ResourceUsage{ + MemoryStats: ms, + CpuStats: cs, + }, + Timestamp: ts.UTC().UnixNano(), + } + if pidStats, err := e.pidStats(); err == nil { + taskResUsage.Pids = pidStats + } + return &taskResUsage, nil +} + // runAs takes a user id as a string and looks up the user, and sets the command // to execute as that user. func (e *UniversalExecutor) runAs(userid string) error { @@ -186,6 +258,30 @@ func (e *UniversalExecutor) removeChrootMounts() error { return e.ctx.AllocDir.UnmountAll() } +// getAllPids returns the pids of all the processes spun up by the executor. 
+// getAllPids returns the pids of all the processes spun up by the executor. We
+// use the libcontainer apis to get the pids when the user is using cgroup
+// isolation and we scan the entire process table if the user is not using any
+// isolation
+func (e *UniversalExecutor) getAllPids() ([]*nomadPid, error) {
+	if e.command.ResourceLimits {
+		manager := getCgroupManager(e.groups, e.cgPaths)
+		pids, err := manager.GetAllPids()
+		if err != nil {
+			return nil, err
+		}
+		np := make([]*nomadPid, len(pids))
+		for idx, pid := range pids {
+			np[idx] = &nomadPid{pid, stats.NewCpuStats()}
+		}
+		return np, nil
+	}
+	allProcesses, err := ps.Processes()
+	if err != nil {
+		return nil, err
+	}
+	return e.scanPids(os.Getpid(), allProcesses)
+}
+
 // destroyCgroup kills all processes in the cgroup and removes the cgroup
 // configuration from the host. This function is idempotent.
 func DestroyCgroup(groups *cgroupConfig.Cgroup, cgPaths map[string]string, executorPid int) error {
diff --git a/client/driver/executor/executor_test.go b/client/driver/executor/executor_test.go
index 92b26510d680..de479970e5db 100644
--- a/client/driver/executor/executor_test.go
+++ b/client/driver/executor/executor_test.go
@@ -12,6 +12,8 @@ import (
 	"testing"
 	"time"
 
+	"github.com/mitchellh/go-ps"
+
 	"github.com/hashicorp/nomad/client/allocdir"
 	"github.com/hashicorp/nomad/client/driver/env"
 	cstructs "github.com/hashicorp/nomad/client/driver/structs"
@@ -131,7 +133,14 @@ func TestExecutor_WaitExitSignal(t *testing.T) {
 	}
 
 	go func() {
-		time.Sleep(1 * time.Second)
+		time.Sleep(3 * time.Second)
+		ru, err := executor.Stats()
+		if err != nil {
+			t.Fatalf("err: %v", err)
+		}
+		if len(ru.Pids) != 2 {
+			t.Fatalf("expected number of pids: 2, actual: %v", len(ru.Pids))
+		}
 		proc, err := os.FindProcess(ps.Pid)
 		if err != nil {
 			t.Fatalf("err: %v", err)
@@ -343,3 +352,45 @@ func TestExecutorInterpolateServices(t *testing.T) {
 		t.Fatalf("expected: %v, actual: %v", expectedCheckArgs, task.Services[0].Checks[0].Args)
 	}
 }
+
+func TestScanPids(t *testing.T) {
+	p1 := NewFakeProcess(2, 5)
+	p2 := NewFakeProcess(10, 2)
+	p3 := NewFakeProcess(15, 6)
+	p4 := NewFakeProcess(3, 10)
+	p5 := NewFakeProcess(20, 18)
+
+	// Make a fake executor
+	ctx := testExecutorContext(t)
+	defer ctx.AllocDir.Destroy()
+	executor := NewExecutor(log.New(os.Stdout, "", log.LstdFlags)).(*UniversalExecutor)
+
+	nomadPids, err := executor.scanPids(5, []ps.Process{p1, p2, p3, p4, p5})
+	if err != nil {
+		t.Fatalf("error: %v", err)
+	}
+	if len(nomadPids) != 4 {
+		t.Fatalf("expected: 4, actual: %v", len(nomadPids))
+	}
+}
+
+type FakeProcess struct {
+	pid  int
+	ppid int
+}
+
+func (f FakeProcess) Pid() int {
+	return f.pid
+}
+
+func (f FakeProcess) PPid() int {
+	return f.ppid
+}
+
+func (f FakeProcess) Executable() string {
+	return "fake"
+}
+
+func NewFakeProcess(pid int, ppid int) ps.Process {
+	return FakeProcess{pid: pid, ppid: ppid}
+}
diff --git a/client/driver/executor_plugin.go b/client/driver/executor_plugin.go
index decb359e3c07..a4edf2da70ee 100644
--- a/client/driver/executor_plugin.go
+++ b/client/driver/executor_plugin.go
@@ -7,6 +7,7 @@ import (
 
 	"github.com/hashicorp/go-plugin"
 	"github.com/hashicorp/nomad/client/driver/executor"
+	cstructs "github.com/hashicorp/nomad/client/driver/structs"
 	"github.com/hashicorp/nomad/nomad/structs"
 )
 
@@ -88,6 +89,12 @@ func (e *ExecutorRPC) Version() (*executor.ExecutorVersion, error) {
 	return &version, err
 }
 
+func (e *ExecutorRPC) Stats() (*cstructs.TaskResourceUsage, error) {
+	var resourceUsage cstructs.TaskResourceUsage
+	err := e.client.Call("Plugin.Stats", new(interface{}), &resourceUsage)
+	return &resourceUsage, err
+}
+
 type ExecutorRPCServer struct {
 	Impl   executor.Executor
 	logger *log.Logger
@@ -149,6 +156,14 @@ func (e *ExecutorRPCServer) Version(args interface{}, version *executor.ExecutorVersion) error {
 	return err
 }
 
+func (e *ExecutorRPCServer) Stats(args interface{}, resourceUsage *cstructs.TaskResourceUsage) error {
+	ru, err := e.Impl.Stats()
+	if ru != nil {
+		*resourceUsage = *ru
+	}
+	return err
+}
+
 type ExecutorPlugin struct {
 	logger *log.Logger
 	Impl   *ExecutorRPCServer
diff --git a/client/driver/java.go b/client/driver/java.go
index acd2289a469b..f18bdf8425c1 100644
--- a/client/driver/java.go
+++ b/client/driver/java.go
@@ -380,6 +380,10 @@ func (h *javaHandle) Kill() error {
 	}
 }
 
+func (h *javaHandle) Stats() (*cstructs.TaskResourceUsage, error) {
+	return h.executor.Stats()
+}
+
 func (h *javaHandle) run() {
 	ps, err := h.executor.Wait()
 	close(h.doneCh)
diff --git a/client/driver/qemu.go b/client/driver/qemu.go
index aad12f77fa55..719ce41e7dd7 100644
--- a/client/driver/qemu.go
+++ b/client/driver/qemu.go
@@ -375,6 +375,10 @@ func (h *qemuHandle) Kill() error {
 	}
 }
 
+func (h *qemuHandle) Stats() (*cstructs.TaskResourceUsage, error) {
+	return h.executor.Stats()
+}
+
 func (h *qemuHandle) run() {
 	ps, err := h.executor.Wait()
 	if ps.ExitCode == 0 && err != nil {
diff --git a/client/driver/raw_exec.go b/client/driver/raw_exec.go
index 8a71a73c48c4..2aa0695ccde6 100644
--- a/client/driver/raw_exec.go
+++ b/client/driver/raw_exec.go
@@ -277,6 +277,10 @@ func (h *rawExecHandle) Kill() error {
 	}
 }
 
+func (h *rawExecHandle) Stats() (*cstructs.TaskResourceUsage, error) {
+	return h.executor.Stats()
+}
+
 func (h *rawExecHandle) run() {
 	ps, err := h.executor.Wait()
 	close(h.doneCh)
diff --git a/client/driver/rkt.go b/client/driver/rkt.go
index 7aa6f8d001c1..137c643d7622 100644
--- a/client/driver/rkt.go
+++ b/client/driver/rkt.go
@@ -399,6 +399,10 @@ func (h *rktHandle) Kill() error {
 	}
 }
 
+func (h *rktHandle) Stats() (*cstructs.TaskResourceUsage, error) {
+	return nil, fmt.Errorf("stats not implemented for rkt")
+}
+
 func (h *rktHandle) run() {
 	ps, err := h.executor.Wait()
 	close(h.doneCh)
diff --git a/client/driver/structs/structs.go b/client/driver/structs/structs.go
index 9059df6ecd52..d089b9813940 100644
--- a/client/driver/structs/structs.go
+++ b/client/driver/structs/structs.go
@@ -84,3 +84,36 @@ type CheckResult struct {
 	// Err is the error that a check returned
 	Err error
 }
+
+// MemoryStats holds memory usage related stats
+type MemoryStats struct {
+	RSS            uint64
+	Cache          uint64
+	Swap           uint64
+	MaxUsage       uint64
+	KernelUsage    uint64
+	KernelMaxUsage uint64
+}
+
+// CpuStats holds cpu usage related stats
+type CpuStats struct {
+	SystemMode       float64
+	UserMode         float64
+	ThrottledPeriods uint64
+	ThrottledTime    uint64
+	Percent          float64
+}
+
+// ResourceUsage holds information related to cpu and memory stats
+type ResourceUsage struct {
+	MemoryStats *MemoryStats
+	CpuStats    *CpuStats
+}
+
+// TaskResourceUsage holds aggregated resource usage of all processes in a Task
+// and the resource usage of the individual pids
+type TaskResourceUsage struct {
+	ResourceUsage *ResourceUsage
+	Timestamp     int64
+	Pids          map[string]*ResourceUsage
+}
diff --git a/client/fingerprint/cpu.go b/client/fingerprint/cpu.go
index bcd09f507df3..57c59a6f93ce 100644
--- a/client/fingerprint/cpu.go
+++ b/client/fingerprint/cpu.go
@@ -22,7 +22,7 @@ func NewCPUFingerprint(logger *log.Logger) Fingerprint {
 }
 
 func (f *CPUFingerprint) Fingerprint(cfg *config.Config, node *structs.Node) (bool, error) {
-	cpuInfo, err := cpu.CPUInfo()
+	cpuInfo, err := cpu.Info()
 	if err != nil {
 		f.logger.Println("[WARN] Error reading CPU information:", err)
 		return false, err
diff --git a/client/fingerprint/host.go b/client/fingerprint/host.go
index 87acac63c977..d735c05cb31f 100644
--- a/client/fingerprint/host.go
+++ b/client/fingerprint/host.go
@@ -25,7 +25,7 @@ func NewHostFingerprint(logger *log.Logger) Fingerprint {
 }
 
 func (f *HostFingerprint) Fingerprint(cfg *config.Config, node *structs.Node) (bool, error) {
-	hostInfo, err := host.HostInfo()
+	hostInfo, err := host.Info()
 	if err != nil {
 		f.logger.Println("[WARN] Error retrieving host information: ", err)
 		return false, err
diff --git a/client/stats/cpu.go b/client/stats/cpu.go
new file mode 100644
index 000000000000..ca92c0e73275
--- /dev/null
+++ b/client/stats/cpu.go
@@ -0,0 +1,49 @@
+package stats
+
+import (
+	"runtime"
+	"time"
+)
+
+// CpuStats calculates cpu usage percentage
+type CpuStats struct {
+	prevProcessUsage float64
+	prevTime         time.Time
+
+	totalCpus int
+}
+
+// NewCpuStats returns a cpu stats calculator
+func NewCpuStats() *CpuStats {
+	numCpus := runtime.NumCPU()
+	return &CpuStats{totalCpus: numCpus}
+}
+
+// Percent calculates the cpu usage percentage based on the current cpu usage
+// and the previous cpu usage
+func (c *CpuStats) Percent(currentProcessUsage float64) float64 {
+	now := time.Now()
+
+	if c.prevProcessUsage == 0.0 {
+		// invoked first time
+		c.prevProcessUsage = currentProcessUsage
+		c.prevTime = now
+		return 0.0
+	}
+
+	delta := (now.Sub(c.prevTime).Seconds()) * float64(c.totalCpus)
+	ret := c.calculatePercent(c.prevProcessUsage, currentProcessUsage, delta)
+	c.prevProcessUsage = currentProcessUsage
+	c.prevTime = now
+	return ret
+}
+
+func (c *CpuStats) calculatePercent(t1, t2 float64, delta float64) float64 {
+	if delta == 0 {
+		return 0
+	}
+	deltaProc := t2 - t1
+	overallPercent := ((deltaProc / delta) * 100) * float64(c.totalCpus)
+	return overallPercent
+}
diff --git a/client/stats/cpu_test.go b/client/stats/cpu_test.go
new file mode 100644
index 000000000000..0e13cdbf5dac
--- /dev/null
+++ b/client/stats/cpu_test.go
@@ -0,0 +1,17 @@
+package stats
+
+import (
+	"testing"
+	"time"
+)
+
+func TestCpuStatsPercent(t *testing.T) {
+	cs := NewCpuStats()
+	cs.Percent(79.7)
+	time.Sleep(1 * time.Second)
+	percent := cs.Percent(80.69)
+	expectedPercent := 98.00
+	if percent < expectedPercent || percent > (expectedPercent+1.00) {
+		t.Fatalf("expected: %v, actual: %v", expectedPercent, percent)
+	}
+}
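Aside: the two totalCpus factors in CpuStats.Percent cancel out — delta is wall-clock seconds times numCpus, and the quotient is scaled back up by numCpus — so the result is simply cpu-seconds consumed per wall-clock second, times 100. Worked with the test's numbers on a hypothetical 4-core box: deltaProc = 80.69 - 79.7 = 0.99 cpu-seconds, delta = 1s * 4 = 4, and percent = (0.99 / 4) * 100 * 4 ≈ 99%, which is why TestCpuStatsPercent accepts values just under 99 (the sleep runs slightly longer than one second).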
diff --git a/client/stats/host.go b/client/stats/host.go
new file mode 100644
index 000000000000..a14126f35aec
--- /dev/null
+++ b/client/stats/host.go
@@ -0,0 +1,159 @@
+package stats
+
+import (
+	"time"
+
+	"github.com/shirou/gopsutil/cpu"
+	"github.com/shirou/gopsutil/disk"
+	"github.com/shirou/gopsutil/host"
+	"github.com/shirou/gopsutil/mem"
+)
+
+// HostStats represents resource usage stats of the host running a Nomad client
+type HostStats struct {
+	Memory    *MemoryStats
+	CPU       []*CPUStats
+	DiskStats []*DiskStats
+	Uptime    uint64
+	Timestamp int64
+}
+
+// MemoryStats represents stats related to virtual memory usage
+type MemoryStats struct {
+	Total     uint64
+	Available uint64
+	Used      uint64
+	Free      uint64
+}
+
+// CPUStats represents stats related to cpu usage
+type CPUStats struct {
+	CPU    string
+	User   float64
+	System float64
+	Idle   float64
+	Total  float64
+}
+
+// DiskStats represents stats related to disk usage
+type DiskStats struct {
+	Device            string
+	Mountpoint        string
+	Size              uint64
+	Used              uint64
+	Available         uint64
+	UsedPercent       float64
+	InodesUsedPercent float64
+}
+
+// HostStatsCollector collects host resource usage stats
+type HostStatsCollector struct {
+	statsCalculator map[string]*HostCpuStatsCalculator
+}
+
+// NewHostStatsCollector returns a HostStatsCollector
+func NewHostStatsCollector() *HostStatsCollector {
+	statsCalculator := make(map[string]*HostCpuStatsCalculator)
+	return &HostStatsCollector{statsCalculator: statsCalculator}
+}
+
+// Collect collects stats related to resource usage of a host
+func (h *HostStatsCollector) Collect() (*HostStats, error) {
+	hs := &HostStats{Timestamp: time.Now().UTC().UnixNano()}
+	if memStats, err := mem.VirtualMemory(); err == nil {
+		ms := &MemoryStats{
+			Total:     memStats.Total,
+			Available: memStats.Available,
+			Used:      memStats.Used,
+			Free:      memStats.Free,
+		}
+		hs.Memory = ms
+	}
+
+	if cpuStats, err := cpu.Times(true); err == nil {
+		cs := make([]*CPUStats, len(cpuStats))
+		for idx, cpuStat := range cpuStats {
+			cs[idx] = &CPUStats{
+				CPU:    cpuStat.CPU,
+				User:   cpuStat.User,
+				System: cpuStat.System,
+				Idle:   cpuStat.Idle,
+			}
+			percentCalculator, ok := h.statsCalculator[cpuStat.CPU]
+			if !ok {
+				percentCalculator = NewHostCpuStatsCalculator()
+				h.statsCalculator[cpuStat.CPU] = percentCalculator
+			}
+			idle, user, system, total := percentCalculator.Calculate(cpuStat)
+			cs[idx].Idle = idle
+			cs[idx].System = system
+			cs[idx].User = user
+			cs[idx].Total = total
+		}
+		hs.CPU = cs
+	}
+
+	if partitions, err := disk.Partitions(false); err == nil {
+		var diskStats []*DiskStats
+		for _, partition := range partitions {
+			if usage, err := disk.Usage(partition.Mountpoint); err == nil {
+				ds := DiskStats{
+					Device:            partition.Device,
+					Mountpoint:        partition.Mountpoint,
+					Size:              usage.Total,
+					Used:              usage.Used,
+					Available:         usage.Free,
+					UsedPercent:       usage.UsedPercent,
+					InodesUsedPercent: usage.InodesUsedPercent,
+				}
+				diskStats = append(diskStats, &ds)
+			}
+		}
+		hs.DiskStats = diskStats
+	}
+
+	if uptime, err := host.Uptime(); err == nil {
+		hs.Uptime = uptime
+	}
+	return hs, nil
+}
+
+// HostCpuStatsCalculator calculates cpu usage percentages
+type HostCpuStatsCalculator struct {
+	prevIdle   float64
+	prevUser   float64
+	prevSystem float64
+	prevBusy   float64
+	prevTotal  float64
+}
+
+// NewHostCpuStatsCalculator returns a HostCpuStatsCalculator
+func NewHostCpuStatsCalculator() *HostCpuStatsCalculator {
+	return &HostCpuStatsCalculator{}
+}
+
+// Calculate calculates the current cpu usage percentages
+func (h *HostCpuStatsCalculator) Calculate(times cpu.TimesStat) (idle float64, user float64, system float64, total float64) {
+	currentIdle := times.Idle
+	currentUser := times.User
+	currentSystem := times.System
+	currentTotal := times.Total()
+
+	deltaTotal := currentTotal - h.prevTotal
+	idle = ((currentIdle - h.prevIdle) / deltaTotal) * 100
+	user = ((currentUser - h.prevUser) / deltaTotal) * 100
+	system = ((currentSystem - h.prevSystem) / deltaTotal) * 100
+
+	currentBusy := times.User + times.System + times.Nice + times.Iowait + times.Irq +
+		times.Softirq + times.Steal + times.Guest + times.GuestNice + times.Stolen
+
+	total = ((currentBusy - h.prevBusy) / deltaTotal) * 100
+
+	h.prevIdle = currentIdle
+	h.prevUser = currentUser
+	h.prevSystem = currentSystem
+	h.prevTotal = currentTotal
+	h.prevBusy = currentBusy
+
+	return
+}
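Aside: a sketch of the calculator's delta behavior with two synthetic samples, written as an Example-style test against the package above (not part of the diff) and assuming the vendored gopsutil cpu.TimesStat type. Between the samples, 8s of user time and 2s of idle time elapse out of a 10s total delta:

package stats

import (
	"fmt"

	"github.com/shirou/gopsutil/cpu"
)

func ExampleHostCpuStatsCalculator() {
	calc := NewHostCpuStatsCalculator()
	// The first call only primes the previous-sample state; its output is not
	// meaningful because it is measured against zeroed counters.
	calc.Calculate(cpu.TimesStat{CPU: "cpu0", User: 100, System: 50, Idle: 850})
	idle, user, system, total := calc.Calculate(cpu.TimesStat{CPU: "cpu0", User: 108, System: 50, Idle: 852})
	fmt.Printf("idle=%.0f%% user=%.0f%% system=%.0f%% busy=%.0f%%\n", idle, user, system, total)
	// Output: idle=20% user=80% system=0% busy=80%
}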
diff --git a/client/stats/ring.go b/client/stats/ring.go
new file mode 100644
index 000000000000..8ee8ac679f64
--- /dev/null
+++ b/client/stats/ring.go
@@ -0,0 +1,67 @@
+package stats
+
+import (
+	"fmt"
+	"sync"
+)
+
+var (
+	// The default size of the ring buffer
+	defaultCap = 60
+)
+
+// RingBuff is a data structure which is a circular list based on slices
+type RingBuff struct {
+	head int
+	buff []interface{}
+
+	lock sync.RWMutex
+}
+
+// NewRingBuff creates a new ring buffer of the specified size
+func NewRingBuff(capacity int) (*RingBuff, error) {
+	if capacity < 1 {
+		return nil, fmt.Errorf("can not create a ring buffer with capacity: %v", capacity)
+	}
+	return &RingBuff{buff: make([]interface{}, 0, capacity), head: -1}, nil
+}
+
+// Enqueue queues a new value in the ring buffer. This operation would
+// over-write an older value if the list has reached its capacity
+func (r *RingBuff) Enqueue(value interface{}) {
+	r.lock.Lock()
+	defer r.lock.Unlock()
+	if len(r.buff) < cap(r.buff) {
+		r.buff = append(r.buff, struct{}{})
+	}
+	r.head += 1
+	if r.head == cap(r.buff) {
+		r.head = 0
+	}
+	r.buff[r.head] = value
+}
+
+// Peek returns the last value enqueued in the ring buffer
+func (r *RingBuff) Peek() interface{} {
+	r.lock.RLock()
+	defer r.lock.RUnlock()
+	if r.head == -1 {
+		return nil
+	}
+	return r.buff[r.head]
+}
+
+// Values returns all the values in the buffer in insertion order.
+func (r *RingBuff) Values() []interface{} {
+	r.lock.RLock()
+	defer r.lock.RUnlock()
+	if r.head == len(r.buff)-1 {
+		vals := make([]interface{}, len(r.buff))
+		copy(vals, r.buff)
+		return vals
+	}
+
+	slice1 := r.buff[r.head+1:]
+	slice2 := r.buff[:r.head+1]
+	return append(slice1, slice2...)
+}
diff --git a/client/stats/ring_test.go b/client/stats/ring_test.go
new file mode 100644
index 000000000000..11e4028e28cd
--- /dev/null
+++ b/client/stats/ring_test.go
@@ -0,0 +1,83 @@
+package stats
+
+import (
+	"testing"
+)
+
+func TestRingBuffInvalid(t *testing.T) {
+	if _, err := NewRingBuff(0); err == nil {
+		t.Fatalf("expected err")
+	}
+}
+
+func TestRingBuffEnqueue(t *testing.T) {
+	rb, err := NewRingBuff(3)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	rb.Enqueue(1)
+	rb.Enqueue(2)
+	rb.Enqueue(3)
+	if val := rb.Peek(); val != 3 {
+		t.Fatalf("expected: %v, actual: %v", 3, val)
+	}
+
+	rb.Enqueue(4)
+	rb.Enqueue(5)
+	if val := rb.Peek(); val != 5 {
+		t.Fatalf("expected: %v, actual: %v", 5, val)
+	}
+}
+
+func TestRingBuffValues(t *testing.T) {
+	rb, err := NewRingBuff(3)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	rb.Enqueue(1)
+	rb.Enqueue(2)
+	rb.Enqueue(3)
+	rb.Enqueue(4)
+
+	expected := []interface{}{2, 3, 4}
+	if !sliceEq(expected, rb.Values()) {
+		t.Fatalf("expected: %v, actual: %v", expected, rb.Values())
+	}
+
+	rb.Enqueue(5)
+	expected = []interface{}{3, 4, 5}
+	if !sliceEq(expected, rb.Values()) {
+		t.Fatalf("expected: %v, actual: %v", expected, rb.Values())
+	}
+
+	rb.Enqueue(6)
+	expected = []interface{}{4, 5, 6}
+	if !sliceEq(expected, rb.Values()) {
+		t.Fatalf("expected: %v, actual: %v", expected, rb.Values())
+	}
+}
+
+func sliceEq(slice1, slice2 []interface{}) bool {
+	if slice1 == nil && slice2 == nil {
+		return true
+	}
+
+	if slice1 == nil || slice2 == nil {
+		return false
+	}
+
+	if len(slice1) != len(slice2) {
+		return false
+	}
+
+	for i := range slice1 {
+		if slice1[i] != slice2[i] {
+			return false
+		}
+	}
+
+	return true
+}
"github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/client/driver/env" @@ -34,6 +35,17 @@ const ( killFailureLimit = 5 ) +// TaskStatsReporter exposes APIs to query resource usage of a Task +type TaskStatsReporter interface { + // ResourceUsage returns the latest resource usage data point collected for + // the task + ResourceUsage() []*cstructs.TaskResourceUsage + + // ResourceUsageTS returns all the resource usage data points since a given + // time + ResourceUsageTS(since int64) []*cstructs.TaskResourceUsage +} + // TaskRunner is used to wrap a task within an allocation and provide the execution context. type TaskRunner struct { config *config.Config @@ -43,6 +55,9 @@ type TaskRunner struct { alloc *structs.Allocation restartTracker *RestartTracker + resourceUsage *stats.RingBuff + resourceUsageLock sync.RWMutex + task *structs.Task taskEnv *env.TaskEnvironment updateCh chan *structs.Allocation @@ -86,11 +101,18 @@ func NewTaskRunner(logger *log.Logger, config *config.Config, } restartTracker := newRestartTracker(tg.RestartPolicy, alloc.Job.Type) + resourceUsage, err := stats.NewRingBuff(config.StatsDataPoints) + if err != nil { + logger.Printf("[ERR] client: can't create resource usage buffer: %v", err) + return nil + } + tc := &TaskRunner{ config: config, updater: updater, logger: logger, restartTracker: restartTracker, + resourceUsage: resourceUsage, ctx: ctx, alloc: alloc, task: task, @@ -433,9 +455,83 @@ func (r *TaskRunner) startTask() error { r.handleLock.Lock() r.handle = handle r.handleLock.Unlock() + go r.collectResourceUsageStats() return nil } +// collectResourceUsageStats starts collecting resource usage stats of a Task +func (r *TaskRunner) collectResourceUsageStats() { + // start collecting the stats right away and then start collecting every + // collection interval + next := time.NewTimer(0) + defer next.Stop() + for { + select { + case <-next.C: + ru, err := r.handle.Stats() + if err != nil { + r.logger.Printf("[DEBUG] client: error fetching stats of task %v: %v", r.task.Name, err) + } + r.resourceUsageLock.Lock() + r.resourceUsage.Enqueue(ru) + r.resourceUsageLock.Unlock() + next.Reset(r.config.StatsCollectionInterval) + case <-r.handle.WaitCh(): + return + } + } +} + +// TaskStatsReporter returns the stats reporter of the task +func (r *TaskRunner) StatsReporter() TaskStatsReporter { + return r +} + +// ResourceUsage returns the last resource utilization datapoint collected +func (r *TaskRunner) ResourceUsage() []*cstructs.TaskResourceUsage { + r.resourceUsageLock.RLock() + defer r.resourceUsageLock.RUnlock() + val := r.resourceUsage.Peek() + ru, _ := val.(*cstructs.TaskResourceUsage) + return []*cstructs.TaskResourceUsage{ru} +} + +// ResourceUsageTS returns the list of all the resource utilization datapoints +// collected +func (r *TaskRunner) ResourceUsageTS(since int64) []*cstructs.TaskResourceUsage { + r.resourceUsageLock.RLock() + defer r.resourceUsageLock.RUnlock() + + values := r.resourceUsage.Values() + low := 0 + high := len(values) - 1 + var idx int + + for { + mid := (low + high) / 2 + midVal, _ := values[mid].(*cstructs.TaskResourceUsage) + if midVal.Timestamp < since { + low = mid + 1 + } else if midVal.Timestamp > since { + high = mid - 1 + } else if midVal.Timestamp == since { + idx = mid + break + } + if low > high { + idx = low + break + } + } + values = values[idx:] + ts := make([]*cstructs.TaskResourceUsage, len(values)) + for index, val := range values { + ru, _ := val.(*cstructs.TaskResourceUsage) + ts[index] = ru + } + return 
 // handleUpdate takes an updated allocation and updates internal state to
 // reflect the new config for the task.
 func (r *TaskRunner) handleUpdate(update *structs.Allocation) error {
diff --git a/client/task_runner_test.go b/client/task_runner_test.go
index af695d935912..afcb31d0182f 100644
--- a/client/task_runner_test.go
+++ b/client/task_runner_test.go
@@ -12,6 +12,7 @@ import (
 
 	"github.com/hashicorp/nomad/client/allocdir"
 	"github.com/hashicorp/nomad/client/driver"
+	cstructs "github.com/hashicorp/nomad/client/driver/structs"
 	"github.com/hashicorp/nomad/nomad/mock"
 	"github.com/hashicorp/nomad/nomad/structs"
 	"github.com/hashicorp/nomad/testutil"
@@ -129,6 +130,13 @@ func TestTaskRunner_Destroy(t *testing.T) {
 		t.Fatalf("err: %v", err)
 	})
 
+	// Make sure we are collecting a few stats
+	time.Sleep(2 * time.Second)
+	stats := tr.StatsReporter().ResourceUsage()
+	if len(stats) == 0 {
+		t.Fatalf("expected task runner to have some stats")
+	}
+
 	// Begin the tear down
 	tr.Destroy()
 
@@ -397,3 +405,47 @@ func TestTaskRunner_Validate_UserEnforcement(t *testing.T) {
 	}
 }
+
+func TestTaskRunnerResourceUsageTS(t *testing.T) {
+	_, tr := testTaskRunner(false)
+
+	t1, _ := time.Parse(time.RFC822, "02 Jan 06 15:03 MST")
+	t2, _ := time.Parse(time.RFC822, "02 Jan 06 15:05 MST")
+	t3, _ := time.Parse(time.RFC822, "02 Jan 06 15:06 MST")
+	t4, _ := time.Parse(time.RFC822, "02 Jan 06 15:07 MST")
+	t5, _ := time.Parse(time.RFC822, "02 Jan 06 15:08 MST")
+
+	ru1 := cstructs.TaskResourceUsage{Timestamp: t1.UnixNano()}
+	ru2 := cstructs.TaskResourceUsage{Timestamp: t2.UnixNano()}
+	ru3 := cstructs.TaskResourceUsage{Timestamp: t3.UnixNano()}
+	ru4 := cstructs.TaskResourceUsage{Timestamp: t4.UnixNano()}
+	ru5 := cstructs.TaskResourceUsage{Timestamp: t5.UnixNano()}
+
+	tr.resourceUsage.Enqueue(&ru1)
+	tr.resourceUsage.Enqueue(&ru2)
+	tr.resourceUsage.Enqueue(&ru3)
+	tr.resourceUsage.Enqueue(&ru4)
+	tr.resourceUsage.Enqueue(&ru5)
+
+	values := tr.ResourceUsageTS(t3.Add(-1 * time.Second).UnixNano())
+	if len(values) != 3 {
+		t.Fatalf("expected values: 3, actual: %v", len(values))
+	}
+
+	values = tr.ResourceUsageTS(t3.UnixNano())
+	if len(values) != 3 {
+		t.Fatalf("expected values: 3, actual: %v", len(values))
+	}
+
+	beginning, _ := time.Parse(time.RFC822, "01 Jan 70 00:00 UTC")
+	values = tr.ResourceUsageTS(beginning.UnixNano())
+	if len(values) != 5 {
+		t.Fatalf("expected values: 5, actual: %v", len(values))
+	}
+
+	values = tr.ResourceUsageTS(t5.Add(1 * time.Second).UnixNano())
+	if len(values) != 0 {
+		t.Fatalf("expected values: 0, actual: %v", len(values))
+	}
+}
diff --git a/command/agent/agent.go b/command/agent/agent.go
index d3c1d7cd18ab..c7e623d8348d 100644
--- a/command/agent/agent.go
+++ b/command/agent/agent.go
@@ -274,6 +274,9 @@ func (a *Agent) clientConfig() (*clientconfig.Config, error) {
 
 	conf.ConsulConfig = a.consulConfig
 
+	conf.StatsDataPoints = a.config.Client.StatsConfig.DataPoints
+	conf.StatsCollectionInterval = a.config.Client.StatsConfig.collectionInterval
+
 	return conf, nil
 }
diff --git a/command/agent/alloc_endpoint.go b/command/agent/alloc_endpoint.go
index 7668bfd85661..8fe63ce7916c 100644
--- a/command/agent/alloc_endpoint.go
+++ b/command/agent/alloc_endpoint.go
@@ -1,12 +1,18 @@
 package agent
 
 import (
+	"fmt"
 	"net/http"
+	"strconv"
 	"strings"
 
 	"github.com/hashicorp/nomad/nomad/structs"
 )
 
+const (
+	allocNotFoundErr = "allocation not found"
+)
+
 func (s *HTTPServer) AllocsRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
 	if req.Method != "GET" {
 		return nil, CodedError(405, ErrInvalidMethod)
@@ -53,3 +59,59 @@ func (s *HTTPServer) AllocSpecificRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
 	}
 	return out.Alloc, nil
 }
+
+func (s *HTTPServer) ClientAllocRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
+	if s.agent.client == nil {
+		return nil, clientNotRunning
+	}
+
+	reqSuffix := strings.TrimPrefix(req.URL.Path, "/v1/client/allocation/")
+
+	// tokenize the suffix of the path to get the alloc id and find the action
+	// invoked on the alloc id
+	tokens := strings.Split(reqSuffix, "/")
+	if len(tokens) == 1 || tokens[1] != "stats" {
+		return nil, CodedError(404, allocNotFoundErr)
+	}
+	allocID := tokens[0]
+
+	clientStats := s.agent.client.StatsReporter()
+	allocStats, ok := clientStats.AllocStats()[allocID]
+	if !ok {
+		return nil, CodedError(404, "alloc not running on node")
+	}
+
+	var since int64
+	var err error
+	ts := false
+	if sinceTime := req.URL.Query().Get("since"); sinceTime != "" {
+		ts = true
+		since, err = strconv.ParseInt(sinceTime, 10, 64)
+		if err != nil {
+			return nil, CodedError(400, fmt.Sprintf("can't read the since query parameter: %v", err))
+		}
+	}
+
+	if task := req.URL.Query().Get("task"); task != "" {
+		taskStats, ok := allocStats.AllocStats()[task]
+		if !ok {
+			return nil, CodedError(404, "task not present in allocation")
+		}
+		if ts {
+			return taskStats.ResourceUsageTS(since), nil
+		}
+		return taskStats.ResourceUsage(), nil
+	}
+
+	// Return the resource usage of all the tasks in an allocation if task name
+	// is not specified
+	res := make(map[string]interface{})
+	for task, taskStats := range allocStats.AllocStats() {
+		if ts {
+			res[task] = taskStats.ResourceUsageTS(since)
+		} else {
+			res[task] = taskStats.ResourceUsage()
+		}
+	}
+	return res, nil
+}
diff --git a/command/agent/alloc_endpoint_test.go b/command/agent/alloc_endpoint_test.go
index 7bb80a3053d5..e2a4efd83305 100644
--- a/command/agent/alloc_endpoint_test.go
+++ b/command/agent/alloc_endpoint_test.go
@@ -3,6 +3,7 @@ package agent
 import (
 	"net/http"
 	"net/http/httptest"
+	"strings"
 	"testing"
 
 	"github.com/hashicorp/nomad/nomad/mock"
@@ -146,3 +147,20 @@ func TestHTTP_AllocQuery(t *testing.T) {
 		}
 	})
 }
+
+func TestHTTP_AllocStats(t *testing.T) {
+	httpTest(t, nil, func(s *TestServer) {
+		// Make the HTTP request
+		req, err := http.NewRequest("GET", "/v1/client/allocation/123/foo", nil)
+		if err != nil {
+			t.Fatalf("err: %v", err)
+		}
+		respW := httptest.NewRecorder()
+
+		// Make the request
+		_, err = s.Server.ClientAllocRequest(respW, req)
+		if !strings.Contains(err.Error(), allocNotFoundErr) {
+			t.Fatalf("err: %v", err)
+		}
+	})
+}
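Aside: a sketch of hitting the new endpoint over plain HTTP, assuming a local agent on Nomad's default port 4646; the alloc ID, task name, and timestamp below are illustrative. `task` and `since` mirror the query parameters parsed in ClientAllocRequest above.

package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
)

func main() {
	url := "http://127.0.0.1:4646/v1/client/allocation/203266e5-e0d6-9486-5e05-397ed2b184af/stats?task=redis&since=1467080587608046864"
	resp, err := http.Get(url)
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()
	body, _ := ioutil.ReadAll(resp.Body)
	fmt.Println(string(body)) // JSON-encoded []*TaskResourceUsage for the task
}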
diff --git a/command/agent/command.go b/command/agent/command.go
index d414808fc6da..b9f5d1eeb163 100644
--- a/command/agent/command.go
+++ b/command/agent/command.go
@@ -188,6 +188,14 @@ func (c *Command) readConfig() *Config {
 	}
 	config.Server.retryInterval = dur
+
+	// Parse the stats collection interval
+	dur, err = time.ParseDuration(config.Client.StatsConfig.CollectionInterval)
+	if err != nil {
+		c.Ui.Error(fmt.Sprintf("Error parsing stats collection interval: %s", err))
+		return nil
+	}
+	config.Client.StatsConfig.collectionInterval = dur
+
 	// Check that the server is running in at least one mode.
 	if !(config.Server.Enabled || config.Client.Enabled) {
 		c.Ui.Error("Must specify either server, client or dev mode for the agent.")
diff --git a/command/agent/config-test-fixtures/basic.hcl b/command/agent/config-test-fixtures/basic.hcl
index b62bc0804a12..38c22ec78144 100644
--- a/command/agent/config-test-fixtures/basic.hcl
+++ b/command/agent/config-test-fixtures/basic.hcl
@@ -45,6 +45,10 @@ client {
 	client_min_port = 1000
 	client_max_port = 2000
 	max_kill_timeout = "10s"
+	stats {
+		data_points = 35
+		collection_interval = "5s"
+	}
 }
 server {
 	enabled = true
diff --git a/command/agent/config.go b/command/agent/config.go
index ee99d32e6603..be0cd0db26b7 100644
--- a/command/agent/config.go
+++ b/command/agent/config.go
@@ -179,6 +179,17 @@ type ConsulConfig struct {
 	ClientAutoJoin bool `mapstructure:"client_auto_join"`
 }

+// StatsConfig determines behavior of resource usage stats collections
+type StatsConfig struct {
+	// DataPoints is the number of data points Nomad client stores in-memory
+	DataPoints int `mapstructure:"data_points"`

+	// CollectionInterval is the interval of resource usage stats collection
+	CollectionInterval string        `mapstructure:"collection_interval"`
+	collectionInterval time.Duration `mapstructure:"-"`
+}
+
 // ClientConfig is configuration specific to the client mode
 type ClientConfig struct {
 	// Enabled controls if we are a client
@@ -226,6 +237,10 @@ type ClientConfig struct {
 	// be used to target a certain utilization or to prevent Nomad from using a
 	// particular set of ports.
 	Reserved *Resources `mapstructure:"reserved"`
+
+	// StatsConfig determines behavior of resource usage stats collection in
+	// Nomad client
+	StatsConfig *StatsConfig `mapstructure:"stats"`
 }

 // ServerConfig is configuration specific to the server mode
@@ -435,6 +450,11 @@ func DefaultConfig() *Config {
 			ClientMinPort: 14000,
 			ClientMaxPort: 14512,
 			Reserved:      &Resources{},
+			StatsConfig: &StatsConfig{
+				DataPoints:         60,
+				CollectionInterval: "1s",
+				collectionInterval: 1 * time.Second,
+			},
 		},
 		Server: &ServerConfig{
 			Enabled: false,
@@ -680,6 +700,9 @@ func (a *ClientConfig) Merge(b *ClientConfig) *ClientConfig {
 	if b.Reserved != nil {
 		result.Reserved = result.Reserved.Merge(b.Reserved)
 	}
+	if b.StatsConfig != nil {
+		result.StatsConfig = result.StatsConfig.Merge(b.StatsConfig)
+	}

 	// Add the servers
 	result.Servers = append(result.Servers, b.Servers...)
@@ -855,6 +878,18 @@ func (r *Resources) Merge(b *Resources) *Resources {
 	return &result
 }

+// Merge combines two stats configs; set fields of b take precedence
+func (s *StatsConfig) Merge(b *StatsConfig) *StatsConfig {
+	result := *s
+	if b.DataPoints != 0 {
+		result.DataPoints = b.DataPoints
+	}
+	if b.CollectionInterval != "" {
+		result.CollectionInterval = b.CollectionInterval
+		result.collectionInterval = b.collectionInterval
+	}
+	return &result
+}
+
 // LoadConfig loads the configuration at the given path, regardless if
 // its a file or directory.
 func LoadConfig(path string) (*Config, error) {
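The merge above follows Nomad's usual partial-override convention: zero-valued fields in the user's `stats` block leave the defaults intact. A self-contained sketch of that behavior — the struct is a trimmed re-declaration for a runnable example, not the agent package's type:

```go
package main

import "fmt"

// statsConfig mirrors the exported fields of the StatsConfig added above.
type statsConfig struct {
	DataPoints         int
	CollectionInterval string
}

// merge follows StatsConfig.Merge: only non-zero fields of b override
// the receiver's values.
func (s statsConfig) merge(b statsConfig) statsConfig {
	result := s
	if b.DataPoints != 0 {
		result.DataPoints = b.DataPoints
	}
	if b.CollectionInterval != "" {
		result.CollectionInterval = b.CollectionInterval
	}
	return result
}

func main() {
	def := statsConfig{DataPoints: 60, CollectionInterval: "1s"} // DefaultConfig values
	user := statsConfig{CollectionInterval: "5s"}                // partial user config
	fmt.Printf("%+v\n", def.merge(user))
	// Output: {DataPoints:60 CollectionInterval:5s}
}
```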
diff --git a/command/agent/config_parse.go b/command/agent/config_parse.go
index 089493cc383f..4ecbab431173 100644
--- a/command/agent/config_parse.go
+++ b/command/agent/config_parse.go
@@ -319,6 +319,7 @@ func parseClient(result **ClientConfig, list *ast.ObjectList) error {
 		"client_max_port",
 		"client_min_port",
 		"reserved",
+		"stats",
 	}
 	if err := checkHCLKeys(listVal, valid); err != nil {
 		return err
@@ -332,6 +333,7 @@ func parseClient(result **ClientConfig, list *ast.ObjectList) error {
 	delete(m, "options")
 	delete(m, "meta")
 	delete(m, "reserved")
+	delete(m, "stats")

 	var config ClientConfig
 	if err := mapstructure.WeakDecode(m, &config); err != nil {
@@ -373,10 +375,55 @@ func parseClient(result **ClientConfig, list *ast.ObjectList) error {
 		}
 	}

+	// Parse stats config
+	if o := listVal.Filter("stats"); len(o.Items) > 0 {
+		if err := parseStats(&config.StatsConfig, o); err != nil {
+			return multierror.Prefix(err, "stats ->")
+		}
+	}
+
 	*result = &config
 	return nil
 }

+func parseStats(result **StatsConfig, list *ast.ObjectList) error {
+	list = list.Elem()
+	if len(list.Items) > 1 {
+		return fmt.Errorf("only one 'stats' block allowed")
+	}
+
+	// Get our stats object
+	obj := list.Items[0]
+
+	var listVal *ast.ObjectList
+	if ot, ok := obj.Val.(*ast.ObjectType); ok {
+		listVal = ot.List
+	} else {
+		return fmt.Errorf("stats value: should be an object")
+	}
+
+	// Check for invalid keys
+	valid := []string{
+		"data_points",
+		"collection_interval",
+	}
+	if err := checkHCLKeys(listVal, valid); err != nil {
+		return err
+	}
+
+	var m map[string]interface{}
+	if err := hcl.DecodeObject(&m, listVal); err != nil {
+		return err
+	}
+
+	var stats StatsConfig
+	if err := mapstructure.WeakDecode(m, &stats); err != nil {
+		return err
+	}
+	*result = &stats
+
+	return nil
+}
+
 func parseReserved(result **Resources, list *ast.ObjectList) error {
 	list = list.Elem()
 	if len(list.Items) > 1 {
diff --git a/command/agent/config_parse_test.go b/command/agent/config_parse_test.go
index 7b9f3d5b631b..6012ba881269 100644
--- a/command/agent/config_parse_test.go
+++ b/command/agent/config_parse_test.go
@@ -63,6 +63,10 @@ func TestConfig_Parse(t *testing.T) {
 				ReservedPorts:       "1,100,10-12",
 				ParsedReservedPorts: []int{1, 10, 11, 12, 100},
 			},
+			StatsConfig: &StatsConfig{
+				DataPoints:         35,
+				CollectionInterval: "5s",
+			},
 		},
 		Server: &ServerConfig{
 			Enabled: true,
diff --git a/command/agent/http.go b/command/agent/http.go
index 168021185b47..5b5f6fd5121c 100644
--- a/command/agent/http.go
+++ b/command/agent/http.go
@@ -114,6 +114,8 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
 	s.mux.HandleFunc("/v1/evaluation/", s.wrap(s.EvalSpecificRequest))

 	s.mux.HandleFunc("/v1/client/fs/", s.wrap(s.FsRequest))
+	s.mux.HandleFunc("/v1/client/stats/", s.wrap(s.ClientStatsRequest))
+	s.mux.HandleFunc("/v1/client/allocation/", s.wrap(s.ClientAllocRequest))

 	s.mux.HandleFunc("/v1/agent/self", s.wrap(s.AgentSelfRequest))
 	s.mux.HandleFunc("/v1/agent/join", s.wrap(s.AgentJoinRequest))
diff --git a/command/agent/stats_endpoint.go b/command/agent/stats_endpoint.go
new file mode 100644
index 000000000000..0cf5b8dc562a
--- /dev/null
+++ b/command/agent/stats_endpoint.go
@@ -0,0 +1,34 @@
+package agent
+
+import (
+	"fmt"
+	"net/http"
+	"strconv"
+)
+
+const (
+	invalidSinceErrPrefix = "can't read the since query parameter"
+)
+
+func (s *HTTPServer) ClientStatsRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
+	if s.agent.client == nil {
+		return nil, clientNotRunning
+	}
+
+	var since int
+	var err error
+	ts := false
+	if sinceTime := req.URL.Query().Get("since"); sinceTime != "" {
+		ts = true
+		since, err = strconv.Atoi(sinceTime)
+		if err != nil {
+			return nil, CodedError(400, fmt.Sprintf("%s: %v", invalidSinceErrPrefix, err))
+		}
+	}
+
+	clientStats := s.agent.client.StatsReporter()
+	if ts {
+		return clientStats.HostStatsTS(int64(since)), nil
+	}
+	return clientStats.HostStats(), nil
+}
diff --git a/command/agent/stats_endpoint_test.go b/command/agent/stats_endpoint_test.go
new file mode 100644
index 000000000000..6f9649ab63f4
--- /dev/null
+++ b/command/agent/stats_endpoint_test.go
@@ -0,0 +1,23 @@
+package agent
+
+import (
+	"net/http"
+	"net/http/httptest"
+	"strings"
+	"testing"
+)
+
+func TestClientStatsRequest(t *testing.T) {
+	httpTest(t, nil, func(s *TestServer) {
+		req, err := http.NewRequest("GET", "/v1/client/stats/?since=foo", nil)
+		if err != nil {
+			t.Fatalf("err: %v", err)
+		}
+
+		respW := httptest.NewRecorder()
+		_, err = s.Server.ClientStatsRequest(respW, req)
+		// strings.Contains, not ContainsAny: we want a substring match on
+		// the error prefix, not a match on any of its characters
+		if !strings.Contains(err.Error(), invalidSinceErrPrefix) {
+			t.Fatalf("unexpected err: %v", err)
+		}
+	})
+}
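The api package wraps this endpoint through `Nodes().Stats`, which the node-status command below consumes. A hedged usage sketch, assuming a local agent on the default address; the node ID is illustrative:

```go
package main

import (
	"fmt"

	"github.com/hashicorp/nomad/api"
)

func main() {
	// DefaultConfig targets http://127.0.0.1:4646 unless overridden by
	// environment or flags.
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		panic(err)
	}

	// Illustrative node ID; in real usage list nodes first and pick one.
	stats, err := client.Nodes().Stats("4ae1c8a6", nil)
	if err != nil {
		panic(err)
	}

	// HostStats carries uptime plus memory, per-CPU, and disk stats.
	fmt.Printf("uptime: %ds, mem used: %d bytes\n", stats.Uptime, stats.Memory.Used)
}
```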
diff --git a/command/alloc_status.go b/command/alloc_status.go
index 700a175469c4..289880da0485 100644
--- a/command/alloc_status.go
+++ b/command/alloc_status.go
@@ -3,6 +3,7 @@ package command
 import (
 	"fmt"
 	"sort"
+	"strconv"
 	"strings"
 	"time"

@@ -119,6 +120,11 @@ func (c *AllocStatusCommand) Run(args []string) int {
 		return 1
 	}

+	stats, err := client.Allocations().Stats(alloc, nil)
+	if err != nil {
+		c.Ui.Error(fmt.Sprintf("couldn't retrieve stats: %v", err))
+	}
+
 	// Format the allocation data
 	basic := []string{
 		fmt.Sprintf("ID|%s", limit(alloc.ID, length)),
@@ -140,7 +146,7 @@ func (c *AllocStatusCommand) Run(args []string) int {
 	c.Ui.Output(formatKV(basic))

 	if !short {
-		c.taskResources(alloc)
+		c.taskResources(alloc, stats)
 	}

 	// Print the state of each task.
@@ -302,7 +308,7 @@ func (c *AllocStatusCommand) allocResources(alloc *api.Allocation) {
 }

 // taskResources prints out the tasks current resource usage
-func (c *AllocStatusCommand) taskResources(alloc *api.Allocation) {
+func (c *AllocStatusCommand) taskResources(alloc *api.Allocation, stats map[string]*api.TaskResourceUsage) {
 	if len(alloc.TaskResources) == 0 {
 		return
 	}
@@ -338,9 +344,17 @@ func (c *AllocStatusCommand) taskResources(alloc *api.Allocation) {
 		if len(addr) > 0 {
 			firstAddr = addr[0]
 		}
+		cpuUsage := strconv.Itoa(resource.CPU)
+		memUsage := strconv.Itoa(resource.MemoryMB)
+		if ru, ok := stats[task]; ok {
+			cpuStats := ru.ResourceUsage.CpuStats
+			memoryStats := ru.ResourceUsage.MemoryStats
+			cpuUsage = fmt.Sprintf("%v/%v", (cpuStats.SystemMode + cpuStats.UserMode), resource.CPU)
+			memUsage = fmt.Sprintf("%v/%v", memoryStats.RSS/(1024*1024), resource.MemoryMB)
+		}
 		resourcesOutput = append(resourcesOutput, fmt.Sprintf("%v|%v|%v|%v|%v",
-			resource.CPU,
-			resource.MemoryMB,
+			cpuUsage,
+			memUsage,
 			resource.DiskMB,
 			resource.IOPS,
 			firstAddr))
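The used/limit rendering above can be reproduced against the typed API. A hedged sketch mirroring taskResources' arithmetic (CPU as system+user against the MHz limit, RSS converted from bytes to MB); the allocation prefix is illustrative:

```go
package main

import (
	"fmt"

	"github.com/hashicorp/nomad/api"
)

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		panic(err)
	}

	// Illustrative prefix; resolve it to a single allocation first,
	// the same way the CLI commands in this diff do.
	allocs, _, err := client.Allocations().PrefixList("2032")
	if err != nil {
		panic(err)
	}
	if len(allocs) != 1 {
		panic("prefix did not match exactly one allocation")
	}
	alloc, _, err := client.Allocations().Info(allocs[0].ID, nil)
	if err != nil {
		panic(err)
	}

	stats, err := client.Allocations().Stats(alloc, nil)
	if err != nil {
		panic(err)
	}
	for task, ru := range stats {
		cpu := ru.ResourceUsage.CpuStats
		mem := ru.ResourceUsage.MemoryStats
		// Same units as alloc-status: CPU usage as system+user, RSS in MB.
		fmt.Printf("%s: cpu=%v mem=%vMB\n", task, cpu.SystemMode+cpu.UserMode, mem.RSS/(1024*1024))
	}
}
```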
fmt.Sprintf("Total|%v", humanize.Bytes(memoryStat.Total)) + memStatsAttr[1] = fmt.Sprintf("Available|%v", humanize.Bytes(memoryStat.Available)) + memStatsAttr[2] = fmt.Sprintf("Used|%v", humanize.Bytes(memoryStat.Used)) + memStatsAttr[3] = fmt.Sprintf("Free|%v", humanize.Bytes(memoryStat.Free)) + c.Ui.Output(formatKV(memStatsAttr)) +} + +func (c *NodeStatusCommand) printDiskStats(hostStats *api.HostStats) { + for _, diskStat := range hostStats.DiskStats { + diskStatsAttr := make([]string, 7) + diskStatsAttr[0] = fmt.Sprintf("Device|%s", diskStat.Device) + diskStatsAttr[1] = fmt.Sprintf("MountPoint|%s", diskStat.Mountpoint) + diskStatsAttr[2] = fmt.Sprintf("Size|%s", humanize.Bytes(diskStat.Size)) + diskStatsAttr[3] = fmt.Sprintf("Used|%s", humanize.Bytes(diskStat.Used)) + diskStatsAttr[4] = fmt.Sprintf("Available|%s", humanize.Bytes(diskStat.Available)) + diskStatsAttr[5] = fmt.Sprintf("Used Percent|%s", formatFloat64(diskStat.UsedPercent)) + diskStatsAttr[6] = fmt.Sprintf("Inodes Percent|%s", formatFloat64(diskStat.InodesUsedPercent)) + c.Ui.Output(formatKV(diskStatsAttr)) + c.Ui.Output("") + } +} + // getRunningAllocs returns a slice of allocation id's running on the node func getRunningAllocs(client *api.Client, nodeID string) ([]*api.Allocation, error) { var allocs []*api.Allocation @@ -323,3 +381,7 @@ func getResources(client *api.Client, node *api.Node) ([]string, error) { return resources, err } + +func formatFloat64(val float64) string { + return strconv.FormatFloat(val, 'f', 2, 64) +} diff --git a/command/stats.go b/command/stats.go new file mode 100644 index 000000000000..1ae603332281 --- /dev/null +++ b/command/stats.go @@ -0,0 +1,168 @@ +package command + +import ( + "fmt" + "strconv" + "strings" + + "github.com/dustin/go-humanize" + + "github.com/hashicorp/nomad/api" +) + +type StatsCommand struct { + Meta +} + +func (f *StatsCommand) Help() string { + helpText := ` +Usage: nomad stats [options] + + Displays statistics related to resource usage of tasks in an allocation. Use + the -task flag to query statistics of an individual task running in an + allocation. + +General Options: + + ` + generalOptionsUsage() + ` + +Node Stats Options: + + -task + Display statistics for a specific task in an allocation. +` + + return strings.TrimSpace(helpText) +} + +func (f *StatsCommand) Synopsis() string { + return "Dispalys resource usage stats of an allocation or a task running on a nomad client" +} + +func (f *StatsCommand) Run(args []string) int { + var verbose bool + var task string + flags := f.Meta.FlagSet("stats", FlagSetClient) + flags.BoolVar(&verbose, "verbose", false, "") + flags.StringVar(&task, "task", "", "") + flags.Usage = func() { f.Ui.Output(f.Help()) } + + if err := flags.Parse(args); err != nil { + return 1 + } + + args = flags.Args() + if len(args) < 1 { + f.Ui.Error("allocation id is a required parameter") + return 1 + } + client, err := f.Meta.Client() + if err != nil { + f.Ui.Error(fmt.Sprintf("Error initializing client: %v", err)) + return 1 + } + + var allocID string + allocID = strings.TrimSpace(args[0]) + + // Truncate the id unless full length is requested + length := shortId + if verbose { + length = fullId + } + + // Query the allocation info + if len(allocID) == 1 { + f.Ui.Error(fmt.Sprintf("Alloc ID must contain at least two characters.")) + return 1 + } + if len(allocID)%2 == 1 { + // Identifiers must be of even length, so we strip off the last byte + // to provide a consistent user experience. 
diff --git a/command/stats.go b/command/stats.go
new file mode 100644
index 000000000000..1ae603332281
--- /dev/null
+++ b/command/stats.go
@@ -0,0 +1,168 @@
+package command
+
+import (
+	"fmt"
+	"strconv"
+	"strings"
+
+	"github.com/dustin/go-humanize"
+
+	"github.com/hashicorp/nomad/api"
+)
+
+type StatsCommand struct {
+	Meta
+}
+
+func (f *StatsCommand) Help() string {
+	helpText := `
+Usage: nomad stats [options] <allocation>
+
+  Displays statistics related to resource usage of tasks in an allocation. Use
+  the -task flag to query statistics for an individual task running in the
+  allocation.
+
+General Options:
+
+  ` + generalOptionsUsage() + `
+
+Stats Options:
+
+  -task
+    Display statistics for a specific task in an allocation.
+`
+	return strings.TrimSpace(helpText)
+}
+
+func (f *StatsCommand) Synopsis() string {
+	return "Displays resource usage stats of an allocation or a task running on a Nomad client"
+}
+
+func (f *StatsCommand) Run(args []string) int {
+	var verbose bool
+	var task string
+
+	flags := f.Meta.FlagSet("stats", FlagSetClient)
+	flags.BoolVar(&verbose, "verbose", false, "")
+	flags.StringVar(&task, "task", "", "")
+	flags.Usage = func() { f.Ui.Output(f.Help()) }
+
+	if err := flags.Parse(args); err != nil {
+		return 1
+	}
+
+	args = flags.Args()
+	if len(args) < 1 {
+		f.Ui.Error("allocation id is a required parameter")
+		return 1
+	}
+
+	client, err := f.Meta.Client()
+	if err != nil {
+		f.Ui.Error(fmt.Sprintf("Error initializing client: %v", err))
+		return 1
+	}
+
+	allocID := strings.TrimSpace(args[0])
+
+	// Truncate the id unless full length is requested
+	length := shortId
+	if verbose {
+		length = fullId
+	}
+
+	// Query the allocation info
+	if len(allocID) == 1 {
+		f.Ui.Error("Alloc ID must contain at least two characters.")
+		return 1
+	}
+	if len(allocID)%2 == 1 {
+		// Identifiers must be of even length, so we strip off the last byte
+		// to provide a consistent user experience.
+		allocID = allocID[:len(allocID)-1]
+	}
+
+	allocs, _, err := client.Allocations().PrefixList(allocID)
+	if err != nil {
+		f.Ui.Error(fmt.Sprintf("Error querying allocation: %v", err))
+		return 1
+	}
+	if len(allocs) == 0 {
+		f.Ui.Error(fmt.Sprintf("No allocation(s) with prefix or id %q found", allocID))
+		return 1
+	}
+	if len(allocs) > 1 {
+		// Format the allocs
+		out := make([]string, len(allocs)+1)
+		out[0] = "ID|Eval ID|Job ID|Task Group|Desired Status|Client Status"
+		for i, alloc := range allocs {
+			out[i+1] = fmt.Sprintf("%s|%s|%s|%s|%s|%s",
+				limit(alloc.ID, length),
+				limit(alloc.EvalID, length),
+				alloc.JobID,
+				alloc.TaskGroup,
+				alloc.DesiredStatus,
+				alloc.ClientStatus,
+			)
+		}
+		f.Ui.Output(fmt.Sprintf("Prefix matched multiple allocations\n\n%s", formatList(out)))
+		return 0
+	}
+
+	// Prefix lookup matched a single allocation
+	alloc, _, err := client.Allocations().Info(allocs[0].ID, nil)
+	if err != nil {
+		f.Ui.Error(fmt.Sprintf("Error querying allocation: %s", err))
+		return 1
+	}
+
+	stats, err := client.Allocations().Stats(alloc, nil)
+	if err != nil {
+		f.Ui.Error(fmt.Sprintf("unable to get stats: %v", err))
+		return 1
+	}
+
+	if task == "" {
+		f.printAllocResourceUsage(alloc, stats)
+	} else {
+		f.printTaskResourceUsage(task, stats)
+	}
+	return 0
+}
+
+func (f *StatsCommand) printTaskResourceUsage(task string, resourceUsage map[string]*api.TaskResourceUsage) {
+	tu, ok := resourceUsage[task]
+	if !ok {
+		return
+	}
+	memoryStats := tu.ResourceUsage.MemoryStats
+	cpuStats := tu.ResourceUsage.CpuStats
+	f.Ui.Output(fmt.Sprintf("===> Task: %q", task))
+	f.Ui.Output("Memory Stats")
+	out := make([]string, 2)
+	out[0] = "RSS|Cache|Swap|Max Usage|Kernel Usage|Kernel Max Usage"
+	out[1] = fmt.Sprintf("%v|%v|%v|%v|%v|%v",
+		humanize.Bytes(memoryStats.RSS),
+		humanize.Bytes(memoryStats.Cache),
+		humanize.Bytes(memoryStats.Swap),
+		humanize.Bytes(memoryStats.MaxUsage),
+		humanize.Bytes(memoryStats.KernelUsage),
+		humanize.Bytes(memoryStats.KernelMaxUsage),
+	)
+	f.Ui.Output(formatList(out))
+
+	f.Ui.Output("")
+
+	f.Ui.Output("CPU Stats")
+	out = make([]string, 2)
+	out[0] = "Percent|Throttled Periods|Throttled Time"
+	percent := strconv.FormatFloat(cpuStats.Percent, 'f', 2, 64)
+	out[1] = fmt.Sprintf("%v|%v|%v", percent,
+		cpuStats.ThrottledPeriods, cpuStats.ThrottledTime)
+	f.Ui.Output(formatList(out))
+}
+
+func (f *StatsCommand) printAllocResourceUsage(alloc *api.Allocation, resourceUsage map[string]*api.TaskResourceUsage) {
+	f.Ui.Output(fmt.Sprintf("Resource Usage of Tasks running in Allocation %q", alloc.ID))
+	for task := range alloc.TaskStates {
+		f.printTaskResourceUsage(task, resourceUsage)
+	}
+}
diff --git a/commands.go b/commands.go
index b402f429bc8e..023a74d7d23a 100644
--- a/commands.go
+++ b/commands.go
@@ -121,6 +121,11 @@ func Commands(metaPtr *command.Meta) map[string]cli.CommandFactory {
 			Meta: meta,
 		}, nil
 	},
+	"stats": func() (cli.Command, error) {
+		return &command.StatsCommand{
+			Meta: meta,
+		}, nil
+	},
 	"status": func() (cli.Command, error) {
 		return &command.StatusCommand{
 			Meta: meta,
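With the factory registered above, the command should be invocable as, e.g., `nomad stats -task redis 2032` (allocation prefix and task name illustrative), printing the per-task memory and CPU tables implemented in printTaskResourceUsage.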
diff --git a/vendor/github.com/mitchellh/go-ps/LICENSE.md b/vendor/github.com/mitchellh/go-ps/LICENSE.md
new file mode 100644
index 000000000000..229851590442
--- /dev/null
+++ b/vendor/github.com/mitchellh/go-ps/LICENSE.md
@@ -0,0 +1,21 @@
+The MIT License (MIT)
+
+Copyright (c) 2014 Mitchell Hashimoto
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
diff --git a/vendor/github.com/mitchellh/go-ps/README.md b/vendor/github.com/mitchellh/go-ps/README.md
new file mode 100644
index 000000000000..11cea98f0829
--- /dev/null
+++ b/vendor/github.com/mitchellh/go-ps/README.md
@@ -0,0 +1,36 @@
+# Process List Library for Go
+
+go-ps is a library for Go that implements OS-specific APIs to list and
+manipulate processes in a platform-safe way. The library can find and
+list processes on Linux, Mac OS X, and Windows.
+
+If you're new to Go, this library has a good amount of advanced Go educational
+value as well. It uses some advanced features of Go: build tags, accessing
+DLL methods for Windows, cgo for Darwin, etc.
+
+How it works:
+
+  * **Darwin** uses the `sysctl` syscall to retrieve the process table, via
+    cgo.
+  * **Unix** uses the procfs at `/proc` to inspect the process tree.
+  * **Windows** uses the Windows API, and methods such as
+    `CreateToolhelp32Snapshot` to get a point-in-time snapshot of
+    the process table.
+
+## Installation
+
+Install using standard `go get`:
+
+```
+$ go get github.com/mitchellh/go-ps
+...
+```
+
+## TODO
+
+Want to contribute? Here is a short TODO list of things that aren't
+implemented for this library that would be nice:
+
+  * FreeBSD support
+  * Plan9 support
+  * Eliminate the need for cgo with Darwin
diff --git a/vendor/github.com/mitchellh/go-ps/Vagrantfile b/vendor/github.com/mitchellh/go-ps/Vagrantfile
new file mode 100644
index 000000000000..61662ab1e3e7
--- /dev/null
+++ b/vendor/github.com/mitchellh/go-ps/Vagrantfile
@@ -0,0 +1,43 @@
+# -*- mode: ruby -*-
+# vi: set ft=ruby :
+
+# Vagrantfile API/syntax version. Don't touch unless you know what you're doing!
+VAGRANTFILE_API_VERSION = "2"
+
+Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
+  config.vm.box = "chef/ubuntu-12.04"
+
+  config.vm.provision "shell", inline: $script
+
+  ["vmware_fusion", "vmware_workstation"].each do |p|
+    config.vm.provider "p" do |v|
+      v.vmx["memsize"] = "1024"
+      v.vmx["numvcpus"] = "2"
+      v.vmx["cpuid.coresPerSocket"] = "1"
+    end
+  end
+end
+
+$script = <