Skip to content

Commit

Permalink
sql: add buffer and scanBuffer nodes
Browse files Browse the repository at this point in the history
Adds bufferNode that consumes its input, stores all the rows in a
buffer, and then proceeds on passing the rows through. The buffer
can be iterated over multiple times using scanBuffer node that is
referencing a single bufferNode.

Release note: None
  • Loading branch information
yuzefovich committed Apr 25, 2019
1 parent df024f7 commit 17b958d
Show file tree
Hide file tree
Showing 9 changed files with 146 additions and 0 deletions.
115 changes: 115 additions & 0 deletions pkg/sql/buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright 2019 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package sql

import (
"context"

"github.com/cockroachdb/cockroach/pkg/sql/rowcontainer"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
)

// bufferNode consumes all the input rows at once and writes them to a buffer.
// After the input has been fully consumed, it proceeds on passing the rows
// through. The buffer can be iterated over multiple times.
// TODO(yuzefovich): is this buffering all rows at once desirable?
// TODO(yuzefovich): current version supports only a single scanBufferNode at a
// time, is this sufficient?
type bufferNode struct {
plan planNode
inputDone bool

// TODO(yuzefovich): should the buffer be backed by the disk? If so, the
// comments about TempStorage suggest that it should be used by DistSQL
// processors, but this node is local.
bufferedRows *rowcontainer.RowContainer
nextRowIdx int
}

func (n *bufferNode) startExec(params runParams) error {
n.bufferedRows = rowcontainer.NewRowContainer(
params.EvalContext().Mon.MakeBoundAccount(),
sqlbase.ColTypeInfoFromResCols(getPlanColumns(n.plan, false /* mut */)),
0, /* rowCapacity */
)
return nil
}

func (n *bufferNode) Next(params runParams) (bool, error) {
if !n.inputDone {
for {
if err := params.p.cancelChecker.Check(); err != nil {
return false, err
}
ok, err := n.plan.Next(params)
if err != nil {
return false, err
}
if !ok {
n.inputDone = true
break
}
if _, err = n.bufferedRows.AddRow(params.ctx, n.plan.Values()); err != nil {
return false, err
}
}
}
if n.nextRowIdx == n.bufferedRows.Len() {
return false, nil
}
n.nextRowIdx++
return true, nil
}

func (n *bufferNode) Values() tree.Datums {
return n.bufferedRows.At(n.nextRowIdx - 1)
}

// TODO(yuzefovich): does this need to have some special behavior?
func (n *bufferNode) Close(ctx context.Context) {
n.plan.Close(ctx)
n.bufferedRows.Close(ctx)
}

// Rewind resets the index of the row to be returned next which allows for
// multiple iterations over the buffer. Notably, it doesn't reset the source.
func (n *bufferNode) Rewind() {
n.nextRowIdx = 0
}

// scanBufferNode behaves like an iterator into the bufferNode it is
// referencing. The bufferNode can be iterated over multiple times, however, a
// new scanBufferNode is needed.
type scanBufferNode struct {
buffer *bufferNode
}

func (n *scanBufferNode) startExec(params runParams) error {
n.buffer.Rewind()
return nil
}

func (n *scanBufferNode) Next(params runParams) (bool, error) {
return n.buffer.Next(params)
}

func (n *scanBufferNode) Values() tree.Datums {
return n.buffer.Values()
}

