Merge pull request #19961 from lego/scrub-physical-implementation
sql,distsqlrun: Add SCRUB checks for physical table integrity
lgo authored Dec 6, 2017
2 parents 09ebec5 + 094a05c commit aae0424
Showing 24 changed files with 1,488 additions and 320 deletions.
118 changes: 58 additions & 60 deletions docs/RFCS/20171120_scrub_index_and_physical_implementation.md
@@ -1,5 +1,5 @@
- Feature Name: SCRUB index and physical check implementation
-- Status: in-progress
+- Status: completed
- Start Date: 2017-10-11
- Authors: Joey Pereira
- RFC PR: [#19327](https://github.com/cockroachdb/cockroach/pull/19327)
@@ -97,22 +97,22 @@ The following changes are introduced for running physical checks:
the logic to scan relevant tables. This plan will run through distSQL,
similar to any regular plan.

-- Add a `RunCheck` enum to `scanNode` and plumb it through the
-  `TableReader` for `RowFetcher` initialization during distSQL planning.
+- Add an `IsCheck` enum to `scanNode` and plumb it through the
+  `TableReader` for `MultiRowFetcher` initialization during distSQL planning.
This enum would only be meaningfully set during planning of `SCRUB`
when doing a physical check.

-- Modify the `RowFetcher` to conditionally run check code in existing
-  row fetching code paths, using the `RunCheck` indicator. For example,
+- Modify the `MultiRowFetcher` to conditionally run check code in existing
+  row fetching code paths, using the `IsCheck` indicator. For example,
  we want to return an error instead of panicking when a NULL is
  encountered on a non-NULL column.

-- Add a new iterator function to `RowFetcher`, `NextCheckError`, for
+- Add a new iterator function to `MultiRowFetcher`, `NextRowWithErrors`, for
  just scanning for errors. Here we can execute additional checking
  code on a separate code path from the critical fetching one. This lets
  us consume check errors instead of row data.

-- Add logic to `TableReader` to consume `NextCheckError` and handle the
+- Add logic to `TableReader` to consume `NextRowWithErrors` and handle the
  difference between wanted rows and returned rows (check error row
  type). This may require a new `TableChecker`. With this
  change, we are able to retrieve the error results desired.
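
To make the bullets above concrete, here is a minimal, self-contained sketch of the iterator shape they describe. Every name in it (`checkedFetcher`, `nextRowWithErrors`, the string-keyed k/v) is an illustrative stand-in, not the real `MultiRowFetcher` API:

```go
package main

import (
	"errors"
	"fmt"
)

// checkedFetcher is a toy stand-in for the MultiRowFetcher described
// above: when isCheck is set, decoding problems become results instead
// of aborting the scan.
type checkedFetcher struct {
	isCheck bool
	kvs     []string // pretend key/value pairs; "bad" ones fail to decode
	pos     int
}

// checkError mirrors a check-error row: a description of what is
// wrong, rather than table data.
type checkError struct {
	key    string
	reason string
}

// nextRowWithErrors sketches the proposed NextRowWithErrors iterator:
// it consumes check errors instead of row data and returns (nil, nil)
// once the scan is complete.
func (f *checkedFetcher) nextRowWithErrors() (*checkError, error) {
	for f.pos < len(f.kvs) {
		kv := f.kvs[f.pos]
		f.pos++
		if kv != "bad" {
			continue // a well-formed k/v produces no check error
		}
		if !f.isCheck {
			// The normal fetch path keeps its existing behavior: fail hard.
			return nil, errors.New("malformed k/v encountered")
		}
		// The check path converts the failure into a result.
		return &checkError{key: kv, reason: "invalid encoding"}, nil
	}
	return nil, nil
}

func main() {
	f := &checkedFetcher{isCheck: true, kvs: []string{"ok", "bad", "ok"}}
	for {
		ce, err := f.nextRowWithErrors()
		if err != nil || ce == nil {
			break
		}
		fmt.Printf("check error: %s (%s)\n", ce.key, ce.reason)
	}
}
```

The property that matters is that the `isCheck` branch turns a would-be scan failure into a consumable result, while the normal fetch path keeps its fail-hard behavior.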
@@ -143,7 +143,7 @@ the feasibility and time-effort of the possibilities.
## 1. Index checks and how they will be implemented.

Checking the index can be expressed through a rather simple logical SQL
plan. The final result will return all erroneous index entries -- both
dangling references and missing entries. In short, this can be broken
down into a few stages of processing:

@@ -170,46 +170,44 @@ represented by:
```sql
SELECT p.pkey, p.name, s.pkey, s.name
FROM
-  t1@{FORCE_INDEX=primary,NO_INDEX_JOIN} as p
+  (SELECT pkey, name FROM t1@{FORCE_INDEX=primary,NO_INDEX_JOIN} ORDER BY pkey, name) AS p
FULL OUTER JOIN
-  t1@{FORCE_INDEX=name_idx,NO_INDEX_JOIN} as s
+  (SELECT pkey, name FROM t1@{FORCE_INDEX=name_idx,NO_INDEX_JOIN} ORDER BY pkey, name) AS s
ON
-  ((p.pkey IS NOT NULL and s.pkey IS NOT NULL and p.pkey = s.pkey) OR
-   (p.pkey IS NULL AND s.pkey IS NULL)) AND
-  ((p.name IS NOT NULL and s.name IS NOT NULL and p.name = s.name) OR
-   (p.name IS NULL AND s.name IS NULL))
+  (p.pkey = s.pkey OR (p.pkey IS NULL AND s.pkey IS NULL)) AND
+  (p.name = s.name OR (p.name IS NULL AND s.name IS NULL))
WHERE
(p.pkey IS NULL AND p.name IS NULL) OR
(s.pkey IS NULL AND s.name IS NULL)
```

-In short, this query is:
-1) Scanning the primary index and the secondary index.
-2) Joining them on all of the secondary index columns and extra columns
-   that are equivalent. This is a fairly verbose check due to
-   equivalency properties when involving NULLs in order to be an
-   anti-join.
-3) Filtering to achieve an anti-join. The first line of the predicate
-   takes rows on the right for the anti-join. The second line of the
-   predicate takes rows on the left for the anti-join.
-
-Because this is an anti-join, the results are as follows:
-- If any primary index column on the left is NULL, that means the right
-  columns are present. This is because of the invariant that primary
-  index columns are never null.
-- Otherwise, the left columns are present.
-
-As for the overall performance of this operation, if N is the number of
-primary index k/v rows on a node, and M is the number of secondary index
-k/v rows on a node, then each node will scan exactly N+M rows and stream
-them to a joiner.
-
-Because the inputs will not be sorted similarly and will be fairly
-large, this join will ideally be a hash join. In this scenario, the
-memory used on each node will be at most the total data divided by the
-joiners, and temp storage will be used if necessary.
+In short, this query is:
+1) Scanning the primary index and the secondary index.
+2) Ordering both of them by the primary then secondary index columns.
+   This is done to force a distSQL merge join.
+3) Joining both sides on all of the columns contained by the secondary
+   index key-value pairs.
+4) Filtering to achieve an anti-join. It looks for when the primary
+   index values are null, as they are non-nullable columns. The first
+   line of the predicate takes rows on the right for the anti-join.
+   The second line of the predicate takes rows on the left for the
+   anti-join.
+
+Because this is an anti-join, the results are as follows:
+- If any primary index column on the left is NULL, that means the
+  right columns are present. This is because of the invariant that
+  primary index columns are never null.
+- Otherwise, the left columns are present.
+
+Now, a problem comes up because of the join predicate. The query above
+will be run as a hash joiner, with only one bucket. This will be very
+slow. As a workaround, the `p.col = s.col OR (p.col IS NULL AND s.col IS NULL)`
+predicates are replaced with just `p.col = s.col` so that the query can
+be run using a merge joiner, hence the explicit orderings and step 2 in
+the query. The distSQL plan generated from this query has
+MergeJoinerSpec.NullEquality set to true, to make NULL = NULL evaluate
+to true, making the two predicates equivalent while allowing for far
+more efficient execution.
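
To make the workaround concrete, here is a small self-contained sketch of the key comparison a merge joiner performs when `MergeJoinerSpec.NullEquality` is set. It is illustrative only: the real joiner compares encoded datums, not `*int`:

```go
package main

import "fmt"

// cmpNullEq compares two nullable join keys the way a merge joiner
// must when NullEquality is true: NULLs order first, and NULL = NULL
// is treated as a match. This is what makes the plain `p.col = s.col`
// predicate behave like the verbose IS NULL form above.
func cmpNullEq(a, b *int) int {
	switch {
	case a == nil && b == nil:
		return 0 // NULL = NULL: the two rows join
	case a == nil:
		return -1 // NULLs sort first
	case b == nil:
		return 1
	case *a < *b:
		return -1
	case *a > *b:
		return 1
	default:
		return 0
	}
}

func main() {
	var null *int
	three := 3
	fmt.Println(cmpNullEq(null, null))     // 0: the two NULL rows match
	fmt.Println(cmpNullEq(null, &three))   // -1: advance the left stream
	fmt.Println(cmpNullEq(&three, &three)) // 0: ordinary equality
}
```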

[1]: Relevant index data includes the columns, extra columns, and store
columns of the index.
@@ -223,11 +221,11 @@ groupings of certain data (column families), how index data is organized

Most of the checks that involve the physical table are encoding and
related data validity. This data is already partially checked during
-scanning in `RowFetcher`. Because of this, adding checks is a matter of
+scanning in `MultiRowFetcher`. Because of this, adding checks is a matter of
a few additive changes, and handling the error to instead return row
results.

-The checks we can implement in `RowFetcher` include:
+The checks we can implement in `MultiRowFetcher` include:
- Tables using [FAMILY][] have the data organized into families
correctly
- Invalid data encodings
@@ -237,10 +235,10 @@ The checks we can implement in `RowFetcher` include:
- `NOT NULL` constraint violation (not physical table, but checked at
this layer)

-The underlying scanning is in the `RowFetcher`. Inside the current
-`RowFetcher` the method `processKv` is responsible for the
-encoding-specific work as it iterates over the k/v, which is where the
-checks will be added, including:
+The underlying scanning is in the `MultiRowFetcher`. Inside the current
+`MultiRowFetcher` the method `processKv` is responsible for the
+encoding-specific work as it iterates over the k/v, which is where the
+checks will be added, including:
- Check composite encodings are correct and round-trip
- Check key encodings correctly reflect ordering of decoded values
- Check the KV pair is well-formed (everything is present, no extra
@@ -257,15 +255,15 @@ and the returned values will be the `CheckError` as found in the
there will be very few results relative to the data scanned.

Invalid data encoding (and others) already return an `error` while
-scanning. We want to capture errors in `RowFetcher` that come from any
-external function calls and wrap them in a `scrub.Error` struct
-with an error code annotated.
+scanning. We want to capture errors in `MultiRowFetcher` that come from
+any external function calls and wrap them in a `scrub.Error` struct with
+an error code annotated.

Those error structs will be captured further upstream inside the
TableReader and translated into the row results. In the existing
scanning code path, this same error struct will simply be unwrapped.

-For example, consider this excerpt from `RowFetcher.processValueSingle`:
+For example, consider this excerpt from `MultiRowFetcher.processValueSingle`:

```go
value, err := UnmarshalColumnValue(rf.alloc, typ, kv.Value)
```
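
The remainder of that excerpt is collapsed in this view, but the wrap-and-unwrap pattern it feeds can be sketched standalone. Only `scrub.UnwrapScrubError` is visible in this commit; the struct fields and helper names below are assumptions for illustration:

```go
package main

import (
	"errors"
	"fmt"
)

// scrubError sketches the scrub.Error struct described above: an
// underlying error annotated with an error code. Field and helper
// names are assumptions, not the actual package API.
type scrubError struct {
	code       string
	underlying error
}

func (e *scrubError) Error() string {
	return fmt.Sprintf("%s: %v", e.code, e.underlying)
}

// wrapScrubError annotates an error from an external call, such as the
// UnmarshalColumnValue failure in the excerpt above.
func wrapScrubError(code string, err error) error {
	return &scrubError{code: code, underlying: err}
}

// unwrapScrubError restores the plain error for the existing scanning
// code path, mirroring the scrub.UnwrapScrubError calls in this commit.
func unwrapScrubError(err error) error {
	var se *scrubError
	if errors.As(err, &se) {
		return se.underlying
	}
	return err
}

func main() {
	err := wrapScrubError("invalid_encoding", errors.New("unmarshal failed"))
	fmt.Println(err)                   // annotated: becomes a check-error row
	fmt.Println(unwrapScrubError(err)) // plain: for the normal scan path
}
```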
@@ -286,7 +284,7 @@ Another chunk of the work will then be plumbing the check error
results out, as the table schema we are scanning will differ from the
result rows representing errors. To get around this we must manually
construct the distSQL physical plan. This will also take some extra code
-to wrap the call from TableReader to RowFetcher where it will transform
+to wrap the call from TableReader to MultiRowFetcher where it will transform
any caught errors.

[SCRUB interface RFC]: https://github.com/cockroachdb/cockroach/pull/18675
@@ -321,7 +319,7 @@ A challenge here is producing both check errors and the necessary data
to run index checks simultaneously. It is not currently known if
multiple different schemas can be streamed out of one table reader.

-In the `RowFetcher`, this may look like a `NextRowOrCheckError` iterator
+In the `MultiRowFetcher`, this may look like a `NextRowOrCheckError` iterator
that may return either row data or check error data, and we emit those
to the corresponding outputs.

@@ -330,8 +328,8 @@ stream which filters out only the columns desired for each consumer --
either a specific index check (a joiner), or the final result errors (in
final aggregation).
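
A sketch of such a router, assuming hypothetical names; real distSQL routers also deal with metadata, buffering, and flow control:

```go
package main

import "fmt"

// row is a wide datum row coming out of the table reader.
type row []string

// output pairs a consumer with the column indices it wants to see.
type output struct {
	cols []int
	sink func(row)
}

// columnRouter sketches the proposed router: one wide stream fans out
// to several consumers, each receiving only its requested columns.
type columnRouter struct {
	outputs []output
}

func (r *columnRouter) push(in row) {
	for _, out := range r.outputs {
		filtered := make(row, len(out.cols))
		for i, c := range out.cols {
			filtered[i] = in[c]
		}
		out.sink(filtered)
	}
}

func main() {
	r := &columnRouter{outputs: []output{
		{cols: []int{0, 1}, sink: func(r row) { fmt.Println("index check joiner:", r) }},
		{cols: []int{2}, sink: func(r row) { fmt.Println("final aggregation:", r) }},
	}}
	r.push(row{"pkey=1", "name=alice", "checkError=<nil>"})
}
```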

-Alternatively, a crazy idea may modify the RowFetcher or TableReader to
-produce two separate schemas, branching off the idea of MultiRowFetchers
+Alternatively, a crazy idea may modify the MultiRowFetcher or TableReader to
+produce two separate schemas, branching off the idea of MultiRowFetchers
made for the adjustments for interleaved tables.


@@ -386,16 +384,16 @@ This has two major problems:

This is by far the easiest method, as it only involves:
1) Collect the required secondary index data. The first code path that
-   accesses required data is in `RowFetcher`.
+   accesses required data is in `MultiRowFetcher`.
2) Do point-lookups for the secondary index entry, and compare.

-Step 1 is ideally done in the `RowFetcher` as this is also where we can
+Step 1 is ideally done in the `MultiRowFetcher` as this is also where we can
avoid passing decoded data further upstream, when we only need check
errors.

Step 2 can be optimized by batching the k/v lookups, which would be a
-trivial mechanism to add inside `RowFetcher`. In this case, having an
-alternative `NextRow` called `NextCheckError` makes more sense to not
+trivial mechanism to add inside `MultiRowFetcher`. In this case, having an
+alternative `NextRow` called `NextRowWithErrors` makes more sense to not
add this code to the critical code path.
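
As a rough illustration of steps 1 and 2 with batching, here is a self-contained sketch under assumed names; the real implementation would issue a single `client.Batch` of point lookups against the KV layer rather than consult an in-memory map:

```go
package main

import (
	"context"
	"fmt"
)

// batchGetter abstracts the point-lookup half of step 2. Every name
// here is an illustrative assumption.
type batchGetter interface {
	batchGet(ctx context.Context, keys []string) (map[string][]byte, error)
}

// mapStore is an in-memory stand-in for the secondary index.
type mapStore map[string][]byte

func (m mapStore) batchGet(_ context.Context, keys []string) (map[string][]byte, error) {
	out := make(map[string][]byte, len(keys))
	for _, k := range keys {
		if v, ok := m[k]; ok {
			out[k] = v
		}
	}
	return out, nil
}

// checkSecondaryEntries batches the secondary-index entries expected
// from a chunk of primary rows into one lookup, then compares: a
// missing key is a missing index entry, a differing value a corrupt one.
func checkSecondaryEntries(
	ctx context.Context, store batchGetter, expected map[string][]byte,
) ([]string, error) {
	keys := make([]string, 0, len(expected))
	for k := range expected {
		keys = append(keys, k)
	}
	got, err := store.batchGet(ctx, keys) // one round trip per chunk, not per row
	if err != nil {
		return nil, err
	}
	var problems []string
	for k, want := range expected {
		v, ok := got[k]
		switch {
		case !ok:
			problems = append(problems, fmt.Sprintf("missing index entry for key %q", k))
		case string(v) != string(want):
			problems = append(problems, fmt.Sprintf("corrupt index entry for key %q", k))
		}
	}
	return problems, nil
}

func main() {
	idx := mapStore{"/idx/alice": []byte("pk1")}
	expected := map[string][]byte{
		"/idx/alice": []byte("pk1"),
		"/idx/bob":   []byte("pk2"), // absent from the index: a missing entry
	}
	problems, _ := checkSecondaryEntries(context.Background(), idx, expected)
	for _, p := range problems {
		fmt.Println(p)
	}
}
```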

A [proof of concept] has been done where this does not batch lookups or
@@ -455,7 +453,7 @@ may be a problem though, as this will run all checks in parallel and not
in sequence, which will adversely affect foreground traffic.

Lastly, we may need to have an alternative to a `TableReader`, call it
-`TableChecker`, where we call the iterator `NextCheckError` instead of
+`TableChecker`, where we call the iterator `NextRowWithErrors` instead of
`NextRow` to produce all physical errors.
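
A short sketch of what that draining loop could look like, with `checkRow` standing in for the fixed `distsqlrun.ScrubTypes` schema and every other name hypothetical:

```go
package main

import "fmt"

// checkRow stands in for the fixed check-error schema a TableChecker
// would emit instead of table rows; this field set is illustrative.
type checkRow struct {
	errorType  string
	primaryKey string
	details    string
}

// runTableChecker sketches the proposed TableChecker core: where
// TableReader.Run drains NextRow, this drains NextRowWithErrors
// (modeled as next) and pushes every check error downstream (push).
func runTableChecker(next func() *checkRow, push func(checkRow)) {
	for {
		r := next()
		if r == nil {
			return // scan finished; only errors were emitted
		}
		push(*r)
	}
}

func main() {
	queue := []*checkRow{
		{"dangling_index_reference", "/Table/51/2/3", "no matching primary row"},
	}
	next := func() *checkRow {
		if len(queue) == 0 {
			return nil
		}
		r := queue[0]
		queue = queue[1:]
		return r
	}
	runTableChecker(next, func(r checkRow) { fmt.Printf("%+v\n", r) })
}
```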

# Unresolved questions
8 changes: 8 additions & 0 deletions pkg/sql/distsql_physical_planner.go
@@ -656,6 +656,7 @@ func initTableReaderSpec(
s := distsqlrun.TableReaderSpec{
Table: *n.desc,
Reverse: n.reverse,
+		IsCheck: n.run.isCheck,
}
if n.index != &n.desc.PrimaryIndex {
for i := range n.desc.Indexes {
@@ -671,6 +672,13 @@
}
}

+	// When a TableReader is running scrub checks, do not allow a
+	// post-processor. This is because the outgoing stream is a fixed
+	// format (distsqlrun.ScrubTypes).
+	if n.run.isCheck {
+		return s, distsqlrun.PostProcessSpec{}, nil
+	}

post := distsqlrun.PostProcessSpec{
Filter: distsqlplan.MakeExpression(n.filter, evalCtx, nil),
}
78 changes: 78 additions & 0 deletions pkg/sql/distsql_plan_scrub_physical.go
@@ -0,0 +1,78 @@
// Copyright 2017 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package sql

import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlplan"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlrun"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// createScrubPhysicalCheck generates a plan for running a physical
// check for an index. The plan consists of TableReaders, with IsCheck
// enabled, that scan an index span. By having IsCheck enabled, the
// TableReaders will only emit errors encountered during scanning
// instead of row data. The plan is finalized.
func (dsp *DistSQLPlanner) createScrubPhysicalCheck(
planCtx *planningCtx,
n *scanNode,
desc sqlbase.TableDescriptor,
indexDesc sqlbase.IndexDescriptor,
spans []roachpb.Span,
readAsOf hlc.Timestamp,
) (physicalPlan, error) {
spec, _, err := initTableReaderSpec(n, planCtx.evalCtx)
if err != nil {
return physicalPlan{}, err
}

spanPartitions, err := dsp.partitionSpans(planCtx, n.spans)
if err != nil {
return physicalPlan{}, err
}

var p physicalPlan
stageID := p.NewStageID()
p.ResultRouters = make([]distsqlplan.ProcessorIdx, len(spanPartitions))
for i, sp := range spanPartitions {
tr := &distsqlrun.TableReaderSpec{}
*tr = spec
tr.Spans = make([]distsqlrun.TableReaderSpan, len(sp.spans))
for j := range sp.spans {
tr.Spans[j].Span = sp.spans[j]
}

proc := distsqlplan.Processor{
Node: sp.node,
Spec: distsqlrun.ProcessorSpec{
Core: distsqlrun.ProcessorCoreUnion{TableReader: tr},
Output: []distsqlrun.OutputRouterSpec{{Type: distsqlrun.OutputRouterSpec_PASS_THROUGH}},
StageID: stageID,
},
}

pIdx := p.AddProcessor(proc)
p.ResultRouters[i] = pIdx
}

// Set the plan's result types to be ScrubTypes.
p.ResultTypes = distsqlrun.ScrubTypes
p.planToStreamColMap = identityMapInPlace(make([]int, len(distsqlrun.ScrubTypes)))

dsp.FinalizePlan(planCtx, &p)
return p, nil
}
2 changes: 1 addition & 1 deletion pkg/sql/distsqlrun/columnbackfiller.go
@@ -127,7 +127,7 @@ func (cb *columnBackfiller) init() error {
ValNeededForCol: valNeededForCol,
}
return cb.fetcher.Init(
-		false /* reverse */, false /* returnRangeInfo */, &cb.alloc, tableArgs,
+		false /* reverse */, false /* returnRangeInfo */, false /* isCheck */, &cb.alloc, tableArgs,
)
}

4 changes: 3 additions & 1 deletion pkg/sql/distsqlrun/indexbackfiller.go
@@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/scrub"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util"
@@ -117,7 +118,7 @@ func (ib *indexBackfiller) init() error {
ValNeededForCol: valNeededForCol,
}
return ib.fetcher.Init(
-		false /* reverse */, false /* returnRangeInfo */, &ib.alloc, tableArgs,
+		false /* reverse */, false /* returnRangeInfo */, false /* isCheck */, &ib.alloc, tableArgs,
)
}

@@ -167,6 +168,7 @@ func (ib *indexBackfiller) runChunk(
for i := int64(0); i < chunkSize; i++ {
encRow, _, _, err := ib.fetcher.NextRow(ctx)
if err != nil {
+			err = scrub.UnwrapScrubError(err)
return nil, err
}
if encRow == nil {
5 changes: 4 additions & 1 deletion pkg/sql/distsqlrun/interleaved_reader_joiner.go
@@ -21,6 +21,7 @@ import (
"golang.org/x/net/context"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/scrub"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
@@ -185,7 +186,8 @@ func (irj *interleavedReaderJoiner) initMultiRowFetcher(
}
}

-	return irj.fetcher.Init(reverseScan, true /* returnRangeInfo */, alloc, args...)
+	return irj.fetcher.Init(reverseScan, true /* returnRangeInfo */, true /* isCheck */, alloc,
+		args...)
}

// sendMisplannedRangesMetadata sends information about the non-local ranges
@@ -234,6 +236,7 @@ func (irj *interleavedReaderJoiner) Run(ctx context.Context, wg *sync.WaitGroup)
row, desc, index, err := irj.fetcher.NextRow(ctx)
if err != nil || row == nil {
if err != nil {
+				err = scrub.UnwrapScrubError(err)
irj.out.output.Push(nil /* row */, ProducerMetadata{Err: err})
}
break