From af98068c969af213efda8429b031a2645e3e2441 Mon Sep 17 00:00:00 2001 From: Arkadiusz Kraus Date: Mon, 20 Dec 2021 17:21:59 +0100 Subject: [PATCH 01/12] Performing one more read after receiving cancel when streaming file --- client/fs_endpoint.go | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/client/fs_endpoint.go b/client/fs_endpoint.go index 87644bd0a6da..db2f4e4944fd 100644 --- a/client/fs_endpoint.go +++ b/client/fs_endpoint.go @@ -637,10 +637,10 @@ func (f *FileSystem) logsImpl(ctx context.Context, follow, plain bool, offset in // streamFile is the internal method to stream the content of a file. If limit // is greater than zero, the stream will end once that many bytes have been -// read. eofCancelCh is used to cancel the stream if triggered while at EOF. If +// read. eofCancelOnNextEofCh, if triggered while at EOF, is used to trigger one more read and cancel the stream on reaching next EOF. If // the connection is broken an EPIPE error is returned func (f *FileSystem) streamFile(ctx context.Context, offset int64, path string, limit int64, - fs allocdir.AllocDirFS, framer *sframer.StreamFramer, eofCancelCh chan error) error { + fs allocdir.AllocDirFS, framer *sframer.StreamFramer, eofCancelOnNextEofCh chan error) error { // Get the reader file, err := fs.ReadAt(path, offset) @@ -667,6 +667,9 @@ func (f *FileSystem) streamFile(ctx context.Context, offset int64, path string, // read and reach EOF. var changes *watch.FileChanges + // Try to read file till end when on cancel received + cancelReceived := false + // Start streaming the data bufSize := int64(streamFrameSize) if limit > 0 && limit < streamFrameSize { @@ -704,6 +707,11 @@ OUTER: continue } + // When eof and cancel received then cancel + if cancelReceived { + return nil + } + // If EOF is hit, wait for a change to the file if changes == nil { changes, err = fs.ChangeEvents(waitCtx, path, offset) @@ -752,12 +760,13 @@ OUTER: return nil case <-ctx.Done(): return nil - case err, ok := <-eofCancelCh: + case _, ok := <-eofCancelOnNextEofCh: if !ok { return nil } - return err + cancelReceived = true + continue OUTER } } } @@ -809,6 +818,7 @@ func blockUntilNextLog(ctx context.Context, fs allocdir.AllocDirFS, logPath, tas // waiting for. for _, entry := range indexes { if entry.idx >= nextIndex { + //<-time.NewTicker(5 * time.Second).C next <- nil close(next) return From b5e29fe84780a65d7db5f77edce96fa93cc61a42 Mon Sep 17 00:00:00 2001 From: Arkadiusz Kraus Date: Tue, 21 Dec 2021 15:08:28 +0100 Subject: [PATCH 02/12] Bring cancel channel back in file streaming --- client/fs_endpoint.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/client/fs_endpoint.go b/client/fs_endpoint.go index db2f4e4944fd..de349b3b7eec 100644 --- a/client/fs_endpoint.go +++ b/client/fs_endpoint.go @@ -247,6 +247,7 @@ func (f *FileSystem) stream(conn io.ReadWriteCloser) { // If we aren't following end as soon as we hit EOF var eofCancelCh chan error + var eofCancelOnNextEofCh chan error if !req.Follow { eofCancelCh = make(chan error) close(eofCancelCh) @@ -257,7 +258,7 @@ func (f *FileSystem) stream(conn io.ReadWriteCloser) { // Start streaming go func() { - if err := f.streamFile(ctx, req.Offset, req.Path, req.Limit, fs, framer, eofCancelCh); err != nil { + if err := f.streamFile(ctx, req.Offset, req.Path, req.Limit, fs, framer, eofCancelCh, eofCancelOnNextEofCh); err != nil { select { case errCh <- err: case <-ctx.Done(): @@ -577,6 +578,7 @@ func (f *FileSystem) logsImpl(ctx context.Context, follow, plain bool, offset in return err } + var newFileChannelCh chan error var eofCancelCh chan error exitAfter := false if !follow && idx > maxIndex { @@ -588,11 +590,11 @@ func (f *FileSystem) logsImpl(ctx context.Context, follow, plain bool, offset in close(eofCancelCh) exitAfter = true } else { - eofCancelCh = blockUntilNextLog(ctx, fs, logPath, task, logType, idx+1) + newFileChannelCh = blockUntilNextLog(ctx, fs, logPath, task, logType, idx+1) } p := filepath.Join(logPath, logEntry.Name) - err = f.streamFile(ctx, openOffset, p, 0, fs, framer, eofCancelCh) + err = f.streamFile(ctx, openOffset, p, 0, fs, framer, eofCancelCh, newFileChannelCh) // Check if the context is cancelled select { @@ -640,7 +642,7 @@ func (f *FileSystem) logsImpl(ctx context.Context, follow, plain bool, offset in // read. eofCancelOnNextEofCh, if triggered while at EOF, is used to trigger one more read and cancel the stream on reaching next EOF. If // the connection is broken an EPIPE error is returned func (f *FileSystem) streamFile(ctx context.Context, offset int64, path string, limit int64, - fs allocdir.AllocDirFS, framer *sframer.StreamFramer, eofCancelOnNextEofCh chan error) error { + fs allocdir.AllocDirFS, framer *sframer.StreamFramer, eofCancelCh chan error, eofCancelOnNextEofCh chan error) error { // Get the reader file, err := fs.ReadAt(path, offset) @@ -767,6 +769,12 @@ OUTER: cancelReceived = true continue OUTER + case err, ok := <-eofCancelCh: + if !ok { + return nil + } + + return err } } } From 3c245b8d1d31fa44cd4b08ddf16695c4d2950ed8 Mon Sep 17 00:00:00 2001 From: Arkadiusz Kraus Date: Wed, 22 Dec 2021 11:06:16 +0100 Subject: [PATCH 03/12] Fix existing tests for log streaming --- client/fs_endpoint.go | 1 - client/fs_endpoint_test.go | 8 ++++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/client/fs_endpoint.go b/client/fs_endpoint.go index de349b3b7eec..c4e18ad94881 100644 --- a/client/fs_endpoint.go +++ b/client/fs_endpoint.go @@ -826,7 +826,6 @@ func blockUntilNextLog(ctx context.Context, fs allocdir.AllocDirFS, logPath, tas // waiting for. for _, entry := range indexes { if entry.idx >= nextIndex { - //<-time.NewTicker(5 * time.Second).C next <- nil close(next) return diff --git a/client/fs_endpoint_test.go b/client/fs_endpoint_test.go index fa650df27671..a3ab3a442ddb 100644 --- a/client/fs_endpoint_test.go +++ b/client/fs_endpoint_test.go @@ -1568,7 +1568,7 @@ func TestFS_streamFile_NoFile(t *testing.T) { defer framer.Destroy() err := c.endpoints.FileSystem.streamFile( - context.Background(), 0, "foo", 0, ad, framer, nil) + context.Background(), 0, "foo", 0, ad, framer, nil, nil) require.Error(t, err) if runtime.GOOS == "windows" { require.Contains(t, err.Error(), "cannot find the file") @@ -1629,7 +1629,7 @@ func TestFS_streamFile_Modify(t *testing.T) { // Start streaming go func() { if err := c.endpoints.FileSystem.streamFile( - context.Background(), 0, streamFile, 0, ad, framer, nil); err != nil { + context.Background(), 0, streamFile, 0, ad, framer, nil, nil); err != nil { t.Fatalf("stream() failed: %v", err) } }() @@ -1704,7 +1704,7 @@ func TestFS_streamFile_Truncate(t *testing.T) { // Start streaming go func() { if err := c.endpoints.FileSystem.streamFile( - context.Background(), 0, streamFile, 0, ad, framer, nil); err != nil { + context.Background(), 0, streamFile, 0, ad, framer, nil, nil); err != nil { t.Fatalf("stream() failed: %v", err) } }() @@ -1808,7 +1808,7 @@ func TestFS_streamImpl_Delete(t *testing.T) { // Start streaming go func() { if err := c.endpoints.FileSystem.streamFile( - context.Background(), 0, streamFile, 0, ad, framer, nil); err != nil { + context.Background(), 0, streamFile, 0, ad, framer, nil, nil); err != nil { t.Fatalf("stream() failed: %v", err) } }() From 4ce686806a4334c4c38e25f4962995e780624de2 Mon Sep 17 00:00:00 2001 From: Arkadiusz Kraus Date: Mon, 20 Dec 2021 17:21:59 +0100 Subject: [PATCH 04/12] Performing one more read after receiving cancel when streaming file --- client/fs_endpoint.go | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/client/fs_endpoint.go b/client/fs_endpoint.go index 87644bd0a6da..db2f4e4944fd 100644 --- a/client/fs_endpoint.go +++ b/client/fs_endpoint.go @@ -637,10 +637,10 @@ func (f *FileSystem) logsImpl(ctx context.Context, follow, plain bool, offset in // streamFile is the internal method to stream the content of a file. If limit // is greater than zero, the stream will end once that many bytes have been -// read. eofCancelCh is used to cancel the stream if triggered while at EOF. If +// read. eofCancelOnNextEofCh, if triggered while at EOF, is used to trigger one more read and cancel the stream on reaching next EOF. If // the connection is broken an EPIPE error is returned func (f *FileSystem) streamFile(ctx context.Context, offset int64, path string, limit int64, - fs allocdir.AllocDirFS, framer *sframer.StreamFramer, eofCancelCh chan error) error { + fs allocdir.AllocDirFS, framer *sframer.StreamFramer, eofCancelOnNextEofCh chan error) error { // Get the reader file, err := fs.ReadAt(path, offset) @@ -667,6 +667,9 @@ func (f *FileSystem) streamFile(ctx context.Context, offset int64, path string, // read and reach EOF. var changes *watch.FileChanges + // Try to read file till end when on cancel received + cancelReceived := false + // Start streaming the data bufSize := int64(streamFrameSize) if limit > 0 && limit < streamFrameSize { @@ -704,6 +707,11 @@ OUTER: continue } + // When eof and cancel received then cancel + if cancelReceived { + return nil + } + // If EOF is hit, wait for a change to the file if changes == nil { changes, err = fs.ChangeEvents(waitCtx, path, offset) @@ -752,12 +760,13 @@ OUTER: return nil case <-ctx.Done(): return nil - case err, ok := <-eofCancelCh: + case _, ok := <-eofCancelOnNextEofCh: if !ok { return nil } - return err + cancelReceived = true + continue OUTER } } } @@ -809,6 +818,7 @@ func blockUntilNextLog(ctx context.Context, fs allocdir.AllocDirFS, logPath, tas // waiting for. for _, entry := range indexes { if entry.idx >= nextIndex { + //<-time.NewTicker(5 * time.Second).C next <- nil close(next) return From 77cf19e180c33ed57748b296128ba234062fd5ea Mon Sep 17 00:00:00 2001 From: Arkadiusz Kraus Date: Tue, 21 Dec 2021 15:08:28 +0100 Subject: [PATCH 05/12] Bring cancel channel back in file streaming --- client/fs_endpoint.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/client/fs_endpoint.go b/client/fs_endpoint.go index db2f4e4944fd..de349b3b7eec 100644 --- a/client/fs_endpoint.go +++ b/client/fs_endpoint.go @@ -247,6 +247,7 @@ func (f *FileSystem) stream(conn io.ReadWriteCloser) { // If we aren't following end as soon as we hit EOF var eofCancelCh chan error + var eofCancelOnNextEofCh chan error if !req.Follow { eofCancelCh = make(chan error) close(eofCancelCh) @@ -257,7 +258,7 @@ func (f *FileSystem) stream(conn io.ReadWriteCloser) { // Start streaming go func() { - if err := f.streamFile(ctx, req.Offset, req.Path, req.Limit, fs, framer, eofCancelCh); err != nil { + if err := f.streamFile(ctx, req.Offset, req.Path, req.Limit, fs, framer, eofCancelCh, eofCancelOnNextEofCh); err != nil { select { case errCh <- err: case <-ctx.Done(): @@ -577,6 +578,7 @@ func (f *FileSystem) logsImpl(ctx context.Context, follow, plain bool, offset in return err } + var newFileChannelCh chan error var eofCancelCh chan error exitAfter := false if !follow && idx > maxIndex { @@ -588,11 +590,11 @@ func (f *FileSystem) logsImpl(ctx context.Context, follow, plain bool, offset in close(eofCancelCh) exitAfter = true } else { - eofCancelCh = blockUntilNextLog(ctx, fs, logPath, task, logType, idx+1) + newFileChannelCh = blockUntilNextLog(ctx, fs, logPath, task, logType, idx+1) } p := filepath.Join(logPath, logEntry.Name) - err = f.streamFile(ctx, openOffset, p, 0, fs, framer, eofCancelCh) + err = f.streamFile(ctx, openOffset, p, 0, fs, framer, eofCancelCh, newFileChannelCh) // Check if the context is cancelled select { @@ -640,7 +642,7 @@ func (f *FileSystem) logsImpl(ctx context.Context, follow, plain bool, offset in // read. eofCancelOnNextEofCh, if triggered while at EOF, is used to trigger one more read and cancel the stream on reaching next EOF. If // the connection is broken an EPIPE error is returned func (f *FileSystem) streamFile(ctx context.Context, offset int64, path string, limit int64, - fs allocdir.AllocDirFS, framer *sframer.StreamFramer, eofCancelOnNextEofCh chan error) error { + fs allocdir.AllocDirFS, framer *sframer.StreamFramer, eofCancelCh chan error, eofCancelOnNextEofCh chan error) error { // Get the reader file, err := fs.ReadAt(path, offset) @@ -767,6 +769,12 @@ OUTER: cancelReceived = true continue OUTER + case err, ok := <-eofCancelCh: + if !ok { + return nil + } + + return err } } } From a10d41b522b8124c9a618d2b8cfd00abd4f05f75 Mon Sep 17 00:00:00 2001 From: Arkadiusz Kraus Date: Wed, 22 Dec 2021 11:06:16 +0100 Subject: [PATCH 06/12] Fix existing tests for log streaming --- client/fs_endpoint.go | 1 - client/fs_endpoint_test.go | 8 ++++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/client/fs_endpoint.go b/client/fs_endpoint.go index de349b3b7eec..c4e18ad94881 100644 --- a/client/fs_endpoint.go +++ b/client/fs_endpoint.go @@ -826,7 +826,6 @@ func blockUntilNextLog(ctx context.Context, fs allocdir.AllocDirFS, logPath, tas // waiting for. for _, entry := range indexes { if entry.idx >= nextIndex { - //<-time.NewTicker(5 * time.Second).C next <- nil close(next) return diff --git a/client/fs_endpoint_test.go b/client/fs_endpoint_test.go index fa650df27671..a3ab3a442ddb 100644 --- a/client/fs_endpoint_test.go +++ b/client/fs_endpoint_test.go @@ -1568,7 +1568,7 @@ func TestFS_streamFile_NoFile(t *testing.T) { defer framer.Destroy() err := c.endpoints.FileSystem.streamFile( - context.Background(), 0, "foo", 0, ad, framer, nil) + context.Background(), 0, "foo", 0, ad, framer, nil, nil) require.Error(t, err) if runtime.GOOS == "windows" { require.Contains(t, err.Error(), "cannot find the file") @@ -1629,7 +1629,7 @@ func TestFS_streamFile_Modify(t *testing.T) { // Start streaming go func() { if err := c.endpoints.FileSystem.streamFile( - context.Background(), 0, streamFile, 0, ad, framer, nil); err != nil { + context.Background(), 0, streamFile, 0, ad, framer, nil, nil); err != nil { t.Fatalf("stream() failed: %v", err) } }() @@ -1704,7 +1704,7 @@ func TestFS_streamFile_Truncate(t *testing.T) { // Start streaming go func() { if err := c.endpoints.FileSystem.streamFile( - context.Background(), 0, streamFile, 0, ad, framer, nil); err != nil { + context.Background(), 0, streamFile, 0, ad, framer, nil, nil); err != nil { t.Fatalf("stream() failed: %v", err) } }() @@ -1808,7 +1808,7 @@ func TestFS_streamImpl_Delete(t *testing.T) { // Start streaming go func() { if err := c.endpoints.FileSystem.streamFile( - context.Background(), 0, streamFile, 0, ad, framer, nil); err != nil { + context.Background(), 0, streamFile, 0, ad, framer, nil, nil); err != nil { t.Fatalf("stream() failed: %v", err) } }() From 77734178bb530f3ea8531b15ba62ec3182d95763 Mon Sep 17 00:00:00 2001 From: Arkadiusz Kraus Date: Wed, 22 Dec 2021 17:58:04 +0100 Subject: [PATCH 07/12] Code review - rename channel variable in logging endpoint --- client/fs_endpoint.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/client/fs_endpoint.go b/client/fs_endpoint.go index c4e18ad94881..aca6c242ec38 100644 --- a/client/fs_endpoint.go +++ b/client/fs_endpoint.go @@ -578,8 +578,8 @@ func (f *FileSystem) logsImpl(ctx context.Context, follow, plain bool, offset in return err } - var newFileChannelCh chan error var eofCancelCh chan error + var eofCancelOnNextEofCh chan error exitAfter := false if !follow && idx > maxIndex { // Exceeded what was there initially so return @@ -590,11 +590,11 @@ func (f *FileSystem) logsImpl(ctx context.Context, follow, plain bool, offset in close(eofCancelCh) exitAfter = true } else { - newFileChannelCh = blockUntilNextLog(ctx, fs, logPath, task, logType, idx+1) + eofCancelOnNextEofCh = blockUntilNextLog(ctx, fs, logPath, task, logType, idx+1) } p := filepath.Join(logPath, logEntry.Name) - err = f.streamFile(ctx, openOffset, p, 0, fs, framer, eofCancelCh, newFileChannelCh) + err = f.streamFile(ctx, openOffset, p, 0, fs, framer, eofCancelCh, eofCancelOnNextEofCh) // Check if the context is cancelled select { @@ -639,8 +639,9 @@ func (f *FileSystem) logsImpl(ctx context.Context, follow, plain bool, offset in // streamFile is the internal method to stream the content of a file. If limit // is greater than zero, the stream will end once that many bytes have been -// read. eofCancelOnNextEofCh, if triggered while at EOF, is used to trigger one more read and cancel the stream on reaching next EOF. If -// the connection is broken an EPIPE error is returned +// read. eofCancelCh is used to cancel the stream if triggered while at EOF. +// eofCancelOnNextEofCh, if triggered while at EOF, is used to trigger one more read and cancel the stream on reaching next EOF. +// If the connection is broken an EPIPE error is returned func (f *FileSystem) streamFile(ctx context.Context, offset int64, path string, limit int64, fs allocdir.AllocDirFS, framer *sframer.StreamFramer, eofCancelCh chan error, eofCancelOnNextEofCh chan error) error { From e2137683717443ae4286cdc4603b81c216f93e76 Mon Sep 17 00:00:00 2001 From: Arkadiusz Kraus Date: Thu, 23 Dec 2021 13:45:16 +0100 Subject: [PATCH 08/12] Cover missing data in tests --- client/fs_endpoint_test.go | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/client/fs_endpoint_test.go b/client/fs_endpoint_test.go index a3ab3a442ddb..ad836b787f0e 100644 --- a/client/fs_endpoint_test.go +++ b/client/fs_endpoint_test.go @@ -1918,14 +1918,30 @@ func TestFS_logsImpl_Follow(t *testing.T) { expected := []byte("012345") initialWrites := 3 - writeToFile := func(index int, data []byte) { + filePath := func(index int) string { logFile := fmt.Sprintf("%s.%s.%d", task, logType, index) - logFilePath := filepath.Join(logDir, logFile) + return filepath.Join(logDir, logFile) + } + writeToFile := func(index int, data []byte) { + logFilePath := filePath(index) err := ioutil.WriteFile(logFilePath, data, 0777) if err != nil { t.Fatalf("Failed to create file: %v", err) } } + appendToFile := func(index int, data []byte) { + logFilePath := filePath(index) + f, err := os.OpenFile(logFilePath, os.O_APPEND|os.O_WRONLY, 0600) + if err != nil { + t.Fatalf("Failed to create file: %v", err) + } + + defer f.Close() + + if _, err = f.Write(data); err != nil { + t.Fatalf("Failed to write file: %v", err) + } + } for i := 0; i < initialWrites; i++ { writeToFile(i, expected[i:i+1]) } @@ -1967,11 +1983,13 @@ func TestFS_logsImpl_Follow(t *testing.T) { t.Fatalf("did not receive data: got %q", string(received)) } - // We got the first chunk of data, write out the rest to the next file + // We got the first chunk of data, write out the rest splitted + // between the last file and to the next file // at an index much ahead to check that it is following and detecting // skips skipTo := initialWrites + 10 - writeToFile(skipTo, expected[initialWrites:]) + appendToFile(initialWrites-1, expected[initialWrites:initialWrites+1]) + writeToFile(skipTo, expected[initialWrites+1:]) select { case <-fullResultCh: From 7bf627992e97f410da764e463a91cd9ad6239914 Mon Sep 17 00:00:00 2001 From: Arkadiusz Kraus Date: Wed, 29 Dec 2021 10:15:24 +0100 Subject: [PATCH 09/12] code review - introduce simple boolean instead of channel to cancel file streaming --- client/fs_endpoint.go | 35 +++++++++++------------------------ client/fs_endpoint_test.go | 8 ++++---- 2 files changed, 15 insertions(+), 28 deletions(-) diff --git a/client/fs_endpoint.go b/client/fs_endpoint.go index aca6c242ec38..9c939ea31f21 100644 --- a/client/fs_endpoint.go +++ b/client/fs_endpoint.go @@ -246,19 +246,14 @@ func (f *FileSystem) stream(conn io.ReadWriteCloser) { defer framer.Destroy() // If we aren't following end as soon as we hit EOF - var eofCancelCh chan error - var eofCancelOnNextEofCh chan error - if !req.Follow { - eofCancelCh = make(chan error) - close(eofCancelCh) - } + cancelAfterFirstEof := !req.Follow ctx, cancel := context.WithCancel(context.Background()) defer cancel() // Start streaming go func() { - if err := f.streamFile(ctx, req.Offset, req.Path, req.Limit, fs, framer, eofCancelCh, eofCancelOnNextEofCh); err != nil { + if err := f.streamFile(ctx, req.Offset, req.Path, req.Limit, fs, framer, nil, cancelAfterFirstEof); err != nil { select { case errCh <- err: case <-ctx.Done(): @@ -579,22 +574,21 @@ func (f *FileSystem) logsImpl(ctx context.Context, follow, plain bool, offset in } var eofCancelCh chan error - var eofCancelOnNextEofCh chan error + cancelAfterFirstEof := false exitAfter := false if !follow && idx > maxIndex { // Exceeded what was there initially so return return nil } else if !follow && idx == maxIndex { // At the end - eofCancelCh = make(chan error) - close(eofCancelCh) + cancelAfterFirstEof = true exitAfter = true } else { - eofCancelOnNextEofCh = blockUntilNextLog(ctx, fs, logPath, task, logType, idx+1) + eofCancelCh = blockUntilNextLog(ctx, fs, logPath, task, logType, idx+1) } p := filepath.Join(logPath, logEntry.Name) - err = f.streamFile(ctx, openOffset, p, 0, fs, framer, eofCancelCh, eofCancelOnNextEofCh) + err = f.streamFile(ctx, openOffset, p, 0, fs, framer, eofCancelCh, cancelAfterFirstEof) // Check if the context is cancelled select { @@ -639,11 +633,10 @@ func (f *FileSystem) logsImpl(ctx context.Context, follow, plain bool, offset in // streamFile is the internal method to stream the content of a file. If limit // is greater than zero, the stream will end once that many bytes have been -// read. eofCancelCh is used to cancel the stream if triggered while at EOF. -// eofCancelOnNextEofCh, if triggered while at EOF, is used to trigger one more read and cancel the stream on reaching next EOF. +// read. eofCancelCh, if triggered while at EOF, is used to trigger one more read and cancel the stream on reaching next EOF. // If the connection is broken an EPIPE error is returned func (f *FileSystem) streamFile(ctx context.Context, offset int64, path string, limit int64, - fs allocdir.AllocDirFS, framer *sframer.StreamFramer, eofCancelCh chan error, eofCancelOnNextEofCh chan error) error { + fs allocdir.AllocDirFS, framer *sframer.StreamFramer, eofCancelCh chan error, cancelAfterFirstEof bool) error { // Get the reader file, err := fs.ReadAt(path, offset) @@ -670,8 +663,8 @@ func (f *FileSystem) streamFile(ctx context.Context, offset int64, path string, // read and reach EOF. var changes *watch.FileChanges - // Try to read file till end when on cancel received - cancelReceived := false + // Only watch file when there is a need for it + cancelReceived := cancelAfterFirstEof // Start streaming the data bufSize := int64(streamFrameSize) @@ -763,19 +756,13 @@ OUTER: return nil case <-ctx.Done(): return nil - case _, ok := <-eofCancelOnNextEofCh: + case _, ok := <-eofCancelCh: if !ok { return nil } cancelReceived = true continue OUTER - case err, ok := <-eofCancelCh: - if !ok { - return nil - } - - return err } } } diff --git a/client/fs_endpoint_test.go b/client/fs_endpoint_test.go index ad836b787f0e..76fad1847a82 100644 --- a/client/fs_endpoint_test.go +++ b/client/fs_endpoint_test.go @@ -1568,7 +1568,7 @@ func TestFS_streamFile_NoFile(t *testing.T) { defer framer.Destroy() err := c.endpoints.FileSystem.streamFile( - context.Background(), 0, "foo", 0, ad, framer, nil, nil) + context.Background(), 0, "foo", 0, ad, framer, nil, false) require.Error(t, err) if runtime.GOOS == "windows" { require.Contains(t, err.Error(), "cannot find the file") @@ -1629,7 +1629,7 @@ func TestFS_streamFile_Modify(t *testing.T) { // Start streaming go func() { if err := c.endpoints.FileSystem.streamFile( - context.Background(), 0, streamFile, 0, ad, framer, nil, nil); err != nil { + context.Background(), 0, streamFile, 0, ad, framer, nil, false); err != nil { t.Fatalf("stream() failed: %v", err) } }() @@ -1704,7 +1704,7 @@ func TestFS_streamFile_Truncate(t *testing.T) { // Start streaming go func() { if err := c.endpoints.FileSystem.streamFile( - context.Background(), 0, streamFile, 0, ad, framer, nil, nil); err != nil { + context.Background(), 0, streamFile, 0, ad, framer, nil, false); err != nil { t.Fatalf("stream() failed: %v", err) } }() @@ -1808,7 +1808,7 @@ func TestFS_streamImpl_Delete(t *testing.T) { // Start streaming go func() { if err := c.endpoints.FileSystem.streamFile( - context.Background(), 0, streamFile, 0, ad, framer, nil, nil); err != nil { + context.Background(), 0, streamFile, 0, ad, framer, nil, false); err != nil { t.Fatalf("stream() failed: %v", err) } }() From 8dcbb034174edf407f8e7c8ac758c6ec678b7192 Mon Sep 17 00:00:00 2001 From: Arkadiusz Date: Mon, 3 Jan 2022 17:49:33 +0100 Subject: [PATCH 10/12] Apply comments suggestions from code review Co-authored-by: Tim Gross --- client/fs_endpoint.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/client/fs_endpoint.go b/client/fs_endpoint.go index 9c939ea31f21..825b58ad91ea 100644 --- a/client/fs_endpoint.go +++ b/client/fs_endpoint.go @@ -633,8 +633,9 @@ func (f *FileSystem) logsImpl(ctx context.Context, follow, plain bool, offset in // streamFile is the internal method to stream the content of a file. If limit // is greater than zero, the stream will end once that many bytes have been -// read. eofCancelCh, if triggered while at EOF, is used to trigger one more read and cancel the stream on reaching next EOF. -// If the connection is broken an EPIPE error is returned +// read. If eofCancelCh is triggered while at EOF, read one more frame and +// cancel the stream on the next EOF. If the connection is broken an EPIPE +// error is returned. func (f *FileSystem) streamFile(ctx context.Context, offset int64, path string, limit int64, fs allocdir.AllocDirFS, framer *sframer.StreamFramer, eofCancelCh chan error, cancelAfterFirstEof bool) error { @@ -761,6 +762,8 @@ OUTER: return nil } + // try to read one more frame to avoid dropped entries + // during log rotation cancelReceived = true continue OUTER } From e2b40366006fa2112604a3e8664fc2047b63a702 Mon Sep 17 00:00:00 2001 From: Arkadiusz Kraus Date: Tue, 4 Jan 2022 12:52:50 +0100 Subject: [PATCH 11/12] code review - do not swallow errors from eof channel when streaming file --- client/fs_endpoint.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/client/fs_endpoint.go b/client/fs_endpoint.go index 9c939ea31f21..5c12cc6c0a91 100644 --- a/client/fs_endpoint.go +++ b/client/fs_endpoint.go @@ -761,6 +761,10 @@ OUTER: return nil } + if err != nil { + return err + } + cancelReceived = true continue OUTER } From 91e637655749c3346f40960ddf5fcd2af61546f9 Mon Sep 17 00:00:00 2001 From: Arkadiusz Kraus Date: Tue, 4 Jan 2022 17:58:19 +0100 Subject: [PATCH 12/12] add changelog and comment in file streaming --- .changelog/11721.txt | 3 +++ client/fs_endpoint.go | 5 ++++- 2 files changed, 7 insertions(+), 1 deletion(-) create mode 100644 .changelog/11721.txt diff --git a/.changelog/11721.txt b/.changelog/11721.txt new file mode 100644 index 000000000000..f6a665305b87 --- /dev/null +++ b/.changelog/11721.txt @@ -0,0 +1,3 @@ +```release-note:bug +client: Fixed a bug where the allocation log streaming API was missing log frames that spanned log file rotation +``` diff --git a/client/fs_endpoint.go b/client/fs_endpoint.go index 926a02a971da..2796c45c2b3c 100644 --- a/client/fs_endpoint.go +++ b/client/fs_endpoint.go @@ -704,7 +704,10 @@ OUTER: continue } - // When eof and cancel received then cancel + // At this point we can stop without waiting for more changes, + // because we have EOF and either we're not following at all, + // or we received an event from the eofCancelCh channel + // and last read was executed if cancelReceived { return nil }