Skip to content

Commit

Permalink
Merge #56161
Browse files Browse the repository at this point in the history
56161: sql: add support for joinReader to be the first join in paired joins r=sumeerbhola a=sumeerbhola

This is to allow lookup joins to be used for left outer/semi/anti
joins with non-covering indexes. Currently only semi joins for
this case can use the index (by doing two inner joins and a DistinctOn)

Paired joins with a non-covering index will be used as follows:
- Left outer join: will become a pair of left outer lookup joins.
- Left anti join: will be a left outer lookup join followed by
  a left anti lookup join.
- Left semi join: will be an inner lookup join followed by a
  left semi lookup join.

This PR does not contain the optimizer changes.

Informs #55452

Release note: None

Co-authored-by: sumeerbhola <sumeer@cockroachlabs.com>
  • Loading branch information
craig[bot] and sumeerbhola committed Nov 5, 2020
2 parents 717b9df + 37ed2f4 commit c456295
Show file tree
Hide file tree
Showing 10 changed files with 633 additions and 413 deletions.
9 changes: 7 additions & 2 deletions pkg/sql/execinfra/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
//
// ATTENTION: When updating these fields, add a brief description of what
// changed to the version history below.
const Version execinfrapb.DistSQLVersion = 40
const Version execinfrapb.DistSQLVersion = 41

// MinAcceptedVersion is the oldest version that the server is compatible with.
// A server will not accept flows with older versions.
Expand All @@ -51,6 +51,11 @@ const MinAcceptedVersion execinfrapb.DistSQLVersion = 40
Please add new entries at the top.
- Version: 41 (MinAcceptedVersion: 40)
- A paired joiner approach for lookup joins was added, for left
outer/semi/anti joins involving a pair of joinReaders, where the
first join uses a non-covering index.
- Version: 40 (MinAcceptedVersion: 40)
- A new field was added execinfrapb.ProcessorSpec to propagate the result
column types of a processor. This change is not backwards compatible
Expand All @@ -62,7 +67,7 @@ Please add new entries at the top.
protobuf struct for easier propagation to the remote nodes during the
execution.
- Version: 38 (MinAcceptedVersion: 38)
- Version: 38 (MinAcceptedVersion: 37)
- A paired joiner approach for inverted joins was added, for left
outer/semi/anti joins involving the invertedJoiner and joinReader.
Expand Down
452 changes: 247 additions & 205 deletions pkg/sql/execinfrapb/processors_sql.pb.go

Large diffs are not rendered by default.

