Skip to content

Commit

Permalink
sql: make SQL statements operate on a read snapshot
Browse files Browse the repository at this point in the history
Previously, all individual KV reads performed by a SQL statement were
able to observe the most recent KV writes that it performed itself.

This is in violation of PostgreSQL's dialect semantics, which mandate
that statements can only observe data as per a read snapshot taken at
the instant a statement begins execution.

Moreover, this invalid behavior causes a real observable bug: a
statement that reads and writes to the same table may never complete,
as the read part may become able to consume the rows that it itself
writes. Or worse, it could cause logical operations to be performed
multiple times: https://en.wikipedia.org/wiki/Halloween_Problem

This patch fixes it (partially) by exploiting the new KV `Step()` API
which decouples the read and write sequence numbers.

The fix is not complete however; additional sequence points must also
be introduced prior to FK existence checks and cascading actions. See
[cockroachdb#42864](cockroachdb#42864) and
[cockroachdb#33475](cockroachdb#33475) for
details.

For now, this patch excludes any mutation that 1) involves a foreign
key and 2) does not use the new CBO-driven FK logic, from the
new (fixed) semantics. When a mutation involves a FK without CBO
involvement, the previous (broken) semantics still apply.

Release note (bug fix): SQL mutation statements that target tables
with no foreign key relationships now correctly read data as per the
state of the database when the statement started execution. This is
required for compatibility with PostgreSQL and to ensure deterministic
behavior when certain operations are parallelized. Prior to this fix,
a statement [could incorrectly operate multiple
times](https://en.wikipedia.org/wiki/Halloween_Problem) on data that
it itself was writing, and potentially never terminate. This fix is
limited to tables without FK relationships, and to certain operations
on tables with FK relationships; in other cases, the fix is not
active and the bug is still present. A full fix will be provided
in a later release.
  • Loading branch information
knz committed Jan 20, 2020
1 parent ef92982 commit 2f26c54
Show file tree
Hide file tree
Showing 40 changed files with 480 additions and 122 deletions.
4 changes: 2 additions & 2 deletions pkg/internal/client/mock_transactional_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,9 @@ func (m *MockTransactionalSender) Step() error {
}

// ConfigureStepping is part of the TxnSender interface.
func (m *MockTransactionalSender) ConfigureStepping(SteppingMode) (SteppingMode, error) {
func (m *MockTransactionalSender) ConfigureStepping(SteppingMode) SteppingMode {
// See Step() above.
return SteppingDisabled, nil
return SteppingDisabled
}

// MockTxnSenderFactory is a TxnSenderFactory producing MockTxnSenders.
Expand Down
6 changes: 3 additions & 3 deletions pkg/internal/client/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,9 +252,9 @@ type TxnSender interface {
// behavior opt-in and backward-compatible with existing code which
// does not need it.
//
// Calling ConfigureStepping(true) when the stepping mode is
// currently disabled implies calling Step(), for convenience.
ConfigureStepping(mode SteppingMode) (prevMode SteppingMode, err error)
// Calling ConfigureStepping(SteppingEnabled) when the stepping mode
// is currently disabled implies calling Step(), for convenience.
ConfigureStepping(mode SteppingMode) (prevMode SteppingMode)
}

// SteppingMode is the argument type to ConfigureStepping.
Expand Down
16 changes: 5 additions & 11 deletions pkg/internal/client/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,7 @@ func NewTxn(ctx context.Context, db *DB, gatewayNodeID roachpb.NodeID) *Txn {
// NewTxnWithSteppingEnabled is like NewTxn but suitable for use by SQL.
func NewTxnWithSteppingEnabled(ctx context.Context, db *DB, gatewayNodeID roachpb.NodeID) *Txn {
txn := NewTxn(ctx, db, gatewayNodeID)
// ConfigureStepping is guaranteed to not return an error on root txns.
_, _ = txn.ConfigureStepping(SteppingEnabled)
_ = txn.ConfigureStepping(SteppingEnabled)
return txn
}

Expand Down Expand Up @@ -1189,8 +1188,8 @@ func (txn *Txn) Active() bool {
return txn.mu.sender.Active()
}

// Step enables step-wise execution in the transaction, or
// performs a step if step-wise execution is already enabled.
// Step performs a sequencing step. Step-wise execution must be
// already enabled.
//
// In step-wise execution, reads operate at a snapshot established at
// the last step, instead of observing the latest write as they do
// when step-wise execution is not enabled.
Expand All @@ -1205,14 +1204,9 @@ func (txn *Txn) Step() error {

// ConfigureStepping configures step-wise execution in the
// transaction.
//
// This function is guaranteed to not return an error if it previously
// succeeded once with some txn and mode, then provided its own return
// value as input for a second call. This simplifies the implementation
// of push/pop semantics.
func (txn *Txn) ConfigureStepping(mode SteppingMode) (prevMode SteppingMode, err error) {
func (txn *Txn) ConfigureStepping(mode SteppingMode) (prevMode SteppingMode) {
if txn.typ != RootTxn {
return false, errors.AssertionFailedf("txn.DisableStepping() only allowed in RootTxn")
panic(errors.AssertionFailedf("txn.ConfigureStepping() only allowed in RootTxn"))
}
txn.mu.Lock()
defer txn.mu.Unlock()
Expand Down
13 changes: 4 additions & 9 deletions pkg/kv/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -1086,19 +1086,14 @@ func (tc *TxnCoordSender) Step() error {
return tc.interceptorAlloc.txnSeqNumAllocator.stepLocked()
}

// DisableStepping is part of the TxnSender interface.
// ConfigureStepping is part of the TxnSender interface.
func (tc *TxnCoordSender) ConfigureStepping(
mode client.SteppingMode,
) (prevMode client.SteppingMode, err error) {
) (prevMode client.SteppingMode) {
if tc.typ != client.RootTxn {
return client.SteppingDisabled, errors.AssertionFailedf("cannot call DisableStepping() in leaf txn")
panic(errors.AssertionFailedf("cannot call ConfigureStepping() in leaf txn"))
}
tc.mu.Lock()
defer tc.mu.Unlock()
prevEnabled := tc.interceptorAlloc.txnSeqNumAllocator.configureSteppingLocked(mode == client.SteppingEnabled)
prevMode = client.SteppingDisabled
if prevEnabled {
prevMode = client.SteppingEnabled
}
return prevMode, nil
return tc.interceptorAlloc.txnSeqNumAllocator.configureSteppingLocked(mode)
}
24 changes: 20 additions & 4 deletions pkg/kv/txn_interceptor_seq_num_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package kv
import (
"context"

"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -141,14 +142,29 @@ func (s *txnSeqNumAllocator) stepLocked() error {
}

// configureSteppingLocked configures the stepping mode.
// Used by the TxnCoordSender's ConfigureStepping() method.
func (s *txnSeqNumAllocator) configureSteppingLocked(enabled bool) (prevEnabled bool) {
prevEnabled = s.steppingModeEnabled
//
// When enabling stepping from the non-enabled state, the read seqnum
// is set to the current write seqnum, as if a snapshot was taken at
// the point stepping was enabled.
//
// The read seqnum is otherwise not modified when trying to enable
// stepping when it was previously enabled already. This is the
// behavior needed to provide the documented API semantics of
// sender.ConfigureStepping() (see client/sender.go).
func (s *txnSeqNumAllocator) configureSteppingLocked(
newMode client.SteppingMode,
) (prevMode client.SteppingMode) {
prevEnabled := s.steppingModeEnabled
enabled := newMode == client.SteppingEnabled
s.steppingModeEnabled = enabled
if !prevEnabled && enabled {
s.readSeq = s.writeSeq
}
return prevEnabled
prevMode = client.SteppingDisabled
if prevEnabled {
prevMode = client.SteppingEnabled
}
return prevMode
}

// epochBumpedLocked is part of the txnInterceptor interface.
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/alter_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ func (p *planner) AlterIndex(ctx context.Context, n *tree.AlterIndex) (planNode,
return &alterIndexNode{n: n, tableDesc: tableDesc, indexDesc: indexDesc}, nil
}

// ReadingOwnWrites implements the planNodeReadingOwnWrites interface.
// ALTER INDEX performs multiple KV operations on descriptors and
// expects to see its own writes. The body is intentionally empty: the
// method has no effect of its own and only serves as a marker.
func (n *alterIndexNode) ReadingOwnWrites() {}

func (n *alterIndexNode) startExec(params runParams) error {
// Commands can either change the descriptor directly (for
// alterations that don't require a backfill) or add a mutation to
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/alter_sequence.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ func (p *planner) AlterSequence(ctx context.Context, n *tree.AlterSequence) (pla
return &alterSequenceNode{n: n, seqDesc: seqDesc}, nil
}

// ReadingOwnWrites implements the planNodeReadingOwnWrites interface.
// ALTER SEQUENCE performs multiple KV operations on descriptors and
// expects to see its own writes. The body is intentionally empty: the
// method has no effect of its own and only serves as a marker.
func (n *alterSequenceNode) ReadingOwnWrites() {}

func (n *alterSequenceNode) startExec(params runParams) error {
desc := n.seqDesc

Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/alter_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ func (p *planner) AlterTable(ctx context.Context, n *tree.AlterTable) (planNode,
}, nil
}

// ReadingOwnWrites implements the planNodeReadingOwnWrites interface.
// ALTER TABLE performs multiple KV operations on descriptors and
// expects to see its own writes. The body is intentionally empty: the
// method has no effect of its own and only serves as a marker.
func (n *alterTableNode) ReadingOwnWrites() {}

func (n *alterTableNode) startExec(params runParams) error {
// Commands can either change the descriptor directly (for
// alterations that don't require a backfill) or add a mutation to
Expand Down
45 changes: 42 additions & 3 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,8 +359,40 @@ func (ex *connExecutor) execStmtInOpenState(
discardRows = s.DiscardRows
}

// For regular statements (the ones that get to this point), we don't return
// any event unless an an error happens.
// For regular statements (the ones that get to this point), we
// don't return any event unless an error happens.

// The first order of business is to create a sequencing point. As
// per PostgreSQL's dialect specs, the "read" part of statements
// always sees the data as per a snapshot of the database taken the
// instant the statement begins to run. In particular a mutation
// does not see its own writes. If a query contains multiple
// mutations using CTEs (WITH) or a read part following a mutation,
// all still operate on the same read snapshot.
//
// (To communicate data between CTEs and a main query, the result
// set / RETURNING can be used instead. However this is not relevant
// here.)
//
// This is not the only place where a sequencing point is placed. There
// are also sequencing points after every stage of constraint checks
// and cascading actions at the _end_ of a statement's execution.
//
// TODO(knz): At the time of this writing CockroachDB performs
// cascading actions and the corresponding FK existence checks
// interleaved with mutations. This is incorrect; the correct
// behavior, as described in issue
// https://github.com/cockroachdb/cockroach/issues/33475, is to
// execute cascading actions no earlier than after all the "main
// effects" of the current statement (including all its CTEs) have
// completed. There should be a sequence point between the end of
// the main execution and the start of the cascading actions, as
// well as in-between every stage of cascading actions.
// This TODO can be removed when the cascading code is reorganized
// accordingly and the missing call to Step() is introduced.
if err := ex.state.mu.txn.Step(); err != nil {
return makeErrEvent(err)
}

p := &ex.planner
stmtTS := ex.server.cfg.Clock.PhysicalTime()
Expand Down Expand Up @@ -423,6 +455,13 @@ func (ex *connExecutor) execStmtInOpenState(
}

txn := ex.state.mu.txn

// Re-enable stepping mode after the execution has completed.
// This is necessary because until https://github.com/cockroachdb/cockroach/issues/33475 is fixed
// any mutation with FK work unconditionally disables
// stepping mode when it starts.
_ = txn.ConfigureStepping(client.SteppingEnabled)

if !os.ImplicitTxn.Get() && txn.IsSerializablePushAndRefreshNotPossible() {
rc, canAutoRetry := ex.getRewindTxnCapability()
if canAutoRetry {
Expand Down Expand Up @@ -553,7 +592,7 @@ func (ex *connExecutor) checkTableTwoVersionInvariant(ctx context.Context) error

// Create a new transaction to retry with a higher timestamp than the
// timestamps used in the retry loop above.
ex.state.mu.txn = client.NewTxn(ctx, ex.transitionCtx.db, ex.transitionCtx.nodeID)
ex.state.mu.txn = client.NewTxnWithSteppingEnabled(ctx, ex.transitionCtx.db, ex.transitionCtx.nodeID)
if err := ex.state.mu.txn.SetUserPriority(userPriority); err != nil {
return err
}
Expand Down
72 changes: 71 additions & 1 deletion pkg/sql/conn_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/row"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
Expand Down Expand Up @@ -91,7 +92,7 @@ func TestSessionFinishRollsBackTxn(t *testing.T) {
params, _ := tests.CreateTestServerParams()
params.Knobs.SQLExecutor = aborter.executorKnobs()
s, mainDB, _ := serverutils.StartServer(t, params)
defer s.Stopper().Stop(context.TODO())
defer s.Stopper().Stop(context.Background())
{
pgURL, cleanup := sqlutils.PGUrl(
t, s.ServingSQLAddr(), "TestSessionFinishRollsBackTxn", url.User(security.RootUser))
Expand Down Expand Up @@ -348,6 +349,75 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v TEXT);
}
}

// TestHalloweenProblemAvoidance verifies that an INSERT ... SELECT which
// reads from the very table it writes to processes every pre-existing
// row exactly once, i.e. that the read part of the statement does not
// observe the rows written by the same statement (the "Halloween
// Problem").
func TestHalloweenProblemAvoidance(t *testing.T) {
	defer leaktest.AfterTest(t)()

	// Populate a sufficiently large number of rows. We want at least as
	// many rows as an insert can batch in its output buffer (to force a
	// buffer flush), plus as many rows as a fetcher can fetch at once
	// (to force a read buffer update), plus some more.
	//
	// Instead of customizing the working set size of the test up to the
	// default settings for the SQL package, we scale down the config
	// of the SQL package to the test. The reason for this is that
	// race-enabled builds are very slow and the default batch sizes
	// would cause the test duration to exceed the timeout.
	//
	// We are also careful to override these defaults before starting
	// the server, so as to not risk updating them concurrently with
	// some background SQL activity.
	const smallerKvBatchSize = 10
	defer row.TestingSetKVBatchSize(smallerKvBatchSize)()
	const smallerInsertBatchSize = 5
	defer sql.TestingSetInsertBatchSize(smallerInsertBatchSize)()
	numRows := smallerKvBatchSize + smallerInsertBatchSize + 10

	params, _ := tests.CreateTestServerParams()
	params.Insecure = true
	s, db, _ := serverutils.StartServer(t, params)
	defer s.Stopper().Stop(context.Background())

	if _, err := db.Exec(`
CREATE DATABASE t;
CREATE TABLE t.test (x FLOAT);
`); err != nil {
		t.Fatal(err)
	}

	if _, err := db.Exec(
		`INSERT INTO t.test(x) SELECT generate_series(1, $1)::FLOAT`,
		numRows); err != nil {
		t.Fatal(err)
	}

	// Now slightly modify the values in duplicate rows.
	// We choose a float +0.1 to ensure that none of the derived
	// values become duplicate of already-present values.
	if _, err := db.Exec(`
INSERT INTO t.test(x)
-- the if ensures that no row is processed two times.
SELECT IF(x::INT::FLOAT = x,
x,
crdb_internal.force_error(
'NOOPE', 'insert saw its own writes: ' || x::STRING || ' (it is halloween today)')::FLOAT)
+ 0.1
FROM t.test
`); err != nil {
		t.Fatal(err)
	}

	// Finally verify that no row has been operated on more than once.
	// The result variable is deliberately not named "row" so that it
	// does not shadow the imported row package used above.
	countRow := db.QueryRow(`SELECT count(DISTINCT x) FROM t.test`)
	var cnt int
	if err := countRow.Scan(&cnt); err != nil {
		t.Fatal(err)
	}

	// Every original row must have produced exactly one derived row.
	if cnt != 2*numRows {
		t.Fatalf("expected %d rows in final table, got %d", 2*numRows, cnt)
	}
}

func TestAppNameStatisticsInitialization(t *testing.T) {
defer leaktest.AfterTest(t)()

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func (c *copyMachine) preparePlanner(ctx context.Context) func(context.Context,
stmtTs := c.txnOpt.stmtTimestamp
autoCommit := false
if txn == nil {
txn = client.NewTxn(ctx, c.p.execCfg.DB, c.p.execCfg.NodeID.Get())
txn = client.NewTxnWithSteppingEnabled(ctx, c.p.execCfg.DB, c.p.execCfg.NodeID.Get())
txnTs = c.p.execCfg.Clock.PhysicalTime()
stmtTs = txnTs
autoCommit = true
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/create_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ func MakeIndexDescriptor(n *tree.CreateIndex) (*sqlbase.IndexDescriptor, error)
return &indexDesc, nil
}

// ReadingOwnWrites implements the planNodeReadingOwnWrites interface.
// CREATE INDEX performs multiple KV operations on descriptors and
// expects to see its own writes. The body is intentionally empty: the
// method has no effect of its own and only serves as a marker.
func (n *createIndexNode) ReadingOwnWrites() {}

func (n *createIndexNode) startExec(params runParams) error {
_, dropped, err := n.tableDesc.FindIndexByName(string(n.n.Name))
if err == nil {
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/create_sequence.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ func (p *planner) CreateSequence(ctx context.Context, n *tree.CreateSequence) (p
}, nil
}

// ReadingOwnWrites implements the planNodeReadingOwnWrites interface.
// CREATE SEQUENCE performs multiple KV operations on descriptors and
// expects to see its own writes. The body is intentionally empty: the
// method has no effect of its own and only serves as a marker.
func (n *createSequenceNode) ReadingOwnWrites() {}

func (n *createSequenceNode) startExec(params runParams) error {
// TODO(arul): Allow temporary sequences once temp tables work for regular tables.
if n.n.Temporary {
Expand Down
Loading

0 comments on commit 2f26c54

Please sign in to comment.