Skip to content

Commit

Permalink
agent: add websocket handler for nomad exec
Browse files Browse the repository at this point in the history
This adds a websocket endpoint for handling `nomad exec`.

The endpoint is a websocket interface, as we require a bi-directional
streaming (to handle both input and output), which is not very appropriate for
plain HTTP 1.0. Using websocket makes implementing the web ui a bit simpler. I
considered using golang http hijack capability to treat http request as a plain
connection, but the web interface would be too complicated potentially.

Furthermore, the API endpoint operates against the raw core nomad exec streaming
datastructures, defined in protobuf, with json serializer.  Our APIs use json
interfaces in general, and protobuf generates json friendly golang structs.
Reusing the structs here simplify interface and reduce conversion overhead.
  • Loading branch information
Mahmood Ali committed Apr 30, 2019
1 parent 781b255 commit 58a4c71
Show file tree
Hide file tree
Showing 2 changed files with 183 additions and 0 deletions.
174 changes: 174 additions & 0 deletions command/agent/alloc_endpoint.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
package agent

import (
"context"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"strings"

"github.com/golang/snappy"
"github.com/gorilla/websocket"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/drivers"
"github.com/ugorji/go/codec"
)

const (
Expand Down Expand Up @@ -129,6 +135,8 @@ func (s *HTTPServer) ClientAllocRequest(resp http.ResponseWriter, req *http.Requ
switch tokens[1] {
case "stats":
return s.allocStats(allocID, resp, req)
case "exec":
return s.allocExec(allocID, resp, req)
case "snapshot":
if s.agent.client == nil {
return nil, clientNotRunning
Expand Down Expand Up @@ -347,3 +355,169 @@ func (s *HTTPServer) allocStats(allocID string, resp http.ResponseWriter, req *h

return reply.Stats, rpcErr
}

func (s *HTTPServer) allocExec(allocID string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Build the request and parse the ACL token
task := req.URL.Query().Get("task")
cmdJsonStr := req.URL.Query().Get("command")
var command []string
json.Unmarshal([]byte(cmdJsonStr), &command)

tty := req.URL.Query().Get("tty")
args := cstructs.AllocExecRequest{
AllocID: allocID,
Task: task,
Cmd: command,
Tty: tty == "true",
}
s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions)

conn, err := s.wsUpgrader.Upgrade(resp, req, nil)
if err != nil {
return nil, fmt.Errorf("failed to upgrade connection: %v", err)
}

return s.execStreamImpl(conn, &args)
}

func (s *HTTPServer) execStreamImpl(ws *websocket.Conn, args *cstructs.AllocExecRequest) (interface{}, error) {
allocID := args.AllocID
method := "Allocations.Exec"

// Get the correct handler
localClient, remoteClient, localServer := s.rpcHandlerForAlloc(allocID)
var handler structs.StreamingRpcHandler
var handlerErr error
if localClient {
handler, handlerErr = s.agent.Client().StreamingRpcHandler(method)
} else if remoteClient {
handler, handlerErr = s.agent.Client().RemoteStreamingRpcHandler(method)
} else if localServer {
handler, handlerErr = s.agent.Server().StreamingRpcHandler(method)
}

if handlerErr != nil {
return nil, CodedError(500, handlerErr.Error())
}

// Create a pipe connecting the (possibly remote) handler to the http response
httpPipe, handlerPipe := net.Pipe()
decoder := codec.NewDecoder(httpPipe, structs.MsgpackHandle)
encoder := codec.NewEncoder(httpPipe, structs.MsgpackHandle)

// Create a goroutine that closes the pipe if the connection closes.
ctx, cancel := context.WithCancel(context.TODO())
go func() {
<-ctx.Done()
httpPipe.Close()

// don't close ws - wait to drain messages
}()

// Create a channel that decodes the results
errCh := make(chan HTTPCodedError)

// stream response
go func() {
defer cancel()

// Send the request
if err := encoder.Encode(args); err != nil {
errCh <- CodedError(500, err.Error())
return
}

go forwardExecInput(encoder, ws, errCh)

for {
select {
case <-ctx.Done():
errCh <- nil
return
default:
}

var res cstructs.StreamErrWrapper
err := decoder.Decode(&res)
if isClosedError(err) {
ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
errCh <- nil
return
}

if err != nil {
errCh <- CodedError(500, err.Error())
return
}
decoder.Reset(httpPipe)

if err := res.Error; err != nil {
if err.Code != nil {
errCh <- CodedError(int(*err.Code), err.Error())
return
}
}

if err := ws.WriteMessage(websocket.TextMessage, res.Payload); err != nil {
errCh <- CodedError(500, err.Error())
return
}
}
}()

handler(handlerPipe)
cancel()
codedErr := <-errCh

if isClosedError(codedErr) {
codedErr = nil
} else if codedErr != nil {
ws.WriteMessage(websocket.CloseMessage,
websocket.FormatCloseMessage(toWsCode(codedErr.Code()), codedErr.Error()))
}
ws.Close()

return nil, codedErr
}

func toWsCode(httpCode int) int {
switch httpCode {
case 500:
return websocket.CloseInternalServerErr
default:
// placeholder error code
return websocket.ClosePolicyViolation
}
}

func isClosedError(err error) bool {
if err == nil {
return false
}

// TODO: disambiguite unexpectedly closed connections
return err == io.EOF ||
err == io.ErrClosedPipe ||
strings.Contains(err.Error(), "closed") ||
strings.Contains(err.Error(), "EOF")
}

func forwardExecInput(encoder *codec.Encoder, ws *websocket.Conn, errCh chan<- HTTPCodedError) {
for {
sf := &drivers.ExecTaskStreamingRequestMsg{}
err := ws.ReadJSON(sf)
if err == io.EOF {
return
}

if err != nil {
errCh <- CodedError(500, err.Error())
return
}

err = encoder.Encode(sf)
if err != nil {
errCh <- CodedError(500, err.Error())
}
}
}
9 changes: 9 additions & 0 deletions command/agent/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/NYTimes/gziphandler"
assetfs "github.com/elazarl/go-bindata-assetfs"
"github.com/gorilla/websocket"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/helper/tlsutil"
"github.com/hashicorp/nomad/nomad/structs"
Expand Down Expand Up @@ -54,6 +55,8 @@ type HTTPServer struct {
listenerCh chan struct{}
logger log.Logger
Addr string

wsUpgrader *websocket.Upgrader
}

// NewHTTPServer starts new HTTP server over the agent
Expand Down Expand Up @@ -85,6 +88,11 @@ func NewHTTPServer(agent *Agent, config *Config) (*HTTPServer, error) {
// Create the mux
mux := http.NewServeMux()

wsUpgrader := &websocket.Upgrader{
ReadBufferSize: 2048,
WriteBufferSize: 2048,
}

// Create the server
srv := &HTTPServer{
agent: agent,
Expand All @@ -93,6 +101,7 @@ func NewHTTPServer(agent *Agent, config *Config) (*HTTPServer, error) {
listenerCh: make(chan struct{}),
logger: agent.httpLogger,
Addr: ln.Addr().String(),
wsUpgrader: wsUpgrader,
}
srv.registerHandlers(config.EnableDebug)

Expand Down

0 comments on commit 58a4c71

Please sign in to comment.