Skip to content

Commit

Permalink
Merge #41662
Browse files Browse the repository at this point in the history
41662: colexec: add timestamp type r=yuzefovich a=yuzefovich

This PR is the rebased version of #40250 with a modification of how we're creating `DTimestamp` in `PhysicalTypeColElemToDatum`.

**colexec,coldata: support timestamp type**

This commit adds support for the TIMESTAMP type to the coldata and colexec
packages.

**colexec: add random projection tests for all types**

This test randomly runs binary comparisons against all types with random
data, verifying that the result of Datum.Compare matches the result of
the exec projection.

This found the bug with date infinity matching tracked in #40354, and
immediately found a bug with the timestamp implementation in the
previous commit, so I'm fairly sure it's a useful random test to have
around.

**colserde: add simple serialization for TIMESTAMP**

This commit adds simple (and quite inefficient) serialization
for the TIMESTAMP type, which uses the MarshalBinary and
UnmarshalBinary methods provided by time.Time.

Release note: None

Co-authored-by: Jordan Lewis <jordanthelewis@gmail.com>
Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
  • Loading branch information
3 people committed Oct 30, 2019
2 parents de4a6f9 + 48c8364 commit 0f47384
Show file tree
Hide file tree
Showing 37 changed files with 393 additions and 43 deletions.
9 changes: 9 additions & 0 deletions pkg/col/coldata/vec.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package coldata

