Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: add support for joinReader to be the first join in paired joins #56161

Merged
merged 1 commit into from
Nov 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions pkg/sql/execinfra/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
//
// ATTENTION: When updating these fields, add a brief description of what
// changed to the version history below.
const Version execinfrapb.DistSQLVersion = 40
const Version execinfrapb.DistSQLVersion = 41

// MinAcceptedVersion is the oldest version that the server is compatible with.
// A server will not accept flows with older versions.
Expand All @@ -51,6 +51,11 @@ const MinAcceptedVersion execinfrapb.DistSQLVersion = 40

Please add new entries at the top.

- Version: 41 (MinAcceptedVersion: 40)
- A paired joiner approach for lookup joins was added, for left
outer/semi/anti joins involving a pair of joinReaders, where the
first join uses a non-covering index.

- Version: 40 (MinAcceptedVersion: 40)
- A new field was added to execinfrapb.ProcessorSpec to propagate the result
column types of a processor. This change is not backwards compatible
Expand All @@ -62,7 +67,7 @@ Please add new entries at the top.
protobuf struct for easier propagation to the remote nodes during the
execution.

- Version: 38 (MinAcceptedVersion: 38)
- Version: 38 (MinAcceptedVersion: 37)
- A paired joiner approach for inverted joins was added, for left
outer/semi/anti joins involving the invertedJoiner and joinReader.

Expand Down
452 changes: 247 additions & 205 deletions pkg/sql/execinfrapb/processors_sql.pb.go

Large diffs are not rendered by default.

