From 26149d9fab0360e6d4d9a295f934100470c4bc37 Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Tue, 14 Nov 2023 11:44:19 -0500 Subject: [PATCH] GH-38718: [Go][Format][Integration] Add StringView/BinaryView to Go implementation (#35769) ### Rationale for this change See #35628 for the rationale and description of the StringView/BinaryView array types. This change is adding Go as a second implementation of it. ### What changes are included in this PR? Add Array Types for `StringView` and `BinaryView` along with `StringViewType` and `BinaryViewType` and necessary enums and builders. These arrays can be round tripped through JSON and IPC. ### Are these changes tested? Yes, unit tests have been added and integration tests run * Closes: [#38718](https://github.com/apache/arrow/issues/38718) * Closes: #38718 Lead-authored-by: Matt Topol Co-authored-by: Alex Shcherbakov Signed-off-by: Benjamin Kietzman --- .gitattributes | 3 + docs/source/status.rst | 4 + format/Schema.fbs | 2 +- go/arrow/array/array.go | 3 +- go/arrow/array/binary.go | 121 +++++++ go/arrow/array/binary_test.go | 24 ++ go/arrow/array/binarybuilder.go | 329 ++++++++++++++++++ go/arrow/array/bufferbuilder.go | 108 ++++++ go/arrow/array/builder.go | 4 + go/arrow/array/compare.go | 12 + go/arrow/array/concat.go | 30 +- go/arrow/array/concat_test.go | 3 + go/arrow/array/string.go | 196 ++++++++++- go/arrow/array/string_test.go | 173 +++++++++ go/arrow/compute/executor.go | 5 +- go/arrow/datatype.go | 7 + go/arrow/datatype_binary.go | 41 +++ go/arrow/datatype_binary_test.go | 30 ++ go/arrow/datatype_viewheader.go | 141 ++++++++ go/arrow/datatype_viewheader_inline.go | 31 ++ go/arrow/datatype_viewheader_inline_go1.19.go | 35 ++ go/arrow/datatype_viewheader_inline_tinygo.go | 35 ++ go/arrow/internal/arrdata/arrdata.go | 81 +++++ go/arrow/internal/arrjson/arrjson.go | 150 ++++++++ go/arrow/internal/arrjson/arrjson_test.go | 259 ++++++++++++++ go/arrow/internal/flatbuf/MetadataVersion.go | 2 +- 
.../internal/testing/gen/random_array_gen.go | 34 ++ go/arrow/ipc/endian_swap.go | 4 + go/arrow/ipc/file_reader.go | 38 +- go/arrow/ipc/message.go | 10 +- go/arrow/ipc/metadata.go | 40 ++- go/arrow/ipc/writer.go | 36 +- go/arrow/type_traits_view.go | 53 +++ 33 files changed, 2011 insertions(+), 33 deletions(-) create mode 100644 go/arrow/datatype_viewheader.go create mode 100644 go/arrow/datatype_viewheader_inline.go create mode 100644 go/arrow/datatype_viewheader_inline_go1.19.go create mode 100644 go/arrow/datatype_viewheader_inline_tinygo.go create mode 100644 go/arrow/type_traits_view.go diff --git a/.gitattributes b/.gitattributes index 69f4139c4e4f4..70007c26c8b9b 100644 --- a/.gitattributes +++ b/.gitattributes @@ -3,6 +3,9 @@ cpp/src/generated/*.cpp linguist-generated=true cpp/src/generated/*.h linguist-generated=true go/**/*.s linguist-generated=true go/arrow/unionmode_string.go linguist-generated=true +go/arrow/internal/flatbuf/*.go linguist-generated=true +go/**/*.pb.go linguist-generated=true +go/parquet/internal/gen-go/parquet/*.go linguist-generated=true r/R/RcppExports.R linguist-generated=true r/R/arrowExports.R linguist-generated=true r/src/RcppExports.cpp linguist-generated=true diff --git a/docs/source/status.rst b/docs/source/status.rst index c8c0e6dfc1dfe..c059ab3cef971 100644 --- a/docs/source/status.rst +++ b/docs/source/status.rst @@ -68,6 +68,10 @@ Data Types +-------------------+-------+-------+-------+------------+-------+-------+-------+-------+ | Large Utf8 | ✓ | ✓ | ✓ | | | ✓ | ✓ | | +-------------------+-------+-------+-------+------------+-------+-------+-------+-------+ +| Binary View | ✓ | | ✓ | | | | | | ++-------------------+-------+-------+-------+------------+-------+-------+-------+-------+ +| String View | ✓ | | ✓ | | | | | | ++-------------------+-------+-------+-------+------------+-------+-------+-------+-------+ +-------------------+-------+-------+-------+------------+-------+-------+-------+-------+ | Data type | C++ | 
Java | Go | JavaScript | C# | Rust | Julia | Swift | diff --git a/format/Schema.fbs b/format/Schema.fbs index 6adbcb115cde3..dbf482e6cc786 100644 --- a/format/Schema.fbs +++ b/format/Schema.fbs @@ -40,7 +40,7 @@ enum MetadataVersion:short { /// >= 0.8.0 (December 2017). Non-backwards compatible with V3. V4, - /// >= 1.0.0 (July 2020. Backwards compatible with V4 (V5 readers can read V4 + /// >= 1.0.0 (July 2020). Backwards compatible with V4 (V5 readers can read V4 /// metadata and IPC messages). Implementations are recommended to provide a /// V4 compatibility mode with V5 format changes disabled. /// diff --git a/go/arrow/array/array.go b/go/arrow/array/array.go index bbe301ee661f3..5aacc8f99a4ee 100644 --- a/go/arrow/array/array.go +++ b/go/arrow/array/array.go @@ -178,7 +178,8 @@ func init() { arrow.RUN_END_ENCODED: func(data arrow.ArrayData) arrow.Array { return NewRunEndEncodedData(data) }, arrow.LIST_VIEW: func(data arrow.ArrayData) arrow.Array { return NewListViewData(data) }, arrow.LARGE_LIST_VIEW: func(data arrow.ArrayData) arrow.Array { return NewLargeListViewData(data) }, - + arrow.BINARY_VIEW: func(data arrow.ArrayData) arrow.Array { return NewBinaryViewData(data) }, + arrow.STRING_VIEW: func(data arrow.ArrayData) arrow.Array { return NewStringViewData(data) }, // invalid data types to fill out array to size 2^6 - 1 63: invalidDataType, } diff --git a/go/arrow/array/binary.go b/go/arrow/array/binary.go index bf27139fddbaa..c226297da04c6 100644 --- a/go/arrow/array/binary.go +++ b/go/arrow/array/binary.go @@ -24,6 +24,7 @@ import ( "unsafe" "github.com/apache/arrow/go/v15/arrow" + "github.com/apache/arrow/go/v15/arrow/memory" "github.com/apache/arrow/go/v15/internal/json" ) @@ -318,6 +319,126 @@ func arrayEqualLargeBinary(left, right *LargeBinary) bool { return true } +type ViewLike interface { + arrow.Array + ValueHeader(int) *arrow.ViewHeader +} + +type BinaryView struct { + array + values []arrow.ViewHeader + dataBuffers []*memory.Buffer +} + +func 
NewBinaryViewData(data arrow.ArrayData) *BinaryView { + a := &BinaryView{} + a.refCount = 1 + a.setData(data.(*Data)) + return a +} + +func (a *BinaryView) setData(data *Data) { + if len(data.buffers) < 2 { + panic("len(data.buffers) < 2") + } + a.array.setData(data) + + if valueData := data.buffers[1]; valueData != nil { + a.values = arrow.ViewHeaderTraits.CastFromBytes(valueData.Bytes()) + } + + a.dataBuffers = data.buffers[2:] +} + +func (a *BinaryView) ValueHeader(i int) *arrow.ViewHeader { + if i < 0 || i >= a.array.data.length { + panic("arrow/array: index out of range") + } + return &a.values[a.array.data.offset+i] +} + +func (a *BinaryView) Value(i int) []byte { + s := a.ValueHeader(i) + if s.IsInline() { + return s.InlineBytes() + } + start := s.BufferOffset() + buf := a.dataBuffers[s.BufferIndex()] + return buf.Bytes()[start : start+int32(s.Len())] +} + +// ValueString returns the value at index i as a string instead of +// a byte slice, without copying the underlying data. +func (a *BinaryView) ValueString(i int) string { + b := a.Value(i) + return *(*string)(unsafe.Pointer(&b)) +} + +func (a *BinaryView) String() string { + var o strings.Builder + o.WriteString("[") + for i := 0; i < a.Len(); i++ { + if i > 0 { + o.WriteString(" ") + } + switch { + case a.IsNull(i): + o.WriteString(NullValueStr) + default: + fmt.Fprintf(&o, "%q", a.ValueString(i)) + } + } + o.WriteString("]") + return o.String() +} + +// ValueStr is paired with AppendValueFromString in that it returns +// the value at index i as a string: Semantically this means that for +// a null value it will return the string "(null)", otherwise it will +// return the value as a base64 encoded string suitable for CSV/JSON. +// +// This is always going to be less performant than just using ValueString +// and exists to fulfill the Array interface to provide a method which +// can produce a human readable string for a given index. 
+func (a *BinaryView) ValueStr(i int) string { + if a.IsNull(i) { + return NullValueStr + } + return base64.StdEncoding.EncodeToString(a.Value(i)) +} + +func (a *BinaryView) GetOneForMarshal(i int) interface{} { + if a.IsNull(i) { + return nil + } + return a.Value(i) +} + +func (a *BinaryView) MarshalJSON() ([]byte, error) { + vals := make([]interface{}, a.Len()) + for i := 0; i < a.Len(); i++ { + vals[i] = a.GetOneForMarshal(i) + } + // golang marshal standard says that []byte will be marshalled + // as a base64-encoded string + return json.Marshal(vals) +} + +func arrayEqualBinaryView(left, right *BinaryView) bool { + leftBufs, rightBufs := left.dataBuffers, right.dataBuffers + for i := 0; i < left.Len(); i++ { + if left.IsNull(i) { + continue + } + if !left.ValueHeader(i).Equals(leftBufs, right.ValueHeader(i), rightBufs) { + return false + } + } + return true +} + var ( _ arrow.Array = (*Binary)(nil) + _ arrow.Array = (*LargeBinary)(nil) + _ arrow.Array = (*BinaryView)(nil) ) diff --git a/go/arrow/array/binary_test.go b/go/arrow/array/binary_test.go index 9c1770950a8b5..c9e165515225b 100644 --- a/go/arrow/array/binary_test.go +++ b/go/arrow/array/binary_test.go @@ -700,3 +700,27 @@ func TestBinaryStringRoundTrip(t *testing.T) { assert.True(t, Equal(arr, arr1)) } + +func TestBinaryViewStringRoundTrip(t *testing.T) { + mem := memory.NewCheckedAllocator(memory.DefaultAllocator) + defer mem.AssertSize(t, 0) + + values := []string{"a", "bc", "", "", "supercalifragilistic", "", "expeallodocious"} + valid := []bool{true, true, false, false, true, true, true} + + b := NewBinaryViewBuilder(mem) + defer b.Release() + + b.AppendStringValues(values, valid) + arr := b.NewArray().(*BinaryView) + defer arr.Release() + + for i := 0; i < arr.Len(); i++ { + assert.NoError(t, b.AppendValueFromString(arr.ValueStr(i))) + } + + arr1 := b.NewArray().(*BinaryView) + defer arr1.Release() + + assert.True(t, Equal(arr, arr1)) +} diff --git a/go/arrow/array/binarybuilder.go 
b/go/arrow/array/binarybuilder.go index a51bc799e4965..21ad576508e9e 100644 --- a/go/arrow/array/binarybuilder.go +++ b/go/arrow/array/binarybuilder.go @@ -23,6 +23,7 @@ import ( "math" "reflect" "sync/atomic" + "unsafe" "github.com/apache/arrow/go/v15/arrow" "github.com/apache/arrow/go/v15/arrow/internal/debug" @@ -370,6 +371,334 @@ func (b *BinaryBuilder) UnmarshalJSON(data []byte) error { return b.Unmarshal(dec) } +const ( + dfltBlockSize = 32 << 10 // 32 KB + viewValueSizeLimit int32 = math.MaxInt32 +) + +type BinaryViewBuilder struct { + builder + dtype arrow.BinaryDataType + + data *memory.Buffer + rawData []arrow.ViewHeader + + blockBuilder multiBufferBuilder +} + +func NewBinaryViewBuilder(mem memory.Allocator) *BinaryViewBuilder { + return &BinaryViewBuilder{ + dtype: arrow.BinaryTypes.BinaryView, + builder: builder{ + refCount: 1, + mem: mem, + }, + blockBuilder: multiBufferBuilder{ + refCount: 1, + blockSize: dfltBlockSize, + mem: mem, + }, + } +} + +func (b *BinaryViewBuilder) SetBlockSize(sz uint) { + b.blockBuilder.blockSize = int(sz) +} + +func (b *BinaryViewBuilder) Type() arrow.DataType { return b.dtype } + +func (b *BinaryViewBuilder) Release() { + debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases") + + if atomic.AddInt64(&b.refCount, -1) != 0 { + return + } + + if b.nullBitmap != nil { + b.nullBitmap.Release() + b.nullBitmap = nil + } + if b.data != nil { + b.data.Release() + b.data = nil + b.rawData = nil + } +} + +func (b *BinaryViewBuilder) init(capacity int) { + b.builder.init(capacity) + b.data = memory.NewResizableBuffer(b.mem) + bytesN := arrow.ViewHeaderTraits.BytesRequired(capacity) + b.data.Resize(bytesN) + b.rawData = arrow.ViewHeaderTraits.CastFromBytes(b.data.Bytes()) +} + +func (b *BinaryViewBuilder) Resize(n int) { + nbuild := n + if n < minBuilderCapacity { + n = minBuilderCapacity + } + + if b.capacity == 0 { + b.init(n) + return + } + + b.builder.resize(nbuild, b.init) + 
b.data.Resize(arrow.ViewHeaderTraits.BytesRequired(n)) + b.rawData = arrow.ViewHeaderTraits.CastFromBytes(b.data.Bytes()) +} + +func (b *BinaryViewBuilder) ReserveData(length int) { + if int64(length) > int64(viewValueSizeLimit) { // compare as int64: int32(length) wraps and can never exceed MaxInt32 + panic(fmt.Errorf("%w: BinaryView or StringView elements cannot reference strings larger than 2GB", + arrow.ErrInvalid)) + } + b.blockBuilder.Reserve(int(length)) +} + +func (b *BinaryViewBuilder) Reserve(n int) { + b.builder.reserve(n, b.Resize) +} + +func (b *BinaryViewBuilder) Append(v []byte) { + if int64(len(v)) > int64(viewValueSizeLimit) { // compare as int64: int32(len(v)) wraps and can never exceed MaxInt32 + panic(fmt.Errorf("%w: BinaryView or StringView elements cannot reference strings larger than 2GB", arrow.ErrInvalid)) + } + + if !arrow.IsViewInline(len(v)) { + b.ReserveData(len(v)) + } + + b.Reserve(1) + b.UnsafeAppend(v) +} + +// AppendString is identical to Append, only accepting a string instead +// of a byte slice, avoiding the extra copy that would occur if you simply +// did []byte(v). +// +// This is different than AppendValueFromString which exists for the +// Builder interface, in that this expects raw binary data which is +// appended unmodified. AppendValueFromString expects base64 encoded binary +// data instead. 
+func (b *BinaryViewBuilder) AppendString(v string) { + // create a []byte without copying the bytes + // in go1.20 this would be unsafe.StringData + val := *(*[]byte)(unsafe.Pointer(&struct { + string + int + }{v, len(v)})) + b.Append(val) +} + +func (b *BinaryViewBuilder) AppendNull() { + b.Reserve(1) + b.UnsafeAppendBoolToBitmap(false) +} + +func (b *BinaryViewBuilder) AppendNulls(n int) { + b.Reserve(n) + for i := 0; i < n; i++ { + b.UnsafeAppendBoolToBitmap(false) + } +} + +func (b *BinaryViewBuilder) AppendEmptyValue() { + b.Reserve(1) + b.UnsafeAppendBoolToBitmap(true) +} + +func (b *BinaryViewBuilder) AppendEmptyValues(n int) { + b.Reserve(n) + b.unsafeAppendBoolsToBitmap(nil, n) +} + +func (b *BinaryViewBuilder) UnsafeAppend(v []byte) { + hdr := &b.rawData[b.length] + hdr.SetBytes(v) + if !hdr.IsInline() { + b.blockBuilder.UnsafeAppend(hdr, v) + } + b.UnsafeAppendBoolToBitmap(true) +} + +func (b *BinaryViewBuilder) AppendValues(v [][]byte, valid []bool) { + if len(v) != len(valid) && len(valid) != 0 { + panic("len(v) != len(valid) && len(valid) != 0") + } + + if len(v) == 0 { + return + } + + b.Reserve(len(v)) + outOfLineTotal := 0 + for i, vv := range v { + if len(valid) == 0 || valid[i] { + if !arrow.IsViewInline(len(vv)) { + outOfLineTotal += len(vv) + } + } + } + + b.ReserveData(outOfLineTotal) + for i, vv := range v { + if len(valid) == 0 || valid[i] { + hdr := &b.rawData[b.length+i] + hdr.SetBytes(vv) + if !hdr.IsInline() { + b.blockBuilder.UnsafeAppend(hdr, vv) + } + } + } + + b.builder.unsafeAppendBoolsToBitmap(valid, len(v)) +} + +func (b *BinaryViewBuilder) AppendStringValues(v []string, valid []bool) { + if len(v) != len(valid) && len(valid) != 0 { + panic("len(v) != len(valid) && len(valid) != 0") + } + + if len(v) == 0 { + return + } + + b.Reserve(len(v)) + outOfLineTotal := 0 + for i, vv := range v { + if len(valid) == 0 || valid[i] { + if !arrow.IsViewInline(len(vv)) { + outOfLineTotal += len(vv) + } + } + } + + b.ReserveData(outOfLineTotal) 
+ for i, vv := range v { + if len(valid) == 0 || valid[i] { + hdr := &b.rawData[b.length+i] + hdr.SetString(vv) + if !hdr.IsInline() { + b.blockBuilder.UnsafeAppendString(hdr, vv) + } + } + } + + b.builder.unsafeAppendBoolsToBitmap(valid, len(v)) +} + +// AppendValueFromString is paired with ValueStr for fulfilling the +// base Builder interface. This is intended to read in a human-readable +// string such as from CSV or JSON and append it to the array. +// +// For Binary values are expected to be base64 encoded (and will be +// decoded as such before being appended). +func (b *BinaryViewBuilder) AppendValueFromString(s string) error { + if s == NullValueStr { + b.AppendNull() + return nil + } + + if b.dtype.IsUtf8() { + b.Append([]byte(s)) + return nil + } + + decodedVal, err := base64.StdEncoding.DecodeString(s) + if err != nil { + return fmt.Errorf("could not decode base64 string: %w", err) + } + b.Append(decodedVal) + return nil +} + +func (b *BinaryViewBuilder) UnmarshalOne(dec *json.Decoder) error { + t, err := dec.Token() + if err != nil { + return err + } + + switch v := t.(type) { + case string: + data, err := base64.StdEncoding.DecodeString(v) + if err != nil { + return err + } + b.Append(data) + case []byte: + b.Append(v) + case nil: + b.AppendNull() + default: + return &json.UnmarshalTypeError{ + Value: fmt.Sprint(t), + Type: reflect.TypeOf([]byte{}), + Offset: dec.InputOffset(), + } + } + return nil +} + +func (b *BinaryViewBuilder) Unmarshal(dec *json.Decoder) error { + for dec.More() { + if err := b.UnmarshalOne(dec); err != nil { + return err + } + } + return nil +} + +func (b *BinaryViewBuilder) UnmarshalJSON(data []byte) error { + dec := json.NewDecoder(bytes.NewReader(data)) + t, err := dec.Token() + if err != nil { + return err + } + + if delim, ok := t.(json.Delim); !ok || delim != '[' { + return fmt.Errorf("binary view builder must unpack from json array, found %s", delim) + } + + return b.Unmarshal(dec) +} + +func (b *BinaryViewBuilder) 
newData() (data *Data) { + bytesRequired := arrow.ViewHeaderTraits.BytesRequired(b.length) + if bytesRequired > 0 && bytesRequired < b.data.Len() { + // trim buffers + b.data.Resize(bytesRequired) + } + + dataBuffers := b.blockBuilder.Finish() + data = NewData(b.dtype, b.length, append([]*memory.Buffer{ + b.nullBitmap, b.data}, dataBuffers...), nil, b.nulls, 0) + b.reset() + + if b.data != nil { + b.data.Release() + b.data = nil + b.rawData = nil + for _, buf := range dataBuffers { + buf.Release() + } + } + return +} + +func (b *BinaryViewBuilder) NewBinaryViewArray() (a *BinaryView) { + data := b.newData() + a = NewBinaryViewData(data) + data.Release() + return +} + +func (b *BinaryViewBuilder) NewArray() arrow.Array { + return b.NewBinaryViewArray() +} + var ( _ Builder = (*BinaryBuilder)(nil) + _ Builder = (*BinaryViewBuilder)(nil) ) diff --git a/go/arrow/array/bufferbuilder.go b/go/arrow/array/bufferbuilder.go index cb381e25b32a2..13741ba8926ac 100644 --- a/go/arrow/array/bufferbuilder.go +++ b/go/arrow/array/bufferbuilder.go @@ -18,7 +18,9 @@ package array import ( "sync/atomic" + "unsafe" + "github.com/apache/arrow/go/v15/arrow" "github.com/apache/arrow/go/v15/arrow/bitutil" "github.com/apache/arrow/go/v15/arrow/internal/debug" "github.com/apache/arrow/go/v15/arrow/memory" @@ -151,3 +153,109 @@ func (b *bufferBuilder) unsafeAppend(data []byte) { copy(b.bytes[b.length:], data) b.length += len(data) } + +type multiBufferBuilder struct { + refCount int64 + blockSize int + + mem memory.Allocator + blocks []*memory.Buffer + currentOutBuffer int +} + +// Retain increases the reference count by 1. +// Retain may be called simultaneously from multiple goroutines. +func (b *multiBufferBuilder) Retain() { + atomic.AddInt64(&b.refCount, 1) +} + +// Release decreases the reference count by 1. +// When the reference count goes to zero, the memory is freed. +// Release may be called simultaneously from multiple goroutines. 
+func (b *multiBufferBuilder) Release() { + debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases") + + if atomic.AddInt64(&b.refCount, -1) == 0 { + b.Reset() + } +} + +func (b *multiBufferBuilder) Reserve(nbytes int) { + if len(b.blocks) == 0 { + out := memory.NewResizableBuffer(b.mem) + if nbytes < b.blockSize { + nbytes = b.blockSize + } + out.Reserve(nbytes) + b.currentOutBuffer = 0 + b.blocks = []*memory.Buffer{out} + return + } + + curBuf := b.blocks[b.currentOutBuffer] + remain := curBuf.Cap() - curBuf.Len() + if nbytes <= remain { + return + } + + // search for underfull block that has enough bytes + for i, block := range b.blocks { + remaining := block.Cap() - block.Len() + if nbytes <= remaining { + b.currentOutBuffer = i + return + } + } + + // current buffer doesn't have enough space, no underfull buffers + // make new buffer and set that as our current. + newBuf := memory.NewResizableBuffer(b.mem) + if nbytes < b.blockSize { + nbytes = b.blockSize + } + + newBuf.Reserve(nbytes) + b.currentOutBuffer = len(b.blocks) + b.blocks = append(b.blocks, newBuf) +} + +func (b *multiBufferBuilder) RemainingBytes() int { + if len(b.blocks) == 0 { + return 0 + } + + buf := b.blocks[b.currentOutBuffer] + return buf.Cap() - buf.Len() +} + +func (b *multiBufferBuilder) Reset() { + b.currentOutBuffer = 0 + for _, block := range b.Finish() { + block.Release() + } +} + +func (b *multiBufferBuilder) UnsafeAppend(hdr *arrow.ViewHeader, val []byte) { + buf := b.blocks[b.currentOutBuffer] + idx, offset := b.currentOutBuffer, buf.Len() + hdr.SetIndexOffset(int32(idx), int32(offset)) + + n := copy(buf.Buf()[offset:], val) + buf.ResizeNoShrink(offset + n) +} + +func (b *multiBufferBuilder) UnsafeAppendString(hdr *arrow.ViewHeader, val string) { + // create a byte slice with zero-copies + // in go1.20 this would be equivalent to unsafe.StringData + v := *(*[]byte)(unsafe.Pointer(&struct { + string + int + }{val, len(val)})) + b.UnsafeAppend(hdr, v) +} + +func (b 
*multiBufferBuilder) Finish() (out []*memory.Buffer) { + b.currentOutBuffer = 0 + out, b.blocks = b.blocks, nil + return +} diff --git a/go/arrow/array/builder.go b/go/arrow/array/builder.go index bb15298e03ccf..279804a1cdb9f 100644 --- a/go/arrow/array/builder.go +++ b/go/arrow/array/builder.go @@ -364,6 +364,10 @@ func NewBuilder(mem memory.Allocator, dtype arrow.DataType) Builder { case arrow.RUN_END_ENCODED: typ := dtype.(*arrow.RunEndEncodedType) return NewRunEndEncodedBuilder(mem, typ.RunEnds(), typ.Encoded()) + case arrow.BINARY_VIEW: + return NewBinaryViewBuilder(mem) + case arrow.STRING_VIEW: + return NewStringViewBuilder(mem) } panic(fmt.Errorf("arrow/array: unsupported builder for %T", dtype)) } diff --git a/go/arrow/array/compare.go b/go/arrow/array/compare.go index 778de41e32c67..372293a61d6cb 100644 --- a/go/arrow/array/compare.go +++ b/go/arrow/array/compare.go @@ -232,6 +232,12 @@ func Equal(left, right arrow.Array) bool { case *LargeString: r := right.(*LargeString) return arrayEqualLargeString(l, r) + case *BinaryView: + r := right.(*BinaryView) + return arrayEqualBinaryView(l, r) + case *StringView: + r := right.(*StringView) + return arrayEqualStringView(l, r) case *Int8: r := right.(*Int8) return arrayEqualInt8(l, r) @@ -482,6 +488,12 @@ func arrayApproxEqual(left, right arrow.Array, opt equalOption) bool { case *LargeString: r := right.(*LargeString) return arrayEqualLargeString(l, r) + case *BinaryView: + r := right.(*BinaryView) + return arrayEqualBinaryView(l, r) + case *StringView: + r := right.(*StringView) + return arrayEqualStringView(l, r) case *Int8: r := right.(*Int8) return arrayEqualInt8(l, r) diff --git a/go/arrow/array/concat.go b/go/arrow/array/concat.go index 53c5be06895b9..fa3554c1c0555 100644 --- a/go/arrow/array/concat.go +++ b/go/arrow/array/concat.go @@ -600,6 +600,35 @@ func concat(data []arrow.ArrayData, mem memory.Allocator) (arr arrow.ArrayData, } case arrow.FixedWidthDataType: out.buffers[1] = 
concatBuffers(gatherBuffersFixedWidthType(data, 1, dt), mem) + case arrow.BinaryViewDataType: + out.buffers = out.buffers[:2] + for _, d := range data { + for _, buf := range d.Buffers()[2:] { + buf.Retain() + out.buffers = append(out.buffers, buf) + } + } + + out.buffers[1] = concatBuffers(gatherFixedBuffers(data, 1, arrow.ViewHeaderSizeBytes), mem) + + var ( + s = arrow.ViewHeaderTraits.CastFromBytes(out.buffers[1].Bytes()) + i = data[0].Len() + precedingBufsCount int + ) + + for idx := 1; idx < len(data); idx++ { + precedingBufsCount += len(data[idx-1].Buffers()) - 2 + + for end := i + data[idx].Len(); i < end; i++ { + if s[i].IsInline() { + continue + } + + bufIndex := s[i].BufferIndex() + int32(precedingBufsCount) + s[i].SetIndexOffset(bufIndex, s[i].BufferOffset()) + } + } case arrow.BinaryDataType: offsetWidth := dt.Layout().Buffers[1].ByteWidth offsetBuffer, valueRanges, err := concatOffsets(gatherFixedBuffers(data, 1, offsetWidth), offsetWidth, mem) @@ -739,7 +768,6 @@ func concat(data []arrow.ArrayData, mem memory.Allocator) (arr arrow.ArrayData, out.childData[0].Release() return nil, err } - default: return nil, fmt.Errorf("concatenate not implemented for type %s", dt) } diff --git a/go/arrow/array/concat_test.go b/go/arrow/array/concat_test.go index 1cc484ad1a923..7b22d97a41e00 100644 --- a/go/arrow/array/concat_test.go +++ b/go/arrow/array/concat_test.go @@ -84,6 +84,7 @@ func TestConcatenate(t *testing.T) { {arrow.StructOf()}, {arrow.MapOf(arrow.PrimitiveTypes.Uint16, arrow.PrimitiveTypes.Int8)}, {&arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Int32, ValueType: arrow.PrimitiveTypes.Float64}}, + {arrow.BinaryTypes.StringView}, } for _, tt := range tests { @@ -150,6 +151,8 @@ func (cts *ConcatTestSuite) generateArr(size int64, nullprob float64) arrow.Arra return cts.rng.String(size, 0, 15, nullprob) case arrow.LARGE_STRING: return cts.rng.LargeString(size, 0, 15, nullprob) + case arrow.STRING_VIEW: + return cts.rng.StringView(size, 0, 20, 
nullprob) case arrow.LIST: valuesSize := size * 4 values := cts.rng.Int8(valuesSize, 0, 127, nullprob).(*array.Int8) diff --git a/go/arrow/array/string.go b/go/arrow/array/string.go index 9ab7c938ef5d8..90a4628f0d0fb 100644 --- a/go/arrow/array/string.go +++ b/go/arrow/array/string.go @@ -28,6 +28,11 @@ import ( "github.com/apache/arrow/go/v15/internal/json" ) +type StringLike interface { + arrow.Array + Value(int) string +} + // String represents an immutable sequence of variable-length UTF-8 strings. type String struct { array @@ -310,6 +315,108 @@ func arrayEqualLargeString(left, right *LargeString) bool { return true } +type StringView struct { + array + values []arrow.ViewHeader + dataBuffers []*memory.Buffer +} + +func NewStringViewData(data arrow.ArrayData) *StringView { + a := &StringView{} + a.refCount = 1 + a.setData(data.(*Data)) + return a +} + +// Reset resets the StringView with a different set of Data. +func (a *StringView) Reset(data arrow.ArrayData) { + a.setData(data.(*Data)) +} + +func (a *StringView) setData(data *Data) { + if len(data.buffers) < 2 { + panic("len(data.buffers) < 2") + } + a.array.setData(data) + + if valueData := data.buffers[1]; valueData != nil { + a.values = arrow.ViewHeaderTraits.CastFromBytes(valueData.Bytes()) + } + + a.dataBuffers = data.buffers[2:] +} + +func (a *StringView) ValueHeader(i int) *arrow.ViewHeader { + if i < 0 || i >= a.array.data.length { + panic("arrow/array: index out of range") + } + return &a.values[a.array.data.offset+i] +} + +func (a *StringView) Value(i int) string { + s := a.ValueHeader(i) + if s.IsInline() { + return s.InlineString() + } + start := s.BufferOffset() + buf := a.dataBuffers[s.BufferIndex()] + value := buf.Bytes()[start : start+int32(s.Len())] + return *(*string)(unsafe.Pointer(&value)) +} + +func (a *StringView) String() string { + var o strings.Builder + o.WriteString("[") + for i := 0; i < a.Len(); i++ { + if i > 0 { + o.WriteString(" ") + } + switch { + case a.IsNull(i): + 
o.WriteString(NullValueStr) + default: + fmt.Fprintf(&o, "%q", a.Value(i)) + } + } + o.WriteString("]") + return o.String() +} + +func (a *StringView) ValueStr(i int) string { + if a.IsNull(i) { + return NullValueStr + } + return a.Value(i) +} + +func (a *StringView) GetOneForMarshal(i int) interface{} { + if a.IsNull(i) { + return nil + } + return a.Value(i) +} + +func (a *StringView) MarshalJSON() ([]byte, error) { + vals := make([]interface{}, a.Len()) + for i := 0; i < a.Len(); i++ { + vals[i] = a.GetOneForMarshal(i) + } + return json.Marshal(vals) +} + +func arrayEqualStringView(left, right *StringView) bool { + leftBufs, rightBufs := left.dataBuffers, right.dataBuffers + for i := 0; i < left.Len(); i++ { + if left.IsNull(i) { + continue + } + if !left.ValueHeader(i).Equals(leftBufs, right.ValueHeader(i), rightBufs) { + return false + } + } + return true +} + // A StringBuilder is used to build a String array using the Append methods. type StringBuilder struct { *BinaryBuilder @@ -344,10 +451,6 @@ func (b *StringBuilder) Value(i int) string { return string(b.BinaryBuilder.Value(i)) } -// func (b *StringBuilder) UnsafeAppend(v string) { -// b.BinaryBuilder.UnsafeAppend([]byte(v)) -// } - // NewArray creates a String array from the memory buffers used by the builder and resets the StringBuilder // so it can be used to build a new array. func (b *StringBuilder) NewArray() arrow.Array { @@ -441,10 +544,6 @@ func (b *LargeStringBuilder) Value(i int) string { return string(b.BinaryBuilder.Value(i)) } -// func (b *LargeStringBuilder) UnsafeAppend(v string) { -// b.BinaryBuilder.UnsafeAppend([]byte(v)) -// } - // NewArray creates a String array from the memory buffers used by the builder and resets the StringBuilder // so it can be used to build a new array. 
func (b *LargeStringBuilder) NewArray() arrow.Array { @@ -504,9 +603,87 @@ func (b *LargeStringBuilder) UnmarshalJSON(data []byte) error { return b.Unmarshal(dec) } +type StringViewBuilder struct { + *BinaryViewBuilder +} + +func NewStringViewBuilder(mem memory.Allocator) *StringViewBuilder { + bldr := &StringViewBuilder{ + BinaryViewBuilder: NewBinaryViewBuilder(mem), + } + bldr.dtype = arrow.BinaryTypes.StringView + return bldr +} + +func (b *StringViewBuilder) Append(v string) { + b.BinaryViewBuilder.AppendString(v) +} + +func (b *StringViewBuilder) AppendValues(v []string, valid []bool) { + b.BinaryViewBuilder.AppendStringValues(v, valid) +} + +func (b *StringViewBuilder) UnmarshalOne(dec *json.Decoder) error { + t, err := dec.Token() + if err != nil { + return err + } + + switch v := t.(type) { + case string: + b.Append(v) + case []byte: + b.BinaryViewBuilder.Append(v) + case nil: + b.AppendNull() + default: + return &json.UnmarshalTypeError{ + Value: fmt.Sprint(t), + Type: reflect.TypeOf([]byte{}), + Offset: dec.InputOffset(), + } + } + return nil +} + +func (b *StringViewBuilder) Unmarshal(dec *json.Decoder) error { + for dec.More() { + if err := b.UnmarshalOne(dec); err != nil { + return err + } + } + return nil +} + +func (b *StringViewBuilder) UnmarshalJSON(data []byte) error { + dec := json.NewDecoder(bytes.NewReader(data)) + t, err := dec.Token() + if err != nil { + return err + } + + if delim, ok := t.(json.Delim); !ok || delim != '[' { + return fmt.Errorf("binary view builder must unpack from json array, found %s", delim) + } + + return b.Unmarshal(dec) +} + +func (b *StringViewBuilder) NewArray() arrow.Array { + return b.NewStringViewArray() +} + +func (b *StringViewBuilder) NewStringViewArray() (a *StringView) { + data := b.newData() + a = NewStringViewData(data) + data.Release() + return +} + type StringLikeBuilder interface { Builder Append(string) + AppendValues([]string, []bool) UnsafeAppend([]byte) ReserveData(int) } @@ -514,8 +691,11 @@ type 
StringLikeBuilder interface { var ( _ arrow.Array = (*String)(nil) _ arrow.Array = (*LargeString)(nil) + _ arrow.Array = (*StringView)(nil) _ Builder = (*StringBuilder)(nil) _ Builder = (*LargeStringBuilder)(nil) + _ Builder = (*StringViewBuilder)(nil) _ StringLikeBuilder = (*StringBuilder)(nil) _ StringLikeBuilder = (*LargeStringBuilder)(nil) + _ StringLikeBuilder = (*StringViewBuilder)(nil) ) diff --git a/go/arrow/array/string_test.go b/go/arrow/array/string_test.go index d743a3ec7f37f..803fae51347c1 100644 --- a/go/arrow/array/string_test.go +++ b/go/arrow/array/string_test.go @@ -619,3 +619,176 @@ func TestStringValueLen(t *testing.T) { assert.Equal(t, len(v), slice.ValueLen(i)) } } +func TestStringViewArray(t *testing.T) { + mem := memory.NewCheckedAllocator(memory.NewGoAllocator()) + defer mem.AssertSize(t, 0) + + var ( + // only the last string is long enough to not get inlined + want = []string{"hello", "世界", "", "say goodbye daffy"} + valids = []bool{true, true, false, true} + ) + + sb := array.NewStringViewBuilder(mem) + defer sb.Release() + + sb.Retain() + sb.Release() + + assert.NoError(t, sb.AppendValueFromString(want[0])) + sb.AppendValues(want[1:2], nil) + + sb.AppendNull() + sb.Append(want[3]) + + if got, want := sb.Len(), len(want); got != want { + t.Fatalf("invalid len: got=%d, want=%d", got, want) + } + + if got, want := sb.NullN(), 1; got != want { + t.Fatalf("invalid nulls: got=%d, want=%d", got, want) + } + + arr := sb.NewStringViewArray() + defer arr.Release() + + arr.Retain() + arr.Release() + + assert.Equal(t, "hello", arr.ValueStr(0)) + + if got, want := arr.Len(), len(want); got != want { + t.Fatalf("invalid len: got=%d, want=%d", got, want) + } + + if got, want := arr.NullN(), 1; got != want { + t.Fatalf("invalid nulls: got=%d, want=%d", got, want) + } + + for i := range want { + if arr.IsNull(i) != !valids[i] { + t.Fatalf("arr[%d]-validity: got=%v want=%v", i, !arr.IsNull(i), valids[i]) + } + switch { + case arr.IsNull(i): + default: + 
got := arr.Value(i) + if got != want[i] { + t.Fatalf("arr[%d]: got=%q, want=%q", i, got, want[i]) + } + } + } + + sub := array.MakeFromData(arr.Data()) + defer sub.Release() + + if sub.DataType().ID() != arrow.STRING_VIEW { + t.Fatalf("invalid type: got=%q, want=string view", sub.DataType().Name()) + } + + if _, ok := sub.(*array.StringView); !ok { + t.Fatalf("could not type-assert to array.String") + } + + if got, want := arr.String(), `["hello" "世界" (null) "say goodbye daffy"]`; got != want { + t.Fatalf("got=%q, want=%q", got, want) + } + + // only the last string gets stuck into a buffer the rest are inlined + // in the headers. + if !bytes.Equal([]byte(`say goodbye daffy`), arr.Data().Buffers()[2].Bytes()) { + t.Fatalf("got=%q, want=%q", string(arr.Data().Buffers()[2].Bytes()), `say goodbye daffy`) + } + + // check the prefix for the non-inlined value + if [4]byte{'s', 'a', 'y', ' '} != arr.ValueHeader(3).Prefix() { + t.Fatalf("got=%q, want=%q", arr.ValueHeader(3).Prefix(), `say `) + } + + slice := array.NewSliceData(arr.Data(), 2, 4) + defer slice.Release() + + sub1 := array.MakeFromData(slice) + defer sub1.Release() + + v, ok := sub1.(*array.StringView) + if !ok { + t.Fatalf("could not type-assert to array.StringView") + } + + if got, want := v.String(), `[(null) "say goodbye daffy"]`; got != want { + t.Fatalf("got=%q, want=%q", got, want) + } + + if !bytes.Equal([]byte(`say goodbye daffy`), v.Data().Buffers()[2].Bytes()) { + t.Fatalf("got=%q, want=%q", string(v.Data().Buffers()[2].Bytes()), `say goodbye daffy`) + } + + // check the prefix for the non-inlined value + if [4]byte{'s', 'a', 'y', ' '} != v.ValueHeader(1).Prefix() { + t.Fatalf("got=%q, want=%q", v.ValueHeader(1).Prefix(), `say `) + } +} + +func TestStringViewBuilder_Empty(t *testing.T) { + mem := memory.NewCheckedAllocator(memory.NewGoAllocator()) + defer mem.AssertSize(t, 0) + + want := []string{"hello", "世界", "", "say goodbye daffy"} + + ab := array.NewStringViewBuilder(mem) + defer ab.Release() 
+ + stringValues := func(a *array.StringView) []string { + vs := make([]string, a.Len()) + for i := range vs { + vs[i] = a.Value(i) + } + return vs + } + + ab.AppendValues([]string{}, nil) + a := ab.NewStringViewArray() + assert.Zero(t, a.Len()) + a.Release() + + ab.AppendValues(nil, nil) + a = ab.NewStringViewArray() + assert.Zero(t, a.Len()) + a.Release() + + ab.AppendValues([]string{}, nil) + ab.AppendValues(want, nil) + a = ab.NewStringViewArray() + assert.Equal(t, want, stringValues(a)) + a.Release() + + ab.AppendValues(want, nil) + ab.AppendValues([]string{}, nil) + a = ab.NewStringViewArray() + assert.Equal(t, want, stringValues(a)) + a.Release() +} + +// TestStringReset tests the Reset() method on the String type by creating two different Strings and then +// reseting the contents of string2 with the values from string1. +func TestStringViewReset(t *testing.T) { + mem := memory.NewCheckedAllocator(memory.NewGoAllocator()) + sb1 := array.NewStringViewBuilder(mem) + sb2 := array.NewStringViewBuilder(mem) + defer sb1.Release() + defer sb2.Release() + + sb1.Append("string1") + sb1.AppendNull() + + var ( + string1 = sb1.NewStringViewArray() + string2 = sb2.NewStringViewArray() + + string1Data = string1.Data() + ) + string2.Reset(string1Data) + + assert.Equal(t, "string1", string2.Value(0)) +} diff --git a/go/arrow/compute/executor.go b/go/arrow/compute/executor.go index 1cba0b1e19f69..db89b206daf5f 100644 --- a/go/arrow/compute/executor.go +++ b/go/arrow/compute/executor.go @@ -171,6 +171,8 @@ func addComputeDataPrealloc(dt arrow.DataType, widths []bufferPrealloc) []buffer return append(widths, bufferPrealloc{bitWidth: 32, addLen: 1}) case arrow.LARGE_BINARY, arrow.LARGE_STRING, arrow.LARGE_LIST: return append(widths, bufferPrealloc{bitWidth: 64, addLen: 1}) + case arrow.STRING_VIEW, arrow.BINARY_VIEW: + return append(widths, bufferPrealloc{bitWidth: arrow.ViewHeaderSizeBytes * 8}) } return widths } @@ -1007,9 +1009,10 @@ func (v *vectorExecutor) WrapResults(ctx 
context.Context, out <-chan Datum, hasC case <-ctx.Done(): return nil case output = <-out: - if output == nil { + if output == nil || ctx.Err() != nil { return nil } + // if the inputs contained at least one chunked array // then we want to return chunked output if hasChunked { diff --git a/go/arrow/datatype.go b/go/arrow/datatype.go index 24113b55899dc..1e5d8fb98aa59 100644 --- a/go/arrow/datatype.go +++ b/go/arrow/datatype.go @@ -210,6 +210,11 @@ type BinaryDataType interface { binary() } +type BinaryViewDataType interface { + BinaryDataType + view() +} + type OffsetsDataType interface { DataType OffsetTypeTraits() OffsetTraits @@ -272,6 +277,8 @@ func (b BufferSpec) Equals(other BufferSpec) bool { type DataTypeLayout struct { Buffers []BufferSpec HasDict bool + // VariadicSpec is what the buffers beyond len(Buffers) are expected to conform to. + VariadicSpec *BufferSpec } func SpecFixedWidth(w int) BufferSpec { return BufferSpec{KindFixedWidth, w} } diff --git a/go/arrow/datatype_binary.go b/go/arrow/datatype_binary.go index a3a8568645052..f3e601f08ec79 100644 --- a/go/arrow/datatype_binary.go +++ b/go/arrow/datatype_binary.go @@ -83,16 +83,57 @@ func (t *LargeStringType) Layout() DataTypeLayout { func (t *LargeStringType) OffsetTypeTraits() OffsetTraits { return Int64Traits } func (LargeStringType) IsUtf8() bool { return true } +type BinaryViewType struct{} + +func (*BinaryViewType) ID() Type { return BINARY_VIEW } +func (*BinaryViewType) Name() string { return "binary_view" } +func (*BinaryViewType) String() string { return "binary_view" } +func (*BinaryViewType) IsUtf8() bool { return false } +func (*BinaryViewType) binary() {} +func (*BinaryViewType) view() {} +func (t *BinaryViewType) Fingerprint() string { return typeFingerprint(t) } +func (*BinaryViewType) Layout() DataTypeLayout { + variadic := SpecVariableWidth() + return DataTypeLayout{ + Buffers: []BufferSpec{SpecBitmap(), SpecFixedWidth(ViewHeaderSizeBytes)}, + VariadicSpec: &variadic, + } +} + +type 
StringViewType struct{} + +func (*StringViewType) ID() Type { return STRING_VIEW } +func (*StringViewType) Name() string { return "string_view" } +func (*StringViewType) String() string { return "string_view" } +func (*StringViewType) IsUtf8() bool { return true } +func (*StringViewType) binary() {} +func (*StringViewType) view() {} +func (t *StringViewType) Fingerprint() string { return typeFingerprint(t) } +func (*StringViewType) Layout() DataTypeLayout { + variadic := SpecVariableWidth() + return DataTypeLayout{ + Buffers: []BufferSpec{SpecBitmap(), SpecFixedWidth(ViewHeaderSizeBytes)}, + VariadicSpec: &variadic, + } +} + var ( BinaryTypes = struct { Binary BinaryDataType String BinaryDataType LargeBinary BinaryDataType LargeString BinaryDataType + BinaryView BinaryDataType + StringView BinaryDataType }{ Binary: &BinaryType{}, String: &StringType{}, LargeBinary: &LargeBinaryType{}, LargeString: &LargeStringType{}, + BinaryView: &BinaryViewType{}, + StringView: &StringViewType{}, } + + _ BinaryViewDataType = (*StringViewType)(nil) + _ BinaryViewDataType = (*BinaryViewType)(nil) ) diff --git a/go/arrow/datatype_binary_test.go b/go/arrow/datatype_binary_test.go index 25ba6e8db4ba4..083d69ee3e5d4 100644 --- a/go/arrow/datatype_binary_test.go +++ b/go/arrow/datatype_binary_test.go @@ -81,3 +81,33 @@ func TestLargeStringType(t *testing.T) { t.Fatalf("invalid string type stringer. got=%v, want=%v", got, want) } } + +func TestBinaryViewType(t *testing.T) { + var nt *arrow.BinaryViewType + if got, want := nt.ID(), arrow.BINARY_VIEW; got != want { + t.Fatalf("invalid string type id. got=%v, want=%v", got, want) + } + + if got, want := nt.Name(), "binary_view"; got != want { + t.Fatalf("invalid string type name. got=%v, want=%v", got, want) + } + + if got, want := nt.String(), "binary_view"; got != want { + t.Fatalf("invalid string type stringer. 
got=%v, want=%v", got, want) + } +} + +func TestStringViewType(t *testing.T) { + var nt *arrow.StringViewType + if got, want := nt.ID(), arrow.STRING_VIEW; got != want { + t.Fatalf("invalid string type id. got=%v, want=%v", got, want) + } + + if got, want := nt.Name(), "string_view"; got != want { + t.Fatalf("invalid string type name. got=%v, want=%v", got, want) + } + + if got, want := nt.String(), "string_view"; got != want { + t.Fatalf("invalid string type stringer. got=%v, want=%v", got, want) + } +} diff --git a/go/arrow/datatype_viewheader.go b/go/arrow/datatype_viewheader.go new file mode 100644 index 0000000000000..54b9256b34604 --- /dev/null +++ b/go/arrow/datatype_viewheader.go @@ -0,0 +1,141 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package arrow + +import ( + "bytes" + "unsafe" + + "github.com/apache/arrow/go/v15/arrow/endian" + "github.com/apache/arrow/go/v15/arrow/internal/debug" + "github.com/apache/arrow/go/v15/arrow/memory" +) + +const ( + ViewPrefixLen = 4 + viewInlineSize = 12 +) + +func IsViewInline(length int) bool { + return length < viewInlineSize +} + +// ViewHeader is a variable length string (utf8) or byte slice with +// a 4 byte prefix and inline optimization for small values (12 bytes +// or fewer). This is similar to Go's standard string but limited by +// a length of Uint32Max and up to the first four bytes of the string +// are copied into the struct. This prefix allows failing comparisons +// early and can reduce CPU cache working set when dealing with short +// strings. +// +// There are two situations: +// +// Entirely inlined string data +// |----|------------| +// ^ ^ +// | | +// size inline string data, zero padded +// +// Reference into buffer +// |----|----|----|----| +// ^ ^ ^ ^ +// | | | | +// size prefix buffer index and offset to out-of-line portion +// +// Adapted from TU Munich's UmbraDB [1], Velox, DuckDB. +// +// [1]: https://db.in.tum.de/~freitag/papers/p29-neumann-cidr20.pdf +type ViewHeader struct { + size int32 + // the first 4 bytes of this are the prefix for the string + // if size <= StringHeaderInlineSize, then the entire string + // is in the data array and is zero padded. + // if size > StringHeaderInlineSize, the next 8 bytes are 2 uint32 + // values which are the buffer index and offset in that buffer + // containing the full string. 
+ data [viewInlineSize]byte +} + +func (sh *ViewHeader) IsInline() bool { + return sh.size <= int32(viewInlineSize) +} + +func (sh *ViewHeader) Len() int { return int(sh.size) } +func (sh *ViewHeader) Prefix() [ViewPrefixLen]byte { + return *(*[4]byte)(unsafe.Pointer(&sh.data)) +} + +func (sh *ViewHeader) BufferIndex() int32 { + return int32(endian.Native.Uint32(sh.data[ViewPrefixLen:])) +} + +func (sh *ViewHeader) BufferOffset() int32 { + return int32(endian.Native.Uint32(sh.data[ViewPrefixLen+4:])) +} + +func (sh *ViewHeader) InlineBytes() (data []byte) { + debug.Assert(sh.IsInline(), "calling InlineBytes on non-inline ViewHeader") + return sh.data[:sh.size] +} + +func (sh *ViewHeader) SetBytes(data []byte) int { + sh.size = int32(len(data)) + if sh.IsInline() { + return copy(sh.data[:], data) + } + return copy(sh.data[:4], data) +} + +func (sh *ViewHeader) SetString(data string) int { + sh.size = int32(len(data)) + if sh.IsInline() { + return copy(sh.data[:], data) + } + return copy(sh.data[:4], data) +} + +func (sh *ViewHeader) SetIndexOffset(bufferIndex, offset int32) { + endian.Native.PutUint32(sh.data[ViewPrefixLen:], uint32(bufferIndex)) + endian.Native.PutUint32(sh.data[ViewPrefixLen+4:], uint32(offset)) +} + +func (sh *ViewHeader) Equals(buffers []*memory.Buffer, other *ViewHeader, otherBuffers []*memory.Buffer) bool { + if sh.sizeAndPrefixAsInt64() != other.sizeAndPrefixAsInt64() { + return false + } + + if sh.IsInline() { + return sh.inlinedAsInt64() == other.inlinedAsInt64() + } + + return bytes.Equal(sh.getBufferBytes(buffers), other.getBufferBytes(otherBuffers)) +} + +func (sh *ViewHeader) getBufferBytes(buffers []*memory.Buffer) []byte { + offset := sh.BufferOffset() + return buffers[sh.BufferIndex()].Bytes()[offset : offset+sh.size] +} + +func (sh *ViewHeader) inlinedAsInt64() int64 { + s := unsafe.Slice((*int64)(unsafe.Pointer(sh)), 2) + return s[1] +} + +func (sh *ViewHeader) sizeAndPrefixAsInt64() int64 { + s := 
unsafe.Slice((*int64)(unsafe.Pointer(sh)), 2) + return s[0] +} diff --git a/go/arrow/datatype_viewheader_inline.go b/go/arrow/datatype_viewheader_inline.go new file mode 100644 index 0000000000000..89ac1d06adcdf --- /dev/null +++ b/go/arrow/datatype_viewheader_inline.go @@ -0,0 +1,31 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build go1.20 + +package arrow + +import ( + "unsafe" + + "github.com/apache/arrow/go/v15/arrow/internal/debug" +) + +func (sh *ViewHeader) InlineString() (data string) { + debug.Assert(sh.IsInline(), "calling InlineString on non-inline ViewHeader") + + return unsafe.String((*byte)(unsafe.Pointer(&sh.data)), sh.size) +} diff --git a/go/arrow/datatype_viewheader_inline_go1.19.go b/go/arrow/datatype_viewheader_inline_go1.19.go new file mode 100644 index 0000000000000..aec66009d9492 --- /dev/null +++ b/go/arrow/datatype_viewheader_inline_go1.19.go @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !go1.20 && !tinygo + +package arrow + +import ( + "reflect" + "unsafe" + + "github.com/apache/arrow/go/v15/arrow/internal/debug" +) + +func (sh *ViewHeader) InlineString() (data string) { + debug.Assert(sh.IsInline(), "calling InlineString on non-inline ViewHeader") + + h := (*reflect.StringHeader)(unsafe.Pointer(&data)) + h.Data = uintptr(unsafe.Pointer(&sh.data)) + h.Len = int(sh.size) + return +} diff --git a/go/arrow/datatype_viewheader_inline_tinygo.go b/go/arrow/datatype_viewheader_inline_tinygo.go new file mode 100644 index 0000000000000..bff63a273a722 --- /dev/null +++ b/go/arrow/datatype_viewheader_inline_tinygo.go @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !go1.20 && tinygo + +package arrow + +import ( + "reflect" + "unsafe" + + "github.com/apache/arrow/go/v15/arrow/internal/debug" +) + +func (sh *ViewHeader) InlineString() (data string) { + debug.Assert(sh.IsInline(), "calling InlineString on non-inline ViewHeader") + + h := (*reflect.StringHeader)(unsafe.Pointer(&data)) + h.Data = uintptr(unsafe.Pointer(&sh.data)) + h.Len = uintptr(sh.size) + return +} diff --git a/go/arrow/internal/arrdata/arrdata.go b/go/arrow/internal/arrdata/arrdata.go index 6631e4245c19d..985388094eb51 100644 --- a/go/arrow/internal/arrdata/arrdata.go +++ b/go/arrow/internal/arrdata/arrdata.go @@ -54,6 +54,7 @@ func init() { Records["extension"] = makeExtensionRecords() Records["union"] = makeUnionRecords() Records["run_end_encoded"] = makeRunEndEncodedRecords() + Records["view_types"] = makeStringViewRecords() for k := range Records { RecordNames = append(RecordNames, k) @@ -1155,6 +1156,65 @@ func makeRunEndEncodedRecords() []arrow.Record { return recs } +func makeStringViewRecords() []arrow.Record { + mem := memory.NewGoAllocator() + schema := arrow.NewSchema([]arrow.Field{ + {Name: "binary_view", Type: arrow.BinaryTypes.BinaryView, Nullable: true}, + {Name: "string_view", Type: arrow.BinaryTypes.StringView, Nullable: true}, + }, nil) + + mask := []bool{true, false, false, true, true} + chunks := [][]arrow.Array{ + { + viewTypeArrayOf(mem, [][]byte{[]byte("1é"), []byte("2"), []byte("3"), []byte("4"), []byte("5")}, mask), + viewTypeArrayOf(mem, []string{"1é", "2", "3", "4", "5"}, mask), + }, + { + viewTypeArrayOf(mem, [][]byte{[]byte("1é"), []byte("22222222222222"), []byte("33333333333333"), []byte("4444"), []byte("5555")}, mask), + viewTypeArrayOf(mem, []string{"1é", "22222222222222", "33333333333333", "4444", "5555"}, nil), + }, + { + viewTypeArrayOf(mem, [][]byte{[]byte("1é1é"), []byte("22222222222222"), 
[]byte("33333333333333"), []byte("44"), []byte("55")}, nil), + viewTypeArrayOf(mem, []string{"1é1é", "22222222222222", "33333333333333", "44", "55"}, mask), + }, + } + + defer func() { + for _, chunk := range chunks { + for _, col := range chunk { + col.Release() + } + } + }() + + recs := make([]arrow.Record, len(chunks)) + for i, chunk := range chunks { + recs[i] = array.NewRecord(schema, chunk, -1) + } + + return recs +} + +func viewTypeArrayOf(mem memory.Allocator, a interface{}, valids []bool) arrow.Array { + if mem == nil { + mem = memory.NewGoAllocator() + } + + switch a := a.(type) { + case []string: + bldr := array.NewStringViewBuilder(mem) + defer bldr.Release() + bldr.AppendValues(a, valids) + return bldr.NewArray() + case [][]byte: + bldr := array.NewBinaryViewBuilder(mem) + defer bldr.Release() + bldr.AppendValues(a, valids) + return bldr.NewArray() + } + return nil +} + func extArray(mem memory.Allocator, dt arrow.ExtensionType, a interface{}, valids []bool) arrow.Array { var storage arrow.Array switch st := dt.StorageType().(type) { @@ -1750,5 +1810,26 @@ func buildArray(bldr array.Builder, data arrow.Array) { bldr.AppendNull() } } + + case *array.BinaryViewBuilder: + data := data.(*array.BinaryView) + for i := 0; i < data.Len(); i++ { + switch { + case data.IsValid(i): + bldr.Append(data.Value(i)) + default: + bldr.AppendNull() + } + } + case *array.StringViewBuilder: + data := data.(*array.StringView) + for i := 0; i < data.Len(); i++ { + switch { + case data.IsValid(i): + bldr.Append(data.Value(i)) + default: + bldr.AppendNull() + } + } } } diff --git a/go/arrow/internal/arrjson/arrjson.go b/go/arrow/internal/arrjson/arrjson.go index 87bdc1f44d875..f74b615362642 100644 --- a/go/arrow/internal/arrjson/arrjson.go +++ b/go/arrow/internal/arrjson/arrjson.go @@ -158,6 +158,10 @@ func typeToJSON(arrowType arrow.DataType) (json.RawMessage, error) { typ = nameJSON{"utf8"} case *arrow.LargeStringType: typ = nameJSON{"largeutf8"} + case 
*arrow.BinaryViewType: + typ = nameJSON{"binaryview"} + case *arrow.StringViewType: + typ = nameJSON{"utf8view"} case *arrow.Date32Type: typ = unitZoneJSON{Name: "date", Unit: "DAY"} case *arrow.Date64Type: @@ -342,6 +346,10 @@ func typeFromJSON(typ json.RawMessage, children []FieldWrapper) (arrowType arrow arrowType = arrow.BinaryTypes.String case "largeutf8": arrowType = arrow.BinaryTypes.LargeString + case "binaryview": + arrowType = arrow.BinaryTypes.BinaryView + case "utf8view": + arrowType = arrow.BinaryTypes.StringView case "date": t := unitZoneJSON{} if err = json.Unmarshal(typ, &t); err != nil { @@ -818,6 +826,7 @@ type Array struct { Offset interface{} `json:"OFFSET,omitempty"` Size interface{} `json:"SIZE,omitempty"` Children []Array `json:"children,omitempty"` + Variadic []string `json:"VARIADIC_BUFFERS,omitempty"` } func (a *Array) MarshalJSON() ([]byte, error) { @@ -1078,6 +1087,18 @@ func arrayFromJSON(mem memory.Allocator, dt arrow.DataType, arr Array) arrow.Arr bldr.AppendValues(data, valids) return returnNewArrayData(bldr) + case arrow.BinaryViewDataType: + valids := validsToBitmap(validsFromJSON(arr.Valids), mem) + nulls := arr.Count - bitutil.CountSetBits(valids.Bytes(), 0, arr.Count) + headers := stringHeadersFromJSON(mem, !dt.IsUtf8(), arr.Data) + extraBufs := variadicBuffersFromJSON(arr.Variadic) + defer valids.Release() + defer headers.Release() + + return array.NewData(dt, arr.Count, + append([]*memory.Buffer{valids, headers}, extraBufs...), + nil, nulls, 0) + case *arrow.ListType: valids := validsFromJSON(arr.Valids) elems := arrayFromJSON(mem, dt.Elem(), arr.Children[0]) @@ -1486,6 +1507,24 @@ func arrayToJSON(field arrow.Field, arr arrow.Array) Array { Offset: strOffsets, } + case *array.StringView: + variadic := variadicBuffersToJSON(arr.Data().Buffers()[2:]) + return Array{ + Name: field.Name, + Count: arr.Len(), + Valids: validsToJSON(arr), + Data: stringHeadersToJSON(arr, false), + Variadic: variadic, + } + case *array.BinaryView: + 
variadic := variadicBuffersToJSON(arr.Data().Buffers()[2:]) + return Array{ + Name: field.Name, + Count: arr.Len(), + Valids: validsToJSON(arr), + Data: stringHeadersToJSON(arr, true), + Variadic: variadic, + } case *array.List: o := Array{ Name: field.Name, @@ -2309,3 +2348,114 @@ func durationToJSON(arr *array.Duration) []interface{} { } return o } + +func variadicBuffersFromJSON(bufs []string) []*memory.Buffer { + out := make([]*memory.Buffer, len(bufs)) + for i, data := range bufs { + rawData, err := hex.DecodeString(data) + if err != nil { + panic(err) + } + + out[i] = memory.NewBufferBytes(rawData) + } + return out +} + +func variadicBuffersToJSON(bufs []*memory.Buffer) []string { + out := make([]string, len(bufs)) + for i, data := range bufs { + out[i] = strings.ToUpper(hex.EncodeToString(data.Bytes())) + } + return out +} + +func stringHeadersFromJSON(mem memory.Allocator, isBinary bool, data []interface{}) *memory.Buffer { + buf := memory.NewResizableBuffer(mem) + buf.Resize(arrow.ViewHeaderTraits.BytesRequired(len(data))) + + values := arrow.ViewHeaderTraits.CastFromBytes(buf.Bytes()) + + for i, d := range data { + switch v := d.(type) { + case nil: + continue + case map[string]interface{}: + if inlined, ok := v["INLINED"]; ok { + if isBinary { + val, err := hex.DecodeString(inlined.(string)) + if err != nil { + panic(fmt.Errorf("could not decode %v: %v", inlined, err)) + } + values[i].SetBytes(val) + } else { + values[i].SetString(inlined.(string)) + } + continue + } + + idx, offset := v["BUFFER_INDEX"].(json.Number), v["OFFSET"].(json.Number) + bufIdx, err := idx.Int64() + if err != nil { + panic(err) + } + + bufOffset, err := offset.Int64() + if err != nil { + panic(err) + } + + values[i].SetIndexOffset(int32(bufIdx), int32(bufOffset)) + prefix, err := hex.DecodeString(v["PREFIX"].(string)) + if err != nil { + panic(err) + } + sz, err := v["SIZE"].(json.Number).Int64() + if err != nil { + panic(err) + } + + rawData := make([]byte, sz) + copy(rawData, 
prefix) + values[i].SetBytes(rawData) + } + } + return buf +} + +func stringHeadersToJSON(arr array.ViewLike, isBinary bool) []interface{} { + type StringHeader struct { + Size int `json:"SIZE"` + Prefix *string `json:"PREFIX,omitempty"` + BufferIdx *int `json:"BUFFER_INDEX,omitempty"` + BufferOff *int `json:"OFFSET,omitempty"` + Inlined *string `json:"INLINED,omitempty"` + } + + o := make([]interface{}, arr.Len()) + for i := range o { + hdr := arr.ValueHeader(i) + if hdr.IsInline() { + data := hdr.InlineString() + if isBinary { + data = strings.ToUpper(hex.EncodeToString(hdr.InlineBytes())) + } + o[i] = StringHeader{ + Size: hdr.Len(), + Inlined: &data, + } + continue + } + + idx, off := int(hdr.BufferIndex()), int(hdr.BufferOffset()) + prefix := hdr.Prefix() + encodedPrefix := strings.ToUpper(hex.EncodeToString(prefix[:])) + o[i] = StringHeader{ + Size: hdr.Len(), + Prefix: &encodedPrefix, + BufferIdx: &idx, + BufferOff: &off, + } + } + return o +} diff --git a/go/arrow/internal/arrjson/arrjson_test.go b/go/arrow/internal/arrjson/arrjson_test.go index 7beadee370edb..31f3cb238ec16 100644 --- a/go/arrow/internal/arrjson/arrjson_test.go +++ b/go/arrow/internal/arrjson/arrjson_test.go @@ -48,6 +48,7 @@ func TestReadWrite(t *testing.T) { wantJSONs["dictionary"] = makeDictionaryWantJSONs() wantJSONs["union"] = makeUnionWantJSONs() wantJSONs["run_end_encoded"] = makeRunEndEncodedWantJSONs() + wantJSONs["view_types"] = makeViewTypesWantJSONs() tempDir := t.TempDir() for name, recs := range arrdata.Records { @@ -6127,3 +6128,261 @@ func makeRunEndEncodedWantJSONs() string { ] }` } + +func makeViewTypesWantJSONs() string { + return `{ + "schema": { + "fields": [ + { + "name": "binary_view", + "type": { + "name": "binaryview" + }, + "nullable": true, + "children": [] + }, + { + "name": "string_view", + "type": { + "name": "utf8view" + }, + "nullable": true, + "children": [] + } + ] + }, + "batches": [ + { + "count": 5, + "columns": [ + { + "name": "binary_view", + "count": 
5, + "VALIDITY": [ + 1, + 0, + 0, + 1, + 1 + ], + "DATA": [ + { + "SIZE": 3, + "INLINED": "31C3A9" + }, + { + "SIZE": 0, + "INLINED": "" + }, + { + "SIZE": 0, + "INLINED": "" + }, + { + "SIZE": 1, + "INLINED": "34" + }, + { + "SIZE": 1, + "INLINED": "35" + } + ], + "VARIADIC_BUFFERS": [""] + }, + { + "name": "string_view", + "count": 5, + "VALIDITY": [ + 1, + 0, + 0, + 1, + 1 + ], + "DATA": [ + { + "SIZE": 3, + "INLINED": "1é" + }, + { + "SIZE": 0, + "INLINED": "" + }, + { + "SIZE": 0, + "INLINED": "" + }, + { + "SIZE": 1, + "INLINED": "4" + }, + { + "SIZE": 1, + "INLINED": "5" + } + ], + "VARIADIC_BUFFERS": [""] + } + ] + }, + { + "count": 5, + "columns": [ + { + "name": "binary_view", + "count": 5, + "VALIDITY": [ + 1, + 0, + 0, + 1, + 1 + ], + "DATA": [ + { + "SIZE": 3, + "INLINED": "31C3A9" + }, + { + "SIZE": 0, + "INLINED": "" + }, + { + "SIZE": 0, + "INLINED": "" + }, + { + "SIZE": 4, + "INLINED": "34343434" + }, + { + "SIZE": 4, + "INLINED": "35353535" + } + ], + "VARIADIC_BUFFERS": [""] + }, + { + "name": "string_view", + "count": 5, + "VALIDITY": [ + 1, + 1, + 1, + 1, + 1 + ], + "DATA": [ + { + "SIZE": 3, + "INLINED": "1é" + }, + { + "SIZE": 14, + "PREFIX": "32323232", + "BUFFER_INDEX": 0, + "OFFSET": 0 + }, + { + "SIZE": 14, + "PREFIX": "33333333", + "BUFFER_INDEX": 0, + "OFFSET": 14 + }, + { + "SIZE": 4, + "INLINED": "4444" + }, + { + "SIZE": 4, + "INLINED": "5555" + } + ], + "VARIADIC_BUFFERS": [ + "32323232323232323232323232323333333333333333333333333333" + ] + } + ] + }, + { + "count": 5, + "columns": [ + { + "name": "binary_view", + "count": 5, + "VALIDITY": [ + 1, + 1, + 1, + 1, + 1 + ], + "DATA": [ + { + "SIZE": 6, + "INLINED": "31C3A931C3A9" + }, + { + "SIZE": 14, + "PREFIX": "32323232", + "BUFFER_INDEX": 0, + "OFFSET": 0 + }, + { + "SIZE": 14, + "PREFIX": "33333333", + "BUFFER_INDEX": 0, + "OFFSET": 14 + }, + { + "SIZE": 2, + "INLINED": "3434" + }, + { + "SIZE": 2, + "INLINED": "3535" + } + ], + "VARIADIC_BUFFERS": [ + 
"32323232323232323232323232323333333333333333333333333333" + ] + }, + { + "name": "string_view", + "count": 5, + "VALIDITY": [ + 1, + 0, + 0, + 1, + 1 + ], + "DATA": [ + { + "SIZE": 6, + "INLINED": "1é1é" + }, + { + "SIZE": 0, + "INLINED": "" + }, + { + "SIZE": 0, + "INLINED": "" + }, + { + "SIZE": 2, + "INLINED": "44" + }, + { + "SIZE": 2, + "INLINED": "55" + } + ], + "VARIADIC_BUFFERS": [""] + } + ] + } + ] +}` +} diff --git a/go/arrow/internal/flatbuf/MetadataVersion.go b/go/arrow/internal/flatbuf/MetadataVersion.go index 21b234f9c2b21..bb5e99dd588ad 100644 --- a/go/arrow/internal/flatbuf/MetadataVersion.go +++ b/go/arrow/internal/flatbuf/MetadataVersion.go @@ -31,7 +31,7 @@ const ( MetadataVersionV3 MetadataVersion = 2 /// >= 0.8.0 (December 2017). Non-backwards compatible with V3. MetadataVersionV4 MetadataVersion = 3 - /// >= 1.0.0 (July 2020. Backwards compatible with V4 (V5 readers can read V4 + /// >= 1.0.0 (July 2020). Backwards compatible with V4 (V5 readers can read V4 /// metadata and IPC messages). Implementations are recommended to provide a /// V4 compatibility mode with V5 format changes disabled. 
/// diff --git a/go/arrow/internal/testing/gen/random_array_gen.go b/go/arrow/internal/testing/gen/random_array_gen.go index b42273ff93fac..57b417bd2b878 100644 --- a/go/arrow/internal/testing/gen/random_array_gen.go +++ b/go/arrow/internal/testing/gen/random_array_gen.go @@ -351,6 +351,40 @@ func (r *RandomArrayGenerator) LargeString(size int64, minLength, maxLength int6 return bldr.NewArray() } +func (r *RandomArrayGenerator) StringView(size int64, minLength, maxLength int64, nullProb float64) arrow.Array { + return r.generateBinaryView(arrow.BinaryTypes.StringView, size, minLength, maxLength, nullProb) +} + +func (r *RandomArrayGenerator) generateBinaryView(dt arrow.DataType, size int64, minLength, maxLength int64, nullProb float64) arrow.Array { + lengths := r.Int32(size, int32(minLength), int32(maxLength), nullProb).(*array.Int32) + defer lengths.Release() + + bldr := array.NewBuilder(r.mem, dt).(array.StringLikeBuilder) + defer bldr.Release() + + r.extra++ + dist := rand.New(rand.NewSource(r.seed + r.extra)) + + buf := make([]byte, 0, maxLength) + gen := func(n int32) string { + out := buf[:n] + for i := range out { + out[i] = uint8(dist.Int31n(int32('z')-int32('A')+1) + int32('A')) + } + return string(out) + } + + for i := 0; i < lengths.Len(); i++ { + if lengths.IsNull(i) { + bldr.AppendNull() + continue + } + bldr.Append(gen(lengths.Value(i))) + } + + return bldr.NewArray() +} + func (r *RandomArrayGenerator) Numeric(dt arrow.Type, size int64, min, max int64, nullprob float64) arrow.Array { switch dt { case arrow.INT8: diff --git a/go/arrow/ipc/endian_swap.go b/go/arrow/ipc/endian_swap.go index d2e0948434abc..35ba0e4e764f9 100644 --- a/go/arrow/ipc/endian_swap.go +++ b/go/arrow/ipc/endian_swap.go @@ -18,6 +18,7 @@ package ipc import ( "errors" + "fmt" "math/bits" "github.com/apache/arrow/go/v15/arrow" @@ -119,7 +120,10 @@ func swapType(dt arrow.DataType, data *array.Data) (err error) { return swapType(dt.IndexType, data) case arrow.FixedWidthDataType: 
byteSwapBuffer(dt.BitWidth(), data.Buffers()[1]) + default: + err = fmt.Errorf("%w: swapping endianness of %s", arrow.ErrNotImplemented, dt) } + return } diff --git a/go/arrow/ipc/file_reader.go b/go/arrow/ipc/file_reader.go index 330355d3a60c3..1c7eb31799cfa 100644 --- a/go/arrow/ipc/file_reader.go +++ b/go/arrow/ipc/file_reader.go @@ -430,13 +430,18 @@ func (src *ipcSource) fieldMetadata(i int) *flatbuf.FieldNode { return &node } +func (src *ipcSource) variadicCount(i int) int64 { + return src.meta.VariadicBufferCounts(i) +} + type arrayLoaderContext struct { - src ipcSource - ifield int - ibuffer int - max int - memo *dictutils.Memo - version MetadataVersion + src ipcSource + ifield int + ibuffer int + ivariadic int + max int + memo *dictutils.Memo + version MetadataVersion } func (ctx *arrayLoaderContext) field() *flatbuf.FieldNode { @@ -451,6 +456,12 @@ func (ctx *arrayLoaderContext) buffer() *memory.Buffer { return buf } +func (ctx *arrayLoaderContext) variadic() int64 { + v := ctx.src.variadicCount(ctx.ivariadic) + ctx.ivariadic++ + return v +} + func (ctx *arrayLoaderContext) loadArray(dt arrow.DataType) arrow.ArrayData { switch dt := dt.(type) { case *arrow.NullType: @@ -476,6 +487,9 @@ func (ctx *arrayLoaderContext) loadArray(dt arrow.DataType) arrow.ArrayData { case *arrow.BinaryType, *arrow.StringType, *arrow.LargeStringType, *arrow.LargeBinaryType: return ctx.loadBinary(dt) + case arrow.BinaryViewDataType: + return ctx.loadBinaryView(dt) + case *arrow.FixedSizeBinaryType: return ctx.loadFixedSizeBinary(dt) @@ -582,6 +596,18 @@ func (ctx *arrayLoaderContext) loadBinary(dt arrow.DataType) arrow.ArrayData { return array.NewData(dt, int(field.Length()), buffers, nil, int(field.NullCount()), 0) } +func (ctx *arrayLoaderContext) loadBinaryView(dt arrow.DataType) arrow.ArrayData { + nVariadicBufs := ctx.variadic() + field, buffers := ctx.loadCommon(dt.ID(), 2+int(nVariadicBufs)) + buffers = append(buffers, ctx.buffer()) + for i := 0; i < int(nVariadicBufs); 
i++ { + buffers = append(buffers, ctx.buffer()) + } + defer releaseBuffers(buffers) + + return array.NewData(dt, int(field.Length()), buffers, nil, int(field.NullCount()), 0) +} + func (ctx *arrayLoaderContext) loadFixedSizeBinary(dt *arrow.FixedSizeBinaryType) arrow.ArrayData { field, buffers := ctx.loadCommon(dt.ID(), 2) buffers = append(buffers, ctx.buffer()) diff --git a/go/arrow/ipc/message.go b/go/arrow/ipc/message.go index 709aa5aa2dba4..5295c5df30137 100644 --- a/go/arrow/ipc/message.go +++ b/go/arrow/ipc/message.go @@ -31,11 +31,11 @@ import ( type MetadataVersion flatbuf.MetadataVersion const ( - MetadataV1 = MetadataVersion(flatbuf.MetadataVersionV1) // version for Arrow-0.1.0 - MetadataV2 = MetadataVersion(flatbuf.MetadataVersionV2) // version for Arrow-0.2.0 - MetadataV3 = MetadataVersion(flatbuf.MetadataVersionV3) // version for Arrow-0.3.0 to 0.7.1 - MetadataV4 = MetadataVersion(flatbuf.MetadataVersionV4) // version for >= Arrow-0.8.0 - MetadataV5 = MetadataVersion(flatbuf.MetadataVersionV5) // version for >= Arrow-1.0.0, backward compatible with v4 + MetadataV1 = MetadataVersion(flatbuf.MetadataVersionV1) // version for Arrow Format-0.1.0 + MetadataV2 = MetadataVersion(flatbuf.MetadataVersionV2) // version for Arrow Format-0.2.0 + MetadataV3 = MetadataVersion(flatbuf.MetadataVersionV3) // version for Arrow Format-0.3.0 to 0.7.1 + MetadataV4 = MetadataVersion(flatbuf.MetadataVersionV4) // version for >= Arrow Format-0.8.0 + MetadataV5 = MetadataVersion(flatbuf.MetadataVersionV5) // version for >= Arrow Format-1.0.0, backward compatible with v4 ) func (m MetadataVersion) String() string { diff --git a/go/arrow/ipc/metadata.go b/go/arrow/ipc/metadata.go index bd437834c3d06..54ef58753a173 100644 --- a/go/arrow/ipc/metadata.go +++ b/go/arrow/ipc/metadata.go @@ -323,6 +323,16 @@ func (fv *fieldVisitor) visit(field arrow.Field) { flatbuf.LargeUtf8Start(fv.b) fv.offset = flatbuf.LargeUtf8End(fv.b) + case *arrow.BinaryViewType: + fv.dtype = 
flatbuf.TypeBinaryView + flatbuf.BinaryViewStart(fv.b) + fv.offset = flatbuf.BinaryViewEnd(fv.b) + + case *arrow.StringViewType: + fv.dtype = flatbuf.TypeUtf8View + flatbuf.Utf8ViewStart(fv.b) + fv.offset = flatbuf.Utf8ViewEnd(fv.b) + case *arrow.Date32Type: fv.dtype = flatbuf.TypeDate flatbuf.DateStart(fv.b) @@ -713,6 +723,12 @@ func concreteTypeFromFB(typ flatbuf.Type, data flatbuffers.Table, children []arr case flatbuf.TypeLargeUtf8: return arrow.BinaryTypes.LargeString, nil + case flatbuf.TypeUtf8View: + return arrow.BinaryTypes.StringView, nil + + case flatbuf.TypeBinaryView: + return arrow.BinaryTypes.BinaryView, nil + case flatbuf.TypeBool: return arrow.FixedWidthTypes.Boolean, nil @@ -1168,15 +1184,15 @@ func writeFileFooter(schema *arrow.Schema, dicts, recs []fileBlock, w io.Writer) return err } -func writeRecordMessage(mem memory.Allocator, size, bodyLength int64, fields []fieldMetadata, meta []bufferMetadata, codec flatbuf.CompressionType) *memory.Buffer { +func writeRecordMessage(mem memory.Allocator, size, bodyLength int64, fields []fieldMetadata, meta []bufferMetadata, codec flatbuf.CompressionType, variadicCounts []int64) *memory.Buffer { b := flatbuffers.NewBuilder(0) - recFB := recordToFB(b, size, bodyLength, fields, meta, codec) + recFB := recordToFB(b, size, bodyLength, fields, meta, codec, variadicCounts) return writeMessageFB(b, mem, flatbuf.MessageHeaderRecordBatch, recFB, bodyLength) } -func writeDictionaryMessage(mem memory.Allocator, id int64, isDelta bool, size, bodyLength int64, fields []fieldMetadata, meta []bufferMetadata, codec flatbuf.CompressionType) *memory.Buffer { +func writeDictionaryMessage(mem memory.Allocator, id int64, isDelta bool, size, bodyLength int64, fields []fieldMetadata, meta []bufferMetadata, codec flatbuf.CompressionType, variadicCounts []int64) *memory.Buffer { b := flatbuffers.NewBuilder(0) - recFB := recordToFB(b, size, bodyLength, fields, meta, codec) + recFB := recordToFB(b, size, bodyLength, fields, meta, 
codec, variadicCounts) flatbuf.DictionaryBatchStart(b) flatbuf.DictionaryBatchAddId(b, id) @@ -1186,7 +1202,7 @@ func writeDictionaryMessage(mem memory.Allocator, id int64, isDelta bool, size, return writeMessageFB(b, mem, flatbuf.MessageHeaderDictionaryBatch, dictFB, bodyLength) } -func recordToFB(b *flatbuffers.Builder, size, bodyLength int64, fields []fieldMetadata, meta []bufferMetadata, codec flatbuf.CompressionType) flatbuffers.UOffsetT { +func recordToFB(b *flatbuffers.Builder, size, bodyLength int64, fields []fieldMetadata, meta []bufferMetadata, codec flatbuf.CompressionType, variadicCounts []int64) flatbuffers.UOffsetT { fieldsFB := writeFieldNodes(b, fields, flatbuf.RecordBatchStartNodesVector) metaFB := writeBuffers(b, meta, flatbuf.RecordBatchStartBuffersVector) var bodyCompressFB flatbuffers.UOffsetT @@ -1194,10 +1210,24 @@ func recordToFB(b *flatbuffers.Builder, size, bodyLength int64, fields []fieldMe bodyCompressFB = writeBodyCompression(b, codec) } + var vcFB *flatbuffers.UOffsetT + if len(variadicCounts) > 0 { + flatbuf.RecordBatchStartVariadicBufferCountsVector(b, len(variadicCounts)) + for i := len(variadicCounts) - 1; i >= 0; i-- { + b.PrependInt64(variadicCounts[i]) + } + vcFBVal := b.EndVector(len(variadicCounts)) + vcFB = &vcFBVal + } + flatbuf.RecordBatchStart(b) flatbuf.RecordBatchAddLength(b, size) flatbuf.RecordBatchAddNodes(b, fieldsFB) flatbuf.RecordBatchAddBuffers(b, metaFB) + if vcFB != nil { + flatbuf.RecordBatchAddVariadicBufferCounts(b, *vcFB) + } + if codec != -1 { flatbuf.RecordBatchAddCompression(b, bodyCompressFB) } diff --git a/go/arrow/ipc/writer.go b/go/arrow/ipc/writer.go index 58c56d2d16ccf..e9d59f0e35e00 100644 --- a/go/arrow/ipc/writer.go +++ b/go/arrow/ipc/writer.go @@ -277,7 +277,7 @@ type dictEncoder struct { } func (d *dictEncoder) encodeMetadata(p *Payload, isDelta bool, id, nrows int64) error { - p.meta = writeDictionaryMessage(d.mem, id, isDelta, nrows, p.size, d.fields, d.meta, d.codec) + p.meta = 
writeDictionaryMessage(d.mem, id, isDelta, nrows, p.size, d.fields, d.meta, d.codec, d.variadicCounts) return nil } @@ -300,8 +300,9 @@ func (d *dictEncoder) Encode(p *Payload, id int64, isDelta bool, dict arrow.Arra type recordEncoder struct { mem memory.Allocator - fields []fieldMetadata - meta []bufferMetadata + fields []fieldMetadata + meta []bufferMetadata + variadicCounts []int64 depth int64 start int64 @@ -602,6 +603,33 @@ func (w *recordEncoder) visit(p *Payload, arr arrow.Array) error { p.body = append(p.body, voffsets) p.body = append(p.body, values) + case arrow.BinaryViewDataType: + data := arr.Data() + values := data.Buffers()[1] + arrLen := int64(arr.Len()) + typeWidth := int64(arrow.ViewHeaderSizeBytes) + minLength := paddedLength(arrLen*typeWidth, kArrowAlignment) + + switch { + case needTruncate(int64(data.Offset()), values, minLength): + // non-zero offset: slice the buffer + offset := data.Offset() * int(typeWidth) + // send padding if available + len := int(minI64(bitutil.CeilByte64(arrLen*typeWidth), int64(values.Len()-offset))) + values = memory.SliceBuffer(values, offset, len) + default: + if values != nil { + values.Retain() + } + } + p.body = append(p.body, values) + + w.variadicCounts = append(w.variadicCounts, int64(len(data.Buffers())-2)) + for _, b := range data.Buffers()[2:] { + b.Retain() + p.body = append(p.body, b) + } + case *arrow.StructType: w.depth-- arr := arr.(*array.Struct) @@ -946,7 +974,7 @@ func (w *recordEncoder) Encode(p *Payload, rec arrow.Record) error { } func (w *recordEncoder) encodeMetadata(p *Payload, nrows int64) error { - p.meta = writeRecordMessage(w.mem, nrows, p.size, w.fields, w.meta, w.codec) + p.meta = writeRecordMessage(w.mem, nrows, p.size, w.fields, w.meta, w.codec, w.variadicCounts) return nil } diff --git a/go/arrow/type_traits_view.go b/go/arrow/type_traits_view.go new file mode 100644 index 0000000000000..c3846db294681 --- /dev/null +++ b/go/arrow/type_traits_view.go @@ -0,0 +1,53 @@ +// Licensed to 
the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package arrow + +import ( + "reflect" + "unsafe" + + "github.com/apache/arrow/go/v15/arrow/endian" +) + +var ViewHeaderTraits viewHeaderTraits + +const ( + ViewHeaderSizeBytes = int(unsafe.Sizeof(ViewHeader{})) +) + +type viewHeaderTraits struct{} + +func (viewHeaderTraits) BytesRequired(n int) int { return ViewHeaderSizeBytes * n } + +func (viewHeaderTraits) PutValue(b []byte, v ViewHeader) { + endian.Native.PutUint32(b, uint32(v.size)) + copy(b[4:], v.data[:]) +} + +func (viewHeaderTraits) CastFromBytes(b []byte) (res []ViewHeader) { + h := (*reflect.SliceHeader)(unsafe.Pointer(&b)) + + return unsafe.Slice((*ViewHeader)(unsafe.Pointer(h.Data)), cap(b)/ViewHeaderSizeBytes)[:len(b)/ViewHeaderSizeBytes] +} + +func (viewHeaderTraits) CastToBytes(b []ViewHeader) (res []byte) { + h := (*reflect.SliceHeader)(unsafe.Pointer(&b)) + + return unsafe.Slice((*byte)(unsafe.Pointer(h.Data)), cap(b)*ViewHeaderSizeBytes)[:len(b)*ViewHeaderSizeBytes] +} + +func (viewHeaderTraits) Copy(dst, src []ViewHeader) { copy(dst, src) }