Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
43311: colserde: add simple serialization for DECIMALs r=yuzefovich a=yuzefovich

This commit adds simple serialization of DECIMALs using the provided
MarshalText method.

Touches: #37044.

Release note (sql change): Previously, DECIMALs could not be sent over
the network when the computation was performed by the vectorized engine.
This has been fixed, and the vectorized engine now fully supports
DECIMAL type.

43793: sql: properly enforce uniqueness of key columns for FK references r=lucy-zhang a=lucy-zhang

We introduced a bug in 19.2 that would allow, e.g., a unique index on `(a, b,
c)` to be used as an index that is supposed to enforce uniqueness for a foreign
key constraint pointing only to `(a, b)`. This PR reintroduces a check that the
indexed columns match the FK columns exactly.

Fixes #42680.

Release note (bug fix): Fixed a bug introduced in 19.2 that would allow foreign
keys to use a unique index on the referenced columns that indexed more columns
than were included in the columns used in the FK constraint, which allows
potentially violating uniqueness in the referenced columns themselves.

43956: build: post issues from local roachtests r=andreimatei a=tbg

This wasn't happening due to some missing env vars. I noticed this since
there are many failures of acceptance/version-upgrade not reflected via
created issues (I get email notifications from TC).

Release note: None

Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
Co-authored-by: Lucy Zhang <lucy-zhang@users.noreply.github.com>
Co-authored-by: Tobias Schottdorf <tobias.schottdorf@gmail.com>
  • Loading branch information
