diff --git a/.changelog/18232.txt b/.changelog/18232.txt new file mode 100644 index 000000000000..c16f3a3d46a2 --- /dev/null +++ b/.changelog/18232.txt @@ -0,0 +1,3 @@ +```release-note:bug +client: return 404 instead of 500 when trying to access logs and files from allocations that have been garbage collected +``` diff --git a/client/fs_endpoint.go b/client/fs_endpoint.go index 659cf81258a9..821967e19a87 100644 --- a/client/fs_endpoint.go +++ b/client/fs_endpoint.go @@ -169,32 +169,42 @@ func (f *FileSystem) stream(conn io.ReadWriteCloser) { encoder := codec.NewEncoder(conn, structs.MsgpackHandle) if err := decoder.Decode(&req); err != nil { - handleStreamResultError(err, pointer.Of(int64(500)), encoder) + handleStreamResultError(err, pointer.Of(int64(http.StatusInternalServerError)), encoder) return } if req.AllocID == "" { - handleStreamResultError(allocIDNotPresentErr, pointer.Of(int64(400)), encoder) + handleStreamResultError(allocIDNotPresentErr, pointer.Of(int64(http.StatusBadRequest)), encoder) return } - alloc, err := f.c.GetAlloc(req.AllocID) + + ar, err := f.c.getAllocRunner(req.AllocID) if err != nil { - handleStreamResultError(structs.NewErrUnknownAllocation(req.AllocID), pointer.Of(int64(404)), encoder) + handleStreamResultError(structs.NewErrUnknownAllocation(req.AllocID), pointer.Of(int64(http.StatusNotFound)), encoder) + return + } + if ar.IsDestroyed() { + handleStreamResultError( + fmt.Errorf("state for allocation %s not found on client", req.AllocID), + pointer.Of(int64(http.StatusNotFound)), + encoder, + ) return } + alloc := ar.Alloc() // Check read permissions if aclObj, err := f.c.ResolveToken(req.QueryOptions.AuthToken); err != nil { - handleStreamResultError(err, pointer.Of(int64(403)), encoder) + handleStreamResultError(err, pointer.Of(int64(http.StatusForbidden)), encoder) return } else if aclObj != nil && !aclObj.AllowNsOp(alloc.Namespace, acl.NamespaceCapabilityReadFS) { - handleStreamResultError(structs.ErrPermissionDenied, pointer.Of(int64(403)), encoder) + handleStreamResultError(structs.ErrPermissionDenied, pointer.Of(int64(http.StatusForbidden)), encoder) return } // Validate the arguments if req.Path == "" { - handleStreamResultError(pathNotPresentErr, pointer.Of(int64(400)), encoder) + handleStreamResultError(pathNotPresentErr, pointer.Of(int64(http.StatusBadRequest)), encoder) return } switch req.Origin { @@ -202,15 +212,15 @@ func (f *FileSystem) stream(conn io.ReadWriteCloser) { case "": req.Origin = "start" default: - handleStreamResultError(invalidOrigin, pointer.Of(int64(400)), encoder) + handleStreamResultError(invalidOrigin, pointer.Of(int64(http.StatusBadRequest)), encoder) return } fs, err := f.c.GetAllocFS(req.AllocID) if err != nil { - code := pointer.Of(int64(500)) + code := pointer.Of(int64(http.StatusInternalServerError)) if structs.IsErrUnknownAllocation(err) { - code = pointer.Of(int64(404)) + code = pointer.Of(int64(http.StatusNotFound)) } handleStreamResultError(err, code, encoder) @@ -220,13 +230,13 @@ func (f *FileSystem) stream(conn io.ReadWriteCloser) { // Calculate the offset fileInfo, err := fs.Stat(req.Path) if err != nil { - handleStreamResultError(err, pointer.Of(int64(400)), encoder) + handleStreamResultError(err, pointer.Of(int64(http.StatusBadRequest)), encoder) return } if fileInfo.IsDir { handleStreamResultError( fmt.Errorf("file %q is a directory", req.Path), - pointer.Of(int64(400)), encoder) + pointer.Of(int64(http.StatusBadRequest)), encoder) return } @@ -328,7 +338,7 @@ OUTER: } if streamErr != nil { - handleStreamResultError(streamErr, pointer.Of(int64(500)), encoder) + handleStreamResultError(streamErr, pointer.Of(int64(http.StatusInternalServerError)), encoder) return } } @@ -344,19 +354,29 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) { encoder := codec.NewEncoder(conn, structs.MsgpackHandle) if err := decoder.Decode(&req); err != nil { - handleStreamResultError(err, pointer.Of(int64(500)), encoder) + handleStreamResultError(err, pointer.Of(int64(http.StatusInternalServerError)), encoder) return } if req.AllocID == "" { - handleStreamResultError(allocIDNotPresentErr, pointer.Of(int64(400)), encoder) + handleStreamResultError(allocIDNotPresentErr, pointer.Of(int64(http.StatusBadRequest)), encoder) return } - alloc, err := f.c.GetAlloc(req.AllocID) + + ar, err := f.c.getAllocRunner(req.AllocID) if err != nil { - handleStreamResultError(structs.NewErrUnknownAllocation(req.AllocID), pointer.Of(int64(404)), encoder) + handleStreamResultError(structs.NewErrUnknownAllocation(req.AllocID), pointer.Of(int64(http.StatusNotFound)), encoder) + return + } + if ar.IsDestroyed() { + handleStreamResultError( + fmt.Errorf("state for allocation %s not found on client", req.AllocID), + pointer.Of(int64(http.StatusNotFound)), + encoder, + ) return } + alloc := ar.Alloc() // Check read permissions if aclObj, err := f.c.ResolveToken(req.QueryOptions.AuthToken); err != nil { @@ -373,13 +393,13 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) { // Validate the arguments if req.Task == "" { - handleStreamResultError(taskNotPresentErr, pointer.Of(int64(400)), encoder) + handleStreamResultError(taskNotPresentErr, pointer.Of(int64(http.StatusBadRequest)), encoder) return } switch req.LogType { case "stdout", "stderr": default: - handleStreamResultError(logTypeNotPresentErr, pointer.Of(int64(400)), encoder) + handleStreamResultError(logTypeNotPresentErr, pointer.Of(int64(http.StatusBadRequest)), encoder) return } switch req.Origin { @@ -387,15 +407,15 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) { case "": req.Origin = "start" default: - handleStreamResultError(invalidOrigin, pointer.Of(int64(400)), encoder) + handleStreamResultError(invalidOrigin, pointer.Of(int64(http.StatusBadRequest)), encoder) return } fs, err := f.c.GetAllocFS(req.AllocID) if err != nil { - code := pointer.Of(int64(500)) + code := pointer.Of(int64(http.StatusInternalServerError)) if structs.IsErrUnknownAllocation(err) { - code = pointer.Of(int64(404)) + code = pointer.Of(int64(http.StatusNotFound)) } handleStreamResultError(err, code, encoder) @@ -404,9 +424,9 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) { allocState, err := f.c.GetAllocState(req.AllocID) if err != nil { - code := pointer.Of(int64(500)) + code := pointer.Of(int64(http.StatusInternalServerError)) if structs.IsErrUnknownAllocation(err) { - code = pointer.Of(int64(404)) + code = pointer.Of(int64(http.StatusNotFound)) } handleStreamResultError(err, code, encoder) @@ -418,7 +438,7 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) { if taskState == nil { handleStreamResultError( fmt.Errorf("unknown task name %q", req.Task), - pointer.Of(int64(400)), + pointer.Of(int64(http.StatusBadRequest)), encoder) return } @@ -426,7 +446,7 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) { if taskState.StartedAt.IsZero() { handleStreamResultError( fmt.Errorf("task %q not started yet. No logs available", req.Task), - pointer.Of(int64(404)), + pointer.Of(int64(http.StatusNotFound)), encoder) return } @@ -512,7 +532,7 @@ OUTER: if streamErr != nil { // If error has a Code, use it - var code int64 = 500 + var code int64 = http.StatusInternalServerError if codedErr, ok := streamErr.(interface{ Code() int }); ok { code = int64(codedErr.Code()) } diff --git a/client/fs_endpoint_test.go b/client/fs_endpoint_test.go index 4f53efea9c9d..5ebe1930e7d7 100644 --- a/client/fs_endpoint_test.go +++ b/client/fs_endpoint_test.go @@ -9,6 +9,7 @@ import ( "io" "math" "net" + "net/http" "os" "path/filepath" "reflect" @@ -31,6 +32,7 @@ import ( "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" + "github.com/shoenig/test/must" "github.com/stretchr/testify/require" ) @@ -394,6 +396,96 @@ OUTER: } } +// TestFS_Stream_GC asserts that reading files from an alloc that has been +// GC'ed from the client returns a 404 error. +func TestFS_Stream_GC(t *testing.T) { + ci.Parallel(t) + + // Start a server and client. + s, cleanupS := nomad.TestServer(t, nil) + t.Cleanup(cleanupS) + testutil.WaitForLeader(t, s.RPC) + + c, cleanupC := TestClient(t, func(c *config.Config) { + c.Servers = []string{s.GetConfig().RPCAddr.String()} + }) + t.Cleanup(func() { cleanupC() }) + + job := mock.BatchJob() + job.TaskGroups[0].Count = 1 + job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{ + "run_for": "10s", + } + + // Wait for alloc to be running. + alloc := testutil.WaitForRunning(t, s.RPC, job)[0] + + // GC alloc from the client. + ar, err := c.getAllocRunner(alloc.ID) + must.NoError(t, err) + + c.garbageCollector.MarkForCollection(alloc.ID, ar) + must.True(t, c.CollectAllocation(alloc.ID)) + + // Build the request. + req := &cstructs.FsStreamRequest{ + AllocID: alloc.ID, + Path: "alloc/logs/web.stdout.0", + PlainText: true, + Follow: true, + QueryOptions: structs.QueryOptions{Region: "global"}, + } + + // Get the handler. + handler, err := c.StreamingRpcHandler("FileSystem.Stream") + must.NoError(t, err) + + // Create a pipe. + p1, p2 := net.Pipe() + defer p1.Close() + defer p2.Close() + + errCh := make(chan error) + streamMsg := make(chan *cstructs.StreamErrWrapper) + + // Start the handler. + go handler(p2) + + // Start the decoder. + go func() { + decoder := codec.NewDecoder(p1, structs.MsgpackHandle) + for { + var msg cstructs.StreamErrWrapper + if err := decoder.Decode(&msg); err != nil { + if err == io.EOF || strings.Contains(err.Error(), "closed") { + return + } + errCh <- fmt.Errorf("error decoding: %v", err) + } + + streamMsg <- &msg + } + }() + + // Send the request + encoder := codec.NewEncoder(p1, structs.MsgpackHandle) + must.NoError(t, encoder.Encode(req)) + + for { + select { + case <-time.After(3 * time.Second): + t.Fatal("timeout") + case err := <-errCh: + t.Fatal(err) + case msg := <-streamMsg: + must.Error(t, msg.Error) + must.ErrorContains(t, msg.Error, "not found on client") + must.Eq(t, http.StatusNotFound, *msg.Error.Code) + return + } + } +} + func TestFS_Stream_ACL(t *testing.T) { ci.Parallel(t) @@ -1015,13 +1107,104 @@ func TestFS_Logs_TaskPending(t *testing.T) { case msg := <-streamMsg: require.NotNil(msg.Error) require.NotNil(msg.Error.Code) - require.EqualValues(404, *msg.Error.Code) + require.EqualValues(http.StatusNotFound, *msg.Error.Code) require.Contains(msg.Error.Message, "not started") return } } } +// TestFS_Logs_GC asserts that reading logs from an alloc that has been GC'ed +// from the client returns a 404 error. +func TestFS_Logs_GC(t *testing.T) { + ci.Parallel(t) + + // Start a server and client. + s, cleanupS := nomad.TestServer(t, nil) + t.Cleanup(cleanupS) + testutil.WaitForLeader(t, s.RPC) + + c, cleanupC := TestClient(t, func(c *config.Config) { + c.Servers = []string{s.GetConfig().RPCAddr.String()} + }) + t.Cleanup(func() { cleanupC() }) + + job := mock.BatchJob() + job.TaskGroups[0].Count = 1 + job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{ + "run_for": "10s", + } + + // Wait for alloc to be running. + alloc := testutil.WaitForRunning(t, s.RPC, job)[0] + + // GC alloc from the client. + ar, err := c.getAllocRunner(alloc.ID) + must.NoError(t, err) + + c.garbageCollector.MarkForCollection(alloc.ID, ar) + must.True(t, c.CollectAllocation(alloc.ID)) + + // Build the request. + req := &cstructs.FsLogsRequest{ + AllocID: alloc.ID, + Task: job.TaskGroups[0].Tasks[0].Name, + LogType: "stdout", + Origin: "start", + PlainText: true, + QueryOptions: structs.QueryOptions{Region: "global"}, + } + + // Get the handler. + handler, err := c.StreamingRpcHandler("FileSystem.Logs") + must.NoError(t, err) + + // Create a pipe. + p1, p2 := net.Pipe() + defer p1.Close() + defer p2.Close() + + errCh := make(chan error) + streamMsg := make(chan *cstructs.StreamErrWrapper) + + // Start the handler. + go handler(p2) + + // Start the decoder. + go func() { + decoder := codec.NewDecoder(p1, structs.MsgpackHandle) + for { + var msg cstructs.StreamErrWrapper + if err := decoder.Decode(&msg); err != nil { + if err == io.EOF || strings.Contains(err.Error(), "closed") { + return + } + errCh <- fmt.Errorf("error decoding: %v", err) + } + + streamMsg <- &msg + } + }() + + // Send the request. + encoder := codec.NewEncoder(p1, structs.MsgpackHandle) + must.NoError(t, encoder.Encode(req)) + + for { + select { + case <-time.After(3 * time.Second): + t.Fatal("timeout") + case err := <-errCh: + t.Fatalf("unexpected stream error: %v", err) + case msg := <-streamMsg: + must.Error(t, msg.Error) + must.ErrorContains(t, msg.Error, "not found on client") + must.Eq(t, http.StatusNotFound, *msg.Error.Code) + return + } + } +} + func TestFS_Logs_ACL(t *testing.T) { ci.Parallel(t) require := require.New(t)