From c7b82880f7a616aa3dd6db364f906ac02a961718 Mon Sep 17 00:00:00 2001 From: David Taylor Date: Fri, 9 Aug 2019 20:51:27 +0000 Subject: [PATCH] storage/bulk,storagebase: add BulkAdderOptions struct The signature on this is hard to modify as it is plumbed though in several places, and as a result, many flags that control one behavior or another, like if SSTs should be added disallowing duplicates, were being mutated via 'Set' functions that were starting to get messy. By making a options struct that is in the signature used and plumbed through all the sql/distsql/etc layers, any new flags can be added here with minimal extra work involved. Release note: none. --- pkg/ccl/importccl/read_import_proc.go | 30 ++++++++-------- pkg/ccl/workloadccl/format/sstable.go | 7 ++-- pkg/server/server.go | 6 ++-- pkg/sql/distsqlrun/bulk_row_writer.go | 6 ++-- pkg/sql/distsqlrun/indexbackfiller.go | 8 +++-- pkg/storage/bulk/buffering_adder.go | 51 +++++++++++---------------- pkg/storage/bulk/sst_batcher.go | 35 +++++++----------- pkg/storage/bulk/sst_batcher_test.go | 7 ++-- pkg/storage/storagebase/bulk_adder.go | 41 ++++++++++++++------- 9 files changed, 99 insertions(+), 92 deletions(-) diff --git a/pkg/ccl/importccl/read_import_proc.go b/pkg/ccl/importccl/read_import_proc.go index ec9a13c6f36a..4e2adb35beff 100644 --- a/pkg/ccl/importccl/read_import_proc.go +++ b/pkg/ccl/importccl/read_import_proc.go @@ -505,30 +505,28 @@ func (cp *readImportDataProcessor) ingestKvs( // primary and secondary index kvs. The number of secondary index kvs are // small, and so we expect the indexAdder to flush much less frequently // than the pkIndexAdder. - pkIndexAdder, err := cp.flowCtx.Cfg.BulkAdder(ctx, cp.flowCtx.Cfg.DB, bufferSize, flushSize, writeTS) + pkIndexAdder, err := cp.flowCtx.Cfg.BulkAdder(ctx, cp.flowCtx.Cfg.DB, writeTS, storagebase.BulkAdderOptions{ + Name: "pkAdder", + DisallowShadowing: true, + SkipDuplicates: true, + BufferSize: bufferSize, + SSTSize: uint64(flushSize), + }) if err != nil { return err } - pkIndexAdder.SetName("pkIndexAdder") - pkIndexAdder.SetDisallowShadowing(true) - // AddSSTable with disallowShadowing=true does not consider a KV with the - // same ts and value to be a collision. This is to support the resumption - // of IMPORT jobs which might re-import some already ingested, but not - // checkpointed KVs. - // - // To provide a similar behavior with KVs within the same SST, we silently - // skip over duplicates with the same value, instead of throwing a - // uniqueness error. - pkIndexAdder.SkipLocalDuplicatesWithSameValues(true) defer pkIndexAdder.Close(ctx) - indexAdder, err := cp.flowCtx.Cfg.BulkAdder(ctx, cp.flowCtx.Cfg.DB, bufferSize, flushSize, writeTS) + indexAdder, err := cp.flowCtx.Cfg.BulkAdder(ctx, cp.flowCtx.Cfg.DB, writeTS, storagebase.BulkAdderOptions{ + Name: "indexAdder", + DisallowShadowing: true, + SkipDuplicates: true, + BufferSize: bufferSize, + SSTSize: uint64(flushSize), + }) if err != nil { return err } - indexAdder.SetName("indexAdder") - indexAdder.SetDisallowShadowing(true) - indexAdder.SkipLocalDuplicatesWithSameValues(true) defer indexAdder.Close(ctx) // We insert splits at every index span of the table above. Since the diff --git a/pkg/ccl/workloadccl/format/sstable.go b/pkg/ccl/workloadccl/format/sstable.go index d0d0540a8d2d..512c2c21fb82 100644 --- a/pkg/ccl/workloadccl/format/sstable.go +++ b/pkg/ccl/workloadccl/format/sstable.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/storage/bulk" "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" + "github.com/cockroachdb/cockroach/pkg/storage/storagebase" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/workload" @@ -78,8 +79,10 @@ func ToSSTable(t workload.Table, tableID sqlbase.ID, ts time.Time) ([]byte, erro }) g.GoCtx(func(ctx context.Context) error { sstTS := hlc.Timestamp{WallTime: ts.UnixNano()} - const sstSize = math.MaxInt64 - ba, err := bulk.MakeBulkAdder(&ssts, nil /* rangeCache */, sstSize, sstSize, sstTS) + const sstSize = math.MaxUint64 + ba, err := bulk.MakeBulkAdder( + &ssts, nil /* rangeCache */, sstTS, storagebase.BulkAdderOptions{SSTSize: sstSize, BufferSize: sstSize}, + ) if err != nil { return err } diff --git a/pkg/server/server.go b/pkg/server/server.go index eef1bd8121a1..f4528b9c88ca 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -537,8 +537,10 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { ClusterName: s.cfg.ClusterName, TempStorage: tempEngine, - BulkAdder: func(ctx context.Context, db *client.DB, bufferSize, flushSize int64, ts hlc.Timestamp) (storagebase.BulkAdder, error) { - return bulk.MakeBulkAdder(db, s.distSender.RangeDescriptorCache(), bufferSize, flushSize, ts) + BulkAdder: func( + ctx context.Context, db *client.DB, ts hlc.Timestamp, opts storagebase.BulkAdderOptions, + ) (storagebase.BulkAdder, error) { + return bulk.MakeBulkAdder(db, s.distSender.RangeDescriptorCache(), ts, opts) }, DiskMonitor: s.cfg.TempStorageConfig.Mon, diff --git a/pkg/sql/distsqlrun/bulk_row_writer.go b/pkg/sql/distsqlrun/bulk_row_writer.go index c6921c8b9e53..5cef88e06cea 100644 --- a/pkg/sql/distsqlrun/bulk_row_writer.go +++ b/pkg/sql/distsqlrun/bulk_row_writer.go @@ -73,8 +73,10 @@ func (sp *bulkRowWriter) OutputTypes() []types.T { func (sp *bulkRowWriter) ingestLoop(ctx context.Context, kvCh chan []roachpb.KeyValue) error { writeTS := sp.spec.Table.CreateAsOfTime - const bufferSize, flushSize = 64 << 20, 16 << 20 - adder, err := sp.flowCtx.Cfg.BulkAdder(ctx, sp.flowCtx.Cfg.DB, bufferSize, flushSize, writeTS) + const bufferSize = 64 << 20 + adder, err := sp.flowCtx.Cfg.BulkAdder( + ctx, sp.flowCtx.Cfg.DB, writeTS, storagebase.BulkAdderOptions{BufferSize: bufferSize}, + ) if err != nil { return err } diff --git a/pkg/sql/distsqlrun/indexbackfiller.go b/pkg/sql/distsqlrun/indexbackfiller.go index d8f7d580e735..49d923e9af34 100644 --- a/pkg/sql/distsqlrun/indexbackfiller.go +++ b/pkg/sql/distsqlrun/indexbackfiller.go @@ -79,12 +79,16 @@ func newIndexBackfiller( func (ib *indexBackfiller) prepare(ctx context.Context) error { bufferSize := backfillerBufferSize.Get(&ib.flowCtx.Cfg.Settings.SV) sstSize := backillerSSTSize.Get(&ib.flowCtx.Cfg.Settings.SV) - adder, err := ib.flowCtx.Cfg.BulkAdder(ctx, ib.flowCtx.Cfg.DB, bufferSize, sstSize, ib.spec.ReadAsOf) + opts := storagebase.BulkAdderOptions{ + SSTSize: uint64(sstSize), + BufferSize: uint64(bufferSize), + SkipDuplicates: ib.ContainsInvertedIndex(), + } + adder, err := ib.flowCtx.Cfg.BulkAdder(ctx, ib.flowCtx.Cfg.DB, ib.spec.ReadAsOf, opts) if err != nil { return err } ib.adder = adder - ib.adder.SkipLocalDuplicates(ib.ContainsInvertedIndex()) return nil } diff --git a/pkg/storage/bulk/buffering_adder.go b/pkg/storage/bulk/buffering_adder.go index 96db372d701c..12cb365cb825 100644 --- a/pkg/storage/bulk/buffering_adder.go +++ b/pkg/storage/bulk/buffering_adder.go @@ -17,9 +17,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/storage/storagebase" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/pkg/errors" ) // BufferingAdder is a wrapper for an SSTBatcher that allows out-of-order calls @@ -45,42 +45,37 @@ type BufferingAdder struct { name string } -// MakeBulkAdder makes a storagebase.BulkAdder that buffers and sorts K/Vs passed -// to add into SSTs that are then ingested. +// MakeBulkAdder makes a storagebase.BulkAdder that buffers and sorts K/Vs +// passed to add into SSTs that are then ingested. rangeCache if set is +// consulted to avoid generating an SST that will span a range boundary and thus +// encounter an error and need to be split and retired to be applied. func MakeBulkAdder( db sender, rangeCache *kv.RangeDescriptorCache, - flushBytes, sstBytes int64, timestamp hlc.Timestamp, + opts storagebase.BulkAdderOptions, ) (*BufferingAdder, error) { - if flushBytes <= 0 || sstBytes <= 0 { - return nil, errors.Errorf("flush size and sst bytes must be > 0") + if opts.BufferSize == 0 { + opts.BufferSize = 32 << 20 + } + if opts.SSTSize == 0 { + opts.SSTSize = 32 << 20 } b := &BufferingAdder{ - sink: SSTBatcher{db: db, maxSize: sstBytes, rc: rangeCache}, + name: opts.Name, + sink: SSTBatcher{ + db: db, + maxSize: int64(opts.SSTSize), + rc: rangeCache, + skipDuplicates: opts.SkipDuplicates, + disallowShadowing: opts.DisallowShadowing, + }, timestamp: timestamp, - flushSize: int(flushBytes), + flushSize: int(opts.BufferSize), } return b, nil } -// SkipLocalDuplicates configures skipping of duplicate keys in local batches. -func (b *BufferingAdder) SkipLocalDuplicates(skip bool) { - b.sink.skipDuplicateKeys = skip -} - -// SkipLocalDuplicatesWithSameValues configures skipping of duplicate keys with -// the same value in local batches. -func (b *BufferingAdder) SkipLocalDuplicatesWithSameValues(skip bool) { - b.sink.skipDuplicateKeysWithSameValue = skip -} - -// SetName sets the name of the adder being used for the purpose of logging -// stats. -func (b *BufferingAdder) SetName(name string) { - b.name = name -} - // Close closes the underlying SST builder. func (b *BufferingAdder) Close(ctx context.Context) { log.VEventf(ctx, 2, @@ -158,9 +153,3 @@ func (b *BufferingAdder) Flush(ctx context.Context) error { func (b *BufferingAdder) GetSummary() roachpb.BulkOpSummary { return b.sink.GetSummary() } - -// SetDisallowShadowing controls whether or not shadowing of existing keys is -// permitted. -func (b *BufferingAdder) SetDisallowShadowing(disallowShadowing bool) { - b.sink.disallowShadowing = disallowShadowing -} diff --git a/pkg/storage/bulk/sst_batcher.go b/pkg/storage/bulk/sst_batcher.go index 4ae940eedc51..2b547f4dab23 100644 --- a/pkg/storage/bulk/sst_batcher.go +++ b/pkg/storage/bulk/sst_batcher.go @@ -45,21 +45,17 @@ type SSTBatcher struct { flushKey roachpb.Key rc *kv.RangeDescriptorCache - // skips duplicate keys (iff they are buffered together). This - // is true when used to backfill an inverted index. An array in JSONB with - // multiple values which are the same, will all correspond to the same kv in - // the inverted index. The method which generates these kvs does not dedup, - // thus we rely on the SSTBatcher to dedup them (by skipping), rather than - // throwing a DuplicateKeyError. - skipDuplicateKeys bool - - // skips duplicate keys with the same value (iff they are buffered together). - // This is true when used with IMPORT. Import generally prohibits the - // ingestion of KVs which will shadow existing data, with the exception of - // duplicates having the same value and timestamp. To maintain uniform - // behavior, duplicates in the same batch with equal values will not raise a - // DuplicateKeyError. - skipDuplicateKeysWithSameValue bool + // skips duplicate keys (iff they are buffered together). This is true when + // used to backfill an inverted index. An array in JSONB with multiple values + // which are the same, will all correspond to the same kv in the inverted + // index. The method which generates these kvs does not dedup, thus we rely on + // the SSTBatcher to dedup them (by skipping), rather than throwing a + // DuplicateKeyError. This is also true when used with IMPORT. Import + // generally prohibits the ingestion of KVs which will shadow existing data, + // with the exception of duplicates having the same value and timestamp. To + // maintain uniform behavior, duplicates in the same batch with equal values + // will not raise a DuplicateKeyError. + skipDuplicates bool maxSize int64 // rows written in the current batch. @@ -78,7 +74,7 @@ type SSTBatcher struct { sstSize int } - // allows ingestion of keys where the MVCC.Key would shadow an exisiting row. + // allows ingestion of keys where the MVCC.Key would shadow an existing row. disallowShadowing bool } @@ -95,12 +91,7 @@ func MakeSSTBatcher(ctx context.Context, db sender, flushBytes int64) (*SSTBatch // Keys must be added in order. func (b *SSTBatcher) AddMVCCKey(ctx context.Context, key engine.MVCCKey, value []byte) error { if len(b.batchEndKey) > 0 && bytes.Equal(b.batchEndKey, key.Key) { - if b.skipDuplicateKeys { - return nil - } - - sameValue := len(b.batchEndValue) > 0 && bytes.Equal(b.batchEndValue, value) - if b.skipDuplicateKeysWithSameValue && sameValue { + if b.skipDuplicates && bytes.Equal(b.batchEndValue, value) { return nil } diff --git a/pkg/storage/bulk/sst_batcher_test.go b/pkg/storage/bulk/sst_batcher_test.go index 48db5bedb2f3..f2e23a8cf431 100644 --- a/pkg/storage/bulk/sst_batcher_test.go +++ b/pkg/storage/bulk/sst_batcher_test.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/bulk" "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" + "github.com/cockroachdb/cockroach/pkg/storage/storagebase" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -42,7 +43,7 @@ func TestAddBatched(t *testing.T) { }) } -func runTestImport(t *testing.T, batchSize int64) { +func runTestImport(t *testing.T, batchSize uint64) { ctx := context.Background() s, _, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) @@ -127,7 +128,9 @@ func runTestImport(t *testing.T, batchSize int64) { } ts := hlc.Timestamp{WallTime: 100} - b, err := bulk.MakeBulkAdder(kvDB, mockCache, batchSize, batchSize, ts) + b, err := bulk.MakeBulkAdder( + kvDB, mockCache, ts, storagebase.BulkAdderOptions{BufferSize: batchSize, SSTSize: batchSize}, + ) if err != nil { t.Fatal(err) } diff --git a/pkg/storage/storagebase/bulk_adder.go b/pkg/storage/storagebase/bulk_adder.go index 0646a51a9f06..174e40586f07 100644 --- a/pkg/storage/storagebase/bulk_adder.go +++ b/pkg/storage/storagebase/bulk_adder.go @@ -19,9 +19,36 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" ) +// BulkAdderOptions is used to configure the behavior of a BulkAdder. +type BulkAdderOptions struct { + // Name is used in logging messages to identify this adder or the process on + // behalf of which it is adding data. + Name string + + // SSTSize is the size at which an SST will be flushed and a new one started. + // SSTs are also split during a buffer flush to avoid spanning range bounds so + // they may be smaller than this limit. + SSTSize uint64 + + // BufferSize is the maximum amount of data to buffer before flushing SSTs. + BufferSize uint64 + + // SkipLocalDuplicates configures handling of duplicate keys within a local + // sorted batch. When true if the same key/value pair is added more than once + // subsequent additions will be ignored instead of producing an error. If an + // attempt to add the same key has a differnet value, it is always an error. + // Once a batch is flushed – explicitly or automatically – local duplicate + // detection does not apply. + SkipDuplicates bool + + // DisallowShadowing controls whether shadowing of existing keys is permitted + // when the SSTables produced by this adder are ingested. + DisallowShadowing bool +} + // BulkAdderFactory describes a factory function for BulkAdders. type BulkAdderFactory func( - ctx context.Context, db *client.DB, bufferBytes, flushBytes int64, timestamp hlc.Timestamp, + ctx context.Context, db *client.DB, timestamp hlc.Timestamp, opts BulkAdderOptions, ) (BulkAdder, error) // BulkAdder describes a bulk-adding helper that can be used to add lots of KVs. @@ -36,18 +63,6 @@ type BulkAdder interface { GetSummary() roachpb.BulkOpSummary // Close closes the underlying buffers/writers. Close(ctx context.Context) - // SkipLocalDuplicates configures handling of duplicate keys within a local - // sorted batch. Once a batch is flushed – explicitly or automatically – local - // duplicate detection does not apply. - SkipLocalDuplicates(bool) - // SkipLocalDuplicatesWithSameValues configures handling of duplicate keys - // with the same value within a local sorted batch. - SkipLocalDuplicatesWithSameValues(bool) - // SetDisallowShadowing sets the flag which controls whether shadowing of - // existing keys is permitted in the AddSSTable method. - SetDisallowShadowing(bool) - // SetName sets the name of the adder for the purpose of logging adder stats. - SetName(string) } // DuplicateKeyError represents a failed attempt to ingest the same key twice