Skip to content

Commit

Permalink
kv,internal: evolve the step execution API
Browse files Browse the repository at this point in the history
This patch reworks the TCS Step API to make the behavior
explicitly configurable with a bool (instead of implicit
enable at the first call of Step).

This is necessary because SQL needs push/pop semantics on the stpping
mode.

Release note: None
  • Loading branch information
knz committed Jan 15, 2020
1 parent 0046d62 commit e58c722
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 29 deletions.
14 changes: 11 additions & 3 deletions pkg/internal/client/mock_transactional_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,18 @@ func (m *MockTransactionalSender) PrepareRetryableError(ctx context.Context, msg
}

// Step is part of the TxnSender interface.
func (m *MockTransactionalSender) Step() error { panic("unimplemented") }
func (m *MockTransactionalSender) Step() error {
// At least one test (e.g sql/TestPortalsDestroyedOnTxnFinish) requires
// the ability to run simple statements that do not access storage,
// and that requires a non-panicky Step().
return nil
}

// DisableStepping is part of the TxnSender interface.
func (m *MockTransactionalSender) DisableStepping() error { panic("unimplemented") }
// ConfigureStepping is part of the TxnSender interface.
func (m *MockTransactionalSender) ConfigureStepping(SteppingMode) (SteppingMode, error) {
// See Step() above.
return SteppingDisabled, nil
}

// MockTxnSenderFactory is a TxnSenderFactory producing MockTxnSenders.
type MockTxnSenderFactory struct {
Expand Down
35 changes: 24 additions & 11 deletions pkg/internal/client/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,24 +239,37 @@ type TxnSender interface {
// operations observe the data at the time the snapshot was
// established and ignore writes performed since.
//
// Before the first step is taken, the transaction operates as if
// there was a step after every write: each read to a key is able to
// see the latest write before it. This makes the step behavior
// opt-in and backward-compatible with existing code which does not
// need it.
// Step() can only be called after stepping mode has been enabled
// using ConfigureStepping(SteppingEnabled).
//
// The method is idempotent.
Step() error

// DisableStepping disables the sequencing point behavior and
// ensures that every read can read the latest write. The effect
// remains disabled until the next call to Step(). The method is
// idempotent.
// ConfigureStepping sets the sequencing point behavior.
//
// Note that a Sender is initially in the non-stepping mode,
// i.e. uses reads-own-writes by default.
DisableStepping() error
// i.e. uses reads-own-writes by default. This makes the step
// behavior opt-in and backward-compatible with existing code which
// does not need it.
//
// Calling ConfigureStepping(true) when the stepping mode is
// currently disabled implies calling Step(), for convenience.
ConfigureStepping(mode SteppingMode) (prevMode SteppingMode, err error)
}

// SteppingMode is the argument type to ConfigureStepping.
type SteppingMode bool

const (
// SteppingDisabled is the default mode, where each read can
// observe the latest write.
SteppingDisabled SteppingMode = false

// SteppingEnabled can be set to indicate that read operations
// operate on a snapshot taken at the latest Step() invocation.
SteppingEnabled SteppingMode = true
)

// TxnStatusOpt represents options for TxnSender.GetMeta().
type TxnStatusOpt int

Expand Down
41 changes: 40 additions & 1 deletion pkg/internal/client/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type Txn struct {
}

// NewTxn returns a new RootTxn.
// Note: for SQL usage, prefer NewTxnWithSteppingEnabled() below.
//
// If the transaction is used to send any operations, CommitOrCleanup() or
// CleanupOnError() should eventually be called to commit/rollback the
Expand Down Expand Up @@ -109,6 +110,14 @@ func NewTxn(ctx context.Context, db *DB, gatewayNodeID roachpb.NodeID) *Txn {
return NewTxnFromProto(ctx, db, gatewayNodeID, now, RootTxn, &kvTxn)
}

// NewTxnWithSteppingEnabled is like NewTxn but suitable for use by SQL.
func NewTxnWithSteppingEnabled(ctx context.Context, db *DB, gatewayNodeID roachpb.NodeID) *Txn {
txn := NewTxn(ctx, db, gatewayNodeID)
// ConfigureStepping is guaranteed to not return an error on root txns.
_, _ = txn.ConfigureStepping(SteppingEnabled)
return txn
}

// NewTxnFromProto is like NewTxn but assumes the Transaction object is already initialized.
// Do not use this directly; use NewTxn() instead.
// This function exists for testing only.
Expand Down Expand Up @@ -766,7 +775,7 @@ func (txn *Txn) exec(ctx context.Context, fn func(context.Context, *Txn) error)
}
}

cause := errors.Cause(err)
cause := errors.UnwrapAll(err)