// Note that scanBufferNode does not close the corresponding to it bufferNode.
func (n *scanBufferNode) Close(context.Context) {
}
4 changes: 4 additions & 0 deletions pkg/sql/expand_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -863,6 +863,9 @@ func (p *planner) simplifyOrderings(plan planNode, usefulOrdering sqlbase.Column
case *controlJobsNode:
n.rows = p.simplifyOrderings(n.rows, nil)

case *bufferNode:
n.plan = p.simplifyOrderings(n.plan, usefulOrdering)

case *valuesNode:
case *virtualTableNode:
case *alterIndexNode:
Expand Down Expand Up @@ -900,6 +903,7 @@ func (p *planner) simplifyOrderings(plan planNode, usefulOrdering sqlbase.Column
case *showZoneConfigNode:
case *showFingerprintsNode:
case *showTraceNode:
case *scanBufferNode:
case *scatterNode:

default:
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/opt_filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,11 @@ func (p *planner) propagateFilters(
return plan, extraFilter, err
}

case *bufferNode:
if n.plan, err = p.triggerFilterPropagation(ctx, n.plan); err != nil {
return plan, extraFilter, err
}

case *alterIndexNode:
case *alterTableNode:
case *alterSequenceNode:
Expand Down Expand Up @@ -397,6 +402,7 @@ func (p *planner) propagateFilters(
case *showZoneConfigNode:
case *showFingerprintsNode:
case *showTraceNode:
case *scanBufferNode:
case *scatterNode:

default:
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/opt_limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,9 @@ func (p *planner) applyLimit(plan planNode, numRows int64, soft bool) {
case *controlJobsNode:
p.setUnlimited(n.rows)

case *bufferNode:
p.setUnlimited(n.plan)

case *valuesNode:
case *virtualTableNode:
case *alterIndexNode:
Expand Down Expand Up @@ -235,6 +238,7 @@ func (p *planner) applyLimit(plan planNode, numRows int64, soft bool) {
case *showFingerprintsNode:
case *showTraceNode:
case *scatterNode:
case *scanBufferNode:

case *applyJoinNode:
// The apply join node is only planned by the optimizer.
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/opt_needed.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,9 @@ func setNeededColumns(plan planNode, needed []bool) {
case *controlJobsNode:
setNeededColumns(n.rows, allColumns(n.rows))

case *bufferNode:
setNeededColumns(n.plan, needed)

case *alterIndexNode:
case *alterTableNode:
case *alterSequenceNode:
Expand Down Expand Up @@ -291,6 +294,7 @@ func setNeededColumns(plan planNode, needed []bool) {
case *showFingerprintsNode:
case *showTraceNode:
case *scatterNode:
case *scanBufferNode:

default:
panic(fmt.Sprintf("unhandled node type: %T", plan))
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ type planNodeFastPath interface {
var _ planNode = &alterIndexNode{}
var _ planNode = &alterSequenceNode{}
var _ planNode = &alterTableNode{}
var _ planNode = &bufferNode{}
var _ planNode = &cancelQueriesNode{}
var _ planNode = &cancelSessionsNode{}
var _ planNode = &createDatabaseNode{}
Expand Down Expand Up @@ -203,6 +204,7 @@ var _ planNode = &renameIndexNode{}
var _ planNode = &renameTableNode{}
var _ planNode = &renderNode{}
var _ planNode = &rowCountNode{}
var _ planNode = &scanBufferNode{}
var _ planNode = &scanNode{}
var _ planNode = &scatterNode{}
var _ planNode = &serializeNode{}
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/plan_columns.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ func getPlanColumns(plan planNode, mut bool) sqlbase.ResultColumns {

// Nodes that have the same schema as their source or their
// valueNode helper.
case *bufferNode:
return getPlanColumns(n.plan, mut)
case *distinctNode:
return getPlanColumns(n.plan, mut)
case *filterNode:
Expand All @@ -132,6 +134,8 @@ func getPlanColumns(plan planNode, mut bool) sqlbase.ResultColumns {
return getPlanColumns(n.source, mut)
case *serializeNode:
return getPlanColumns(n.source, mut)
case *scanBufferNode:
return getPlanColumns(n.buffer, mut)

case *rowSourceToPlanNode:
return n.planCols
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/plan_physical_props.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ func planPhysicalProps(plan planNode) physicalProps {
case *zigzagJoinNode:
return n.props
case *applyJoinNode:
case *bufferNode:
case *scanBufferNode:

// Every other node simply has no guarantees on its output rows.
case *CreateUserNode:
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/walk.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,9 @@ func (v *planVisitor) visitInternal(plan planNode, name string) {
if v.observer.followRowSourceToPlanNode && n.originalPlanNode != nil {
v.visit(n.originalPlanNode)
}

case *bufferNode:
n.plan = v.visit(n.plan)
}
}

Expand Down Expand Up @@ -718,6 +721,7 @@ var planNodeNames = map[reflect.Type]string{
reflect.TypeOf(&alterTableNode{}): "alter table",
reflect.TypeOf(&alterUserSetPasswordNode{}): "alter user",
reflect.TypeOf(&applyJoinNode{}): "apply-join",
reflect.TypeOf(&bufferNode{}): "buffer node",
reflect.TypeOf(&commentOnColumnNode{}): "comment on column",
reflect.TypeOf(&commentOnDatabaseNode{}): "comment on database",
reflect.TypeOf(&commentOnTableNode{}): "comment on table",
Expand Down Expand Up @@ -762,6 +766,7 @@ var planNodeNames = map[reflect.Type]string{
reflect.TypeOf(&renderNode{}): "render",
reflect.TypeOf(&rowCountNode{}): "count",
reflect.TypeOf(&rowSourceToPlanNode{}): "row source to plan node",
reflect.TypeOf(&scanBufferNode{}): "scan buffer node",
reflect.TypeOf(&scanNode{}): "scan",
reflect.TypeOf(&scatterNode{}): "scatter",
reflect.TypeOf(&scrubNode{}): "scrub",
Expand Down

0 comments on commit 17b958d

Please sign in to comment.