Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add string interning to SyncIterator #3411

Merged
merged 14 commits into from
Feb 27, 2024
33 changes: 30 additions & 3 deletions pkg/parquetquery/iters.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"sync/atomic"

"github.com/grafana/tempo/pkg/util"
"github.com/grafana/tempo/pkg/util/intern"
"github.com/opentracing/opentracing-go"
pq "github.com/parquet-go/parquet-go"
)
Expand Down Expand Up @@ -498,6 +499,14 @@ func ReleaseResult(r *IteratorResult) {
}
}

type SyncIteratorOpt func(*SyncIterator)

func SyncIteratorOptIntern() SyncIteratorOpt {
return func(i *SyncIterator) {
i.intern = true
}
}

// SyncIterator is like ColumnIterator but synchronous. It scans through the given row
// groups and column, and applies the optional predicate to each chunk, page, and value.
// Results are read by calling Next() until it returns nil.
Expand Down Expand Up @@ -526,11 +535,14 @@ type SyncIterator struct {
currBuf []pq.Value
currBufN int
currPageN int

intern bool
interner *intern.Interner
}

var _ Iterator = (*SyncIterator)(nil)

func NewSyncIterator(ctx context.Context, rgs []pq.RowGroup, column int, columnName string, readSize int, filter Predicate, selectAs string) *SyncIterator {
func NewSyncIterator(ctx context.Context, rgs []pq.RowGroup, column int, columnName string, readSize int, filter Predicate, selectAs string, opts ...SyncIteratorOpt) *SyncIterator {
// Assign row group bounds.
// Lower bound is inclusive
// Upper bound is exclusive, points at the first row of the next group
Expand All @@ -549,7 +561,8 @@ func NewSyncIterator(ctx context.Context, rgs []pq.RowGroup, column int, columnN
"column": columnName,
})

return &SyncIterator{
// Create the iterator
i := &SyncIterator{
span: span,
column: column,
columnName: columnName,
Expand All @@ -560,7 +573,15 @@ func NewSyncIterator(ctx context.Context, rgs []pq.RowGroup, column int, columnN
rgsMax: rgsMax,
filter: &InstrumentedPredicate{pred: filter},
curr: EmptyRowNumber(),
interner: intern.New(),
}

// Apply options
for _, opt := range opts {
opt(i)
}

return i
}

func (c *SyncIterator) String() string {
Expand Down Expand Up @@ -933,7 +954,11 @@ func (c *SyncIterator) makeResult(t RowNumber, v *pq.Value) *IteratorResult {
r := GetResult()
r.RowNumber = t
if c.selectAs != "" {
r.AppendValue(c.selectAs, v.Clone())
if c.intern {
r.AppendValue(c.selectAs, c.interner.UnsafeClone(v))
} else {
r.AppendValue(c.selectAs, v.Clone())
}
}
return r
}
Expand All @@ -948,6 +973,8 @@ func (c *SyncIterator) Close() {
c.span.SetTag("keptPages", c.filter.KeptPages)
c.span.SetTag("keptValues", c.filter.KeptValues)
c.span.Finish()

c.interner.Close()
}

// ColumnIterator asynchronously iterates through the given row groups and column. Applies
Expand Down
83 changes: 83 additions & 0 deletions pkg/util/intern/intern.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package intern

import (
"sync"
"unsafe"

pq "github.com/parquet-go/parquet-go"
)

type Interner struct {
mtx sync.RWMutex
m map[string][]byte
}

func New() *Interner {
return NewWithSize(0)
}

func NewWithSize(size int) *Interner {
return &Interner{m: make(map[string][]byte, size)}
}

func (i *Interner) UnsafeClone(v *pq.Value) pq.Value {
switch v.Kind() {
case pq.ByteArray, pq.FixedLenByteArray:
// Look away, this is unsafe.
a := *(*pqValue)(unsafe.Pointer(v))
a.ptr = addressOfBytes(i.internBytes(a.byteArray()))
return *(*pq.Value)(unsafe.Pointer(&a))
default:
return *v
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this incur an extra value copy?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but we pass pq.Value as values (🥁) in Entries.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aren't we cloning values unnecessarily? For any non-byte array/fixed len byte array by cloning here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but values are used everywhere. Changing this to pointers is a big change in the codebase, that I'd prefer to keep out from this PR.

FWIW, v.Clone() is/was creating a copy of pq.Value too. This is unchanged in this PR.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should the line be v.Clone() to make it consistent with the way it used to be? i'm honestly not sure.

Copy link
Contributor

@mdisibio mdisibio Feb 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For all other types (int, float, etc) the struct doesn't contain external pointers so a copy of the contents is fine. That's what Clone() does: https://github.com/parquet-go/parquet-go/blob/main/value.go#L792 Avoiding it is fine because it would incur another type check unnecessarily.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fine with this leaving as is in the PR, but it does feel safer to call .Clone() in case that internal behavior changes.

}
}

func (i *Interner) internBytes(b []byte) []byte {
s := bytesToString(b)

i.mtx.RLock()
if x, ok := i.m[s]; ok {
i.mtx.RUnlock()
return x
}
i.mtx.RUnlock()

i.mtx.Lock()
defer i.mtx.Unlock()

clone := make([]byte, len(b))
copy(clone, b)
i.m[s] = clone
return clone
}

func (i *Interner) Close() {
i.mtx.Lock()
clear(i.m) // clear the map
i.m = nil
i.mtx.Unlock()
}

// bytesToString converts a byte slice to a string.
func bytesToString(b []byte) string {
return unsafe.String(unsafe.SliceData(b), len(b))
}

//go:linkname addressOfBytes github.com/parquet-go/parquet-go/internal/unsafecast.AddressOfBytes
func addressOfBytes(data []byte) *byte

//go:linkname bytes github.com/parquet-go/parquet-go/internal/unsafecast.Bytes
func bytes(data *byte, size int) []byte

// pqValue is a slimmer version of parquet-go's pq.Value.
type pqValue struct {
// data
ptr *byte
u64 uint64
// type
kind int8 // XOR(Kind) so the zero-value is <null>
}

func (v *pqValue) byteArray() []byte {
return bytes(v.ptr, int(v.u64))
}
67 changes: 67 additions & 0 deletions pkg/util/intern/intern_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package intern

import (
"testing"
"unsafe"

pq "github.com/parquet-go/parquet-go"
)

func TestInterner_internBytes(t *testing.T) {
i := New()
defer i.Close()

words := []string{"hello", "world", "hello", "world", "hello", "world"}
for _, w := range words {
_ = i.internBytes([]byte(w))
}
if len(i.m) != 2 {
// Values are interned, so there should be only 2 unique words
t.Errorf("expected 2, got %d", len(i.m))
}
if i.internBytes([]byte("hello"))[0] != i.internBytes([]byte("hello"))[0] {
// Values are interned, so the memory address should be the same
t.Error("expected same memory address")
}
}

func TestInterner_UnsafeClone(t *testing.T) {
i := New()
defer i.Close()

value1 := pq.ByteArrayValue([]byte("foo"))
value2 := pq.ByteArrayValue([]byte("foo"))

clone1 := i.UnsafeClone(&value1)
clone2 := i.UnsafeClone(&value2)

if clone1.ByteArray()[0] != clone2.ByteArray()[0] {
// Values are interned, so the memory address should be the same
t.Error("expected same memory address")
}

if value1.ByteArray()[0] != value2.ByteArray()[0] {
// Mutates the original value, so the memory address should be different as well
t.Error("expected same memory address")
}
}

func Test_pqValue(t *testing.T) {
// Test that conversion from pq.Value to pqValue and back to pq.Value
// does not change the value.
value := pq.ByteArrayValue([]byte("foo"))
pqValue := *(*pqValue)(unsafe.Pointer(&value))
back := *(*pq.Value)(unsafe.Pointer(&pqValue))

if value.Kind() != back.Kind() {
t.Error("expected same kind")
}

if string(value.ByteArray()) != string(back.ByteArray()) {
t.Error("expected same value")
}

if value.String() != back.String() {
t.Error("expected same value")
}
}
7 changes: 6 additions & 1 deletion tempodb/encoding/vparquet3/block_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,12 @@ func makeIterFunc(ctx context.Context, rgs []parquet.RowGroup, pf *parquet.File)
return pq.NewColumnIterator(ctx, rgs, index, name, 1000, predicate, selectAs)
}

return pq.NewSyncIterator(ctx, rgs, index, name, 1000, predicate, selectAs)
var opts []pq.SyncIteratorOpt
if name != columnPathSpanID && name != columnPathTraceID {
opts = append(opts, pq.SyncIteratorOptIntern())
}

return pq.NewSyncIterator(ctx, rgs, index, name, 1000, predicate, selectAs, opts...)
}
}

