
copy: add vectorize insert support used solely by copy for now #98605

Merged (1 commit) on Mar 18, 2023

Conversation

cucaroach
Contributor

@cucaroach cucaroach commented Mar 14, 2023

Implement a new, on-by-default vectorized insert for COPY FROM
statements. It is controlled by the vectorize and copy_fast_path_enabled
session variables, which both default to true. Setting
copy_fast_path_enabled to false gives the old unoptimized (22.1)
behavior. Leaving copy_fast_path_enabled on but turning vectorize off
gives the 22.2 behavior.

Both the row version and the vectorized version of the COPY FROM fast
path now respect memory limits on a per-row basis, i.e. if huge rows
are encountered, COPY buffers are flushed before the configured copy
row batch size is reached. Also, if many rows are sent in one CopyData
segment, we now flush when the copy row batch size limit is reached
instead of buffering all the data. This matters little with psql
clients, which typically send one row per CopyData segment, but matters
a lot with pgx, which sends 64k CopyData segments.
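
For illustration, here is a minimal standalone sketch of that flushing
rule; every name and number in it (copyBuffer, copyBatchRowSize, the
1MiB budget) is invented for the example and is not the actual
CockroachDB code:

package main

import "fmt"

// copyBuffer is a toy model of the buffering rule described above: rows
// accumulate until either the configured row-count limit or the per-batch
// memory budget is reached, whichever comes first.
type copyBuffer struct {
	copyBatchRowSize int   // row-count limit per batch
	maxBatchMem      int64 // memory budget for buffered rows
	bufferedRows     int
	bufferedBytes    int64
}

// add accounts for one parsed row and reports whether the buffer should be
// flushed before accepting more rows, so a run of huge rows triggers a
// flush well before copyBatchRowSize is reached.
func (c *copyBuffer) add(rowBytes int64) (flush bool) {
	c.bufferedRows++
	c.bufferedBytes += rowBytes
	return c.bufferedRows >= c.copyBatchRowSize || c.bufferedBytes >= c.maxBatchMem
}

func (c *copyBuffer) flush() {
	fmt.Printf("flushing %d rows (%d bytes)\n", c.bufferedRows, c.bufferedBytes)
	c.bufferedRows, c.bufferedBytes = 0, 0
}

func main() {
	buf := &copyBuffer{copyBatchRowSize: 100, maxBatchMem: 1 << 20}
	// A single large CopyData segment (as pgx sends) may carry many rows;
	// each row is accounted for individually, so flushes happen mid-segment.
	for i := 0; i < 250; i++ {
		if buf.add(16 << 10) { // pretend every row costs 16KiB
			buf.flush()
		}
	}
}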

Keys are not inserted in exactly the same order as with the row
version of COPY. They are now sorted per batch, so all the primary-key
keys are inserted first, then the keys for the first secondary index,
and so on.

The vectorized insert benefits from larger batch sizes, so we are more
generous with how big they can get. By default we start with 64-row
batches and double up to a limit derived from the KV Raft command size,
parameterized by the schema (i.e. a wider schema gets a smaller upper
limit on batch size), not to exceed 32k rows, which is roughly where
the performance gains from bigger batches start to trail off.
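
A rough standalone sketch of that sizing policy follows; the budget,
row width, and function names are made up for the example (the real
derivation goes through colmem.EstimateBatchSizeBytes and the KV
command size setting):

package main

import "fmt"

// copyBatchSizeLimit sketches the policy described above: derive an upper
// bound on the rows per vectorized COPY batch from a command-size budget
// and the estimated width of one row, capped at 32k rows, which is roughly
// where the gains trail off.
func copyBatchSizeLimit(commandBudgetBytes, estimatedRowBytes int64) int {
	const hardCap = 32 << 10
	limit := int(commandBudgetBytes / estimatedRowBytes)
	if limit > hardCap {
		limit = hardCap
	}
	if limit < 1 {
		limit = 1
	}
	return limit
}

// nextBatchSize doubles the batch size, starting from 64 rows, until the
// schema-derived limit is hit.
func nextBatchSize(cur, limit int) int {
	if cur*2 > limit {
		return limit
	}
	return cur * 2
}

func main() {
	limit := copyBatchSizeLimit(16<<20, 512) // 16MiB budget, ~512B rows
	for size := 64; ; size = nextBatchSize(size, limit) {
		fmt.Println("batch size:", size)
		if size >= limit {
			break
		}
	}
}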

Epic: CRDB-18892
Informs: #91831
Release note (sql change): Bulk COPY FROM statements are now
processed with a vectorized insert and can be anywhere from 50%
to 5x faster. Typical hardware and schemas should see a 2x
improvement. Vectorized inserts are only used for COPY statements and
are not yet applied to regular inserts. Both the vectorize and
copy_fast_path_enabled session variables can be used to disable this
feature.

@cockroach-teamcity
Member

This change is Reviewable

@cucaroach cucaroach changed the title copyfin copy: add vectorize insert support used solely by copy for now Mar 14, 2023
@cucaroach cucaroach marked this pull request as ready for review March 14, 2023 19:04
@cucaroach cucaroach requested review from a team as code owners March 14, 2023 19:04
@cucaroach cucaroach requested review from DrewKimball, otan and yuzefovich and removed request for a team and DrewKimball March 14, 2023 19:04
@cucaroach cucaroach force-pushed the copyfin branch 3 times, most recently from a8d40c8 to 494327d Compare March 15, 2023 10:37
@otan
Contributor

otan commented Mar 15, 2023

unfortunately do not feel qualified to review this :') will let yahor take first pass

@cucaroach
Contributor Author

unfortunately do not feel qualified to review this :') will let yahor take first pass

No worries, I was hoping you'd kinda do a once-over of the copy_from.go changes. Specifically, should we have a dedicated session variable for vectorized copy? I'm half tempted to re-purpose copy_fast_path_enabled to decide row vs. vector; I don't think it's useful in its current incarnation (i.e. I'm thinking we can just delete the current copy_fast_path_enabled-disabled code, which was just a 22.2 crutch in case my original optimization broke something).

Member

@yuzefovich yuzefovich left a comment


Nice stuff! Overall, looks good to me, but I have some minor questions and suggestions.

Reviewed 36 of 36 files at r1, all commit messages.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @cucaroach and @otan)


pkg/sql/copy_from.go line 256 at r1 (raw file):

	scratchRow    []tree.Datum
	batch         coldata.Batch
	alloc         *colmem.Allocator

It looks like this alloc is only used to "power" the SetAccountingHelper. I'd recommend that we not store the reference to alloc at all - SetAccountingHelper has ReleaseMemory to use instead of ReleaseAll. The rationale for this is that SetAccountingHelper requires that the Allocator object is not shared with anything else, so by not storing the reference to it, we're less likely to access the allocator outside of the SetAccountingHelper.


pkg/sql/copy_from.go line 323 at r1 (raw file):

	}
	c.resultColumns = make(colinfo.ResultColumns, len(cols))
	typs := make([]*types.T, len(cols))

nit: could just use c.typs to avoid the local variable.


pkg/sql/copy_from.go line 415 at r1 (raw file):

		for colmem.EstimateBatchSizeBytes(typs, batchSize) > targetBatchMemUsage &&
			batchSize > minBatchSize {
			batchSize /= 2

nit: we might end up with batchSize < minBatchSize and then we won't increase it enough in the other loop below (when we have wide schema). Is this ok?
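
For what it's worth, a tiny standalone illustration of that undershoot
(the numbers and the estimateBatchBytes stand-in are invented; the real
code uses colmem.EstimateBatchSizeBytes):

package main

import "fmt"

// estimateBatchBytes stands in for colmem.EstimateBatchSizeBytes.
func estimateBatchBytes(rowBytes int64, batchSize int) int64 {
	return rowBytes * int64(batchSize)
}

func main() {
	const (
		minBatchSize        = 64
		targetBatchMemUsage = 1 << 20 // 1MiB
	)
	rowBytes := int64(256 << 10) // pretend one row of a very wide schema costs 256KiB
	batchSize := 100
	// Same shape as the quoted loop: halve while over budget and above the
	// minimum. Halving from 100 lands on 50, i.e. below minBatchSize.
	for estimateBatchBytes(rowBytes, batchSize) > targetBatchMemUsage && batchSize > minBatchSize {
		batchSize /= 2
	}
	fmt.Println("batchSize:", batchSize) // 50
}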


pkg/sql/copy_from.go line 430 at r1 (raw file):

	c.vectorized = true
	factory := coldataext.NewExtendedColumnFactory(c.p.EvalContext())
	c.alloc = colmem.NewLimitedAllocator(ctx, &c.rowsMemAcc, nil, factory)

nit: add inlined comment for nil.


pkg/sql/copy_from.go line 432 at r1 (raw file):

	c.alloc = colmem.NewLimitedAllocator(ctx, &c.rowsMemAcc, nil, factory)
	c.alloc.SetMaxBatchSize(c.copyBatchRowSize)
	// TODO(cucaroach): check that batch isn't unnecessarily allocating selection vector.

nit: oh, it's definitely allocating a selection vector (and includes it into the memory account) 😃 Perhaps reword the TODO to make it not allocate the selection vector?


pkg/sql/copy_from.go line 501 at r1 (raw file):

	c.rows.Close(ctx)
	// TODO(cucaroach): if this isn't close'd the Stop below errors out
	// saying there's 10240 bytes left, investigate.

My guess is that Allocator.ReleaseAll shrinks rowsMemAcc, but that shrinking doesn't return all bytes to the monitor - it keeps mon.DefaultPoolAllocationSize in "reserve", and this is what you're observing. I think closing rowsMemAcc is reasonable here, and in this case we can avoid the Allocator.ReleaseAll call too. It'd be good to confirm whether this is what happens though.
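
A toy model of that reserve behavior (the constant and the account type
are invented for the sketch; the real logic lives in the mon package):

package main

import "fmt"

// defaultPoolAllocationSize stands in for mon.DefaultPoolAllocationSize.
const defaultPoolAllocationSize = 10240

// account models a memory account whose Shrink returns bytes to the
// monitor only down to a one-pool-allocation reserve; Close returns
// everything.
type account struct {
	used   int64 // bytes the client currently holds
	budget int64 // bytes actually reserved from the monitor
}

func (a *account) Grow(n int64) {
	a.used += n
	if a.used > a.budget {
		a.budget = a.used
	}
}

func (a *account) Shrink(n int64) {
	a.used -= n
	// Keep one pool allocation in reserve to avoid churning on the next Grow.
	if reserve := a.used + defaultPoolAllocationSize; a.budget > reserve {
		a.budget = reserve
	}
}

func (a *account) Close() { a.used, a.budget = 0, 0 }

func main() {
	var acc account
	acc.Grow(1 << 20)
	acc.Shrink(1 << 20) // a ReleaseAll-style shrink: 10240 bytes still held
	fmt.Println("held after shrink:", acc.budget)
	acc.Close() // closing the account is what lets the monitor Stop cleanly
	fmt.Println("held after close:", acc.budget)
}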


pkg/sql/copy_from.go line 821 at r1 (raw file):

				continue
			}
			if err := tree.ParseAndRequireStringHandler(c.resultColumns[i].Typ, s.Val, c.parsingEvalCtx, c.valueHandlers[i], &c.ph); err != nil {

nit: could use vh[i] here.


pkg/sql/copy_from.go line 1119 at r1 (raw file):

	if c.vectorized {
		var realloc bool
		c.batch, realloc = c.accHelper.ResetMaybeReallocate(c.typs, c.batch, 0)

nit: add inlined comment for 0.


pkg/sql/distsql_physical_planner.go line 4630 at r1 (raw file):

	for i, p := range plan.Processors {
		plan.Processors[i].Spec.ProcessorID = int32(i)
		// Double check that our reliance on ProcessorID == index is good

nit: missing period here and below.


pkg/sql/distsql_physical_planner.go line 4642 at r1 (raw file):

// TODO(cucaroach): this doesn't work, get it working as part of effort to make
// distsql inserts handle general inserts.
func (dsp *DistSQLPlanner) createPlanForRowCount(

This seems like a leftover? Is rowCountNode needed for copy?


pkg/sql/distsql_physical_planner.go line 4692 at r1 (raw file):

	var typs []*types.T
	if len(n.columns) > 0 {
		panic(errors.AssertionFailedf("distsql insert doesn't support RETURNING"))

nit: maybe return as error?


pkg/sql/values.go line 39 at r1 (raw file):

	valuesRun

	// Allow passing a coldata.Batch through a valuesNode.

Why not use coldata.Batch here?


pkg/sql/colexec/insert.go line 43 at r1 (raw file):

	retBatch   coldata.Batch
	flowCtx    *execinfra.FlowCtx
	// Check ords are the columns containing bool values with check expression

nit: s/Check ords/checkOrds/g.


pkg/sql/colexec/insert.go line 78 at r1 (raw file):

	}

	// Empirical testing shows that if is ApproximateMutationBytes approaches

nit: something is off in "if is ...".


pkg/sql/colexec/insert.go line 83 at r1 (raw file):

	mutationQuota := int(kvserverbase.MaxCommandSize.Get(&flowCtx.Cfg.Settings.SV) / 3)

	alloc := colmem.NewAllocator(ctx, nil, coldata.StandardColumnFactory)

nit: in other places we pass the allocator as argument in order to centralize memory monitoring in execplan.go. I think we should do so here too. Use getStreamingAllocator in execplan.go.


pkg/sql/colexec/insert.go line 97 at r1 (raw file):

			colexecerror.InternalError(err)
		}
		v.semaCtx = flowCtx.NewSemaContext(v.flowCtx.Txn)

nit: you could pass this as an argument too - args.ExprHelper.SemaCtx in execplan.go.


pkg/sql/colexec/insert.go line 102 at r1 (raw file):

}

func (v *vectorInserter) Init(ctx context.Context) {

nit: can drop this and defer the implementation to the embedded OneInputHelper.


pkg/sql/colexec/insert.go line 106 at r1 (raw file):

}

func (v *vectorInserter) GetPartialIndexMap(b coldata.Batch) map[catid.IndexID][]bool {

nit: does this need to be exported?


pkg/sql/colexec/insert.go line 157 at r1 (raw file):

			return nil
		})
	// PrepareBatch is called in a loop to partially insert til everything is

nit: s/til/till/g.


pkg/sql/colexec/insert.go line 171 at r1 (raw file):

				if end <= start {
					// Disable memory limit, if the system can't handle this row
					// a KV error will encountered below.

nit: s/will/will be/g.


pkg/sql/colexec/insert.go line 198 at r1 (raw file):

		}
		if start < b.Length() {
			kvba.Batch = v.flowCtx.Txn.NewBatch()

nit: we could do this as the first action in the loop if we remove the initial allocation when creating kvba object outside of the loop.


pkg/sql/colexec/insert.go line 202 at r1 (raw file):

	}

	v.retBatch.ColVec(0).Int64()[0] = int64(b.Length())

We should call v.retBatch.ResetInternalBatch() here. This is in case - hypothetically - the caller adds a selection vector on top.


pkg/sql/colexec/colbuilder/execplan.go line 824 at r1 (raw file):

			}
			if core.Values.NumRows == 0 || len(core.Values.Columns) == 0 {
				// Handle coldata.Batch vector source

nit: missing period.


pkg/sql/colexec/colexecargs/op_creation.go line 67 at r1 (raw file):

	ProcessorConstructor execinfra.ProcessorConstructor
	LocalProcessors      []execinfra.LocalProcessor
	LocalVectorSources   map[int32]any

Why are we using any here? Is the expectation that later we'll have two options (say coldata.Batch and [][]byte)?

Update: I saw the explanation later, consider adding a quick comment here pointing to the comment in physicalplan package.


pkg/sql/colflow/explain_vec.go line 62 at r1 (raw file):

		admission.WorkInfo{},
	)
	opChains, _, err = creator.setupFlow(ctx, flowCtx, flow.Processors, localProcessors, nil, fuseOpt)

nit: add inlined comment for nil.


pkg/sql/distsql/server.go line 568 at r1 (raw file):

	LocalProcs []execinfra.LocalProcessor

	LocalVectorSources map[int32]any

nit: a quick comment would be good.


pkg/sql/execinfrapb/processors_sql.proto line 1095 at r1 (raw file):

// rows or inserts from select).
message InsertSpec {
  optional sqlbase.TableDescriptor table = 1 [(gogoproto.nullable) = false];

I remember that we have put in quite a bit of effort to not pass the table descriptors around via the processor specs (at least in processors owned by SQL Queries) - that's where IndexFetchSpec came out. Do we really need the whole descriptor here? Can we extend a reduced version of it?


pkg/sql/physicalplan/physical_plan.go line 86 at r1 (raw file):

	LocalProcessors []execinfra.LocalProcessor

	// LocalVectorSources contains canned coldata.Batch's to be used at vector

nit: probably s/at/as/g.


pkg/sql/sem/tree/parse_string.go line 163 at r1 (raw file):

// ValueHandler is an interface to allow raw types to be extracted from strings.
// For types that don't pack perfectly in a machine type they return a size

Is this comment still applicable?


pkg/sql/sem/tree/values.go line 65 at r1 (raw file):

}

// VectorRows lets us store a Batch in a tree.LiteralValuesClause

nit: missing period.


pkg/sql/sem/tree/values.go line 83 at r1 (raw file):

func (r VectorRows) Get(i, j int) Expr {
	return DNull
	//panic("not implemented")

nit: leftover.


pkg/sql/sem/tree/values.go line 86 at r1 (raw file):

}

// put this type assertion here to prevent coldata->tree dependency.

nit: this seems like the right place to add this assertion, no? Perhaps drop this comment?


pkg/sql/copy/bench_test.go line 66 at r1 (raw file):

	require.NoError(b, err)

	// send data in 5 batches of 10k rows

I'm confused by this comment - could you expand on it?


pkg/sql/copy/bench_test.go line 128 at r1 (raw file):

)`

// Perform a copy

nit: the comment seems incomplete.


pkg/sql/copy/bench_test.go line 153 at r1 (raw file):

	rng := rand.New(rand.NewSource(0))
	rows := 10000
	numrows, err := conn.GetDriverConn().CopyFrom(ctx,

I'm confused - what are we benchmarking? It seems like we always run just a single iteration - is this expected? Should we do b.ResetTimer somewhere?

Contributor

@otan otan left a comment


brief skim of copy_from

pkg/sql/copy_from.go line 1222 at r1 (raw file):

	decodeTyp = decodeTyp.ArrayContents()
	}
	switch decodeTyp.Family() {
	case types.BytesFamily,

hmm, where did this logic come from / why are these families special?

Contributor Author

@cucaroach cucaroach left a comment


Thanks for the review, I'm gonna look at the table descriptor thing a bit more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @otan and @yuzefovich)


pkg/sql/copy_from.go line 256 at r1 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

It looks like this alloc is only used to "power" the SetAccountingHelper. I'd recommend that we not store the reference to alloc at all - SetAccountingHelper has ReleaseMemory to use instead of ReleaseAll. The rationale for this is that SetAccountingHelper requires that the Allocator object is not shared with anything else, so by not storing the reference to it, we're less likely to access the allocator outside of the SetAccountingHelper.

Done.


pkg/sql/copy_from.go line 415 at r1 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: we might end up with batchSize < minBatchSize and then we won't increase it enough in the other loop below (when we have wide schema). Is this ok?

Yeah, it would have to be one crazy schema, but it's okay; we test with batches of size 1.


pkg/sql/copy_from.go line 424 at r1 (raw file):

Previously, otan (Oliver Tan) wrote…

should we add a minBatchSize thingo check here too?

See above, I added a comment.


pkg/sql/copy_from.go line 501 at r1 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

My guess is that Allocator.ReleaseAll shrinks rowsMemAcc, but that shrinking doesn't return all bytes to the monitor - it keeps mon.DefaultPoolAllocationSize in "reserve", and this is what you're observing. I think closing rowsMemAcc is reasonable here, and in this case we can avoid the Allocator.ReleaseAll call too. It'd be good to confirm whether this is what happens though.

Yeah, that explains it.


pkg/sql/copy_from.go line 1222 at r1 (raw file):

Previously, otan (Oliver Tan) wrote…

hmm, where did this logic come from / why are these families special?

It's like 7-year-old code I copied from readTextTupleDatum. 🤷


pkg/sql/distsql_physical_planner.go line 4642 at r1 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

This seems like a leftover? Is rowCountNode needed for copy?

Yeah, the COPY statement returns the number of rows, so opt_exec_factory constructs a rowCountNode in ConstructInsert and ConstructInsertFastPath.


pkg/sql/values.go line 39 at r1 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

Why not use coldata.Batch here?

I think I thought I was avoiding adding a dependency but I was mistaken.


pkg/sql/colexec/insert.go line 106 at r1 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: does this need to be exported?

No


pkg/sql/colexec/insert.go line 202 at r1 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

We should call v.retBatch.ResetInternalBatch() here. This is in case - hypothetically - the caller adds a selection vector on top.

Done.


pkg/sql/colexec/colexecargs/op_creation.go line 67 at r1 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

Why are we using any here? Is the expectation that later we'll have two options (say coldata.Batch and [][]byte)?

Update: I saw the explanation later, consider adding a quick comment here pointing to the comment in physicalplan package.

Done.


pkg/sql/distsql/server.go line 568 at r1 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: a quick comment would be good.

👍


pkg/sql/execinfrapb/processors_sql.proto line 1095 at r1 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

I remember that we have put in quite a bit of effort to not pass the table descriptors around via the processor specs (at least in processors owned by SQL Queries) - that's where IndexFetchSpec came out. Do we really need the whole descriptor here? Can we extend a reduced version of it?

I'll do some digging, not really familiar with what my options are.


pkg/sql/sem/tree/parse_string.go line 163 at r1 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

Is this comment still applicable?

No, good catch.


pkg/sql/copy/bench_test.go line 66 at r1 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

I'm confused by this comment - could you expand on it?

This is an old test I just moved over from copy_test.go, I'm just gonna delete it.


pkg/sql/copy/bench_test.go line 153 at r1 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

I'm confused - what are we benchmarking? It seems like we always run just a single iteration - is this expected? Should we do b.ResetTimer somewhere?

This was a fixture for me to test arbitrarily large copies, I should make it a Test and not a Benchmark.

Member

@yuzefovich yuzefovich left a comment


:lgtm:

Reviewed 14 of 14 files at r2, all commit messages.
Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @cucaroach and @otan)


pkg/sql/copy_from.go line 501 at r1 (raw file):

Previously, cucaroach (Tommy Reilly) wrote…

Yeah, that explains it.

Perhaps remove the TODO then?


pkg/sql/copy_from.go line 263 at r2 (raw file):

	// For testing we want to be able to override this on the instance level.
	copyBatchRowSize int
	maxRowMem        int64

nit: maybe s/maxRowMem/maxRowsMem/g (or maxBatchMem) to make it more clear that it's about many rows (or a batch of rows)?


pkg/sql/copy_from.go line 391 at r2 (raw file):

	if buildutil.CrdbTestBuild {
		// We have to honor metamorphic default in testing, the transaction
		// commit tests rely on it, specifically they override it to

nit: maybe join this and next lines into one.


pkg/sql/distsql_physical_planner.go line 4642 at r1 (raw file):

Previously, cucaroach (Tommy Reilly) wrote…

Yeah the COPY statement returns number of rows so opt_exec_factory constructs a rowCountNode in ConstructInsert and ConstructInsertFastPath.

I see, thanks.


pkg/sql/distsql_physical_planner.go line 3465 at r2 (raw file):

					plan.LocalVectorSources = make(map[int32]any)
				}
				plan.LocalVectorSources[int32(idx)] = n.coldataBatch

nit: should we make the key in this map be physicalplan.ProcessorIdx type? This way it'll be more clear what the key is about.


pkg/sql/colexec/insert.go line 83 at r2 (raw file):

	// 32MB we'll hit the command limit. So set limit to a fraction of
	// command limit to be safe.
	mutationQuota := int(kvserverbase.MaxCommandSize.Get(&flowCtx.Cfg.Settings.SV) / 3)

nit: we divide raft command size by 3 in several places - should we introduce a cluster setting to control this 3 number? Like I'm wondering whether we might want to have a knob to tune mutationQuota and batch sizes without having to update the raft command size.
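
A hedged sketch of what such a knob could look like, written standalone
rather than against the real cluster-settings API (the divisor name and
the 64MiB value are invented for the example):

package main

import "fmt"

// copyMutationQuotaDivisor stands in for the suggested cluster setting; in
// CockroachDB it would be registered through the settings package instead
// of being a plain variable.
var copyMutationQuotaDivisor int64 = 3

// mutationQuota mirrors the computation in insert.go: take a fraction of
// the max raft command size so a single KV batch stays safely under it.
func mutationQuota(maxCommandSize int64) int {
	return int(maxCommandSize / copyMutationQuotaDivisor)
}

func main() {
	const maxCommandSize = 64 << 20 // pretend the raft command size limit is 64MiB
	fmt.Println("mutation quota:", mutationQuota(maxCommandSize))
	// Tuning the divisor would change the quota without touching the raft
	// command size setting itself.
	copyMutationQuotaDivisor = 2
	fmt.Println("mutation quota:", mutationQuota(maxCommandSize))
}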


pkg/sql/execinfrapb/processors_sql.proto line 1095 at r1 (raw file):

Previously, cucaroach (Tommy Reilly) wrote…

I'll do some digging, not really familiar with what my options are.

We do have other non-read-only processors that pass the whole descriptor around, so if it turns out that the whole descriptor is needed unless we change a lot of code (on a quick glance it appears to be this way), it seems ok to pass around the whole descriptor.


pkg/sql/sem/tree/values.go line 82 at r2 (raw file):

// Get implements the ExprContainer interface.
func (r VectorRows) Get(i, j int) Expr {
	return DNull

nit: should we panic here? I'm guessing it's unexpected if VectorRows.Get is called.
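
For illustration only, the panicking variant being suggested might look
like the sketch below; VectorRows and Expr are redefined as local stubs
so the example compiles on its own (the real types live in
pkg/sql/sem/tree), and whether the change was adopted isn't shown in
this thread:

package main

// Expr and VectorRows are local stubs mirroring the shapes quoted above,
// only to make the sketch self-contained.
type Expr interface{}

type VectorRows struct{}

// Get illustrates the suggested change: panic instead of silently
// returning a null expression, since callers are not expected to access
// VectorRows element-wise.
func (r VectorRows) Get(i, j int) Expr {
	panic("VectorRows.Get is not expected to be called")
}

func main() {
	defer func() {
		if r := recover(); r != nil {
			println("recovered:", r.(string))
		}
	}()
	_ = VectorRows{}.Get(0, 0)
}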

@cucaroach
Contributor Author

bors r=yuzefovich
She's loaded to the hilt with TODO's but I think she'll float! TFTRs!

@craig
Contributor

craig bot commented Mar 18, 2023

Build failed (retrying...):

@craig
Contributor

craig bot commented Mar 18, 2023

Build failed (retrying...):

@craig
Contributor

craig bot commented Mar 18, 2023

Build succeeded:
