Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pool multi-segment buffers. #488

Merged
merged 7 commits into from
Mar 24, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 63 additions & 50 deletions arena.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,33 +112,14 @@ func (ssa *SingleSegmentArena) Release() {
*ssa = nil
}

type roSingleSegment []byte

func (ss roSingleSegment) NumSegments() int64 {
return 1
}

func (ss roSingleSegment) Data(id SegmentID) ([]byte, error) {
if id != 0 {
return nil, errors.New("segment " + str.Utod(id) + " requested in single segment arena")
}
return ss, nil
}

func (ss roSingleSegment) Allocate(sz Size, segs map[SegmentID]*Segment) (SegmentID, []byte, error) {
return 0, nil, errors.New("arena is read-only")
}

func (ss roSingleSegment) String() string {
return "read-only single-segment arena [len=" + str.Itod(len(ss)) + "]"
}

func (ss roSingleSegment) Release() {}

// MultiSegment is an arena that stores object data across multiple []byte
// buffers, allocating new buffers of exponentially-increasing size when
// full. This avoids the potentially-expensive slice copying of SingleSegment.
type MultiSegmentArena [][]byte
type MultiSegmentArena struct {
ss [][]byte
delim int // index of first segment in ss that is NOT in buf
buf []byte // full-sized buffer that was demuxed into ss.
}

// MultiSegment returns a new arena that allocates new segments when
// they are full. b MAY be nil. Callers MAY use b to populate the
Expand All @@ -160,75 +141,103 @@ func MultiSegment(b [][]byte) *MultiSegmentArena {
// Calling Release is optional; if not done the garbage collector
// will release the memory per usual.
func (msa *MultiSegmentArena) Release() {
for i, v := range *msa {
// Clear the memory, so there's no junk in here for the next use:
for j := range v {
v[j] = 0
for i, v := range msa.ss {
// segment not in buf?
if i >= msa.delim {
msa.ss[i] = nil
lthibault marked this conversation as resolved.
Show resolved Hide resolved
bufferpool.Default.Put(v)
}

// Truncate the segment, since we use the length as the marker for
// what's allocated:
(*msa)[i] = v[:0]
}
(*msa) = (*msa)[:0] // Hide the segments

bufferpool.Default.Put(msa.buf) // nil is ok
*msa = MultiSegmentArena{ss: msa.ss[:0]}
multiSegmentPool.Put(msa)
}

// Like MultiSegment, but doesn't use the pool
func multiSegment(b [][]byte) *MultiSegmentArena {
return (*MultiSegmentArena)(&b)
return &MultiSegmentArena{ss: b}
}

var multiSegmentPool = sync.Pool{
New: func() any {
return multiSegment(nil)
return multiSegment(make([][]byte, 0, 16))
},
}

// demuxArena slices data into a multi-segment arena. It assumes that
// len(data) >= hdr.totalSize().
func (msa *MultiSegmentArena) demux(hdr streamHeader, data []byte) error {
maxSeg := hdr.maxSegment()
if int64(maxSeg) > int64(maxInt-1) {
return errors.New("number of segments overflows int")
}

msa.buf = data
msa.delim = int(maxSeg + 1)

// We might be forced to allocate here, but hopefully it won't
// happen to often. We assume msa was freshly obtained from a
// pool, and that no segments have been allocated yet.
var segment []byte
for i := 0; i < msa.delim; i++ {
sz, err := hdr.segmentSize(SegmentID(i))
if err != nil {
return err
}

segment, data = data[:sz:sz], data[sz:]
msa.ss = append(msa.ss, segment)
}

return nil
}

func (msa *MultiSegmentArena) NumSegments() int64 {
return int64(len(*msa))
return int64(len(msa.ss))
}

func (msa *MultiSegmentArena) Data(id SegmentID) ([]byte, error) {
if int64(id) >= int64(len(*msa)) {
if int64(id) >= int64(len(msa.ss)) {
return nil, errors.New("segment " + str.Utod(id) + " requested (arena only has " +
str.Itod(len(*msa)) + " segments)")
str.Itod(len(msa.ss)) + " segments)")
}
return (*msa)[id], nil
return msa.ss[id], nil
}

func (msa *MultiSegmentArena) Allocate(sz Size, segs map[SegmentID]*Segment) (SegmentID, []byte, error) {
var total int64
for i := 0; i < cap(*msa); i++ {
if i == len(*msa) {
(*msa) = (*msa)[:i+1]
}
data := (*msa)[i]
for i, data := range msa.ss {
id := SegmentID(i)
if s := segs[id]; s != nil {
data = s.data
}

if hasCapacity(data, sz) {
return id, data, nil
}
total += int64(cap(data))
if total < 0 {

if total += int64(cap(data)); total < 0 {
// Overflow.
return 0, nil, errors.New("alloc " + str.Utod(sz) + " bytes: message too large")
}
}

n, err := nextAlloc(total, 1<<63-1, sz)
if err != nil {
return 0, nil, err
}
buf := make([]byte, 0, n)
id := SegmentID(len(*msa))
*msa = append(*msa, buf)

buf := bufferpool.Default.Get(n)
buf = buf[:0]

id := SegmentID(len(msa.ss))
msa.ss = append(msa.ss, buf)
return id, buf, nil
}

func (msa *MultiSegmentArena) String() string {
return "multi-segment arena [" + str.Itod(len(*msa)) + " segments]"
return "multi-segment arena [" + str.Itod(len(msa.ss)) + " segments]"
}

// nextAlloc computes how much more space to allocate given the number
Expand Down Expand Up @@ -273,3 +282,7 @@ func nextAlloc(curr, max int64, req Size) (int, error) {
return int((delta + 7) &^ 7), nil
}
}

func hasCapacity(b []byte, sz Size) bool {
return sz <= Size(cap(b)-len(b))
}
Loading