diff --git a/pkg/ccl/importccl/import_processor.go b/pkg/ccl/importccl/import_processor.go index a6c33a8fb676..152fecb8072a 100644 --- a/pkg/ccl/importccl/import_processor.go +++ b/pkg/ccl/importccl/import_processor.go @@ -16,11 +16,9 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/ccl/storageccl" - "github.com/cockroachdb/cockroach/pkg/internal/client" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/row" @@ -31,7 +29,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/storagebase" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" - "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" @@ -255,45 +252,12 @@ func (cp *readImportDataProcessor) emitKvs(ctx context.Context, kvCh <-chan row. return nil } -func (cp *readImportDataProcessor) presplitTableBoundaries(ctx context.Context) error { - // TODO(jeffreyxiao): Remove this check in 20.1. - // If the cluster supports sticky bits, then we should use the sticky bit to - // ensure that the splits are not automatically split by the merge queue. If - // the cluster does not support sticky bits, we disable the merge queue via - // gossip, so we can just set the split to expire immediately. 
- stickyBitEnabled := cp.flowCtx.Cfg.Settings.Version.IsActive(cluster.VersionStickyBit) - expirationTime := hlc.Timestamp{} - if stickyBitEnabled { - expirationTime = cp.flowCtx.Cfg.DB.Clock().Now().Add(time.Hour.Nanoseconds(), 0) - } - for _, tbl := range cp.spec.Tables { - for _, span := range tbl.Desc.AllIndexSpans() { - if err := cp.flowCtx.Cfg.DB.AdminSplit(ctx, span.Key, span.Key, expirationTime); err != nil { - return err - } - - log.VEventf(ctx, 1, "scattering index range %s", span.Key) - scatterReq := &roachpb.AdminScatterRequest{ - RequestHeader: roachpb.RequestHeaderFromSpan(span), - } - if _, pErr := client.SendWrapped(ctx, cp.flowCtx.Cfg.DB.NonTransactionalSender(), scatterReq); pErr != nil { - log.Errorf(ctx, "failed to scatter span %s: %s", span.Key, pErr) - } - } - } - return nil -} - // ingestKvs drains kvs from the channel until it closes, ingesting them using // the BulkAdder. It handles the required buffering/sorting/etc. func (cp *readImportDataProcessor) ingestKvs(ctx context.Context, kvCh <-chan row.KVBatch) error { ctx, span := tracing.ChildSpan(ctx, "ingestKVs") defer tracing.FinishSpan(span) - if err := cp.presplitTableBoundaries(ctx); err != nil { - return err - } - writeTS := hlc.Timestamp{WallTime: cp.spec.WalltimeNanos} flushSize := storageccl.MaxImportBatchSize(cp.flowCtx.Cfg.Settings) diff --git a/pkg/sql/distsql_plan_csv.go b/pkg/sql/distsql_plan_csv.go index c2a69009abd6..b459f1233ce7 100644 --- a/pkg/sql/distsql_plan_csv.go +++ b/pkg/sql/distsql_plan_csv.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server/serverpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/physicalplan" "github.com/cockroachdb/cockroach/pkg/sql/rowcontainer" @@ -601,6 +602,39 @@ func (dsp *DistSQLPlanner) loadCSVSamplingPlan( return 
samples, nil } +func presplitTableBoundaries( + ctx context.Context, + cfg *ExecutorConfig, + tables map[string]*execinfrapb.ReadImportDataSpec_ImportTable, +) error { + // TODO(jeffreyxiao): Remove this check in 20.1. + // If the cluster supports sticky bits, then we should use the sticky bit to + // ensure that the splits are not automatically merged by the merge queue. If + // the cluster does not support sticky bits, we disable the merge queue via + // gossip, so we can just set the split to expire immediately. + stickyBitEnabled := cfg.Settings.Version.IsActive(cluster.VersionStickyBit) + expirationTime := hlc.Timestamp{} + if stickyBitEnabled { + expirationTime = cfg.DB.Clock().Now().Add(time.Hour.Nanoseconds(), 0) + } + for _, tbl := range tables { + for _, span := range tbl.Desc.AllIndexSpans() { + if err := cfg.DB.AdminSplit(ctx, span.Key, span.Key, expirationTime); err != nil { + return err + } + + log.VEventf(ctx, 1, "scattering index range %s", span.Key) + scatterReq := &roachpb.AdminScatterRequest{ + RequestHeader: roachpb.RequestHeaderFromSpan(span), + } + if _, pErr := client.SendWrapped(ctx, cfg.DB.NonTransactionalSender(), scatterReq); pErr != nil { + log.Errorf(ctx, "failed to scatter span %s: %s", span.Key, pErr) + } + } + } + return nil +} + // DistIngest is used by IMPORT to run a DistSQL flow to ingest data by starting // reader processes on many nodes that each read and ingest their assigned files // and then send back a summary of what they ingested. The combined summary is @@ -688,6 +722,10 @@ func DistIngest( return nil }) + if err := presplitTableBoundaries(ctx, phs.ExecCfg(), tables); err != nil { + return roachpb.BulkOpSummary{}, err + } + recv := MakeDistSQLReceiver( ctx, &metadataCallbackWriter{rowResultWriter: rowResultWriter, fn: metaFn},