Skip to content

Commit

Permalink
sql: add buffer and scanBuffer nodes
Browse files Browse the repository at this point in the history
Adds bufferNode, which consumes its input, stores all of the rows in a
buffer, and then passes the rows through. The buffer can be iterated
over multiple times via scanBufferNode; each scanBufferNode instance
references a single bufferNode.

Release note: None
  • Loading branch information
yuzefovich committed May 1, 2019
1 parent cc49687 commit c89548c
Show file tree
Hide file tree
Showing 9 changed files with 136 additions and 0 deletions.
105 changes: 105 additions & 0 deletions pkg/sql/buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright 2019 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package sql

import (
"context"

"github.com/cockroachdb/cockroach/pkg/sql/rowcontainer"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
)

// bufferNode consumes all the input rows at once and writes them to a buffer.
// After the input has been fully consumed, it proceeds on passing the rows
// through. The buffer can be iterated over multiple times.
type bufferNode struct {
	// plan is the input; it is fully drained (and its rows buffered) in
	// startExec, before any row is passed through.
	plan planNode

	// TODO(yuzefovich): should the buffer be backed by the disk? If so, the
	// comments about TempStorage suggest that it should be used by DistSQL
	// processors, but this node is local.
	//
	// bufferedRows is allocated in startExec and holds every row produced by
	// plan, charged to the evaluation context's memory monitor.
	bufferedRows *rowcontainer.RowContainer
	// passThruNextRowIdx is the index of the next buffered row that Next will
	// pass through; Values returns the row at passThruNextRowIdx-1.
	passThruNextRowIdx int
}

// startExec drains the input plan completely, accumulating every row into
// n.bufferedRows. It returns early on a query-cancellation request or on any
// error from the input or from the memory-accounted row container.
func (n *bufferNode) startExec(params runParams) error {
	acc := params.EvalContext().Mon.MakeBoundAccount()
	colTypes := sqlbase.ColTypeInfoFromResCols(getPlanColumns(n.plan, false /* mut */))
	n.bufferedRows = rowcontainer.NewRowContainer(acc, colTypes, 0 /* rowCapacity */)
	for {
		// Check for cancellation before pulling each row so a long input
		// cannot keep us running after the query has been canceled.
		if err := params.p.cancelChecker.Check(); err != nil {
			return err
		}
		hasRow, err := n.plan.Next(params)
		if err != nil {
			return err
		}
		if !hasRow {
			break
		}
		if _, err := n.bufferedRows.AddRow(params.ctx, n.plan.Values()); err != nil {
			return err
		}
	}
	return nil
}

// Next passes through the next buffered row, returning false once every
// buffered row has been emitted.
func (n *bufferNode) Next(params runParams) (bool, error) {
	if n.passThruNextRowIdx < n.bufferedRows.Len() {
		n.passThruNextRowIdx++
		return true, nil
	}
	return false, nil
}

// Values returns the row most recently passed through by Next.
func (n *bufferNode) Values() tree.Datums {
	lastEmittedIdx := n.passThruNextRowIdx - 1
	return n.bufferedRows.At(lastEmittedIdx)
}

// TODO(yuzefovich): does this need to have some special behavior?
//
// Close closes the input plan and releases the buffered rows. bufferedRows is
// guarded against nil because it is only allocated in startExec, and Close can
// be invoked on a plan whose startExec never ran (e.g. when an error occurs
// while starting another part of the plan).
func (n *bufferNode) Close(ctx context.Context) {
	n.plan.Close(ctx)
	if n.bufferedRows != nil {
		n.bufferedRows.Close(ctx)
	}
}

// scanBufferNode behaves like an iterator into the bufferNode it is
// referencing. The bufferNode can be iterated over multiple times
// simultaneously; however, each iteration needs its own scanBufferNode. Note
// that a scanBufferNode can only start its execution after the corresponding
// bufferNode has finished its execution completely.
type scanBufferNode struct {
	// buffer is the node whose rows are iterated over. It is not owned by
	// this node: Close deliberately leaves it untouched.
	buffer *bufferNode

	// nextRowIdx is incremented by Next before the bounds check, so it is
	// effectively 1-based; Values returns the row at nextRowIdx-1.
	nextRowIdx int
}

// startExec is a no-op: all of the work was already done by the referenced
// bufferNode when it executed.
func (n *scanBufferNode) startExec(runParams) error {
	return nil
}

// Next advances the iterator and reports whether another buffered row is
// available from the referenced bufferNode.
func (n *scanBufferNode) Next(runParams) (bool, error) {
	n.nextRowIdx++
	if n.nextRowIdx > n.buffer.bufferedRows.Len() {
		return false, nil
	}
	return true, nil
}

// Values returns the buffered row that the iterator is currently positioned
// on (the one most recently reported by Next).
func (n *scanBufferNode) Values() tree.Datums {
	currentIdx := n.nextRowIdx - 1
	return n.buffer.bufferedRows.At(currentIdx)
}

