Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allocation resources returned in a struct #1260

Merged
merged 3 commits into from
Jun 12, 2016
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions api/allocations.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (a *Allocations) Info(allocID string, q *QueryOptions) (*Allocation, *Query
return &resp, qm, nil
}

func (a *Allocations) Stats(alloc *Allocation, q *QueryOptions) (map[string]*TaskResourceUsage, error) {
func (a *Allocations) Stats(alloc *Allocation, q *QueryOptions) (*AllocResourceUsage, error) {
node, _, err := a.client.Nodes().Info(alloc.NodeID, q)
if err != nil {
return nil, err
Expand All @@ -58,13 +58,9 @@ func (a *Allocations) Stats(alloc *Allocation, q *QueryOptions) (map[string]*Tas
if err != nil {
return nil, err
}
resp := make(map[string][]*TaskResourceUsage)
var resp AllocResourceUsage
_, err = client.query("/v1/client/allocation/"+alloc.ID+"/stats", &resp, nil)
res := make(map[string]*TaskResourceUsage)
for task, ru := range resp {
res[task] = ru[0]
}
return res, err
return &resp, err
}

// Allocation is used for serialization of allocations.
Expand Down
8 changes: 8 additions & 0 deletions api/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ type TaskResourceUsage struct {
Pids map[string]*ResourceUsage
}

// AllocResourceUsage holds the aggregated task resource usage of the
// allocation.
type AllocResourceUsage struct {
// ResourceUsage is the allocation-level usage figure.
// NOTE(review): presumably the sum over the entries in Tasks, mirroring
// what the client computes — confirm against the client-side struct.
ResourceUsage *ResourceUsage
// Tasks holds the per-task usage, keyed by string (presumably the task
// name — TODO confirm).
Tasks map[string]*TaskResourceUsage
// Timestamp is the time associated with this sample.
// NOTE(review): units look like unix nanoseconds based on the client API
// comments — confirm.
Timestamp int64
}

// RestartPolicy defines how the Nomad client restarts
// tasks in a taskgroup when they fail
type RestartPolicy struct {
Expand Down
76 changes: 51 additions & 25 deletions client/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver"
"github.com/hashicorp/nomad/nomad/structs"

cstructs "github.com/hashicorp/nomad/client/structs"
)

const (
Expand All @@ -28,11 +30,6 @@ const (
// AllocStateUpdater is used to update the status of an allocation
type AllocStateUpdater func(alloc *structs.Allocation)

// AllocStatsReporter exposes stats related APIs of an allocation runner
type AllocStatsReporter interface {
AllocStats() map[string]TaskStatsReporter
}

// AllocRunner is used to wrap an allocation and provide the execution context.
type AllocRunner struct {
config *config.Config
Expand Down Expand Up @@ -476,33 +473,62 @@ func (r *AllocRunner) Update(update *structs.Allocation) {
}
}

// StatsReporter returns an interface to query resource usage statistics of an
// allocation
func (r *AllocRunner) StatsReporter() AllocStatsReporter {
return r
}

// AllocStats returns the stats reporter of all the tasks running in the
// allocation
func (r *AllocRunner) AllocStats() map[string]TaskStatsReporter {
// LatestAllocStats returns the latest allocation stats. If the optional taskFilter is set
// the allocation stats will only include the given task.
func (r *AllocRunner) LatestAllocStats(taskFilter string) (*cstructs.AllocResourceUsage, error) {
Copy link
Contributor

@diptanu diptanu Jun 12, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dadgar I think we should keep the StatsReporter interface, so that we can add methods around histograms etc, otherwise we will have to add those methods in the client as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

r.taskLock.RLock()
defer r.taskLock.RUnlock()
res := make(map[string]TaskStatsReporter)
for task, tr := range r.tasks {
res[task] = tr.StatsReporter()

astat := &cstructs.AllocResourceUsage{
Tasks: make(map[string]*cstructs.TaskResourceUsage),
}
return res

var flat []*cstructs.TaskResourceUsage
if taskFilter != "" {
tr, ok := r.tasks[taskFilter]
if !ok {
return nil, fmt.Errorf("allocation %q has no task %q", r.alloc.ID, taskFilter)
}
l := tr.LatestResourceUsage()
if l != nil {
astat.Tasks[taskFilter] = l
flat = []*cstructs.TaskResourceUsage{l}
astat.Timestamp = l.Timestamp
}
} else {
for task, tr := range r.tasks {
l := tr.LatestResourceUsage()
if l != nil {
astat.Tasks[task] = l
flat = append(flat, l)
if l.Timestamp > astat.Timestamp {
astat.Timestamp = l.Timestamp
}
}
}
}

astat.ResourceUsage = sumTaskResourceUsage(flat)
return astat, nil
}

// TaskStats returns the stats reporter for a specific task running in the
// allocation
func (r *AllocRunner) TaskStats(task string) (TaskStatsReporter, error) {
tr, ok := r.tasks[task]
if !ok {
return nil, fmt.Errorf("task %q not running in allocation %v", task, r.alloc.ID)
// sumTaskResourceUsage folds the ResourceUsage of every entry in usages into a
// single, freshly allocated ResourceUsage and returns it. The accumulator
// starts with empty (non-nil) MemoryStats and CpuStats, so an empty or nil
// input yields a zeroed result rather than nil.
func sumTaskResourceUsage(usages []*cstructs.TaskResourceUsage) *cstructs.ResourceUsage {
	total := new(cstructs.ResourceUsage)
	total.MemoryStats = new(cstructs.MemoryStats)
	total.CpuStats = new(cstructs.CpuStats)

	// Accumulate in slice order via the ResourceUsage.Add method.
	for i := range usages {
		total.Add(usages[i].ResourceUsage)
	}
	return total
}

return tr.StatsReporter(), nil
// AllocStatsSince returns the allocation stats collected since the passed unix
// nanosecond timestamp. If the optional taskFilter is set the allocation stats
// will only include the given task.
//
// NOTE(review): the body is currently a placeholder. It ignores both
// taskFilter and since and always returns (nil, nil), so callers cannot
// distinguish "no stats collected" from "not implemented".
func (r *AllocRunner) AllocStatsSince(taskFilter string, since int64) ([]*cstructs.AllocResourceUsage, error) {
return nil, nil
}

// shouldUpdate takes the AllocModifyIndex of an allocation sent from the server and
Expand Down
55 changes: 38 additions & 17 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"github.com/hashicorp/nomad/nomad"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/mitchellh/hashstructure"

cstructs "github.com/hashicorp/nomad/client/structs"
)

const (
Expand Down Expand Up @@ -81,15 +83,20 @@ const (
// ClientStatsReporter exposes all the APIs related to resource usage of a Nomad
// Client
type ClientStatsReporter interface {
// AllocStats returns a map of alloc ids and their corresponding stats
// collector
AllocStats() map[string]AllocStatsReporter
// LatestAllocStats returns the latest allocation resource usage optionally
// filtering by task name
LatestAllocStats(allocID, taskFilter string) (*cstructs.AllocResourceUsage, error)

// AllocStatsSince returns the allocation resource usage collected since the
// passed timestamp optionally filtering by task name.
AllocStatsSince(allocID, taskFilter string, since int64) ([]*cstructs.AllocResourceUsage, error)

// HostStats returns resource usage stats for the host
HostStats() []*stats.HostStats
// LatestHostStats returns the latest resource usage stats for the host
LatestHostStats() *stats.HostStats

// HostStatsTS returns a time series of host resource usage stats
HostStatsTS(since int64) []*stats.HostStats
// HostStatsSince returns the collected resource usage stats for the host
// since the passed unix nanosecond timestamp
HostStatsSince(since int64) []*stats.HostStats
}

// Client is used to implement the client interaction with Nomad. Clients
Expand Down Expand Up @@ -400,26 +407,40 @@ func (c *Client) StatsReporter() ClientStatsReporter {
return c
}

// AllocStats returns all the stats reporter of the allocations running on a
// Nomad client
func (c *Client) AllocStats() map[string]AllocStatsReporter {
res := make(map[string]AllocStatsReporter)
for alloc, ar := range c.getAllocRunners() {
res[alloc] = ar
// LatestAllocStats returns the latest resource usage of the allocation
// identified by allocID. taskFilter is passed through to the allocation
// runner, which optionally restricts the result to a single task. An error is
// returned when the client has no allocation with the given ID.
func (c *Client) LatestAllocStats(allocID, taskFilter string) (*cstructs.AllocResourceUsage, error) {
	// Hold the read lock only for the map lookup. It is released before the
	// ok check so the unknown-ID path cannot return with the lock still held,
	// and it is not held while the runner gathers its stats.
	c.allocLock.RLock()
	ar, ok := c.allocs[allocID]
	c.allocLock.RUnlock()
	if !ok {
		return nil, fmt.Errorf("unknown allocation ID %q", allocID)
	}
	return ar.LatestAllocStats(taskFilter)
}

// AllocStatsSince returns the resource usage collected since the passed
// timestamp for the allocation identified by allocID. taskFilter and since are
// passed through to the allocation runner. An error is returned when the
// client has no allocation with the given ID.
func (c *Client) AllocStatsSince(allocID, taskFilter string, since int64) ([]*cstructs.AllocResourceUsage, error) {
	// Hold the read lock only for the map lookup. It is released before the
	// ok check so the unknown-ID path cannot return with the lock still held,
	// and it is not held while the runner gathers its stats.
	c.allocLock.RLock()
	ar, ok := c.allocs[allocID]
	c.allocLock.RUnlock()
	if !ok {
		return nil, fmt.Errorf("unknown allocation ID %q", allocID)
	}
	return ar.AllocStatsSince(taskFilter, since)
}

// HostStats returns all the stats related to a Nomad client
func (c *Client) HostStats() []*stats.HostStats {
func (c *Client) LatestHostStats() *stats.HostStats {
c.resourceUsageLock.RLock()
defer c.resourceUsageLock.RUnlock()
val := c.resourceUsage.Peek()
ru, _ := val.(*stats.HostStats)
return []*stats.HostStats{ru}
return ru
}

func (c *Client) HostStatsTS(since int64) []*stats.HostStats {
func (c *Client) HostStatsSince(since int64) []*stats.HostStats {
c.resourceUsageLock.RLock()
defer c.resourceUsageLock.RUnlock()

Expand Down
15 changes: 8 additions & 7 deletions client/driver/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import (
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver/executor"
cstructs "github.com/hashicorp/nomad/client/driver/structs"
dstructs "github.com/hashicorp/nomad/client/driver/structs"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper/discover"
"github.com/hashicorp/nomad/helper/fields"
shelpers "github.com/hashicorp/nomad/helper/stats"
Expand Down Expand Up @@ -140,7 +141,7 @@ type DockerHandle struct {
maxKillTimeout time.Duration
resourceUsageLock sync.RWMutex
resourceUsage *cstructs.TaskResourceUsage
waitCh chan *cstructs.WaitResult
waitCh chan *dstructs.WaitResult
doneCh chan bool
}

Expand Down Expand Up @@ -541,7 +542,7 @@ func (d *DockerDriver) recoverablePullError(err error, image string) error {
if imageNotFoundMatcher.MatchString(err.Error()) {
recoverable = false
}
return cstructs.NewRecoverableError(fmt.Errorf("Failed to pull `%s`: %s", image, err), recoverable)
return dstructs.NewRecoverableError(fmt.Errorf("Failed to pull `%s`: %s", image, err), recoverable)
}

func (d *DockerDriver) Periodic() (bool, time.Duration) {
Expand Down Expand Up @@ -797,7 +798,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
killTimeout: GetKillTimeout(task.KillTimeout, maxKill),
maxKillTimeout: maxKill,
doneCh: make(chan bool),
waitCh: make(chan *cstructs.WaitResult, 1),
waitCh: make(chan *dstructs.WaitResult, 1),
}
if err := exec.SyncServices(consulContext(d.config, container.ID)); err != nil {
d.logger.Printf("[ERR] driver.docker: error registering services with consul for task: %q: %v", task.Name, err)
Expand Down Expand Up @@ -872,7 +873,7 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er
killTimeout: pid.KillTimeout,
maxKillTimeout: pid.MaxKillTimeout,
doneCh: make(chan bool),
waitCh: make(chan *cstructs.WaitResult, 1),
waitCh: make(chan *dstructs.WaitResult, 1),
}
if err := exec.SyncServices(consulContext(d.config, pid.ContainerID)); err != nil {
h.logger.Printf("[ERR] driver.docker: error registering services with consul: %v", err)
Expand Down Expand Up @@ -904,7 +905,7 @@ func (h *DockerHandle) ContainerID() string {
return h.containerID
}

func (h *DockerHandle) WaitCh() chan *cstructs.WaitResult {
func (h *DockerHandle) WaitCh() chan *dstructs.WaitResult {
return h.waitCh
}

Expand Down Expand Up @@ -961,7 +962,7 @@ func (h *DockerHandle) run() {
}

close(h.doneCh)
h.waitCh <- cstructs.NewWaitResult(exitCode, 0, err)
h.waitCh <- dstructs.NewWaitResult(exitCode, 0, err)
close(h.waitCh)

// Remove services
Expand Down
5 changes: 3 additions & 2 deletions client/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import (
"github.com/hashicorp/nomad/client/fingerprint"
"github.com/hashicorp/nomad/nomad/structs"

cstructs "github.com/hashicorp/nomad/client/driver/structs"
dstructs "github.com/hashicorp/nomad/client/driver/structs"
cstructs "github.com/hashicorp/nomad/client/structs"
)

// BuiltinDrivers contains the built in registered drivers
Expand Down Expand Up @@ -105,7 +106,7 @@ type DriverHandle interface {
ID() string

// WaitCh is used to return a channel used wait for task completion
WaitCh() chan *cstructs.WaitResult
WaitCh() chan *dstructs.WaitResult

// Update is used to update the task if possible and update task related
// configurations.
Expand Down
17 changes: 9 additions & 8 deletions client/driver/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ import (
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver/executor"
cstructs "github.com/hashicorp/nomad/client/driver/structs"
dstructs "github.com/hashicorp/nomad/client/driver/structs"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper/discover"
"github.com/hashicorp/nomad/helper/fields"
"github.com/hashicorp/nomad/nomad/structs"
Expand Down Expand Up @@ -42,13 +43,13 @@ type ExecDriverConfig struct {
type execHandle struct {
pluginClient *plugin.Client
executor executor.Executor
isolationConfig *cstructs.IsolationConfig
isolationConfig *dstructs.IsolationConfig
userPid int
allocDir *allocdir.AllocDir
killTimeout time.Duration
maxKillTimeout time.Duration
logger *log.Logger
waitCh chan *cstructs.WaitResult
waitCh chan *dstructs.WaitResult
doneCh chan struct{}
version string
}
Expand Down Expand Up @@ -153,7 +154,7 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
logger: d.logger,
version: d.config.Version,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
waitCh: make(chan *dstructs.WaitResult, 1),
}
if err := exec.SyncServices(consulContext(d.config, "")); err != nil {
d.logger.Printf("[ERR] driver.exec: error registering services with consul for task: %q: %v", task.Name, err)
Expand All @@ -169,7 +170,7 @@ type execId struct {
UserPid int
TaskDir string
AllocDir *allocdir.AllocDir
IsolationConfig *cstructs.IsolationConfig
IsolationConfig *dstructs.IsolationConfig
PluginConfig *PluginReattachConfig
}

Expand Down Expand Up @@ -217,7 +218,7 @@ func (d *ExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro
killTimeout: id.KillTimeout,
maxKillTimeout: id.MaxKillTimeout,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
waitCh: make(chan *dstructs.WaitResult, 1),
}
if err := exec.SyncServices(consulContext(d.config, "")); err != nil {
d.logger.Printf("[ERR] driver.exec: error registering services with consul: %v", err)
Expand All @@ -244,7 +245,7 @@ func (h *execHandle) ID() string {
return string(data)
}

func (h *execHandle) WaitCh() chan *cstructs.WaitResult {
func (h *execHandle) WaitCh() chan *dstructs.WaitResult {
return h.waitCh
}

Expand Down Expand Up @@ -305,7 +306,7 @@ func (h *execHandle) run() {
h.logger.Printf("[ERR] driver.exec: unmounting dev,proc and alloc dirs failed: %v", e)
}
}
h.waitCh <- cstructs.NewWaitResult(ps.ExitCode, ps.Signal, err)
h.waitCh <- dstructs.NewWaitResult(ps.ExitCode, ps.Signal, err)
close(h.waitCh)
// Remove services
if err := h.executor.DeregisterServices(); err != nil {
Expand Down
Loading