Skip to content

Commit

Permalink
Merge pull request #6423 from hashicorp/b-direct-node-failure
Browse files Browse the repository at this point in the history
cli: recover from client ACL lookup failures
  • Loading branch information
Mahmood Ali committed Oct 15, 2019
2 parents da832a5 + 298c528 commit bf91e83
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 129 deletions.
19 changes: 8 additions & 11 deletions api/allocations.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,12 +210,7 @@ func (a *Allocations) Exec(ctx context.Context,

func (a *Allocations) execFrames(ctx context.Context, alloc *Allocation, task string, tty bool, command []string,
errCh chan<- error, q *QueryOptions) (sendFn func(*ExecStreamingInput) error, output <-chan *ExecStreamingOutput) {

nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
if err != nil {
errCh <- err
return nil, nil
}
nodeClient, _ := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)

if q == nil {
q = &QueryOptions{}
Expand All @@ -236,15 +231,17 @@ func (a *Allocations) execFrames(ctx context.Context, alloc *Allocation, task st

reqPath := fmt.Sprintf("/v1/client/allocation/%s/exec", alloc.ID)

conn, _, err := nodeClient.websocket(reqPath, q)
if err != nil {
// There was an error talking directly to the client. Non-network
// errors are fatal, but network errors can attempt to route via RPC.
if _, ok := err.(net.Error); !ok {
var conn *websocket.Conn

if nodeClient != nil {
conn, _, err = nodeClient.websocket(reqPath, q)
if _, ok := err.(net.Error); err != nil && !ok {
errCh <- err
return nil, nil
}
}

if conn == nil {
conn, _, err = a.client.websocket(reqPath, q)
if err != nil {
errCh <- err
Expand Down
179 changes: 61 additions & 118 deletions api/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,72 +92,24 @@ func (a *AllocFS) Stat(alloc *Allocation, path string, q *QueryOptions) (*AllocF
// ReadAt is used to read bytes at a given offset until limit at the given path
// in an allocation directory. If limit is <= 0, there is no limit.
func (a *AllocFS) ReadAt(alloc *Allocation, path string, offset int64, limit int64, q *QueryOptions) (io.ReadCloser, error) {
nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
if err != nil {
return nil, err
}

if q == nil {
q = &QueryOptions{}
}
if q.Params == nil {
q.Params = make(map[string]string)
}

q.Params["path"] = path
q.Params["offset"] = strconv.FormatInt(offset, 10)
q.Params["limit"] = strconv.FormatInt(limit, 10)

reqPath := fmt.Sprintf("/v1/client/fs/readat/%s", alloc.ID)
r, err := nodeClient.rawQuery(reqPath, q)
if err != nil {
// There was a networking error when talking directly to the client.
if _, ok := err.(net.Error); !ok {
return nil, err
}

// Try via the server
r, err = a.client.rawQuery(reqPath, q)
if err != nil {
return nil, err
}
}

return r, nil
return queryClientNode(a.client, alloc, reqPath, q,
func(q *QueryOptions) {
q.Params["path"] = path
q.Params["offset"] = strconv.FormatInt(offset, 10)
q.Params["limit"] = strconv.FormatInt(limit, 10)
})
}

// Cat is used to read contents of a file at the given path in an allocation
// directory
func (a *AllocFS) Cat(alloc *Allocation, path string, q *QueryOptions) (io.ReadCloser, error) {
nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
if err != nil {
return nil, err
}

if q == nil {
q = &QueryOptions{}
}
if q.Params == nil {
q.Params = make(map[string]string)
}

q.Params["path"] = path
reqPath := fmt.Sprintf("/v1/client/fs/cat/%s", alloc.ID)
r, err := nodeClient.rawQuery(reqPath, q)
if err != nil {
// There was a networking error when talking directly to the client.
if _, ok := err.(net.Error); !ok {
return nil, err
}

// Try via the server
r, err = a.client.rawQuery(reqPath, q)
if err != nil {
return nil, err
}
}

return r, nil
return queryClientNode(a.client, alloc, reqPath, q,
func(q *QueryOptions) {
q.Params["path"] = path
})
}

// Stream streams the content of a file blocking on EOF.
Expand All @@ -172,38 +124,17 @@ func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64,
cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, <-chan error) {

errCh := make(chan error, 1)
nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
if err != nil {
errCh <- err
return nil, errCh
}

if q == nil {
q = &QueryOptions{}
}
if q.Params == nil {
q.Params = make(map[string]string)
}

q.Params["path"] = path
q.Params["offset"] = strconv.FormatInt(offset, 10)
q.Params["origin"] = origin

reqPath := fmt.Sprintf("/v1/client/fs/stream/%s", alloc.ID)
r, err := nodeClient.rawQuery(reqPath, q)
r, err := queryClientNode(a.client, alloc, reqPath, q,
func(q *QueryOptions) {
q.Params["path"] = path
q.Params["offset"] = strconv.FormatInt(offset, 10)
q.Params["origin"] = origin
})
if err != nil {
// There was a networking error when talking directly to the client.
if _, ok := err.(net.Error); !ok {
errCh <- err
return nil, errCh
}

// Try via the server
r, err = a.client.rawQuery(reqPath, q)
if err != nil {
errCh <- err
return nil, errCh
}
errCh <- err
return nil, errCh
}

// Create the output channel
Expand Down Expand Up @@ -244,6 +175,40 @@ func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64,
return frames, errCh
}

func queryClientNode(c *Client, alloc *Allocation, reqPath string, q *QueryOptions, customizeQ func(*QueryOptions)) (io.ReadCloser, error) {
nodeClient, _ := c.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)

if q == nil {
q = &QueryOptions{}
}
if q.Params == nil {
q.Params = make(map[string]string)
}
if customizeQ != nil {
customizeQ(q)
}

var r io.ReadCloser
var err error

if nodeClient != nil {
r, err = nodeClient.rawQuery(reqPath, q)
if _, ok := err.(net.Error); err != nil && !ok {
// found a non networking error talking to client directly
return nil, err
}

}

// failed to query node, access through server directly
// or network error when talking to the client directly
if r == nil {
return c.rawQuery(reqPath, q)
}

return r, err
}

// Logs streams the content of a tasks logs blocking on EOF.
// The parameters are:
// * allocation: the allocation to stream from.
Expand All @@ -264,42 +229,20 @@ func (a *AllocFS) Logs(alloc *Allocation, follow bool, task, logType, origin str

errCh := make(chan error, 1)

nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
reqPath := fmt.Sprintf("/v1/client/fs/logs/%s", alloc.ID)
r, err := queryClientNode(a.client, alloc, reqPath, q,
func(q *QueryOptions) {
q.Params["follow"] = strconv.FormatBool(follow)
q.Params["task"] = task
q.Params["type"] = logType
q.Params["origin"] = origin
q.Params["offset"] = strconv.FormatInt(offset, 10)
})
if err != nil {
errCh <- err
return nil, errCh
}

if q == nil {
q = &QueryOptions{}
}
if q.Params == nil {
q.Params = make(map[string]string)
}

q.Params["follow"] = strconv.FormatBool(follow)
q.Params["task"] = task
q.Params["type"] = logType
q.Params["origin"] = origin
q.Params["offset"] = strconv.FormatInt(offset, 10)

reqPath := fmt.Sprintf("/v1/client/fs/logs/%s", alloc.ID)
r, err := nodeClient.rawQuery(reqPath, q)
if err != nil {
// There was a networking error when talking directly to the client.
if _, ok := err.(net.Error); !ok {
errCh <- err
return nil, errCh
}

// Try via the server
r, err = a.client.rawQuery(reqPath, q)
if err != nil {
errCh <- err
return nil, errCh
}
}

// Create the output channel
frames := make(chan *StreamFrame, 10)

Expand Down

0 comments on commit bf91e83

Please sign in to comment.