Docs cleanup
neilotoole committed Feb 18, 2024
1 parent cc27777 commit 570bb78
Showing 1 changed file with 68 additions and 71 deletions.
139 changes: 68 additions & 71 deletions streamcache.go
@@ -1,36 +1,35 @@
// Package streamcache implements an in-memory cache mechanism that allows
// multiple callers to read some or all of the contents of a source reader,
-// while only reading from the source reader once; when there's only one
-// final reader remaining, the cache is discarded and the final reader
-// reads directly from the source.
+// while only reading from the source reader once; when there's only one final
+// reader remaining, the cache is discarded and the final reader reads directly
+// from the source.
//
// Let's say we're reading from stdin. For example:
//
// $ cat myfile.ext | myprogram
//
-// In this scenario, myprogram wants to detect the type of data
-// in the file/pipe, and then print it out. That sampling could be done
-// in a separate goroutine per sampler type. The input file could be,
-// let's say, a CSV file, or a JSON file.
+// In this scenario, myprogram wants to detect the type of data in the
+// file/pipe, and then print it out. That sampling could be done in a separate
+// goroutine per sampler type. The input file could be, let's say, a CSV file,
+// or a JSON file.
//
-// The obvious approach is to inspect the first few lines of the
-// input, and check if the input is either valid CSV, or valid JSON.
-// After that process, let's say we want to dump out the entire contents
-// of the input.
+// The obvious approach is to inspect the first few lines of the input, and
+// check if the input is either valid CSV, or valid JSON. After that process,
+// let's say we want to dump out the entire contents of the input.
//
-// Package streamcache provides a facility to create a caching Stream from
-// an underlying io.Reader (os.Stdin in this scenario), and spawn multiple
-// readers, each of which can operate independently, in their own
-// goroutines if desired. The underlying source (again, os.Stdin in this
-// scenario) will only be read from once, but its data is available to
-// multiple readers, because that data is cached in memory.
+// Package streamcache provides a facility to create a caching Stream from an
+// underlying io.Reader (os.Stdin in this scenario), and spawn multiple readers,
+// each of which can operate independently, in their own goroutines if desired.
+// The underlying source (again, os.Stdin in this scenario) will only be read
+// from once, but its data is available to multiple readers, because that data
+// is cached in memory.
//
// That is, until after Stream.Seal is invoked: when there's only one final
-// reader left, the cache is discarded, and the final reader reads directly
-// from the underlying source.
+// reader left, the cache is discarded, and the final reader reads directly from
+// the underlying source.
//
-// The entrypoint to this package is streamcache.New, which returns a
-// new Stream instance, from which readers can be created via Stream.NewReader.
+// The entrypoint to this package is streamcache.New, which returns a new Stream
+// instance, from which readers can be created via Stream.NewReader.
package streamcache

import (
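The package comment above describes several readers sharing a single pass over one source. As a rough illustration only, the stdlib-only sketch below shows that idea in miniature. The names miniStream, miniReader, countingReader, and readTwice are hypothetical and are not part of streamcache; the real package also handles sealing, cache discard, context cancellation, and more careful locking, none of which is modeled here.

```go
package main

import (
	"fmt"
	"io"
	"strings"
	"sync"
)

// countingReader counts the bytes handed out by the wrapped reader, so we
// can confirm that the source is consumed only once.
type countingReader struct {
	r io.Reader
	n int
}

func (c *countingReader) Read(p []byte) (int, error) {
	n, err := c.r.Read(p)
	c.n += n
	return n, err
}

// miniStream is a hypothetical, much-simplified caching stream.
type miniStream struct {
	mu    sync.Mutex
	src   io.Reader
	cache []byte // all bytes read from src so far
	err   error  // first error returned by src, including io.EOF
}

// miniReader reads from a miniStream at its own offset.
type miniReader struct {
	s      *miniStream
	offset int
}

func (s *miniStream) newReader() *miniReader { return &miniReader{s: s} }

func (r *miniReader) Read(p []byte) (int, error) {
	if len(p) == 0 {
		return 0, nil
	}
	s := r.s
	s.mu.Lock()
	defer s.mu.Unlock()

	// Prefer bytes that are already cached.
	if r.offset < len(s.cache) {
		n := copy(p, s.cache[r.offset:])
		r.offset += n
		return n, nil
	}
	// Once src has returned an error, never read from it again.
	if s.err != nil {
		return 0, s.err
	}
	// Otherwise read new bytes from src and append them to the cache.
	n, err := s.src.Read(p)
	s.cache = append(s.cache, p[:n]...)
	r.offset += n
	s.err = err
	if n > 0 {
		return n, nil // the stored error surfaces on the next call
	}
	return 0, err
}

// readTwice drains two independent readers over the same source and
// reports how many bytes were actually pulled from the source.
func readTwice(input string) (a, b string, srcBytes int) {
	src := &countingReader{r: strings.NewReader(input)}
	s := &miniStream{src: src}
	r1, r2 := s.newReader(), s.newReader()
	b1, _ := io.ReadAll(r1)
	b2, _ := io.ReadAll(r2)
	return string(b1), string(b2), src.n
}

func main() {
	a, b, n := readTwice("id,name\n1,alice\n")
	fmt.Println(a == b, n == len(a))
}
```

Both readers see the full input, while the source hands out each byte only once. The sketch holds a single mutex for the whole Read call, which is simpler than the read, write, and src locks the Stream comments describe.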
@@ -48,47 +47,45 @@ var ErrAlreadyClosed = errors.New("reader is already closed")

// Stream mediates access to the bytes of an underlying source io.Reader.
// Multiple callers can invoke Stream.NewReader to obtain a Reader, each of
-// which can read the full or partial contents of the source reader. Note
-// that the source is only read from once, and the returned bytes are cached
-// in memory. After Stream.Seal is invoked and readers are closed, the final
-// reader discards the cache and reads directly from the source for the
-// remaining bytes.
+// which can read the full or partial contents of the source reader. Note that
+// the source is only read from once, and the returned bytes are cached in
+// memory. After Stream.Seal is invoked and readers are closed, the final reader
+// discards the cache and reads directly from the source for the remaining
+// bytes.
type Stream struct {
// src is the underlying reader from which bytes are read.
src io.Reader

-// readErr is the first (and only) error returned by src's
-// Read method. Once readErr has been set to a non-nil value,
-// src is never read from again.
+// readErr is the first (and only) error returned by src's Read method. Once
+// readErr has been set to a non-nil value, src is never read from again.
readErr error

-// rdrsDoneCh is closed after the Stream is sealed and the last
-// reader is closed. See Stream.Done.
+// rdrsDoneCh is closed after the Stream is sealed and the last reader is
+// closed. See Stream.Done.
rdrsDoneCh chan struct{}

-// srcDoneCh is closed when the underlying source reader
-// returns an error, including io.EOF. See Stream.Filled.
+// srcDoneCh is closed when the underlying source reader returns an error,
+// including io.EOF. See Stream.Filled.
srcDoneCh chan struct{}

-// rdrs is the set of unclosed Reader instances created
-// by Stream.NewReader. When a Reader is closed, it is removed
-// from this slice. Note that the element order may not match
-// the Reader creation order, as the slice may be reordered
-// by removeElement during Stream.close.
+// rdrs is the set of unclosed Reader instances created by Stream.NewReader.
+// When a Reader is closed, it is removed from this slice. Note that the
+// element order may not match the Reader creation order, as the slice may
+// be reordered by removeElement during Stream.close.
rdrs []*Reader

-// cache holds the accumulated bytes read from src.
-// It is nilled when the final reader switches to readSrcDirect.
+// cache holds the accumulated bytes read from src. It is nilled when the
+// final reader switches to readSrcDirect.
cache []byte

-// srcMu guards concurrent access to reading from src. Note that it
-// is not an instance of sync.Mutex, but instead fifomu.Mutex, which
-// is a mutex whose Lock method returns the lock to callers in FIFO
-// call order. This is important in Stream.readMain because, as implemented,
-// a reader could get the src lock on repeated calls, starving the other
-// readers, which is a big problem if that greedy reader blocks
-// on reading from src. Most likely our use of locks could be
-// improved to avoid this scenario, but that's where we're at today.
+// srcMu guards concurrent access to reading from src. Note that it is not
+// an instance of sync.Mutex, but instead fifomu.Mutex, which is a mutex
+// whose Lock method returns the lock to callers in FIFO call order. This is
+// important in Stream.readMain because, as implemented, a reader could get
+// the src lock on repeated calls, starving the other readers, which is a
+// big problem if that greedy reader blocks on reading from src. Most likely
+// our use of locks could be improved to avoid this scenario, but that's
+// where we're at today.
srcMu fifomu.Mutex

// size is the count of bytes read from src.
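The srcMu comment above says that the lock is handed to callers in FIFO call order so that one greedy reader cannot starve the others. One common way to get that ordering is a ticket lock, sketched below with only the standard library. This is an assumption for illustration and may not match how fifomu.Mutex is implemented; ticketMutex, queued, and fifoOrder are hypothetical names.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// ticketMutex is a hypothetical FIFO lock: each caller takes a ticket and
// waits until that ticket is the one being served.
type ticketMutex struct {
	mu      sync.Mutex
	cond    *sync.Cond
	next    uint64 // next ticket to hand out
	serving uint64 // ticket that currently owns the lock
}

func newTicketMutex() *ticketMutex {
	m := &ticketMutex{}
	m.cond = sync.NewCond(&m.mu)
	return m
}

func (m *ticketMutex) Lock() {
	m.mu.Lock()
	ticket := m.next
	m.next++
	for ticket != m.serving {
		m.cond.Wait()
	}
	m.mu.Unlock()
}

func (m *ticketMutex) Unlock() {
	m.mu.Lock()
	m.serving++
	m.cond.Broadcast() // wake all waiters; only the next ticket proceeds
	m.mu.Unlock()
}

// queued reports how many callers are waiting behind the current holder.
// It is only meaningful while the lock is held.
func (m *ticketMutex) queued() int {
	m.mu.Lock()
	defer m.mu.Unlock()
	return int(m.next-m.serving) - 1
}

// fifoOrder queues n goroutines on the lock in a known order and returns
// the order in which they acquired it.
func fifoOrder(n int) []int {
	m := newTicketMutex()
	m.Lock() // hold the lock so that the goroutines below must queue

	var (
		order []int
		wg    sync.WaitGroup
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			m.Lock()
			order = append(order, id) // guarded by m
			m.Unlock()
		}(i)
		// Wait until goroutine i has taken its ticket before starting the next.
		for m.queued() != i+1 {
			runtime.Gosched()
		}
	}

	m.Unlock()
	wg.Wait()
	return order
}

func main() {
	fmt.Println(fifoOrder(5))
}
```

With a plain sync.Mutex the acquisition order among waiters is not guaranteed, which is the starvation risk the comment describes. Here the lock is always passed to the longest-waiting caller.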
@@ -101,19 +98,18 @@ type Stream struct {
// no more calls to NewReader are allowed.
sealed bool

-// Consider our two mutexes cMu and srcMu above. There are effectively
-// three locks that can be acquired.
+// Consider our two mutexes cMu and srcMu above. There are effectively three
+// locks that can be acquired.
//
// - cMu's read lock
// - cMu's write lock
// - srcMu's lock
//
-// These three locks are referred to in the comments as the read, write,
-// and src locks.
+// These three locks are referred to in the comments as the read, write, and
+// src locks.
}

-// New returns a new Stream that wraps src. Use Stream.NewReader
-// to read from src.
+// New returns a new Stream that wraps src. Use Stream.NewReader to read from src.
func New(src io.Reader) *Stream {
c := &Stream{
src: src,
@@ -125,14 +121,14 @@ func New(src io.Reader) *Stream {
return c
}

-// NewReader returns a new Reader from Stream. If ctx is non-nil, it is
-// checked for cancellation at the start of Reader.Read (and possibly
-// at other checkpoints).
+// NewReader returns a new Reader from Stream. If ctx is non-nil, it is checked
+// for cancellation at the start of Reader.Read (and possibly at other
+// checkpoints).
//
// It is the caller's responsibility to close the returned Reader.
//
-// NewReader panics if s is already sealed via Stream.Seal (but note that
-// you can first test via Stream.Sealed).
+// NewReader panics if s is already sealed via Stream.Seal (but note that you
+// can first test via Stream.Sealed).
//
// See: Reader.Read, Reader.Close.
func (s *Stream) NewReader(ctx context.Context) *Reader {
@@ -153,6 +149,7 @@ func (s *Stream) NewReader(ctx context.Context) *Reader {
}

// Source returns the Stream's underlying source io.Reader.
+//
// This can be useful if you need to force close the source
// for some reason, e.g.
//
@@ -172,9 +169,9 @@ var (
_ readFunc = (*Stream)(nil).readSrcDirect
)

-// readSrcDirect reads directly from Stream.src. The src's size is
-// incremented as bytes are read from src, and Stream.readErr is set if
-// src returns an error.
+// readSrcDirect reads directly from Stream.src. The src's size is incremented
+// as bytes are read from src, and Stream.readErr is set if src returns an
+// error.
func (s *Stream) readSrcDirect(_ *Reader, p []byte, _ int) (n int, err error) {
n, err = s.src.Read(p)

@@ -440,20 +437,20 @@ func (s *Stream) readFinal(r *Reader, p []byte, offset int) (n int, err error) {
//
// Note also that it's possible that even after the returned channel is closed,
// Stream may not have closed its underlying source reader. For example, if
-// a Stream is created and immediately sealed, the channel returned by Done
-// is closed, although the underlying source reader was never closed.
-// The source reader is closed only by closing the final Reader instance
-// that was active after Seal is invoked.
+// a Stream is created and immediately sealed, the channel returned by Done is
+// closed, although the underlying source reader was never closed. The source
+// reader is closed only by closing the final Reader instance that was active
+// after Seal is invoked.
//
// See also: Stream.Filled.
func (s *Stream) Done() <-chan struct{} {
return s.rdrsDoneCh
}

-// Filled returns a channel that is closed when the underlying source
-// reader returns an error, including io.EOF. If the source reader returns
-// an error, it is never read from again. If the source reader does not
-// return an error, this channel is never closed.
+// Filled returns a channel that is closed when the underlying source reader
+// returns an error, including io.EOF. If the source reader returns an error, it
+// is never read from again. If the source reader does not return an error, this
+// channel is never closed.
//
// See also: Stream.Done.
func (s *Stream) Filled() <-chan struct{} {
@@ -613,7 +610,7 @@ type Reader struct {
// Otherwise Read reads from Stream, which may return bytes from Stream's cache
// or new bytes from the source, or a combination of both. Note in particular
// that Read preferentially returns available bytes from the cache rather than
-// waiting to read from the source, even that means the returned n < len(p).
+// waiting to read from the source, even if that means the returned n < len(p).
// This is in line with the io.Reader convention:
//
// If some data is available but not len(p) bytes, Read conventionally