Skip to content

Commit

Permalink
[aggregator] Remove indirection, large copy from unaggregated protobu…
Browse files Browse the repository at this point in the history
…f decoder (#3186)
  • Loading branch information
vdarulis authored Feb 7, 2021
1 parent 2d9019d commit 8840b10
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 30 deletions.
40 changes: 16 additions & 24 deletions src/metrics/encoding/protobuf/unaggregated_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,7 @@ import (
)

// UnaggregatedIterator decodes unaggregated metrics.
type UnaggregatedIterator interface {
// Next returns true if there are more items to decode.
Next() bool

// Current returns the current decoded value.
Current() encoding.UnaggregatedMessageUnion

// Err returns the error encountered during decoding, if any.
Err() error

// Close closes the iterator.
Close()
}

type unaggregatedIterator struct {
type UnaggregatedIterator struct {
reader encoding.ByteReadScanner
bytesPool pool.BytesPool
maxMessageSize int
Expand All @@ -61,17 +47,18 @@ type unaggregatedIterator struct {
func NewUnaggregatedIterator(
reader encoding.ByteReadScanner,
opts UnaggregatedOptions,
) UnaggregatedIterator {
) *UnaggregatedIterator {
bytesPool := opts.BytesPool()
return &unaggregatedIterator{
return &UnaggregatedIterator{
reader: reader,
bytesPool: bytesPool,
maxMessageSize: opts.MaxMessageSize(),
buf: allocate(bytesPool, opts.InitBufferSize()),
}
}

func (it *unaggregatedIterator) Close() {
// Close closes the iterator.
func (it *UnaggregatedIterator) Close() {
if it.closed {
return
}
Expand All @@ -87,10 +74,14 @@ func (it *unaggregatedIterator) Close() {
it.err = nil
}

func (it *unaggregatedIterator) Err() error { return it.err }
func (it *unaggregatedIterator) Current() encoding.UnaggregatedMessageUnion { return it.msg }
// Err returns the error encountered during decoding, if any
func (it *UnaggregatedIterator) Err() error { return it.err }

// Current returns the current decoded value
func (it *UnaggregatedIterator) Current() *encoding.UnaggregatedMessageUnion { return &it.msg }

func (it *unaggregatedIterator) Next() bool {
// Next returns true if there are more items to decode.
func (it *UnaggregatedIterator) Next() bool {
if it.err != nil || it.closed {
return false
}
Expand All @@ -113,7 +104,7 @@ func (it *unaggregatedIterator) Next() bool {
return true
}

func (it *unaggregatedIterator) decodeSize() (int, error) {
func (it *UnaggregatedIterator) decodeSize() (int, error) {
n, err := binary.ReadVarint(it.reader)
if err != nil {
it.err = err
Expand All @@ -122,11 +113,12 @@ func (it *unaggregatedIterator) decodeSize() (int, error) {
return int(n), nil
}

func (it *unaggregatedIterator) ensureBufferSize(targetSize int) {
func (it *UnaggregatedIterator) ensureBufferSize(targetSize int) {
it.buf = ensureBufferSize(it.buf, it.bytesPool, targetSize, dontCopyData)
}

func (it *unaggregatedIterator) decodeMessage(size int) error {
//nolint:gocyclo
func (it *UnaggregatedIterator) decodeMessage(size int) error {
_, err := io.ReadFull(it.reader, it.buf[:size])
if err != nil {
it.err = err
Expand Down
11 changes: 5 additions & 6 deletions src/metrics/encoding/protobuf/unaggregated_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,18 +538,17 @@ func TestUnaggregatedIteratorNextOnClose(t *testing.T) {

stream := bytes.NewReader(dataBuf.Bytes())
it := NewUnaggregatedIterator(stream, NewUnaggregatedOptions())
iterator := it.(*unaggregatedIterator)
require.False(t, iterator.closed)
require.NotNil(t, iterator.buf)
require.False(t, it.closed)
require.NotNil(t, it.buf)
require.Nil(t, it.Err())

// Verify that closing the iterator cleans up the state.
it.Close()
require.False(t, it.Next())
require.False(t, it.Next())
require.True(t, iterator.closed)
require.Nil(t, iterator.bytesPool)
require.Nil(t, iterator.buf)
require.True(t, it.closed)
require.Nil(t, it.bytesPool)
require.Nil(t, it.buf)
require.Nil(t, it.Err())

// Verify that closing a second time is a no op.
Expand Down

0 comments on commit 8840b10

Please sign in to comment.