Skip to content

Commit

Permalink
Merge #96237
Browse files Browse the repository at this point in the history
96237: kv: add simple bulk put routines for vectorized insert r=nvanbenschoten a=cucaroach

These are simple analogs to existing APIs that do all the memory
allocations in bulk.

Epic: CRDB-18892
Informs: #91831
Release note: None


Co-authored-by: Tommy Reilly <treilly@cockroachlabs.com>
  • Loading branch information
craig[bot] and cucaroach committed Mar 2, 2023
2 parents 810808c + 23f3f98 commit ac95590
Show file tree
Hide file tree
Showing 2 changed files with 346 additions and 1 deletion.
194 changes: 194 additions & 0 deletions pkg/kv/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,26 @@ type Batch struct {
rowsStaticIdx int
}

// GValue is a generic value for use in generic code.
type GValue interface {
[]byte | roachpb.Value
}

// BulkSource is a generator interface for efficiently adding lots of requests.
type BulkSource[T GValue] interface {
// Len will be called to batch allocate resources, the iterator should return
// exactly Len KVs.
Len() int
// Iter will be called to retrieve KVs and add them to the Batch.
Iter() BulkSourceIterator[T]
}

// BulkSourceIterator is the iterator interface for bulk put operations.
type BulkSourceIterator[T GValue] interface {
// Next returns the next KV, calling this more than Len() times is undefined.
Next() (roachpb.Key, T)
}

// ApproximateMutationBytes returns the approximate byte size of the mutations
// added to this batch via Put, CPut, InitPut, Del, etc methods. Mutations added
// via AddRawRequest are not tracked.
Expand Down Expand Up @@ -428,6 +448,51 @@ func (b *Batch) Put(key, value interface{}) {
b.put(key, value, false)
}

// PutBytes allows an arbitrary number of PutRequests to be added to the batch.
func (b *Batch) PutBytes(bs BulkSource[[]byte]) {
numKeys := bs.Len()
reqs := make([]struct {
req kvpb.PutRequest
union kvpb.RequestUnion_Put
}, numKeys)
i := 0
bsi := bs.Iter()
b.bulkRequest(numKeys, func() (kvpb.RequestUnion, int) {
pr := &reqs[i].req
union := &reqs[i].union
union.Put = pr
i++
k, v := bsi.Next()
pr.Key = k
pr.Value.SetBytes(v)
pr.Value.InitChecksum(k)
return kvpb.RequestUnion{Value: union}, len(k) + len(pr.Value.RawBytes)
})
}

// PutTuples allows multiple tuple value type puts to be added to the batch using
// BulkSource interface.
func (b *Batch) PutTuples(bs BulkSource[[]byte]) {
numKeys := bs.Len()
reqs := make([]struct {
req kvpb.PutRequest
union kvpb.RequestUnion_Put
}, numKeys)
i := 0
bsi := bs.Iter()
b.bulkRequest(numKeys, func() (kvpb.RequestUnion, int) {
pr := &reqs[i].req
union := &reqs[i].union
union.Put = pr
i++
k, v := bsi.Next()
pr.Key = k
pr.Value.SetTuple(v)
pr.Value.InitChecksum(k)
return kvpb.RequestUnion{Value: union}, len(k) + len(pr.Value.RawBytes)
})
}

// PutInline sets the value for a key, but does not maintain
// multi-version values. The most recent value is always overwritten.
// Inline values cannot be mutated transactionally and should be used
Expand Down Expand Up @@ -511,6 +576,58 @@ func (b *Batch) cputInternal(
b.initResult(1, 1, notRaw, nil)
}

// CPutTuplesEmpty allows multiple CPut tuple requests to be added to the batch
// as tuples using the BulkSource interface. The values for these keys are
// expected to be empty.
func (b *Batch) CPutTuplesEmpty(bs BulkSource[[]byte]) {
numKeys := bs.Len()
reqs := make([]struct {
req kvpb.ConditionalPutRequest
union kvpb.RequestUnion_ConditionalPut
}, numKeys)
i := 0
bsi := bs.Iter()
b.bulkRequest(numKeys, func() (kvpb.RequestUnion, int) {
pr := &reqs[i].req
union := &reqs[i].union
union.ConditionalPut = pr
pr.AllowIfDoesNotExist = false
pr.ExpBytes = nil
i++
k, v := bsi.Next()
pr.Key = k
pr.Value.SetTuple(v)
pr.Value.InitChecksum(k)
return kvpb.RequestUnion{Value: union}, len(k) + len(pr.Value.RawBytes)
})
}

