diff --git a/client/allocrunner/taskrunner/lazy_handle.go b/client/allocrunner/taskrunner/lazy_handle.go new file mode 100644 index 000000000000..a3dbaed6b86f --- /dev/null +++ b/client/allocrunner/taskrunner/lazy_handle.go @@ -0,0 +1,152 @@ +package taskrunner + +import ( + "context" + "fmt" + "sync" + "time" + + log "github.com/hashicorp/go-hclog" + cstructs "github.com/hashicorp/nomad/client/structs" + bstructs "github.com/hashicorp/nomad/plugins/base/structs" +) + +const ( + // retrieveBackoffBaseline is the baseline time for exponential backoff while + // retrieving a handle. + retrieveBackoffBaseline = 250 * time.Millisecond + + // retrieveBackoffLimit is the limit of the exponential backoff for + // retrieving a handle. + retrieveBackoffLimit = 5 * time.Second + + // retrieveFailureLimit is how many times we will attempt to retrieve a + // new handle before giving up. + retrieveFailureLimit = 5 +) + +// retrieveHandleFn is used to retrieve the latest driver handle +type retrieveHandleFn func() *DriverHandle + +// LazyHandle is used to front calls to a DriverHandle where it is expected the +// existing handle may no longer be valid because the backing plugin has +// shutdown. LazyHandle detects the plugin shutting down and retrieves a new +// handle so that the consumer does not need to worry whether the handle is to +// the latest driver instance. +type LazyHandle struct { + // retrieveHandle is used to retrieve the latest handle + retrieveHandle retrieveHandleFn + + // h is the current handle and may be nil + h *DriverHandle + + // shutdownCtx is used to cancel retries if the agent is shutting down + shutdownCtx context.Context + + logger log.Logger + sync.Mutex +} + +// NewLazyHandle takes the function to receive the latest handle and a logger +// and returns a LazyHandle +func NewLazyHandle(shutdownCtx context.Context, fn retrieveHandleFn, logger log.Logger) *LazyHandle { + return &LazyHandle{ + retrieveHandle: fn, + h: fn(), + shutdownCtx: shutdownCtx, + logger: logger.Named("lazy_handle"), + } +} + +// getHandle returns the current handle or retrieves a new one +func (l *LazyHandle) getHandle() (*DriverHandle, error) { + l.Lock() + defer l.Unlock() + + if l.h != nil { + return l.h, nil + } + + return l.refreshHandleLocked() +} + +// refreshHandle retrieves a new handle +func (l *LazyHandle) refreshHandle() (*DriverHandle, error) { + l.Lock() + defer l.Unlock() + return l.refreshHandleLocked() +} + +// refreshHandleLocked retrieves a new handle and should be called with the lock +// held. It will retry to give the client time to restart the driver and restore +// the handle. +func (l *LazyHandle) refreshHandleLocked() (*DriverHandle, error) { + for i := 0; i < retrieveFailureLimit; i++ { + l.h = l.retrieveHandle() + if l.h != nil { + return l.h, nil + } + + // Calculate the new backoff + backoff := (1 << (2 * uint64(i))) * retrieveBackoffBaseline + if backoff > retrieveBackoffLimit { + backoff = retrieveBackoffLimit + } + + l.logger.Debug("failed to retrieve handle", "backoff", backoff) + + select { + case <-l.shutdownCtx.Done(): + return nil, l.shutdownCtx.Err() + case <-time.After(backoff): + } + } + + return nil, fmt.Errorf("no driver handle") +} + +func (l *LazyHandle) Exec(timeout time.Duration, cmd string, args []string) ([]byte, int, error) { + h, err := l.getHandle() + if err != nil { + return nil, 0, err + } + + // Only retry once + first := true + +TRY: + out, c, err := h.Exec(timeout, cmd, args) + if err == bstructs.ErrPluginShutdown && first { + first = false + + h, err = l.refreshHandle() + if err == nil { + goto TRY + } + } + + return out, c, err +} + +func (l *LazyHandle) Stats() (*cstructs.TaskResourceUsage, error) { + h, err := l.getHandle() + if err != nil { + return nil, err + } + + // Only retry once + first := true + +TRY: + out, err := h.Stats() + if err == bstructs.ErrPluginShutdown && first { + first = false + + h, err = l.refreshHandle() + if err == nil { + goto TRY + } + } + + return out, err +} diff --git a/client/allocrunner/taskrunner/stats_hook.go b/client/allocrunner/taskrunner/stats_hook.go index 01733e49537c..4a0362c0bbb5 100644 --- a/client/allocrunner/taskrunner/stats_hook.go +++ b/client/allocrunner/taskrunner/stats_hook.go @@ -2,13 +2,13 @@ package taskrunner import ( "context" - "strings" "sync" "time" hclog "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/client/allocrunner/interfaces" cstructs "github.com/hashicorp/nomad/client/structs" + bstructs "github.com/hashicorp/nomad/plugins/base/structs" ) // StatsUpdater is the interface required by the StatsHook to update stats. @@ -99,11 +99,11 @@ func (h *statsHook) collectResourceUsageStats(handle interfaces.DriverStats, sto return } - //XXX This is a net/rpc specific error - // We do not log when the plugin is shutdown as this is simply a - // race between the stopCollection channel being closed and calling - // Stats on the handle. - if !strings.Contains(err.Error(), "connection is shut down") { + // We do not log when the plugin is shutdown since this is + // likely because the driver plugin has unexpectedly exited, + // in which case sleeping and trying again or returning based + // on the stop channel is the correct behavior + if err != bstructs.ErrPluginShutdown { h.logger.Debug("error fetching stats of task", "error", err) } diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go index 88eedf3c8d33..e120f0ca5b0a 100644 --- a/client/allocrunner/taskrunner/task_runner.go +++ b/client/allocrunner/taskrunner/task_runner.go @@ -28,9 +28,10 @@ import ( "github.com/hashicorp/nomad/client/vaultclient" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/structs" + bstructs "github.com/hashicorp/nomad/plugins/base/structs" "github.com/hashicorp/nomad/plugins/drivers" - "github.com/hashicorp/nomad/plugins/shared" "github.com/hashicorp/nomad/plugins/shared/hclspec" + "github.com/hashicorp/nomad/plugins/shared/hclutils" ) const ( @@ -408,8 +409,10 @@ MAIN: } // Grab the result proxy and wait for task to exit + WAIT: { handle := tr.getDriverHandle() + result = nil // Do *not* use tr.killCtx here as it would cause // Wait() to unblock before the task exits when Kill() @@ -418,12 +421,15 @@ MAIN: tr.logger.Error("wait task failed", "error", err) } else { select { - case result = <-resultCh: - // WaitCh returned a result - tr.handleTaskExitResult(result) case <-tr.ctx.Done(): // TaskRunner was told to exit immediately return + case result = <-resultCh: + } + + // WaitCh returned a result + if retryWait := tr.handleTaskExitResult(result); retryWait { + goto WAIT } } } @@ -467,9 +473,37 @@ MAIN: tr.logger.Debug("task run loop exiting") } -func (tr *TaskRunner) handleTaskExitResult(result *drivers.ExitResult) { +// handleTaskExitResult handles the results returned by the task exiting. If +// retryWait is true, the caller should attempt to wait on the task again since +// it has not actually finished running. This can happen if the driver plugin +// has exited. +func (tr *TaskRunner) handleTaskExitResult(result *drivers.ExitResult) (retryWait bool) { if result == nil { - return + return false + } + + if result.Err == bstructs.ErrPluginShutdown { + dn := tr.Task().Driver + tr.logger.Debug("driver plugin has shutdown; attempting to recover task", "driver", dn) + + // Initialize a new driver handle + if err := tr.initDriver(); err != nil { + tr.logger.Error("failed to initialize driver after it exited unexpectedly", "error", err, "driver", dn) + return false + } + + // Try to restore the handle + tr.stateLock.RLock() + h := tr.localState.TaskHandle + net := tr.localState.DriverNetwork + tr.stateLock.RUnlock() + if !tr.restoreHandle(h, net) { + tr.logger.Error("failed to restore handle on driver after it exited unexpectedly", "driver", dn) + return false + } + + tr.logger.Debug("task successfully recovered on driver", "driver", dn) + return true } event := structs.NewTaskEvent(structs.TaskTerminated). @@ -483,6 +517,8 @@ func (tr *TaskRunner) handleTaskExitResult(result *drivers.ExitResult) { if result.OOMKilled && !tr.clientConfig.DisableTaggedMetrics { metrics.IncrCounterWithLabels([]string{"client", "allocs", "oom_killed"}, 1, tr.baseLabels) } + + return false } // handleUpdates runs update hooks when triggerUpdateCh is ticked and exits @@ -530,7 +566,6 @@ func (tr *TaskRunner) shouldRestart() (bool, time.Duration) { // runDriver runs the driver and waits for it to exit func (tr *TaskRunner) runDriver() error { - // TODO(nickethier): make sure this uses alloc.AllocatedResources once #4750 is rebased taskConfig := tr.buildTaskConfig() // Build hcl context variables @@ -556,10 +591,10 @@ func (tr *TaskRunner) runDriver() error { evalCtx := &hcl.EvalContext{ Variables: vars, - Functions: shared.GetStdlibFuncs(), + Functions: hclutils.GetStdlibFuncs(), } - val, diag := shared.ParseHclInterface(tr.task.Config, tr.taskSchema, evalCtx) + val, diag := hclutils.ParseHclInterface(tr.task.Config, tr.taskSchema, evalCtx) if diag.HasErrors() { return multierror.Append(errors.New("failed to parse config"), diag.Errs()...) } @@ -568,8 +603,6 @@ func (tr *TaskRunner) runDriver() error { return fmt.Errorf("failed to encode driver config: %v", err) } - //XXX Evaluate and encode driver config - // If there's already a task handle (eg from a Restore) there's nothing // to do except update state. if tr.getDriverHandle() != nil { @@ -586,7 +619,20 @@ func (tr *TaskRunner) runDriver() error { // Start the job if there's no existing handle (or if RecoverTask failed) handle, net, err := tr.driver.StartTask(taskConfig) if err != nil { - return fmt.Errorf("driver start failed: %v", err) + // The plugin has died, try relaunching it + if err == bstructs.ErrPluginShutdown { + tr.logger.Info("failed to start task because plugin shutdown unexpectedly; attempting to recover") + if err := tr.initDriver(); err != nil { + return fmt.Errorf("failed to initialize driver after it exited unexpectedly: %v", err) + } + + handle, net, err = tr.driver.StartTask(taskConfig) + if err != nil { + return fmt.Errorf("failed to start task after driver exited unexpectedly: %v", err) + } + } else { + return fmt.Errorf("driver start failed: %v", err) + } } tr.stateLock.Lock() @@ -735,16 +781,16 @@ func (tr *TaskRunner) Restore() error { // restoreHandle ensures a TaskHandle is valid by calling Driver.RecoverTask // and sets the driver handle. If the TaskHandle is not valid, DestroyTask is // called. -func (tr *TaskRunner) restoreHandle(taskHandle *drivers.TaskHandle, net *cstructs.DriverNetwork) { +func (tr *TaskRunner) restoreHandle(taskHandle *drivers.TaskHandle, net *cstructs.DriverNetwork) (success bool) { // Ensure handle is well-formed if taskHandle.Config == nil { - return + return true } if err := tr.driver.RecoverTask(taskHandle); err != nil { if tr.TaskState().State != structs.TaskStateRunning { // RecoverTask should fail if the Task wasn't running - return + return true } tr.logger.Error("error recovering task; cleaning up", @@ -760,14 +806,15 @@ func (tr *TaskRunner) restoreHandle(taskHandle *drivers.TaskHandle, net *cstruct "error", err, "task_id", taskHandle.Config.ID) } + return false } - return + return true } // Update driver handle on task runner tr.setDriverHandle(NewDriverHandle(tr.driver, taskHandle.Config.ID, tr.Task(), net)) - return + return true } // UpdateState sets the task runners allocation state and triggers a server diff --git a/client/allocrunner/taskrunner/task_runner_hooks.go b/client/allocrunner/taskrunner/task_runner_hooks.go index 6aa0da23a2d6..e1ec1d675759 100644 --- a/client/allocrunner/taskrunner/task_runner_hooks.go +++ b/client/allocrunner/taskrunner/task_runner_hooks.go @@ -254,6 +254,10 @@ func (tr *TaskRunner) poststart() error { handle := tr.getDriverHandle() net := handle.Network() + // Pass the lazy handle to the hooks so even if the driver exits and we + // launch a new one (external plugin), the handle will refresh. + lazyHandle := NewLazyHandle(tr.ctx, tr.getDriverHandle, tr.logger) + var merr multierror.Error for _, hook := range tr.runnerHooks { post, ok := hook.(interfaces.TaskPoststartHook) @@ -269,9 +273,9 @@ func (tr *TaskRunner) poststart() error { } req := interfaces.TaskPoststartRequest{ - DriverExec: handle, + DriverExec: lazyHandle, DriverNetwork: net, - DriverStats: handle, + DriverStats: lazyHandle, TaskEnv: tr.envBuilder.Build(), } var resp interfaces.TaskPoststartResponse diff --git a/client/allocrunner/taskrunner/task_runner_test.go b/client/allocrunner/taskrunner/task_runner_test.go index 0ab5be2ffc8a..a28d9591e5c2 100644 --- a/client/allocrunner/taskrunner/task_runner_test.go +++ b/client/allocrunner/taskrunner/task_runner_test.go @@ -337,3 +337,90 @@ func TestTaskRunner_Restore_HookEnv(t *testing.T) { require.Contains(env, "mock_hook") require.Equal("1", env["mock_hook"]) } + +// This test asserts that we can recover from an "external" plugin exiting by +// retrieving a new instance of the driver and recovering the task. +func TestTaskRunner_RecoverFromDriverExiting(t *testing.T) { + t.Parallel() + require := require.New(t) + + // Create an allocation using the mock driver that exits simulating the + // driver crashing. We can then test that the task runner recovers from this + alloc := mock.BatchAlloc() + task := alloc.Job.TaskGroups[0].Tasks[0] + task.Driver = "mock_driver" + task.Config = map[string]interface{}{ + "plugin_exit_after": "1s", + "run_for": "5s", + } + + conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name) + conf.StateDB = cstate.NewMemDB() // "persist" state between prestart calls + defer cleanup() + + tr, err := NewTaskRunner(conf) + require.NoError(err) + + start := time.Now() + go tr.Run() + defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup")) + + // Wait for the task to be running + testWaitForTaskToStart(t, tr) + + // Get the task ID + tr.stateLock.RLock() + l := tr.localState.TaskHandle + require.NotNil(l) + require.NotNil(l.Config) + require.NotEmpty(l.Config.ID) + id := l.Config.ID + tr.stateLock.RUnlock() + + // Get the mock driver plugin + driverPlugin, err := conf.DriverManager.Dispense(mockdriver.PluginID.Name) + require.NoError(err) + mockDriver := driverPlugin.(*mockdriver.Driver) + + // Wait for the task to start + testutil.WaitForResult(func() (bool, error) { + // Get the handle and check that it was recovered + handle := mockDriver.GetHandle(id) + if handle == nil { + return false, fmt.Errorf("nil handle") + } + if !handle.Recovered { + return false, fmt.Errorf("handle not recovered") + } + return true, nil + }, func(err error) { + t.Fatal(err.Error()) + }) + + // Wait for task to complete + select { + case <-tr.WaitCh(): + case <-time.After(10 * time.Second): + } + + // Ensure that we actually let the task complete + require.True(time.Now().Sub(start) > 5*time.Second) + + // Check it finished successfully + state := tr.TaskState() + require.True(state.Successful()) +} + +// testWaitForTaskToStart waits for the task to or fails the test +func testWaitForTaskToStart(t *testing.T, tr *TaskRunner) { + // Wait for the task to start + testutil.WaitForResult(func() (bool, error) { + tr.stateLock.RLock() + started := !tr.state.StartedAt.IsZero() + tr.stateLock.RUnlock() + + return started, nil + }, func(err error) { + t.Fatalf("not started") + }) +} diff --git a/client/client.go b/client/client.go index 7abadd4254e0..df1b75eae953 100644 --- a/client/client.go +++ b/client/client.go @@ -622,9 +622,6 @@ func (c *Client) Shutdown() error { } c.logger.Info("shutting down") - // Shutdown the plugin managers - c.pluginManagers.Shutdown() - // Stop renewing tokens and secrets if c.vaultClient != nil { c.vaultClient.Stop() @@ -649,6 +646,9 @@ func (c *Client) Shutdown() error { } arGroup.Wait() + // Shutdown the plugin managers + c.pluginManagers.Shutdown() + c.shutdown = true close(c.shutdownCh) diff --git a/client/devicemanager/instance.go b/client/devicemanager/instance.go index f834062e1fe7..0837abf10aad 100644 --- a/client/devicemanager/instance.go +++ b/client/devicemanager/instance.go @@ -10,6 +10,7 @@ import ( multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/base" + bstructs "github.com/hashicorp/nomad/plugins/base/structs" "github.com/hashicorp/nomad/plugins/device" "github.com/hashicorp/nomad/plugins/shared/loader" "github.com/hashicorp/nomad/plugins/shared/singleton" @@ -363,7 +364,7 @@ START: // Handle any errors if fresp.Error != nil { - if fresp.Error == base.ErrPluginShutdown { + if fresp.Error == bstructs.ErrPluginShutdown { i.logger.Error("plugin exited unexpectedly") goto START } @@ -488,7 +489,7 @@ START: // Handle any errors if sresp.Error != nil { - if sresp.Error == base.ErrPluginShutdown { + if sresp.Error == bstructs.ErrPluginShutdown { i.logger.Error("plugin exited unexpectedly") goto START } diff --git a/client/logmon/logmon.go b/client/logmon/logmon.go index 92fff1972602..52d804894e5a 100644 --- a/client/logmon/logmon.go +++ b/client/logmon/logmon.go @@ -4,6 +4,7 @@ import ( "fmt" "io" "strings" + "sync" "time" hclog "github.com/hashicorp/go-hclog" @@ -84,12 +85,22 @@ type TaskLogger struct { } func (tl *TaskLogger) Close() { + var wg sync.WaitGroup if tl.lro != nil { - tl.lro.Close() + wg.Add(1) + go func() { + tl.lro.Close() + wg.Done() + }() } if tl.lre != nil { - tl.lre.Close() + wg.Add(1) + go func() { + tl.lre.Close() + wg.Done() + }() } + wg.Wait() } func NewTaskLogger(cfg *LogConfig, logger hclog.Logger) (*TaskLogger, error) { diff --git a/client/logmon/plugin.go b/client/logmon/plugin.go index 3e0db73179d0..3e4632ccfdb2 100644 --- a/client/logmon/plugin.go +++ b/client/logmon/plugin.go @@ -15,7 +15,7 @@ import ( // LaunchLogMon an instance of logmon // TODO: Integrate with base plugin loader func LaunchLogMon(logger hclog.Logger) (LogMon, *plugin.Client, error) { - logger = logger.Named("logmon-launcher") + logger = logger.Named("logmon") bin, err := discover.NomadExecutable() if err != nil { return nil, nil, err @@ -24,12 +24,13 @@ func LaunchLogMon(logger hclog.Logger) (LogMon, *plugin.Client, error) { client := plugin.NewClient(&plugin.ClientConfig{ HandshakeConfig: base.Handshake, Plugins: map[string]plugin.Plugin{ - "logmon": NewPlugin(NewLogMon(hclog.L().Named("logmon"))), + "logmon": &Plugin{}, }, Cmd: exec.Command(bin, "logmon"), AllowedProtocols: []plugin.Protocol{ plugin.ProtocolGRPC, }, + Logger: logger, }) rpcClient, err := client.Client() diff --git a/client/pluginmanager/drivermanager/instance.go b/client/pluginmanager/drivermanager/instance.go index a9f21eac6a82..fe42c196ea3c 100644 --- a/client/pluginmanager/drivermanager/instance.go +++ b/client/pluginmanager/drivermanager/instance.go @@ -9,6 +9,7 @@ import ( log "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/base" + bstructs "github.com/hashicorp/nomad/plugins/base/structs" "github.com/hashicorp/nomad/plugins/drivers" "github.com/hashicorp/nomad/plugins/shared/loader" "github.com/hashicorp/nomad/plugins/shared/singleton" @@ -447,6 +448,11 @@ func (i *instanceManager) handleEvents() { // handleEvent looks up the event handler(s) for the event and runs them func (i *instanceManager) handleEvent(ev *drivers.TaskEvent) { + // Do not emit that the plugin is shutdown + if ev.Err != nil && ev.Err == bstructs.ErrPluginShutdown { + return + } + if handler := i.eventHandlerFactory(ev.AllocID, ev.TaskName); handler != nil { i.logger.Trace("task event received", "event", ev) handler(ev) diff --git a/client/structs/structs.go b/client/structs/structs.go index 8f6da2d7909d..6edcbf8d01f0 100644 --- a/client/structs/structs.go +++ b/client/structs/structs.go @@ -180,6 +180,10 @@ type MemoryStats struct { } func (ms *MemoryStats) Add(other *MemoryStats) { + if other == nil { + return + } + ms.RSS += other.RSS ms.Cache += other.Cache ms.Swap += other.Swap @@ -203,6 +207,10 @@ type CpuStats struct { } func (cs *CpuStats) Add(other *CpuStats) { + if other == nil { + return + } + cs.SystemMode += other.SystemMode cs.UserMode += other.UserMode cs.TotalTicks += other.TotalTicks @@ -229,7 +237,7 @@ func (ru *ResourceUsage) Add(other *ResourceUsage) { // and the resource usage of the individual pids type TaskResourceUsage struct { ResourceUsage *ResourceUsage - Timestamp int64 + Timestamp int64 // UnixNano Pids map[string]*ResourceUsage } diff --git a/command/executor_plugin.go b/command/executor_plugin.go index da75450e037a..7e7522bc54eb 100644 --- a/command/executor_plugin.go +++ b/command/executor_plugin.go @@ -6,7 +6,7 @@ import ( "strings" hclog "github.com/hashicorp/go-hclog" - "github.com/hashicorp/go-plugin" + plugin "github.com/hashicorp/go-plugin" "github.com/hashicorp/nomad/drivers/shared/executor" "github.com/hashicorp/nomad/plugins/base" diff --git a/command/logmon_plugin.go b/command/logmon_plugin.go index f9329e4237f8..57d7eb8c4699 100644 --- a/command/logmon_plugin.go +++ b/command/logmon_plugin.go @@ -25,10 +25,15 @@ func (e *LogMonPluginCommand) Synopsis() string { } func (e *LogMonPluginCommand) Run(args []string) int { + logger := hclog.New(&hclog.LoggerOptions{ + Level: hclog.Trace, + JSONFormat: true, + Name: "logmon", + }) plugin.Serve(&plugin.ServeConfig{ HandshakeConfig: base.Handshake, Plugins: map[string]plugin.Plugin{ - "logmon": logmon.NewPlugin(logmon.NewLogMon(hclog.Default().Named("logmon"))), + "logmon": logmon.NewPlugin(logmon.NewLogMon(logger)), }, GRPCServer: plugin.DefaultGRPCServer, }) diff --git a/drivers/docker/cmd/main.go b/drivers/docker/cmd/main.go new file mode 100644 index 000000000000..e0f898f3aaf0 --- /dev/null +++ b/drivers/docker/cmd/main.go @@ -0,0 +1,44 @@ +// This package provides a mechanism to build the Docker driver plugin as an +// external binary. The binary has two entry points; the docker driver and the +// docker plugin's logging child binary. An example of using this is `go build +// -o 1 { + // Detect if we are being launched as a docker logging plugin + switch os.Args[1] { + case docklog.PluginName: + plugin.Serve(&plugin.ServeConfig{ + HandshakeConfig: base.Handshake, + Plugins: map[string]plugin.Plugin{ + docklog.PluginName: docklog.NewPlugin(docklog.NewDockerLogger(log.Default().Named(docklog.PluginName))), + }, + GRPCServer: plugin.DefaultGRPCServer, + }) + + return + } + } + + // Serve the plugin + plugins.Serve(factory) +} + +// factory returns a new instance of the docker driver plugin +func factory(log log.Logger) interface{} { + return docker.NewDockerDriver(log) +} diff --git a/drivers/exec/driver_test.go b/drivers/exec/driver_test.go index eee5c426e0f9..dec127da402d 100644 --- a/drivers/exec/driver_test.go +++ b/drivers/exec/driver_test.go @@ -21,8 +21,8 @@ import ( "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/drivers" dtestutil "github.com/hashicorp/nomad/plugins/drivers/testutils" - "github.com/hashicorp/nomad/plugins/shared" "github.com/hashicorp/nomad/plugins/shared/hclspec" + "github.com/hashicorp/nomad/plugins/shared/hclutils" "github.com/hashicorp/nomad/testutil" "github.com/stretchr/testify/require" "golang.org/x/sys/unix" @@ -606,11 +606,11 @@ touch: cannot touch '/tmp/task-path-ro/testfile-from-ro': Read-only file system` func encodeDriverHelper(require *require.Assertions, task *drivers.TaskConfig, taskConfig map[string]interface{}) { evalCtx := &hcl.EvalContext{ - Functions: shared.GetStdlibFuncs(), + Functions: hclutils.GetStdlibFuncs(), } spec, diag := hclspec.Convert(taskConfigSpec) require.False(diag.HasErrors()) - taskConfigCtyVal, diag := shared.ParseHclInterface(taskConfig, spec, evalCtx) + taskConfigCtyVal, diag := hclutils.ParseHclInterface(taskConfig, spec, evalCtx) require.False(diag.HasErrors()) err := task.EncodeDriverConfig(taskConfigCtyVal) require.Nil(err) diff --git a/drivers/java/driver_test.go b/drivers/java/driver_test.go index 89966c571705..ce0ab2221149 100644 --- a/drivers/java/driver_test.go +++ b/drivers/java/driver_test.go @@ -19,8 +19,8 @@ import ( "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/drivers" - "github.com/hashicorp/nomad/plugins/shared" "github.com/hashicorp/nomad/plugins/shared/hclspec" + "github.com/hashicorp/nomad/plugins/shared/hclutils" "github.com/hashicorp/nomad/testutil" "github.com/stretchr/testify/require" ) @@ -272,11 +272,11 @@ func encodeDriverHelper(t *testing.T, task *drivers.TaskConfig, taskConfig map[s t.Helper() evalCtx := &hcl.EvalContext{ - Functions: shared.GetStdlibFuncs(), + Functions: hclutils.GetStdlibFuncs(), } spec, diag := hclspec.Convert(taskConfigSpec) require.False(t, diag.HasErrors()) - taskConfigCtyVal, diag := shared.ParseHclInterface(taskConfig, spec, evalCtx) + taskConfigCtyVal, diag := hclutils.ParseHclInterface(taskConfig, spec, evalCtx) require.Empty(t, diag.Errs()) err := task.EncodeDriverConfig(taskConfigCtyVal) require.Nil(t, err) diff --git a/drivers/lxc/driver_test.go b/drivers/lxc/driver_test.go index 8a79510a12dc..313c9e480d62 100644 --- a/drivers/lxc/driver_test.go +++ b/drivers/lxc/driver_test.go @@ -19,8 +19,8 @@ import ( "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/drivers" dtestutil "github.com/hashicorp/nomad/plugins/drivers/testutils" - "github.com/hashicorp/nomad/plugins/shared" "github.com/hashicorp/nomad/plugins/shared/hclspec" + "github.com/hashicorp/nomad/plugins/shared/hclutils" "github.com/hashicorp/nomad/testutil" "github.com/stretchr/testify/require" lxc "gopkg.in/lxc/go-lxc.v2" @@ -269,11 +269,11 @@ func requireLXC(t *testing.T) { func encodeDriverHelper(require *require.Assertions, task *drivers.TaskConfig, taskConfig map[string]interface{}) { evalCtx := &hcl.EvalContext{ - Functions: shared.GetStdlibFuncs(), + Functions: hclutils.GetStdlibFuncs(), } spec, diag := hclspec.Convert(taskConfigSpec) require.False(diag.HasErrors()) - taskConfigCtyVal, diag := shared.ParseHclInterface(taskConfig, spec, evalCtx) + taskConfigCtyVal, diag := hclutils.ParseHclInterface(taskConfig, spec, evalCtx) require.False(diag.HasErrors()) err := task.EncodeDriverConfig(taskConfigCtyVal) require.Nil(err) diff --git a/drivers/mock/driver.go b/drivers/mock/driver.go index f73335474c13..07b7344a6203 100644 --- a/drivers/mock/driver.go +++ b/drivers/mock/driver.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "math/rand" "strconv" "strings" "sync" @@ -67,6 +68,7 @@ var ( "start_error_recoverable": hclspec.NewAttr("start_error_recoverable", "bool", false), "start_block_for": hclspec.NewAttr("start_block_for", "string", false), "kill_after": hclspec.NewAttr("kill_after", "string", false), + "plugin_exit_after": hclspec.NewAttr("plugin_exit_after", "string", false), "run_for": hclspec.NewAttr("run_for", "string", false), "exit_code": hclspec.NewAttr("exit_code", "number", false), "exit_signal": hclspec.NewAttr("exit_signal", "number", false), @@ -153,6 +155,10 @@ type Config struct { // TaskConfig is the driver configuration of a task within a job type TaskConfig struct { + // PluginExitAfter is the duration after which the mock driver indicates the + // plugin has exited via the WaitTask call. + PluginExitAfter string `codec:"plugin_exit_after"` + pluginExitAfterDuration time.Duration // StartErr specifies the error that should be returned when starting the // mock driver. @@ -213,8 +219,7 @@ type TaskConfig struct { } type MockTaskState struct { - TaskConfig *drivers.TaskConfig - StartedAt time.Time + StartedAt time.Time } func (d *Driver) PluginInfo() (*base.PluginInfoResponse, error) { @@ -289,42 +294,95 @@ func (d *Driver) buildFingerprint() *drivers.Fingerprint { } } -func (d *Driver) RecoverTask(h *drivers.TaskHandle) error { - if h == nil { +func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error { + if handle == nil { return fmt.Errorf("handle cannot be nil") } - if _, ok := d.tasks.Get(h.Config.ID); ok { - d.logger.Debug("nothing to recover; task already exists", - "task_id", h.Config.ID, - "task_name", h.Config.Name, - ) - return nil + // Unmarshall the driver state and create a new handle + var taskState MockTaskState + if err := handle.GetDriverState(&taskState); err != nil { + d.logger.Error("failed to decode task state from handle", "error", err, "task_id", handle.Config.ID) + return fmt.Errorf("failed to decode task state from handle: %v", err) } - // Recovering a task requires the task to be running external to the - // plugin. Since the mock_driver runs all tasks in process it cannot - // recover tasks. - return fmt.Errorf("%s cannot recover tasks", pluginName) + driverCfg, err := parseDriverConfig(handle.Config) + if err != nil { + d.logger.Error("failed to parse driver config from handle", "error", err, "task_id", handle.Config.ID, "config", hclog.Fmt("%+v", handle.Config)) + return fmt.Errorf("failed to parse driver config from handle: %v", err) + } + + // Remove the plugin exit time if set + driverCfg.pluginExitAfterDuration = 0 + + // Correct the run_for time based on how long it has already been running + now := time.Now() + driverCfg.runForDuration = driverCfg.runForDuration - now.Sub(taskState.StartedAt) + + h := newTaskHandle(handle.Config, driverCfg, d.logger) + h.Recovered = true + d.tasks.Set(handle.Config.ID, h) + go h.run() + return nil } -func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *cstructs.DriverNetwork, error) { +func parseDriverConfig(cfg *drivers.TaskConfig) (*TaskConfig, error) { var driverConfig TaskConfig if err := cfg.DecodeDriverConfig(&driverConfig); err != nil { - return nil, nil, err + return nil, err } var err error if driverConfig.startBlockForDuration, err = parseDuration(driverConfig.StartBlockFor); err != nil { - return nil, nil, fmt.Errorf("start_block_for %v not a valid duration: %v", driverConfig.StartBlockFor, err) + return nil, fmt.Errorf("start_block_for %v not a valid duration: %v", driverConfig.StartBlockFor, err) } if driverConfig.runForDuration, err = parseDuration(driverConfig.RunFor); err != nil { - return nil, nil, fmt.Errorf("run_for %v not a valid duration: %v", driverConfig.RunFor, err) + return nil, fmt.Errorf("run_for %v not a valid duration: %v", driverConfig.RunFor, err) + } + + if driverConfig.pluginExitAfterDuration, err = parseDuration(driverConfig.PluginExitAfter); err != nil { + return nil, fmt.Errorf("plugin_exit_after %v not a valid duration: %v", driverConfig.PluginExitAfter, err) } if driverConfig.stdoutRepeatDuration, err = parseDuration(driverConfig.StdoutRepeatDur); err != nil { - return nil, nil, fmt.Errorf("stdout_repeat_duration %v not a valid duration: %v", driverConfig.stdoutRepeatDuration, err) + return nil, fmt.Errorf("stdout_repeat_duration %v not a valid duration: %v", driverConfig.stdoutRepeatDuration, err) + } + + return &driverConfig, nil +} + +func newTaskHandle(cfg *drivers.TaskConfig, driverConfig *TaskConfig, logger hclog.Logger) *taskHandle { + killCtx, killCancel := context.WithCancel(context.Background()) + h := &taskHandle{ + taskConfig: cfg, + runFor: driverConfig.runForDuration, + pluginExitAfter: driverConfig.pluginExitAfterDuration, + killAfter: driverConfig.killAfterDuration, + exitCode: driverConfig.ExitCode, + exitSignal: driverConfig.ExitSignal, + stdoutString: driverConfig.StdoutString, + stdoutRepeat: driverConfig.StdoutRepeat, + stdoutRepeatDur: driverConfig.stdoutRepeatDuration, + logger: logger.With("task_name", cfg.Name), + waitCh: make(chan struct{}), + killCh: killCtx.Done(), + kill: killCancel, + startedAt: time.Now(), + } + if driverConfig.ExitErrMsg != "" { + h.exitErr = errors.New(driverConfig.ExitErrMsg) + } + if driverConfig.SignalErr != "" { + h.signalErr = fmt.Errorf(driverConfig.SignalErr) + } + return h +} + +func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *cstructs.DriverNetwork, error) { + driverConfig, err := parseDriverConfig(cfg) + if err != nil { + return nil, nil, err } if driverConfig.startBlockForDuration != 0 { @@ -334,7 +392,7 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *cstru // Store last configs d.lastMu.Lock() d.lastDriverTaskConfig = cfg - d.lastTaskConfig = &driverConfig + d.lastTaskConfig = driverConfig d.lastMu.Unlock() if driverConfig.StartErr != "" { @@ -358,32 +416,9 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *cstru net.PortMap = map[string]int{parts[0]: port} } - killCtx, killCancel := context.WithCancel(context.Background()) - - h := &taskHandle{ - taskConfig: cfg, - runFor: driverConfig.runForDuration, - killAfter: driverConfig.killAfterDuration, - exitCode: driverConfig.ExitCode, - exitSignal: driverConfig.ExitSignal, - stdoutString: driverConfig.StdoutString, - stdoutRepeat: driverConfig.StdoutRepeat, - stdoutRepeatDur: driverConfig.stdoutRepeatDuration, - logger: d.logger.With("task_name", cfg.Name), - waitCh: make(chan struct{}), - killCh: killCtx.Done(), - kill: killCancel, - } - if driverConfig.ExitErrMsg != "" { - h.exitErr = errors.New(driverConfig.ExitErrMsg) - } - if driverConfig.SignalErr != "" { - h.signalErr = fmt.Errorf(driverConfig.SignalErr) - } - + h := newTaskHandle(cfg, driverConfig, d.logger) driverState := MockTaskState{ - TaskConfig: cfg, - StartedAt: h.startedAt, + StartedAt: h.startedAt, } handle := drivers.NewTaskHandle(pluginName) handle.Config = cfg @@ -461,8 +496,17 @@ func (d *Driver) InspectTask(taskID string) (*drivers.TaskStatus, error) { } func (d *Driver) TaskStats(taskID string) (*cstructs.TaskResourceUsage, error) { - //TODO return an error? - return nil, nil + // Generate random value for the memory usage + s := &cstructs.TaskResourceUsage{ + ResourceUsage: &cstructs.ResourceUsage{ + MemoryStats: &cstructs.MemoryStats{ + RSS: rand.Uint64(), + Measured: []string{"RSS"}, + }, + }, + Timestamp: time.Now().UTC().UnixNano(), + } + return s, nil } func (d *Driver) TaskEvents(ctx context.Context) (<-chan *drivers.TaskEvent, error) { @@ -499,3 +543,10 @@ func (d *Driver) GetTaskConfig() (*drivers.TaskConfig, *TaskConfig) { defer d.lastMu.Unlock() return d.lastDriverTaskConfig, d.lastTaskConfig } + +// GetHandle is unique to the mock driver and for testing purposes only. It +// returns the handle of the given task ID +func (d *Driver) GetHandle(taskID string) *taskHandle { + h, _ := d.tasks.Get(taskID) + return h +} diff --git a/drivers/mock/handle.go b/drivers/mock/handle.go index 44ad4932e0ec..81d204d9d219 100644 --- a/drivers/mock/handle.go +++ b/drivers/mock/handle.go @@ -8,6 +8,7 @@ import ( hclog "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/client/lib/fifo" + bstructs "github.com/hashicorp/nomad/plugins/base/structs" "github.com/hashicorp/nomad/plugins/drivers" ) @@ -16,6 +17,7 @@ type taskHandle struct { logger hclog.Logger runFor time.Duration + pluginExitAfter time.Duration killAfter time.Duration waitCh chan struct{} exitCode int @@ -39,6 +41,9 @@ type taskHandle struct { // Calling kill closes killCh if it is not already closed kill context.CancelFunc killCh <-chan struct{} + + // Recovered is set to true if the handle was created while being recovered + Recovered bool } func (h *taskHandle) TaskStatus() *drivers.TaskStatus { @@ -79,18 +84,30 @@ func (h *taskHandle) run() { errCh := make(chan error, 1) // Setup logging output - if h.stdoutString != "" { - go h.handleLogging(errCh) - } + go h.handleLogging(errCh) timer := time.NewTimer(h.runFor) defer timer.Stop() + var pluginExitTimer <-chan time.Time + if h.pluginExitAfter != 0 { + timer := time.NewTimer(h.pluginExitAfter) + defer timer.Stop() + pluginExitTimer = timer.C + } + select { case <-timer.C: h.logger.Debug("run_for time elapsed; exiting", "run_for", h.runFor) case <-h.killCh: h.logger.Debug("killed; exiting") + case <-pluginExitTimer: + h.logger.Debug("exiting plugin") + h.exitResult = &drivers.ExitResult{ + Err: bstructs.ErrPluginShutdown, + } + + return case err := <-errCh: h.logger.Error("error running mock task; exiting", "error", err) h.exitResult = &drivers.ExitResult{ @@ -114,6 +131,18 @@ func (h *taskHandle) handleLogging(errCh chan<- error) { errCh <- err return } + stderr, err := fifo.Open(h.taskConfig.StderrPath) + if err != nil { + h.logger.Error("failed to write to stderr", "error", err) + errCh <- err + return + } + defer stderr.Close() + + if h.stdoutString == "" { + return + } + if _, err := io.WriteString(stdout, h.stdoutString); err != nil { h.logger.Error("failed to write to stdout", "error", err) errCh <- err diff --git a/drivers/qemu/driver_test.go b/drivers/qemu/driver_test.go index 53901f0b7612..d0e204c52394 100644 --- a/drivers/qemu/driver_test.go +++ b/drivers/qemu/driver_test.go @@ -17,8 +17,8 @@ import ( "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/drivers" dtestutil "github.com/hashicorp/nomad/plugins/drivers/testutils" - "github.com/hashicorp/nomad/plugins/shared" "github.com/hashicorp/nomad/plugins/shared/hclspec" + "github.com/hashicorp/nomad/plugins/shared/hclutils" pstructs "github.com/hashicorp/nomad/plugins/shared/structs" "github.com/hashicorp/nomad/testutil" "github.com/stretchr/testify/require" @@ -203,11 +203,11 @@ func TestQemuDriver_GetMonitorPathNewQemu(t *testing.T) { //encodeDriverhelper sets up the task config spec and encodes qemu specific driver configuration func encodeDriverHelper(require *require.Assertions, task *drivers.TaskConfig, taskConfig map[string]interface{}) { evalCtx := &hcl.EvalContext{ - Functions: shared.GetStdlibFuncs(), + Functions: hclutils.GetStdlibFuncs(), } spec, diag := hclspec.Convert(taskConfigSpec) require.False(diag.HasErrors(), diag.Error()) - taskConfigCtyVal, diag := shared.ParseHclInterface(taskConfig, spec, evalCtx) + taskConfigCtyVal, diag := hclutils.ParseHclInterface(taskConfig, spec, evalCtx) require.False(diag.HasErrors(), diag.Error()) err := task.EncodeDriverConfig(taskConfigCtyVal) require.Nil(err) diff --git a/drivers/rawexec/driver_test.go b/drivers/rawexec/driver_test.go index b65becafa2f1..186475747a2e 100644 --- a/drivers/rawexec/driver_test.go +++ b/drivers/rawexec/driver_test.go @@ -20,8 +20,8 @@ import ( basePlug "github.com/hashicorp/nomad/plugins/base" "github.com/hashicorp/nomad/plugins/drivers" dtestutil "github.com/hashicorp/nomad/plugins/drivers/testutils" - "github.com/hashicorp/nomad/plugins/shared" "github.com/hashicorp/nomad/plugins/shared/hclspec" + "github.com/hashicorp/nomad/plugins/shared/hclutils" pstructs "github.com/hashicorp/nomad/plugins/shared/structs" "github.com/hashicorp/nomad/testutil" "github.com/stretchr/testify/require" @@ -500,11 +500,11 @@ func TestRawExecDriver_Exec(t *testing.T) { func encodeDriverHelper(require *require.Assertions, task *drivers.TaskConfig, taskConfig map[string]interface{}) { evalCtx := &hcl.EvalContext{ - Functions: shared.GetStdlibFuncs(), + Functions: hclutils.GetStdlibFuncs(), } spec, diag := hclspec.Convert(taskConfigSpec) require.False(diag.HasErrors()) - taskConfigCtyVal, diag := shared.ParseHclInterface(taskConfig, spec, evalCtx) + taskConfigCtyVal, diag := hclutils.ParseHclInterface(taskConfig, spec, evalCtx) require.False(diag.HasErrors()) err := task.EncodeDriverConfig(taskConfigCtyVal) require.Nil(err) diff --git a/drivers/rkt/driver.go b/drivers/rkt/driver.go index c2be6494e1be..189b54f478ab 100644 --- a/drivers/rkt/driver.go +++ b/drivers/rkt/driver.go @@ -16,14 +16,15 @@ import ( "regexp" "strconv" "strings" + "sync" "syscall" "time" appcschema "github.com/appc/spec/schema" "github.com/hashicorp/consul-template/signals" - "github.com/hashicorp/go-hclog" - "github.com/hashicorp/go-plugin" - "github.com/hashicorp/go-version" + hclog "github.com/hashicorp/go-hclog" + plugin "github.com/hashicorp/go-plugin" + version "github.com/hashicorp/go-version" "github.com/hashicorp/nomad/client/config" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/client/taskenv" @@ -193,6 +194,10 @@ type Driver struct { // logger will log to the Nomad agent logger hclog.Logger + + // hasFingerprinted is used to store whether we have fingerprinted before + hasFingerprinted bool + fingerprintLock sync.Mutex } func NewRktDriver(logger hclog.Logger) drivers.DriverPlugin { @@ -261,7 +266,25 @@ func (d *Driver) handleFingerprint(ctx context.Context, ch chan *drivers.Fingerp } } +// setFingerprinted marks the driver as having fingerprinted once before +func (d *Driver) setFingerprinted() { + d.fingerprintLock.Lock() + d.hasFingerprinted = true + d.fingerprintLock.Unlock() +} + +// fingerprinted returns whether the driver has fingerprinted before +func (d *Driver) fingerprinted() bool { + d.fingerprintLock.Lock() + defer d.fingerprintLock.Unlock() + return d.hasFingerprinted +} + func (d *Driver) buildFingerprint() *drivers.Fingerprint { + defer func() { + d.setFingerprinted() + }() + fingerprint := &drivers.Fingerprint{ Attributes: map[string]*pstructs.Attribute{}, Health: drivers.HealthStateHealthy, @@ -270,6 +293,9 @@ func (d *Driver) buildFingerprint() *drivers.Fingerprint { // Only enable if we are root if syscall.Geteuid() != 0 { + if !d.fingerprinted() { + d.logger.Debug("must run as root user, disabling") + } fingerprint.Health = drivers.HealthStateUndetected fingerprint.HealthDescription = drivers.DriverRequiresRootMessage return fingerprint @@ -297,6 +323,10 @@ func (d *Driver) buildFingerprint() *drivers.Fingerprint { // Do not allow ancient rkt versions fingerprint.Health = drivers.HealthStateUndetected fingerprint.HealthDescription = fmt.Sprintf("Unsuported rkt version %s", currentVersion) + if !d.fingerprinted() { + d.logger.Warn("unsupported rkt version please upgrade to >= "+minVersion.String(), + "rkt_version", currentVersion) + } return fingerprint } diff --git a/drivers/rkt/driver_test.go b/drivers/rkt/driver_test.go index 95a75e3719e4..2df70aa97428 100644 --- a/drivers/rkt/driver_test.go +++ b/drivers/rkt/driver_test.go @@ -22,8 +22,8 @@ import ( basePlug "github.com/hashicorp/nomad/plugins/base" "github.com/hashicorp/nomad/plugins/drivers" dtestutil "github.com/hashicorp/nomad/plugins/drivers/testutils" - "github.com/hashicorp/nomad/plugins/shared" "github.com/hashicorp/nomad/plugins/shared/hclspec" + "github.com/hashicorp/nomad/plugins/shared/hclutils" "github.com/hashicorp/nomad/testutil" "github.com/stretchr/testify/require" "golang.org/x/sys/unix" @@ -874,11 +874,11 @@ func TestRktDriver_Stats(t *testing.T) { func encodeDriverHelper(require *require.Assertions, task *drivers.TaskConfig, taskConfig map[string]interface{}) { evalCtx := &hcl.EvalContext{ - Functions: shared.GetStdlibFuncs(), + Functions: hclutils.GetStdlibFuncs(), } spec, diag := hclspec.Convert(taskConfigSpec) require.False(diag.HasErrors()) - taskConfigCtyVal, diag := shared.ParseHclInterface(taskConfig, spec, evalCtx) + taskConfigCtyVal, diag := hclutils.ParseHclInterface(taskConfig, spec, evalCtx) if diag.HasErrors() { fmt.Println("conversion error", diag.Error()) } diff --git a/drivers/shared/executor/plugins.go b/drivers/shared/executor/plugins.go index d2478e568525..55441e702764 100644 --- a/drivers/shared/executor/plugins.go +++ b/drivers/shared/executor/plugins.go @@ -5,7 +5,7 @@ import ( "net" hclog "github.com/hashicorp/go-hclog" - "github.com/hashicorp/go-plugin" + plugin "github.com/hashicorp/go-plugin" ) // ExecutorConfig is the config that Nomad passes to the executor diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 6a333eabce27..796b497d084f 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -28,7 +28,7 @@ import ( "github.com/hashicorp/consul/api" hcodec "github.com/hashicorp/go-msgpack/codec" multierror "github.com/hashicorp/go-multierror" - "github.com/hashicorp/go-version" + version "github.com/hashicorp/go-version" "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/args" @@ -5743,17 +5743,7 @@ func (ts *TaskState) Copy() *TaskState { // have meaning on a non-batch allocation because a service and system // allocation should not finish. func (ts *TaskState) Successful() bool { - l := len(ts.Events) - if ts.State != TaskStateDead || l == 0 { - return false - } - - e := ts.Events[l-1] - if e.Type != TaskTerminated { - return false - } - - return e.ExitCode == 0 + return ts.State == TaskStateDead && !ts.Failed } const ( diff --git a/plugins/base/client.go b/plugins/base/client.go index 80bc7ef4b9cf..9dadaeed4354 100644 --- a/plugins/base/client.go +++ b/plugins/base/client.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/hashicorp/nomad/plugins/base/proto" + "github.com/hashicorp/nomad/plugins/shared/grpcutils" "github.com/hashicorp/nomad/plugins/shared/hclspec" ) @@ -20,7 +21,7 @@ type BasePluginClient struct { func (b *BasePluginClient) PluginInfo() (*PluginInfoResponse, error) { presp, err := b.Client.PluginInfo(b.DoneCtx, &proto.PluginInfoRequest{}) if err != nil { - return nil, err + return nil, grpcutils.HandleGrpcErr(err, b.DoneCtx) } var ptype string @@ -46,7 +47,7 @@ func (b *BasePluginClient) PluginInfo() (*PluginInfoResponse, error) { func (b *BasePluginClient) ConfigSchema() (*hclspec.Spec, error) { presp, err := b.Client.ConfigSchema(b.DoneCtx, &proto.ConfigSchemaRequest{}) if err != nil { - return nil, err + return nil, grpcutils.HandleGrpcErr(err, b.DoneCtx) } return presp.GetSpec(), nil @@ -60,5 +61,5 @@ func (b *BasePluginClient) SetConfig(c *Config) error { PluginApiVersion: c.ApiVersion, }) - return err + return grpcutils.HandleGrpcErr(err, b.DoneCtx) } diff --git a/plugins/base/plugin.go b/plugins/base/plugin.go index 411c796629f2..f511a3d45067 100644 --- a/plugins/base/plugin.go +++ b/plugins/base/plugin.go @@ -3,7 +3,6 @@ package base import ( "bytes" "context" - "errors" "reflect" plugin "github.com/hashicorp/go-plugin" @@ -30,9 +29,6 @@ var ( MagicCookieKey: "NOMAD_PLUGIN_MAGIC_COOKIE", MagicCookieValue: "e4327c2e01eabfd75a8a67adb114fb34a757d57eee7728d857a8cec6e91a7255", } - - // ErrPluginShutdown is returned when the plugin has shutdown. - ErrPluginShutdown = errors.New("plugin is shut down") ) // PluginBase is wraps a BasePlugin and implements go-plugins GRPCPlugin diff --git a/plugins/base/structs/errors.go b/plugins/base/structs/errors.go new file mode 100644 index 000000000000..0a5a7a6d6a7c --- /dev/null +++ b/plugins/base/structs/errors.go @@ -0,0 +1,12 @@ +package structs + +import "errors" + +const ( + errPluginShutdown = "plugin is shut down" +) + +var ( + // ErrPluginShutdown is returned when the plugin has shutdown. + ErrPluginShutdown = errors.New(errPluginShutdown) +) diff --git a/plugins/device/client.go b/plugins/device/client.go index ffbb80166fad..4dc1874532a1 100644 --- a/plugins/device/client.go +++ b/plugins/device/client.go @@ -9,7 +9,7 @@ import ( "github.com/golang/protobuf/ptypes" "github.com/hashicorp/nomad/plugins/base" "github.com/hashicorp/nomad/plugins/device/proto" - "github.com/hashicorp/nomad/plugins/shared" + "github.com/hashicorp/nomad/plugins/shared/grpcutils" ) // devicePluginClient implements the client side of a remote device plugin, using @@ -30,12 +30,12 @@ type devicePluginClient struct { // cancelled, the error will be propogated. func (d *devicePluginClient) Fingerprint(ctx context.Context) (<-chan *FingerprintResponse, error) { // Join the passed context and the shutdown context - ctx, _ = joincontext.Join(ctx, d.doneCtx) + joinedCtx, _ := joincontext.Join(ctx, d.doneCtx) var req proto.FingerprintRequest - stream, err := d.client.Fingerprint(ctx, &req) + stream, err := d.client.Fingerprint(joinedCtx, &req) if err != nil { - return nil, err + return nil, grpcutils.HandleReqCtxGrpcErr(err, ctx, d.doneCtx) } out := make(chan *FingerprintResponse, 1) @@ -47,7 +47,7 @@ func (d *devicePluginClient) Fingerprint(ctx context.Context) (<-chan *Fingerpri // the gRPC stream to a channel. Exits either when context is cancelled or the // stream has an error. func (d *devicePluginClient) handleFingerprint( - ctx context.Context, + reqCtx context.Context, stream proto.DevicePlugin_FingerprintClient, out chan *FingerprintResponse) { @@ -57,7 +57,7 @@ func (d *devicePluginClient) handleFingerprint( if err != nil { if err != io.EOF { out <- &FingerprintResponse{ - Error: shared.HandleStreamErr(err, ctx, d.doneCtx), + Error: grpcutils.HandleReqCtxGrpcErr(err, reqCtx, d.doneCtx), } } @@ -70,7 +70,7 @@ func (d *devicePluginClient) handleFingerprint( Devices: convertProtoDeviceGroups(resp.GetDeviceGroup()), } select { - case <-ctx.Done(): + case <-reqCtx.Done(): return case out <- f: } @@ -86,7 +86,7 @@ func (d *devicePluginClient) Reserve(deviceIDs []string) (*ContainerReservation, // Make the request resp, err := d.client.Reserve(d.doneCtx, req) if err != nil { - return nil, err + return nil, grpcutils.HandleGrpcErr(err, d.doneCtx) } // Convert the response @@ -100,14 +100,14 @@ func (d *devicePluginClient) Reserve(deviceIDs []string) (*ContainerReservation, // propogated. func (d *devicePluginClient) Stats(ctx context.Context, interval time.Duration) (<-chan *StatsResponse, error) { // Join the passed context and the shutdown context - ctx, _ = joincontext.Join(ctx, d.doneCtx) + joinedCtx, _ := joincontext.Join(ctx, d.doneCtx) req := proto.StatsRequest{ CollectionInterval: ptypes.DurationProto(interval), } - stream, err := d.client.Stats(ctx, &req) + stream, err := d.client.Stats(joinedCtx, &req) if err != nil { - return nil, err + return nil, grpcutils.HandleReqCtxGrpcErr(err, ctx, d.doneCtx) } out := make(chan *StatsResponse, 1) @@ -119,7 +119,7 @@ func (d *devicePluginClient) Stats(ctx context.Context, interval time.Duration) // the gRPC stream to a channel. Exits either when context is cancelled or the // stream has an error. func (d *devicePluginClient) handleStats( - ctx context.Context, + reqCtx context.Context, stream proto.DevicePlugin_StatsClient, out chan *StatsResponse) { @@ -129,7 +129,7 @@ func (d *devicePluginClient) handleStats( if err != nil { if err != io.EOF { out <- &StatsResponse{ - Error: shared.HandleStreamErr(err, ctx, d.doneCtx), + Error: grpcutils.HandleReqCtxGrpcErr(err, reqCtx, d.doneCtx), } } @@ -142,7 +142,7 @@ func (d *devicePluginClient) handleStats( Groups: convertProtoDeviceGroupsStats(resp.GetGroups()), } select { - case <-ctx.Done(): + case <-reqCtx.Done(): return case out <- s: } diff --git a/plugins/drivers/client.go b/plugins/drivers/client.go index edefec092cd5..98adc86328f8 100644 --- a/plugins/drivers/client.go +++ b/plugins/drivers/client.go @@ -8,12 +8,11 @@ import ( "github.com/LK4D4/joincontext" "github.com/golang/protobuf/ptypes" - hclog "github.com/hashicorp/go-hclog" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/base" "github.com/hashicorp/nomad/plugins/drivers/proto" - "github.com/hashicorp/nomad/plugins/shared" + "github.com/hashicorp/nomad/plugins/shared/grpcutils" "github.com/hashicorp/nomad/plugins/shared/hclspec" pstructs "github.com/hashicorp/nomad/plugins/shared/structs" sproto "github.com/hashicorp/nomad/plugins/shared/structs/proto" @@ -26,7 +25,6 @@ type driverPluginClient struct { *base.BasePluginClient client proto.DriverClient - logger hclog.Logger // doneCtx is closed when the plugin exits doneCtx context.Context @@ -37,7 +35,7 @@ func (d *driverPluginClient) TaskConfigSchema() (*hclspec.Spec, error) { resp, err := d.client.TaskConfigSchema(d.doneCtx, req) if err != nil { - return nil, err + return nil, grpcutils.HandleGrpcErr(err, d.doneCtx) } return resp.Spec, nil @@ -48,7 +46,7 @@ func (d *driverPluginClient) Capabilities() (*Capabilities, error) { resp, err := d.client.Capabilities(d.doneCtx, req) if err != nil { - return nil, err + return nil, grpcutils.HandleGrpcErr(err, d.doneCtx) } caps := &Capabilities{} @@ -76,11 +74,11 @@ func (d *driverPluginClient) Fingerprint(ctx context.Context) (<-chan *Fingerpri req := &proto.FingerprintRequest{} // Join the passed context and the shutdown context - ctx, _ = joincontext.Join(ctx, d.doneCtx) + joinedCtx, _ := joincontext.Join(ctx, d.doneCtx) - stream, err := d.client.Fingerprint(ctx, req) + stream, err := d.client.Fingerprint(joinedCtx, req) if err != nil { - return nil, err + return nil, grpcutils.HandleReqCtxGrpcErr(err, ctx, d.doneCtx) } ch := make(chan *Fingerprint, 1) @@ -89,15 +87,14 @@ func (d *driverPluginClient) Fingerprint(ctx context.Context) (<-chan *Fingerpri return ch, nil } -func (d *driverPluginClient) handleFingerprint(ctx context.Context, ch chan *Fingerprint, stream proto.Driver_FingerprintClient) { +func (d *driverPluginClient) handleFingerprint(reqCtx context.Context, ch chan *Fingerprint, stream proto.Driver_FingerprintClient) { defer close(ch) for { pb, err := stream.Recv() if err != nil { if err != io.EOF { - d.logger.Error("error receiving stream from Fingerprint driver RPC", "error", err) ch <- &Fingerprint{ - Err: shared.HandleStreamErr(err, ctx, d.doneCtx), + Err: grpcutils.HandleReqCtxGrpcErr(err, reqCtx, d.doneCtx), } } @@ -112,7 +109,7 @@ func (d *driverPluginClient) handleFingerprint(ctx context.Context, ch chan *Fin } select { - case <-ctx.Done(): + case <-reqCtx.Done(): return case ch <- f: } @@ -125,7 +122,7 @@ func (d *driverPluginClient) RecoverTask(h *TaskHandle) error { req := &proto.RecoverTaskRequest{Handle: taskHandleToProto(h)} _, err := d.client.RecoverTask(d.doneCtx, req) - return err + return grpcutils.HandleGrpcErr(err, d.doneCtx) } // StartTask starts execution of a task with the given TaskConfig. A TaskHandle @@ -144,7 +141,7 @@ func (d *driverPluginClient) StartTask(c *TaskConfig) (*TaskHandle, *cstructs.Dr return nil, nil, structs.NewRecoverableError(err, rec.Recoverable) } } - return nil, nil, err + return nil, nil, grpcutils.HandleGrpcErr(err, d.doneCtx) } var net *cstructs.DriverNetwork @@ -168,10 +165,6 @@ func (d *driverPluginClient) StartTask(c *TaskConfig) (*TaskHandle, *cstructs.Dr // the same task without issue. func (d *driverPluginClient) WaitTask(ctx context.Context, id string) (<-chan *ExitResult, error) { ch := make(chan *ExitResult) - - // Join the passed context and the shutdown context - ctx, _ = joincontext.Join(ctx, d.doneCtx) - go d.handleWaitTask(ctx, id, ch) return ch, nil } @@ -183,9 +176,12 @@ func (d *driverPluginClient) handleWaitTask(ctx context.Context, id string, ch c TaskId: id, } - resp, err := d.client.WaitTask(ctx, req) + // Join the passed context and the shutdown context + joinedCtx, _ := joincontext.Join(ctx, d.doneCtx) + + resp, err := d.client.WaitTask(joinedCtx, req) if err != nil { - result.Err = err + result.Err = grpcutils.HandleReqCtxGrpcErr(err, ctx, d.doneCtx) } else { result.ExitCode = int(resp.Result.ExitCode) result.Signal = int(resp.Result.Signal) @@ -209,7 +205,7 @@ func (d *driverPluginClient) StopTask(taskID string, timeout time.Duration, sign } _, err := d.client.StopTask(d.doneCtx, req) - return err + return grpcutils.HandleGrpcErr(err, d.doneCtx) } // DestroyTask removes the task from the driver's in memory state. The task @@ -222,7 +218,7 @@ func (d *driverPluginClient) DestroyTask(taskID string, force bool) error { } _, err := d.client.DestroyTask(d.doneCtx, req) - return err + return grpcutils.HandleGrpcErr(err, d.doneCtx) } // InspectTask returns status information for a task @@ -231,7 +227,7 @@ func (d *driverPluginClient) InspectTask(taskID string) (*TaskStatus, error) { resp, err := d.client.InspectTask(d.doneCtx, req) if err != nil { - return nil, err + return nil, grpcutils.HandleGrpcErr(err, d.doneCtx) } status, err := taskStatusFromProto(resp.Task) @@ -262,7 +258,7 @@ func (d *driverPluginClient) TaskStats(taskID string) (*cstructs.TaskResourceUsa resp, err := d.client.TaskStats(d.doneCtx, req) if err != nil { - return nil, err + return nil, grpcutils.HandleGrpcErr(err, d.doneCtx) } stats, err := TaskStatsFromProto(resp.Stats) @@ -279,11 +275,11 @@ func (d *driverPluginClient) TaskEvents(ctx context.Context) (<-chan *TaskEvent, req := &proto.TaskEventsRequest{} // Join the passed context and the shutdown context - ctx, _ = joincontext.Join(ctx, d.doneCtx) + joinedCtx, _ := joincontext.Join(ctx, d.doneCtx) - stream, err := d.client.TaskEvents(ctx, req) + stream, err := d.client.TaskEvents(joinedCtx, req) if err != nil { - return nil, err + return nil, grpcutils.HandleReqCtxGrpcErr(err, ctx, d.doneCtx) } ch := make(chan *TaskEvent, 1) @@ -291,15 +287,14 @@ func (d *driverPluginClient) TaskEvents(ctx context.Context) (<-chan *TaskEvent, return ch, nil } -func (d *driverPluginClient) handleTaskEvents(ctx context.Context, ch chan *TaskEvent, stream proto.Driver_TaskEventsClient) { +func (d *driverPluginClient) handleTaskEvents(reqCtx context.Context, ch chan *TaskEvent, stream proto.Driver_TaskEventsClient) { defer close(ch) for { ev, err := stream.Recv() if err != nil { if err != io.EOF { - d.logger.Error("error receiving stream from TaskEvents driver RPC", "error", err) ch <- &TaskEvent{ - Err: shared.HandleStreamErr(err, ctx, d.doneCtx), + Err: grpcutils.HandleReqCtxGrpcErr(err, reqCtx, d.doneCtx), } } @@ -317,7 +312,7 @@ func (d *driverPluginClient) handleTaskEvents(ctx context.Context, ch chan *Task Timestamp: timestamp, } select { - case <-ctx.Done(): + case <-reqCtx.Done(): return case ch <- event: } @@ -331,7 +326,7 @@ func (d *driverPluginClient) SignalTask(taskID string, signal string) error { Signal: signal, } _, err := d.client.SignalTask(d.doneCtx, req) - return err + return grpcutils.HandleGrpcErr(err, d.doneCtx) } // ExecTask will run the given command within the execution context of the task. @@ -347,7 +342,7 @@ func (d *driverPluginClient) ExecTask(taskID string, cmd []string, timeout time. resp, err := d.client.ExecTask(d.doneCtx, req) if err != nil { - return nil, err + return nil, grpcutils.HandleGrpcErr(err, d.doneCtx) } result := &ExecTaskResult{ diff --git a/plugins/drivers/plugin.go b/plugins/drivers/plugin.go index 524123cba182..b3fcfcfc45ae 100644 --- a/plugins/drivers/plugin.go +++ b/plugins/drivers/plugin.go @@ -19,10 +19,9 @@ type PluginDriver struct { logger hclog.Logger } -func NewDriverPlugin(d DriverPlugin, logger hclog.Logger) plugin.GRPCPlugin { +func NewDriverPlugin(d DriverPlugin) plugin.GRPCPlugin { return &PluginDriver{ - impl: d, - logger: logger.Named("driver_plugin"), + impl: d, } } @@ -42,7 +41,6 @@ func (p *PluginDriver) GRPCClient(ctx context.Context, broker *plugin.GRPCBroker Client: baseproto.NewBasePluginClient(c), }, client: proto.NewDriverClient(c), - logger: p.logger, doneCtx: ctx, }, nil } diff --git a/plugins/drivers/proto/driver.pb.go b/plugins/drivers/proto/driver.pb.go index 006a18dec7a8..6c9e3e0f9f97 100644 --- a/plugins/drivers/proto/driver.pb.go +++ b/plugins/drivers/proto/driver.pb.go @@ -50,7 +50,7 @@ func (x TaskState) String() string { return proto.EnumName(TaskState_name, int32(x)) } func (TaskState) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{0} + return fileDescriptor_driver_66cfa35dd20ec741, []int{0} } type FingerprintResponse_HealthState int32 @@ -76,7 +76,7 @@ func (x FingerprintResponse_HealthState) String() string { return proto.EnumName(FingerprintResponse_HealthState_name, int32(x)) } func (FingerprintResponse_HealthState) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{5, 0} + return fileDescriptor_driver_66cfa35dd20ec741, []int{5, 0} } type StartTaskResponse_Result int32 @@ -102,7 +102,7 @@ func (x StartTaskResponse_Result) String() string { return proto.EnumName(StartTaskResponse_Result_name, int32(x)) } func (StartTaskResponse_Result) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{9, 0} + return fileDescriptor_driver_66cfa35dd20ec741, []int{9, 0} } type DriverCapabilities_FSIsolation int32 @@ -128,7 +128,7 @@ func (x DriverCapabilities_FSIsolation) String() string { return proto.EnumName(DriverCapabilities_FSIsolation_name, int32(x)) } func (DriverCapabilities_FSIsolation) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{25, 0} + return fileDescriptor_driver_66cfa35dd20ec741, []int{25, 0} } type CPUUsage_Fields int32 @@ -163,7 +163,7 @@ func (x CPUUsage_Fields) String() string { return proto.EnumName(CPUUsage_Fields_name, int32(x)) } func (CPUUsage_Fields) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{43, 0} + return fileDescriptor_driver_66cfa35dd20ec741, []int{43, 0} } type MemoryUsage_Fields int32 @@ -195,7 +195,7 @@ func (x MemoryUsage_Fields) String() string { return proto.EnumName(MemoryUsage_Fields_name, int32(x)) } func (MemoryUsage_Fields) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{44, 0} + return fileDescriptor_driver_66cfa35dd20ec741, []int{44, 0} } type TaskConfigSchemaRequest struct { @@ -208,7 +208,7 @@ func (m *TaskConfigSchemaRequest) Reset() { *m = TaskConfigSchemaRequest func (m *TaskConfigSchemaRequest) String() string { return proto.CompactTextString(m) } func (*TaskConfigSchemaRequest) ProtoMessage() {} func (*TaskConfigSchemaRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{0} + return fileDescriptor_driver_66cfa35dd20ec741, []int{0} } func (m *TaskConfigSchemaRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_TaskConfigSchemaRequest.Unmarshal(m, b) @@ -240,7 +240,7 @@ func (m *TaskConfigSchemaResponse) Reset() { *m = TaskConfigSchemaRespon func (m *TaskConfigSchemaResponse) String() string { return proto.CompactTextString(m) } func (*TaskConfigSchemaResponse) ProtoMessage() {} func (*TaskConfigSchemaResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{1} + return fileDescriptor_driver_66cfa35dd20ec741, []int{1} } func (m *TaskConfigSchemaResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_TaskConfigSchemaResponse.Unmarshal(m, b) @@ -277,7 +277,7 @@ func (m *CapabilitiesRequest) Reset() { *m = CapabilitiesRequest{} } func (m *CapabilitiesRequest) String() string { return proto.CompactTextString(m) } func (*CapabilitiesRequest) ProtoMessage() {} func (*CapabilitiesRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{2} + return fileDescriptor_driver_66cfa35dd20ec741, []int{2} } func (m *CapabilitiesRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_CapabilitiesRequest.Unmarshal(m, b) @@ -312,7 +312,7 @@ func (m *CapabilitiesResponse) Reset() { *m = CapabilitiesResponse{} } func (m *CapabilitiesResponse) String() string { return proto.CompactTextString(m) } func (*CapabilitiesResponse) ProtoMessage() {} func (*CapabilitiesResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{3} + return fileDescriptor_driver_66cfa35dd20ec741, []int{3} } func (m *CapabilitiesResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_CapabilitiesResponse.Unmarshal(m, b) @@ -349,7 +349,7 @@ func (m *FingerprintRequest) Reset() { *m = FingerprintRequest{} } func (m *FingerprintRequest) String() string { return proto.CompactTextString(m) } func (*FingerprintRequest) ProtoMessage() {} func (*FingerprintRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{4} + return fileDescriptor_driver_66cfa35dd20ec741, []int{4} } func (m *FingerprintRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_FingerprintRequest.Unmarshal(m, b) @@ -392,7 +392,7 @@ func (m *FingerprintResponse) Reset() { *m = FingerprintResponse{} } func (m *FingerprintResponse) String() string { return proto.CompactTextString(m) } func (*FingerprintResponse) ProtoMessage() {} func (*FingerprintResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{5} + return fileDescriptor_driver_66cfa35dd20ec741, []int{5} } func (m *FingerprintResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_FingerprintResponse.Unmarshal(m, b) @@ -447,7 +447,7 @@ func (m *RecoverTaskRequest) Reset() { *m = RecoverTaskRequest{} } func (m *RecoverTaskRequest) String() string { return proto.CompactTextString(m) } func (*RecoverTaskRequest) ProtoMessage() {} func (*RecoverTaskRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{6} + return fileDescriptor_driver_66cfa35dd20ec741, []int{6} } func (m *RecoverTaskRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_RecoverTaskRequest.Unmarshal(m, b) @@ -491,7 +491,7 @@ func (m *RecoverTaskResponse) Reset() { *m = RecoverTaskResponse{} } func (m *RecoverTaskResponse) String() string { return proto.CompactTextString(m) } func (*RecoverTaskResponse) ProtoMessage() {} func (*RecoverTaskResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{7} + return fileDescriptor_driver_66cfa35dd20ec741, []int{7} } func (m *RecoverTaskResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_RecoverTaskResponse.Unmarshal(m, b) @@ -523,7 +523,7 @@ func (m *StartTaskRequest) Reset() { *m = StartTaskRequest{} } func (m *StartTaskRequest) String() string { return proto.CompactTextString(m) } func (*StartTaskRequest) ProtoMessage() {} func (*StartTaskRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{8} + return fileDescriptor_driver_66cfa35dd20ec741, []int{8} } func (m *StartTaskRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_StartTaskRequest.Unmarshal(m, b) @@ -577,7 +577,7 @@ func (m *StartTaskResponse) Reset() { *m = StartTaskResponse{} } func (m *StartTaskResponse) String() string { return proto.CompactTextString(m) } func (*StartTaskResponse) ProtoMessage() {} func (*StartTaskResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{9} + return fileDescriptor_driver_66cfa35dd20ec741, []int{9} } func (m *StartTaskResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_StartTaskResponse.Unmarshal(m, b) @@ -637,7 +637,7 @@ func (m *WaitTaskRequest) Reset() { *m = WaitTaskRequest{} } func (m *WaitTaskRequest) String() string { return proto.CompactTextString(m) } func (*WaitTaskRequest) ProtoMessage() {} func (*WaitTaskRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{10} + return fileDescriptor_driver_66cfa35dd20ec741, []int{10} } func (m *WaitTaskRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_WaitTaskRequest.Unmarshal(m, b) @@ -678,7 +678,7 @@ func (m *WaitTaskResponse) Reset() { *m = WaitTaskResponse{} } func (m *WaitTaskResponse) String() string { return proto.CompactTextString(m) } func (*WaitTaskResponse) ProtoMessage() {} func (*WaitTaskResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{11} + return fileDescriptor_driver_66cfa35dd20ec741, []int{11} } func (m *WaitTaskResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_WaitTaskResponse.Unmarshal(m, b) @@ -730,7 +730,7 @@ func (m *StopTaskRequest) Reset() { *m = StopTaskRequest{} } func (m *StopTaskRequest) String() string { return proto.CompactTextString(m) } func (*StopTaskRequest) ProtoMessage() {} func (*StopTaskRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{12} + return fileDescriptor_driver_66cfa35dd20ec741, []int{12} } func (m *StopTaskRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_StopTaskRequest.Unmarshal(m, b) @@ -781,7 +781,7 @@ func (m *StopTaskResponse) Reset() { *m = StopTaskResponse{} } func (m *StopTaskResponse) String() string { return proto.CompactTextString(m) } func (*StopTaskResponse) ProtoMessage() {} func (*StopTaskResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{13} + return fileDescriptor_driver_66cfa35dd20ec741, []int{13} } func (m *StopTaskResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_StopTaskResponse.Unmarshal(m, b) @@ -815,7 +815,7 @@ func (m *DestroyTaskRequest) Reset() { *m = DestroyTaskRequest{} } func (m *DestroyTaskRequest) String() string { return proto.CompactTextString(m) } func (*DestroyTaskRequest) ProtoMessage() {} func (*DestroyTaskRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{14} + return fileDescriptor_driver_66cfa35dd20ec741, []int{14} } func (m *DestroyTaskRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_DestroyTaskRequest.Unmarshal(m, b) @@ -859,7 +859,7 @@ func (m *DestroyTaskResponse) Reset() { *m = DestroyTaskResponse{} } func (m *DestroyTaskResponse) String() string { return proto.CompactTextString(m) } func (*DestroyTaskResponse) ProtoMessage() {} func (*DestroyTaskResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{15} + return fileDescriptor_driver_66cfa35dd20ec741, []int{15} } func (m *DestroyTaskResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_DestroyTaskResponse.Unmarshal(m, b) @@ -891,7 +891,7 @@ func (m *InspectTaskRequest) Reset() { *m = InspectTaskRequest{} } func (m *InspectTaskRequest) String() string { return proto.CompactTextString(m) } func (*InspectTaskRequest) ProtoMessage() {} func (*InspectTaskRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{16} + return fileDescriptor_driver_66cfa35dd20ec741, []int{16} } func (m *InspectTaskRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_InspectTaskRequest.Unmarshal(m, b) @@ -934,7 +934,7 @@ func (m *InspectTaskResponse) Reset() { *m = InspectTaskResponse{} } func (m *InspectTaskResponse) String() string { return proto.CompactTextString(m) } func (*InspectTaskResponse) ProtoMessage() {} func (*InspectTaskResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{17} + return fileDescriptor_driver_66cfa35dd20ec741, []int{17} } func (m *InspectTaskResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_InspectTaskResponse.Unmarshal(m, b) @@ -987,7 +987,7 @@ func (m *TaskStatsRequest) Reset() { *m = TaskStatsRequest{} } func (m *TaskStatsRequest) String() string { return proto.CompactTextString(m) } func (*TaskStatsRequest) ProtoMessage() {} func (*TaskStatsRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{18} + return fileDescriptor_driver_66cfa35dd20ec741, []int{18} } func (m *TaskStatsRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_TaskStatsRequest.Unmarshal(m, b) @@ -1026,7 +1026,7 @@ func (m *TaskStatsResponse) Reset() { *m = TaskStatsResponse{} } func (m *TaskStatsResponse) String() string { return proto.CompactTextString(m) } func (*TaskStatsResponse) ProtoMessage() {} func (*TaskStatsResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{19} + return fileDescriptor_driver_66cfa35dd20ec741, []int{19} } func (m *TaskStatsResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_TaskStatsResponse.Unmarshal(m, b) @@ -1063,7 +1063,7 @@ func (m *TaskEventsRequest) Reset() { *m = TaskEventsRequest{} } func (m *TaskEventsRequest) String() string { return proto.CompactTextString(m) } func (*TaskEventsRequest) ProtoMessage() {} func (*TaskEventsRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{20} + return fileDescriptor_driver_66cfa35dd20ec741, []int{20} } func (m *TaskEventsRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_TaskEventsRequest.Unmarshal(m, b) @@ -1097,7 +1097,7 @@ func (m *SignalTaskRequest) Reset() { *m = SignalTaskRequest{} } func (m *SignalTaskRequest) String() string { return proto.CompactTextString(m) } func (*SignalTaskRequest) ProtoMessage() {} func (*SignalTaskRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{21} + return fileDescriptor_driver_66cfa35dd20ec741, []int{21} } func (m *SignalTaskRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_SignalTaskRequest.Unmarshal(m, b) @@ -1141,7 +1141,7 @@ func (m *SignalTaskResponse) Reset() { *m = SignalTaskResponse{} } func (m *SignalTaskResponse) String() string { return proto.CompactTextString(m) } func (*SignalTaskResponse) ProtoMessage() {} func (*SignalTaskResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{22} + return fileDescriptor_driver_66cfa35dd20ec741, []int{22} } func (m *SignalTaskResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_SignalTaskResponse.Unmarshal(m, b) @@ -1178,7 +1178,7 @@ func (m *ExecTaskRequest) Reset() { *m = ExecTaskRequest{} } func (m *ExecTaskRequest) String() string { return proto.CompactTextString(m) } func (*ExecTaskRequest) ProtoMessage() {} func (*ExecTaskRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{23} + return fileDescriptor_driver_66cfa35dd20ec741, []int{23} } func (m *ExecTaskRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ExecTaskRequest.Unmarshal(m, b) @@ -1235,7 +1235,7 @@ func (m *ExecTaskResponse) Reset() { *m = ExecTaskResponse{} } func (m *ExecTaskResponse) String() string { return proto.CompactTextString(m) } func (*ExecTaskResponse) ProtoMessage() {} func (*ExecTaskResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{24} + return fileDescriptor_driver_66cfa35dd20ec741, []int{24} } func (m *ExecTaskResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ExecTaskResponse.Unmarshal(m, b) @@ -1294,7 +1294,7 @@ func (m *DriverCapabilities) Reset() { *m = DriverCapabilities{} } func (m *DriverCapabilities) String() string { return proto.CompactTextString(m) } func (*DriverCapabilities) ProtoMessage() {} func (*DriverCapabilities) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{25} + return fileDescriptor_driver_66cfa35dd20ec741, []int{25} } func (m *DriverCapabilities) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_DriverCapabilities.Unmarshal(m, b) @@ -1380,7 +1380,7 @@ func (m *TaskConfig) Reset() { *m = TaskConfig{} } func (m *TaskConfig) String() string { return proto.CompactTextString(m) } func (*TaskConfig) ProtoMessage() {} func (*TaskConfig) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{26} + return fileDescriptor_driver_66cfa35dd20ec741, []int{26} } func (m *TaskConfig) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_TaskConfig.Unmarshal(m, b) @@ -1519,7 +1519,7 @@ func (m *Resources) Reset() { *m = Resources{} } func (m *Resources) String() string { return proto.CompactTextString(m) } func (*Resources) ProtoMessage() {} func (*Resources) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{27} + return fileDescriptor_driver_66cfa35dd20ec741, []int{27} } func (m *Resources) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_Resources.Unmarshal(m, b) @@ -1566,7 +1566,7 @@ func (m *AllocatedTaskResources) Reset() { *m = AllocatedTaskResources{} func (m *AllocatedTaskResources) String() string { return proto.CompactTextString(m) } func (*AllocatedTaskResources) ProtoMessage() {} func (*AllocatedTaskResources) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{28} + return fileDescriptor_driver_66cfa35dd20ec741, []int{28} } func (m *AllocatedTaskResources) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_AllocatedTaskResources.Unmarshal(m, b) @@ -1618,7 +1618,7 @@ func (m *AllocatedCpuResources) Reset() { *m = AllocatedCpuResources{} } func (m *AllocatedCpuResources) String() string { return proto.CompactTextString(m) } func (*AllocatedCpuResources) ProtoMessage() {} func (*AllocatedCpuResources) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{29} + return fileDescriptor_driver_66cfa35dd20ec741, []int{29} } func (m *AllocatedCpuResources) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_AllocatedCpuResources.Unmarshal(m, b) @@ -1656,7 +1656,7 @@ func (m *AllocatedMemoryResources) Reset() { *m = AllocatedMemoryResourc func (m *AllocatedMemoryResources) String() string { return proto.CompactTextString(m) } func (*AllocatedMemoryResources) ProtoMessage() {} func (*AllocatedMemoryResources) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{30} + return fileDescriptor_driver_66cfa35dd20ec741, []int{30} } func (m *AllocatedMemoryResources) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_AllocatedMemoryResources.Unmarshal(m, b) @@ -1699,7 +1699,7 @@ func (m *NetworkResource) Reset() { *m = NetworkResource{} } func (m *NetworkResource) String() string { return proto.CompactTextString(m) } func (*NetworkResource) ProtoMessage() {} func (*NetworkResource) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{31} + return fileDescriptor_driver_66cfa35dd20ec741, []int{31} } func (m *NetworkResource) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_NetworkResource.Unmarshal(m, b) @@ -1773,7 +1773,7 @@ func (m *NetworkPort) Reset() { *m = NetworkPort{} } func (m *NetworkPort) String() string { return proto.CompactTextString(m) } func (*NetworkPort) ProtoMessage() {} func (*NetworkPort) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{32} + return fileDescriptor_driver_66cfa35dd20ec741, []int{32} } func (m *NetworkPort) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_NetworkPort.Unmarshal(m, b) @@ -1833,7 +1833,7 @@ func (m *LinuxResources) Reset() { *m = LinuxResources{} } func (m *LinuxResources) String() string { return proto.CompactTextString(m) } func (*LinuxResources) ProtoMessage() {} func (*LinuxResources) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{33} + return fileDescriptor_driver_66cfa35dd20ec741, []int{33} } func (m *LinuxResources) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_LinuxResources.Unmarshal(m, b) @@ -1925,7 +1925,7 @@ func (m *Mount) Reset() { *m = Mount{} } func (m *Mount) String() string { return proto.CompactTextString(m) } func (*Mount) ProtoMessage() {} func (*Mount) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{34} + return fileDescriptor_driver_66cfa35dd20ec741, []int{34} } func (m *Mount) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_Mount.Unmarshal(m, b) @@ -1988,7 +1988,7 @@ func (m *Device) Reset() { *m = Device{} } func (m *Device) String() string { return proto.CompactTextString(m) } func (*Device) ProtoMessage() {} func (*Device) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{35} + return fileDescriptor_driver_66cfa35dd20ec741, []int{35} } func (m *Device) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_Device.Unmarshal(m, b) @@ -2046,7 +2046,7 @@ func (m *TaskHandle) Reset() { *m = TaskHandle{} } func (m *TaskHandle) String() string { return proto.CompactTextString(m) } func (*TaskHandle) ProtoMessage() {} func (*TaskHandle) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{36} + return fileDescriptor_driver_66cfa35dd20ec741, []int{36} } func (m *TaskHandle) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_TaskHandle.Unmarshal(m, b) @@ -2106,7 +2106,7 @@ func (m *NetworkOverride) Reset() { *m = NetworkOverride{} } func (m *NetworkOverride) String() string { return proto.CompactTextString(m) } func (*NetworkOverride) ProtoMessage() {} func (*NetworkOverride) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{37} + return fileDescriptor_driver_66cfa35dd20ec741, []int{37} } func (m *NetworkOverride) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_NetworkOverride.Unmarshal(m, b) @@ -2164,7 +2164,7 @@ func (m *ExitResult) Reset() { *m = ExitResult{} } func (m *ExitResult) String() string { return proto.CompactTextString(m) } func (*ExitResult) ProtoMessage() {} func (*ExitResult) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{38} + return fileDescriptor_driver_66cfa35dd20ec741, []int{38} } func (m *ExitResult) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ExitResult.Unmarshal(m, b) @@ -2227,7 +2227,7 @@ func (m *TaskStatus) Reset() { *m = TaskStatus{} } func (m *TaskStatus) String() string { return proto.CompactTextString(m) } func (*TaskStatus) ProtoMessage() {} func (*TaskStatus) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{39} + return fileDescriptor_driver_66cfa35dd20ec741, []int{39} } func (m *TaskStatus) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_TaskStatus.Unmarshal(m, b) @@ -2302,7 +2302,7 @@ func (m *TaskDriverStatus) Reset() { *m = TaskDriverStatus{} } func (m *TaskDriverStatus) String() string { return proto.CompactTextString(m) } func (*TaskDriverStatus) ProtoMessage() {} func (*TaskDriverStatus) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{40} + return fileDescriptor_driver_66cfa35dd20ec741, []int{40} } func (m *TaskDriverStatus) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_TaskDriverStatus.Unmarshal(m, b) @@ -2347,7 +2347,7 @@ func (m *TaskStats) Reset() { *m = TaskStats{} } func (m *TaskStats) String() string { return proto.CompactTextString(m) } func (*TaskStats) ProtoMessage() {} func (*TaskStats) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{41} + return fileDescriptor_driver_66cfa35dd20ec741, []int{41} } func (m *TaskStats) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_TaskStats.Unmarshal(m, b) @@ -2409,7 +2409,7 @@ func (m *TaskResourceUsage) Reset() { *m = TaskResourceUsage{} } func (m *TaskResourceUsage) String() string { return proto.CompactTextString(m) } func (*TaskResourceUsage) ProtoMessage() {} func (*TaskResourceUsage) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{42} + return fileDescriptor_driver_66cfa35dd20ec741, []int{42} } func (m *TaskResourceUsage) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_TaskResourceUsage.Unmarshal(m, b) @@ -2461,7 +2461,7 @@ func (m *CPUUsage) Reset() { *m = CPUUsage{} } func (m *CPUUsage) String() string { return proto.CompactTextString(m) } func (*CPUUsage) ProtoMessage() {} func (*CPUUsage) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{43} + return fileDescriptor_driver_66cfa35dd20ec741, []int{43} } func (m *CPUUsage) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_CPUUsage.Unmarshal(m, b) @@ -2547,7 +2547,7 @@ func (m *MemoryUsage) Reset() { *m = MemoryUsage{} } func (m *MemoryUsage) String() string { return proto.CompactTextString(m) } func (*MemoryUsage) ProtoMessage() {} func (*MemoryUsage) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{44} + return fileDescriptor_driver_66cfa35dd20ec741, []int{44} } func (m *MemoryUsage) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_MemoryUsage.Unmarshal(m, b) @@ -2631,7 +2631,7 @@ func (m *DriverTaskEvent) Reset() { *m = DriverTaskEvent{} } func (m *DriverTaskEvent) String() string { return proto.CompactTextString(m) } func (*DriverTaskEvent) ProtoMessage() {} func (*DriverTaskEvent) Descriptor() ([]byte, []int) { - return fileDescriptor_driver_de29bfae7a3376ed, []int{45} + return fileDescriptor_driver_66cfa35dd20ec741, []int{45} } func (m *DriverTaskEvent) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_DriverTaskEvent.Unmarshal(m, b) @@ -3339,10 +3339,10 @@ var _Driver_serviceDesc = grpc.ServiceDesc{ } func init() { - proto.RegisterFile("plugins/drivers/proto/driver.proto", fileDescriptor_driver_de29bfae7a3376ed) + proto.RegisterFile("plugins/drivers/proto/driver.proto", fileDescriptor_driver_66cfa35dd20ec741) } -var fileDescriptor_driver_de29bfae7a3376ed = []byte{ +var fileDescriptor_driver_66cfa35dd20ec741 = []byte{ // 2940 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x59, 0xcb, 0x6f, 0x23, 0xc7, 0xd1, 0x17, 0x9f, 0x22, 0x8b, 0x12, 0x35, 0xdb, 0xbb, 0x6b, 0xd3, 0x34, 0xbe, 0xcf, 0xeb, 0x01, diff --git a/plugins/drivers/testutils/testing.go b/plugins/drivers/testutils/testing.go index 2394b4f1ba7d..f4c54702ff24 100644 --- a/plugins/drivers/testutils/testing.go +++ b/plugins/drivers/testutils/testing.go @@ -43,7 +43,7 @@ func (d *DriverHarness) Impl() drivers.DriverPlugin { func NewDriverHarness(t testing.T, d drivers.DriverPlugin) *DriverHarness { logger := testlog.HCLogger(t).Named("driver_harness") - pd := drivers.NewDriverPlugin(d, logger).(*drivers.PluginDriver) + pd := drivers.NewDriverPlugin(d).(*drivers.PluginDriver) client, server := plugin.TestPluginGRPCConn(t, map[string]plugin.Plugin{ diff --git a/plugins/shared/cmd/launcher/command/device.go b/plugins/shared/cmd/launcher/command/device.go index 01855da7be7b..fc8b6b7af229 100644 --- a/plugins/shared/cmd/launcher/command/device.go +++ b/plugins/shared/cmd/launcher/command/device.go @@ -18,8 +18,8 @@ import ( "github.com/hashicorp/hcl2/hcldec" "github.com/hashicorp/nomad/plugins/base" "github.com/hashicorp/nomad/plugins/device" - "github.com/hashicorp/nomad/plugins/shared" "github.com/hashicorp/nomad/plugins/shared/hclspec" + "github.com/hashicorp/nomad/plugins/shared/hclutils" "github.com/kr/pretty" "github.com/mitchellh/cli" "github.com/zclconf/go-cty/cty/msgpack" @@ -198,10 +198,10 @@ func (c *Device) setConfig(spec hcldec.Spec, apiVersion string, config []byte, n c.logger.Trace("raw hcl config", "config", hclog.Fmt("% #v", pretty.Formatter(configVal))) ctx := &hcl2.EvalContext{ - Functions: shared.GetStdlibFuncs(), + Functions: hclutils.GetStdlibFuncs(), } - val, diag := shared.ParseHclInterface(configVal, spec, ctx) + val, diag := hclutils.ParseHclInterface(configVal, spec, ctx) if diag.HasErrors() { errStr := "failed to parse config" for _, err := range diag.Errs() { diff --git a/plugins/shared/grpc_utils.go b/plugins/shared/grpc_utils.go deleted file mode 100644 index 34fb33a870c7..000000000000 --- a/plugins/shared/grpc_utils.go +++ /dev/null @@ -1,61 +0,0 @@ -package shared - -import ( - "context" - "time" - - "github.com/hashicorp/nomad/plugins/base" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" -) - -// HandleStreamErr is used to handle a non io.EOF error in a stream. It handles -// detecting if the plugin has shutdown via the passeed pluginCtx. The -// parameters are: -// - err: the error returned from the streaming RPC -// - reqCtx: the context passed to the streaming request -// - pluginCtx: the plugins done ctx used to detect the plugin dying -// -// The return values are: -// - base.ErrPluginShutdown if the error is because the plugin shutdown -// - context.Canceled if the reqCtx is canceled -// - The original error -func HandleStreamErr(err error, reqCtx, pluginCtx context.Context) error { - if err == nil { - return nil - } - - // Determine if the error is because the plugin shutdown - if errStatus, ok := status.FromError(err); ok && errStatus.Code() == codes.Unavailable { - // Potentially wait a little before returning an error so we can detect - // the exit - select { - case <-pluginCtx.Done(): - err = base.ErrPluginShutdown - case <-reqCtx.Done(): - err = reqCtx.Err() - - // There is no guarantee that the select will choose the - // doneCtx first so we have to double check - select { - case <-pluginCtx.Done(): - err = base.ErrPluginShutdown - default: - } - case <-time.After(3 * time.Second): - // Its okay to wait a while since the connection isn't available and - // on local host it is likely shutting down. It is not expected for - // this to ever reach even close to 3 seconds. - } - - // It is an error we don't know how to handle, so return it - return err - } - - // Context was cancelled - if errStatus := status.FromContextError(reqCtx.Err()); errStatus.Code() == codes.Canceled { - return context.Canceled - } - - return err -} diff --git a/plugins/shared/grpcutils/utils.go b/plugins/shared/grpcutils/utils.go new file mode 100644 index 000000000000..001cf4ad3a1d --- /dev/null +++ b/plugins/shared/grpcutils/utils.go @@ -0,0 +1,105 @@ +package grpcutils + +import ( + "context" + "time" + + "github.com/hashicorp/nomad/plugins/base/structs" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// HandleReqCtxGrpcErr is used to handle a non io.EOF error in a GRPC request +// where a user supplied context is used. It handles detecting if the plugin has +// shutdown via the passeed pluginCtx. The parameters are: +// - err: the error returned from the streaming RPC +// - reqCtx: the user context passed to the request +// - pluginCtx: the plugins done ctx used to detect the plugin dying +// +// The return values are: +// - ErrPluginShutdown if the error is because the plugin shutdown +// - context.Canceled if the reqCtx is canceled +// - The original error +func HandleReqCtxGrpcErr(err error, reqCtx, pluginCtx context.Context) error { + if err == nil { + return nil + } + + // Determine if the error is because the plugin shutdown + if errStatus, ok := status.FromError(err); ok && + (errStatus.Code() == codes.Unavailable || errStatus.Code() == codes.Canceled) { + // Potentially wait a little before returning an error so we can detect + // the exit + select { + case <-pluginCtx.Done(): + err = structs.ErrPluginShutdown + case <-reqCtx.Done(): + err = reqCtx.Err() + + // There is no guarantee that the select will choose the + // doneCtx first so we have to double check + select { + case <-pluginCtx.Done(): + err = structs.ErrPluginShutdown + default: + } + case <-time.After(3 * time.Second): + // Its okay to wait a while since the connection isn't available and + // on local host it is likely shutting down. It is not expected for + // this to ever reach even close to 3 seconds. + } + + // It is an error we don't know how to handle, so return it + return err + } + + // Context was cancelled + if errStatus := status.FromContextError(reqCtx.Err()); errStatus.Code() == codes.Canceled { + return context.Canceled + } + + return err +} + +// HandleGrpcErr is used to handle errors made to a remote gRPC plugin. It +// handles detecting if the plugin has shutdown via the passeed pluginCtx. The +// parameters are: +// - err: the error returned from the streaming RPC +// - pluginCtx: the plugins done ctx used to detect the plugin dying +// +// The return values are: +// - ErrPluginShutdown if the error is because the plugin shutdown +// - The original error +func HandleGrpcErr(err error, pluginCtx context.Context) error { + if err == nil { + return nil + } + + if errStatus := status.FromContextError(pluginCtx.Err()); errStatus.Code() == codes.Canceled { + // See if the plugin shutdown + select { + case <-pluginCtx.Done(): + err = structs.ErrPluginShutdown + default: + } + } + + // Determine if the error is because the plugin shutdown + if errStatus, ok := status.FromError(err); ok && errStatus.Code() == codes.Unavailable { + // Potentially wait a little before returning an error so we can detect + // the exit + select { + case <-pluginCtx.Done(): + err = structs.ErrPluginShutdown + case <-time.After(3 * time.Second): + // Its okay to wait a while since the connection isn't available and + // on local host it is likely shutting down. It is not expected for + // this to ever reach even close to 3 seconds. + } + + // It is an error we don't know how to handle, so return it + return err + } + + return err +} diff --git a/plugins/shared/util.go b/plugins/shared/hclutils/util.go similarity index 99% rename from plugins/shared/util.go rename to plugins/shared/hclutils/util.go index 9152915b42ca..86a8d2e6c1dc 100644 --- a/plugins/shared/util.go +++ b/plugins/shared/hclutils/util.go @@ -1,4 +1,4 @@ -package shared +package hclutils import ( "bytes" diff --git a/plugins/shared/util_test.go b/plugins/shared/hclutils/util_test.go similarity index 99% rename from plugins/shared/util_test.go rename to plugins/shared/hclutils/util_test.go index 3dc2488ceea6..bfbb7c0a6dd5 100644 --- a/plugins/shared/util_test.go +++ b/plugins/shared/hclutils/util_test.go @@ -1,4 +1,4 @@ -package shared +package hclutils import ( "testing" diff --git a/plugins/shared/loader/init.go b/plugins/shared/loader/init.go index 89f09198ce10..7af5c8e53fa3 100644 --- a/plugins/shared/loader/init.go +++ b/plugins/shared/loader/init.go @@ -13,8 +13,8 @@ import ( hcl2 "github.com/hashicorp/hcl2/hcl" "github.com/hashicorp/nomad/nomad/structs/config" "github.com/hashicorp/nomad/plugins/base" - "github.com/hashicorp/nomad/plugins/shared" "github.com/hashicorp/nomad/plugins/shared/hclspec" + "github.com/hashicorp/nomad/plugins/shared/hclutils" "github.com/zclconf/go-cty/cty/msgpack" ) @@ -22,7 +22,7 @@ var ( // configParseCtx is the context used to parse a plugin's configuration // stanza configParseCtx = &hcl2.EvalContext{ - Functions: shared.GetStdlibFuncs(), + Functions: hclutils.GetStdlibFuncs(), } ) @@ -467,7 +467,7 @@ func (l *PluginLoader) validePluginConfig(id PluginID, info *pluginInfo) error { } // Parse the config using the spec - val, diag := shared.ParseHclInterface(info.config, spec, configParseCtx) + val, diag := hclutils.ParseHclInterface(info.config, spec, configParseCtx) if diag.HasErrors() { multierror.Append(&mErr, diag.Errs()...) return multierror.Prefix(&mErr, "failed parsing config:") diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index ba3aba4bde01..0e99efd6b500 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -86,14 +86,19 @@ func TestServiceSched_JobRegister(t *testing.T) { } // Ensure different ports were used. - used := make(map[int]struct{}) + used := make(map[int]map[string]struct{}) for _, alloc := range out { for _, resource := range alloc.TaskResources { for _, port := range resource.Networks[0].DynamicPorts { - if _, ok := used[port.Value]; ok { - t.Fatalf("Port collision %v", port.Value) + nodeMap, ok := used[port.Value] + if !ok { + nodeMap = make(map[string]struct{}) + used[port.Value] = nodeMap } - used[port.Value] = struct{}{} + if _, ok := nodeMap[alloc.NodeID]; ok { + t.Fatalf("Port collision on node %q %v", alloc.NodeID, port.Value) + } + nodeMap[alloc.NodeID] = struct{}{} } } }