-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
simple_project.go
147 lines (129 loc) · 4.19 KB
/
simple_project.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
// Copyright 2018 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package colexec
import (
"context"
"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/colexecbase"
"github.com/cockroachdb/cockroach/pkg/util/log"
)
// simpleProjectOp is an operator that implements "simple projection" - removal of
// columns that aren't needed by later operators.
type simpleProjectOp struct {
// oneInputCloserHelper and NonExplainable are embedded types declared
// elsewhere in the package. NOTE(review): oneInputCloserHelper presumably
// provides the 'input' field used by Init, Next and reset - confirm.
oneInputCloserHelper
NonExplainable
// projection lists, for each output column in order, the index of the
// input column it refers to. It is a private copy of the slice passed to
// NewSimpleProjectOp.
projection []uint32
// batches maps every batch returned by the input to the projectingBatch
// that wraps it, so that Next reuses one wrapper per distinct input batch.
batches map[coldata.Batch]*projectingBatch
// numBatchesLoggingThreshold is the threshold on the number of items in
// 'batches' map at which we will log a message when a new projectingBatch
// is created. It is growing exponentially.
numBatchesLoggingThreshold int
}
// Compile-time assertions that *simpleProjectOp satisfies the operator
// interfaces it is expected to implement.
var (
	_ closableOperator   = (*simpleProjectOp)(nil)
	_ ResettableOperator = (*simpleProjectOp)(nil)
)
// projectingBatch is a Batch that applies a simple projection to another,
// underlying batch, discarding all columns but the ones in its projection
// slice, in order.
type projectingBatch struct {
// Batch is the underlying batch being projected. Methods that
// projectingBatch does not define itself are served by it through embedding.
coldata.Batch
// projection[i] is the index within the underlying batch of the i-th
// column exposed by this projectingBatch.
projection []uint32
// colVecs is a lazily populated slice of coldata.Vecs to support returning
// these in ColVecs().
colVecs []coldata.Vec
}
// newProjectionBatch creates a projectingBatch for the given projection. The
// projection slice is copied so that later modifications by the caller (or by
// AppendCol on the returned batch) do not affect each other. The embedded
// Batch is left unset; Next assigns it before handing the wrapper out.
func newProjectionBatch(projection []uint32) *projectingBatch {
	ownProjection := make([]uint32, len(projection))
	copy(ownProjection, projection)
	return &projectingBatch{projection: ownProjection}
}
// ColVec returns the i-th projected column, that is, the column of the
// underlying batch located at index projection[i].
func (b *projectingBatch) ColVec(i int) coldata.Vec {
return b.Batch.ColVec(int(b.projection[i]))
}
// ColVecs returns the projected columns of the underlying batch, in
// projection order. It returns nil when the underlying batch is
// coldata.ZeroBatch. The returned slice is owned by the projectingBatch and is
// refilled from the underlying batch on every call; it is reallocated only
// when it has not been created yet or its length no longer matches the
// projection.
func (b *projectingBatch) ColVecs() []coldata.Vec {
	if b.Batch == coldata.ZeroBatch {
		return nil
	}
	width := len(b.projection)
	if len(b.colVecs) != width || b.colVecs == nil {
		b.colVecs = make([]coldata.Vec, width)
	}
	// Always refresh the entries since the underlying batch may have changed
	// since the previous call.
	for outIdx, inIdx := range b.projection {
		b.colVecs[outIdx] = b.Batch.ColVec(int(inIdx))
	}
	return b.colVecs
}
// Width returns the number of projected columns, which is the length of the
// projection slice rather than the width of the underlying batch.
func (b *projectingBatch) Width() int {
return len(b.projection)
}
// AppendCol appends col to the underlying batch and then extends the
// projection with index Width()-1 of the underlying batch (as reported after
// the append), so that the new column becomes the last projected column.
func (b *projectingBatch) AppendCol(col coldata.Vec) {
	b.Batch.AppendCol(col)
	lastIdx := uint32(b.Batch.Width()) - 1
	b.projection = append(b.projection, lastIdx)
}
// ReplaceCol replaces the projected column at position idx with col by
// forwarding to the underlying batch, translating idx through the projection
// to the corresponding underlying column index.
func (b *projectingBatch) ReplaceCol(col coldata.Vec, idx int) {
b.Batch.ReplaceCol(col, int(b.projection[idx]))
}
// NewSimpleProjectOp returns a new simpleProjectOp that applies a simple
// projection on the columns in its input batch, returning a new batch with
// only the columns in the projection slice, in order. In a degenerate case
// when input already outputs batches that satisfy the projection, a
// simpleProjectOp is not planned and input is returned.
//
// numInputCols is the number of columns produced by input; the projection is
// considered redundant only when it has exactly that many entries and entry i
// equals i for every i. The projection slice is copied, so the caller may
// reuse it afterwards.
func NewSimpleProjectOp(
	input colexecbase.Operator, numInputCols int, projection []uint32,
) colexecbase.Operator {
	if numInputCols == len(projection) {
		projectionIsRedundant := true
		for i := range projection {
			if projection[i] != uint32(i) {
				projectionIsRedundant = false
				// A single mismatch settles the question, so there is no
				// need to look at the remaining entries.
				break
			}
		}
		if projectionIsRedundant {
			return input
		}
	}
	s := &simpleProjectOp{
		oneInputCloserHelper:       makeOneInputCloserHelper(input),
		projection:                 make([]uint32, len(projection)),
		batches:                    make(map[coldata.Batch]*projectingBatch),
		numBatchesLoggingThreshold: 128,
	}
	// We make a copy of projection to be safe.
	copy(s.projection, projection)
	return s
}
// Init initializes the operator by initializing its input; the projection
// itself needs no additional setup here.
func (d *simpleProjectOp) Init() {
d.input.Init()
}
// Next fetches the next batch from the input and returns it wrapped in a
// projectingBatch that exposes only the projected columns. A zero-length input
// batch yields coldata.ZeroBatch.
//
// Wrappers are cached in d.batches keyed by the input batch, so the same input
// batch is always paired with the same projectingBatch. Each time the cache
// size reaches numBatchesLoggingThreshold the threshold is doubled and, at
// verbosity level 1, the current size is logged.
func (d *simpleProjectOp) Next(ctx context.Context) coldata.Batch {
	in := d.input.Next(ctx)
	if in.Length() == 0 {
		return coldata.ZeroBatch
	}
	wrapper, cached := d.batches[in]
	if !cached {
		wrapper = newProjectionBatch(d.projection)
		d.batches[in] = wrapper
		if numBatches := len(d.batches); numBatches == d.numBatchesLoggingThreshold {
			if log.V(1) {
				log.Infof(ctx, "simpleProjectOp: size of 'batches' map = %d", numBatches)
			}
			d.numBatchesLoggingThreshold *= 2
		}
	}
	// Point the wrapper (new or reused) at the batch we just received.
	wrapper.Batch = in
	return wrapper
}
// reset forwards the reset request to the input when the input implements
// resetter; otherwise it does nothing. The operator's own fields are left
// untouched.
func (d *simpleProjectOp) reset(ctx context.Context) {
	resettable, ok := d.input.(resetter)
	if !ok {
		return
	}
	resettable.reset(ctx)
}