Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Task runner recovers from external plugin exiting #5152

Merged
merged 12 commits into from
Jan 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 152 additions & 0 deletions client/allocrunner/taskrunner/lazy_handle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package taskrunner

import (
"context"
"fmt"
"sync"
"time"

log "github.com/hashicorp/go-hclog"
cstructs "github.com/hashicorp/nomad/client/structs"
bstructs "github.com/hashicorp/nomad/plugins/base/structs"
)

const (
	// retrieveBackoffBaseline is the baseline time for exponential backoff while
	// retrieving a handle. The backoff grows as 4^attempt * baseline (see
	// refreshHandleLocked) before being capped at retrieveBackoffLimit.
	retrieveBackoffBaseline = 250 * time.Millisecond

	// retrieveBackoffLimit is the limit of the exponential backoff for
	// retrieving a handle.
	retrieveBackoffLimit = 5 * time.Second

	// retrieveFailureLimit is how many times we will attempt to retrieve a
	// new handle before giving up.
	retrieveFailureLimit = 5
)

// retrieveHandleFn is used to retrieve the latest driver handle. It may
// return nil when no handle is currently available (e.g. the driver plugin
// has exited and not yet been restarted).
type retrieveHandleFn func() *DriverHandle

// LazyHandle is used to front calls to a DriverHandle where it is expected the
// existing handle may no longer be valid because the backing plugin has
// shutdown. LazyHandle detects the plugin shutting down and retrieves a new
// handle so that the consumer does not need to worry whether the handle is to
// the latest driver instance.
type LazyHandle struct {
	// retrieveHandle is used to retrieve the latest handle
	retrieveHandle retrieveHandleFn

	// h is the current handle and may be nil; guarded by the embedded Mutex
	h *DriverHandle

	// shutdownCtx is used to cancel retries if the agent is shutting down
	shutdownCtx context.Context

	logger log.Logger
	// Mutex guards h and serializes handle refreshes
	sync.Mutex
}

// NewLazyHandle takes the function to receive the latest handle and a logger
// and returns a LazyHandle
func NewLazyHandle(shutdownCtx context.Context, fn retrieveHandleFn, logger log.Logger) *LazyHandle {
return &LazyHandle{
retrieveHandle: fn,
h: fn(),
shutdownCtx: shutdownCtx,
logger: logger.Named("lazy_handle"),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is a named sublogger helpful here? I feel like we may have gone overboard with subloggers and now have some really long named loggers that don't really help end users at all (eg client.alloc_runner.task_runner)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't expect the log to be emitted but the named logger helps for debugability since we will be able to easily narrow down the log lines

}
}

// getHandle returns the cached handle when one is present; otherwise it
// attempts to retrieve a fresh one, retrying with backoff.
func (l *LazyHandle) getHandle() (*DriverHandle, error) {
	l.Lock()
	defer l.Unlock()

	if h := l.h; h != nil {
		return h, nil
	}
	return l.refreshHandleLocked()
}

// refreshHandle retrieves a new handle, discarding any cached one. It is the
// lock-acquiring wrapper around refreshHandleLocked.
func (l *LazyHandle) refreshHandle() (*DriverHandle, error) {
	l.Lock()
	defer l.Unlock()
	return l.refreshHandleLocked()
}

// refreshHandleLocked retrieves a new handle and should be called with the lock
// held. It will retry to give the client time to restart the driver and restore
// the handle.
func (l *LazyHandle) refreshHandleLocked() (*DriverHandle, error) {
for i := 0; i < retrieveFailureLimit; i++ {
l.h = l.retrieveHandle()
if l.h != nil {
return l.h, nil
}

// Calculate the new backoff
backoff := (1 << (2 * uint64(i))) * retrieveBackoffBaseline
if backoff > retrieveBackoffLimit {
backoff = retrieveBackoffLimit
}

l.logger.Debug("failed to retrieve handle", "backoff", backoff)

select {
case <-l.shutdownCtx.Done():
return nil, l.shutdownCtx.Err()
case <-time.After(backoff):
}
}

return nil, fmt.Errorf("no driver handle")
schmichael marked this conversation as resolved.
Show resolved Hide resolved
}

// Exec runs the given command inside the task via the current driver handle.
// If the plugin has shut down, the handle is refreshed and the exec is retried
// exactly once; a failed refresh surfaces the refresh error.
func (l *LazyHandle) Exec(timeout time.Duration, cmd string, args []string) ([]byte, int, error) {
	h, err := l.getHandle()
	if err != nil {
		return nil, 0, err
	}

	retried := false
	for {
		out, code, execErr := h.Exec(timeout, cmd, args)
		if execErr != bstructs.ErrPluginShutdown || retried {
			return out, code, execErr
		}
		retried = true

		newHandle, refreshErr := l.refreshHandle()
		if refreshErr != nil {
			return out, code, refreshErr
		}
		h = newHandle
	}
}

