Merge pull request #5018 from hashicorp/f-executor-stats
executor: streaming stats api
nickethier committed Jan 14, 2019
2 parents f10c625 + 9037797 commit 97a73e0
Showing 36 changed files with 1,146 additions and 663 deletions.
3 changes: 2 additions & 1 deletion client/allocrunner/interfaces/task_lifecycle.go
@@ -2,6 +2,7 @@ package interfaces

 import (
 	"context"
+	"time"

 	"github.com/hashicorp/nomad/client/allocdir"
 	"github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces"
@@ -88,7 +89,7 @@ type TaskPrestartHook interface

 // DriverStats is the interface implemented by DriverHandles to return task stats.
 type DriverStats interface {
-	Stats() (*cstructs.TaskResourceUsage, error)
+	Stats(context.Context, time.Duration) (<-chan *cstructs.TaskResourceUsage, error)
 }

 type TaskPoststartRequest struct {
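This interface change is the heart of the PR: drivers now push stats over a channel at a caller-chosen interval instead of answering one-off polls. A minimal consumer sketch (illustrative only, not part of this diff; it assumes the interfaces and cstructs packages imported above):

// consumeStats drains a task's stats stream until ctx is canceled or the
// driver closes the channel. Illustrative helper, not part of this diff.
func consumeStats(ctx context.Context, ds interfaces.DriverStats, emit func(*cstructs.TaskResourceUsage)) error {
	ch, err := ds.Stats(ctx, 10*time.Second) // ask for a sample every 10s
	if err != nil {
		return err
	}
	for ru := range ch {
		emit(ru)
	}
	// A closed channel means the driver stopped streaming (for example on
	// plugin shutdown); the caller decides whether to re-open the stream.
	return nil
}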
4 changes: 2 additions & 2 deletions client/allocrunner/taskrunner/driver_handle.go
@@ -44,8 +44,8 @@ func (h *DriverHandle) Kill() error {
 	return h.driver.StopTask(h.taskID, h.task.KillTimeout, h.task.KillSignal)
 }

-func (h *DriverHandle) Stats() (*cstructs.TaskResourceUsage, error) {
-	return h.driver.TaskStats(h.taskID)
+func (h *DriverHandle) Stats(ctx context.Context, interval time.Duration) (<-chan *cstructs.TaskResourceUsage, error) {
+	return h.driver.TaskStats(ctx, h.taskID, interval)
 }

 func (h *DriverHandle) Signal(s string) error {
4 changes: 2 additions & 2 deletions client/allocrunner/taskrunner/lazy_handle.go
@@ -128,7 +128,7 @@ TRY:
 	return out, c, err
 }

-func (l *LazyHandle) Stats() (*cstructs.TaskResourceUsage, error) {
+func (l *LazyHandle) Stats(ctx context.Context, interval time.Duration) (<-chan *cstructs.TaskResourceUsage, error) {
 	h, err := l.getHandle()
 	if err != nil {
 		return nil, err
@@ -138,7 +138,7 @@ func (l *LazyHandle) Stats() (*cstructs.TaskResourceUsage, error) {
 	first := true

 TRY:
-	out, err := h.Stats()
+	out, err := h.Stats(ctx, interval)
 	if err == bstructs.ErrPluginShutdown && first {
 		first = false

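LazyHandle keeps its retry-once behavior: if the driver plugin restarted underneath the handle, it is refreshed and the call is retried a single time. The TRY label above can be read as the following loop (a paraphrase, not the PR's code; getHandle lazily re-dials the plugin, as in the surrounding file):

// statsWithRetry is a loop-form paraphrase of the TRY/goto flow above.
func (l *LazyHandle) statsWithRetry(ctx context.Context, interval time.Duration) (<-chan *cstructs.TaskResourceUsage, error) {
	h, err := l.getHandle()
	if err != nil {
		return nil, err
	}
	for first := true; ; first = false {
		out, err := h.Stats(ctx, interval)
		if err == bstructs.ErrPluginShutdown && first {
			// The plugin restarted underneath us: refresh the handle and
			// retry exactly once.
			if h, err = l.getHandle(); err == nil {
				continue
			}
		}
		return out, err
	}
}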
102 changes: 66 additions & 36 deletions client/allocrunner/taskrunner/stats_hook.go
@@ -8,6 +8,7 @@ import (
 	hclog "github.com/hashicorp/go-hclog"
 	"github.com/hashicorp/nomad/client/allocrunner/interfaces"
 	cstructs "github.com/hashicorp/nomad/client/structs"
+	"github.com/hashicorp/nomad/nomad/structs"
 	bstructs "github.com/hashicorp/nomad/plugins/base/structs"
 )

@@ -22,8 +23,8 @@ type statsHook struct {
 	updater  StatsUpdater
 	interval time.Duration

-	// stopCh is closed by Exited or Canceled
-	stopCh chan struct{}
+	// cancel is called by Exited
+	cancel context.CancelFunc

 	mu sync.Mutex

@@ -48,13 +49,19 @@ func (h *statsHook) Poststart(ctx context.Context, req *interfaces.TaskPoststart
 	defer h.mu.Unlock()

 	// This shouldn't happen, but better safe than risk leaking a goroutine
-	if h.stopCh != nil {
+	if h.cancel != nil {
 		h.logger.Debug("poststart called twice without exiting between")
-		close(h.stopCh)
+		h.cancel()
 	}

-	h.stopCh = make(chan struct{})
-	go h.collectResourceUsageStats(req.DriverStats, h.stopCh)
+	// Using a new context here because the existing context is for the scope of
+	// the Poststart request. If that context was used, stats collection would
+	// stop when the task was killed. It makes for more readable code and better
+	// follows the taskrunner hook model to create a new context that can be
+	// canceled on the Exited hook.
+	ctx, cancel := context.WithCancel(context.Background())
+	h.cancel = cancel
+	go h.collectResourceUsageStats(ctx, req.DriverStats)

 	return nil
 }
@@ -63,40 +70,47 @@ func (h *statsHook) Exited(context.Context, *interfaces.TaskExitedRequest, *inte
 	h.mu.Lock()
 	defer h.mu.Unlock()

-	if h.stopCh == nil {
+	if h.cancel == nil {
 		// No stats running
 		return nil
 	}

-	// Close chan to stop stats collection
-	close(h.stopCh)
+	// Call cancel to stop stats collection
+	h.cancel()

-	// Clear chan so we don't double close for any reason
-	h.stopCh = nil
+	// Clear cancel func so we don't double call for any reason
+	h.cancel = nil

 	return nil
 }

 // collectResourceUsageStats starts collecting resource usage stats of a Task.
-// Collection ends when the passed channel is closed
-func (h *statsHook) collectResourceUsageStats(handle interfaces.DriverStats, stopCh <-chan struct{}) {
-	// start collecting the stats right away and then start collecting every
-	// collection interval
-	next := time.NewTimer(0)
-	defer next.Stop()
+func (h *statsHook) collectResourceUsageStats(ctx context.Context, handle interfaces.DriverStats) {
+
+	ch, err := handle.Stats(ctx, h.interval)
+	if err != nil {
+		// Check if the driver doesn't implement stats
+		if err.Error() == cstructs.DriverStatsNotImplemented.Error() {
+			h.logger.Debug("driver does not support stats")
+			return
+		}
+		h.logger.Error("failed to start stats collection for task", "error", err)
+	}
+
+	var backoff time.Duration
+	var retry int
+	limit := time.Second * 5
 	for {
+		time.Sleep(backoff)
 		select {
-		case <-next.C:
-			// Reset the timer
-			next.Reset(h.interval)
-
-			// Collect stats from driver
-			ru, err := handle.Stats()
-			if err != nil {
-				// Check if the driver doesn't implement stats
-				if err.Error() == cstructs.DriverStatsNotImplemented.Error() {
-					h.logger.Debug("driver does not support stats")
-					return
+		case ru, ok := <-ch:
+			// Channel is closed
+			if !ok {
+				var re *structs.RecoverableError
+				ch, err = handle.Stats(ctx, h.interval)
+				if err == nil {
+					goto RETRY
 				}

 				// We do not log when the plugin is shutdown since this is
@@ -105,15 +119,36 @@ func (h *statsHook) collectResourceUsageStats(handle interfaces.DriverStats, sto
 				// on the stop channel is the correct behavior
 				if err != bstructs.ErrPluginShutdown {
 					h.logger.Debug("error fetching stats of task", "error", err)
+					goto RETRY
 				}
+				// check if the error is terminal otherwise it's likely a
+				// transport error and we should retry
+				re, ok = err.(*structs.RecoverableError)
+				if ok && re.IsUnrecoverable() {
+					return
+				}
+				h.logger.Warn("stats collection for task failed", "error", err)
+			RETRY:
+				// Calculate the new backoff
+				backoff = (1 << (2 * uint64(retry))) * time.Second
+				if backoff > limit {
+					backoff = limit
+				}
+				// Increment retry counter
+				retry++
+
				continue
 			}

 			// Update stats on TaskRunner and emit them
 			h.updater.UpdateStats(ru)
-		case <-stopCh:
-			return

+		default:
+			select {
+			case <-ctx.Done():
+				return
+			default:
+			}
 		}
 	}
 }
@@ -122,14 +157,9 @@ func (h *statsHook) Shutdown() {
 	h.mu.Lock()
 	defer h.mu.Unlock()

-	if h.stopCh == nil {
+	if h.cancel == nil {
 		return
 	}

-	select {
-	case <-h.stopCh:
-		// Already closed
-	default:
-		close(h.stopCh)
-	}
+	h.cancel()
 }
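The reconnect path above backs off by 1<<(2*retry) seconds, capped at five seconds, so a flapping stats stream is retried after 1s, then 4s, then 5s from there on. A standalone check of the formula (same expression as the hook):

package main

import (
	"fmt"
	"time"
)

func main() {
	// Same backoff formula as collectResourceUsageStats above.
	limit := 5 * time.Second
	for retry := 0; retry < 5; retry++ {
		backoff := (1 << (2 * uint64(retry))) * time.Second
		if backoff > limit {
			backoff = limit
		}
		fmt.Printf("retry %d -> %v\n", retry, backoff) // 1s, 4s, 5s, 5s, 5s
	}
}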
12 changes: 10 additions & 2 deletions client/allocrunner/taskrunner/stats_hook_test.go
@@ -40,7 +40,7 @@ type mockDriverStats struct {
 	err error
 }

-func (m *mockDriverStats) Stats() (*cstructs.TaskResourceUsage, error) {
+func (m *mockDriverStats) Stats(ctx context.Context, interval time.Duration) (<-chan *cstructs.TaskResourceUsage, error) {
 	if m.err != nil {
 		return nil, m.err
 	}
@@ -59,7 +59,15 @@ func (m *mockDriverStats) Stats() (*cstructs.TaskResourceUsage, error) {
 		Pids: map[string]*cstructs.ResourceUsage{},
 	}
 	ru.Pids["task"] = ru.ResourceUsage
-	return ru, nil
+	ch := make(chan *cstructs.TaskResourceUsage)
+	go func() {
+		defer close(ch)
+		select {
+		case <-ctx.Done():
+		case ch <- ru:
+		}
+	}()
+	return ch, nil
 }

 // TestTaskRunner_StatsHook_PoststartExited asserts the stats hook starts and
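The mock wraps its single send in a select against ctx.Done(), so the producer goroutine cannot leak if the test never receives. A usage sketch (hypothetical test name, not from this diff; assumes testify's require):

func TestMockDriverStats_OneSample(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // unblocks the mock's send if we never receive

	m := &mockDriverStats{}
	ch, err := m.Stats(ctx, time.Second)
	require.NoError(t, err)

	// The mock emits exactly one sample, then closes the channel.
	ru := <-ch
	require.NotNil(t, ru.ResourceUsage)

	_, ok := <-ch
	require.False(t, ok, "channel should be closed after one sample")
}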
2 changes: 2 additions & 0 deletions drivers/docker/config.go
@@ -131,6 +131,7 @@
 	// and is used to parse the contents of the 'plugin "docker" {...}' block.
 	// Example:
 	// plugin "docker" {
+	//   config {
 	//     endpoint = "unix:///var/run/docker.sock"
 	//     auth {
 	//       config = "/etc/docker-auth.json"
@@ -152,6 +153,7 @@
 	//   }
 	//   allow_privileged = false
 	//   allow_caps = ["CHOWN", "NET_RAW" ... ]
+	//   }
 	// }
 	configSpec = hclspec.NewObject(map[string]*hclspec.Spec{
 		"endpoint": hclspec.NewAttr("endpoint", "string", false),
6 changes: 2 additions & 4 deletions drivers/docker/driver.go
@@ -146,7 +146,6 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
 	}

 	d.tasks.Set(handle.Config.ID, h)
-	go h.collectStats()
 	go h.run()

 	return nil
@@ -290,7 +289,6 @@ CREATE:
 	}

 	d.tasks.Set(cfg.ID, h)
-	go h.collectStats()
 	go h.run()

 	return handle, net, nil
@@ -1087,13 +1085,13 @@ func (d *Driver) InspectTask(taskID string) (*drivers.TaskStatus, error) {
 	return status, nil
 }

-func (d *Driver) TaskStats(taskID string) (*drivers.TaskResourceUsage, error) {
+func (d *Driver) TaskStats(ctx context.Context, taskID string, interval time.Duration) (<-chan *drivers.TaskResourceUsage, error) {
 	h, ok := d.tasks.Get(taskID)
 	if !ok {
 		return nil, drivers.ErrTaskNotFound
 	}

-	return h.Stats()
+	return h.Stats(ctx, interval)
 }

 func (d *Driver) TaskEvents(ctx context.Context) (<-chan *drivers.TaskEvent, error) {
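With the collectStats goroutines removed from RecoverTask and StartTask, sampling is now lazy: it runs only while a caller holds an open TaskStats stream. A sketch of what a handle-side implementation could look like (not the docker handle's actual code, which is outside this excerpt; collectOnce is an assumed sampling helper):

// Stats streams resource usage until ctx is canceled or sampling fails.
// Sketch only, under the assumptions stated above.
func (h *taskHandle) Stats(ctx context.Context, interval time.Duration) (<-chan *drivers.TaskResourceUsage, error) {
	ch := make(chan *drivers.TaskResourceUsage)
	go func() {
		defer close(ch)
		timer := time.NewTimer(0) // emit the first sample immediately
		defer timer.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-timer.C:
				timer.Reset(interval)
			}
			ru, err := h.collectOnce(ctx) // assumed helper: sample usage once
			if err != nil {
				return // closing the channel tells the consumer to re-open
			}
			select {
			case ch <- ru:
			case <-ctx.Done():
				return
			}
		}
	}()
	return ch, nil
}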
19 changes: 10 additions & 9 deletions drivers/docker/driver_test.go
@@ -1478,16 +1478,17 @@ func TestDockerDriver_Stats(t *testing.T) {
 	require.NoError(t, d.WaitUntilStarted(task.ID, 5*time.Second))

 	go func() {
-		time.Sleep(3 * time.Second)
-		ru, err := handle.Stats()
-		if err != nil {
-			t.Fatalf("err: %v", err)
-		}
-		if ru.ResourceUsage == nil {
-			d.DestroyTask(task.ID, true)
-			t.Fatalf("expected resource usage")
+		defer d.DestroyTask(task.ID, true)
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+		ch, err := handle.Stats(ctx, 1*time.Second)
+		assert.NoError(t, err)
+		select {
+		case ru := <-ch:
+			assert.NotNil(t, ru.ResourceUsage)
+		case <-time.After(3 * time.Second):
+			assert.Fail(t, "stats timeout")
 		}
-		d.DestroyTask(task.ID, true)
 	}()

 	waitCh, err := d.WaitTask(context.Background(), task.ID)