Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nomad exec part 1: plumbing and docker driver #5632

Merged
merged 15 commits into from
May 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion acl/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ const (
NamespaceCapabilityDispatchJob = "dispatch-job"
NamespaceCapabilityReadLogs = "read-logs"
NamespaceCapabilityReadFS = "read-fs"
NamespaceCapabilityAllocExec = "alloc-exec"
NamespaceCapabilityAllocNodeExec = "alloc-node-exec"
NamespaceCapabilityAllocLifecycle = "alloc-lifecycle"
NamespaceCapabilitySentinelOverride = "sentinel-override"
)
Expand Down Expand Up @@ -94,7 +96,8 @@ func isNamespaceCapabilityValid(cap string) bool {
switch cap {
case NamespaceCapabilityDeny, NamespaceCapabilityListJobs, NamespaceCapabilityReadJob,
NamespaceCapabilitySubmitJob, NamespaceCapabilityDispatchJob, NamespaceCapabilityReadLogs,
NamespaceCapabilityReadFS, NamespaceCapabilityAllocLifecycle:
NamespaceCapabilityReadFS, NamespaceCapabilityAllocLifecycle,
NamespaceCapabilityAllocExec, NamespaceCapabilityAllocNodeExec:
return true
// Separate the enterprise-only capabilities
case NamespaceCapabilitySentinelOverride:
Expand Down Expand Up @@ -123,6 +126,7 @@ func expandNamespacePolicy(policy string) []string {
NamespaceCapabilityDispatchJob,
NamespaceCapabilityReadLogs,
NamespaceCapabilityReadFS,
NamespaceCapabilityAllocExec,
NamespaceCapabilityAllocLifecycle,
}
default:
Expand Down
1 change: 1 addition & 0 deletions acl/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func TestParse(t *testing.T) {
NamespaceCapabilityDispatchJob,
NamespaceCapabilityReadLogs,
NamespaceCapabilityReadFS,
NamespaceCapabilityAllocExec,
NamespaceCapabilityAllocLifecycle,
},
},
Expand Down
264 changes: 264 additions & 0 deletions api/allocations.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
package api

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"sort"
"strconv"
"sync"
"time"

"github.com/gorilla/websocket"
)

