From 67f52e6e8f4325146939eb06316bb4e0e7129766 Mon Sep 17 00:00:00 2001 From: Ian Denhardt Date: Sun, 27 Nov 2022 17:13:01 -0500 Subject: [PATCH 1/2] Use a sync.Pool for MultiSegmentArenas While working on this, I caught a bug in the transport tests: the tests assume that it is safe to call the ReleaseFunc returned from NewMessage multiple times, which was incidentally true before but undocumented. This patch documents this invariant, and also ensures that it holds even when release() actually does something with the arena. --- message.go | 42 ++++++++++++++++++++++++++++++++++++-- rpc/transport/transport.go | 28 +++++++++++++++++++------ 2 files changed, 62 insertions(+), 8 deletions(-) diff --git a/message.go b/message.go index aecb8f40..d031fb7c 100644 --- a/message.go +++ b/message.go @@ -110,7 +110,7 @@ func NewSingleSegmentMessage(b []byte) (msg *Message, first *Segment) { return msg, first } -// Analogous to NewSingleSegmentMessage, but using MutliSegment. +// Analogous to NewSingleSegmentMessage, but using MultiSegment. func NewMultiSegmentMessage(b [][]byte) (msg *Message, first *Segment) { msg, first, err := NewMessage(MultiSegment(b)) if err != nil { @@ -480,9 +480,43 @@ type MultiSegmentArena [][]byte // they are full. b MAY be nil. Callers MAY use b to populate the // buffer for reading or to reserve memory of a specific size. func MultiSegment(b [][]byte) *MultiSegmentArena { + if b == nil { + return multiSegmentPool.Get().(*MultiSegmentArena) + } + return multiSegment(b) +} + +// Return this arena to an internal sync.Pool of arenas that can be +// re-used. Any time MultiSegment(nil) is called, arenas from this +// pool will be used if available, which can help reduce memory +// allocations. Calling Release however is optional; if not done +// the garbage collector will release the memory per usual. 
+func (msa *MultiSegmentArena) Release() { + for i, v := range *msa { + // Clear the memory, so there's no junk in here for the next use: + for j, _ := range v { + v[j] = 0 + } + + // Truncate the segment, since we use the length as the marker for + // what's allocated: + (*msa)[i] = v[:0] + } + (*msa) = (*msa)[:0] // Hide the segments + multiSegmentPool.Put(msa) +} + +// Like MultiSegment, but doesn't use the pool +func multiSegment(b [][]byte) *MultiSegmentArena { return (*MultiSegmentArena)(&b) } +var multiSegmentPool = sync.Pool{ + New: func() any { + return multiSegment(nil) + }, +} + // demuxArena slices b into a multi-segment arena. It assumes that // len(data) >= hdr.totalSize(). func demuxArena(hdr streamHeader, data []byte) (Arena, error) { @@ -514,7 +548,11 @@ func (msa *MultiSegmentArena) Data(id SegmentID) ([]byte, error) { func (msa *MultiSegmentArena) Allocate(sz Size, segs map[SegmentID]*Segment) (SegmentID, []byte, error) { var total int64 - for i, data := range *msa { + for i := 0; i < cap(*msa); i++ { + if i == len(*msa) { + (*msa) = (*msa)[:i+1] + } + data := (*msa)[i] id := SegmentID(i) if s := segs[id]; s != nil { data = s.data diff --git a/rpc/transport/transport.go b/rpc/transport/transport.go index 2264ab04..242956f4 100644 --- a/rpc/transport/transport.go +++ b/rpc/transport/transport.go @@ -25,9 +25,9 @@ import ( type Transport interface { // NewMessage allocates a new message to be sent over the transport. // The caller must call the release function when it no longer needs - // to reference the message. Before releasing the message, send may be - // called at most once to send the mssage, taking its cancelation and - // deadline from ctx. + // to reference the message. Calling the release function more than once + // has no effect. Before releasing the message, send may be called at most + // once to send the mssage, taking its cancelation and deadline from ctx. // // Messages returned by NewMessage must have a nil CapTable. 
// When the returned ReleaseFunc is called, any clients in the message's @@ -110,8 +110,8 @@ func (s *transport) NewMessage(ctx context.Context) (_ rpccp.Message, send func( return rpccp.Message{}, nil, nil, err } - // TODO(soon): reuse memory - msg, seg, err := capnp.NewMessage(capnp.MultiSegment(nil)) + arena := capnp.MultiSegment(nil) + msg, seg, err := capnp.NewMessage(arena) if err != nil { err = transporterr.Annotate(fmt.Errorf("new message: %w", err), "stream transport") return rpccp.Message{}, nil, nil, err @@ -122,7 +122,13 @@ func (s *transport) NewMessage(ctx context.Context) (_ rpccp.Message, send func( return rpccp.Message{}, nil, nil, err } + alreadyReleased := false + send = func() error { + if alreadyReleased { + panic("Tried to send() a message that was already released.") + } + // context expired? if err := ctx.Err(); err != nil { return transporterr.Annotate(fmt.Errorf("send: %w", ctx.Err()), "stream transport") @@ -147,7 +153,17 @@ func (s *transport) NewMessage(ctx context.Context) (_ rpccp.Message, send func( return err } - return rmsg, send, func() { msg.Reset(nil) }, nil + release = func() { + if alreadyReleased { + return + } + alreadyReleased = true + + msg.Reset(nil) + arena.Release() + } + + return rmsg, send, release, nil } // SetPartialWriteTimeout sets the timeout for completing the From 0f157e3269f61721c52f7c3591a99973e1ee9153 Mon Sep 17 00:00:00 2001 From: Ian Denhardt Date: Sun, 27 Nov 2022 17:59:03 -0500 Subject: [PATCH 2/2] Document the fact that we zero segments in .Release() --- message.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/message.go b/message.go index d031fb7c..eab2173c 100644 --- a/message.go +++ b/message.go @@ -489,8 +489,12 @@ func MultiSegment(b [][]byte) *MultiSegmentArena { // Return this arena to an internal sync.Pool of arenas that can be // re-used. 
Any time MultiSegment(nil) is called, arenas from this // pool will be used if available, which can help reduce memory -// allocations. Calling Release however is optional; if not done -// the garbage collector will release the memory per usual. +// allocations. +// +// All segments will be zeroed before re-use. +// +// Calling Release is optional; if not done the garbage collector +// will release the memory per usual. func (msa *MultiSegmentArena) Release() { for i, v := range *msa { // Clear the memory, so there's no junk in here for the next use: