diff --git a/src/dbnode/encoding/tile/frame_annotations.go b/src/dbnode/encoding/tile/frame_annotations.go deleted file mode 100644 index 2995355185..0000000000 --- a/src/dbnode/encoding/tile/frame_annotations.go +++ /dev/null @@ -1,94 +0,0 @@ -// Copyright (c) 2020 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package tile - -import ( - "bytes" - "fmt" - - "github.com/m3db/m3/src/dbnode/ts" -) - -type annotationRecorder struct { - count int - a ts.Annotation - as []ts.Annotation -} - -var _ SeriesFrameAnnotations = (*annotationRecorder)(nil) - -func newAnnotationRecorder() *annotationRecorder { - return &annotationRecorder{} -} - -func (a *annotationRecorder) Value(idx int) (ts.Annotation, error) { - if idx < 0 || idx >= a.count { - return nil, fmt.Errorf("annotationRecorder.Value index (%d) out of bounds [0; %d)", idx, a.count) - } - - if a.singleValue() { - return a.a, nil - } - - return a.as[idx], nil -} - -func (a *annotationRecorder) singleValue() bool { - return a.count > 0 && len(a.as) == 0 -} - -func (a *annotationRecorder) record(annotation ts.Annotation) { - a.count++ - if a.count == 1 { - a.a = annotation - return - } - - // NB: annotation has already changed in this dataset. - if len(a.as) > 0 { - a.as = append(a.as, annotation) - return - } - - // NB: same annotation as previously recorded; skip. - if bytes.Equal(a.a, annotation) { - return - } - - if a.as == nil { - a.as = make([]ts.Annotation, 0, a.count) - } - - for i := 0; i < a.count-1; i++ { - a.as = append(a.as, a.a) - } - - a.as = append(a.as, annotation) -} - -func (a *annotationRecorder) reset() { - a.count = 0 - for i := range a.as { - a.as[i] = nil - } - - a.as = a.as[:0] -} diff --git a/src/dbnode/encoding/tile/frame_annotations_test.go b/src/dbnode/encoding/tile/frame_annotations_test.go deleted file mode 100644 index f9f17ae4b1..0000000000 --- a/src/dbnode/encoding/tile/frame_annotations_test.go +++ /dev/null @@ -1,96 +0,0 @@ -// Copyright (c) 2020 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package tile - -import ( - "testing" - - "github.com/m3db/m3/src/dbnode/ts" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func a(s string) ts.Annotation { return ts.Annotation(s) } - -func (a *annotationRecorder) assertValue(t *testing.T, expected string, idx int) { - actual, err := a.Value(idx) - require.NoError(t, err) - assert.Equal(t, expected, string(actual), "index %d", idx) -} - -func (a *annotationRecorder) assertError(t *testing.T, idx int) { - _, err := a.Value(idx) - assert.Error(t, err, "index %d", idx) -} - -func TestSeriesFrameAnnotationsEmpty(t *testing.T) { - rec := newAnnotationRecorder() - rec.assertError(t,-1) - rec.assertError(t,0) -} - -func TestSeriesFrameAnnotationsSingle(t *testing.T) { - rec := newAnnotationRecorder() - - rec.record(a("foo")) - rec.assertValue(t, "foo", 0) - rec.assertError(t,1) - - rec.record(a("foo")) - rec.assertValue(t, "foo", 1) - rec.assertError(t,2) - - rec.reset() - rec.assertError(t,0) -} - -func TestSeriesFrameAnnotationsMultiple(t *testing.T) { - rec := newAnnotationRecorder() - rec.record(a("foo")) - rec.record(a("foo")) - rec.record(a("bar")) - - rec.assertValue(t, "foo", 0) - rec.assertValue(t, "foo", 1) - rec.assertValue(t, "bar", 2) - rec.assertError(t, 3) - - rec.reset() - rec.assertError(t,0) -} - -func TestSeriesFrameAnnotationsMultipleChanges(t *testing.T) { - rec := newAnnotationRecorder() - rec.record(a("foo")) - rec.record(a("bar")) - rec.record(a("baz")) - rec.record(a("foo")) - - rec.assertValue(t, "foo", 0) - rec.assertValue(t, "bar", 1) - rec.assertValue(t, "baz", 2) - rec.assertValue(t, "foo", 3) - rec.assertError(t, 4) - - rec.reset() - rec.assertError(t,0) -} diff --git a/src/dbnode/encoding/tile/frame_units.go b/src/dbnode/encoding/tile/frame_units.go deleted file mode 100644 index 14ade01737..0000000000 --- a/src/dbnode/encoding/tile/frame_units.go +++ /dev/null @@ -1,89 +0,0 @@ -// Copyright (c) 2020 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package tile - -import ( - "fmt" - - xtime "github.com/m3db/m3/src/x/time" -) - -type unitRecorder struct { - count int - u xtime.Unit - us []xtime.Unit -} - -var _ SeriesFrameUnits = (*unitRecorder)(nil) - -func newUnitRecorder() *unitRecorder { - return &unitRecorder{} -} - -func (u *unitRecorder) Value(idx int) (xtime.Unit, error) { - if idx < 0 || idx >= u.count { - return 0, fmt.Errorf("unitRecorder.Value index (%d) out of bounds [0; %d)", idx, u.count) - } - - if u.singleValue() { - return u.u, nil - } - - return u.us[idx], nil -} - -func (u *unitRecorder) singleValue() bool { - return u.count > 0 && len(u.us) == 0 -} - -func (u *unitRecorder) record(unit xtime.Unit) { - u.count++ - if u.count == 1 { - u.u = unit - return - } - - // NB: unit has already changed in this dataset. - if len(u.us) > 0 { - u.us = append(u.us, unit) - return - } - - // NB: same unit as previously recorded; skip. - if u.u == unit { - return - } - - if u.us == nil { - u.us = make([]xtime.Unit, 0, u.count) - } - - for i := 0; i < u.count-1; i++ { - u.us = append(u.us, u.u) - } - - u.us = append(u.us, unit) -} - -func (u *unitRecorder) reset() { - u.count = 0 - u.us = u.us[:0] -} diff --git a/src/dbnode/encoding/tile/frame_units_test.go b/src/dbnode/encoding/tile/frame_units_test.go deleted file mode 100644 index a63b297610..0000000000 --- a/src/dbnode/encoding/tile/frame_units_test.go +++ /dev/null @@ -1,95 +0,0 @@ -// Copyright (c) 2020 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package tile - -import ( - "testing" - - xtime "github.com/m3db/m3/src/x/time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func (u *unitRecorder) assertValue(t *testing.T, expected xtime.Unit, idx int) { - actual, err := u.Value(idx) - require.NoError(t, err) - assert.Equal(t, expected, actual, "index %d", idx) -} - -func (u *unitRecorder) assertError(t *testing.T, idx int) { - _, err := u.Value(idx) - assert.Error(t, err, "index %d", idx) -} - -func TestSeriesFrameUnitsEmpty(t *testing.T) { - rec := newUnitRecorder() - rec.assertError(t,-1) - rec.assertError(t,0) -} - -func TestSeriesFrameUnitsSingle(t *testing.T) { - rec := newUnitRecorder() - - rec.record(xtime.Second) - rec.assertValue(t, xtime.Second, 0) - rec.assertError(t,1) - - rec.record(xtime.Second) - rec.assertValue(t, xtime.Second, 1) - rec.assertError(t,2) - - rec.reset() - rec.assertError(t,0) -} - -func TestSeriesFrameUnitsMultiple(t *testing.T) { - rec := newUnitRecorder() - - rec.record(xtime.Second) - rec.record(xtime.Second) - rec.record(xtime.Day) - rec.record(xtime.Second) - - rec.assertValue(t, xtime.Second, 0) - rec.assertValue(t, xtime.Second, 1) - rec.assertValue(t, xtime.Day, 2) - rec.assertValue(t, xtime.Second, 3) - rec.assertError(t, 4) - - rec.reset() - rec.assertError(t, 0) -} - -func TestSeriesFrameUnitsMultipleChanges(t *testing.T) { - rec := newUnitRecorder() - rec.record(xtime.Second) - rec.record(xtime.Day) - rec.record(xtime.Nanosecond) - - rec.assertValue(t, xtime.Second, 0) - rec.assertValue(t, xtime.Day, 1) - rec.assertValue(t, xtime.Nanosecond, 2) - rec.assertError(t, 3) - - rec.reset() - rec.assertError(t, 0) -} diff --git a/src/dbnode/encoding/tile/series_block_frame.go b/src/dbnode/encoding/tile/series_block_frame.go deleted file mode 100644 index 30d406c1c7..0000000000 --- a/src/dbnode/encoding/tile/series_block_frame.go +++ /dev/null @@ -1,102 +0,0 @@ -// Copyright (c) 2020 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package tile - -import ( - "time" - - "github.com/m3db/m3/src/dbnode/ts" - xtime "github.com/m3db/m3/src/x/time" -) - -const ( - // NB: 2 hr block / 15 sec scrape as an initial size. - initLength = 2 * 60 * 4 -) - -type recorder struct { - vals []float64 - times []time.Time // todo: consider delta-delta here? - - units *unitRecorder - annotations *annotationRecorder -} - -func newRecorder() *recorder { - return &recorder{ - vals: make([]float64, 0, initLength), - times: make([]time.Time, 0, initLength), - - units: newUnitRecorder(), - annotations: newAnnotationRecorder(), - } -} - -func (r *recorder) reset() { - r.units.reset() - r.annotations.reset() - r.vals = r.vals[:0] - r.times = r.times[:0] -} - -func (r *recorder) record(dp ts.Datapoint, u xtime.Unit, a ts.Annotation) { - r.vals = append(r.vals, dp.Value) - r.times = append(r.times, dp.Timestamp) - r.units.record(u) - r.annotations.record(a) -} - -func newSeriesBlockFrame(recorder *recorder) SeriesBlockFrame { - return SeriesBlockFrame{recorder: recorder} -} - -func (f *SeriesBlockFrame) reset( - start xtime.UnixNano, - end xtime.UnixNano, -) { - f.recorder.reset() - f.FrameStartInclusive = start - f.FrameEndExclusive = end -} - -func (f *SeriesBlockFrame) record(dp ts.Datapoint, u xtime.Unit, a ts.Annotation) { - f.recorder.record(dp, u, a) -} - -// Values returns values in this SeriesBlockFrame. -func (f *SeriesBlockFrame) Values() []float64 { - return f.recorder.vals -} - -// Timestamps returns timestamps for the SeriesBlockFrame. -func (f *SeriesBlockFrame) Timestamps() []time.Time { - return f.recorder.times -} - -// Units returns units for the SeriesBlockFrame. -func (f *SeriesBlockFrame) Units() SeriesFrameUnits { - return f.recorder.units -} - -// Annotations returns annotations for the SeriesBlockFrame. -func (f *SeriesBlockFrame) Annotations() SeriesFrameAnnotations { - return f.recorder.annotations -} diff --git a/src/dbnode/encoding/tile/series_block_frame_test.go b/src/dbnode/encoding/tile/series_block_frame_test.go deleted file mode 100644 index b6f64d0903..0000000000 --- a/src/dbnode/encoding/tile/series_block_frame_test.go +++ /dev/null @@ -1,94 +0,0 @@ -// Copyright (c) 2020 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package tile - -import ( - "testing" - "time" - - "github.com/m3db/m3/src/dbnode/ts" - xtime "github.com/m3db/m3/src/x/time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestSeriesBlockFrame(t *testing.T) { - seriesBlockFrame := newSeriesBlockFrame(newRecorder()) - start := time.Now().Truncate(time.Hour) - timeAt := func(i int) time.Time { return start.Add(time.Minute * time.Duration(i)) } - - addPoints := func(size int) { - for i := 0; i < size; i++ { - seriesBlockFrame.record( - ts.Datapoint{ - Value: float64(i), - Timestamp: timeAt(i), - }, - xtime.Microsecond, - ts.Annotation("foobar"), - ) - } - } - - verify := func(frame SeriesBlockFrame, size int) { - ex := make([]float64, size) - for i := range ex { - ex[i] = float64(i) - } - - vals := frame.Values() - require.Equal(t, size, len(vals)) - for i := 0; i < size; i++ { - assert.Equal(t, float64(i), vals[i]) - } - - assert.Equal(t, ex, frame.Values()) - - times := frame.Timestamps() - require.Equal(t, size, len(times)) - for i := 0; i < size; i++ { - require.Equal(t, timeAt(i), times[i]) - } - - for i := 0; i < size; i++ { - annotation, err := frame.Annotations().Value(i) - require.NoError(t, err) - assert.Equal(t, ts.Annotation("foobar"), annotation) - } - - for i := 0; i < size; i++ { - units, err := frame.Units().Value(i) - require.NoError(t, err) - assert.Equal(t, xtime.Microsecond, units) - } - } - - size := 5 - addPoints(size) - - verify(seriesBlockFrame, size) - seriesBlockFrame.reset(0, 0) - - size = 15 - addPoints(size) - verify(seriesBlockFrame, size) -} diff --git a/src/dbnode/encoding/tile/series_block_iterator.go b/src/dbnode/encoding/tile/series_block_iterator.go deleted file mode 100644 index 4c635c15a4..0000000000 --- a/src/dbnode/encoding/tile/series_block_iterator.go +++ /dev/null @@ -1,92 +0,0 @@ -// Copyright (c) 2020 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package tile - -import ( - "time" - - "github.com/m3db/m3/src/dbnode/persist/fs" - "github.com/m3db/m3/src/dbnode/ts" - "github.com/m3db/m3/src/x/ident" - xtime "github.com/m3db/m3/src/x/time" -) - -type seriesBlockIter struct { - reader fs.CrossBlockReader - - err error - exhausted bool - - step time.Duration - start xtime.UnixNano - - iter SeriesFrameIterator - blockIter fs.CrossBlockIterator - encodedTags ts.EncodedTags - id ident.BytesID -} - -// NewSeriesBlockIterator creates a new SeriesBlockIterator. -func NewSeriesBlockIterator( - reader fs.CrossBlockReader, - opts Options, -) (SeriesBlockIterator, error) { - return &seriesBlockIter{ - reader: reader, - - start: opts.Start, - step: opts.FrameSize, - - iter: newSeriesFrameIterator(newRecorder()), - blockIter: fs.NewCrossBlockIterator(opts.ReaderIteratorPool), - }, nil -} - -func (b *seriesBlockIter) Next() bool { - if b.exhausted || b.err != nil { - return false - } - - if !b.reader.Next() { - b.exhausted = true - b.err = b.reader.Err() - return false - } - - var blockRecords []fs.BlockRecord - b.id, b.encodedTags, blockRecords = b.reader.Current() - b.blockIter.Reset(blockRecords) - b.iter.Reset(b.start, b.step, b.blockIter) - return true -} - -func (b *seriesBlockIter) Current() (SeriesFrameIterator, ident.BytesID, ts.EncodedTags) { - return b.iter, b.id, b.encodedTags -} - -func (b *seriesBlockIter) Close() error { - b.blockIter.Close() - return b.iter.Close() -} - -func (b *seriesBlockIter) Err() error { - return b.err -} diff --git a/src/dbnode/encoding/tile/series_block_iterator_test.go b/src/dbnode/encoding/tile/series_block_iterator_test.go deleted file mode 100644 index 7ed9969890..0000000000 --- a/src/dbnode/encoding/tile/series_block_iterator_test.go +++ /dev/null @@ -1,85 +0,0 @@ -// Copyright (c) 2020 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package tile - -import ( - "testing" - "time" - - "github.com/m3db/m3/src/dbnode/encoding" - "github.com/m3db/m3/src/dbnode/persist/fs" - "github.com/m3db/m3/src/dbnode/ts" - "github.com/m3db/m3/src/x/ident" - xtest "github.com/m3db/m3/src/x/test" - xtime "github.com/m3db/m3/src/x/time" - - "github.com/golang/mock/gomock" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestSeriesBlockIterator(t *testing.T) { - ctrl := xtest.NewController(t) - defer ctrl.Finish() - - it := encoding.NewMockReaderIterator(ctrl) - start := time.Now().Truncate(time.Hour) - - it.EXPECT().Err().Return(nil) - it.EXPECT().Reset(gomock.Any(), nil) - it.EXPECT().Next().Return(true) - it.EXPECT().Current().Return(ts.Datapoint{ - Timestamp: start, - Value: 1, - }, xtime.Second, nil) - it.EXPECT().Next().Return(false) - - iterPool := encoding.NewMockReaderIteratorPool(ctrl) - iterPool.EXPECT().Get().Return(it) - - opts := Options{ - FrameSize: time.Duration(100), - Start: xtime.UnixNano(0), - ReaderIteratorPool: iterPool, - } - - reader := fs.NewMockCrossBlockReader(ctrl) - reader.EXPECT().Next().Return(true) - tags := ts.EncodedTags("encoded tags") - records := []fs.BlockRecord{{Data: []byte("block_record")}} - reader.EXPECT().Current().Return(ident.StringID("foobar"), tags, records) - reader.EXPECT().Next().Return(false) - reader.EXPECT().Err().Return(nil) - - iter, err := NewSeriesBlockIterator(reader, opts) - require.NoError(t, err) - assert.True(t, iter.Next()) - frameIter, id, iterTags := iter.Current() - assert.True(t, frameIter.Next()) - frame := frameIter.Current() - - assert.False(t, frameIter.Next()) - assert.False(t, iter.Next()) - assert.Equal(t, []float64{1.0}, frame.Values()) - - assert.Equal(t, "foobar", id.String()) - assert.Equal(t, string(tags), string(iterTags)) -} diff --git a/src/dbnode/encoding/tile/series_frame_iterator.go b/src/dbnode/encoding/tile/series_frame_iterator.go deleted file mode 100644 index f76ebea25a..0000000000 --- a/src/dbnode/encoding/tile/series_frame_iterator.go +++ /dev/null @@ -1,142 +0,0 @@ -// Copyright (c) 2020 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package tile - -import ( - "errors" - "fmt" - "time" - - "github.com/m3db/m3/src/dbnode/persist/fs" - xtime "github.com/m3db/m3/src/x/time" -) - -type seriesFrameIter struct { - err error - - exhausted bool - started bool - curr SeriesBlockFrame - - iter fs.CrossBlockIterator - - frameStep time.Duration - frameStart xtime.UnixNano -} - -func newSeriesFrameIterator(recorder *recorder) SeriesFrameIterator { - return &seriesFrameIter{ - err: errors.New("unset"), - curr: newSeriesBlockFrame(recorder), - } -} - -func (b *seriesFrameIter) Reset( - start xtime.UnixNano, - frameStep time.Duration, - iter fs.CrossBlockIterator, -) error { - if frameStep <= 0 { - b.err = fmt.Errorf("frame step must be > 0, is %d", frameStep) - return b.err - } - - b.err = nil - b.iter = iter - b.exhausted = false - b.started = false - b.frameStart = start - b.frameStep = frameStep - b.curr.reset(start, start+xtime.UnixNano(frameStep)) - - return nil -} - -func (b *seriesFrameIter) Err() error { - return b.err -} - -func (b *seriesFrameIter) Close() error { - if b.iter != nil { - b.iter = nil - } - - return nil -} - -func (b *seriesFrameIter) Next() bool { - if b.err != nil || b.exhausted { - return false - } - - if !b.started { - b.started = true - // NB: initialize iterator to valid value to frameStart. - if !b.iter.Next() { - return false - } - } else { - b.curr.reset(b.frameStart, b.frameStart+xtime.UnixNano(b.frameStep)) - } - - cutover := b.frameStart + xtime.UnixNano(b.frameStep) - b.curr.FrameStartInclusive = b.frameStart - b.curr.FrameEndExclusive = cutover - b.frameStart = cutover - firstPoint, firstUnit, firstAnnotation := b.iter.Current() - if firstPoint.TimestampNanos >= cutover { - // NB: empty block. - return true - } - - var hasAny, hasMore bool - b.curr.record(firstPoint, firstUnit, firstAnnotation) - for b.iter.Next() { - hasAny = true - dp, unit, annotation := b.iter.Current() - if dp.TimestampNanos >= cutover { - hasMore = true - break - } - - b.curr.record(dp, unit, annotation) - } - - if !hasAny { - b.exhausted = true - return true - } - - if err := b.iter.Err(); err != nil { - b.err = err - return false - } - - if !hasMore { - b.exhausted = true - } - - return true -} - -func (b *seriesFrameIter) Current() SeriesBlockFrame { - return b.curr -} diff --git a/src/dbnode/encoding/tile/series_frame_iterator_test.go b/src/dbnode/encoding/tile/series_frame_iterator_test.go deleted file mode 100644 index 8be361aa6b..0000000000 --- a/src/dbnode/encoding/tile/series_frame_iterator_test.go +++ /dev/null @@ -1,216 +0,0 @@ -// Copyright (c) 2020 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package tile - -import ( - "math" - "testing" - "time" - - "github.com/m3db/m3/src/dbnode/persist/fs" - "github.com/m3db/m3/src/dbnode/ts" - xtest "github.com/m3db/m3/src/x/test" - xtime "github.com/m3db/m3/src/x/time" - - "github.com/golang/mock/gomock" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func newSequentialIterator( - ctrl *gomock.Controller, - start time.Time, - step time.Duration, - numPoints int, -) fs.CrossBlockIterator { - it := fs.NewMockCrossBlockIterator(ctrl) - currVal, currTs, currTsNano := 0.0, start, xtime.ToUnixNano(start) - for i := 0; i < numPoints; i++ { - i := i - it.EXPECT().Next().DoAndReturn(func() bool { - // NB: only increment after first Next. - if i > 0 { - currVal++ - currTs = currTs.Add(step) - currTsNano += xtime.UnixNano(step) - } - return true - }).Times(1) - - it.EXPECT().Current().DoAndReturn(func() (ts.Datapoint, xtime.Unit, []byte) { - return ts.Datapoint{ - Value: currVal, - Timestamp: currTs, - TimestampNanos: currTsNano, - }, xtime.Second, nil - }).AnyTimes() - } - - it.EXPECT().Next().Return(false) - it.EXPECT().Err().Return(nil).AnyTimes() - it.EXPECT().Close() - - return it -} - -func halfFrameSizes(numPoints int) []float64 { - frames := make([]float64, numPoints*2-1) - v := 0.0 - for i := range frames { - if i%2 == 0 { - frames[i] = v - v++ - } else { - frames[i] = math.NaN() - } - } - - return frames -} - -func halfFrameCounts(numPoints int) []int { - frames := make([]int, numPoints*2-1) - for i := range frames { - if i%2 == 0 { - frames[i] = 1 - } - } - - return frames -} - -func TestSeriesFrameIterator(t *testing.T) { - ctrl := xtest.NewController(t) - defer ctrl.Finish() - - numPoints := 30 - start := time.Now().Truncate(time.Hour) - - stepSize := time.Second * 10 - - tests := []struct { - name string - frameSize time.Duration - exCounts []int - exSums []float64 - }{ - { - name: "5 second frame, 1 point every 2 frames - with empty frames", - frameSize: time.Second * 5, - exSums: halfFrameSizes(numPoints), - exCounts: halfFrameCounts(numPoints), - }, - { - name: "1 minute frame, 6 points per frame", - frameSize: time.Minute * 1, - exSums: []float64{ - 15 /* Σ 0..5 */, 51 /* Σ 6..11 */, 87, /* Σ 12..17 */ - 123 /*Σ18..23 */, 159 /*Σ24..30 */}, - exCounts: []int{6, 6, 6, 6, 6}, - }, - { - frameSize: time.Minute * 2, - exSums: []float64{66 /* Σ 0..11 */, 210 /* Σ 12..23 */, 159 /*Σ24..30 */}, - exCounts: []int{12, 12, 6}, - }, - { - name: "3 minute frame, 18 points per frame", - frameSize: time.Minute * 3, - exSums: []float64{153 /*Σ0..17 */, 282 /* Σ 18..30 */}, - exCounts: []int{18, 12}, - }, - { - name: "4 minute frame, 24 points per frame", - frameSize: time.Minute * 4, - exSums: []float64{276 /*Σ0..23 */, 159 /* Σ 24..30 */}, - exCounts: []int{24, 6}, - }, - { - name: "5 minute frame, 30 points per frame", - frameSize: time.Minute * 5, - exSums: []float64{435}, - exCounts: []int{30}, - }, - { - name: "6 minute frame, 30 points per frame (exhausted)", - frameSize: time.Minute * 6, - exSums: []float64{435}, - exCounts: []int{30}, - }, - } - - recorder := newRecorder() - it := newSeriesFrameIterator(recorder) - require.False(t, it.Next()) - require.Error(t, it.Err()) - - for _, tt := range tests { - iter := newSequentialIterator(ctrl, start, stepSize, numPoints) - require.NoError(t, it.Reset( - xtime.ToUnixNano(start), - tt.frameSize, - iter, - )) - - step := 0 - exVal := 0.0 - exTime := start.UnixNano() - for it.Next() { - require.True(t, step < len(tt.exSums)) - frame := it.Current() - assert.NotNil(t, frame) - - if s := tt.exSums[step]; math.IsNaN(s) { - assert.Equal(t, 0, len(frame.Values())) - } else { - v := 0.0 - for _, val := range frame.Values() { - v += val - } - - assert.Equal(t, s, v) - } - - vals := frame.Values() - require.Equal(t, tt.exCounts[step], len(vals)) - for i := 0; i < tt.exCounts[step]; i++ { - assert.Equal(t, exVal, vals[i]) - exVal++ - } - - times := frame.Timestamps() - require.Equal(t, tt.exCounts[step], len(times)) - for i := 0; i < tt.exCounts[step]; i++ { - assert.Equal(t, exTime, times[i].UnixNano()) - exTime = exTime + int64(time.Second*10) - } - - step++ - } - - assert.Equal(t, len(tt.exSums), step) - assert.NoError(t, iter.Err()) - iter.Close() - } - - assert.NoError(t, it.Err()) - assert.NoError(t, it.Close()) -} diff --git a/src/dbnode/encoding/tile/types.go b/src/dbnode/encoding/tile/types.go deleted file mode 100644 index 96329d6f0d..0000000000 --- a/src/dbnode/encoding/tile/types.go +++ /dev/null @@ -1,104 +0,0 @@ -// Copyright (c) 2020 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package tile - -import ( - "time" - - "github.com/m3db/m3/src/dbnode/encoding" - "github.com/m3db/m3/src/dbnode/persist/fs" - "github.com/m3db/m3/src/dbnode/ts" - "github.com/m3db/m3/src/x/ident" - xtime "github.com/m3db/m3/src/x/time" -) - -// SeriesFrameUnits describes units in this series frame. -type SeriesFrameUnits interface { - // Value returns the annotation at the given frame index (or an error if index is out of bounds). - Value(idx int) (xtime.Unit, error) -} - -// SeriesFrameAnnotations describes annotations in this series frame. -type SeriesFrameAnnotations interface { - // Value returns the annotation at the given frame index (or an error if index is out of bounds). - Value(idx int) (ts.Annotation, error) -} - -// SeriesFrameIterator is a frame-wise iterator across a series block. -type SeriesFrameIterator interface { - // Err returns any errors encountered. - Err() error - // Next moves to the next element. - Next() bool - // Close closes the iterator. - Close() error - // Current returns the current series block frame. - Current() SeriesBlockFrame - // Reset resets the series frame iterator. - Reset( - start xtime.UnixNano, - step time.Duration, - iter fs.CrossBlockIterator, - ) error -} - -// SeriesBlockIterator provides concurrent iteration across multiple series -// in a frame-wise fashion. -type SeriesBlockIterator interface { - // Err returns any errors encountered. - Err() error - // Next moves to the next element. - Next() bool - // Close closes the iterator. - Close() error - // Current returns the next set of series frame iterators. - Current() (SeriesFrameIterator, ident.BytesID, ts.EncodedTags) -} - -// Options are series block iterator options. -type Options struct { - // FrameSize is the frame size in nanos. - FrameSize time.Duration - // Start is the start time for the iterator in nanos from epoch. - Start xtime.UnixNano - // EncodingOpts are options for the encoder. - EncodingOpts encoding.Options - // ReaderIteratorPool yields ReaderIterators. - ReaderIteratorPool encoding.ReaderIteratorPool -} - -// SeriesBlockFrame contains either all raw values -// for a given series in a block if the frame size -// was not specified, or the number of values -// that fall into the next sequential frame -// for a series in the block given the progression -// through each time series from the query Start time. -// e.g. with 10minute frame size that aligns with the -// query start, each series will return -// 12 frames in a two hour block. -type SeriesBlockFrame struct { - // FrameStartInclusive is inclusive start of frame. - FrameStartInclusive xtime.UnixNano - // FrameEndExclusive is exclusive end of frame. - FrameEndExclusive xtime.UnixNano - // recorder is the recorder. - recorder *recorder -}