Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: 404 when accessing files for GC'ed alloc #18232

Merged
merged 7 commits into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/18232.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
client: return 404 when trying to access logs and files from allocations that have been garbage collected
```
22 changes: 20 additions & 2 deletions client/fs_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,11 +177,20 @@ func (f *FileSystem) stream(conn io.ReadWriteCloser) {
handleStreamResultError(allocIDNotPresentErr, pointer.Of(int64(400)), encoder)
return
}
alloc, err := f.c.GetAlloc(req.AllocID)

ar, err := f.c.getAllocRunner(req.AllocID)
if err != nil {
handleStreamResultError(structs.NewErrUnknownAllocation(req.AllocID), pointer.Of(int64(404)), encoder)
return
}
if ar.IsDestroyed() {
handleStreamResultError(
fmt.Errorf("allocation %s destroyed from client", req.AllocID),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if this is the best message to return. I think GC is only one of the reasons the alloc runner may have been destroyed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if something along the lines of "allocation %s not found on client" would be better? Destroyed seems like an internal aspect of the client which operators might find confusing to the idea that an alloc is not found.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I wasn't happy with the message. Your suggestion is good; I just adjusted it to "state for allocation %s not found on client" because, weirdly, the alloc is still there; it's just that the GC deletes the state (the alloc runner).

pointer.Of(int64(404)),
lgfa29 marked this conversation as resolved.
Show resolved Hide resolved
encoder,
)
}
alloc := ar.Alloc()

// Check read permissions
if aclObj, err := f.c.ResolveToken(req.QueryOptions.AuthToken); err != nil {
Expand Down Expand Up @@ -352,11 +361,20 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) {
handleStreamResultError(allocIDNotPresentErr, pointer.Of(int64(400)), encoder)
return
}
alloc, err := f.c.GetAlloc(req.AllocID)

ar, err := f.c.getAllocRunner(req.AllocID)
if err != nil {
handleStreamResultError(structs.NewErrUnknownAllocation(req.AllocID), pointer.Of(int64(404)), encoder)
return
}
if ar.IsDestroyed() {
handleStreamResultError(
fmt.Errorf("allocation %s destroyed from client", req.AllocID),
pointer.Of(int64(404)),
lgfa29 marked this conversation as resolved.
Show resolved Hide resolved
encoder,
)
}
alloc := ar.Alloc()

// Check read permissions
if aclObj, err := f.c.ResolveToken(req.QueryOptions.AuthToken); err != nil {
Expand Down
182 changes: 182 additions & 0 deletions client/fs_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -394,6 +395,96 @@ OUTER:
}
}

// TestFS_Stream_GC asserts that reading files from an alloc that has been
// GC'ed from the client returns a 404 error.
func TestFS_Stream_GC(t *testing.T) {
ci.Parallel(t)

// Start a server and client.
s, cleanupS := nomad.TestServer(t, nil)
t.Cleanup(cleanupS)
testutil.WaitForLeader(t, s.RPC)

c, cleanupC := TestClient(t, func(c *config.Config) {
c.Servers = []string{s.GetConfig().RPCAddr.String()}
})
t.Cleanup(func() { cleanupC() })

job := mock.BatchJob()
job.TaskGroups[0].Count = 1
job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
"run_for": "10s",
}

// Wait for alloc to be running.
alloc := testutil.WaitForRunning(t, s.RPC, job)[0]

// GC alloc from the client.
ar, err := c.getAllocRunner(alloc.ID)
must.NoError(t, err)

c.garbageCollector.MarkForCollection(alloc.ID, ar)
must.True(t, c.CollectAllocation(alloc.ID))

// Build the request.
req := &cstructs.FsStreamRequest{
AllocID: alloc.ID,
Path: "alloc/logs/web.stdout.0",
PlainText: true,
Follow: true,
QueryOptions: structs.QueryOptions{Region: "global"},
}

// Get the handler.
handler, err := c.StreamingRpcHandler("FileSystem.Stream")
must.NoError(t, err)

// Create a pipe.
p1, p2 := net.Pipe()
defer p1.Close()
defer p2.Close()

errCh := make(chan error)
streamMsg := make(chan *cstructs.StreamErrWrapper)

// Start the handler.
go handler(p2)

// Start the decoder.
go func() {
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
for {
var msg cstructs.StreamErrWrapper
if err := decoder.Decode(&msg); err != nil {
if err == io.EOF || strings.Contains(err.Error(), "closed") {
return
}
errCh <- fmt.Errorf("error decoding: %v", err)
}

streamMsg <- &msg
}
}()

// Send the request
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
must.NoError(t, encoder.Encode(req))

for {
select {
case <-time.After(3 * time.Second):
t.Fatal("timeout")
case err := <-errCh:
t.Fatal(err)
case msg := <-streamMsg:
must.Error(t, msg.Error)
must.ErrorContains(t, msg.Error, "destroyed from client")
must.Eq(t, 404, *msg.Error.Code)
lgfa29 marked this conversation as resolved.
Show resolved Hide resolved
return
}
}
}

func TestFS_Stream_ACL(t *testing.T) {
ci.Parallel(t)

Expand Down Expand Up @@ -1022,6 +1113,97 @@ func TestFS_Logs_TaskPending(t *testing.T) {
}
}

// TestFS_Logs_GC asserts that reading logs from an alloc that has been GC'ed
// from the client returns a 404 error.
func TestFS_Logs_GC(t *testing.T) {
ci.Parallel(t)

// Start a server and client.
s, cleanupS := nomad.TestServer(t, nil)
t.Cleanup(cleanupS)
testutil.WaitForLeader(t, s.RPC)

c, cleanupC := TestClient(t, func(c *config.Config) {
c.Servers = []string{s.GetConfig().RPCAddr.String()}
})
t.Cleanup(func() { cleanupC() })

job := mock.BatchJob()
job.TaskGroups[0].Count = 1
job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
"run_for": "10s",
}

// Wait for alloc to be running.
alloc := testutil.WaitForRunning(t, s.RPC, job)[0]

// GC alloc from the client.
ar, err := c.getAllocRunner(alloc.ID)
must.NoError(t, err)

c.garbageCollector.MarkForCollection(alloc.ID, ar)
must.True(t, c.CollectAllocation(alloc.ID))

// Build the request.
req := &cstructs.FsLogsRequest{
AllocID: alloc.ID,
Task: job.TaskGroups[0].Tasks[0].Name,
LogType: "stdout",
Origin: "start",
PlainText: true,
QueryOptions: structs.QueryOptions{Region: "global"},
}

// Get the handler.
handler, err := c.StreamingRpcHandler("FileSystem.Logs")
must.NoError(t, err)

// Create a pipe.
p1, p2 := net.Pipe()
defer p1.Close()
defer p2.Close()

errCh := make(chan error)
streamMsg := make(chan *cstructs.StreamErrWrapper)

// Start the handler.
go handler(p2)

// Start the decoder.
go func() {
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
for {
var msg cstructs.StreamErrWrapper
if err := decoder.Decode(&msg); err != nil {
if err == io.EOF || strings.Contains(err.Error(), "closed") {
return
}
errCh <- fmt.Errorf("error decoding: %v", err)
}

streamMsg <- &msg
}
}()

// Send the request.
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
must.NoError(t, encoder.Encode(req))

for {
select {
case <-time.After(3 * time.Second):
t.Fatal("timeout")
case err := <-errCh:
t.Fatalf("unexpected stream error: %v", err)
case msg := <-streamMsg:
must.Error(t, msg.Error)
must.ErrorContains(t, msg.Error, "destroyed from client")
must.Eq(t, 404, *msg.Error.Code)
lgfa29 marked this conversation as resolved.
Show resolved Hide resolved
return
}
}
}

func TestFS_Logs_ACL(t *testing.T) {
ci.Parallel(t)
require := require.New(t)
Expand Down
Loading