Skip to content

Commit

Permalink
Merge #38777
Browse files Browse the repository at this point in the history
38777: exec: add "auto" vectorize setting and make it default r=yuzefovich a=yuzefovich

First commit renames Vectorize to VectorizeMode and removes
"experimental_" prefix of vectorize setting.

Second commit adds a fourth option "auto" to vectorize execution mode setting
which plans the queries containing only streaming operators via the
vectorized engine. This option is made a default one. Other options
have been renamed: "on" -> "experimental_on" and "always" ->
"experimental_always" to highlight for users the risk of using them.

Fixes: #38620.
Fixes: #38920.

Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
  • Loading branch information
craig[bot] and yuzefovich committed Aug 2, 2019
2 parents 802ae38 + 2900184 commit 3221107
Show file tree
Hide file tree
Showing 23 changed files with 201 additions and 168 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,11 @@
<tr><td><code>server.web_session_timeout</code></td><td>duration</td><td><code>168h0m0s</code></td><td>the duration that a newly created web session will be valid</td></tr>
<tr><td><code>sql.defaults.default_int_size</code></td><td>integer</td><td><code>8</code></td><td>the size, in bytes, of an INT type</td></tr>
<tr><td><code>sql.defaults.distsql</code></td><td>enumeration</td><td><code>auto</code></td><td>default distributed SQL execution mode [off = 0, auto = 1, on = 2]</td></tr>
<tr><td><code>sql.defaults.experimental_vectorize</code></td><td>enumeration</td><td><code>off</code></td><td>default experimental_vectorize mode [off = 0, on = 1, always = 2]</td></tr>
<tr><td><code>sql.defaults.optimizer</code></td><td>enumeration</td><td><code>on</code></td><td>default cost-based optimizer mode [off = 0, on = 1, local = 2]</td></tr>
<tr><td><code>sql.defaults.reorder_joins_limit</code></td><td>integer</td><td><code>4</code></td><td>default number of joins to reorder</td></tr>
<tr><td><code>sql.defaults.results_buffer.size</code></td><td>byte size</td><td><code>16 KiB</code></td><td>default size of the buffer that accumulates results for a statement or a batch of statements before they are sent to the client. This can be overridden on an individual connection with the 'results_buffer_size' parameter. Note that auto-retries generally only happen while no results have been delivered to the client, so reducing this size can increase the number of retriable errors a client receives. On the other hand, increasing the buffer size can increase the delay until the client receives the first result row. Updating the setting only affects new connections. Setting to 0 disables any buffering.</td></tr>
<tr><td><code>sql.defaults.serial_normalization</code></td><td>enumeration</td><td><code>rowid</code></td><td>default handling of SERIAL in table definitions [rowid = 0, virtual_sequence = 1, sql_sequence = 2]</td></tr>
<tr><td><code>sql.defaults.vectorize</code></td><td>enumeration</td><td><code>auto</code></td><td>default vectorize mode [off = 0, auto = 1, experimental_on = 2, experimental_always = 3]</td></tr>
<tr><td><code>sql.distsql.distribute_index_joins</code></td><td>boolean</td><td><code>true</code></td><td>if set, for index joins we instantiate a join reader on every node that has a stream; if not set, we use a single join reader</td></tr>
<tr><td><code>sql.distsql.flow_stream_timeout</code></td><td>duration</td><td><code>10s</code></td><td>amount of time incoming streams wait for a flow to be set up before erroring out</td></tr>
<tr><td><code>sql.distsql.interleaved_joins.enabled</code></td><td>boolean</td><td><code>true</code></td><td>if set we plan interleaved table joins instead of merge joins when possible</td></tr>
Expand Down
14 changes: 7 additions & 7 deletions pkg/sql/distsql_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,17 +137,17 @@ func (dsp *DistSQLPlanner) setupFlows(
// TODO(yuzefovich): this is a safe but quite inefficient way of setting up
// vectorized flows since the flows will effectively be planned twice. Remove
// this once logic of falling back to DistSQL is bullet-proof.
if evalCtx.SessionData.Vectorize != sessiondata.VectorizeOff {
if evalCtx.SessionData.VectorizeMode != sessiondata.VectorizeOff {
for _, spec := range flows {
if err := distsqlrun.SupportsVectorized(
ctx, &distsqlrun.FlowCtx{EvalCtx: &evalCtx.EvalContext, NodeID: -1}, spec.Processors,
); err != nil {
// Vectorization attempt failed with an error.
returnVectorizationSetupError := false
if evalCtx.SessionData.Vectorize == sessiondata.VectorizeAlways {
if evalCtx.SessionData.VectorizeMode == sessiondata.VectorizeExperimentalAlways {
returnVectorizationSetupError = true
// If running with VectorizeAlways, this check makes sure that we can
// still run SET statements (mostly to set experimental_vectorize to
// If running with VectorizeExperimentalAlways, this check makes sure
// that we can still run SET statements (mostly to set vectorize to
// off) and the like.
if len(spec.Processors) == 1 &&
spec.Processors[0].Core.LocalPlanNode != nil {
Expand All @@ -171,9 +171,9 @@ func (dsp *DistSQLPlanner) setupFlows(
// it can be used in the future, but setupReq is used only once, so we
// don't need to restore it.
setupReq.EvalContext.Vectorize = int32(sessiondata.VectorizeOff)
origMode := evalCtx.SessionData.Vectorize
evalCtx.SessionData.Vectorize = sessiondata.VectorizeOff
defer func() { evalCtx.SessionData.Vectorize = origMode }()
origMode := evalCtx.SessionData.VectorizeMode
evalCtx.SessionData.VectorizeMode = sessiondata.VectorizeOff
defer func() { evalCtx.SessionData.VectorizeMode = origMode }()
break
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/distsqlpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func MakeEvalContext(evalCtx tree.EvalContext) EvalContext {
ApplicationName: evalCtx.SessionData.ApplicationName,
BytesEncodeFormat: be,
ExtraFloatDigits: int32(evalCtx.SessionData.DataConversion.ExtraFloatDigits),
Vectorize: int32(evalCtx.SessionData.Vectorize),
Vectorize: int32(evalCtx.SessionData.VectorizeMode),
}

// Populate the search path. Make sure not to include the implicit pg_catalog,
Expand Down
29 changes: 24 additions & 5 deletions pkg/sql/distsqlrun/column_exec_setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/exec/types/conv"
"github.com/cockroachdb/cockroach/pkg/sql/exec/vecbuiltins"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
semtypes "github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util"
Expand Down Expand Up @@ -100,6 +101,7 @@ type newColOperatorResult struct {
outputTypes []types.T
memUsage int
metadataSources []distsqlpb.MetadataSource
isStreaming bool
}

// newColOperator creates a new columnar operator according to the given spec.
Expand All @@ -118,12 +120,18 @@ func newColOperator(
// interface.
var columnTypes []semtypes.T

// By default, we safely assume that an operator is not streaming. Note that
// projections, renders, filters, limits, offsets as well as all internal
// operators (like stats collectors and cancel checkers) are streaming, so in
// order to determine whether the resulting chain of operators is streaming,
// it is sufficient to look only at the "core" operator.
result.isStreaming = false
switch {
case core.Noop != nil:
if err := checkNumIn(inputs, 1); err != nil {
return result, err
}
result.op = exec.NewNoop(inputs[0])
result.op, result.isStreaming = exec.NewNoop(inputs[0]), true
columnTypes = spec.Input[0].ColumnTypes
case core.TableReader != nil:
if err := checkNumIn(inputs, 0); err != nil {
Expand All @@ -146,7 +154,7 @@ func newColOperator(
// However, some of the long-running operators (for example, sorter) are
// still responsible for doing the cancellation check on their own while
// performing long operations.
result.op = exec.NewCancelChecker(result.op)
result.op, result.isStreaming = exec.NewCancelChecker(result.op), true
returnMutations := core.TableReader.Visibility == distsqlpb.ScanVisibility_PUBLIC_AND_NOT_PUBLIC
columnTypes = core.TableReader.Table.ColumnTypesWithMutations(returnMutations)
case core.Aggregator != nil:
Expand Down Expand Up @@ -250,6 +258,7 @@ func newColOperator(
result.op, err = exec.NewOrderedAggregator(
inputs[0], typs, aggFns, aggSpec.GroupCols, aggCols, isScalarAggregate(aggSpec),
)
result.isStreaming = true
}

case core.Distinct != nil:
Expand Down Expand Up @@ -279,13 +288,14 @@ func newColOperator(
return result, err
}
result.op, err = exec.NewOrderedDistinct(inputs[0], core.Distinct.OrderedColumns, typs)
result.isStreaming = true

case core.Ordinality != nil:
if err := checkNumIn(inputs, 1); err != nil {
return result, err
}
columnTypes = append(spec.Input[0].ColumnTypes, *semtypes.Int)
result.op = exec.NewOrdinalityOp(inputs[0])
result.op, result.isStreaming = exec.NewOrdinalityOp(inputs[0]), true

case core.HashJoiner != nil:
if err := checkNumIn(inputs, 2); err != nil {
Expand Down Expand Up @@ -363,6 +373,9 @@ func newColOperator(
}

case core.MergeJoiner != nil:
// TODO(yuzefovich): merge joiner is streaming when both input sources are
// unique. We probably need to propagate that information from the
// optimizer.
if err := checkNumIn(inputs, 2); err != nil {
return result, err
}
Expand Down Expand Up @@ -470,7 +483,7 @@ func newColOperator(
columnTypes = jr.OutputTypes()
return jr, nil
})
result.op = c
result.op, result.isStreaming = c, true
result.metadataSources = append(result.metadataSources, c)

case core.Sorter != nil:
Expand All @@ -494,7 +507,7 @@ func newColOperator(
// exactly how many rows the sorter should output. Choose a top K sorter,
// which uses a heap to avoid storing more rows than necessary.
k := uint16(post.Limit + post.Offset)
result.op = exec.NewTopKSorter(input, inputTypes, orderingCols, k)
result.op, result.isStreaming = exec.NewTopKSorter(input, inputTypes, orderingCols, k), true
} else {
// No optimizations possible. Default to the standard sort operator.
result.op, err = exec.NewSorter(input, inputTypes, orderingCols)
Expand Down Expand Up @@ -536,6 +549,8 @@ func newColOperator(
if len(wf.Ordering.Columns) > 0 {
input, err = exec.NewSorter(input, typs, wf.Ordering.Columns)
}
// TODO(yuzefovich): when both PARTITION BY and ORDER BY clauses are
// omitted, the window function operator is actually streaming.
}
if err != nil {
return result, err
Expand Down Expand Up @@ -1367,6 +1382,10 @@ func (s *vectorizedFlowCreator) setupFlow(
if err != nil {
return errors.Wrapf(err, "unable to vectorize execution plan")
}
if flowCtx.EvalCtx.SessionData.VectorizeMode == sessiondata.VectorizeAuto &&
!result.isStreaming {
return errors.Errorf("non-streaming operator encountered when vectorize=auto")
}
if err = acc.Grow(ctx, int64(result.memUsage)); err != nil {
return errors.Wrapf(err, "not enough memory to setup vectorized plan")
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/distsqlrun/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,8 +500,8 @@ func (f *Flow) setup(ctx context.Context, spec *distsqlpb.FlowSpec) error {
ctx, f.ctxCancel = contextutil.WithCancel(ctx)
f.ctxDone = ctx.Done()

if f.EvalCtx.SessionData.Vectorize != sessiondata.VectorizeOff {
log.VEventf(ctx, 1, "setting up vectorize flow %d with setting %s", f.id, f.EvalCtx.SessionData.Vectorize)
if f.EvalCtx.SessionData.VectorizeMode != sessiondata.VectorizeOff {
log.VEventf(ctx, 1, "setting up vectorize flow %d", f.id)
acc := f.EvalCtx.Mon.MakeBoundAccount()
f.vectorizedBoundAccount = &acc
err := f.setupVectorizedFlow(ctx, f.vectorizedBoundAccount)
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/distsqlrun/flow_vectorize_space_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestVectorizeSpaceError(t *testing.T) {
EvalCtx: &evalCtx,
}

// Without a limit, the default sorter creates a vectorized operater
// Without a limit, the default sorter creates a vectorized operator
// that we don't know memory usage of statically.
sorterCore := &distsqlpb.SorterSpec{
OutputOrdering: distsqlpb.Ordering{
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/distsqlrun/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -1213,16 +1213,16 @@ type LocalProcessor interface {
}

// VectorizeAlwaysException is an object that returns whether or not execution
// should continue if experimental_vectorize=always and an error occurred when
// should continue if vectorize=experimental_always and an error occurred when
// setting up the vectorized flow. Consider the case in which
// experimental_vectorize=always. The user must be able to unset this session
// vectorize=experimental_always. The user must be able to unset this session
// variable without getting an error.
type VectorizeAlwaysException interface {
// IsException returns whether this object should be an exception to the rule
// that an inability to run this node in a vectorized flow should produce an
// error.
// TODO(asubiotto): This is the cleanest way I can think of to not error out
// on SET statements when running with experimental_vectorize = always. If
// on SET statements when running with vectorize = experimental_always. If
// there is a better way, we should get rid of this interface.
IsException() bool
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/distsqlrun/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ func (ds *ServerImpl) setupFlow(
BytesEncodeFormat: be,
ExtraFloatDigits: int(req.EvalContext.ExtraFloatDigits),
},
Vectorize: sessiondata.VectorizeExecMode(req.EvalContext.Vectorize),
VectorizeMode: sessiondata.VectorizeExecMode(req.EvalContext.Vectorize),
}
// Enable better compatibility with PostgreSQL date math.
if req.Version >= 22 {
Expand Down
15 changes: 8 additions & 7 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,14 @@ var ReorderJoinsLimitClusterValue = settings.RegisterValidatedIntSetting(
// VectorizeClusterMode controls the cluster default for when automatic
// vectorization is enabled.
var VectorizeClusterMode = settings.RegisterEnumSetting(
"sql.defaults.experimental_vectorize",
"default experimental_vectorize mode",
"off",
"sql.defaults.vectorize",
"default vectorize mode",
"auto",
map[int64]string{
int64(sessiondata.VectorizeOff): "off",
int64(sessiondata.VectorizeOn): "on",
int64(sessiondata.VectorizeAlways): "always",
int64(sessiondata.VectorizeOff): "off",
int64(sessiondata.VectorizeAuto): "auto",
int64(sessiondata.VectorizeExperimentalOn): "experimental_on",
int64(sessiondata.VectorizeExperimentalAlways): "experimental_always",
},
)

Expand Down Expand Up @@ -1800,7 +1801,7 @@ func (m *sessionDataMutator) SetReorderJoinsLimit(val int) {
}

func (m *sessionDataMutator) SetVectorize(val sessiondata.VectorizeExecMode) {
m.data.Vectorize = val
m.data.VectorizeMode = val
}

func (m *sessionDataMutator) SetOptimizerMode(val sessiondata.OptimizerMode) {
Expand Down
14 changes: 7 additions & 7 deletions pkg/sql/logictest/logic.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,8 +383,8 @@ type testClusterConfig struct {
overrideOptimizerMode string
// if non-empty, overrides the default distsql mode.
overrideDistSQLMode string
// if non-empty, overrides the default experimental_vectorize mode.
overrideExpVectorize string
// if non-empty, overrides the default vectorize mode.
overrideVectorize string
// if non-empty, overrides the default automatic statistics mode.
overrideAutoStats string
// if set, queries using distSQL processors that can fall back to disk do
Expand Down Expand Up @@ -422,17 +422,17 @@ var logicTestConfigs = []testClusterConfig{
disableUpgrade: true,
},
{name: "local-opt", numNodes: 1, overrideDistSQLMode: "off", overrideOptimizerMode: "on", overrideAutoStats: "false"},
{name: "local-vec", numNodes: 1, overrideOptimizerMode: "on", overrideExpVectorize: "on"},
{name: "local-vec", numNodes: 1, overrideOptimizerMode: "on", overrideVectorize: "experimental_on"},
{name: "fakedist", numNodes: 3, useFakeSpanResolver: true, overrideDistSQLMode: "on", overrideOptimizerMode: "off"},
{name: "fakedist-opt", numNodes: 3, useFakeSpanResolver: true, overrideDistSQLMode: "on", overrideOptimizerMode: "on", overrideAutoStats: "false"},
{name: "fakedist-vec", numNodes: 3, useFakeSpanResolver: true, overrideDistSQLMode: "on", overrideOptimizerMode: "on", overrideAutoStats: "false", overrideExpVectorize: "on"},
{name: "fakedist-vec", numNodes: 3, useFakeSpanResolver: true, overrideDistSQLMode: "on", overrideOptimizerMode: "on", overrideAutoStats: "false", overrideVectorize: "experimental_on"},
{name: "fakedist-metadata", numNodes: 3, useFakeSpanResolver: true, overrideDistSQLMode: "on", overrideOptimizerMode: "off",
distSQLMetadataTestEnabled: true, skipShort: true},
{name: "fakedist-disk", numNodes: 3, useFakeSpanResolver: true, overrideDistSQLMode: "on", overrideOptimizerMode: "off",
distSQLUseDisk: true, skipShort: true},
{name: "5node-local", numNodes: 5, overrideDistSQLMode: "off", overrideOptimizerMode: "off"},
{name: "5node-dist", numNodes: 5, overrideDistSQLMode: "on", overrideOptimizerMode: "off"},
{name: "5node-dist-vec", numNodes: 5, overrideDistSQLMode: "on", overrideOptimizerMode: "on", overrideExpVectorize: "on", overrideAutoStats: "false"},
{name: "5node-dist-vec", numNodes: 5, overrideDistSQLMode: "on", overrideOptimizerMode: "on", overrideVectorize: "experimental_on", overrideAutoStats: "false"},
{name: "5node-dist-opt", numNodes: 5, overrideDistSQLMode: "on", overrideOptimizerMode: "on", overrideAutoStats: "false"},
{name: "5node-dist-metadata", numNodes: 5, overrideDistSQLMode: "on", distSQLMetadataTestEnabled: true,
skipShort: true, overrideOptimizerMode: "off"},
Expand Down Expand Up @@ -1062,9 +1062,9 @@ func (t *logicTest) setup(cfg testClusterConfig) {
return nil
})
}
if cfg.overrideExpVectorize != "" {
if cfg.overrideVectorize != "" {
if _, err := t.cluster.ServerConn(0).Exec(
"SET CLUSTER SETTING sql.defaults.experimental_vectorize = $1::string", cfg.overrideExpVectorize,
"SET CLUSTER SETTING sql.defaults.vectorize = $1::string", cfg.overrideVectorize,
); err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/logictest/testdata/logic_test/dist_vectorize
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ https://cockroachdb.github.io/distsqlplan/decode.html#eJzUl89vm0oQx-_vr0Bzek9ZP9

# Verify execution.
statement ok
SET experimental_vectorize = always
SET vectorize = experimental_always

query I rowsort
SELECT kv.k FROM kv JOIN kw ON kv.k = kw.k
Expand All @@ -72,7 +72,7 @@ SELECT kv.k FROM kv JOIN kw ON kv.k = kw.k
5

statement ok
RESET experimental_vectorize
RESET vectorize

# Regression test for #38919.
statement ok
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/logictest/testdata/logic_test/exec_window
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ INSERT INTO t VALUES
(1, 2, 3)

statement ok
SET experimental_vectorize=always
SET vectorize=experimental_always

# Test that non-default window frames are not supported.
statement error window functions with non-default window frames are not supported
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,4 @@ https://cockroachdb.github.io/distsqlplan/decode.html#eJy8lFGL1DAQx9_9FGGeFFKapq
query T
SELECT url FROM [EXPLAIN ANALYZE (DISTSQL) SELECT k FROM kv WHERE k = 0];
----
https://cockroachdb.github.io/distsqlplan/decode.html#eJyMkD9LxEAQxXs_xfJsR7JptzrsrvHktJMUe9nhCCbZMDNRjyPfXZIFxUK48v3Ztz_mijEnfooDK8IbajSESXLLqllWqxT26QvBE7pxmm21G0KbhRGusM56RsBrPPV85JhYKg9CYotdv81O0g1RLrv3DxBepjhqcJV_qHx1D8JhtuB2NQiSP9UJxxTcuqAW-95ZN3BwXkE4XYx_Cu4RzULIs_0yqcUzI9QL3c59ZJ3yqPwH-b9lvzQETmcut9E8S8vPktvtmyIP27vNSKxW0rqI_ViipVnuvgMAAP__mHp6ww==
https://cockroachdb.github.io/distsqlplan/decode.html#eJyMkDFPwzAUhHd-hXWshjirp65dKCpsKINrPxULJ7b8XiqqKv8dJWYBCYnxvvOdfb5hyoGe3EgM-4Yeg0ap2RNzritqB_bhE9ZoxKnMsuJBw-dKsDdIlESweHWnREdygWpnoBFIXExbbalxdPW6-7hA46W4ia3qzENnuntoHGaxatdD4-TEvxOrPEtZ4Vojc0m_EFMiL_ES5WqVeTQbE5eSkjiSVYYxLBot8v1aFncm2H7R_190JC55Yvox5q9mswwaFM7Ufo3zXD091-y3a5o8bLkNBGJpbt_EfmrWMix3XwEAAP__bYGETQ==
Loading

0 comments on commit 3221107

Please sign in to comment.