diff --git a/pkg/col/colserde/arrowbatchconverter.go b/pkg/col/colserde/arrowbatchconverter.go index 1aaf1a68b094..bfdb7dbddb46 100644 --- a/pkg/col/colserde/arrowbatchconverter.go +++ b/pkg/col/colserde/arrowbatchconverter.go @@ -84,10 +84,11 @@ var supportedTypes = func() map[coltypes.T]struct{} { for _, t := range []coltypes.T{ coltypes.Bool, coltypes.Bytes, + coltypes.Decimal, + coltypes.Float64, coltypes.Int16, coltypes.Int32, coltypes.Int64, - coltypes.Float64, coltypes.Timestamp, } { typs[t] = struct{}{} @@ -115,15 +116,25 @@ func (c *ArrowBatchConverter) BatchToArrow(batch coldata.Batch) ([]*array.Data, arrowBitmap = n.NullBitmap() } - if typ == coltypes.Bool || typ == coltypes.Timestamp { - // Bools and Timestamps are handled differently from other coltypes. - // Refer to the comment on ArrowBatchConverter.builders for more - // information. + if typ == coltypes.Bool || typ == coltypes.Decimal || typ == coltypes.Timestamp { + // Bools, Decimals, and Timestamps are handled differently from other + // coltypes. Refer to the comment on ArrowBatchConverter.builders for + // more information. var data *array.Data switch typ { case coltypes.Bool: c.builders.boolBuilder.AppendValues(vec.Bool()[:n], nil /* valid */) data = c.builders.boolBuilder.NewBooleanArray().Data() + case coltypes.Decimal: + decimals := vec.Decimal()[:n] + for _, d := range decimals { + marshaled, err := d.MarshalText() + if err != nil { + return nil, err + } + c.builders.binaryBuilder.Append(marshaled) + } + data = c.builders.binaryBuilder.NewBinaryArray().Data() case coltypes.Timestamp: timestamps := vec.Timestamp()[:n] for _, ts := range timestamps { @@ -246,49 +257,64 @@ func (c *ArrowBatchConverter) ArrowToBatch(data []*array.Data, b coldata.Batch) d := data[i] var arr array.Interface - if typ == coltypes.Bool || typ == coltypes.Bytes || typ == coltypes.Timestamp { - switch typ { - case coltypes.Bool: - boolArr := array.NewBooleanData(d) - vecArr := vec.Bool() - for i := 0; i < boolArr.Len(); i++ { - vecArr[i] = boolArr.Value(i) - } - arr = boolArr - case coltypes.Bytes: - bytesArr := array.NewBinaryData(d) - bytes := bytesArr.ValueBytes() - if bytes == nil { - // All bytes values are empty, so the representation is solely with the - // offsets slice, so create an empty slice so that the conversion - // corresponds. - bytes = make([]byte, 0) - } - coldata.BytesFromArrowSerializationFormat(vec.Bytes(), bytes, bytesArr.ValueOffsets()) - arr = bytesArr - case coltypes.Timestamp: - // TODO(yuzefovich): this serialization is quite inefficient - improve - // it. - bytesArr := array.NewBinaryData(d) - bytes := bytesArr.ValueBytes() - if bytes == nil { - // All bytes values are empty, so the representation is solely with the - // offsets slice, so create an empty slice so that the conversion - // corresponds. - bytes = make([]byte, 0) + switch typ { + case coltypes.Bool: + boolArr := array.NewBooleanData(d) + vecArr := vec.Bool() + for i := 0; i < boolArr.Len(); i++ { + vecArr[i] = boolArr.Value(i) + } + arr = boolArr + case coltypes.Bytes: + bytesArr := array.NewBinaryData(d) + bytes := bytesArr.ValueBytes() + if bytes == nil { + // All bytes values are empty, so the representation is solely with the + // offsets slice, so create an empty slice so that the conversion + // corresponds. + bytes = make([]byte, 0) + } + coldata.BytesFromArrowSerializationFormat(vec.Bytes(), bytes, bytesArr.ValueOffsets()) + arr = bytesArr + case coltypes.Decimal: + // TODO(yuzefovich): this serialization is quite inefficient - improve + // it. + bytesArr := array.NewBinaryData(d) + bytes := bytesArr.ValueBytes() + if bytes == nil { + // All bytes values are empty, so the representation is solely with the + // offsets slice, so create an empty slice so that the conversion + // corresponds. + bytes = make([]byte, 0) + } + offsets := bytesArr.ValueOffsets() + vecArr := vec.Decimal() + for i := 0; i < len(offsets)-1; i++ { + if err := vecArr[i].UnmarshalText(bytes[offsets[i]:offsets[i+1]]); err != nil { + return err } - offsets := bytesArr.ValueOffsets() - vecArr := vec.Timestamp() - for i := 0; i < len(offsets)-1; i++ { - if err := vecArr[i].UnmarshalBinary(bytes[offsets[i]:offsets[i+1]]); err != nil { - return err - } + } + arr = bytesArr + case coltypes.Timestamp: + // TODO(yuzefovich): this serialization is quite inefficient - improve + // it. + bytesArr := array.NewBinaryData(d) + bytes := bytesArr.ValueBytes() + if bytes == nil { + // All bytes values are empty, so the representation is solely with the + // offsets slice, so create an empty slice so that the conversion + // corresponds. + bytes = make([]byte, 0) + } + offsets := bytesArr.ValueOffsets() + vecArr := vec.Timestamp() + for i := 0; i < len(offsets)-1; i++ { + if err := vecArr[i].UnmarshalBinary(bytes[offsets[i]:offsets[i+1]]); err != nil { + return err } - arr = bytesArr - default: - panic(fmt.Sprintf("unexpected type %s", typ)) } - } else { + arr = bytesArr + default: var col interface{} switch typ { case coltypes.Int16: diff --git a/pkg/col/colserde/arrowbatchconverter_test.go b/pkg/col/colserde/arrowbatchconverter_test.go index 6f124daa7878..1c8ff5197f9d 100644 --- a/pkg/col/colserde/arrowbatchconverter_test.go +++ b/pkg/col/colserde/arrowbatchconverter_test.go @@ -29,17 +29,9 @@ func randomBatch(allocator *colexec.Allocator) ([]coltypes.T, coldata.Batch) { const maxTyps = 16 rng, _ := randutil.NewPseudoRand() - availableTyps := make([]coltypes.T, 0, len(coltypes.AllTypes)) - for _, typ := range coltypes.AllTypes { - // TODO(asubiotto,jordan): We do not support decimal conversion yet. - if typ == coltypes.Decimal { - continue - } - availableTyps = append(availableTyps, typ) - } typs := make([]coltypes.T, rng.Intn(maxTyps)+1) for i := range typs { - typs[i] = availableTyps[rng.Intn(len(availableTyps))] + typs[i] = coltypes.AllTypes[rng.Intn(len(coltypes.AllTypes))] } capacity := rng.Intn(int(coldata.BatchSize())) + 1 @@ -128,16 +120,6 @@ func assertEqualBatches(t *testing.T, expected, actual coldata.Batch) { } } -func TestArrowBatchConverterRejectsUnsupportedTypes(t *testing.T) { - defer leaktest.AfterTest(t)() - - unsupportedTypes := []coltypes.T{coltypes.Decimal} - for _, typ := range unsupportedTypes { - _, err := colserde.NewArrowBatchConverter([]coltypes.T{typ}) - require.Error(t, err) - } -} - func TestArrowBatchConverterRandom(t *testing.T) { defer leaktest.AfterTest(t)() @@ -215,6 +197,7 @@ func BenchmarkArrowBatchConverter(b *testing.B) { typs := []coltypes.T{ coltypes.Bool, coltypes.Bytes, + coltypes.Decimal, coltypes.Int64, coltypes.Timestamp, } @@ -224,6 +207,7 @@ func BenchmarkArrowBatchConverter(b *testing.B) { numBytes := []int64{ int64(coldata.BatchSize()), fixedLen * int64(coldata.BatchSize()), + 0, // The number of bytes for decimals will be set below. 8 * int64(coldata.BatchSize()), 3 * 8 * int64(coldata.BatchSize()), } @@ -251,6 +235,15 @@ func BenchmarkArrowBatchConverter(b *testing.B) { } } batch.ColVec(0).SetCol(newBytes) + } else if typ == coltypes.Decimal { + // Decimal is variable length type, so we want to calculate precisely the + // total size of all decimals in the vector. + decimals := batch.ColVec(0).Decimal() + for _, d := range decimals { + marshaled, err := d.MarshalText() + require.NoError(b, err) + numBytes[typIdx] += int64(len(marshaled)) + } } c, err := colserde.NewArrowBatchConverter([]coltypes.T{typ}) require.NoError(b, err) diff --git a/pkg/col/colserde/file.go b/pkg/col/colserde/file.go index 5cc05103cfeb..b9f05be0ce1d 100644 --- a/pkg/col/colserde/file.go +++ b/pkg/col/colserde/file.go @@ -368,6 +368,11 @@ func schema(fb *flatbuffers.Builder, typs []coltypes.T) flatbuffers.UOffsetT { arrowserde.FloatingPointAddPrecision(fb, arrowserde.PrecisionDOUBLE) fbTypOffset = arrowserde.FloatingPointEnd(fb) fbTyp = arrowserde.TypeFloatingPoint + case coltypes.Decimal: + // Decimals are marshaled into bytes, so we use binary headers. + arrowserde.BinaryStart(fb) + fbTypOffset = arrowserde.BinaryEnd(fb) + fbTyp = arrowserde.TypeDecimal case coltypes.Timestamp: // Timestamps are marshaled into bytes, so we use binary headers. arrowserde.BinaryStart(fb) @@ -456,6 +461,8 @@ func typeFromField(field *arrowserde.Field) (coltypes.T, error) { default: return coltypes.Unhandled, errors.Errorf(`unhandled float precision %d`, floatType.Precision()) } + case arrowserde.TypeDecimal: + return coltypes.Decimal, nil case arrowserde.TypeTimestamp: return coltypes.Timestamp, nil } diff --git a/pkg/col/colserde/record_batch.go b/pkg/col/colserde/record_batch.go index 9eef869cb4f9..4ab267c1bc74 100644 --- a/pkg/col/colserde/record_batch.go +++ b/pkg/col/colserde/record_batch.go @@ -36,7 +36,7 @@ func numBuffersForType(t coltypes.T) int { // null bitmap and one for the values. numBuffers := 2 switch t { - case coltypes.Bytes, coltypes.Timestamp: + case coltypes.Bytes, coltypes.Decimal, coltypes.Timestamp: // This type has an extra offsets buffer. numBuffers = 3 } diff --git a/pkg/col/colserde/record_batch_test.go b/pkg/col/colserde/record_batch_test.go index a70a1550e45b..9fb2ea5694e7 100644 --- a/pkg/col/colserde/record_batch_test.go +++ b/pkg/col/colserde/record_batch_test.go @@ -22,6 +22,7 @@ import ( "github.com/apache/arrow/go/arrow" "github.com/apache/arrow/go/arrow/array" "github.com/apache/arrow/go/arrow/memory" + "github.com/cockroachdb/apd" "github.com/cockroachdb/cockroach/pkg/col/colserde" "github.com/cockroachdb/cockroach/pkg/col/coltypes" "github.com/cockroachdb/cockroach/pkg/testutils" @@ -134,6 +135,20 @@ func randomDataFromType(rng *rand.Rand, t coltypes.T, n int, nullProbability flo } builder.(*array.FixedSizeBinaryBuilder).AppendValues(data, valid) } + case coltypes.Decimal: + var err error + builder = array.NewBinaryBuilder(memory.DefaultAllocator, arrow.BinaryTypes.Binary) + data := make([][]byte, n) + for i := range data { + var d apd.Decimal + // int64(rng.Uint64()) to get negative numbers, too. + d.SetFinite(int64(rng.Uint64()), int32(rng.Intn(40)-20)) + data[i], err = d.MarshalText() + if err != nil { + panic(err) + } + } + builder.(*array.BinaryBuilder).AppendValues(data, valid) case coltypes.Timestamp: var err error now := timeutil.Now() @@ -188,7 +203,6 @@ func TestRecordBatchSerializerSerializeDeserializeRandom(t *testing.T) { ) var ( - supportedTypes = make([]coltypes.T, 0, len(coltypes.AllTypes)) typs = make([]coltypes.T, rng.Intn(maxTypes)+1) data = make([]*array.Data, len(typs)) dataLen = rng.Intn(maxDataLen) + 1 @@ -196,14 +210,7 @@ func TestRecordBatchSerializerSerializeDeserializeRandom(t *testing.T) { buf = bytes.Buffer{} ) - // We do not support decimals. - for _, t := range coltypes.AllTypes { - if t == coltypes.Decimal { - continue - } - supportedTypes = append(supportedTypes, t) - } - + supportedTypes := coltypes.AllTypes for i := range typs { typs[i] = supportedTypes[rng.Intn(len(supportedTypes))] data[i] = randomDataFromType(rng, typs[i], dataLen, nullProbability) diff --git a/pkg/sql/colexec/types_integration_test.go b/pkg/sql/colexec/types_integration_test.go index d21f7750e2e2..916bc86272ae 100644 --- a/pkg/sql/colexec/types_integration_test.go +++ b/pkg/sql/colexec/types_integration_test.go @@ -59,11 +59,6 @@ func TestSupportedSQLTypesIntegration(t *testing.T) { rng, _ := randutil.NewPseudoRand() for _, typ := range allSupportedSQLTypes { - if typ.Equal(*types.Decimal) { - // Serialization of Decimals is currently not supported. - // TODO(yuzefovich): remove this once it is supported. - continue - } for _, numRows := range []uint16{ // A few interesting sizes. 1, diff --git a/pkg/sql/opt/exec/execbuilder/testdata/distsql_tighten_spans b/pkg/sql/opt/exec/execbuilder/testdata/distsql_tighten_spans index d1e249360f2d..161fbb6c19c2 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/distsql_tighten_spans +++ b/pkg/sql/opt/exec/execbuilder/testdata/distsql_tighten_spans @@ -352,7 +352,7 @@ EXPLAIN SELECT * FROM decimal_t WHERE a = 1.00 ---- tree field description · distributed true -· vectorized false +· vectorized true scan · · · table decimal_t@primary · spans /1-/1/# @@ -364,7 +364,7 @@ EXPLAIN SELECT * FROM decimal_t WHERE a < 2 ---- tree field description · distributed true -· vectorized false +· vectorized true scan · · · table decimal_t@primary · spans -/2