Skip to content

Commit

Permalink
Merge pull request #5632 from hashicorp/f-nomad-exec-parts-01-base
Browse files Browse the repository at this point in the history
nomad exec part 1: plumbing and docker driver
  • Loading branch information
Mahmood Ali committed May 9, 2019
2 parents ac76fc2 + 980e4f5 commit 5abbee5
Show file tree
Hide file tree
Showing 65 changed files with 7,604 additions and 478 deletions.
6 changes: 5 additions & 1 deletion acl/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ const (
NamespaceCapabilityDispatchJob = "dispatch-job"
NamespaceCapabilityReadLogs = "read-logs"
NamespaceCapabilityReadFS = "read-fs"
NamespaceCapabilityAllocExec = "alloc-exec"
NamespaceCapabilityAllocNodeExec = "alloc-node-exec"
NamespaceCapabilityAllocLifecycle = "alloc-lifecycle"
NamespaceCapabilitySentinelOverride = "sentinel-override"
)
Expand Down Expand Up @@ -94,7 +96,8 @@ func isNamespaceCapabilityValid(cap string) bool {
switch cap {
case NamespaceCapabilityDeny, NamespaceCapabilityListJobs, NamespaceCapabilityReadJob,
NamespaceCapabilitySubmitJob, NamespaceCapabilityDispatchJob, NamespaceCapabilityReadLogs,
NamespaceCapabilityReadFS, NamespaceCapabilityAllocLifecycle:
NamespaceCapabilityReadFS, NamespaceCapabilityAllocLifecycle,
NamespaceCapabilityAllocExec, NamespaceCapabilityAllocNodeExec:
return true
// Separate the enterprise-only capabilities
case NamespaceCapabilitySentinelOverride:
Expand Down Expand Up @@ -123,6 +126,7 @@ func expandNamespacePolicy(policy string) []string {
NamespaceCapabilityDispatchJob,
NamespaceCapabilityReadLogs,
NamespaceCapabilityReadFS,
NamespaceCapabilityAllocExec,
NamespaceCapabilityAllocLifecycle,
}
default:
Expand Down
1 change: 1 addition & 0 deletions acl/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func TestParse(t *testing.T) {
NamespaceCapabilityDispatchJob,
NamespaceCapabilityReadLogs,
NamespaceCapabilityReadFS,
NamespaceCapabilityAllocExec,
NamespaceCapabilityAllocLifecycle,
},
},
Expand Down
264 changes: 264 additions & 0 deletions api/allocations.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
package api

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"sort"
"strconv"
"sync"
"time"

"github.com/gorilla/websocket"
)