var (
Expand Down Expand Up @@ -61,6 +70,222 @@ func (a *Allocations) Info(allocID string, q *QueryOptions) (*Allocation, *Query
return &resp, qm, nil
}

// Exec is used to execute a command inside a running task. The command is to run inside
// the task environment.
//
// The parameters are:
// * ctx: context to set deadlines or timeout
// * allocation: the allocation to execute command inside
// * task: the task's name to execute command in
// * tty: indicates whether to start a pseudo-tty for the command
// * stdin, stdout, stderr: the std io to pass to command.
// If tty is true, then streams need to point to a tty that's alive for the whole process
// * terminalSizeCh: A channel to send new tty terminal sizes
//
// The call blocks until command terminates (or an error occurs), and returns the exit code.
func (a *Allocations) Exec(ctx context.Context,
notnoop marked this conversation as resolved.
Show resolved Hide resolved
alloc *Allocation, task string, tty bool, command []string,
stdin io.Reader, stdout, stderr io.Writer,
terminalSizeCh <-chan TerminalSize, q *QueryOptions) (exitCode int, err error) {

ctx, cancelFn := context.WithCancel(ctx)
defer cancelFn()

errCh := make(chan error, 4)

sender, output := a.execFrames(ctx, alloc, task, tty, command, errCh, q)

select {
case err := <-errCh:
return -2, err
default:
}

// Errors resulting from sending input (in goroutines) are silently dropped.
// To mitigate this, extra care is needed to distinguish between actual send errors
// and from send errors due to command terminating and our race to detect failures.
// If we have an actual network failure or send a bad input, we'd get an
// error in the reading side of websocket.

go func() {

bytes := make([]byte, 2048)
for {
if ctx.Err() != nil {
return
}

input := ExecStreamingInput{Stdin: &ExecStreamingIOOperation{}}

n, err := stdin.Read(bytes)

// always send data if we read some
if n != 0 {
input.Stdin.Data = bytes[:n]
sender(&input)
}

// then handle error
if err == io.EOF {
// if n != 0, send data and we'll get n = 0 on next read
if n == 0 {
input.Stdin.Close = true
sender(&input)
return
}
} else if err != nil {
errCh <- err
return
}
}
}()

// forwarding terminal size
go func() {
for {
resizeInput := ExecStreamingInput{}

select {
case <-ctx.Done():
return
case size, ok := <-terminalSizeCh:
if !ok {
return
}
resizeInput.TTYSize = &size
sender(&resizeInput)
schmichael marked this conversation as resolved.
Show resolved Hide resolved
}

}
}()

// send a heartbeat every 10 seconds
go func() {
for {
select {
case <-ctx.Done():
return
// heartbeat message
case <-time.After(10 * time.Second):
sender(&execStreamingInputHeartbeat)
schmichael marked this conversation as resolved.
Show resolved Hide resolved
}

}
}()

for {
select {
case err := <-errCh:
// drop websocket code, not relevant to user
if wsErr, ok := err.(*websocket.CloseError); ok && wsErr.Text != "" {
return -2, errors.New(wsErr.Text)
}
return -2, err
case <-ctx.Done():
return -2, ctx.Err()
case frame, ok := <-output:
if !ok {
return -2, errors.New("disconnected without receiving the exit code")
}

switch {
case frame.Stdout != nil:
if len(frame.Stdout.Data) != 0 {
stdout.Write(frame.Stdout.Data)
}
// don't really do anything if stdout is closing
case frame.Stderr != nil:
if len(frame.Stderr.Data) != 0 {
stderr.Write(frame.Stderr.Data)
}
// don't really do anything if stderr is closing
case frame.Exited && frame.Result != nil:
return frame.Result.ExitCode, nil
default:
// noop - heartbeat
}
}
}
}

func (a *Allocations) execFrames(ctx context.Context, alloc *Allocation, task string, tty bool, command []string,
errCh chan<- error, q *QueryOptions) (sendFn func(*ExecStreamingInput) error, output <-chan *ExecStreamingOutput) {

nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
if err != nil {
errCh <- err
return nil, nil
}

if q == nil {
q = &QueryOptions{}
}
if q.Params == nil {
q.Params = make(map[string]string)
}

commandBytes, err := json.Marshal(command)
if err != nil {
errCh <- fmt.Errorf("failed to marshal command: %s", err)
return nil, nil
}

q.Params["tty"] = strconv.FormatBool(tty)
q.Params["task"] = task
q.Params["command"] = string(commandBytes)

reqPath := fmt.Sprintf("/v1/client/allocation/%s/exec", alloc.ID)

conn, _, err := nodeClient.websocket(reqPath, q)
if err != nil {
// There was an error talking directly to the client. Non-network
// errors are fatal, but network errors can attempt to route via RPC.
if _, ok := err.(net.Error); !ok {
errCh <- err
return nil, nil
}

conn, _, err = a.client.websocket(reqPath, q)
if err != nil {
errCh <- err
return nil, nil
}
}

// Create the output channel
frames := make(chan *ExecStreamingOutput, 10)

go func() {
defer conn.Close()
for ctx.Err() == nil {

// Decode the next frame
var frame ExecStreamingOutput
err := conn.ReadJSON(&frame)
if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
close(frames)
return
} else if err != nil {
errCh <- err
return
}

frames <- &frame
}
}()

var sendLock sync.Mutex
send := func(v *ExecStreamingInput) error {
sendLock.Lock()
defer sendLock.Unlock()

return conn.WriteJSON(v)
}

return send, frames

}