// Close is a no-op. Note that scanBufferNode deliberately does not close the
// bufferNode it references: the bufferNode owns the row buffer (and other
// scanBufferNodes may still be iterating over it), so it closes the buffer
// itself in its own Close.
func (n *scanBufferNode) Close(context.Context) {
}
4 changes: 4 additions & 0 deletions pkg/sql/expand_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -868,6 +868,9 @@ func (p *planner) simplifyOrderings(plan planNode, usefulOrdering sqlbase.Column
case *errorIfRowsNode:
n.plan = p.simplifyOrderings(n.plan, nil)

case *bufferNode:
n.plan = p.simplifyOrderings(n.plan, usefulOrdering)

case *valuesNode:
case *virtualTableNode:
case *alterIndexNode:
Expand Down Expand Up @@ -904,6 +907,7 @@ func (p *planner) simplifyOrderings(plan planNode, usefulOrdering sqlbase.Column
case *setZoneConfigNode:
case *showFingerprintsNode:
case *showTraceNode:
case *scanBufferNode:
case *scatterNode:

default:
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/opt_filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,11 @@ func (p *planner) propagateFilters(
return plan, extraFilter, err
}

case *bufferNode:
if n.plan, err = p.triggerFilterPropagation(ctx, n.plan); err != nil {
return plan, extraFilter, err
}

case *alterIndexNode:
case *alterTableNode:
case *alterSequenceNode:
Expand Down Expand Up @@ -401,6 +406,7 @@ func (p *planner) propagateFilters(
case *setZoneConfigNode:
case *showFingerprintsNode:
case *showTraceNode:
case *scanBufferNode:
case *scatterNode:

default:
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/opt_limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,9 @@ func (p *planner) applyLimit(plan planNode, numRows int64, soft bool) {
case *errorIfRowsNode:
p.setUnlimited(n.plan)

case *bufferNode:
p.setUnlimited(n.plan)

case *valuesNode:
case *virtualTableNode:
case *alterIndexNode:
Expand Down Expand Up @@ -237,6 +240,7 @@ func (p *planner) applyLimit(plan planNode, numRows int64, soft bool) {
case *showFingerprintsNode:
case *showTraceNode:
case *scatterNode:
case *scanBufferNode:

case *applyJoinNode:
// The apply join node is only planned by the optimizer.
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/opt_needed.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,9 @@ func setNeededColumns(plan planNode, needed []bool) {
case *errorIfRowsNode:
setNeededColumns(n.plan, allColumns(n.plan))

case *bufferNode:
setNeededColumns(n.plan, needed)

case *alterIndexNode:
case *alterTableNode:
case *alterSequenceNode:
Expand Down Expand Up @@ -293,6 +296,7 @@ func setNeededColumns(plan planNode, needed []bool) {
case *showFingerprintsNode:
case *showTraceNode:
case *scatterNode:
case *scanBufferNode:

default:
panic(fmt.Sprintf("unhandled node type: %T", plan))
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ type planNodeFastPath interface {
var _ planNode = &alterIndexNode{}
var _ planNode = &alterSequenceNode{}
var _ planNode = &alterTableNode{}
var _ planNode = &bufferNode{}
var _ planNode = &cancelQueriesNode{}
var _ planNode = &cancelSessionsNode{}
var _ planNode = &createDatabaseNode{}
Expand Down Expand Up @@ -204,6 +205,7 @@ var _ planNode = &renameIndexNode{}
var _ planNode = &renameTableNode{}
var _ planNode = &renderNode{}
var _ planNode = &rowCountNode{}
var _ planNode = &scanBufferNode{}
var _ planNode = &scanNode{}
var _ planNode = &scatterNode{}
var _ planNode = &serializeNode{}
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/plan_columns.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ func getPlanColumns(plan planNode, mut bool) sqlbase.ResultColumns {

// Nodes that have the same schema as their source or their
// valueNode helper.
case *bufferNode:
return getPlanColumns(n.plan, mut)
case *distinctNode:
return getPlanColumns(n.plan, mut)
case *filterNode:
Expand All @@ -130,6 +132,8 @@ func getPlanColumns(plan planNode, mut bool) sqlbase.ResultColumns {
return getPlanColumns(n.source, mut)
case *serializeNode:
return getPlanColumns(n.source, mut)
case *scanBufferNode:
return getPlanColumns(n.buffer, mut)

case *rowSourceToPlanNode:
return n.planCols
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/plan_physical_props.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ func planPhysicalProps(plan planNode) physicalProps {
case *zigzagJoinNode:
return n.props
case *applyJoinNode:
case *bufferNode:
case *scanBufferNode:

// Every other node simply has no guarantees on its output rows.
case *CreateUserNode:
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/walk.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,9 @@ func (v *planVisitor) visitInternal(plan planNode, name string) {

case *errorIfRowsNode:
n.plan = v.visit(n.plan)

case *bufferNode:
n.plan = v.visit(n.plan)
}
}

Expand Down Expand Up @@ -721,6 +724,7 @@ var planNodeNames = map[reflect.Type]string{
reflect.TypeOf(&alterTableNode{}): "alter table",
reflect.TypeOf(&alterUserSetPasswordNode{}): "alter user",
reflect.TypeOf(&applyJoinNode{}): "apply-join",
reflect.TypeOf(&bufferNode{}): "buffer node",
reflect.TypeOf(&commentOnColumnNode{}): "comment on column",
reflect.TypeOf(&commentOnDatabaseNode{}): "comment on database",
reflect.TypeOf(&commentOnTableNode{}): "comment on table",
Expand Down Expand Up @@ -766,6 +770,7 @@ var planNodeNames = map[reflect.Type]string{
reflect.TypeOf(&renderNode{}): "render",
reflect.TypeOf(&rowCountNode{}): "count",
reflect.TypeOf(&rowSourceToPlanNode{}): "row source to plan node",
reflect.TypeOf(&scanBufferNode{}): "scan buffer node",
reflect.TypeOf(&scanNode{}): "scan",
reflect.TypeOf(&scatterNode{}): "scatter",
reflect.TypeOf(&scrubNode{}): "scrub",
Expand Down

0 comments on commit c89548c

Please sign in to comment.