var (
Expand Down Expand Up @@ -61,6 +70,222 @@ func (a *Allocations) Info(allocID string, q *QueryOptions) (*Allocation, *Query
return &resp, qm, nil
}

// Exec is used to execute a command inside a running task. The command is to run inside
// the task environment.
//
// The parameters are:
// * ctx: context to set deadlines or timeout
// * alloc: the allocation to execute command inside
// * task: the task's name to execute command in
// * tty: indicates whether to start a pseudo-tty for the command
// * command: the command and its arguments; sent JSON-encoded as a request parameter
// * stdin, stdout, stderr: the std io to pass to command.
// If tty is true, then streams need to point to a tty that's alive for the whole process
// * terminalSizeCh: A channel to send new tty terminal sizes
// * q: query options forwarded when opening the exec connection
//
// The call blocks until command terminates (or an error occurs), and returns the exit code.
// Whenever a non-nil error is returned, the returned exit code is -2.
func (a *Allocations) Exec(ctx context.Context,
alloc *Allocation, task string, tty bool, command []string,
stdin io.Reader, stdout, stderr io.Writer,
terminalSizeCh <-chan TerminalSize, q *QueryOptions) (exitCode int, err error) {

// Derive a cancellable context; the deferred cancel signals the helper
// goroutines below (which all watch ctx) once Exec returns.
ctx, cancelFn := context.WithCancel(ctx)
defer cancelFn()

// Buffered so that reporting goroutines can hand off an error without
// waiting for a receiver.
errCh := make(chan error, 4)

sender, output := a.execFrames(ctx, alloc, task, tty, command, errCh, q)

// execFrames reports setup failures on errCh and returns nil values in that
// case, so check for an error before using sender/output.
select {
case err := <-errCh:
return -2, err
default:
}

// Errors resulting from sending input (in goroutines) are silently dropped.
// To mitigate this, extra care is needed to distinguish between actual send errors
// and from send errors due to command terminating and our race to detect failures.
// If we have an actual network failure or send a bad input, we'd get an
// error in the reading side of websocket.

// forwarding stdin
go func() {

bytes := make([]byte, 2048)
for {
if ctx.Err() != nil {
return
}

input := ExecStreamingInput{Stdin: &ExecStreamingIOOperation{}}

// NOTE(review): this read is not interrupted by ctx cancellation; the
// goroutine only notices cancellation after Read returns.
n, err := stdin.Read(bytes)

// always send data if we read some
if n != 0 {
input.Stdin.Data = bytes[:n]
sender(&input)
}

// then handle error
if err == io.EOF {
// if n != 0, send data and we'll get n = 0 on next read
if n == 0 {
// tell the remote side that stdin is closed
input.Stdin.Close = true
sender(&input)
return
}
} else if err != nil {
errCh <- err
return
}
}
}()

// forwarding terminal size
go func() {
for {
resizeInput := ExecStreamingInput{}

select {
case <-ctx.Done():
return
case size, ok := <-terminalSizeCh:
// a closed channel means no more resize events will arrive
if !ok {
return
}
resizeInput.TTYSize = &size
sender(&resizeInput)
}

}
}()

// send a heartbeat every 10 seconds
go func() {
for {
select {
case <-ctx.Done():
return
// heartbeat message
case <-time.After(10 * time.Second):
sender(&execStreamingInputHeartbeat)
}

}
}()

// Main loop: relay output frames to stdout/stderr until the exit result
// arrives, an error is reported, or the context is done.
for {
select {
case err := <-errCh:
// drop websocket code, not relevant to user
if wsErr, ok := err.(*websocket.CloseError); ok && wsErr.Text != "" {
return -2, errors.New(wsErr.Text)
}
return -2, err
case <-ctx.Done():
return -2, ctx.Err()
case frame, ok := <-output:
// the output channel is closed when the connection ends normally;
// reaching this point means no exit result frame was seen first
if !ok {
return -2, errors.New("disconnected without receiving the exit code")
}

switch {
case frame.Stdout != nil:
// NOTE(review): the result of Write is ignored here
if len(frame.Stdout.Data) != 0 {
stdout.Write(frame.Stdout.Data)
}
// don't really do anything if stdout is closing
case frame.Stderr != nil:
// NOTE(review): the result of Write is ignored here
if len(frame.Stderr.Data) != 0 {
stderr.Write(frame.Stderr.Data)
}
// don't really do anything if stderr is closing
case frame.Exited && frame.Result != nil:
return frame.Result.ExitCode, nil
default:
// noop - heartbeat
}
}
}
}

// execFrames opens the exec websocket for the given allocation and task, and
// returns a function for sending input frames together with a channel of
// decoded output frames.
//
// The connection is first attempted against the client for alloc.NodeID; if
// that fails with a network error, the request is retried through a.client.
//
// Failures while setting up the connection are sent on errCh and (nil, nil)
// is returned, so callers must check errCh before using the results. Read
// errors that occur later are also sent on errCh. The output channel is
// closed only when the websocket is closed normally by the remote side.
//
// The connection is closed as soon as ctx is done, which also stops the
// background reader; errors caused by that shutdown are not reported.
//
// The returned send function serializes concurrent callers with a mutex.
//
// Note: the "tty", "task" and "command" parameters are written into q.Params,
// so a non-nil q supplied by the caller is modified in place.
func (a *Allocations) execFrames(ctx context.Context, alloc *Allocation, task string, tty bool, command []string,
	errCh chan<- error, q *QueryOptions) (sendFn func(*ExecStreamingInput) error, output <-chan *ExecStreamingOutput) {

	nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
	if err != nil {
		errCh <- err
		return nil, nil
	}

	if q == nil {
		q = &QueryOptions{}
	}
	if q.Params == nil {
		q.Params = make(map[string]string)
	}

	commandBytes, err := json.Marshal(command)
	if err != nil {
		errCh <- fmt.Errorf("failed to marshal command: %s", err)
		return nil, nil
	}

	q.Params["tty"] = strconv.FormatBool(tty)
	q.Params["task"] = task
	q.Params["command"] = string(commandBytes)

	reqPath := fmt.Sprintf("/v1/client/allocation/%s/exec", alloc.ID)

	conn, _, err := nodeClient.websocket(reqPath, q)
	if err != nil {
		// There was an error talking directly to the client. Non-network
		// errors are fatal, but network errors can attempt to route via RPC.
		if _, ok := err.(net.Error); !ok {
			errCh <- err
			return nil, nil
		}

		conn, _, err = a.client.websocket(reqPath, q)
		if err != nil {
			errCh <- err
			return nil, nil
		}
	}

	// Create the output channel
	frames := make(chan *ExecStreamingOutput, 10)

	// readerDone is closed when the reader goroutine exits, releasing the
	// watcher goroutine below.
	readerDone := make(chan struct{})

	// A blocked ReadJSON does not observe ctx, so close the connection when
	// ctx is done to unblock the reader and release the socket promptly.
	go func() {
		select {
		case <-ctx.Done():
			conn.Close()
		case <-readerDone:
		}
	}()

	go func() {
		defer conn.Close()
		defer close(readerDone)

		for ctx.Err() == nil {

			// Decode the next frame
			var frame ExecStreamingOutput
			err := conn.ReadJSON(&frame)
			if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
				close(frames)
				return
			} else if err != nil {
				// Once ctx is done the connection has been closed on purpose;
				// the resulting read error is not a real failure.
				if ctx.Err() != nil {
					return
				}
				errCh <- err
				return
			}

			// Do not block forever if the consumer stopped reading frames.
			select {
			case frames <- &frame:
			case <-ctx.Done():
				return
			}
		}
	}()

	var sendLock sync.Mutex
	send := func(v *ExecStreamingInput) error {
		sendLock.Lock()
		defer sendLock.Unlock()

		return conn.WriteJSON(v)
	}

	return send, frames

}

