Skip to content

Commit

Permalink
Merge #39496
Browse files Browse the repository at this point in the history
39496: storage/bulk,storagebase: add BulkAdderOptions struct r=adityamaru27 a=dt

The signature on this is hard to modify as it is plumbed through in several places,
and as a result, many flags that control one behavior or another, like if SSTs
should be added disallowing duplicates, were being mutated via 'Set' functions
that were starting to get messy.

By making an options struct that is used in the signature and plumbed through all
the sql/distsql/etc layers, any new flags can be added here with minimal extra
work involved.

Release note: none.

Co-authored-by: David Taylor <tinystatemachine@gmail.com>
  • Loading branch information
craig[bot] and dt committed Aug 12, 2019
2 parents 7b16c1b + c7b8288 commit 724efe2
Show file tree
Hide file tree
Showing 9 changed files with 99 additions and 92 deletions.
30 changes: 14 additions & 16 deletions pkg/ccl/importccl/read_import_proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,30 +505,28 @@ func (cp *readImportDataProcessor) ingestKvs(
// primary and secondary index kvs. The number of secondary index kvs are
// small, and so we expect the indexAdder to flush much less frequently
// than the pkIndexAdder.
pkIndexAdder, err := cp.flowCtx.Cfg.BulkAdder(ctx, cp.flowCtx.Cfg.DB, bufferSize, flushSize, writeTS)
pkIndexAdder, err := cp.flowCtx.Cfg.BulkAdder(ctx, cp.flowCtx.Cfg.DB, writeTS, storagebase.BulkAdderOptions{
Name: "pkAdder",
DisallowShadowing: true,
SkipDuplicates: true,
BufferSize: bufferSize,
SSTSize: uint64(flushSize),
})
if err != nil {
return err
}
pkIndexAdder.SetName("pkIndexAdder")
pkIndexAdder.SetDisallowShadowing(true)
// AddSSTable with disallowShadowing=true does not consider a KV with the
// same ts and value to be a collision. This is to support the resumption
// of IMPORT jobs which might re-import some already ingested, but not
// checkpointed KVs.
//
// To provide a similar behavior with KVs within the same SST, we silently
// skip over duplicates with the same value, instead of throwing a
// uniqueness error.
pkIndexAdder.SkipLocalDuplicatesWithSameValues(true)
defer pkIndexAdder.Close(ctx)

indexAdder, err := cp.flowCtx.Cfg.BulkAdder(ctx, cp.flowCtx.Cfg.DB, bufferSize, flushSize, writeTS)
indexAdder, err := cp.flowCtx.Cfg.BulkAdder(ctx, cp.flowCtx.Cfg.DB, writeTS, storagebase.BulkAdderOptions{
Name: "indexAdder",
DisallowShadowing: true,
SkipDuplicates: true,
BufferSize: bufferSize,
SSTSize: uint64(flushSize),
})
if err != nil {
return err
}
indexAdder.SetName("indexAdder")
indexAdder.SetDisallowShadowing(true)
indexAdder.SkipLocalDuplicatesWithSameValues(true)
defer indexAdder.Close(ctx)

// We insert splits at every index span of the table above. Since the
Expand Down
7 changes: 5 additions & 2 deletions pkg/ccl/workloadccl/format/sstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/storage/bulk"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/workload"
Expand Down Expand Up @@ -78,8 +79,10 @@ func ToSSTable(t workload.Table, tableID sqlbase.ID, ts time.Time) ([]byte, erro
})
g.GoCtx(func(ctx context.Context) error {
sstTS := hlc.Timestamp{WallTime: ts.UnixNano()}
const sstSize = math.MaxInt64
ba, err := bulk.MakeBulkAdder(&ssts, nil /* rangeCache */, sstSize, sstSize, sstTS)
const sstSize = math.MaxUint64
ba, err := bulk.MakeBulkAdder(
&ssts, nil /* rangeCache */, sstTS, storagebase.BulkAdderOptions{SSTSize: sstSize, BufferSize: sstSize},
)
if err != nil {
return err
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,8 +537,10 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
ClusterName: s.cfg.ClusterName,

TempStorage: tempEngine,
BulkAdder: func(ctx context.Context, db *client.DB, bufferSize, flushSize int64, ts hlc.Timestamp) (storagebase.BulkAdder, error) {
return bulk.MakeBulkAdder(db, s.distSender.RangeDescriptorCache(), bufferSize, flushSize, ts)
BulkAdder: func(
ctx context.Context, db *client.DB, ts hlc.Timestamp, opts storagebase.BulkAdderOptions,
) (storagebase.BulkAdder, error) {
return bulk.MakeBulkAdder(db, s.distSender.RangeDescriptorCache(), ts, opts)
},
DiskMonitor: s.cfg.TempStorageConfig.Mon,

Expand Down
6 changes: 4 additions & 2 deletions pkg/sql/distsqlrun/bulk_row_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,10 @@ func (sp *bulkRowWriter) OutputTypes() []types.T {

func (sp *bulkRowWriter) ingestLoop(ctx context.Context, kvCh chan []roachpb.KeyValue) error {
writeTS := sp.spec.Table.CreateAsOfTime
const bufferSize, flushSize = 64 << 20, 16 << 20
adder, err := sp.flowCtx.Cfg.BulkAdder(ctx, sp.flowCtx.Cfg.DB, bufferSize, flushSize, writeTS)
const bufferSize = 64 << 20
adder, err := sp.flowCtx.Cfg.BulkAdder(
ctx, sp.flowCtx.Cfg.DB, writeTS, storagebase.BulkAdderOptions{BufferSize: bufferSize},
)
if err != nil {
return err
}
Expand Down
8 changes: 6 additions & 2 deletions pkg/sql/distsqlrun/indexbackfiller.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,16 @@ func newIndexBackfiller(
func (ib *indexBackfiller) prepare(ctx context.Context) error {
bufferSize := backfillerBufferSize.Get(&ib.flowCtx.Cfg.Settings.SV)
sstSize := backillerSSTSize.Get(&ib.flowCtx.Cfg.Settings.SV)
adder, err := ib.flowCtx.Cfg.BulkAdder(ctx, ib.flowCtx.Cfg.DB, bufferSize, sstSize, ib.spec.ReadAsOf)
opts := storagebase.BulkAdderOptions{
SSTSize: uint64(sstSize),
BufferSize: uint64(bufferSize),
SkipDuplicates: ib.ContainsInvertedIndex(),
}
adder, err := ib.flowCtx.Cfg.BulkAdder(ctx, ib.flowCtx.Cfg.DB, ib.spec.ReadAsOf, opts)
if err != nil {
return err
}
ib.adder = adder
ib.adder.SkipLocalDuplicates(ib.ContainsInvertedIndex())
return nil
}

Expand Down
51 changes: 20 additions & 31 deletions pkg/storage/bulk/buffering_adder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/pkg/errors"
)

// BufferingAdder is a wrapper for an SSTBatcher that allows out-of-order calls
Expand All @@ -45,42 +45,37 @@ type BufferingAdder struct {
name string
}

// MakeBulkAdder makes a storagebase.BulkAdder that buffers and sorts K/Vs passed
// to add into SSTs that are then ingested.
// MakeBulkAdder makes a storagebase.BulkAdder that buffers and sorts K/Vs
// passed to add into SSTs that are then ingested. rangeCache if set is
// consulted to avoid generating an SST that will span a range boundary and thus
// encounter an error and need to be split and retried to be applied.
func MakeBulkAdder(
db sender,
rangeCache *kv.RangeDescriptorCache,
flushBytes, sstBytes int64,
timestamp hlc.Timestamp,
opts storagebase.BulkAdderOptions,
) (*BufferingAdder, error) {
if flushBytes <= 0 || sstBytes <= 0 {
return nil, errors.Errorf("flush size and sst bytes must be > 0")
if opts.BufferSize == 0 {
opts.BufferSize = 32 << 20
}
if opts.SSTSize == 0 {
opts.SSTSize = 32 << 20
}
b := &BufferingAdder{
sink: SSTBatcher{db: db, maxSize: sstBytes, rc: rangeCache},
name: opts.Name,
sink: SSTBatcher{
db: db,
maxSize: int64(opts.SSTSize),
rc: rangeCache,
skipDuplicates: opts.SkipDuplicates,
disallowShadowing: opts.DisallowShadowing,
},
timestamp: timestamp,
flushSize: int(flushBytes),
flushSize: int(opts.BufferSize),
}
return b, nil
}

// SkipLocalDuplicates configures skipping of duplicate keys in local batches.
func (b *BufferingAdder) SkipLocalDuplicates(skip bool) {
b.sink.skipDuplicateKeys = skip
}

// SkipLocalDuplicatesWithSameValues configures skipping of duplicate keys with
// the same value in local batches.
func (b *BufferingAdder) SkipLocalDuplicatesWithSameValues(skip bool) {
b.sink.skipDuplicateKeysWithSameValue = skip
}

// SetName sets the name of the adder being used for the purpose of logging
// stats.
func (b *BufferingAdder) SetName(name string) {
b.name = name
}

// Close closes the underlying SST builder.
func (b *BufferingAdder) Close(ctx context.Context) {
log.VEventf(ctx, 2,
Expand Down Expand Up @@ -158,9 +153,3 @@ func (b *BufferingAdder) Flush(ctx context.Context) error {
func (b *BufferingAdder) GetSummary() roachpb.BulkOpSummary {
return b.sink.GetSummary()
}

// SetDisallowShadowing controls whether or not shadowing of existing keys is
// permitted.
func (b *BufferingAdder) SetDisallowShadowing(disallowShadowing bool) {
b.sink.disallowShadowing = disallowShadowing
}
35 changes: 13 additions & 22 deletions pkg/storage/bulk/sst_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,21 +45,17 @@ type SSTBatcher struct {
flushKey roachpb.Key
rc *kv.RangeDescriptorCache

// skips duplicate keys (iff they are buffered together). This
// is true when used to backfill an inverted index. An array in JSONB with
// multiple values which are the same, will all correspond to the same kv in
// the inverted index. The method which generates these kvs does not dedup,
// thus we rely on the SSTBatcher to dedup them (by skipping), rather than
// throwing a DuplicateKeyError.
skipDuplicateKeys bool

// skips duplicate keys with the same value (iff they are buffered together).
// This is true when used with IMPORT. Import generally prohibits the
// ingestion of KVs which will shadow existing data, with the exception of
// duplicates having the same value and timestamp. To maintain uniform
// behavior, duplicates in the same batch with equal values will not raise a
// DuplicateKeyError.
skipDuplicateKeysWithSameValue bool
// skips duplicate keys (iff they are buffered together). This is true when
// used to backfill an inverted index. An array in JSONB with multiple values
// which are the same, will all correspond to the same kv in the inverted
// index. The method which generates these kvs does not dedup, thus we rely on
// the SSTBatcher to dedup them (by skipping), rather than throwing a
// DuplicateKeyError. This is also true when used with IMPORT. Import
// generally prohibits the ingestion of KVs which will shadow existing data,
// with the exception of duplicates having the same value and timestamp. To
// maintain uniform behavior, duplicates in the same batch with equal values
// will not raise a DuplicateKeyError.
skipDuplicates bool

maxSize int64
// rows written in the current batch.
Expand All @@ -78,7 +74,7 @@ type SSTBatcher struct {
sstSize int
}

// allows ingestion of keys where the MVCC.Key would shadow an exisiting row.
// when true, disallows ingestion of keys where the MVCC.Key would shadow an existing row.
disallowShadowing bool
}

Expand All @@ -95,12 +91,7 @@ func MakeSSTBatcher(ctx context.Context, db sender, flushBytes int64) (*SSTBatch
// Keys must be added in order.
func (b *SSTBatcher) AddMVCCKey(ctx context.Context, key engine.MVCCKey, value []byte) error {
if len(b.batchEndKey) > 0 && bytes.Equal(b.batchEndKey, key.Key) {
if b.skipDuplicateKeys {
return nil
}

sameValue := len(b.batchEndValue) > 0 && bytes.Equal(b.batchEndValue, value)
if b.skipDuplicateKeysWithSameValue && sameValue {
if b.skipDuplicates && bytes.Equal(b.batchEndValue, value) {
return nil
}

Expand Down
7 changes: 5 additions & 2 deletions pkg/storage/bulk/sst_batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/bulk"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand All @@ -42,7 +43,7 @@ func TestAddBatched(t *testing.T) {
})
}

func runTestImport(t *testing.T, batchSize int64) {
func runTestImport(t *testing.T, batchSize uint64) {

ctx := context.Background()
s, _, kvDB := serverutils.StartServer(t, base.TestServerArgs{})
Expand Down Expand Up @@ -127,7 +128,9 @@ func runTestImport(t *testing.T, batchSize int64) {
}

ts := hlc.Timestamp{WallTime: 100}
b, err := bulk.MakeBulkAdder(kvDB, mockCache, batchSize, batchSize, ts)
b, err := bulk.MakeBulkAdder(
kvDB, mockCache, ts, storagebase.BulkAdderOptions{BufferSize: batchSize, SSTSize: batchSize},
)
if err != nil {
t.Fatal(err)
}
Expand Down
41 changes: 28 additions & 13 deletions pkg/storage/storagebase/bulk_adder.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,36 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// BulkAdderOptions is used to configure the behavior of a BulkAdder. It is
// passed to a BulkAdderFactory when the adder is constructed, so all settings
// are fixed for the lifetime of the adder.
type BulkAdderOptions struct {
// Name is used in logging messages to identify this adder or the process on
// behalf of which it is adding data.
Name string

// SSTSize is the size, in bytes, at which an SST will be flushed and a new
// one started. SSTs are also split during a buffer flush to avoid spanning
// range bounds, so they may be smaller than this limit. A zero value leaves
// the choice of a default to the factory (bulk.MakeBulkAdder substitutes
// 32 MiB).
SSTSize uint64

// BufferSize is the maximum amount of data, in bytes, to buffer before
// flushing SSTs. A zero value leaves the choice of a default to the factory
// (bulk.MakeBulkAdder substitutes 32 MiB).
BufferSize uint64

// SkipDuplicates configures handling of duplicate keys within a local
// sorted batch. When true, if the same key/value pair is added more than
// once, subsequent additions will be ignored instead of producing an error.
// If an attempt to add the same key has a different value, it is always an
// error. Once a batch is flushed – explicitly or automatically – local
// duplicate detection does not apply.
SkipDuplicates bool

// DisallowShadowing controls whether shadowing of existing keys is permitted
// when the SSTables produced by this adder are ingested.
DisallowShadowing bool
}

// BulkAdderFactory describes a factory function for BulkAdders.
type BulkAdderFactory func(
ctx context.Context, db *client.DB, bufferBytes, flushBytes int64, timestamp hlc.Timestamp,
ctx context.Context, db *client.DB, timestamp hlc.Timestamp, opts BulkAdderOptions,
) (BulkAdder, error)

// BulkAdder describes a bulk-adding helper that can be used to add lots of KVs.
Expand All @@ -36,18 +63,6 @@ type BulkAdder interface {
GetSummary() roachpb.BulkOpSummary
// Close closes the underlying buffers/writers.
Close(ctx context.Context)
// SkipLocalDuplicates configures handling of duplicate keys within a local
// sorted batch. Once a batch is flushed – explicitly or automatically – local
// duplicate detection does not apply.
SkipLocalDuplicates(bool)
// SkipLocalDuplicatesWithSameValues configures handling of duplicate keys
// with the same value within a local sorted batch.
SkipLocalDuplicatesWithSameValues(bool)
// SetDisallowShadowing sets the flag which controls whether shadowing of
// existing keys is permitted in the AddSSTable method.
SetDisallowShadowing(bool)
// SetName sets the name of the adder for the purpose of logging adder stats.
SetName(string)
}

// DuplicateKeyError represents a failed attempt to ingest the same key twice
Expand Down

0 comments on commit 724efe2

Please sign in to comment.