Expand Down
14 changes: 9 additions & 5 deletions tempodb/encoding/vparquet3/block_traceql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,11 +586,13 @@ func BenchmarkBackendBlockTraceQL(b *testing.B) {
ctx := context.TODO()
tenantID := "1"
// blockID := uuid.MustParse("00000c2f-8133-4a60-a62a-7748bd146938")
blockID := uuid.MustParse("06ebd383-8d4e-4289-b0e9-cf2197d611d5")
//blockID := uuid.MustParse("06ebd383-8d4e-4289-b0e9-cf2197d611d5")
blockID := uuid.MustParse("0008e57d-069d-4510-a001-b9433b2da08c")

r, _, _, err := local.New(&local.Config{
// Path: path.Join("/home/joe/testblock/"),
Path: path.Join("/Users/marty/src/tmp"),
//Path: path.Join("/Users/marty/src/tmp"),
Path: path.Join("/Users/mapno/workspace/testblock"),
mapno marked this conversation as resolved.
Show resolved Hide resolved
})
require.NoError(b, err)

Expand Down Expand Up @@ -685,7 +687,7 @@ func BenchmarkBackendBlockQueryRange(b *testing.B) {
"{} | rate()",
"{} | rate() by (name)",
"{} | rate() by (resource.service.name)",
"{resource.service.name=`tempo-gateway`} | rate()",
"{resource.service.name=`loki-ingester`} | rate()",
mapno marked this conversation as resolved.
Show resolved Hide resolved
"{status=error} | rate()",
}

Expand All @@ -694,9 +696,11 @@ func BenchmarkBackendBlockQueryRange(b *testing.B) {
e = traceql.NewEngine()
opts = common.DefaultSearchOptions()
tenantID = "1"
blockID = uuid.MustParse("06ebd383-8d4e-4289-b0e9-cf2197d611d5")
//blockID = uuid.MustParse("06ebd383-8d4e-4289-b0e9-cf2197d611d5")
blockID = uuid.MustParse("0008e57d-069d-4510-a001-b9433b2da08c")
// blockID = uuid.MustParse("18364616-f80d-45a6-b2a3-cb63e203edff")
path = "/Users/marty/src/tmp/"
//path = "/Users/marty/src/tmp/"
path = "/Users/mapno/workspace/testblock"
)

r, _, _, err := local.New(&local.Config{
Expand Down
Loading