Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cli: recover from client ACL lookup failures #6423

Merged
merged 1 commit into from
Oct 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 8 additions & 11 deletions api/allocations.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,12 +210,7 @@ func (a *Allocations) Exec(ctx context.Context,

func (a *Allocations) execFrames(ctx context.Context, alloc *Allocation, task string, tty bool, command []string,
errCh chan<- error, q *QueryOptions) (sendFn func(*ExecStreamingInput) error, output <-chan *ExecStreamingOutput) {

nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
if err != nil {
errCh <- err
return nil, nil
}
nodeClient, _ := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See other comment about logging this error


if q == nil {
q = &QueryOptions{}
Expand All @@ -236,15 +231,17 @@ func (a *Allocations) execFrames(ctx context.Context, alloc *Allocation, task st

reqPath := fmt.Sprintf("/v1/client/allocation/%s/exec", alloc.ID)

conn, _, err := nodeClient.websocket(reqPath, q)
if err != nil {
// There was an error talking directly to the client. Non-network
// errors are fatal, but network errors can attempt to route via RPC.
if _, ok := err.(net.Error); !ok {
var conn *websocket.Conn

if nodeClient != nil {
conn, _, err = nodeClient.websocket(reqPath, q)
if _, ok := err.(net.Error); err != nil && !ok {
errCh <- err
return nil, nil
}
}

if conn == nil {
conn, _, err = a.client.websocket(reqPath, q)
if err != nil {
errCh <- err
Expand Down
179 changes: 61 additions & 118 deletions api/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,72 +92,24 @@ func (a *AllocFS) Stat(alloc *Allocation, path string, q *QueryOptions) (*AllocF
// ReadAt is used to read bytes at a given offset until limit at the given path
// in an allocation directory. If limit is <= 0, there is no limit.
func (a *AllocFS) ReadAt(alloc *Allocation, path string, offset int64, limit int64, q *QueryOptions) (io.ReadCloser, error) {
nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
if err != nil {
return nil, err
}

if q == nil {
q = &QueryOptions{}
}
if q.Params == nil {
q.Params = make(map[string]string)
}

q.Params["path"] = path
q.Params["offset"] = strconv.FormatInt(offset, 10)
q.Params["limit"] = strconv.FormatInt(limit, 10)

reqPath := fmt.Sprintf("/v1/client/fs/readat/%s", alloc.ID)
r, err := nodeClient.rawQuery(reqPath, q)
if err != nil {
// There was a networking error when talking directly to the client.
if _, ok := err.(net.Error); !ok {
return nil, err
}

// Try via the server
r, err = a.client.rawQuery(reqPath, q)
if err != nil {
return nil, err
}
}

return r, nil
return queryClientNode(a.client, alloc, reqPath, q,
func(q *QueryOptions) {
q.Params["path"] = path
q.Params["offset"] = strconv.FormatInt(offset, 10)
q.Params["limit"] = strconv.FormatInt(limit, 10)
})
}

// Cat is used to read contents of a file at the given path in an allocation
// directory
func (a *AllocFS) Cat(alloc *Allocation, path string, q *QueryOptions) (io.ReadCloser, error) {
nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
if err != nil {
return nil, err
}

if q == nil {
q = &QueryOptions{}
}
if q.Params == nil {
q.Params = make(map[string]string)
}

q.Params["path"] = path
reqPath := fmt.Sprintf("/v1/client/fs/cat/%s", alloc.ID)
r, err := nodeClient.rawQuery(reqPath, q)
if err != nil {
// There was a networking error when talking directly to the client.
if _, ok := err.(net.Error); !ok {
return nil, err
}

// Try via the server
r, err = a.client.rawQuery(reqPath, q)
if err != nil {
return nil, err
}
}

return r, nil
return queryClientNode(a.client, alloc, reqPath, q,
func(q *QueryOptions) {
q.Params["path"] = path
})
}

// Stream streams the content of a file blocking on EOF.
Expand All @@ -172,38 +124,17 @@ func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64,
cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, <-chan error) {

errCh := make(chan error, 1)
nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
if err != nil {
errCh <- err
return nil, errCh
}

if q == nil {
q = &QueryOptions{}
}
if q.Params == nil {
q.Params = make(map[string]string)
}

q.Params["path"] = path
q.Params["offset"] = strconv.FormatInt(offset, 10)
q.Params["origin"] = origin

reqPath := fmt.Sprintf("/v1/client/fs/stream/%s", alloc.ID)
r, err := nodeClient.rawQuery(reqPath, q)
r, err := queryClientNode(a.client, alloc, reqPath, q,
func(q *QueryOptions) {
q.Params["path"] = path
q.Params["offset"] = strconv.FormatInt(offset, 10)
q.Params["origin"] = origin
})
if err != nil {
// There was a networking error when talking directly to the client.
if _, ok := err.(net.Error); !ok {
errCh <- err
return nil, errCh
}

// Try via the server
r, err = a.client.rawQuery(reqPath, q)
if err != nil {
errCh <- err
return nil, errCh
}
errCh <- err
return nil, errCh
}

// Create the output channel
Expand Down Expand Up @@ -244,6 +175,40 @@ func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64,
return frames, errCh
}

func queryClientNode(c *Client, alloc *Allocation, reqPath string, q *QueryOptions, customizeQ func(*QueryOptions)) (io.ReadCloser, error) {
nodeClient, _ := c.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way we could debug log this error? I see a future engineer trying to debug why an api call isn't able to hit the client directly and this error would probably be the key bit of knowledge.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure how to best handle it. We currently don't log anything in api package, and introducing logging might be confusing for cli tools, specially if eventually requests succeeds through server.


if q == nil {
q = &QueryOptions{}
}
if q.Params == nil {
q.Params = make(map[string]string)
}
if customizeQ != nil {
customizeQ(q)
}

var r io.ReadCloser
var err error

if nodeClient != nil {
r, err = nodeClient.rawQuery(reqPath, q)
if _, ok := err.(net.Error); err != nil && !ok {
// found a non networking error talking to client directly
return nil, err
}

}

// failed to query node, access through server directly
// or network error when talking to the client directly
if r == nil {
return c.rawQuery(reqPath, q)
}

return r, err
}

// Logs streams the content of a tasks logs blocking on EOF.
// The parameters are:
// * allocation: the allocation to stream from.
Expand All @@ -264,42 +229,20 @@ func (a *AllocFS) Logs(alloc *Allocation, follow bool, task, logType, origin str

errCh := make(chan error, 1)

nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
reqPath := fmt.Sprintf("/v1/client/fs/logs/%s", alloc.ID)
r, err := queryClientNode(a.client, alloc, reqPath, q,
func(q *QueryOptions) {
q.Params["follow"] = strconv.FormatBool(follow)
q.Params["task"] = task
q.Params["type"] = logType
q.Params["origin"] = origin
q.Params["offset"] = strconv.FormatInt(offset, 10)
})
if err != nil {
errCh <- err
return nil, errCh
}

if q == nil {
q = &QueryOptions{}
}
if q.Params == nil {
q.Params = make(map[string]string)
}

q.Params["follow"] = strconv.FormatBool(follow)
q.Params["task"] = task
q.Params["type"] = logType
q.Params["origin"] = origin
q.Params["offset"] = strconv.FormatInt(offset, 10)

reqPath := fmt.Sprintf("/v1/client/fs/logs/%s", alloc.ID)
r, err := nodeClient.rawQuery(reqPath, q)
if err != nil {
// There was a networking error when talking directly to the client.
if _, ok := err.(net.Error); !ok {
errCh <- err
return nil, errCh
}

// Try via the server
r, err = a.client.rawQuery(reqPath, q)
if err != nil {
errCh <- err
return nil, errCh
}
}

// Create the output channel
frames := make(chan *StreamFrame, 10)

Expand Down