15 changes: 14 additions & 1 deletion pkg/sql/execinfrapb/processors_sql.proto
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,11 @@ message IndexSkipTableReaderSpec {
// The output for LEFT_SEMI:
// a2, b1, c1, d4, true
// Again, the d, cont columns will be projected away after the join.
//
// The example above is for a lookup join as the second join in the
// paired-joins. The lookup join can also be the first join in the
// paired-joins, which is indicated by both
// OutputGroupContinuationForLeftRow and MaintainOrdering set to true.
message JoinReaderSpec {
optional sqlbase.TableDescriptor table = 1 [(gogoproto.nullable) = false];

Expand Down Expand Up @@ -314,8 +319,16 @@ message JoinReaderSpec {
optional bool has_system_columns = 13 [(gogoproto.nullable) = false];

// LeftJoinWithPairedJoiner is used when a left {outer,anti,semi} join is
// being achieved by pairing two joins. See the comment above.
// being achieved by pairing two joins, and this is the second join. See
// the comment above.
optional bool left_join_with_paired_joiner = 14 [(gogoproto.nullable) = false];

// OutputGroupContinuationForLeftRow indicates that this join is the first
// join in the paired-joins. At most one of OutputGroupContinuationForLeftRow
// and LeftJoinWithPairedJoiner must be true. Additionally, if
// OutputGroupContinuationForLeftRow is true, MaintainOrdering must also
// be true.
optional bool output_group_continuation_for_left_row = 15 [(gogoproto.nullable) = false];
}

// SorterSpec is the specification for a "sorting aggregator". A sorting
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/rowexec/hashjoiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ func newHashJoiner(
spec.OnExpr,
spec.LeftEqColumns,
spec.RightEqColumns,
false, /* outputContinuationColumn */
post,
output,
execinfra.ProcStateOpts{
Expand Down
31 changes: 22 additions & 9 deletions pkg/sql/rowexec/joinerbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func (jb *joinerBase) init(
onExpr execinfrapb.Expression,
leftEqColumns []uint32,
rightEqColumns []uint32,
outputContinuationColumn bool,
post *execinfrapb.PostProcessSpec,
output execinfra.RowReceiver,
opts execinfra.ProcStateOpts,
Expand All @@ -74,18 +75,26 @@ func (jb *joinerBase) init(
jb.eqCols[leftSide] = leftEqColumns
jb.eqCols[rightSide] = rightEqColumns

size := len(leftTypes) + len(rightTypes)
jb.combinedRow = make(rowenc.EncDatumRow, size)
rowSize := len(leftTypes) + len(rightTypes)
if outputContinuationColumn {
// NB: Can only be true for inner joins and left outer joins.
rowSize++
}
jb.combinedRow = make(rowenc.EncDatumRow, rowSize)

condTypes := make([]*types.T, 0, size)
// condTypes does not include the continuation column, but the slice has the
// capacity for it, since outputTypes later adds the continuation column.
condTypes := make([]*types.T, 0, rowSize)
condTypes = append(condTypes, leftTypes...)
condTypes = append(condTypes, rightTypes...)

outputSize := len(leftTypes)
if jb.joinType.ShouldIncludeRightColsInOutput() {
outputSize += len(rightTypes)
outputTypes := condTypes
// NB: outputContinuationCol implies jb.joinType.ShouldIncludeRightColsInOutput()
if !jb.joinType.ShouldIncludeRightColsInOutput() {
outputTypes = outputTypes[:len(leftTypes)]
} else if outputContinuationColumn {
outputTypes = append(outputTypes, types.Bool)
}
outputTypes := condTypes[:outputSize]

if err := jb.ProcessorBase.Init(
self, post, outputTypes, flowCtx, processorID, output, nil /* memMonitor */, opts,
Expand Down Expand Up @@ -119,7 +128,9 @@ func (j joinSide) String() string {
}

// renderUnmatchedRow creates a result row given an unmatched row on either
// side. Only used for outer joins.
// side. Only used for outer joins. Note that if the join is outputting a
// continuation column, the returned slice does not include the continuation
// column, but has the capacity for it.
func (jb *joinerBase) renderUnmatchedRow(row rowenc.EncDatumRow, side joinSide) rowenc.EncDatumRow {
lrow, rrow := jb.emptyLeft, jb.emptyRight
if side == leftSide {
Expand Down Expand Up @@ -158,7 +169,9 @@ func shouldEmitUnmatchedRow(side joinSide, joinType descpb.JoinType) bool {
}

// render constructs a row with columns from both sides. The ON condition is
// evaluated; if it fails, returns nil.
// evaluated; if it fails, returns nil. Note that if the join is outputting a
// continuation column, the returned slice does not include the continuation
// column, but has the capacity for it.
func (jb *joinerBase) render(lrow, rrow rowenc.EncDatumRow) (rowenc.EncDatumRow, error) {
jb.combinedRow = jb.combinedRow[:len(lrow)+len(rrow)]
copy(jb.combinedRow, lrow)
Expand Down
59 changes: 37 additions & 22 deletions pkg/sql/rowexec/joinreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,26 +110,26 @@ type joinReader struct {
// (or evaluate it accurately, as is sometimes the case with inverted
// indexes). The first join is running a left outer or inner join, and each
// group of rows seen by the second join correspond to one left row.
//
// TODO(sumeer): add support for joinReader to also be the first
// join in this pair, say when the index can evaluate most of the
// join condition (the part with low selectivity), but not all.
// Currently only the invertedJoiner can serve as the first join
// in the pair.

// The input rows in the current batch belong to groups which are tracked in
// groupingState. The last row from the last batch is in
// lastInputRowFromLastBatch -- it is tracked because we don't know if it
// was the last row in a group until we get to the next batch. NB:
// groupingState is used even when there is no grouping -- we simply have
// groups of one.
// groups of one. The no grouping cases include the case of this join being
// the first join in the paired joins.
groupingState *inputBatchGroupingState

lastBatchState struct {
lastInputRow rowenc.EncDatumRow
lastGroupMatched bool
lastGroupContinued bool
}

// Set to true when this is the first join in the paired-joins (see the
// detailed comment in the spec). This can never be true for index joins,
// and requires that the spec has MaintainOrdering set to true.
outputGroupContinuationForLeftRow bool
}

var _ execinfra.Processor = &joinReader{}
Expand All @@ -153,6 +153,10 @@ func newJoinReader(
if spec.IndexIdx != 0 && readerType == indexJoinReaderType {
return nil, errors.AssertionFailedf("index join must be against primary index")
}
if spec.OutputGroupContinuationForLeftRow && !spec.MaintainOrdering {
return nil, errors.AssertionFailedf(
"lookup join must maintain ordering since it is first join in paired-joins")
}

var lookupCols []uint32
switch readerType {
Expand All @@ -168,15 +172,14 @@ func newJoinReader(
return nil, errors.Errorf("unsupported joinReaderType")
}
jr := &joinReader{
desc: tabledesc.MakeImmutable(spec.Table),
maintainOrdering: spec.MaintainOrdering,
input: input,
inputTypes: input.OutputTypes(),
lookupCols: lookupCols,
desc: tabledesc.MakeImmutable(spec.Table),
maintainOrdering: spec.MaintainOrdering,
input: input,
inputTypes: input.OutputTypes(),
lookupCols: lookupCols,
outputGroupContinuationForLeftRow: spec.OutputGroupContinuationForLeftRow,
}
if readerType != indexJoinReaderType {
// TODO(sumeer): When LeftJoinWithPairedJoiner, the lookup columns and the
// bool column must be projected away by the optimizer after this join.
jr.groupingState = &inputBatchGroupingState{doGrouping: spec.LeftJoinWithPairedJoiner}
}
var err error
Expand Down Expand Up @@ -239,6 +242,7 @@ func newJoinReader(
spec.OnExpr,
leftEqCols,
indexCols,
spec.OutputGroupContinuationForLeftRow,
post,
output,
execinfra.ProcStateOpts{
Expand Down Expand Up @@ -361,11 +365,12 @@ func (jr *joinReader) initJoinReaderStrategy(
drc.DisableCache = true
}
jr.strategy = &joinReaderOrderingStrategy{
joinerBase: &jr.joinerBase,
defaultSpanGenerator: spanGenerator,
isPartialJoin: jr.joinType == descpb.LeftSemiJoin || jr.joinType == descpb.LeftAntiJoin,
lookedUpRows: drc,
groupingState: jr.groupingState,
joinerBase: &jr.joinerBase,
defaultSpanGenerator: spanGenerator,
isPartialJoin: jr.joinType == descpb.LeftSemiJoin || jr.joinType == descpb.LeftAntiJoin,
lookedUpRows: drc,
groupingState: jr.groupingState,
outputGroupContinuationForLeftRow: jr.outputGroupContinuationForLeftRow,
}
}

Expand Down Expand Up @@ -404,8 +409,14 @@ func (jr *joinReader) neededRightCols() util.FastIntSet {
// Get the columns from the right side of the join and shift them over by
// the size of the left side so the right side starts at 0.
neededRightCols := util.MakeFastIntSet()
var lastCol int
for i, ok := neededCols.Next(len(jr.inputTypes)); ok; i, ok = neededCols.Next(i + 1) {
neededRightCols.Add(i - len(jr.inputTypes))
lastCol = i - len(jr.inputTypes)
neededRightCols.Add(lastCol)
}
if jr.outputGroupContinuationForLeftRow {
// The lastCol is the bool continuation column and not a right column.
neededRightCols.Remove(lastCol)
}

// Add columns needed by OnExpr.
Expand Down Expand Up @@ -580,7 +591,7 @@ func (jr *joinReader) performLookup() (joinReaderState, *execinfrapb.ProducerMet
}
}

// Fetch the next row and copy it into the row container.
// Fetch the next row and tell the strategy to process it.
lookedUpRow, _, _, err := jr.fetcher.NextRow(jr.Ctx)
if err != nil {
jr.MoveToDraining(scrub.UnwrapScrubError(err))
Expand Down Expand Up @@ -868,12 +879,16 @@ func (ib *inputBatchGroupingState) setFirstGroupMatched() {
ib.groupState[0].matched = true
}

func (ib *inputBatchGroupingState) setMatched(rowIndex int) {
// setMatched records that the given rowIndex has matched. It returns the
// previous value of the matched field.
func (ib *inputBatchGroupingState) setMatched(rowIndex int) bool {
groupIndex := rowIndex
if ib.doGrouping {
groupIndex = ib.batchRowToGroupIndex[rowIndex]
}
rv := ib.groupState[groupIndex].matched
ib.groupState[groupIndex].matched = true
return rv
}

func (ib *inputBatchGroupingState) getMatched(rowIndex int) bool {
Expand Down
24 changes: 22 additions & 2 deletions pkg/sql/rowexec/joinreader_strategies.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,12 @@ type joinReaderOrderingStrategy struct {
}

groupingState *inputBatchGroupingState

// outputGroupContinuationForLeftRow is true when this join is the first
// join in paired-joins. Note that in this case the input batches will
// always be of size 1 (real input batching only happens when this join is
// the second join in paired-joins).
outputGroupContinuationForLeftRow bool
}

func (s *joinReaderOrderingStrategy) getLookupRowsBatchSizeHint() int64 {
Expand Down Expand Up @@ -445,7 +451,8 @@ func (s *joinReaderOrderingStrategy) processLookedUpRow(
// During a SemiJoin or AntiJoin, we only output if we've seen no match
// for this input row yet. Additionally, since we don't have to render
// anything to output a Semi or Anti join match, we can evaluate our
// on condition now.
// on condition now. NB: the first join in paired-joins is never a
// SemiJoin or AntiJoin.
if !s.groupingState.getMatched(inputRowIdx) {
renderedRow, err := s.render(s.inputRows[inputRowIdx], row)
if err != nil {
Expand Down Expand Up @@ -500,6 +507,10 @@ func (s *joinReaderOrderingStrategy) nextRowToEmit(
// An outer-join non-match means we emit the input row with NULLs for
// the right side.
if renderedRow := s.renderUnmatchedRow(inputRow, leftSide); renderedRow != nil {
if s.outputGroupContinuationForLeftRow {
// This must be the first row being output for this input row.
renderedRow = append(renderedRow, falseEncDatum)
}
return renderedRow, jrEmittingRows, nil
}
case descpb.LeftAntiJoin:
Expand Down Expand Up @@ -532,7 +543,16 @@ func (s *joinReaderOrderingStrategy) nextRowToEmit(
return nil, jrStateUnknown, err
}
if outputRow != nil {
s.groupingState.setMatched(s.emitCursor.inputRowIdx)
wasAlreadyMatched := s.groupingState.setMatched(s.emitCursor.inputRowIdx)
if s.outputGroupContinuationForLeftRow {
if wasAlreadyMatched {
// Not the first row output for this input row.
outputRow = append(outputRow, trueEncDatum)
} else {
// First row output for this input row.
outputRow = append(outputRow, falseEncDatum)
}
}
}
return outputRow, jrEmittingRows, nil
}
Expand Down
Loading

0 comments on commit c456295

Please sign in to comment.