Skip to content

Commit

Permalink
feat(fanout): unified fanout concurrent reader (#211)
Browse files Browse the repository at this point in the history
* feat(fanout): fanout reader package

* feat(fanout): unified fanout concurrent reader
  • Loading branch information
cshum authored Oct 11, 2022
1 parent dc16085 commit 100ed56
Show file tree
Hide file tree
Showing 5 changed files with 189 additions and 201 deletions.
55 changes: 20 additions & 35 deletions blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package imagor
import (
"bytes"
"encoding/json"
"github.com/cshum/imagor/fanout"
"github.com/cshum/imagor/seekstream"
"io"
"net/http"
Expand Down Expand Up @@ -128,17 +129,6 @@ var avif = []byte("avif")
var tifII = []byte("\x49\x49\x2A\x00")
var tifMM = []byte("\x4D\x4D\x00\x2A")

type readSeekCloser struct {
io.Reader
io.Seeker
io.Closer
}

type readCloser struct {
io.Reader
io.Closer
}

type readSeekNopCloser struct {
io.ReadSeeker
}
Expand Down Expand Up @@ -189,19 +179,11 @@ func (b *Blob) init() {
if b.fanout && size > 0 && size < maxMemorySize && err == nil {
// use fan-out reader if buf size known and within memory size
// otherwise create new readers
factory := fanoutReader(reader, int(size))
factory := fanout.New(reader, int(size))
b.newReader = func() (io.ReadCloser, int64, error) {
return factory(), size, nil
}
reader = factory()
// if source not seekable, simulate seek from fanout buffer
if b.newReadSeeker == nil {
b.newReadSeeker = func() (io.ReadSeekCloser, int64, error) {
source := factory()
buffer := seekstream.NewMemoryBuffer(size)
return seekstream.New(source, buffer), size, nil
}
return factory.NewReader(), size, nil
}
reader = factory.NewReader()
} else {
b.fanout = false
}
Expand Down Expand Up @@ -325,23 +307,26 @@ func (b *Blob) NewReader() (reader io.ReadCloser, size int64, err error) {
// NewReadSeeker create read seeker if reader supports seek, or attempts to simulate seek using memory buffer
func (b *Blob) NewReadSeeker() (io.ReadSeekCloser, int64, error) {
b.init()
if b.newReadSeeker == nil {
reader, size, err := b.NewReader()
if b.newReadSeeker != nil {
return b.newReadSeeker()
}
// if source not seekable, simulate seek with seek stream
reader, size, err := b.NewReader()
if err != nil {
return nil, size, err
}
var buffer seekstream.Buffer
if size > 0 && size < maxMemorySize {
// in memory buffer if size is known and less then 100mb
buffer = seekstream.NewMemoryBuffer(size)
} else {
// otherwise temp file buffer
buffer, err = seekstream.NewTempFileBuffer("", "imagor")
if err != nil {
return nil, size, err
}
var buffer seekstream.Buffer
if size > 0 && size < maxMemorySize {
buffer = seekstream.NewMemoryBuffer(size)
} else {
buffer, err = seekstream.NewTempFileBuffer("", "imagor")
if err != nil {
return nil, size, err
}
}
return seekstream.New(reader, buffer), size, err
}
return b.newReadSeeker()
return seekstream.New(reader, buffer), size, err
}

func (b *Blob) ReadAll() ([]byte, error) {
Expand Down
8 changes: 8 additions & 0 deletions blob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,3 +255,11 @@ func TestBlobOverrideContentType(t *testing.T) {
assert.Equal(t, BlobTypeJPEG, b.BlobType())
assert.Equal(t, "foo/bar", b.ContentType())
}

type readerFunc func(p []byte) (n int, err error)

func (rf readerFunc) Read(p []byte) (n int, err error) { return rf(p) }

type closerFunc func() error

func (cf closerFunc) Close() error { return cf() }
157 changes: 0 additions & 157 deletions fanout.go

This file was deleted.

Loading

0 comments on commit 100ed56

Please sign in to comment.