Skip to content

Commit

Permalink
refactored code to use optimistic get object call
Browse files Browse the repository at this point in the history
  • Loading branch information
wolfeidau committed Jan 16, 2024
1 parent a2fa574 commit ed9ea38
Show file tree
Hide file tree
Showing 13 changed files with 287 additions and 130 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/bin
/coverage.*
coverage.*
.envrc
/vendor
/*.sarif
5 changes: 4 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,8 @@
"go.testFlags": [
"-v",
"-coverpkg=github.com/wolfeidau/s3iofs"
]
],
"go.testEnvVars": {
"AWS_DEBUG": "true"
}
}
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,6 @@ test:
@echo "--- test all the things"
@go test -coverprofile=coverage.txt ./...
@go tool cover -func=coverage.txt
@cd integration; go test -coverpkg=github.com/wolfeidau/s3iofs -coverprofile=coverage.txt ./...
@cd integration; go tool cover -func=coverage.txt
.PHONY: test
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ require (
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
golang.org/x/sys v0.14.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.25.3/go.mod h1:4EqRHDCKP78hq3zOnmFXu
github.com/aws/smithy-go v1.17.0 h1:wWJD7LX6PBV6etBUwO0zElG0nWN9rUhp0WdYeHSHAaI=
github.com/aws/smithy-go v1.17.0/go.mod h1:NukqUGpCZIILqqiV0NIjeFh24kd/FAa4beRb6nbIUPE=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
Expand All @@ -50,6 +51,12 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rs/zerolog v1.31.0 h1:FcTR3NnLWW+NnTwwhFWiJSZr4ECLpqCm6QsEnyvbV4A=
github.com/rs/zerolog v1.31.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
golang.org/x/net v0.18.0 h1:mIYleuAkSbHh0tCv7RvjL3F6ZVbLjq4+R7zbOn3Kokg=
Expand All @@ -61,5 +68,6 @@ golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q=
golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
1 change: 1 addition & 0 deletions go.work.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
github.com/rs/zerolog v1.30.0 h1:SymVODrcRsaRaSInD9yQtKbtWqwsfoPcRff/oRXLj4c=
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc=
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
Expand Down
77 changes: 47 additions & 30 deletions integration/s3fs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ var (
oneKilobyte = bytes.Repeat([]byte("a"), 1024)
)

func generateData(len int) []byte {
return bytes.Repeat([]byte("a"), len)
func generateData(length int) []byte {
return bytes.Repeat([]byte("a"), length)
}

func TestList(t *testing.T) {
Expand Down Expand Up @@ -93,21 +93,51 @@ func TestSeek(t *testing.T) {

s3fs := s3iofs.NewWithClient(testBucketName, client)

f, err := s3fs.Open("test_seek.txt")
assert.NoError(err)
t.Run("seek to start", func(t *testing.T) {
f, err := s3fs.Open("test_seek.txt")
assert.NoError(err)

rdr, ok := f.(io.ReadSeekCloser)
assert.True(ok)
rdr, ok := f.(io.ReadSeekCloser)
assert.True(ok)

defer rdr.Close()
defer rdr.Close()

n, err := rdr.Seek(512, 0)
assert.NoError(err)
assert.Equal(int64(512), n)
n, err := rdr.Seek(512, io.SeekStart)
assert.NoError(err)
assert.Equal(int64(512), n)

buf, err := io.ReadAll(rdr)
assert.NoError(err)
assert.Len(buf, 512)
buf, err := io.ReadAll(rdr)
assert.NoError(err)
assert.Len(buf, 512)
})

t.Run("seek to end", func(t *testing.T) {
f, err := s3fs.Open("test_seek.txt")
assert.NoError(err)
defer f.Close()
rdr, ok := f.(io.ReadSeekCloser)
assert.True(ok)
defer rdr.Close()
n, err := rdr.Seek(-512, io.SeekEnd)
assert.NoError(err)
assert.Equal(int64(512), n)
})

t.Run("seek to current", func(t *testing.T) {
f, err := s3fs.Open("test_seek.txt")
assert.NoError(err)
defer f.Close()
rdr, ok := f.(io.ReadSeekCloser)
assert.True(ok)
defer rdr.Close()
n, err := rdr.Seek(512, io.SeekCurrent)
assert.NoError(err)
assert.Equal(int64(512), n)

n, err = rdr.Seek(512, io.SeekCurrent)
assert.NoError(err)
assert.Equal(int64(1024), n)
})
}

func TestReaderAt(t *testing.T) {
Expand Down Expand Up @@ -169,7 +199,7 @@ func TestReaderAtBig(t *testing.T) {
assert.Equal(twoMegabytes, n)
}

func TestReadBig(t *testing.T) {
func TestReadFile(t *testing.T) {
assert := require.New(t)

_, err := client.PutObject(context.Background(), &s3.PutObjectInput{
Expand All @@ -181,22 +211,9 @@ func TestReadBig(t *testing.T) {

s3fs := s3iofs.NewWithClient(testBucketName, client)

f, err := s3fs.Open("test_read_big.txt")
assert.NoError(err)

defer f.Close()

n, err := f.Read(make([]byte, oneMegabyte))
assert.NoError(err)
assert.Equal(oneMegabyte, n)

n, err = f.Read(make([]byte, twoMegabytes))
data, err := fs.ReadFile(s3fs, "test_read_big.txt")
assert.NoError(err)
assert.Equal(twoMegabytes, n)

n, err = f.Read(make([]byte, 1))
assert.Error(err)
assert.Equal(0, n)
assert.Len(data, threeMegabytes)
}

func TestReadBigEOF(t *testing.T) {
Expand All @@ -216,7 +233,7 @@ func TestReadBigEOF(t *testing.T) {

defer f.Close()

n, err := f.Read(make([]byte, twoMegabytes))
n, err := io.ReadFull(f, make([]byte, twoMegabytes))
assert.ErrorIs(err, io.ErrUnexpectedEOF)
assert.Equal(oneMegabyte, n)
}
1 change: 0 additions & 1 deletion integration/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ func TestMain(m *testing.M) {
endpoint = fmt.Sprintf("http://%s", resource.GetHostPort("9000/tcp"))

if err := pool.Retry(func() error {

endpointURL, err := url.Parse(endpoint)
if err != nil {
log.Fatalf("failed to parse endpoint URL: %s", err)
Expand Down
2 changes: 1 addition & 1 deletion s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"github.com/aws/aws-sdk-go-v2/service/s3"
)

// S3API s3 calls used to build this library, this is used to enable testing
// S3API s3 calls used to build this library, this is used to enable testing.
type S3API interface {
GetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error)
ListObjectsV2(ctx context.Context, params *s3.ListObjectsV2Input, optFns ...func(*s3.Options)) (*s3.ListObjectsV2Output, error)
Expand Down
91 changes: 61 additions & 30 deletions s3file.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"fmt"
"io"
"io/fs"
"path"
"sync"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
Expand All @@ -32,6 +34,8 @@ type s3File struct {
mode fs.FileMode
modTime time.Time // zero value for directories
offset int64
mutex sync.Mutex
body io.ReadCloser
}

func (s3f *s3File) Stat() (fs.FileInfo, error) {
Expand All @@ -51,26 +55,31 @@ func (s3f *s3File) Read(p []byte) (int, error) {
return 0, io.EOF
}

ctx := context.Background()

r, err := s3f.readerAt(ctx, s3f.offset, int64(len(p)))
if err != nil {
return 0, err
s3f.mutex.Lock()
defer s3f.mutex.Unlock()
if s3f.body != nil {
n, err := s3f.body.Read(p)
s3f.offset += int64(n) // update the current offset
return n, err
}

size, err := io.ReadFull(r, p)
s3f.offset += int64(size)
// random access to S3 object is currently being used to read the file
n, err := s3f.ReadAt(p, s3f.offset)
s3f.offset += int64(n) // update the current offset
if err != nil {
if err != io.EOF {
return size, err
}
// check if we are at the end of the underlying file
if s3f.offset > s3f.size {
return size, err
// if we get an unexpected EOF, and we are at the end of the underlying file, return EOF as that is
// the expected behavior
if errors.Is(err, io.ErrUnexpectedEOF) {
// if we are at the end of the underlying file, return EOF as that is the expected behavior
if s3f.offset == s3f.size {
return n, io.EOF
}
}

return n, err
}

return size, r.Close()
return n, nil
}

func (s3f *s3File) ReadAt(p []byte, offset int64) (n int, err error) {
Expand All @@ -85,7 +94,7 @@ func (s3f *s3File) ReadAt(p []byte, offset int64) (n int, err error) {
// given we are using offsets to read this block it is constrained by size of `p`
size, err := io.ReadFull(r, p)
if err != nil {
if err != io.EOF {
if errors.Is(err, io.EOF) {
return size, err
}
// check if we are at the end of the underlying file
Expand All @@ -98,6 +107,18 @@ func (s3f *s3File) ReadAt(p []byte, offset int64) (n int, err error) {
}

func (s3f *s3File) Seek(offset int64, whence int) (int64, error) {
// given the body stream doesn't support seek we will need to re-open the stream
// using read at the new offset
s3f.mutex.Lock()
defer s3f.mutex.Unlock()
if s3f.body != nil {
err := s3f.body.Close()
if err != nil {
return 0, err
}
s3f.body = nil
}

switch whence {
default:
return 0, &fs.PathError{Op: opSeek, Path: s3f.name, Err: fs.ErrInvalid}
Expand Down Expand Up @@ -134,55 +155,65 @@ func (s3f *s3File) readerAt(ctx context.Context, offset, length int64) (io.ReadC
}

func (s3f *s3File) Close() error {
s3f.mutex.Lock()
defer s3f.mutex.Unlock()
if s3f.body != nil {
err := s3f.body.Close()
if err != nil {
return err
}
s3f.body = nil
}

return nil
}

// Name returns the name of the file (or subdirectory) described by the entry.
func (s3f *s3File) Name() string {
return s3f.name
return path.Base(s3f.name)
}

// Size length in bytes for regular files; system-dependent for others
// Size length in bytes for regular files; system-dependent for others.
func (s3f *s3File) Size() int64 {
return s3f.size
}

// Mode file mode bits
// Mode file mode bits.
func (s3f *s3File) Mode() fs.FileMode {
return s3f.mode
}

// file mode bits
// file mode bits.
func (s3f *s3File) Type() fs.FileMode {
return s3f.mode
}

// modification time
// modification time.
func (s3f *s3File) ModTime() time.Time {
return s3f.modTime
}

// abbreviation for Mode().IsDir()
// abbreviation for Mode().IsDir().
func (s3f *s3File) IsDir() bool {
return s3f.Mode().IsDir()
}

// underlying data source (can return nil)
// underlying data source (can return nil).
func (s3f *s3File) Sys() interface{} {
return nil
}

func buildRange(offset, length int64) *string {
var byteRange *string
if offset > 0 && length < 0 {
byteRange = aws.String(fmt.Sprintf("bytes=%d-", offset))
} else if length == 0 {
switch {
case offset > 0 && length < 0:
return aws.String(fmt.Sprintf("bytes=%d-", offset))
case length == 0:
// AWS doesn't support a zero-length read; we'll read 1 byte and then
// ignore it in favor of http.NoBody below.
byteRange = aws.String(fmt.Sprintf("bytes=%d-%d", offset, offset+1))
} else if length >= 0 {
byteRange = aws.String(fmt.Sprintf("bytes=%d-%d", offset, offset+length-1))
return aws.String(fmt.Sprintf("bytes=%d-%d", offset, offset+1))
case length >= 0:
return aws.String(fmt.Sprintf("bytes=%d-%d", offset, offset+length-1))
}

return byteRange
return nil
}
Loading

0 comments on commit ed9ea38

Please sign in to comment.