Skip to content

Commit

Permalink
sql: add buffer and scanBuffer nodes
Browse files Browse the repository at this point in the history
Adds bufferNode that consumes its input, stores all the rows in a
buffer, and then proceeds on passing the rows through. The buffer
can be iterated over multiple times using scanBuffer node that is
referencing a single bufferNode.

Release note: None
  • Loading branch information
yuzefovich committed May 15, 2019
1 parent ba58a7e commit 5c1d0f9
Show file tree
Hide file tree
Showing 9 changed files with 128 additions and 0 deletions.
97 changes: 97 additions & 0 deletions pkg/sql/buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright 2019 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package sql

import (
"context"

"github.com/cockroachdb/cockroach/pkg/sql/rowcontainer"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
)

// bufferNode consumes its input one row at a time, stores it in the buffer,
// and passes the row through. The buffered rows can be iterated over multiple
// times.
type bufferNode struct {
plan planNode

// TODO(yuzefovich): the buffer should probably be backed by disk. If so, the
// comments about TempStorage suggest that it should be used by DistSQL
// processors, but this node is local.
bufferedRows *rowcontainer.RowContainer
passThruNextRowIdx int
}

func (n *bufferNode) startExec(params runParams) error {
n.bufferedRows = rowcontainer.NewRowContainer(
params.EvalContext().Mon.MakeBoundAccount(),
sqlbase.ColTypeInfoFromResCols(getPlanColumns(n.plan, false /* mut */)),
0, /* rowCapacity */
)
return nil
}

func (n *bufferNode) Next(params runParams) (bool, error) {
if err := params.p.cancelChecker.Check(); err != nil {
return false, err
}
ok, err := n.plan.Next(params)
if err != nil {
return false, err
}
if !ok {
return false, nil
}
if _, err = n.bufferedRows.AddRow(params.ctx, n.plan.Values()); err != nil {
return false, err
}
n.passThruNextRowIdx++
return true, nil
}

func (n *bufferNode) Values() tree.Datums {
return n.bufferedRows.At(n.passThruNextRowIdx - 1)
}

func (n *bufferNode) Close(ctx context.Context) {
n.plan.Close(ctx)
n.bufferedRows.Close(ctx)
}

// scanBufferNode behaves like an iterator into the bufferNode it is
// referencing. The bufferNode can be iterated over multiple times
// simultaneously, however, a new scanBufferNode is needed.
type scanBufferNode struct {
buffer *bufferNode

nextRowIdx int
}

func (n *scanBufferNode) startExec(runParams) error {
return nil
}

func (n *scanBufferNode) Next(runParams) (bool, error) {
n.nextRowIdx++
return n.nextRowIdx <= n.buffer.bufferedRows.Len(), nil
}

func (n *scanBufferNode) Values() tree.Datums {
return n.buffer.bufferedRows.At(n.nextRowIdx - 1)
}