func (a *Allocations) Stats(alloc *Allocation, q *QueryOptions) (*AllocResourceUsage, error) {
var resp AllocResourceUsage
path := fmt.Sprintf("/v1/client/allocation/%s/stats", alloc.ID)
Expand Down Expand Up @@ -339,3 +564,42 @@ type DesiredTransition struct {
func (d DesiredTransition) ShouldMigrate() bool {
return d.Migrate != nil && *d.Migrate
}

// ExecStreamingIOOperation represents a stream write operation: either appending data or close (exclusively)
type ExecStreamingIOOperation struct {
Data []byte `json:"data,omitempty"`
Close bool `json:"close,omitempty"`
}

// TerminalSize represents the size of the terminal
type TerminalSize struct {
Height int `json:"height,omitempty"`
Width int `json:"width,omitempty"`
}

var execStreamingInputHeartbeat = ExecStreamingInput{}

// ExecStreamingInput represents user input to be sent to nomad exec handler.
//
// At most one field should be set.
type ExecStreamingInput struct {
Stdin *ExecStreamingIOOperation `json:"stdin,omitempty"`
TTYSize *TerminalSize `json:"tty_size,omitempty"`
}

// ExecStreamingExitResults captures the exit code of just completed nomad exec command
type ExecStreamingExitResult struct {
ExitCode int `json:"exit_code"`
}

// ExecStreamingInput represents an output streaming entity, e.g. stdout/stderr update or termination
//
// At most one of these fields should be set: `Stdout`, `Stderr`, or `Result`.
// If `Exited` is true, then `Result` is non-nil, and other fields are nil.
type ExecStreamingOutput struct {
Stdout *ExecStreamingIOOperation `json:"stdout,omitempty"`
Stderr *ExecStreamingIOOperation `json:"stderr,omitempty"`

Exited bool `json:"exited,omitempty"`
Result *ExecStreamingExitResult `json:"result,omitempty"`
}
58 changes: 58 additions & 0 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"strings"
"time"

"github.com/gorilla/websocket"
cleanhttp "github.com/hashicorp/go-cleanhttp"
rootcerts "github.com/hashicorp/go-rootcerts"
)
Expand Down Expand Up @@ -655,6 +656,63 @@ func (c *Client) rawQuery(endpoint string, q *QueryOptions) (io.ReadCloser, erro
return resp.Body, nil
}

// websocket makes a websocket request to the specific endpoint
func (c *Client) websocket(endpoint string, q *QueryOptions) (*websocket.Conn, *http.Response, error) {
schmichael marked this conversation as resolved.
Show resolved Hide resolved

transport, ok := c.config.httpClient.Transport.(*http.Transport)
if !ok {
return nil, nil, fmt.Errorf("unsupported transport")
}
dialer := websocket.Dialer{
ReadBufferSize: 4096,
WriteBufferSize: 4096,
HandshakeTimeout: c.config.httpClient.Timeout,

// values to inherit from http client configuration
NetDial: transport.Dial,
NetDialContext: transport.DialContext,
Proxy: transport.Proxy,
TLSClientConfig: transport.TLSClientConfig,
}

// build request object for header and parameters
r, err := c.newRequest("GET", endpoint)
if err != nil {
return nil, nil, err
}
r.setQueryOptions(q)

rhttp, err := r.toHTTP()
if err != nil {
return nil, nil, err
}

// convert scheme
wsScheme := ""
switch rhttp.URL.Scheme {
case "http":
wsScheme = "ws"
case "https":
wsScheme = "wss"
default:
return nil, nil, fmt.Errorf("unsupported scheme: %v", rhttp.URL.Scheme)
}
rhttp.URL.Scheme = wsScheme

conn, resp, err := dialer.Dial(rhttp.URL.String(), rhttp.Header)
schmichael marked this conversation as resolved.
Show resolved Hide resolved

// check resp status code, as it's more informative than handshake error we get from ws library
if resp != nil && resp.StatusCode != 101 {
var buf bytes.Buffer
io.Copy(&buf, resp.Body)
resp.Body.Close()

return nil, nil, fmt.Errorf("Unexpected response code: %d (%s)", resp.StatusCode, buf.Bytes())
}

return conn, resp, err
}

// query is used to do a GET request against an endpoint
// and deserialize the response into an interface using
// standard Nomad conventions.
Expand Down
Loading