Skip to content

Commit

Permalink
Done with benchmarks for now
Browse files Browse the repository at this point in the history
  • Loading branch information
neilotoole committed Jan 25, 2024
1 parent 115356d commit 01a7369
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 23 deletions.
25 changes: 5 additions & 20 deletions streamcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"context"
"errors"
"io"
"runtime"
"sync"

"github.com/neilotoole/fifomu"
Expand Down Expand Up @@ -248,7 +247,6 @@ TOP:
// And now we acquire the src lock so that we
// can read from src.
s.srcMu.Lock() // src lock
// REVISIT: ^^ why not s.srcMu.LockContext(ctx) ?? Benchmark this.
}

// If we've gotten this far, we have the src lock, but not the
Expand Down Expand Up @@ -305,9 +303,8 @@ TOP:

// We're done updating the cache, so we can release the write and src
// locks, and return.
s.cMu.Unlock() // write unlock
s.srcMu.Unlock() // src unlock
runtime.Gosched() // REVISIT: benchmark this
s.cMu.Unlock() // write unlock
s.srcMu.Unlock() // src unlock
return n, err
}

Expand All @@ -320,7 +317,7 @@ func (s *Stream) isSatisfiedFromCache(p []byte, offset int) bool {

// fillFromCache copies bytes from Stream.cache to p, starting at offset,
// returning the number of bytes copied. If readErr is non-nil and we've
// reached the value of ErrAt, readErr is returned.
// reached the end of the cache, readErr is returned.
func (s *Stream) fillFromCache(p []byte, offset int) (n int, err error) {
n = copy(p, s.cache[offset:])
if s.readErr != nil && n+offset >= s.size {
Expand Down Expand Up @@ -503,19 +500,6 @@ func (s *Stream) Err() error {
return s.readErr
}

// ErrAt returns the byte offset at which the first error (if any) was
// returned by the underlying source reader, or -1. Thus, if Stream.Err
// is non-nil, ErrAt will be >= 0 and equal to Stream.Size, and the channel
// returned by Stream.Filled will be closed.
func (s *Stream) ErrAt() int {
s.cMu.RLock() // read lock
defer s.cMu.RUnlock() // read unlock
if s.readErr == nil {
return -1
}
return s.size
}

// Seal is called to indicate that no more calls to NewReader are permitted.
// If there are no unclosed readers when Seal is invoked, the Stream.Done
// channel is closed, and the Stream is considered finished. Subsequent
Expand Down Expand Up @@ -593,7 +577,8 @@ type Reader struct {
readFn readFunc

// pCloseErr is set by Reader.Close, and the set value is
// returned by any subsequent calls to Close.
// returned by any subsequent calls to Close. We use a pointer
// to error so Reader.Read can check if the Reader is closed.
pCloseErr *error

// offset is the offset into the stream from which the next
Expand Down
3 changes: 0 additions & 3 deletions streamcache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ func TestStream(t *testing.T) {
requireNoTake(t, s.Filled())
require.Equal(t, 0, s.Size())
require.Nil(t, s.Err())
require.Equal(t, -1, s.ErrAt())
requireNoTotal(t, s)

r := s.NewReader(ctx)
Expand Down Expand Up @@ -89,7 +88,6 @@ func TestStream(t *testing.T) {
requireNoTake(t, s.Done())
require.Equal(t, 8, streamcache.ReaderOffset(r))
require.Equal(t, 8, s.Size())
require.Equal(t, 8, s.ErrAt())

// Read one more time, and we should get io.EOF again.
gotN, gotErr = r.Read(buf)
Expand All @@ -99,7 +97,6 @@ func TestStream(t *testing.T) {
require.Equal(t, io.EOF, s.Err())
require.Equal(t, 8, streamcache.ReaderOffset(r))
require.Equal(t, 8, s.Size())
require.Equal(t, 8, s.ErrAt())
requireTotal(t, s, 8)
requireNoTake(t, s.Done())
requireTake(t, s.Filled())
Expand Down

0 comments on commit 01a7369

Please sign in to comment.