Skip to content

Commit

Permalink
colserde: add simple serialization for DECIMALs
Browse files Browse the repository at this point in the history
This commit adds simple serialization of DECIMALs using the provided
MarshalText method.

Release note (sql change): Previously, DECIMALs could not be sent over
the network when the computation was performed by the vectorized engine.
This has been fixed, and the vectorized engine now fully supports
DECIMAL type.
  • Loading branch information
yuzefovich committed Jan 13, 2020
1 parent 74912fa commit 7bbd378
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 81 deletions.
116 changes: 71 additions & 45 deletions pkg/col/colserde/arrowbatchconverter.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,11 @@ var supportedTypes = func() map[coltypes.T]struct{} {
for _, t := range []coltypes.T{
coltypes.Bool,
coltypes.Bytes,
coltypes.Decimal,
coltypes.Float64,
coltypes.Int16,
coltypes.Int32,
coltypes.Int64,
coltypes.Float64,
coltypes.Timestamp,
} {
typs[t] = struct{}{}
Expand Down Expand Up @@ -115,15 +116,25 @@ func (c *ArrowBatchConverter) BatchToArrow(batch coldata.Batch) ([]*array.Data,
arrowBitmap = n.NullBitmap()
}

if typ == coltypes.Bool || typ == coltypes.Timestamp {
// Bools and Timestamps are handled differently from other coltypes.
// Refer to the comment on ArrowBatchConverter.builders for more
// information.
if typ == coltypes.Bool || typ == coltypes.Decimal || typ == coltypes.Timestamp {
// Bools, Decimals, and Timestamps are handled differently from other
// coltypes. Refer to the comment on ArrowBatchConverter.builders for
// more information.
var data *array.Data
switch typ {
case coltypes.Bool:
c.builders.boolBuilder.AppendValues(vec.Bool()[:n], nil /* valid */)
data = c.builders.boolBuilder.NewBooleanArray().Data()
case coltypes.Decimal:
decimals := vec.Decimal()[:n]
for _, d := range decimals {
marshaled, err := d.MarshalText()
if err != nil {
return nil, err
}
c.builders.binaryBuilder.Append(marshaled)
}
data = c.builders.binaryBuilder.NewBinaryArray().Data()
case coltypes.Timestamp:
timestamps := vec.Timestamp()[:n]
for _, ts := range timestamps {
Expand Down Expand Up @@ -246,49 +257,64 @@ func (c *ArrowBatchConverter) ArrowToBatch(data []*array.Data, b coldata.Batch)
d := data[i]

var arr array.Interface
if typ == coltypes.Bool || typ == coltypes.Bytes || typ == coltypes.Timestamp {
switch typ {
case coltypes.Bool:
boolArr := array.NewBooleanData(d)
vecArr := vec.Bool()
for i := 0; i < boolArr.Len(); i++ {
vecArr[i] = boolArr.Value(i)
}
arr = boolArr
case coltypes.Bytes:
bytesArr := array.NewBinaryData(d)
bytes := bytesArr.ValueBytes()
if bytes == nil {
// All bytes values are empty, so the representation is solely with the
// offsets slice, so create an empty slice so that the conversion
// corresponds.
bytes = make([]byte, 0)
}
coldata.BytesFromArrowSerializationFormat(vec.Bytes(), bytes, bytesArr.ValueOffsets())
arr = bytesArr
case coltypes.Timestamp:
// TODO(yuzefovich): this serialization is quite inefficient - improve
// it.
bytesArr := array.NewBinaryData(d)
bytes := bytesArr.ValueBytes()
if bytes == nil {
// All bytes values are empty, so the representation is solely with the
// offsets slice, so create an empty slice so that the conversion
// corresponds.
bytes = make([]byte, 0)
switch typ {
case coltypes.Bool:
boolArr := array.NewBooleanData(d)
vecArr := vec.Bool()
for i := 0; i < boolArr.Len(); i++ {
vecArr[i] = boolArr.Value(i)
}
arr = boolArr
case coltypes.Bytes:
bytesArr := array.NewBinaryData(d)
bytes := bytesArr.ValueBytes()
if bytes == nil {
// All bytes values are empty, so the representation is solely with the
// offsets slice, so create an empty slice so that the conversion
// corresponds.
bytes = make([]byte, 0)
}
coldata.BytesFromArrowSerializationFormat(vec.Bytes(), bytes, bytesArr.ValueOffsets())
arr = bytesArr
case coltypes.Decimal:
// TODO(yuzefovich): this serialization is quite inefficient - improve
// it.
bytesArr := array.NewBinaryData(d)
bytes := bytesArr.ValueBytes()
if bytes == nil {
// All bytes values are empty, so the representation is solely with the
// offsets slice, so create an empty slice so that the conversion
// corresponds.
bytes = make([]byte, 0)
}
offsets := bytesArr.ValueOffsets()
vecArr := vec.Decimal()
for i := 0; i < len(offsets)-1; i++ {
if err := vecArr[i].UnmarshalText(bytes[offsets[i]:offsets[i+1]]); err != nil {
return err
}
offsets := bytesArr.ValueOffsets()
vecArr := vec.Timestamp()
for i := 0; i < len(offsets)-1; i++ {
if err := vecArr[i].UnmarshalBinary(bytes[offsets[i]:offsets[i+1]]); err != nil {
return err
}
}
arr = bytesArr
case coltypes.Timestamp:
// TODO(yuzefovich): this serialization is quite inefficient - improve
// it.
bytesArr := array.NewBinaryData(d)
bytes := bytesArr.ValueBytes()
if bytes == nil {
// All bytes values are empty, so the representation is solely with the
// offsets slice, so create an empty slice so that the conversion
// corresponds.
bytes = make([]byte, 0)
}
offsets := bytesArr.ValueOffsets()
vecArr := vec.Timestamp()
for i := 0; i < len(offsets)-1; i++ {
if err := vecArr[i].UnmarshalBinary(bytes[offsets[i]:offsets[i+1]]); err != nil {
return err
}
arr = bytesArr
default:
panic(fmt.Sprintf("unexpected type %s", typ))
}
} else {
arr = bytesArr
default:
var col interface{}
switch typ {
case coltypes.Int16:
Expand Down
31 changes: 12 additions & 19 deletions pkg/col/colserde/arrowbatchconverter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,9 @@ func randomBatch(allocator *colexec.Allocator) ([]coltypes.T, coldata.Batch) {
const maxTyps = 16
rng, _ := randutil.NewPseudoRand()

availableTyps := make([]coltypes.T, 0, len(coltypes.AllTypes))
for _, typ := range coltypes.AllTypes {
// TODO(asubiotto,jordan): We do not support decimal conversion yet.
if typ == coltypes.Decimal {
continue
}
availableTyps = append(availableTyps, typ)
}
typs := make([]coltypes.T, rng.Intn(maxTyps)+1)
for i := range typs {
typs[i] = availableTyps[rng.Intn(len(availableTyps))]
typs[i] = coltypes.AllTypes[rng.Intn(len(coltypes.AllTypes))]
}

capacity := rng.Intn(int(coldata.BatchSize())) + 1
Expand Down Expand Up @@ -128,16 +120,6 @@ func assertEqualBatches(t *testing.T, expected, actual coldata.Batch) {
}
}

func TestArrowBatchConverterRejectsUnsupportedTypes(t *testing.T) {
defer leaktest.AfterTest(t)()

unsupportedTypes := []coltypes.T{coltypes.Decimal}
for _, typ := range unsupportedTypes {
_, err := colserde.NewArrowBatchConverter([]coltypes.T{typ})
require.Error(t, err)
}
}

func TestArrowBatchConverterRandom(t *testing.T) {
defer leaktest.AfterTest(t)()

Expand Down Expand Up @@ -215,6 +197,7 @@ func BenchmarkArrowBatchConverter(b *testing.B) {
typs := []coltypes.T{
coltypes.Bool,
coltypes.Bytes,
coltypes.Decimal,
coltypes.Int64,
coltypes.Timestamp,
}
Expand All @@ -224,6 +207,7 @@ func BenchmarkArrowBatchConverter(b *testing.B) {
numBytes := []int64{
int64(coldata.BatchSize()),
fixedLen * int64(coldata.BatchSize()),
0, // The number of bytes for decimals will be set below.
8 * int64(coldata.BatchSize()),
3 * 8 * int64(coldata.BatchSize()),
}
Expand Down Expand Up @@ -251,6 +235,15 @@ func BenchmarkArrowBatchConverter(b *testing.B) {
}
}
batch.ColVec(0).SetCol(newBytes)
} else if typ == coltypes.Decimal {
// Decimal is variable length type, so we want to calculate precisely the
// total size of all decimals in the vector.
decimals := batch.ColVec(0).Decimal()
for _, d := range decimals {
marshaled, err := d.MarshalText()
require.NoError(b, err)
numBytes[typIdx] += int64(len(marshaled))
}
}
c, err := colserde.NewArrowBatchConverter([]coltypes.T{typ})
require.NoError(b, err)
Expand Down
7 changes: 7 additions & 0 deletions pkg/col/colserde/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,11 @@ func schema(fb *flatbuffers.Builder, typs []coltypes.T) flatbuffers.UOffsetT {
arrowserde.FloatingPointAddPrecision(fb, arrowserde.PrecisionDOUBLE)
fbTypOffset = arrowserde.FloatingPointEnd(fb)
fbTyp = arrowserde.TypeFloatingPoint
case coltypes.Decimal:
// Decimals are marshaled into bytes, so we use binary headers.
arrowserde.BinaryStart(fb)
fbTypOffset = arrowserde.BinaryEnd(fb)
fbTyp = arrowserde.TypeDecimal
case coltypes.Timestamp:
// Timestamps are marshaled into bytes, so we use binary headers.
arrowserde.BinaryStart(fb)
Expand Down Expand Up @@ -456,6 +461,8 @@ func typeFromField(field *arrowserde.Field) (coltypes.T, error) {
default:
return coltypes.Unhandled, errors.Errorf(`unhandled float precision %d`, floatType.Precision())
}
case arrowserde.TypeDecimal:
return coltypes.Decimal, nil
case arrowserde.TypeTimestamp:
return coltypes.Timestamp, nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/col/colserde/record_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func numBuffersForType(t coltypes.T) int {
// null bitmap and one for the values.
numBuffers := 2
switch t {
case coltypes.Bytes, coltypes.Timestamp:
case coltypes.Bytes, coltypes.Decimal, coltypes.Timestamp:
// This type has an extra offsets buffer.
numBuffers = 3
}
Expand Down
25 changes: 16 additions & 9 deletions pkg/col/colserde/record_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/apache/arrow/go/arrow"
"github.com/apache/arrow/go/arrow/array"
"github.com/apache/arrow/go/arrow/memory"
"github.com/cockroachdb/apd"
"github.com/cockroachdb/cockroach/pkg/col/colserde"
"github.com/cockroachdb/cockroach/pkg/col/coltypes"
"github.com/cockroachdb/cockroach/pkg/testutils"
Expand Down Expand Up @@ -134,6 +135,20 @@ func randomDataFromType(rng *rand.Rand, t coltypes.T, n int, nullProbability flo
}
builder.(*array.FixedSizeBinaryBuilder).AppendValues(data, valid)
}
case coltypes.Decimal:
var err error
builder = array.NewBinaryBuilder(memory.DefaultAllocator, arrow.BinaryTypes.Binary)
data := make([][]byte, n)
for i := range data {
var d apd.Decimal
// int64(rng.Uint64()) to get negative numbers, too.
d.SetFinite(int64(rng.Uint64()), int32(rng.Intn(40)-20))
data[i], err = d.MarshalText()
if err != nil {
panic(err)
}
}
builder.(*array.BinaryBuilder).AppendValues(data, valid)
case coltypes.Timestamp:
var err error
now := timeutil.Now()
Expand Down Expand Up @@ -188,22 +203,14 @@ func TestRecordBatchSerializerSerializeDeserializeRandom(t *testing.T) {
)

var (
supportedTypes = make([]coltypes.T, 0, len(coltypes.AllTypes))
typs = make([]coltypes.T, rng.Intn(maxTypes)+1)
data = make([]*array.Data, len(typs))
dataLen = rng.Intn(maxDataLen) + 1
nullProbability = rng.Float64()
buf = bytes.Buffer{}
)

// We do not support decimals.
for _, t := range coltypes.AllTypes {
if t == coltypes.Decimal {
continue
}
supportedTypes = append(supportedTypes, t)
}

supportedTypes := coltypes.AllTypes
for i := range typs {
typs[i] = supportedTypes[rng.Intn(len(supportedTypes))]
data[i] = randomDataFromType(rng, typs[i], dataLen, nullProbability)
Expand Down
5 changes: 0 additions & 5 deletions pkg/sql/colexec/types_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,6 @@ func TestSupportedSQLTypesIntegration(t *testing.T) {
rng, _ := randutil.NewPseudoRand()

for _, typ := range allSupportedSQLTypes {
if typ.Equal(*types.Decimal) {
// Serialization of Decimals is currently not supported.
// TODO(yuzefovich): remove this once it is supported.
continue
}
for _, numRows := range []uint16{
// A few interesting sizes.
1,
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/opt/exec/execbuilder/testdata/distsql_tighten_spans
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ EXPLAIN SELECT * FROM decimal_t WHERE a = 1.00
----
tree field description
· distributed true
· vectorized false
· vectorized true
scan · ·
· table decimal_t@primary
· spans /1-/1/#
Expand All @@ -364,7 +364,7 @@ EXPLAIN SELECT * FROM decimal_t WHERE a < 2
----
tree field description
· distributed true
· vectorized false
· vectorized true
scan · ·
· table decimal_t@primary
· spans -/2

0 comments on commit 7bbd378

Please sign in to comment.