Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

colserde: add simple serialization for DECIMALs #43311

Merged
merged 1 commit into from
Jan 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 71 additions & 45 deletions pkg/col/colserde/arrowbatchconverter.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,11 @@ var supportedTypes = func() map[coltypes.T]struct{} {
for _, t := range []coltypes.T{
coltypes.Bool,
coltypes.Bytes,
coltypes.Decimal,
coltypes.Float64,
coltypes.Int16,
coltypes.Int32,
coltypes.Int64,
coltypes.Float64,
coltypes.Timestamp,
} {
typs[t] = struct{}{}
Expand Down Expand Up @@ -115,15 +116,25 @@ func (c *ArrowBatchConverter) BatchToArrow(batch coldata.Batch) ([]*array.Data,
arrowBitmap = n.NullBitmap()
}

if typ == coltypes.Bool || typ == coltypes.Timestamp {
// Bools and Timestamps are handled differently from other coltypes.
// Refer to the comment on ArrowBatchConverter.builders for more
// information.
if typ == coltypes.Bool || typ == coltypes.Decimal || typ == coltypes.Timestamp {
// Bools, Decimals, and Timestamps are handled differently from other
// coltypes. Refer to the comment on ArrowBatchConverter.builders for
// more information.
var data *array.Data
switch typ {
case coltypes.Bool:
c.builders.boolBuilder.AppendValues(vec.Bool()[:n], nil /* valid */)
data = c.builders.boolBuilder.NewBooleanArray().Data()
case coltypes.Decimal:
decimals := vec.Decimal()[:n]
for _, d := range decimals {
marshaled, err := d.MarshalText()
if err != nil {
return nil, err
}
c.builders.binaryBuilder.Append(marshaled)
}
data = c.builders.binaryBuilder.NewBinaryArray().Data()
case coltypes.Timestamp:
timestamps := vec.Timestamp()[:n]
for _, ts := range timestamps {
Expand Down Expand Up @@ -246,49 +257,64 @@ func (c *ArrowBatchConverter) ArrowToBatch(data []*array.Data, b coldata.Batch)
d := data[i]

var arr array.Interface
if typ == coltypes.Bool || typ == coltypes.Bytes || typ == coltypes.Timestamp {
switch typ {
case coltypes.Bool:
boolArr := array.NewBooleanData(d)
vecArr := vec.Bool()
for i := 0; i < boolArr.Len(); i++ {
vecArr[i] = boolArr.Value(i)
}
arr = boolArr
case coltypes.Bytes:
bytesArr := array.NewBinaryData(d)
bytes := bytesArr.ValueBytes()
if bytes == nil {
// All bytes values are empty, so the representation is solely with the
// offsets slice, so create an empty slice so that the conversion
// corresponds.
bytes = make([]byte, 0)
}
coldata.BytesFromArrowSerializationFormat(vec.Bytes(), bytes, bytesArr.ValueOffsets())
arr = bytesArr
case coltypes.Timestamp:
// TODO(yuzefovich): this serialization is quite inefficient - improve
// it.
bytesArr := array.NewBinaryData(d)
bytes := bytesArr.ValueBytes()
if bytes == nil {
// All bytes values are empty, so the representation is solely with the
// offsets slice, so create an empty slice so that the conversion
// corresponds.
bytes = make([]byte, 0)
switch typ {
case coltypes.Bool:
boolArr := array.NewBooleanData(d)
vecArr := vec.Bool()
for i := 0; i < boolArr.Len(); i++ {
vecArr[i] = boolArr.Value(i)
}
arr = boolArr
case coltypes.Bytes:
bytesArr := array.NewBinaryData(d)
bytes := bytesArr.ValueBytes()
if bytes == nil {
// All bytes values are empty, so the representation is solely with the
// offsets slice, so create an empty slice so that the conversion
// corresponds.
bytes = make([]byte, 0)
}
coldata.BytesFromArrowSerializationFormat(vec.Bytes(), bytes, bytesArr.ValueOffsets())
arr = bytesArr
case coltypes.Decimal:
// TODO(yuzefovich): this serialization is quite inefficient - improve
// it.
bytesArr := array.NewBinaryData(d)
bytes := bytesArr.ValueBytes()
if bytes == nil {
// All bytes values are empty, so the representation is solely with the
// offsets slice, so create an empty slice so that the conversion
// corresponds.
bytes = make([]byte, 0)
}
offsets := bytesArr.ValueOffsets()
vecArr := vec.Decimal()
for i := 0; i < len(offsets)-1; i++ {
if err := vecArr[i].UnmarshalText(bytes[offsets[i]:offsets[i+1]]); err != nil {
return err
}
offsets := bytesArr.ValueOffsets()
vecArr := vec.Timestamp()
for i := 0; i < len(offsets)-1; i++ {
if err := vecArr[i].UnmarshalBinary(bytes[offsets[i]:offsets[i+1]]); err != nil {
return err
}
}
arr = bytesArr
case coltypes.Timestamp:
// TODO(yuzefovich): this serialization is quite inefficient - improve
// it.
bytesArr := array.NewBinaryData(d)
bytes := bytesArr.ValueBytes()
if bytes == nil {
// All bytes values are empty, so the representation is solely with the
// offsets slice, so create an empty slice so that the conversion
// corresponds.
bytes = make([]byte, 0)
}
offsets := bytesArr.ValueOffsets()
vecArr := vec.Timestamp()
for i := 0; i < len(offsets)-1; i++ {
if err := vecArr[i].UnmarshalBinary(bytes[offsets[i]:offsets[i+1]]); err != nil {
return err
}
arr = bytesArr
default:
panic(fmt.Sprintf("unexpected type %s", typ))
}
} else {
arr = bytesArr
default:
var col interface{}
switch typ {
case coltypes.Int16:
Expand Down
31 changes: 12 additions & 19 deletions pkg/col/colserde/arrowbatchconverter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,9 @@ func randomBatch(allocator *colexec.Allocator) ([]coltypes.T, coldata.Batch) {
const maxTyps = 16
rng, _ := randutil.NewPseudoRand()

availableTyps := make([]coltypes.T, 0, len(coltypes.AllTypes))
for _, typ := range coltypes.AllTypes {
// TODO(asubiotto,jordan): We do not support decimal conversion yet.
if typ == coltypes.Decimal {
continue
}
availableTyps = append(availableTyps, typ)
}
typs := make([]coltypes.T, rng.Intn(maxTyps)+1)
for i := range typs {
typs[i] = availableTyps[rng.Intn(len(availableTyps))]
typs[i] = coltypes.AllTypes[rng.Intn(len(coltypes.AllTypes))]
}

capacity := rng.Intn(int(coldata.BatchSize())) + 1
Expand Down Expand Up @@ -128,16 +120,6 @@ func assertEqualBatches(t *testing.T, expected, actual coldata.Batch) {
}
}

func TestArrowBatchConverterRejectsUnsupportedTypes(t *testing.T) {
defer leaktest.AfterTest(t)()

unsupportedTypes := []coltypes.T{coltypes.Decimal}
for _, typ := range unsupportedTypes {
_, err := colserde.NewArrowBatchConverter([]coltypes.T{typ})
require.Error(t, err)
}
}

func TestArrowBatchConverterRandom(t *testing.T) {
defer leaktest.AfterTest(t)()

Expand Down Expand Up @@ -215,6 +197,7 @@ func BenchmarkArrowBatchConverter(b *testing.B) {
typs := []coltypes.T{
coltypes.Bool,
coltypes.Bytes,
coltypes.Decimal,
coltypes.Int64,
coltypes.Timestamp,
}
Expand All @@ -224,6 +207,7 @@ func BenchmarkArrowBatchConverter(b *testing.B) {
numBytes := []int64{
int64(coldata.BatchSize()),
fixedLen * int64(coldata.BatchSize()),
0, // The number of bytes for decimals will be set below.
8 * int64(coldata.BatchSize()),
3 * 8 * int64(coldata.BatchSize()),
}
Expand Down Expand Up @@ -251,6 +235,15 @@ func BenchmarkArrowBatchConverter(b *testing.B) {
}
}
batch.ColVec(0).SetCol(newBytes)
} else if typ == coltypes.Decimal {
// Decimal is variable length type, so we want to calculate precisely the
// total size of all decimals in the vector.
decimals := batch.ColVec(0).Decimal()
for _, d := range decimals {
marshaled, err := d.MarshalText()
require.NoError(b, err)
numBytes[typIdx] += int64(len(marshaled))
}
}
c, err := colserde.NewArrowBatchConverter([]coltypes.T{typ})
require.NoError(b, err)
Expand Down
7 changes: 7 additions & 0 deletions pkg/col/colserde/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,11 @@ func schema(fb *flatbuffers.Builder, typs []coltypes.T) flatbuffers.UOffsetT {
arrowserde.FloatingPointAddPrecision(fb, arrowserde.PrecisionDOUBLE)
fbTypOffset = arrowserde.FloatingPointEnd(fb)
fbTyp = arrowserde.TypeFloatingPoint
case coltypes.Decimal:
// Decimals are marshaled into bytes, so we use binary headers.
arrowserde.BinaryStart(fb)
fbTypOffset = arrowserde.BinaryEnd(fb)
fbTyp = arrowserde.TypeDecimal
case coltypes.Timestamp:
// Timestamps are marshaled into bytes, so we use binary headers.
arrowserde.BinaryStart(fb)
Expand Down Expand Up @@ -456,6 +461,8 @@ func typeFromField(field *arrowserde.Field) (coltypes.T, error) {
default:
return coltypes.Unhandled, errors.Errorf(`unhandled float precision %d`, floatType.Precision())
}
case arrowserde.TypeDecimal:
return coltypes.Decimal, nil
case arrowserde.TypeTimestamp:
return coltypes.Timestamp, nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/col/colserde/record_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func numBuffersForType(t coltypes.T) int {
// null bitmap and one for the values.
numBuffers := 2
switch t {
case coltypes.Bytes, coltypes.Timestamp:
case coltypes.Bytes, coltypes.Decimal, coltypes.Timestamp:
// This type has an extra offsets buffer.
numBuffers = 3
}
Expand Down
25 changes: 16 additions & 9 deletions pkg/col/colserde/record_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/apache/arrow/go/arrow"
"github.com/apache/arrow/go/arrow/array"
"github.com/apache/arrow/go/arrow/memory"
"github.com/cockroachdb/apd"
"github.com/cockroachdb/cockroach/pkg/col/colserde"
"github.com/cockroachdb/cockroach/pkg/col/coltypes"
"github.com/cockroachdb/cockroach/pkg/testutils"
Expand Down Expand Up @@ -134,6 +135,20 @@ func randomDataFromType(rng *rand.Rand, t coltypes.T, n int, nullProbability flo
}
builder.(*array.FixedSizeBinaryBuilder).AppendValues(data, valid)
}
case coltypes.Decimal:
var err error
builder = array.NewBinaryBuilder(memory.DefaultAllocator, arrow.BinaryTypes.Binary)
data := make([][]byte, n)
for i := range data {
var d apd.Decimal
// int64(rng.Uint64()) to get negative numbers, too.
d.SetFinite(int64(rng.Uint64()), int32(rng.Intn(40)-20))
data[i], err = d.MarshalText()
if err != nil {
panic(err)
}
}
builder.(*array.BinaryBuilder).AppendValues(data, valid)
case coltypes.Timestamp:
var err error
now := timeutil.Now()
Expand Down Expand Up @@ -188,22 +203,14 @@ func TestRecordBatchSerializerSerializeDeserializeRandom(t *testing.T) {
)

var (
supportedTypes = make([]coltypes.T, 0, len(coltypes.AllTypes))
typs = make([]coltypes.T, rng.Intn(maxTypes)+1)
data = make([]*array.Data, len(typs))
dataLen = rng.Intn(maxDataLen) + 1
nullProbability = rng.Float64()
buf = bytes.Buffer{}
)

// We do not support decimals.
for _, t := range coltypes.AllTypes {
if t == coltypes.Decimal {
continue
}
supportedTypes = append(supportedTypes, t)
}

supportedTypes := coltypes.AllTypes
for i := range typs {
typs[i] = supportedTypes[rng.Intn(len(supportedTypes))]
data[i] = randomDataFromType(rng, typs[i], dataLen, nullProbability)
Expand Down
5 changes: 0 additions & 5 deletions pkg/sql/colexec/types_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,6 @@ func TestSupportedSQLTypesIntegration(t *testing.T) {
rng, _ := randutil.NewPseudoRand()

for _, typ := range allSupportedSQLTypes {
if typ.Equal(*types.Decimal) {
// Serialization of Decimals is currently not supported.
// TODO(yuzefovich): remove this once it is supported.
continue
}
for _, numRows := range []uint16{
// A few interesting sizes.
1,
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/opt/exec/execbuilder/testdata/distsql_tighten_spans
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ EXPLAIN SELECT * FROM decimal_t WHERE a = 1.00
----
tree field description
· distributed true
· vectorized false
· vectorized true
scan · ·
· table decimal_t@primary
· spans /1-/1/#
Expand All @@ -364,7 +364,7 @@ EXPLAIN SELECT * FROM decimal_t WHERE a < 2
----
tree field description
· distributed true
· vectorized false
· vectorized true
scan · ·
· table decimal_t@primary
· spans -/2