diff --git a/pkg/sql/buffer.go b/pkg/sql/buffer.go new file mode 100644 index 000000000000..7abaa3760b03 --- /dev/null +++ b/pkg/sql/buffer.go @@ -0,0 +1,97 @@ +// Copyright 2019 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package sql + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/sql/rowcontainer" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" +) + +// bufferNode consumes its input one row at a time, stores it in the buffer, +// and passes the row through. The buffered rows can be iterated over multiple +// times. +type bufferNode struct { + plan planNode + + // TODO(yuzefovich): the buffer should probably be backed by disk. If so, the + // comments about TempStorage suggest that it should be used by DistSQL + // processors, but this node is local. + bufferedRows *rowcontainer.RowContainer + passThruNextRowIdx int +} + +func (n *bufferNode) startExec(params runParams) error { + n.bufferedRows = rowcontainer.NewRowContainer( + params.EvalContext().Mon.MakeBoundAccount(), + sqlbase.ColTypeInfoFromResCols(getPlanColumns(n.plan, false /* mut */)), + 0, /* rowCapacity */ + ) + return nil +} + +func (n *bufferNode) Next(params runParams) (bool, error) { + if err := params.p.cancelChecker.Check(); err != nil { + return false, err + } + ok, err := n.plan.Next(params) + if err != nil { + return false, err + } + if !ok { + return false, nil + } + if _, err = n.bufferedRows.AddRow(params.ctx, n.plan.Values()); err != nil { + return false, err + } + n.passThruNextRowIdx++ + return true, nil +} + +func (n *bufferNode) Values() tree.Datums { + return n.bufferedRows.At(n.passThruNextRowIdx - 1) +} + +func (n *bufferNode) Close(ctx context.Context) { + n.plan.Close(ctx) + n.bufferedRows.Close(ctx) +} + +// scanBufferNode behaves like an iterator into the bufferNode it is +// referencing. The bufferNode can be iterated over multiple times +// simultaneously, however, a new scanBufferNode is needed. +type scanBufferNode struct { + buffer *bufferNode + + nextRowIdx int +} + +func (n *scanBufferNode) startExec(runParams) error { + return nil +} + +func (n *scanBufferNode) Next(runParams) (bool, error) { + n.nextRowIdx++ + return n.nextRowIdx <= n.buffer.bufferedRows.Len(), nil +} + +func (n *scanBufferNode) Values() tree.Datums { + return n.buffer.bufferedRows.At(n.nextRowIdx - 1) +} + +func (n *scanBufferNode) Close(context.Context) { +} diff --git a/pkg/sql/expand_plan.go b/pkg/sql/expand_plan.go index c4559249e645..d2d478bd4a37 100644 --- a/pkg/sql/expand_plan.go +++ b/pkg/sql/expand_plan.go @@ -868,6 +868,9 @@ func (p *planner) simplifyOrderings(plan planNode, usefulOrdering sqlbase.Column case *errorIfRowsNode: n.plan = p.simplifyOrderings(n.plan, nil) + case *bufferNode: + n.plan = p.simplifyOrderings(n.plan, usefulOrdering) + case *valuesNode: case *virtualTableNode: case *alterIndexNode: @@ -904,6 +907,7 @@ func (p *planner) simplifyOrderings(plan planNode, usefulOrdering sqlbase.Column case *setZoneConfigNode: case *showFingerprintsNode: case *showTraceNode: + case *scanBufferNode: case *scatterNode: default: diff --git a/pkg/sql/opt_filters.go b/pkg/sql/opt_filters.go index 86278cbf9f5f..5dd71744a3a4 100644 --- a/pkg/sql/opt_filters.go +++ b/pkg/sql/opt_filters.go @@ -366,6 +366,11 @@ func (p *planner) propagateFilters( return plan, extraFilter, err } + case *bufferNode: + if n.plan, err = p.triggerFilterPropagation(ctx, n.plan); err != nil { + return plan, extraFilter, err + } + case *alterIndexNode: case *alterTableNode: case *alterSequenceNode: @@ -401,6 +406,7 @@ func (p *planner) propagateFilters( case *setZoneConfigNode: case *showFingerprintsNode: case *showTraceNode: + case *scanBufferNode: case *scatterNode: default: diff --git a/pkg/sql/opt_limits.go b/pkg/sql/opt_limits.go index fa912e1c6bc5..f5349c7e5f52 100644 --- a/pkg/sql/opt_limits.go +++ b/pkg/sql/opt_limits.go @@ -199,6 +199,9 @@ func (p *planner) applyLimit(plan planNode, numRows int64, soft bool) { case *errorIfRowsNode: p.setUnlimited(n.plan) + case *bufferNode: + p.setUnlimited(n.plan) + case *valuesNode: case *virtualTableNode: case *alterIndexNode: @@ -237,6 +240,7 @@ func (p *planner) applyLimit(plan planNode, numRows int64, soft bool) { case *showFingerprintsNode: case *showTraceNode: case *scatterNode: + case *scanBufferNode: case *applyJoinNode, *lookupJoinNode, *zigzagJoinNode, *saveTableNode: // These nodes are only planned by the optimizer. diff --git a/pkg/sql/opt_needed.go b/pkg/sql/opt_needed.go index 4b6f8c8a7921..3b1eabfcf79a 100644 --- a/pkg/sql/opt_needed.go +++ b/pkg/sql/opt_needed.go @@ -257,6 +257,9 @@ func setNeededColumns(plan planNode, needed []bool) { case *errorIfRowsNode: setNeededColumns(n.plan, allColumns(n.plan)) + case *bufferNode: + setNeededColumns(n.plan, needed) + case *alterIndexNode: case *alterTableNode: case *alterSequenceNode: @@ -293,6 +296,7 @@ func setNeededColumns(plan planNode, needed []bool) { case *showFingerprintsNode: case *showTraceNode: case *scatterNode: + case *scanBufferNode: default: panic(fmt.Sprintf("unhandled node type: %T", plan)) diff --git a/pkg/sql/plan.go b/pkg/sql/plan.go index 0f800e0bef91..54da77ef2a4e 100644 --- a/pkg/sql/plan.go +++ b/pkg/sql/plan.go @@ -165,6 +165,7 @@ type planNodeFastPath interface { var _ planNode = &alterIndexNode{} var _ planNode = &alterSequenceNode{} var _ planNode = &alterTableNode{} +var _ planNode = &bufferNode{} var _ planNode = &cancelQueriesNode{} var _ planNode = &cancelSessionsNode{} var _ planNode = &createDatabaseNode{} @@ -204,6 +205,7 @@ var _ planNode = &renameIndexNode{} var _ planNode = &renameTableNode{} var _ planNode = &renderNode{} var _ planNode = &rowCountNode{} +var _ planNode = &scanBufferNode{} var _ planNode = &scanNode{} var _ planNode = &scatterNode{} var _ planNode = &serializeNode{} diff --git a/pkg/sql/plan_columns.go b/pkg/sql/plan_columns.go index ea6d186d3a90..2d59d36ec358 100644 --- a/pkg/sql/plan_columns.go +++ b/pkg/sql/plan_columns.go @@ -118,6 +118,8 @@ func getPlanColumns(plan planNode, mut bool) sqlbase.ResultColumns { // Nodes that have the same schema as their source or their // valueNode helper. + case *bufferNode: + return getPlanColumns(n.plan, mut) case *distinctNode: return getPlanColumns(n.plan, mut) case *filterNode: @@ -132,6 +134,8 @@ func getPlanColumns(plan planNode, mut bool) sqlbase.ResultColumns { return getPlanColumns(n.source, mut) case *saveTableNode: return getPlanColumns(n.source, mut) + case *scanBufferNode: + return getPlanColumns(n.buffer, mut) case *rowSourceToPlanNode: return n.planCols diff --git a/pkg/sql/plan_physical_props.go b/pkg/sql/plan_physical_props.go index 4bf91d54840a..d9bfc8fd9497 100644 --- a/pkg/sql/plan_physical_props.go +++ b/pkg/sql/plan_physical_props.go @@ -93,6 +93,8 @@ func planPhysicalProps(plan planNode) physicalProps { case *zigzagJoinNode: return n.props case *applyJoinNode: + case *bufferNode: + case *scanBufferNode: // Every other node simply has no guarantees on its output rows. case *CreateUserNode: diff --git a/pkg/sql/walk.go b/pkg/sql/walk.go index cec0af364580..1be7964fec67 100644 --- a/pkg/sql/walk.go +++ b/pkg/sql/walk.go @@ -621,6 +621,9 @@ func (v *planVisitor) visitInternal(plan planNode, name string) { case *errorIfRowsNode: n.plan = v.visit(n.plan) + + case *bufferNode: + n.plan = v.visit(n.plan) } } @@ -727,6 +730,7 @@ var planNodeNames = map[reflect.Type]string{ reflect.TypeOf(&alterTableNode{}): "alter table", reflect.TypeOf(&alterUserSetPasswordNode{}): "alter user", reflect.TypeOf(&applyJoinNode{}): "apply-join", + reflect.TypeOf(&bufferNode{}): "buffer node", reflect.TypeOf(&commentOnColumnNode{}): "comment on column", reflect.TypeOf(&commentOnDatabaseNode{}): "comment on database", reflect.TypeOf(&commentOnTableNode{}): "comment on table", @@ -773,6 +777,7 @@ var planNodeNames = map[reflect.Type]string{ reflect.TypeOf(&rowCountNode{}): "count", reflect.TypeOf(&rowSourceToPlanNode{}): "row source to plan node", reflect.TypeOf(&saveTableNode{}): "save table", + reflect.TypeOf(&scanBufferNode{}): "scan buffer node", reflect.TypeOf(&scanNode{}): "scan", reflect.TypeOf(&scatterNode{}): "scatter", reflect.TypeOf(&scrubNode{}): "scrub",