From e7055585bd4d57bbb136de1da8084f6c17eefd70 Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Wed, 16 Aug 2023 19:41:02 -0400 Subject: [PATCH 1/7] client: 404 when accessing files for GC'ed alloc When an allocation is garbage collected from the client, but not from the servers, the API request is routed to the client and the client does attempt to read the file, but the alloc dir has already been deleted, resulting in a 500 error. This happens because the client GC only destroys the alloc runner (deleting the alloc dir), but it keeps a reference to the alloc runner until the alloc is garbage collected from the servers as well. This commit adjusts this logic by checking if the alloc runner (and the alloc files) has been destroyed, returning a 404 if so. --- client/fs_endpoint.go | 22 ++++- client/fs_endpoint_test.go | 182 +++++++++++++++++++++++++++++++++++++ 2 files changed, 202 insertions(+), 2 deletions(-) diff --git a/client/fs_endpoint.go b/client/fs_endpoint.go index 659cf81258a9..09426441efd0 100644 --- a/client/fs_endpoint.go +++ b/client/fs_endpoint.go @@ -177,11 +177,20 @@ func (f *FileSystem) stream(conn io.ReadWriteCloser) { handleStreamResultError(allocIDNotPresentErr, pointer.Of(int64(400)), encoder) return } - alloc, err := f.c.GetAlloc(req.AllocID) + + ar, err := f.c.getAllocRunner(req.AllocID) if err != nil { handleStreamResultError(structs.NewErrUnknownAllocation(req.AllocID), pointer.Of(int64(404)), encoder) return } + if ar.IsDestroyed() { + handleStreamResultError( + fmt.Errorf("allocation %s destroyed from client", req.AllocID), + pointer.Of(int64(404)), + encoder, + ) + } + alloc := ar.Alloc() // Check read permissions if aclObj, err := f.c.ResolveToken(req.QueryOptions.AuthToken); err != nil { @@ -352,11 +361,20 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) { handleStreamResultError(allocIDNotPresentErr, pointer.Of(int64(400)), encoder) return } - alloc, err := f.c.GetAlloc(req.AllocID) + + ar, err := f.c.getAllocRunner(req.AllocID) if err != nil { handleStreamResultError(structs.NewErrUnknownAllocation(req.AllocID), pointer.Of(int64(404)), encoder) return } + if ar.IsDestroyed() { + handleStreamResultError( + fmt.Errorf("allocation %s destroyed from client", req.AllocID), + pointer.Of(int64(404)), + encoder, + ) + } + alloc := ar.Alloc() // Check read permissions if aclObj, err := f.c.ResolveToken(req.QueryOptions.AuthToken); err != nil { diff --git a/client/fs_endpoint_test.go b/client/fs_endpoint_test.go index 4f53efea9c9d..52ff44ebe249 100644 --- a/client/fs_endpoint_test.go +++ b/client/fs_endpoint_test.go @@ -31,6 +31,7 @@ import ( "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" + "github.com/shoenig/test/must" "github.com/stretchr/testify/require" ) @@ -394,6 +395,96 @@ OUTER: } } +// TestFS_Stream_GC asserts that reading files from an alloc that has been +// GC'ed from the client returns a 404 error. +func TestFS_Stream_GC(t *testing.T) { + ci.Parallel(t) + + // Start a server and client. + s, cleanupS := nomad.TestServer(t, nil) + t.Cleanup(cleanupS) + testutil.WaitForLeader(t, s.RPC) + + c, cleanupC := TestClient(t, func(c *config.Config) { + c.Servers = []string{s.GetConfig().RPCAddr.String()} + }) + t.Cleanup(func() { cleanupC() }) + + job := mock.BatchJob() + job.TaskGroups[0].Count = 1 + job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{ + "run_for": "10s", + } + + // Wait for alloc to be running. + alloc := testutil.WaitForRunning(t, s.RPC, job)[0] + + // GC alloc from the client. + ar, err := c.getAllocRunner(alloc.ID) + must.NoError(t, err) + + c.garbageCollector.MarkForCollection(alloc.ID, ar) + must.True(t, c.CollectAllocation(alloc.ID)) + + // Build the request. + req := &cstructs.FsStreamRequest{ + AllocID: alloc.ID, + Path: "alloc/logs/web.stdout.0", + PlainText: true, + Follow: true, + QueryOptions: structs.QueryOptions{Region: "global"}, + } + + // Get the handler. + handler, err := c.StreamingRpcHandler("FileSystem.Stream") + must.NoError(t, err) + + // Create a pipe. + p1, p2 := net.Pipe() + defer p1.Close() + defer p2.Close() + + errCh := make(chan error) + streamMsg := make(chan *cstructs.StreamErrWrapper) + + // Start the handler. + go handler(p2) + + // Start the decoder. + go func() { + decoder := codec.NewDecoder(p1, structs.MsgpackHandle) + for { + var msg cstructs.StreamErrWrapper + if err := decoder.Decode(&msg); err != nil { + if err == io.EOF || strings.Contains(err.Error(), "closed") { + return + } + errCh <- fmt.Errorf("error decoding: %v", err) + } + + streamMsg <- &msg + } + }() + + // Send the request + encoder := codec.NewEncoder(p1, structs.MsgpackHandle) + must.NoError(t, encoder.Encode(req)) + + for { + select { + case <-time.After(3 * time.Second): + t.Fatal("timeout") + case err := <-errCh: + t.Fatal(err) + case msg := <-streamMsg: + must.Error(t, msg.Error) + must.ErrorContains(t, msg.Error, "destroyed from client") + must.Eq(t, 404, *msg.Error.Code) + return + } + } +} + func TestFS_Stream_ACL(t *testing.T) { ci.Parallel(t) @@ -1022,6 +1113,97 @@ func TestFS_Logs_TaskPending(t *testing.T) { } } +// TestFS_Logs_GC asserts that reading logs from an alloc that has been GC'ed +// from the client returns a 404 error. +func TestFS_Logs_GC(t *testing.T) { + ci.Parallel(t) + + // Start a server and client. + s, cleanupS := nomad.TestServer(t, nil) + t.Cleanup(cleanupS) + testutil.WaitForLeader(t, s.RPC) + + c, cleanupC := TestClient(t, func(c *config.Config) { + c.Servers = []string{s.GetConfig().RPCAddr.String()} + }) + t.Cleanup(func() { cleanupC() }) + + job := mock.BatchJob() + job.TaskGroups[0].Count = 1 + job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{ + "run_for": "10s", + } + + // Wait for alloc to be running. + alloc := testutil.WaitForRunning(t, s.RPC, job)[0] + + // GC alloc from the client. + ar, err := c.getAllocRunner(alloc.ID) + must.NoError(t, err) + + c.garbageCollector.MarkForCollection(alloc.ID, ar) + must.True(t, c.CollectAllocation(alloc.ID)) + + // Build the request. + req := &cstructs.FsLogsRequest{ + AllocID: alloc.ID, + Task: job.TaskGroups[0].Tasks[0].Name, + LogType: "stdout", + Origin: "start", + PlainText: true, + QueryOptions: structs.QueryOptions{Region: "global"}, + } + + // Get the handler. + handler, err := c.StreamingRpcHandler("FileSystem.Logs") + must.NoError(t, err) + + // Create a pipe. + p1, p2 := net.Pipe() + defer p1.Close() + defer p2.Close() + + errCh := make(chan error) + streamMsg := make(chan *cstructs.StreamErrWrapper) + + // Start the handler. + go handler(p2) + + // Start the decoder. + go func() { + decoder := codec.NewDecoder(p1, structs.MsgpackHandle) + for { + var msg cstructs.StreamErrWrapper + if err := decoder.Decode(&msg); err != nil { + if err == io.EOF || strings.Contains(err.Error(), "closed") { + return + } + errCh <- fmt.Errorf("error decoding: %v", err) + } + + streamMsg <- &msg + } + }() + + // Send the request. + encoder := codec.NewEncoder(p1, structs.MsgpackHandle) + must.NoError(t, encoder.Encode(req)) + + for { + select { + case <-time.After(3 * time.Second): + t.Fatal("timeout") + case err := <-errCh: + t.Fatalf("unexpected stream error: %v", err) + case msg := <-streamMsg: + must.Error(t, msg.Error) + must.ErrorContains(t, msg.Error, "destroyed from client") + must.Eq(t, 404, *msg.Error.Code) + return + } + } +} + func TestFS_Logs_ACL(t *testing.T) { ci.Parallel(t) require := require.New(t) From 23d06cdd1a28573946106b4ceb31ccdeb39d1202 Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Wed, 16 Aug 2023 20:07:16 -0400 Subject: [PATCH 2/7] changelog: add entry for #18232 --- .changelog/18232.txt | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 .changelog/18232.txt diff --git a/.changelog/18232.txt b/.changelog/18232.txt new file mode 100644 index 000000000000..553520d24ab6 --- /dev/null +++ b/.changelog/18232.txt @@ -0,0 +1,3 @@ +```release-note:improvement +client: return 404 when trying to access logs and files from allocations that have been garbage collected +``` From eb0a0877f92fd44578e1844c8aea865064c69da5 Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Fri, 18 Aug 2023 11:47:02 -0400 Subject: [PATCH 3/7] adjust error message on client gc alloc --- client/fs_endpoint.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/client/fs_endpoint.go b/client/fs_endpoint.go index 09426441efd0..55ec8360872b 100644 --- a/client/fs_endpoint.go +++ b/client/fs_endpoint.go @@ -185,8 +185,8 @@ func (f *FileSystem) stream(conn io.ReadWriteCloser) { } if ar.IsDestroyed() { handleStreamResultError( - fmt.Errorf("allocation %s destroyed from client", req.AllocID), - pointer.Of(int64(404)), + fmt.Errorf("state for allocation %s not found on client", req.AllocID), + pointer.Of(int64(http.StatusNotFound)), encoder, ) } @@ -369,8 +369,8 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) { } if ar.IsDestroyed() { handleStreamResultError( - fmt.Errorf("allocation %s destroyed from client", req.AllocID), - pointer.Of(int64(404)), + fmt.Errorf("state for allocation %s not found on client", req.AllocID), + pointer.Of(int64(http.StatusNotFound)), encoder, ) } From 48a5407d5fb614f1b282d2374d6e46b977807e83 Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Fri, 18 Aug 2023 11:53:13 -0400 Subject: [PATCH 4/7] use net status constants instead of int --- client/fs_endpoint.go | 50 +++++++++++++++++++------------------- client/fs_endpoint_test.go | 7 +++--- 2 files changed, 29 insertions(+), 28 deletions(-) diff --git a/client/fs_endpoint.go b/client/fs_endpoint.go index 55ec8360872b..459ed48e4b28 100644 --- a/client/fs_endpoint.go +++ b/client/fs_endpoint.go @@ -169,18 +169,18 @@ func (f *FileSystem) stream(conn io.ReadWriteCloser) { encoder := codec.NewEncoder(conn, structs.MsgpackHandle) if err := decoder.Decode(&req); err != nil { - handleStreamResultError(err, pointer.Of(int64(500)), encoder) + handleStreamResultError(err, pointer.Of(int64(http.StatusInternalServerError)), encoder) return } if req.AllocID == "" { - handleStreamResultError(allocIDNotPresentErr, pointer.Of(int64(400)), encoder) + handleStreamResultError(allocIDNotPresentErr, pointer.Of(int64(http.StatusBadRequest)), encoder) return } ar, err := f.c.getAllocRunner(req.AllocID) if err != nil { - handleStreamResultError(structs.NewErrUnknownAllocation(req.AllocID), pointer.Of(int64(404)), encoder) + handleStreamResultError(structs.NewErrUnknownAllocation(req.AllocID), pointer.Of(int64(http.StatusNotFound)), encoder) return } if ar.IsDestroyed() { @@ -194,16 +194,16 @@ func (f *FileSystem) stream(conn io.ReadWriteCloser) { // Check read permissions if aclObj, err := f.c.ResolveToken(req.QueryOptions.AuthToken); err != nil { - handleStreamResultError(err, pointer.Of(int64(403)), encoder) + handleStreamResultError(err, pointer.Of(int64(http.StatusForbidden)), encoder) return } else if aclObj != nil && !aclObj.AllowNsOp(alloc.Namespace, acl.NamespaceCapabilityReadFS) { - handleStreamResultError(structs.ErrPermissionDenied, pointer.Of(int64(403)), encoder) + handleStreamResultError(structs.ErrPermissionDenied, pointer.Of(int64(http.StatusForbidden)), encoder) return } // Validate the arguments if req.Path == "" { - handleStreamResultError(pathNotPresentErr, pointer.Of(int64(400)), encoder) + handleStreamResultError(pathNotPresentErr, pointer.Of(int64(http.StatusBadRequest)), encoder) return } switch req.Origin { @@ -211,15 +211,15 @@ func (f *FileSystem) stream(conn io.ReadWriteCloser) { case "": req.Origin = "start" default: - handleStreamResultError(invalidOrigin, pointer.Of(int64(400)), encoder) + handleStreamResultError(invalidOrigin, pointer.Of(int64(http.StatusBadRequest)), encoder) return } fs, err := f.c.GetAllocFS(req.AllocID) if err != nil { - code := pointer.Of(int64(500)) + code := pointer.Of(int64(http.StatusInternalServerError)) if structs.IsErrUnknownAllocation(err) { - code = pointer.Of(int64(404)) + code = pointer.Of(int64(http.StatusNotFound)) } handleStreamResultError(err, code, encoder) @@ -229,13 +229,13 @@ func (f *FileSystem) stream(conn io.ReadWriteCloser) { // Calculate the offset fileInfo, err := fs.Stat(req.Path) if err != nil { - handleStreamResultError(err, pointer.Of(int64(400)), encoder) + handleStreamResultError(err, pointer.Of(int64(http.StatusBadRequest)), encoder) return } if fileInfo.IsDir { handleStreamResultError( fmt.Errorf("file %q is a directory", req.Path), - pointer.Of(int64(400)), encoder) + pointer.Of(int64(http.StatusBadRequest)), encoder) return } @@ -337,7 +337,7 @@ OUTER: } if streamErr != nil { - handleStreamResultError(streamErr, pointer.Of(int64(500)), encoder) + handleStreamResultError(streamErr, pointer.Of(int64(http.StatusInternalServerError)), encoder) return } } @@ -353,18 +353,18 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) { encoder := codec.NewEncoder(conn, structs.MsgpackHandle) if err := decoder.Decode(&req); err != nil { - handleStreamResultError(err, pointer.Of(int64(500)), encoder) + handleStreamResultError(err, pointer.Of(int64(http.StatusInternalServerError)), encoder) return } if req.AllocID == "" { - handleStreamResultError(allocIDNotPresentErr, pointer.Of(int64(400)), encoder) + handleStreamResultError(allocIDNotPresentErr, pointer.Of(int64(http.StatusBadRequest)), encoder) return } ar, err := f.c.getAllocRunner(req.AllocID) if err != nil { - handleStreamResultError(structs.NewErrUnknownAllocation(req.AllocID), pointer.Of(int64(404)), encoder) + handleStreamResultError(structs.NewErrUnknownAllocation(req.AllocID), pointer.Of(int64(http.StatusNotFound)), encoder) return } if ar.IsDestroyed() { @@ -391,13 +391,13 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) { // Validate the arguments if req.Task == "" { - handleStreamResultError(taskNotPresentErr, pointer.Of(int64(400)), encoder) + handleStreamResultError(taskNotPresentErr, pointer.Of(int64(http.StatusBadRequest)), encoder) return } switch req.LogType { case "stdout", "stderr": default: - handleStreamResultError(logTypeNotPresentErr, pointer.Of(int64(400)), encoder) + handleStreamResultError(logTypeNotPresentErr, pointer.Of(int64(http.StatusBadRequest)), encoder) return } switch req.Origin { @@ -405,15 +405,15 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) { case "": req.Origin = "start" default: - handleStreamResultError(invalidOrigin, pointer.Of(int64(400)), encoder) + handleStreamResultError(invalidOrigin, pointer.Of(int64(http.StatusBadRequest)), encoder) return } fs, err := f.c.GetAllocFS(req.AllocID) if err != nil { - code := pointer.Of(int64(500)) + code := pointer.Of(int64(http.StatusInternalServerError)) if structs.IsErrUnknownAllocation(err) { - code = pointer.Of(int64(404)) + code = pointer.Of(int64(http.StatusNotFound)) } handleStreamResultError(err, code, encoder) @@ -422,9 +422,9 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) { allocState, err := f.c.GetAllocState(req.AllocID) if err != nil { - code := pointer.Of(int64(500)) + code := pointer.Of(int64(http.StatusInternalServerError)) if structs.IsErrUnknownAllocation(err) { - code = pointer.Of(int64(404)) + code = pointer.Of(int64(http.StatusNotFound)) } handleStreamResultError(err, code, encoder) @@ -436,7 +436,7 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) { if taskState == nil { handleStreamResultError( fmt.Errorf("unknown task name %q", req.Task), - pointer.Of(int64(400)), + pointer.Of(int64(http.StatusBadRequest)), encoder) return } @@ -444,7 +444,7 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) { if taskState.StartedAt.IsZero() { handleStreamResultError( fmt.Errorf("task %q not started yet. No logs available", req.Task), - pointer.Of(int64(404)), + pointer.Of(int64(http.StatusNotFound)), encoder) return } @@ -530,7 +530,7 @@ OUTER: if streamErr != nil { // If error has a Code, use it - var code int64 = 500 + var code int64 = http.StatusInternalServerError if codedErr, ok := streamErr.(interface{ Code() int }); ok { code = int64(codedErr.Code()) } diff --git a/client/fs_endpoint_test.go b/client/fs_endpoint_test.go index 52ff44ebe249..47cb353e0590 100644 --- a/client/fs_endpoint_test.go +++ b/client/fs_endpoint_test.go @@ -9,6 +9,7 @@ import ( "io" "math" "net" + "net/http" "os" "path/filepath" "reflect" @@ -479,7 +480,7 @@ func TestFS_Stream_GC(t *testing.T) { case msg := <-streamMsg: must.Error(t, msg.Error) must.ErrorContains(t, msg.Error, "destroyed from client") - must.Eq(t, 404, *msg.Error.Code) + must.Eq(t, http.StatusNotFound, *msg.Error.Code) return } } @@ -1106,7 +1107,7 @@ func TestFS_Logs_TaskPending(t *testing.T) { case msg := <-streamMsg: require.NotNil(msg.Error) require.NotNil(msg.Error.Code) - require.EqualValues(404, *msg.Error.Code) + require.EqualValues(http.StatusNotFound, *msg.Error.Code) require.Contains(msg.Error.Message, "not started") return } @@ -1198,7 +1199,7 @@ func TestFS_Logs_GC(t *testing.T) { case msg := <-streamMsg: must.Error(t, msg.Error) must.ErrorContains(t, msg.Error, "destroyed from client") - must.Eq(t, 404, *msg.Error.Code) + must.Eq(t, http.StatusNotFound, *msg.Error.Code) return } } From c14d770057f6eef2cd4c773837f63f25b345934b Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Fri, 18 Aug 2023 15:50:12 -0400 Subject: [PATCH 5/7] client: return early in case of error --- client/fs_endpoint.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/client/fs_endpoint.go b/client/fs_endpoint.go index 459ed48e4b28..821967e19a87 100644 --- a/client/fs_endpoint.go +++ b/client/fs_endpoint.go @@ -189,6 +189,7 @@ func (f *FileSystem) stream(conn io.ReadWriteCloser) { pointer.Of(int64(http.StatusNotFound)), encoder, ) + return } alloc := ar.Alloc() @@ -373,6 +374,7 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) { pointer.Of(int64(http.StatusNotFound)), encoder, ) + return } alloc := ar.Alloc() From 36e48e150ab0a847507e914ea3bc0b105714c7dd Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Fri, 18 Aug 2023 17:42:53 -0400 Subject: [PATCH 6/7] fix tests --- client/fs_endpoint_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/client/fs_endpoint_test.go b/client/fs_endpoint_test.go index 47cb353e0590..5ebe1930e7d7 100644 --- a/client/fs_endpoint_test.go +++ b/client/fs_endpoint_test.go @@ -479,7 +479,7 @@ func TestFS_Stream_GC(t *testing.T) { t.Fatal(err) case msg := <-streamMsg: must.Error(t, msg.Error) - must.ErrorContains(t, msg.Error, "destroyed from client") + must.ErrorContains(t, msg.Error, "not found on client") must.Eq(t, http.StatusNotFound, *msg.Error.Code) return } @@ -1198,7 +1198,7 @@ func TestFS_Logs_GC(t *testing.T) { t.Fatalf("unexpected stream error: %v", err) case msg := <-streamMsg: must.Error(t, msg.Error) - must.ErrorContains(t, msg.Error, "destroyed from client") + must.ErrorContains(t, msg.Error, "not found on client") must.Eq(t, http.StatusNotFound, *msg.Error.Code) return } From 15f943066374858faad0773b8dfbce7c19b16a8f Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Mon, 21 Aug 2023 16:06:50 -0400 Subject: [PATCH 7/7] changelog: adjust entry for #18232 --- .changelog/18232.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.changelog/18232.txt b/.changelog/18232.txt index 553520d24ab6..c16f3a3d46a2 100644 --- a/.changelog/18232.txt +++ b/.changelog/18232.txt @@ -1,3 +1,3 @@ -```release-note:improvement -client: return 404 when trying to access logs and files from allocations that have been garbage collected +```release-note:bug +client: return 404 instead of 500 when trying to access logs and files from allocations that have been garbage collected ```