func (n *scanBufferNode) Close(context.Context) {
}
4 changes: 4 additions & 0 deletions pkg/sql/expand_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -868,6 +868,9 @@ func (p *planner) simplifyOrderings(plan planNode, usefulOrdering sqlbase.Column
case *errorIfRowsNode:
n.plan = p.simplifyOrderings(n.plan, nil)

case *bufferNode:
n.plan = p.simplifyOrderings(n.plan, usefulOrdering)

case *valuesNode:
case *virtualTableNode:
case *alterIndexNode:
Expand Down Expand Up @@ -904,6 +907,7 @@ func (p *planner) simplifyOrderings(plan planNode, usefulOrdering sqlbase.Column
case *setZoneConfigNode:
case *showFingerprintsNode:
case *showTraceNode:
case *scanBufferNode:
case *scatterNode:

default:
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/opt_filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,11 @@ func (p *planner) propagateFilters(
return plan, extraFilter, err
}

case *bufferNode:
if n.plan, err = p.triggerFilterPropagation(ctx, n.plan); err != nil {
return plan, extraFilter, err
}

case *alterIndexNode:
case *alterTableNode:
case *alterSequenceNode:
Expand Down Expand Up @@ -401,6 +406,7 @@ func (p *planner) propagateFilters(
case *setZoneConfigNode:
case *showFingerprintsNode:
case *showTraceNode:
case *scanBufferNode:
case *scatterNode:

default:
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/opt_limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,9 @@ func (p *planner) applyLimit(plan planNode, numRows int64, soft bool) {
case *errorIfRowsNode:
p.setUnlimited(n.plan)

case *bufferNode:
p.setUnlimited(n.plan)

case *valuesNode:
case *virtualTableNode:
case *alterIndexNode:
Expand Down Expand Up @@ -237,6 +240,7 @@ func (p *planner) applyLimit(plan planNode, numRows int64, soft bool) {
case *showFingerprintsNode:
case *showTraceNode:
case *scatterNode:
case *scanBufferNode:

case *applyJoinNode, *lookupJoinNode, *zigzagJoinNode, *saveTableNode:
// These nodes are only planned by the optimizer.
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/opt_needed.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,9 @@ func setNeededColumns(plan planNode, needed []bool) {
case *errorIfRowsNode:
setNeededColumns(n.plan, allColumns(n.plan))

case *bufferNode:
setNeededColumns(n.plan, needed)

case *alterIndexNode:
case *alterTableNode:
case *alterSequenceNode:
Expand Down Expand Up @@ -293,6 +296,7 @@ func setNeededColumns(plan planNode, needed []bool) {
case *showFingerprintsNode:
case *showTraceNode:
case *scatterNode:
case *scanBufferNode:

default:
panic(fmt.Sprintf("unhandled node type: %T", plan))
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ type planNodeFastPath interface {
var _ planNode = &alterIndexNode{}
var _ planNode = &alterSequenceNode{}
var _ planNode = &alterTableNode{}
var _ planNode = &bufferNode{}
var _ planNode = &cancelQueriesNode{}
var _ planNode = &cancelSessionsNode{}
var _ planNode = &createDatabaseNode{}
Expand Down Expand Up @@ -204,6 +205,7 @@ var _ planNode = &renameIndexNode{}
var _ planNode = &renameTableNode{}
var _ planNode = &renderNode{}
var _ planNode = &rowCountNode{}
var _ planNode = &scanBufferNode{}
var _ planNode = &scanNode{}
var _ planNode = &scatterNode{}
var _ planNode = &serializeNode{}
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/plan_columns.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ func getPlanColumns(plan planNode, mut bool) sqlbase.ResultColumns {

// Nodes that have the same schema as their source or their
// valueNode helper.
case *bufferNode:
return getPlanColumns(n.plan, mut)
case *distinctNode:
return getPlanColumns(n.plan, mut)
case *filterNode:
Expand All @@ -132,6 +134,8 @@ func getPlanColumns(plan planNode, mut bool) sqlbase.ResultColumns {
return getPlanColumns(n.source, mut)
case *saveTableNode:
return getPlanColumns(n.source, mut)
case *scanBufferNode:
return getPlanColumns(n.buffer, mut)

case *rowSourceToPlanNode:
return n.planCols
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/plan_physical_props.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ func planPhysicalProps(plan planNode) physicalProps {
case *zigzagJoinNode:
return n.props
case *applyJoinNode:
case *bufferNode:
case *scanBufferNode:

// Every other node simply has no guarantees on its output rows.
case *CreateUserNode:
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/walk.go
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,9 @@ func (v *planVisitor) visitInternal(plan planNode, name string) {

case *errorIfRowsNode:
n.plan = v.visit(n.plan)

case *bufferNode:
n.plan = v.visit(n.plan)
}
}

Expand Down Expand Up @@ -727,6 +730,7 @@ var planNodeNames = map[reflect.Type]string{
reflect.TypeOf(&alterTableNode{}): "alter table",
reflect.TypeOf(&alterUserSetPasswordNode{}): "alter user",
reflect.TypeOf(&applyJoinNode{}): "apply-join",
reflect.TypeOf(&bufferNode{}): "buffer node",
reflect.TypeOf(&commentOnColumnNode{}): "comment on column",
reflect.TypeOf(&commentOnDatabaseNode{}): "comment on database",
reflect.TypeOf(&commentOnTableNode{}): "comment on table",
Expand Down Expand Up @@ -773,6 +777,7 @@ var planNodeNames = map[reflect.Type]string{
reflect.TypeOf(&rowCountNode{}): "count",
reflect.TypeOf(&rowSourceToPlanNode{}): "row source to plan node",
reflect.TypeOf(&saveTableNode{}): "save table",
reflect.TypeOf(&scanBufferNode{}): "scan buffer node",
reflect.TypeOf(&scanNode{}): "scan",
reflect.TypeOf(&scatterNode{}): "scatter",
reflect.TypeOf(&scrubNode{}): "scrub",
Expand Down

0 comments on commit 5c1d0f9

Please sign in to comment.