func (a *Allocations) Stats(alloc *Allocation, q *QueryOptions) (*AllocResourceUsage, error) {
var resp AllocResourceUsage
path := fmt.Sprintf("/v1/client/allocation/%s/stats", alloc.ID)
Expand Down Expand Up @@ -339,3 +564,42 @@ type DesiredTransition struct {
func (d DesiredTransition) ShouldMigrate() bool {
return d.Migrate != nil && *d.Migrate
}

// ExecStreamingIOOperation represents a stream write operation: either appending data or close (exclusively)
type ExecStreamingIOOperation struct {
// Data is the chunk of bytes to append to the stream.
Data []byte `json:"data,omitempty"`
// Close, when true, signals that the stream is being closed.
Close bool `json:"close,omitempty"`
}

// TerminalSize represents the size of the terminal.
//
// NOTE(review): units are not stated here; presumably character rows
// (Height) and columns (Width) — confirm against the exec handler.
type TerminalSize struct {
Height int `json:"height,omitempty"`
Width int `json:"width,omitempty"`
}

// execStreamingInputHeartbeat is an input frame with no fields set; Exec
// sends it periodically as a heartbeat message.
var execStreamingInputHeartbeat = ExecStreamingInput{}

// ExecStreamingInput represents user input to be sent to nomad exec handler.
//
// At most one field should be set. A value with no field set is used as a
// heartbeat message.
type ExecStreamingInput struct {
// Stdin carries data for, or the closing of, the command's stdin.
Stdin *ExecStreamingIOOperation `json:"stdin,omitempty"`
// TTYSize carries a new terminal size.
TTYSize *TerminalSize `json:"tty_size,omitempty"`
}

// ExecStreamingExitResult captures the exit code of a just completed nomad exec command.
type ExecStreamingExitResult struct {
ExitCode int `json:"exit_code"`
}

// ExecStreamingOutput represents an output streaming entity, e.g. stdout/stderr update or termination
//
// At most one of these fields should be set: `Stdout`, `Stderr`, or `Result`.
// If `Exited` is true, then `Result` is non-nil, and other fields are nil.
type ExecStreamingOutput struct {
// Stdout and Stderr carry data for, or the closing of, the respective stream.
Stdout *ExecStreamingIOOperation `json:"stdout,omitempty"`
Stderr *ExecStreamingIOOperation `json:"stderr,omitempty"`

// Exited marks the termination frame; Result then holds the exit code.
Exited bool `json:"exited,omitempty"`
Result *ExecStreamingExitResult `json:"result,omitempty"`
}
58 changes: 58 additions & 0 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"strings"
"time"

"github.com/gorilla/websocket"
cleanhttp "github.com/hashicorp/go-cleanhttp"
rootcerts "github.com/hashicorp/go-rootcerts"
)
Expand Down Expand Up @@ -655,6 +656,63 @@ func (c *Client) rawQuery(endpoint string, q *QueryOptions) (io.ReadCloser, erro
return resp.Body, nil
}

// websocket opens a websocket connection to the given endpoint, applying the
// supplied query options to the request.
//
// Dial, proxy and TLS settings are taken from the client's HTTP transport,
// which must be an *http.Transport; any other transport is rejected. The
// request URL scheme is translated from http/https to ws/wss.
//
// If the server answers the handshake with anything other than a protocol
// switch, the response body is consumed and returned as part of the error,
// since it is more informative than the handshake error from the websocket
// library.
func (c *Client) websocket(endpoint string, q *QueryOptions) (*websocket.Conn, *http.Response, error) {
	httpTransport, isHTTPTransport := c.config.httpClient.Transport.(*http.Transport)
	if !isHTTPTransport {
		return nil, nil, fmt.Errorf("unsupported transport")
	}

	// Build a regular request so that headers and query parameters are
	// derived the same way as for plain HTTP calls.
	req, err := c.newRequest("GET", endpoint)
	if err != nil {
		return nil, nil, err
	}
	req.setQueryOptions(q)

	httpReq, err := req.toHTTP()
	if err != nil {
		return nil, nil, err
	}

	// Translate the HTTP scheme into its websocket counterpart.
	switch scheme := httpReq.URL.Scheme; scheme {
	case "http":
		httpReq.URL.Scheme = "ws"
	case "https":
		httpReq.URL.Scheme = "wss"
	default:
		return nil, nil, fmt.Errorf("unsupported scheme: %v", scheme)
	}

	wsDialer := websocket.Dialer{
		ReadBufferSize:   4096,
		WriteBufferSize:  4096,
		HandshakeTimeout: c.config.httpClient.Timeout,

		// Inherited from the HTTP client configuration.
		NetDial:         httpTransport.Dial,
		NetDialContext:  httpTransport.DialContext,
		Proxy:           httpTransport.Proxy,
		TLSClientConfig: httpTransport.TLSClientConfig,
	}

	conn, resp, err := wsDialer.Dial(httpReq.URL.String(), httpReq.Header)

	// No response at all, or a successful protocol switch: pass the dial
	// results through untouched.
	if resp == nil || resp.StatusCode == http.StatusSwitchingProtocols {
		return conn, resp, err
	}

	// Any other status: surface the code and body instead of the handshake error.
	var body bytes.Buffer
	io.Copy(&body, resp.Body)
	resp.Body.Close()

	return nil, nil, fmt.Errorf("Unexpected response code: %d (%s)", resp.StatusCode, body.Bytes())
}

// query is used to do a GET request against an endpoint
// and deserialize the response into an interface using
// standard Nomad conventions.
Expand Down
Loading

0 comments on commit 5abbee5

Please sign in to comment.