
Commit

67720: sql: rationalize batch key and bytes limit r=andreimatei a=andreimatei

Before this patch, the txnKVFetcher had two modes of operation: either
use both key limits and bytes limits, or use neither. This patch changes
it to the following three modes: bytes limits only, both, or neither,
thus adding the bytes-limits-only mode.

The idea is that, frequently, only bytes limits are desired - getting
row limits in those cases was a historical artifact. The choices for the
user of the fetcher are the following:

1) I want DistSender-level parallelism. I can't use any limits.

2) I want to read some key spans fully. But I don't want to OOM either
   myself (the client) or the server. In this case I want only memory
   limits (and I was getting row limits before this patch for no reason).

3) I hope I won't have to read my spans fully. The high-level query has
   a LIMIT, which has been processed into a fetcher-level hint about how
   many keys need to be read to satisfy the LIMIT. Even if that hint proves
   too small, there's still likely a point where enough rows will have been
   read. In this case, I start with my hint and then, while it proves
   insufficient, progressively ratchet it up. All the batches have bytes
   limits too, since I don't want OOMs and, since I use key limits,
   DistSender parallelism is inhibited anyway (so there's no reason not
   to use a bytes limit).

For case 3), there's still an upper-bound key limit to the "ratcheting
up" behavior. This is productionKVBatchSize, which this patch increases
from 10k keys to 100k. I'm not sure whether the existence of this upper
bound is rational; I think it acts as an upper bound on the amount of
wasted work due to overshooting the real LIMIT. Since the bytes limit is
10MB, it doesn't matter very much.

The fetcher clients that don't specify a "firstBatchRowHint" used to get
the bytes limit and a key limit of 10k. Now they get only the bytes
limit. This category includes the joinReader, the inverted joiner, some
backfills, and some DELETEs.

For the fetcher clients that specify a hint, the behavior is unchanged
except that the upper bound of the key-limit ratcheting is now 100k
instead of 10k. This means that, for rows under 1KB, they're now more
likely to hit the memory limit than the key limit. This category
includes the cFetcher, the TableReader, and the zig-zag joiner.
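
As a rough illustration of the three modes (a minimal, self-contained Go
sketch, not the actual fetcher API; chooseLimits, wantParallelism, and the
standalone BytesLimit/KeyLimit types are hypothetical, loosely modeled on
row.NoBytesLimit, row.DefaultBatchBytesLimit, and the productionKVBatchSize
cap mentioned above):

package main

import "fmt"

type BytesLimit int64
type KeyLimit int64

const (
    NoBytesLimit           BytesLimit = 0
    DefaultBatchBytesLimit BytesLimit = 10 << 20 // 10MB, per the commit message
    NoKeyLimit             KeyLimit   = 0
    ProductionKVBatchSize  KeyLimit   = 100000 // cap for key-limit ratcheting
)

// chooseLimits picks per-batch limits for the three cases described above:
// case 1 (wantParallelism), case 2 (no hint), case 3 (hint > 0).
func chooseLimits(wantParallelism bool, firstBatchKeyHint KeyLimit) (BytesLimit, KeyLimit) {
    if wantParallelism {
        // Case 1: no limits at all, so DistSender can parallelize the batch.
        return NoBytesLimit, NoKeyLimit
    }
    if firstBatchKeyHint == 0 {
        // Case 2: read the spans fully, but cap each batch's size in bytes.
        return DefaultBatchBytesLimit, NoKeyLimit
    }
    // Case 3: start with the hint; the caller ratchets the key limit up on
    // later batches (capped at ProductionKVBatchSize) and keeps the bytes
    // limit, since key limits already inhibit DistSender parallelism.
    return DefaultBatchBytesLimit, firstBatchKeyHint
}

func main() {
    for _, tc := range []struct {
        parallel bool
        hint     KeyLimit
    }{{true, 0}, {false, 0}, {false, 1000}} {
        b, k := chooseLimits(tc.parallel, tc.hint)
        fmt.Printf("parallel=%v hint=%d -> bytesLimit=%d keyLimit=%d\n", tc.parallel, tc.hint, b, k)
    }
}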

Release note: None

68606: sql: support COMMENT ON SCHEMA r=otan a=ekalinin

Fixes #67689 

This change adds support for comments on schemas.

Release note (sql change): Comments can now be attached to SQL schemas
using PostgreSQL's `COMMENT ON SCHEMA` syntax.
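
A quick usage sketch of the new syntax (a hedged example: the connection
string, the lib/pq driver, and the schema name "app" are illustrative
assumptions, not part of this change):

package main

import (
    "database/sql"
    "log"

    _ "github.com/lib/pq" // CockroachDB speaks the PostgreSQL wire protocol
)

func main() {
    // Hypothetical connection string; adjust for your own cluster.
    db, err := sql.Open("postgres",
        "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    if _, err := db.Exec(`CREATE SCHEMA IF NOT EXISTS app`); err != nil {
        log.Fatal(err)
    }
    // The syntax added by this change: attach a comment to a schema.
    if _, err := db.Exec(`COMMENT ON SCHEMA app IS 'application tables'`); err != nil {
        log.Fatal(err)
    }
}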

68892: backupccl: move SHOW BACKUP manifest loading to helper r=dt a=dt

This pulls the manifest loading and rendering step of SHOW BACKUP into a helper
behind an interface, which allows for other implementations of that interface
that load backup information differently, much like how the renderer itself was
already pluggable.

Release note: none.

Co-authored-by: Andrei Matei <andrei@cockroachlabs.com>
Co-authored-by: Eugene Kalinin <e.v.kalinin@gmail.com>
Co-authored-by: David Taylor <tinystatemachine@gmail.com>
4 people committed Aug 18, 2021
4 parents 5d2c91c + 0fa923a + edc8951 + 5089ab3 commit a60b8bb
Showing 56 changed files with 995 additions and 556 deletions.
1 change: 1 addition & 0 deletions docs/generated/sql/bnf/comment.bnf
@@ -1,5 +1,6 @@
comment_stmt ::=
'COMMENT' 'ON' 'DATABASE' database_name 'IS' comment_text
| 'COMMENT' 'ON' 'SCHEMA' schema_name 'IS' comment_text
| 'COMMENT' 'ON' 'TABLE' table_name 'IS' comment_text
| 'COMMENT' 'ON' 'COLUMN' column_name 'IS' comment_text
| 'COMMENT' 'ON' 'INDEX' table_index_name 'IS' comment_text
7 changes: 4 additions & 3 deletions docs/generated/sql/bnf/stmt_block.bnf
@@ -55,6 +55,7 @@ copy_from_stmt ::=

comment_stmt ::=
'COMMENT' 'ON' 'DATABASE' database_name 'IS' comment_text
| 'COMMENT' 'ON' 'SCHEMA' schema_name 'IS' comment_text
| 'COMMENT' 'ON' 'TABLE' table_name 'IS' comment_text
| 'COMMENT' 'ON' 'COLUMN' column_path 'IS' comment_text
| 'COMMENT' 'ON' 'INDEX' table_index_name 'IS' comment_text
@@ -272,6 +273,9 @@ comment_text ::=
'SCONST'
| 'NULL'

schema_name ::=
name

column_path ::=
name
| prefixed_column_path
@@ -1806,9 +1810,6 @@ alter_zone_partition_stmt ::=
| 'ALTER' 'PARTITION' partition_name 'OF' 'INDEX' table_index_name set_zone_config
| 'ALTER' 'PARTITION' partition_name 'OF' 'INDEX' table_name '@' '*' set_zone_config

schema_name ::=
name

opt_add_val_placement ::=
'BEFORE' 'SCONST'
| 'AFTER' 'SCONST'
119 changes: 77 additions & 42 deletions pkg/ccl/backupccl/show.go
@@ -61,6 +61,79 @@ func checkShowBackupURIPrivileges(ctx context.Context, p sql.PlanHookState, uri
return nil
}

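// backupInfoReader abstracts how SHOW BACKUP loads backup information
// and renders it into result rows, so that alternative implementations
// can load that information differently.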
type backupInfoReader interface {
showBackup(
context.Context,
cloud.ExternalStorage,
*jobspb.BackupEncryptionOptions,
[]string,
chan<- tree.Datums,
) error
header() colinfo.ResultColumns
}

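// manifestInfoReader reads backup manifests from external storage and
// renders them with the wrapped backupShower.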
type manifestInfoReader struct {
shower backupShower
}

var _ backupInfoReader = manifestInfoReader{}

func (m manifestInfoReader) header() colinfo.ResultColumns {
return m.shower.header
}

func (m manifestInfoReader) showBackup(
ctx context.Context,
store cloud.ExternalStorage,
enc *jobspb.BackupEncryptionOptions,
incPaths []string,
resultsCh chan<- tree.Datums,
) error {
var err error
manifests := make([]BackupManifest, len(incPaths)+1)
manifests[0], err = ReadBackupManifestFromStore(ctx, store, enc)
if err != nil {
return err
}

for i := range incPaths {
m, err := readBackupManifest(ctx, store, incPaths[i], enc)
if err != nil {
return err
}
// Blank the stats to prevent memory blowup.
m.DeprecatedStatistics = nil
manifests[i+1] = m
}

// Ensure that the descriptors in the backup manifests are up to date.
//
// This is necessary in particular for upgrading descriptors with old-style
// foreign keys which are no longer supported.
// If we are restoring a backup with old-style foreign keys, skip over the
// FKs for which we can't resolve the cross-table references. We can't
// display them anyway, because we don't have the referenced table names,
// etc.
err = maybeUpgradeDescriptorsInBackupManifests(ctx, manifests, true /* skipFKsWithNoMatchingTable */)
if err != nil {
return err
}

datums, err := m.shower.fn(manifests)
if err != nil {
return err
}

for _, row := range datums {
select {
case <-ctx.Done():
return ctx.Err()
case resultsCh <- row:
}
}
return nil
}

// showBackupPlanHook implements PlanHookFn.
func showBackupPlanHook(
ctx context.Context, stmt tree.Statement, p sql.PlanHookState,
@@ -107,6 +180,7 @@ func showBackupPlanHook(
backup.Details = tree.BackupManifestAsJSON
}

var infoReader backupInfoReader
var shower backupShower
switch backup.Details {
case tree.BackupRangeDetails:
@@ -118,6 +192,7 @@
default:
shower = backupShowerDefault(ctx, p, backup.ShouldIncludeSchemas, opts)
}
infoReader = manifestInfoReader{shower}

fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error {
// TODO(dan): Move this span into sql.
@@ -190,50 +265,10 @@ func showBackupPlanHook(
}
}

manifests := make([]BackupManifest, len(incPaths)+1)
manifests[0], err = ReadBackupManifestFromStore(ctx, store, encryption)
if err != nil {
return err
}

for i := range incPaths {
m, err := readBackupManifest(ctx, store, incPaths[i], encryption)
if err != nil {
return err
}
// Blank the stats to prevent memory blowup.
m.DeprecatedStatistics = nil
manifests[i+1] = m
}

// Ensure that the descriptors in the backup manifests are up to date.
//
// This is necessary in particular for upgrading descriptors with old-style
// foreign keys which are no longer supported.
// If we are restoring a backup with old-style foreign keys, skip over the
// FKs for which we can't resolve the cross-table references. We can't
// display them anyway, because we don't have the referenced table names,
// etc.
err = maybeUpgradeDescriptorsInBackupManifests(ctx, manifests, true /* skipFKsWithNoMatchingTable */)
if err != nil {
return err
}

datums, err := shower.fn(manifests)
if err != nil {
return err
}
for _, row := range datums {
select {
case <-ctx.Done():
return ctx.Err()
case resultsCh <- row:
}
}
return nil
return infoReader.showBackup(ctx, store, encryption, incPaths, resultsCh)
}

return fn, shower.header, nil, false, nil
return fn, infoReader.header(), nil, false, nil
}

type backupShower struct {
6 changes: 4 additions & 2 deletions pkg/ccl/importccl/read_import_pgdump.go
@@ -850,7 +850,8 @@ func readPostgresStmt(
case *tree.Insert, *tree.CopyFrom, *tree.Delete, copyData:
// handled during the data ingestion pass.
case *tree.CreateExtension, *tree.CommentOnDatabase, *tree.CommentOnTable,
*tree.CommentOnIndex, *tree.CommentOnColumn, *tree.SetVar, *tree.Analyze:
*tree.CommentOnIndex, *tree.CommentOnColumn, *tree.SetVar, *tree.Analyze,
*tree.CommentOnSchema:
// These are the statements that can be parsed by CRDB but are not
// supported, or are not required to be processed, during an IMPORT.
// - ignore txns.
@@ -1344,7 +1345,8 @@ func (m *pgDumpReader) readFile(
return wrapErrorWithUnsupportedHint(err)
}
case *tree.CreateExtension, *tree.CommentOnDatabase, *tree.CommentOnTable,
*tree.CommentOnIndex, *tree.CommentOnColumn, *tree.AlterSequence:
*tree.CommentOnIndex, *tree.CommentOnColumn, *tree.AlterSequence,
*tree.CommentOnSchema:
// handled during schema extraction.
case *tree.SetVar, *tree.BeginTransaction, *tree.CommitTransaction, *tree.Analyze:
// handled during schema extraction.
1 change: 1 addition & 0 deletions pkg/keys/constants.go
@@ -416,6 +416,7 @@ const (
TableCommentType = 1
ColumnCommentType = 2
IndexCommentType = 3
SchemaCommentType = 4
)

const (
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
@@ -32,6 +32,7 @@ go_library(
"comment_on_column.go",
"comment_on_database.go",
"comment_on_index.go",
"comment_on_schema.go",
"comment_on_table.go",
"conn_executor.go",
"conn_executor_exec.go",
2 changes: 1 addition & 1 deletion pkg/sql/backfill.go
@@ -2411,7 +2411,7 @@ func columnBackfillInTxn(
for sp.Key != nil {
var err error
sp.Key, err = backfiller.RunColumnBackfillChunk(ctx,
txn, tableDesc, sp, columnBackfillBatchSize.Get(&evalCtx.Settings.SV),
txn, tableDesc, sp, row.RowLimit(columnBackfillBatchSize.Get(&evalCtx.Settings.SV)),
false /*alsoCommit*/, traceKV)
if err != nil {
return err
8 changes: 4 additions & 4 deletions pkg/sql/backfill/backfill.go
@@ -247,7 +247,7 @@ func (cb *ColumnBackfiller) RunColumnBackfillChunk(
txn *kv.Txn,
tableDesc catalog.TableDescriptor,
sp roachpb.Span,
chunkSize int64,
chunkSize row.RowLimit,
alsoCommit bool,
traceKV bool,
) (roachpb.Key, error) {
@@ -287,7 +287,7 @@ func (cb *ColumnBackfiller) RunColumnBackfillChunk(
// populated and deleted by the OLTP commands but not otherwise
// read or used
if err := cb.fetcher.StartScan(
ctx, txn, []roachpb.Span{sp}, true /* limitBatches */, chunkSize,
ctx, txn, []roachpb.Span{sp}, row.DefaultBatchBytesLimit, chunkSize,
traceKV, false, /* forceProductionKVBatchSize */
); err != nil {
log.Errorf(ctx, "scan error: %s", err)
@@ -305,7 +305,7 @@
iv.Cols = append(iv.Cols, tableDesc.PublicColumns()...)
iv.Cols = append(iv.Cols, cb.added...)
cb.evalCtx.IVarContainer = iv
for i := int64(0); i < chunkSize; i++ {
for i := int64(0); i < int64(chunkSize); i++ {
datums, _, _, err := cb.fetcher.NextRowDecoded(ctx)
if err != nil {
return roachpb.Key{}, err
@@ -809,7 +809,7 @@ func (ib *IndexBackfiller) BuildIndexEntriesChunk(
}
defer fetcher.Close(ctx)
if err := fetcher.StartScan(
ctx, txn, []roachpb.Span{sp}, true /* limitBatches */, initBufferSize,
ctx, txn, []roachpb.Span{sp}, row.DefaultBatchBytesLimit, initBufferSize,
traceKV, false, /* forceProductionKVBatchSize */
); err != nil {
log.Errorf(ctx, "scan error: %s", err)
12 changes: 8 additions & 4 deletions pkg/sql/colfetcher/cfetcher.go
@@ -664,26 +664,30 @@ func (rf *cFetcher) StartScan(
spans roachpb.Spans,
bsHeader *roachpb.BoundedStalenessHeader,
limitBatches bool,
limitHint int64,
batchBytesLimit row.BytesLimit,
limitHint row.RowLimit,
traceKV bool,
forceProductionKVBatchSize bool,
) error {
if len(spans) == 0 {
return errors.AssertionFailedf("no spans")
}
if !limitBatches && batchBytesLimit != row.NoBytesLimit {
return errors.AssertionFailedf("batchBytesLimit set without limitBatches")
}

rf.traceKV = traceKV

// If we have a limit hint, we limit the first batch size. Subsequent
// batches get larger to avoid making things too slow (e.g. in case we have
// a very restrictive filter and actually have to retrieve a lot of rows).
firstBatchLimit := limitHint
firstBatchLimit := row.KeyLimit(limitHint)
if firstBatchLimit != 0 {
// The limitHint is a row limit, but each row could be made up
// of more than one key. We take the maximum possible keys
// per row out of all the table rows we could potentially
// scan over.
firstBatchLimit = limitHint * int64(rf.maxKeysPerRow)
firstBatchLimit = row.KeyLimit(int(limitHint) * rf.maxKeysPerRow)
// We need an extra key to make sure we form the last row.
firstBatchLimit++
}
@@ -695,7 +699,7 @@
spans,
bsHeader,
rf.reverse,
limitBatches,
batchBytesLimit,
firstBatchLimit,
rf.lockStrength,
rf.lockWaitPolicy,
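
To make the firstBatchLimit computation above concrete (a tiny sketch; the
numbers are made up, and maxKeysPerRow depends on the scanned table's
column families and indexes):

package main

import "fmt"

func main() {
    limitHint := 25    // rows the caller hopes will satisfy the LIMIT
    maxKeysPerRow := 4 // worst-case KV pairs per row for this table
    firstBatchLimit := limitHint * maxKeysPerRow
    // One extra key makes sure the last row is fully formed instead of
    // being cut off at a column-family boundary.
    firstBatchLimit++
    fmt.Println(firstBatchLimit) // 101 keys requested for the first batch
}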
46 changes: 31 additions & 15 deletions pkg/sql/colfetcher/colbatch_scan.go
@@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/row"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util"
@@ -47,12 +48,13 @@
colexecop.ZeroInputNode
colexecop.InitHelper

spans roachpb.Spans
flowCtx *execinfra.FlowCtx
bsHeader *roachpb.BoundedStalenessHeader
rf *cFetcher
limitHint int64
parallelize bool
spans roachpb.Spans
flowCtx *execinfra.FlowCtx
bsHeader *roachpb.BoundedStalenessHeader
rf *cFetcher
limitHint row.RowLimit
batchBytesLimit row.BytesLimit
parallelize bool
// tracingSpan is created when the stats should be collected for the query
// execution, and it will be finished when closing the operator.
tracingSpan *tracing.Span
@@ -94,6 +96,7 @@ func (s *ColBatchScan) Init(ctx context.Context) {
s.spans,
s.bsHeader,
limitBatches,
s.batchBytesLimit,
s.limitHint,
s.flowCtx.TraceKV,
s.flowCtx.EvalCtx.TestingKnobs.ForceProductionBatchSizes,
@@ -191,7 +194,7 @@ func NewColBatchScan(
return nil, errors.AssertionFailedf("attempting to create a cFetcher with the IsCheck flag set")
}

limitHint := execinfra.LimitHint(spec.LimitHint, post)
limitHint := row.RowLimit(execinfra.LimitHint(spec.LimitHint, post))
// TODO(ajwerner): The need to construct an immutable here
// indicates that we're probably doing this wrong. Instead we should be
// just setting the ID and Version in the spec or something like that and
@@ -249,16 +252,29 @@ func NewColBatchScan(
//gcassert:bce
spans = append(spans, specSpans[i].Span)
}
*s = ColBatchScan{
spans: spans,
flowCtx: flowCtx,
bsHeader: bsHeader,
rf: fetcher,
limitHint: limitHint,

if spec.LimitHint > 0 || spec.BatchBytesLimit > 0 {
// Parallelize shouldn't be set when there's a limit hint, but double-check
// just in case.
parallelize: spec.Parallelize && limitHint == 0,
ResultTypes: typs,
spec.Parallelize = false
}
var batchBytesLimit row.BytesLimit
if !spec.Parallelize {
batchBytesLimit = row.BytesLimit(spec.BatchBytesLimit)
if batchBytesLimit == 0 {
batchBytesLimit = row.DefaultBatchBytesLimit
}
}

*s = ColBatchScan{
spans: spans,
flowCtx: flowCtx,
bsHeader: bsHeader,
rf: fetcher,
limitHint: limitHint,
batchBytesLimit: batchBytesLimit,
parallelize: spec.Parallelize,
ResultTypes: typs,
}
return s, nil
}