Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: Updating kill timeout adheres to operator specified maximum #878

Merged
merged 2 commits into from
Mar 10, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 18 additions & 12 deletions client/driver/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,12 @@ func (c *DockerDriverConfig) Validate() error {
}

type dockerPID struct {
Version string
ImageID string
ContainerID string
KillTimeout time.Duration
PluginConfig *PluginReattachConfig
Version string
ImageID string
ContainerID string
KillTimeout time.Duration
MaxKillTimeout time.Duration
PluginConfig *PluginReattachConfig
}

type DockerHandle struct {
Expand All @@ -101,6 +102,7 @@ type DockerHandle struct {
containerID string
version string
killTimeout time.Duration
maxKillTimeout time.Duration
waitCh chan *cstructs.WaitResult
doneCh chan struct{}
}
Expand Down Expand Up @@ -600,6 +602,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
d.logger.Printf("[INFO] driver.docker: started container %s", container.ID)

// Return a driver handle
maxKill := d.DriverContext.config.MaxKillTimeout
h := &DockerHandle{
client: client,
logCollector: logCollector,
Expand All @@ -610,7 +613,8 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
imageID: dockerImage.ID,
containerID: container.ID,
version: d.config.Version,
killTimeout: d.DriverContext.KillTimeout(task),
killTimeout: GetKillTimeout(task.KillTimeout, maxKill),
maxKillTimeout: maxKill,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
}
Expand Down Expand Up @@ -679,6 +683,7 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er
containerID: pid.ContainerID,
version: pid.Version,
killTimeout: pid.KillTimeout,
maxKillTimeout: pid.MaxKillTimeout,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
}
Expand All @@ -689,11 +694,12 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er
func (h *DockerHandle) ID() string {
// Return a handle to the PID
pid := dockerPID{
Version: h.version,
ImageID: h.imageID,
ContainerID: h.containerID,
KillTimeout: h.killTimeout,
PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()),
Version: h.version,
ImageID: h.imageID,
ContainerID: h.containerID,
KillTimeout: h.killTimeout,
MaxKillTimeout: h.maxKillTimeout,
PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()),
}
data, err := json.Marshal(pid)
if err != nil {
Expand All @@ -712,7 +718,7 @@ func (h *DockerHandle) WaitCh() chan *cstructs.WaitResult {

func (h *DockerHandle) Update(task *structs.Task) error {
// Store the updated kill timeout.
h.killTimeout = task.KillTimeout
h.killTimeout = GetKillTimeout(task.KillTimeout, h.maxKillTimeout)
if err := h.logCollector.UpdateLogConfig(task.LogConfig); err != nil {
h.logger.Printf("[DEBUG] driver.docker: failed to update log config: %v", err)
}
Expand Down
19 changes: 10 additions & 9 deletions client/driver/docker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,18 +152,19 @@ func TestDockerDriver_Handle(t *testing.T) {
defer pluginClient.Kill()

h := &DockerHandle{
version: "version",
imageID: "imageid",
logCollector: logCollector,
pluginClient: pluginClient,
containerID: "containerid",
killTimeout: 5 * time.Nanosecond,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
version: "version",
imageID: "imageid",
logCollector: logCollector,
pluginClient: pluginClient,
containerID: "containerid",
killTimeout: 5 * time.Nanosecond,
maxKillTimeout: 15 * time.Nanosecond,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
}

actual := h.ID()
expected := fmt.Sprintf("DOCKER:{\"Version\":\"version\",\"ImageID\":\"imageid\",\"ContainerID\":\"containerid\",\"KillTimeout\":5,\"PluginConfig\":{\"Pid\":%d,\"AddrNet\":\"unix\",\"AddrName\":\"%s\"}}",
expected := fmt.Sprintf("DOCKER:{\"Version\":\"version\",\"ImageID\":\"imageid\",\"ContainerID\":\"containerid\",\"KillTimeout\":5,\"MaxKillTimeout\":15,\"PluginConfig\":{\"Pid\":%d,\"AddrNet\":\"unix\",\"AddrName\":\"%s\"}}",
pluginClient.ReattachConfig().Pid, pluginClient.ReattachConfig().Addr.String())
if actual != expected {
t.Errorf("Expected `%s`, found `%s`", expected, actual)
Expand Down
19 changes: 0 additions & 19 deletions client/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"log"
"path/filepath"
"sync"
"time"

"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
Expand Down Expand Up @@ -84,24 +83,6 @@ func NewDriverContext(taskName string, config *config.Config, node *structs.Node
}
}

// KillTimeout returns the timeout that should be used for the task between
// signaling and killing the task.
func (d *DriverContext) KillTimeout(task *structs.Task) time.Duration {
max := d.config.MaxKillTimeout.Nanoseconds()
desired := task.KillTimeout.Nanoseconds()

// Make the minimum time between signal and kill, 1 second.
if desired == 0 {
desired = (1 * time.Second).Nanoseconds()
}

if desired < max {
return time.Duration(desired)
}

return d.config.MaxKillTimeout
}

// DriverHandle is an opaque handle into a driver used for task
// manipulation
type DriverHandle interface {
Expand Down
18 changes: 0 additions & 18 deletions client/driver/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,24 +66,6 @@ func testDriverContexts(task *structs.Task) (*DriverContext, *ExecContext) {
return driverCtx, execCtx
}

func TestDriver_KillTimeout(t *testing.T) {
expected := 1 * time.Second
task := &structs.Task{Name: "foo", KillTimeout: expected}
ctx, _ := testDriverContexts(task)
ctx.config.MaxKillTimeout = 10 * time.Second

if actual := ctx.KillTimeout(task); expected != actual {
t.Fatalf("KillTimeout(%v) returned %v; want %v", task, actual, expected)
}

expected = 10 * time.Second
task = &structs.Task{KillTimeout: 11 * time.Second}

if actual := ctx.KillTimeout(task); expected != actual {
t.Fatalf("KillTimeout(%v) returned %v; want %v", task, actual, expected)
}
}

func TestDriver_GetTaskEnv(t *testing.T) {
t.Parallel()
task := &structs.Task{
Expand Down
10 changes: 8 additions & 2 deletions client/driver/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type execHandle struct {
userPid int
allocDir *allocdir.AllocDir
killTimeout time.Duration
maxKillTimeout time.Duration
logger *log.Logger
waitCh chan *cstructs.WaitResult
doneCh chan struct{}
Expand Down Expand Up @@ -134,13 +135,15 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
d.logger.Printf("[DEBUG] driver.exec: started process via plugin with pid: %v", ps.Pid)

// Return a driver handle
maxKill := d.DriverContext.config.MaxKillTimeout
h := &execHandle{
pluginClient: pluginClient,
userPid: ps.Pid,
executor: exec,
allocDir: ctx.AllocDir,
isolationConfig: ps.IsolationConfig,
killTimeout: d.DriverContext.KillTimeout(task),
killTimeout: GetKillTimeout(task.KillTimeout, maxKill),
maxKillTimeout: maxKill,
logger: d.logger,
version: d.config.Version,
doneCh: make(chan struct{}),
Expand All @@ -153,6 +156,7 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
type execId struct {
Version string
KillTimeout time.Duration
MaxKillTimeout time.Duration
UserPid int
TaskDir string
AllocDir *allocdir.AllocDir
Expand Down Expand Up @@ -198,6 +202,7 @@ func (d *ExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro
logger: d.logger,
version: id.Version,
killTimeout: id.KillTimeout,
maxKillTimeout: id.MaxKillTimeout,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
}
Expand All @@ -209,6 +214,7 @@ func (h *execHandle) ID() string {
id := execId{
Version: h.version,
KillTimeout: h.killTimeout,
MaxKillTimeout: h.maxKillTimeout,
PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()),
UserPid: h.userPid,
AllocDir: h.allocDir,
Expand All @@ -228,7 +234,7 @@ func (h *execHandle) WaitCh() chan *cstructs.WaitResult {

func (h *execHandle) Update(task *structs.Task) error {
// Store the updated kill timeout.
h.killTimeout = task.KillTimeout
h.killTimeout = GetKillTimeout(task.KillTimeout, h.maxKillTimeout)
h.executor.UpdateLogConfig(task.LogConfig)

// Update is not possible
Expand Down
24 changes: 15 additions & 9 deletions client/driver/java.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,14 @@ type javaHandle struct {
executor executor.Executor
isolationConfig *cstructs.IsolationConfig

taskDir string
allocDir *allocdir.AllocDir
killTimeout time.Duration
version string
logger *log.Logger
waitCh chan *cstructs.WaitResult
doneCh chan struct{}
taskDir string
allocDir *allocdir.AllocDir
killTimeout time.Duration
maxKillTimeout time.Duration
version string
logger *log.Logger
waitCh chan *cstructs.WaitResult
doneCh chan struct{}
}

// NewJavaDriver is used to create a new exec driver
Expand Down Expand Up @@ -182,14 +183,16 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
d.logger.Printf("[DEBUG] driver.java: started process with pid: %v", ps.Pid)

// Return a driver handle
maxKill := d.DriverContext.config.MaxKillTimeout
h := &javaHandle{
pluginClient: pluginClient,
executor: exec,
userPid: ps.Pid,
isolationConfig: ps.IsolationConfig,
taskDir: taskDir,
allocDir: ctx.AllocDir,
killTimeout: d.DriverContext.KillTimeout(task),
killTimeout: GetKillTimeout(task.KillTimeout, maxKill),
maxKillTimeout: maxKill,
version: d.config.Version,
logger: d.logger,
doneCh: make(chan struct{}),
Expand All @@ -210,6 +213,7 @@ func (d *JavaDriver) cgroupsMounted(node *structs.Node) bool {
type javaId struct {
Version string
KillTimeout time.Duration
MaxKillTimeout time.Duration
PluginConfig *PluginReattachConfig
IsolationConfig *cstructs.IsolationConfig
TaskDir string
Expand Down Expand Up @@ -257,6 +261,7 @@ func (d *JavaDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro
logger: d.logger,
version: id.Version,
killTimeout: id.KillTimeout,
maxKillTimeout: id.MaxKillTimeout,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
}
Expand All @@ -269,6 +274,7 @@ func (h *javaHandle) ID() string {
id := javaId{
Version: h.version,
KillTimeout: h.killTimeout,
MaxKillTimeout: h.maxKillTimeout,
PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()),
UserPid: h.userPid,
TaskDir: h.taskDir,
Expand All @@ -289,7 +295,7 @@ func (h *javaHandle) WaitCh() chan *cstructs.WaitResult {

func (h *javaHandle) Update(task *structs.Task) error {
// Store the updated kill timeout.
h.killTimeout = task.KillTimeout
h.killTimeout = GetKillTimeout(task.KillTimeout, h.maxKillTimeout)
h.executor.UpdateLogConfig(task.LogConfig)

// Update is not possible
Expand Down
Loading