58358: opt: prune partial index columns and simplify partial index projections r=rytaft a=mgartner

Fixes #51623

#### opt: do not derive prune columns for Upsert, Update, Delete

We no longer derive output prune columns for Upsert, Update, and Delete
ops in `DerivePruneCols`. There are no PruneCols rules for these
operators, so deriving their prune columns was only performing
unnecessary work. There are other rules that prune the fetch and return
columns for these operators. These rules do not rely on
`DerivePruneCols`.
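
A simplified sketch of the resulting shape (illustrative stand-ins only;
the real derivation operates on memo expressions and opt.ColSet):

    // colSet is a simplified stand-in for opt.ColSet.
    type colSet map[int]struct{}

    // derivePruneCols returns the output columns that PruneCols rules may
    // discard for an operator. Mutation operators now short-circuit to the
    // empty set: no PruneCols rule matches them, so deriving the set was
    // wasted work.
    func derivePruneCols(op string, childPruneCols colSet) colSet {
        switch op {
        case "upsert", "update", "delete":
            return colSet{}
        default:
            // Relational operators derive from their inputs as before,
            // collapsed here for brevity.
            return childPruneCols
        }
    }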

Release note: None

#### sql: remove logic to determine fetch cols in row updater

Previously, the `row.MakeUpdater` function had logic to determine the
fetch columns required for an update operation. This is unnecessary
because the cost-based optimizer already determines the fetch columns
and plumbs them into `MakeUpdater` as the `requestedCols` argument.

Release note: None

#### opt: safer access to partial index predicates in TableMeta

Previously, partial index predicate expressions in TableMeta were the
source-of-truth used within the optimizer to determine if an index is a
partial index. However, partial index predicates are not added to
TableMeta for all types of statements in optbuilder. Therefore, it was
not safe to assume this was a source-of-truth.

This commit unexports the map of partial index predicates in TableMeta.
Access to partial index predicates must now be done via
`TableMeta.PartialIndexPredicate`. This function checks the catalog to
determine if an index is a partial index, and panics if there is not a
corresponding predicate expression in the partial index predicate map.
This makes the function an actual source-of-truth.
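
A minimal sketch of the new access pattern, with simplified stand-in
types (the real signatures in TableMeta differ; `isPartial` stands in
for the catalog check):

    // tableMeta mirrors the shape of the change: the predicate map is
    // unexported, and the accessor consults the catalog first.
    type tableMeta struct {
        isPartial              func(ord int) bool // catalog: is index partial?
        partialIndexPredicates map[int]string     // populated by optbuilder
    }

    // partialIndexPredicate returns the predicate for a partial index. It
    // panics if the catalog reports a partial index but optbuilder never
    // built its predicate, making the catalog the source-of-truth.
    func (tm *tableMeta) partialIndexPredicate(ord int) (string, bool) {
        if !tm.isPartial(ord) {
            return "", false // not a partial index
        }
        pred, ok := tm.partialIndexPredicates[ord]
        if !ok {
            panic("partial index predicate was not built for this index")
        }
        return pred, true
    }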

Release note: None

#### opt: move addPartialIndexPredicatesForTable to optbuilder/partial_index.go

Release note: None

#### opt: prune update/upsert fetch columns not needed for partial indexes

Indexed columns of partial indexes are now fetched for UPDATE and
UPSERT operations only when needed. They are pruned in cases where it is
guaranteed that they are not needed to build old or new index entries.
For example, consider the table and UPDATE:

    CREATE TABLE t (
      a INT PRIMARY KEY,
      b INT,
      c INT,
      d INT,
      INDEX (b) WHERE c > 0,
      FAMILY (a), FAMILY (b), FAMILY (c), FAMILY (d)
    )

    UPDATE t SET d = d + 1 WHERE a = 1

The partial index is guaranteed not to change with this UPDATE because
neither its indexed columns nor the columns referenced in its predicate
are mutating. Therefore, the existing values of b do not need to be
fetched to maintain the state of the partial index. Furthermore, the
primary index does not require the existing values of b because no
columns in b's family are mutating. So, b can be pruned from the
UPDATE's fetch columns.
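
By contrast, an update that mutates a column referenced in the predicate
cannot prune b (illustrative):

    UPDATE t SET c = c - 1 WHERE a = 1

Because c appears in the index predicate, this statement may add or
remove partial index entries, so the existing value of b must be fetched
to build them.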

Release note (performance improvement): Previously, indexed columns of
partial indexes were always fetched for UPDATEs and UPSERTs. Now they
are fetched only if they are required for maintaining the state of the
index. If an UPDATE or UPSERT mutates only columns that are neither
indexed by a partial index nor referenced in its predicate, the partial
index's columns will no longer be fetched (assuming that they are not
needed to maintain the state of other indexes, including the primary
index).

#### opt: normalize partial index PUT/DEL projections to false

A new `SimplifyPartialIndexProjections` normalization rule normalizes
synthesized partial index PUT and DEL columns to False when it is
guaranteed that a mutation will not require changes to the associated
partial index. This normalization can lead to further normalizations,
such as pruning columns that the synthesized projections relied on.
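
The guard condition, roughly (an illustrative sketch; the actual rule is
written in Optgen with custom match functions, and canSimplifyToFalse is
a hypothetical name):

    // canSimplifyToFalse reports whether the PUT/DEL projections for a
    // partial index can be normalized to False: the mutation must touch
    // neither the index's key columns nor the columns referenced by its
    // predicate.
    func canSimplifyToFalse(updatedCols, indexedCols, predicateCols map[int]bool) bool {
        for c := range updatedCols {
            if indexedCols[c] || predicateCols[c] {
                return false
            }
        }
        return true
    }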

The motivation for this change is to allow fully disjoint updates to
different columns in the same row, when the columns are split across
different families. By pruning columns not needed to maintain a partial
index, we're not forced to scan all column families. This can ultimately
reduce contention during updates.

Release note (performance improvement): UPDATE and UPSERT operations on
tables with partial indexes no longer evaluate partial index predicate
expressions when it is guaranteed that the operation will not alter the
state of the partial index. In some cases, this can eliminate fetching
the existing values of columns that are referenced in partial index
predicates.


58373: streamingccl: add ingestion job framework r=pbardea a=adityamaru

This change introduces a new StreamIngestionJob. It does not do much
more than lay out the general outline of the job, which is very similar
to that of other bulk jobs such as changefeeds and backups.

More precisely:
- Introduces StreamIngestionDetails job details proto
- Hooks up the dependency to a mock stream client
- Introduces a StreamIngestionProcessorSpec
- Sets up a simple DistSQL flow which round-robin assigns the partitions
  to the processors.

Most notable TODOs in job land which will be addressed in follow-up PRs:
- StreamIngestionPlanHook to create this job. It will involve figuring
  out SQL syntax.
- Introducing a ts watermark in both the job and processors. This
  watermark will represent the lowest resolved ts up to which all
  processors have ingested. Iron out semantics on job start and
  resumption.
- Introducing a StreamIngestionFrontier processor which will slurp the
  results from the StreamIngestionProcessors, and use them to keep track
  of the minimum resolved ts across all processors.

Fixes: #57399

Release note: None

58477: opt: prevent column reuse in Union and UnionAll r=rytaft a=mgartner

#### opt: fix columns in SplitScanIntoUnionScans constraint

This commit fixes a minor bug in `SplitScanIntoUnionScans` that resulted
in a scan's constraint containing columns not associated with the scan.
This did not affect the correctness of results. However, it appears
that it did cause inaccurate stats calculations; I had to add histogram
buckets to the tests to coerce the optimizer into choosing the same
plan for the corresponding test.

Release note: None

#### opt: do not reuse columns for Unions in SplitScanIntoUnionScans

Unions generated in SplitScanIntoUnionScans no longer reuse column IDs
from their left children as output column IDs. Reusing column IDs in
this way has been shown to be dangerous (see #58434).

Release note: None

#### opt: add Union column ID check to CheckExpr

A check has been added to `CheckExpr` that asserts that the output
columns of `Union`s and `UnionAll`s are not reused from the left or
right inputs of the union. Reusing columns in this way is dangerous
(see #58434).
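
A sketch of the assertion's shape, with plain ints standing in for
column IDs (illustrative, not the actual CheckExpr code):

    // checkSetOpCols panics if any output column of a Union or UnionAll
    // reuses a column ID from the left or right input.
    func checkSetOpCols(outCols, leftCols, rightCols []int) {
        used := make(map[int]bool)
        for _, c := range leftCols {
            used[c] = true
        }
        for _, c := range rightCols {
            used[c] = true
        }
        for _, c := range outCols {
            if used[c] {
                panic("set operation reuses an input column ID")
            }
        }
    }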

Release note: None


Co-authored-by: Marcus Gartner <marcus@cockroachlabs.com>
Co-authored-by: Aditya Maru <adityamaru@gmail.com>
3 people committed Jan 7, 2021
4 parents c59186a + 039fb1b + 5797332 + 80d7605 commit d8b5cb0
Showing 44 changed files with 2,372 additions and 870 deletions.
19 changes: 18 additions & 1 deletion pkg/ccl/streamingccl/BUILD.bazel
@@ -2,7 +2,24 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
    name = "streamingccl",
-   srcs = ["stream_ingestion_job.go"],
+   srcs = [
+       "stream_ingestion_job.go",
+       "stream_ingestion_processor_planning.go",
+   ],
    importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl",
    visibility = ["//visibility:public"],
+   deps = [
+       "//pkg/ccl/streamingccl/streamclient",
+       "//pkg/jobs",
+       "//pkg/jobs/jobspb",
+       "//pkg/kv",
+       "//pkg/roachpb",
+       "//pkg/settings/cluster",
+       "//pkg/sql",
+       "//pkg/sql/execinfrapb",
+       "//pkg/sql/physicalplan",
+       "//pkg/sql/sem/tree",
+       "//pkg/sql/types",
+       "@com_github_cockroachdb_logtags//:logtags",
+   ],
)
85 changes: 84 additions & 1 deletion pkg/ccl/streamingccl/stream_ingestion_job.go
@@ -8,6 +8,89 @@

package streamingccl

import (
    "context"

    "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
    "github.com/cockroachdb/cockroach/pkg/jobs"
    "github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
    "github.com/cockroachdb/cockroach/pkg/settings/cluster"
    "github.com/cockroachdb/cockroach/pkg/sql"
    "github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
)

type streamIngestionResumer struct {
    job *jobs.Job
}

func ingest(
    ctx context.Context,
    execCtx sql.JobExecContext,
    streamAddress streamclient.PartitionAddress,
    job *jobs.Job,
) error {
    // Initialize a stream client and resolve topology.
    client := streamclient.NewStreamClient()
    sa := streamclient.StreamAddress(streamAddress)
    topology, err := client.GetTopology(sa)
    if err != nil {
        return err
    }

    evalCtx := execCtx.ExtendedEvalContext()
    dsp := execCtx.DistSQLPlanner()

    planCtx, nodes, err := dsp.SetupAllNodesPlanning(ctx, evalCtx, execCtx.ExecCfg())
    if err != nil {
        return err
    }

    // Construct stream ingestion processor specs.
    streamIngestionSpecs, err := distStreamIngestionPlanSpecs(topology, nodes)
    if err != nil {
        return err
    }

    // Plan and run the DistSQL flow.
    err = distStreamIngest(ctx, execCtx, nodes, planCtx, dsp, streamIngestionSpecs)
    if err != nil {
        return err
    }

    return nil
}

// Resume is part of the jobs.Resumer interface.
func (s *streamIngestionResumer) Resume(
    ctx context.Context, execCtx interface{}, resultsCh chan<- tree.Datums,
) error {
    details := s.job.Details().(jobspb.StreamIngestionDetails)
    p := execCtx.(sql.JobExecContext)

    err := ingest(ctx, p, streamclient.PartitionAddress(details.StreamAddress), s.job)
    if err != nil {
        return err
    }

    // TODO(adityamaru): We probably want to use the resultsCh to indicate that
    // the processors have completed setup. We can then return the job ID in the
    // plan hook similar to how changefeeds do it.

    return nil
}

// OnFailOrCancel is part of the jobs.Resumer interface.
func (s *streamIngestionResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) error {
    return nil
}

var _ jobs.Resumer = &streamIngestionResumer{}

func init() {
    // TODO: Implement me.
    jobs.RegisterConstructor(
        jobspb.TypeStreamIngestion,
        func(job *jobs.Job,
            settings *cluster.Settings) jobs.Resumer {
            return &streamIngestionResumer{job: job}
        })
}
109 changes: 109 additions & 0 deletions pkg/ccl/streamingccl/stream_ingestion_processor_planning.go
@@ -0,0 +1,109 @@
// Copyright 2020 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package streamingccl

import (
    "context"

    "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
    "github.com/cockroachdb/cockroach/pkg/kv"
    "github.com/cockroachdb/cockroach/pkg/roachpb"
    "github.com/cockroachdb/cockroach/pkg/sql"
    "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
    "github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
    "github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
    "github.com/cockroachdb/cockroach/pkg/sql/types"
    "github.com/cockroachdb/logtags"
)

// TODO(adityamaru): Figure out what the processors will return.
var streamIngestionResultTypes = []*types.T{}

func distStreamIngestionPlanSpecs(
    topology streamclient.Topology, nodes []roachpb.NodeID,
) ([]*execinfrapb.StreamIngestionDataSpec, error) {

    // For each stream partition in the topology, assign it to a node.
    streamIngestionSpecs := make([]*execinfrapb.StreamIngestionDataSpec, 0, len(nodes))

    for i, partition := range topology.Partitions {
        // Round-robin assign the stream partitions to nodes. Partitions 0 through
        // len(nodes) - 1 create the specs. Later partitions just add themselves to
        // an existing spec's partition addresses.
        if i < len(nodes) {
            spec := &execinfrapb.StreamIngestionDataSpec{
                PartitionAddress: make([]streamclient.PartitionAddress, 0),
            }
            streamIngestionSpecs = append(streamIngestionSpecs, spec)
        }
        n := i % len(nodes)
        streamIngestionSpecs[n].PartitionAddress = append(streamIngestionSpecs[n].PartitionAddress, partition)
    }

    return streamIngestionSpecs, nil
}

func distStreamIngest(
    ctx context.Context,
    execCtx sql.JobExecContext,
    nodes []roachpb.NodeID,
    planCtx *sql.PlanningCtx,
    dsp *sql.DistSQLPlanner,
    streamIngestionSpecs []*execinfrapb.StreamIngestionDataSpec,
) error {
    ctx = logtags.AddTag(ctx, "stream-ingest-distsql", nil)
    evalCtx := execCtx.ExtendedEvalContext()
    var noTxn *kv.Txn

    if len(streamIngestionSpecs) == 0 {
        return nil
    }

    // Set up a one-stage plan with one proc per input spec.
    corePlacement := make([]physicalplan.ProcessorCorePlacement, len(streamIngestionSpecs))
    for i := range streamIngestionSpecs {
        corePlacement[i].NodeID = nodes[i]
        corePlacement[i].Core.StreamIngestionData = streamIngestionSpecs[i]
    }

    p := planCtx.NewPhysicalPlan()
    p.AddNoInputStage(
        corePlacement,
        execinfrapb.PostProcessSpec{},
        streamIngestionResultTypes,
        execinfrapb.Ordering{},
    )

    // TODO(adityamaru): It is likely that we will add a StreamIngestFrontier
    // processor on the coordinator node. All the StreamIngestionProcessors will
    // feed their results into this frontier. This is similar to the relationship
    // between the ChangeAggregator and ChangeFrontier processors. The
    // StreamIngestFrontier will be responsible for updating the job watermark
    // with the min of the resolved ts outputted by all the processors.

    // TODO(adityamaru): Once result types are updated, add PlanToStreamColMap.
    dsp.FinalizePlan(planCtx, p)

    recv := sql.MakeDistSQLReceiver(
        ctx,
        // TODO(adityamaru): Are there any results we want to surface to the user?
        nil, /* resultWriter */
        tree.Rows,
        nil, /* rangeCache */
        noTxn,
        nil, /* clockUpdater */
        evalCtx.Tracing,
    )
    defer recv.Release()

    // Copy the evalCtx, as dsp.Run() might change it.
    evalCtxCopy := *evalCtx
    dsp.Run(planCtx, noTxn, p, recv, &evalCtxCopy, nil /* finishedSetupFn */)
    return nil
}
1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/streamclient/BUILD.bazel
@@ -5,6 +5,7 @@ go_library(
    srcs = [
        "client.go",
        "event.go",
+       "stream_client.go",
    ],
    importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient",
    visibility = ["//visibility:public"],
10 changes: 5 additions & 5 deletions pkg/ccl/streamingccl/streamclient/client_test.go
@@ -18,20 +18,20 @@ import (
    "github.com/stretchr/testify/require"
)

-type mockStreamClient struct{}
+type testStreamClient struct{}

-var _ Client = mockStreamClient{}
+var _ Client = testStreamClient{}

// GetTopology implements the Client interface.
-func (sc mockStreamClient) GetTopology(_ StreamAddress) (Topology, error) {
+func (sc testStreamClient) GetTopology(_ StreamAddress) (Topology, error) {
    return Topology{Partitions: []PartitionAddress{
        "s3://my_bucket/my_stream/partition_1",
        "s3://my_bucket/my_stream/partition_2",
    }}, nil
}

// ConsumePartition implements the Client interface.
-func (sc mockStreamClient) ConsumePartition(_ PartitionAddress, _ time.Time) (chan Event, error) {
+func (sc testStreamClient) ConsumePartition(_ PartitionAddress, _ time.Time) (chan Event, error) {
    sampleKV := roachpb.KeyValue{
        Key: []byte("key_1"),
        Value: roachpb.Value{
@@ -51,7 +51,7 @@ func (sc mockStreamClient) ConsumePartition(_ PartitionAddress, _ time.Time) (ch
// TestExampleClientUsage serves as documentation to indicate how a stream
// client could be used.
func TestExampleClientUsage(t *testing.T) {
-   client := mockStreamClient{}
+   client := testStreamClient{}
    sa := StreamAddress("s3://my_bucket/my_stream")
    topology, err := client.GetTopology(sa)
    require.NoError(t, err)
33 changes: 33 additions & 0 deletions pkg/ccl/streamingccl/streamclient/stream_client.go
@@ -0,0 +1,33 @@
// Copyright 2020 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package streamclient

import "time"

// client is a mock stream client.
type client struct{}

var _ Client = &client{}

// NewStreamClient returns a new mock stream client.
func NewStreamClient() Client {
    return &client{}
}

// GetTopology implements the Client interface.
func (m *client) GetTopology(address StreamAddress) (Topology, error) {
    panic("unimplemented mock method")
}

// ConsumePartition implements the Client interface.
func (m *client) ConsumePartition(
    address PartitionAddress, startTime time.Time,
) (chan Event, error) {
    panic("unimplemented mock method")
}
1 change: 1 addition & 0 deletions pkg/jobs/jobspb/BUILD.bazel
@@ -10,6 +10,7 @@ go_library(
    importpath = "github.com/cockroachdb/cockroach/pkg/jobs/jobspb",
    visibility = ["//visibility:public"],
    deps = [
+       "//pkg/ccl/streamingccl/streamclient",
        "//pkg/roachpb",
        "//pkg/security",
        "//pkg/sql/catalog/descpb",