import (
"fmt"
"time"

"github.com/cockroachdb/apd"
"github.com/cockroachdb/cockroach/pkg/col/coltypes"
Expand Down Expand Up @@ -76,6 +77,8 @@ type Vec interface {
// TODO(jordan): should this be [][]byte?
// Decimal returns an apd.Decimal slice.
Decimal() []apd.Decimal
// Timestamp returns a time.Time slice.
Timestamp() []time.Time

// Col returns the raw, typeless backing storage for this Vec.
Col() interface{}
Expand Down Expand Up @@ -151,6 +154,8 @@ func NewMemColumn(t coltypes.T, n int) Vec {
return &memColumn{t: t, col: make([]float64, n), nulls: nulls}
case coltypes.Decimal:
return &memColumn{t: t, col: make([]apd.Decimal, n), nulls: nulls}
case coltypes.Timestamp:
return &memColumn{t: t, col: make([]time.Time, n), nulls: nulls}
default:
panic(fmt.Sprintf("unhandled type %s", t))
}
Expand Down Expand Up @@ -192,6 +197,10 @@ func (m *memColumn) Decimal() []apd.Decimal {
return m.col.([]apd.Decimal)
}

// Timestamp returns the column's backing storage as a time.Time slice. The
// type assertion is unchecked, so it panics if the storage is not a
// []time.Time.
func (m *memColumn) Timestamp() []time.Time {
	timestamps := m.col.([]time.Time)
	return timestamps
}

// Col returns the column's backing storage as an untyped value, without
// asserting its element type.
func (m *memColumn) Col() interface{} {
	raw := m.col
	return raw
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/col/coldata/vec_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package coldata

import (
"fmt"
"time"

"github.com/cockroachdb/apd"
"github.com/cockroachdb/cockroach/pkg/col/coltypes"
Expand All @@ -45,6 +46,9 @@ type _GOTYPESLICE interface{}
// Dummy import to pull in "apd" package.
var _ apd.Decimal

// Dummy import to pull in "time" package.
var _ time.Time

// */}}

func (m *memColumn) Append(args SliceArgs) {
Expand Down
57 changes: 50 additions & 7 deletions pkg/col/colserde/arrowbatchconverter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"reflect"
"unsafe"

"github.com/apache/arrow/go/arrow"
"github.com/apache/arrow/go/arrow/array"
"github.com/apache/arrow/go/arrow/memory"
"github.com/cockroachdb/cockroach/pkg/col/coldata"
Expand All @@ -33,6 +34,9 @@ type ArrowBatchConverter struct {
builders struct {
// boolBuilder builds arrow bool columns as a bitmap from a bool slice.
boolBuilder *array.BooleanBuilder
// binaryBuilder builds arrow []byte columns as one []byte slice with
// accompanying offsets from a [][]byte slice.
binaryBuilder *array.BinaryBuilder
}

scratch struct {
Expand All @@ -56,11 +60,13 @@ func NewArrowBatchConverter(typs []coltypes.T) (*ArrowBatchConverter, error) {
}
c := &ArrowBatchConverter{typs: typs}
c.builders.boolBuilder = array.NewBooleanBuilder(memory.DefaultAllocator)
c.builders.binaryBuilder = array.NewBinaryBuilder(memory.DefaultAllocator, arrow.BinaryTypes.Binary)
c.scratch.arrowData = make([]*array.Data, len(typs))
c.scratch.buffers = make([][]*memory.Buffer, len(typs))
for i := range c.scratch.buffers {
// Most types need only two buffers: one for the nulls, and one for the
// values, but some type (i.e. Bytes) need an extra buffer for the offsets.
// values, but some types (i.e. Bytes) need an extra buffer for the
// offsets.
c.scratch.buffers[i] = make([]*memory.Buffer, 0, 3)
}
return c, nil
Expand All @@ -82,6 +88,7 @@ var supportedTypes = func() map[coltypes.T]struct{} {
coltypes.Int32,
coltypes.Int64,
coltypes.Float64,
coltypes.Timestamp,
} {
typs[t] = struct{}{}
}
Expand All @@ -108,11 +115,28 @@ func (c *ArrowBatchConverter) BatchToArrow(batch coldata.Batch) ([]*array.Data,
arrowBitmap = n.NullBitmap()
}

if typ == coltypes.Bool {
// Bools are handled differently from other coltypes. Refer to the
// comment on ArrowBatchConverter.builders for more information.
c.builders.boolBuilder.AppendValues(vec.Bool()[:n], nil /* valid */)
data := c.builders.boolBuilder.NewBooleanArray().Data()
if typ == coltypes.Bool || typ == coltypes.Timestamp {
// Bools and Timestamps are handled differently from other coltypes.
// Refer to the comment on ArrowBatchConverter.builders for more
// information.
var data *array.Data
switch typ {
case coltypes.Bool:
c.builders.boolBuilder.AppendValues(vec.Bool()[:n], nil /* valid */)
data = c.builders.boolBuilder.NewBooleanArray().Data()
case coltypes.Timestamp:
timestamps := vec.Timestamp()[:n]
for _, ts := range timestamps {
marshaled, err := ts.MarshalBinary()
if err != nil {
return nil, err
}
c.builders.binaryBuilder.Append(marshaled)
}
data = c.builders.binaryBuilder.NewBinaryArray().Data()
default:
panic(fmt.Sprintf("unexpected type %s", typ))
}
if arrowBitmap != nil {
// Overwrite empty null bitmap with the true bitmap.
data.Buffers()[0] = memory.NewBufferBytes(arrowBitmap)
Expand Down Expand Up @@ -222,7 +246,7 @@ func (c *ArrowBatchConverter) ArrowToBatch(data []*array.Data, b coldata.Batch)
d := data[i]

var arr array.Interface
if typ == coltypes.Bool || typ == coltypes.Bytes {
if typ == coltypes.Bool || typ == coltypes.Bytes || typ == coltypes.Timestamp {
switch typ {
case coltypes.Bool:
boolArr := array.NewBooleanData(d)
Expand All @@ -242,6 +266,25 @@ func (c *ArrowBatchConverter) ArrowToBatch(data []*array.Data, b coldata.Batch)
}
coldata.BytesFromArrowSerializationFormat(vec.Bytes(), bytes, bytesArr.ValueOffsets())
arr = bytesArr
case coltypes.Timestamp:
// TODO(yuzefovich): this serialization is quite inefficient - improve
// it.
bytesArr := array.NewBinaryData(d)
bytes := bytesArr.ValueBytes()
if bytes == nil {
// All bytes values are empty, so the representation is solely with the
// offsets slice, so create an empty slice so that the conversion
// corresponds.
bytes = make([]byte, 0)
}
offsets := bytesArr.ValueOffsets()
vecArr := vec.Timestamp()
for i := 0; i < len(offsets)-1; i++ {
if err := vecArr[i].UnmarshalBinary(bytes[offsets[i]:offsets[i+1]]); err != nil {
return err
}
}
arr = bytesArr
default:
panic(fmt.Sprintf("unexpected type %s", typ))
}
Expand Down
26 changes: 19 additions & 7 deletions pkg/col/colserde/arrowbatchconverter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ func randomBatch() ([]coltypes.T, coldata.Batch) {

availableTyps := make([]coltypes.T, 0, len(coltypes.AllTypes))
for _, typ := range coltypes.AllTypes {
// TODO(asubiotto): We do not support decimal conversion yet.
if typ == coltypes.Decimal {
// TODO(asubiotto,jordan): We do not support decimal, timestamp conversion yet.
if typ == coltypes.Decimal || typ == coltypes.Timestamp {
continue
}
availableTyps = append(availableTyps, typ)
Expand Down Expand Up @@ -119,9 +119,11 @@ func assertEqualBatches(t *testing.T, expected, actual coldata.Batch) {
func TestArrowBatchConverterRejectsUnsupportedTypes(t *testing.T) {
defer leaktest.AfterTest(t)()

typs := []coltypes.T{coltypes.Decimal}
_, err := NewArrowBatchConverter(typs)
require.Error(t, err)
unsupportedTypes := []coltypes.T{coltypes.Decimal}
for _, typ := range unsupportedTypes {
_, err := NewArrowBatchConverter([]coltypes.T{typ})
require.Error(t, err)
}
}

func TestArrowBatchConverterRandom(t *testing.T) {
Expand Down Expand Up @@ -196,11 +198,21 @@ func BenchmarkArrowBatchConverter(b *testing.B) {

rng, _ := randutil.NewPseudoRand()

typs := []coltypes.T{coltypes.Bool, coltypes.Bytes, coltypes.Int64}
typs := []coltypes.T{
coltypes.Bool,
coltypes.Bytes,
coltypes.Int64,
coltypes.Timestamp,
}
// numBytes corresponds 1:1 to typs and specifies how many bytes we are
// converting on one iteration of the benchmark for the corresponding type in
// typs.
numBytes := []int64{int64(coldata.BatchSize()), fixedLen * int64(coldata.BatchSize()), 8 * int64(coldata.BatchSize())}
numBytes := []int64{
int64(coldata.BatchSize()),
fixedLen * int64(coldata.BatchSize()),
8 * int64(coldata.BatchSize()),
3 * 8 * int64(coldata.BatchSize()),
}
// Run a benchmark on every type we care about.
for typIdx, typ := range typs {
batch := colexec.RandomBatch(rng, []coltypes.T{typ}, int(coldata.BatchSize()), 0 /* length */, 0 /* nullProbability */)
Expand Down
2 changes: 1 addition & 1 deletion pkg/col/colserde/record_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func numBuffersForType(t coltypes.T) int {
// null bitmap and one for the values.
numBuffers := 2
switch t {
case coltypes.Bytes:
case coltypes.Bytes, coltypes.Timestamp:
// This type has an extra offsets buffer.
numBuffers = 3
}
Expand Down
18 changes: 17 additions & 1 deletion pkg/col/colserde/record_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"math/rand"
"strings"
"testing"
"time"

"github.com/apache/arrow/go/arrow"
"github.com/apache/arrow/go/arrow/array"
Expand All @@ -26,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -132,6 +134,20 @@ func randomDataFromType(rng *rand.Rand, t coltypes.T, n int, nullProbability flo
}
builder.(*array.FixedSizeBinaryBuilder).AppendValues(data, valid)
}
case coltypes.Timestamp:
var err error
now := timeutil.Now()
builder = array.NewBinaryBuilder(memory.DefaultAllocator, arrow.BinaryTypes.Binary)
data := make([][]byte, n)
for i := range data {
delta := rng.Int63()
ts := now.Add(time.Duration(delta))
data[i], err = ts.MarshalBinary()
if err != nil {
panic(err)
}
}
builder.(*array.BinaryBuilder).AppendValues(data, valid)
default:
panic(fmt.Sprintf("unsupported type %s", t))
}
Expand Down Expand Up @@ -180,7 +196,7 @@ func TestRecordBatchSerializerSerializeDeserializeRandom(t *testing.T) {
buf = bytes.Buffer{}
)

// We do not support decimals yet.
// We do not support decimals.
for _, t := range coltypes.AllTypes {
if t == coltypes.Decimal {
continue
Expand Down
7 changes: 4 additions & 3 deletions pkg/col/coltypes/t_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions pkg/col/coltypes/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"fmt"
"strings"
"text/template"
"time"

"github.com/cockroachdb/apd"
)
Expand All @@ -38,6 +39,8 @@ const (
Int64
// Float64 is a column of type float64
Float64
// Timestamp is a column of type time.Time
Timestamp

// Unhandled is a temporary value that represents an unhandled type.
// TODO(jordan): this should be replaced by a panic once all types are
Expand Down Expand Up @@ -74,6 +77,7 @@ func init() {
CompatibleTypes[Int32] = append(CompatibleTypes[Int32], NumberTypes...)
CompatibleTypes[Int64] = append(CompatibleTypes[Int64], NumberTypes...)
CompatibleTypes[Float64] = append(CompatibleTypes[Float64], NumberTypes...)
CompatibleTypes[Timestamp] = append(CompatibleTypes[Timestamp], Timestamp)
}

// FromGoType returns the type for a Go value, if applicable. Shouldn't be used at
Expand All @@ -96,6 +100,8 @@ func FromGoType(v interface{}) T {
return Bytes
case apd.Decimal:
return Decimal
case time.Time:
return Timestamp
default:
panic(fmt.Sprintf("type %T not supported yet", t))
}
Expand All @@ -118,6 +124,8 @@ func (t T) GoTypeName() string {
return "int64"
case Float64:
return "float64"
case Timestamp:
return "time.Time"
default:
panic(fmt.Sprintf("unhandled type %d", t))
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/sql/colencoding/key_encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
package colencoding

import (
"time"

"github.com/cockroachdb/apd"
"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -225,6 +227,14 @@ func decodeTableKeyToCol(
rkey, t, err = encoding.DecodeVarintDescending(key)
}
vec.Int64()[idx] = t
case types.TimestampFamily:
var t time.Time
if dir == sqlbase.IndexDescriptor_ASC {
rkey, t, err = encoding.DecodeTimeAscending(key)
} else {
rkey, t, err = encoding.DecodeTimeDescending(key)
}
vec.Timestamp()[idx] = t
default:
return rkey, errors.AssertionFailedf("unsupported type %+v", log.Safe(valType))
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/colencoding/value_encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
package colencoding

import (
"time"

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
Expand Down Expand Up @@ -91,6 +93,10 @@ func decodeUntaggedDatumToCol(vec coldata.Vec, idx uint16, t *types.T, buf []byt
if err == nil {
vec.Bytes().Set(int(idx), data.GetBytes())
}
case types.TimestampFamily:
var t time.Time
buf, t, err = encoding.DecodeUntaggedTimeValue(buf)
vec.Timestamp()[idx] = t
default:
return buf, errors.AssertionFailedf(
"couldn't decode type: %s", log.Safe(t))
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/colexec/any_not_null_agg_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package colexec

import (
"time"

"github.com/cockroachdb/apd"
"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/col/coltypes"
Expand All @@ -45,6 +47,9 @@ func newAnyNotNullAgg(t coltypes.T) (aggregateFunc, error) {
// Dummy import to pull in "apd" package.
var _ apd.Decimal

// Dummy import to pull in "time" package.
var _ time.Time

// _GOTYPESLICE is the template Go type slice variable for this operator. It
// will be replaced by the Go slice representation for each type in coltypes.T, for
// example []int64 for coltypes.Int64.
Expand Down
Loading

0 comments on commit 0f47384

Please sign in to comment.