-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
joinerbase.go
187 lines (165 loc) · 5.72 KB
/
joinerbase.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
// Copyright 2016 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package rowexec
import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/errors"
)
// joinerBase is the common core of all joiners.
type joinerBase struct {
	execinfra.ProcessorBase

	// joinType is the kind of join being performed; it is set by init.
	joinType descpb.JoinType
	// onCond holds the ON condition. When its Expr is non-nil, render evaluates
	// it as a filter over the combined (left columns, then right columns) row.
	onCond execinfrapb.ExprHelper
	// emptyLeft and emptyRight are rows of NULLs with the column types of the
	// left and right inputs respectively (built in init). renderUnmatchedRow
	// uses them to fill in the side that has no match.
	emptyLeft  rowenc.EncDatumRow
	emptyRight rowenc.EncDatumRow
	// combinedRow is a scratch row reused by render and renderUnmatchedRow; the
	// rows they return alias it. init sizes it for all left and right columns,
	// plus one extra slot when a continuation column is output.
	combinedRow rowenc.EncDatumRow
	// eqCols contains the indices of the columns that are constrained to be
	// equal. Specifically column eqCols[0][i] on the left side must match the
	// column eqCols[1][i] on the right side. init stores the left columns at
	// index leftSide and the right columns at index rightSide.
	eqCols [2][]uint32
}
// init prepares the joinerBase for use. In order, it:
//   - records the join type and rejects a non-empty onExpr for set-operation
//     joins;
//   - builds the all-NULL rows used to pad unmatched rows;
//   - records the equality columns for each side;
//   - allocates the reusable combinedRow;
//   - initializes the embedded ProcessorBase with the output column types;
//   - sets up the ON condition, which is typed against the left columns
//     followed by the right columns (never the continuation column).
//
// opts is passed along to the underlying ProcessorBase. The zero value is used
// if the processor using the joinerBase is not implementing RowSource.
func (jb *joinerBase) init(
	self execinfra.RowSource,
	flowCtx *execinfra.FlowCtx,
	processorID int32,
	leftTypes []*types.T,
	rightTypes []*types.T,
	jType descpb.JoinType,
	onExpr execinfrapb.Expression,
	leftEqColumns []uint32,
	rightEqColumns []uint32,
	outputContinuationColumn bool,
	post *execinfrapb.PostProcessSpec,
	output execinfra.RowReceiver,
	opts execinfra.ProcStateOpts,
) error {
	jb.joinType = jType
	// Set-operation joins do not accept an ON expression.
	if jb.joinType.IsSetOpJoin() && !onExpr.Empty() {
		return errors.Errorf("expected empty onExpr, got %v", onExpr)
	}

	// nullRow builds a row of NULLs with one column per type in typs.
	nullRow := func(typs []*types.T) rowenc.EncDatumRow {
		row := make(rowenc.EncDatumRow, len(typs))
		for i, typ := range typs {
			row[i] = rowenc.DatumToEncDatum(typ, tree.DNull)
		}
		return row
	}
	jb.emptyLeft = nullRow(leftTypes)
	jb.emptyRight = nullRow(rightTypes)

	jb.eqCols[leftSide] = leftEqColumns
	jb.eqCols[rightSide] = rightEqColumns

	numCondCols := len(leftTypes) + len(rightTypes)
	numCombinedCols := numCondCols
	if outputContinuationColumn {
		// NB: Can only be true for inner joins and left outer joins.
		numCombinedCols++
	}
	jb.combinedRow = make(rowenc.EncDatumRow, numCombinedCols)

	// condTypes lists the left column types followed by the right ones. It
	// excludes the continuation column but is allocated with room for it, so
	// appending types.Bool below reuses the backing array while condTypes
	// itself keeps its length.
	condTypes := make([]*types.T, 0, numCombinedCols)
	condTypes = append(condTypes, leftTypes...)
	condTypes = append(condTypes, rightTypes...)

	var outputTypes []*types.T
	switch {
	case !jb.joinType.ShouldIncludeRightColsInOutput():
		// Only the left columns are output.
		outputTypes = condTypes[:len(leftTypes)]
	case outputContinuationColumn:
		outputTypes = append(condTypes, types.Bool)
	default:
		outputTypes = condTypes
	}

	if err := jb.ProcessorBase.Init(
		self, post, outputTypes, flowCtx, processorID, output, nil /* memMonitor */, opts,
	); err != nil {
		return err
	}

	semaCtx := flowCtx.TypeResolverFactory.NewSemaContext(flowCtx.EvalCtx.Txn)
	return jb.onCond.Init(onExpr, condTypes, semaCtx, jb.EvalCtx)
}
// joinSide identifies one of the two inputs of a join.
type joinSide uint8

