Skip to content

Commit

Permalink
Merge pull request #350 from zenhack/pool-multisegment
Browse files Browse the repository at this point in the history
Use a sync.Pool for MultiSegmentArenas
  • Loading branch information
lthibault authored Nov 28, 2022
2 parents b37f988 + 0f157e3 commit 8231563
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 8 deletions.
46 changes: 44 additions & 2 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func NewSingleSegmentMessage(b []byte) (msg *Message, first *Segment) {
return msg, first
}

// Analogous to NewSingleSegmentMessage, but using MutliSegment.
// Analogous to NewSingleSegmentMessage, but using MultiSegment.
func NewMultiSegmentMessage(b [][]byte) (msg *Message, first *Segment) {
msg, first, err := NewMessage(MultiSegment(b))
if err != nil {
Expand Down Expand Up @@ -480,9 +480,47 @@ type MultiSegmentArena [][]byte
// they are full. b MAY be nil. Callers MAY use b to populate the
// buffer for reading or to reserve memory of a specific size.
func MultiSegment(b [][]byte) *MultiSegmentArena {
if b == nil {
return multiSegmentPool.Get().(*MultiSegmentArena)
}
return multiSegment(b)
}

// Return this arena to an internal sync.Pool of arenas that can be
// re-used. Any time MultiSegment(nil) is called, arenas from this
// pool will be used if available, which can help reduce memory
// allocations.
//
// All segments will be zeroed before re-use.
//
// Calling Release is optional; if not done the garbage collector
// will release the memory per usual.
func (msa *MultiSegmentArena) Release() {
for i, v := range *msa {
// Clear the memory, so there's no junk in here for the next use:
for j, _ := range v {
v[j] = 0
}

// Truncate the segment, since we use the length as the marker for
// what's allocated:
(*msa)[i] = v[:0]
}
(*msa) = (*msa)[:0] // Hide the segments
multiSegmentPool.Put(msa)
}

// Like MultiSegment, but doesn't use the pool
func multiSegment(b [][]byte) *MultiSegmentArena {
return (*MultiSegmentArena)(&b)
}

var multiSegmentPool = sync.Pool{
New: func() any {
return multiSegment(nil)
},
}

// demuxArena slices b into a multi-segment arena. It assumes that
// len(data) >= hdr.totalSize().
func demuxArena(hdr streamHeader, data []byte) (Arena, error) {
Expand Down Expand Up @@ -514,7 +552,11 @@ func (msa *MultiSegmentArena) Data(id SegmentID) ([]byte, error) {

func (msa *MultiSegmentArena) Allocate(sz Size, segs map[SegmentID]*Segment) (SegmentID, []byte, error) {
var total int64
for i, data := range *msa {
for i := 0; i < cap(*msa); i++ {
if i == len(*msa) {
(*msa) = (*msa)[:i+1]
}
data := (*msa)[i]
id := SegmentID(i)
if s := segs[id]; s != nil {
data = s.data
Expand Down
28 changes: 22 additions & 6 deletions rpc/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ import (
type Transport interface {
// NewMessage allocates a new message to be sent over the transport.
// The caller must call the release function when it no longer needs
// to reference the message. Before releasing the message, send may be
// called at most once to send the mssage, taking its cancelation and
// deadline from ctx.
// to reference the message. Calling the release function more than once
// has no effect. Before releasing the message, send may be called at most
// once to send the mssage, taking its cancelation and deadline from ctx.
//
// Messages returned by NewMessage must have a nil CapTable.
// When the returned ReleaseFunc is called, any clients in the message's
Expand Down Expand Up @@ -110,8 +110,8 @@ func (s *transport) NewMessage(ctx context.Context) (_ rpccp.Message, send func(
return rpccp.Message{}, nil, nil, err
}

// TODO(soon): reuse memory
msg, seg, err := capnp.NewMessage(capnp.MultiSegment(nil))
arena := capnp.MultiSegment(nil)
msg, seg, err := capnp.NewMessage(arena)
if err != nil {
err = transporterr.Annotate(fmt.Errorf("new message: %w", err), "stream transport")
return rpccp.Message{}, nil, nil, err
Expand All @@ -122,7 +122,13 @@ func (s *transport) NewMessage(ctx context.Context) (_ rpccp.Message, send func(
return rpccp.Message{}, nil, nil, err
}

alreadyReleased := false

send = func() error {
if alreadyReleased {
panic("Tried to send() a message that was already released.")
}

// context expired?
if err := ctx.Err(); err != nil {
return transporterr.Annotate(fmt.Errorf("send: %w", ctx.Err()), "stream transport")
Expand All @@ -147,7 +153,17 @@ func (s *transport) NewMessage(ctx context.Context) (_ rpccp.Message, send func(
return err
}

return rmsg, send, func() { msg.Reset(nil) }, nil
release = func() {
if alreadyReleased {
return
}
alreadyReleased = true

msg.Reset(nil)
arena.Release()
}

return rmsg, send, release, nil
}

// SetPartialWriteTimeout sets the timeout for completing the
Expand Down

0 comments on commit 8231563

Please sign in to comment.