// CPutValuesEmpty allows multiple CPut tuple requests to be added to the batch
// as roachpb.Values using the BulkSource interface. The values for these keys
// are expected to be empty.
func (b *Batch) CPutValuesEmpty(bs BulkSource[roachpb.Value]) {
numKeys := bs.Len()
reqs := make([]struct {
req kvpb.ConditionalPutRequest
union kvpb.RequestUnion_ConditionalPut
}, numKeys)
i := 0
bsi := bs.Iter()
b.bulkRequest(numKeys, func() (kvpb.RequestUnion, int) {
pr := &reqs[i].req
union := &reqs[i].union
union.ConditionalPut = pr
pr.AllowIfDoesNotExist = false
pr.ExpBytes = nil
i++
k, v := bsi.Next()
pr.Key = k
pr.Value = v
pr.Value.InitChecksum(k)
return kvpb.RequestUnion{Value: union}, len(k) + len(pr.Value.RawBytes)
})
}

// InitPut sets the first value for a key to value. An ConditionFailedError is
// reported if a value already exists for the key and it's not equal to the
// value passed in. If failOnTombstones is set to true, tombstones will return
Expand All @@ -535,6 +652,52 @@ func (b *Batch) InitPut(key, value interface{}, failOnTombstones bool) {
b.initResult(1, 1, notRaw, nil)
}

// InitPutBytes allows multiple []byte value type InitPut requests to be added to
// the batch using BulkSource interface.
func (b *Batch) InitPutBytes(bs BulkSource[[]byte]) {
numKeys := bs.Len()
reqs := make([]struct {
req kvpb.InitPutRequest
union kvpb.RequestUnion_InitPut
}, numKeys)
i := 0
bsi := bs.Iter()
b.bulkRequest(numKeys, func() (kvpb.RequestUnion, int) {
pr := &reqs[i].req
union := &reqs[i].union
union.InitPut = pr
i++
k, v := bsi.Next()
pr.Key = k
pr.Value.SetBytes(v)
pr.Value.InitChecksum(k)
return kvpb.RequestUnion{Value: union}, len(k) + len(pr.Value.RawBytes)
})
}

// InitPutTuples allows multiple tuple value type InitPut to be added to the
// batch using BulkSource interface.
func (b *Batch) InitPutTuples(bs BulkSource[[]byte]) {
numKeys := bs.Len()
reqs := make([]struct {
req kvpb.InitPutRequest
union kvpb.RequestUnion_InitPut
}, numKeys)
i := 0
bsi := bs.Iter()
b.bulkRequest(numKeys, func() (kvpb.RequestUnion, int) {
pr := &reqs[i].req
union := &reqs[i].union
union.InitPut = pr
i++
k, v := bsi.Next()
pr.Key = k
pr.Value.SetTuple(v)
pr.Value.InitChecksum(k)
return kvpb.RequestUnion{Value: union}, len(k) + len(pr.Value.RawBytes)
})
}

// Inc increments the integer value at key. If the key does not exist it will
// be created with an initial value of 0 which will then be incremented. If the
// key exists but was set using Put or CPut an error will be returned.
Expand Down Expand Up @@ -909,3 +1072,34 @@ func (b *Batch) barrier(s, e interface{}) {
b.appendReqs(req)
b.initResult(1, 0, notRaw, nil)
}

func (b *Batch) bulkRequest(
numKeys int, requestFactory func() (req kvpb.RequestUnion, kvSize int),
) {
n := len(b.reqs)
b.growReqs(numKeys)
newReqs := b.reqs[n:]
for i := 0; i < numKeys; i++ {
req, numBytes := requestFactory()
b.approxMutationReqBytes += numBytes
newReqs[i] = req
}
b.initResult(numKeys, numKeys, notRaw, nil)
}

// GetResult retrieves the Result and Result row KeyValue for a particular index.
func (b *Batch) GetResult(idx int) (*Result, KeyValue, error) {
origIdx := idx
for i := range b.Results {
r := &b.Results[i]
if idx < r.calls {
if idx < len(r.Rows) {
return r, r.Rows[idx], nil
} else {
return r, KeyValue{}, nil
}
}
idx -= r.calls
}
return nil, KeyValue{}, errors.AssertionFailedf("index %d outside of results: %+v", origIdx, b.Results)
}
153 changes: 152 additions & 1 deletion pkg/kv/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"bytes"
"context"
"fmt"
"strconv"
"testing"
"time"

Expand All @@ -35,7 +36,7 @@ import (
"google.golang.org/grpc/status"
)

func setup(t *testing.T) (serverutils.TestServerInterface, *kv.DB) {
func setup(t testing.TB) (serverutils.TestServerInterface, *kv.DB) {
s, _, kvDB := serverutils.StartServer(t, base.TestServerArgs{})
return s, kvDB
}
Expand Down Expand Up @@ -858,3 +859,153 @@ func TestPreservingSteppingOnSenderReplacement(t *testing.T) {
require.Equal(t, expectedStepping, txn.ConfigureStepping(ctx, expectedStepping))
})
}

type byteSliceBulkSource[T kv.GValue] struct {
keys []roachpb.Key
values []T
}

var _ kv.BulkSource[[]byte] = &byteSliceBulkSource[[]byte]{}

func (s *byteSliceBulkSource[T]) Len() int {
return len(s.keys)
}

func (s *byteSliceBulkSource[T]) Iter() kv.BulkSourceIterator[T] {
return &byteSliceBulkSourceIterator[T]{s: s, cursor: 0}
}

