Skip to content

Commit

Permalink
storage/bulk,storagebase: add BulkAdderOptions struct
Browse files Browse the repository at this point in the history
The signature on this is hard to modify as it is plumbed through in several places,
and as a result, many flags that control one behavior or another, like if SSTs
should be added disallowing duplicates, were being mutated via 'Set' functions
that were starting to get messy.

By making an options struct that is in the signature used and plumbed through all
the sql/distsql/etc layers, any new flags can be added here with minimal extra
work involved.

Release note: none.
  • Loading branch information
dt authored and adityamaru27 committed Aug 12, 2019
1 parent b44713c commit c7b8288
Show file tree
Hide file tree
Showing 9 changed files with 99 additions and 92 deletions.
30 changes: 14 additions & 16 deletions pkg/ccl/importccl/read_import_proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,30 +505,28 @@ func (cp *readImportDataProcessor) ingestKvs(
// primary and secondary index kvs. The number of secondary index kvs are
// small, and so we expect the indexAdder to flush much less frequently
// than the pkIndexAdder.
pkIndexAdder, err := cp.flowCtx.Cfg.BulkAdder(ctx, cp.flowCtx.Cfg.DB, bufferSize, flushSize, writeTS)
pkIndexAdder, err := cp.flowCtx.Cfg.BulkAdder(ctx, cp.flowCtx.Cfg.DB, writeTS, storagebase.BulkAdderOptions{
Name: "pkAdder",
DisallowShadowing: true,
SkipDuplicates: true,
BufferSize: bufferSize,
SSTSize: uint64(flushSize),
})
if err != nil {
return err
}
pkIndexAdder.SetName("pkIndexAdder")
pkIndexAdder.SetDisallowShadowing(true)
// AddSSTable with disallowShadowing=true does not consider a KV with the
// same ts and value to be a collision. This is to support the resumption
// of IMPORT jobs which might re-import some already ingested, but not
// checkpointed KVs.
//
// To provide a similar behavior with KVs within the same SST, we silently
// skip over duplicates with the same value, instead of throwing a
// uniqueness error.
pkIndexAdder.SkipLocalDuplicatesWithSameValues(true)
defer pkIndexAdder.Close(ctx)

indexAdder, err := cp.flowCtx.Cfg.BulkAdder(ctx, cp.flowCtx.Cfg.DB, bufferSize, flushSize, writeTS)
indexAdder, err := cp.flowCtx.Cfg.BulkAdder(ctx, cp.flowCtx.Cfg.DB, writeTS, storagebase.BulkAdderOptions{
Name: "indexAdder",
DisallowShadowing: true,
SkipDuplicates: true,
BufferSize: bufferSize,
SSTSize: uint64(flushSize),
})
if err != nil {
return err
}
indexAdder.SetName("indexAdder")
indexAdder.SetDisallowShadowing(true)
indexAdder.SkipLocalDuplicatesWithSameValues(true)
defer indexAdder.Close(ctx)

// We insert splits at every index span of the table above. Since the
Expand Down
7 changes: 5 additions & 2 deletions pkg/ccl/workloadccl/format/sstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/storage/bulk"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/workload"
Expand Down Expand Up @@ -78,8 +79,10 @@ func ToSSTable(t workload.Table, tableID sqlbase.ID, ts time.Time) ([]byte, erro
})
g.GoCtx(func(ctx context.Context) error {
sstTS := hlc.Timestamp{WallTime: ts.UnixNano()}
const sstSize = math.MaxInt64
ba, err := bulk.MakeBulkAdder(&ssts, nil /* rangeCache */, sstSize, sstSize, sstTS)
const sstSize = math.MaxUint64
ba, err := bulk.MakeBulkAdder(
&ssts, nil /* rangeCache */, sstTS, storagebase.BulkAdderOptions{SSTSize: sstSize, BufferSize: sstSize},
)
if err != nil {
return err
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,8 +537,10 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
ClusterName: s.cfg.ClusterName,

TempStorage: tempEngine,
BulkAdder: func(ctx context.Context, db *client.DB, bufferSize, flushSize int64, ts hlc.Timestamp) (storagebase.BulkAdder, error) {
return bulk.MakeBulkAdder(db, s.distSender.RangeDescriptorCache(), bufferSize, flushSize, ts)
BulkAdder: func(
ctx context.Context, db *client.DB, ts hlc.Timestamp, opts storagebase.BulkAdderOptions,
) (storagebase.BulkAdder, error) {
return bulk.MakeBulkAdder(db, s.distSender.RangeDescriptorCache(), ts, opts)
},
DiskMonitor: s.cfg.TempStorageConfig.Mon,

Expand Down
6 changes: 4 additions & 2 deletions pkg/sql/distsqlrun/bulk_row_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,10 @@ func (sp *bulkRowWriter) OutputTypes() []types.T {

func (sp *bulkRowWriter) ingestLoop(ctx context.Context, kvCh chan []roachpb.KeyValue) error {
writeTS := sp.spec.Table.CreateAsOfTime
const bufferSize, flushSize = 64 << 20, 16 << 20
adder, err := sp.flowCtx.Cfg.BulkAdder(ctx, sp.flowCtx.Cfg.DB, bufferSize, flushSize, writeTS)
const bufferSize = 64 << 20
adder, err := sp.flowCtx.Cfg.BulkAdder(
ctx, sp.flowCtx.Cfg.DB, writeTS, storagebase.BulkAdderOptions{BufferSize: bufferSize},
)
if err != nil {
return err
}
Expand Down
8 changes: 6 additions & 2 deletions pkg/sql/distsqlrun/indexbackfiller.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,16 @@ func newIndexBackfiller(
func (ib *indexBackfiller) prepare(ctx context.Context) error {
bufferSize := backfillerBufferSize.Get(&ib.flowCtx.Cfg.Settings.SV)
sstSize := backillerSSTSize.Get(&ib.flowCtx.Cfg.Settings.SV)
adder, err := ib.flowCtx.Cfg.BulkAdder(ctx, ib.flowCtx.Cfg.DB, bufferSize, sstSize, ib.spec.ReadAsOf)
opts := storagebase.BulkAdderOptions{
SSTSize: uint64(sstSize),
BufferSize: uint64(bufferSize),
SkipDuplicates: ib.ContainsInvertedIndex(),
}
adder, err := ib.flowCtx.Cfg.BulkAdder(ctx, ib.flowCtx.Cfg.DB, ib.spec.ReadAsOf, opts)
if err != nil {
return err
}
ib.adder = adder
ib.adder.SkipLocalDuplicates(ib.ContainsInvertedIndex())
return nil
}

Expand Down
51 changes: 20 additions & 31 deletions pkg/storage/bulk/buffering_adder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/pkg/errors"
)

// BufferingAdder is a wrapper for an SSTBatcher that allows out-of-order calls
Expand All @@ -45,42 +45,37 @@ type BufferingAdder struct {
name string
}

// MakeBulkAdder makes a storagebase.BulkAdder that buffers and sorts K/Vs passed
// to add into SSTs that are then ingested.
// MakeBulkAdder makes a storagebase.BulkAdder that buffers and sorts K/Vs
// passed to add into SSTs that are then ingested. rangeCache if set is
// consulted to avoid generating an SST that will span a range boundary and thus
// encounter an error and need to be split and retried to be applied.
func MakeBulkAdder(
db sender,
rangeCache *kv.RangeDescriptorCache,
flushBytes, sstBytes int64,
timestamp hlc.Timestamp,
opts storagebase.BulkAdderOptions,
) (*BufferingAdder, error) {
if flushBytes <= 0 || sstBytes <= 0 {
return nil, errors.Errorf("flush size and sst bytes must be > 0")
if opts.BufferSize == 0 {
opts.BufferSize = 32 << 20
}
if opts.SSTSize == 0 {
opts.SSTSize = 32 << 20
}
b := &BufferingAdder{
sink: SSTBatcher{db: db, maxSize: sstBytes, rc: rangeCache},
name: opts.Name,
sink: SSTBatcher{
db: db,
maxSize: int64(opts.SSTSize),
rc: rangeCache,
skipDuplicates: opts.SkipDuplicates,
disallowShadowing: opts.DisallowShadowing,
},
timestamp: timestamp,
flushSize: int(flushBytes),
flushSize: int(opts.BufferSize),
}
return b, nil
}

// SkipLocalDuplicates configures skipping of duplicate keys in local batches.
func (b *BufferingAdder) SkipLocalDuplicates(skip bool) {
b.sink.skipDuplicateKeys = skip
}

// SkipLocalDuplicatesWithSameValues configures skipping of duplicate keys with
// the same value in local batches.
func (b *BufferingAdder) SkipLocalDuplicatesWithSameValues(skip bool) {
b.sink.skipDuplicateKeysWithSameValue = skip
}

// SetName sets the name of the adder being used for the purpose of logging
// stats.
func (b *BufferingAdder) SetName(name string) {
b.name = name
}

// Close closes the underlying SST builder.
func (b *BufferingAdder) Close(ctx context.Context) {
log.VEventf(ctx, 2,
Expand Down Expand Up @@ -158,9 +153,3 @@ func (b *BufferingAdder) Flush(ctx context.Context) error {
func (b *BufferingAdder) GetSummary() roachpb.BulkOpSummary {
return b.sink.GetSummary()
}

// SetDisallowShadowing controls whether or not shadowing of existing keys is
// permitted.
func (b *BufferingAdder) SetDisallowShadowing(disallowShadowing bool) {
b.sink.disallowShadowing = disallowShadowing
}
35 changes: 13 additions & 22 deletions pkg/storage/bulk/sst_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,21 +45,17 @@ type SSTBatcher struct {
flushKey roachpb.Key
rc *kv.RangeDescriptorCache

// skips duplicate keys (iff they are buffered together). This
// is true when used to backfill an inverted index. An array in JSONB with
// multiple values which are the same, will all correspond to the same kv in
// the inverted index. The method which generates these kvs does not dedup,
// thus we rely on the SSTBatcher to dedup them (by skipping), rather than
// throwing a DuplicateKeyError.
skipDuplicateKeys bool

// skips duplicate keys with the same value (iff they are buffered together).
// This is true when used with IMPORT. Import generally prohibits the
// ingestion of KVs which will shadow existing data, with the exception of
// duplicates having the same value and timestamp. To maintain uniform
// behavior, duplicates in the same batch with equal values will not raise a
// DuplicateKeyError.
skipDuplicateKeysWithSameValue bool
// skips duplicate keys (iff they are buffered together). This is true when
// used to backfill an inverted index. An array in JSONB with multiple values
// which are the same, will all correspond to the same kv in the inverted
// index. The method which generates these kvs does not dedup, thus we rely on
// the SSTBatcher to dedup them (by skipping), rather than throwing a
// DuplicateKeyError. This is also true when used with IMPORT. Import
// generally prohibits the ingestion of KVs which will shadow existing data,
// with the exception of duplicates having the same value and timestamp. To
// maintain uniform behavior, duplicates in the same batch with equal values
// will not raise a DuplicateKeyError.
skipDuplicates bool

maxSize int64
// rows written in the current batch.
Expand All @@ -78,7 +74,7 @@ type SSTBatcher struct {
sstSize int
}

// allows ingestion of keys where the MVCC.Key would shadow an exisiting row.
// disallows ingestion of keys where the MVCC.Key would shadow an existing row.
disallowShadowing bool
}

Expand All @@ -95,12 +91,7 @@ func MakeSSTBatcher(ctx context.Context, db sender, flushBytes int64) (*SSTBatch
// Keys must be added in order.
func (b *SSTBatcher) AddMVCCKey(ctx context.Context, key engine.MVCCKey, value []byte) error {
if len(b.batchEndKey) > 0 && bytes.Equal(b.batchEndKey, key.Key) {
if b.skipDuplicateKeys {
return nil
}

sameValue := len(b.batchEndValue) > 0 && bytes.Equal(b.batchEndValue, value)
if b.skipDuplicateKeysWithSameValue && sameValue {
if b.skipDuplicates && bytes.Equal(b.batchEndValue, value) {
return nil
}

Expand Down
7 changes: 5 additions & 2 deletions pkg/storage/bulk/sst_batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/bulk"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand All @@ -42,7 +43,7 @@ func TestAddBatched(t *testing.T) {
})
}

func runTestImport(t *testing.T, batchSize int64) {
func runTestImport(t *testing.T, batchSize uint64) {

ctx := context.Background()
s, _, kvDB := serverutils.StartServer(t, base.TestServerArgs{})
Expand Down Expand Up @@ -127,7 +128,9 @@ func runTestImport(t *testing.T, batchSize int64) {
}

ts := hlc.Timestamp{WallTime: 100}
b, err := bulk.MakeBulkAdder(kvDB, mockCache, batchSize, batchSize, ts)
b, err := bulk.MakeBulkAdder(
kvDB, mockCache, ts, storagebase.BulkAdderOptions{BufferSize: batchSize, SSTSize: batchSize},
)
if err != nil {
t.Fatal(err)
}
Expand Down
41 changes: 28 additions & 13 deletions pkg/storage/storagebase/bulk_adder.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,36 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// BulkAdderOptions is used to configure the behavior of a BulkAdder. It is
// passed to a BulkAdderFactory when the adder is created, so every setting is
// fixed at construction time.
type BulkAdderOptions struct {
// Name is used in logging messages to identify this adder or the process on
// behalf of which it is adding data.
Name string

// SSTSize is the size at which an SST will be flushed and a new one started.
// SSTs are also split during a buffer flush to avoid spanning range bounds so
// they may be smaller than this limit. bulk.MakeBulkAdder replaces a zero
// value with 32 << 20.
SSTSize uint64

// BufferSize is the maximum amount of data to buffer before flushing SSTs.
// bulk.MakeBulkAdder replaces a zero value with 32 << 20.
BufferSize uint64

// SkipDuplicates configures handling of duplicate keys within a local
// sorted batch. When true, if the same key/value pair is added more than
// once, subsequent additions will be ignored instead of producing an error.
// If an attempt to add the same key has a different value, it is always an
// error. Once a batch is flushed – explicitly or automatically – local
// duplicate detection does not apply.
SkipDuplicates bool

// DisallowShadowing controls whether shadowing of existing keys is permitted
// when the SSTables produced by this adder are ingested.
DisallowShadowing bool
}

// BulkAdderFactory describes a factory function for BulkAdders.
type BulkAdderFactory func(
ctx context.Context, db *client.DB, bufferBytes, flushBytes int64, timestamp hlc.Timestamp,
ctx context.Context, db *client.DB, timestamp hlc.Timestamp, opts BulkAdderOptions,
) (BulkAdder, error)

// BulkAdder describes a bulk-adding helper that can be used to add lots of KVs.
Expand All @@ -36,18 +63,6 @@ type BulkAdder interface {
GetSummary() roachpb.BulkOpSummary
// Close closes the underlying buffers/writers.
Close(ctx context.Context)
// SkipLocalDuplicates configures handling of duplicate keys within a local
// sorted batch. Once a batch is flushed – explicitly or automatically – local
// duplicate detection does not apply.
SkipLocalDuplicates(bool)
// SkipLocalDuplicatesWithSameValues configures handling of duplicate keys
// with the same value within a local sorted batch.
SkipLocalDuplicatesWithSameValues(bool)
// SetDisallowShadowing sets the flag which controls whether shadowing of
// existing keys is permitted in the AddSSTable method.
SetDisallowShadowing(bool)
// SetName sets the name of the adder for the purpose of logging adder stats.
SetName(string)
}

// DuplicateKeyError represents a failed attempt to ingest the same key twice
Expand Down

0 comments on commit c7b8288

Please sign in to comment.