From a091f23835e2306b28b0f9ae6fe5ec7806994ba9 Mon Sep 17 00:00:00 2001 From: Linas Medziunas Date: Fri, 20 Nov 2020 16:05:19 +0200 Subject: [PATCH] [dbnode] Remove dead code in fs package --- src/dbnode/generated/mocks/generate.go | 2 +- src/dbnode/persist/fs/cross_block_iterator.go | 94 ------ .../persist/fs/cross_block_iterator_test.go | 177 ---------- src/dbnode/persist/fs/cross_block_reader.go | 234 -------------- .../persist/fs/cross_block_reader_test.go | 305 ------------------ src/dbnode/persist/fs/fs_mock.go | 174 +--------- src/dbnode/persist/fs/types.go | 32 -- 7 files changed, 2 insertions(+), 1016 deletions(-) delete mode 100644 src/dbnode/persist/fs/cross_block_iterator.go delete mode 100644 src/dbnode/persist/fs/cross_block_iterator_test.go delete mode 100644 src/dbnode/persist/fs/cross_block_reader.go delete mode 100644 src/dbnode/persist/fs/cross_block_reader_test.go diff --git a/src/dbnode/generated/mocks/generate.go b/src/dbnode/generated/mocks/generate.go index 3869d17577..de23a9efbf 100644 --- a/src/dbnode/generated/mocks/generate.go +++ b/src/dbnode/generated/mocks/generate.go @@ -20,7 +20,7 @@ // mockgen rules for generating mocks for exported interfaces (reflection mode) -//go:generate sh -c "mockgen -package=fs $PACKAGE/src/dbnode/persist/fs CrossBlockReader,CrossBlockIterator,DataFileSetWriter,DataFileSetReader,DataFileSetSeeker,IndexFileSetWriter,IndexFileSetReader,IndexSegmentFileSetWriter,IndexSegmentFileSet,IndexSegmentFile,SnapshotMetadataFileWriter,DataFileSetSeekerManager,ConcurrentDataFileSetSeeker,MergeWith,StreamingWriter | genclean -pkg $PACKAGE/src/dbnode/persist/fs -out $GOPATH/src/$PACKAGE/src/dbnode/persist/fs/fs_mock.go" +//go:generate sh -c "mockgen -package=fs $PACKAGE/src/dbnode/persist/fs DataFileSetWriter,DataFileSetReader,DataFileSetSeeker,IndexFileSetWriter,IndexFileSetReader,IndexSegmentFileSetWriter,IndexSegmentFileSet,IndexSegmentFile,SnapshotMetadataFileWriter,DataFileSetSeekerManager,ConcurrentDataFileSetSeeker,MergeWith,StreamingWriter | genclean -pkg $PACKAGE/src/dbnode/persist/fs -out $GOPATH/src/$PACKAGE/src/dbnode/persist/fs/fs_mock.go" //go:generate sh -c "mockgen -package=xio $PACKAGE/src/dbnode/x/xio SegmentReader,SegmentReaderPool | genclean -pkg $PACKAGE/src/dbnode/x/xio -out $GOPATH/src/$PACKAGE/src/dbnode/x/xio/io_mock.go" //go:generate sh -c "mockgen -package=digest -destination=$GOPATH/src/$PACKAGE/src/dbnode/digest/digest_mock.go $PACKAGE/src/dbnode/digest ReaderWithDigest" //go:generate sh -c "mockgen -package=series $PACKAGE/src/dbnode/storage/series DatabaseSeries,QueryableBlockRetriever | genclean -pkg $PACKAGE/src/dbnode/storage/series -out $GOPATH/src/$PACKAGE/src/dbnode/storage/series/series_mock.go" diff --git a/src/dbnode/persist/fs/cross_block_iterator.go b/src/dbnode/persist/fs/cross_block_iterator.go deleted file mode 100644 index fcae8851cf..0000000000 --- a/src/dbnode/persist/fs/cross_block_iterator.go +++ /dev/null @@ -1,94 +0,0 @@ -// Copyright (c) 2020 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package fs - -import ( - "bytes" - - "github.com/m3db/m3/src/dbnode/encoding" - "github.com/m3db/m3/src/dbnode/ts" - xtime "github.com/m3db/m3/src/x/time" -) - -type crossBlockIterator struct { - idx int - exhausted bool - current encoding.ReaderIterator - byteReader *bytes.Reader - records []BlockRecord -} - -// NewCrossBlockIterator creates a new CrossBlockIterator. -func NewCrossBlockIterator(pool encoding.ReaderIteratorPool) CrossBlockIterator { - c := &crossBlockIterator{current: pool.Get(), byteReader: bytes.NewReader(nil)} - c.Reset(nil) - return c -} - -func (c *crossBlockIterator) Next() bool { - if c.exhausted { - return false - } - - if c.idx >= 0 && c.current.Next() { - return true - } - - // NB: clear previous. - if c.idx >= 0 { - if c.current.Err() != nil { - c.exhausted = true - return false - } - } - - c.idx++ - if c.idx >= len(c.records) { - c.exhausted = true - return false - } - - c.byteReader.Reset(c.records[c.idx].Data) - c.current.Reset(c.byteReader, nil) - - // NB: rerun using the next record. - return c.Next() -} - -func (c *crossBlockIterator) Current() (ts.Datapoint, xtime.Unit, ts.Annotation) { - return c.current.Current() -} - -func (c *crossBlockIterator) Reset(records []BlockRecord) { - c.idx = -1 - c.records = records - c.exhausted = false - c.byteReader.Reset(nil) -} - -func (c *crossBlockIterator) Close() { - c.Reset(nil) - c.current.Close() -} - -func (c *crossBlockIterator) Err() error { - return c.current.Err() -} diff --git a/src/dbnode/persist/fs/cross_block_iterator_test.go b/src/dbnode/persist/fs/cross_block_iterator_test.go deleted file mode 100644 index 111580580c..0000000000 --- a/src/dbnode/persist/fs/cross_block_iterator_test.go +++ /dev/null @@ -1,177 +0,0 @@ -// Copyright (c) 2020 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package fs - -import ( - "fmt" - "io" - "io/ioutil" - "testing" - "time" - - "github.com/m3db/m3/src/dbnode/encoding" - "github.com/m3db/m3/src/dbnode/namespace" - "github.com/m3db/m3/src/dbnode/ts" - xtest "github.com/m3db/m3/src/x/test" - xtime "github.com/m3db/m3/src/x/time" - - "github.com/golang/mock/gomock" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestCrossBlockIterator(t *testing.T) { - ctrl := xtest.NewController(t) - defer ctrl.Finish() - - reader := encoding.NewMockReaderIterator(ctrl) - - iterPool := encoding.NewMockReaderIteratorPool(ctrl) - iterPool.EXPECT().Get().Return(reader) - - iter := NewCrossBlockIterator(iterPool) - assert.False(t, iter.Next()) - - count := 3 - iterCount := 5 - startTime := time.Now().Truncate(time.Hour) - start := startTime - records := make([]BlockRecord, 0, count) - for i := 0; i < count; i++ { - byteString := fmt.Sprint(i) - records = append(records, BlockRecord{ - Data: []byte(byteString), - }) - - reader.EXPECT().Reset(gomock.Any(), nil).Do( - func(r io.Reader, _ namespace.SchemaDescr) { - b, err := ioutil.ReadAll(r) - require.NoError(t, err) - assert.Equal(t, byteString, string(b)) - }) - - for j := 0; j < iterCount; j++ { - reader.EXPECT().Next().Return(true) - reader.EXPECT().Current().Return(ts.Datapoint{ - Value: float64(j), - Timestamp: start, - }, xtime.Second, nil) - start = start.Add(time.Minute) - } - - reader.EXPECT().Next().Return(false) - reader.EXPECT().Err().Return(nil) - } - - iter.Reset(records) - i := 0 - for iter.Next() { - dp, _, _ := iter.Current() - // NB: iterator values should go [0,1,...,iterCount] for each block record. - assert.Equal(t, float64(i%iterCount), dp.Value) - // NB: time should be constantly increasing per value. - assert.Equal(t, startTime.Add(time.Minute*time.Duration(i)), dp.Timestamp) - i++ - } - - assert.Equal(t, count*iterCount, i) - - reader.EXPECT().Err().Return(errExpected) - assert.Equal(t, errExpected, iter.Err()) - reader.EXPECT().Close() - iter.Close() -} - -func TestFailingCrossBlockIterator(t *testing.T) { - ctrl := xtest.NewController(t) - defer ctrl.Finish() - - reader := encoding.NewMockReaderIterator(ctrl) - - iterPool := encoding.NewMockReaderIteratorPool(ctrl) - iterPool.EXPECT().Get().Return(reader) - - iter := NewCrossBlockIterator(iterPool) - assert.False(t, iter.Next()) - - count := 4 - iterCount := 5 - remaining := 12 - startTime := time.Now().Truncate(time.Hour) - start := startTime - records := make([]BlockRecord, 0, count) - for i := 0; i < count; i++ { - byteString := fmt.Sprint(i) - data := []byte(byteString) - - if remaining == 0 { - records = append(records, BlockRecord{ - Data: data, - }) - continue - } - - records = append(records, BlockRecord{ - Data: data, - }) - - reader.EXPECT().Reset(gomock.Any(), nil).Do( - func(r io.Reader, _ namespace.SchemaDescr) { - b, err := ioutil.ReadAll(r) - require.NoError(t, err) - assert.Equal(t, byteString, string(b)) - }) - - for j := 0; remaining > 0 && j < iterCount; j++ { - reader.EXPECT().Next().Return(true) - reader.EXPECT().Current().Return(ts.Datapoint{ - Value: float64(j), - Timestamp: start, - }, xtime.Second, nil) - start = start.Add(time.Minute) - remaining-- - } - - reader.EXPECT().Next().Return(false) - if remaining == 0 { - reader.EXPECT().Err().Return(errExpected).Times(2) - } else { - reader.EXPECT().Err().Return(nil) - } - } - - iter.Reset(records) - i := 0 - for iter.Next() { - dp, _, _ := iter.Current() - // NB: iterator values should go [0,1,...,iterCount] for each block record. - assert.Equal(t, float64(i%iterCount), dp.Value) - // NB: time should be constantly increasing per value. - assert.Equal(t, startTime.Add(time.Minute*time.Duration(i)), dp.Timestamp) - i++ - } - - assert.Equal(t, 12, i) - - assert.Equal(t, errExpected, iter.Err()) - reader.EXPECT().Close() - iter.Close() -} diff --git a/src/dbnode/persist/fs/cross_block_reader.go b/src/dbnode/persist/fs/cross_block_reader.go deleted file mode 100644 index 6184bde98c..0000000000 --- a/src/dbnode/persist/fs/cross_block_reader.go +++ /dev/null @@ -1,234 +0,0 @@ -// Copyright (c) 2020 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package fs - -import ( - "bytes" - "container/heap" - "errors" - "io" - "time" - - "github.com/m3db/m3/src/dbnode/ts" - xerrors "github.com/m3db/m3/src/x/errors" - "github.com/m3db/m3/src/x/ident" - "github.com/m3db/m3/src/x/instrument" -) - -var ( - errReaderNotOrderedByIndex = errors.New("crossBlockReader can only use DataFileSetReaders ordered by index") - errUnorderedDataFileSetReaders = errors.New("dataFileSetReaders are not ordered by time") -) - -type crossBlockReader struct { - dataFileSetReaders []DataFileSetReader - pendingReaderIndices []int - preAllocatedEntries []*minHeapEntry - minHeap minHeap - - iOpts instrument.Options - - id ident.BytesID - encodedTags ts.EncodedTags - records []BlockRecord - err error -} - -// NewCrossBlockReader constructs a new CrossBlockReader based on given DataFileSetReaders. -// DataFileSetReaders must be configured to return the data in the order of index, and must be -// provided in a slice sorted by block start time. -// Callers are responsible for closing the DataFileSetReaders. -func NewCrossBlockReader(dataFileSetReaders []DataFileSetReader, iOpts instrument.Options) (CrossBlockReader, error) { - var previousStart time.Time - for _, dataFileSetReader := range dataFileSetReaders { - if !dataFileSetReader.StreamingEnabled() { - return nil, errReaderNotOrderedByIndex - } - currentStart := dataFileSetReader.Range().Start - if !currentStart.After(previousStart) { - return nil, errUnorderedDataFileSetReaders - } - previousStart = currentStart - } - - pendingReaderIndices := make([]int, len(dataFileSetReaders)) - preAllocatedEntries := make([]*minHeapEntry, len(dataFileSetReaders)) - for i := range dataFileSetReaders { - pendingReaderIndices[i] = i - preAllocatedEntries[i] = &minHeapEntry{} - } - - return &crossBlockReader{ - dataFileSetReaders: dataFileSetReaders, - pendingReaderIndices: pendingReaderIndices, - minHeap: make([]*minHeapEntry, 0, len(dataFileSetReaders)), - preAllocatedEntries: preAllocatedEntries, - records: make([]BlockRecord, 0, len(dataFileSetReaders)), - iOpts: iOpts, - }, nil -} - -func (r *crossBlockReader) Next() bool { - if r.err != nil { - return false - } - - // use empty var in inner loop with "for i := range" to have compiler use memclr optimization - // see: https://codereview.appspot.com/137880043 - var emptyRecord BlockRecord - for i := range r.records { - r.records[i] = emptyRecord - } - r.records = r.records[:0] - - if len(r.pendingReaderIndices) == 0 { - return false - } - - var err error - for _, readerIndex := range r.pendingReaderIndices { - *r.preAllocatedEntries[readerIndex], err = r.readFromDataFileSet(readerIndex) - if err == io.EOF { - // Will no longer read from this one. - continue - } else if err != nil { - r.err = err - return false - } else { - heap.Push(&r.minHeap, r.preAllocatedEntries[readerIndex]) - } - } - - r.pendingReaderIndices = r.pendingReaderIndices[:0] - - if len(r.minHeap) == 0 { - return false - } - - firstEntry := heap.Pop(&r.minHeap).(*minHeapEntry) - - r.id = firstEntry.id - r.encodedTags = firstEntry.encodedTags - - r.records = append(r.records, BlockRecord{firstEntry.data, firstEntry.checksum}) - - // We have consumed an entry from this dataFileSetReader, so need to schedule a read from it on the next Next(). - r.pendingReaderIndices = append(r.pendingReaderIndices, firstEntry.dataFileSetReaderIndex) - - // As long as id stays the same across the blocks, accumulate records for this id/tags. - for len(r.minHeap) > 0 && bytes.Equal(r.minHeap[0].id.Bytes(), firstEntry.id.Bytes()) { - nextEntry := heap.Pop(&r.minHeap).(*minHeapEntry) - - r.records = append(r.records, BlockRecord{nextEntry.data, nextEntry.checksum}) - - // We have consumed an entry from this dataFileSetReader, so need to schedule a read from it on the next Next(). - r.pendingReaderIndices = append(r.pendingReaderIndices, nextEntry.dataFileSetReaderIndex) - } - - return true -} - -func (r *crossBlockReader) Current() (ident.BytesID, ts.EncodedTags, []BlockRecord) { - return r.id, r.encodedTags, r.records -} - -func (r *crossBlockReader) readFromDataFileSet(index int) (minHeapEntry, error) { - id, encodedTags, data, checksum, err := r.dataFileSetReaders[index].StreamingRead() - - if err == io.EOF { - return minHeapEntry{}, err - } - - if err != nil { - multiErr := xerrors.NewMultiError(). - Add(err). - Add(r.Close()) - return minHeapEntry{}, multiErr.FinalError() - } - - return minHeapEntry{ - dataFileSetReaderIndex: index, - id: id, - encodedTags: encodedTags, - data: data, - checksum: checksum, - }, nil -} - -func (r *crossBlockReader) Err() error { - return r.err -} - -func (r *crossBlockReader) Close() error { - var emptyRecord BlockRecord - for i := range r.records { - r.records[i] = emptyRecord - } - r.records = r.records[:0] - - for i := range r.minHeap { - r.minHeap[i] = nil - } - r.minHeap = r.minHeap[:0] - - return nil -} - -type minHeapEntry struct { - id ident.BytesID - encodedTags []byte - data []byte - dataFileSetReaderIndex int - checksum uint32 -} - -var _ heap.Interface = (*minHeap)(nil) - -type minHeap []*minHeapEntry - -func (h minHeap) Len() int { - return len(h) -} - -func (h minHeap) Less(i, j int) bool { - idsCmp := bytes.Compare(h[i].id.Bytes(), h[j].id.Bytes()) - if idsCmp == 0 { - return h[i].dataFileSetReaderIndex < h[j].dataFileSetReaderIndex - } - return idsCmp < 0 -} - -func (h minHeap) Swap(i, j int) { - h[i], h[j] = h[j], h[i] -} - -func (h *minHeap) Push(x interface{}) { - *h = append(*h, x.(*minHeapEntry)) -} - -func (h *minHeap) Pop() interface{} { - old := *h - n := len(old) - x := old[n-1] - old[n-1] = nil - *h = old[0 : n-1] - return x -} diff --git a/src/dbnode/persist/fs/cross_block_reader_test.go b/src/dbnode/persist/fs/cross_block_reader_test.go deleted file mode 100644 index c8cb52c7cb..0000000000 --- a/src/dbnode/persist/fs/cross_block_reader_test.go +++ /dev/null @@ -1,305 +0,0 @@ -// Copyright (c) 2020 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package fs - -import ( - "errors" - "fmt" - "io" - "strconv" - "strings" - "testing" - "time" - - "github.com/m3db/m3/src/dbnode/ts" - "github.com/m3db/m3/src/x/ident" - "github.com/m3db/m3/src/x/instrument" - xtest "github.com/m3db/m3/src/x/test" - xtime "github.com/m3db/m3/src/x/time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -var errExpected = errors.New("expected error") - -func TestCrossBlockReaderRejectMisconfiguredInputs(t *testing.T) { - ctrl := xtest.NewController(t) - defer ctrl.Finish() - - dfsReader := NewMockDataFileSetReader(ctrl) - dfsReader.EXPECT().StreamingEnabled().Return(false) - - _, err := NewCrossBlockReader([]DataFileSetReader{dfsReader}, instrument.NewTestOptions(t)) - - assert.Equal(t, errReaderNotOrderedByIndex, err) -} - -func TestCrossBlockReaderRejectMisorderedInputs(t *testing.T) { - ctrl := xtest.NewController(t) - defer ctrl.Finish() - - now := time.Now().Truncate(time.Hour) - dfsReader1 := NewMockDataFileSetReader(ctrl) - dfsReader1.EXPECT().StreamingEnabled().Return(true) - dfsReader1.EXPECT().Range().Return(xtime.Range{Start: now}) - - later := now.Add(time.Hour) - dfsReader2 := NewMockDataFileSetReader(ctrl) - dfsReader2.EXPECT().StreamingEnabled().Return(true) - dfsReader2.EXPECT().Range().Return(xtime.Range{Start: later}) - - _, err := NewCrossBlockReader([]DataFileSetReader{dfsReader2, dfsReader1}, instrument.NewTestOptions(t)) - - assert.Equal(t, errUnorderedDataFileSetReaders, err) -} - -func TestCrossBlockReader(t *testing.T) { - tests := []struct { - name string - blockSeriesIDs [][]string - expectedIDs []string - }{ - { - name: "no readers", - blockSeriesIDs: [][]string{}, - expectedIDs: []string{}, - }, - { - name: "empty readers", - blockSeriesIDs: [][]string{{}, {}, {}}, - expectedIDs: []string{}, - }, - { - name: "one reader, one series", - blockSeriesIDs: [][]string{{"id1"}}, - expectedIDs: []string{"id1"}, - }, - { - name: "one reader, many series", - blockSeriesIDs: [][]string{{"id1", "id2", "id3"}}, - expectedIDs: []string{"id1", "id2", "id3"}, - }, - { - name: "many readers with same series", - blockSeriesIDs: [][]string{{"id1"}, {"id1"}, {"id1"}}, - expectedIDs: []string{"id1"}, - }, - { - name: "many readers with different series", - blockSeriesIDs: [][]string{{"id1"}, {"id2"}, {"id3"}}, - expectedIDs: []string{"id1", "id2", "id3"}, - }, - { - name: "many readers with unordered series", - blockSeriesIDs: [][]string{{"id3"}, {"id1"}, {"id2"}}, - expectedIDs: []string{"id1", "id2", "id3"}, - }, - { - name: "complex case", - blockSeriesIDs: [][]string{{"id2", "id3", "id5"}, {"id1", "id2", "id4"}, {"id1", "id4"}}, - expectedIDs: []string{"id1", "id2", "id3", "id4", "id5"}, - }, - { - name: "duplicate ids within a block", - blockSeriesIDs: [][]string{{"id1", "id2"}, {"id2", "id2"}}, - expectedIDs: []string{"id1", "id2", "id2"}, - }, - { - name: "immediate reader error", - blockSeriesIDs: [][]string{{"error"}}, - expectedIDs: []string{}, - }, - { - name: "reader error later", - blockSeriesIDs: [][]string{{"id1", "id2"}, {"id1", "error"}}, - expectedIDs: []string{"id1"}, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - testCrossBlockReader(t, tt.blockSeriesIDs, tt.expectedIDs) - }) - } -} - -func testCrossBlockReader(t *testing.T, blockSeriesIds [][]string, expectedIDs []string) { - ctrl := xtest.NewController(t) - defer ctrl.Finish() - - now := time.Now().Truncate(time.Hour) - var dfsReaders []DataFileSetReader - - expectedBlockCount := 0 - - for blockIndex, ids := range blockSeriesIds { - dfsReader := NewMockDataFileSetReader(ctrl) - dfsReader.EXPECT().StreamingEnabled().Return(true) - dfsReader.EXPECT().Range().Return(xtime.Range{Start: now.Add(time.Hour * time.Duration(blockIndex))}).AnyTimes() - - blockHasError := false - for i, strID := range ids { - if strID == "error" { - dfsReader.EXPECT().StreamingRead().Return(nil, nil, nil, uint32(0), errExpected) - blockHasError = true - } else { - id := ident.BytesID(strID) - tags := ts.EncodedTags(fmt.Sprintf("tags for %s", strID)) - data := []byte(strconv.Itoa(i)) - - checksum := uint32(blockIndex) // somewhat hacky - using checksum to propagate block index value for assertions - - dfsReader.EXPECT().StreamingRead().Return(id, tags, data, checksum, nil) - } - } - - if !blockHasError { - dfsReader.EXPECT().StreamingRead().Return(nil, nil, nil, uint32(0), io.EOF).MaxTimes(1) - } - - dfsReaders = append(dfsReaders, dfsReader) - expectedBlockCount += len(ids) - } - - cbReader, err := NewCrossBlockReader(dfsReaders, instrument.NewTestOptions(t)) - require.NoError(t, err) - defer cbReader.Close() - - blockCount := 0 - seriesCount := 0 - for cbReader.Next() { - id, tags, records := cbReader.Current() - - assert.Equal(t, expectedIDs[seriesCount], string(id)) - assert.Equal(t, fmt.Sprintf("tags for %s", expectedIDs[seriesCount]), string(tags)) - - previousBlockIndex := -1 - for _, record := range records { - blockIndex := int(record.DataChecksum) // see the comment above - assert.True(t, blockIndex > previousBlockIndex, "same id blocks must be read in temporal order") - - previousBlockIndex = blockIndex - assert.NotNil(t, record.Data) - } - - blockCount += len(records) - seriesCount++ - } - - assert.Equal(t, len(expectedIDs), seriesCount, "count of series read") - - err = cbReader.Err() - if err == nil || (err.Error() != errExpected.Error() && !strings.HasPrefix(err.Error(), "duplicate id")) { - require.NoError(t, cbReader.Err()) - assert.Equal(t, expectedBlockCount, blockCount, "count of blocks read") - } - - for _, dfsReader := range dfsReaders { - assert.NotNil(t, dfsReader) - } -} - -func TestSkippingReader(t *testing.T) { - ctrl := xtest.NewController(t) - defer ctrl.Finish() - - now := time.Now().Truncate(time.Hour) - var dfsReaders []DataFileSetReader - - expectedBlockCount := 0 - - blockSeriesIDs := [][]string{{"id1"}, {"id2"}, {"id3"}} - expectedIDs := []string{"id3"} - for blockIndex, ids := range blockSeriesIDs { - dfsReader := NewMockDataFileSetReader(ctrl) - dfsReader.EXPECT().StreamingEnabled().Return(true) - dfsReader.EXPECT().Range().Return(xtime.Range{Start: now.Add(time.Hour * time.Duration(blockIndex))}).AnyTimes() - - blockHasError := false - for j, id := range ids { - if id == "error" { - dfsReader.EXPECT().StreamingRead().Return(nil, nil, nil, uint32(0), errExpected) - blockHasError = true - } else { - tags := []byte(fmt.Sprintf("tags for %s", id)) - data := []byte{byte(j)} - - checksum := uint32(blockIndex) // somewhat hacky - using checksum to propagate block index value for assertions - - dfsReader.EXPECT().StreamingRead().Return(ident.BytesID(id), tags, data, checksum, nil) - } - } - - if !blockHasError { - dfsReader.EXPECT().StreamingRead().Return(nil, nil, nil, uint32(0), io.EOF).MaxTimes(1) - } - - dfsReaders = append(dfsReaders, dfsReader) - expectedBlockCount += len(ids) - } - - cbReader, err := NewCrossBlockReader(dfsReaders, instrument.NewTestOptions(t)) - require.NoError(t, err) - defer cbReader.Close() - - blockCount := 0 - seriesCount := 0 - - // NB: skip first two - expectedBlockCount -= 2 - require.True(t, cbReader.Next()) - require.True(t, cbReader.Next()) - for cbReader.Next() { - // NB: call Current twice to ensure it does not mutate values. - id, _, _ := cbReader.Current() - assert.Equal(t, expectedIDs[seriesCount], id.String()) - _, tags, records := cbReader.Current() - - strID := string(id) - assert.Equal(t, expectedIDs[seriesCount], strID) - assert.Equal(t, fmt.Sprintf("tags for %s", strID), string(tags)) - - previousBlockIndex := -1 - for _, record := range records { - blockIndex := int(record.DataChecksum) // see the comment above - assert.True(t, blockIndex > previousBlockIndex, "same id blocks must be read in temporal order") - - previousBlockIndex = blockIndex - assert.NotNil(t, record.Data) - } - - blockCount += len(records) - seriesCount++ - } - - assert.Equal(t, len(expectedIDs), seriesCount, "count of series read") - - err = cbReader.Err() - if err == nil || (err.Error() != errExpected.Error() && !strings.HasPrefix(err.Error(), "duplicate id")) { - require.NoError(t, cbReader.Err()) - assert.Equal(t, expectedBlockCount, blockCount, "count of blocks read") - } - - for _, dfsReader := range dfsReaders { - assert.NotNil(t, dfsReader) - } -} diff --git a/src/dbnode/persist/fs/fs_mock.go b/src/dbnode/persist/fs/fs_mock.go index 861c2659a1..25c89d94ca 100644 --- a/src/dbnode/persist/fs/fs_mock.go +++ b/src/dbnode/persist/fs/fs_mock.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/m3db/m3/src/dbnode/persist/fs (interfaces: CrossBlockReader,CrossBlockIterator,DataFileSetWriter,DataFileSetReader,DataFileSetSeeker,IndexFileSetWriter,IndexFileSetReader,IndexSegmentFileSetWriter,IndexSegmentFileSet,IndexSegmentFile,SnapshotMetadataFileWriter,DataFileSetSeekerManager,ConcurrentDataFileSetSeeker,MergeWith,StreamingWriter) +// Source: github.com/m3db/m3/src/dbnode/persist/fs (interfaces: DataFileSetWriter,DataFileSetReader,DataFileSetSeeker,IndexFileSetWriter,IndexFileSetReader,IndexSegmentFileSetWriter,IndexSegmentFileSet,IndexSegmentFile,SnapshotMetadataFileWriter,DataFileSetSeekerManager,ConcurrentDataFileSetSeeker,MergeWith,StreamingWriter) // Copyright (c) 2020 Uber Technologies, Inc. // @@ -43,178 +43,6 @@ import ( "github.com/golang/mock/gomock" ) -// MockCrossBlockReader is a mock of CrossBlockReader interface -type MockCrossBlockReader struct { - ctrl *gomock.Controller - recorder *MockCrossBlockReaderMockRecorder -} - -// MockCrossBlockReaderMockRecorder is the mock recorder for MockCrossBlockReader -type MockCrossBlockReaderMockRecorder struct { - mock *MockCrossBlockReader -} - -// NewMockCrossBlockReader creates a new mock instance -func NewMockCrossBlockReader(ctrl *gomock.Controller) *MockCrossBlockReader { - mock := &MockCrossBlockReader{ctrl: ctrl} - mock.recorder = &MockCrossBlockReaderMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use -func (m *MockCrossBlockReader) EXPECT() *MockCrossBlockReaderMockRecorder { - return m.recorder -} - -// Close mocks base method -func (m *MockCrossBlockReader) Close() error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Close") - ret0, _ := ret[0].(error) - return ret0 -} - -// Close indicates an expected call of Close -func (mr *MockCrossBlockReaderMockRecorder) Close() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockCrossBlockReader)(nil).Close)) -} - -// Current mocks base method -func (m *MockCrossBlockReader) Current() (ident.BytesID, ts.EncodedTags, []BlockRecord) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Current") - ret0, _ := ret[0].(ident.BytesID) - ret1, _ := ret[1].(ts.EncodedTags) - ret2, _ := ret[2].([]BlockRecord) - return ret0, ret1, ret2 -} - -// Current indicates an expected call of Current -func (mr *MockCrossBlockReaderMockRecorder) Current() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Current", reflect.TypeOf((*MockCrossBlockReader)(nil).Current)) -} - -// Err mocks base method -func (m *MockCrossBlockReader) Err() error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Err") - ret0, _ := ret[0].(error) - return ret0 -} - -// Err indicates an expected call of Err -func (mr *MockCrossBlockReaderMockRecorder) Err() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Err", reflect.TypeOf((*MockCrossBlockReader)(nil).Err)) -} - -// Next mocks base method -func (m *MockCrossBlockReader) Next() bool { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Next") - ret0, _ := ret[0].(bool) - return ret0 -} - -// Next indicates an expected call of Next -func (mr *MockCrossBlockReaderMockRecorder) Next() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Next", reflect.TypeOf((*MockCrossBlockReader)(nil).Next)) -} - -// MockCrossBlockIterator is a mock of CrossBlockIterator interface -type MockCrossBlockIterator struct { - ctrl *gomock.Controller - recorder *MockCrossBlockIteratorMockRecorder -} - -// MockCrossBlockIteratorMockRecorder is the mock recorder for MockCrossBlockIterator -type MockCrossBlockIteratorMockRecorder struct { - mock *MockCrossBlockIterator -} - -// NewMockCrossBlockIterator creates a new mock instance -func NewMockCrossBlockIterator(ctrl *gomock.Controller) *MockCrossBlockIterator { - mock := &MockCrossBlockIterator{ctrl: ctrl} - mock.recorder = &MockCrossBlockIteratorMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use -func (m *MockCrossBlockIterator) EXPECT() *MockCrossBlockIteratorMockRecorder { - return m.recorder -} - -// Close mocks base method -func (m *MockCrossBlockIterator) Close() { - m.ctrl.T.Helper() - m.ctrl.Call(m, "Close") -} - -// Close indicates an expected call of Close -func (mr *MockCrossBlockIteratorMockRecorder) Close() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockCrossBlockIterator)(nil).Close)) -} - -// Current mocks base method -func (m *MockCrossBlockIterator) Current() (ts.Datapoint, time0.Unit, ts.Annotation) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Current") - ret0, _ := ret[0].(ts.Datapoint) - ret1, _ := ret[1].(time0.Unit) - ret2, _ := ret[2].(ts.Annotation) - return ret0, ret1, ret2 -} - -// Current indicates an expected call of Current -func (mr *MockCrossBlockIteratorMockRecorder) Current() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Current", reflect.TypeOf((*MockCrossBlockIterator)(nil).Current)) -} - -// Err mocks base method -func (m *MockCrossBlockIterator) Err() error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Err") - ret0, _ := ret[0].(error) - return ret0 -} - -// Err indicates an expected call of Err -func (mr *MockCrossBlockIteratorMockRecorder) Err() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Err", reflect.TypeOf((*MockCrossBlockIterator)(nil).Err)) -} - -// Next mocks base method -func (m *MockCrossBlockIterator) Next() bool { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Next") - ret0, _ := ret[0].(bool) - return ret0 -} - -// Next indicates an expected call of Next -func (mr *MockCrossBlockIteratorMockRecorder) Next() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Next", reflect.TypeOf((*MockCrossBlockIterator)(nil).Next)) -} - -// Reset mocks base method -func (m *MockCrossBlockIterator) Reset(arg0 []BlockRecord) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "Reset", arg0) -} - -// Reset indicates an expected call of Reset -func (mr *MockCrossBlockIteratorMockRecorder) Reset(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Reset", reflect.TypeOf((*MockCrossBlockIterator)(nil).Reset), arg0) -} - // MockDataFileSetWriter is a mock of DataFileSetWriter interface type MockDataFileSetWriter struct { ctrl *gomock.Controller diff --git a/src/dbnode/persist/fs/types.go b/src/dbnode/persist/fs/types.go index ecce353743..6652c2f5c5 100644 --- a/src/dbnode/persist/fs/types.go +++ b/src/dbnode/persist/fs/types.go @@ -660,38 +660,6 @@ type Segments interface { BlockStart() time.Time } -// BlockRecord wraps together M3TSZ data bytes with their checksum. -type BlockRecord struct { - Data []byte - DataChecksum uint32 -} - -// CrossBlockReader allows reading data (encoded bytes) from multiple -// DataFileSetReaders of the same shard, ordered lexographically by series ID, -// then by block time. -type CrossBlockReader interface { - io.Closer - - // Next advances to the next data record, returning true if it exists. - Next() bool - - // Err returns the last error encountered (if any). - Err() error - - // Current returns distinct series id and encodedTags, plus a slice with data - // and checksums from all blocks corresponding to that series (in temporal order). - // ID, encodedTags, records, and underlying data are invalidated on each call to Next(). - Current() (id ident.BytesID, encodedTags ts.EncodedTags, records []BlockRecord) -} - -// CrossBlockIterator iterates across BlockRecords. -type CrossBlockIterator interface { - encoding.Iterator - - // Reset resets the iterator to the given block records. - Reset(records []BlockRecord) -} - // IndexClaimsManager manages concurrent claims to volume indices per ns and block start. // This allows multiple threads to safely increment the volume index. type IndexClaimsManager interface {