type byteSliceBulkSourceIterator[T kv.GValue] struct {
s *byteSliceBulkSource[T]
cursor int
}

func (s *byteSliceBulkSourceIterator[T]) Next() (roachpb.Key, T) {
k, v := s.s.keys[s.cursor], s.s.values[s.cursor]
s.cursor++
return k, v
}

func TestBulkBatchAPI(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
s, db := setup(t)
defer s.Stopper().Stop(context.Background())
ctx := context.Background()

kys := []roachpb.Key{[]byte("a"), []byte("b"), []byte("c")}
vals := [][]byte{[]byte("you"), []byte("know"), []byte("me")}

type putter func(*kv.Batch)

clearKeys := func() {
txn := db.NewTxn(ctx, "bulk-test")
b := txn.NewBatch()
b.Del("a", "b", "c")
err := txn.CommitInBatch(ctx, b)
require.NoError(t, err)
}

verify := func() {
for i, k := range kys {
v, err := db.Get(ctx, k)
require.NoError(t, err)
raw := vals[i]
if tv, err := v.Value.GetTuple(); err == nil {
raw = tv
}
err = v.Value.Verify(k)
require.NoError(t, err)
require.Equal(t, raw, vals[i])
}
}

testF := func(p putter) {
txn := db.NewTxn(ctx, "bulk-test")
b := txn.NewBatch()
p(b)
err := txn.CommitInBatch(ctx, b)
require.NoError(t, err)
verify()
require.Greater(t, len(b.Results), 1)
r := b.Results[0]
require.Equal(t, len(r.Rows), len(kys))
require.NoError(t, r.Err)
clearKeys()
}

testF(func(b *kv.Batch) { b.PutBytes(&byteSliceBulkSource[[]byte]{kys, vals}) })
testF(func(b *kv.Batch) { b.PutTuples(&byteSliceBulkSource[[]byte]{kys, vals}) })
testF(func(b *kv.Batch) { b.InitPutBytes(&byteSliceBulkSource[[]byte]{kys, vals}) })
testF(func(b *kv.Batch) { b.InitPutTuples(&byteSliceBulkSource[[]byte]{kys, vals}) })
testF(func(b *kv.Batch) { b.CPutTuplesEmpty(&byteSliceBulkSource[[]byte]{kys, vals}) })

values := make([]roachpb.Value, len(kys))
for i, v := range vals {
if kys[i] != nil {
values[i].InitChecksum(kys[i])
values[i].SetTuple(v)
}
}
testF(func(b *kv.Batch) { b.CPutValuesEmpty(&byteSliceBulkSource[roachpb.Value]{kys, values}) })
}

func TestGetResults(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
s, db := setup(t)
defer s.Stopper().Stop(context.Background())
ctx := context.Background()

kys1 := []roachpb.Key{[]byte("a"), []byte("b"), []byte("c")}
kys2 := []roachpb.Key{[]byte("d"), []byte("e"), []byte("f")}
vals := [][]byte{[]byte("you"), []byte("know"), []byte("me")}
txn := db.NewTxn(ctx, "bulk-test")
b := txn.NewBatch()
b.PutBytes(&byteSliceBulkSource[[]byte]{kys1, vals})
b.PutBytes(&byteSliceBulkSource[[]byte]{kys2, vals})
err := txn.CommitInBatch(ctx, b)
require.NoError(t, err)
for i := 0; i < len(kys1)+len(kys2); i++ {
res, row, err := b.GetResult(i)
require.Equal(t, res, &b.Results[i/3])
require.Equal(t, row, b.Results[i/3].Rows[i%3])
require.NoError(t, err)
}
// test EndTxn result
_, _, err = b.GetResult(len(kys1) + len(kys2))
require.NoError(t, err)

// test out of bounds
_, _, err = b.GetResult(len(kys1) + len(kys2) + 1)
require.Error(t, err)
}

func BenchmarkBulkBatchAPI(b *testing.B) {
defer leaktest.AfterTest(b)()
defer log.Scope(b).Close(b)
s, db := setup(b)
defer s.Stopper().Stop(context.Background())
ctx := context.Background()
txn := db.NewTxn(ctx, "bulk-test")
kys := make([]roachpb.Key, 1000)
vals := make([][]byte, len(kys))
for i := 0; i < len(kys); i++ {
kys[i] = []byte("asdf" + strconv.Itoa(i))
vals[i] = []byte("qwerty" + strconv.Itoa(i))
}
b.Run("single", func(b *testing.B) {
ba := txn.NewBatch()
for i := 0; i < b.N; i++ {
for j, k := range kys {
ba.Put(k, vals[j])
}
}
})
b.Run("bulk", func(b *testing.B) {
ba := txn.NewBatch()
for i := 0; i < b.N; i++ {
ba.PutBytes(&byteSliceBulkSource[[]byte]{kys, vals})
}
})
}

0 comments on commit ac95590

Please sign in to comment.