From 9f44780f89202e015e29bf3b395e778f78fbdcd8 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 22 Dec 2015 16:10:30 -0800 Subject: [PATCH 1/3] User specifiable kill timeout and operator configurable max --- api/tasks.go | 1 + client/config/config.go | 6 +++ client/driver/docker.go | 10 ++++- client/driver/docker_test.go | 3 +- client/driver/driver.go | 13 ++++++ client/driver/driver_test.go | 19 ++++++++ client/driver/exec.go | 55 +++++++++++++++++------ client/driver/java.go | 55 +++++++++++++++++------ client/driver/qemu.go | 55 +++++++++++++++++------ client/driver/raw_exec.go | 55 +++++++++++++++++------ client/driver/rkt.go | 45 ++++++++++--------- client/driver/rkt_test.go | 11 ++--- command/agent/agent.go | 7 +++ command/agent/config.go | 11 ++++- command/agent/config_test.go | 6 ++- jobspec/parse.go | 10 ++++- jobspec/parse_test.go | 1 + jobspec/test-fixtures/basic.hcl | 2 + nomad/structs/structs.go | 15 +++++++ website/source/docs/agent/config.html.md | 5 +++ website/source/docs/jobspec/index.html.md | 4 ++ 21 files changed, 300 insertions(+), 89 deletions(-) diff --git a/api/tasks.go b/api/tasks.go index 3691cc78bdc5..122cd804e040 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -85,6 +85,7 @@ type Task struct { Services []Service Resources *Resources Meta map[string]string + KillTimeout time.Duration } // NewTask creates and initializes a new Task. diff --git a/client/config/config.go b/client/config/config.go index 16cb50765a60..9e07bc12e30d 100644 --- a/client/config/config.go +++ b/client/config/config.go @@ -5,6 +5,7 @@ import ( "io" "strconv" "strings" + "time" "github.com/hashicorp/nomad/nomad/structs" ) @@ -41,6 +42,11 @@ type Config struct { // be determined dynamically. NetworkSpeed int + // MaxKillTimeout allows capping the user-specifiable KillTimeout. If the + // task's KillTimeout is greater than the MaxKillTimeout, MaxKillTimeout is + // used. + MaxKillTimeout time.Duration + // Servers is a list of known server addresses. These are as "host:port" Servers []string diff --git a/client/driver/docker.go b/client/driver/docker.go index 330acd75c978..db0acc9c5e26 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -9,6 +9,7 @@ import ( "strconv" "strings" "sync" + "time" docker "github.com/fsouza/go-dockerclient" @@ -67,6 +68,7 @@ func (c *DockerDriverConfig) Validate() error { type dockerPID struct { ImageID string ContainerID string + KillTimeout time.Duration } type DockerHandle struct { @@ -76,6 +78,7 @@ type DockerHandle struct { cleanupImage bool imageID string containerID string + killTimeout time.Duration waitCh chan *cstructs.WaitResult doneCh chan struct{} } @@ -502,6 +505,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle logger: d.logger, imageID: dockerImage.ID, containerID: container.ID, + killTimeout: d.DriverContext.KillTimeout(task), doneCh: make(chan struct{}), waitCh: make(chan *cstructs.WaitResult, 1), } @@ -555,6 +559,7 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er logger: d.logger, imageID: pid.ImageID, containerID: pid.ContainerID, + killTimeout: pid.KillTimeout, doneCh: make(chan struct{}), waitCh: make(chan *cstructs.WaitResult, 1), } @@ -567,6 +572,7 @@ func (h *DockerHandle) ID() string { pid := dockerPID{ ImageID: h.imageID, ContainerID: h.containerID, + KillTimeout: h.killTimeout, } data, err := json.Marshal(pid) if err != nil { @@ -588,10 +594,10 @@ func (h *DockerHandle) Update(task *structs.Task) error { return nil } -// Kill is used to terminate the task. This uses docker stop -t 5 +// Kill is used to terminate the task. This uses `docker stop -t killTimeout` func (h *DockerHandle) Kill() error { // Stop the container - err := h.client.StopContainer(h.containerID, 5) + err := h.client.StopContainer(h.containerID, uint(h.killTimeout.Seconds())) if err != nil { h.logger.Printf("[ERR] driver.docker: failed to stop container %s", h.containerID) return fmt.Errorf("Failed to stop container %s: %s", h.containerID, err) diff --git a/client/driver/docker_test.go b/client/driver/docker_test.go index 2213ecaefc69..8adcc60920bd 100644 --- a/client/driver/docker_test.go +++ b/client/driver/docker_test.go @@ -123,12 +123,13 @@ func TestDockerDriver_Handle(t *testing.T) { h := &DockerHandle{ imageID: "imageid", containerID: "containerid", + killTimeout: 5 * time.Nanosecond, doneCh: make(chan struct{}), waitCh: make(chan *cstructs.WaitResult, 1), } actual := h.ID() - expected := `DOCKER:{"ImageID":"imageid","ContainerID":"containerid"}` + expected := `DOCKER:{"ImageID":"imageid","ContainerID":"containerid","KillTimeout":5}` if actual != expected { t.Errorf("Expected `%s`, found `%s`", expected, actual) } diff --git a/client/driver/driver.go b/client/driver/driver.go index a82847f4cdff..ba7e484f7a87 100644 --- a/client/driver/driver.go +++ b/client/driver/driver.go @@ -5,6 +5,7 @@ import ( "log" "path/filepath" "sync" + "time" "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/config" @@ -80,6 +81,18 @@ func NewDriverContext(taskName string, config *config.Config, node *structs.Node } } +// KillTimeout returns the timeout that should be used for the task between +// signaling and killing the task. +func (d *DriverContext) KillTimeout(task *structs.Task) time.Duration { + max := d.config.MaxKillTimeout.Nanoseconds() + desired := task.KillTimeout.Nanoseconds() + if desired < max { + return task.KillTimeout + } + + return d.config.MaxKillTimeout +} + // DriverHandle is an opaque handle into a driver used for task // manipulation type DriverHandle interface { diff --git a/client/driver/driver_test.go b/client/driver/driver_test.go index e35c5c5ad058..92f72bf388fe 100644 --- a/client/driver/driver_test.go +++ b/client/driver/driver_test.go @@ -8,6 +8,7 @@ import ( "path/filepath" "reflect" "testing" + "time" "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/config" @@ -60,6 +61,24 @@ func testDriverExecContext(task *structs.Task, driverCtx *DriverContext) *ExecCo return ctx } +func TestDriver_KillTimeout(t *testing.T) { + ctx := testDriverContext("foo") + ctx.config.MaxKillTimeout = 10 * time.Second + expected := 1 * time.Second + task := &structs.Task{KillTimeout: expected} + + if actual := ctx.KillTimeout(task); expected != actual { + t.Fatalf("KillTimeout(%v) returned %v; want %v", task, actual, expected) + } + + expected = 10 * time.Second + task = &structs.Task{KillTimeout: 11 * time.Second} + + if actual := ctx.KillTimeout(task); expected != actual { + t.Fatalf("KillTimeout(%v) returned %v; want %v", task, actual, expected) + } +} + func TestDriver_TaskEnvironmentVariables(t *testing.T) { t.Parallel() ctx := &ExecContext{} diff --git a/client/driver/exec.go b/client/driver/exec.go index a0a13652337e..ba4c02349cbc 100644 --- a/client/driver/exec.go +++ b/client/driver/exec.go @@ -1,7 +1,9 @@ package driver import ( + "encoding/json" "fmt" + "log" "path/filepath" "runtime" "syscall" @@ -32,9 +34,11 @@ type ExecDriverConfig struct { // execHandle is returned from Start/Open as a handle to the PID type execHandle struct { - cmd executor.Executor - waitCh chan *cstructs.WaitResult - doneCh chan struct{} + cmd executor.Executor + killTimeout time.Duration + logger *log.Logger + waitCh chan *cstructs.WaitResult + doneCh chan struct{} } // NewExecDriver is used to create a new exec driver @@ -110,34 +114,57 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, // Return a driver handle h := &execHandle{ - cmd: cmd, - doneCh: make(chan struct{}), - waitCh: make(chan *cstructs.WaitResult, 1), + cmd: cmd, + killTimeout: d.DriverContext.KillTimeout(task), + logger: d.logger, + doneCh: make(chan struct{}), + waitCh: make(chan *cstructs.WaitResult, 1), } go h.run() return h, nil } +type execId struct { + ExecutorId string + KillTimeout time.Duration +} + func (d *ExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) { + id := &execId{} + if err := json.Unmarshal([]byte(handleID), id); err != nil { + return nil, fmt.Errorf("Failed to parse handle '%s': %v", handleID, err) + } + // Find the process - cmd, err := executor.OpenId(handleID) + cmd, err := executor.OpenId(id.ExecutorId) if err != nil { - return nil, fmt.Errorf("failed to open ID %v: %v", handleID, err) + return nil, fmt.Errorf("failed to open ID %v: %v", id.ExecutorId, err) } // Return a driver handle h := &execHandle{ - cmd: cmd, - doneCh: make(chan struct{}), - waitCh: make(chan *cstructs.WaitResult, 1), + cmd: cmd, + logger: d.logger, + killTimeout: id.KillTimeout, + doneCh: make(chan struct{}), + waitCh: make(chan *cstructs.WaitResult, 1), } go h.run() return h, nil } func (h *execHandle) ID() string { - id, _ := h.cmd.ID() - return id + executorId, _ := h.cmd.ID() + id := execId{ + ExecutorId: executorId, + KillTimeout: h.killTimeout, + } + + data, err := json.Marshal(id) + if err != nil { + h.logger.Printf("[ERR] driver.exec: failed to marshal ID to JSON: %s", err) + } + return string(data) } func (h *execHandle) WaitCh() chan *cstructs.WaitResult { @@ -154,7 +181,7 @@ func (h *execHandle) Kill() error { select { case <-h.doneCh: return nil - case <-time.After(5 * time.Second): + case <-time.After(h.killTimeout): return h.cmd.ForceStop() } } diff --git a/client/driver/java.go b/client/driver/java.go index eb475db32ddc..656b7ddd2524 100644 --- a/client/driver/java.go +++ b/client/driver/java.go @@ -2,7 +2,9 @@ package driver import ( "bytes" + "encoding/json" "fmt" + "log" "os/exec" "path/filepath" "runtime" @@ -36,9 +38,11 @@ type JavaDriverConfig struct { // javaHandle is returned from Start/Open as a handle to the PID type javaHandle struct { - cmd executor.Executor - waitCh chan *cstructs.WaitResult - doneCh chan struct{} + cmd executor.Executor + killTimeout time.Duration + logger *log.Logger + waitCh chan *cstructs.WaitResult + doneCh chan struct{} } // NewJavaDriver is used to create a new exec driver @@ -158,27 +162,41 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, // Return a driver handle h := &javaHandle{ - cmd: cmd, - doneCh: make(chan struct{}), - waitCh: make(chan *cstructs.WaitResult, 1), + cmd: cmd, + killTimeout: d.DriverContext.KillTimeout(task), + logger: d.logger, + doneCh: make(chan struct{}), + waitCh: make(chan *cstructs.WaitResult, 1), } go h.run() return h, nil } +type javaId struct { + ExecutorId string + KillTimeout time.Duration +} + func (d *JavaDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) { + id := &javaId{} + if err := json.Unmarshal([]byte(handleID), id); err != nil { + return nil, fmt.Errorf("Failed to parse handle '%s': %v", handleID, err) + } + // Find the process - cmd, err := executor.OpenId(handleID) + cmd, err := executor.OpenId(id.ExecutorId) if err != nil { - return nil, fmt.Errorf("failed to open ID %v: %v", handleID, err) + return nil, fmt.Errorf("failed to open ID %v: %v", id.ExecutorId, err) } // Return a driver handle h := &javaHandle{ - cmd: cmd, - doneCh: make(chan struct{}), - waitCh: make(chan *cstructs.WaitResult, 1), + cmd: cmd, + logger: d.logger, + killTimeout: id.KillTimeout, + doneCh: make(chan struct{}), + waitCh: make(chan *cstructs.WaitResult, 1), } go h.run() @@ -186,8 +204,17 @@ func (d *JavaDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro } func (h *javaHandle) ID() string { - id, _ := h.cmd.ID() - return id + executorId, _ := h.cmd.ID() + id := javaId{ + ExecutorId: executorId, + KillTimeout: h.killTimeout, + } + + data, err := json.Marshal(id) + if err != nil { + h.logger.Printf("[ERR] driver.java: failed to marshal ID to JSON: %s", err) + } + return string(data) } func (h *javaHandle) WaitCh() chan *cstructs.WaitResult { @@ -204,7 +231,7 @@ func (h *javaHandle) Kill() error { select { case <-h.doneCh: return nil - case <-time.After(5 * time.Second): + case <-time.After(h.killTimeout): return h.cmd.ForceStop() } } diff --git a/client/driver/qemu.go b/client/driver/qemu.go index 640b05692c54..6380900a139c 100644 --- a/client/driver/qemu.go +++ b/client/driver/qemu.go @@ -1,7 +1,9 @@ package driver import ( + "encoding/json" "fmt" + "log" "os/exec" "path/filepath" "regexp" @@ -40,9 +42,11 @@ type QemuDriverConfig struct { // qemuHandle is returned from Start/Open as a handle to the PID type qemuHandle struct { - cmd executor.Executor - waitCh chan *cstructs.WaitResult - doneCh chan struct{} + cmd executor.Executor + killTimeout time.Duration + logger *log.Logger + waitCh chan *cstructs.WaitResult + doneCh chan struct{} } // NewQemuDriver is used to create a new exec driver @@ -197,35 +201,58 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, // Create and Return Handle h := &qemuHandle{ - cmd: cmd, - doneCh: make(chan struct{}), - waitCh: make(chan *cstructs.WaitResult, 1), + cmd: cmd, + killTimeout: d.DriverContext.KillTimeout(task), + logger: d.logger, + doneCh: make(chan struct{}), + waitCh: make(chan *cstructs.WaitResult, 1), } go h.run() return h, nil } +type qemuId struct { + ExecutorId string + KillTimeout time.Duration +} + func (d *QemuDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) { + id := &qemuId{} + if err := json.Unmarshal([]byte(handleID), id); err != nil { + return nil, fmt.Errorf("Failed to parse handle '%s': %v", handleID, err) + } + // Find the process - cmd, err := executor.OpenId(handleID) + cmd, err := executor.OpenId(id.ExecutorId) if err != nil { - return nil, fmt.Errorf("failed to open ID %v: %v", handleID, err) + return nil, fmt.Errorf("failed to open ID %v: %v", id.ExecutorId, err) } // Return a driver handle h := &execHandle{ - cmd: cmd, - doneCh: make(chan struct{}), - waitCh: make(chan *cstructs.WaitResult, 1), + cmd: cmd, + logger: d.logger, + killTimeout: id.KillTimeout, + doneCh: make(chan struct{}), + waitCh: make(chan *cstructs.WaitResult, 1), } go h.run() return h, nil } func (h *qemuHandle) ID() string { - id, _ := h.cmd.ID() - return id + executorId, _ := h.cmd.ID() + id := qemuId{ + ExecutorId: executorId, + KillTimeout: h.killTimeout, + } + + data, err := json.Marshal(id) + if err != nil { + h.logger.Printf("[ERR] driver.qemu: failed to marshal ID to JSON: %s", err) + } + return string(data) } func (h *qemuHandle) WaitCh() chan *cstructs.WaitResult { @@ -244,7 +271,7 @@ func (h *qemuHandle) Kill() error { select { case <-h.doneCh: return nil - case <-time.After(5 * time.Second): + case <-time.After(h.killTimeout): return h.cmd.ForceStop() } } diff --git a/client/driver/raw_exec.go b/client/driver/raw_exec.go index 9491873ba453..dcfb72199e30 100644 --- a/client/driver/raw_exec.go +++ b/client/driver/raw_exec.go @@ -1,7 +1,9 @@ package driver import ( + "encoding/json" "fmt" + "log" "path/filepath" "time" @@ -30,9 +32,11 @@ type RawExecDriver struct { // rawExecHandle is returned from Start/Open as a handle to the PID type rawExecHandle struct { - cmd executor.Executor - waitCh chan *cstructs.WaitResult - doneCh chan struct{} + cmd executor.Executor + killTimeout time.Duration + logger *log.Logger + waitCh chan *cstructs.WaitResult + doneCh chan struct{} } // NewRawExecDriver is used to create a new raw exec driver @@ -109,34 +113,57 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl // Return a driver handle h := &execHandle{ - cmd: cmd, - doneCh: make(chan struct{}), - waitCh: make(chan *cstructs.WaitResult, 1), + cmd: cmd, + killTimeout: d.DriverContext.KillTimeout(task), + logger: d.logger, + doneCh: make(chan struct{}), + waitCh: make(chan *cstructs.WaitResult, 1), } go h.run() return h, nil } +type rawExecId struct { + ExecutorId string + KillTimeout time.Duration +} + func (d *RawExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) { + id := &rawExecId{} + if err := json.Unmarshal([]byte(handleID), id); err != nil { + return nil, fmt.Errorf("Failed to parse handle '%s': %v", handleID, err) + } + // Find the process cmd := executor.NewBasicExecutor() - if err := cmd.Open(handleID); err != nil { - return nil, fmt.Errorf("failed to open ID %v: %v", handleID, err) + if err := cmd.Open(id.ExecutorId); err != nil { + return nil, fmt.Errorf("failed to open ID %v: %v", id.ExecutorId, err) } // Return a driver handle h := &execHandle{ - cmd: cmd, - doneCh: make(chan struct{}), - waitCh: make(chan *cstructs.WaitResult, 1), + cmd: cmd, + logger: d.logger, + killTimeout: id.KillTimeout, + doneCh: make(chan struct{}), + waitCh: make(chan *cstructs.WaitResult, 1), } go h.run() return h, nil } func (h *rawExecHandle) ID() string { - id, _ := h.cmd.ID() - return id + executorId, _ := h.cmd.ID() + id := rawExecId{ + ExecutorId: executorId, + KillTimeout: h.killTimeout, + } + + data, err := json.Marshal(id) + if err != nil { + h.logger.Printf("[ERR] driver.raw_exec: failed to marshal ID to JSON: %s", err) + } + return string(data) } func (h *rawExecHandle) WaitCh() chan *cstructs.WaitResult { @@ -153,7 +180,7 @@ func (h *rawExecHandle) Kill() error { select { case <-h.doneCh: return nil - case <-time.After(5 * time.Second): + case <-time.After(h.killTimeout): return h.cmd.ForceStop() } } diff --git a/client/driver/rkt.go b/client/driver/rkt.go index 5da864bfa248..437445713e66 100644 --- a/client/driver/rkt.go +++ b/client/driver/rkt.go @@ -54,18 +54,20 @@ type RktDriverConfig struct { // rktHandle is returned from Start/Open as a handle to the PID type rktHandle struct { - proc *os.Process - image string - logger *log.Logger - waitCh chan *cstructs.WaitResult - doneCh chan struct{} + proc *os.Process + image string + logger *log.Logger + killTimeout time.Duration + waitCh chan *cstructs.WaitResult + doneCh chan struct{} } // rktPID is a struct to map the pid running the process to the vm image on // disk type rktPID struct { - Pid int - Image string + Pid int + Image string + KillTimeout time.Duration } // NewRktDriver is used to create a new exec driver @@ -218,11 +220,12 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e d.logger.Printf("[DEBUG] driver.rkt: started ACI %q with: %v", img, cmd.Args) h := &rktHandle{ - proc: cmd.Process, - image: img, - logger: d.logger, - doneCh: make(chan struct{}), - waitCh: make(chan *cstructs.WaitResult, 1), + proc: cmd.Process, + image: img, + logger: d.logger, + killTimeout: d.DriverContext.KillTimeout(task), + doneCh: make(chan struct{}), + waitCh: make(chan *cstructs.WaitResult, 1), } go h.run() return h, nil @@ -244,11 +247,12 @@ func (d *RktDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error // Return a driver handle h := &rktHandle{ - proc: proc, - image: qpid.Image, - logger: d.logger, - doneCh: make(chan struct{}), - waitCh: make(chan *cstructs.WaitResult, 1), + proc: proc, + image: qpid.Image, + logger: d.logger, + killTimeout: qpid.KillTimeout, + doneCh: make(chan struct{}), + waitCh: make(chan *cstructs.WaitResult, 1), } go h.run() @@ -258,8 +262,9 @@ func (d *RktDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error func (h *rktHandle) ID() string { // Return a handle to the PID pid := &rktPID{ - Pid: h.proc.Pid, - Image: h.image, + Pid: h.proc.Pid, + Image: h.image, + KillTimeout: h.killTimeout, } data, err := json.Marshal(pid) if err != nil { @@ -284,7 +289,7 @@ func (h *rktHandle) Kill() error { select { case <-h.doneCh: return nil - case <-time.After(5 * time.Second): + case <-time.After(h.killTimeout): return h.proc.Kill() } } diff --git a/client/driver/rkt_test.go b/client/driver/rkt_test.go index 8ee4425c2127..13429c3cbca6 100644 --- a/client/driver/rkt_test.go +++ b/client/driver/rkt_test.go @@ -33,14 +33,15 @@ func TestRktVersionRegex(t *testing.T) { func TestRktDriver_Handle(t *testing.T) { h := &rktHandle{ - proc: &os.Process{Pid: 123}, - image: "foo", - doneCh: make(chan struct{}), - waitCh: make(chan *cstructs.WaitResult, 1), + proc: &os.Process{Pid: 123}, + image: "foo", + killTimeout: 5 * time.Nanosecond, + doneCh: make(chan struct{}), + waitCh: make(chan *cstructs.WaitResult, 1), } actual := h.ID() - expected := `Rkt:{"Pid":123,"Image":"foo"}` + expected := `Rkt:{"Pid":123,"Image":"foo","KillTimeout":5}` if actual != expected { t.Errorf("Expected `%s`, found `%s`", expected, actual) } diff --git a/command/agent/agent.go b/command/agent/agent.go index 8ac791f61c19..bb9679551dd4 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -208,6 +208,13 @@ func (a *Agent) setupClient() error { if a.config.Client.NetworkSpeed != 0 { conf.NetworkSpeed = a.config.Client.NetworkSpeed } + if a.config.Client.MaxKillTimeout != "" { + dur, err := time.ParseDuration(a.config.Client.MaxKillTimeout) + if err != nil { + return fmt.Errorf("Error parsing retry interval: %s", err) + } + conf.MaxKillTimeout = dur + } // Setup the node conf.Node = new(structs.Node) diff --git a/command/agent/config.go b/command/agent/config.go index 52bd0911bab4..fed47144211c 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -156,6 +156,9 @@ type ClientConfig struct { // The network link speed to use if it can not be determined dynamically. NetworkSpeed int `hcl:"network_speed"` + + // MaxKillTimeout allows capping the user-specifiable KillTimeout. + MaxKillTimeout string `hcl:"max_kill_timeout"` } // ServerConfig is configuration specific to the server mode @@ -281,8 +284,9 @@ func DefaultConfig() *Config { AdvertiseAddrs: &AdvertiseAddrs{}, Atlas: &AtlasConfig{}, Client: &ClientConfig{ - Enabled: false, - NetworkSpeed: 100, + Enabled: false, + NetworkSpeed: 100, + MaxKillTimeout: "30s", }, Server: &ServerConfig{ Enabled: false, @@ -500,6 +504,9 @@ func (a *ClientConfig) Merge(b *ClientConfig) *ClientConfig { if b.NetworkSpeed != 0 { result.NetworkSpeed = b.NetworkSpeed } + if b.MaxKillTimeout != "" { + result.MaxKillTimeout = b.MaxKillTimeout + } // Add the servers result.Servers = append(result.Servers, b.Servers...) diff --git a/command/agent/config_test.go b/command/agent/config_test.go index f89386a95d35..5c4c6c7131ea 100644 --- a/command/agent/config_test.go +++ b/command/agent/config_test.go @@ -40,7 +40,8 @@ func TestConfig_Merge(t *testing.T) { Options: map[string]string{ "foo": "bar", }, - NetworkSpeed: 100, + NetworkSpeed: 100, + MaxKillTimeout: "20s", }, Server: &ServerConfig{ Enabled: false, @@ -105,7 +106,8 @@ func TestConfig_Merge(t *testing.T) { "foo": "bar", "baz": "zip", }, - NetworkSpeed: 100, + NetworkSpeed: 105, + MaxKillTimeout: "50s", }, Server: &ServerConfig{ Enabled: true, diff --git a/jobspec/parse.go b/jobspec/parse.go index 963c0b3e4736..543f7e5dcfed 100644 --- a/jobspec/parse.go +++ b/jobspec/parse.go @@ -408,7 +408,15 @@ func parseTasks(jobName string, taskGroupName string, result *[]*structs.Task, l if taskGroupName == "" { taskGroupName = n } - if err := mapstructure.WeakDecode(m, &t); err != nil { + dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ + DecodeHook: mapstructure.StringToTimeDurationHookFunc(), + WeaklyTypedInput: true, + Result: &t, + }) + if err != nil { + return err + } + if err := dec.Decode(m); err != nil { return err } diff --git a/jobspec/parse_test.go b/jobspec/parse_test.go index 135275e13c1b..eab8a5f433c9 100644 --- a/jobspec/parse_test.go +++ b/jobspec/parse_test.go @@ -121,6 +121,7 @@ func TestParse(t *testing.T) { }, }, }, + KillTimeout: 22 * time.Second, }, &structs.Task{ Name: "storagelocker", diff --git a/jobspec/test-fixtures/basic.hcl b/jobspec/test-fixtures/basic.hcl index 549e380ef29d..50e91d441ac8 100644 --- a/jobspec/test-fixtures/basic.hcl +++ b/jobspec/test-fixtures/basic.hcl @@ -77,6 +77,8 @@ job "binstore-storagelocker" { port "admin" {} } } + + kill_timeout = "22s" } task "storagelocker" { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index bed272309d9d..84716d39b6b3 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1242,6 +1242,12 @@ func (s *Service) Hash() string { return fmt.Sprintf("%x", h.Sum(nil)) } +const ( + // DefaultKillTimeout is the default timeout between signaling a task it + // will be killed and killing it. + DefaultKillTimeout = 5 * time.Second +) + // Task is a single process typically that is executed as part of a task group. type Task struct { // Name of the task @@ -1269,11 +1275,20 @@ type Task struct { // Meta is used to associate arbitrary metadata with this // task. This is opaque to Nomad. Meta map[string]string + + // KillTimeout is the time between signaling a task that it will be + // killed and killing it. + KillTimeout time.Duration `mapstructure:"kill_timeout"` } // InitFields initializes fields in the task. func (t *Task) InitFields(job *Job, tg *TaskGroup) { t.InitServiceFields(job.Name, tg.Name) + + // Set the default timeout if it is not specified. + if t.KillTimeout == 0 { + t.KillTimeout = DefaultKillTimeout + } } // InitServiceFields interpolates values of Job, Task Group diff --git a/website/source/docs/agent/config.html.md b/website/source/docs/agent/config.html.md index 386ba64ed702..3c76bcee312e 100644 --- a/website/source/docs/agent/config.html.md +++ b/website/source/docs/agent/config.html.md @@ -281,6 +281,11 @@ configured on server nodes. * `network_speed`: This is an int that sets the default link speed of network interfaces, in megabits, if their speed can not be determined dynamically. + * `max_kill_timeout`: `max_kill_timeout` is a time duration that can be + specified using the `s`, `m`, and `h` suffixes, such as `30s`. If a job's + task specifies a `kill_timeout` greater than `max_kill_timeout`, + `max_kill_timeout` is used. This is to prevent a user being able to set an + unreasonable timeout. If unset, a default is used. ### Client Options Map diff --git a/website/source/docs/jobspec/index.html.md b/website/source/docs/jobspec/index.html.md index f55251c2b505..d1de8989d8cb 100644 --- a/website/source/docs/jobspec/index.html.md +++ b/website/source/docs/jobspec/index.html.md @@ -204,6 +204,10 @@ The `task` object supports the following keys: * `meta` - Annotates the task group with opaque metadata. +* `kill_timeout` - `kill_timeout` is a time duration that can be specified using + the `s`, `m`, and `h` suffixes, such as `30s`. It can be used to configure the + time between signaling a task it will be killed and actually killing it. + ### Resources The `resources` object supports the following keys: From d285df82759b24239b32c5a642908fb3c9bda8ef Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Mon, 4 Jan 2016 11:36:57 -0800 Subject: [PATCH 2/3] Add to nomad init --- command/init.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/command/init.go b/command/init.go index cf2d0aa3bc40..61f6c822b829 100644 --- a/command/init.go +++ b/command/init.go @@ -161,6 +161,10 @@ job "example" { } } } + + # Controls the timeout between signalling a task it will be killed + # and killing the task. If not set a default is used. + # kill_timeout = "20s" } } } From 62e6e88da44130b3872cd7cde40a691d149dc3bd Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Mon, 4 Jan 2016 11:41:31 -0800 Subject: [PATCH 3/3] Validate that the kill timeout is positive --- nomad/structs/structs.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 84716d39b6b3..6c99356038da 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1414,6 +1414,9 @@ func (t *Task) Validate() error { if t.Resources == nil { mErr.Errors = append(mErr.Errors, errors.New("Missing task resources")) } + if t.KillTimeout.Nanoseconds() < 0 { + mErr.Errors = append(mErr.Errors, errors.New("KillTimeout must be a positive value")) + } for idx, constr := range t.Constraints { if err := constr.Validate(); err != nil { outer := fmt.Errorf("Constraint %d validation failed: %s", idx+1, err)