Skip to content

Commit

Permalink
[WIP] add CopyBuffer to copy.go
Browse files Browse the repository at this point in the history
[WIP] add CopyBuffer to copy.go

Signed-off-by: kpango <kpango@vdaas.org>
  • Loading branch information
kpango committed Sep 6, 2023
1 parent ac21880 commit f136d83
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 19 deletions.
59 changes: 40 additions & 19 deletions internal/io/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"io"
"math"
"sync/atomic"
"syscall"

"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/sync"
Expand All @@ -33,18 +34,21 @@ func Copy(dst io.Writer, src io.Reader) (written int64, err error) {
return cio.Copy(dst, src)
}

func CopyBuffer(dst io.Writer, src io.Reader, buf []byte) (written int64, err error) {
return cio.CopyBuffer(dst, src, buf)

Check warning on line 38 in internal/io/copy.go

View check run for this annotation

Codecov / codecov/patch

internal/io/copy.go#L37-L38

Added lines #L37 - L38 were not covered by tests
}

type Copier interface {
Copy(dst io.Writer, src io.Reader) (written int64, err error)
CopyBuffer(dst io.Writer, src io.Reader, buf []byte) (written int64, err error)
}

type copier struct {
bufSize int64
pool sync.Pool
}

const (
defaultBufferSize int = 64 * 1024
)
var defaultBufferSize int = 16 * syscall.Getpagesize()

func NewCopier(size int) Copier {
c := new(copier)
Expand All @@ -62,6 +66,22 @@ func NewCopier(size int) Copier {
}

func (c *copier) Copy(dst io.Writer, src io.Reader) (written int64, err error) {
return c.copyBuffer(dst, src, nil)
}

func (c *copier) CopyBuffer(dst io.Writer, src io.Reader, buf []byte) (written int64, err error) {
if buf == nil {
return c.Copy(dst, src)
}
if buf != nil && len(buf) == 0 {
panic("empty buffer in CopyBuffer")

Check warning on line 77 in internal/io/copy.go

View check run for this annotation

Codecov / codecov/patch

internal/io/copy.go#L72-L77

Added lines #L72 - L77 were not covered by tests
}
b := bytes.NewBuffer(buf)
defer b.Reset()
return c.copyBuffer(dst, src, b)

Check warning on line 81 in internal/io/copy.go

View check run for this annotation

Codecov / codecov/patch

internal/io/copy.go#L79-L81

Added lines #L79 - L81 were not covered by tests
}

func (c *copier) copyBuffer(dst io.Writer, src io.Reader, buf *bytes.Buffer) (written int64, err error) {
if dst == nil || src == nil {
return 0, errors.New("empty source or destination")
}
Expand All @@ -81,24 +101,25 @@ func (c *copier) Copy(dst io.Writer, src io.Reader) (written int64, err error) {
limit int64 = math.MaxInt64
size int64 = atomic.LoadInt64(&c.bufSize)
l *io.LimitedReader
buf *bytes.Buffer
)
if l, ok = src.(*io.LimitedReader); ok && l.N >= 1 && size > l.N {
limit = l.N
size = limit
}
buf, ok = c.pool.Get().(*bytes.Buffer)
if !ok || buf == nil {
buf = bytes.NewBuffer(make([]byte, size))
}
defer func() {
if atomic.LoadInt64(&c.bufSize) < size {
atomic.StoreInt64(&c.bufSize, size)
buf.Grow(int(size))
if buf == nil {
if l, ok = src.(*io.LimitedReader); ok && l.N >= 1 && size > l.N {
limit = l.N
size = limit
}
buf.Reset()
c.pool.Put(buf)
}()
buf, ok = c.pool.Get().(*bytes.Buffer)
if !ok || buf == nil {
buf = bytes.NewBuffer(make([]byte, size))
}

Check warning on line 113 in internal/io/copy.go

View check run for this annotation

Codecov / codecov/patch

internal/io/copy.go#L112-L113

Added lines #L112 - L113 were not covered by tests
defer func() {
if atomic.LoadInt64(&c.bufSize) < size {
atomic.StoreInt64(&c.bufSize, size)
buf.Grow(int(size))
}

Check warning on line 118 in internal/io/copy.go

View check run for this annotation

Codecov / codecov/patch

internal/io/copy.go#L116-L118

Added lines #L116 - L118 were not covered by tests
buf.Reset()
c.pool.Put(buf)
}()
}
if size > int64(buf.Cap()) {
size = int64(buf.Cap())
}
Expand Down
20 changes: 20 additions & 0 deletions internal/io/copy_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,15 @@ func BenchmarkValdIOCopy(b *testing.B) {
}
}

func BenchmarkValdIOCopyBuffer(b *testing.B) {
c := NewCopier(bufferLength)
for i := 0; i < b.N; i++ {
w := &writer{}
r := &reader{len: readerLength}
c.CopyBuffer(w, r, nil)
}
}

func BenchmarkStandardIOCopyParallel(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
Expand Down Expand Up @@ -113,3 +122,14 @@ func BenchmarkValdIOCopyParallel(b *testing.B) {
}
})
}

func BenchmarkValdIOCopyBufferParallel(b *testing.B) {
c := NewCopier(bufferLength)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
w := &writer{}
r := &reader{len: readerLength}
c.CopyBuffer(w, r, nil)
}
})
}

0 comments on commit f136d83

Please sign in to comment.