From e58c7227127100758b3bd367ce2e590bd71007f7 Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Fri, 29 Nov 2019 09:17:08 +0100 Subject: [PATCH] kv,internal: evolve the step execution API This patch reworks the TCS Step API to make the behavior explicitly configurable with a bool (instead of implicit enable at the first call of Step). This is necessary because SQL needs push/pop semantics on the stpping mode. Release note: None --- .../client/mock_transactional_sender.go | 14 +++++-- pkg/internal/client/sender.go | 35 +++++++++++----- pkg/internal/client/txn.go | 41 ++++++++++++++++++- pkg/kv/txn_coord_sender.go | 14 +++++-- pkg/kv/txn_interceptor_seq_num_allocator.go | 21 ++++++---- .../txn_interceptor_seq_num_allocator_test.go | 6 ++- 6 files changed, 102 insertions(+), 29 deletions(-) diff --git a/pkg/internal/client/mock_transactional_sender.go b/pkg/internal/client/mock_transactional_sender.go index 60658038e314..475bb37ba314 100644 --- a/pkg/internal/client/mock_transactional_sender.go +++ b/pkg/internal/client/mock_transactional_sender.go @@ -164,10 +164,18 @@ func (m *MockTransactionalSender) PrepareRetryableError(ctx context.Context, msg } // Step is part of the TxnSender interface. -func (m *MockTransactionalSender) Step() error { panic("unimplemented") } +func (m *MockTransactionalSender) Step() error { + // At least one test (e.g sql/TestPortalsDestroyedOnTxnFinish) requires + // the ability to run simple statements that do not access storage, + // and that requires a non-panicky Step(). + return nil +} -// DisableStepping is part of the TxnSender interface. -func (m *MockTransactionalSender) DisableStepping() error { panic("unimplemented") } +// ConfigureStepping is part of the TxnSender interface. +func (m *MockTransactionalSender) ConfigureStepping(SteppingMode) (SteppingMode, error) { + // See Step() above. + return SteppingDisabled, nil +} // MockTxnSenderFactory is a TxnSenderFactory producing MockTxnSenders. type MockTxnSenderFactory struct { diff --git a/pkg/internal/client/sender.go b/pkg/internal/client/sender.go index f7201650fc54..88e86a39e0a8 100644 --- a/pkg/internal/client/sender.go +++ b/pkg/internal/client/sender.go @@ -239,24 +239,37 @@ type TxnSender interface { // operations observe the data at the time the snapshot was // established and ignore writes performed since. // - // Before the first step is taken, the transaction operates as if - // there was a step after every write: each read to a key is able to - // see the latest write before it. This makes the step behavior - // opt-in and backward-compatible with existing code which does not - // need it. + // Step() can only be called after stepping mode has been enabled + // using ConfigureStepping(SteppingEnabled). + // // The method is idempotent. Step() error - // DisableStepping disables the sequencing point behavior and - // ensures that every read can read the latest write. The effect - // remains disabled until the next call to Step(). The method is - // idempotent. + // ConfigureStepping sets the sequencing point behavior. // // Note that a Sender is initially in the non-stepping mode, - // i.e. uses reads-own-writes by default. - DisableStepping() error + // i.e. uses reads-own-writes by default. This makes the step + // behavior opt-in and backward-compatible with existing code which + // does not need it. + // + // Calling ConfigureStepping(true) when the stepping mode is + // currently disabled implies calling Step(), for convenience. + ConfigureStepping(mode SteppingMode) (prevMode SteppingMode, err error) } +// SteppingMode is the argument type to ConfigureStepping. +type SteppingMode bool + +const ( + // SteppingDisabled is the default mode, where each read can + // observe the latest write. + SteppingDisabled SteppingMode = false + + // SteppingEnabled can be set to indicate that read operations + // operate on a snapshot taken at the latest Step() invocation. + SteppingEnabled SteppingMode = true +) + // TxnStatusOpt represents options for TxnSender.GetMeta(). type TxnStatusOpt int diff --git a/pkg/internal/client/txn.go b/pkg/internal/client/txn.go index fa344fecd5dc..9d83aa59d74c 100644 --- a/pkg/internal/client/txn.go +++ b/pkg/internal/client/txn.go @@ -76,6 +76,7 @@ type Txn struct { } // NewTxn returns a new RootTxn. +// Note: for SQL usage, prefer NewTxnWithSteppingEnabled() below. // // If the transaction is used to send any operations, CommitOrCleanup() or // CleanupOnError() should eventually be called to commit/rollback the @@ -109,6 +110,14 @@ func NewTxn(ctx context.Context, db *DB, gatewayNodeID roachpb.NodeID) *Txn { return NewTxnFromProto(ctx, db, gatewayNodeID, now, RootTxn, &kvTxn) } +// NewTxnWithSteppingEnabled is like NewTxn but suitable for use by SQL. +func NewTxnWithSteppingEnabled(ctx context.Context, db *DB, gatewayNodeID roachpb.NodeID) *Txn { + txn := NewTxn(ctx, db, gatewayNodeID) + // ConfigureStepping is guaranteed to not return an error on root txns. + _, _ = txn.ConfigureStepping(SteppingEnabled) + return txn +} + // NewTxnFromProto is like NewTxn but assumes the Transaction object is already initialized. // Do not use this directly; use NewTxn() instead. // This function exists for testing only. @@ -766,7 +775,7 @@ func (txn *Txn) exec(ctx context.Context, fn func(context.Context, *Txn) error) } } - cause := errors.Cause(err) + cause := errors.UnwrapAll(err) var retryable bool switch t := cause.(type) { @@ -1179,3 +1188,33 @@ func (txn *Txn) Active() bool { defer txn.mu.Unlock() return txn.mu.sender.Active() } + +// Step enables step-wise execution in the transaction, or +// performs a step if step-wise execution is already enabled. +// +// In step-wise execution, reads operate at a snapshot established at +// the last step, instead of the latest write if not yet enabled. +func (txn *Txn) Step() error { + if txn.typ != RootTxn { + return errors.AssertionFailedf("txn.Step() only allowed in RootTxn") + } + txn.mu.Lock() + defer txn.mu.Unlock() + return txn.mu.sender.Step() +} + +// ConfigureStepping configures step-wise execution in the +// transaction. +// +// This function is guaranteed to not return an error if it previously +// succeeded once with some txn and mode, then provided its own return +// value as input for a second call. This simplifies the implementation +// of push/pop semantics. +func (txn *Txn) ConfigureStepping(mode SteppingMode) (prevMode SteppingMode, err error) { + if txn.typ != RootTxn { + return false, errors.AssertionFailedf("txn.DisableStepping() only allowed in RootTxn") + } + txn.mu.Lock() + defer txn.mu.Unlock() + return txn.mu.sender.ConfigureStepping(mode) +} diff --git a/pkg/kv/txn_coord_sender.go b/pkg/kv/txn_coord_sender.go index b0de358397ce..8837d6c77f94 100644 --- a/pkg/kv/txn_coord_sender.go +++ b/pkg/kv/txn_coord_sender.go @@ -1087,12 +1087,18 @@ func (tc *TxnCoordSender) Step() error { } // DisableStepping is part of the TxnSender interface. -func (tc *TxnCoordSender) DisableStepping() error { +func (tc *TxnCoordSender) ConfigureStepping( + mode client.SteppingMode, +) (prevMode client.SteppingMode, err error) { if tc.typ != client.RootTxn { - return errors.AssertionFailedf("cannot call DisableStepping() in leaf txn") + return client.SteppingDisabled, errors.AssertionFailedf("cannot call DisableStepping() in leaf txn") } tc.mu.Lock() defer tc.mu.Unlock() - tc.interceptorAlloc.txnSeqNumAllocator.disableSteppingLocked() - return nil + prevEnabled := tc.interceptorAlloc.txnSeqNumAllocator.configureSteppingLocked(mode == client.SteppingEnabled) + prevMode = client.SteppingDisabled + if prevEnabled { + prevMode = client.SteppingEnabled + } + return prevMode, nil } diff --git a/pkg/kv/txn_interceptor_seq_num_allocator.go b/pkg/kv/txn_interceptor_seq_num_allocator.go index 3dfa1f184c47..37f28d4da928 100644 --- a/pkg/kv/txn_interceptor_seq_num_allocator.go +++ b/pkg/kv/txn_interceptor_seq_num_allocator.go @@ -129,21 +129,26 @@ func (s *txnSeqNumAllocator) importLeafFinalState(tfs *roachpb.LeafTxnFinalState // stepLocked bumps the read seqnum to the current write seqnum. // Used by the TxnCoordSender's Step() method. func (s *txnSeqNumAllocator) stepLocked() error { - if s.steppingModeEnabled && s.readSeq > s.writeSeq { + if !s.steppingModeEnabled { + return errors.AssertionFailedf("stepping mode is not enabled") + } + if s.readSeq > s.writeSeq { return errors.AssertionFailedf( "cannot step() after mistaken initialization (%d,%d)", s.writeSeq, s.readSeq) } - s.steppingModeEnabled = true s.readSeq = s.writeSeq return nil } -// disableSteppingLocked cancels the stepping behavior and -// restores read-latest-write behavior. -// Used by the TxnCoordSender's DisableStepping() method. -func (s *txnSeqNumAllocator) disableSteppingLocked() { - s.steppingModeEnabled = false - s.readSeq = 0 +// configureSteppingLocked configures the stepping mode. +// Used by the TxnCoordSender's ConfigureStepping() method. +func (s *txnSeqNumAllocator) configureSteppingLocked(enabled bool) (prevEnabled bool) { + prevEnabled = s.steppingModeEnabled + s.steppingModeEnabled = enabled + if !prevEnabled && enabled { + s.readSeq = s.writeSeq + } + return prevEnabled } // epochBumpedLocked is part of the txnInterceptor interface. diff --git a/pkg/kv/txn_interceptor_seq_num_allocator_test.go b/pkg/kv/txn_interceptor_seq_num_allocator_test.go index 562040f1f8b0..a90ef569435e 100644 --- a/pkg/kv/txn_interceptor_seq_num_allocator_test.go +++ b/pkg/kv/txn_interceptor_seq_num_allocator_test.go @@ -117,6 +117,8 @@ func TestSequenceNumberAllocationWithStep(t *testing.T) { txn := makeTxnProto() keyA, keyB := roachpb.Key("a"), roachpb.Key("b") + s.configureSteppingLocked(true /* enabled */) + for i := 1; i <= 3; i++ { if err := s.stepLocked(); err != nil { t.Fatal(err) @@ -195,8 +197,8 @@ func TestSequenceNumberAllocationWithStep(t *testing.T) { }) } - // Check that step-wise execution is disabled by DisableStepping(). - s.disableSteppingLocked() + // Check that step-wise execution is disabled by ConfigureStepping(SteppingDisabled). + s.configureSteppingLocked(false /* enabled */) currentStepSeqNum := s.writeSeq var ba roachpb.BatchRequest