Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-19.2: colexec: add timestamp type #42416

Merged
merged 3 commits into from
Nov 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions pkg/col/coldata/vec.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package coldata

import (
"fmt"
"time"

"github.com/cockroachdb/apd"
"github.com/cockroachdb/cockroach/pkg/col/coltypes"
Expand Down Expand Up @@ -76,6 +77,8 @@ type Vec interface {
// TODO(jordan): should this be [][]byte?
// Decimal returns an apd.Decimal slice.
Decimal() []apd.Decimal
// Timestamp returns a time.Time slice.
Timestamp() []time.Time

// Col returns the raw, typeless backing storage for this Vec.
Col() interface{}
Expand Down Expand Up @@ -151,6 +154,8 @@ func NewMemColumn(t coltypes.T, n int) Vec {
return &memColumn{t: t, col: make([]float64, n), nulls: nulls}
case coltypes.Decimal:
return &memColumn{t: t, col: make([]apd.Decimal, n), nulls: nulls}
case coltypes.Timestamp:
return &memColumn{t: t, col: make([]time.Time, n), nulls: nulls}
default:
panic(fmt.Sprintf("unhandled type %s", t))
}
Expand Down Expand Up @@ -192,6 +197,10 @@ func (m *memColumn) Decimal() []apd.Decimal {
return m.col.([]apd.Decimal)
}

// Timestamp returns the backing column as a []time.Time. The unchecked type
// assertion panics if the column holds any other slice type.
func (m *memColumn) Timestamp() []time.Time {
return m.col.([]time.Time)
}

// Col returns m.col, the column's backing storage, as an untyped interface{}
// value; no type assertion is performed.
func (m *memColumn) Col() interface{} {
return m.col
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/col/coldata/vec_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package coldata

import (
"fmt"
"time"

"github.com/cockroachdb/apd"
"github.com/cockroachdb/cockroach/pkg/col/coltypes"
Expand All @@ -45,6 +46,9 @@ type _GOTYPESLICE interface{}
// Dummy import to pull in "apd" package.
var _ apd.Decimal

// Dummy import to pull in "time" package.
var _ time.Time

// */}}

func (m *memColumn) Append(args SliceArgs) {
Expand Down
39 changes: 35 additions & 4 deletions pkg/col/colserde/arrowbatchconverter.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ var supportedTypes = func() map[coltypes.T]struct{} {
coltypes.Int32,
coltypes.Int64,
coltypes.Float64,
coltypes.Timestamp,
} {
typs[t] = struct{}{}
}
Expand All @@ -113,9 +114,10 @@ func (c *ArrowBatchConverter) BatchToArrow(batch coldata.Batch) ([]*array.Data,
arrowBitmap = n.NullBitmap()
}

if typ == coltypes.Bool || typ == coltypes.Bytes {
// Bools and Bytes are handled differently from other coltypes. Refer to the
// comment on ArrowBatchConverter.builders for more information.
if typ == coltypes.Bool || typ == coltypes.Bytes || typ == coltypes.Timestamp {
// Bools, Bytes, and Timestamps are handled differently from other
// coltypes. Refer to the comment on ArrowBatchConverter.builders for
// more information.
var data *array.Data
switch typ {
case coltypes.Bool:
Expand All @@ -124,6 +126,16 @@ func (c *ArrowBatchConverter) BatchToArrow(batch coldata.Batch) ([]*array.Data,
case coltypes.Bytes:
c.builders.binaryBuilder.AppendValues(vec.Bytes().PrimitiveRepr()[:n], nil /* valid */)
data = c.builders.binaryBuilder.NewBinaryArray().Data()
case coltypes.Timestamp:
timestamps := vec.Timestamp()[:n]
for _, ts := range timestamps {
marshaled, err := ts.MarshalBinary()
if err != nil {
return nil, err
}
c.builders.binaryBuilder.Append(marshaled)
}
data = c.builders.binaryBuilder.NewBinaryArray().Data()
default:
panic(fmt.Sprintf("unexpected type %s", typ))
}
Expand Down Expand Up @@ -220,7 +232,7 @@ func (c *ArrowBatchConverter) ArrowToBatch(data []*array.Data, b coldata.Batch)
d := data[i]

var arr array.Interface
if typ == coltypes.Bool || typ == coltypes.Bytes {
if typ == coltypes.Bool || typ == coltypes.Bytes || typ == coltypes.Timestamp {
switch typ {
case coltypes.Bool:
boolArr := array.NewBooleanData(d)
Expand All @@ -244,6 +256,25 @@ func (c *ArrowBatchConverter) ArrowToBatch(data []*array.Data, b coldata.Batch)
vecArr.Set(i, bytes[offsets[i]:offsets[i+1]])
}
arr = bytesArr
case coltypes.Timestamp:
// TODO(yuzefovich): this serialization is quite inefficient - improve
// it.
bytesArr := array.NewBinaryData(d)
bytes := bytesArr.ValueBytes()
if bytes == nil {
// All bytes values are empty, so the data is represented solely by the
// offsets slice. Create an empty slice so that the conversion
// corresponds.
bytes = make([]byte, 0)
}
offsets := bytesArr.ValueOffsets()
vecArr := vec.Timestamp()
for i := 0; i < len(offsets)-1; i++ {
if err := vecArr[i].UnmarshalBinary(bytes[offsets[i]:offsets[i+1]]); err != nil {
return err
}
}
arr = bytesArr
default:
panic(fmt.Sprintf("unexpected type %s", typ))
}
Expand Down
26 changes: 19 additions & 7 deletions pkg/col/colserde/arrowbatchconverter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ func randomBatch() ([]coltypes.T, coldata.Batch) {

availableTyps := make([]coltypes.T, 0, len(coltypes.AllTypes))
for _, typ := range coltypes.AllTypes {
// TODO(asubiotto): We do not support decimal conversion yet.
if typ == coltypes.Decimal {
// TODO(asubiotto,jordan): We do not support decimal, timestamp conversion yet.
if typ == coltypes.Decimal || typ == coltypes.Timestamp {
continue
}
availableTyps = append(availableTyps, typ)
Expand Down Expand Up @@ -119,9 +119,11 @@ func assertEqualBatches(t *testing.T, expected, actual coldata.Batch) {
func TestArrowBatchConverterRejectsUnsupportedTypes(t *testing.T) {
defer leaktest.AfterTest(t)()

typs := []coltypes.T{coltypes.Decimal}
_, err := NewArrowBatchConverter(typs)
require.Error(t, err)
unsupportedTypes := []coltypes.T{coltypes.Decimal}
for _, typ := range unsupportedTypes {
_, err := NewArrowBatchConverter([]coltypes.T{typ})
require.Error(t, err)
}
}

func TestArrowBatchConverterRandom(t *testing.T) {
Expand Down Expand Up @@ -196,11 +198,21 @@ func BenchmarkArrowBatchConverter(b *testing.B) {

rng, _ := randutil.NewPseudoRand()

typs := []coltypes.T{coltypes.Bool, coltypes.Bytes, coltypes.Int64}
typs := []coltypes.T{
coltypes.Bool,
coltypes.Bytes,
coltypes.Int64,
coltypes.Timestamp,
}
// numBytes corresponds 1:1 to typs and specifies how many bytes we are
// converting on one iteration of the benchmark for the corresponding type in
// typs.
numBytes := []int64{int64(coldata.BatchSize()), fixedLen * int64(coldata.BatchSize()), 8 * int64(coldata.BatchSize())}
numBytes := []int64{
int64(coldata.BatchSize()),
fixedLen * int64(coldata.BatchSize()),
8 * int64(coldata.BatchSize()),
3 * 8 * int64(coldata.BatchSize()),
}
// Run a benchmark on every type we care about.
for typIdx, typ := range typs {
batch := colexec.RandomBatch(rng, []coltypes.T{typ}, int(coldata.BatchSize()), 0 /* length */, 0 /* nullProbability */)
Expand Down
2 changes: 1 addition & 1 deletion pkg/col/colserde/record_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func numBuffersForType(t coltypes.T) int {
// null bitmap and one for the values.
numBuffers := 2
switch t {
case coltypes.Bytes:
case coltypes.Bytes, coltypes.Timestamp:
// This type has an extra offsets buffer.
numBuffers = 3
}
Expand Down
18 changes: 17 additions & 1 deletion pkg/col/colserde/record_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"math/rand"
"strings"
"testing"
"time"

"github.com/apache/arrow/go/arrow"
"github.com/apache/arrow/go/arrow/array"
Expand All @@ -26,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -132,6 +134,20 @@ func randomDataFromType(rng *rand.Rand, t coltypes.T, n int, nullProbability flo
}
builder.(*array.FixedSizeBinaryBuilder).AppendValues(data, valid)
}
case coltypes.Timestamp:
var err error
now := timeutil.Now()
builder = array.NewBinaryBuilder(memory.DefaultAllocator, arrow.BinaryTypes.Binary)
data := make([][]byte, n)
for i := range data {
delta := rng.Int63()
ts := now.Add(time.Duration(delta))
data[i], err = ts.MarshalBinary()
if err != nil {
panic(err)
}
}
builder.(*array.BinaryBuilder).AppendValues(data, valid)
default:
panic(fmt.Sprintf("unsupported type %s", t))
}
Expand Down Expand Up @@ -180,7 +196,7 @@ func TestRecordBatchSerializerSerializeDeserializeRandom(t *testing.T) {
buf = bytes.Buffer{}
)

// We do not support decimals yet.
// We do not support decimals.
for _, t := range coltypes.AllTypes {
if t == coltypes.Decimal {
continue
Expand Down
7 changes: 4 additions & 3 deletions pkg/col/coltypes/t_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions pkg/col/coltypes/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"fmt"
"strings"
"text/template"
"time"

"github.com/cockroachdb/apd"
)
Expand All @@ -38,6 +39,8 @@ const (
Int64
// Float64 is a column of type float64
Float64
// Timestamp is a column of type time.Time
Timestamp

// Unhandled is a temporary value that represents an unhandled type.
// TODO(jordan): this should be replaced by a panic once all types are
Expand Down Expand Up @@ -74,6 +77,7 @@ func init() {
CompatibleTypes[Int32] = append(CompatibleTypes[Int32], NumberTypes...)
CompatibleTypes[Int64] = append(CompatibleTypes[Int64], NumberTypes...)
CompatibleTypes[Float64] = append(CompatibleTypes[Float64], NumberTypes...)
CompatibleTypes[Timestamp] = append(CompatibleTypes[Timestamp], Timestamp)
}

// FromGoType returns the type for a Go value, if applicable. Shouldn't be used at
Expand All @@ -96,6 +100,8 @@ func FromGoType(v interface{}) T {
return Bytes
case apd.Decimal:
return Decimal
case time.Time:
return Timestamp
default:
panic(fmt.Sprintf("type %T not supported yet", t))
}
Expand All @@ -118,6 +124,8 @@ func (t T) GoTypeName() string {
return "int64"
case Float64:
return "float64"
case Timestamp:
return "time.Time"
default:
panic(fmt.Sprintf("unhandled type %d", t))
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/sql/colencoding/key_encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
package colencoding

import (
"time"

"github.com/cockroachdb/apd"
"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -225,6 +227,14 @@ func decodeTableKeyToCol(
rkey, t, err = encoding.DecodeVarintDescending(key)
}
vec.Int64()[idx] = t
case types.TimestampFamily:
var t time.Time
if dir == sqlbase.IndexDescriptor_ASC {
rkey, t, err = encoding.DecodeTimeAscending(key)
} else {
rkey, t, err = encoding.DecodeTimeDescending(key)
}
vec.Timestamp()[idx] = t
default:
return rkey, errors.AssertionFailedf("unsupported type %+v", log.Safe(valType))
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/colencoding/value_encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
package colencoding

import (
"time"

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
Expand Down Expand Up @@ -91,6 +93,10 @@ func decodeUntaggedDatumToCol(vec coldata.Vec, idx uint16, t *types.T, buf []byt
if err == nil {
vec.Bytes().Set(int(idx), data.GetBytes())
}
case types.TimestampFamily:
var t time.Time
buf, t, err = encoding.DecodeUntaggedTimeValue(buf)
vec.Timestamp()[idx] = t
default:
return buf, errors.AssertionFailedf(
"couldn't decode type: %s", log.Safe(t))
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/colexec/any_not_null_agg_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package colexec

import (
"time"

"github.com/cockroachdb/apd"
"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/col/coltypes"
Expand All @@ -45,6 +47,9 @@ func newAnyNotNullAgg(t coltypes.T) (aggregateFunc, error) {
// Dummy import to pull in "apd" package.
var _ apd.Decimal

// Dummy import to pull in "time" package.
var _ time.Time

// _GOTYPESLICE is the template Go type slice variable for this operator. It
// will be replaced by the Go slice representation for each type in coltypes.T, for
// example []int64 for coltypes.Int64.
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/builtin_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (b *defaultBuiltinFuncOperator) Next(ctx context.Context) coldata.Batch {

for j := range b.argumentCols {
col := batch.ColVec(b.argumentCols[j])
b.row[j] = PhysicalTypeColElemToDatum(col, rowIdx, b.da, b.columnTypes[b.argumentCols[j]])
b.row[j] = PhysicalTypeColElemToDatum(col, rowIdx, b.da, &b.columnTypes[b.argumentCols[j]])
hasNulls = hasNulls || b.row[j] == tree.DNull
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/cfetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -788,7 +788,7 @@ func (rf *cFetcher) pushState(state fetcherState) {
// getDatumAt returns the converted datum object at the given (colIdx, rowIdx).
// This function is meant for tracing and should not be used in hot paths.
func (rf *cFetcher) getDatumAt(colIdx int, rowIdx uint16, typ types.T) tree.Datum {
return PhysicalTypeColElemToDatum(rf.machine.colvecs[colIdx], rowIdx, rf.table.da, typ)
return PhysicalTypeColElemToDatum(rf.machine.colvecs[colIdx], rowIdx, rf.table.da, &typ)
}

// processValue processes the state machine's current value component, setting
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/colexec/const_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package colexec

import (
"context"
"time"

"github.com/cockroachdb/apd"
"github.com/cockroachdb/cockroach/pkg/col/coldata"
Expand All @@ -36,6 +37,9 @@ import (
// Dummy import to pull in "apd" package.
var _ apd.Decimal

// Dummy import to pull in "time" package.
var _ time.Time

// _TYPES_T is the template type variable for coltypes.T. It will be replaced by
// coltypes.Foo for each type Foo in the coltypes.T type.
const _TYPES_T = coltypes.Unhandled
Expand Down
Loading