Skip to content

Commit

Permalink
[dbnode] Faster M3TSZ decoding by using 64 bit operations (#2827)
Browse files Browse the repository at this point in the history
  • Loading branch information
linasm authored Feb 17, 2021
1 parent 30e1c10 commit ca40427
Show file tree
Hide file tree
Showing 57 changed files with 911 additions and 659 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,17 @@
package parser

import (
"io"
"time"

"github.com/m3db/m3/src/dbnode/encoding"
"github.com/m3db/m3/src/dbnode/encoding/m3tsz"
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/ts"
"github.com/m3db/m3/src/dbnode/x/xio"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/x/ident"
xtime "github.com/m3db/m3/src/x/time"
)

const sep rune = '!'
const tagSep rune = '.'

// Data is a set of datapoints.
type Data []ts.Datapoint

Expand All @@ -46,9 +41,7 @@ type IngestSeries struct {
Tags Tags
}

var iterAlloc = func(r io.Reader, _ namespace.SchemaDescr) encoding.ReaderIterator {
return m3tsz.NewReaderIterator(r, m3tsz.DefaultIntOptimizationEnabled, encoding.NewOptions())
}
var iterAlloc = m3tsz.DefaultReaderIteratorAllocFn(encoding.NewOptions())

func buildBlockReader(
block Data,
Expand Down
4 changes: 2 additions & 2 deletions src/cmd/tools/read_data_files/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
package main

import (
"bytes"
"encoding/base64"
"fmt"
"io"
Expand All @@ -35,6 +34,7 @@ import (
"github.com/m3db/m3/src/dbnode/encoding/m3tsz"
"github.com/m3db/m3/src/dbnode/persist"
"github.com/m3db/m3/src/dbnode/persist/fs"
"github.com/m3db/m3/src/dbnode/x/xio"
"github.com/m3db/m3/src/x/ident"

"github.com/pborman/getopt"
Expand Down Expand Up @@ -159,7 +159,7 @@ func main() {
if benchMode != benchmarkSeries {
data.IncRef()

iter := m3tsz.NewReaderIterator(bytes.NewReader(data.Bytes()), true, encodingOpts)
iter := m3tsz.NewReaderIterator(xio.NewBytesReader64(data.Bytes()), true, encodingOpts)
for iter.Next() {
dp, _, annotation := iter.Current()
if benchMode == benchmarkNone {
Expand Down
11 changes: 1 addition & 10 deletions src/dbnode/client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ package client
import (
"errors"
"fmt"
"io"
"time"

"github.com/m3db/m3/src/dbnode/encoding"
Expand All @@ -43,11 +42,6 @@ const (
asyncWriteWorkerPoolDefaultSize = 128
)

var (
errConfigurationMustSupplyConfig = errors.New(
"must supply config when no topology initializer parameter supplied")
)

// Configuration is a configuration that can be used to construct a client.
type Configuration struct {
// The environment (static or dynamic) configuration.
Expand Down Expand Up @@ -412,10 +406,7 @@ func (c Configuration) NewAdminClient(
encodingOpts = encoding.NewOptions()
}

v = v.SetReaderIteratorAllocate(func(r io.Reader, _ namespace.SchemaDescr) encoding.ReaderIterator {
intOptimized := m3tsz.DefaultIntOptimizationEnabled
return m3tsz.NewReaderIterator(r, intOptimized, encodingOpts)
})
v = v.SetReaderIteratorAllocate(m3tsz.DefaultReaderIteratorAllocFn(encodingOpts))

if c.Proto != nil && c.Proto.Enabled {
v = v.SetEncodingProto(encodingOpts)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ package client

import (
"fmt"
"io"
"math/rand"
"os"
"sort"
Expand All @@ -32,7 +31,6 @@ import (
"github.com/m3db/m3/src/dbnode/encoding"
"github.com/m3db/m3/src/dbnode/encoding/m3tsz"
"github.com/m3db/m3/src/dbnode/generated/thrift/rpc"
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/storage/index"
"github.com/m3db/m3/src/dbnode/x/xpool"
"github.com/m3db/m3/src/x/ident"
Expand Down Expand Up @@ -272,9 +270,7 @@ func initTestFetchTaggedPools() *testFetchTaggedPools {
pools.readerSlices.Init()

pools.multiReader = encoding.NewMultiReaderIteratorPool(opts)
pools.multiReader.Init(func(r io.Reader, _ namespace.SchemaDescr) encoding.ReaderIterator {
return m3tsz.NewReaderIterator(r, m3tsz.DefaultIntOptimizationEnabled, encoding.NewOptions())
})
pools.multiReader.Init(m3tsz.DefaultReaderIteratorAllocFn(encoding.NewOptions()))

pools.seriesIter = encoding.NewSeriesIteratorPool(opts)
pools.seriesIter.Init()
Expand Down
11 changes: 6 additions & 5 deletions src/dbnode/client/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ package client

import (
"errors"
"io"
"math"
"runtime"
"time"
Expand All @@ -37,6 +36,7 @@ import (
m3dbruntime "github.com/m3db/m3/src/dbnode/runtime"
"github.com/m3db/m3/src/dbnode/storage/index"
"github.com/m3db/m3/src/dbnode/topology"
"github.com/m3db/m3/src/dbnode/x/xio"
"github.com/m3db/m3/src/x/clock"
"github.com/m3db/m3/src/x/context"
"github.com/m3db/m3/src/x/ident"
Expand Down Expand Up @@ -473,16 +473,17 @@ func (o *options) Validate() error {

// SetEncodingM3TSZ returns a copy of the options with M3TSZ set as the
// encoding: the reader iterator allocator is replaced with the default
// M3TSZ allocator and proto encoding is disabled.
func (o *options) SetEncodingM3TSZ() Options {
	opts := *o
	opts.readerIteratorAllocate = m3tsz.DefaultReaderIteratorAllocFn(encoding.NewOptions())
	opts.isProtoEnabled = false
	return &opts
}

func (o *options) SetEncodingProto(encodingOpts encoding.Options) Options {
opts := *o
opts.readerIteratorAllocate = func(r io.Reader, descr namespace.SchemaDescr) encoding.ReaderIterator {
opts.readerIteratorAllocate = func(
r xio.Reader64,
descr namespace.SchemaDescr,
) encoding.ReaderIterator {
return proto.NewIterator(r, descr, encodingOpts)
}
opts.isProtoEnabled = true
Expand Down
59 changes: 28 additions & 31 deletions src/dbnode/client/session_fetch_bulk_blocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"bytes"
"fmt"
"io"
"io/ioutil"
"math"
"sort"
"sync"
Expand Down Expand Up @@ -58,17 +57,19 @@ import (
)

var (
blockSize = 2 * time.Hour
nsID = ident.StringID("testNs1")
nsRetentionOpts = retention.NewOptions().
SetBlockSize(blockSize).
SetRetentionPeriod(48 * blockSize)
blockSize = 2 * time.Hour
nsID = ident.StringID("testNs1")

nsRetentionOpts = retention.NewOptions().SetBlockSize(blockSize).SetRetentionPeriod(48 * blockSize)

testTagDecodingPool = serialize.NewTagDecoderPool(
serialize.NewTagDecoderOptions(serialize.TagDecoderOptionsConfig{}),
pool.NewObjectPoolOptions().SetSize(1))

testTagEncodingPool = serialize.NewTagEncoderPool(
serialize.NewTagEncoderOptions(),
pool.NewObjectPoolOptions().SetSize(1))

testIDPool = newSessionTestOptions().IdentifierPool()
fooID = ident.StringID("foo")
fooTags checked.Bytes
Expand Down Expand Up @@ -101,9 +102,7 @@ func testsNsMetadata(t *testing.T) namespace.Metadata {

// newSessionTestMultiReaderIteratorPool builds a multi reader iterator pool
// backed by the default M3TSZ reader iterator allocator, for use in tests.
func newSessionTestMultiReaderIteratorPool() encoding.MultiReaderIteratorPool {
	p := encoding.NewMultiReaderIteratorPool(nil)
	p.Init(m3tsz.DefaultReaderIteratorAllocFn(encoding.NewOptions()))
	return p
}

Expand Down Expand Up @@ -1455,10 +1454,9 @@ func TestStreamBlocksBatchFromPeerVerifiesBlockErr(t *testing.T) {
require.True(t, ok)
segment, err := reader.Segment()
require.NoError(t, err)
rawBlockData := make([]byte, segment.Len())
n, err := reader.Read(rawBlockData)
require.NoError(t, err)
require.Equal(t, len(rawBlockData), n)
rawBlockData, err := xio.ToBytes(reader)
require.Equal(t, io.EOF, err)
require.Equal(t, len(rawBlockData), segment.Len())
rawBlockLen := int64(len(rawBlockData))

var (
Expand Down Expand Up @@ -1510,25 +1508,25 @@ func TestStreamBlocksBatchFromPeerVerifiesBlockErr(t *testing.T) {
Return(&rpc.FetchBlocksRawResult_{
Elements: []*rpc.Blocks{
// First foo block intact
&rpc.Blocks{ID: []byte("foo"), Blocks: []*rpc.Block{
&rpc.Block{Start: start.UnixNano(), Segments: &rpc.Segments{
{ID: []byte("foo"), Blocks: []*rpc.Block{
{Start: start.UnixNano(), Segments: &rpc.Segments{
Merged: &rpc.Segment{
Head: rawBlockData[:len(rawBlockData)-1],
Tail: []byte{rawBlockData[len(rawBlockData)-1]},
},
}},
}},
// First bar block intact, second with error
&rpc.Blocks{ID: []byte("bar"), Blocks: []*rpc.Block{
&rpc.Block{Start: start.UnixNano(), Segments: &rpc.Segments{
{ID: []byte("bar"), Blocks: []*rpc.Block{
{Start: start.UnixNano(), Segments: &rpc.Segments{
Merged: &rpc.Segment{
Head: rawBlockData[:len(rawBlockData)-1],
Tail: []byte{rawBlockData[len(rawBlockData)-1]},
},
}},
}},
&rpc.Blocks{ID: []byte("bar"), Blocks: []*rpc.Block{
&rpc.Block{Start: start.Add(blockSize).UnixNano(), Err: &rpc.Error{
{ID: []byte("bar"), Blocks: []*rpc.Block{
{Start: start.Add(blockSize).UnixNano(), Err: &rpc.Error{
Type: rpc.ErrorType_INTERNAL_ERROR,
Message: "an error",
}},
Expand Down Expand Up @@ -1606,10 +1604,9 @@ func TestStreamBlocksBatchFromPeerVerifiesBlockChecksum(t *testing.T) {
require.True(t, ok)
segment, err := reader.Segment()
require.NoError(t, err)
rawBlockData := make([]byte, segment.Len())
n, err := reader.Read(rawBlockData)
require.NoError(t, err)
require.Equal(t, len(rawBlockData), n)
rawBlockData, err := xio.ToBytes(reader)
require.Equal(t, io.EOF, err)
require.Equal(t, len(rawBlockData), segment.Len())
rawBlockLen := int64(len(rawBlockData))

var (
Expand Down Expand Up @@ -1666,26 +1663,26 @@ func TestStreamBlocksBatchFromPeerVerifiesBlockChecksum(t *testing.T) {
Return(&rpc.FetchBlocksRawResult_{
Elements: []*rpc.Blocks{
// valid foo block
&rpc.Blocks{ID: []byte("foo"), Blocks: []*rpc.Block{
&rpc.Block{Start: start.UnixNano(), Checksum: &validChecksum, Segments: &rpc.Segments{
{ID: []byte("foo"), Blocks: []*rpc.Block{
{Start: start.UnixNano(), Checksum: &validChecksum, Segments: &rpc.Segments{
Merged: &rpc.Segment{
Head: head,
Tail: tail,
},
}},
}},
&rpc.Blocks{ID: []byte("bar"), Blocks: []*rpc.Block{
{ID: []byte("bar"), Blocks: []*rpc.Block{
// invalid bar block
&rpc.Block{Start: start.UnixNano(), Checksum: &invalidChecksum, Segments: &rpc.Segments{
{Start: start.UnixNano(), Checksum: &invalidChecksum, Segments: &rpc.Segments{
Merged: &rpc.Segment{
Head: head,
Tail: tail,
},
}},
}},
&rpc.Blocks{ID: []byte("bar"), Blocks: []*rpc.Block{
{ID: []byte("bar"), Blocks: []*rpc.Block{
// valid bar block, no checksum
&rpc.Block{Start: start.Add(blockSize).UnixNano(), Segments: &rpc.Segments{
{Start: start.Add(blockSize).UnixNano(), Segments: &rpc.Segments{
Merged: &rpc.Segment{
Head: head,
Tail: tail,
Expand Down Expand Up @@ -1769,8 +1766,8 @@ func TestBlocksResultAddBlockFromPeerReadMerged(t *testing.T) {
require.NoError(t, err)

// Assert block has data
data, err := ioutil.ReadAll(xio.NewSegmentReader(seg))
require.NoError(t, err)
data, err := xio.ToBytes(xio.NewSegmentReader(seg))
require.Equal(t, io.EOF, err)
assert.Equal(t, []byte{1, 2, 3}, data)
}

Expand Down
2 changes: 1 addition & 1 deletion src/dbnode/encoding/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func LeadingAndTrailingZeros(v uint64) (int, int) {
}

// SignExtend sign extends the highest bit of v which has numBits (<=64).
func SignExtend(v uint64, numBits uint8) int64 {
	// Shift the value's top bit up to bit 63, then arithmetic-right-shift it
	// back down so the upper bits are filled with copies of that sign bit.
	// When numBits == 0 the shift count is 64, which Go defines to yield 0.
	shift := 64 - numBits
	return (int64(v) << shift) >> shift
}
Loading

0 comments on commit ca40427

Please sign in to comment.