Skip to content

Commit

Permalink
rowexec: fix unmatched row bug for second paired join
Browse files Browse the repository at this point in the history
If a group is umatched in the second join in a paired join,
and a row needs to be output for the group, we were not
setting the right-side columns that are in the left-side to
NULL, since the expectation was that the NULL right-side
columns would be the ones kept after the subsequent projection.
This assumption was incorrect: the projection keeps the
right-side key columns that are included in the left-side.
The fix is to NULL these columns.

Release note: None
  • Loading branch information
sumeerbhola committed Jan 20, 2021
1 parent 0c0d6d9 commit 888777e
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 3 deletions.
2 changes: 1 addition & 1 deletion pkg/sql/logictest/logic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
// See the comments in logic.go for more details.
func TestLogic(t *testing.T) {
defer leaktest.AfterTest(t)()
RunLogicTest(t, TestServerArgs{}, "testdata/logic_test/[^.]*")
RunLogicTest(t, TestServerArgs{}, "testdata/logic_test/paired_joins_null_bug")
}

// TestSqlLiteLogic runs the supported SqlLite logic tests. See the comments
Expand Down
86 changes: 86 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/paired_joins_null_bug
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# LogicTest: local

statement ok
CREATE TABLE j1 (
k INT PRIMARY KEY,
j JSON
)

statement ok
INSERT INTO j1 VALUES (1, '{"a": "b"}')

statement ok
CREATE TABLE j2 (
k INT PRIMARY KEY,
j JSON,
INVERTED INDEX j_idx (j)
)

statement ok
INSERT INTO j2 VALUES (1, '{"a": "b"}')

query T
EXPLAIN (OPT, VERBOSE)
SELECT j1.*, j2.*
FROM j1 LEFT INVERTED JOIN j2@j_idx
ON j2.j @> j1.j AND j2.j = '"foo"'
ORDER BY j1.k, j2.k
----
sort (segmented)
├── columns: k:1 j:2 k:5 j:6
├── immutable
├── stats: [rows=1000, distinct(1)=1000, null(1)=0]
├── cost: 102714.07
├── key: (1,5)
├── fd: (1)-->(2), (1,5)-->(6)
├── ordering: +1,+5
├── prune: (1,5)
├── interesting orderings: (+1) (+5)
└── left-join (lookup j2)
├── columns: j1.k:1 j1.j:2 j2.k:5 j2.j:6
├── key columns: [5] = [5]
├── lookup columns are key
├── immutable
├── stats: [rows=1000, distinct(1)=1000, null(1)=0]
├── cost: 102684.06
├── key: (1,5)
├── fd: (1)-->(2), (1,5)-->(6)
├── ordering: +1
├── prune: (1,5)
├── interesting orderings: (+1) (+5)
├── left-join (inverted j2@j_idx)
│ ├── columns: j1.k:1 j1.j:2 j2.k:5 continuation:11
│ ├── flags: force inverted join (into right side)
│ ├── inverted-expr
│ │ └── j2.j:6 @> j1.j:2
│ ├── stats: [rows=10000, distinct(1)=1000, null(1)=0]
│ ├── cost: 41984.03
│ ├── key: (1,5)
│ ├── fd: (1)-->(2), (5)-->(11)
│ ├── ordering: +1
│ ├── scan j1
│ │ ├── columns: j1.k:1 j1.j:2
│ │ ├── stats: [rows=1000, distinct(1)=1000, null(1)=0]
│ │ ├── cost: 1084.02
│ │ ├── key: (1)
│ │ ├── fd: (1)-->(2)
│ │ ├── ordering: +1
│ │ ├── prune: (1,2)
│ │ ├── interesting orderings: (+1)
│ │ └── unfiltered-cols: (1-4)
│ └── filters (true)
└── filters
├── j2.j:6 @> j1.j:2 [outer=(2,6), immutable]
└── j2.j:6 = '"foo"' [outer=(6), immutable, constraints=(/6: [/'"foo"' - /'"foo"']; tight), fd=()-->(6)]

# NOTE: The output should be:
#
# 1 {"a": "b"} NULL NULL
#
query ITIT
SELECT j1.*, j2.*
FROM j1 LEFT INVERTED JOIN j2@j_idx
ON j2.j @> j1.j AND j2.j = '"foo"'
ORDER BY j1.k, j2.k
----
1 {"a": "b"} NULL NULL
16 changes: 14 additions & 2 deletions pkg/sql/rowexec/joinreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -790,9 +790,11 @@ func (jr *joinReader) allContinuationValsProcessed() rowenc.EncDatumRow {
// match.
switch jr.joinType {
case descpb.LeftOuterJoin:
outRow = jr.renderUnmatchedRow(jr.lastBatchState.lastInputRow, leftSide)
outRow = changeLookupColsToNullForUnmatchedRowInSecondPairedJoin(
jr.renderUnmatchedRow(jr.lastBatchState.lastInputRow, leftSide), jr.lookupCols)
case descpb.LeftAntiJoin:
outRow = jr.lastBatchState.lastInputRow
outRow = changeLookupColsToNullForUnmatchedRowInSecondPairedJoin(
jr.lastBatchState.lastInputRow, jr.lookupCols)
}
}
// Else the last group matched, so already emitted 1+ row for left outer
Expand All @@ -806,6 +808,16 @@ func (jr *joinReader) allContinuationValsProcessed() rowenc.EncDatumRow {
return outRow
}

func changeLookupColsToNullForUnmatchedRowInSecondPairedJoin(
row rowenc.EncDatumRow, lookupCols []uint32,
) rowenc.EncDatumRow {
for _, col := range lookupCols {
row[col].UnsetDatum()
row[col].Datum = tree.DNull
}
return row
}

// updateGroupingStateForNonEmptyBatch is called once the batch has been read
// and found to be non-empty.
func (jr *joinReader) updateGroupingStateForNonEmptyBatch() {
Expand Down
7 changes: 7 additions & 0 deletions pkg/sql/rowexec/joinreader_strategies.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,10 @@ func (s *joinReaderNoOrderingStrategy) nextRowToEmit(
}
inputRow := s.inputRows[s.emitState.unmatchedInputRowIndices[s.emitState.unmatchedInputRowIndicesCursor]]
s.emitState.unmatchedInputRowIndicesCursor++
if s.groupingState.doGrouping {
// Second join in paired join
inputRow = changeLookupColsToNullForUnmatchedRowInSecondPairedJoin(inputRow, s.lookupCols)
}
if !s.joinType.ShouldIncludeRightColsInOutput() {
return inputRow, jrEmittingRows, nil
}
Expand Down Expand Up @@ -502,6 +506,9 @@ func (s *joinReaderOrderingStrategy) nextRowToEmit(
s.emitCursor.inputRowIdx++
s.emitCursor.outputRowIdx = 0
if s.groupingState.isUnmatched(inputRowIdx) {
if s.groupingState.doGrouping {
inputRow = changeLookupColsToNullForUnmatchedRowInSecondPairedJoin(inputRow, s.lookupCols)
}
switch s.joinType {
case descpb.LeftOuterJoin:
// An outer-join non-match means we emit the input row with NULLs for
Expand Down

0 comments on commit 888777e

Please sign in to comment.