prepared_stmt.go
// Copyright 2016 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package sql
import (
"context"
"time"
"unsafe"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/sql/clusterunique"
"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
"github.com/cockroachdb/cockroach/pkg/sql/opt/memo"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgwirebase"
"github.com/cockroachdb/cockroach/pkg/sql/querycache"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
)
// PreparedStatementOrigin is an enum representing the source from which the
// prepared statement was created.
type PreparedStatementOrigin int
const (
// PreparedStatementOriginWire signifies the prepared statement was made
// over the wire.
PreparedStatementOriginWire PreparedStatementOrigin = iota + 1
// PreparedStatementOriginSQL signifies the prepared statement was made
// over a parsed SQL query.
PreparedStatementOriginSQL
// PreparedStatementOriginSessionMigration signifies that the prepared
// statement came from a call to crdb_internal.deserialize_session.
PreparedStatementOriginSessionMigration
)
// PreparedStatement is a SQL statement that has been parsed and the types
// of arguments and results have been determined.
//
// Note that PreparedStatements maintain a reference counter internally.
// References need to be registered with incRef() and de-registered with
// decRef().
type PreparedStatement struct {
querycache.PrepareMetadata
// Memo is the memoized data structure constructed by the cost-based optimizer
// while preparing a SQL statement. It can significantly speed up execution
// if it is used by the optimizer as a starting point.
Memo *memo.Memo
// refCount keeps track of the number of references to this PreparedStatement.
// New references are registered through incRef().
// Once refCount hits 0 (through calls to decRef()), the following memAcc is
// closed.
// Most references are being held by portals created from this prepared
// statement.
refCount int
memAcc mon.BoundAccount
// createdAt is the timestamp at which this prepared statement was created.
// Used for reporting on `pg_prepared_statements`.
createdAt time.Time
// origin is the protocol through which this prepared statement was created.
// Used for reporting on `pg_prepared_statements`.
origin PreparedStatementOrigin
}
// MemoryEstimate returns a rough estimate of the PreparedStatement's memory
// usage, in bytes.
func (p *PreparedStatement) MemoryEstimate() int64 {
// Account for the memory used by this prepared statement:
// 1. Size of the prepare metadata.
// 2. Size of the prepared memo, if using the cost-based optimizer.
size := p.PrepareMetadata.MemoryEstimate()
if p.Memo != nil {
size += p.Memo.MemoryEstimate()
}
return size
}
func (p *PreparedStatement) decRef(ctx context.Context) {
if p.refCount <= 0 {
log.Fatal(ctx, "corrupt PreparedStatement refcount")
}
p.refCount--
if p.refCount == 0 {
p.memAcc.Close(ctx)
}
}
func (p *PreparedStatement) incRef(ctx context.Context) {
if p.refCount <= 0 {
log.Fatal(ctx, "corrupt PreparedStatement refcount")
}
p.refCount++
}
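// Example (hypothetical counts, for illustration only): a typical
// reference-counting lifecycle for a PreparedStatement that backs a single
// portal might look like this:
//
//	// after Prepare: refCount == 1 (the statement's own reference)
//	stmt.incRef(ctx) // Bind creates a portal: refCount == 2
//	stmt.decRef(ctx) // the portal is closed:  refCount == 1
//	stmt.decRef(ctx) // Deallocate/Close:      refCount == 0, memAcc is closed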
// preparedStatementsAccessor gives a planner access to a session's collection
// of prepared statements.
type preparedStatementsAccessor interface {
// List returns all prepared statements as a map keyed by name.
// The map itself is a copy of the prepared statements.
List() map[string]*PreparedStatement
// Get returns the prepared statement with the given name. The returned bool
// is false if a statement with the given name doesn't exist.
Get(name string) (*PreparedStatement, bool)
// Delete removes the PreparedStatement with the provided name from the
// collection. If a portal exists for that statement, it is also removed.
// The method returns true if a statement with that name was found and removed,
// false otherwise.
Delete(ctx context.Context, name string) bool
// DeleteAll removes all prepared statements and portals from the collection.
DeleteAll(ctx context.Context)
}
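// The helper below is a hypothetical, illustrative sketch (not part of the
// executor) of how a caller might use preparedStatementsAccessor: look up a
// statement by name and remove it only if it exists.
func deleteIfPrepared(ctx context.Context, ps preparedStatementsAccessor, name string) bool {
	if _, ok := ps.Get(name); !ok {
		return false
	}
	return ps.Delete(ctx, name)
}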
// PortalPausablity marks whether the portal is pausable and, if not, the
// reason. This is needed to give the correct error for usage of multiple
// active portals.
type PortalPausablity int64
const (
// PortalPausabilityNotset is the default status of a portal when
// sql.multiple_modifications_of_table.enabled is false.
PortalPausabilityNotset PortalPausablity = iota
// PausablePortal is set when sql.multiple_modifications_of_table.enabled is
// set to true and the underlying statement is a read-only SELECT query with
// no sub-queries or post-queries.
PausablePortal
// NotPausablePortalForUnsupportedStmt is used when the cluster setting
// sql.multiple_modifications_of_table.enabled is set to true, but the
// underlying statement is not supported.
NotPausablePortalForUnsupportedStmt
)
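// pausabilityHint is a hypothetical helper, shown only to illustrate how the
// PortalPausablity values above are meant to be interpreted; it is not part
// of the executor.
func pausabilityHint(p PortalPausablity) string {
	switch p {
	case PausablePortal:
		return "the portal can be paused and resumed"
	case NotPausablePortalForUnsupportedStmt:
		return "multiple active portals are enabled, but the statement is not supported"
	default:
		return "multiple active portals are not enabled"
	}
}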
// PreparedPortal is a PreparedStatement that has been bound with query
// arguments.
type PreparedPortal struct {
Name string
Stmt *PreparedStatement
Qargs tree.QueryArguments
// OutFormats contains the requested formats for the output columns.
OutFormats []pgwirebase.FormatCode
// exhausted tracks whether this portal has already been fully exhausted,
// meaning that any additional attempts to execute it should return no
// rows.
exhausted bool
// portalPausablity is used to log the correct error message when a user
// pauses a portal.
// See comments for PortalPausablity for more details.
portalPausablity PortalPausablity
// pauseInfo is the saved info needed for a pausable portal.
pauseInfo *portalPauseInfo
}
// makePreparedPortal creates a new PreparedPortal.
//
// accountForCopy() doesn't need to be called on the prepared statement.
func (ex *connExecutor) makePreparedPortal(
ctx context.Context,
name string,
stmt *PreparedStatement,
qargs tree.QueryArguments,
isInternal bool,
outFormats []pgwirebase.FormatCode,
) (PreparedPortal, error) {
portal := PreparedPortal{
Name: name,
Stmt: stmt,
Qargs: qargs,
OutFormats: outFormats,
}
// TODO(janexing): maybe we should also add telemetry for the stmt that the
// portal hooks on.
enableMultipleActivePortals.SetOnChange(&ex.server.cfg.Settings.SV, func(ctx context.Context) {
telemetry.Inc(sqltelemetry.MultipleActivePortalCounter)
})
if enableMultipleActivePortals.Get(&ex.server.cfg.Settings.SV) && !isInternal {
if tree.IsReadOnly(stmt.AST) {
portal.pauseInfo = &portalPauseInfo{queryStats: &topLevelQueryStats{}}
portal.portalPausablity = PausablePortal
} else {
// We have set sql.defaults.multiple_active_portals.enabled to true, but
// we don't support the underlying query for a pausable portal.
portal.portalPausablity = NotPausablePortalForUnsupportedStmt
}
}
return portal, portal.accountForCopy(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc, name)
}
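// Example (sketch with hypothetical variable names): handling a pgwire Bind
// message would typically construct a portal roughly like this, with qargs
// decoded from the message payload:
//
//	portal, err := ex.makePreparedPortal(
//		ctx, "p1", stmt, qargs, false /* isInternal */, outFormats,
//	)
//	if err != nil {
//		return err
//	}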
// accountForCopy updates the state to account for the copy of the
// PreparedPortal (p is the copy).
func (p *PreparedPortal) accountForCopy(
ctx context.Context, prepStmtsNamespaceMemAcc *mon.BoundAccount, portalName string,
) error {
if err := prepStmtsNamespaceMemAcc.Grow(ctx, p.size(portalName)); err != nil {
return err
}
// Only increment the reference if we're going to keep it.
p.Stmt.incRef(ctx)
return nil
}
// close closes this portal.
func (p *PreparedPortal) close(
ctx context.Context, prepStmtsNamespaceMemAcc *mon.BoundAccount, portalName string,
) {
prepStmtsNamespaceMemAcc.Shrink(ctx, p.size(portalName))
p.Stmt.decRef(ctx)
if p.pauseInfo != nil {
p.pauseInfo.cleanupAll()
p.pauseInfo = nil
}
}
func (p *PreparedPortal) size(portalName string) int64 {
return int64(uintptr(len(portalName)) + unsafe.Sizeof(p))
}
func (p *PreparedPortal) isPausable() bool {
return p.pauseInfo != nil
}
// cleanupFuncStack stores cleanup functions for a portal. The cleanup
// functions are added during the first-time execution of a portal. When the
// first-time execution finishes, we set isComplete to true.
type cleanupFuncStack struct {
stack []namedFunc
isComplete bool
}
func (n *cleanupFuncStack) appendFunc(f namedFunc) {
n.stack = append(n.stack, f)
}
func (n *cleanupFuncStack) run() {
for i := 0; i < len(n.stack); i++ {
n.stack[i].f()
}
*n = cleanupFuncStack{}
}
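// The function below is a small illustrative sketch (not used by the
// executor) of how a cleanup layer registers named functions and later runs
// them in registration order via cleanupFuncStack.
func exampleCleanupStackUsage(releaseFlow, closeSpan func()) {
	var cleanup cleanupFuncStack
	cleanup.appendFunc(namedFunc{fName: "release-flow", f: releaseFlow})
	cleanup.appendFunc(namedFunc{fName: "close-span", f: closeSpan})
	// Runs release-flow, then close-span, and resets the stack.
	cleanup.run()
}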
// namedFunc is a function paired with a name, which makes debugging easier.
// It is used only for the cleanup functions of a pausable portal.
type namedFunc struct {
fName string
f func()
}
// instrumentationHelperWrapper wraps the instrumentation helper.
// We need to maintain it for a paused portal.
type instrumentationHelperWrapper struct {
ih instrumentationHelper
}
// portalPauseInfo stores the information needed to pause a portal. After
// pausing the portal, other statements can be executed before coming back to
// re-execute it or close it.
type portalPauseInfo struct {
// curRes is the command result of the current execution. For each execution
// we update this field. We need this when we encounter an error during
// execution, so that the error is correctly transmitted.
curRes RestrictedCommandResult
// sp stores the tracing span of the underlying statement. It is closed when
// the portal finishes.
sp *tracing.Span
// outputTypes are the types of the result columns produced by the physical plan.
// We need this because when re-executing the portal, we reuse the flow with a
// new receiver but do not regenerate the physical plan.
outputTypes []*types.T
// We need to store the flow for a portal so that when re-executing it, we
// continue from the previous execution. It lives along with the portal, and
// will be cleaned up when the portal is closed.
flow flowinfra.Flow
// queryID stores the id of the query that this portal is bound to. When we
// re-execute an existing portal, we should use the same query id.
queryID clusterunique.ID
// ihWrapper stores the instrumentation helper that should be reused for
// each execution of the portal.
ihWrapper *instrumentationHelperWrapper
// cancelQueryFunc will be called to cancel the context of the query when
// the portal is closed.
cancelQueryFunc context.CancelFunc
// cancelQueryCtx is the context to be canceled when closing the portal.
cancelQueryCtx context.Context
// planTop collects the properties of the current plan being prepared.
// We reuse it when re-executing the portal.
planTop planTop
// queryStats stores statistics on query execution. It is incremented for
// each execution of the portal.
queryStats *topLevelQueryStats
// The following 4 stacks store functions to call when closing the portal.
// They should be called in this order:
// flowCleanup -> dispatchToExecEngCleanup -> execStmtInOpenStateCleanup ->
// exhaustPortal.
// Each stack is defined in the closure of its corresponding function.
// When we encounter an error in any of these functions, we run the cleanup of
// this layer and its child layers and propagate the error to the parent layer.
// For example, when we encounter an error in execStmtInOpenStateCleanup(), we
// run flowCleanup -> dispatchToExecEngCleanup -> execStmtInOpenStateCleanup
// when exiting connExecutor.execStmtInOpenState(), and finally run
// exhaustPortal in connExecutor.execPortal().
exhaustPortal cleanupFuncStack
execStmtInOpenStateCleanup cleanupFuncStack
dispatchToExecEngCleanup cleanupFuncStack
flowCleanup cleanupFuncStack
}
// cleanupAll runs all the cleanup layers.
func (pm *portalPauseInfo) cleanupAll() {
pm.flowCleanup.run()
pm.dispatchToExecEngCleanup.run()
pm.execStmtInOpenStateCleanup.run()
pm.exhaustPortal.run()
}
// isQueryIDSet returns true if the query id for the portal is set.
func (pm *portalPauseInfo) isQueryIDSet() bool {
return !pm.queryID.Equal(clusterunique.ID{}.Uint128)
}