Skip to content

Commit

Permalink
drivers/docker: implement streaming exec
Browse files Browse the repository at this point in the history
  • Loading branch information
Mahmood Ali committed Apr 30, 2019
1 parent d144553 commit a8e460a
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 0 deletions.
78 changes: 78 additions & 0 deletions drivers/docker/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1215,6 +1215,84 @@ func (d *Driver) ExecTask(taskID string, cmd []string, timeout time.Duration) (*
return h.Exec(ctx, cmd[0], cmd[1:])
}

var _ drivers.ExecTaskStreamingDriver = (*Driver)(nil)

func (d *Driver) ExecTaskStreaming(ctx context.Context, taskID string, opts *drivers.ExecOptions) (*drivers.ExitResult, error) {
defer opts.Stdout.Close()
defer opts.Stderr.Close()

done := make(chan interface{})
defer close(done)

h, ok := d.tasks.Get(taskID)
if !ok {
return nil, drivers.ErrTaskNotFound
}

if len(opts.Command) == 0 {
return nil, fmt.Errorf("cmd is required but was empty")
}

createExecOpts := docker.CreateExecOptions{
AttachStdin: true,
AttachStdout: true,
AttachStderr: true,
Tty: opts.Tty,
Cmd: opts.Command,
Container: h.containerID,
Context: ctx,
}
exec, err := h.client.CreateExec(createExecOpts)
if err != nil {
return nil, fmt.Errorf("failed to create exec object: %v", err)
}

go func() {
for {
select {
case <-ctx.Done():
return
case <-done:
return
case s, ok := <-opts.ResizeCh:
if !ok {
return
}
client.ResizeExecTTY(exec.ID, s.Height, s.Width)
}
}
}()

startOpts := docker.StartExecOptions{
Detach: false,
Tty: opts.Tty,
RawTerminal: opts.Tty,
InputStream: opts.Stdin,
OutputStream: opts.Stdout,
ErrorStream: opts.Stderr,
Context: ctx,
}
if err := client.StartExec(exec.ID, startOpts); err != nil {
return nil, fmt.Errorf("failed to start exec: %v", err)
}

var res *docker.ExecInspect
for res == nil || res.Running {
res, err = client.InspectExec(exec.ID)
if err != nil {
return nil, fmt.Errorf("failed to inspect exec result: %v", err)
}
time.Sleep(100 * time.Millisecond)
}

opts.Stdout.Close()
opts.Stderr.Close()

return &drivers.ExitResult{
ExitCode: res.ExitCode,
}, nil
}

// dockerClients creates two *docker.Client, one for long running operations and
// the other for shorter operations. In test / dev mode we can use ENV vars to
// connect to the docker daemon. In production mode we will read docker.endpoint
Expand Down
30 changes: 30 additions & 0 deletions drivers/docker/driver_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/hashicorp/nomad/client/testutil"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/plugins/drivers"
dtestutil "github.com/hashicorp/nomad/plugins/drivers/testutils"
tu "github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -754,3 +755,32 @@ func copyFile(src, dst string, t *testing.T) {
t.Fatalf("copying %v -> %v failed: %v", src, dst, err)
}
}

func TestDocker_ExecTaskStreaming(t *testing.T) {
if !tu.IsCI() {
t.Parallel()
}
testutil.DockerCompatible(t)

taskCfg := newTaskConfig("", []string{"/bin/sleep", "1000"})
task := &drivers.TaskConfig{
ID: uuid.Generate(),
Name: "nc-demo",
AllocID: uuid.Generate(),
Resources: basicResources,
}
require.NoError(t, task.EncodeConcreteDriverConfig(&taskCfg))

d := dockerDriverHarness(t, nil)
cleanup := d.MkAllocDir(task, true)
defer cleanup()
copyImage(t, task.TaskDir(), "busybox.tar")

_, _, err := d.StartTask(task)
require.NoError(t, err)

defer d.DestroyTask(task.ID, true)

dtestutil.ExecTaskStreamingConformanceTests(t, d, task.ID)

}

0 comments on commit a8e460a

Please sign in to comment.