diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 57921fcf53a6..d9c9172ca928 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -65,6 +65,8 @@ /pkg/sql/pgwire/auth.go @cockroachdb/sql-experience @cockroachdb/server-prs @cockroachdb/prodsec /pkg/sql/sem/builtins/ @cockroachdb/sql-experience /pkg/sql/vtable/ @cockroachdb/sql-experience +/pkg/sql/importer/ @cockroachdb/sql-experience +/pkg/ccl/importerccl/ @cockroachdb/sql-experience /pkg/sql/sessiondata/ @cockroachdb/sql-experience /pkg/sql/tests/rsg_test.go @cockroachdb/sql-experience @@ -100,7 +102,7 @@ /pkg/cli/debug*.go @cockroachdb/kv-prs @cockroachdb/cli-prs /pkg/cli/debug_job_trace*.go @cockroachdb/jobs-prs /pkg/cli/doctor*.go @cockroachdb/sql-schema @cockroachdb/cli-prs -/pkg/cli/import_test.go @cockroachdb/disaster-recovery @cockroachdb/cli-prs +/pkg/cli/import_test.go @cockroachdb/sql-experience @cockroachdb/cli-prs /pkg/cli/sql*.go @cockroachdb/sql-experience @cockroachdb/cli-prs /pkg/cli/clisqlshell/ @cockroachdb/sql-experience @cockroachdb/cli-prs /pkg/cli/clisqlclient/ @cockroachdb/sql-experience @cockroachdb/cli-prs @@ -163,9 +165,6 @@ /pkg/ccl/backupccl/ @cockroachdb/disaster-recovery /pkg/ccl/backupccl/*_job.go @cockroachdb/disaster-recovery @cockroachdb/jobs-prs -/pkg/sql/importer/ @cockroachdb/disaster-recovery -/pkg/ccl/importerccl/ @cockroachdb/disaster-recovery -/pkg/ccl/spanconfigccl/ @cockroachdb/kv-prs /pkg/ccl/storageccl/ @cockroachdb/disaster-recovery /pkg/ccl/cloudccl/ @cockroachdb/disaster-recovery /pkg/cloud/ @cockroachdb/disaster-recovery @@ -174,6 +173,7 @@ /pkg/geo/ @cockroachdb/geospatial /pkg/kv/ @cockroachdb/kv-prs +/pkg/ccl/spanconfigccl/ @cockroachdb/kv-prs /pkg/kv/kvclient/kvstreamer @cockroachdb/sql-queries /pkg/ccl/storageccl/engineccl @cockroachdb/storage diff --git a/docs/generated/sql/bnf/BUILD.bazel b/docs/generated/sql/bnf/BUILD.bazel index af1734f8f533..f8a811fad8ec 100644 --- a/docs/generated/sql/bnf/BUILD.bazel +++ b/docs/generated/sql/bnf/BUILD.bazel @@ -210,6 +210,7 @@ FILES = [ "show_backup", "show_cluster_setting", "show_columns_stmt", + "show_commit_timestamp_stmt", "show_constraints_stmt", "show_create_stmt", "show_create_schedules_stmt", diff --git a/docs/generated/sql/bnf/show_commit_timestamp_stmt.bnf b/docs/generated/sql/bnf/show_commit_timestamp_stmt.bnf new file mode 100644 index 000000000000..b26a3f9be66e --- /dev/null +++ b/docs/generated/sql/bnf/show_commit_timestamp_stmt.bnf @@ -0,0 +1,2 @@ +show_commit_timestamp_stmt ::= + 'SHOW' 'COMMIT' 'TIMESTAMP' diff --git a/docs/generated/sql/bnf/stmt_block.bnf b/docs/generated/sql/bnf/stmt_block.bnf index 554d55a530c6..3ec697741f4c 100644 --- a/docs/generated/sql/bnf/stmt_block.bnf +++ b/docs/generated/sql/bnf/stmt_block.bnf @@ -30,6 +30,7 @@ stmt_without_legacy_transaction ::= | fetch_cursor_stmt | move_cursor_stmt | unlisten_stmt + | show_commit_timestamp_stmt legacy_transaction_stmt ::= legacy_begin_stmt @@ -160,6 +161,9 @@ unlisten_stmt ::= 'UNLISTEN' type_name | 'UNLISTEN' '*' +show_commit_timestamp_stmt ::= + 'SHOW' 'COMMIT' 'TIMESTAMP' + legacy_begin_stmt ::= 'BEGIN' opt_transaction begin_transaction diff --git a/docs/generated/sql/bnf/stmt_without_legacy_transaction.bnf b/docs/generated/sql/bnf/stmt_without_legacy_transaction.bnf index f35e0e57862f..1ce3aa196361 100644 --- a/docs/generated/sql/bnf/stmt_without_legacy_transaction.bnf +++ b/docs/generated/sql/bnf/stmt_without_legacy_transaction.bnf @@ -21,3 +21,4 @@ stmt_without_legacy_transaction ::= | fetch_cursor_stmt | move_cursor_stmt | unlisten_stmt + | show_commit_timestamp_stmt diff --git a/pkg/ccl/logictestccl/tests/3node-tenant/generated_test.go b/pkg/ccl/logictestccl/tests/3node-tenant/generated_test.go index 7964abf1dea8..48c898cb3f4c 100644 --- a/pkg/ccl/logictestccl/tests/3node-tenant/generated_test.go +++ b/pkg/ccl/logictestccl/tests/3node-tenant/generated_test.go @@ -1638,6 +1638,13 @@ func TestTenantLogic_shift( runLogicTest(t, "shift") } +func TestTenantLogic_show_commit_timestamp( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runLogicTest(t, "show_commit_timestamp") +} + func TestTenantLogic_show_completions( t *testing.T, ) { diff --git a/pkg/gen/bnf.bzl b/pkg/gen/bnf.bzl index 34d8206ffb62..56ade0b89f54 100644 --- a/pkg/gen/bnf.bzl +++ b/pkg/gen/bnf.bzl @@ -210,6 +210,7 @@ BNF_SRCS = [ "//docs/generated/sql/bnf:show_backup.bnf", "//docs/generated/sql/bnf:show_cluster_setting.bnf", "//docs/generated/sql/bnf:show_columns_stmt.bnf", + "//docs/generated/sql/bnf:show_commit_timestamp_stmt.bnf", "//docs/generated/sql/bnf:show_constraints_stmt.bnf", "//docs/generated/sql/bnf:show_create_external_connections_stmt.bnf", "//docs/generated/sql/bnf:show_create_schedules_stmt.bnf", diff --git a/pkg/gen/diagrams.bzl b/pkg/gen/diagrams.bzl index 106d954e04f5..a2fa434f7989 100644 --- a/pkg/gen/diagrams.bzl +++ b/pkg/gen/diagrams.bzl @@ -208,6 +208,7 @@ DIAGRAMS_SRCS = [ "//docs/generated/sql/bnf:show_backup.html", "//docs/generated/sql/bnf:show_cluster_setting.html", "//docs/generated/sql/bnf:show_columns.html", + "//docs/generated/sql/bnf:show_commit_timestamp.html", "//docs/generated/sql/bnf:show_constraints.html", "//docs/generated/sql/bnf:show_create.html", "//docs/generated/sql/bnf:show_create_external_connections.html", diff --git a/pkg/gen/docs.bzl b/pkg/gen/docs.bzl index e1bc9b650c03..1e2ec78888e1 100644 --- a/pkg/gen/docs.bzl +++ b/pkg/gen/docs.bzl @@ -222,6 +222,7 @@ DOCS_SRCS = [ "//docs/generated/sql/bnf:show_backup.bnf", "//docs/generated/sql/bnf:show_cluster_setting.bnf", "//docs/generated/sql/bnf:show_columns_stmt.bnf", + "//docs/generated/sql/bnf:show_commit_timestamp_stmt.bnf", "//docs/generated/sql/bnf:show_constraints_stmt.bnf", "//docs/generated/sql/bnf:show_create_external_connections_stmt.bnf", "//docs/generated/sql/bnf:show_create_schedules_stmt.bnf", diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index 7fae9e8c136e..1e6b026792a7 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -44,6 +44,7 @@ go_library( "conn_executor_exec.go", "conn_executor_prepare.go", "conn_executor_savepoints.go", + "conn_executor_show_commit_timestamp.go", "conn_fsm.go", "conn_io.go", "constraint.go", diff --git a/pkg/sql/catalog/colinfo/result_columns.go b/pkg/sql/catalog/colinfo/result_columns.go index 429d89626748..cf07937fb39c 100644 --- a/pkg/sql/catalog/colinfo/result_columns.go +++ b/pkg/sql/catalog/colinfo/result_columns.go @@ -142,6 +142,11 @@ var ExplainPlanColumns = ResultColumns{ {Name: "info", Typ: types.String}, } +// ShowCommitTimestampColumns are the result columns of SHOW COMMIT TIMESTAMP. +var ShowCommitTimestampColumns = ResultColumns{ + {Name: "commit_timestamp", Typ: types.Decimal}, +} + // ShowTraceColumns are the result columns of a SHOW [KV] TRACE statement. var ShowTraceColumns = ResultColumns{ {Name: "timestamp", Typ: types.TimestampTZ}, diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 6d69447c5e5f..c3acee83d0a7 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -1358,7 +1358,7 @@ type connExecutor struct { txnFinishClosure struct { // txnStartTime is the time that the transaction started. txnStartTime time.Time - // implicit is whether or not the transaction was implicit. + // implicit is whether the transaction was implicit. implicit bool } @@ -1414,6 +1414,13 @@ type connExecutor struct { rowsWrittenLogged bool rowsReadLogged bool + // shouldAcceptReleaseCockroachRestartInCommitWait is set to true if the + // transaction had SAVEPOINT cockroach_restart installed at the time that + // SHOW COMMIT TIMESTAMP was executed to commit the transaction. If so, the + // connExecutor will permit one invocation of RELEASE SAVEPOINT + // cockroach_restart while in the CommitWait state. + shouldAcceptReleaseCockroachRestartInCommitWait bool + // hasAdminRole is used to cache if the user running the transaction // has admin privilege. hasAdminRoleCache is set for the first statement // in a transaction. @@ -1542,6 +1549,11 @@ type connExecutor struct { // totalActiveTimeStopWatch tracks the total active time of the session. // This is defined as the time spent executing transactions and statements. totalActiveTimeStopWatch *timeutil.StopWatch + + // previousTransactionCommitTimestamp is the timestamp of the previous + // transaction which committed. It is zero-valued when there is a transaction + // open or the previous transaction did not successfully commit. + previousTransactionCommitTimestamp hlc.Timestamp } // ctxHolder contains a connection's context and, while session tracing is @@ -1928,7 +1940,9 @@ func (ex *connExecutor) execCmd() error { // The behavior is configurable, in case users want to preserve the // behavior from v21.2 and earlier. implicitTxnForBatch := ex.sessionData().EnableImplicitTransactionForBatchStatements - canAutoCommit := ex.implicitTxn() && (tcmd.LastInBatch || !implicitTxnForBatch) + canAutoCommit := ex.implicitTxn() && + (tcmd.LastInBatchBeforeShowCommitTimestamp || + tcmd.LastInBatch || !implicitTxnForBatch) ev, payload, err = ex.execStmt( ctx, tcmd.Statement, nil /* prepared */, nil /* pinfo */, stmtRes, canAutoCommit, ) diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index 56fbd8353b2f..ca60c523dbbd 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -123,7 +123,7 @@ func (ex *connExecutor) execStmt( // Note: when not using explicit transactions, we go through this transition // for every statement. It is important to minimize the amount of work and // allocations performed up to this point. - ev, payload = ex.execStmtInNoTxnState(ctx, ast) + ev, payload = ex.execStmtInNoTxnState(ctx, ast, res) case stateOpen: err = ex.execWithProfiling(ctx, ast, prepared, func(ctx context.Context) error { @@ -590,6 +590,10 @@ func (ex *connExecutor) execStmtInOpenState( ev, payload := ex.execRollbackToSavepointInOpenState(ctx, s, res) return ev, payload, nil + case *tree.ShowCommitTimestamp: + ev, payload := ex.execShowCommitTimestampInOpenState(ctx, s, res, canAutoCommit) + return ev, payload, nil + case *tree.Prepare: // This is handling the SQL statement "PREPARE". See execPrepare for // handling of the protocol-level command for preparing statements. @@ -777,6 +781,9 @@ func (ex *connExecutor) execStmtInOpenState( // the timestamps of the transaction accordingly. func (ex *connExecutor) handleAOST(ctx context.Context, stmt tree.Statement) error { if _, isNoTxn := ex.machine.CurState().(stateNoTxn); isNoTxn { + if _, ok := stmt.(*tree.ShowCommitTimestamp); ok { + return nil + } return errors.AssertionFailedf( "cannot handle AOST clause without a transaction", ) @@ -1670,7 +1677,7 @@ var eventStartExplicitTxn fsm.Event = eventTxnStart{ImplicitTxn: fsm.False} // the cursor is not advanced. This means that the statement will run again in // stateOpen, at each point its results will also be flushed. func (ex *connExecutor) execStmtInNoTxnState( - ctx context.Context, ast tree.Statement, + ctx context.Context, ast tree.Statement, res RestrictedCommandResult, ) (_ fsm.Event, payload fsm.EventPayload) { switch s := ast.(type) { case *tree.BeginTransaction: @@ -1693,6 +1700,8 @@ func (ex *connExecutor) execStmtInNoTxnState( historicalTs, ex.transitionCtx, ex.QualityOfService()) + case *tree.ShowCommitTimestamp: + return ex.execShowCommitTimestampInNoTxnState(ctx, s, res) case *tree.CommitTransaction, *tree.ReleaseSavepoint, *tree.RollbackTransaction, *tree.SetTransaction, *tree.Savepoint: return ex.makeErrEvent(errNoTransactionInProgress, ast) @@ -1815,7 +1824,15 @@ func (ex *connExecutor) execStmtInCommitWaitState( ex.incrementExecutedStmtCounter(ast) } }() - switch ast.(type) { + switch s := ast.(type) { + case *tree.ReleaseSavepoint: + if ex.extraTxnState.shouldAcceptReleaseCockroachRestartInCommitWait && + s.Savepoint == commitOnReleaseSavepointName { + ex.extraTxnState.shouldAcceptReleaseCockroachRestartInCommitWait = false + return nil, nil + } + case *tree.ShowCommitTimestamp: + return ex.execShowCommitTimestampInCommitWaitState(ctx, s, res) case *tree.CommitTransaction, *tree.RollbackTransaction: // Reply to a rollback with the COMMIT tag, by analogy to what we do when we // get a COMMIT in state Aborted. @@ -1828,13 +1845,11 @@ func (ex *connExecutor) execStmtInCommitWaitState( return nil }, ) - default: - ev = eventNonRetriableErr{IsCommit: fsm.False} - payload = eventNonRetriableErrPayload{ + } + return eventNonRetriableErr{IsCommit: fsm.False}, + eventNonRetriableErrPayload{ err: sqlerrors.NewTransactionCommittedError(), } - return ev, payload - } } // runObserverStatement executes the given observer statement. @@ -2257,6 +2272,8 @@ func (ex *connExecutor) onTxnFinish(ctx context.Context, ev txnEvent) { } ex.server.ServerMetrics.StatsMetrics.DiscardedStatsCount.Inc(1) } + // If we have a commitTimestamp, we should use it. + ex.previousTransactionCommitTimestamp.Forward(ev.commitTimestamp) } } @@ -2320,6 +2337,8 @@ func (ex *connExecutor) recordTransactionStart(txnID uuid.UUID) { ex.extraTxnState.shouldExecuteOnTxnRestart = true ex.statsCollector.StartTransaction() + + ex.previousTransactionCommitTimestamp = hlc.Timestamp{} } func (ex *connExecutor) recordTransactionFinish( diff --git a/pkg/sql/conn_executor_prepare.go b/pkg/sql/conn_executor_prepare.go index 23d20c762419..2cbdc92d6ef5 100644 --- a/pkg/sql/conn_executor_prepare.go +++ b/pkg/sql/conn_executor_prepare.go @@ -40,7 +40,13 @@ func (ex *connExecutor) execPrepare( // descriptors for type checking. This implicit txn will be open until // the Sync message is handled. if _, isNoTxn := ex.machine.CurState().(stateNoTxn); isNoTxn { - return ex.beginImplicitTxn(ctx, parseCmd.AST) + // The one exception is that we must not open a new transaction when + // preparing SHOW COMMIT TIMESTAMP. If we did, it would destroy the + // information about the previous transaction. We expect to execute + // this command in NoTxn. + if _, ok := parseCmd.AST.(*tree.ShowCommitTimestamp); !ok { + return ex.beginImplicitTxn(ctx, parseCmd.AST) + } } else if _, isAbortedTxn := ex.machine.CurState().(stateAborted); isAbortedTxn { if !ex.isAllowedInAbortedTxn(parseCmd.AST) { return retErr(sqlerrors.NewTransactionAbortedError("" /* customMsg */)) @@ -315,7 +321,12 @@ func (ex *connExecutor) execBind( // SQL EXECUTE command (which also needs to bind and resolve types) is // handled separately in conn_executor_exec. if _, isNoTxn := ex.machine.CurState().(stateNoTxn); isNoTxn { - return ex.beginImplicitTxn(ctx, ps.AST) + // The one critical exception is that we must not open a transaction when + // executing SHOW COMMIT TIMESTAMP as it would destroy the information + // about the previously committed transaction. + if _, ok := ps.AST.(*tree.ShowCommitTimestamp); !ok { + return ex.beginImplicitTxn(ctx, ps.AST) + } } else if _, isAbortedTxn := ex.machine.CurState().(stateAborted); isAbortedTxn { if !ex.isAllowedInAbortedTxn(ps.AST) { return retErr(sqlerrors.NewTransactionAbortedError("" /* customMsg */)) diff --git a/pkg/sql/conn_executor_show_commit_timestamp.go b/pkg/sql/conn_executor_show_commit_timestamp.go new file mode 100644 index 000000000000..279a15aed0b4 --- /dev/null +++ b/pkg/sql/conn_executor_show_commit_timestamp.go @@ -0,0 +1,147 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package sql + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" + "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/util/fsm" + "github.com/cockroachdb/cockroach/pkg/util/hlc" +) + +// execShowCommitTimestampInOpenState deals with the special statement +// SHOW COMMIT TIMESTAMP while in an open transaction. This statement, only +// allowed in an explicit transaction, will internally commit the underlying +// sql transaction and then write out the timestamp at which that transaction +// committed. The transaction state machine will move to the commitWait state +// much like when you do RELEASE cockroach_restart. One can still use RELEASE +// cockroach_restart; we support this so that SHOW COMMIT TIMESTAMP composes +// well with middleware which internally injects such savepoints and manages +// transaction controls above the client. +func (ex *connExecutor) execShowCommitTimestampInOpenState( + ctx context.Context, s *tree.ShowCommitTimestamp, res RestrictedCommandResult, canAutoCommit bool, +) (fsm.Event, fsm.EventPayload) { + + // We do not allow SHOW COMMIT TIMESTAMP in the middle of an implicit + // transaction -- it's not clear what it would mean. Given we don't move + // to Open from NoTxn when this statement was executed alone, the only + // possible way to get here is if the statement occurred in the middle of + // a multi-statement implicit transaction or via a multi-statement + // transaction using the extended protocol. There's logic in the + // connExecutor to detect if this statement is the last statement of an + // implicit transaction using the simple protocol and to instead treat + // it as though it were not part of the transaction. The advantage of that + // approach is that it allows a pairing of statements like INSERT and + // SHOW COMMIT TIMESTAMP to still hit 1PC for the INSERT. + implicit := ex.implicitTxn() + if implicit && !canAutoCommit { + return ex.makeErrEvent(pgerror.Newf( + pgcode.InvalidTransactionState, + "cannot use SHOW COMMIT TIMESTAMP in multi-statement implicit transaction"), + s) + } + + res.ResetStmtType((*tree.CommitTransaction)(nil)) + err := ex.commitSQLTransactionInternal(ctx) + if err == nil { + + if err := writeShowCommitTimestampRow( + ctx, res, ex.state.mu.txn.CommitTimestamp(), + ); err != nil { + return ex.makeErrEvent(err, s) + } + + // If we have a SAVEPOINT cockroach_restart, then we need to note that + // fact now, as the SAVEPOINT stack will be destroyed as the state + // machine moves into COMMIT. This state in extraTxnState will be cleaned + // up as we process any statement in CommitWait. + if entry, _ := ex.extraTxnState.savepoints.find( + commitOnReleaseSavepointName, + ); entry != nil && entry.commitOnRelease { + ex.extraTxnState.shouldAcceptReleaseCockroachRestartInCommitWait = true + } + + // If this is an implicit transaction, we must have gotten here through + // the extended protocol and this must be an auto-commit statement. The + // event to move the state machine will follow. + if implicit { + return nil, nil + } + + return eventTxnCommittedWithShowCommitTimestamp{}, nil + } + + // Committing the transaction failed. We'll go to state RestartWait if + // it's a retriable error, or to state RollbackWait otherwise. + if errIsRetriable(err) { + rc, canAutoRetry := ex.getRewindTxnCapability() + ev := eventRetriableErr{ + IsCommit: fsm.FromBool(false /* isCommit */), + CanAutoRetry: fsm.FromBool(canAutoRetry), + } + payload := eventRetriableErrPayload{err: err, rewCap: rc} + return ev, payload + } + + ev := eventNonRetriableErr{IsCommit: fsm.FromBool(false)} + payload := eventNonRetriableErrPayload{err: err} + return ev, payload +} + +// execShowCommitTimestampInCommitWaitState deals with the special statement +// SHOW COMMIT TIMESTAMP while in the commitWait state. One can reach this +// point either by issuing SHOW COMMIT TIMESTAMP multiple times or by issuing +// it after RELEASE cockroach_restart. +func (ex *connExecutor) execShowCommitTimestampInCommitWaitState( + ctx context.Context, s *tree.ShowCommitTimestamp, res RestrictedCommandResult, +) (fsm.Event, fsm.EventPayload) { + if err := writeShowCommitTimestampRow( + ctx, res, ex.state.mu.txn.CommitTimestamp(), + ); err != nil { + return ex.makeErrEvent(err, s) + } + return nil, nil +} + +// execShowCommitTimestampInNoTxnState deals with the special statement +// SHOW COMMIT TIMESTAMP while in the noTxn state. One can reach this +// by executing the statement after successfully committing an implicit +// or explicit transaction. An error will be returned if either no transaction +// has been previously committed on this session or if the last transaction +// created encountered an error. +func (ex *connExecutor) execShowCommitTimestampInNoTxnState( + ctx context.Context, s *tree.ShowCommitTimestamp, res RestrictedCommandResult, +) (fsm.Event, fsm.EventPayload) { + ts := ex.previousTransactionCommitTimestamp + if ts.IsEmpty() { + return ex.makeErrEvent(pgerror.Newf( + pgcode.InvalidTransactionState, "no previous transaction", + ), s) + } + if err := writeShowCommitTimestampRow( + ctx, res, ts, + ); err != nil { + return ex.makeErrEvent(err, s) + } + return nil, nil +} + +func writeShowCommitTimestampRow( + ctx context.Context, res RestrictedCommandResult, ts hlc.Timestamp, +) error { + res.SetColumns(ctx, colinfo.ShowCommitTimestampColumns) + return res.AddRow(ctx, tree.Datums{eval.TimestampToDecimalDatum(ts)}) +} diff --git a/pkg/sql/conn_fsm.go b/pkg/sql/conn_fsm.go index 077dede964d8..663bc5c1fe09 100644 --- a/pkg/sql/conn_fsm.go +++ b/pkg/sql/conn_fsm.go @@ -203,20 +203,26 @@ type eventTxnRestart struct{} // generated by releasing regular savepoints. type eventTxnReleased struct{} +// eventTxnCommittedWithShowCommitTimestamp is generated after a successful +// SHOW COMMIT TIMESTAMP in an explicit transaction. It moves the state to +// CommitWait. +type eventTxnCommittedWithShowCommitTimestamp struct{} + // payloadWithError is a common interface for the payloads that wrap an error. type payloadWithError interface { errorCause() error } -func (eventTxnStart) Event() {} -func (eventTxnFinishCommitted) Event() {} -func (eventTxnFinishAborted) Event() {} -func (eventSavepointRollback) Event() {} -func (eventNonRetriableErr) Event() {} -func (eventRetriableErr) Event() {} -func (eventTxnRestart) Event() {} -func (eventTxnReleased) Event() {} -func (eventTxnUpgradeToExplicit) Event() {} +func (eventTxnStart) Event() {} +func (eventTxnFinishCommitted) Event() {} +func (eventTxnFinishAborted) Event() {} +func (eventSavepointRollback) Event() {} +func (eventNonRetriableErr) Event() {} +func (eventRetriableErr) Event() {} +func (eventTxnRestart) Event() {} +func (eventTxnReleased) Event() {} +func (eventTxnCommittedWithShowCommitTimestamp) Event() {} +func (eventTxnUpgradeToExplicit) Event() {} // TxnStateTransitions describe the transitions used by a connExecutor's // fsm.Machine. Args.Extended is a txnState, which is muted by the Actions. @@ -348,6 +354,11 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ return nil }, }, + eventTxnCommittedWithShowCommitTimestamp{}: { + Description: "SHOW COMMIT TIMESTAMP", + Next: stateCommitWait{}, + Action: moveToCommitWaitAfterInternalCommit, + }, // This is the case where we auto-retry explicit transactions. eventRetriableErr{CanAutoRetry: fsm.True, IsCommit: fsm.Any}: { // Rewind and auto-retry - the transaction should stay in the Open state. @@ -361,20 +372,7 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ eventTxnReleased{}: { Description: "RELEASE SAVEPOINT cockroach_restart", Next: stateCommitWait{}, - Action: func(args fsm.Args) error { - ts := args.Extended.(*txnState) - txnID := func() uuid.UUID { - ts.mu.Lock() - defer ts.mu.Unlock() - return ts.mu.txn.ID() - }() - ts.setAdvanceInfo( - advanceOne, - noRewind, - txnEvent{eventType: txnCommit, txnID: txnID}, - ) - return nil - }, + Action: moveToCommitWaitAfterInternalCommit, }, }, @@ -523,8 +521,10 @@ func noTxnToOpen(args fsm.Args) error { // finishTxn finishes the transaction. It also calls setAdvanceInfo() with the // given event. func (ts *txnState) finishTxn(ev txnEventType) error { - finishedTxnID := ts.finishSQLTxn() - ts.setAdvanceInfo(advanceOne, noRewind, txnEvent{eventType: ev, txnID: finishedTxnID}) + finishedTxnID, commitTimestamp := ts.finishSQLTxn() + ts.setAdvanceInfo(advanceOne, noRewind, txnEvent{ + eventType: ev, txnID: finishedTxnID, commitTimestamp: commitTimestamp, + }) return nil } @@ -536,7 +536,7 @@ func cleanupAndFinishOnError(args fsm.Args) error { defer ts.mu.Unlock() _ = ts.mu.txn.Rollback(ts.Ctx) }() - finishedTxnID := ts.finishSQLTxn() + finishedTxnID, _ := ts.finishSQLTxn() ts.setAdvanceInfo( skipBatch, noRewind, @@ -560,6 +560,25 @@ func prepareTxnForRetry(args fsm.Args) error { return nil } +func moveToCommitWaitAfterInternalCommit(args fsm.Args) error { + ts := args.Extended.(*txnState) + txnID, commitTimestamp := func() (uuid.UUID, hlc.Timestamp) { + ts.mu.Lock() + defer ts.mu.Unlock() + return ts.mu.txn.ID(), ts.mu.txn.CommitTimestamp() + }() + ts.setAdvanceInfo( + advanceOne, + noRewind, + txnEvent{ + eventType: txnCommit, + txnID: txnID, + commitTimestamp: commitTimestamp, + }, + ) + return nil +} + func prepareTxnForRetryWithRewind(args fsm.Args) error { pl := args.Payload.(eventRetriableErrPayload) ts := args.Extended.(*txnState) @@ -591,7 +610,7 @@ var BoundTxnStateTransitions = fsm.Compile(fsm.Pattern{ Next: stateInternalError{}, Action: func(args fsm.Args) error { ts := args.Extended.(*txnState) - finishedTxnID := ts.finishSQLTxn() + finishedTxnID, _ := ts.finishSQLTxn() ts.setAdvanceInfo( skipBatch, noRewind, @@ -604,7 +623,7 @@ var BoundTxnStateTransitions = fsm.Compile(fsm.Pattern{ Next: stateInternalError{}, Action: func(args fsm.Args) error { ts := args.Extended.(*txnState) - finishedTxnID := ts.finishSQLTxn() + finishedTxnID, _ := ts.finishSQLTxn() ts.setAdvanceInfo( skipBatch, noRewind, diff --git a/pkg/sql/conn_io.go b/pkg/sql/conn_io.go index f0861ed00df9..f0583982cb3c 100644 --- a/pkg/sql/conn_io.go +++ b/pkg/sql/conn_io.go @@ -140,6 +140,15 @@ type ExecStmt struct { // LastInBatch indicates if this command contains the last query in a // simple protocol Query message that contains a batch of 1 or more queries. LastInBatch bool + // LastInBatchBeforeShowCommitTimestamp indicates that this command contains + // the second-to-last query in a simple protocol Query message that contains + // a batch of 2 or more queries and the last query is SHOW COMMIT TIMESTAMP. + // Detecting this case allows us to treat this command as the LastInBatch + // such that the SHOW COMMIT TIMESTAMP statement can return the timestamp of + // the transaction which applied to all the other statements in the batch. + // Note that SHOW COMMIT TIMESTAMP is not permitted in any other position in + // such a multi-statement implicit transaction. + LastInBatchBeforeShowCommitTimestamp bool } // command implements the Command interface. diff --git a/pkg/sql/logictest/testdata/logic_test/show_commit_timestamp b/pkg/sql/logictest/testdata/logic_test/show_commit_timestamp new file mode 100644 index 000000000000..3864b5509ff6 --- /dev/null +++ b/pkg/sql/logictest/testdata/logic_test/show_commit_timestamp @@ -0,0 +1,345 @@ +statement ok +create table foo (i int primary key) + +subtest basic + +statement ok +begin; +insert into foo values (1) + +let $commit_ts +show commit timestamp + +statement ok +commit + +let $commit_ts_after_txn +show commit timestamp + +query B +select $commit_ts_after_txn = $commit_ts +---- +true + +query I +select * from foo where crdb_internal_mvcc_timestamp = $commit_ts +---- +1 + +subtest after_release_cockroach_restart + +statement ok +begin; +savepoint cockroach_restart; +insert into foo values (2); +release cockroach_restart + +let $commit_ts +show commit timestamp + +let $commit_ts_again +show commit timestamp + +statement ok +commit + +let $commit_ts_after_txn +show commit timestamp + +query BB +select $commit_ts_after_txn = $commit_ts, $commit_ts = $commit_ts_again +---- +true true + +query I +select * from foo where crdb_internal_mvcc_timestamp = $commit_ts +---- +2 + +subtest after_release_cockroach_restart_txn + +statement ok +begin; +savepoint cockroach_restart; +insert into foo values (3); +release cockroach_restart; +commit + +let $commit_ts +show commit timestamp + +let $commit_ts_again +show commit timestamp + +query BB +select $commit_ts_after_txn = $commit_ts, $commit_ts = $commit_ts_again +---- +false true + +query I +select * from foo where crdb_internal_mvcc_timestamp = $commit_ts +---- +3 + +subtest implicit_txn + +statement ok +insert into foo values (4); + +let $commit_ts +show commit timestamp + +let $commit_ts_again +show commit timestamp + +query I +select * from foo where crdb_internal_mvcc_timestamp = $commit_ts +---- +4 + +query I +select * from foo where crdb_internal_mvcc_timestamp = $commit_ts_again +---- +4 + +query I +select * from foo where crdb_internal_mvcc_timestamp = ($commit_ts) + 0.0000000001 +---- + +query I +select * from foo where crdb_internal_mvcc_timestamp = ($commit_ts) + 1 +---- + +subtest invalid_transaction_state + +statement ok +begin; +rollback + +statement error pgcode 25000 no previous transaction +show commit timestamp + +statement ok +insert into foo values (5) + +statement ok +show commit timestamp + +statement error pgcode 22012 division by zero +begin; +select 1/0; + +statement error pgcode 25P02 current transaction is aborted, commands ignored until end of transaction block +show commit timestamp + +statement ok +rollback + +statement error pgcode 25000 no previous transaction +show commit timestamp + +subtest multistatement_explicit_transaction + +let $commit_ts +insert into foo values (6); +insert into foo values (7); +show commit timestamp + +query I +select * from foo where crdb_internal_mvcc_timestamp = $commit_ts order by i +---- +6 +7 + +statement error pgcode 25000 cannot use SHOW COMMIT TIMESTAMP in multi-statement implicit transaction +insert into foo values (8); +show commit timestamp; +insert into foo values (9) + + +# Note that this will become two transaction and we'll get the commit timestamp +# of the second one. +let $commit_ts +insert into foo values(8); +insert into foo values(9); +begin; +insert into foo values(10); +commit; +insert into foo values(11); +insert into foo values (12); +show commit timestamp + +query I +select * from foo where crdb_internal_mvcc_timestamp = $commit_ts order by i +---- +11 +12 + +# Ensure that show commit timestamp can be used in upgraded explicit +# transactions. + +let $commit_ts +insert into foo values(13); +insert into foo values(14); +begin; +insert into foo values(15); +show commit timestamp; + +# Despite this error, the transaction has committed at this point and we +# have the causality token. + +statement error pgcode 25000 current transaction is committed, commands ignored until end of transaction block +insert into foo values(16); + +statement ok +commit + +query I +select * from foo where crdb_internal_mvcc_timestamp = $commit_ts order by i +---- +13 +14 +15 + +subtest udf + +statement error pgcode 0A000 unimplemented: SHOW COMMIT TIMESTAMP usage inside a function definition +create function f() returns decimal volatile language sql as $$ show commit timestamp $$; + +subtest prepare + +# You cannot prepare SHOW COMMIT TIMESTAMP because it is not preparable. + +statement error pgcode 42601 at or near "commit": syntax error +prepare s as show commit timestamp; + +subtest cte + +# You cannot use SHOW COMMIT TIMESTAMP because it is not a row source, nor +# is it preparable, so it cannot be used in a CTE or square brackets. + +statement error pgcode 42601 at or near "commit": syntax error +with committs as (show commit timestamp) select * from committs; + +statement error pgcode 42601 at or near "commit": syntax error +select * from [show commit timestamp] + +# Test that jobs still run and are waited for as a part of committing +# internally. +subtest ddl + +statement ok +drop table foo; +create table foo (i int primary key) + +statement ok +insert into foo values (1) + +statement ok +begin; +alter table foo add column j int default 42 + +let $commit_ts +show commit timestamp + +statement ok +commit; + +query II +select * from foo +---- +1 42 + +# Note that the causality token of the DDL is the timestamp when the user +# transaction commits, it does not tell you about the timestamp when the +# asynchronous schema change job[s] complete. +query I +select * from foo as of system time $commit_ts +---- +1 + +statement ok +drop table foo; +create table foo (i int primary key); +insert into foo values (1); + +statement ok +begin; +alter table foo add check (i <= 0) + +# Verify that SHOW COMMIT TIMESTAMP will wait for the schema changes. + +statement error pgcode XXA00 transaction committed but schema change aborted with error: \(23514\): validation of CHECK "i <= 0:::INT8" failed on row: i=1 +show commit timestamp + +statement ok +rollback; +drop table foo + +# Above we tested that we can issue RELEASE SAVEPOINT cockroach_restart after +# SHOW COMMIT TIMESTAMP if we had set it up. Here test that it is not allowed +# if it had not been added. +subtest disallow_cockroach_restart_without_savepoint + +statement ok +begin; +show commit timestamp + +statement error pgcode 25000 current transaction is committed, commands ignored until end of transaction block +release savepoint cockroach_restart; + +statement ok +rollback + +# Test that the causality token is the same timestamp as ends up on rows, +# even if the transaction gets pushed. +subtest causality_token_equals_mvcc_timestamp + +user testuser + +statement ok +create table foo (i int primary key); + +statement ok +begin; +insert into foo values (1), (3); + +let $ts1 +show commit timestamp + +statement ok +commit + +statement ok +begin; +insert into foo values (2), (4); + +user root + +statement ok +begin priority high; select * from foo; commit; + +user testuser + +let $ts2 +show commit timestamp + +statement ok +commit + +query IT + SELECT i, + CASE + WHEN ts = $ts1 THEN 'ts1' + WHEN ts = $ts2 THEN 'ts2' + END + FROM (SELECT i, crdb_internal_mvcc_timestamp AS ts FROM foo) +ORDER BY i ASC; +---- +1 ts1 +2 ts2 +3 ts1 +4 ts2 + +statement ok +drop table foo diff --git a/pkg/sql/logictest/tests/fakedist-disk/generated_test.go b/pkg/sql/logictest/tests/fakedist-disk/generated_test.go index 94f4d3f8b381..5eb3f62fd7b2 100644 --- a/pkg/sql/logictest/tests/fakedist-disk/generated_test.go +++ b/pkg/sql/logictest/tests/fakedist-disk/generated_test.go @@ -1612,6 +1612,13 @@ func TestLogic_shift( runLogicTest(t, "shift") } +func TestLogic_show_commit_timestamp( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runLogicTest(t, "show_commit_timestamp") +} + func TestLogic_show_completions( t *testing.T, ) { diff --git a/pkg/sql/logictest/tests/fakedist-vec-off/generated_test.go b/pkg/sql/logictest/tests/fakedist-vec-off/generated_test.go index 5b839dc8d5c6..9fd43d834555 100644 --- a/pkg/sql/logictest/tests/fakedist-vec-off/generated_test.go +++ b/pkg/sql/logictest/tests/fakedist-vec-off/generated_test.go @@ -1612,6 +1612,13 @@ func TestLogic_shift( runLogicTest(t, "shift") } +func TestLogic_show_commit_timestamp( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runLogicTest(t, "show_commit_timestamp") +} + func TestLogic_show_completions( t *testing.T, ) { diff --git a/pkg/sql/logictest/tests/fakedist/generated_test.go b/pkg/sql/logictest/tests/fakedist/generated_test.go index 486e0ae9b14a..766a79aac782 100644 --- a/pkg/sql/logictest/tests/fakedist/generated_test.go +++ b/pkg/sql/logictest/tests/fakedist/generated_test.go @@ -1626,6 +1626,13 @@ func TestLogic_shift( runLogicTest(t, "shift") } +func TestLogic_show_commit_timestamp( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runLogicTest(t, "show_commit_timestamp") +} + func TestLogic_show_completions( t *testing.T, ) { diff --git a/pkg/sql/logictest/tests/local-legacy-schema-changer/generated_test.go b/pkg/sql/logictest/tests/local-legacy-schema-changer/generated_test.go index 32853b12a6de..295312885c2d 100644 --- a/pkg/sql/logictest/tests/local-legacy-schema-changer/generated_test.go +++ b/pkg/sql/logictest/tests/local-legacy-schema-changer/generated_test.go @@ -1591,6 +1591,13 @@ func TestLogic_shift( runLogicTest(t, "shift") } +func TestLogic_show_commit_timestamp( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runLogicTest(t, "show_commit_timestamp") +} + func TestLogic_show_completions( t *testing.T, ) { diff --git a/pkg/sql/logictest/tests/local-vec-off/generated_test.go b/pkg/sql/logictest/tests/local-vec-off/generated_test.go index 963d2cd5da83..8eb077e4c337 100644 --- a/pkg/sql/logictest/tests/local-vec-off/generated_test.go +++ b/pkg/sql/logictest/tests/local-vec-off/generated_test.go @@ -1619,6 +1619,13 @@ func TestLogic_shift( runLogicTest(t, "shift") } +func TestLogic_show_commit_timestamp( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runLogicTest(t, "show_commit_timestamp") +} + func TestLogic_show_completions( t *testing.T, ) { diff --git a/pkg/sql/logictest/tests/local/generated_test.go b/pkg/sql/logictest/tests/local/generated_test.go index 722b6c8a85d1..0135cbe3eee1 100644 --- a/pkg/sql/logictest/tests/local/generated_test.go +++ b/pkg/sql/logictest/tests/local/generated_test.go @@ -1752,6 +1752,13 @@ func TestLogic_shift( runLogicTest(t, "shift") } +func TestLogic_show_commit_timestamp( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runLogicTest(t, "show_commit_timestamp") +} + func TestLogic_show_completions( t *testing.T, ) { diff --git a/pkg/sql/parser/help_test.go b/pkg/sql/parser/help_test.go index 9a051d585c76..7f35ba4cb9ab 100644 --- a/pkg/sql/parser/help_test.go +++ b/pkg/sql/parser/help_test.go @@ -355,6 +355,8 @@ func TestContextualHelp(t *testing.T) { {`SHOW COLUMNS FROM ??`, `SHOW COLUMNS`}, {`SHOW COLUMNS FROM foo ??`, `SHOW COLUMNS`}, + {`SHOW COMMIT TIMESTAMP ??`, `SHOW COMMIT TIMESTAMP`}, + {`SHOW CONSTRAINTS FROM ??`, `SHOW CONSTRAINTS`}, {`SHOW CONSTRAINTS FROM foo ??`, `SHOW CONSTRAINTS`}, diff --git a/pkg/sql/parser/sql.y b/pkg/sql/parser/sql.y index 507b76550dc7..523169b32c73 100644 --- a/pkg/sql/parser/sql.y +++ b/pkg/sql/parser/sql.y @@ -1185,6 +1185,7 @@ func (u *sqlSymUnion) functionObjs() tree.FuncObjs { %type show_stmt %type show_backup_stmt %type show_columns_stmt +%type show_commit_timestamp_stmt %type show_constraints_stmt %type show_create_stmt %type show_create_schedules_stmt @@ -1678,29 +1679,30 @@ stmt: } stmt_without_legacy_transaction: - preparable_stmt // help texts in sub-rule -| analyze_stmt // EXTEND WITH HELP: ANALYZE + preparable_stmt // help texts in sub-rule +| analyze_stmt // EXTEND WITH HELP: ANALYZE | copy_from_stmt | comment_stmt -| execute_stmt // EXTEND WITH HELP: EXECUTE -| deallocate_stmt // EXTEND WITH HELP: DEALLOCATE -| discard_stmt // EXTEND WITH HELP: DISCARD -| grant_stmt // EXTEND WITH HELP: GRANT -| prepare_stmt // EXTEND WITH HELP: PREPARE -| revoke_stmt // EXTEND WITH HELP: REVOKE -| savepoint_stmt // EXTEND WITH HELP: SAVEPOINT -| reassign_owned_by_stmt // EXTEND WITH HELP: REASSIGN OWNED BY -| drop_owned_by_stmt // EXTEND WITH HELP: DROP OWNED BY -| release_stmt // EXTEND WITH HELP: RELEASE -| refresh_stmt // EXTEND WITH HELP: REFRESH -| nonpreparable_set_stmt // help texts in sub-rule -| transaction_stmt // help texts in sub-rule -| close_cursor_stmt // EXTEND WITH HELP: CLOSE -| declare_cursor_stmt // EXTEND WITH HELP: DECLARE -| fetch_cursor_stmt // EXTEND WITH HELP: FETCH -| move_cursor_stmt // EXTEND WITH HELP: MOVE +| execute_stmt // EXTEND WITH HELP: EXECUTE +| deallocate_stmt // EXTEND WITH HELP: DEALLOCATE +| discard_stmt // EXTEND WITH HELP: DISCARD +| grant_stmt // EXTEND WITH HELP: GRANT +| prepare_stmt // EXTEND WITH HELP: PREPARE +| revoke_stmt // EXTEND WITH HELP: REVOKE +| savepoint_stmt // EXTEND WITH HELP: SAVEPOINT +| reassign_owned_by_stmt // EXTEND WITH HELP: REASSIGN OWNED BY +| drop_owned_by_stmt // EXTEND WITH HELP: DROP OWNED BY +| release_stmt // EXTEND WITH HELP: RELEASE +| refresh_stmt // EXTEND WITH HELP: REFRESH +| nonpreparable_set_stmt // help texts in sub-rule +| transaction_stmt // help texts in sub-rule +| close_cursor_stmt // EXTEND WITH HELP: CLOSE +| declare_cursor_stmt // EXTEND WITH HELP: DECLARE +| fetch_cursor_stmt // EXTEND WITH HELP: FETCH +| move_cursor_stmt // EXTEND WITH HELP: MOVE | reindex_stmt | unlisten_stmt +| show_commit_timestamp_stmt // EXTEND WITH HELP: SHOW COMMIT TIMESTAMP // %Help: ALTER // %Category: Group @@ -6297,8 +6299,8 @@ zone_value: // SHOW ROLES, SHOW SCHEMAS, SHOW SEQUENCES, SHOW SESSION, SHOW SESSIONS, // SHOW STATISTICS, SHOW SYNTAX, SHOW TABLES, SHOW TRACE, SHOW TRANSACTION, // SHOW TRANSACTIONS, SHOW TRANSFER, SHOW TYPES, SHOW USERS, SHOW LAST QUERY STATISTICS, -// SHOW SCHEDULES, SHOW LOCALITY, SHOW ZONE CONFIGURATION, SHOW FULL TABLE SCANS, -// SHOW CREATE EXTERNAL CONNECTIONS, SHOW TENANT +// SHOW SCHEDULES, SHOW LOCALITY, SHOW ZONE CONFIGURATION, SHOW COMMIT TIMESTAMP, +// SHOW FULL TABLE SCANS, SHOW CREATE EXTERNAL CONNECTIONS, SHOW TENANT show_stmt: show_backup_stmt // EXTEND WITH HELP: SHOW BACKUP | show_columns_stmt // EXTEND WITH HELP: SHOW COLUMNS @@ -6999,6 +7001,22 @@ show_indexes_stmt: } | SHOW KEYS error // SHOW HELP: SHOW INDEXES +// %Help: SHOW COMMIT TIMESTAMP - show timestamp commit timestamp of last transaction +// %Category: Misc +// %Text: SHOW COMMIT TIMESTAMP +// +// Shows the commit timestamp of the last committed transaction if not currently +// in a transaction. If currently in a transaction, implicitly commits the +// transaction, returning any errors which may have occurred during the commit. +// The transaction state will remain open from the perspective of the client, +// meaning that a COMMIT must be issued to move the connection back to a state +// where new statements may be issued. +show_commit_timestamp_stmt: + SHOW COMMIT TIMESTAMP + { + $$.val = &tree.ShowCommitTimestamp{} + } + // %Help: SHOW CONSTRAINTS - list constraints // %Category: DDL // %Text: SHOW CONSTRAINTS FROM diff --git a/pkg/sql/pgwire/conn.go b/pkg/sql/pgwire/conn.go index 79d2d63f5b4b..016131aea48b 100644 --- a/pkg/sql/pgwire/conn.go +++ b/pkg/sql/pgwire/conn.go @@ -878,14 +878,27 @@ func (c *conn) handleSimpleQuery( return nil } + // Determine whether there is only SHOW COMMIT TIMESTAMP after this + // statement in the batch. That case should be treated as though it + // were the last statement in the batch. + lastBeforeShowCommitTimestamp := func() bool { + n := len(stmts) + isShowCommitTimestamp := func(s parser.Statement) bool { + _, ok := s.AST.(*tree.ShowCommitTimestamp) + return ok + } + return n > 1 && i == n-2 && isShowCommitTimestamp(stmts[n-1]) + } + if err := c.stmtBuf.Push( ctx, sql.ExecStmt{ - Statement: stmts[i], - TimeReceived: timeReceived, - ParseStart: startParse, - ParseEnd: endParse, - LastInBatch: i == len(stmts)-1, + Statement: stmts[i], + TimeReceived: timeReceived, + ParseStart: startParse, + ParseEnd: endParse, + LastInBatch: i == len(stmts)-1, + LastInBatchBeforeShowCommitTimestamp: lastBeforeShowCommitTimestamp(), }); err != nil { return err } diff --git a/pkg/sql/pgwire/testdata/pgtest/show_commit_timestamp b/pkg/sql/pgwire/testdata/pgtest/show_commit_timestamp new file mode 100644 index 000000000000..02acdee616ba --- /dev/null +++ b/pkg/sql/pgwire/testdata/pgtest/show_commit_timestamp @@ -0,0 +1,68 @@ +# This test relies on a CoockroachDB-specific feature, so everything +# is marked as crdb_only. + +only crdb +---- + +# Run a statement so we can show the commit timestamp. + +send +Parse {"Query": "SELECT 1"} +Bind +Execute +Sync +---- + +until +ReadyForQuery +---- +{"Type":"ParseComplete"} +{"Type":"BindComplete"} +{"Type":"DataRow","Values":[{"text":"1"}]} +{"Type":"CommandComplete","CommandTag":"SELECT 1"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +# Grab the commit timestamp of the previous transaction using the simple +# query protocol and a let binding. + +let $commit_ts +Query {"String": "SHOW COMMIT TIMESTAMP"} +---- + +# Use the extended protocol to parse, bind, and execute SHOW COMMIT TIMESTAMP. + +send +Parse {"Query": "SHOW COMMIT TIMESTAMP"} +Bind +Execute +Sync +---- + +# Observe that the timestamp is the same as the expectation. + +until +ReadyForQuery +---- +{"Type":"ParseComplete"} +{"Type":"BindComplete"} +{"Type":"DataRow","Values":[{"text":"$commit_ts"}]} +{"Type":"CommandComplete","CommandTag":"SHOW COMMIT TIMESTAMP 1"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +# Show that you can use portals to execute SHOW COMMIT TIMESTAMP. + +send +Parse {"Name": "s0", "Query": "SHOW COMMIT TIMESTAMP"} +Bind {"DestinationPortal": "p0", "PreparedStatement": "s0"} +Execute {"Portal": "p0"} +Sync +---- + +until +ReadyForQuery +---- +{"Type":"ParseComplete"} +{"Type":"BindComplete"} +{"Type":"DataRow","Values":[{"text":"$commit_ts"}]} +{"Type":"CommandComplete","CommandTag":"SHOW COMMIT TIMESTAMP 1"} +{"Type":"ReadyForQuery","TxStatus":"I"} diff --git a/pkg/sql/plan_opt.go b/pkg/sql/plan_opt.go index e35f8520f571..225f55969cfe 100644 --- a/pkg/sql/plan_opt.go +++ b/pkg/sql/plan_opt.go @@ -110,6 +110,11 @@ func (p *planner) prepareUsingOptimizer(ctx context.Context) (planFlags, error) } stmt.Prepared.Columns = colinfo.ExplainPlanColumns return opc.flags, nil + + case *tree.ShowCommitTimestamp: + stmt.Prepared.Columns = colinfo.ShowCommitTimestampColumns + return opc.flags, nil + case *tree.DeclareCursor: // Build memo for the purposes of typing placeholders. // TODO(jordan): converting DeclareCursor to not be an opaque statement diff --git a/pkg/sql/sem/tree/show.go b/pkg/sql/sem/tree/show.go index a3e97b8d1ac5..adc28680c1d8 100644 --- a/pkg/sql/sem/tree/show.go +++ b/pkg/sql/sem/tree/show.go @@ -1029,3 +1029,23 @@ func (node *ShowCreateExternalConnections) Format(ctx *FmtCtx) { } var _ Statement = &ShowCreateExternalConnections{} + +// ShowCommitTimestamp represents a SHOW COMMIT TIMESTAMP statement. +// +// If the current session is in an open transaction state, this statement will +// implicitly commit the underlying kv transaction and return the HLC timestamp +// at which it committed. The transaction state machine will be left in a state +// such that only COMMIT or RELEASE cockroach_savepoint; COMMIT are acceptable. +// The statement may also be sent after RELEASE cockroach_savepoint; and before +// COMMIT. +// +// If the current session is not in an open transaction state, this statement +// will return the commit timestamp of the previous transaction, assuming there +// was one. +type ShowCommitTimestamp struct{} + +func (s ShowCommitTimestamp) Format(ctx *FmtCtx) { + ctx.Printf("SHOW COMMIT TIMESTAMP") +} + +var _ Statement = (*ShowCommitTimestamp)(nil) diff --git a/pkg/sql/sem/tree/stmt.go b/pkg/sql/sem/tree/stmt.go index efc69a5597f7..1ad57e25e699 100644 --- a/pkg/sql/sem/tree/stmt.go +++ b/pkg/sql/sem/tree/stmt.go @@ -1878,6 +1878,17 @@ func (*ShowCreateExternalConnections) StatementTag() string { return "SHOW CREATE EXTERNAL CONNECTIONS" } +// StatementReturnType implements the Statement interface. +func (*ShowCommitTimestamp) StatementReturnType() StatementReturnType { return Rows } + +// StatementType implements the Statement interface. +func (*ShowCommitTimestamp) StatementType() StatementType { return TypeTCL } + +// StatementTag returns a short string identifying the type of statement. +func (*ShowCommitTimestamp) StatementTag() string { + return "SHOW COMMIT TIMESTAMP" +} + // StatementReturnType implements the Statement interface. func (*Split) StatementReturnType() StatementReturnType { return Rows } @@ -2205,6 +2216,7 @@ func (n *ShowZoneConfig) String() string { return AsString( func (n *ShowFingerprints) String() string { return AsString(n) } func (n *ShowDefaultPrivileges) String() string { return AsString(n) } func (n *ShowCompletions) String() string { return AsString(n) } +func (n *ShowCommitTimestamp) String() string { return AsString(n) } func (n *Split) String() string { return AsString(n) } func (n *Unsplit) String() string { return AsString(n) } func (n *Truncate) String() string { return AsString(n) } diff --git a/pkg/sql/tests/BUILD.bazel b/pkg/sql/tests/BUILD.bazel index 36dc88573e66..dce12ddbbee4 100644 --- a/pkg/sql/tests/BUILD.bazel +++ b/pkg/sql/tests/BUILD.bazel @@ -43,6 +43,7 @@ go_test( "repair_test.go", "rsg_test.go", "schema_changes_in_parallel_test.go", + "show_commit_timestamp_test.go", "split_test.go", "system_table_test.go", "table_split_test.go", @@ -110,9 +111,11 @@ go_test( "//pkg/util/tracing", "//pkg/util/uuid", "@com_github_cockroachdb_cockroach_go_v2//crdb", + "@com_github_cockroachdb_cockroach_go_v2//crdb/crdbpgx", "@com_github_cockroachdb_datadriven//:datadriven", "@com_github_cockroachdb_errors//:errors", "@com_github_google_btree//:btree", + "@com_github_jackc_pgconn//:pgconn", "@com_github_jackc_pgx_v4//:pgx", "@com_github_kr_pretty//:pretty", "@com_github_lib_pq//:pq", diff --git a/pkg/sql/tests/show_commit_timestamp_test.go b/pkg/sql/tests/show_commit_timestamp_test.go new file mode 100644 index 000000000000..f8ab5228e6a1 --- /dev/null +++ b/pkg/sql/tests/show_commit_timestamp_test.go @@ -0,0 +1,183 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package tests + +import ( + "context" + gosql "database/sql" + "fmt" + "math/big" + "net/url" + "testing" + + "github.com/cockroachdb/cockroach-go/v2/crdb" + "github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgx" + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/errors" + "github.com/jackc/pgconn" + "github.com/jackc/pgx/v4" + "github.com/stretchr/testify/require" +) + +// TestShowCommitTimestamp exercises the integration of SHOW COMMIT TIMESTAMP +// with cockroach-go and with the extended wire protocol features as used by +// pgx. +func TestShowCommitTimestamp(t *testing.T) { + defer leaktest.AfterTest(t)() + + defer log.Scope(t).Close(t) + + ctx := context.Background() + s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(ctx) + + tdb := sqlutils.MakeSQLRunner(sqlDB) + resetTable := func(t *testing.T) { + t.Helper() + tdb.Exec(t, ` +DROP TABLE IF EXISTS foo; +CREATE TABLE foo (i INT PRIMARY KEY)`) + } + checkResults := func(t *testing.T, commitTimestamps []string, expTS ...int) { + t.Helper() + var exp [][]string + for i, e := range expTS { + exp = append(exp, []string{fmt.Sprint(i), commitTimestamps[e]}) + } + const checkQuery = "select i, crdb_internal_mvcc_timestamp from foo order by i asc" + tdb.CheckQueryResults(t, checkQuery, exp) + } + const showCommitTimestamp = "SHOW COMMIT TIMESTAMP" + t.Run("cockroach-go", func(t *testing.T) { + resetTable(t) + var commitTimestamp string + require.NoError(t, crdb.ExecuteTx(ctx, sqlDB, nil, func(tx *gosql.Tx) error { + if _, err := tx.Exec("INSERT INTO foo VALUES (0), (1)"); err != nil { + return err + } + if _, err := tx.Exec("INSERT INTO foo VALUES (2)"); err != nil { + return err + } + return tx.QueryRow(showCommitTimestamp).Scan(&commitTimestamp) + })) + checkResults(t, []string{commitTimestamp}, 0, 0, 0) + }) + + testutils.RunTrueAndFalse(t, "pgx batch; simple", func(t *testing.T, simple bool) { + resetTable(t) + pgURL, cleanup := sqlutils.PGUrl(t, s.ServingSQLAddr(), "", url.User("root")) + defer cleanup() + conf, err := pgx.ParseConfig(pgURL.String()) + require.NoError(t, err) + conf.PreferSimpleProtocol = simple + conn, err := pgx.ConnectConfig(ctx, conf) + require.NoError(t, err) + defer func() { require.NoError(t, conn.Close(ctx)) }() + + var b pgx.Batch + stmts := []string{ + "INSERT INTO foo VALUES (0)", + "INSERT INTO foo VALUES (1)", + "INSERT INTO foo VALUES (2)", + "BEGIN", + "INSERT INTO foo VALUES (3)", + showCommitTimestamp, + "COMMIT", + "BEGIN", + "SAVEPOINT cockroach_restart", + "INSERT INTO foo VALUES (4)", + "INSERT INTO foo VALUES (5)", + "RELEASE cockroach_restart", + showCommitTimestamp, + "COMMIT", + "INSERT INTO foo VALUES (6)", + "INSERT INTO foo VALUES (7)", + showCommitTimestamp, + } + for _, s := range stmts { + b.Queue(s) + } + res := conn.SendBatch(ctx, &b) + var commitTimestamps []string + for _, s := range stmts { + if s != showCommitTimestamp { + _, err = res.Exec() + require.NoError(t, err) + } else { + // Support for scanning numerics into strings was not added until + // a later version of pgx than was in use at the time of writing. + var r big.Rat + require.NoError(t, res.QueryRow().Scan(&r)) + commitTimestamps = append(commitTimestamps, r.FloatString(10)) + } + } + require.NoError(t, res.Close()) + require.Len(t, commitTimestamps, 3) + checkResults(t, commitTimestamps, 0, 0, 0, 0, 1, 1, 2, 2) + }) + testutils.RunTrueAndFalse(t, "pgx with crdb; simple", func(t *testing.T, simple bool) { + resetTable(t) + pgURL, cleanup := sqlutils.PGUrl(t, s.ServingSQLAddr(), "", url.User("root")) + defer cleanup() + conf, err := pgx.ParseConfig(pgURL.String()) + require.NoError(t, err) + conf.PreferSimpleProtocol = simple + conn, err := pgx.ConnectConfig(ctx, conf) + require.NoError(t, err) + defer func() { require.NoError(t, conn.Close(ctx)) }() + + { + _, err := conn.Exec(ctx, "select 1/0") + require.ErrorContains(t, err, "division by zero") + } + { + _, err := conn.Exec(ctx, showCommitTimestamp) + pgErr := new(pgconn.PgError) + require.True(t, errors.As(err, &pgErr)) + require.Equal(t, pgcode.InvalidTransactionState.String(), pgErr.Code) + require.ErrorContains(t, err, "no previous transaction") + } + { + _, err := conn.Exec(ctx, "insert into foo values (0)") + require.NoError(t, err) + } + var ts string + { + var tsRat big.Rat + require.NoError(t, conn.QueryRow(ctx, showCommitTimestamp).Scan(&tsRat)) + ts = tsRat.FloatString(10) + } + checkResults(t, []string{ts}, 0) + var txTs string + require.NoError(t, crdbpgx.ExecuteTx(ctx, conn, pgx.TxOptions{}, func(tx pgx.Tx) (err error) { + if _, err = tx.Exec(ctx, "insert into foo values (1), (2)"); err != nil { + return err + } + if _, err = tx.Exec(ctx, "insert into foo values (3)"); err != nil { + return err + } + var tsRat big.Rat + if err = tx.QueryRow(ctx, showCommitTimestamp).Scan(&tsRat); err != nil { + return err + } + txTs = tsRat.FloatString(10) + return nil + })) + + checkResults(t, []string{ts, txTs}, 0, 1, 1, 1) + }) +} diff --git a/pkg/sql/txn_state.go b/pkg/sql/txn_state.go index ff7830260a3b..09120f23974c 100644 --- a/pkg/sql/txn_state.go +++ b/pkg/sql/txn_state.go @@ -254,7 +254,7 @@ func (ts *txnState) resetForNewSQLTxn( // the current SQL txn. This needs to be called before resetForNewSQLTxn() is // called for starting another SQL txn. The ID of the finalized transaction is // returned. -func (ts *txnState) finishSQLTxn() (txnID uuid.UUID) { +func (ts *txnState) finishSQLTxn() (txnID uuid.UUID, commitTimestamp hlc.Timestamp) { ts.mon.Stop(ts.Ctx) sp := tracing.SpanFromContext(ts.Ctx) if sp == nil { @@ -267,16 +267,18 @@ func (ts *txnState) finishSQLTxn() (txnID uuid.UUID) { sp.Finish() ts.Ctx = nil - txnID = func() (txnID uuid.UUID) { + ts.recordingThreshold = 0 + return func() (txnID uuid.UUID, timestamp hlc.Timestamp) { ts.mu.Lock() defer ts.mu.Unlock() txnID = ts.mu.txn.ID() + if ts.mu.txn.IsCommitted() { + timestamp = ts.mu.txn.CommitTimestamp() + } ts.mu.txn = nil ts.mu.txnStart = time.Time{} - return txnID + return txnID, timestamp }() - ts.recordingThreshold = 0 - return txnID } // finishExternalTxn is a stripped-down version of finishSQLTxn used by @@ -390,6 +392,11 @@ type txnEvent struct { // When a transaction commits or aborts, txnID is set to the ID of the // transaction that just finished execution. txnID uuid.UUID + + // commitTimestamp is populated with the timestamp of the recently finished + // transaction corresponding to txnID. It will only be populated if that + // transaction committed. + commitTimestamp hlc.Timestamp } //go:generate stringer -type=txnEventType diff --git a/pkg/sql/txnstatetransitions_diagram.gv b/pkg/sql/txnstatetransitions_diagram.gv index bdb0cf593d06..2f717c197352 100644 --- a/pkg/sql/txnstatetransitions_diagram.gv +++ b/pkg/sql/txnstatetransitions_diagram.gv @@ -44,6 +44,7 @@ digraph finite_state_machine { "Open{ImplicitTxn:false, WasUpgraded:false}" -> "NoTxn{}" [label = Retriable err on COMMIT>] "Open{ImplicitTxn:false, WasUpgraded:false}" -> "Open{ImplicitTxn:false, WasUpgraded:false}" [label = Retriable err; will auto-retry>] "Open{ImplicitTxn:false, WasUpgraded:false}" -> "Open{ImplicitTxn:false, WasUpgraded:false}" [label = Retriable err; will auto-retry>] + "Open{ImplicitTxn:false, WasUpgraded:false}" -> "CommitWait{}" [label = SHOW COMMIT TIMESTAMP>] "Open{ImplicitTxn:false, WasUpgraded:false}" -> "NoTxn{}" [label = ROLLBACK, or after a statement running as an implicit txn fails>] "Open{ImplicitTxn:false, WasUpgraded:false}" -> "NoTxn{}" [label = COMMIT, or after a statement running as an implicit txn>] "Open{ImplicitTxn:false, WasUpgraded:false}" -> "CommitWait{}" [label = RELEASE SAVEPOINT cockroach_restart>] @@ -54,6 +55,7 @@ digraph finite_state_machine { "Open{ImplicitTxn:false, WasUpgraded:true}" -> "NoTxn{}" [label = Retriable err on COMMIT>] "Open{ImplicitTxn:false, WasUpgraded:true}" -> "Open{ImplicitTxn:true, WasUpgraded:false}" [label = Retriable err; will auto-retry>] "Open{ImplicitTxn:false, WasUpgraded:true}" -> "Open{ImplicitTxn:true, WasUpgraded:false}" [label = Retriable err; will auto-retry>] + "Open{ImplicitTxn:false, WasUpgraded:true}" -> "CommitWait{}" [label = SHOW COMMIT TIMESTAMP>] "Open{ImplicitTxn:false, WasUpgraded:true}" -> "NoTxn{}" [label = ROLLBACK, or after a statement running as an implicit txn fails>] "Open{ImplicitTxn:false, WasUpgraded:true}" -> "NoTxn{}" [label = COMMIT, or after a statement running as an implicit txn>] "Open{ImplicitTxn:false, WasUpgraded:true}" -> "CommitWait{}" [label = RELEASE SAVEPOINT cockroach_restart>] diff --git a/pkg/sql/txnstatetransitions_diagram.png b/pkg/sql/txnstatetransitions_diagram.png index 7abc456f3393..137472f3bd51 100644 Binary files a/pkg/sql/txnstatetransitions_diagram.png and b/pkg/sql/txnstatetransitions_diagram.png differ diff --git a/pkg/sql/txnstatetransitions_report.txt b/pkg/sql/txnstatetransitions_report.txt index 40bad130717f..ea6a0607c767 100644 --- a/pkg/sql/txnstatetransitions_report.txt +++ b/pkg/sql/txnstatetransitions_report.txt @@ -12,6 +12,7 @@ Aborted{WasUpgraded:false} TxnFinishAborted{} TxnRestart{} missing events: + TxnCommittedWithShowCommitTimestamp{} TxnFinishCommitted{} TxnReleased{} TxnStart{ImplicitTxn:false} @@ -29,6 +30,7 @@ Aborted{WasUpgraded:true} TxnFinishAborted{} TxnRestart{} missing events: + TxnCommittedWithShowCommitTimestamp{} TxnFinishCommitted{} TxnReleased{} TxnStart{ImplicitTxn:false} @@ -45,6 +47,7 @@ CommitWait{} RetriableErr{CanAutoRetry:true, IsCommit:false} RetriableErr{CanAutoRetry:true, IsCommit:true} SavepointRollback{} + TxnCommittedWithShowCommitTimestamp{} TxnFinishAborted{} TxnReleased{} TxnRestart{} @@ -63,6 +66,7 @@ NoTxn{} RetriableErr{CanAutoRetry:true, IsCommit:false} RetriableErr{CanAutoRetry:true, IsCommit:true} SavepointRollback{} + TxnCommittedWithShowCommitTimestamp{} TxnFinishAborted{} TxnFinishCommitted{} TxnReleased{} @@ -76,6 +80,7 @@ Open{ImplicitTxn:false, WasUpgraded:false} RetriableErr{CanAutoRetry:false, IsCommit:true} RetriableErr{CanAutoRetry:true, IsCommit:false} RetriableErr{CanAutoRetry:true, IsCommit:true} + TxnCommittedWithShowCommitTimestamp{} TxnFinishAborted{} TxnFinishCommitted{} TxnReleased{} @@ -93,6 +98,7 @@ Open{ImplicitTxn:false, WasUpgraded:true} RetriableErr{CanAutoRetry:false, IsCommit:true} RetriableErr{CanAutoRetry:true, IsCommit:false} RetriableErr{CanAutoRetry:true, IsCommit:true} + TxnCommittedWithShowCommitTimestamp{} TxnFinishAborted{} TxnFinishCommitted{} TxnReleased{} @@ -115,6 +121,7 @@ Open{ImplicitTxn:true, WasUpgraded:false} TxnUpgradeToExplicit{} missing events: SavepointRollback{} + TxnCommittedWithShowCommitTimestamp{} TxnReleased{} TxnRestart{} TxnStart{ImplicitTxn:false} @@ -132,6 +139,7 @@ Open{ImplicitTxn:true, WasUpgraded:true} RetriableErr{CanAutoRetry:true, IsCommit:false} RetriableErr{CanAutoRetry:true, IsCommit:true} SavepointRollback{} + TxnCommittedWithShowCommitTimestamp{} TxnReleased{} TxnRestart{} TxnStart{ImplicitTxn:false} diff --git a/pkg/testutils/pgtest/datadriven.go b/pkg/testutils/pgtest/datadriven.go index 73767b64d3c7..304a69233aa0 100644 --- a/pkg/testutils/pgtest/datadriven.go +++ b/pkg/testutils/pgtest/datadriven.go @@ -14,6 +14,7 @@ import ( "context" "encoding/json" "fmt" + "os" "strings" "testing" @@ -49,7 +50,8 @@ func WalkWithNewServer( // "let": Run a query that returns a single row with a single column, and // save it in the variable named in the command argument. This variable can // be used in future "send" commands and is replaced by simple string -// substitution. +// substitution. The same variable may also be used for replacement in +// the expected output. // // "send": Sends messages to a server. Takes a newline-delimited list of // pgproto3.FrontendMessage types. Can fill in values by adding a space then @@ -97,7 +99,10 @@ func RunTest(t *testing.T, path, addr, user string) { return d.Expected } require.GreaterOrEqual(t, len(d.CmdArgs), 1, "at least one argument required for let") - require.Truef(t, strings.HasPrefix(d.CmdArgs[0].Key, "$"), "let argument must begin with '$'") + require.Regexp( + t, `^\$[0-9A-Za-z_]+`, d.CmdArgs[0].Key, + "let argument must begin with '$' and only contain word characters", + ) lines := strings.Split(d.Input, "\n") require.Len(t, lines, 1, "only one input command permitted for let") require.Truef(t, strings.HasPrefix(lines[0], "Query "), "let must use a Query command") @@ -155,7 +160,20 @@ func RunTest(t *testing.T, path, addr, user string) { if err != nil { t.Fatalf("%s: %+v", d.Pos, err) } - return MsgsToJSONWithIgnore(msgs, d) + out := MsgsToJSONWithIgnore(msgs, d) + // If the expected output with the variables replaced with their current + // value matches the expected output, allow the expected output to be + // used directly. Otherwise, return the generated output. + expanded := os.Expand(d.Expected, func(s string) string { + if v, ok := vars["$"+s]; ok { + return v + } + return s + }) + if expanded == out { + return d.Expected + } + return out default: t.Fatalf("unknown command %s", d.Cmd) return "" diff --git a/pkg/util/stop/stopper.go b/pkg/util/stop/stopper.go index e388a56df282..d9aa6e5f6a84 100644 --- a/pkg/util/stop/stopper.go +++ b/pkg/util/stop/stopper.go @@ -478,8 +478,8 @@ func (s *Stopper) RunAsyncTaskEx(ctx context.Context, opt TaskOpts, f func(conte // Call f on another goroutine. taskStarted = true // Another goroutine now takes ownership of the alloc, if any. go func() { - defer sp.Finish() defer s.runPostlude() + defer sp.Finish() defer s.recover(ctx) if alloc != nil { defer alloc.Release()