Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nomad exec part 3: executor based drivers #5634

Merged
merged 5 commits into from
May 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions drivers/exec/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,3 +530,22 @@ func (d *Driver) ExecTask(taskID string, cmd []string, timeout time.Duration) (*
},
}, nil
}

var _ drivers.ExecTaskStreamingRawDriver = (*Driver)(nil)

func (d *Driver) ExecTaskStreamingRaw(ctx context.Context,
taskID string,
command []string,
tty bool,
stream drivers.ExecTaskStream) error {

if len(command) == 0 {
return fmt.Errorf("error cmd must have atleast one value")
}
handle, ok := d.tasks.Get(taskID)
if !ok {
return drivers.ErrTaskNotFound
}

return handle.exec.ExecStreaming(ctx, command, tty, stream)
}
30 changes: 30 additions & 0 deletions drivers/exec/driver_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,33 @@ func TestExecDriver_StartWaitStop(t *testing.T) {

require.NoError(harness.DestroyTask(task.ID, true))
}

func TestExec_ExecTaskStreaming(t *testing.T) {
t.Parallel()
require := require.New(t)

d := NewExecDriver(testlog.HCLogger(t))
harness := dtestutil.NewDriverHarness(t, d)
defer harness.Kill()

task := &drivers.TaskConfig{
ID: uuid.Generate(),
Name: "sleep",
}

cleanup := harness.MkAllocDir(task, false)
defer cleanup()

tc := &TaskConfig{
Command: "/bin/sleep",
Args: []string{"9000"},
}
require.NoError(task.EncodeConcreteDriverConfig(&tc))

_, _, err := harness.StartTask(task)
require.NoError(err)
defer d.DestroyTask(task.ID, true)

dtestutil.ExecTaskStreamingConformanceTests(t, harness, task.ID)

}
19 changes: 19 additions & 0 deletions drivers/java/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,25 @@ func (d *Driver) ExecTask(taskID string, cmd []string, timeout time.Duration) (*
}, nil
}

var _ drivers.ExecTaskStreamingRawDriver = (*Driver)(nil)

func (d *Driver) ExecTaskStreamingRaw(ctx context.Context,
taskID string,
command []string,
tty bool,
stream drivers.ExecTaskStream) error {

if len(command) == 0 {
return fmt.Errorf("error cmd must have atleast one value")
}
handle, ok := d.tasks.Get(taskID)
if !ok {
return drivers.ErrTaskNotFound
}

return handle.exec.ExecStreaming(ctx, command, tty, stream)
}

// GetAbsolutePath returns the absolute path of the passed binary by resolving
// it in the path and following symlinks.
func GetAbsolutePath(bin string) (string, error) {
Expand Down
29 changes: 29 additions & 0 deletions drivers/java/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,35 @@ func TestJavaCmdArgs(t *testing.T) {
}
}

func TestJavaDriver_ExecTaskStreaming(t *testing.T) {
javaCompatible(t)
if !testutil.IsCI() {
t.Parallel()
}

require := require.New(t)
d := NewDriver(testlog.HCLogger(t))
harness := dtestutil.NewDriverHarness(t, d)
defer harness.Kill()

tc := &TaskConfig{
Class: "Hello",
Args: []string{"900"},
}
task := basicTask(t, "demo-app", tc)

cleanup := harness.MkAllocDir(task, true)
defer cleanup()

copyFile("./test-resources/Hello.class", filepath.Join(task.TaskDir().Dir, "Hello.class"), t)

_, _, err := harness.StartTask(task)
require.NoError(err)
defer d.DestroyTask(task.ID, true)

dtestutil.ExecTaskStreamingConformanceTests(t, harness, task.ID)

}
func basicTask(t *testing.T, name string, taskConfig *TaskConfig) *drivers.TaskConfig {
t.Helper()

Expand Down
19 changes: 19 additions & 0 deletions drivers/rawexec/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,3 +521,22 @@ func (d *Driver) ExecTask(taskID string, cmd []string, timeout time.Duration) (*
},
}, nil
}

var _ drivers.ExecTaskStreamingRawDriver = (*Driver)(nil)

