rowexec: paired joiners to accomplish left joins #54639

Merged (1 commit), Sep 29, 2020

6 changes: 5 additions & 1 deletion pkg/sql/execinfra/version.go
@@ -39,7 +39,7 @@ import "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
//
// ATTENTION: When updating these fields, add a brief description of what
// changed to the version history below.
-const Version execinfrapb.DistSQLVersion = 37
+const Version execinfrapb.DistSQLVersion = 38

// MinAcceptedVersion is the oldest version that the server is compatible with.
// A server will not accept flows with older versions.
@@ -51,6 +51,10 @@ const MinAcceptedVersion execinfrapb.DistSQLVersion = 37

Please add new entries at the top.

- Version: 38 (MinAcceptedVersion: 38)
- A paired joiner approach for inverted joins was added, for left
outer/semi/anti joins involving the invertedJoiner and joinReader.

- Version: 37 (MinAcceptedVersion: 37)
- An InterleavedReaderJoiner processor was removed, and the old processor
spec would be unrecognized by a server running older versions, hence the
540 changes: 337 additions & 203 deletions pkg/sql/execinfrapb/processors_sql.pb.go

Large diffs are not rendered by default.

73 changes: 73 additions & 0 deletions pkg/sql/execinfrapb/processors_sql.proto
@@ -194,6 +194,55 @@ message IndexSkipTableReaderSpec {
//
// If performing an index join (where a = c and b = d) (lookup columns is []):
// Internal columns: | c | d | e |
//
// There is a special case when a "join reader" is used as the second join in
// a pair of joins to accomplish a LEFT_OUTER, LEFT_SEMI or LEFT_ANTI join.
// The first join in this pair of joins is unable to precisely evaluate the
// join condition and produces false positives. This is typical when the first
// join is an inverted join (see InvertedJoinerSpec), but can also be the case
// when the first join is being evaluated over an index that does not have all
// the columns needed to evaluate the join condition. The first join outputs
// rows in sorted order of the original left columns. The input stream columns
// for the second join are a combination of the original left columns and the
// lookup columns. The first join additionally adds a continuation column that
// demarcates a group of successive rows that correspond to an original left
// row. The first row in a group contains false (since it is not a
// continuation of the group) and successive rows contain true.
//
// The mapping from the original join to the pair of joins is:
// LEFT_OUTER => LEFT_OUTER, LEFT_OUTER
// LEFT_SEMI => INNER, LEFT_SEMI (better than doing INNER, INNER, SORT, DISTINCT)
// LEFT_ANTI => LEFT_OUTER, LEFT_ANTI.
// where the first join always preserves order.
//
// More specifically, consider a lookup join example where the input stream
// columns are: | a | b | c | d | cont |.
// The lookup column is | d |. And the table columns are | e | f | with
// d = e.
// This join reader can see input of the form
// a1, b1, c1, d1, false
// a1, b1, c1, d2, true
// a1, b2, c1, null, false // when the first join is LEFT_OUTER
// a2, b1, c1, d3, false
// a2, b1, c1, d4, true
//
// Say both of the results for (a1, b1, c1) are false positives, and the
// first of the (a2, b1, c1) results is a false positive.
// The output for LEFT_OUTER:
// a1, b1, c1, d1, false, null, null
// a1, b2, c1, null, false, null, null
// a2, b1, c1, d4, true, d4, f1
// The d, cont columns are not part of the original left row, so will be
// projected away after the join.
//
// The output for LEFT_ANTI:
// a1, b1, c1, d1, false
// a1, b2, c1, null, false
// Again, the d, cont columns will be projected away after the join.
//
// The output for LEFT_SEMI:
// a2, b1, c1, d4, true
// Again, the d, cont columns will be projected away after the join.
message JoinReaderSpec {
optional sqlbase.TableDescriptor table = 1 [(gogoproto.nullable) = false];

@@ -263,6 +312,10 @@ message JoinReaderSpec {
// result of the secondary index joined against the primary index is
// expected to contain the materialized system columns.
optional bool has_system_columns = 13 [(gogoproto.nullable) = false];

// LeftJoinWithPairedJoiner is used when a left {outer,anti,semi} join is
// being achieved by pairing two joins. See the comment above.
optional bool left_join_with_paired_joiner = 14 [(gogoproto.nullable) = false];
}
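
The group semantics described in the JoinReaderSpec comment can be sketched in Go as follows. This is an illustration only, not the joinReader implementation: `InputRow`, `OutputRow`, `pairedSecondJoin`, and the `isMatch` callback are hypothetical names, the original left columns are collapsed into a single string, and unmatched output rows drop the `d` and `cont` values that the real processor would carry until the later projection.

```go
package main

import "fmt"

// JoinType is the join the second (lookup) join of the pair performs.
type JoinType int

const (
	LeftOuter JoinType = iota
	LeftSemi
	LeftAnti
)

// InputRow models one row produced by the first join (hypothetical type).
type InputRow struct {
	Left string  // stands in for the original left columns, e.g. "a1,b1,c1"
	D    *string // lookup column; nil models NULL
	Cont bool    // false on the first row of a group, true on later rows
}

// OutputRow models one row produced by the second join (hypothetical type).
type OutputRow struct {
	Left    string
	D       string // lookup value that matched; empty when Matched is false
	Matched bool   // false means the right-side columns would be NULL
}

func str(s string) *string { return &s }

// exampleInput reproduces the five input rows from the comment above.
func exampleInput() []InputRow {
	return []InputRow{
		{"a1,b1,c1", str("d1"), false},
		{"a1,b1,c1", str("d2"), true},
		{"a1,b2,c1", nil, false},
		{"a2,b1,c1", str("d3"), false},
		{"a2,b1,c1", str("d4"), true},
	}
}

// pairedSecondJoin walks the input one group at a time. isMatch stands in for
// the lookup plus the precise join condition: it returns false for the false
// positives produced by the first join.
func pairedSecondJoin(jt JoinType, in []InputRow, isMatch func(d string) bool) []OutputRow {
	var out []OutputRow
	for i := 0; i < len(in); {
		// A group is the row at i plus all following rows with Cont == true.
		j := i + 1
		for j < len(in) && in[j].Cont {
			j++
		}
		matched := false
		for _, r := range in[i:j] {
			if r.D == nil || !isMatch(*r.D) {
				continue // NULL candidate or false positive
			}
			// LEFT_OUTER emits every true match; LEFT_SEMI only the first.
			if jt == LeftOuter || (jt == LeftSemi && !matched) {
				out = append(out, OutputRow{Left: r.Left, D: *r.D, Matched: true})
			}
			matched = true
		}
		// A group without any true match yields one row for LEFT_OUTER
		// (NULL right side) and LEFT_ANTI, and nothing for LEFT_SEMI.
		if !matched && jt != LeftSemi {
			out = append(out, OutputRow{Left: in[i].Left})
		}
		i = j
	}
	return out
}

func main() {
	onlyD4 := func(d string) bool { return d == "d4" }
	for _, jt := range []JoinType{LeftOuter, LeftSemi, LeftAnti} {
		fmt.Println(pairedSecondJoin(jt, exampleInput(), onlyD4))
	}
}
```

With `onlyD4` as the match predicate, the sketch yields three rows for LEFT_OUTER, two for LEFT_ANTI, and one for LEFT_SEMI, in line with the example in the comment.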

// SorterSpec is the specification for a "sorting aggregator". A sorting
@@ -484,6 +537,18 @@ message HashJoinerSpec {
// that was indexed). For LEFT_SEMI and LEFT_ANTI, the "internal columns" are
// the columns of the left input.
//
// In many cases, the inverted join will contain false positives with respect to the
// original join condition. This is handled by pairing it with a lookup join.
// This pairing works naturally when the user query specified INNER, by
// running an INNER inverted join followed by INNER lookup join. For a user
// query with LEFT_OUTER/LEFT_ANTI, the inverted join is run as a LEFT_OUTER
// with a special mode that outputs an additional bool column that represents
// whether this row is a continuation of a group, where a group is defined as
// rows corresponding to the same original left row. This is paired with a
// lookup join that also knows about the semantics of this bool column. For a
// user query with LEFT_SEMI, the inverted join is run as an INNER join with
// the same special mode. See the JoinReaderSpec for an example.
//
// Example:
// Input stream columns: | a | b |
// Table columns: | c | d | e |
@@ -498,6 +563,9 @@ message HashJoinerSpec {
// Internal columns for INNER and LEFT_OUTER: | a | b | c | d | e |
// where d, e are not populated.
// Internal columns for LEFT_SEMI and LEFT_ANTI: | a | b |
//
// For INNER/LEFT_OUTER with OutputGroupContinuationForLeftRow = true, the
// internal columns include an additional bool column as the last column.
message InvertedJoinerSpec {
optional sqlbase.TableDescriptor table = 1 [(gogoproto.nullable) = false];

@@ -545,6 +613,11 @@ message InvertedJoinerSpec {
// Indicates that the inverted joiner should maintain the ordering of the
// input stream.
optional bool maintain_ordering = 7 [(gogoproto.nullable) = false];

// Indicates that the join should output a continuation column that
// indicates whether a row is a continuation of a group corresponding to a
// left row.
optional bool output_group_continuation_for_left_row = 8 [(gogoproto.nullable) = false];
}
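
As a rough illustration of what output_group_continuation_for_left_row asks of the first join, the Go sketch below emits a continuation flag per output row. It is a simplified model under stated assumptions, not the invertedJoiner code: `Row`, `emitWithContinuation`, and the `candidates` map (standing in for the index lookup) are hypothetical.

```go
package main

import "fmt"

// Row models one output row of the first join (hypothetical type).
type Row struct {
	Left  string  // stands in for the original left columns
	Right *string // candidate right value; nil models NULL
	Cont  bool    // continuation column
}

// emitWithContinuation processes left rows in order. candidates maps a left
// row to the (possibly imprecise) matches found for it. When leftOuter is
// false the join behaves like INNER and drops left rows without candidates.
func emitWithContinuation(left []string, candidates map[string][]string, leftOuter bool) []Row {
	var out []Row
	for _, l := range left {
		matches := candidates[l]
		if len(matches) == 0 {
			if leftOuter {
				// The only row of its group, so the flag is false.
				out = append(out, Row{Left: l})
			}
			continue
		}
		for i := range matches {
			// First row of the group gets false, later rows get true.
			out = append(out, Row{Left: l, Right: &matches[i], Cont: i > 0})
		}
	}
	return out
}

func main() {
	left := []string{"L1", "L2", "L3"}
	candidates := map[string][]string{"L1": {"d1", "d2"}, "L3": {"d3"}}
	for _, r := range emitWithContinuation(left, candidates, true) {
		right := "NULL"
		if r.Right != nil {
			right = *r.Right
		}
		fmt.Println(r.Left, right, r.Cont)
	}
}
```

Because the left rows are processed in input order and each group is emitted contiguously, a downstream consumer can find group boundaries by looking only at the flag.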

// InvertedFiltererSpec is the specification of a processor that does filtering
53 changes: 46 additions & 7 deletions pkg/sql/rowexec/inverted_joiner.go
@@ -140,6 +140,8 @@ type invertedJoiner struct {
// A row with one element, corresponding to an encoded inverted column
// value. Used to construct the span of the index for that value.
invertedColRow rowenc.EncDatumRow

outputContinuationCol bool
}

var _ execinfra.Processor = &invertedJoiner{}
@@ -201,12 +203,18 @@ func newInvertedJoiner(
if ij.joinType == descpb.InnerJoin || ij.joinType == descpb.LeftOuterJoin {
outputColCount += len(rightColTypes)
includeRightCols = true
if spec.OutputGroupContinuationForLeftRow {
outputColCount++
}
}
outputColTypes := make([]*types.T, 0, outputColCount)
outputColTypes = append(outputColTypes, ij.inputTypes...)
if includeRightCols {
outputColTypes = append(outputColTypes, rightColTypes...)
}
if spec.OutputGroupContinuationForLeftRow {
outputColTypes = append(outputColTypes, types.Bool)
}
if err := ij.ProcessorBase.Init(
ij, post, outputColTypes, flowCtx, processorID, output, nil, /* memMonitor */
execinfra.ProcStateOpts{
@@ -227,7 +235,11 @@ func newInvertedJoiner(
if err := ij.onExprHelper.Init(spec.OnExpr, onExprColTypes, semaCtx, ij.EvalCtx); err != nil {
return nil, err
}
-ij.combinedRow = make(rowenc.EncDatumRow, 0, len(onExprColTypes))
+combinedRowLen := len(onExprColTypes)
+if spec.OutputGroupContinuationForLeftRow {
+combinedRowLen++
+}
+ij.combinedRow = make(rowenc.EncDatumRow, 0, combinedRowLen)

if ij.datumsToInvertedExpr == nil {
var invertedExprHelper execinfrapb.ExprHelper
@@ -298,6 +310,8 @@ func newInvertedJoiner(
ij.diskMonitor,
)

ij.outputContinuationCol = spec.OutputGroupContinuationForLeftRow

return ij, nil
}

@@ -496,6 +510,9 @@ func (ij *invertedJoiner) performScan() (invertedJoinerState, *execinfrapb.Produ
return ijEmittingRows, nil
}

var trueEncDatum = rowenc.DatumToEncDatum(types.Bool, tree.DBoolTrue)
var falseEncDatum = rowenc.DatumToEncDatum(types.Bool, tree.DBoolFalse)

// emitRow returns the next row from ij.emitCursor, if present. Otherwise it
// prepares for another input batch.
func (ij *invertedJoiner) emitRow() (
Expand Down Expand Up @@ -532,7 +549,8 @@ func (ij *invertedJoiner) emitRow() (
if !seenMatch {
switch ij.joinType {
case descpb.LeftOuterJoin:
-return ijEmittingRows, ij.renderUnmatchedRow(ij.inputRows[inputRowIdx]), nil
+ij.renderUnmatchedRow(ij.inputRows[inputRowIdx])
+return ijEmittingRows, ij.combinedRow, nil
case descpb.LeftAntiJoin:
return ijEmittingRows, ij.inputRows[inputRowIdx], nil
}
Expand Down Expand Up @@ -564,9 +582,22 @@ func (ij *invertedJoiner) emitRow() (
return nil
}
if renderedRow != nil {
seenMatch := ij.emitCursor.seenMatch
ij.emitCursor.seenMatch = true
switch ij.joinType {
case descpb.InnerJoin, descpb.LeftOuterJoin:
if ij.outputContinuationCol {
if seenMatch {
// This is not the first row output for this left row, so set the
// group continuation to true.
ij.combinedRow = append(ij.combinedRow, trueEncDatum)
} else {
// This is the first row output for this left row, so set the group
// continuation to false.
ij.combinedRow = append(ij.combinedRow, falseEncDatum)
}
renderedRow = ij.combinedRow
}
return ijEmittingRows, renderedRow, nil
case descpb.LeftSemiJoin:
// Skip the rest of the joined rows.
@@ -588,7 +619,8 @@ func (ij *invertedJoiner) emitRow() (
}

// render constructs a row with columns from both sides. The ON condition is
-// evaluated; if it fails, returns nil.
+// evaluated; if it fails, returns nil. When it returns a non-nil row, it is
+// identical to ij.combinedRow.
func (ij *invertedJoiner) render(lrow, rrow rowenc.EncDatumRow) (rowenc.EncDatumRow, error) {
ij.combinedRow = append(ij.combinedRow[:0], lrow...)
ij.combinedRow = append(ij.combinedRow, rrow...)
@@ -601,14 +633,21 @@ func (ij *invertedJoiner) render(lrow, rrow rowenc.EncDatumRow) (rowenc.EncDatum
return ij.combinedRow, nil
}

-// renderUnmatchedRow creates a result row given an unmatched row.
-func (ij *invertedJoiner) renderUnmatchedRow(row rowenc.EncDatumRow) rowenc.EncDatumRow {
-ij.combinedRow = append(ij.combinedRow[:0], row...)
+// renderUnmatchedRow creates a result row given an unmatched row and
+// stores it in ij.combinedRow.
+func (ij *invertedJoiner) renderUnmatchedRow(row rowenc.EncDatumRow) {
ij.combinedRow = ij.combinedRow[:cap(ij.combinedRow)]
+// Copy the left row.
+copy(ij.combinedRow, row)
+// Set the remaining columns to NULL.
for i := len(row); i < len(ij.combinedRow); i++ {
ij.combinedRow[i].Datum = tree.DNull
}
-return ij.combinedRow
+if ij.outputContinuationCol {
+// The last column is the continuation column, so set it to false since
+// this is the only output row for this group.
+ij.combinedRow[len(ij.combinedRow)-1] = falseEncDatum
+}
}
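
The buffer-reuse pattern in renderUnmatchedRow (reslice to capacity, copy the left row, NULL-fill the rest, then overwrite the last slot) can be shown in isolation. The sketch below is a standalone approximation with hypothetical types (`datum`, `joiner`); it does not use `rowenc.EncDatumRow`, and it relies on the buffer having been allocated with a capacity equal to the full output width.

```go
package main

import "fmt"

// datum is a hypothetical stand-in for an encoded column value.
type datum struct {
	val  string
	null bool
}

// joiner holds a row buffer that is allocated once and reused for every
// output row (hypothetical type).
type joiner struct {
	combined   []datum
	outputCont bool
}

// newJoiner sizes the buffer to the output width, with one extra slot when
// the continuation column is requested.
func newJoiner(leftWidth, rightWidth int, outputCont bool) *joiner {
	width := leftWidth + rightWidth
	if outputCont {
		width++
	}
	return &joiner{combined: make([]datum, 0, width), outputCont: outputCont}
}

// renderUnmatched fills the shared buffer for a left row with no match.
func (j *joiner) renderUnmatched(left []datum) []datum {
	// Extend the slice to its full capacity so every column is addressable.
	j.combined = j.combined[:cap(j.combined)]
	// Copy the left row, then mark every remaining column as NULL.
	copy(j.combined, left)
	for i := len(left); i < len(j.combined); i++ {
		j.combined[i] = datum{null: true}
	}
	if j.outputCont {
		// The last column is the continuation flag; an unmatched row is the
		// only row of its group, so the flag is false.
		j.combined[len(j.combined)-1] = datum{val: "false"}
	}
	return j.combined
}

func main() {
	j := newJoiner(2, 3, true)
	fmt.Println(j.renderUnmatched([]datum{{val: "a"}, {val: "b"}}))
}
```

Since the returned slice aliases the shared buffer, a caller in this sketch has to consume or copy the row before the next call overwrites it.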

func (ij *invertedJoiner) transformToKeyRow(row rowenc.EncDatumRow) {