Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix log streaming missing frames #11721

Merged
merged 15 commits into from
Jan 4, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 22 additions & 5 deletions client/fs_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ func (f *FileSystem) stream(conn io.ReadWriteCloser) {

// If we aren't following end as soon as we hit EOF
var eofCancelCh chan error
var eofCancelOnNextEofCh chan error
if !req.Follow {
eofCancelCh = make(chan error)
close(eofCancelCh)
Expand All @@ -257,7 +258,7 @@ func (f *FileSystem) stream(conn io.ReadWriteCloser) {

// Start streaming
go func() {
if err := f.streamFile(ctx, req.Offset, req.Path, req.Limit, fs, framer, eofCancelCh); err != nil {
if err := f.streamFile(ctx, req.Offset, req.Path, req.Limit, fs, framer, eofCancelCh, eofCancelOnNextEofCh); err != nil {
select {
case errCh <- err:
case <-ctx.Done():
Expand Down Expand Up @@ -577,6 +578,7 @@ func (f *FileSystem) logsImpl(ctx context.Context, follow, plain bool, offset in
return err
}

var newFileChannelCh chan error
var eofCancelCh chan error
exitAfter := false
if !follow && idx > maxIndex {
Expand All @@ -588,11 +590,11 @@ func (f *FileSystem) logsImpl(ctx context.Context, follow, plain bool, offset in
close(eofCancelCh)
exitAfter = true
} else {
eofCancelCh = blockUntilNextLog(ctx, fs, logPath, task, logType, idx+1)
newFileChannelCh = blockUntilNextLog(ctx, fs, logPath, task, logType, idx+1)
tgross marked this conversation as resolved.
Show resolved Hide resolved
}

p := filepath.Join(logPath, logEntry.Name)
err = f.streamFile(ctx, openOffset, p, 0, fs, framer, eofCancelCh)
err = f.streamFile(ctx, openOffset, p, 0, fs, framer, eofCancelCh, newFileChannelCh)

// Check if the context is cancelled
select {
Expand Down Expand Up @@ -637,10 +639,10 @@ func (f *FileSystem) logsImpl(ctx context.Context, follow, plain bool, offset in

// streamFile is the internal method to stream the content of a file. If limit
// is greater than zero, the stream will end once that many bytes have been
// read. eofCancelCh is used to cancel the stream if triggered while at EOF. If
// read. eofCancelOnNextEofCh, if triggered while at EOF, is used to trigger one more read and cancel the stream on reaching next EOF. If
// the connection is broken an EPIPE error is returned
func (f *FileSystem) streamFile(ctx context.Context, offset int64, path string, limit int64,
fs allocdir.AllocDirFS, framer *sframer.StreamFramer, eofCancelCh chan error) error {
fs allocdir.AllocDirFS, framer *sframer.StreamFramer, eofCancelCh chan error, eofCancelOnNextEofCh chan error) error {
tgross marked this conversation as resolved.
Show resolved Hide resolved

// Get the reader
file, err := fs.ReadAt(path, offset)
Expand All @@ -667,6 +669,9 @@ func (f *FileSystem) streamFile(ctx context.Context, offset int64, path string,
// read and reach EOF.
var changes *watch.FileChanges

// Try to read file till end when on cancel received
cancelReceived := false

// Start streaming the data
bufSize := int64(streamFrameSize)
if limit > 0 && limit < streamFrameSize {
Expand Down Expand Up @@ -704,6 +709,11 @@ OUTER:
continue
}

// When eof and cancel received then cancel
tgross marked this conversation as resolved.
Show resolved Hide resolved
if cancelReceived {
return nil
}

// If EOF is hit, wait for a change to the file
if changes == nil {
changes, err = fs.ChangeEvents(waitCtx, path, offset)
Expand Down Expand Up @@ -752,6 +762,13 @@ OUTER:
return nil
case <-ctx.Done():
return nil
case _, ok := <-eofCancelOnNextEofCh:
if !ok {
return nil
}

cancelReceived = true
continue OUTER
arkadiuss marked this conversation as resolved.
Show resolved Hide resolved
case err, ok := <-eofCancelCh:
if !ok {
return nil
Expand Down
8 changes: 4 additions & 4 deletions client/fs_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1568,7 +1568,7 @@ func TestFS_streamFile_NoFile(t *testing.T) {
defer framer.Destroy()

err := c.endpoints.FileSystem.streamFile(
context.Background(), 0, "foo", 0, ad, framer, nil)
context.Background(), 0, "foo", 0, ad, framer, nil, nil)
require.Error(t, err)
if runtime.GOOS == "windows" {
require.Contains(t, err.Error(), "cannot find the file")
Expand Down Expand Up @@ -1629,7 +1629,7 @@ func TestFS_streamFile_Modify(t *testing.T) {
// Start streaming
go func() {
if err := c.endpoints.FileSystem.streamFile(
context.Background(), 0, streamFile, 0, ad, framer, nil); err != nil {
context.Background(), 0, streamFile, 0, ad, framer, nil, nil); err != nil {
t.Fatalf("stream() failed: %v", err)
}
}()
Expand Down Expand Up @@ -1704,7 +1704,7 @@ func TestFS_streamFile_Truncate(t *testing.T) {
// Start streaming
go func() {
if err := c.endpoints.FileSystem.streamFile(
context.Background(), 0, streamFile, 0, ad, framer, nil); err != nil {
context.Background(), 0, streamFile, 0, ad, framer, nil, nil); err != nil {
t.Fatalf("stream() failed: %v", err)
}
}()
Expand Down Expand Up @@ -1808,7 +1808,7 @@ func TestFS_streamImpl_Delete(t *testing.T) {
// Start streaming
go func() {
if err := c.endpoints.FileSystem.streamFile(
context.Background(), 0, streamFile, 0, ad, framer, nil); err != nil {
context.Background(), 0, streamFile, 0, ad, framer, nil, nil); err != nil {
t.Fatalf("stream() failed: %v", err)
}
}()
Expand Down