Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
40754: colexec: make sure that joiners output only the needed columns r=asubiotto a=yuzefovich

Previously, the joiners would create an output batch with all
the input columns but would set the values only the requested
output columns. Now this is fixed, and both hash and merge joiners
output the batch only with the requested columns.

Additionally, this commit fixes a bug with ON expression planning
for joiners. Previously, the column types that the remapped ON
expression references were not remapped themselves.

Furthermore, the ON expression itself was modified in-place. This
is now fixed.

Also, post.Filter was handled incorrectly after a joiner if the
columns used by the filter were not a part of the projection. This
is now fixed.

Fixes: cockroachdb#40732.

Release justification: Category 3: Fixes for high-priority or
high-severity bugs in existing functionality.

Release note: None


Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
  • Loading branch information
craig[bot] and yuzefovich committed Sep 18, 2019
2 parents 98159fa + 88729ca commit 8e15411
Show file tree
Hide file tree
Showing 8 changed files with 977 additions and 968 deletions.
175 changes: 113 additions & 62 deletions pkg/sql/colexec/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ func checkNumIn(inputs []Operator, numIn int) error {
return nil
}

// wrapRowSource, given an input exec.Operator, integrates toWrap into a
// columnar execution flow and returns toWrap's output as an exec.Operator.
// wrapRowSource, given an input Operator, integrates toWrap into a columnar
// execution flow and returns toWrap's output as an Operator.
func wrapRowSource(
ctx context.Context,
flowCtx *execinfra.FlowCtx,
Expand Down Expand Up @@ -104,6 +104,20 @@ func NewColOperator(
core := &spec.Core
post := &spec.Post

var (
// projectionHandled indicates whether the "core" operator handles the
// projection itself (for example, hash joiner and merge joiner do that
// because of efficiency).
projectionHandled bool

// postFilterPlanning will be set by the operators that handle the
// projection themselves. This is needed to handle post.Filter correctly so
// that those operators output all the columns that are used by post.Filter
// even if some columns are not needed by post.OutputColumns. If it remains
// unset, then postFilterPlanning will act as a noop.
postFilterPlanning filterPlanningState
)

// By default, we safely assume that an operator is not streaming. Note that
// projections, renders, filters, limits, offsets as well as all internal
// operators (like stats collectors and cancel checkers) are streaming, so in
Expand Down Expand Up @@ -138,7 +152,7 @@ func NewColOperator(
log.VEventf(ctx, 1, "made op %T\n", result.Op)

// We want to check for cancellation once per input batch, and wrapping
// only colBatchScan with an exec.CancelChecker allows us to do just that.
// only colBatchScan with a CancelChecker allows us to do just that.
// It's sufficient for most of the operators since they are extremely fast.
// However, some of the long-running operators (for example, sorter) are
// still responsible for doing the cancellation check on their own while
Expand Down Expand Up @@ -319,6 +333,7 @@ func NewColOperator(
rightOutCols = append(rightOutCols, col-nLeftCols)
}
}
projectionHandled = true
} else {
for i := uint32(0); i < nLeftCols; i++ {
leftOutCols = append(leftOutCols, i)
Expand All @@ -328,17 +343,24 @@ func NewColOperator(
}
}

if !post.Filter.Empty() {
postFilterPlanning = makeFilterPlanningState(len(leftTypes), len(rightTypes))
leftOutCols, rightOutCols = postFilterPlanning.renderAllNeededCols(
post.Filter, leftOutCols, rightOutCols,
)
}

var (
onExpr *execinfrapb.Expression
filterPlanning *filterPlanningState
onExprPlanning filterPlanningState
)
if !core.HashJoiner.OnExpr.Empty() {
if core.HashJoiner.Type != sqlbase.JoinType_INNER {
return result, errors.Newf("can't plan non-inner hash join with on expressions")
}
onExpr = &core.HashJoiner.OnExpr
filterPlanning = newFilterPlanningState(len(leftTypes), len(rightTypes))
leftOutCols, rightOutCols = filterPlanning.renderAllNeededCols(
onExprPlanning = makeFilterPlanningState(len(leftTypes), len(rightTypes))
leftOutCols, rightOutCols = onExprPlanning.renderAllNeededCols(
*onExpr, leftOutCols, rightOutCols,
)
}
Expand All @@ -360,19 +382,12 @@ func NewColOperator(
return result, err
}

result.ColumnTypes = make([]types.T, nLeftCols+nRightCols)
copy(result.ColumnTypes, spec.Input[0].ColumnTypes)
if core.HashJoiner.Type != sqlbase.JoinType_LEFT_SEMI {
// TODO(yuzefovich): update this conditional once LEFT ANTI is supported.
copy(result.ColumnTypes[nLeftCols:], spec.Input[1].ColumnTypes)
} else {
result.ColumnTypes = result.ColumnTypes[:nLeftCols]
}
result.setProjectedByJoinerColumnTypes(spec, leftOutCols, rightOutCols)

if onExpr != nil {
filterPlanning.remapIVars(onExpr)
err = result.planFilterExpr(flowCtx.NewEvalCtx(), *onExpr)
filterPlanning.projectOutExtraCols(&result, leftOutCols, rightOutCols)
remappedOnExpr := onExprPlanning.remapIVars(*onExpr)
err = result.planFilterExpr(flowCtx.NewEvalCtx(), remappedOnExpr)
onExprPlanning.projectOutExtraCols(&result)
}

case core.MergeJoiner != nil:
Expand Down Expand Up @@ -415,6 +430,7 @@ func NewColOperator(
rightOutCols = append(rightOutCols, col-nLeftCols)
}
}
projectionHandled = true
} else {
for i := uint32(0); i < nLeftCols; i++ {
leftOutCols = append(leftOutCols, i)
Expand All @@ -424,9 +440,16 @@ func NewColOperator(
}
}

if !post.Filter.Empty() {
postFilterPlanning = makeFilterPlanningState(len(leftTypes), len(rightTypes))
leftOutCols, rightOutCols = postFilterPlanning.renderAllNeededCols(
post.Filter, leftOutCols, rightOutCols,
)
}

var (
onExpr *execinfrapb.Expression
filterPlanning *filterPlanningState
onExprPlanning filterPlanningState
filterOnlyOnLeft bool
filterConstructor func(Operator) (Operator, error)
)
Expand All @@ -440,14 +463,14 @@ func NewColOperator(
result.IsStreaming = false

onExpr = &core.MergeJoiner.OnExpr
filterPlanning = newFilterPlanningState(len(leftTypes), len(rightTypes))
onExprPlanning = makeFilterPlanningState(len(leftTypes), len(rightTypes))
switch core.MergeJoiner.Type {
case sqlbase.JoinType_INNER:
leftOutCols, rightOutCols = filterPlanning.renderAllNeededCols(
leftOutCols, rightOutCols = onExprPlanning.renderAllNeededCols(
*onExpr, leftOutCols, rightOutCols,
)
case sqlbase.JoinType_LEFT_SEMI, sqlbase.JoinType_LEFT_ANTI:
filterOnlyOnLeft = filterPlanning.isFilterOnlyOnLeft(*onExpr)
filterOnlyOnLeft = onExprPlanning.isFilterOnlyOnLeft(*onExpr)
filterConstructor = func(op Operator) (Operator, error) {
r := NewColOperatorResult{
Op: op,
Expand Down Expand Up @@ -481,19 +504,12 @@ func NewColOperator(
return result, err
}

result.ColumnTypes = make([]types.T, nLeftCols+nRightCols)
copy(result.ColumnTypes, spec.Input[0].ColumnTypes)
if core.MergeJoiner.Type != sqlbase.JoinType_LEFT_SEMI &&
core.MergeJoiner.Type != sqlbase.JoinType_LEFT_ANTI {
copy(result.ColumnTypes[nLeftCols:], spec.Input[1].ColumnTypes)
} else {
result.ColumnTypes = result.ColumnTypes[:nLeftCols]
}
result.setProjectedByJoinerColumnTypes(spec, leftOutCols, rightOutCols)

if onExpr != nil && core.MergeJoiner.Type == sqlbase.JoinType_INNER {
filterPlanning.remapIVars(onExpr)
err = result.planFilterExpr(flowCtx.NewEvalCtx(), *onExpr)
filterPlanning.projectOutExtraCols(&result, leftOutCols, rightOutCols)
remappedOnExpr := onExprPlanning.remapIVars(*onExpr)
err = result.planFilterExpr(flowCtx.NewEvalCtx(), remappedOnExpr)
onExprPlanning.projectOutExtraCols(&result)
}

case core.JoinReader != nil:
Expand Down Expand Up @@ -655,11 +671,13 @@ func NewColOperator(
}

if !post.Filter.Empty() {
if err = result.planFilterExpr(flowCtx.NewEvalCtx(), post.Filter); err != nil {
filterExpr := postFilterPlanning.remapIVars(post.Filter)
if err = result.planFilterExpr(flowCtx.NewEvalCtx(), filterExpr); err != nil {
return result, err
}
postFilterPlanning.projectOutExtraCols(&result)
}
if post.Projection {
if post.Projection && !projectionHandled {
result.Op = NewSimpleProjectOp(result.Op, len(result.ColumnTypes), post.OutputColumns)
// Update output ColumnTypes.
newTypes := make([]types.T, 0, len(post.OutputColumns))
Expand Down Expand Up @@ -712,13 +730,15 @@ type filterPlanningState struct {
numRightInputCols int
// indexVarMap will be populated when rendering all needed columns in case
// when at least one column from either side is used by the filter.
indexVarMap []int
extraLeftOutCols int
extraRightOutCols int
indexVarMap []int
// originalLeftOutCols and originalRightOutCols are stored so that we can
// remove all the extra columns that were added to handle the filter.
originalLeftOutCols []uint32
originalRightOutCols []uint32
}

func newFilterPlanningState(numLeftInputCols, numRightInputCols int) *filterPlanningState {
return &filterPlanningState{
func makeFilterPlanningState(numLeftInputCols, numRightInputCols int) filterPlanningState {
return filterPlanningState{
numLeftInputCols: numLeftInputCols,
numRightInputCols: numRightInputCols,
}
Expand All @@ -728,10 +748,10 @@ func newFilterPlanningState(numLeftInputCols, numRightInputCols int) *filterPlan
// will be output. It does so by extracting the indices of all indexed vars
// used in the expression and appending those that are missing from *OutCols
// slices to the slices. Additionally, it populates p.indexVarMap to be used
// later to correctly remap the indexed vars and stores information about how
// many extra columns are added so that those extra columns could be projected
// out after the filter has been run.
// later to correctly remap the indexed vars and stores the original *OutCols
// to be projected after the filter has been run.
// It returns updated leftOutCols and rightOutCols.
// NOTE: projectOutExtraCols must be called after the filter has been run.
func (p *filterPlanningState) renderAllNeededCols(
filter execinfrapb.Expression, leftOutCols []uint32, rightOutCols []uint32,
) ([]uint32, []uint32) {
Expand All @@ -741,6 +761,9 @@ func (p *filterPlanningState) renderAllNeededCols(
p.numLeftInputCols+p.numRightInputCols,
)
if len(neededColumnsForFilter) > 0 {
// Store the original out columns to be restored later.
p.originalLeftOutCols = leftOutCols
p.originalRightOutCols = rightOutCols
// At least one column is referenced by the filter expression.
p.indexVarMap = make([]int, p.numLeftInputCols+p.numRightInputCols)
for i := range p.indexVarMap {
Expand All @@ -755,7 +778,6 @@ func (p *filterPlanningState) renderAllNeededCols(
if p.indexVarMap[neededCol] == -1 {
p.indexVarMap[neededCol] = len(leftOutCols)
leftOutCols = append(leftOutCols, neededCol)
p.extraLeftOutCols++
}
}
}
Expand Down Expand Up @@ -797,7 +819,6 @@ func (p *filterPlanningState) renderAllNeededCols(
if p.indexVarMap[neededCol] == -1 {
p.indexVarMap[neededCol] = len(rightOutCols) + len(leftOutCols)
rightOutCols = append(rightOutCols, neededCol-rColOffset)
p.extraRightOutCols++
}
}
}
Expand All @@ -815,45 +836,75 @@ func (p *filterPlanningState) isFilterOnlyOnLeft(filter execinfrapb.Expression)
return len(neededColumnsForFilter) == 0
}

// remapIVars remaps tree.IndexedVars in expr using p.indexVarMap. Note that
// expr is modified in-place.
func (p *filterPlanningState) remapIVars(expr *execinfrapb.Expression) {
// remapIVars remaps tree.IndexedVars in expr using p.indexVarMap. Note that if
// the remapping is needed, then a new remapped expression is returned, but if
// the remapping is not needed (which is the case when all needed by the filter
// columns were part of the projection), then the same expression is returned.
func (p *filterPlanningState) remapIVars(expr execinfrapb.Expression) execinfrapb.Expression {
if p.indexVarMap == nil {
// If p.indexVarMap is nil, then there is no remapping to do.
return
return expr
}
ret := execinfrapb.Expression{}
if expr.LocalExpr != nil {
expr.LocalExpr = sqlbase.RemapIVarsInTypedExpr(expr.LocalExpr, p.indexVarMap)
ret.LocalExpr = sqlbase.RemapIVarsInTypedExpr(expr.LocalExpr, p.indexVarMap)
} else {
ret.Expr = expr.Expr
// We iterate in the reverse order so that the multiple digit numbers are
// handled correctly (consider an expression like @1 AND @11).
for idx := len(p.indexVarMap) - 1; idx >= 0; idx-- {
if p.indexVarMap[idx] != -1 {
// We need +1 below because the ordinals are counting from 1.
expr.Expr = strings.ReplaceAll(
expr.Expr,
ret.Expr = strings.ReplaceAll(
ret.Expr,
fmt.Sprintf("@%d", idx+1),
fmt.Sprintf("@%d", p.indexVarMap[idx]+1),
)
}
}
}
return ret
}

// projectOutExtraCols, possibly, adds a projection to remove all the extra
// columns that were needed by the filter expression.
func (p *filterPlanningState) projectOutExtraCols(
result *NewColOperatorResult, leftOutCols, rightOutCols []uint32,
// NOTE: result.ColumnTypes is updated if the projection is added.
func (p *filterPlanningState) projectOutExtraCols(result *NewColOperatorResult) {
if p.indexVarMap == nil {
// If p.indexVarMap is nil, then this filter planning didn't add any extra
// columns, so there is nothing to project out.
return
}
projection := make([]uint32, 0, len(p.originalLeftOutCols)+len(p.originalRightOutCols))
for _, i := range p.originalLeftOutCols {
projection = append(projection, uint32(p.indexVarMap[i]))
}
rColOffset := uint32(p.numLeftInputCols)
for _, i := range p.originalRightOutCols {
projection = append(projection, uint32(p.indexVarMap[rColOffset+i]))
}
result.Op = NewSimpleProjectOp(result.Op, len(result.ColumnTypes), projection)

// Update output column types according to the projection.
newTypes := make([]types.T, 0, len(projection))
for _, j := range projection {
newTypes = append(newTypes, result.ColumnTypes[j])
}
result.ColumnTypes = newTypes
}

// setProjectedByJoinerColumnTypes sets column types on r according to a
// joiner handled projection.
// NOTE: r.ColumnTypes is updated.
func (r *NewColOperatorResult) setProjectedByJoinerColumnTypes(
spec *execinfrapb.ProcessorSpec, leftOutCols, rightOutCols []uint32,
) {
if p.extraLeftOutCols+p.extraRightOutCols > 0 {
projection := make([]uint32, 0, len(leftOutCols)+len(rightOutCols)-p.extraLeftOutCols-p.extraRightOutCols)
for i := 0; i < len(leftOutCols)-p.extraLeftOutCols; i++ {
projection = append(projection, uint32(i))
}
for i := 0; i < len(rightOutCols)-p.extraRightOutCols; i++ {
projection = append(projection, uint32(i+len(leftOutCols)))
}
result.Op = NewSimpleProjectOp(result.Op, len(leftOutCols)+len(rightOutCols), projection)
r.ColumnTypes = make([]types.T, 0, len(leftOutCols)+len(rightOutCols))
for _, leftOutCol := range leftOutCols {
r.ColumnTypes = append(r.ColumnTypes, spec.Input[0].ColumnTypes[leftOutCol])
}
for _, rightOutCol := range rightOutCols {
r.ColumnTypes = append(r.ColumnTypes, spec.Input[1].ColumnTypes[rightOutCol])
}
}

Expand Down
Loading

0 comments on commit 8e15411

Please sign in to comment.