Skip to content

Commit

Permalink
sql: add mechanism to retrieve causality token
Browse files Browse the repository at this point in the history
Fixes #79591

Relates to #7945

Release note (<category, see below>): A new adds a new special sentinel builtin
`crdb_internal.commit_with_causality_token` has been added which can be used
exclusively as the only expression in a `SELECT` statement at the end of an
explicit transaction for the purpose of committing the the transaction and
retrieving the now committed transaction's commit timestamp. This timestamp
can then be used as the input to, say, an AS OF SYSTEM TIME clause or a
changefeed in order to observe the exact snapshot at which the transaction
committed. Note that after issuing such a statement successfully, the client
must still issue a COMMIT to reset the connection state for the next
transaction.
  • Loading branch information
ajwerner committed May 3, 2022
1 parent 91aa127 commit 13359eb
Show file tree
Hide file tree
Showing 12 changed files with 483 additions and 9 deletions.
1 change: 1 addition & 0 deletions docs/generated/sql/functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -3001,6 +3001,7 @@ SELECT * FROM crdb_internal.check_consistency(true, ‘\x02’, ‘\x04’)</p>
</span></td></tr>
<tr><td><a name="crdb_internal.cluster_setting_encoded_default"></a><code>crdb_internal.cluster_setting_encoded_default(setting: <a href="string.html">string</a>) &rarr; <a href="string.html">string</a></code></td><td><span class="funcdesc"><p>Returns the encoded default value of the given cluster setting.</p>
</span></td></tr>
<tr><td><a name="crdb_internal.commit_with_causality_token"></a><code>crdb_internal.commit_with_causality_token() &rarr; <a href="decimal.html">decimal</a></code></td><td></td></tr>
<tr><td><a name="crdb_internal.completed_migrations"></a><code>crdb_internal.completed_migrations() &rarr; <a href="string.html">string</a>[]</code></td><td><span class="funcdesc"><p>This function is used only by CockroachDB’s developers for testing purposes.</p>
</span></td></tr>
<tr><td><a name="crdb_internal.create_join_token"></a><code>crdb_internal.create_join_token() &rarr; <a href="string.html">string</a></code></td><td><span class="funcdesc"><p>Creates a join token for use when adding a new node to a secure cluster.</p>
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ go_library(
"comment_on_table.go",
"compact_sql_stats.go",
"conn_executor.go",
"conn_executor_commit_with_causality_token.go",
"conn_executor_exec.go",
"conn_executor_prepare.go",
"conn_executor_savepoints.go",
Expand Down Expand Up @@ -494,6 +495,7 @@ go_test(
"comment_on_database_test.go",
"comment_on_index_test.go",
"comment_on_table_test.go",
"commit_with_causality_token_test.go",
"conn_executor_internal_test.go",
"conn_executor_savepoints_test.go",
"conn_executor_test.go",
Expand Down
67 changes: 67 additions & 0 deletions pkg/sql/commit_with_causality_token_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package sql

import (
"testing"

"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/stretchr/testify/require"
)

// TestIsSelectWithCausalityToken tests whether given sql statements match the
// criteria for isSelectWithCausalityToken.
func TestIsSelectWithCausalityToken(t *testing.T) {
defer leaktest.AfterTest(t)()

for _, tc := range []struct {
in string
exp bool
}{
{"Select crdb_internal.commit_with_causality_token()", true},
{"select crdb_inteRnal.comMit_wiTh_cauSality_toKen()", true},
{`select "crdb_internal".comMit_wiTh_cauSality_toKen()`, true},
{"select crdb_inteRnal.comMit_wiTh_cauSality_toKen(), 1", false},
{"select crdb_internal.commit_with_causality_token() from crdb_internal.ranges_no_leases", false},
{"select crdb_internal.commit_with_causality_token() from generate_series(1, 100)", false},
{`select distinct "crdb_internal".comMit_wiTh_cauSality_toKen()`, false},
{`select "crdb_inteRnal".comMit_wiTh_cauSality_toKen()`, false},
{"(select crdb_internal.commit_with_causality_token())", false},
{"(select crdb_inteRnal.comMit_wiTh_cauSality_toKen())", false},
{`(select "crdb_internal".comMit_wiTh_cauSality_toKen())`, false},
{`(select "crdb_inteRnal".comMit_wiTh_cauSality_toKen())`, false},
{`((select "crdb_internal".comMit_wiTh_cauSality_toKen()))`, false},
{`SELECT ((select "crdb_internal".comMit_wiTh_cauSality_toKen()))`, false},
{"SELECT crdb_internal.commit_with_causality_token() FOR UPDATE ", false},
{"Select crdb_internal.commit_with_causality_token", false},
{"with a as (select 1) select crdb_internal.commit_with_causality_token()", false},
{"(select crdb_internal.commit_with_causality_token() limit 0)", false},
{"(select crdb_internal.commit_with_causality_token()) limit 0", false},
{"select crdb_internal.commit_with_causality_token() limit 0", false},
{"(select crdb_internal.commit_with_causality_token() where true)", false},
{"select crdb_internal.commit_with_causality_token() where true", false},
{"select crdb_internal.commit_with_causality_token() having true", false},
{"select crdb_internal.commit_with_causality_token() order by 1", false},
{"(select crdb_internal.commit_with_causality_token()) order by 1", false},
{"(select crdb_internal.commit_with_causality_token() order by 1)", false},
} {
t.Run(tc.in, func(t *testing.T) {
stmts, err := parser.Parse(tc.in)
require.NoError(t, err)
require.Len(t, stmts, 1)
require.Equalf(
t, tc.exp, isSelectCommitWithCausalityToken(stmts[0].AST),
"%s", stmts[0].SQL,
)
})
}
}
6 changes: 6 additions & 0 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1383,6 +1383,12 @@ type connExecutor struct {
rowsWrittenLogged bool
rowsReadLogged bool

// shouldAcceptReleaseSavepointCockroachRestart is set to true
// when entering the commitOrReleaseWaitState. If set to true, the
// commitOrReleaseWaitState will allow one instance of RELEASE SAVEPOINT
// cockroach_restart.
shouldAcceptReleaseSavepointCockroachRestart bool

// hasAdminRole is used to cache if the user running the transaction
// has admin privilege. hasAdminRoleCache is set for the first statement
// in a transaction.
Expand Down
114 changes: 114 additions & 0 deletions pkg/sql/conn_executor_commit_with_causality_token.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package sql

import (
"context"

"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/sem/builtins"
"github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/fsm"
)

// handleCommitWithCausalityToken deals with the special statement
// SELECT commit_with_causality_token while in the OpenState of an
// explicit transaction.
func (ex *connExecutor) handleCommitWithCausalityToken(
ctx context.Context, res RestrictedCommandResult,
) (fsm.Event, fsm.EventPayload, error) {
res.ResetStmtType((*tree.CommitTransaction)(nil))
err := ex.commitSQLTransactionInternal(ctx)
if err == nil {

res.SetColumns(ctx, colinfo.ResultColumns{
{
Name: "causality_token",
Typ: types.Decimal,
Hidden: false,
},
})
if err := res.AddRow(ctx, tree.Datums{
eval.TimestampToDecimalDatum(ex.planner.Txn().CommitTimestamp()),
}); err != nil {
return nil, nil, err
}

// If we have a SAVEPOINT cockroach_restart, then we need to note that
// fact now, as the SAVEPOINT stack will be destroyed as the state
// machine moves into COMMIT. This state in extraTxnState will be cleaned
// up as we process any statement in CommitWait.
if entry, _ := ex.extraTxnState.savepoints.find(
commitOnReleaseSavepointName,
); entry != nil && entry.commitOnRelease {
ex.extraTxnState.shouldAcceptReleaseSavepointCockroachRestart = true
}

return eventTxnCommittedWithCausalityToken{}, nil, nil
}

// Committing the transaction failed. We'll go to state RestartWait if
// it's a retriable error, or to state RollbackWait otherwise.
if errIsRetriable(err) {
rc, canAutoRetry := ex.getRewindTxnCapability()
ev := eventRetriableErr{
IsCommit: fsm.FromBool(false /* isCommit */),
CanAutoRetry: fsm.FromBool(canAutoRetry),
}
payload := eventRetriableErrPayload{err: err, rewCap: rc}
return ev, payload, nil
}

ev := eventNonRetriableErr{IsCommit: fsm.FromBool(false)}
payload := eventNonRetriableErrPayload{err: err}
return ev, payload, nil
}

// isSelectWithCausalityToken returns true if the statement is exactly the
// following, modulo capitalization:
//
// SELECT crdb_internal.commit_with_causality_token
//
func isSelectCommitWithCausalityToken(ast tree.Statement) bool {
sel, ok := ast.(*tree.Select)
if !ok {
return false
}
selStmt := sel.Select
if sel.With != nil || sel.Locking != nil || sel.Limit != nil || sel.OrderBy != nil {
return false
}
// We intentionally don't open up ParenSelect clauses.
sc, ok := selStmt.(*tree.SelectClause)
if !ok {
return false
}
// TODO(ajwerner): Find a more exhaustive way to do this.
if len(sc.From.Tables) != 0 || len(sc.Exprs) != 1 || sc.Distinct ||
sc.Where != nil || sc.GroupBy != nil || sc.Having != nil || sc.Window != nil ||
sc.From.AsOf.Expr != nil {
return false
}
funcExpr, isFuncExpr := sc.Exprs[0].Expr.(*tree.FuncExpr)
if !isFuncExpr || len(funcExpr.Exprs) != 0 {
return false
}
name, isName := funcExpr.Func.FunctionReference.(*tree.UnresolvedName)
if !isName || name.NumParts != 2 ||
name.Parts[1] != catconstants.CRDBInternalSchemaName ||
name.Parts[0] != builtins.CommitWithCausalityTokenName {
return false
}
return true
}
13 changes: 13 additions & 0 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,11 @@ func (ex *connExecutor) execStmtInOpenState(

p.semaCtx.Annotations = tree.MakeAnnotations(stmt.NumAnnotations)

// Handle the special SELECT crdb_internal.commit_with_causality_token.
if !p.extendedEvalCtx.TxnImplicit && isSelectCommitWithCausalityToken(ast) {
return ex.handleCommitWithCausalityToken(ctx, res)
}

// For regular statements (the ones that get to this point), we
// don't return any event unless an error happens.

Expand Down Expand Up @@ -1691,6 +1696,14 @@ func (ex *connExecutor) execStmtInCommitWaitState(
ex.incrementExecutedStmtCounter(ast)
}
}()
defer func() {
ex.extraTxnState.shouldAcceptReleaseSavepointCockroachRestart = false
}()
if s, ok := ast.(*tree.ReleaseSavepoint); ok &&
s.Savepoint == commitOnReleaseSavepointName &&
ex.extraTxnState.shouldAcceptReleaseSavepointCockroachRestart {
return nil, nil
}
switch ast.(type) {
case *tree.CommitTransaction, *tree.RollbackTransaction:
// Reply to a rollback with the COMMIT tag, by analogy to what we do when we
Expand Down
40 changes: 31 additions & 9 deletions pkg/sql/conn_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,20 +193,26 @@ type eventTxnRestart struct{}
// generated by releasing regular savepoints.
type eventTxnReleased struct{}

// eventTxnCommittedWithCausalityToken is generated after a successful
// SELECT crdb_internal.commit_with_causality_token(). It moves the state
// to CommitWait.
type eventTxnCommittedWithCausalityToken struct{}

// payloadWithError is a common interface for the payloads that wrap an error.
type payloadWithError interface {
errorCause() error
}

func (eventTxnStart) Event() {}
func (eventTxnFinishCommitted) Event() {}
func (eventTxnFinishAborted) Event() {}
func (eventSavepointRollback) Event() {}
func (eventNonRetriableErr) Event() {}
func (eventRetriableErr) Event() {}
func (eventTxnRestart) Event() {}
func (eventTxnReleased) Event() {}
func (eventTxnUpgradeToExplicit) Event() {}
func (eventTxnStart) Event() {}
func (eventTxnFinishCommitted) Event() {}
func (eventTxnFinishAborted) Event() {}
func (eventSavepointRollback) Event() {}
func (eventNonRetriableErr) Event() {}
func (eventRetriableErr) Event() {}
func (eventTxnRestart) Event() {}
func (eventTxnReleased) Event() {}
func (eventTxnCommittedWithCausalityToken) Event() {}
func (eventTxnUpgradeToExplicit) Event() {}

// TxnStateTransitions describe the transitions used by a connExecutor's
// fsm.Machine. Args.Extended is a txnState, which is muted by the Actions.
Expand Down Expand Up @@ -337,6 +343,22 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{
return nil
},
},
eventTxnCommittedWithCausalityToken{}: {
Description: "SELECT crdb_internal.commit_with_causality_token()",
Next: stateCommitWait{},
Action: func(args fsm.Args) error {
ts := args.Extended.(*txnState)
ts.mu.Lock()
txnID := ts.mu.txn.ID()
ts.mu.Unlock()
ts.setAdvanceInfo(
advanceOne,
noRewind,
txnEvent{eventType: txnCommit, txnID: txnID},
)
return nil
},
},
eventTxnReleased{}: {
Description: "RELEASE SAVEPOINT cockroach_restart",
Next: stateCommitWait{},
Expand Down
Loading

0 comments on commit 13359eb

Please sign in to comment.