Skip to content

Commit

Permalink
cli: recover from client ACL lookup failures
Browse files Browse the repository at this point in the history
This fixes a bug in the CLI handling of node lookup failures when
querying allocation and FS endpoints.

Allocation and FS endpoint are handled by the client; one can query the
relevant client directly, or query a server to have it forwarded
transparently to relevant client.  Querying the client directly is
benefecial to avoid loading servers with IO.

As an optimization, the CLI attempts to query the client directly, but
then falls back to using server forwarding path if it encounters network
or connection errors (e.g. clients are locked down or in a separate
inaccessible network).

Here, we fix a bug where if the CLI fails to find to lookup the client
details because it lacks ACL capability or other unexpected reasons, the
CLI will not go through fallback path.
  • Loading branch information
Mahmood Ali committed Oct 4, 2019
1 parent 550e1b1 commit 298c528
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 129 deletions.
19 changes: 8 additions & 11 deletions api/allocations.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,12 +210,7 @@ func (a *Allocations) Exec(ctx context.Context,

func (a *Allocations) execFrames(ctx context.Context, alloc *Allocation, task string, tty bool, command []string,
errCh chan<- error, q *QueryOptions) (sendFn func(*ExecStreamingInput) error, output <-chan *ExecStreamingOutput) {

nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
if err != nil {
errCh <- err
return nil, nil
}
nodeClient, _ := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)

if q == nil {
q = &QueryOptions{}
Expand All @@ -236,15 +231,17 @@ func (a *Allocations) execFrames(ctx context.Context, alloc *Allocation, task st

reqPath := fmt.Sprintf("/v1/client/allocation/%s/exec", alloc.ID)

conn, _, err := nodeClient.websocket(reqPath, q)
if err != nil {
// There was an error talking directly to the client. Non-network
// errors are fatal, but network errors can attempt to route via RPC.
if _, ok := err.(net.Error); !ok {
var conn *websocket.Conn

if nodeClient != nil {
conn, _, err = nodeClient.websocket(reqPath, q)
if _, ok := err.(net.Error); err != nil && !ok {
errCh <- err
return nil, nil
}
}

if conn == nil {
conn, _, err = a.client.websocket(reqPath, q)
if err != nil {
errCh <- err
Expand Down
179 changes: 61 additions & 118 deletions api/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,72 +92,24 @@ func (a *AllocFS) Stat(alloc *Allocation, path string, q *QueryOptions) (*AllocF
// ReadAt is used to read bytes at a given offset until limit at the given path
// in an allocation directory. If limit is <= 0, there is no limit.
func (a *AllocFS) ReadAt(alloc *Allocation, path string, offset int64, limit int64, q *QueryOptions) (io.ReadCloser, error) {
nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
if err != nil {
return nil, err
}

if q == nil {
q = &QueryOptions{}
}
if q.Params == nil {
q.Params = make(map[string]string)
}

q.Params["path"] = path
q.Params["offset"] = strconv.FormatInt(offset, 10)
q.Params["limit"] = strconv.FormatInt(limit, 10)

reqPath := fmt.Sprintf("/v1/client/fs/readat/%s", alloc.ID)
r, err := nodeClient.rawQuery(reqPath, q)
if err != nil {
// There was a networking error when talking directly to the client.
if _, ok := err.(net.Error); !ok {
return nil, err
}

// Try via the server
r, err = a.client.rawQuery(reqPath, q)
if err != nil {
return nil, err
}
}

return r, nil
return queryClientNode(a.client, alloc, reqPath, q,
func(q *QueryOptions) {
q.Params["path"] = path
q.Params["offset"] = strconv.FormatInt(offset, 10)
q.Params["limit"] = strconv.FormatInt(limit, 10)
})
}

// Cat is used to read contents of a file at the given path in an allocation
// directory
func (a *AllocFS) Cat(alloc *Allocation, path string, q *QueryOptions) (io.ReadCloser, error) {
nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
if err != nil {
return nil, err
}

if q == nil {
q = &QueryOptions{}
}
if q.Params == nil {
q.Params = make(map[string]string)
}

q.Params["path"] = path
reqPath := fmt.Sprintf("/v1/client/fs/cat/%s", alloc.ID)
r, err := nodeClient.rawQuery(reqPath, q)
if err != nil {
// There was a networking error when talking directly to the client.
if _, ok := err.(net.Error); !ok {
return nil, err
}

// Try via the server
r, err = a.client.rawQuery(reqPath, q)
if err != nil {
return nil, err
}
}

return r, nil
return queryClientNode(a.client, alloc, reqPath, q,
func(q *QueryOptions) {
q.Params["path"] = path
})
}

// Stream streams the content of a file blocking on EOF.
Expand All @@ -172,38 +124,17 @@ func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64,
cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, <-chan error) {

errCh := make(chan error, 1)
nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
if err != nil {
errCh <- err
return nil, errCh
}

if q == nil {
q = &QueryOptions{}
}
if q.Params == nil {
q.Params = make(map[string]string)
}

q.Params["path"] = path
q.Params["offset"] = strconv.FormatInt(offset, 10)
q.Params["origin"] = origin

reqPath := fmt.Sprintf("/v1/client/fs/stream/%s", alloc.ID)
r, err := nodeClient.rawQuery(reqPath, q)
r, err := queryClientNode(a.client, alloc, reqPath, q,
func(q *QueryOptions) {
q.Params["path"] = path
q.Params["offset"] = strconv.FormatInt(offset, 10)
q.Params["origin"] = origin
})
if err != nil {
// There was a networking error when talking directly to the client.
if _, ok := err.(net.Error); !ok {
errCh <- err
return nil, errCh
}

// Try via the server
r, err = a.client.rawQuery(reqPath, q)
if err != nil {
errCh <- err
return nil, errCh
}
errCh <- err
return nil, errCh
}

// Create the output channel
Expand Down Expand Up @@ -244,6 +175,40 @@ func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64,
return frames, errCh
}

func queryClientNode(c *Client, alloc *Allocation, reqPath string, q *QueryOptions, customizeQ func(*QueryOptions)) (io.ReadCloser, error) {
nodeClient, _ := c.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)

if q == nil {
q = &QueryOptions{}
}
if q.Params == nil {
q.Params = make(map[string]string)
}
if customizeQ != nil {
customizeQ(q)
}

var r io.ReadCloser
var err error

if nodeClient != nil {
r, err = nodeClient.rawQuery(reqPath, q)
if _, ok := err.(net.Error); err != nil && !ok {
// found a non networking error talking to client directly
return nil, err
}

}

// failed to query node, access through server directly
// or network error when talking to the client directly
if r == nil {
return c.rawQuery(reqPath, q)
}

return r, err
}

// Logs streams the content of a tasks logs blocking on EOF.
// The parameters are:
// * allocation: the allocation to stream from.
Expand All @@ -264,42 +229,20 @@ func (a *AllocFS) Logs(alloc *Allocation, follow bool, task, logType, origin str

errCh := make(chan error, 1)

nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
reqPath := fmt.Sprintf("/v1/client/fs/logs/%s", alloc.ID)
r, err := queryClientNode(a.client, alloc, reqPath, q,
func(q *QueryOptions) {
q.Params["follow"] = strconv.FormatBool(follow)
q.Params["task"] = task
q.Params["type"] = logType
q.Params["origin"] = origin
q.Params["offset"] = strconv.FormatInt(offset, 10)
})
if err != nil {
errCh <- err
return nil, errCh
}

if q == nil {
q = &QueryOptions{}
}
if q.Params == nil {
q.Params = make(map[string]string)
}

q.Params["follow"] = strconv.FormatBool(follow)
q.Params["task"] = task
q.Params["type"] = logType
q.Params["origin"] = origin
q.Params["offset"] = strconv.FormatInt(offset, 10)

reqPath := fmt.Sprintf("/v1/client/fs/logs/%s", alloc.ID)
r, err := nodeClient.rawQuery(reqPath, q)
if err != nil {
// There was a networking error when talking directly to the client.
if _, ok := err.(net.Error); !ok {
errCh <- err
return nil, errCh
}

// Try via the server
r, err = a.client.rawQuery(reqPath, q)
if err != nil {
errCh <- err
return nil, errCh
}
}

// Create the output channel
frames := make(chan *StreamFrame, 10)

Expand Down

0 comments on commit 298c528

Please sign in to comment.