From 394b53473a4df0f22d028a6ae7fc5026b410c67b Mon Sep 17 00:00:00 2001 From: Adrian Shum Date: Sun, 9 Oct 2022 19:29:36 +0800 Subject: [PATCH 1/2] refactor(imagor): fanout seeker improvement --- fanout.go | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/fanout.go b/fanout.go index ac2c61225..e1f3f5aaf 100644 --- a/fanout.go +++ b/fanout.go @@ -10,7 +10,7 @@ func fanoutReader(source io.ReadCloser, size int) func() (io.Reader, io.Seeker, var lock sync.RWMutex var once sync.Once var consumers []chan []byte - var done = make(chan struct{}) + var fullBufReady = make(chan struct{}) var closed []bool var err error var buf = make([]byte, size) @@ -55,7 +55,7 @@ func fanoutReader(source io.ReadCloser, size int) func() (io.Reader, io.Seeker, } lock.RUnlock() if currentSize >= size { - close(done) + close(fullBufReady) } if e != nil || currentSize >= size { return @@ -153,17 +153,21 @@ func fanoutReader(source io.ReadCloser, size int) func() (io.Reader, io.Seeker, once.Do(func() { go init() }) - - if fullBufReader != nil && !readerClosed { - return fullBufReader.Seek(offset, whence) - } else if fullBufReader == nil && !readerClosed { - <-done - fullBufReader = bytes.NewReader(buf) - _ = closeCh(false) - return fullBufReader.Seek(offset, whence) - } else { + if readerClosed { return 0, io.ErrClosedPipe } + if bufReader != nil && + ((whence == io.SeekStart && offset < bufReader.Size()) || + (whence == io.SeekCurrent && offset < int64(bufReader.Len()))) { + return bufReader.Seek(offset, whence) + } + if fullBufReader != nil { + return fullBufReader.Seek(offset, whence) + } + <-fullBufReady + fullBufReader = bytes.NewReader(buf) + _ = closeCh(false) + return fullBufReader.Seek(offset, whence) }) return } From dfe5963af3af8fc5aeb06a32441b7781c9e6f0ab Mon Sep 17 00:00:00 2001 From: Adrian Shum Date: Sun, 9 Oct 2022 19:57:27 +0800 Subject: [PATCH 2/2] refactor(imagor): fanout seeker improvement --- fanout.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fanout.go b/fanout.go index e1f3f5aaf..eefb167d1 100644 --- a/fanout.go +++ b/fanout.go @@ -101,7 +101,7 @@ func fanoutReader(source io.ReadCloser, size int) func() (io.Reader, io.Seeker, if readerClosed { return 0, io.ErrClosedPipe } - if fullBufReader != nil && !readerClosed { + if fullBufReader != nil { // proxy to full buf if ready return fullBufReader.Read(p) }