From 14a38bee7bc4386e74157f6a99f3db7382d7e6a5 Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Mon, 21 Aug 2023 16:09:24 -0400 Subject: [PATCH] client: 404 when accessing files for GC'ed alloc (#18232) When an allocation is garbage collected from the client, but not from the servers, the API request is routed to the client and the client does attempt to read the file, but the alloc dir has already been deleted, resulting in a 500 error. This happens because the client GC only destroys the alloc runner (deleting the alloc dir), but it keeps a reference to the alloc runner until the alloc is garbage collected from the servers as well. This commit adjusts this logic by checking if the alloc runner (and the alloc files) has been destroyed, returning a 404 if so. --- .changelog/18232.txt | 3 + client/fs_endpoint.go | 74 +++++++++------ client/fs_endpoint_test.go | 185 ++++++++++++++++++++++++++++++++++++- 3 files changed, 234 insertions(+), 28 deletions(-) create mode 100644 .changelog/18232.txt diff --git a/.changelog/18232.txt b/.changelog/18232.txt new file mode 100644 index 000000000000..c16f3a3d46a2 --- /dev/null +++ b/.changelog/18232.txt @@ -0,0 +1,3 @@ +```release-note:bug +client: return 404 instead of 500 when trying to access logs and files from allocations that have been garbage collected +``` diff --git a/client/fs_endpoint.go b/client/fs_endpoint.go index 659cf81258a9..821967e19a87 100644 --- a/client/fs_endpoint.go +++ b/client/fs_endpoint.go @@ -169,32 +169,42 @@ func (f *FileSystem) stream(conn io.ReadWriteCloser) { encoder := codec.NewEncoder(conn, structs.MsgpackHandle) if err := decoder.Decode(&req); err != nil { - handleStreamResultError(err, pointer.Of(int64(500)), encoder) + handleStreamResultError(err, pointer.Of(int64(http.StatusInternalServerError)), encoder) return } if req.AllocID == "" { - handleStreamResultError(allocIDNotPresentErr, pointer.Of(int64(400)), encoder) + handleStreamResultError(allocIDNotPresentErr, pointer.Of(int64(http.StatusBadRequest)), encoder) return } - alloc, err := f.c.GetAlloc(req.AllocID) + + ar, err := f.c.getAllocRunner(req.AllocID) if err != nil { - handleStreamResultError(structs.NewErrUnknownAllocation(req.AllocID), pointer.Of(int64(404)), encoder) + handleStreamResultError(structs.NewErrUnknownAllocation(req.AllocID), pointer.Of(int64(http.StatusNotFound)), encoder) + return + } + if ar.IsDestroyed() { + handleStreamResultError( + fmt.Errorf("state for allocation %s not found on client", req.AllocID), + pointer.Of(int64(http.StatusNotFound)), + encoder, + ) return } + alloc := ar.Alloc() // Check read permissions if aclObj, err := f.c.ResolveToken(req.QueryOptions.AuthToken); err != nil { - handleStreamResultError(err, pointer.Of(int64(403)), encoder) + handleStreamResultError(err, pointer.Of(int64(http.StatusForbidden)), encoder) return } else if aclObj != nil && !aclObj.AllowNsOp(alloc.Namespace, acl.NamespaceCapabilityReadFS) { - handleStreamResultError(structs.ErrPermissionDenied, pointer.Of(int64(403)), encoder) + handleStreamResultError(structs.ErrPermissionDenied, pointer.Of(int64(http.StatusForbidden)), encoder) return } // Validate the arguments if req.Path == "" { - handleStreamResultError(pathNotPresentErr, pointer.Of(int64(400)), encoder) + handleStreamResultError(pathNotPresentErr, pointer.Of(int64(http.StatusBadRequest)), encoder) return } switch req.Origin { @@ -202,15 +212,15 @@ func (f *FileSystem) stream(conn io.ReadWriteCloser) { case "": req.Origin = "start" default: - handleStreamResultError(invalidOrigin, pointer.Of(int64(400)), encoder) + handleStreamResultError(invalidOrigin, pointer.Of(int64(http.StatusBadRequest)), encoder) return } fs, err := f.c.GetAllocFS(req.AllocID) if err != nil { - code := pointer.Of(int64(500)) + code := pointer.Of(int64(http.StatusInternalServerError)) if structs.IsErrUnknownAllocation(err) { - code = pointer.Of(int64(404)) + code = pointer.Of(int64(http.StatusNotFound)) } handleStreamResultError(err, code, encoder) @@ -220,13 +230,13 @@ func (f *FileSystem) stream(conn io.ReadWriteCloser) { // Calculate the offset fileInfo, err := fs.Stat(req.Path) if err != nil { - handleStreamResultError(err, pointer.Of(int64(400)), encoder) + handleStreamResultError(err, pointer.Of(int64(http.StatusBadRequest)), encoder) return } if fileInfo.IsDir { handleStreamResultError( fmt.Errorf("file %q is a directory", req.Path), - pointer.Of(int64(400)), encoder) + pointer.Of(int64(http.StatusBadRequest)), encoder) return } @@ -328,7 +338,7 @@ OUTER: } if streamErr != nil { - handleStreamResultError(streamErr, pointer.Of(int64(500)), encoder) + handleStreamResultError(streamErr, pointer.Of(int64(http.StatusInternalServerError)), encoder) return } } @@ -344,19 +354,29 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) { encoder := codec.NewEncoder(conn, structs.MsgpackHandle) if err := decoder.Decode(&req); err != nil { - handleStreamResultError(err, pointer.Of(int64(500)), encoder) + handleStreamResultError(err, pointer.Of(int64(http.StatusInternalServerError)), encoder) return } if req.AllocID == "" { - handleStreamResultError(allocIDNotPresentErr, pointer.Of(int64(400)), encoder) + handleStreamResultError(allocIDNotPresentErr, pointer.Of(int64(http.StatusBadRequest)), encoder) return } - alloc, err := f.c.GetAlloc(req.AllocID) + + ar, err := f.c.getAllocRunner(req.AllocID) if err != nil { - handleStreamResultError(structs.NewErrUnknownAllocation(req.AllocID), pointer.Of(int64(404)), encoder) + handleStreamResultError(structs.NewErrUnknownAllocation(req.AllocID), pointer.Of(int64(http.StatusNotFound)), encoder) + return + } + if ar.IsDestroyed() { + handleStreamResultError( + fmt.Errorf("state for allocation %s not found on client", req.AllocID), + pointer.Of(int64(http.StatusNotFound)), + encoder, + ) return } + alloc := ar.Alloc() // Check read permissions if aclObj, err := f.c.ResolveToken(req.QueryOptions.AuthToken); err != nil { @@ -373,13 +393,13 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) { // Validate the arguments if req.Task == "" { - handleStreamResultError(taskNotPresentErr, pointer.Of(int64(400)), encoder) + handleStreamResultError(taskNotPresentErr, pointer.Of(int64(http.StatusBadRequest)), encoder) return } switch req.LogType { case "stdout", "stderr": default: - handleStreamResultError(logTypeNotPresentErr, pointer.Of(int64(400)), encoder) + handleStreamResultError(logTypeNotPresentErr, pointer.Of(int64(http.StatusBadRequest)), encoder) return } switch req.Origin { @@ -387,15 +407,15 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) { case "": req.Origin = "start" default: - handleStreamResultError(invalidOrigin, pointer.Of(int64(400)), encoder) + handleStreamResultError(invalidOrigin, pointer.Of(int64(http.StatusBadRequest)), encoder) return } fs, err := f.c.GetAllocFS(req.AllocID) if err != nil { - code := pointer.Of(int64(500)) + code := pointer.Of(int64(http.StatusInternalServerError)) if structs.IsErrUnknownAllocation(err) { - code = pointer.Of(int64(404)) + code = pointer.Of(int64(http.StatusNotFound)) } handleStreamResultError(err, code, encoder) @@ -404,9 +424,9 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) { allocState, err := f.c.GetAllocState(req.AllocID) if err != nil { - code := pointer.Of(int64(500)) + code := pointer.Of(int64(http.StatusInternalServerError)) if structs.IsErrUnknownAllocation(err) { - code = pointer.Of(int64(404)) + code = pointer.Of(int64(http.StatusNotFound)) } handleStreamResultError(err, code, encoder) @@ -418,7 +438,7 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) { if taskState == nil { handleStreamResultError( fmt.Errorf("unknown task name %q", req.Task), - pointer.Of(int64(400)), + pointer.Of(int64(http.StatusBadRequest)), encoder) return } @@ -426,7 +446,7 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) { if taskState.StartedAt.IsZero() { handleStreamResultError( fmt.Errorf("task %q not started yet. No logs available", req.Task), - pointer.Of(int64(404)), + pointer.Of(int64(http.StatusNotFound)), encoder) return } @@ -512,7 +532,7 @@ OUTER: if streamErr != nil { // If error has a Code, use it - var code int64 = 500 + var code int64 = http.StatusInternalServerError if codedErr, ok := streamErr.(interface{ Code() int }); ok { code = int64(codedErr.Code()) } diff --git a/client/fs_endpoint_test.go b/client/fs_endpoint_test.go index 4f53efea9c9d..5ebe1930e7d7 100644 --- a/client/fs_endpoint_test.go +++ b/client/fs_endpoint_test.go @@ -9,6 +9,7 @@ import ( "io" "math" "net" + "net/http" "os" "path/filepath" "reflect" @@ -31,6 +32,7 @@ import ( "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" + "github.com/shoenig/test/must" "github.com/stretchr/testify/require" ) @@ -394,6 +396,96 @@ OUTER: } } +// TestFS_Stream_GC asserts that reading files from an alloc that has been +// GC'ed from the client returns a 404 error. +func TestFS_Stream_GC(t *testing.T) { + ci.Parallel(t) + + // Start a server and client. + s, cleanupS := nomad.TestServer(t, nil) + t.Cleanup(cleanupS) + testutil.WaitForLeader(t, s.RPC) + + c, cleanupC := TestClient(t, func(c *config.Config) { + c.Servers = []string{s.GetConfig().RPCAddr.String()} + }) + t.Cleanup(func() { cleanupC() }) + + job := mock.BatchJob() + job.TaskGroups[0].Count = 1 + job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{ + "run_for": "10s", + } + + // Wait for alloc to be running. + alloc := testutil.WaitForRunning(t, s.RPC, job)[0] + + // GC alloc from the client. + ar, err := c.getAllocRunner(alloc.ID) + must.NoError(t, err) + + c.garbageCollector.MarkForCollection(alloc.ID, ar) + must.True(t, c.CollectAllocation(alloc.ID)) + + // Build the request. + req := &cstructs.FsStreamRequest{ + AllocID: alloc.ID, + Path: "alloc/logs/web.stdout.0", + PlainText: true, + Follow: true, + QueryOptions: structs.QueryOptions{Region: "global"}, + } + + // Get the handler. + handler, err := c.StreamingRpcHandler("FileSystem.Stream") + must.NoError(t, err) + + // Create a pipe. + p1, p2 := net.Pipe() + defer p1.Close() + defer p2.Close() + + errCh := make(chan error) + streamMsg := make(chan *cstructs.StreamErrWrapper) + + // Start the handler. + go handler(p2) + + // Start the decoder. + go func() { + decoder := codec.NewDecoder(p1, structs.MsgpackHandle) + for { + var msg cstructs.StreamErrWrapper + if err := decoder.Decode(&msg); err != nil { + if err == io.EOF || strings.Contains(err.Error(), "closed") { + return + } + errCh <- fmt.Errorf("error decoding: %v", err) + } + + streamMsg <- &msg + } + }() + + // Send the request + encoder := codec.NewEncoder(p1, structs.MsgpackHandle) + must.NoError(t, encoder.Encode(req)) + + for { + select { + case <-time.After(3 * time.Second): + t.Fatal("timeout") + case err := <-errCh: + t.Fatal(err) + case msg := <-streamMsg: + must.Error(t, msg.Error) + must.ErrorContains(t, msg.Error, "not found on client") + must.Eq(t, http.StatusNotFound, *msg.Error.Code) + return + } + } +} + func TestFS_Stream_ACL(t *testing.T) { ci.Parallel(t) @@ -1015,13 +1107,104 @@ func TestFS_Logs_TaskPending(t *testing.T) { case msg := <-streamMsg: require.NotNil(msg.Error) require.NotNil(msg.Error.Code) - require.EqualValues(404, *msg.Error.Code) + require.EqualValues(http.StatusNotFound, *msg.Error.Code) require.Contains(msg.Error.Message, "not started") return } } } +// TestFS_Logs_GC asserts that reading logs from an alloc that has been GC'ed +// from the client returns a 404 error. +func TestFS_Logs_GC(t *testing.T) { + ci.Parallel(t) + + // Start a server and client. + s, cleanupS := nomad.TestServer(t, nil) + t.Cleanup(cleanupS) + testutil.WaitForLeader(t, s.RPC) + + c, cleanupC := TestClient(t, func(c *config.Config) { + c.Servers = []string{s.GetConfig().RPCAddr.String()} + }) + t.Cleanup(func() { cleanupC() }) + + job := mock.BatchJob() + job.TaskGroups[0].Count = 1 + job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{ + "run_for": "10s", + } + + // Wait for alloc to be running. + alloc := testutil.WaitForRunning(t, s.RPC, job)[0] + + // GC alloc from the client. + ar, err := c.getAllocRunner(alloc.ID) + must.NoError(t, err) + + c.garbageCollector.MarkForCollection(alloc.ID, ar) + must.True(t, c.CollectAllocation(alloc.ID)) + + // Build the request. + req := &cstructs.FsLogsRequest{ + AllocID: alloc.ID, + Task: job.TaskGroups[0].Tasks[0].Name, + LogType: "stdout", + Origin: "start", + PlainText: true, + QueryOptions: structs.QueryOptions{Region: "global"}, + } + + // Get the handler. + handler, err := c.StreamingRpcHandler("FileSystem.Logs") + must.NoError(t, err) + + // Create a pipe. + p1, p2 := net.Pipe() + defer p1.Close() + defer p2.Close() + + errCh := make(chan error) + streamMsg := make(chan *cstructs.StreamErrWrapper) + + // Start the handler. + go handler(p2) + + // Start the decoder. + go func() { + decoder := codec.NewDecoder(p1, structs.MsgpackHandle) + for { + var msg cstructs.StreamErrWrapper + if err := decoder.Decode(&msg); err != nil { + if err == io.EOF || strings.Contains(err.Error(), "closed") { + return + } + errCh <- fmt.Errorf("error decoding: %v", err) + } + + streamMsg <- &msg + } + }() + + // Send the request. + encoder := codec.NewEncoder(p1, structs.MsgpackHandle) + must.NoError(t, encoder.Encode(req)) + + for { + select { + case <-time.After(3 * time.Second): + t.Fatal("timeout") + case err := <-errCh: + t.Fatalf("unexpected stream error: %v", err) + case msg := <-streamMsg: + must.Error(t, msg.Error) + must.ErrorContains(t, msg.Error, "not found on client") + must.Eq(t, http.StatusNotFound, *msg.Error.Code) + return + } + } +} + func TestFS_Logs_ACL(t *testing.T) { ci.Parallel(t) require := require.New(t)