diff --git a/src/cmd/services/m3comparator/main/parser/series_iterator_builder.go b/src/cmd/services/m3comparator/main/parser/series_iterator_builder.go index c38252f465..6bfaec2736 100644 --- a/src/cmd/services/m3comparator/main/parser/series_iterator_builder.go +++ b/src/cmd/services/m3comparator/main/parser/series_iterator_builder.go @@ -21,12 +21,10 @@ package parser import ( - "io" "time" "github.com/m3db/m3/src/dbnode/encoding" "github.com/m3db/m3/src/dbnode/encoding/m3tsz" - "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/ts" "github.com/m3db/m3/src/dbnode/x/xio" "github.com/m3db/m3/src/query/models" @@ -34,9 +32,6 @@ import ( xtime "github.com/m3db/m3/src/x/time" ) -const sep rune = '!' -const tagSep rune = '.' - // Data is a set of datapoints. type Data []ts.Datapoint @@ -46,9 +41,7 @@ type IngestSeries struct { Tags Tags } -var iterAlloc = func(r io.Reader, _ namespace.SchemaDescr) encoding.ReaderIterator { - return m3tsz.NewReaderIterator(r, m3tsz.DefaultIntOptimizationEnabled, encoding.NewOptions()) -} +var iterAlloc = m3tsz.DefaultReaderIteratorAllocFn(encoding.NewOptions()) func buildBlockReader( block Data, diff --git a/src/cmd/tools/read_data_files/main/main.go b/src/cmd/tools/read_data_files/main/main.go index 550d4ff563..72dfa54de2 100644 --- a/src/cmd/tools/read_data_files/main/main.go +++ b/src/cmd/tools/read_data_files/main/main.go @@ -21,7 +21,6 @@ package main import ( - "bytes" "encoding/base64" "fmt" "io" @@ -35,6 +34,7 @@ import ( "github.com/m3db/m3/src/dbnode/encoding/m3tsz" "github.com/m3db/m3/src/dbnode/persist" "github.com/m3db/m3/src/dbnode/persist/fs" + "github.com/m3db/m3/src/dbnode/x/xio" "github.com/m3db/m3/src/x/ident" "github.com/pborman/getopt" @@ -159,7 +159,7 @@ func main() { if benchMode != benchmarkSeries { data.IncRef() - iter := m3tsz.NewReaderIterator(bytes.NewReader(data.Bytes()), true, encodingOpts) + iter := m3tsz.NewReaderIterator(xio.NewBytesReader64(data.Bytes()), true, encodingOpts) for 
iter.Next() { dp, _, annotation := iter.Current() if benchMode == benchmarkNone { diff --git a/src/dbnode/client/config.go b/src/dbnode/client/config.go index 474346f2f6..18bc61e361 100644 --- a/src/dbnode/client/config.go +++ b/src/dbnode/client/config.go @@ -23,7 +23,6 @@ package client import ( "errors" "fmt" - "io" "time" "github.com/m3db/m3/src/dbnode/encoding" @@ -43,11 +42,6 @@ const ( asyncWriteWorkerPoolDefaultSize = 128 ) -var ( - errConfigurationMustSupplyConfig = errors.New( - "must supply config when no topology initializer parameter supplied") -) - // Configuration is a configuration that can be used to construct a client. type Configuration struct { // The environment (static or dynamic) configuration. @@ -412,10 +406,7 @@ func (c Configuration) NewAdminClient( encodingOpts = encoding.NewOptions() } - v = v.SetReaderIteratorAllocate(func(r io.Reader, _ namespace.SchemaDescr) encoding.ReaderIterator { - intOptimized := m3tsz.DefaultIntOptimizationEnabled - return m3tsz.NewReaderIterator(r, intOptimized, encodingOpts) - }) + v = v.SetReaderIteratorAllocate(m3tsz.DefaultReaderIteratorAllocFn(encodingOpts)) if c.Proto != nil && c.Proto.Enabled { v = v.SetEncodingProto(encodingOpts) diff --git a/src/dbnode/client/fetch_tagged_results_accumulator_misc_test.go b/src/dbnode/client/fetch_tagged_results_accumulator_misc_test.go index 2eb8d6837b..66b6e14019 100644 --- a/src/dbnode/client/fetch_tagged_results_accumulator_misc_test.go +++ b/src/dbnode/client/fetch_tagged_results_accumulator_misc_test.go @@ -22,7 +22,6 @@ package client import ( "fmt" - "io" "math/rand" "os" "sort" @@ -32,7 +31,6 @@ import ( "github.com/m3db/m3/src/dbnode/encoding" "github.com/m3db/m3/src/dbnode/encoding/m3tsz" "github.com/m3db/m3/src/dbnode/generated/thrift/rpc" - "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/storage/index" "github.com/m3db/m3/src/dbnode/x/xpool" "github.com/m3db/m3/src/x/ident" @@ -272,9 +270,7 @@ func initTestFetchTaggedPools() 
*testFetchTaggedPools { pools.readerSlices.Init() pools.multiReader = encoding.NewMultiReaderIteratorPool(opts) - pools.multiReader.Init(func(r io.Reader, _ namespace.SchemaDescr) encoding.ReaderIterator { - return m3tsz.NewReaderIterator(r, m3tsz.DefaultIntOptimizationEnabled, encoding.NewOptions()) - }) + pools.multiReader.Init(m3tsz.DefaultReaderIteratorAllocFn(encoding.NewOptions())) pools.seriesIter = encoding.NewSeriesIteratorPool(opts) pools.seriesIter.Init() diff --git a/src/dbnode/client/options.go b/src/dbnode/client/options.go index e329d0ffce..e2e3608e28 100644 --- a/src/dbnode/client/options.go +++ b/src/dbnode/client/options.go @@ -22,7 +22,6 @@ package client import ( "errors" - "io" "math" "runtime" "time" @@ -37,6 +36,7 @@ import ( m3dbruntime "github.com/m3db/m3/src/dbnode/runtime" "github.com/m3db/m3/src/dbnode/storage/index" "github.com/m3db/m3/src/dbnode/topology" + "github.com/m3db/m3/src/dbnode/x/xio" "github.com/m3db/m3/src/x/clock" "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/ident" @@ -473,16 +473,17 @@ func (o *options) Validate() error { func (o *options) SetEncodingM3TSZ() Options { opts := *o - opts.readerIteratorAllocate = func(r io.Reader, _ namespace.SchemaDescr) encoding.ReaderIterator { - return m3tsz.NewReaderIterator(r, m3tsz.DefaultIntOptimizationEnabled, encoding.NewOptions()) - } + opts.readerIteratorAllocate = m3tsz.DefaultReaderIteratorAllocFn(encoding.NewOptions()) opts.isProtoEnabled = false return &opts } func (o *options) SetEncodingProto(encodingOpts encoding.Options) Options { opts := *o - opts.readerIteratorAllocate = func(r io.Reader, descr namespace.SchemaDescr) encoding.ReaderIterator { + opts.readerIteratorAllocate = func( + r xio.Reader64, + descr namespace.SchemaDescr, + ) encoding.ReaderIterator { return proto.NewIterator(r, descr, encodingOpts) } opts.isProtoEnabled = true diff --git a/src/dbnode/client/session_fetch_bulk_blocks_test.go b/src/dbnode/client/session_fetch_bulk_blocks_test.go 
index 611c85c97f..dac836d680 100644 --- a/src/dbnode/client/session_fetch_bulk_blocks_test.go +++ b/src/dbnode/client/session_fetch_bulk_blocks_test.go @@ -24,7 +24,6 @@ import ( "bytes" "fmt" "io" - "io/ioutil" "math" "sort" "sync" @@ -58,17 +57,19 @@ import ( ) var ( - blockSize = 2 * time.Hour - nsID = ident.StringID("testNs1") - nsRetentionOpts = retention.NewOptions(). - SetBlockSize(blockSize). - SetRetentionPeriod(48 * blockSize) + blockSize = 2 * time.Hour + nsID = ident.StringID("testNs1") + + nsRetentionOpts = retention.NewOptions().SetBlockSize(blockSize).SetRetentionPeriod(48 * blockSize) + testTagDecodingPool = serialize.NewTagDecoderPool( serialize.NewTagDecoderOptions(serialize.TagDecoderOptionsConfig{}), pool.NewObjectPoolOptions().SetSize(1)) + testTagEncodingPool = serialize.NewTagEncoderPool( serialize.NewTagEncoderOptions(), pool.NewObjectPoolOptions().SetSize(1)) + testIDPool = newSessionTestOptions().IdentifierPool() fooID = ident.StringID("foo") fooTags checked.Bytes @@ -101,9 +102,7 @@ func testsNsMetadata(t *testing.T) namespace.Metadata { func newSessionTestMultiReaderIteratorPool() encoding.MultiReaderIteratorPool { p := encoding.NewMultiReaderIteratorPool(nil) - p.Init(func(r io.Reader, _ namespace.SchemaDescr) encoding.ReaderIterator { - return m3tsz.NewReaderIterator(r, m3tsz.DefaultIntOptimizationEnabled, encoding.NewOptions()) - }) + p.Init(m3tsz.DefaultReaderIteratorAllocFn(encoding.NewOptions())) return p } @@ -1455,10 +1454,9 @@ func TestStreamBlocksBatchFromPeerVerifiesBlockErr(t *testing.T) { require.True(t, ok) segment, err := reader.Segment() require.NoError(t, err) - rawBlockData := make([]byte, segment.Len()) - n, err := reader.Read(rawBlockData) - require.NoError(t, err) - require.Equal(t, len(rawBlockData), n) + rawBlockData, err := xio.ToBytes(reader) + require.Equal(t, io.EOF, err) + require.Equal(t, len(rawBlockData), segment.Len()) rawBlockLen := int64(len(rawBlockData)) var ( @@ -1510,8 +1508,8 @@ func 
TestStreamBlocksBatchFromPeerVerifiesBlockErr(t *testing.T) { Return(&rpc.FetchBlocksRawResult_{ Elements: []*rpc.Blocks{ // First foo block intact - &rpc.Blocks{ID: []byte("foo"), Blocks: []*rpc.Block{ - &rpc.Block{Start: start.UnixNano(), Segments: &rpc.Segments{ + {ID: []byte("foo"), Blocks: []*rpc.Block{ + {Start: start.UnixNano(), Segments: &rpc.Segments{ Merged: &rpc.Segment{ Head: rawBlockData[:len(rawBlockData)-1], Tail: []byte{rawBlockData[len(rawBlockData)-1]}, @@ -1519,16 +1517,16 @@ func TestStreamBlocksBatchFromPeerVerifiesBlockErr(t *testing.T) { }}, }}, // First bar block intact, second with error - &rpc.Blocks{ID: []byte("bar"), Blocks: []*rpc.Block{ - &rpc.Block{Start: start.UnixNano(), Segments: &rpc.Segments{ + {ID: []byte("bar"), Blocks: []*rpc.Block{ + {Start: start.UnixNano(), Segments: &rpc.Segments{ Merged: &rpc.Segment{ Head: rawBlockData[:len(rawBlockData)-1], Tail: []byte{rawBlockData[len(rawBlockData)-1]}, }, }}, }}, - &rpc.Blocks{ID: []byte("bar"), Blocks: []*rpc.Block{ - &rpc.Block{Start: start.Add(blockSize).UnixNano(), Err: &rpc.Error{ + {ID: []byte("bar"), Blocks: []*rpc.Block{ + {Start: start.Add(blockSize).UnixNano(), Err: &rpc.Error{ Type: rpc.ErrorType_INTERNAL_ERROR, Message: "an error", }}, @@ -1606,10 +1604,9 @@ func TestStreamBlocksBatchFromPeerVerifiesBlockChecksum(t *testing.T) { require.True(t, ok) segment, err := reader.Segment() require.NoError(t, err) - rawBlockData := make([]byte, segment.Len()) - n, err := reader.Read(rawBlockData) - require.NoError(t, err) - require.Equal(t, len(rawBlockData), n) + rawBlockData, err := xio.ToBytes(reader) + require.Equal(t, io.EOF, err) + require.Equal(t, len(rawBlockData), segment.Len()) rawBlockLen := int64(len(rawBlockData)) var ( @@ -1666,26 +1663,26 @@ func TestStreamBlocksBatchFromPeerVerifiesBlockChecksum(t *testing.T) { Return(&rpc.FetchBlocksRawResult_{ Elements: []*rpc.Blocks{ // valid foo block - &rpc.Blocks{ID: []byte("foo"), Blocks: []*rpc.Block{ - &rpc.Block{Start: 
start.UnixNano(), Checksum: &validChecksum, Segments: &rpc.Segments{ + {ID: []byte("foo"), Blocks: []*rpc.Block{ + {Start: start.UnixNano(), Checksum: &validChecksum, Segments: &rpc.Segments{ Merged: &rpc.Segment{ Head: head, Tail: tail, }, }}, }}, - &rpc.Blocks{ID: []byte("bar"), Blocks: []*rpc.Block{ + {ID: []byte("bar"), Blocks: []*rpc.Block{ // invalid bar block - &rpc.Block{Start: start.UnixNano(), Checksum: &invalidChecksum, Segments: &rpc.Segments{ + {Start: start.UnixNano(), Checksum: &invalidChecksum, Segments: &rpc.Segments{ Merged: &rpc.Segment{ Head: head, Tail: tail, }, }}, }}, - &rpc.Blocks{ID: []byte("bar"), Blocks: []*rpc.Block{ + {ID: []byte("bar"), Blocks: []*rpc.Block{ // valid bar block, no checksum - &rpc.Block{Start: start.Add(blockSize).UnixNano(), Segments: &rpc.Segments{ + {Start: start.Add(blockSize).UnixNano(), Segments: &rpc.Segments{ Merged: &rpc.Segment{ Head: head, Tail: tail, @@ -1769,8 +1766,8 @@ func TestBlocksResultAddBlockFromPeerReadMerged(t *testing.T) { require.NoError(t, err) // Assert block has data - data, err := ioutil.ReadAll(xio.NewSegmentReader(seg)) - require.NoError(t, err) + data, err := xio.ToBytes(xio.NewSegmentReader(seg)) + require.Equal(t, io.EOF, err) assert.Equal(t, []byte{1, 2, 3}, data) } diff --git a/src/dbnode/encoding/encoding.go b/src/dbnode/encoding/encoding.go index f6623124ec..0a0389b179 100644 --- a/src/dbnode/encoding/encoding.go +++ b/src/dbnode/encoding/encoding.go @@ -43,7 +43,7 @@ func LeadingAndTrailingZeros(v uint64) (int, int) { } // SignExtend sign extends the highest bit of v which has numBits (<=64). 
-func SignExtend(v uint64, numBits uint) int64 { +func SignExtend(v uint64, numBits uint8) int64 { shift := 64 - numBits return (int64(v) << shift) >> shift } diff --git a/src/dbnode/encoding/encoding_mock.go b/src/dbnode/encoding/encoding_mock.go index 526895fc58..201a89c74c 100644 --- a/src/dbnode/encoding/encoding_mock.go +++ b/src/dbnode/encoding/encoding_mock.go @@ -25,7 +25,6 @@ package encoding import ( - "io" "reflect" "time" @@ -707,7 +706,7 @@ func (mr *MockReaderIteratorMockRecorder) Close() *gomock.Call { } // Reset mocks base method -func (m *MockReaderIterator) Reset(reader io.Reader, schema namespace.SchemaDescr) { +func (m *MockReaderIterator) Reset(reader xio.Reader64, schema namespace.SchemaDescr) { m.ctrl.T.Helper() m.ctrl.Call(m, "Reset", reader, schema) } @@ -1495,7 +1494,7 @@ func (m *MockDecoder) EXPECT() *MockDecoderMockRecorder { } // Decode mocks base method -func (m *MockDecoder) Decode(reader io.Reader) ReaderIterator { +func (m *MockDecoder) Decode(reader xio.Reader64) ReaderIterator { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Decode", reader) ret0, _ := ret[0].(ReaderIterator) @@ -1508,130 +1507,6 @@ func (mr *MockDecoderMockRecorder) Decode(reader interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Decode", reflect.TypeOf((*MockDecoder)(nil).Decode), reader) } -// MockIStream is a mock of IStream interface -type MockIStream struct { - ctrl *gomock.Controller - recorder *MockIStreamMockRecorder -} - -// MockIStreamMockRecorder is the mock recorder for MockIStream -type MockIStreamMockRecorder struct { - mock *MockIStream -} - -// NewMockIStream creates a new mock instance -func NewMockIStream(ctrl *gomock.Controller) *MockIStream { - mock := &MockIStream{ctrl: ctrl} - mock.recorder = &MockIStreamMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use -func (m *MockIStream) EXPECT() *MockIStreamMockRecorder { - return m.recorder -} - -// Read mocks 
base method -func (m *MockIStream) Read(arg0 []byte) (int, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Read", arg0) - ret0, _ := ret[0].(int) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// Read indicates an expected call of Read -func (mr *MockIStreamMockRecorder) Read(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Read", reflect.TypeOf((*MockIStream)(nil).Read), arg0) -} - -// ReadBit mocks base method -func (m *MockIStream) ReadBit() (Bit, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ReadBit") - ret0, _ := ret[0].(Bit) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// ReadBit indicates an expected call of ReadBit -func (mr *MockIStreamMockRecorder) ReadBit() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReadBit", reflect.TypeOf((*MockIStream)(nil).ReadBit)) -} - -// ReadByte mocks base method -func (m *MockIStream) ReadByte() (byte, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ReadByte") - ret0, _ := ret[0].(byte) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// ReadByte indicates an expected call of ReadByte -func (mr *MockIStreamMockRecorder) ReadByte() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReadByte", reflect.TypeOf((*MockIStream)(nil).ReadByte)) -} - -// ReadBits mocks base method -func (m *MockIStream) ReadBits(numBits uint) (uint64, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ReadBits", numBits) - ret0, _ := ret[0].(uint64) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// ReadBits indicates an expected call of ReadBits -func (mr *MockIStreamMockRecorder) ReadBits(numBits interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReadBits", reflect.TypeOf((*MockIStream)(nil).ReadBits), numBits) -} - -// PeekBits mocks base method -func (m *MockIStream) 
PeekBits(numBits uint) (uint64, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "PeekBits", numBits) - ret0, _ := ret[0].(uint64) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// PeekBits indicates an expected call of PeekBits -func (mr *MockIStreamMockRecorder) PeekBits(numBits interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PeekBits", reflect.TypeOf((*MockIStream)(nil).PeekBits), numBits) -} - -// RemainingBitsInCurrentByte mocks base method -func (m *MockIStream) RemainingBitsInCurrentByte() uint { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "RemainingBitsInCurrentByte") - ret0, _ := ret[0].(uint) - return ret0 -} - -// RemainingBitsInCurrentByte indicates an expected call of RemainingBitsInCurrentByte -func (mr *MockIStreamMockRecorder) RemainingBitsInCurrentByte() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemainingBitsInCurrentByte", reflect.TypeOf((*MockIStream)(nil).RemainingBitsInCurrentByte)) -} - -// Reset mocks base method -func (m *MockIStream) Reset(r io.Reader) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "Reset", r) -} - -// Reset indicates an expected call of Reset -func (mr *MockIStreamMockRecorder) Reset(r interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Reset", reflect.TypeOf((*MockIStream)(nil).Reset), r) -} - // MockOStream is a mock of OStream interface type MockOStream struct { ctrl *gomock.Controller diff --git a/src/dbnode/encoding/istream.go b/src/dbnode/encoding/istream.go index f4eeba5426..1d1d2e6a83 100644 --- a/src/dbnode/encoding/istream.go +++ b/src/dbnode/encoding/istream.go @@ -21,169 +21,121 @@ package encoding import ( - "bufio" "io" - "math" -) -// istream encapsulates a readable stream. 
-type istream struct { - r *bufio.Reader // encoded stream - err error // error encountered - current byte // current byte we are working off of - buffer []byte // buffer for reading in multiple bytes - remaining uint // bits remaining in current to be read -} + "github.com/m3db/m3/src/dbnode/x/xio" +) -// NewIStream creates a new Istream -func NewIStream(reader io.Reader, bufioSize int) IStream { - return &istream{ - r: bufio.NewReaderSize(reader, bufioSize), - // Buffer meant to hold uint64 size of bytes. - buffer: make([]byte, 8), - } +// IStream encapsulates a readable stream. +type IStream struct { + r xio.Reader64 + err error // error encountered + current uint64 // current uint64 we are working off of + index int // current index within data slice + remaining uint8 // bits remaining in current to be read } -func (is *istream) ReadBit() (Bit, error) { - if is.err != nil { - return 0, is.err - } - if is.remaining == 0 { - if err := is.readByteFromStream(); err != nil { - return 0, err - } - } - return Bit(is.consumeBuffer(1)), nil +// NewIStream creates a new IStream +func NewIStream(reader64 xio.Reader64) *IStream { + return &IStream{r: reader64} } -func (is *istream) Read(b []byte) (int, error) { - if is.remaining == 0 { - // Optimized path for when the iterator is already aligned on a byte boundary. Avoids - // all the bit manipulation and ReadByte() function calls. - // Use ReadFull because the bufferedReader may not return the requested number of bytes. - return io.ReadFull(is.r, b) - } - - var ( - i int - err error - ) - +// Read reads len(b) bytes. 
+func (is *IStream) Read(b []byte) (int, error) { + var i int for ; i < len(b); i++ { - b[i], err = is.ReadByte() + res, err := is.ReadBits(8) if err != nil { return i, err } + b[i] = byte(res) } return i, nil } -func (is *istream) ReadByte() (byte, error) { - if is.err != nil { - return 0, is.err - } - remaining := is.remaining - res := is.consumeBuffer(remaining) - if remaining == 8 { - return res, nil - } - if err := is.readByteFromStream(); err != nil { - return 0, err - } - res = (res << uint(8-remaining)) | is.consumeBuffer(8-remaining) - return res, nil +// ReadByte reads the next Byte. +func (is *IStream) ReadByte() (byte, error) { + res, err := is.ReadBits(8) + return byte(res), err +} + +// ReadBit reads the next Bit. +func (is *IStream) ReadBit() (Bit, error) { + res, err := is.ReadBits(1) + return Bit(res), err } -func (is *istream) ReadBits(numBits uint) (uint64, error) { +// ReadBits reads the next Bits. +func (is *IStream) ReadBits(numBits uint8) (uint64, error) { if is.err != nil { return 0, is.err } - var res uint64 - numBytes := numBits / 8 - if numBytes > 0 { - // Use Read call rather than individual ReadByte calls since it has - // optimized path for when the iterator is aligned on a byte boundary. - bytes := is.buffer[0:numBytes] - _, err := is.Read(bytes) - if err != nil { - return 0, err - } - for _, b := range bytes { - res = (res << 8) | uint64(b) - } + if numBits <= is.remaining { + // Have enough bits buffered. + return is.consumeBuffer(numBits), nil } - - numBits = numBits % 8 - for numBits > 0 { - // This is equivalent to calling is.ReadBit() in a loop but some manual inlining - // has been performed to optimize this loop as its heavily used in the hot path. 
- if is.remaining == 0 { - if err := is.readByteFromStream(); err != nil { - return 0, err - } - } - - numToRead := numBits - if is.remaining < numToRead { - numToRead = is.remaining - } - bits := is.current >> (8 - numToRead) - is.current <<= numToRead - is.remaining -= numToRead - res = (res << uint64(numToRead)) | uint64(bits) - numBits -= numToRead + res := readBitsInWord(is.current, numBits) + // Not enough bits buffered, read next word from the stream. + bitsNeeded := numBits - is.remaining + if err := is.readWordFromStream(); err != nil { + return 0, err + } + if is.remaining < bitsNeeded { + return 0, io.EOF } - return res, nil + return res | is.consumeBuffer(bitsNeeded), nil } -func (is *istream) PeekBits(numBits uint) (uint64, error) { - // check the last byte first +// PeekBits looks at the next Bits, but doesn't move the pos. +func (is *IStream) PeekBits(numBits uint8) (uint64, error) { if numBits <= is.remaining { - return uint64(readBitsInByte(is.current, numBits)), nil + return readBitsInWord(is.current, numBits), nil } - // now check the bytes buffered and read more if necessary. 
- numBitsRead := is.remaining - res := uint64(readBitsInByte(is.current, is.remaining)) - numBytesToRead := int(math.Ceil(float64(numBits-numBitsRead) / 8)) - bytesRead, err := is.r.Peek(numBytesToRead) + res := readBitsInWord(is.current, numBits) + bitsNeeded := numBits - is.remaining + next, bytes, err := is.r.Peek64() if err != nil { return 0, err } - for i := 0; i < numBytesToRead-1; i++ { - res = (res << 8) | uint64(bytesRead[i]) - numBitsRead += 8 + rem := 8 * bytes + if rem < bitsNeeded { + return 0, io.EOF } - remainder := readBitsInByte(bytesRead[numBytesToRead-1], numBits-numBitsRead) - res = (res << (numBits - numBitsRead)) | uint64(remainder) - return res, nil + return res | readBitsInWord(next, bitsNeeded), nil } -func (is *istream) RemainingBitsInCurrentByte() uint { - return is.remaining +// RemainingBitsInCurrentByte returns the number of bits remaining to be read in the current byte. +func (is *IStream) RemainingBitsInCurrentByte() uint { + return uint(is.remaining % 8) } -// readBitsInByte reads numBits in byte b. -func readBitsInByte(b byte, numBits uint) byte { - return b >> (8 - numBits) +// readBitsInWord reads the first numBits in word w. +func readBitsInWord(w uint64, numBits uint8) uint64 { + return w >> (64 - numBits) } // consumeBuffer consumes numBits in is.current. -func (is *istream) consumeBuffer(numBits uint) byte { - res := readBitsInByte(is.current, numBits) +func (is *IStream) consumeBuffer(numBits uint8) uint64 { + res := readBitsInWord(is.current, numBits) is.current <<= numBits is.remaining -= numBits return res } -func (is *istream) readByteFromStream() error { - is.current, is.err = is.r.ReadByte() - is.remaining = 8 - return is.err +func (is *IStream) readWordFromStream() error { + current, bytes, err := is.r.Read64() + is.current = current + is.remaining = 8 * bytes + is.err = err + + return err } -func (is *istream) Reset(r io.Reader) { - is.r.Reset(r) +// Reset resets the IStream. 
+func (is *IStream) Reset(reader xio.Reader64) { is.err = nil is.current = 0 is.remaining = 0 + is.index = 0 + is.r = reader } diff --git a/src/dbnode/encoding/istream_test.go b/src/dbnode/encoding/istream_test.go index 4841590b01..eddc4dd30d 100644 --- a/src/dbnode/encoding/istream_test.go +++ b/src/dbnode/encoding/istream_test.go @@ -21,21 +21,22 @@ package encoding import ( - "bytes" + "io" "testing" "github.com/stretchr/testify/require" + + "github.com/m3db/m3/src/dbnode/x/xio" ) -func TestReadBits(t *testing.T) { +func TestIStreamReadBits(t *testing.T) { byteStream := []byte{ 0xca, 0xfe, 0xfd, 0x89, 0x1a, 0x2b, 0x3c, 0x48, 0x55, 0xe6, 0xf7, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x80, } - o := NewIStream(bytes.NewReader(byteStream), 16) - is := o.(*istream) - numBits := []uint{1, 3, 4, 8, 7, 2, 64, 64} + is := NewIStream(xio.NewBytesReader64(byteStream)) + numBits := []byte{1, 3, 4, 8, 7, 2, 64, 64} var res []uint64 for _, v := range numBits { read, err := is.ReadBits(v) @@ -46,15 +47,35 @@ func TestReadBits(t *testing.T) { require.Equal(t, expected, res) _, err := is.ReadBits(8) - require.Error(t, err) + require.EqualError(t, err, io.EOF.Error()) } -func TestPeekBitsSuccess(t *testing.T) { +func TestIStreamReadByte(t *testing.T) { + var ( + byteStream = []uint8{ + 0xca, 0xfe, 0xfd, 0x89, 0x1a, 0x2b, 0x3c, 0x48, 0x55, 0xe6, 0xf7, + 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x80, + } + is = NewIStream(xio.NewBytesReader64(byteStream)) + res = make([]byte, 0, len(byteStream)) + ) + + for range byteStream { + read, err := is.ReadByte() + require.NoError(t, err) + res = append(res, read) + } + require.Equal(t, byteStream, res) + + _, err := is.ReadByte() + require.EqualError(t, err, io.EOF.Error()) +} + +func TestIStreamPeekBitsSuccess(t *testing.T) { byteStream := []byte{0xa9, 0xfe, 0xfe, 0xdf, 0x9b, 0x57, 0x21, 0xf1} - o := NewIStream(bytes.NewReader(byteStream), 16) - is := o.(*istream) + is := NewIStream(xio.NewBytesReader64(byteStream)) inputs := []struct { 
- numBits uint + numBits uint8 expected uint64 }{ {0, 0}, @@ -71,31 +92,29 @@ func TestPeekBitsSuccess(t *testing.T) { require.NoError(t, err) require.Equal(t, input.expected, res) } - require.Equal(t, byte(0), is.current) + require.Equal(t, uint64(0), is.current) require.Equal(t, 0, int(is.remaining)) } -func TestPeekBitsError(t *testing.T) { +func TestIStreamPeekBitsError(t *testing.T) { byteStream := []byte{0x1, 0x2} - o := NewIStream(bytes.NewReader(byteStream), 16) - is := o.(*istream) + is := NewIStream(xio.NewBytesReader64(byteStream)) res, err := is.PeekBits(20) - require.Error(t, err) + require.EqualError(t, err, io.EOF.Error()) require.Equal(t, uint64(0), res) } -func TestReadAfterPeekBits(t *testing.T) { +func TestIStreamReadAfterPeekBits(t *testing.T) { byteStream := []byte{0xab, 0xcd} - o := NewIStream(bytes.NewReader(byteStream), 16) - is := o.(*istream) + is := NewIStream(xio.NewBytesReader64(byteStream)) res, err := is.PeekBits(10) require.NoError(t, err) require.Equal(t, uint64(0x2af), res) _, err = is.PeekBits(20) - require.Error(t, err) + require.EqualError(t, err, io.EOF.Error()) inputs := []struct { - numBits uint + numBits uint8 expected uint64 }{ {2, 0x2}, @@ -107,14 +126,62 @@ func TestReadAfterPeekBits(t *testing.T) { require.Equal(t, input.expected, res) } _, err = is.ReadBits(8) - require.Error(t, err) + require.EqualError(t, err, io.EOF.Error()) } -func TestResetIStream(t *testing.T) { - o := NewIStream(bytes.NewReader(nil), 16) - is := o.(*istream) - is.ReadBits(1) - is.Reset(bytes.NewReader(nil)) - require.Equal(t, byte(0), is.current) - require.Equal(t, 0, int(is.remaining)) +func TestIStreamPeekAfterReadBits(t *testing.T) { + byteStream := []byte{0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0x9, 0xA} + is := NewIStream(xio.NewBytesReader64(byteStream)) + + res, err := is.ReadBits(16) + require.NoError(t, err) + require.Equal(t, uint64(0x102), res) + + res, err = is.PeekBits(63) + require.NoError(t, err) + require.Equal(t, 
uint64(0x30405060708090A)>>1, res) + + res, err = is.PeekBits(64) + require.NoError(t, err) + require.Equal(t, uint64(0x30405060708090A), res) + + res, err = is.ReadBits(1) + require.NoError(t, err) + require.Equal(t, uint64(0), res) + + res, err = is.PeekBits(63) + require.NoError(t, err) + require.Equal(t, uint64(0x30405060708090A), res) + + _, err = is.PeekBits(64) + require.EqualError(t, err, io.EOF.Error()) +} + +func TestIStreamRemainingBitsInCurrentByte(t *testing.T) { + byteStream := []byte{0xff, 0, 0x42} + is := NewIStream(xio.NewBytesReader64(byteStream)) + for _, b := range byteStream { + for i := 0; i < 8; i++ { + var expected uint + if i > 0 { + expected = uint(8 - i) + } + require.Equal(t, expected, is.RemainingBitsInCurrentByte()) + bit, err := is.ReadBit() + require.NoError(t, err) + expectedBit := Bit(b>>i) & 1 + require.Equal(t, expectedBit, bit) + } + } +} + +func TestIStreamReset(t *testing.T) { + is := NewIStream(xio.NewBytesReader64([]byte{0xff})) + _, _ = is.ReadBits(8) + _, _ = is.ReadBits(1) + is.Reset(xio.NewBytesReader64(nil)) + require.Equal(t, uint64(0), is.current) + require.Equal(t, uint8(0), is.remaining) + require.Equal(t, 0, is.index) + require.NoError(t, is.err) } diff --git a/src/dbnode/encoding/iterator_test.go b/src/dbnode/encoding/iterator_test.go index f0a33c9357..ec6f4b065d 100644 --- a/src/dbnode/encoding/iterator_test.go +++ b/src/dbnode/encoding/iterator_test.go @@ -21,7 +21,6 @@ package encoding import ( - "io" "time" "github.com/m3db/m3/src/dbnode/namespace" @@ -44,7 +43,7 @@ type testIterator struct { closed bool err error onNext func(oldIdx, newIdx int) - onReset func(r io.Reader, descr namespace.SchemaDescr) + onReset func(r xio.Reader64, descr namespace.SchemaDescr) } func newTestIterator(values []testValue) ReaderIterator { @@ -72,7 +71,7 @@ func (it *testIterator) Current() (ts.Datapoint, xtime.Unit, ts.Annotation) { } v := it.values[idx] dp := ts.Datapoint{Timestamp: v.t, TimestampNanos: xtime.ToUnixNano(v.t), 
Value: v.value} - return dp, v.unit, ts.Annotation(v.annotation) + return dp, v.unit, v.annotation } func (it *testIterator) Err() error { @@ -83,7 +82,7 @@ func (it *testIterator) Close() { it.closed = true } -func (it *testIterator) Reset(r io.Reader, descr namespace.SchemaDescr) { +func (it *testIterator) Reset(r xio.Reader64, descr namespace.SchemaDescr) { it.onReset(r, descr) } @@ -101,7 +100,7 @@ type testMultiIterator struct { closed bool err error onNext func(oldIdx, newIdx int) - onReset func(r io.Reader) + onReset func(r xio.Reader64) } func newTestMultiIterator(values []testValue, err error) MultiReaderIterator { @@ -129,7 +128,7 @@ func (it *testMultiIterator) Current() (ts.Datapoint, xtime.Unit, ts.Annotation) } v := it.values[idx] dp := ts.Datapoint{Timestamp: v.t, TimestampNanos: xtime.ToUnixNano(v.t), Value: v.value} - return dp, v.unit, ts.Annotation(v.annotation) + return dp, v.unit, v.annotation } func (it *testMultiIterator) Err() error { @@ -215,10 +214,11 @@ func (it *testReaderSliceOfSlicesIterator) arrayIdx() int { } type testNoopReader struct { - n int // return for "n", also required so that each struct construction has its address + n byte // return for "n", also required so that each struct construction has its address } -func (r *testNoopReader) Read(p []byte) (int, error) { return r.n, nil } +func (r *testNoopReader) Read64() (word uint64, n byte, err error) { return 0, r.n, nil } +func (r *testNoopReader) Peek64() (word uint64, n byte, err error) { return 0, r.n, nil } func (r *testNoopReader) Segment() (ts.Segment, error) { return ts.Segment{}, nil } func (r *testNoopReader) Reset(ts.Segment) {} func (r *testNoopReader) Finalize() {} diff --git a/src/dbnode/encoding/m3tsz/decoder.go b/src/dbnode/encoding/m3tsz/decoder.go index e29812dc31..e9d0d38e06 100644 --- a/src/dbnode/encoding/m3tsz/decoder.go +++ b/src/dbnode/encoding/m3tsz/decoder.go @@ -21,9 +21,8 @@ package m3tsz import ( - "io" - "github.com/m3db/m3/src/dbnode/encoding" + 
"github.com/m3db/m3/src/dbnode/x/xio" ) type decoder struct { @@ -40,6 +39,6 @@ func NewDecoder(intOptimized bool, opts encoding.Options) encoding.Decoder { } // Decode decodes the encoded data captured by the reader. -func (dec *decoder) Decode(reader io.Reader) encoding.ReaderIterator { +func (dec *decoder) Decode(reader xio.Reader64) encoding.ReaderIterator { return NewReaderIterator(reader, dec.intOptimized, dec.opts) } diff --git a/src/dbnode/encoding/m3tsz/decoder_benchmark_test.go b/src/dbnode/encoding/m3tsz/decoder_benchmark_test.go new file mode 100644 index 0000000000..0f70c7b4da --- /dev/null +++ b/src/dbnode/encoding/m3tsz/decoder_benchmark_test.go @@ -0,0 +1,71 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package m3tsz + +import ( + "encoding/base64" + "math/rand" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/m3db/m3/src/dbnode/encoding" + "github.com/m3db/m3/src/dbnode/x/xio" +) + +// BenchmarkM3TSZDecode-12 16867 69272 ns/op +func BenchmarkM3TSZDecode(b *testing.B) { + var ( + encodingOpts = encoding.NewOptions() + reader = xio.NewBytesReader64(nil) + seriesRun = prepareSampleSeriesRun(b) + ) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + reader.Reset(seriesRun[i]) + iter := NewReaderIterator(reader, DefaultIntOptimizationEnabled, encodingOpts) + for iter.Next() { + _, _, _ = iter.Current() + } + require.NoError(b, iter.Err()) + } +} + +func prepareSampleSeriesRun(b *testing.B) [][]byte { + var ( + rnd = rand.New(rand.NewSource(42)) // nolint: gosec + sampleSeries = make([][]byte, 0, len(sampleSeriesBase64)) + seriesRun = make([][]byte, 0, b.N) + ) + + for _, b64 := range sampleSeriesBase64 { + data, err := base64.StdEncoding.DecodeString(b64) + require.NoError(b, err) + sampleSeries = append(sampleSeries, data) + } + + for i := 0; i < b.N; i++ { + seriesRun = append(seriesRun, sampleSeries[rnd.Intn(len(sampleSeries))]) + } + + return seriesRun +} diff --git a/src/dbnode/encoding/m3tsz/encoder_benchmark_test.go b/src/dbnode/encoding/m3tsz/encoder_benchmark_test.go index 67c62b4660..02e5cf9f68 100644 --- a/src/dbnode/encoding/m3tsz/encoder_benchmark_test.go +++ b/src/dbnode/encoding/m3tsz/encoder_benchmark_test.go @@ -21,7 +21,6 @@ package m3tsz import ( - "bytes" "encoding/base64" "math/rand" "testing" @@ -31,6 +30,7 @@ import ( "github.com/m3db/m3/src/dbnode/encoding" "github.com/m3db/m3/src/dbnode/ts" + "github.com/m3db/m3/src/dbnode/x/xio" xtime "github.com/m3db/m3/src/x/time" ) @@ -75,7 +75,7 @@ func prepareSampleSeriesEncRun(b *testing.B) [][]ts.Datapoint { sampleSeries = make([][]byte, 0, len(sampleSeriesBase64)) seriesRun = make([][]ts.Datapoint, b.N) encodingOpts = encoding.NewOptions() - reader = bytes.NewReader(nil) + reader 
= xio.NewBytesReader64(nil) ) for _, b64 := range sampleSeriesBase64 { diff --git a/src/dbnode/encoding/m3tsz/encoder_test.go b/src/dbnode/encoding/m3tsz/encoder_test.go index 13eb54e761..7b85b790b7 100644 --- a/src/dbnode/encoding/m3tsz/encoder_test.go +++ b/src/dbnode/encoding/m3tsz/encoder_test.go @@ -21,12 +21,14 @@ package m3tsz import ( + "io" "math/rand" "testing" "time" "github.com/m3db/m3/src/dbnode/encoding" "github.com/m3db/m3/src/dbnode/ts" + "github.com/m3db/m3/src/dbnode/x/xio" "github.com/m3db/m3/src/x/checked" "github.com/m3db/m3/src/x/context" xtime "github.com/m3db/m3/src/x/time" @@ -160,10 +162,11 @@ func getBytes(t *testing.T, e encoding.Encoder) []byte { if !ok { return nil } - var b [1000]byte - n, err := r.Read(b[:]) - require.NoError(t, err) - return b[:n] + + bytes, err := xio.ToBytes(r) + assert.Equal(t, io.EOF, err) + + return bytes } func TestWriteTimeUnit(t *testing.T) { diff --git a/src/dbnode/encoding/m3tsz/float_encoder_iterator.go b/src/dbnode/encoding/m3tsz/float_encoder_iterator.go index b27f959ac2..9a4348b816 100644 --- a/src/dbnode/encoding/m3tsz/float_encoder_iterator.go +++ b/src/dbnode/encoding/m3tsz/float_encoder_iterator.go @@ -55,7 +55,7 @@ func (eit *FloatEncoderAndIterator) WriteFloat(stream encoding.OStream, val floa } // ReadFloat reads a compressed float from the stream. 
-func (eit *FloatEncoderAndIterator) ReadFloat(stream encoding.IStream) error { +func (eit *FloatEncoderAndIterator) ReadFloat(stream *encoding.IStream) error { if eit.NotFirst { return eit.readNextFloat(stream) } @@ -102,7 +102,7 @@ func (eit *FloatEncoderAndIterator) writeXOR(stream encoding.OStream, currXOR ui stream.WriteBits(currXOR>>uint(curTrailing), numMeaningfulBits) } -func (eit *FloatEncoderAndIterator) readFullFloat(stream encoding.IStream) error { +func (eit *FloatEncoderAndIterator) readFullFloat(stream *encoding.IStream) error { vb, err := stream.ReadBits(64) if err != nil { return err @@ -114,7 +114,7 @@ func (eit *FloatEncoderAndIterator) readFullFloat(stream encoding.IStream) error return nil } -func (eit *FloatEncoderAndIterator) readNextFloat(stream encoding.IStream) error { +func (eit *FloatEncoderAndIterator) readNextFloat(stream *encoding.IStream) error { cb, err := stream.ReadBits(1) if err != nil { return err @@ -122,7 +122,6 @@ func (eit *FloatEncoderAndIterator) readNextFloat(stream encoding.IStream) error if cb == opcodeZeroValueXOR { eit.PrevXOR = 0 - eit.PrevFloatBits ^= eit.PrevXOR return nil } @@ -134,7 +133,7 @@ func (eit *FloatEncoderAndIterator) readNextFloat(stream encoding.IStream) error cb = (cb << 1) | nextCB if cb == opcodeContainedValueXOR { previousLeading, previousTrailing := encoding.LeadingAndTrailingZeros(eit.PrevXOR) - numMeaningfulBits := uint(64 - previousLeading - previousTrailing) + numMeaningfulBits := uint8(64 - previousLeading - previousTrailing) meaningfulBits, err := stream.ReadBits(numMeaningfulBits) if err != nil { return err @@ -153,7 +152,7 @@ func (eit *FloatEncoderAndIterator) readNextFloat(stream encoding.IStream) error numLeadingZeros := (numLeadingZeroesAndNumMeaningfulBits & bits12To6Mask) >> 6 numMeaningfulBits := (numLeadingZeroesAndNumMeaningfulBits & bits6To0Mask) + 1 - meaningfulBits, err := stream.ReadBits(uint(numMeaningfulBits)) + meaningfulBits, err := 
stream.ReadBits(uint8(numMeaningfulBits)) if err != nil { return err } diff --git a/src/dbnode/encoding/m3tsz/iterator.go b/src/dbnode/encoding/m3tsz/iterator.go index b2f7e69fa2..22a1be963a 100644 --- a/src/dbnode/encoding/m3tsz/iterator.go +++ b/src/dbnode/encoding/m3tsz/iterator.go @@ -21,19 +21,28 @@ package m3tsz import ( - "io" "math" "github.com/m3db/m3/src/dbnode/encoding" "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/ts" + "github.com/m3db/m3/src/dbnode/x/xio" xtime "github.com/m3db/m3/src/x/time" ) +// DefaultReaderIteratorAllocFn returns a function for allocating NewReaderIterator. +func DefaultReaderIteratorAllocFn( + opts encoding.Options, +) func(r xio.Reader64, _ namespace.SchemaDescr) encoding.ReaderIterator { + return func(r xio.Reader64, _ namespace.SchemaDescr) encoding.ReaderIterator { + return NewReaderIterator(r, DefaultIntOptimizationEnabled, opts) + } +} + // readerIterator provides an interface for clients to incrementally // read datapoints off of an encoded stream. 
type readerIterator struct { - is encoding.IStream + is *encoding.IStream opts encoding.Options err error // current error @@ -51,9 +60,13 @@ type readerIterator struct { } // NewReaderIterator returns a new iterator for a given reader -func NewReaderIterator(reader io.Reader, intOptimized bool, opts encoding.Options) encoding.ReaderIterator { +func NewReaderIterator( + reader xio.Reader64, + intOptimized bool, + opts encoding.Options, +) encoding.ReaderIterator { return &readerIterator{ - is: encoding.NewIStream(reader, opts.IStreamReaderSizeM3TSZ()), + is: encoding.NewIStream(reader), opts: opts, tsIterator: NewTimestampIterator(opts, false), intOptimized: intOptimized, @@ -165,10 +178,10 @@ func (it *readerIterator) readIntValDiff() { sign = 1.0 } - it.intVal += sign * float64(it.readBits(uint(it.sig))) + it.intVal += sign * float64(it.readBits(it.sig)) } -func (it *readerIterator) readBits(numBits uint) uint64 { +func (it *readerIterator) readBits(numBits uint8) uint64 { if !it.hasNext() { return 0 } @@ -218,7 +231,7 @@ func (it *readerIterator) hasNext() bool { } // Reset resets the ReadIterator for reuse. 
-func (it *readerIterator) Reset(reader io.Reader, schema namespace.SchemaDescr) { +func (it *readerIterator) Reset(reader xio.Reader64, schema namespace.SchemaDescr) { it.is.Reset(reader) it.tsIterator = NewTimestampIterator(it.opts, it.tsIterator.SkipMarkers) it.err = nil diff --git a/src/dbnode/encoding/m3tsz/iterator_test.go b/src/dbnode/encoding/m3tsz/iterator_test.go index d5550e59dd..212ff1eb13 100644 --- a/src/dbnode/encoding/m3tsz/iterator_test.go +++ b/src/dbnode/encoding/m3tsz/iterator_test.go @@ -21,19 +21,23 @@ package m3tsz import ( - "bytes" "testing" "time" "github.com/m3db/m3/src/dbnode/encoding" "github.com/m3db/m3/src/dbnode/ts" + "github.com/m3db/m3/src/dbnode/x/xio" xtime "github.com/m3db/m3/src/x/time" "github.com/stretchr/testify/require" ) func getTestReaderIterator(rawBytes []byte) *readerIterator { - return NewReaderIterator(bytes.NewReader(rawBytes), false, encoding.NewOptions()).(*readerIterator) + return NewReaderIterator( + xio.NewBytesReader64(rawBytes), + false, + encoding.NewOptions(), + ).(*readerIterator) } func TestReaderIteratorReadNextTimestamp(t *testing.T) { @@ -57,7 +61,7 @@ func TestReaderIteratorReadNextTimestamp(t *testing.T) { } for _, input := range inputs { - stream := encoding.NewIStream(bytes.NewBuffer(input.rawBytes), 16) + stream := encoding.NewIStream(xio.NewBytesReader64(input.rawBytes)) it := NewTimestampIterator(encoding.NewOptions(), false) it.TimeUnit = input.timeUnit @@ -68,7 +72,7 @@ func TestReaderIteratorReadNextTimestamp(t *testing.T) { require.Equal(t, input.expectedTimeDelta, it.PrevTimeDelta) } - stream := encoding.NewIStream(bytes.NewBuffer([]byte{0x1}), 16) + stream := encoding.NewIStream(xio.NewBytesReader64([]byte{0x1})) it := NewTimestampIterator(encoding.NewOptions(), false) err := it.readNextTimestamp(stream) require.Error(t, err) @@ -127,7 +131,7 @@ func TestReaderIteratorReadAnnotation(t *testing.T) { }, } for _, input := range inputs { - stream := 
encoding.NewIStream(bytes.NewBuffer(input.rawBytes), 16) + stream := encoding.NewIStream(xio.NewBytesReader64(input.rawBytes)) it := NewTimestampIterator(encoding.NewOptions(), false) err := it.readAnnotation(stream) @@ -157,7 +161,7 @@ func TestReaderIteratorReadTimeUnit(t *testing.T) { }, } for _, input := range inputs { - stream := encoding.NewIStream(bytes.NewBuffer(input.rawBytes), 16) + stream := encoding.NewIStream(xio.NewBytesReader64(input.rawBytes)) it := NewTimestampIterator(encoding.NewOptions(), false) it.TimeUnit = input.timeUnit diff --git a/src/dbnode/encoding/m3tsz/roundtrip_test.go b/src/dbnode/encoding/m3tsz/roundtrip_test.go index 345013ac0f..dfb98a8722 100644 --- a/src/dbnode/encoding/m3tsz/roundtrip_test.go +++ b/src/dbnode/encoding/m3tsz/roundtrip_test.go @@ -147,7 +147,6 @@ func validateRoundTrip(t *testing.T, input []ts.Datapoint, intOpt bool) { require.True(t, ok) it := decoder.Decode(stream) - require.True(t, ok) defer it.Close() i := 0 diff --git a/src/dbnode/encoding/m3tsz/timestamp_iterator.go b/src/dbnode/encoding/m3tsz/timestamp_iterator.go index ca3402607c..a9da57e72b 100644 --- a/src/dbnode/encoding/m3tsz/timestamp_iterator.go +++ b/src/dbnode/encoding/m3tsz/timestamp_iterator.go @@ -31,7 +31,7 @@ import ( ) // TimestampIterator encapsulates all the state required for iterating over -// delta-of-delta compresed timestamps. +// delta-of-delta compressed timestamps. type TimestampIterator struct { PrevTime xtime.UnixNano PrevTimeDelta time.Duration @@ -41,6 +41,8 @@ type TimestampIterator struct { Opts encoding.Options + markerEncodingScheme encoding.MarkerEncodingScheme + TimeUnitChanged bool Done bool @@ -49,9 +51,8 @@ type TimestampIterator struct { // for situations where looking ahead is not safe. SkipMarkers bool - numValueBits uint - numBits uint - markerEncodingScheme encoding.MarkerEncodingScheme + numValueBits uint8 + numBits uint8 } // NewTimestampIterator creates a new TimestampIterator. 
@@ -60,14 +61,14 @@ func NewTimestampIterator(opts encoding.Options, skipMarkers bool) TimestampIter return TimestampIterator{ Opts: opts, SkipMarkers: skipMarkers, - numValueBits: uint(mes.NumValueBits()), - numBits: uint(mes.NumOpcodeBits() + mes.NumValueBits()), + numValueBits: uint8(mes.NumValueBits()), + numBits: uint8(mes.NumOpcodeBits() + mes.NumValueBits()), markerEncodingScheme: mes, } } // ReadTimestamp reads the first or next timestamp. -func (it *TimestampIterator) ReadTimestamp(stream encoding.IStream) (bool, bool, error) { +func (it *TimestampIterator) ReadTimestamp(stream *encoding.IStream) (bool, bool, error) { it.PrevAnt = nil var ( @@ -97,7 +98,7 @@ func (it *TimestampIterator) ReadTimestamp(stream encoding.IStream) (bool, bool, // ReadTimeUnit reads an encoded time unit and updates the iterator's state // accordingly. It is exposed as a public method so that callers can control // the encoding / decoding of the time unit on their own if they choose. -func (it *TimestampIterator) ReadTimeUnit(stream encoding.IStream) error { +func (it *TimestampIterator) ReadTimeUnit(stream *encoding.IStream) error { tuBits, err := stream.ReadByte() if err != nil { return err @@ -112,7 +113,7 @@ func (it *TimestampIterator) ReadTimeUnit(stream encoding.IStream) error { return nil } -func (it *TimestampIterator) readFirstTimestamp(stream encoding.IStream) error { +func (it *TimestampIterator) readFirstTimestamp(stream *encoding.IStream) error { ntBits, err := stream.ReadBits(64) if err != nil { return err @@ -133,7 +134,7 @@ func (it *TimestampIterator) readFirstTimestamp(stream encoding.IStream) error { return nil } -func (it *TimestampIterator) readNextTimestamp(stream encoding.IStream) error { +func (it *TimestampIterator) readNextTimestamp(stream *encoding.IStream) error { dod, err := it.readMarkerOrDeltaOfDelta(stream) if err != nil { return err @@ -144,7 +145,8 @@ func (it *TimestampIterator) readNextTimestamp(stream encoding.IStream) error { return nil } 
-func (it *TimestampIterator) tryReadMarker(stream encoding.IStream) (time.Duration, bool, error) { +// nolint: gocyclo +func (it *TimestampIterator) tryReadMarker(stream *encoding.IStream) (time.Duration, bool, error) { opcodeAndValue, success := it.tryPeekBits(stream, it.numBits) if !success { return 0, false, nil @@ -200,7 +202,9 @@ func (it *TimestampIterator) tryReadMarker(stream encoding.IStream) (time.Durati } } -func (it *TimestampIterator) readMarkerOrDeltaOfDelta(stream encoding.IStream) (time.Duration, error) { +func (it *TimestampIterator) readMarkerOrDeltaOfDelta( + stream *encoding.IStream, +) (time.Duration, error) { if !it.SkipMarkers { dod, success, err := it.tryReadMarker(stream) if err != nil { @@ -224,7 +228,9 @@ func (it *TimestampIterator) readMarkerOrDeltaOfDelta(stream encoding.IStream) ( } func (it *TimestampIterator) readDeltaOfDelta( - stream encoding.IStream, tes encoding.TimeEncodingScheme) (time.Duration, error) { + stream *encoding.IStream, + tes encoding.TimeEncodingScheme, +) (time.Duration, error) { if it.TimeUnitChanged { // NB(xichen): if the time unit has changed, always read 64 bits as normalized // dod in nanoseconds. 
@@ -254,12 +260,12 @@ func (it *TimestampIterator) readDeltaOfDelta( cb = (cb << 1) | nextCB if cb == buckets[i].Opcode() { - dodBits, err := stream.ReadBits(uint(buckets[i].NumValueBits())) + dodBits, err := stream.ReadBits(uint8(buckets[i].NumValueBits())) if err != nil { return 0, err } - dod := encoding.SignExtend(dodBits, uint(buckets[i].NumValueBits())) + dod := encoding.SignExtend(dodBits, uint8(buckets[i].NumValueBits())) timeUnit, err := it.TimeUnit.Value() if err != nil { return 0, nil @@ -269,7 +275,7 @@ func (it *TimestampIterator) readDeltaOfDelta( } } - numValueBits := uint(tes.DefaultBucket().NumValueBits()) + numValueBits := uint8(tes.DefaultBucket().NumValueBits()) dodBits, err := stream.ReadBits(numValueBits) if err != nil { return 0, err @@ -283,7 +289,7 @@ func (it *TimestampIterator) readDeltaOfDelta( return xtime.FromNormalizedDuration(dod, timeUnit), nil } -func (it *TimestampIterator) readAnnotation(stream encoding.IStream) error { +func (it *TimestampIterator) readAnnotation(stream *encoding.IStream) error { antLen, err := it.readVarint(stream) if err != nil { return err @@ -311,12 +317,12 @@ func (it *TimestampIterator) readAnnotation(stream encoding.IStream) error { return nil } -func (it *TimestampIterator) readVarint(stream encoding.IStream) (int, error) { +func (it *TimestampIterator) readVarint(stream *encoding.IStream) (int, error) { res, err := binary.ReadVarint(stream) return int(res), err } -func (it *TimestampIterator) tryPeekBits(stream encoding.IStream, numBits uint) (uint64, bool) { +func (it *TimestampIterator) tryPeekBits(stream *encoding.IStream, numBits uint8) (uint64, bool) { res, err := stream.PeekBits(numBits) if err != nil { return 0, false diff --git a/src/dbnode/encoding/multi_reader_iterator_test.go b/src/dbnode/encoding/multi_reader_iterator_test.go index a82ab1549f..c065d24489 100644 --- a/src/dbnode/encoding/multi_reader_iterator_test.go +++ b/src/dbnode/encoding/multi_reader_iterator_test.go @@ -22,7 +22,6 @@ 
package encoding import ( "fmt" - "io" "testing" "time" @@ -287,7 +286,7 @@ func assertTestMultiReaderIterator( test testMultiReader, ) { type readerEntries struct { - reader io.Reader + reader xio.Reader64 entries *testMultiReaderEntries } @@ -315,8 +314,8 @@ func assertTestMultiReaderIterator( } var testIterators []*testIterator - var iteratorAlloc func(reader io.Reader, descr namespace.SchemaDescr) ReaderIterator - iteratorAlloc = func(reader io.Reader, descr namespace.SchemaDescr) ReaderIterator { + var iteratorAlloc func(xio.Reader64, namespace.SchemaDescr) ReaderIterator + iteratorAlloc = func(reader xio.Reader64, _ namespace.SchemaDescr) ReaderIterator { for i := range entriesByReader { if reader != entriesByReader[i].reader { continue @@ -331,7 +330,7 @@ func assertTestMultiReaderIterator( } } } - it.onReset = func(r io.Reader, descr namespace.SchemaDescr) { + it.onReset = func(r xio.Reader64, descr namespace.SchemaDescr) { newIt := iteratorAlloc(r, descr).(*testIterator) *it = *newIt // We close this here as we never actually use this iterator diff --git a/src/dbnode/encoding/null.go b/src/dbnode/encoding/null.go index 2dc914dcd6..35ce276912 100644 --- a/src/dbnode/encoding/null.go +++ b/src/dbnode/encoding/null.go @@ -22,7 +22,6 @@ package encoding import ( "fmt" - "io" "time" "github.com/m3db/m3/src/dbnode/namespace" @@ -54,12 +53,12 @@ func (e *nullEncoder) LastEncoded() (ts.Datapoint, error) { func (e *nullEncoder) LastAnnotationChecksum() (uint64, error) { return 0, fmt.Errorf("not implemented") } -func (e *nullEncoder) Len() int { return 0 } -func (e *nullEncoder) Seal() { e.sealed = true } -func (e *nullEncoder) Reset(t time.Time, capacity int, descr namespace.SchemaDescr) {} -func (e *nullEncoder) Close() {} -func (e *nullEncoder) Discard() ts.Segment { return ts.Segment{} } -func (e *nullEncoder) DiscardReset(t time.Time, capacity int, descr namespace.SchemaDescr) ts.Segment { +func (e *nullEncoder) Len() int { return 0 } +func (e *nullEncoder) 
Seal() { e.sealed = true } +func (e *nullEncoder) Reset(time.Time, int, namespace.SchemaDescr) {} +func (e *nullEncoder) Close() {} +func (e *nullEncoder) Discard() ts.Segment { return ts.Segment{} } +func (e *nullEncoder) DiscardReset(time.Time, int, namespace.SchemaDescr) ts.Segment { return ts.Segment{} } func (e *nullEncoder) SetSchema(_ namespace.SchemaDescr) {} @@ -74,7 +73,8 @@ func NewNullReaderIterator() ReaderIterator { func (r *nullReaderIterator) Current() (ts.Datapoint, xtime.Unit, ts.Annotation) { return ts.Datapoint{}, xtime.Unit(0), nil } -func (r *nullReaderIterator) Next() bool { return false } -func (r *nullReaderIterator) Err() error { return fmt.Errorf("not implemented") } -func (r *nullReaderIterator) Close() {} -func (r *nullReaderIterator) Reset(reader io.Reader, descr namespace.SchemaDescr) {} +func (r *nullReaderIterator) Next() bool { return false } +func (r *nullReaderIterator) Err() error { return fmt.Errorf("not implemented") } +func (r *nullReaderIterator) Close() {} +func (r *nullReaderIterator) Reset(xio.Reader64, namespace.SchemaDescr) { +} diff --git a/src/dbnode/encoding/proto/corruption_prop_test.go b/src/dbnode/encoding/proto/corruption_prop_test.go index 7f5ec5e5a9..c58ae0f78a 100644 --- a/src/dbnode/encoding/proto/corruption_prop_test.go +++ b/src/dbnode/encoding/proto/corruption_prop_test.go @@ -23,7 +23,6 @@ package proto import ( - "bytes" "os" "testing" "time" @@ -31,7 +30,9 @@ import ( "github.com/leanovate/gopter" "github.com/leanovate/gopter/gen" "github.com/leanovate/gopter/prop" + "github.com/m3db/m3/src/dbnode/namespace" + "github.com/m3db/m3/src/dbnode/x/xio" ) // TestIteratorHandlesCorruptStreams ensures that the protobuf iterator never panics when reading corrupt streams. 
@@ -48,8 +49,8 @@ func TestIteratorHandlesCorruptStreams(t *testing.T) { parameters.Rng.Seed(seed) props.Property("Iterator should handle corrupt streams", prop.ForAll(func(input corruptionPropTestInput) (bool, error) { - buff := bytes.NewBuffer(input.bytes) - iter := NewIterator(buff, namespace.GetTestSchemaDescr(testVLSchema), testEncodingOptions) + r := xio.NewBytesReader64(input.bytes) + iter := NewIterator(r, namespace.GetTestSchemaDescr(testVLSchema), testEncodingOptions) for iter.Next() { } return true, nil diff --git a/src/dbnode/encoding/proto/int_encoder_iterator.go b/src/dbnode/encoding/proto/int_encoder_iterator.go index 53dc4074eb..d15e7c472e 100644 --- a/src/dbnode/encoding/proto/int_encoder_iterator.go +++ b/src/dbnode/encoding/proto/int_encoder_iterator.go @@ -27,10 +27,6 @@ import ( "github.com/m3db/m3/src/dbnode/encoding/m3tsz" ) -const ( - opcodeZeroSig = 0x0 -) - type intEncoderAndIterator struct { prevIntBits uint64 intSigBitsTracker m3tsz.IntSigBitsTracker @@ -148,7 +144,7 @@ func (eit *intEncoderAndIterator) encodeIntValDiff(stream encoding.OStream, valB stream.WriteBits(valBits, int(numSig)) } -func (eit *intEncoderAndIterator) readIntValue(stream encoding.IStream) error { +func (eit *intEncoderAndIterator) readIntValue(stream *encoding.IStream) error { if eit.hasEncodedFirst { changeExistsControlBit, err := stream.ReadBit() if err != nil { @@ -182,7 +178,7 @@ func (eit *intEncoderAndIterator) readIntValue(stream encoding.IStream) error { return nil } -func (eit *intEncoderAndIterator) readIntSig(stream encoding.IStream) error { +func (eit *intEncoderAndIterator) readIntSig(stream *encoding.IStream) error { updateControlBit, err := stream.ReadBit() if err != nil { return fmt.Errorf( @@ -216,7 +212,7 @@ func (eit *intEncoderAndIterator) readIntSig(stream encoding.IStream) error { return nil } -func (eit *intEncoderAndIterator) readIntValDiff(stream encoding.IStream) error { +func (eit *intEncoderAndIterator) readIntValDiff(stream 
*encoding.IStream) error { negativeControlBit, err := stream.ReadBit() if err != nil { return fmt.Errorf( @@ -224,7 +220,7 @@ func (eit *intEncoderAndIterator) readIntValDiff(stream encoding.IStream) error itErrPrefix, err) } - numSig := uint(eit.intSigBitsTracker.NumSig) + numSig := eit.intSigBitsTracker.NumSig diffSigBits, err := stream.ReadBits(numSig) if err != nil { return fmt.Errorf( diff --git a/src/dbnode/encoding/proto/iterator.go b/src/dbnode/encoding/proto/iterator.go index 54dbb68eda..d1c81ab575 100644 --- a/src/dbnode/encoding/proto/iterator.go +++ b/src/dbnode/encoding/proto/iterator.go @@ -30,6 +30,7 @@ import ( "github.com/m3db/m3/src/dbnode/encoding/m3tsz" "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/ts" + "github.com/m3db/m3/src/dbnode/x/xio" "github.com/m3db/m3/src/x/checked" "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/instrument" @@ -55,7 +56,7 @@ type iterator struct { err error schema *desc.MessageDescriptor schemaDesc namespace.SchemaDescr - stream encoding.IStream + stream *encoding.IStream marshaller customFieldMarshaller byteFieldDictLRUSize int // TODO(rartoul): Update these as we traverse the stream if we encounter @@ -79,11 +80,11 @@ type iterator struct { // NewIterator creates a new iterator. 
func NewIterator( - reader io.Reader, + reader xio.Reader64, descr namespace.SchemaDescr, opts encoding.Options, ) encoding.ReaderIterator { - stream := encoding.NewIStream(reader, opts.IStreamReaderSizeProto()) + stream := encoding.NewIStream(reader) i := &iterator{ opts: opts, @@ -237,7 +238,7 @@ func (it *iterator) Err() error { return it.err } -func (it *iterator) Reset(reader io.Reader, descr namespace.SchemaDescr) { +func (it *iterator) Reset(reader xio.Reader64, descr namespace.SchemaDescr) { it.resetSchema(descr) it.stream.Reset(reader) it.tsIterator = m3tsz.NewTimestampIterator(it.opts, true) @@ -336,7 +337,7 @@ func (it *iterator) readCustomFieldsSchema() error { } for i := 1; i <= int(numCustomFields); i++ { - fieldTypeBits, err := it.stream.ReadBits(uint(numBitsToEncodeCustomType)) + fieldTypeBits, err := it.stream.ReadBits(uint8(numBitsToEncodeCustomType)) if err != nil { return err } @@ -546,7 +547,7 @@ func (it *iterator) readBytesValue(i int, customField customFieldState) error { if valueInDictControlBit == opCodeInterpretSubsequentBitsAsLRUIndex { dictIdxBits, err := it.stream.ReadBits( - uint(numBitsRequiredForNumUpToN(it.byteFieldDictLRUSize))) + uint8(numBitsRequiredForNumUpToN(it.byteFieldDictLRUSize))) if err != nil { return fmt.Errorf( "%s error trying to read bytes dict idx: %v", @@ -861,15 +862,6 @@ func (it *iterator) nextToBeEvicted(fieldIdx int) []byte { return dict[0] } -func (it *iterator) readBits(numBits uint) (uint64, error) { - res, err := it.stream.ReadBits(numBits) - if err != nil { - return 0, err - } - - return res, nil -} - func (it *iterator) resetUnmarshalProtoBuffer(n int) { if it.unmarshalProtoBuf != nil && it.unmarshalProtoBuf.Cap() >= n { // If the existing one is big enough, just resize it. 
diff --git a/src/dbnode/encoding/proto/round_trip_test.go b/src/dbnode/encoding/proto/round_trip_test.go index 7e0cb64f5c..3efdf25129 100644 --- a/src/dbnode/encoding/proto/round_trip_test.go +++ b/src/dbnode/encoding/proto/round_trip_test.go @@ -21,7 +21,6 @@ package proto import ( - "bytes" "errors" "fmt" "testing" @@ -30,6 +29,7 @@ import ( "github.com/m3db/m3/src/dbnode/encoding" "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/ts" + "github.com/m3db/m3/src/dbnode/x/xio" "github.com/m3db/m3/src/x/pool" xtime "github.com/m3db/m3/src/x/time" @@ -162,8 +162,8 @@ func TestRoundTrip(t *testing.T) { require.NoError(t, err) require.Equal(t, numExpectedBytes, len(rawBytes)) - buff := bytes.NewBuffer(rawBytes) - iter := NewIterator(buff, namespace.GetTestSchemaDescr(testVLSchema), testEncodingOptions) + r := xio.NewBytesReader64(rawBytes) + iter := NewIterator(r, namespace.GetTestSchemaDescr(testVLSchema), testEncodingOptions) i := 0 for iter.Next() { @@ -212,9 +212,9 @@ func TestRoundTripMidStreamSchemaChanges(t *testing.T) { vl2WriteTime := vl1WriteTime.Add(time.Second) err = enc.Encode(ts.Datapoint{Timestamp: vl2WriteTime}, xtime.Second, marshalledVL) - require.Equal(t, - "proto encoder: error unmarshalling message: encountered unknown field with field number: 6", - err.Error()) + require.EqualError(t, + err, + "proto encoder: error unmarshalling message: encountered unknown field with field number: 6") enc.SetSchema(namespace.GetTestSchemaDescr(testVL2Schema)) err = enc.Encode(ts.Datapoint{Timestamp: vl2WriteTime}, xtime.Second, marshalledVL) @@ -224,8 +224,8 @@ func TestRoundTripMidStreamSchemaChanges(t *testing.T) { require.NoError(t, err) // Try reading the stream just using the vl1 schema. 
- buff := bytes.NewBuffer(rawBytes) - iter := NewIterator(buff, namespace.GetTestSchemaDescr(testVLSchema), testEncodingOptions) + r := xio.NewBytesReader64(rawBytes) + iter := NewIterator(r, namespace.GetTestSchemaDescr(testVLSchema), testEncodingOptions) require.True(t, iter.Next(), "iter err: %v", iter.Err()) dp, unit, annotation := iter.Current() @@ -260,8 +260,8 @@ func TestRoundTripMidStreamSchemaChanges(t *testing.T) { require.NoError(t, iter.Err()) // Try reading the stream just using the vl2 schema. - buff = bytes.NewBuffer(rawBytes) - iter = NewIterator(buff, namespace.GetTestSchemaDescr(testVL2Schema), testEncodingOptions) + r = xio.NewBytesReader64(rawBytes) + iter = NewIterator(r, namespace.GetTestSchemaDescr(testVL2Schema), testEncodingOptions) require.True(t, iter.Next(), "iter err: %v", iter.Err()) dp, unit, annotation = iter.Current() diff --git a/src/dbnode/encoding/scheme.go b/src/dbnode/encoding/scheme.go index efcbcee138..0c8327aea8 100644 --- a/src/dbnode/encoding/scheme.go +++ b/src/dbnode/encoding/scheme.go @@ -141,7 +141,8 @@ func newTimeEncodingSchemes(schemes map[xtime.Unit]TimeEncodingScheme) TimeEncod } // newTimeEncodingScheme creates a new time encoding scheme. -// NB(xichen): numValueBitsForBbuckets should be ordered by value in ascending order (smallest value first). +// NB(xichen): numValueBitsForBuckets should be ordered by value +// in ascending order (smallest value first). 
func newTimeEncodingScheme(numValueBitsForBuckets []int, numValueBitsForDefault int) TimeEncodingScheme { numBuckets := len(numValueBitsForBuckets) buckets := make([]TimeBucket, 0, numBuckets) diff --git a/src/dbnode/encoding/series_iterator_split_into_blocks_test.go b/src/dbnode/encoding/series_iterator_split_into_blocks_test.go index 5ebc2aadd8..5ef4e7d7a2 100644 --- a/src/dbnode/encoding/series_iterator_split_into_blocks_test.go +++ b/src/dbnode/encoding/series_iterator_split_into_blocks_test.go @@ -22,7 +22,6 @@ package encoding_test import ( - "io" "testing" "time" @@ -34,9 +33,10 @@ import ( "github.com/m3db/m3/src/x/ident" xtime "github.com/m3db/m3/src/x/time" - "github.com/m3db/m3/src/dbnode/namespace" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/m3db/m3/src/dbnode/namespace" ) type Series struct { @@ -69,7 +69,7 @@ func TestDeconstructAndReconstruct(t *testing.T) { i++ } - iterAlloc := func(r io.Reader, _ namespace.SchemaDescr) encoding.ReaderIterator { + iterAlloc := func(r xio.Reader64, _ namespace.SchemaDescr) encoding.ReaderIterator { iter := m3tsz.NewDecoder(true, encoding.NewOptions()) return iter.Decode(r) } diff --git a/src/dbnode/encoding/types.go b/src/dbnode/encoding/types.go index 230925b340..567d908016 100644 --- a/src/dbnode/encoding/types.go +++ b/src/dbnode/encoding/types.go @@ -21,7 +21,6 @@ package encoding import ( - "io" "time" "github.com/m3db/m3/src/dbnode/namespace" @@ -151,19 +150,19 @@ type Options interface { // ByteFieldDictionaryLRUSize returns the ByteFieldDictionaryLRUSize. ByteFieldDictionaryLRUSize() int - // SetIStreamReaderSizeM3TSZ sets the istream bufio reader size + // SetIStreamReaderSizeM3TSZ sets the IStream bufio reader size // for m3tsz encoding iteration. SetIStreamReaderSizeM3TSZ(value int) Options - // IStreamReaderSizeM3TSZ returns the istream bufio reader size + // IStreamReaderSizeM3TSZ returns the IStream bufio reader size // for m3tsz encoding iteration. 
IStreamReaderSizeM3TSZ() int - // SetIStreamReaderSizeProto sets the istream bufio reader size + // SetIStreamReaderSizeProto sets the IStream bufio reader size // for proto encoding iteration. SetIStreamReaderSizeProto(value int) Options - // SetIStreamReaderSizeProto returns the istream bufio reader size + // SetIStreamReaderSizeProto returns the IStream bufio reader size // for proto encoding iteration. IStreamReaderSizeProto() int } @@ -191,7 +190,7 @@ type ReaderIterator interface { // Reset resets the iterator to read from a new reader with // a new schema (for schema aware iterators). - Reset(reader io.Reader, schema namespace.SchemaDescr) + Reset(reader xio.Reader64, schema namespace.SchemaDescr) } // MultiReaderIterator is an iterator that iterates in order over @@ -328,7 +327,7 @@ type MutableSeriesIterators interface { // Decoder is the generic interface for different types of decoders. type Decoder interface { // Decode decodes the encoded data in the reader. - Decode(reader io.Reader) ReaderIterator + Decode(reader xio.Reader64) ReaderIterator } // NewDecoderFn creates a new decoder. @@ -338,32 +337,7 @@ type NewDecoderFn func() Decoder type EncoderAllocate func() Encoder // ReaderIteratorAllocate allocates a ReaderIterator for a pool. -type ReaderIteratorAllocate func(reader io.Reader, descr namespace.SchemaDescr) ReaderIterator - -// IStream encapsulates a readable stream. -type IStream interface { - // Read reads len(b) bytes. - Read([]byte) (int, error) - - // ReadBit reads the next Bit. - ReadBit() (Bit, error) - - // ReadByte reads the next Byte. - ReadByte() (byte, error) - - // ReadBits reads the next Bits. - ReadBits(numBits uint) (uint64, error) - - // PeekBits looks at the next Bits, but doesn't move the pos. - PeekBits(numBits uint) (uint64, error) - - // RemainingBitsInCurrentByte returns the number of bits remaining to - // be read in the current byte. - RemainingBitsInCurrentByte() uint - - // Reset resets the IStream. 
- Reset(r io.Reader) -} +type ReaderIteratorAllocate func(reader xio.Reader64, descr namespace.SchemaDescr) ReaderIterator // OStream encapsulates a writable stream. type OStream interface { diff --git a/src/dbnode/integration/disk_flush_helpers.go b/src/dbnode/integration/disk_flush_helpers.go index be56b6a327..661eda8518 100644 --- a/src/dbnode/integration/disk_flush_helpers.go +++ b/src/dbnode/integration/disk_flush_helpers.go @@ -23,7 +23,6 @@ package integration import ( - "bytes" "errors" "fmt" "testing" @@ -36,6 +35,7 @@ import ( "github.com/m3db/m3/src/dbnode/persist/fs" "github.com/m3db/m3/src/dbnode/sharding" "github.com/m3db/m3/src/dbnode/storage" + "github.com/m3db/m3/src/dbnode/x/xio" "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/ident/testutil" xtime "github.com/m3db/m3/src/x/time" @@ -184,14 +184,6 @@ func waitUntilFileSetFilesExist( return waitUntilFileSetFilesExistOrNot(filePathPrefix, files, true, timeout) } -func waitUntilFileSetFilesNotExist( - filePathPrefix string, - files []fs.FileSetFileIdentifier, - timeout time.Duration, -) error { - return waitUntilFileSetFilesExistOrNot(filePathPrefix, files, false, timeout) -} - func waitUntilFileSetFilesExistOrNot( filePathPrefix string, files []fs.FileSetFileIdentifier, @@ -309,7 +301,7 @@ func checkForTime( var datapoints []generate.TestValue it := iteratorPool.Get() - it.Reset(bytes.NewBuffer(data.Bytes()), nsCtx.Schema) + it.Reset(xio.NewBytesReader64(data.Bytes()), nsCtx.Schema) for it.Next() { dp, _, ann := it.Current() datapoints = append(datapoints, generate.TestValue{Datapoint: dp, Annotation: ann}) diff --git a/src/dbnode/persist/fs/merger_test.go b/src/dbnode/persist/fs/merger_test.go index 9487e4b09a..5592274538 100644 --- a/src/dbnode/persist/fs/merger_test.go +++ b/src/dbnode/persist/fs/merger_test.go @@ -75,9 +75,7 @@ func init() { srPool = xio.NewSegmentReaderPool(poolOpts) srPool.Init() multiIterPool = encoding.NewMultiReaderIteratorPool(poolOpts) - 
multiIterPool.Init(func(r io.Reader, _ namespace.SchemaDescr) encoding.ReaderIterator { - return m3tsz.NewReaderIterator(r, m3tsz.DefaultIntOptimizationEnabled, encoding.NewOptions()) - }) + multiIterPool.Init(m3tsz.DefaultReaderIteratorAllocFn(encoding.NewOptions())) bytesPool := pool.NewCheckedBytesPool(nil, poolOpts, func(s []pool.Bucket) pool.BytesPool { return pool.NewBytesPool(s, poolOpts) }) @@ -618,11 +616,10 @@ func datapointsToCheckedBytes(t *testing.T, dps []ts.Datapoint) checked.Bytes { r, ok := encoder.Stream(ctx) require.True(t, ok) - var b [1000]byte - n, err := r.Read(b[:]) - require.NoError(t, err) + bytes, err := xio.ToBytes(r) + require.Equal(t, io.EOF, err) - copied := append([]byte(nil), b[:n]...) + copied := append([]byte(nil), bytes...) cb := checked.NewBytes(copied, nil) return cb } diff --git a/src/dbnode/persist/fs/retriever.go b/src/dbnode/persist/fs/retriever.go index 8f725866ec..fc15887c22 100644 --- a/src/dbnode/persist/fs/retriever.go +++ b/src/dbnode/persist/fs/retriever.go @@ -969,12 +969,20 @@ func (req *retrieveRequest) BlockSize() time.Duration { return req.blockSize } -func (req *retrieveRequest) Read(b []byte) (int, error) { +func (req *retrieveRequest) Read64() (word uint64, n byte, err error) { req.resultWg.Wait() if req.err != nil { - return 0, req.err + return 0, 0, req.err } - return req.reader.Read(b) + return req.reader.Read64() +} + +func (req *retrieveRequest) Peek64() (word uint64, n byte, err error) { + req.resultWg.Wait() + if req.err != nil { + return 0, 0, req.err + } + return req.reader.Peek64() } func (req *retrieveRequest) Segment() (ts.Segment, error) { diff --git a/src/dbnode/server/server.go b/src/dbnode/server/server.go index 13ab1ac9ef..c02f8caef7 100644 --- a/src/dbnode/server/server.go +++ b/src/dbnode/server/server.go @@ -1683,14 +1683,14 @@ func withEncodingAndPoolingOptions( return m3tsz.NewEncoder(time.Time{}, nil, m3tsz.DefaultIntOptimizationEnabled, encodingOpts) }) - iteratorPool.Init(func(r 
io.Reader, descr namespace.SchemaDescr) encoding.ReaderIterator { + iteratorPool.Init(func(r xio.Reader64, descr namespace.SchemaDescr) encoding.ReaderIterator { if cfg.Proto != nil && cfg.Proto.Enabled { return proto.NewIterator(r, descr, encodingOpts) } return m3tsz.NewReaderIterator(r, m3tsz.DefaultIntOptimizationEnabled, encodingOpts) }) - multiIteratorPool.Init(func(r io.Reader, descr namespace.SchemaDescr) encoding.ReaderIterator { + multiIteratorPool.Init(func(r xio.Reader64, descr namespace.SchemaDescr) encoding.ReaderIterator { iter := iteratorPool.Get() iter.Reset(r, descr) return iter diff --git a/src/dbnode/storage/block/merged_block_reader.go b/src/dbnode/storage/block/merged_block_reader.go index 04461f261f..cd3b90e39e 100644 --- a/src/dbnode/storage/block/merged_block_reader.go +++ b/src/dbnode/storage/block/merged_block_reader.go @@ -177,12 +177,20 @@ func (r *dbMergedBlockReader) BlockSize() time.Duration { return r.blockSize } -func (r *dbMergedBlockReader) Read(b []byte) (int, error) { +func (r *dbMergedBlockReader) Read64() (word uint64, n byte, err error) { reader, err := r.mergedReader() if err != nil { - return 0, err + return 0, 0, err } - return reader.Read(b) + return reader.Read64() +} + +func (r *dbMergedBlockReader) Peek64() (word uint64, n byte, err error) { + reader, err := r.mergedReader() + if err != nil { + return 0, 0, err + } + return reader.Peek64() } func (r *dbMergedBlockReader) Segment() (ts.Segment, error) { diff --git a/src/dbnode/storage/block/options.go b/src/dbnode/storage/block/options.go index 70abd04930..14e1581db7 100644 --- a/src/dbnode/storage/block/options.go +++ b/src/dbnode/storage/block/options.go @@ -21,17 +21,15 @@ package block import ( - "io" - "github.com/m3db/m3/src/dbnode/encoding" "github.com/m3db/m3/src/dbnode/encoding/m3tsz" + "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/ts" "github.com/m3db/m3/src/dbnode/x/xio" "github.com/m3db/m3/src/x/clock" 
"github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/pool" xsync "github.com/m3db/m3/src/x/sync" - "github.com/m3db/m3/src/dbnode/namespace" ) const ( @@ -93,14 +91,13 @@ func NewOptions() Options { o.encoderPool.Init(func() encoding.Encoder { return m3tsz.NewEncoder(timeZero, nil, m3tsz.DefaultIntOptimizationEnabled, encodingOpts) }) - o.readerIteratorPool.Init(func(r io.Reader, descr namespace.SchemaDescr) encoding.ReaderIterator { - return m3tsz.NewReaderIterator(r, m3tsz.DefaultIntOptimizationEnabled, encodingOpts) - }) - o.multiReaderIteratorPool.Init(func(r io.Reader, descr namespace.SchemaDescr) encoding.ReaderIterator { - it := o.readerIteratorPool.Get() - it.Reset(r, descr) - return it - }) + o.readerIteratorPool.Init(m3tsz.DefaultReaderIteratorAllocFn(encodingOpts)) + o.multiReaderIteratorPool.Init( + func(r xio.Reader64, descr namespace.SchemaDescr) encoding.ReaderIterator { + it := o.readerIteratorPool.Get() + it.Reset(r, descr) + return it + }) o.segmentReaderPool.Init() o.bytesPool.Init() return o diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_data_test.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_data_test.go index 146266966a..21d8724a4c 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_data_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_data_test.go @@ -36,6 +36,7 @@ import ( "github.com/m3db/m3/src/dbnode/storage/series" "github.com/m3db/m3/src/dbnode/topology" "github.com/m3db/m3/src/dbnode/ts" + "github.com/m3db/m3/src/dbnode/x/xio" "github.com/m3db/m3/src/x/checked" "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/ident" @@ -441,9 +442,9 @@ func testItMergesSnapshotsAndCommitLogs(t *testing.T, opts Options, seg, err := reader.Segment() require.NoError(t, err) - bytes := make([]byte, seg.Len()) - _, err = reader.Read(bytes) - require.NoError(t, err) + bytes, err := xio.ToBytes(reader) + require.Equal(t, io.EOF, err) + require.Equal(t, 
seg.Len(), len(bytes)) mockReader.EXPECT().Read().Return( foo.ID, diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_prop_test.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_prop_test.go index 7c689dc4d2..57146b5786 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_prop_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_prop_test.go @@ -24,6 +24,7 @@ package commitlog import ( "fmt" + "io" "io/ioutil" "os" "reflect" @@ -45,6 +46,7 @@ import ( "github.com/m3db/m3/src/dbnode/topology" tu "github.com/m3db/m3/src/dbnode/topology/testutil" "github.com/m3db/m3/src/dbnode/ts" + "github.com/m3db/m3/src/dbnode/x/xio" "github.com/m3db/m3/src/x/checked" "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/ident" @@ -191,14 +193,8 @@ func TestCommitLogSourcePropCorrectlyBootstrapsFromCommitlog(t *testing.T) { ctx := context.NewBackground() reader, ok := encoder.Stream(ctx) if ok { - seg, err := reader.Segment() - if err != nil { - return false, err - } - - bytes := make([]byte, seg.Len()) - _, err = reader.Read(bytes) - if err != nil { + bytes, err := xio.ToBytes(reader) + if err != io.EOF { return false, err } encodersBySeries[seriesID] = bytes diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/source_data_test.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/source_data_test.go index 907e3b5a47..7b2eb03f40 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/fs/source_data_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/source_data_test.go @@ -23,6 +23,7 @@ package fs import ( "errors" "fmt" + "io" "io/ioutil" "os" "path" @@ -44,6 +45,7 @@ import ( "github.com/m3db/m3/src/dbnode/storage/index" "github.com/m3db/m3/src/dbnode/storage/index/compaction" "github.com/m3db/m3/src/dbnode/storage/series" + "github.com/m3db/m3/src/dbnode/x/xio" "github.com/m3db/m3/src/m3ninx/index/segment/fst" "github.com/m3db/m3/src/x/checked" "github.com/m3db/m3/src/x/context" @@ -557,11 
+559,10 @@ func validateReadResults( readerAtTime := seriesReaders[0] assert.Equal(t, times[i], readerAtTime.Start) ctx := context.NewBackground() - var b [100]byte - n, err := readerAtTime.Reader.Read(b[:]) + bytes, err := xio.ToBytes(readerAtTime.Reader) ctx.Close() - require.NoError(t, err) - require.Equal(t, data[i], b[:n]) + require.Equal(t, io.EOF, err) + require.Equal(t, data[i], bytes) } tester.EnsureNoWrites() diff --git a/src/dbnode/storage/bootstrap/util.go b/src/dbnode/storage/bootstrap/util.go index 2019b1d49a..877b059d8d 100644 --- a/src/dbnode/storage/bootstrap/util.go +++ b/src/dbnode/storage/bootstrap/util.go @@ -316,12 +316,7 @@ type NamespacesTester struct { func buildDefaultIterPool() encoding.MultiReaderIteratorPool { iterPool := encoding.NewMultiReaderIteratorPool(pool.NewObjectPoolOptions()) - iterPool.Init( - func(r io.Reader, _ namespace.SchemaDescr) encoding.ReaderIterator { - return m3tsz.NewReaderIterator(r, - m3tsz.DefaultIntOptimizationEnabled, - encoding.NewOptions()) - }) + iterPool.Init(m3tsz.DefaultReaderIteratorAllocFn(encoding.NewOptions())) return iterPool } diff --git a/src/dbnode/storage/options.go b/src/dbnode/storage/options.go index e276ee852a..ec56c22bda 100644 --- a/src/dbnode/storage/options.go +++ b/src/dbnode/storage/options.go @@ -23,7 +23,6 @@ package storage import ( "errors" "fmt" - "io" "math" "runtime" "time" @@ -505,16 +504,12 @@ func (o *options) SetEncodingM3TSZPooled() Options { opts.encoderPool = encoderPool // initialize single reader iterator pool - readerIteratorPool.Init(func(r io.Reader, descr namespace.SchemaDescr) encoding.ReaderIterator { - return m3tsz.NewReaderIterator(r, m3tsz.DefaultIntOptimizationEnabled, encodingOpts) - }) + readerIteratorPool.Init(m3tsz.DefaultReaderIteratorAllocFn(encodingOpts)) opts.readerIteratorPool = readerIteratorPool // initialize multi reader iterator pool multiReaderIteratorPool := encoding.NewMultiReaderIteratorPool(opts.poolOpts) - 
multiReaderIteratorPool.Init(func(r io.Reader, descr namespace.SchemaDescr) encoding.ReaderIterator { - return m3tsz.NewReaderIterator(r, m3tsz.DefaultIntOptimizationEnabled, encodingOpts) - }) + multiReaderIteratorPool.Init(m3tsz.DefaultReaderIteratorAllocFn(encodingOpts)) opts.multiReaderIteratorPool = multiReaderIteratorPool opts.blockOpts = opts.blockOpts. diff --git a/src/dbnode/storage/series/buffer_test.go b/src/dbnode/storage/series/buffer_test.go index 8497b451dc..87bd5cfece 100644 --- a/src/dbnode/storage/series/buffer_test.go +++ b/src/dbnode/storage/series/buffer_test.go @@ -21,7 +21,6 @@ package series import ( - "io" "sort" "strings" "testing" @@ -42,9 +41,10 @@ import ( xtime "github.com/m3db/m3/src/x/time" "github.com/golang/mock/gomock" - "github.com/m3db/m3/src/dbnode/namespace" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/m3db/m3/src/dbnode/namespace" ) var ( @@ -60,9 +60,7 @@ func newBufferTestOptions() Options { encoderPool.Init(func() encoding.Encoder { return m3tsz.NewEncoder(timeZero, nil, m3tsz.DefaultIntOptimizationEnabled, encodingOpts) }) - multiReaderIteratorPool.Init(func(r io.Reader, descr namespace.SchemaDescr) encoding.ReaderIterator { - return m3tsz.NewReaderIterator(r, m3tsz.DefaultIntOptimizationEnabled, encodingOpts) - }) + multiReaderIteratorPool.Init(m3tsz.DefaultReaderIteratorAllocFn(encodingOpts)) bufferBucketPool := NewBufferBucketPool(nil) bufferBucketVersionsPool := NewBufferBucketVersionsPool(nil) diff --git a/src/dbnode/storage/series/series_test.go b/src/dbnode/storage/series/series_test.go index f8d9e8e739..cd52c30c4f 100644 --- a/src/dbnode/storage/series/series_test.go +++ b/src/dbnode/storage/series/series_test.go @@ -22,7 +22,6 @@ package series import ( "errors" - "io" "sort" "testing" "time" @@ -43,9 +42,10 @@ import ( xtime "github.com/m3db/m3/src/x/time" "github.com/golang/mock/gomock" - "github.com/m3db/m3/src/dbnode/namespace" "github.com/stretchr/testify/assert" 
"github.com/stretchr/testify/require" + + "github.com/m3db/m3/src/dbnode/namespace" ) func newSeriesTestOptions() Options { @@ -57,9 +57,7 @@ func newSeriesTestOptions() Options { encoderPool.Init(func() encoding.Encoder { return m3tsz.NewEncoder(timeZero, nil, m3tsz.DefaultIntOptimizationEnabled, encodingOpts) }) - multiReaderIteratorPool.Init(func(r io.Reader, descr namespace.SchemaDescr) encoding.ReaderIterator { - return m3tsz.NewReaderIterator(r, m3tsz.DefaultIntOptimizationEnabled, encodingOpts) - }) + multiReaderIteratorPool.Init(m3tsz.DefaultReaderIteratorAllocFn(encodingOpts)) bufferBucketPool := NewBufferBucketPool(nil) bufferBucketVersionsPool := NewBufferBucketVersionsPool(nil) diff --git a/src/dbnode/testdata/prototest/pools.go b/src/dbnode/testdata/prototest/pools.go index 6b23f0e132..ad07221680 100644 --- a/src/dbnode/testdata/prototest/pools.go +++ b/src/dbnode/testdata/prototest/pools.go @@ -21,12 +21,12 @@ package prototest import ( - "io" "time" - "github.com/m3db/m3/src/x/pool" - "github.com/m3db/m3/src/dbnode/encoding/proto" "github.com/m3db/m3/src/dbnode/encoding" + "github.com/m3db/m3/src/dbnode/encoding/proto" + "github.com/m3db/m3/src/dbnode/x/xio" + "github.com/m3db/m3/src/x/pool" xtime "github.com/m3db/m3/src/x/time" "github.com/m3db/m3/src/dbnode/namespace" @@ -63,10 +63,10 @@ func newPools() Pools { encoderPool.Init(func() encoding.Encoder { return proto.NewEncoder(timeZero, encodingOpts) }) - readerIterPool.Init(func(r io.Reader, descr namespace.SchemaDescr) encoding.ReaderIterator { + readerIterPool.Init(func(r xio.Reader64, descr namespace.SchemaDescr) encoding.ReaderIterator { return proto.NewIterator(r, descr, encodingOpts) }) - multiReaderIteratorPool.Init(func(r io.Reader, descr namespace.SchemaDescr) encoding.ReaderIterator { + multiReaderIteratorPool.Init(func(r xio.Reader64, descr namespace.SchemaDescr) encoding.ReaderIterator { i := readerIterPool.Get() i.Reset(r, descr) return i diff --git 
a/src/dbnode/x/xio/block_reader_test.go b/src/dbnode/x/xio/block_reader_test.go index fd7371a7cb..746822391a 100644 --- a/src/dbnode/x/xio/block_reader_test.go +++ b/src/dbnode/x/xio/block_reader_test.go @@ -50,16 +50,15 @@ func TestCloneBlock(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - var p []byte seg := ts.Segment{} reader := NewMockSegmentReader(ctrl) - reader.EXPECT().Read(p).Return(0, errTest).Times(1) - reader.EXPECT().Read(p).Return(100, nil).Times(1) + reader.EXPECT().Read64().Return(uint64(0), byte(0), errTest).Times(1) + reader.EXPECT().Read64().Return(uint64(123456), byte(10), nil).Times(1) reader.EXPECT().Reset(seg).Return().Times(1) clonedReader := NewMockSegmentReader(ctrl) - clonedReader.EXPECT().Read(p).Return(1337, nil).Times(1) + clonedReader.EXPECT().Read64().Return(uint64(1337), byte(2), nil).Times(1) reader.EXPECT().Clone(nil).Return(clonedReader, nil).Times(1) @@ -69,12 +68,14 @@ func TestCloneBlock(t *testing.T) { BlockSize: blockSize, } - read, err := b.Read(p) - require.Equal(t, read, 0) - require.Equal(t, err, errTest) + word, n, err := b.Read64() + require.Equal(t, uint64(0), word) + require.Equal(t, byte(0), n) + require.Equal(t, errTest, err) - read, err = b.Read(p) - require.Equal(t, read, 100) + word, n, err = b.Read64() + require.Equal(t, uint64(123456), word) + require.Equal(t, byte(10), n) require.NoError(t, err) b2, err := b.CloneBlock(nil) @@ -90,9 +91,10 @@ func TestCloneBlock(t *testing.T) { require.Equal(t, b2.Start, start) require.Equal(t, b2.BlockSize, blockSize) - read, err = b2.Read(p) + word, n, err = b2.Read64() - require.Equal(t, read, 1337) + require.Equal(t, uint64(1337), word) + require.Equal(t, byte(2), n) require.NoError(t, err) } @@ -121,16 +123,16 @@ func TestBlockReaderClone(t *testing.T) { func TestBlockReaderRead(t *testing.T) { br, sr := buildBlock(t) - var p []byte - - sr.EXPECT().Read(p).Return(0, errTest).Times(1) - read, err := br.Read(p) - require.Equal(t, read, 0) + 
sr.EXPECT().Read64().Return(uint64(0), byte(0), errTest).Times(1) + word, n, err := br.Read64() + require.Equal(t, uint64(0), word) + require.Equal(t, byte(0), n) require.Equal(t, err, errTest) - sr.EXPECT().Read(p).Return(100, nil).Times(1) - read, err = br.Read(p) - require.Equal(t, read, 100) + sr.EXPECT().Read64().Return(uint64(100), byte(1), nil).Times(1) + word, n, err = br.Read64() + require.Equal(t, uint64(100), word) + require.Equal(t, byte(1), n) require.NoError(t, err) } diff --git a/src/dbnode/x/xio/io_mock.go b/src/dbnode/x/xio/io_mock.go index daa99ba481..12f1c91cc7 100644 --- a/src/dbnode/x/xio/io_mock.go +++ b/src/dbnode/x/xio/io_mock.go @@ -83,19 +83,36 @@ func (mr *MockSegmentReaderMockRecorder) Finalize() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Finalize", reflect.TypeOf((*MockSegmentReader)(nil).Finalize)) } -// Read mocks base method -func (m *MockSegmentReader) Read(arg0 []byte) (int, error) { +// Peek64 mocks base method +func (m *MockSegmentReader) Peek64() (uint64, byte, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Read", arg0) - ret0, _ := ret[0].(int) - ret1, _ := ret[1].(error) - return ret0, ret1 + ret := m.ctrl.Call(m, "Peek64") + ret0, _ := ret[0].(uint64) + ret1, _ := ret[1].(byte) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// Peek64 indicates an expected call of Peek64 +func (mr *MockSegmentReaderMockRecorder) Peek64() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Peek64", reflect.TypeOf((*MockSegmentReader)(nil).Peek64)) +} + +// Read64 mocks base method +func (m *MockSegmentReader) Read64() (uint64, byte, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Read64") + ret0, _ := ret[0].(uint64) + ret1, _ := ret[1].(byte) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 } -// Read indicates an expected call of Read -func (mr *MockSegmentReaderMockRecorder) Read(arg0 interface{}) *gomock.Call { +// Read64 indicates an 
expected call of Read64 +func (mr *MockSegmentReaderMockRecorder) Read64() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Read", reflect.TypeOf((*MockSegmentReader)(nil).Read), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Read64", reflect.TypeOf((*MockSegmentReader)(nil).Read64)) } // Reset mocks base method diff --git a/src/dbnode/x/xio/null.go b/src/dbnode/x/xio/null.go index 81fdebad47..146b12519f 100644 --- a/src/dbnode/x/xio/null.go +++ b/src/dbnode/x/xio/null.go @@ -27,7 +27,8 @@ import ( type nullSegmentReader struct{} -func (r nullSegmentReader) Read(_ []byte) (n int, err error) { return 0, nil } +func (r nullSegmentReader) Read64() (word uint64, n byte, err error) { return 0, 0, nil } +func (r nullSegmentReader) Peek64() (word uint64, n byte, err error) { return 0, 0, nil } func (r nullSegmentReader) Segment() (ts.Segment, error) { return ts.Segment{}, nil } func (r nullSegmentReader) Reset(_ ts.Segment) {} func (r nullSegmentReader) Finalize() {} diff --git a/src/dbnode/x/xio/reader64.go b/src/dbnode/x/xio/reader64.go new file mode 100644 index 0000000000..37ccf7a404 --- /dev/null +++ b/src/dbnode/x/xio/reader64.go @@ -0,0 +1,86 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. 
+// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package xio + +import ( + "encoding/binary" + "io" +) + +// BytesReader64 implements a Reader64 over a slice of bytes. +type BytesReader64 struct { + data []byte + index int +} + +// NewBytesReader64 creates a new BytesReader64. +func NewBytesReader64(data []byte) *BytesReader64 { + return &BytesReader64{data: data} +} + +// Read64 reads and returns a 64 bit word plus a number of bytes (up to 8) actually read. +func (r *BytesReader64) Read64() (word uint64, n byte, err error) { + if r.index+8 <= len(r.data) { + // NB: this compiles to a single 64 bit load followed by + // a BSWAPQ on amd64 gc 1.13 (https://godbolt.org/z/oTK1jx). + res := binary.BigEndian.Uint64(r.data[r.index:]) + r.index += 8 + return res, 8, nil + } + if r.index >= len(r.data) { + return 0, 0, io.EOF + } + var res uint64 + var bytes byte + for ; r.index < len(r.data); r.index++ { + res = (res << 8) | uint64(r.data[r.index]) + bytes++ + } + return res << (64 - 8*bytes), bytes, nil +} + +// Peek64 peeks and returns the next 64 bit word plus a number of bytes (up to 8) available. +func (r *BytesReader64) Peek64() (word uint64, n byte, err error) { + if r.index+8 <= len(r.data) { + // NB: this compiles to a single 64 bit load followed by + // BSWAPQ on amd64 gc 1.13 (https://godbolt.org/z/oTK1jx). 
+ res := binary.BigEndian.Uint64(r.data[r.index:]) + return res, 8, nil + } + + if r.index >= len(r.data) { + return 0, 0, io.EOF + } + + var res uint64 + var bytes byte + for i := r.index; i < len(r.data); i++ { + res = (res << 8) | uint64(r.data[i]) + bytes++ + } + return res << (64 - 8*bytes), bytes, nil +} + +// Reset resets the BytesReader64 for reuse. +func (r *BytesReader64) Reset(data []byte) { + r.data = data + r.index = 0 +} diff --git a/src/dbnode/x/xio/reader64_test.go b/src/dbnode/x/xio/reader64_test.go new file mode 100644 index 0000000000..98d29bdb11 --- /dev/null +++ b/src/dbnode/x/xio/reader64_test.go @@ -0,0 +1,74 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package xio + +import ( + "encoding/binary" + "io" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestBytesReader64(t *testing.T) { + var ( + data = []byte{4, 5, 6, 7, 8, 9, 1, 2, 3, 0, 10, 11, 12, 13, 14, 15, 16, 17} + r = NewBytesReader64(nil) + ) + + for l := 0; l < len(data); l++ { + testBytesReader64(t, r, data[:l]) + } +} + +func testBytesReader64(t *testing.T, r *BytesReader64, data []byte) { + r.Reset(data) + + var ( + peeked = []byte{} + read = []byte{} + buf [8]byte + word uint64 + n byte + err error + ) + + for { + word, n, err = r.Peek64() + if err != nil { + break + } + binary.BigEndian.PutUint64(buf[:], word) + peeked = append(peeked, buf[:n]...) + + word, n, err = r.Read64() + if err != nil { + break + } + binary.BigEndian.PutUint64(buf[:], word) + read = append(read, buf[:n]...) + } + + require.Equal(t, io.EOF, err) + assert.Equal(t, data, peeked) + assert.Equal(t, data, read) +} diff --git a/src/dbnode/x/xio/segment_reader.go b/src/dbnode/x/xio/segment_reader.go index 352c8feeea..5555937cc2 100644 --- a/src/dbnode/x/xio/segment_reader.go +++ b/src/dbnode/x/xio/segment_reader.go @@ -21,6 +21,7 @@ package xio import ( + "encoding/binary" "io" "github.com/m3db/m3/src/dbnode/ts" @@ -46,40 +47,103 @@ func (sr *segmentReader) Clone( return NewSegmentReader(sr.segment.Clone(pool)), nil } -func (sr *segmentReader) Read(b []byte) (int, error) { - if len(b) == 0 { - return 0, nil +func (sr *segmentReader) Read64() (word uint64, n byte, err error) { + sr.lazyInit() + + var ( + headLen = len(sr.lazyHead) + res uint64 + bytes byte + ) + + if sr.si+8 <= headLen { + // NB: this compiles to a single 64 bit load followed by + // a BSWAPQ on amd64 gc 1.13 (https://godbolt.org/z/oTK1jx). 
+ res = binary.BigEndian.Uint64(sr.lazyHead[sr.si:]) + sr.si += 8 + return res, 8, nil } - if b := sr.segment.Head; b != nil && len(sr.lazyHead) == 0 { - sr.lazyHead = b.Bytes() + headTailLen := headLen + len(sr.lazyTail) + + if sr.si < headLen { + for ; sr.si < headLen; sr.si++ { + res = (res << 8) | uint64(sr.lazyHead[sr.si]) + bytes++ + } + for ; sr.si < headTailLen && bytes < 8; sr.si++ { + res = (res << 8) | uint64(sr.lazyTail[sr.si-headLen]) + bytes++ + } + return res << (64 - 8*bytes), bytes, nil } - if b := sr.segment.Tail; b != nil && len(sr.lazyTail) == 0 { - sr.lazyTail = b.Bytes() + + if sr.si+8 <= headTailLen { + // NB: this compiles to a single 64 bit load followed by + // a BSWAPQ on amd64 gc 1.13 (https://godbolt.org/z/oTK1jx). + res = binary.BigEndian.Uint64(sr.lazyTail[sr.si-headLen:]) + sr.si += 8 + return res, 8, nil + } + + if sr.si >= headTailLen { + return 0, 0, io.EOF } - nh, nt := len(sr.lazyHead), len(sr.lazyTail) - if sr.si >= nh+nt { - return 0, io.EOF + for ; sr.si < headTailLen; sr.si++ { + res = (res << 8) | uint64(sr.lazyTail[sr.si-headLen]) + bytes++ + } + return res << (64 - 8*bytes), bytes, nil +} + +func (sr *segmentReader) Peek64() (word uint64, n byte, err error) { + sr.lazyInit() + + var ( + headLen = len(sr.lazyHead) + i = sr.si + res uint64 + bytes byte + ) + + if i+8 <= headLen { + // NB: this compiles to a single 64 bit load followed by + // a BSWAPQ on amd64 gc 1.13 (https://godbolt.org/z/oTK1jx). 
+ res = binary.BigEndian.Uint64(sr.lazyHead[i:]) + return res, 8, nil } - n := 0 - if sr.si < nh { - nRead := copy(b, sr.lazyHead[sr.si:]) - sr.si += nRead - n += nRead - if n == len(b) { - return n, nil + + headTailLen := headLen + len(sr.lazyTail) + + if i < headLen { + for ; i < headLen; i++ { + res = (res << 8) | uint64(sr.lazyHead[i]) + bytes++ + } + for ; i < headTailLen && bytes < 8; i++ { + res = (res << 8) | uint64(sr.lazyTail[i-headLen]) + bytes++ } + return res << (64 - 8*bytes), bytes, nil } - if sr.si < nh+nt { - nRead := copy(b[n:], sr.lazyTail[sr.si-nh:]) - sr.si += nRead - n += nRead + + if i+8 <= headTailLen { + // NB: this compiles to a single 64 bit load followed by + // a BSWAPQ on amd64 gc 1.13 (https://godbolt.org/z/oTK1jx). + res = binary.BigEndian.Uint64(sr.lazyTail[i-headLen:]) + return res, 8, nil + } + + if i >= headTailLen { + return 0, 0, io.EOF } - if n == 0 { - return 0, io.EOF + + for ; i < headTailLen; i++ { + res = (res << 8) | uint64(sr.lazyTail[i-headLen]) + bytes++ } - return n, nil + return res << (64 - 8*bytes), bytes, nil } func (sr *segmentReader) Segment() (ts.Segment, error) { @@ -102,3 +166,12 @@ func (sr *segmentReader) Finalize() { pool.Put(sr) } } + +func (sr *segmentReader) lazyInit() { + if b := sr.segment.Head; b != nil && len(sr.lazyHead) == 0 { + sr.lazyHead = b.Bytes() + } + if b := sr.segment.Tail; b != nil && len(sr.lazyTail) == 0 { + sr.lazyTail = b.Bytes() + } +} diff --git a/src/dbnode/x/xio/segment_reader_test.go b/src/dbnode/x/xio/segment_reader_test.go index ef0f704fb0..fed14e0680 100644 --- a/src/dbnode/x/xio/segment_reader_test.go +++ b/src/dbnode/x/xio/segment_reader_test.go @@ -21,6 +21,7 @@ package xio import ( + "encoding/binary" "io" "testing" @@ -52,6 +53,10 @@ var ( type byteFunc func(d []byte) checked.Bytes +func checkedNoPool(d []byte) checked.Bytes { + return checked.NewBytes(d, nil) +} + func testSegmentReader( t *testing.T, checkd byteFunc, @@ -60,15 +65,9 @@ func testSegmentReader( checksum 
:= uint32(10) segment := ts.NewSegment(checkd(head), checkd(tail), checksum, ts.FinalizeNone) r := NewSegmentReader(segment) - var b [100]byte - n, err := r.Read(b[:]) - require.NoError(t, err) - require.Equal(t, len(expected), n) - require.Equal(t, expected, b[:n]) - - n, err = r.Read(b[:]) + bytes, err := ToBytes(r) require.Equal(t, io.EOF, err) - require.Equal(t, 0, n) + require.Equal(t, expected, bytes) seg, err := r.Segment() require.NoError(t, err) @@ -92,13 +91,13 @@ func testSegmentReader( cloned.Finalize() segment.Finalize() } + func TestSegmentReaderNoPool(t *testing.T) { - checkd := func(d []byte) checked.Bytes { return checked.NewBytes(d, nil) } - testSegmentReader(t, checkd, nil) + testSegmentReader(t, checkedNoPool, nil) } func TestSegmentReaderWithPool(t *testing.T) { - bytesPool := pool.NewCheckedBytesPool([]pool.Bucket{pool.Bucket{ + bytesPool := pool.NewCheckedBytesPool([]pool.Bucket{{ Capacity: 1024, Count: 10, }}, nil, func(s []pool.Bucket) pool.BytesPool { @@ -115,3 +114,54 @@ func TestSegmentReaderWithPool(t *testing.T) { testSegmentReader(t, checkd, bytesPool) } + +func TestSegmentReader64(t *testing.T) { + data := make([]byte, 32) + for i := range data { + data[i] = 100 + byte(i) + } + + for headLen := 0; headLen < len(data); headLen++ { + for tailLen := 0; tailLen < len(data)-headLen; tailLen++ { + testSegmentReader64(t, data[:headLen], data[headLen:headLen+tailLen]) + } + } +} + +func testSegmentReader64(t *testing.T, head []byte, tail []byte) { + var expected []byte + expected = append(expected, head...) + expected = append(expected, tail...) + + var ( + segment = ts.NewSegment(checkedNoPool(head), checkedNoPool(tail), 0, ts.FinalizeNone) + r = NewSegmentReader(segment) + peeked, read []byte + buf [8]byte + word uint64 + n byte + err error + ) + + for { + word, n, err = r.Peek64() + if err != nil { + break + } + binary.BigEndian.PutUint64(buf[:], word) + peeked = append(peeked, buf[:n]...) 
+
+		word, n, err = r.Read64()
+		require.NoError(t, err)
+
+		binary.BigEndian.PutUint64(buf[:], word)
+		read = append(read, buf[:n]...)
+	}
+
+	require.Equal(t, io.EOF, err)
+	require.Equal(t, expected, peeked)
+	require.Equal(t, expected, read)
+
+	_, _, err = r.Read64()
+	require.Equal(t, io.EOF, err)
+}
diff --git a/src/dbnode/x/xio/types.go b/src/dbnode/x/xio/types.go
index fd02692fed..28cd107f0c 100644
--- a/src/dbnode/x/xio/types.go
+++ b/src/dbnode/x/xio/types.go
@@ -21,7 +21,6 @@
 package xio
 
 import (
-	"io"
 	"time"
 
 	"github.com/m3db/m3/src/dbnode/ts"
@@ -42,7 +41,7 @@ var EmptyBlockReader = BlockReader{}
 
 // SegmentReader implements the io reader interface backed by a segment.
 type SegmentReader interface {
-	io.Reader
+	Reader64
 	xresource.Finalizer
 
 	// Segment gets the segment read by this reader.
@@ -103,3 +102,13 @@ type ReaderSliceOfSlicesFromBlockReadersIterator interface {
 	// Reset resets the iterator with a new array of block readers arrays.
 	Reset(blocks [][]BlockReader)
 }
+
+// Reader64 is a reader for reading 64 bit words.
+type Reader64 interface {
+
+	// Read64 reads and returns a 64 bit word plus a number of bytes (up to 8) actually read.
+	Read64() (word uint64, n byte, err error)
+
+	// Peek64 peeks and returns the next 64 bit word plus a number of bytes (up to 8) available.
+	Peek64() (word uint64, n byte, err error)
+}
diff --git a/src/dbnode/x/xio/utils.go b/src/dbnode/x/xio/utils.go
new file mode 100644
index 0000000000..8374bc7af9
--- /dev/null
+++ b/src/dbnode/x/xio/utils.go
@@ -0,0 +1,40 @@
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package xio + +import "encoding/binary" + +// ToBytes reads and returns the contents of Reader64 as a slice of bytes. +// Should normally return io.EOF as an error. +func ToBytes(reader Reader64) ([]byte, error) { + var ( + res []byte + buf [8]byte + ) + + word, bytes, err := reader.Read64() + for ; err == nil; word, bytes, err = reader.Read64() { + binary.BigEndian.PutUint64(buf[:], word) + res = append(res, buf[:bytes]...) 
+ } + + return res, err +} diff --git a/src/query/pools/query_pools.go b/src/query/pools/query_pools.go index 65c5c7ad99..d0e1dcd360 100644 --- a/src/query/pools/query_pools.go +++ b/src/query/pools/query_pools.go @@ -21,11 +21,10 @@ package pools import ( - "io" - "github.com/m3db/m3/src/dbnode/encoding" "github.com/m3db/m3/src/dbnode/encoding/m3tsz" "github.com/m3db/m3/src/dbnode/namespace" + "github.com/m3db/m3/src/dbnode/x/xio" "github.com/m3db/m3/src/dbnode/x/xpool" xconfig "github.com/m3db/m3/src/x/config" "github.com/m3db/m3/src/x/ident" @@ -42,10 +41,8 @@ const ( defaultReplicas = 3 defaultSeriesIteratorPoolSize = 2 << 12 // ~8k defaultCheckedBytesWrapperPoolSize = 2 << 12 // ~8k - defaultBucketCapacity = 256 defaultPoolableConcurrentQueries = 64 defaultPoolableSeriesPerQuery = 4096 - defaultSeriesReplicaReaderPoolSize = defaultPoolableConcurrentQueries * defaultPoolableSeriesPerQuery * defaultReplicas ) var ( @@ -212,16 +209,15 @@ func BuildIteratorPools( encodingOpts := encoding.NewOptions(). 
SetReaderIteratorPool(readerIteratorPool) - readerIteratorPool.Init(func(r io.Reader, descr namespace.SchemaDescr) encoding.ReaderIterator { - return m3tsz.NewReaderIterator(r, m3tsz.DefaultIntOptimizationEnabled, encodingOpts) - }) + readerIteratorPool.Init(m3tsz.DefaultReaderIteratorAllocFn(encodingOpts)) pools.multiReaderIterator = encoding.NewMultiReaderIteratorPool(defaultPerSeriesPoolOpts) - pools.multiReaderIterator.Init(func(r io.Reader, s namespace.SchemaDescr) encoding.ReaderIterator { - iter := readerIteratorPool.Get() - iter.Reset(r, s) - return iter - }) + pools.multiReaderIterator.Init( + func(r xio.Reader64, s namespace.SchemaDescr) encoding.ReaderIterator { + iter := readerIteratorPool.Get() + iter.Reset(r, s) + return iter + }) pools.seriesIterator = encoding.NewSeriesIteratorPool(defaultPerSeriesPoolOpts) pools.seriesIterator.Init() diff --git a/src/query/remote/compressed_codecs.go b/src/query/remote/compressed_codecs.go index 166cefcd2c..77f41772a6 100644 --- a/src/query/remote/compressed_codecs.go +++ b/src/query/remote/compressed_codecs.go @@ -22,7 +22,6 @@ package remote import ( "fmt" - "io" "sync" "time" @@ -47,14 +46,12 @@ func initializeVars() { b.Reset(nil) })) - iterAlloc = func(r io.Reader, _ namespace.SchemaDescr) encoding.ReaderIterator { - return m3tsz.NewReaderIterator(r, m3tsz.DefaultIntOptimizationEnabled, encoding.NewOptions()) - } + iterAlloc = m3tsz.DefaultReaderIteratorAllocFn(encoding.NewOptions()) } var ( opts checked.BytesOptions - iterAlloc func(r io.Reader, d namespace.SchemaDescr) encoding.ReaderIterator + iterAlloc func(r xio.Reader64, d namespace.SchemaDescr) encoding.ReaderIterator initialize sync.Once ) diff --git a/src/query/test/test_series_iterator.go b/src/query/test/test_series_iterator.go index a2adb81b66..26b1586fe8 100644 --- a/src/query/test/test_series_iterator.go +++ b/src/query/test/test_series_iterator.go @@ -22,7 +22,6 @@ package test import ( "fmt" - "io" "sort" "time" @@ -53,7 +52,7 @@ var ( // End 
is the expected end time for the generated series End time.Time - testIterAlloc func(r io.Reader, d namespace.SchemaDescr) encoding.ReaderIterator + testIterAlloc func(r xio.Reader64, d namespace.SchemaDescr) encoding.ReaderIterator ) func init() { @@ -68,9 +67,7 @@ func init() { Middle = Start.Add(BlockSize) End = Middle.Add(BlockSize) - testIterAlloc = func(r io.Reader, _ namespace.SchemaDescr) encoding.ReaderIterator { - return m3tsz.NewReaderIterator(r, m3tsz.DefaultIntOptimizationEnabled, encoding.NewOptions()) - } + testIterAlloc = m3tsz.DefaultReaderIteratorAllocFn(encoding.NewOptions()) } // Builds a MultiReaderIterator representing a single replica diff --git a/src/query/ts/m3db/encoded_step_iterator_test.go b/src/query/ts/m3db/encoded_step_iterator_test.go index 6b9a91f690..85b5ad166e 100644 --- a/src/query/ts/m3db/encoded_step_iterator_test.go +++ b/src/query/ts/m3db/encoded_step_iterator_test.go @@ -22,7 +22,6 @@ package m3db import ( "fmt" - "io" "os" "runtime" "sync" @@ -44,8 +43,8 @@ import ( "github.com/m3db/m3/src/x/pool" xsync "github.com/m3db/m3/src/x/sync" xtime "github.com/m3db/m3/src/x/time" - "github.com/pkg/profile" + "github.com/pkg/profile" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -478,7 +477,7 @@ func setupBlock(b *testing.B, iterations int, t iterType) (block.Block, reset, s m3tsz.DefaultIntOptimizationEnabled, encodingOpts) iterAlloc := func( - r io.Reader, + r xio.Reader64, d namespace.SchemaDescr, ) encoding.ReaderIterator { readerIter.Reset(r, d) diff --git a/src/query/ts/m3db/options.go b/src/query/ts/m3db/options.go index 43867c654e..153df2aac7 100644 --- a/src/query/ts/m3db/options.go +++ b/src/query/ts/m3db/options.go @@ -23,13 +23,11 @@ package m3db import ( "errors" "fmt" - "io" "time" "github.com/m3db/m3/src/dbnode/client" "github.com/m3db/m3/src/dbnode/encoding" "github.com/m3db/m3/src/dbnode/encoding/m3tsz" - "github.com/m3db/m3/src/dbnode/namespace" 
"github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/pools" queryconsolidator "github.com/m3db/m3/src/query/storage/m3/consolidators" @@ -39,13 +37,11 @@ import ( ) var ( - defaultCapacity = 1024 - defaultCount = 10 - defaultLookbackDuration = time.Duration(0) - defaultConsolidationFn = consolidators.TakeLast - defaultIterAlloc = func(r io.Reader, _ namespace.SchemaDescr) encoding.ReaderIterator { - return m3tsz.NewReaderIterator(r, m3tsz.DefaultIntOptimizationEnabled, encoding.NewOptions()) - } + defaultCapacity = 1024 + defaultCount = 10 + defaultLookbackDuration = time.Duration(0) + defaultConsolidationFn = consolidators.TakeLast + defaultIterAlloc = m3tsz.DefaultReaderIteratorAllocFn(encoding.NewOptions()) defaultIteratorBatchingFn = iteratorBatchingFn defaultBlockSeriesProcessor = NewBlockSeriesProcessor() defaultInstrumented = true