From c645528a711dd73daae89b3a1106c83ec3e72000 Mon Sep 17 00:00:00 2001 From: Jamie Wilkinson Date: Tue, 11 Jun 2024 07:22:49 +0200 Subject: [PATCH] fix: Return to read immediately after a successful read. Copying the filestream, now pipe, dgram, and socket streams return to read again immediately after a successful read, so that we don't wait. This is now obvious the problem in #685 and using the bandwidth-delay-product we can see that a 250ms pause between reads of 4096B and 128KiB matches the results seen. Before: ``` jaq% time ./mtail -logs - -progs examples/rsyncd.mtail < internal/mtail/testdata/rsyncd.log 0.01s user 0.01s system 7% cpu 0.264 total ``` After: ``` jaq% time ./mtail -logs - -progs examples/rsyncd.mtail < internal/mtail/testdata/rsyncd.log 0.01s user 0.03s system 102% cpu 0.041 total ``` Thanks to @rideliner for the hint. --- internal/tailer/logstream/dgramstream.go | 5 +++++ internal/tailer/logstream/dgramstream_unix_test.go | 8 ++------ internal/tailer/logstream/filestream.go | 11 +++++------ internal/tailer/logstream/pipestream.go | 10 +++++++--- internal/tailer/logstream/pipestream_unix_test.go | 10 ++++------ internal/tailer/logstream/socketstream.go | 5 +++++ internal/tailer/logstream/socketstream_unix_test.go | 8 ++------ 7 files changed, 30 insertions(+), 27 deletions(-) diff --git a/internal/tailer/logstream/dgramstream.go b/internal/tailer/logstream/dgramstream.go index b4643a960..d768974e9 100644 --- a/internal/tailer/logstream/dgramstream.go +++ b/internal/tailer/logstream/dgramstream.go @@ -111,6 +111,11 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak ds.lastReadTime = time.Now() ds.mu.Unlock() ds.staleTimer = time.AfterFunc(time.Hour*24, ds.cancel) + + // No error implies more to read, so restart the loop. + if err == nil && ctx.Err() == nil { + continue + } } if err != nil && IsEndOrCancel(err) { diff --git a/internal/tailer/logstream/dgramstream_unix_test.go b/internal/tailer/logstream/dgramstream_unix_test.go index 29a90e60e..e5a8c9076 100644 --- a/internal/tailer/logstream/dgramstream_unix_test.go +++ b/internal/tailer/logstream/dgramstream_unix_test.go @@ -42,7 +42,7 @@ func TestDgramStreamReadCompletedBecauseSocketClosed(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) // Stream is not shut down with cancel in this test defer cancel() - waker, awaken := waker.NewTest(ctx, 1, "stream") + waker := waker.NewTestAlways() sockName := scheme + "://" + addr ds, err := logstream.New(ctx, &wg, waker, sockName, logstream.OneShotEnabled) @@ -59,8 +59,6 @@ func TestDgramStreamReadCompletedBecauseSocketClosed(t *testing.T) { _, err = s.Write([]byte("1\n")) testutil.FatalIfErr(t, err) - awaken(0, 0) // sync past read - // "Close" the socket by sending zero bytes, which in oneshot mode tells the stream to act as if we're done. _, err = s.Write([]byte{}) testutil.FatalIfErr(t, err) @@ -94,7 +92,7 @@ func TestDgramStreamReadCompletedBecauseCancel(t *testing.T) { } ctx, cancel := context.WithCancel(context.Background()) - waker, awaken := waker.NewTest(ctx, 1, "stream") + waker := waker.NewTestAlways() sockName := scheme + "://" + addr ds, err := logstream.New(ctx, &wg, waker, sockName, logstream.OneShotDisabled) @@ -111,8 +109,6 @@ func TestDgramStreamReadCompletedBecauseCancel(t *testing.T) { _, err = s.Write([]byte("1\n")) testutil.FatalIfErr(t, err) - awaken(0, 0) // Synchronise past read. - cancel() // This cancellation should cause the stream to shut down. wg.Wait() diff --git a/internal/tailer/logstream/filestream.go b/internal/tailer/logstream/filestream.go index b8baf97be..bf1005fd3 100644 --- a/internal/tailer/logstream/filestream.go +++ b/internal/tailer/logstream/filestream.go @@ -121,6 +121,11 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake fs.lastReadTime = time.Now() fs.mu.Unlock() fs.staleTimer = time.AfterFunc(time.Hour*24, fs.cancel) + + // No error implies there is more to read so restart the loop. + if err == nil && ctx.Err() == nil { + continue + } } if err != nil && err != io.EOF { @@ -206,12 +211,6 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake } } - // No error implies there is more to read in this file so go - // straight back to read unless it looks like context is Done. - if err == nil && ctx.Err() == nil { - continue - } - Sleep: // If we get here it's because we've stalled. First test to see if it's // time to exit. diff --git a/internal/tailer/logstream/pipestream.go b/internal/tailer/logstream/pipestream.go index 67237e262..5bed5c5fb 100644 --- a/internal/tailer/logstream/pipestream.go +++ b/internal/tailer/logstream/pipestream.go @@ -84,9 +84,8 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake } logCloses.Add(ps.pathname, 1) close(ps.lines) + ps.cancel() }() - ctx, cancel := context.WithCancel(ctx) - defer cancel() SetReadDeadlineOnDone(ctx, fd) for { @@ -105,6 +104,11 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake ps.lastReadTime = time.Now() ps.mu.Unlock() ps.staleTimer = time.AfterFunc(time.Hour*24, ps.cancel) + + // No error implies there is more to read so restart the loop. + if err == nil && ctx.Err() == nil { + continue + } } // Test to see if we should exit. @@ -123,7 +127,7 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake // Exit immediately; cancelled context is going to cause the // next read to be interrupted and exit, so don't bother going // around the loop again. - return + //return case <-waker.Wake(): // sleep until next Wake() glog.V(2).Infof("stream(%s): Wake received", ps.pathname) diff --git a/internal/tailer/logstream/pipestream_unix_test.go b/internal/tailer/logstream/pipestream_unix_test.go index b55ea086f..a6ed66623 100644 --- a/internal/tailer/logstream/pipestream_unix_test.go +++ b/internal/tailer/logstream/pipestream_unix_test.go @@ -73,7 +73,7 @@ func TestPipeStreamReadCompletedBecauseCancel(t *testing.T) { testutil.FatalIfErr(t, unix.Mkfifo(name, 0o666)) ctx, cancel := context.WithCancel(context.Background()) - waker, awaken := waker.NewTest(ctx, 1, "stream") + waker := waker.NewTestAlways() f, err := os.OpenFile(name, os.O_RDWR, os.ModeNamedPipe) testutil.FatalIfErr(t, err) @@ -87,9 +87,6 @@ func TestPipeStreamReadCompletedBecauseCancel(t *testing.T) { testutil.WriteString(t, f, "1\n") - // Avoid a race with cancellation if we can synchronise with waker.Wake() - awaken(0, 0) - cancel() // Cancellation here should cause the stream to shut down. wg.Wait() @@ -155,7 +152,7 @@ func TestPipeStreamReadStdin(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) // The stream is not shut down by cancel in this test. defer cancel() - waker, awaken := waker.NewTest(ctx, 1, "stream") + waker := waker.NewTestAlways() ps, err := logstream.New(ctx, &wg, waker, "-", logstream.OneShotDisabled) testutil.FatalIfErr(t, err) @@ -165,7 +162,8 @@ func TestPipeStreamReadStdin(t *testing.T) { } checkLineDiff := testutil.ExpectLinesReceivedNoDiff(t, expected, ps.Lines()) - awaken(0, 0) + // Give the stream a chance to wake and read + time.Sleep(10 * time.Millisecond) testutil.FatalIfErr(t, f.Close()) diff --git a/internal/tailer/logstream/socketstream.go b/internal/tailer/logstream/socketstream.go index 6f275b67f..df781581d 100644 --- a/internal/tailer/logstream/socketstream.go +++ b/internal/tailer/logstream/socketstream.go @@ -135,6 +135,11 @@ func (ss *socketStream) handleConn(ctx context.Context, wg *sync.WaitGroup, wake ss.lastReadTime = time.Now() ss.mu.Unlock() ss.staleTimer = time.AfterFunc(time.Hour*24, ss.cancel) + + // No error implies more to read, so restart the loop. + if err == nil && ctx.Err() == nil { + continue + } } if err != nil && IsEndOrCancel(err) { diff --git a/internal/tailer/logstream/socketstream_unix_test.go b/internal/tailer/logstream/socketstream_unix_test.go index e89d18bc6..37443416a 100644 --- a/internal/tailer/logstream/socketstream_unix_test.go +++ b/internal/tailer/logstream/socketstream_unix_test.go @@ -40,7 +40,7 @@ func TestSocketStreamReadCompletedBecauseSocketClosed(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) // The stream is not shut down with cancel in this test. defer cancel() - waker, awaken := waker.NewTest(ctx, 1, "stream") + waker := waker.NewTestAlways() sockName := scheme + "://" + addr ss, err := logstream.New(ctx, &wg, waker, sockName, logstream.OneShotEnabled) @@ -57,8 +57,6 @@ func TestSocketStreamReadCompletedBecauseSocketClosed(t *testing.T) { _, err = s.Write([]byte("1\n")) testutil.FatalIfErr(t, err) - awaken(0, 0) // Sync past read - // Close the socket to signal to the socketStream to shut down. testutil.FatalIfErr(t, s.Close()) @@ -91,7 +89,7 @@ func TestSocketStreamReadCompletedBecauseCancel(t *testing.T) { } ctx, cancel := context.WithCancel(context.Background()) - waker, awaken := waker.NewTest(ctx, 1, "stream") + waker := waker.NewTestAlways() sockName := scheme + "://" + addr ss, err := logstream.New(ctx, &wg, waker, sockName, logstream.OneShotDisabled) @@ -108,8 +106,6 @@ func TestSocketStreamReadCompletedBecauseCancel(t *testing.T) { _, err = s.Write([]byte("1\n")) testutil.FatalIfErr(t, err) - awaken(0, 0) // Sync past read to ensure we read - cancel() // This cancellation should cause the stream to shut down immediately. wg.Wait()