// Stats returns resource usage for the task via the current driver handle.
// If the plugin has shut down, the handle is refreshed and the call is retried
// exactly once; a failed refresh surfaces the refresh error.
func (l *LazyHandle) Stats() (*cstructs.TaskResourceUsage, error) {
	h, err := l.getHandle()
	if err != nil {
		return nil, err
	}

	retried := false
	for {
		usage, statsErr := h.Stats()
		if statsErr != bstructs.ErrPluginShutdown || retried {
			return usage, statsErr
		}
		retried = true

		newHandle, refreshErr := l.refreshHandle()
		if refreshErr != nil {
			return usage, refreshErr
		}
		h = newHandle
	}
}
12 changes: 6 additions & 6 deletions client/allocrunner/taskrunner/stats_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package taskrunner

import (
"context"
"strings"
"sync"
"time"

hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
cstructs "github.com/hashicorp/nomad/client/structs"
bstructs "github.com/hashicorp/nomad/plugins/base/structs"
)

// StatsUpdater is the interface required by the StatsHook to update stats.
Expand Down Expand Up @@ -99,11 +99,11 @@ func (h *statsHook) collectResourceUsageStats(handle interfaces.DriverStats, sto
return
}

//XXX This is a net/rpc specific error
// We do not log when the plugin is shutdown as this is simply a
// race between the stopCollection channel being closed and calling
// Stats on the handle.
if !strings.Contains(err.Error(), "connection is shut down") {
// We do not log when the plugin is shutdown since this is
// likely because the driver plugin has unexpectedly exited,
// in which case sleeping and trying again or returning based
// on the stop channel is the correct behavior
if err != bstructs.ErrPluginShutdown {
h.logger.Debug("error fetching stats of task", "error", err)
}

Expand Down
81 changes: 64 additions & 17 deletions client/allocrunner/taskrunner/task_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ import (
"github.com/hashicorp/nomad/client/vaultclient"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs"
bstructs "github.com/hashicorp/nomad/plugins/base/structs"
"github.com/hashicorp/nomad/plugins/drivers"
"github.com/hashicorp/nomad/plugins/shared"
"github.com/hashicorp/nomad/plugins/shared/hclspec"
"github.com/hashicorp/nomad/plugins/shared/hclutils"
)

const (
Expand Down Expand Up @@ -408,8 +409,10 @@ MAIN:
}

// Grab the result proxy and wait for task to exit
WAIT:
{
handle := tr.getDriverHandle()
result = nil

// Do *not* use tr.killCtx here as it would cause
// Wait() to unblock before the task exits when Kill()
Expand All @@ -418,12 +421,15 @@ MAIN:
tr.logger.Error("wait task failed", "error", err)
} else {
select {
case result = <-resultCh:
// WaitCh returned a result
tr.handleTaskExitResult(result)
case <-tr.ctx.Done():
// TaskRunner was told to exit immediately
return
case result = <-resultCh:
}

// WaitCh returned a result
if retryWait := tr.handleTaskExitResult(result); retryWait {
goto WAIT
}
}
}
Expand Down Expand Up @@ -467,9 +473,37 @@ MAIN:
tr.logger.Debug("task run loop exiting")
}

