From fda15ca365e170ff049bc3833989ab0352da7396 Mon Sep 17 00:00:00 2001
From: Raphael 'kena' Poss
Date: Fri, 29 Nov 2019 15:19:51 +0100
Subject: [PATCH] sql: make SQL statements operate on a read snapshot

Previously, all individual KV reads performed by a SQL statement were able to observe the most recent KV writes that the statement had itself performed. This is in violation of PostgreSQL's dialect semantics, which mandate that a statement can only observe data as per a read snapshot taken at the instant the statement begins execution.

Moreover, this invalid behavior causes a real observable bug: a statement that reads and writes to the same table may never complete, as the read part can end up consuming the rows that it itself writes. Worse, it can cause logical operations to be performed multiple times: https://en.wikipedia.org/wiki/Halloween_Problem

This patch fixes it (partially) by exploiting the new KV `Step()` API, which decouples the read and write sequence numbers. The fix is not complete, however; additional sequence points must also be introduced prior to FK existence checks and cascading actions. See [#42864](https://github.com/cockroachdb/cockroach/pull/42864) and [#33475](https://github.com/cockroachdb/cockroach/issues/33475) for details.

For now, this patch excludes from the new (fixed) semantics any mutation that 1) involves a foreign key and 2) does not use the new CBO-driven FK logic. When a mutation involves an FK without CBO involvement, the previous (broken) semantics still apply.

Release note (bug fix): SQL mutation statements that target tables with no foreign key relationships now correctly read data as per the state of the database when the statement started execution. This is required for compatibility with PostgreSQL and to ensure deterministic behavior when certain operations are parallelized. Prior to this fix, a statement [could incorrectly operate multiple times](https://en.wikipedia.org/wiki/Halloween_Problem) on data that it was itself writing, and potentially never terminate. This fix is limited to tables without FK relationships and to certain operations on tables with FK relationships; in other cases, the fix is not active and the bug is still present. A full fix will be provided in a later release.
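(Illustration, not part of the change: the minimal sketch below shows the stepping semantics this patch relies on, using the `client.Txn` methods it exercises — `ConfigureStepping` and `Step` — together with the KV `Get`/`Put` helpers; the function itself is hypothetical.)

```go
package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/internal/client"
)

// demoStepping is a hypothetical helper illustrating step-wise execution.
func demoStepping(ctx context.Context, txn *client.Txn) error {
	// Enable step-wise execution: reads now observe the snapshot
	// established at the last step rather than the txn's latest writes.
	if _, err := txn.ConfigureStepping(client.SteppingEnabled); err != nil {
		return err
	}
	if err := txn.Put(ctx, "k", "v1"); err != nil {
		return err
	}
	// This read does not see "v1": the read snapshot predates the write.
	if _, err := txn.Get(ctx, "k"); err != nil {
		return err
	}
	// Advance the read snapshot past the writes performed so far; this
	// is what execStmtInOpenState does when a statement begins.
	if err := txn.Step(); err != nil {
		return err
	}
	// A read here now observes "v1".
	_, err := txn.Get(ctx, "k")
	return err
}
```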
--- pkg/internal/client/txn.go | 4 +- pkg/sql/alter_index.go | 5 + pkg/sql/alter_sequence.go | 5 + pkg/sql/alter_table.go | 5 + pkg/sql/conn_executor_exec.go | 47 +++++- pkg/sql/conn_executor_test.go | 72 +++++++- pkg/sql/copy.go | 2 +- pkg/sql/create_index.go | 5 + pkg/sql/create_sequence.go | 5 + pkg/sql/create_table.go | 158 ++++++++++-------- pkg/sql/create_view.go | 5 + pkg/sql/distsql_running.go | 7 + pkg/sql/drop_index.go | 5 + pkg/sql/drop_sequence.go | 5 + pkg/sql/drop_table.go | 5 + pkg/sql/drop_view.go | 5 + pkg/sql/grant_revoke.go | 5 + pkg/sql/insert.go | 9 +- pkg/sql/insert_fast_path.go | 10 +- pkg/sql/internal.go | 12 ++ pkg/sql/logictest/logic.go | 2 +- .../testdata/logic_test/statement_source | 4 +- pkg/sql/plan.go | 42 ++++- pkg/sql/rename_column.go | 5 + pkg/sql/rename_database.go | 5 + pkg/sql/rename_index.go | 5 + pkg/sql/rename_table.go | 5 + pkg/sql/row/cascader.go | 46 +++++ pkg/sql/row/fetcher.go | 4 +- pkg/sql/row/fk_existence_delete.go | 22 +++ pkg/sql/row/fk_existence_insert.go | 22 +++ pkg/sql/row/kv_batch_fetcher.go | 15 +- pkg/sql/scan_test.go | 4 +- pkg/sql/set_zone_config.go | 5 + pkg/sql/stats/create_stats_job_test.go | 2 +- pkg/sql/txn_state.go | 2 +- 36 files changed, 468 insertions(+), 98 deletions(-) diff --git a/pkg/internal/client/txn.go b/pkg/internal/client/txn.go index 9d83aa59d74c..c739873343a6 100644 --- a/pkg/internal/client/txn.go +++ b/pkg/internal/client/txn.go @@ -1189,8 +1189,8 @@ func (txn *Txn) Active() bool { return txn.mu.sender.Active() } -// Step enables step-wise execution in the transaction, or -// performs a step if step-wise execution is already enabled. +// Step performs a sequencing step. Step-wise execution must be +// already enabled. // // In step-wise execution, reads operate at a snapshot established at // the last step, instead of the latest write if not yet enabled. diff --git a/pkg/sql/alter_index.go b/pkg/sql/alter_index.go index 30afb5d6ae75..c69863a76279 100644 --- a/pkg/sql/alter_index.go +++ b/pkg/sql/alter_index.go @@ -44,6 +44,11 @@ func (p *planner) AlterIndex(ctx context.Context, n *tree.AlterIndex) (planNode, return &alterIndexNode{n: n, tableDesc: tableDesc, indexDesc: indexDesc}, nil } +// ReadingOwnWrites implements the planNodeReadingOwnWrites interface. +// This is because ALTER INDEX performs multiple KV operations on descriptors +// and expects to see its own writes. +func (n *alterIndexNode) ReadingOwnWrites() {} + func (n *alterIndexNode) startExec(params runParams) error { // Commands can either change the descriptor directly (for // alterations that don't require a backfill) or add a mutation to diff --git a/pkg/sql/alter_sequence.go b/pkg/sql/alter_sequence.go index 3e7f47f9a658..34efdb0b8746 100644 --- a/pkg/sql/alter_sequence.go +++ b/pkg/sql/alter_sequence.go @@ -42,6 +42,11 @@ func (p *planner) AlterSequence(ctx context.Context, n *tree.AlterSequence) (pla return &alterSequenceNode{n: n, seqDesc: seqDesc}, nil } +// ReadingOwnWrites implements the planNodeReadingOwnWrites interface. +// This is because ALTER SEQUENCE performs multiple KV operations on descriptors +// and expects to see its own writes. 
+func (n *alterSequenceNode) ReadingOwnWrites() {} + func (n *alterSequenceNode) startExec(params runParams) error { desc := n.seqDesc diff --git a/pkg/sql/alter_table.go b/pkg/sql/alter_table.go index d809f6ad3e4f..e41f862c4b1e 100644 --- a/pkg/sql/alter_table.go +++ b/pkg/sql/alter_table.go @@ -90,6 +90,11 @@ func (p *planner) AlterTable(ctx context.Context, n *tree.AlterTable) (planNode, }, nil } +// ReadingOwnWrites implements the planNodeReadingOwnWrites interface. +// This is because ALTER TABLE performs multiple KV operations on descriptors +// and expects to see its own writes. +func (n *alterTableNode) ReadingOwnWrites() {} + func (n *alterTableNode) startExec(params runParams) error { // Commands can either change the descriptor directly (for // alterations that don't require a backfill) or add a mutation to diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index 9ace90d37b46..446e5135709d 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -359,8 +359,40 @@ func (ex *connExecutor) execStmtInOpenState( discardRows = s.DiscardRows } - // For regular statements (the ones that get to this point), we don't return - // any event unless an an error happens. + // For regular statements (the ones that get to this point), we + // don't return any event unless an error happens. + + // The first order of business is to create a sequencing point. As + // per PostgreSQL's dialect specs, the "read" part of a statement + // always sees the data as per a snapshot of the database taken the + // instant the statement begins to run. In particular, a mutation + // does not see its own writes. If a query contains multiple + // mutations using CTEs (WITH) or a read part following a mutation, + // all still operate on the same read snapshot. + // + // (To communicate data between CTEs and a main query, the result + // set / RETURNING can be used instead. However, this is not relevant + // here.) + // + // This is not the only place where a sequencing point is placed. There + // are also sequencing points after every stage of constraint checks + // and cascading actions at the _end_ of a statement's execution. + // + // TODO(knz): At the time of this writing CockroachDB performs + // cascading actions and the corresponding FK existence checks + // interleaved with mutations. This is incorrect; the correct + // behavior, as described in issue + // https://github.com/cockroachdb/cockroach/issues/33475, is to + // execute cascading actions no earlier than after all the "main + // effects" of the current statement (including all its CTEs) have + // completed. There should be a sequence point between the end of + // the main execution and the start of the cascading actions, as + // well as in between every stage of cascading actions. + // This TODO can be removed when the cascading code is reorganized + // accordingly and the missing call to Step() is introduced. + if err := ex.state.mu.txn.Step(); err != nil { + return makeErrEvent(err) + } p := &ex.planner stmtTS := ex.server.cfg.Clock.PhysicalTime() @@ -423,6 +455,15 @@ func (ex *connExecutor) execStmtInOpenState( } txn := ex.state.mu.txn + + // Re-enable stepping mode after the execution has completed. + // This is necessary because, until https://github.com/cockroachdb/cockroach/issues/33475 is fixed, + // any mutation with FK work unconditionally disables + // stepping mode when it starts.
+ if _, err := txn.ConfigureStepping(client.SteppingEnabled); err != nil { + return makeErrEvent(err) + } + if !os.ImplicitTxn.Get() && txn.IsSerializablePushAndRefreshNotPossible() { rc, canAutoRetry := ex.getRewindTxnCapability() if canAutoRetry { @@ -553,7 +594,7 @@ func (ex *connExecutor) checkTableTwoVersionInvariant(ctx context.Context) error // Create a new transaction to retry with a higher timestamp than the // timestamps used in the retry loop above. - ex.state.mu.txn = client.NewTxn(ctx, ex.transitionCtx.db, ex.transitionCtx.nodeID) + ex.state.mu.txn = client.NewTxnWithSteppingEnabled(ctx, ex.transitionCtx.db, ex.transitionCtx.nodeID) if err := ex.state.mu.txn.SetUserPriority(userPriority); err != nil { return err } diff --git a/pkg/sql/conn_executor_test.go b/pkg/sql/conn_executor_test.go index d21f162ffc60..9764c05ddefe 100644 --- a/pkg/sql/conn_executor_test.go +++ b/pkg/sql/conn_executor_test.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/row" "github.com/cockroachdb/cockroach/pkg/sql/tests" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/storagebase" @@ -91,7 +92,7 @@ func TestSessionFinishRollsBackTxn(t *testing.T) { params, _ := tests.CreateTestServerParams() params.Knobs.SQLExecutor = aborter.executorKnobs() s, mainDB, _ := serverutils.StartServer(t, params) - defer s.Stopper().Stop(context.TODO()) + defer s.Stopper().Stop(context.Background()) { pgURL, cleanup := sqlutils.PGUrl( t, s.ServingSQLAddr(), "TestSessionFinishRollsBackTxn", url.User(security.RootUser)) @@ -348,6 +349,75 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v TEXT); } } +func TestHalloweenProblemAvoidance(t *testing.T) { + defer leaktest.AfterTest(t)() + + // Populate a sufficiently large number of rows. We want at least as + // many rows as an insert can batch in its output buffer (to force a + // buffer flush), plus as many rows as a fetcher can fetch at once + // (to force a read buffer update), plus some more. + // + // Instead of scaling the test's working set up to the default + // settings for the SQL package, we scale the config of the SQL + // package down to the test. The reason for this is that + // race-enabled builds are very slow and the default batch sizes + // would cause the test duration to exceed the timeout. + // + // We are also careful to override these defaults before starting + // the server, so as to not risk updating them concurrently with + // some background SQL activity. + const smallerKvBatchSize = 10 + defer row.TestingSetKVBatchSize(smallerKvBatchSize)() + const smallerInsertBatchSize = 5 + defer sql.TestingSetInsertBatchSize(smallerInsertBatchSize)() + numRows := smallerKvBatchSize + smallerInsertBatchSize + 10 + + params, _ := tests.CreateTestServerParams() + params.Insecure = true + s, db, _ := serverutils.StartServer(t, params) + defer s.Stopper().Stop(context.TODO()) + + if _, err := db.Exec(` +CREATE DATABASE t; +CREATE TABLE t.test (x FLOAT); +`); err != nil { + t.Fatal(err) + } + + if _, err := db.Exec( + `INSERT INTO t.test(x) SELECT generate_series(1, $1)::FLOAT`, + numRows); err != nil { + t.Fatal(err) + } + + // Now insert a duplicate of every row, with its value slightly + // modified. We choose a float increment of +0.1 to ensure that + // none of the derived values becomes a duplicate of an already-present value.
+ if _, err := db.Exec(` +INSERT INTO t.test(x) + -- the IF ensures that no row is processed twice. +SELECT IF(x::INT::FLOAT = x, + x, + crdb_internal.force_error( + 'NOOPE', 'insert saw its own writes: ' || x::STRING || ' (it is halloween today)')::FLOAT) + + 0.1 + FROM t.test +`); err != nil { + t.Fatal(err) + } + + // Finally, verify that no row has been operated on more than once. + row := db.QueryRow(`SELECT count(DISTINCT x) FROM t.test`) + var cnt int + if err := row.Scan(&cnt); err != nil { + t.Fatal(err) + } + + if cnt != 2*numRows { + t.Fatalf("expected %d rows in final table, got %d", 2*numRows, cnt) + } +} + func TestAppNameStatisticsInitialization(t *testing.T) { defer leaktest.AfterTest(t)() diff --git a/pkg/sql/copy.go b/pkg/sql/copy.go index e5ac19f140f2..97a1b16937f3 100644 --- a/pkg/sql/copy.go +++ b/pkg/sql/copy.go @@ -292,7 +292,7 @@ func (c *copyMachine) preparePlanner(ctx context.Context) func(context.Context, stmtTs := c.txnOpt.stmtTimestamp autoCommit := false if txn == nil { - txn = client.NewTxn(ctx, c.p.execCfg.DB, c.p.execCfg.NodeID.Get()) + txn = client.NewTxnWithSteppingEnabled(ctx, c.p.execCfg.DB, c.p.execCfg.NodeID.Get()) txnTs = c.p.execCfg.Clock.PhysicalTime() stmtTs = txnTs autoCommit = true diff --git a/pkg/sql/create_index.go b/pkg/sql/create_index.go index 3282f05ba6b0..4e2079b49802 100644 --- a/pkg/sql/create_index.go +++ b/pkg/sql/create_index.go @@ -79,6 +79,11 @@ func MakeIndexDescriptor(n *tree.CreateIndex) (*sqlbase.IndexDescriptor, error) return &indexDesc, nil } +// ReadingOwnWrites implements the planNodeReadingOwnWrites interface. +// This is because CREATE INDEX performs multiple KV operations on descriptors +// and expects to see its own writes. +func (n *createIndexNode) ReadingOwnWrites() {} + func (n *createIndexNode) startExec(params runParams) error { _, dropped, err := n.tableDesc.FindIndexByName(string(n.n.Name)) if err == nil { diff --git a/pkg/sql/create_sequence.go b/pkg/sql/create_sequence.go index 0c68019a40db..d19212b8abb5 100644 --- a/pkg/sql/create_sequence.go +++ b/pkg/sql/create_sequence.go @@ -44,6 +44,11 @@ func (p *planner) CreateSequence(ctx context.Context, n *tree.CreateSequence) (p }, nil } +// ReadingOwnWrites implements the planNodeReadingOwnWrites interface. +// This is because CREATE SEQUENCE performs multiple KV operations on descriptors +// and expects to see its own writes. +func (n *createSequenceNode) ReadingOwnWrites() {} + func (n *createSequenceNode) startExec(params runParams) error { // TODO(arul): Allow temporary sequences once temp tables work for regular tables. if n.n.Temporary { diff --git a/pkg/sql/create_table.go b/pkg/sql/create_table.go index 6e0f4bdfab9d..4bbab68136bb 100644 --- a/pkg/sql/create_table.go +++ b/pkg/sql/create_table.go @@ -105,6 +105,11 @@ var storageParamExpectedTypes = map[string]storageParamType{ `user_catalog_table`: storageParamUnimplemented, } +// ReadingOwnWrites implements the planNodeReadingOwnWrites interface. +// This is because CREATE TABLE performs multiple KV operations on descriptors +// and expects to see its own writes. +func (n *createTableNode) ReadingOwnWrites() {} + func (n *createTableNode) startExec(params runParams) error { isTemporary := n.n.Temporary @@ -265,96 +270,108 @@ func (n *createTableNode) startExec(params runParams) error { // If we are in an explicit txn or the source has placeholders, we execute the // CTAS query synchronously.
if n.n.As() && !params.p.ExtendedEvalContext().TxnImplicit { - // This is a very simplified version of the INSERT logic: no CHECK - // expressions, no FK checks, no arbitrary insertion order, no - // RETURNING, etc. - - // Instantiate a row inserter and table writer. It has a 1-1 - // mapping to the definitions in the descriptor. - ri, err := row.MakeInserter( - params.p.txn, - sqlbase.NewImmutableTableDescriptor(*desc.TableDesc()), - desc.Columns, - row.SkipFKs, - nil, /* fkTables */ - &params.p.alloc) - if err != nil { - return err - } - ti := tableInserterPool.Get().(*tableInserter) - *ti = tableInserter{ri: ri} - tw := tableWriter(ti) - if n.run.autoCommit == autoCommitEnabled { - tw.enableAutoCommit() - } - defer func() { - tw.close(params.ctx) - *ti = tableInserter{} - tableInserterPool.Put(ti) - }() - if err := tw.init(params.p.txn, params.p.EvalContext()); err != nil { - return err - } - - // Prepare the buffer for row values. At this point, one more column has - // been added by ensurePrimaryKey() to the list of columns in sourcePlan, if - // a PRIMARY KEY is not specified by the user. - rowBuffer := make(tree.Datums, len(desc.Columns)) - pkColIdx := len(desc.Columns) - 1 - - // The optimizer includes the rowID expression as part of the input - // expression. But the heuristic planner does not do this, so construct - // a rowID expression to be evaluated separately. - var defTypedExpr tree.TypedExpr - if n.run.synthRowID { - // Prepare the rowID expression. - defExprSQL := *desc.Columns[pkColIdx].DefaultExpr - defExpr, err := parser.ParseExpr(defExprSQL) + if err := func() error { + // The data fill portion of CREATE AS must operate on a read snapshot, + // so that it doesn't end up observing its own writes. + prevMode, err := params.p.Txn().ConfigureStepping(client.SteppingEnabled) if err != nil { return err } - defTypedExpr, err = params.p.analyzeExpr( - params.ctx, - defExpr, - nil, /*sources*/ - tree.IndexedVarHelper{}, - types.Any, - false, /*requireType*/ - "CREATE TABLE AS") + defer func() { _, _ = params.p.Txn().ConfigureStepping(prevMode) }() + + // This is a very simplified version of the INSERT logic: no CHECK + // expressions, no FK checks, no arbitrary insertion order, no + // RETURNING, etc. + + // Instantiate a row inserter and table writer. It has a 1-1 + // mapping to the definitions in the descriptor. + ri, err := row.MakeInserter( + params.p.txn, + sqlbase.NewImmutableTableDescriptor(*desc.TableDesc()), + desc.Columns, + row.SkipFKs, + nil, /* fkTables */ + &params.p.alloc) if err != nil { return err } - } - - for { - if err := params.p.cancelChecker.Check(); err != nil { + ti := tableInserterPool.Get().(*tableInserter) + *ti = tableInserter{ri: ri} + tw := tableWriter(ti) + if n.run.autoCommit == autoCommitEnabled { + tw.enableAutoCommit() + } + defer func() { + tw.close(params.ctx) + *ti = tableInserter{} + tableInserterPool.Put(ti) + }() + if err := tw.init(params.p.txn, params.p.EvalContext()); err != nil { return err } - if next, err := n.sourcePlan.Next(params); !next { + + // Prepare the buffer for row values. At this point, one more column has + // been added by ensurePrimaryKey() to the list of columns in sourcePlan, if + // a PRIMARY KEY is not specified by the user. + rowBuffer := make(tree.Datums, len(desc.Columns)) + pkColIdx := len(desc.Columns) - 1 + + // The optimizer includes the rowID expression as part of the input + // expression. But the heuristic planner does not do this, so construct + // a rowID expression to be evaluated separately.
+ var defTypedExpr tree.TypedExpr + if n.run.synthRowID { + // Prepare the rowID expression. + defExprSQL := *desc.Columns[pkColIdx].DefaultExpr + defExpr, err := parser.ParseExpr(defExprSQL) if err != nil { return err } - _, err := tw.finalize( - params.ctx, params.extendedEvalCtx.Tracing.KVTracingEnabled()) + defTypedExpr, err = params.p.analyzeExpr( + params.ctx, + defExpr, + nil, /*sources*/ + tree.IndexedVarHelper{}, + types.Any, + false, /*requireType*/ + "CREATE TABLE AS") if err != nil { return err } - break } - // Populate the buffer and generate the PK value. - copy(rowBuffer, n.sourcePlan.Values()) - if n.run.synthRowID { - rowBuffer[pkColIdx], err = defTypedExpr.Eval(params.p.EvalContext()) - if err != nil { + for { + if err := params.p.cancelChecker.Check(); err != nil { return err } - } + if next, err := n.sourcePlan.Next(params); !next { + if err != nil { + return err + } + _, err := tw.finalize( + params.ctx, params.extendedEvalCtx.Tracing.KVTracingEnabled()) + if err != nil { + return err + } + break + } - err := tw.row(params.ctx, rowBuffer, params.extendedEvalCtx.Tracing.KVTracingEnabled()) - if err != nil { - return err + // Populate the buffer and generate the PK value. + copy(rowBuffer, n.sourcePlan.Values()) + if n.run.synthRowID { + rowBuffer[pkColIdx], err = defTypedExpr.Eval(params.p.EvalContext()) + if err != nil { + return err + } + } + + if err := tw.row(params.ctx, rowBuffer, params.extendedEvalCtx.Tracing.KVTracingEnabled()); err != nil { + return err + } } + return nil + }(); err != nil { + return err } } @@ -1364,6 +1381,7 @@ func MakeTableDesc( return desc, errors.Errorf("unsupported table def: %T", def) } } + // Now that we have all the other columns set up, we can validate // any computed columns. for _, def := range n.Defs { diff --git a/pkg/sql/create_view.go b/pkg/sql/create_view.go index 435984c79555..bb424cfbbe0f 100644 --- a/pkg/sql/create_view.go +++ b/pkg/sql/create_view.go @@ -37,6 +37,11 @@ type createViewNode struct { planDeps planDependencies } +// ReadingOwnWrites implements the planNodeReadingOwnWrites interface. +// This is because CREATE VIEW performs multiple KV operations on descriptors +// and expects to see its own writes. +func (n *createViewNode) ReadingOwnWrites() {} + func (n *createViewNode) startExec(params runParams) error { // TODO(arul): Allow temporary views once temp tables work for regular tables. if n.temporary { diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go index 025a8cf0fe4c..d41f19f5301f 100644 --- a/pkg/sql/distsql_running.go +++ b/pkg/sql/distsql_running.go @@ -987,6 +987,13 @@ func (dsp *DistSQLPlanner) PlanAndRunPostqueries( maybeDistribute bool, ) bool { for _, postqueryPlan := range postqueryPlans { + // We place a sequence point before every postquery, so + // that each subsequent postquery can observe the writes + // by the previous step. + if err := planner.Txn().Step(); err != nil { + recv.SetError(err) + return false + } if err := dsp.planAndRunPostquery( ctx, postqueryPlan, diff --git a/pkg/sql/drop_index.go b/pkg/sql/drop_index.go index d8c62e714a36..a69e22fdb854 100644 --- a/pkg/sql/drop_index.go +++ b/pkg/sql/drop_index.go @@ -59,6 +59,11 @@ func (p *planner) DropIndex(ctx context.Context, n *tree.DropIndex) (planNode, e return &dropIndexNode{n: n, idxNames: idxNames}, nil } +// ReadingOwnWrites implements the planNodeReadingOwnWrites interface. +// This is because DROP INDEX performs multiple KV operations on descriptors +// and expects to see its own writes. 
+func (n *dropIndexNode) ReadingOwnWrites() {} + func (n *dropIndexNode) startExec(params runParams) error { ctx := params.ctx for _, index := range n.idxNames { diff --git a/pkg/sql/drop_sequence.go b/pkg/sql/drop_sequence.go index bf047c881a78..fd7760bda4e7 100644 --- a/pkg/sql/drop_sequence.go +++ b/pkg/sql/drop_sequence.go @@ -55,6 +55,11 @@ func (p *planner) DropSequence(ctx context.Context, n *tree.DropSequence) (planN }, nil } +// ReadingOwnWrites implements the planNodeReadingOwnWrites interface. +// This is because DROP SEQUENCE performs multiple KV operations on descriptors +// and expects to see its own writes. +func (n *dropSequenceNode) ReadingOwnWrites() {} + func (n *dropSequenceNode) startExec(params runParams) error { ctx := params.ctx for _, toDel := range n.td { diff --git a/pkg/sql/drop_table.go b/pkg/sql/drop_table.go index 0ad565c3a336..8c1efbc45128 100644 --- a/pkg/sql/drop_table.go +++ b/pkg/sql/drop_table.go @@ -97,6 +97,11 @@ func (p *planner) DropTable(ctx context.Context, n *tree.DropTable) (planNode, e return &dropTableNode{n: n, td: td}, nil } +// ReadingOwnWrites implements the planNodeReadingOwnWrites interface. +// This is because DROP TABLE performs multiple KV operations on descriptors +// and expects to see its own writes. +func (n *dropTableNode) ReadingOwnWrites() {} + func (n *dropTableNode) startExec(params runParams) error { ctx := params.ctx for _, toDel := range n.td { diff --git a/pkg/sql/drop_view.go b/pkg/sql/drop_view.go index 099ec9fc64f1..85d4af2e028e 100644 --- a/pkg/sql/drop_view.go +++ b/pkg/sql/drop_view.go @@ -69,6 +69,11 @@ func (p *planner) DropView(ctx context.Context, n *tree.DropView) (planNode, err return &dropViewNode{n: n, td: td}, nil } +// ReadingOwnWrites implements the planNodeReadingOwnWrites interface. +// This is because DROP VIEW performs multiple KV operations on descriptors +// and expects to see its own writes. +func (n *dropViewNode) ReadingOwnWrites() {} + func (n *dropViewNode) startExec(params runParams) error { ctx := params.ctx for _, toDel := range n.td { diff --git a/pkg/sql/grant_revoke.go b/pkg/sql/grant_revoke.go index 778fde0d88fa..396de948d427 100644 --- a/pkg/sql/grant_revoke.go +++ b/pkg/sql/grant_revoke.go @@ -63,6 +63,11 @@ type changePrivilegesNode struct { changePrivilege func(*sqlbase.PrivilegeDescriptor, string) } +// ReadingOwnWrites implements the planNodeReadingOwnWrites interface. +// This is because GRANT/REVOKE performs multiple KV operations on descriptors +// and expects to see its own writes. +func (n *changePrivilegesNode) ReadingOwnWrites() {} + func (n *changePrivilegesNode) startExec(params runParams) error { ctx := params.ctx p := params.p diff --git a/pkg/sql/insert.go b/pkg/sql/insert.go index 2a4d0b1f0d9b..e42beeba92bf 100644 --- a/pkg/sql/insert.go +++ b/pkg/sql/insert.go @@ -177,7 +177,7 @@ func (r *insertRun) processSourceRow(params runParams, rowVals tree.Datums) erro // the insert operation (including secondary index updates, FK // cascading updates, etc), before the current KV batch is executed // and a new batch is started. -const maxInsertBatchSize = 10000 +var maxInsertBatchSize = 10000 func (n *insertNode) startExec(params runParams) error { if err := params.p.maybeSetSystemConfig(n.run.ti.tableDesc().GetID()); err != nil { @@ -294,3 +294,10 @@ func (n *insertNode) Close(ctx context.Context) { func (n *insertNode) enableAutoCommit() { n.run.ti.enableAutoCommit() } + +// TestingSetInsertBatchSize overrides maxInsertBatchSize for testing; it returns a function that restores the previous value.
+func TestingSetInsertBatchSize(val int) func() { + oldVal := maxInsertBatchSize + maxInsertBatchSize = val + return func() { maxInsertBatchSize = oldVal } +} diff --git a/pkg/sql/insert_fast_path.go b/pkg/sql/insert_fast_path.go index ff2ec5de3de7..0d57933f211e 100644 --- a/pkg/sql/insert_fast_path.go +++ b/pkg/sql/insert_fast_path.go @@ -31,9 +31,13 @@ var insertFastPathNodePool = sync.Pool{ }, } -// Check that exec.InsertFastPathMaxRows does not exceed maxInsertBatchSize -// (this is a compile error if the value is negative). -const _ uint = maxInsertBatchSize - exec.InsertFastPathMaxRows +// Check that exec.InsertFastPathMaxRows does not exceed the default +// maxInsertBatchSize. +func init() { + if maxInsertBatchSize < exec.InsertFastPathMaxRows { + panic("decrease exec.InsertFastPathMaxRows") + } +} // insertFastPathNode is a faster implementation of inserting values in a table // and performing FK checks. It is used when all the foreign key checks can be diff --git a/pkg/sql/internal.go b/pkg/sql/internal.go index ed39c6f49faa..4f59640d545f 100644 --- a/pkg/sql/internal.go +++ b/pkg/sql/internal.go @@ -481,6 +481,18 @@ func (ie *internalExecutorImpl) execInternal( ctx, sp := tracing.EnsureChildSpan(ctx, ie.s.cfg.AmbientCtx.Tracer, opName) defer sp.Finish() + // The client.Txn handed to this executor may currently have + // disabled stepping mode. However, any and all SQL expects + // stepping mode to be enabled. We'll enable it here and restore + // what we had upon exit. + if txn != nil { + prevStepping, err := txn.ConfigureStepping(client.SteppingEnabled) + if err != nil { + return result{}, err + } + defer func() { _, _ = txn.ConfigureStepping(prevStepping) }() + } + timeReceived := timeutil.Now() parseStart := timeReceived parsed, err := parser.ParseOne(stmt) diff --git a/pkg/sql/logictest/logic.go b/pkg/sql/logictest/logic.go index 301d667ffe04..dafdee90daf2 100644 --- a/pkg/sql/logictest/logic.go +++ b/pkg/sql/logictest/logic.go @@ -1764,7 +1764,7 @@ func (t *logicTest) processSubtest( return errors.Errorf("kv-batch-size needs an integer argument; %s", err) } t.outf("Setting kv batch size %d", batchSize) - defer row.SetKVBatchSize(int64(batchSize))() + defer row.TestingSetKVBatchSize(int64(batchSize))() default: return errors.Errorf("%s:%d: unknown command: %s", diff --git a/pkg/sql/logictest/testdata/logic_test/statement_source b/pkg/sql/logictest/testdata/logic_test/statement_source index 04ac9c3a1dfb..c26c7c210f14 100644 --- a/pkg/sql/logictest/testdata/logic_test/statement_source +++ b/pkg/sql/logictest/testdata/logic_test/statement_source @@ -72,10 +72,10 @@ SELECT * FROM a ORDER BY b # Regression for #30936: ensure that wrapped planNodes with non-needed columns work ok statement ok -CREATE TABLE b (a int, b int) +CREATE TABLE b (a int, b int); query II -SELECT * FROM b WHERE EXISTS (SELECT * FROM [INSERT INTO b VALUES (1,2) RETURNING a,b]); +SELECT * FROM (VALUES (1, 2)) WHERE EXISTS (SELECT * FROM [INSERT INTO b VALUES (1,2) RETURNING a,b]); ---- 1 2 diff --git a/pkg/sql/plan.go b/pkg/sql/plan.go index 6455d659c7cd..a21e2f346e1e 100644 --- a/pkg/sql/plan.go +++ b/pkg/sql/plan.go @@ -13,12 +13,14 @@ package sql import ( "context" + "github.com/cockroachdb/cockroach/pkg/internal/client" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" 
"github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/errors" ) // runParams is a struct containing all parameters passed to planNode.Next() and @@ -130,6 +132,19 @@ type planNodeFastPath interface { FastPathResults() (int, bool) } +// planNodeReadingOwnWrites can be implemented by planNodes which do +// not use the standard SQL principle of reading at the snapshot +// established at the start of the transaction. This is the case +// e.g. for most DDL statements that perform multiple KV operations on +// descriptors, expecting to read their own writes. +// +// This constraint is obeyed by (*planner).startExec(). +type planNodeReadingOwnWrites interface { + // ReadingOwnWrites can be implemented as no-op by nodes wishing + // to request the semantics described above. + ReadingOwnWrites() +} + var _ planNode = &alterIndexNode{} var _ planNode = &alterSequenceNode{} var _ planNode = &alterTableNode{} @@ -207,6 +222,16 @@ var _ planNodeFastPath = &serializeNode{} var _ planNodeFastPath = &setZoneConfigNode{} var _ planNodeFastPath = &controlJobsNode{} +var _ planNodeReadingOwnWrites = &alterIndexNode{} +var _ planNodeReadingOwnWrites = &alterSequenceNode{} +var _ planNodeReadingOwnWrites = &alterTableNode{} +var _ planNodeReadingOwnWrites = &createIndexNode{} +var _ planNodeReadingOwnWrites = &createSequenceNode{} +var _ planNodeReadingOwnWrites = &createTableNode{} +var _ planNodeReadingOwnWrites = &createViewNode{} +var _ planNodeReadingOwnWrites = &changePrivilegesNode{} +var _ planNodeReadingOwnWrites = &setZoneConfigNode{} + // planNodeRequireSpool serves as marker for nodes whose parent must // ensure that the node is fully run to completion (and the results // spooled) during the start phase. This is currently implemented by @@ -318,6 +343,11 @@ func (p *planTop) close(ctx context.Context) { // startExec calls startExec() on each planNode using a depth-first, post-order // traversal. The subqueries, if any, are also started. // +// If the planNode also implements the nodeReadingOwnWrites interface, +// the txn is temporarily reconfigured to use read-your-own-writes for +// the duration of the call to startExec. This is used e.g. by +// DDL statements. +// // Reminder: walkPlan() ensures that subqueries and sub-plans are // started before startExec() is called. func startExec(params runParams, plan planNode) error { @@ -333,7 +363,17 @@ func startExec(params runParams, plan planNode) error { } return true, nil }, - leaveNode: func(_ string, n planNode) error { + leaveNode: func(_ string, n planNode) (err error) { + if _, ok := n.(planNodeReadingOwnWrites); ok { + _, err = params.p.Txn().ConfigureStepping(client.SteppingDisabled) + if err != nil { + return err + } + defer func() { + _, thisErr := params.p.Txn().ConfigureStepping(client.SteppingEnabled) + err = errors.CombineErrors(err, thisErr) + }() + } return n.startExec(params) }, } diff --git a/pkg/sql/rename_column.go b/pkg/sql/rename_column.go index 61d177a2d055..cbeeb842d4d5 100644 --- a/pkg/sql/rename_column.go +++ b/pkg/sql/rename_column.go @@ -50,6 +50,11 @@ func (p *planner) RenameColumn(ctx context.Context, n *tree.RenameColumn) (planN return &renameColumnNode{n: n, tableDesc: tableDesc}, nil } +// ReadingOwnWrites implements the planNodeReadingOwnWrites interface. +// This is because RENAME COLUMN performs multiple KV operations on descriptors +// and expects to see its own writes. 
+func (n *renameColumnNode) ReadingOwnWrites() {} + func (n *renameColumnNode) startExec(params runParams) error { p := params.p ctx := params.ctx diff --git a/pkg/sql/rename_database.go b/pkg/sql/rename_database.go index d41195a7d4f3..1fb6aa382b30 100644 --- a/pkg/sql/rename_database.go +++ b/pkg/sql/rename_database.go @@ -63,6 +63,11 @@ func (p *planner) RenameDatabase(ctx context.Context, n *tree.RenameDatabase) (p }, nil } +// ReadingOwnWrites implements the planNodeReadingOwnWrites interface. +// This is because RENAME DATABASE performs multiple KV operations on descriptors +// and expects to see its own writes. +func (n *renameDatabaseNode) ReadingOwnWrites() {} + func (n *renameDatabaseNode) startExec(params runParams) error { p := params.p ctx := params.ctx diff --git a/pkg/sql/rename_index.go b/pkg/sql/rename_index.go index ae6b1f389394..afd4f4398ab3 100644 --- a/pkg/sql/rename_index.go +++ b/pkg/sql/rename_index.go @@ -60,6 +60,11 @@ func (p *planner) RenameIndex(ctx context.Context, n *tree.RenameIndex) (planNod return &renameIndexNode{n: n, idx: idx, tableDesc: tableDesc}, nil } +// ReadingOwnWrites implements the planNodeReadingOwnWrites interface. +// This is because RENAME INDEX performs multiple KV operations on descriptors +// and expects to see its own writes. +func (n *renameIndexNode) ReadingOwnWrites() {} + func (n *renameIndexNode) startExec(params runParams) error { p := params.p ctx := params.ctx diff --git a/pkg/sql/rename_table.go b/pkg/sql/rename_table.go index f26b451a1efa..5bd06c7f8530 100644 --- a/pkg/sql/rename_table.go +++ b/pkg/sql/rename_table.go @@ -71,6 +71,11 @@ func (p *planner) RenameTable(ctx context.Context, n *tree.RenameTable) (planNod return &renameTableNode{n: n, oldTn: &oldTn, newTn: &newTn, tableDesc: tableDesc}, nil } +// ReadingOwnWrites implements the planNodeReadingOwnWrites interface. +// This is because RENAME TABLE performs multiple KV operations on descriptors +// and expects to see its own writes. +func (n *renameTableNode) ReadingOwnWrites() {} + func (n *renameTableNode) startExec(params runParams) error { p := params.p ctx := params.ctx diff --git a/pkg/sql/row/cascader.go b/pkg/sql/row/cascader.go index 2d011f47ed2d..3a85b3f4011f 100644 --- a/pkg/sql/row/cascader.go +++ b/pkg/sql/row/cascader.go @@ -86,6 +86,29 @@ func makeDeleteCascader( if !required { return nil, nil } + + // TODO(knz,radu): FK cascading actions need to see the writes + // performed by the mutation. Moreover, each stage of the cascading + // actions must observe the writes from the previous stages but not + // its own writes. + // + // In order to make this true, we need to split the cascading + // actions into separate sequencing steps, and have the first + // cascading action happen no earlier than the end of the + // "main" part of the statement. Unfortunately, the organization + // of the code does not allow this today. + // + // See: https://github.com/cockroachdb/cockroach/issues/33475 + // + // In order to "make do" and preserve a modicum of FK semantics, we + // thus need to disable step-wise execution here. The result is that + // it will also enable any interleaved read part to observe the + // mutation, and thus introduce the risk of a Halloween problem for + // any mutation that uses FK relationships.
+ if _, err := txn.ConfigureStepping(client.SteppingDisabled); err != nil { + return nil, err + } + return &cascader{ txn: txn, fkTables: tablesByID, @@ -155,6 +178,29 @@ func makeUpdateCascader( if !required { return nil, nil } + + // TODO(knz,radu): FK cascading actions need to see the writes + // performed by the mutation. Moreover, each stage of the cascading + // actions must observe the writes from the previous stages but not + // its own writes. + // + // In order to make this true, we need to split the cascading + // actions into separate sequencing steps, and have the first + // cascading action happen no earlier than the end of the + // "main" part of the statement. Unfortunately, the organization + // of the code does not allow this today. + // + // See: https://github.com/cockroachdb/cockroach/issues/33475 + // + // In order to "make do" and preserve a modicum of FK semantics, we + // thus need to disable step-wise execution here. The result is that + // it will also enable any interleaved read part to observe the + // mutation, and thus introduce the risk of a Halloween problem for + // any mutation that uses FK relationships. + if _, err := txn.ConfigureStepping(client.SteppingDisabled); err != nil { + return nil, err + } + return &cascader{ txn: txn, fkTables: tablesByID, diff --git a/pkg/sql/row/fetcher.go b/pkg/sql/row/fetcher.go index c6a7fda7d22d..b916f826804d 100644 --- a/pkg/sql/row/fetcher.go +++ b/pkg/sql/row/fetcher.go @@ -503,7 +503,7 @@ func (rf *Fetcher) StartInconsistentScan( maxTimestampAge, ) } - txn := client.NewTxn(ctx, db, 0 /* gatewayNodeID */) + txn := client.NewTxnWithSteppingEnabled(ctx, db, 0 /* gatewayNodeID */) txn.SetFixedTimestamp(ctx, txnTimestamp) if log.V(1) { log.Infof(ctx, "starting inconsistent scan at timestamp %v", txnTimestamp) @@ -518,7 +518,7 @@ func (rf *Fetcher) StartInconsistentScan( // Advance the timestamp by the time that passed. txnTimestamp = txnTimestamp.Add(now.Sub(txnStartTime).Nanoseconds(), 0 /* logical */) txnStartTime = now - txn = client.NewTxn(ctx, db, 0 /* gatewayNodeID */) + txn = client.NewTxnWithSteppingEnabled(ctx, db, 0 /* gatewayNodeID */) txn.SetFixedTimestamp(ctx, txnTimestamp) if log.V(1) { diff --git a/pkg/sql/row/fk_existence_delete.go b/pkg/sql/row/fk_existence_delete.go index d88ea885c8ba..1fc76efd0627 100644 --- a/pkg/sql/row/fk_existence_delete.go +++ b/pkg/sql/row/fk_existence_delete.go @@ -96,6 +96,28 @@ func makeFkExistenceCheckHelperForDelete( h.fks[mutatedIdx.ID] = append(h.fks[mutatedIdx.ID], fk) } + if len(h.fks) > 0 { + // TODO(knz,radu): FK existence checks need to see the writes + // performed by the mutation. + // + // In order to make this true, we need to split the existence + // checks into a separate sequencing step, and have the first + // check happen no earlier than the end of the "main" part of + // the statement. Unfortunately, the organization of the code does + // not allow this today. + // + // See: https://github.com/cockroachdb/cockroach/issues/33475 + // + // In order to "make do" and preserve a modicum of FK semantics, we + // thus need to disable step-wise execution here. The result is that + // it will also enable any interleaved read part to observe the + // mutation, and thus introduce the risk of a Halloween problem for + // any mutation that uses FK relationships.
+ if _, err := txn.ConfigureStepping(client.SteppingDisabled); err != nil { + return h, err + } + } + return h, nil } diff --git a/pkg/sql/row/fk_existence_insert.go b/pkg/sql/row/fk_existence_insert.go index 11f975e50fe1..cdc9d60eca1b 100644 --- a/pkg/sql/row/fk_existence_insert.go +++ b/pkg/sql/row/fk_existence_insert.go @@ -87,6 +87,28 @@ func makeFkExistenceCheckHelperForInsert( h.fks[mutatedIdx.ID] = append(h.fks[mutatedIdx.ID], fk) } + if len(h.fks) > 0 { + // TODO(knz,radu): FK existence checks need to see the writes + // performed by the mutation. + // + // In order to make this true, we need to split the existence + // checks into a separate sequencing step, and have the first + // check happen no earlier than the end of the "main" part of + // the statement. Unfortunately, the organization of the code does + // not allow this today. + // + // See: https://github.com/cockroachdb/cockroach/issues/33475 + // + // In order to "make do" and preserve a modicum of FK semantics, we + // thus need to disable step-wise execution here. The result is that + // it will also enable any interleaved read part to observe the + // mutation, and thus introduce the risk of a Halloween problem for + // any mutation that uses FK relationships. + if _, err := txn.ConfigureStepping(client.SteppingDisabled); err != nil { + return h, err + } + } + return h, nil } diff --git a/pkg/sql/row/kv_batch_fetcher.go b/pkg/sql/row/kv_batch_fetcher.go index 5897ad0d9e19..05d9bc8bbf0d 100644 --- a/pkg/sql/row/kv_batch_fetcher.go +++ b/pkg/sql/row/kv_batch_fetcher.go @@ -28,13 +28,6 @@ import ( // TODO(radu): parameters like this should be configurable var kvBatchSize int64 = 10000 -// SetKVBatchSize changes the kvBatchFetcher batch size, and returns a function that restores it. -func SetKVBatchSize(val int64) func() { - oldVal := kvBatchSize - kvBatchSize = val - return func() { kvBatchSize = oldVal } -} - // sendFunc is the function used to execute a KV batch; normally // wraps (*client.Txn).Send. type sendFunc func( @@ -352,3 +345,11 @@ func (f *txnKVFetcher) nextBatch( } return f.nextBatch(ctx) } + +// TestingSetKVBatchSize changes the kvBatchFetcher batch size, and returns a function that restores it. +// This is to be used only in tests; we have no test coverage for arbitrary kv batch sizes at this time. +func TestingSetKVBatchSize(val int64) func() { + oldVal := kvBatchSize + kvBatchSize = val + return func() { kvBatchSize = oldVal } +} diff --git a/pkg/sql/scan_test.go b/pkg/sql/scan_test.go index 261a0002c844..243d50aa207a 100644 --- a/pkg/sql/scan_test.go +++ b/pkg/sql/scan_test.go @@ -125,7 +125,7 @@ func TestScanBatches(t *testing.T) { defer leaktest.AfterTest(t)() // The test will screw around with KVBatchSize; make sure to restore it at the end.
- restore := row.SetKVBatchSize(10) + restore := row.TestingSetKVBatchSize(10) defer restore() s, db, _ := serverutils.StartServer( @@ -171,7 +171,7 @@ func TestScanBatches(t *testing.T) { numSpanValues := []int{0, 1, 2, 3} for _, batch := range batchSizes { - row.SetKVBatchSize(int64(batch)) + row.TestingSetKVBatchSize(int64(batch)) for _, numSpans := range numSpanValues { testScanBatchQuery(t, db, numSpans, numAs, numBs, false) testScanBatchQuery(t, db, numSpans, numAs, numBs, true) diff --git a/pkg/sql/set_zone_config.go b/pkg/sql/set_zone_config.go index 385ab386100f..0eb662a01278 100644 --- a/pkg/sql/set_zone_config.go +++ b/pkg/sql/set_zone_config.go @@ -201,6 +201,11 @@ type setZoneConfigRun struct { numAffected int } +// ReadingOwnWrites implements the planNodeReadingOwnWrites interface. +// This is because CONFIGURE ZONE performs multiple KV operations on descriptors +// and expects to see its own writes. +func (n *setZoneConfigNode) ReadingOwnWrites() {} + func (n *setZoneConfigNode) startExec(params runParams) error { var yamlConfig string var setters []func(c *zonepb.ZoneConfig) diff --git a/pkg/sql/stats/create_stats_job_test.go b/pkg/sql/stats/create_stats_job_test.go index 329bee195896..ebdc007e31e4 100644 --- a/pkg/sql/stats/create_stats_job_test.go +++ b/pkg/sql/stats/create_stats_job_test.go @@ -467,7 +467,7 @@ func TestCreateStatsProgress(t *testing.T) { }(rowexec.SamplerProgressInterval) rowexec.SamplerProgressInterval = 10 - resetKVBatchSize := row.SetKVBatchSize(10) + resetKVBatchSize := row.TestingSetKVBatchSize(10) defer resetKVBatchSize() var allowRequest chan struct{} diff --git a/pkg/sql/txn_state.go b/pkg/sql/txn_state.go index 059db0425b4f..bf5c768b84a6 100644 --- a/pkg/sql/txn_state.go +++ b/pkg/sql/txn_state.go @@ -203,7 +203,7 @@ func (ts *txnState) resetForNewSQLTxn( ts.mon.Start(ts.Ctx, tranCtx.connMon, mon.BoundAccount{} /* reserved */) ts.mu.Lock() if txn == nil { - ts.mu.txn = client.NewTxn(ts.Ctx, tranCtx.db, tranCtx.nodeID) + ts.mu.txn = client.NewTxnWithSteppingEnabled(ts.Ctx, tranCtx.db, tranCtx.nodeID) ts.mu.txn.SetDebugName(opName) } else { ts.mu.txn = txn
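(Closing illustration, not part of the patch: a hypothetical sketch of how the two testing knobs introduced above compose. The skeleton function and scenario are assumptions; `row.TestingSetKVBatchSize` and `sql.TestingSetInsertBatchSize` are as defined in this patch.)

```go
package sql_test

import (
	"testing"

	"github.com/cockroachdb/cockroach/pkg/sql"
	"github.com/cockroachdb/cockroach/pkg/sql/row"
)

// testWithSmallBatches is a hypothetical test skeleton. Shrinking both
// knobs forces a self-referential mutation to flush its insert buffer
// and refetch mid-statement, which is exactly the situation where the
// Halloween problem would appear without statement-level stepping.
func testWithSmallBatches(t *testing.T) {
	defer row.TestingSetKVBatchSize(10)()    // fetch at most 10 keys per KV batch
	defer sql.TestingSetInsertBatchSize(5)() // flush the insert buffer every 5 rows
	// ... start a test server and run a mutation that reads the table it
	// writes to, as in TestHalloweenProblemAvoidance above ...
}
```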