Skip to content

Commit

Permalink
Merge pull request #4 from neilotoole/stream-source
Browse files Browse the repository at this point in the history
Added Stream.Source() method to access the underlying source
  • Loading branch information
neilotoole committed Jan 26, 2024
2 parents 79e7cad + 429f31a commit cc27777
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 0 deletions.
12 changes: 12 additions & 0 deletions streamcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,18 @@ func (s *Stream) NewReader(ctx context.Context) *Reader {
return r
}

// Source returns the Stream's underlying source io.Reader.
// This can be useful if you need to force close the source
// for some reason, e.g.
//
// stream.Source().(io.Closer).Close()
//
// The Stream's behavior is undefined if the caller reads from
// the source directly.
func (s *Stream) Source() io.Reader {
return s.src
}

// readFunc is the type of the Reader.readFn field.
type readFunc func(r *Reader, p []byte, offset int) (n int, err error)

Expand Down
56 changes: 56 additions & 0 deletions streamcache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"io"
"os"
"path/filepath"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -585,3 +586,58 @@ func TestContextCancelBeforeSrcRead(t *testing.T) {
src.unblock <- struct{}{}
wg.Wait()
}

func TestStreamSource(t *testing.T) {
t.Parallel()

// Write some data to a test file.
fp := filepath.Join(t.TempDir(), "streamcache_test.txt")
require.NoError(t, os.WriteFile(fp, []byte(anything), 0o600))

f, fErr := os.Open(fp)
require.NoError(t, fErr)

// Create a stream (and reader) that reads from the file.
stream := streamcache.New(f)
r := stream.NewReader(context.Background())

// Read a small chunk from the file.
buf := make([]byte, 2)
n, readErr := r.Read(buf)
require.NoError(t, readErr)
require.Equal(t, 2, n)

gotSrc := stream.Source()
require.Equal(t, f, gotSrc)

// Close the source (i.e. the file), and then try
// to read from the reader.
require.NoError(t, gotSrc.(io.ReadCloser).Close())

n, readErr = r.Read(buf)
require.Error(t, readErr)
require.Equal(t, 0, n)
readPathErr := new(os.PathError)
require.True(t, errors.As(readErr, &readPathErr))
require.Equal(t, "read", readPathErr.Op)
require.Equal(t, "file already closed", readPathErr.Err.Error())
require.Equal(t, 2, stream.Size())
require.Equal(t, readErr, stream.Err())
total, totalErr := stream.Total(context.Background())
require.Error(t, totalErr)
require.Equal(t, 2, total)
require.True(t, errors.Is(totalErr, readErr))
requireTake(t, stream.Filled())

// Now check what happens when we close the reader.
requireNoTake(t, stream.Done(),
"stream is not done until sealed and reader is closed")
stream.Seal()
closeErr := r.Close()
require.Error(t, closeErr)
closePathErr := new(os.PathError)
require.True(t, errors.As(closeErr, &closePathErr))
require.Equal(t, "close", closePathErr.Op)
require.Equal(t, "file already closed", closePathErr.Err.Error())
requireTake(t, stream.Done())
}

0 comments on commit cc27777

Please sign in to comment.