// The two sides of a join. Their numeric values are used directly as indices
// (for example into joinerBase.eqCols), so leftSide must be 0 and rightSide
// must be 1.
const (
	// leftSide indicates the left side of the join.
	leftSide joinSide = iota
	// rightSide indicates the right side of the join.
	rightSide
)

// otherSide returns the side opposite to s: leftSide for rightSide and vice
// versa.
func otherSide(s joinSide) joinSide {
	return rightSide - s
}

// String returns "left" for leftSide and "right" for any other value.
func (s joinSide) String() string {
	if s != leftSide {
		return "right"
	}
	return "left"
}
// renderUnmatchedRow builds a result row for a row from the given side that
// found no match. The row's own columns are kept and the other side's columns
// are filled from the corresponding all-NULL row (emptyLeft or emptyRight).
// Only used for outer joins.
//
// The returned slice aliases jb.combinedRow, so it is overwritten by the next
// call to render or renderUnmatchedRow. If the join is outputting a
// continuation column, the returned slice does not include it but has the
// capacity for it.
func (jb *joinerBase) renderUnmatchedRow(row rowenc.EncDatumRow, side joinSide) rowenc.EncDatumRow {
	left, right := row, jb.emptyRight
	if side != leftSide {
		left, right = jb.emptyLeft, row
	}
	// Reuse combinedRow's backing storage: left columns first, then right.
	out := append(jb.combinedRow[:0], left...)
	jb.combinedRow = append(out, right...)
	return jb.combinedRow
}
// shouldEmitUnmatchedRow reports whether a row from the given side that found
// no match should still be emitted, padded with NULLs for the columns of the
// other stream. By join type:
//   - INNER, LEFT SEMI and INTERSECT ALL joins never emit unmatched rows.
//   - RIGHT OUTER joins emit unmatched rows from the right side only.
//   - LEFT OUTER, LEFT ANTI and EXCEPT ALL joins emit unmatched rows from the
//     left side only.
//   - FULL OUTER joins emit unmatched rows from both sides.
func shouldEmitUnmatchedRow(side joinSide, joinType descpb.JoinType) bool {
	switch joinType {
	case descpb.InnerJoin, descpb.LeftSemiJoin, descpb.IntersectAllJoin:
		return false
	case descpb.RightOuterJoin:
		return side == rightSide
	case descpb.LeftOuterJoin, descpb.LeftAntiJoin, descpb.ExceptAllJoin:
		return side == leftSide
	default:
		// Covers descpb.FullOuterJoin. NOTE(review): any join type not named in
		// this switch also lands here and reports true for both sides; confirm
		// that is intended for join types added later.
		return true
	}
}
// render copies lrow followed by rrow into the reusable combinedRow and, when
// an ON condition is present, evaluates it against that row. It returns:
//   - (nil, err) if evaluating the ON condition fails;
//   - (nil, nil) if the ON condition does not pass;
//   - (combinedRow, nil) otherwise.
//
// The returned slice aliases jb.combinedRow, so it is overwritten by the next
// call to render or renderUnmatchedRow. The two rows must together fit within
// combinedRow's capacity (reslicing beyond it panics). If the join is
// outputting a continuation column, the returned slice does not include it
// but has the capacity for it.
func (jb *joinerBase) render(lrow, rrow rowenc.EncDatumRow) (rowenc.EncDatumRow, error) {
	split := len(lrow)
	jb.combinedRow = jb.combinedRow[:split+len(rrow)]
	copy(jb.combinedRow[:split], lrow)
	copy(jb.combinedRow[split:], rrow)

	if jb.onCond.Expr == nil {
		// No ON condition: every pair of rows is a match.
		return jb.combinedRow, nil
	}
	passed, err := jb.onCond.EvalFilter(jb.combinedRow)
	if err != nil || !passed {
		return nil, err
	}
	return jb.combinedRow, nil
}