Skip to content

Commit

Permalink
Merge #39125
Browse files Browse the repository at this point in the history
39125: exec: support ON expression with INNER merge and hash joins r=yuzefovich a=yuzefovich

**exec: support ON expresion with INNER merge join**

Supporting ON expression is simple in case of an inner merge join
since we simply need to filter out the rows that don't satisfy the
predicate. It is done as a selection operator after the merge joiner.

**exec: add support for INNER hash joiner with ON expression**

Similarly to merge joiner case, it is very simple to add support
for ON expression in case of inner hash joiner by just planning
a filter on top. Additionally, this commit introduces a test of
hash joiner operator against the processor.

Addresses: #38018.

Release note: None

Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
  • Loading branch information
craig[bot] and yuzefovich committed Jul 29, 2019
2 parents af4865e + cd473c5 commit 35045f1
Show file tree
Hide file tree
Showing 2 changed files with 277 additions and 88 deletions.
89 changes: 55 additions & 34 deletions pkg/sql/distsqlrun/column_exec_setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,10 +290,6 @@ func newColOperator(
return result, err
}

if !core.HashJoiner.OnExpr.Empty() {
return result, errors.Newf("can't plan hash join with on expressions")
}

var leftTypes, rightTypes []types.T
leftTypes, err = conv.FromColumnTypes(spec.Input[0].ColumnTypes)
if err != nil {
Expand Down Expand Up @@ -345,17 +341,23 @@ func newColOperator(
core.HashJoiner.LeftEqColumnsAreKey || core.HashJoiner.RightEqColumnsAreKey,
core.HashJoiner.Type,
)
if err != nil {
return result, err
}

if !core.HashJoiner.OnExpr.Empty() {
if core.HashJoiner.Type != sqlbase.JoinType_INNER {
return result, errors.Newf("can't plan non-inner hash join with on expressions")
}
columnTypes, err = result.planFilterExpr(flowCtx, core.HashJoiner.OnExpr, columnTypes)
}

case core.MergeJoiner != nil:
if err := checkNumIn(inputs, 2); err != nil {
return result, err
}

if !core.MergeJoiner.OnExpr.Empty() {
return result, errors.Newf("can't plan merge join with on expressions")
}
if core.MergeJoiner.Type == sqlbase.JoinType_INTERSECT_ALL ||
core.MergeJoiner.Type == sqlbase.JoinType_EXCEPT_ALL {
if core.MergeJoiner.Type.IsSetOpJoin() {
return result, errors.AssertionFailedf("unexpectedly %s merge join was planned", core.MergeJoiner.Type.String())
}

Expand Down Expand Up @@ -408,6 +410,9 @@ func newColOperator(
core.MergeJoiner.LeftOrdering.Columns,
core.MergeJoiner.RightOrdering.Columns,
)
if err != nil {
return result, err
}

columnTypes = make([]semtypes.T, nLeftCols+nRightCols)
copy(columnTypes, spec.Input[0].ColumnTypes)
Expand All @@ -418,6 +423,13 @@ func newColOperator(
columnTypes = columnTypes[:nLeftCols]
}

if !core.MergeJoiner.OnExpr.Empty() {
if core.MergeJoiner.Type != sqlbase.JoinType_INNER {
return result, errors.Errorf("can't plan non-inner merge joins with on expressions")
}
columnTypes, err = result.planFilterExpr(flowCtx, core.MergeJoiner.OnExpr, columnTypes)
}

case core.JoinReader != nil:
if err := checkNumIn(inputs, 1); err != nil {
return result, err
Expand Down Expand Up @@ -552,6 +564,10 @@ func newColOperator(
return result, errors.Newf("unsupported processor core %q", core)
}

if err != nil {
return result, err
}

// After constructing the base operator, calculate the memory usage
// of the operator.
if sMemOp, ok := result.op.(exec.StaticMemoryOperator); ok {
Expand All @@ -560,38 +576,14 @@ func newColOperator(

log.VEventf(ctx, 1, "made op %T\n", result.op)

if err != nil {
return result, err
}

if columnTypes == nil {
return result, errors.AssertionFailedf("output columnTypes unset after planning %T", result.op)
}

if !post.Filter.Empty() {
var (
helper exprHelper
selectionMem int
)
err := helper.init(post.Filter, columnTypes, flowCtx.EvalCtx)
if err != nil {
if columnTypes, err = result.planFilterExpr(flowCtx, post.Filter, columnTypes); err != nil {
return result, err
}
var filterColumnTypes []semtypes.T
result.op, _, filterColumnTypes, selectionMem, err = planSelectionOperators(flowCtx.NewEvalCtx(), helper.expr, columnTypes, result.op)
if err != nil {
return result, errors.Wrapf(err, "unable to columnarize filter expression %q", post.Filter.Expr)
}
result.memUsage += selectionMem
if len(filterColumnTypes) > len(columnTypes) {
// Additional columns were appended to store projection results while
// evaluating the filter. Project them away.
var outputColumns []uint32
for i := range columnTypes {
outputColumns = append(outputColumns, uint32(i))
}
result.op = exec.NewSimpleProjectOp(result.op, len(filterColumnTypes), outputColumns)
}
}
if post.Projection {
result.op = exec.NewSimpleProjectOp(result.op, len(columnTypes), post.OutputColumns)
Expand Down Expand Up @@ -645,6 +637,35 @@ func newColOperator(
return result, err
}

func (r *newColOperatorResult) planFilterExpr(
flowCtx *FlowCtx, filter distsqlpb.Expression, columnTypes []semtypes.T,
) ([]semtypes.T, error) {
var (
helper exprHelper
selectionMem int
)
err := helper.init(filter, columnTypes, flowCtx.EvalCtx)
if err != nil {
return columnTypes, err
}
var filterColumnTypes []semtypes.T
r.op, _, filterColumnTypes, selectionMem, err = planSelectionOperators(flowCtx.NewEvalCtx(), helper.expr, columnTypes, r.op)
if err != nil {
return columnTypes, errors.Wrapf(err, "unable to columnarize filter expression %q", filter.Expr)
}
r.memUsage += selectionMem
if len(filterColumnTypes) > len(columnTypes) {
// Additional columns were appended to store projections while evaluating
// the filter. Project them away.
var outputColumns []uint32
for i := range columnTypes {
outputColumns = append(outputColumns, uint32(i))
}
r.op = exec.NewSimpleProjectOp(r.op, len(filterColumnTypes), outputColumns)
}
return columnTypes, nil
}

func planSelectionOperators(
ctx *tree.EvalContext, expr tree.TypedExpr, columnTypes []semtypes.T, input exec.Operator,
) (op exec.Operator, resultIdx int, ct []semtypes.T, memUsed int, err error) {
Expand Down
Loading

0 comments on commit 35045f1

Please sign in to comment.