4 people committed Jan 14, 2020
4 parents d7ec8ab + 7bbd378 + 6430e47 + 8116b50 commit fdd070b
Show file tree
Hide file tree
Showing 11 changed files with 146 additions and 84 deletions.
8 changes: 7 additions & 1 deletion build/teamcity-local-roachtest.sh
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,13 @@ tc_start_block "Run local roachtests"
# NB: roachtest has its own teamcity output format and posts issues (whenever
# GITHUB_API_TOKEN) is set, so it doesn't fit into the streamlined process
# around the run_json_test helper.
build/builder.sh env COCKROACH_DEV_LICENSE="$COCKROACH_DEV_LICENSE" GITHUB_API_TOKEN="${GITHUB_API_TOKEN-}" \
build/builder.sh env \
COCKROACH_DEV_LICENSE="${COCKROACH_DEV_LICENSE}" \
GITHUB_API_TOKEN="${GITHUB_API_TOKEN-}" \
BUILD_VCS_NUMBER="${BUILD_VCS_NUMBER-}" \
TC_BUILD_ID="${TC_BUILD_ID-}" \
TC_SERVER_URL="${TC_SERVER_URL-}" \
TC_BUILD_BRANCH="${TC_BUILD_BRANCH-}" \
stdbuf -oL -eL \
./bin/roachtest run acceptance kv/splits cdc/bank \
--local \
Expand Down
116 changes: 71 additions & 45 deletions pkg/col/colserde/arrowbatchconverter.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,11 @@ var supportedTypes = func() map[coltypes.T]struct{} {
for _, t := range []coltypes.T{
coltypes.Bool,
coltypes.Bytes,
coltypes.Decimal,
coltypes.Float64,
coltypes.Int16,
coltypes.Int32,
coltypes.Int64,
coltypes.Float64,
coltypes.Timestamp,
} {
typs[t] = struct{}{}
Expand Down Expand Up @@ -115,15 +116,25 @@ func (c *ArrowBatchConverter) BatchToArrow(batch coldata.Batch) ([]*array.Data,
arrowBitmap = n.NullBitmap()
}

if typ == coltypes.Bool || typ == coltypes.Timestamp {
// Bools and Timestamps are handled differently from other coltypes.
// Refer to the comment on ArrowBatchConverter.builders for more
// information.
if typ == coltypes.Bool || typ == coltypes.Decimal || typ == coltypes.Timestamp {
// Bools, Decimals, and Timestamps are handled differently from other
// coltypes. Refer to the comment on ArrowBatchConverter.builders for
// more information.
var data *array.Data
switch typ {
case coltypes.Bool:
c.builders.boolBuilder.AppendValues(vec.Bool()[:n], nil /* valid */)
data = c.builders.boolBuilder.NewBooleanArray().Data()
case coltypes.Decimal:
decimals := vec.Decimal()[:n]
for _, d := range decimals {
marshaled, err := d.MarshalText()
if err != nil {
return nil, err
}
c.builders.binaryBuilder.Append(marshaled)
}
data = c.builders.binaryBuilder.NewBinaryArray().Data()
case coltypes.Timestamp:
timestamps := vec.Timestamp()[:n]
for _, ts := range timestamps {
Expand Down Expand Up @@ -246,49 +257,64 @@ func (c *ArrowBatchConverter) ArrowToBatch(data []*array.Data, b coldata.Batch)
d := data[i]

var arr array.Interface
if typ == coltypes.Bool || typ == coltypes.Bytes || typ == coltypes.Timestamp {
switch typ {
case coltypes.Bool:
boolArr := array.NewBooleanData(d)
vecArr := vec.Bool()
for i := 0; i < boolArr.Len(); i++ {
vecArr[i] = boolArr.Value(i)
}
arr = boolArr
case coltypes.Bytes:
bytesArr := array.NewBinaryData(d)
bytes := bytesArr.ValueBytes()
if bytes == nil {
// All bytes values are empty, so the representation is solely with the
// offsets slice, so create an empty slice so that the conversion
// corresponds.
bytes = make([]byte, 0)
}
coldata.BytesFromArrowSerializationFormat(vec.Bytes(), bytes, bytesArr.ValueOffsets())
arr = bytesArr
case coltypes.Timestamp:
// TODO(yuzefovich): this serialization is quite inefficient - improve
// it.
bytesArr := array.NewBinaryData(d)
bytes := bytesArr.ValueBytes()
if bytes == nil {
// All bytes values are empty, so the representation is solely with the
// offsets slice, so create an empty slice so that the conversion
// corresponds.
bytes = make([]byte, 0)
switch typ {
case coltypes.Bool:
boolArr := array.NewBooleanData(d)
vecArr := vec.Bool()
for i := 0; i < boolArr.Len(); i++ {
vecArr[i] = boolArr.Value(i)
}
arr = boolArr
case coltypes.Bytes:
bytesArr := array.NewBinaryData(d)
bytes := bytesArr.ValueBytes()
if bytes == nil {
// All bytes values are empty, so the representation is solely with the
// offsets slice, so create an empty slice so that the conversion
// corresponds.
bytes = make([]byte, 0)
}
coldata.BytesFromArrowSerializationFormat(vec.Bytes(), bytes, bytesArr.ValueOffsets())
arr = bytesArr
case coltypes.Decimal:
// TODO(yuzefovich): this serialization is quite inefficient - improve
// it.
bytesArr := array.NewBinaryData(d)
bytes := bytesArr.ValueBytes()
if bytes == nil {
// All bytes values are empty, so the representation is solely with the
// offsets slice, so create an empty slice so that the conversion
// corresponds.
bytes = make([]byte, 0)
}
offsets := bytesArr.ValueOffsets()
vecArr := vec.Decimal()
for i := 0; i < len(offsets)-1; i++ {
if err := vecArr[i].UnmarshalText(bytes[offsets[i]:offsets[i+1]]); err != nil {
return err
}
offsets := bytesArr.ValueOffsets()
vecArr := vec.Timestamp()
for i := 0; i < len(offsets)-1; i++ {
if err := vecArr[i].UnmarshalBinary(bytes[offsets[i]:offsets[i+1]]); err != nil {
return err
}
}
arr = bytesArr
case coltypes.Timestamp:
// TODO(yuzefovich): this serialization is quite inefficient - improve
// it.
bytesArr := array.NewBinaryData(d)
bytes := bytesArr.ValueBytes()
if bytes == nil {
// All bytes values are empty, so the representation is solely with the
// offsets slice, so create an empty slice so that the conversion
// corresponds.
bytes = make([]byte, 0)
}
offsets := bytesArr.ValueOffsets()
vecArr := vec.Timestamp()
for i := 0; i < len(offsets)-1; i++ {
if err := vecArr[i].UnmarshalBinary(bytes[offsets[i]:offsets[i+1]]); err != nil {
return err
}
arr = bytesArr
default:
panic(fmt.Sprintf("unexpected type %s", typ))
}
} else {
arr = bytesArr
default:
var col interface{}
switch typ {
case coltypes.Int16:
Expand Down
31 changes: 12 additions & 19 deletions pkg/col/colserde/arrowbatchconverter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,9 @@ func randomBatch(allocator *colexec.Allocator) ([]coltypes.T, coldata.Batch) {
const maxTyps = 16
rng, _ := randutil.NewPseudoRand()

availableTyps := make([]coltypes.T, 0, len(coltypes.AllTypes))
for _, typ := range coltypes.AllTypes {
// TODO(asubiotto,jordan): We do not support decimal conversion yet.
if typ == coltypes.Decimal {
continue
}
availableTyps = append(availableTyps, typ)
}
typs := make([]coltypes.T, rng.Intn(maxTyps)+1)
for i := range typs {
typs[i] = availableTyps[rng.Intn(len(availableTyps))]
typs[i] = coltypes.AllTypes[rng.Intn(len(coltypes.AllTypes))]
}

capacity := rng.Intn(int(coldata.BatchSize())) + 1
Expand Down Expand Up @@ -128,16 +120,6 @@ func assertEqualBatches(t *testing.T, expected, actual coldata.Batch) {
}
}

func TestArrowBatchConverterRejectsUnsupportedTypes(t *testing.T) {
defer leaktest.AfterTest(t)()

unsupportedTypes := []coltypes.T{coltypes.Decimal}
for _, typ := range unsupportedTypes {
_, err := colserde.NewArrowBatchConverter([]coltypes.T{typ})
require.Error(t, err)
}
}

func TestArrowBatchConverterRandom(t *testing.T) {
defer leaktest.AfterTest(t)()

Expand Down Expand Up @@ -215,6 +197,7 @@ func BenchmarkArrowBatchConverter(b *testing.B) {
typs := []coltypes.T{
coltypes.Bool,
coltypes.Bytes,
coltypes.Decimal,
coltypes.Int64,
coltypes.Timestamp,
}
Expand All @@ -224,6 +207,7 @@ func BenchmarkArrowBatchConverter(b *testing.B) {
numBytes := []int64{
int64(coldata.BatchSize()),
fixedLen * int64(coldata.BatchSize()),
0, // The number of bytes for decimals will be set below.
8 * int64(coldata.BatchSize()),
3 * 8 * int64(coldata.BatchSize()),
}
Expand Down Expand Up @@ -251,6 +235,15 @@ func BenchmarkArrowBatchConverter(b *testing.B) {
}
}
batch.ColVec(0).SetCol(newBytes)
} else if typ == coltypes.Decimal {
// Decimal is variable length type, so we want to calculate precisely the
// total size of all decimals in the vector.
decimals := batch.ColVec(0).Decimal()
for _, d := range decimals {
marshaled, err := d.MarshalText()
require.NoError(b, err)
numBytes[typIdx] += int64(len(marshaled))
}
}
c, err := colserde.NewArrowBatchConverter([]coltypes.T{typ})
require.NoError(b, err)
Expand Down
7 changes: 7 additions & 0 deletions pkg/col/colserde/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,11 @@ func schema(fb *flatbuffers.Builder, typs []coltypes.T) flatbuffers.UOffsetT {
arrowserde.FloatingPointAddPrecision(fb, arrowserde.PrecisionDOUBLE)
fbTypOffset = arrowserde.FloatingPointEnd(fb)
fbTyp = arrowserde.TypeFloatingPoint
case coltypes.Decimal:
// Decimals are marshaled into bytes, so we use binary headers.
arrowserde.BinaryStart(fb)
fbTypOffset = arrowserde.BinaryEnd(fb)
fbTyp = arrowserde.TypeDecimal
case coltypes.Timestamp:
// Timestamps are marshaled into bytes, so we use binary headers.
arrowserde.BinaryStart(fb)
Expand Down Expand Up @@ -456,6 +461,8 @@ func typeFromField(field *arrowserde.Field) (coltypes.T, error) {
default:
return coltypes.Unhandled, errors.Errorf(`unhandled float precision %d`, floatType.Precision())
}
case arrowserde.TypeDecimal:
return coltypes.Decimal, nil
case arrowserde.TypeTimestamp:
return coltypes.Timestamp, nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/col/colserde/record_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func numBuffersForType(t coltypes.T) int {
// null bitmap and one for the values.
numBuffers := 2
switch t {
case coltypes.Bytes, coltypes.Timestamp:
case coltypes.Bytes, coltypes.Decimal, coltypes.Timestamp:
// This type has an extra offsets buffer.
numBuffers = 3
}
Expand Down
25 changes: 16 additions & 9 deletions pkg/col/colserde/record_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/apache/arrow/go/arrow"
"github.com/apache/arrow/go/arrow/array"
"github.com/apache/arrow/go/arrow/memory"
"github.com/cockroachdb/apd"
"github.com/cockroachdb/cockroach/pkg/col/colserde"
"github.com/cockroachdb/cockroach/pkg/col/coltypes"
"github.com/cockroachdb/cockroach/pkg/testutils"
Expand Down Expand Up @@ -134,6 +135,20 @@ func randomDataFromType(rng *rand.Rand, t coltypes.T, n int, nullProbability flo
}
builder.(*array.FixedSizeBinaryBuilder).AppendValues(data, valid)
}
case coltypes.Decimal:
var err error
builder = array.NewBinaryBuilder(memory.DefaultAllocator, arrow.BinaryTypes.Binary)
data := make([][]byte, n)
for i := range data {
var d apd.Decimal
// int64(rng.Uint64()) to get negative numbers, too.
d.SetFinite(int64(rng.Uint64()), int32(rng.Intn(40)-20))
data[i], err = d.MarshalText()
if err != nil {
panic(err)
}
}
builder.(*array.BinaryBuilder).AppendValues(data, valid)
case coltypes.Timestamp:
var err error
now := timeutil.Now()
Expand Down Expand Up @@ -188,22 +203,14 @@ func TestRecordBatchSerializerSerializeDeserializeRandom(t *testing.T) {
)

var (
supportedTypes = make([]coltypes.T, 0, len(coltypes.AllTypes))
typs = make([]coltypes.T, rng.Intn(maxTypes)+1)
data = make([]*array.Data, len(typs))
dataLen = rng.Intn(maxDataLen) + 1
nullProbability = rng.Float64()
buf = bytes.Buffer{}
)

// We do not support decimals.
for _, t := range coltypes.AllTypes {
if t == coltypes.Decimal {
continue
}
supportedTypes = append(supportedTypes, t)
}

supportedTypes := coltypes.AllTypes
for i := range typs {
typs[i] = supportedTypes[rng.Intn(len(supportedTypes))]
data[i] = randomDataFromType(rng, typs[i], dataLen, nullProbability)
Expand Down
5 changes: 0 additions & 5 deletions pkg/sql/colexec/types_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,6 @@ func TestSupportedSQLTypesIntegration(t *testing.T) {
rng, _ := randutil.NewPseudoRand()

for _, typ := range allSupportedSQLTypes {
if typ.Equal(*types.Decimal) {
// Serialization of Decimals is currently not supported.
// TODO(yuzefovich): remove this once it is supported.
continue
}
for _, numRows := range []uint16{
// A few interesting sizes.
1,
Expand Down
15 changes: 15 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/fk
Original file line number Diff line number Diff line change
Expand Up @@ -2742,3 +2742,18 @@ ALTER TABLE table1_42498 ADD FOREIGN KEY (col2, col1) REFERENCES table2_42498 (c

statement ok
DROP TABLE table1_42498, table2_42498 CASCADE

# Regression test for #42680: The unique index used for the referenced columns
# must index only those columns and no others, in order to enforce uniqueness
# for the FK constraint.
subtest 42680_unique_index_must_exactly_match_columns

# The table has a unique index on (a, b) but not (a).
statement ok
CREATE TABLE target (a INT, b INT, UNIQUE INDEX (a, b));

statement ok
CREATE TABLE source (a INT, INDEX (a));

statement error there is no unique constraint matching given keys for referenced table target
ALTER TABLE source ADD FOREIGN KEY (a) REFERENCES target (a);
4 changes: 2 additions & 2 deletions pkg/sql/opt/exec/execbuilder/testdata/distsql_tighten_spans
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ EXPLAIN SELECT * FROM decimal_t WHERE a = 1.00
----
tree field description
· distributed true
· vectorized false
· vectorized true
scan · ·
· table decimal_t@primary
· spans /1-/1/#
Expand All @@ -364,7 +364,7 @@ EXPLAIN SELECT * FROM decimal_t WHERE a < 2
----
tree field description
· distributed true
· vectorized false
· vectorized true
scan · ·
· table decimal_t@primary
· spans -/2
13 changes: 13 additions & 0 deletions pkg/sql/sqlbase/structured.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,19 @@ func (c ColumnIDs) HasPrefix(input ColumnIDs) bool {
return true
}

// Equals returns true if the input list is equal to this list.
func (c ColumnIDs) Equals(input ColumnIDs) bool {
if len(input) != len(c) {
return false
}
for i := range input {
if input[i] != c[i] {
return false
}
}
return true
}

// FamilyID is a custom type for ColumnFamilyDescriptor IDs.
type FamilyID uint32

Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/sqlbase/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,12 +423,12 @@ func FindFKReferencedIndex(
) (*IndexDescriptor, error) {
// Search for a unique index on the referenced table that matches our foreign
// key columns.
if ColumnIDs(referencedTable.PrimaryIndex.ColumnIDs).HasPrefix(referencedColIDs) {
if ColumnIDs(referencedTable.PrimaryIndex.ColumnIDs).Equals(referencedColIDs) {
return &referencedTable.PrimaryIndex, nil
}
// If the PK doesn't match, find the index corresponding to the referenced column.
for _, idx := range referencedTable.Indexes {
if idx.Unique && ColumnIDs(idx.ColumnIDs).HasPrefix(referencedColIDs) {
if idx.Unique && ColumnIDs(idx.ColumnIDs).Equals(referencedColIDs) {
return &idx, nil
}
}
Expand Down

0 comments on commit fdd070b

Please sign in to comment.