var retryable bool
switch t := cause.(type) {
Expand Down Expand Up @@ -1179,3 +1188,33 @@ func (txn *Txn) Active() bool {
defer txn.mu.Unlock()
return txn.mu.sender.Active()
}

// Step enables step-wise execution in the transaction, or
// performs a step if step-wise execution is already enabled.
//
// In step-wise execution, reads operate at a snapshot established at
// the last step, instead of the latest write if not yet enabled.
func (txn *Txn) Step() error {
if txn.typ != RootTxn {
return errors.AssertionFailedf("txn.Step() only allowed in RootTxn")
}
txn.mu.Lock()
defer txn.mu.Unlock()
return txn.mu.sender.Step()
}

// ConfigureStepping configures step-wise execution in the
// transaction.
//
// This function is guaranteed to not return an error if it previously
// succeeded once with some txn and mode, then provided its own return
// value as input for a second call. This simplifies the implementation
// of push/pop semantics.
func (txn *Txn) ConfigureStepping(mode SteppingMode) (prevMode SteppingMode, err error) {
if txn.typ != RootTxn {
return false, errors.AssertionFailedf("txn.DisableStepping() only allowed in RootTxn")
}
txn.mu.Lock()
defer txn.mu.Unlock()
return txn.mu.sender.ConfigureStepping(mode)
}
14 changes: 10 additions & 4 deletions pkg/kv/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -1087,12 +1087,18 @@ func (tc *TxnCoordSender) Step() error {
}

// DisableStepping is part of the TxnSender interface.
func (tc *TxnCoordSender) DisableStepping() error {
func (tc *TxnCoordSender) ConfigureStepping(
mode client.SteppingMode,
) (prevMode client.SteppingMode, err error) {
if tc.typ != client.RootTxn {
return errors.AssertionFailedf("cannot call DisableStepping() in leaf txn")
return client.SteppingDisabled, errors.AssertionFailedf("cannot call DisableStepping() in leaf txn")
}
tc.mu.Lock()
defer tc.mu.Unlock()
tc.interceptorAlloc.txnSeqNumAllocator.disableSteppingLocked()
return nil
prevEnabled := tc.interceptorAlloc.txnSeqNumAllocator.configureSteppingLocked(mode == client.SteppingEnabled)
prevMode = client.SteppingDisabled
if prevEnabled {
prevMode = client.SteppingEnabled
}
return prevMode, nil
}
21 changes: 13 additions & 8 deletions pkg/kv/txn_interceptor_seq_num_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,21 +129,26 @@ func (s *txnSeqNumAllocator) importLeafFinalState(tfs *roachpb.LeafTxnFinalState
// stepLocked bumps the read seqnum to the current write seqnum.
// Used by the TxnCoordSender's Step() method.
func (s *txnSeqNumAllocator) stepLocked() error {
if s.steppingModeEnabled && s.readSeq > s.writeSeq {
if !s.steppingModeEnabled {
return errors.AssertionFailedf("stepping mode is not enabled")
}
if s.readSeq > s.writeSeq {
return errors.AssertionFailedf(
"cannot step() after mistaken initialization (%d,%d)", s.writeSeq, s.readSeq)
}
s.steppingModeEnabled = true
s.readSeq = s.writeSeq
return nil
}

// disableSteppingLocked cancels the stepping behavior and
// restores read-latest-write behavior.
// Used by the TxnCoordSender's DisableStepping() method.
func (s *txnSeqNumAllocator) disableSteppingLocked() {
s.steppingModeEnabled = false
s.readSeq = 0
// configureSteppingLocked configures the stepping mode.
// Used by the TxnCoordSender's ConfigureStepping() method.
func (s *txnSeqNumAllocator) configureSteppingLocked(enabled bool) (prevEnabled bool) {
prevEnabled = s.steppingModeEnabled
s.steppingModeEnabled = enabled
if !prevEnabled && enabled {
s.readSeq = s.writeSeq
}
return prevEnabled
}

// epochBumpedLocked is part of the txnInterceptor interface.
Expand Down
6 changes: 4 additions & 2 deletions pkg/kv/txn_interceptor_seq_num_allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ func TestSequenceNumberAllocationWithStep(t *testing.T) {
txn := makeTxnProto()
keyA, keyB := roachpb.Key("a"), roachpb.Key("b")

s.configureSteppingLocked(true /* enabled */)

for i := 1; i <= 3; i++ {
if err := s.stepLocked(); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -195,8 +197,8 @@ func TestSequenceNumberAllocationWithStep(t *testing.T) {
})
}

// Check that step-wise execution is disabled by DisableStepping().
s.disableSteppingLocked()
// Check that step-wise execution is disabled by ConfigureStepping(SteppingDisabled).
s.configureSteppingLocked(false /* enabled */)
currentStepSeqNum := s.writeSeq

var ba roachpb.BatchRequest
Expand Down

0 comments on commit e58c722

Please sign in to comment.