func (tr *TaskRunner) handleTaskExitResult(result *drivers.ExitResult) {
// handleTaskExitResult handles the results returned by the task exiting. If
// retryWait is true, the caller should attempt to wait on the task again since
schmichael marked this conversation as resolved.
Show resolved Hide resolved
// it has not actually finished running. This can happen if the driver plugin
// has exited.
func (tr *TaskRunner) handleTaskExitResult(result *drivers.ExitResult) (retryWait bool) {
if result == nil {
return
return false
}

if result.Err == bstructs.ErrPluginShutdown {
dn := tr.Task().Driver
tr.logger.Debug("driver plugin has shutdown; attempting to recover task", "driver", dn)

// Initialize a new driver handle
if err := tr.initDriver(); err != nil {
tr.logger.Error("failed to initialize driver after it exited unexpectedly", "error", err, "driver", dn)
return false
}

// Try to restore the handle
tr.stateLock.RLock()
h := tr.localState.TaskHandle
net := tr.localState.DriverNetwork
tr.stateLock.RUnlock()
if !tr.restoreHandle(h, net) {
tr.logger.Error("failed to restore handle on driver after it exited unexpectedly", "driver", dn)
return false
}

tr.logger.Debug("task successfully recovered on driver", "driver", dn)
return true
}

event := structs.NewTaskEvent(structs.TaskTerminated).
Expand All @@ -483,6 +517,8 @@ func (tr *TaskRunner) handleTaskExitResult(result *drivers.ExitResult) {
if result.OOMKilled && !tr.clientConfig.DisableTaggedMetrics {
metrics.IncrCounterWithLabels([]string{"client", "allocs", "oom_killed"}, 1, tr.baseLabels)
}

return false
}

// handleUpdates runs update hooks when triggerUpdateCh is ticked and exits
Expand Down Expand Up @@ -530,7 +566,6 @@ func (tr *TaskRunner) shouldRestart() (bool, time.Duration) {
// runDriver runs the driver and waits for it to exit
func (tr *TaskRunner) runDriver() error {

// TODO(nickethier): make sure this uses alloc.AllocatedResources once #4750 is rebased
taskConfig := tr.buildTaskConfig()

// Build hcl context variables
Expand All @@ -556,10 +591,10 @@ func (tr *TaskRunner) runDriver() error {

evalCtx := &hcl.EvalContext{
Variables: vars,
Functions: shared.GetStdlibFuncs(),
Functions: hclutils.GetStdlibFuncs(),
}

val, diag := shared.ParseHclInterface(tr.task.Config, tr.taskSchema, evalCtx)
val, diag := hclutils.ParseHclInterface(tr.task.Config, tr.taskSchema, evalCtx)
if diag.HasErrors() {
return multierror.Append(errors.New("failed to parse config"), diag.Errs()...)
}
Expand All @@ -568,8 +603,6 @@ func (tr *TaskRunner) runDriver() error {
return fmt.Errorf("failed to encode driver config: %v", err)
}

//XXX Evaluate and encode driver config

// If there's already a task handle (eg from a Restore) there's nothing
// to do except update state.
if tr.getDriverHandle() != nil {
Expand All @@ -586,7 +619,20 @@ func (tr *TaskRunner) runDriver() error {
// Start the job if there's no existing handle (or if RecoverTask failed)
handle, net, err := tr.driver.StartTask(taskConfig)
if err != nil {
return fmt.Errorf("driver start failed: %v", err)
// The plugin has died, try relaunching it
if err == bstructs.ErrPluginShutdown {
tr.logger.Info("failed to start task because plugin shutdown unexpectedly; attempting to recover")
if err := tr.initDriver(); err != nil {
return fmt.Errorf("failed to initialize driver after it exited unexpectedly: %v", err)
}

handle, net, err = tr.driver.StartTask(taskConfig)
if err != nil {
return fmt.Errorf("failed to start task after driver exited unexpectedly: %v", err)
}
} else {
return fmt.Errorf("driver start failed: %v", err)
}
}

tr.stateLock.Lock()
Expand Down Expand Up @@ -735,16 +781,16 @@ func (tr *TaskRunner) Restore() error {
// restoreHandle ensures a TaskHandle is valid by calling Driver.RecoverTask
// and sets the driver handle. If the TaskHandle is not valid, DestroyTask is
// called.
func (tr *TaskRunner) restoreHandle(taskHandle *drivers.TaskHandle, net *cstructs.DriverNetwork) {
func (tr *TaskRunner) restoreHandle(taskHandle *drivers.TaskHandle, net *cstructs.DriverNetwork) (success bool) {
// Ensure handle is well-formed
if taskHandle.Config == nil {
return
return true
}

if err := tr.driver.RecoverTask(taskHandle); err != nil {
if tr.TaskState().State != structs.TaskStateRunning {
// RecoverTask should fail if the Task wasn't running
return
return true
}

tr.logger.Error("error recovering task; cleaning up",
Expand All @@ -760,14 +806,15 @@ func (tr *TaskRunner) restoreHandle(taskHandle *drivers.TaskHandle, net *cstruct
"error", err, "task_id", taskHandle.Config.ID)
}

return false
}

return
return true
}

// Update driver handle on task runner
tr.setDriverHandle(NewDriverHandle(tr.driver, taskHandle.Config.ID, tr.Task(), net))
return
return true
}

// UpdateState sets the task runners allocation state and triggers a server
Expand Down
8 changes: 6 additions & 2 deletions client/allocrunner/taskrunner/task_runner_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,10 @@ func (tr *TaskRunner) poststart() error {
handle := tr.getDriverHandle()
net := handle.Network()

// Pass the lazy handle to the hooks so even if the driver exits and we
// launch a new one (external plugin), the handle will refresh.
lazyHandle := NewLazyHandle(tr.ctx, tr.getDriverHandle, tr.logger)

var merr multierror.Error
for _, hook := range tr.runnerHooks {
post, ok := hook.(interfaces.TaskPoststartHook)
Expand All @@ -269,9 +273,9 @@ func (tr *TaskRunner) poststart() error {
}

req := interfaces.TaskPoststartRequest{
DriverExec: handle,
DriverExec: lazyHandle,
DriverNetwork: net,
DriverStats: handle,
DriverStats: lazyHandle,
TaskEnv: tr.envBuilder.Build(),
}
var resp interfaces.TaskPoststartResponse
Expand Down
Loading