Skip to content

Commit

Permalink
distsqlrun: add "auto" setting to vectorize
Browse files Browse the repository at this point in the history
Adds a fourth option, "auto", to the vectorize execution mode setting.
In this mode, only queries that consist solely of streaming operators are
planned via the vectorized engine. This option is now the default. The other
options have been renamed: "on" -> "experimental_on" and "always" ->
"experimental_always" to highlight for users the risk of using them.

Release note: None
  • Loading branch information
yuzefovich committed Aug 2, 2019
1 parent 64e155d commit fa5f826
Show file tree
Hide file tree
Showing 17 changed files with 79 additions and 47 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@
<tr><td><code>sql.defaults.reorder_joins_limit</code></td><td>integer</td><td><code>4</code></td><td>default number of joins to reorder</td></tr>
<tr><td><code>sql.defaults.results_buffer.size</code></td><td>byte size</td><td><code>16 KiB</code></td><td>default size of the buffer that accumulates results for a statement or a batch of statements before they are sent to the client. This can be overridden on an individual connection with the 'results_buffer_size' parameter. Note that auto-retries generally only happen while no results have been delivered to the client, so reducing this size can increase the number of retriable errors a client receives. On the other hand, increasing the buffer size can increase the delay until the client receives the first result row. Updating the setting only affects new connections. Setting to 0 disables any buffering.</td></tr>
<tr><td><code>sql.defaults.serial_normalization</code></td><td>enumeration</td><td><code>rowid</code></td><td>default handling of SERIAL in table definitions [rowid = 0, virtual_sequence = 1, sql_sequence = 2]</td></tr>
<tr><td><code>sql.defaults.vectorize</code></td><td>enumeration</td><td><code>off</code></td><td>default vectorize mode [off = 0, on = 1, always = 2]</td></tr>
<tr><td><code>sql.defaults.vectorize</code></td><td>enumeration</td><td><code>auto</code></td><td>default vectorize mode [off = 0, auto = 1, experimental_on = 2, experimental_always = 3]</td></tr>
<tr><td><code>sql.distsql.distribute_index_joins</code></td><td>boolean</td><td><code>true</code></td><td>if set, for index joins we instantiate a join reader on every node that has a stream; if not set, we use a single join reader</td></tr>
<tr><td><code>sql.distsql.flow_stream_timeout</code></td><td>duration</td><td><code>10s</code></td><td>amount of time incoming streams wait for a flow to be set up before erroring out</td></tr>
<tr><td><code>sql.distsql.interleaved_joins.enabled</code></td><td>boolean</td><td><code>true</code></td><td>if set we plan interleaved table joins instead of merge joins when possible</td></tr>
Expand Down
8 changes: 4 additions & 4 deletions pkg/sql/distsql_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,11 @@ func (dsp *DistSQLPlanner) setupFlows(
); err != nil {
// Vectorization attempt failed with an error.
returnVectorizationSetupError := false
if evalCtx.SessionData.VectorizeMode == sessiondata.VectorizeAlways {
if evalCtx.SessionData.VectorizeMode == sessiondata.VectorizeExperimentalAlways {
returnVectorizationSetupError = true
// If running with VectorizeAlways, this check makes sure that we can
// still run SET statements (mostly to set vectorize to off) and the
// like.
// If running with VectorizeExperimentalAlways, this check makes sure
// that we can still run SET statements (mostly to set vectorize to
// off) and the like.
if len(spec.Processors) == 1 &&
spec.Processors[0].Core.LocalPlanNode != nil {
rsidx := spec.Processors[0].Core.LocalPlanNode.RowSourceIdx
Expand Down
29 changes: 24 additions & 5 deletions pkg/sql/distsqlrun/column_exec_setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/exec/types/conv"
"github.com/cockroachdb/cockroach/pkg/sql/exec/vecbuiltins"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
semtypes "github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util"
Expand Down Expand Up @@ -100,6 +101,7 @@ type newColOperatorResult struct {
outputTypes []types.T
memUsage int
metadataSources []distsqlpb.MetadataSource
isStreaming bool
}

// newColOperator creates a new columnar operator according to the given spec.
Expand All @@ -118,12 +120,18 @@ func newColOperator(
// interface.
var columnTypes []semtypes.T

// By default, we safely assume that an operator is not streaming. Note that
// projections, renders, filters, limits, offsets as well as all internal
// operators (like stats collectors and cancel checkers) are streaming, so in
// order to determine whether the resulting chain of operators is streaming,
// it is sufficient to look only at the "core" operator.
result.isStreaming = false
switch {
case core.Noop != nil:
if err := checkNumIn(inputs, 1); err != nil {
return result, err
}
result.op = exec.NewNoop(inputs[0])
result.op, result.isStreaming = exec.NewNoop(inputs[0]), true
columnTypes = spec.Input[0].ColumnTypes
case core.TableReader != nil:
if err := checkNumIn(inputs, 0); err != nil {
Expand All @@ -146,7 +154,7 @@ func newColOperator(
// However, some of the long-running operators (for example, sorter) are
// still responsible for doing the cancellation check on their own while
// performing long operations.
result.op = exec.NewCancelChecker(result.op)
result.op, result.isStreaming = exec.NewCancelChecker(result.op), true
returnMutations := core.TableReader.Visibility == distsqlpb.ScanVisibility_PUBLIC_AND_NOT_PUBLIC
columnTypes = core.TableReader.Table.ColumnTypesWithMutations(returnMutations)
case core.Aggregator != nil:
Expand Down Expand Up @@ -250,6 +258,7 @@ func newColOperator(
result.op, err = exec.NewOrderedAggregator(
inputs[0], typs, aggFns, aggSpec.GroupCols, aggCols, isScalarAggregate(aggSpec),
)
result.isStreaming = true
}

case core.Distinct != nil:
Expand Down Expand Up @@ -279,13 +288,14 @@ func newColOperator(
return result, err
}
result.op, err = exec.NewOrderedDistinct(inputs[0], core.Distinct.OrderedColumns, typs)
result.isStreaming = true

case core.Ordinality != nil:
if err := checkNumIn(inputs, 1); err != nil {
return result, err
}
columnTypes = append(spec.Input[0].ColumnTypes, *semtypes.Int)
result.op = exec.NewOrdinalityOp(inputs[0])
result.op, result.isStreaming = exec.NewOrdinalityOp(inputs[0]), true

case core.HashJoiner != nil:
if err := checkNumIn(inputs, 2); err != nil {
Expand Down Expand Up @@ -363,6 +373,9 @@ func newColOperator(
}

case core.MergeJoiner != nil:
// TODO(yuzefovich): merge joiner is streaming when both input sources are
// unique. We probably need to propagate that information from the
// optimizer.
if err := checkNumIn(inputs, 2); err != nil {
return result, err
}
Expand Down Expand Up @@ -470,7 +483,7 @@ func newColOperator(
columnTypes = jr.OutputTypes()
return jr, nil
})
result.op = c
result.op, result.isStreaming = c, true
result.metadataSources = append(result.metadataSources, c)

case core.Sorter != nil:
Expand All @@ -494,7 +507,7 @@ func newColOperator(
// exactly how many rows the sorter should output. Choose a top K sorter,
// which uses a heap to avoid storing more rows than necessary.
k := uint16(post.Limit + post.Offset)
result.op = exec.NewTopKSorter(input, inputTypes, orderingCols, k)
result.op, result.isStreaming = exec.NewTopKSorter(input, inputTypes, orderingCols, k), true
} else {
// No optimizations possible. Default to the standard sort operator.
result.op, err = exec.NewSorter(input, inputTypes, orderingCols)
Expand Down Expand Up @@ -536,6 +549,8 @@ func newColOperator(
if len(wf.Ordering.Columns) > 0 {
input, err = exec.NewSorter(input, typs, wf.Ordering.Columns)
}
// TODO(yuzefovich): when both PARTITION BY and ORDER BY clauses are
// omitted, the window function operator is actually streaming.
}
if err != nil {
return result, err
Expand Down Expand Up @@ -1367,6 +1382,10 @@ func (s *vectorizedFlowCreator) setupFlow(
if err != nil {
return errors.Wrapf(err, "unable to vectorize execution plan")
}
if flowCtx.EvalCtx.SessionData.VectorizeMode == sessiondata.VectorizeAuto &&
!result.isStreaming {
return errors.Errorf("non-streaming operator encountered when vectorize=auto")
}
if err = acc.Grow(ctx, int64(result.memUsage)); err != nil {
return errors.Wrapf(err, "not enough memory to setup vectorized plan")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/distsqlrun/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ func (f *Flow) setup(ctx context.Context, spec *distsqlpb.FlowSpec) error {
f.ctxDone = ctx.Done()

if f.EvalCtx.SessionData.VectorizeMode != sessiondata.VectorizeOff {
log.VEventf(ctx, 1, "setting up vectorize flow %d with setting %s", f.id, f.EvalCtx.SessionData.VectorizeMode)
log.VEventf(ctx, 1, "setting up vectorize flow %d", f.id)
acc := f.EvalCtx.Mon.MakeBoundAccount()
f.vectorizedBoundAccount = &acc
err := f.setupVectorizedFlow(ctx, f.vectorizedBoundAccount)
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/distsqlrun/flow_vectorize_space_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestVectorizeSpaceError(t *testing.T) {
EvalCtx: &evalCtx,
}

// Without a limit, the default sorter creates a vectorized operater
// Without a limit, the default sorter creates a vectorized operator
// that we don't know memory usage of statically.
sorterCore := &distsqlpb.SorterSpec{
OutputOrdering: distsqlpb.Ordering{
Expand Down
9 changes: 5 additions & 4 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,12 @@ var ReorderJoinsLimitClusterValue = settings.RegisterValidatedIntSetting(
var VectorizeClusterMode = settings.RegisterEnumSetting(
"sql.defaults.vectorize",
"default vectorize mode",
"off",
"auto",
map[int64]string{
int64(sessiondata.VectorizeOff): "off",
int64(sessiondata.VectorizeOn): "on",
int64(sessiondata.VectorizeAlways): "always",
int64(sessiondata.VectorizeOff): "off",
int64(sessiondata.VectorizeAuto): "auto",
int64(sessiondata.VectorizeExperimentalOn): "experimental_on",
int64(sessiondata.VectorizeExperimentalAlways): "experimental_always",
},
)

Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/logictest/logic.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,17 +422,17 @@ var logicTestConfigs = []testClusterConfig{
disableUpgrade: true,
},
{name: "local-opt", numNodes: 1, overrideDistSQLMode: "off", overrideOptimizerMode: "on", overrideAutoStats: "false"},
{name: "local-vec", numNodes: 1, overrideOptimizerMode: "on", overrideVectorize: "on"},
{name: "local-vec", numNodes: 1, overrideOptimizerMode: "on", overrideVectorize: "experimental_on"},
{name: "fakedist", numNodes: 3, useFakeSpanResolver: true, overrideDistSQLMode: "on", overrideOptimizerMode: "off"},
{name: "fakedist-opt", numNodes: 3, useFakeSpanResolver: true, overrideDistSQLMode: "on", overrideOptimizerMode: "on", overrideAutoStats: "false"},
{name: "fakedist-vec", numNodes: 3, useFakeSpanResolver: true, overrideDistSQLMode: "on", overrideOptimizerMode: "on", overrideAutoStats: "false", overrideVectorize: "on"},
{name: "fakedist-vec", numNodes: 3, useFakeSpanResolver: true, overrideDistSQLMode: "on", overrideOptimizerMode: "on", overrideAutoStats: "false", overrideVectorize: "experimental_on"},
{name: "fakedist-metadata", numNodes: 3, useFakeSpanResolver: true, overrideDistSQLMode: "on", overrideOptimizerMode: "off",
distSQLMetadataTestEnabled: true, skipShort: true},
{name: "fakedist-disk", numNodes: 3, useFakeSpanResolver: true, overrideDistSQLMode: "on", overrideOptimizerMode: "off",
distSQLUseDisk: true, skipShort: true},
{name: "5node-local", numNodes: 5, overrideDistSQLMode: "off", overrideOptimizerMode: "off"},
{name: "5node-dist", numNodes: 5, overrideDistSQLMode: "on", overrideOptimizerMode: "off"},
{name: "5node-dist-vec", numNodes: 5, overrideDistSQLMode: "on", overrideOptimizerMode: "on", overrideVectorize: "on", overrideAutoStats: "false"},
{name: "5node-dist-vec", numNodes: 5, overrideDistSQLMode: "on", overrideOptimizerMode: "on", overrideVectorize: "experimental_on", overrideAutoStats: "false"},
{name: "5node-dist-opt", numNodes: 5, overrideDistSQLMode: "on", overrideOptimizerMode: "on", overrideAutoStats: "false"},
{name: "5node-dist-metadata", numNodes: 5, overrideDistSQLMode: "on", distSQLMetadataTestEnabled: true,
skipShort: true, overrideOptimizerMode: "off"},
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/logictest/testdata/logic_test/dist_vectorize
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ https://cockroachdb.github.io/distsqlplan/decode.html#eJzUl89vm0oQx-_vr0Bzek9ZP9

# Verify execution.
statement ok
SET vectorize = always
SET vectorize = experimental_always

query I rowsort
SELECT kv.k FROM kv JOIN kw ON kv.k = kw.k
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/logictest/testdata/logic_test/exec_window
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ INSERT INTO t VALUES
(1, 2, 3)

statement ok
SET vectorize=always
SET vectorize=experimental_always

# Test that non-default window frames are not supported.
statement error window functions with non-default window frames are not supported
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,4 @@ https://cockroachdb.github.io/distsqlplan/decode.html#eJy8lFGL1DAQx9_9FGGeFFKapq
query T
SELECT url FROM [EXPLAIN ANALYZE (DISTSQL) SELECT k FROM kv WHERE k = 0];
----
https://cockroachdb.github.io/distsqlplan/decode.html#eJyMkD9LxEAQxXs_xfJsR7JptzrsrvHktJMUe9nhCCbZMDNRjyPfXZIFxUK48v3Ztz_mijEnfooDK8IbajSESXLLqllWqxT26QvBE7pxmm21G0KbhRGusM56RsBrPPV85JhYKg9CYotdv81O0g1RLrv3DxBepjhqcJV_qHx1D8JhtuB2NQiSP9UJxxTcuqAW-95ZN3BwXkE4XYx_Cu4RzULIs_0yqcUzI9QL3c59ZJ3yqPwH-b9lvzQETmcut9E8S8vPktvtmyIP27vNSKxW0rqI_ViipVnuvgMAAP__mHp6ww==
https://cockroachdb.github.io/distsqlplan/decode.html#eJyMkDFPwzAUhHd-hXWshjirp65dKCpsKINrPxULJ7b8XiqqKv8dJWYBCYnxvvOdfb5hyoGe3EgM-4Yeg0ap2RNzritqB_bhE9ZoxKnMsuJBw-dKsDdIlESweHWnREdygWpnoBFIXExbbalxdPW6-7hA46W4ia3qzENnuntoHGaxatdD4-TEvxOrPEtZ4Vojc0m_EFMiL_ES5WqVeTQbE5eSkjiSVYYxLBot8v1aFncm2H7R_190JC55Yvox5q9mswwaFM7Ufo3zXD091-y3a5o8bLkNBGJpbt_EfmrWMix3XwEAAP__bYGETQ==
4 changes: 2 additions & 2 deletions pkg/sql/logictest/testdata/logic_test/pg_catalog
Original file line number Diff line number Diff line change
Expand Up @@ -1554,7 +1554,7 @@ transaction_isolation serializable NULL NULL N
transaction_priority normal NULL NULL NULL string
transaction_read_only off NULL NULL NULL string
transaction_status NoTxn NULL NULL NULL string
vectorize off NULL NULL NULL string
vectorize auto NULL NULL NULL string

query TTTTTTT colnames
SELECT
Expand Down Expand Up @@ -1607,7 +1607,7 @@ transaction_isolation serializable NULL user NULL
transaction_priority normal NULL user NULL normal normal
transaction_read_only off NULL user NULL off off
transaction_status NoTxn NULL user NULL NoTxn NoTxn
vectorize off NULL user NULL off off
vectorize auto NULL user NULL auto auto

query TTTTTT colnames
SELECT name, source, min_val, max_val, sourcefile, sourceline FROM pg_catalog.pg_settings
Expand Down
7 changes: 5 additions & 2 deletions pkg/sql/logictest/testdata/logic_test/set
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,13 @@ statement error invalid value for parameter "distsql": "bogus"
SET distsql = bogus

statement ok
SET vectorize = on
SET vectorize = auto

statement ok
SET vectorize = always
SET vectorize = experimental_on

statement ok
SET vectorize = experimental_always

statement ok
SET vectorize = off
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/logictest/testdata/logic_test/show_source
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ transaction_isolation serializable
transaction_priority normal
transaction_read_only off
transaction_status NoTxn
vectorize off
vectorize auto

query T colnames
SELECT * FROM [SHOW CLUSTER SETTING sql.defaults.distsql]
Expand Down
8 changes: 4 additions & 4 deletions pkg/sql/logictest/testdata/logic_test/vectorize
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ SELECT count(*) FROM [EXPLAIN SELECT c.a FROM c JOIN d ON d.b = c.b] WHERE tree
1

statement ok
SET vectorize = always
SET vectorize = experimental_always

# Simple lookup join.
query I
Expand Down Expand Up @@ -282,7 +282,7 @@ statement ok
INSERT INTO e VALUES ('abc'), ('xyz')

statement ok
SET vectorize = always
SET vectorize = experimental_always

query T
SELECT * FROM e WHERE x LIKE ''
Expand Down Expand Up @@ -336,7 +336,7 @@ xyz

# Test that vectorized stats are collected correctly.
statement ok
SET vectorize = on
SET vectorize = experimental_on

statement ok
SET distsql = on
Expand Down Expand Up @@ -464,7 +464,7 @@ statement ok
INSERT INTO t38908 VALUES (1)

statement ok
SET vectorize=always
SET vectorize=experimental_always

query I
SELECT * FROM t38908 WHERE x IN (1, 2)
Expand Down
36 changes: 22 additions & 14 deletions pkg/sql/sessiondata/session_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,22 +254,28 @@ type VectorizeExecMode int64
const (
// VectorizeOff means that columnar execution is disabled.
VectorizeOff VectorizeExecMode = iota
// VectorizeOn means that any supported queries will be run using the columnar
// execution on.
VectorizeOn
// VectorizeAlways means that we attempt to vectorize all queries; unsupported
// queries will fail. Mostly used for testing.
VectorizeAlways
// VectorizeAuto means that any supported queries that use only
// streaming operators (i.e. those that do not require any buffering) will be
// run using the columnar execution.
VectorizeAuto
// VectorizeExperimentalOn means that any supported queries will be run using
// the columnar execution.
VectorizeExperimentalOn
// VectorizeExperimentalAlways means that we attempt to vectorize all
// queries; unsupported queries will fail. Mostly used for testing.
VectorizeExperimentalAlways
)

func (m VectorizeExecMode) String() string {
switch m {
case VectorizeOff:
return "off"
case VectorizeOn:
return "on"
case VectorizeAlways:
return "always"
case VectorizeAuto:
return "auto"
case VectorizeExperimentalOn:
return "experimental_on"
case VectorizeExperimentalAlways:
return "experimental_always"
default:
return fmt.Sprintf("invalid (%d)", m)
}
Expand All @@ -282,10 +288,12 @@ func VectorizeExecModeFromString(val string) (VectorizeExecMode, bool) {
switch strings.ToUpper(val) {
case "OFF":
m = VectorizeOff
case "ON":
m = VectorizeOn
case "ALWAYS":
m = VectorizeAlways
case "AUTO":
m = VectorizeAuto
case "EXPERIMENTAL_ON":
m = VectorizeExperimentalOn
case "EXPERIMENTAL_ALWAYS":
m = VectorizeExperimentalAlways
default:
return 0, false
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func TestTrace(t *testing.T) {
if _, err := sqlDB.Exec("SET distsql = off"); err != nil {
t.Fatal(err)
}
if _, err := sqlDB.Exec("SET vectorize = on"); err != nil {
if _, err := sqlDB.Exec("SET vectorize = experimental_on"); err != nil {
t.Fatal(err)
}
if _, err := sqlDB.Exec("SET tracing = on; SELECT * FROM test.foo; SET tracing = off"); err != nil {
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,8 @@ var varGen = map[string]sessionVar{
Set: func(_ context.Context, m *sessionDataMutator, s string) error {
mode, ok := sessiondata.VectorizeExecModeFromString(s)
if !ok {
return newVarValueError(`vectorize`, s, "off", "on", "always")
return newVarValueError(`vectorize`, s,
"off", "auto", "experimental_on", "experimental_always")
}
m.SetVectorize(mode)
return nil
Expand Down

0 comments on commit fa5f826

Please sign in to comment.