Skip to content

Commit

Permalink
sql: stop presplitting spans on every import proc
Browse files Browse the repository at this point in the history
We are currently presplitting spans (on every table and index boundary)
for every processor. We should only be doing this once so this PR moves
it to the planning stage.

Release justification: reduces test flakes associated with too many
AdminSplit requests.

Release note: None
  • Loading branch information
pbardea committed Sep 17, 2019
1 parent ad4d6f5 commit 89f4cdf
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 36 deletions.
36 changes: 0 additions & 36 deletions pkg/ccl/importccl/import_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/row"
Expand All @@ -31,7 +29,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -255,45 +252,12 @@ func (cp *readImportDataProcessor) emitKvs(ctx context.Context, kvCh <-chan row.
return nil
}

// presplitTableBoundaries issues an AdminSplit at the start key of every index
// span of every table in the processor's spec, and then sends an
// AdminScatterRequest covering that span. A split failure aborts the loop and
// is returned to the caller; a scatter failure is only logged.
func (cp *readImportDataProcessor) presplitTableBoundaries(ctx context.Context) error {
	// TODO(jeffreyxiao): Remove this check in 20.1.
	// When the cluster version supports sticky bits, the splits are given an
	// expiration one hour from now so that the merge queue does not
	// automatically merge them away. When it does not, the merge queue is
	// disabled via gossip instead, so the zero timestamp (immediate expiry) is
	// sufficient.
	var expirationTime hlc.Timestamp
	if cp.flowCtx.Cfg.Settings.Version.IsActive(cluster.VersionStickyBit) {
		expirationTime = cp.flowCtx.Cfg.DB.Clock().Now().Add(time.Hour.Nanoseconds(), 0)
	}

	for _, table := range cp.spec.Tables {
		indexSpans := table.Desc.AllIndexSpans()
		for i := range indexSpans {
			indexSpan := indexSpans[i]

			// Split first; without the split there is nothing useful to scatter.
			err := cp.flowCtx.Cfg.DB.AdminSplit(ctx, indexSpan.Key, indexSpan.Key, expirationTime)
			if err != nil {
				return err
			}

			log.VEventf(ctx, 1, "scattering index range %s", indexSpan.Key)
			// Scattering is best-effort: report the failure and move on.
			_, scatterErr := client.SendWrapped(
				ctx,
				cp.flowCtx.Cfg.DB.NonTransactionalSender(),
				&roachpb.AdminScatterRequest{
					RequestHeader: roachpb.RequestHeaderFromSpan(indexSpan),
				},
			)
			if scatterErr != nil {
				log.Errorf(ctx, "failed to scatter span %s: %s", indexSpan.Key, scatterErr)
			}
		}
	}
	return nil
}

// ingestKvs drains kvs from the channel until it closes, ingesting them using
// the BulkAdder. It handles the required buffering/sorting/etc.
func (cp *readImportDataProcessor) ingestKvs(ctx context.Context, kvCh <-chan row.KVBatch) error {
ctx, span := tracing.ChildSpan(ctx, "ingestKVs")
defer tracing.FinishSpan(span)

if err := cp.presplitTableBoundaries(ctx); err != nil {
return err
}

writeTS := hlc.Timestamp{WallTime: cp.spec.WalltimeNanos}

flushSize := storageccl.MaxImportBatchSize(cp.flowCtx.Cfg.Settings)
Expand Down
38 changes: 38 additions & 0 deletions pkg/sql/distsql_plan_csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
"github.com/cockroachdb/cockroach/pkg/sql/rowcontainer"
Expand Down Expand Up @@ -601,6 +602,39 @@ func (dsp *DistSQLPlanner) loadCSVSamplingPlan(
return samples, nil
}

// presplitTableBoundaries issues an AdminSplit at the start key of every index
// span of every table in tables, and then sends an AdminScatterRequest
// covering that span. A split failure aborts the loop and is returned to the
// caller; a scatter failure is only logged.
func presplitTableBoundaries(
	ctx context.Context,
	cfg *ExecutorConfig,
	tables map[string]*execinfrapb.ReadImportDataSpec_ImportTable,
) error {
	// TODO(jeffreyxiao): Remove this check in 20.1.
	// When the cluster version supports sticky bits, the splits are given an
	// expiration one hour from now so that the merge queue does not
	// automatically merge them away. When it does not, the merge queue is
	// disabled via gossip instead, so the zero timestamp (immediate expiry) is
	// sufficient.
	var expirationTime hlc.Timestamp
	if cfg.Settings.Version.IsActive(cluster.VersionStickyBit) {
		expirationTime = cfg.DB.Clock().Now().Add(time.Hour.Nanoseconds(), 0)
	}

	for _, table := range tables {
		indexSpans := table.Desc.AllIndexSpans()
		for i := range indexSpans {
			indexSpan := indexSpans[i]

			// Split first; without the split there is nothing useful to scatter.
			err := cfg.DB.AdminSplit(ctx, indexSpan.Key, indexSpan.Key, expirationTime)
			if err != nil {
				return err
			}

			log.VEventf(ctx, 1, "scattering index range %s", indexSpan.Key)
			// Scattering is best-effort: report the failure and move on.
			_, scatterErr := client.SendWrapped(
				ctx,
				cfg.DB.NonTransactionalSender(),
				&roachpb.AdminScatterRequest{
					RequestHeader: roachpb.RequestHeaderFromSpan(indexSpan),
				},
			)
			if scatterErr != nil {
				log.Errorf(ctx, "failed to scatter span %s: %s", indexSpan.Key, scatterErr)
			}
		}
	}
	return nil
}

// DistIngest is used by IMPORT to run a DistSQL flow to ingest data by starting
// reader processes on many nodes that each read and ingest their assigned files
// and then send back a summary of what they ingested. The combined summary is
Expand Down Expand Up @@ -688,6 +722,10 @@ func DistIngest(
return nil
})

if err := presplitTableBoundaries(ctx, phs.ExecCfg(), tables); err != nil {
return roachpb.BulkOpSummary{}, err
}

recv := MakeDistSQLReceiver(
ctx,
&metadataCallbackWriter{rowResultWriter: rowResultWriter, fn: metaFn},
Expand Down

0 comments on commit 89f4cdf

Please sign in to comment.