Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CSI: make plugin health_timeout configurable in csi_plugin stanza #13340

Merged
merged 3 commits into from
Jun 14, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/13340.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvements
csi: Made the CSI Plugin supervisor health check configurable with a new CSI Stanza health_timeout field
```
8 changes: 8 additions & 0 deletions api/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -1039,10 +1039,18 @@ type TaskCSIPluginConfig struct {
//
// Default is /csi.
MountDir string `mapstructure:"mount_dir" hcl:"mount_dir,optional"`

// HealthTimeout is the time after which the CSI plugin tasks will be killed
// if the CSI Plugin is not healthy.
HealthTimeout time.Duration `mapstructure:"health_timeout" hcl:"health_timeout,optional"`
}

func (t *TaskCSIPluginConfig) Canonicalize() {
if t.MountDir == "" {
t.MountDir = "/csi"
}

if t.HealthTimeout == 0 {
t.HealthTimeout = 30 * time.Second
}
}
4 changes: 2 additions & 2 deletions client/allocrunner/taskrunner/plugin_supervisor_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func (h *csiPluginSupervisorHook) ensureSupervisorLoop(ctx context.Context) {

// We're in Poststart at this point, so if we can't connect within
// this deadline, assume it's broken so we can restart the task
startCtx, startCancelFn := context.WithTimeout(ctx, 30*time.Second)
startCtx, startCancelFn := context.WithTimeout(ctx, h.task.CSIPluginConfig.HealthTimeout)
defer startCancelFn()

var err error
Expand Down Expand Up @@ -441,7 +441,7 @@ func (h *csiPluginSupervisorHook) kill(ctx context.Context, reason error) {
if err := h.lifecycle.Kill(ctx,
structs.NewTaskEvent(structs.TaskKilling).
SetFailsTask().
SetDisplayMessage("CSI plugin did not become healthy before timeout"),
SetDisplayMessage(fmt.Sprintf("CSI plugin did not become healthy before configured %v health timeout", h.task.CSIPluginConfig.HealthTimeout.String())),
); err != nil {
h.logger.Error("failed to kill task", "kill_reason", reason, "error", err)
}
Expand Down
1 change: 1 addition & 0 deletions command/agent/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1263,6 +1263,7 @@ func ApiCSIPluginConfigToStructsCSIPluginConfig(apiConfig *api.TaskCSIPluginConf
sc.ID = apiConfig.ID
sc.Type = structs.CSIPluginType(apiConfig.Type)
sc.MountDir = apiConfig.MountDir
sc.HealthTimeout = apiConfig.HealthTimeout
return sc
}

Expand Down
12 changes: 10 additions & 2 deletions jobspec/parse_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,12 +158,20 @@ func parseTask(item *ast.ObjectItem, keys []string) (*api.Task, error) {
i := o.Elem().Items[0]

var m map[string]interface{}
var cfg api.TaskCSIPluginConfig
if err := hcl.DecodeObject(&m, i.Val); err != nil {
return nil, err
}

var cfg api.TaskCSIPluginConfig
if err := mapstructure.WeakDecode(m, &cfg); err != nil {
dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
DecodeHook: mapstructure.StringToTimeDurationHookFunc(),
WeaklyTypedInput: true,
Result: &cfg,
})
if err != nil {
return nil, err
}
if err := dec.Decode(m); err != nil {
return nil, err
}

Expand Down
7 changes: 4 additions & 3 deletions jobspec/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,9 +626,10 @@ func TestParse(t *testing.T) {
Name: "binstore",
Driver: "docker",
CSIPluginConfig: &api.TaskCSIPluginConfig{
ID: "org.hashicorp.csi",
Type: api.CSIPluginTypeMonolith,
MountDir: "/csi/test",
ID: "org.hashicorp.csi",
Type: api.CSIPluginTypeMonolith,
MountDir: "/csi/test",
HealthTimeout: 1 * time.Minute,
},
},
},
Expand Down
7 changes: 4 additions & 3 deletions jobspec/test-fixtures/csi-plugin.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ job "binstore-storagelocker" {
driver = "docker"

csi_plugin {
id = "org.hashicorp.csi"
type = "monolith"
mount_dir = "/csi/test"
id = "org.hashicorp.csi"
type = "monolith"
mount_dir = "/csi/test"
health_timeout = "1m"
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions nomad/structs/csi.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ type TaskCSIPluginConfig struct {
// to be created by the plugin, and will provide references into
// "MountDir/CSIIntermediaryDirname/{VolumeName}/{AllocID} for mounts.
MountDir string

// HealthTimeout is the time after which the CSI plugin tasks will be killed
// if the CSI Plugin is not healthy.
HealthTimeout time.Duration `mapstructure:"health_timeout" hcl:"health_timeout,optional"`
}

func (t *TaskCSIPluginConfig) Copy() *TaskCSIPluginConfig {
Expand Down
4 changes: 4 additions & 0 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -7255,6 +7255,10 @@ func (t *Task) Validate(ephemeralDisk *EphemeralDisk, jobType string, tgServices
mErr.Errors = append(mErr.Errors, fmt.Errorf("CSIPluginConfig PluginType must be one of 'node', 'controller', or 'monolith', got: \"%s\"", t.CSIPluginConfig.Type))
}

if t.CSIPluginConfig.HealthTimeout == 0 {
t.CSIPluginConfig.HealthTimeout = 30 * time.Second
tgross marked this conversation as resolved.
Show resolved Hide resolved
}

// TODO: Investigate validation of the PluginMountDir. Not much we can do apart from check IsAbs until after we understand its execution environment though :(
}

Expand Down