func (d *Driver) ExecTaskStreamingRaw(ctx context.Context,
taskID string,
command []string,
tty bool,
stream drivers.ExecTaskStream) error {

if len(command) == 0 {
return fmt.Errorf("error cmd must have at least one value")
}
handle, ok := d.tasks.Get(taskID)
if !ok {
return drivers.ErrTaskNotFound
}

return handle.exec.ExecStreaming(ctx, command, tty, stream)
}
34 changes: 34 additions & 0 deletions drivers/rawexec/driver_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,3 +196,37 @@ func TestRawExecDriver_StartWaitStop(t *testing.T) {

require.NoError(harness.DestroyTask(task.ID, true))
}

func TestRawExec_ExecTaskStreaming(t *testing.T) {
t.Parallel()
if runtime.GOOS == "darwin" {
t.Skip("skip running exec tasks on darwin as darwin has restrictions on starting tty shells")
}
require := require.New(t)

d := NewRawExecDriver(testlog.HCLogger(t))
harness := dtestutil.NewDriverHarness(t, d)
defer harness.Kill()

task := &drivers.TaskConfig{
ID: uuid.Generate(),
Name: "sleep",
}

cleanup := harness.MkAllocDir(task, false)
defer cleanup()

tc := &TaskConfig{
Command: testtask.Path(),
Args: []string{"sleep", "9000s"},
}
require.NoError(task.EncodeConcreteDriverConfig(&tc))
testtask.SetTaskConfigEnv(task)

_, _, err := harness.StartTask(task)
require.NoError(err)
defer d.DestroyTask(task.ID, true)

dtestutil.ExecTaskStreamingConformanceTests(t, harness, task.ID)

}
73 changes: 73 additions & 0 deletions drivers/shared/executor/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ import (
hclog "github.com/hashicorp/go-hclog"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/drivers/shared/executor/proto"
"github.com/hashicorp/nomad/helper/pluginutils/grpcutils"
"github.com/hashicorp/nomad/plugins/drivers"
dproto "github.com/hashicorp/nomad/plugins/drivers/proto"
)

var _ Executor = (*grpcExecutorClient)(nil)
Expand Down Expand Up @@ -181,3 +183,74 @@ func (c *grpcExecutorClient) Exec(deadline time.Time, cmd string, args []string)

return resp.Output, int(resp.ExitCode), nil
}

func (d *grpcExecutorClient) ExecStreaming(ctx context.Context,
command []string,
tty bool,
execStream drivers.ExecTaskStream) error {

err := d.execStreaming(ctx, command, tty, execStream)
if err != nil {
return grpcutils.HandleGrpcErr(err, d.doneCtx)
}
return nil
}

func (d *grpcExecutorClient) execStreaming(ctx context.Context,
command []string,
tty bool,
execStream drivers.ExecTaskStream) error {

stream, err := d.client.ExecStreaming(ctx)
if err != nil {
return err
}

err = stream.Send(&dproto.ExecTaskStreamingRequest{
Setup: &dproto.ExecTaskStreamingRequest_Setup{
Command: command,
Tty: tty,
},
})
if err != nil {
return err
}

errCh := make(chan error, 1)
go func() {
for {
m, err := execStream.Recv()
if err == io.EOF {
return
} else if err != nil {
errCh <- err
return
}

if err := stream.Send(m); err != nil {
errCh <- err
return
}

}
}()

for {
select {
case err := <-errCh:
return err
default:
}

m, err := stream.Recv()
if err == io.EOF {
return nil
} else if err != nil {
return err
}

if err := execStream.Send(m); err != nil {
return err
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do all returns need grpcutils.HandleGrpcErr? I'm a little fuzzy on where that needs to be called.

If all returns do need that check maybe just wrap the whole method in a shim to do the check?

func (d *grpcExecutorClient) ExecStreaming(...) error {
  if err := d.execStreaming(...); err != nil {
    return grpcutils.HandleGrpcErr(err, d.doneCtx)
  }
}

// actual impl  in execStreaming method

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't call grpcutils.HandleGrpcErr anywhere here. It seems mostly for detecting shutdown and being able to handle it (by restarting, etc), in this case, we don't want to handle executor shutdown any specially.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah! execStream is "this" (the driver) side whereas the HandleGrpcErr calls above are for remote calls to executors? I guess I'm still confused why Recv wouldn't call HandleGrpcErr as its the executor side...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see - I missed it - i'll add it.

}
}
}
Loading