15 changes: 14 additions & 1 deletion pkg/sql/execinfrapb/processors_sql.proto
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,11 @@ message IndexSkipTableReaderSpec {
// The output for LEFT_SEMI:
// a2, b1, c1, d4, true
// Again, the d, cont columns will be projected away after the join.
//
// The example above is for a lookup join as the second join in the
// paired-joins. The lookup join can also be the first join in the
// paired-joins, which is indicated by both
// OutputGroupContinuationForLeftRow and MaintainOrdering set to true.
message JoinReaderSpec {
optional sqlbase.TableDescriptor table = 1 [(gogoproto.nullable) = false];

Expand Down Expand Up @@ -314,8 +319,16 @@ message JoinReaderSpec {
optional bool has_system_columns = 13 [(gogoproto.nullable) = false];

// LeftJoinWithPairedJoiner is used when a left {outer,anti,semi} join is
// being achieved by pairing two joins. See the comment above.
// being achieved by pairing two joins, and this is the second join. See
// the comment above.
optional bool left_join_with_paired_joiner = 14 [(gogoproto.nullable) = false];

// OutputGroupContinuationForLeftRow indicates that this join is the first
// join in the paired-joins. At most one of OutputGroupContinuationForLeftRow
// and LeftJoinWithPairedJoiner may be true. Additionally, if
// OutputGroupContinuationForLeftRow is true, MaintainOrdering must also
// be true.
optional bool output_group_continuation_for_left_row = 15 [(gogoproto.nullable) = false];
}

// SorterSpec is the specification for a "sorting aggregator". A sorting
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/rowexec/hashjoiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ func newHashJoiner(
spec.OnExpr,
spec.LeftEqColumns,
spec.RightEqColumns,
false, /* outputContinuationColumn */
post,
output,
execinfra.ProcStateOpts{
Expand Down
31 changes: 22 additions & 9 deletions pkg/sql/rowexec/joinerbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func (jb *joinerBase) init(
onExpr execinfrapb.Expression,
leftEqColumns []uint32,
rightEqColumns []uint32,
outputContinuationColumn bool,
post *execinfrapb.PostProcessSpec,
output execinfra.RowReceiver,
opts execinfra.ProcStateOpts,
Expand All @@ -74,18 +75,26 @@ func (jb *joinerBase) init(
jb.eqCols[leftSide] = leftEqColumns
jb.eqCols[rightSide] = rightEqColumns

size := len(leftTypes) + len(rightTypes)
jb.combinedRow = make(rowenc.EncDatumRow, size)
rowSize := len(leftTypes) + len(rightTypes)
if outputContinuationColumn {
// NB: Can only be true for inner joins and left outer joins.
rowSize++
}
jb.combinedRow = make(rowenc.EncDatumRow, rowSize)

condTypes := make([]*types.T, 0, size)
// condTypes does not include the continuation column, but the slice has the
// capacity for it, since outputTypes later adds the continuation column.
condTypes := make([]*types.T, 0, rowSize)
condTypes = append(condTypes, leftTypes...)
condTypes = append(condTypes, rightTypes...)

outputSize := len(leftTypes)
if jb.joinType.ShouldIncludeRightColsInOutput() {
outputSize += len(rightTypes)
outputTypes := condTypes
// NB: outputContinuationColumn implies jb.joinType.ShouldIncludeRightColsInOutput()
if !jb.joinType.ShouldIncludeRightColsInOutput() {
outputTypes = outputTypes[:len(leftTypes)]
} else if outputContinuationColumn {
outputTypes = append(outputTypes, types.Bool)
}
outputTypes := condTypes[:outputSize]

if err := jb.ProcessorBase.Init(
self, post, outputTypes, flowCtx, processorID, output, nil /* memMonitor */, opts,
Expand Down Expand Up @@ -119,7 +128,9 @@ func (j joinSide) String() string {
}

// renderUnmatchedRow creates a result row given an unmatched row on either
// side. Only used for outer joins.
// side. Only used for outer joins. Note that if the join is outputting a
// continuation column, the returned slice does not include the continuation
// column, but has the capacity for it.
func (jb *joinerBase) renderUnmatchedRow(row rowenc.EncDatumRow, side joinSide) rowenc.EncDatumRow {
lrow, rrow := jb.emptyLeft, jb.emptyRight
if side == leftSide {
Expand Down Expand Up @@ -158,7 +169,9 @@ func shouldEmitUnmatchedRow(side joinSide, joinType descpb.JoinType) bool {
}

// render constructs a row with columns from both sides. The ON condition is
// evaluated; if it fails, returns nil.
// evaluated; if it fails, returns nil. Note that if the join is outputting a
// continuation column, the returned slice does not include the continuation
// column, but has the capacity for it.
func (jb *joinerBase) render(lrow, rrow rowenc.EncDatumRow) (rowenc.EncDatumRow, error) {
jb.combinedRow = jb.combinedRow[:len(lrow)+len(rrow)]
copy(jb.combinedRow, lrow)
Expand Down
59 changes: 37 additions & 22 deletions pkg/sql/rowexec/joinreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,26 +110,26 @@ type joinReader struct {
// (or evaluate it accurately, as is sometimes the case with inverted
// indexes). The first join is running a left outer or inner join, and each
// group of rows seen by the second join correspond to one left row.
//
// TODO(sumeer): add support for joinReader to also be the first
// join in this pair, say when the index can evaluate most of the
// join condition (the part with low selectivity), but not all.
// Currently only the invertedJoiner can serve as the first join
// in the pair.

// The input rows in the current batch belong to groups which are tracked in
// groupingState. The last row from the last batch is in
// lastInputRowFromLastBatch -- it is tracked because we don't know if it
// was the last row in a group until we get to the next batch. NB:
// groupingState is used even when there is no grouping -- we simply have
// groups of one.
// groups of one. The no grouping cases include the case of this join being
// the first join in the paired joins.
groupingState *inputBatchGroupingState

lastBatchState struct {
lastInputRow rowenc.EncDatumRow
lastGroupMatched bool
lastGroupContinued bool
}

// Set to true when this is the first join in the paired-joins (see the
// detailed comment in the spec). This can never be true for index joins,
// and requires that the spec has MaintainOrdering set to true.
outputGroupContinuationForLeftRow bool
}

var _ execinfra.Processor = &joinReader{}
Expand All @@ -153,6 +153,10 @@ func newJoinReader(
if spec.IndexIdx != 0 && readerType == indexJoinReaderType {
return nil, errors.AssertionFailedf("index join must be against primary index")
}
if spec.OutputGroupContinuationForLeftRow && !spec.MaintainOrdering {
return nil, errors.AssertionFailedf(
"lookup join must maintain ordering since it is first join in paired-joins")
}

var lookupCols []uint32
switch readerType {
Expand All @@ -168,15 +172,14 @@ func newJoinReader(
return nil, errors.Errorf("unsupported joinReaderType")
}
jr := &joinReader{
desc: tabledesc.MakeImmutable(spec.Table),
maintainOrdering: spec.MaintainOrdering,
input: input,
inputTypes: input.OutputTypes(),
lookupCols: lookupCols,
desc: tabledesc.MakeImmutable(spec.Table),
maintainOrdering: spec.MaintainOrdering,
input: input,
inputTypes: input.OutputTypes(),
lookupCols: lookupCols,
outputGroupContinuationForLeftRow: spec.OutputGroupContinuationForLeftRow,
}
if readerType != indexJoinReaderType {
// TODO(sumeer): When LeftJoinWithPairedJoiner, the lookup columns and the
// bool column must be projected away by the optimizer after this join.
jr.groupingState = &inputBatchGroupingState{doGrouping: spec.LeftJoinWithPairedJoiner}
}
var err error
Expand Down Expand Up @@ -239,6 +242,7 @@ func newJoinReader(
spec.OnExpr,
leftEqCols,
indexCols,
spec.OutputGroupContinuationForLeftRow,
post,
output,
execinfra.ProcStateOpts{
Expand Down Expand Up @@ -361,11 +365,12 @@ func (jr *joinReader) initJoinReaderStrategy(
drc.DisableCache = true
}
jr.strategy = &joinReaderOrderingStrategy{
joinerBase: &jr.joinerBase,
defaultSpanGenerator: spanGenerator,
isPartialJoin: jr.joinType == descpb.LeftSemiJoin || jr.joinType == descpb.LeftAntiJoin,
lookedUpRows: drc,
groupingState: jr.groupingState,
joinerBase: &jr.joinerBase,
defaultSpanGenerator: spanGenerator,
isPartialJoin: jr.joinType == descpb.LeftSemiJoin || jr.joinType == descpb.LeftAntiJoin,
lookedUpRows: drc,
groupingState: jr.groupingState,
outputGroupContinuationForLeftRow: jr.outputGroupContinuationForLeftRow,
}
}

Expand Down Expand Up @@ -404,8 +409,14 @@ func (jr *joinReader) neededRightCols() util.FastIntSet {
// Get the columns from the right side of the join and shift them over by
// the size of the left side so the right side starts at 0.
neededRightCols := util.MakeFastIntSet()
var lastCol int
for i, ok := neededCols.Next(len(jr.inputTypes)); ok; i, ok = neededCols.Next(i + 1) {
neededRightCols.Add(i - len(jr.inputTypes))
lastCol = i - len(jr.inputTypes)
neededRightCols.Add(lastCol)
}
if jr.outputGroupContinuationForLeftRow {
// The lastCol is the bool continuation column and not a right column.
neededRightCols.Remove(lastCol)
}

// Add columns needed by OnExpr.
Expand Down Expand Up @@ -580,7 +591,7 @@ func (jr *joinReader) performLookup() (joinReaderState, *execinfrapb.ProducerMet
}
}

// Fetch the next row and copy it into the row container.
// Fetch the next row and tell the strategy to process it.
lookedUpRow, _, _, err := jr.fetcher.NextRow(jr.Ctx)
if err != nil {
jr.MoveToDraining(scrub.UnwrapScrubError(err))
Expand Down Expand Up @@ -868,12 +879,16 @@ func (ib *inputBatchGroupingState) setFirstGroupMatched() {
ib.groupState[0].matched = true
}

func (ib *inputBatchGroupingState) setMatched(rowIndex int) {
// setMatched flags the group that rowIndex belongs to as matched and reports
// whether that group had already been flagged before this call. When grouping
// is disabled, rowIndex is used directly as the group index.
func (ib *inputBatchGroupingState) setMatched(rowIndex int) bool {
	var groupIndex int
	if ib.doGrouping {
		// Several input rows may map onto the same group.
		groupIndex = ib.batchRowToGroupIndex[rowIndex]
	} else {
		groupIndex = rowIndex
	}
	wasMatched := ib.groupState[groupIndex].matched
	ib.groupState[groupIndex].matched = true
	return wasMatched
}

func (ib *inputBatchGroupingState) getMatched(rowIndex int) bool {
Expand Down
24 changes: 22 additions & 2 deletions pkg/sql/rowexec/joinreader_strategies.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,12 @@ type joinReaderOrderingStrategy struct {
}

groupingState *inputBatchGroupingState

// outputGroupContinuationForLeftRow is true when this join is the first
// join in paired-joins. Note that in this case the input batches will
// always be of size 1 (real input batching only happens when this join is
// the second join in paired-joins).
outputGroupContinuationForLeftRow bool
}

func (s *joinReaderOrderingStrategy) getLookupRowsBatchSizeHint() int64 {
Expand Down Expand Up @@ -445,7 +451,8 @@ func (s *joinReaderOrderingStrategy) processLookedUpRow(
// During a SemiJoin or AntiJoin, we only output if we've seen no match
// for this input row yet. Additionally, since we don't have to render
// anything to output a Semi or Anti join match, we can evaluate our
// on condition now.
// on condition now. NB: the first join in paired-joins is never a
// SemiJoin or AntiJoin.
if !s.groupingState.getMatched(inputRowIdx) {
renderedRow, err := s.render(s.inputRows[inputRowIdx], row)
if err != nil {
Expand Down Expand Up @@ -500,6 +507,10 @@ func (s *joinReaderOrderingStrategy) nextRowToEmit(
// An outer-join non-match means we emit the input row with NULLs for
// the right side.
if renderedRow := s.renderUnmatchedRow(inputRow, leftSide); renderedRow != nil {
if s.outputGroupContinuationForLeftRow {
// This must be the first row being output for this input row.
renderedRow = append(renderedRow, falseEncDatum)
}
return renderedRow, jrEmittingRows, nil
}
case descpb.LeftAntiJoin:
Expand Down Expand Up @@ -532,7 +543,16 @@ func (s *joinReaderOrderingStrategy) nextRowToEmit(
return nil, jrStateUnknown, err
}
if outputRow != nil {
s.groupingState.setMatched(s.emitCursor.inputRowIdx)
wasAlreadyMatched := s.groupingState.setMatched(s.emitCursor.inputRowIdx)
if s.outputGroupContinuationForLeftRow {
if wasAlreadyMatched {
// Not the first row output for this input row.
outputRow = append(outputRow, trueEncDatum)
} else {
// First row output for this input row.
outputRow = append(outputRow, falseEncDatum)
}
}
}
return outputRow, jrEmittingRows, nil
}
Expand Down
Loading