diff --git a/pkg/internal/client/txn.go b/pkg/internal/client/txn.go
index 9d83aa59d74c..c739873343a6 100644
--- a/pkg/internal/client/txn.go
+++ b/pkg/internal/client/txn.go
@@ -1189,8 +1189,8 @@ func (txn *Txn) Active() bool {
 	return txn.mu.sender.Active()
 }
 
-// Step enables step-wise execution in the transaction, or
-// performs a step if step-wise execution is already enabled.
+// Step performs a sequencing step. Step-wise execution must
+// already be enabled.
 //
 // In step-wise execution, reads operate at a snapshot established at
 // the last step, instead of the latest write if not yet enabled.
diff --git a/pkg/sql/alter_index.go b/pkg/sql/alter_index.go
index 30afb5d6ae75..c69863a76279 100644
--- a/pkg/sql/alter_index.go
+++ b/pkg/sql/alter_index.go
@@ -44,6 +44,11 @@ func (p *planner) AlterIndex(ctx context.Context, n *tree.AlterIndex) (planNode,
 	return &alterIndexNode{n: n, tableDesc: tableDesc, indexDesc: indexDesc}, nil
 }
 
+// ReadingOwnWrites implements the planNodeReadingOwnWrites interface.
+// This is because ALTER INDEX performs multiple KV operations on descriptors
+// and expects to see its own writes.
+func (n *alterIndexNode) ReadingOwnWrites() {}
+
 func (n *alterIndexNode) startExec(params runParams) error {
 	// Commands can either change the descriptor directly (for
 	// alterations that don't require a backfill) or add a mutation to
diff --git a/pkg/sql/alter_sequence.go b/pkg/sql/alter_sequence.go
index 3e7f47f9a658..34efdb0b8746 100644
--- a/pkg/sql/alter_sequence.go
+++ b/pkg/sql/alter_sequence.go
@@ -42,6 +42,11 @@ func (p *planner) AlterSequence(ctx context.Context, n *tree.AlterSequence) (pla
 	return &alterSequenceNode{n: n, seqDesc: seqDesc}, nil
 }
 
+// ReadingOwnWrites implements the planNodeReadingOwnWrites interface.
+// This is because ALTER SEQUENCE performs multiple KV operations on descriptors
+// and expects to see its own writes.
+func (n *alterSequenceNode) ReadingOwnWrites() {}
+
 func (n *alterSequenceNode) startExec(params runParams) error {
 	desc := n.seqDesc
 
diff --git a/pkg/sql/alter_table.go b/pkg/sql/alter_table.go
index d809f6ad3e4f..e41f862c4b1e 100644
--- a/pkg/sql/alter_table.go
+++ b/pkg/sql/alter_table.go
@@ -90,6 +90,11 @@ func (p *planner) AlterTable(ctx context.Context, n *tree.AlterTable) (planNode,
 	}, nil
 }
 
+// ReadingOwnWrites implements the planNodeReadingOwnWrites interface.
+// This is because ALTER TABLE performs multiple KV operations on descriptors
+// and expects to see its own writes.
+func (n *alterTableNode) ReadingOwnWrites() {}
+
 func (n *alterTableNode) startExec(params runParams) error {
 	// Commands can either change the descriptor directly (for
 	// alterations that don't require a backfill) or add a mutation to
diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go
index 9ace90d37b46..446e5135709d 100644
--- a/pkg/sql/conn_executor_exec.go
+++ b/pkg/sql/conn_executor_exec.go
@@ -359,8 +359,40 @@ func (ex *connExecutor) execStmtInOpenState(
 		discardRows = s.DiscardRows
 	}
 
-	// For regular statements (the ones that get to this point), we don't return
-	// any event unless an an error happens.
+	// For regular statements (the ones that get to this point), we
+	// don't return any event unless an error happens.
+
+	// The first order of business is to create a sequencing point. As
+	// per PostgreSQL's dialect specs, the "read" part of statements
+	// always sees the data as per a snapshot of the database taken the
+	// instant the statement begins to run. In particular a mutation
+	// does not see its own writes. If a query contains multiple
+	// mutations using CTEs (WITH) or a read part following a mutation,
+	// all still operate on the same read snapshot.
+	//
+	// (To communicate data between CTEs and a main query, the result
+	// set / RETURNING can be used instead. However this is not relevant
+	// here.)
+	//
+	// This is not the only place where a sequencing point is placed. There
+	// are also sequencing points after every stage of constraint checks
+	// and cascading actions at the _end_ of a statement's execution.
+	//
+	// TODO(knz): At the time of this writing CockroachDB performs
+	// cascading actions and the corresponding FK existence checks
+	// interleaved with mutations. This is incorrect; the correct
+	// behavior, as described in issue
+	// https://github.com/cockroachdb/cockroach/issues/33475, is to
+	// execute cascading actions no earlier than after all the "main
+	// effects" of the current statement (including all its CTEs) have
+	// completed. There should be a sequence point between the end of
+	// the main execution and the start of the cascading actions, as
+	// well as in between every stage of cascading actions.
+	// This TODO can be removed when the cascading code is reorganized
+	// accordingly and the missing call to Step() is introduced.
+	if err := ex.state.mu.txn.Step(); err != nil {
+		return makeErrEvent(err)
+	}
 
 	p := &ex.planner
 	stmtTS := ex.server.cfg.Clock.PhysicalTime()
@@ -423,6 +455,15 @@ func (ex *connExecutor) execStmtInOpenState(
 	}
 
 	txn := ex.state.mu.txn
+
+	// Re-enable stepping mode after the execution has completed.
+	// This is necessary because until https://github.com/cockroachdb/cockroach/issues/33475 is fixed
+	// any mutation with FK work unconditionally disables
+	// stepping mode when it starts.
+	if _, err := txn.ConfigureStepping(client.SteppingEnabled); err != nil {
+		return makeErrEvent(err)
+	}
+
 	if !os.ImplicitTxn.Get() && txn.IsSerializablePushAndRefreshNotPossible() {
 		rc, canAutoRetry := ex.getRewindTxnCapability()
 		if canAutoRetry {
@@ -553,7 +594,7 @@ func (ex *connExecutor) checkTableTwoVersionInvariant(ctx context.Context) error
 
 	// Create a new transaction to retry with a higher timestamp than the
 	// timestamps used in the retry loop above.
-	ex.state.mu.txn = client.NewTxn(ctx, ex.transitionCtx.db, ex.transitionCtx.nodeID)
+	ex.state.mu.txn = client.NewTxnWithSteppingEnabled(ctx, ex.transitionCtx.db, ex.transitionCtx.nodeID)
 	if err := ex.state.mu.txn.SetUserPriority(userPriority); err != nil {
 		return err
 	}
diff --git a/pkg/sql/conn_executor_test.go b/pkg/sql/conn_executor_test.go
index d21f162ffc60..9764c05ddefe 100644
--- a/pkg/sql/conn_executor_test.go
+++ b/pkg/sql/conn_executor_test.go
@@ -25,6 +25,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/security"
 	"github.com/cockroachdb/cockroach/pkg/sql"
+	"github.com/cockroachdb/cockroach/pkg/sql/row"
 	"github.com/cockroachdb/cockroach/pkg/sql/tests"
 	"github.com/cockroachdb/cockroach/pkg/storage"
 	"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
@@ -91,7 +92,7 @@ func TestSessionFinishRollsBackTxn(t *testing.T) {
 	params, _ := tests.CreateTestServerParams()
 	params.Knobs.SQLExecutor = aborter.executorKnobs()
 	s, mainDB, _ := serverutils.StartServer(t, params)
-	defer s.Stopper().Stop(context.TODO())
+	defer s.Stopper().Stop(context.Background())
 	{
 		pgURL, cleanup := sqlutils.PGUrl(
 			t, s.ServingSQLAddr(), "TestSessionFinishRollsBackTxn", url.User(security.RootUser))
@@ -348,6 +349,75 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v TEXT);
 	}
 }
 
+func TestHalloweenProblemAvoidance(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	// Populate a sufficiently large number of rows. We want at least as
+	// many rows as an insert can batch in its output buffer (to force a
+	// buffer flush), plus as many rows as a fetcher can fetch at once
+	// (to force a read buffer update), plus some more.
+	//
+	// Instead of scaling the test's working set up to the default
+	// settings of the SQL package, we scale the SQL package's config
+	// down to the test. The reason for this is that race-enabled
+	// builds are very slow and the default batch sizes would cause
+	// the test duration to exceed the timeout.
+	//
+	// We are also careful to override these defaults before starting
+	// the server, so as to not risk updating them concurrently with
+	// some background SQL activity.
+	const smallerKvBatchSize = 10
+	defer row.TestingSetKVBatchSize(smallerKvBatchSize)()
+	const smallerInsertBatchSize = 5
+	defer sql.TestingSetInsertBatchSize(smallerInsertBatchSize)()
+	numRows := smallerKvBatchSize + smallerInsertBatchSize + 10
+
+	params, _ := tests.CreateTestServerParams()
+	params.Insecure = true
+	s, db, _ := serverutils.StartServer(t, params)
+	defer s.Stopper().Stop(context.TODO())
+
+	if _, err := db.Exec(`
+CREATE DATABASE t;
+CREATE TABLE t.test (x FLOAT);
+`); err != nil {
+		t.Fatal(err)
+	}
+
+	if _, err := db.Exec(
+		`INSERT INTO t.test(x) SELECT generate_series(1, $1)::FLOAT`,
+		numRows); err != nil {
+		t.Fatal(err)
+	}
+
+	// Now insert duplicate rows with slightly modified values.
+	// We choose a float +0.1 to ensure that none of the derived
+	// values becomes a duplicate of an already-present value.
+	if _, err := db.Exec(`
+INSERT INTO t.test(x)
+    -- the IF ensures that no row is processed twice.
+SELECT IF(x::INT::FLOAT = x,
+          x,
+          crdb_internal.force_error(
+             'NOOPE', 'insert saw its own writes: ' || x::STRING || ' (it is halloween today)')::FLOAT)
+       + 0.1
+  FROM t.test
+`); err != nil {
+		t.Fatal(err)
+	}
+
+	// Finally verify that no row has been operated on more than once.
+	row := db.QueryRow(`SELECT count(DISTINCT x) FROM t.test`)
+	var cnt int
+	if err := row.Scan(&cnt); err != nil {
+		t.Fatal(err)
+	}
+
+	if cnt != 2*numRows {
+		t.Fatalf("expected %d rows in final table, got %d", 2*numRows, cnt)
+	}
+}
+
 func TestAppNameStatisticsInitialization(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 
diff --git a/pkg/sql/copy.go b/pkg/sql/copy.go
index e5ac19f140f2..97a1b16937f3 100644
--- a/pkg/sql/copy.go
+++ b/pkg/sql/copy.go
@@ -292,7 +292,7 @@ func (c *copyMachine) preparePlanner(ctx context.Context) func(context.Context,
 	stmtTs := c.txnOpt.stmtTimestamp
 	autoCommit := false
 	if txn == nil {
-		txn = client.NewTxn(ctx, c.p.execCfg.DB, c.p.execCfg.NodeID.Get())
+		txn = client.NewTxnWithSteppingEnabled(ctx, c.p.execCfg.DB, c.p.execCfg.NodeID.Get())
 		txnTs = c.p.execCfg.Clock.PhysicalTime()
 		stmtTs = txnTs
 		autoCommit = true
diff --git a/pkg/sql/create_index.go b/pkg/sql/create_index.go
index 3282f05ba6b0..4e2079b49802 100644
--- a/pkg/sql/create_index.go
+++ b/pkg/sql/create_index.go
@@ -79,6 +79,11 @@ func MakeIndexDescriptor(n *tree.CreateIndex) (*sqlbase.IndexDescriptor, error)
 	return &indexDesc, nil
 }
 
+// ReadingOwnWrites implements the planNodeReadingOwnWrites interface.
+// This is because CREATE INDEX performs multiple KV operations on descriptors
+// and expects to see its own writes.
+func (n *createIndexNode) ReadingOwnWrites() {}
+
 func (n *createIndexNode) startExec(params runParams) error {
 	_, dropped, err := n.tableDesc.FindIndexByName(string(n.n.Name))
 	if err == nil {
diff --git a/pkg/sql/create_sequence.go b/pkg/sql/create_sequence.go
index 0c68019a40db..d19212b8abb5 100644
--- a/pkg/sql/create_sequence.go
+++ b/pkg/sql/create_sequence.go
@@ -44,6 +44,11 @@ func (p *planner) CreateSequence(ctx context.Context, n *tree.CreateSequence) (p
 	}, nil
 }
 
+// ReadingOwnWrites implements the planNodeReadingOwnWrites interface.
+// This is because CREATE SEQUENCE performs multiple KV operations on descriptors
+// and expects to see its own writes.
+func (n *createSequenceNode) ReadingOwnWrites() {}
+
 func (n *createSequenceNode) startExec(params runParams) error {
 	// TODO(arul): Allow temporary sequences once temp tables work for regular tables.
 	if n.n.Temporary {
diff --git a/pkg/sql/create_table.go b/pkg/sql/create_table.go
index 6e0f4bdfab9d..4bbab68136bb 100644
--- a/pkg/sql/create_table.go
+++ b/pkg/sql/create_table.go
@@ -105,6 +105,11 @@ var storageParamExpectedTypes = map[string]storageParamType{
 	`user_catalog_table`: storageParamUnimplemented,
 }
 
+// ReadingOwnWrites implements the planNodeReadingOwnWrites interface.
+// This is because CREATE TABLE performs multiple KV operations on descriptors
+// and expects to see its own writes.
+func (n *createTableNode) ReadingOwnWrites() {}
+
 func (n *createTableNode) startExec(params runParams) error {
 	isTemporary := n.n.Temporary
 
@@ -265,96 +270,108 @@ func (n *createTableNode) startExec(params runParams) error {
 	// If we are in an explicit txn or the source has placeholders, we execute the
 	// CTAS query synchronously.
 	if n.n.As() && !params.p.ExtendedEvalContext().TxnImplicit {
-		// This is a very simplified version of the INSERT logic: no CHECK
-		// expressions, no FK checks, no arbitrary insertion order, no
-		// RETURNING, etc.
-
-		// Instantiate a row inserter and table writer. It has a 1-1
-		// mapping to the definitions in the descriptor.
-		ri, err := row.MakeInserter(
-			params.p.txn,
-			sqlbase.NewImmutableTableDescriptor(*desc.TableDesc()),
-			desc.Columns,
-			row.SkipFKs,
-			nil, /* fkTables */
-			&params.p.alloc)
-		if err != nil {
-			return err
-		}
-		ti := tableInserterPool.Get().(*tableInserter)
-		*ti = tableInserter{ri: ri}
-		tw := tableWriter(ti)
-		if n.run.autoCommit == autoCommitEnabled {
-			tw.enableAutoCommit()
-		}
-		defer func() {
-			tw.close(params.ctx)
-			*ti = tableInserter{}
-			tableInserterPool.Put(ti)
-		}()
-		if err := tw.init(params.p.txn, params.p.EvalContext()); err != nil {
-			return err
-		}
-
-		// Prepare the buffer for row values. At this point, one more column has
-		// been added by ensurePrimaryKey() to the list of columns in sourcePlan, if
-		// a PRIMARY KEY is not specified by the user.
-		rowBuffer := make(tree.Datums, len(desc.Columns))
-		pkColIdx := len(desc.Columns) - 1
-
-		// The optimizer includes the rowID expression as part of the input
-		// expression. But the heuristic planner does not do this, so construct
-		// a rowID expression to be evaluated separately.
-		var defTypedExpr tree.TypedExpr
-		if n.run.synthRowID {
-			// Prepare the rowID expression.
-			defExprSQL := *desc.Columns[pkColIdx].DefaultExpr
-			defExpr, err := parser.ParseExpr(defExprSQL)
+		if err := func() error {
+			// The data fill portion of CREATE AS must operate on a read snapshot,
+			// so that it doesn't end up observing its own writes.
+			prevMode, err := params.p.Txn().ConfigureStepping(client.SteppingEnabled)
 			if err != nil {
 				return err
 			}
-			defTypedExpr, err = params.p.analyzeExpr(
-				params.ctx,
-				defExpr,
-				nil, /*sources*/
-				tree.IndexedVarHelper{},
-				types.Any,
-				false, /*requireType*/
-				"CREATE TABLE AS")
+			defer func() { _, _ = params.p.Txn().ConfigureStepping(prevMode) }()
+
+			// This is a very simplified version of the INSERT logic: no CHECK
+			// expressions, no FK checks, no arbitrary insertion order, no
+			// RETURNING, etc.
+
+			// Instantiate a row inserter and table writer. It has a 1-1
+			// mapping to the definitions in the descriptor.
+			ri, err := row.MakeInserter(
+				params.p.txn,
+				sqlbase.NewImmutableTableDescriptor(*desc.TableDesc()),
+				desc.Columns,
+				row.SkipFKs,
+				nil, /* fkTables */
+				&params.p.alloc)
 			if err != nil {
 				return err
 			}
-		}
-
-		for {
-			if err := params.p.cancelChecker.Check(); err != nil {
-				return err
-			}
-			if next, err := n.sourcePlan.Next(params); !next {
-				if err != nil {
-					return err
-				}
-				_, err := tw.finalize(
-					params.ctx, params.extendedEvalCtx.Tracing.KVTracingEnabled())
-				if err != nil {
-					return err
-				}
-				break
-			}
-
-			// Populate the buffer and generate the PK value.
-			copy(rowBuffer, n.sourcePlan.Values())
-			if n.run.synthRowID {
-				rowBuffer[pkColIdx], err = defTypedExpr.Eval(params.p.EvalContext())
-				if err != nil {
-					return err
-				}
-			}
-
-			err := tw.row(params.ctx, rowBuffer, params.extendedEvalCtx.Tracing.KVTracingEnabled())
-			if err != nil {
-				return err
-			}
-		}
-	}
+			ti := tableInserterPool.Get().(*tableInserter)
+			*ti = tableInserter{ri: ri}
+			tw := tableWriter(ti)
+			if n.run.autoCommit == autoCommitEnabled {
+				tw.enableAutoCommit()
+			}
+			defer func() {
+				tw.close(params.ctx)
+				*ti = tableInserter{}
+				tableInserterPool.Put(ti)
+			}()
+			if err := tw.init(params.p.txn, params.p.EvalContext()); err != nil {
+				return err
+			}
+
+			// Prepare the buffer for row values. At this point, one more column has
+			// been added by ensurePrimaryKey() to the list of columns in sourcePlan, if
+			// a PRIMARY KEY is not specified by the user.
+			rowBuffer := make(tree.Datums, len(desc.Columns))
+			pkColIdx := len(desc.Columns) - 1
+
+			// The optimizer includes the rowID expression as part of the input
+			// expression. But the heuristic planner does not do this, so construct
+			// a rowID expression to be evaluated separately.
+			var defTypedExpr tree.TypedExpr
+			if n.run.synthRowID {
+				// Prepare the rowID expression.
+				defExprSQL := *desc.Columns[pkColIdx].DefaultExpr
+				defExpr, err := parser.ParseExpr(defExprSQL)
+				if err != nil {
+					return err
+				}
+				defTypedExpr, err = params.p.analyzeExpr(
+					params.ctx,
+					defExpr,
+					nil, /*sources*/
+					tree.IndexedVarHelper{},
+					types.Any,
+					false, /*requireType*/
+					"CREATE TABLE AS")
+				if err != nil {
+					return err
+				}
+			}
+
+			for {
+				if err := params.p.cancelChecker.Check(); err != nil {
+					return err
+				}
+				if next, err := n.sourcePlan.Next(params); !next {
+					if err != nil {
+						return err
+					}
+					_, err := tw.finalize(
+						params.ctx, params.extendedEvalCtx.Tracing.KVTracingEnabled())
+					if err != nil {
+						return err
+					}
+					break
+				}
+
+				// Populate the buffer and generate the PK value.
+				copy(rowBuffer, n.sourcePlan.Values())
+				if n.run.synthRowID {
+					rowBuffer[pkColIdx], err = defTypedExpr.Eval(params.p.EvalContext())
+					if err != nil {
+						return err
+					}
+				}
+
+				if err := tw.row(params.ctx, rowBuffer, params.extendedEvalCtx.Tracing.KVTracingEnabled()); err != nil {
+					return err
+				}
+			}
+			return nil
+		}(); err != nil {
+			return err
+		}
+	}
 
@@ -1364,6 +1381,7 @@ func MakeTableDesc(
 			return desc, errors.Errorf("unsupported table def: %T", def)
 		}
 	}
+
 	// Now that we have all the other columns set up, we can validate
 	// any computed columns.
 	for _, def := range n.Defs {
diff --git a/pkg/sql/create_view.go b/pkg/sql/create_view.go
index 435984c79555..bb424cfbbe0f 100644
--- a/pkg/sql/create_view.go
+++ b/pkg/sql/create_view.go
@@ -37,6 +37,11 @@ type createViewNode struct {
 	planDeps planDependencies
 }
 
+// ReadingOwnWrites implements the planNodeReadingOwnWrites interface.
+// This is because CREATE VIEW performs multiple KV operations on descriptors
+// and expects to see its own writes.
+func (n *createViewNode) ReadingOwnWrites() {}
+
 func (n *createViewNode) startExec(params runParams) error {
 	// TODO(arul): Allow temporary views once temp tables work for regular tables.
 	if n.temporary {
diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go
index 025a8cf0fe4c..d41f19f5301f 100644
--- a/pkg/sql/distsql_running.go
+++ b/pkg/sql/distsql_running.go
@@ -987,6 +987,13 @@ func (dsp *DistSQLPlanner) PlanAndRunPostqueries(
 	maybeDistribute bool,
 ) bool {
 	for _, postqueryPlan := range postqueryPlans {
+		// We place a sequence point before every postquery, so
+		// that each subsequent postquery can observe the writes
+		// by the previous step.
+		if err := planner.Txn().Step(); err != nil {
+			recv.SetError(err)
+			return false
+		}
 		if err := dsp.planAndRunPostquery(
 			ctx,
 			postqueryPlan,
diff --git a/pkg/sql/drop_index.go b/pkg/sql/drop_index.go
index d8c62e714a36..a69e22fdb854 100644
--- a/pkg/sql/drop_index.go
+++ b/pkg/sql/drop_index.go
@@ -59,6 +59,11 @@ func (p *planner) DropIndex(ctx context.Context, n *tree.DropIndex) (planNode, e
 	return &dropIndexNode{n: n, idxNames: idxNames}, nil
 }
 
+// ReadingOwnWrites implements the planNodeReadingOwnWrites interface.
+// This is because DROP INDEX performs multiple KV operations on descriptors
+// and expects to see its own writes.
+func (n *dropIndexNode) ReadingOwnWrites() {}
+
 func (n *dropIndexNode) startExec(params runParams) error {
 	ctx := params.ctx
 	for _, index := range n.idxNames {
diff --git a/pkg/sql/drop_sequence.go b/pkg/sql/drop_sequence.go
index bf047c881a78..fd7760bda4e7 100644
--- a/pkg/sql/drop_sequence.go
+++ b/pkg/sql/drop_sequence.go
@@ -55,6 +55,11 @@ func (p *planner) DropSequence(ctx context.Context, n *tree.DropSequence) (planN
 	}, nil
 }
 
+// ReadingOwnWrites implements the planNodeReadingOwnWrites interface.
+// This is because DROP SEQUENCE performs multiple KV operations on descriptors
+// and expects to see its own writes.
+func (n *dropSequenceNode) ReadingOwnWrites() {}
+
 func (n *dropSequenceNode) startExec(params runParams) error {
 	ctx := params.ctx
 	for _, toDel := range n.td {
diff --git a/pkg/sql/drop_table.go b/pkg/sql/drop_table.go
index 0ad565c3a336..8c1efbc45128 100644
--- a/pkg/sql/drop_table.go
+++ b/pkg/sql/drop_table.go
@@ -97,6 +97,11 @@ func (p *planner) DropTable(ctx context.Context, n *tree.DropTable) (planNode, e
 	return &dropTableNode{n: n, td: td}, nil
 }
 
+// ReadingOwnWrites implements the planNodeReadingOwnWrites interface.
+// This is because DROP TABLE performs multiple KV operations on descriptors
+// and expects to see its own writes.
+func (n *dropTableNode) ReadingOwnWrites() {}
+
 func (n *dropTableNode) startExec(params runParams) error {
 	ctx := params.ctx
 	for _, toDel := range n.td {
diff --git a/pkg/sql/drop_view.go b/pkg/sql/drop_view.go
index 099ec9fc64f1..85d4af2e028e 100644
--- a/pkg/sql/drop_view.go
+++ b/pkg/sql/drop_view.go
@@ -69,6 +69,11 @@ func (p *planner) DropView(ctx context.Context, n *tree.DropView) (planNode, err
 	return &dropViewNode{n: n, td: td}, nil
 }
 
+// ReadingOwnWrites implements the planNodeReadingOwnWrites interface.
+// This is because DROP VIEW performs multiple KV operations on descriptors
+// and expects to see its own writes.
+func (n *dropViewNode) ReadingOwnWrites() {}
+
 func (n *dropViewNode) startExec(params runParams) error {
 	ctx := params.ctx
 	for _, toDel := range n.td {
diff --git a/pkg/sql/grant_revoke.go b/pkg/sql/grant_revoke.go
index 778fde0d88fa..396de948d427 100644
--- a/pkg/sql/grant_revoke.go
+++ b/pkg/sql/grant_revoke.go
@@ -63,6 +63,11 @@ type changePrivilegesNode struct {
 	changePrivilege func(*sqlbase.PrivilegeDescriptor, string)
 }
 
+// ReadingOwnWrites implements the planNodeReadingOwnWrites interface.
+// This is because GRANT/REVOKE performs multiple KV operations on descriptors
+// and expects to see its own writes.
+func (n *changePrivilegesNode) ReadingOwnWrites() {}
+
 func (n *changePrivilegesNode) startExec(params runParams) error {
 	ctx := params.ctx
 	p := params.p
diff --git a/pkg/sql/insert.go b/pkg/sql/insert.go
index 2a4d0b1f0d9b..e42beeba92bf 100644
--- a/pkg/sql/insert.go
+++ b/pkg/sql/insert.go
@@ -177,7 +177,7 @@ func (r *insertRun) processSourceRow(params runParams, rowVals tree.Datums) erro
 // the insert operation (including secondary index updates, FK
 // cascading updates, etc), before the current KV batch is executed
 // and a new batch is started.
-const maxInsertBatchSize = 10000
+var maxInsertBatchSize = 10000
 
 func (n *insertNode) startExec(params runParams) error {
 	if err := params.p.maybeSetSystemConfig(n.run.ti.tableDesc().GetID()); err != nil {
@@ -294,3 +294,10 @@ func (n *insertNode) Close(ctx context.Context) {
 func (n *insertNode) enableAutoCommit() {
 	n.run.ti.enableAutoCommit()
 }
+
+// TestingSetInsertBatchSize overrides the insert batch size for testing, and returns a function that restores it.
+func TestingSetInsertBatchSize(val int) func() {
+	oldVal := maxInsertBatchSize
+	maxInsertBatchSize = val
+	return func() { maxInsertBatchSize = oldVal }
+}
diff --git a/pkg/sql/insert_fast_path.go b/pkg/sql/insert_fast_path.go
index ff2ec5de3de7..0d57933f211e 100644
--- a/pkg/sql/insert_fast_path.go
+++ b/pkg/sql/insert_fast_path.go
@@ -31,9 +31,13 @@ var insertFastPathNodePool = sync.Pool{
 	},
 }
 
-// Check that exec.InsertFastPathMaxRows does not exceed maxInsertBatchSize
-// (this is a compile error if the value is negative).
-const _ uint = maxInsertBatchSize - exec.InsertFastPathMaxRows
+// Check that exec.InsertFastPathMaxRows does not exceed the default
+// maxInsertBatchSize.
+func init() {
+	if maxInsertBatchSize < exec.InsertFastPathMaxRows {
+		panic("decrease exec.InsertFastPathMaxRows")
+	}
+}
 
 // insertFastPathNode is a faster implementation of inserting values in a table
 // and performing FK checks. It is used when all the foreign key checks can be
diff --git a/pkg/sql/internal.go b/pkg/sql/internal.go
index ed39c6f49faa..4f59640d545f 100644
--- a/pkg/sql/internal.go
+++ b/pkg/sql/internal.go
@@ -481,6 +481,18 @@ func (ie *internalExecutorImpl) execInternal(
 	ctx, sp := tracing.EnsureChildSpan(ctx, ie.s.cfg.AmbientCtx.Tracer, opName)
 	defer sp.Finish()
 
+	// The client.Txn handed to this executor may currently have
+	// disabled stepping mode. However, any and all SQL expects
+	// stepping mode to be enabled. We'll enable it here and restore
+	// what we had upon exit.
+	if txn != nil {
+		prevStepping, err := txn.ConfigureStepping(client.SteppingEnabled)
+		if err != nil {
+			return result{}, err
+		}
+		defer func() { _, _ = txn.ConfigureStepping(prevStepping) }()
+	}
+
 	timeReceived := timeutil.Now()
 	parseStart := timeReceived
 	parsed, err := parser.ParseOne(stmt)
diff --git a/pkg/sql/logictest/logic.go b/pkg/sql/logictest/logic.go
index 301d667ffe04..dafdee90daf2 100644
--- a/pkg/sql/logictest/logic.go
+++ b/pkg/sql/logictest/logic.go
@@ -1764,7 +1764,7 @@ func (t *logicTest) processSubtest(
 				return errors.Errorf("kv-batch-size needs an integer argument; %s", err)
 			}
 			t.outf("Setting kv batch size %d", batchSize)
-			defer row.SetKVBatchSize(int64(batchSize))()
+			defer row.TestingSetKVBatchSize(int64(batchSize))()
 
 		default:
 			return errors.Errorf("%s:%d: unknown command: %s",
diff --git a/pkg/sql/logictest/testdata/logic_test/statement_source b/pkg/sql/logictest/testdata/logic_test/statement_source
index 04ac9c3a1dfb..c26c7c210f14 100644
--- a/pkg/sql/logictest/testdata/logic_test/statement_source
+++ b/pkg/sql/logictest/testdata/logic_test/statement_source
@@ -72,10 +72,10 @@ SELECT * FROM a ORDER BY b
 
 # Regression for #30936: ensure that wrapped planNodes with non-needed columns work ok
 statement ok
-CREATE TABLE b (a int, b int)
+CREATE TABLE b (a int, b int);
 
 query II
-SELECT * FROM b WHERE EXISTS (SELECT * FROM [INSERT INTO b VALUES (1,2) RETURNING a,b]);
+SELECT * FROM (VALUES (1, 2)) WHERE EXISTS (SELECT * FROM [INSERT INTO b VALUES (1,2) RETURNING a,b]);
 ----
 1 2
 
diff --git a/pkg/sql/plan.go b/pkg/sql/plan.go
index 6455d659c7cd..a21e2f346e1e 100644
--- a/pkg/sql/plan.go
+++ b/pkg/sql/plan.go
@@ -13,12 +13,14 @@ package sql
 import (
 	"context"
 
+	"github.com/cockroachdb/cockroach/pkg/internal/client"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
 	"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/errors" ) // runParams is a struct containing all parameters passed to planNode.Next() and @@ -130,6 +132,19 @@ type planNodeFastPath interface { FastPathResults() (int, bool) } +// planNodeReadingOwnWrites can be implemented by planNodes which do +// not use the standard SQL principle of reading at the snapshot +// established at the start of the transaction. This is the case +// e.g. for most DDL statements that perform multiple KV operations on +// descriptors, expecting to read their own writes. +// +// This constraint is obeyed by (*planner).startExec(). +type planNodeReadingOwnWrites interface { + // ReadingOwnWrites can be implemented as no-op by nodes wishing + // to request the semantics described above. + ReadingOwnWrites() +} + var _ planNode = &alterIndexNode{} var _ planNode = &alterSequenceNode{} var _ planNode = &alterTableNode{} @@ -207,6 +222,16 @@ var _ planNodeFastPath = &serializeNode{} var _ planNodeFastPath = &setZoneConfigNode{} var _ planNodeFastPath = &controlJobsNode{} +var _ planNodeReadingOwnWrites = &alterIndexNode{} +var _ planNodeReadingOwnWrites = &alterSequenceNode{} +var _ planNodeReadingOwnWrites = &alterTableNode{} +var _ planNodeReadingOwnWrites = &createIndexNode{} +var _ planNodeReadingOwnWrites = &createSequenceNode{} +var _ planNodeReadingOwnWrites = &createTableNode{} +var _ planNodeReadingOwnWrites = &createViewNode{} +var _ planNodeReadingOwnWrites = &changePrivilegesNode{} +var _ planNodeReadingOwnWrites = &setZoneConfigNode{} + // planNodeRequireSpool serves as marker for nodes whose parent must // ensure that the node is fully run to completion (and the results // spooled) during the start phase. This is currently implemented by @@ -318,6 +343,11 @@ func (p *planTop) close(ctx context.Context) { // startExec calls startExec() on each planNode using a depth-first, post-order // traversal. The subqueries, if any, are also started. // +// If the planNode also implements the nodeReadingOwnWrites interface, +// the txn is temporarily reconfigured to use read-your-own-writes for +// the duration of the call to startExec. This is used e.g. by +// DDL statements. +// // Reminder: walkPlan() ensures that subqueries and sub-plans are // started before startExec() is called. func startExec(params runParams, plan planNode) error { @@ -333,7 +363,17 @@ func startExec(params runParams, plan planNode) error { } return true, nil }, - leaveNode: func(_ string, n planNode) error { + leaveNode: func(_ string, n planNode) (err error) { + if _, ok := n.(planNodeReadingOwnWrites); ok { + _, err = params.p.Txn().ConfigureStepping(client.SteppingDisabled) + if err != nil { + return err + } + defer func() { + _, thisErr := params.p.Txn().ConfigureStepping(client.SteppingEnabled) + err = errors.CombineErrors(err, thisErr) + }() + } return n.startExec(params) }, } diff --git a/pkg/sql/rename_column.go b/pkg/sql/rename_column.go index 61d177a2d055..cbeeb842d4d5 100644 --- a/pkg/sql/rename_column.go +++ b/pkg/sql/rename_column.go @@ -50,6 +50,11 @@ func (p *planner) RenameColumn(ctx context.Context, n *tree.RenameColumn) (planN return &renameColumnNode{n: n, tableDesc: tableDesc}, nil } +// ReadingOwnWrites implements the planNodeReadingOwnWrites interface. +// This is because RENAME COLUMN performs multiple KV operations on descriptors +// and expects to see its own writes. 
+func (n *renameColumnNode) ReadingOwnWrites() {}
+
 func (n *renameColumnNode) startExec(params runParams) error {
 	p := params.p
 	ctx := params.ctx
diff --git a/pkg/sql/rename_database.go b/pkg/sql/rename_database.go
index d41195a7d4f3..1fb6aa382b30 100644
--- a/pkg/sql/rename_database.go
+++ b/pkg/sql/rename_database.go
@@ -63,6 +63,11 @@ func (p *planner) RenameDatabase(ctx context.Context, n *tree.RenameDatabase) (p
 	}, nil
 }
 
+// ReadingOwnWrites implements the planNodeReadingOwnWrites interface.
+// This is because RENAME DATABASE performs multiple KV operations on descriptors
+// and expects to see its own writes.
+func (n *renameDatabaseNode) ReadingOwnWrites() {}
+
 func (n *renameDatabaseNode) startExec(params runParams) error {
 	p := params.p
 	ctx := params.ctx
diff --git a/pkg/sql/rename_index.go b/pkg/sql/rename_index.go
index ae6b1f389394..afd4f4398ab3 100644
--- a/pkg/sql/rename_index.go
+++ b/pkg/sql/rename_index.go
@@ -60,6 +60,11 @@ func (p *planner) RenameIndex(ctx context.Context, n *tree.RenameIndex) (planNod
 	return &renameIndexNode{n: n, idx: idx, tableDesc: tableDesc}, nil
 }
 
+// ReadingOwnWrites implements the planNodeReadingOwnWrites interface.
+// This is because RENAME INDEX performs multiple KV operations on descriptors
+// and expects to see its own writes.
+func (n *renameIndexNode) ReadingOwnWrites() {}
+
 func (n *renameIndexNode) startExec(params runParams) error {
 	p := params.p
 	ctx := params.ctx
diff --git a/pkg/sql/rename_table.go b/pkg/sql/rename_table.go
index f26b451a1efa..5bd06c7f8530 100644
--- a/pkg/sql/rename_table.go
+++ b/pkg/sql/rename_table.go
@@ -71,6 +71,11 @@ func (p *planner) RenameTable(ctx context.Context, n *tree.RenameTable) (planNod
 	return &renameTableNode{n: n, oldTn: &oldTn, newTn: &newTn, tableDesc: tableDesc}, nil
 }
 
+// ReadingOwnWrites implements the planNodeReadingOwnWrites interface.
+// This is because RENAME TABLE performs multiple KV operations on descriptors
+// and expects to see its own writes.
+func (n *renameTableNode) ReadingOwnWrites() {}
+
 func (n *renameTableNode) startExec(params runParams) error {
 	p := params.p
 	ctx := params.ctx
diff --git a/pkg/sql/row/cascader.go b/pkg/sql/row/cascader.go
index 2d011f47ed2d..3a85b3f4011f 100644
--- a/pkg/sql/row/cascader.go
+++ b/pkg/sql/row/cascader.go
@@ -86,6 +86,29 @@ func makeDeleteCascader(
 	if !required {
 		return nil, nil
 	}
+
+	// TODO(knz,radu): FK cascading actions need to see the writes
+	// performed by the mutation. Moreover, each stage of the cascading
+	// actions must observe the writes from the previous stages but not
+	// its own writes.
+	//
+	// In order to make this true, we need to split the cascading
+	// actions into separate sequencing steps, and have the first
+	// cascading action happen no earlier than the end of all the
+	// "main" part of the statement. Unfortunately, the organization
+	// of the code does not allow this today.
+	//
+	// See: https://github.com/cockroachdb/cockroach/issues/33475
+	//
+	// In order to "make do" and preserve a modicum of FK semantics we
+	// thus need to disable step-wise execution here. The result is that
+	// it will also enable any interleaved read part to observe the
+	// mutation, and thus introduce the risk of a Halloween problem for
+	// any mutation that uses FK relationships.
+	if _, err := txn.ConfigureStepping(client.SteppingDisabled); err != nil {
+		return nil, err
+	}
+
 	return &cascader{
 		txn:      txn,
 		fkTables: tablesByID,
@@ -155,6 +178,29 @@ func makeUpdateCascader(
 	if !required {
 		return nil, nil
 	}
+
+	// TODO(knz,radu): FK cascading actions need to see the writes
+	// performed by the mutation. Moreover, each stage of the cascading
+	// actions must observe the writes from the previous stages but not
+	// its own writes.
+	//
+	// In order to make this true, we need to split the cascading
+	// actions into separate sequencing steps, and have the first
+	// cascading action happen no earlier than the end of all the
+	// "main" part of the statement. Unfortunately, the organization
+	// of the code does not allow this today.
+	//
+	// See: https://github.com/cockroachdb/cockroach/issues/33475
+	//
+	// In order to "make do" and preserve a modicum of FK semantics we
+	// thus need to disable step-wise execution here. The result is that
+	// it will also enable any interleaved read part to observe the
+	// mutation, and thus introduce the risk of a Halloween problem for
+	// any mutation that uses FK relationships.
+	if _, err := txn.ConfigureStepping(client.SteppingDisabled); err != nil {
+		return nil, err
+	}
+
 	return &cascader{
 		txn:      txn,
 		fkTables: tablesByID,
diff --git a/pkg/sql/row/fetcher.go b/pkg/sql/row/fetcher.go
index c6a7fda7d22d..b916f826804d 100644
--- a/pkg/sql/row/fetcher.go
+++ b/pkg/sql/row/fetcher.go
@@ -503,7 +503,7 @@ func (rf *Fetcher) StartInconsistentScan(
 			maxTimestampAge,
 		)
 	}
-	txn := client.NewTxn(ctx, db, 0 /* gatewayNodeID */)
+	txn := client.NewTxnWithSteppingEnabled(ctx, db, 0 /* gatewayNodeID */)
 	txn.SetFixedTimestamp(ctx, txnTimestamp)
 	if log.V(1) {
 		log.Infof(ctx, "starting inconsistent scan at timestamp %v", txnTimestamp)
@@ -518,7 +518,7 @@ func (rf *Fetcher) StartInconsistentScan(
 		// Advance the timestamp by the time that passed.
 		txnTimestamp = txnTimestamp.Add(now.Sub(txnStartTime).Nanoseconds(), 0 /* logical */)
 		txnStartTime = now
-		txn = client.NewTxn(ctx, db, 0 /* gatewayNodeID */)
+		txn = client.NewTxnWithSteppingEnabled(ctx, db, 0 /* gatewayNodeID */)
 		txn.SetFixedTimestamp(ctx, txnTimestamp)
 
 		if log.V(1) {
diff --git a/pkg/sql/row/fk_existence_delete.go b/pkg/sql/row/fk_existence_delete.go
index d88ea885c8ba..1fc76efd0627 100644
--- a/pkg/sql/row/fk_existence_delete.go
+++ b/pkg/sql/row/fk_existence_delete.go
@@ -96,6 +96,28 @@ func makeFkExistenceCheckHelperForDelete(
 		h.fks[mutatedIdx.ID] = append(h.fks[mutatedIdx.ID], fk)
 	}
 
+	if len(h.fks) > 0 {
+		// TODO(knz,radu): FK existence checks need to see the writes
+		// performed by the mutation.
+		//
+		// In order to make this true, we need to split the existence
+		// checks into a separate sequencing step, and have the first
+		// check happen no earlier than the end of all the "main" part of
+		// the statement. Unfortunately, the organization of the code does
+		// not allow this today.
+		//
+		// See: https://github.com/cockroachdb/cockroach/issues/33475
+		//
+		// In order to "make do" and preserve a modicum of FK semantics we
+		// thus need to disable step-wise execution here. The result is that
+		// it will also enable any interleaved read part to observe the
+		// mutation, and thus introduce the risk of a Halloween problem for
+		// any mutation that uses FK relationships.
+		if _, err := txn.ConfigureStepping(client.SteppingDisabled); err != nil {
+			return h, err
+		}
+	}
+
 	return h, nil
 }
 
diff --git a/pkg/sql/row/fk_existence_insert.go b/pkg/sql/row/fk_existence_insert.go
index 11f975e50fe1..cdc9d60eca1b 100644
--- a/pkg/sql/row/fk_existence_insert.go
+++ b/pkg/sql/row/fk_existence_insert.go
@@ -87,6 +87,28 @@ func makeFkExistenceCheckHelperForInsert(
 		h.fks[mutatedIdx.ID] = append(h.fks[mutatedIdx.ID], fk)
 	}
 
+	if len(h.fks) > 0 {
+		// TODO(knz,radu): FK existence checks need to see the writes
+		// performed by the mutation.
+		//
+		// In order to make this true, we need to split the existence
+		// checks into a separate sequencing step, and have the first
+		// check happen no earlier than the end of all the "main" part of
+		// the statement. Unfortunately, the organization of the code does
+		// not allow this today.
+		//
+		// See: https://github.com/cockroachdb/cockroach/issues/33475
+		//
+		// In order to "make do" and preserve a modicum of FK semantics we
+		// thus need to disable step-wise execution here. The result is that
+		// it will also enable any interleaved read part to observe the
+		// mutation, and thus introduce the risk of a Halloween problem for
+		// any mutation that uses FK relationships.
+		if _, err := txn.ConfigureStepping(client.SteppingDisabled); err != nil {
+			return h, err
+		}
+	}
+
 	return h, nil
 }
 
diff --git a/pkg/sql/row/kv_batch_fetcher.go b/pkg/sql/row/kv_batch_fetcher.go
index 5897ad0d9e19..05d9bc8bbf0d 100644
--- a/pkg/sql/row/kv_batch_fetcher.go
+++ b/pkg/sql/row/kv_batch_fetcher.go
@@ -28,13 +28,6 @@ import (
 // TODO(radu): parameters like this should be configurable
 var kvBatchSize int64 = 10000
 
-// SetKVBatchSize changes the kvBatchFetcher batch size, and returns a function that restores it.
-func SetKVBatchSize(val int64) func() {
-	oldVal := kvBatchSize
-	kvBatchSize = val
-	return func() { kvBatchSize = oldVal }
-}
-
 // sendFunc is the function used to execute a KV batch; normally
 // wraps (*client.Txn).Send.
 type sendFunc func(
@@ -352,3 +345,11 @@ func (f *txnKVFetcher) nextBatch(
 	}
 	return f.nextBatch(ctx)
 }
+
+// TestingSetKVBatchSize changes the kvBatchFetcher batch size, and returns a function that restores it.
+// This is to be used only in tests - we have no test coverage for arbitrary kv batch sizes at this time.
+func TestingSetKVBatchSize(val int64) func() {
+	oldVal := kvBatchSize
+	kvBatchSize = val
+	return func() { kvBatchSize = oldVal }
+}
diff --git a/pkg/sql/scan_test.go b/pkg/sql/scan_test.go
index 261a0002c844..243d50aa207a 100644
--- a/pkg/sql/scan_test.go
+++ b/pkg/sql/scan_test.go
@@ -125,7 +125,7 @@ func TestScanBatches(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 
 	// The test will screw around with KVBatchSize; make sure to restore it at the end.
-	restore := row.SetKVBatchSize(10)
+	restore := row.TestingSetKVBatchSize(10)
 	defer restore()
 
 	s, db, _ := serverutils.StartServer(
@@ -171,7 +171,7 @@ func TestScanBatches(t *testing.T) {
 	numSpanValues := []int{0, 1, 2, 3}
 
 	for _, batch := range batchSizes {
-		row.SetKVBatchSize(int64(batch))
+		row.TestingSetKVBatchSize(int64(batch))
 		for _, numSpans := range numSpanValues {
 			testScanBatchQuery(t, db, numSpans, numAs, numBs, false)
 			testScanBatchQuery(t, db, numSpans, numAs, numBs, true)
diff --git a/pkg/sql/set_zone_config.go b/pkg/sql/set_zone_config.go
index 385ab386100f..0eb662a01278 100644
--- a/pkg/sql/set_zone_config.go
+++ b/pkg/sql/set_zone_config.go
@@ -201,6 +201,11 @@ type setZoneConfigRun struct {
 	numAffected int
 }
 
+// ReadingOwnWrites implements the planNodeReadingOwnWrites interface.
+// This is because CONFIGURE ZONE performs multiple KV operations on descriptors
+// and expects to see its own writes.
+func (n *setZoneConfigNode) ReadingOwnWrites() {}
+
 func (n *setZoneConfigNode) startExec(params runParams) error {
 	var yamlConfig string
 	var setters []func(c *zonepb.ZoneConfig)
diff --git a/pkg/sql/stats/create_stats_job_test.go b/pkg/sql/stats/create_stats_job_test.go
index 329bee195896..ebdc007e31e4 100644
--- a/pkg/sql/stats/create_stats_job_test.go
+++ b/pkg/sql/stats/create_stats_job_test.go
@@ -467,7 +467,7 @@ func TestCreateStatsProgress(t *testing.T) {
 	}(rowexec.SamplerProgressInterval)
 	rowexec.SamplerProgressInterval = 10
 
-	resetKVBatchSize := row.SetKVBatchSize(10)
+	resetKVBatchSize := row.TestingSetKVBatchSize(10)
 	defer resetKVBatchSize()
 
 	var allowRequest chan struct{}
diff --git a/pkg/sql/txn_state.go b/pkg/sql/txn_state.go
index 059db0425b4f..bf5c768b84a6 100644
--- a/pkg/sql/txn_state.go
+++ b/pkg/sql/txn_state.go
@@ -203,7 +203,7 @@ func (ts *txnState) resetForNewSQLTxn(
 	ts.mon.Start(ts.Ctx, tranCtx.connMon, mon.BoundAccount{} /* reserved */)
 	ts.mu.Lock()
 	if txn == nil {
-		ts.mu.txn = client.NewTxn(ts.Ctx, tranCtx.db, tranCtx.nodeID)
+		ts.mu.txn = client.NewTxnWithSteppingEnabled(ts.Ctx, tranCtx.db, tranCtx.nodeID)
 		ts.mu.txn.SetDebugName(opName)
 	} else {
 		ts.mu.txn = txn
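
The pattern this patch applies across the SQL layer can be summarized as: enable stepping when the transaction is created (NewTxnWithSteppingEnabled), take one Step() per statement and per postquery stage so that each stage reads the snapshot left by the previous one, and temporarily switch to SteppingDisabled (restoring the previous mode with defer) around work that must read its own writes. The following self-contained Go sketch only illustrates that call shape; steppingTxn and steppingMode are hypothetical stand-ins invented for this example, not the real *client.Txn API.

package main

import "fmt"

type steppingMode bool

const (
	steppingEnabled  steppingMode = true
	steppingDisabled steppingMode = false
)

// steppingTxn mimics just enough of the txn surface for the sketch:
// Step establishes a new read snapshot, ConfigureStepping toggles the
// mode and returns the previous one so callers can restore it.
type steppingTxn struct{ mode steppingMode }

func (t *steppingTxn) Step() error {
	if t.mode != steppingEnabled {
		return fmt.Errorf("stepping not enabled")
	}
	fmt.Println("sequencing point: later reads observe writes made before this call")
	return nil
}

func (t *steppingTxn) ConfigureStepping(m steppingMode) (prev steppingMode) {
	prev, t.mode = t.mode, m
	return prev
}

func main() {
	txn := &steppingTxn{mode: steppingEnabled} // cf. NewTxnWithSteppingEnabled

	// One sequencing point per statement (cf. execStmtInOpenState).
	_ = txn.Step()

	// A node that must read its own writes temporarily disables
	// stepping and restores the previous mode on exit (cf. startExec
	// in plan.go and the CTAS closure in create_table.go).
	func() {
		prev := txn.ConfigureStepping(steppingDisabled)
		defer txn.ConfigureStepping(prev)
		// ... descriptor reads/writes that must see each other ...
	}()

	// Each postquery gets its own step (cf. PlanAndRunPostqueries).
	_ = txn.Step()
}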
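The planNodeReadingOwnWrites hook itself is an ordinary Go marker-interface dispatch: a node opts in by defining a no-op method, and the generic driver detects it with a type assertion before deciding how to configure the transaction. Below is a minimal toy version under that assumption, with an invented node name and print statements standing in for the ConfigureStepping calls made by the real startExec.

package main

import "fmt"

type planNode interface {
	startExec() error
}

// Marker interface: the no-op method signals "this node must observe
// its own writes".
type planNodeReadingOwnWrites interface {
	ReadingOwnWrites()
}

// createThingNode is a made-up example node.
type createThingNode struct{}

func (n *createThingNode) startExec() error {
	fmt.Println("running DDL-style work against descriptors")
	return nil
}

// ReadingOwnWrites marks the node; the empty body is intentional.
func (n *createThingNode) ReadingOwnWrites() {}

// startExec checks the marker with a type assertion before running
// the node, mirroring how plan.go wraps the call in a disable/re-enable
// of stepping.
func startExec(n planNode) error {
	if _, ok := n.(planNodeReadingOwnWrites); ok {
		fmt.Println("disable stepping for this node")
		defer fmt.Println("re-enable stepping")
	}
	return n.startExec()
}

func main() {
	_ = startExec(&createThingNode{})
}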