diff --git a/pkg/kv/txn_coord_sender.go b/pkg/kv/txn_coord_sender.go index be82406212ba..2301f181a2bb 100644 --- a/pkg/kv/txn_coord_sender.go +++ b/pkg/kv/txn_coord_sender.go @@ -1187,8 +1187,11 @@ func (tc *TxnCoordSender) OrigTimestamp() hlc.Timestamp { func (tc *TxnCoordSender) CommitTimestamp() hlc.Timestamp { tc.mu.Lock() defer tc.mu.Unlock() + txn := &tc.mu.txn tc.mu.txn.OrigTimestampWasObserved = true - return tc.mu.txn.OrigTimestamp + commitTS := txn.OrigTimestamp + commitTS.Forward(txn.RefreshedTimestamp) + return commitTS } // CommitTimestampFixed is part of the client.TxnSender interface. diff --git a/pkg/kv/txn_test.go b/pkg/kv/txn_test.go index 4732a3054d4b..2291b0402f3f 100644 --- a/pkg/kv/txn_test.go +++ b/pkg/kv/txn_test.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/pkg/errors" + "github.com/stretchr/testify/require" ) // TestTxnDBBasics verifies that a simple transaction can be run and @@ -589,3 +590,52 @@ func TestTxnResolveIntentsFromMultipleEpochs(t *testing.T) { } } } + +// Test that txn.CommitTimestamp() reflects refreshes. +func TestTxnCommitTimestampAdvancedByRefresh(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + + // We're going to inject an uncertainty error, expect the refresh to succeed, + // and then check that the txn.CommitTimestamp() value reflects the refresh. + injected := false + var refreshTS hlc.Timestamp + errKey := roachpb.Key("inject_err") + s := createTestDBWithContextAndKnobs(t, client.DefaultDBContext(), &storage.StoreTestingKnobs{ + TestingRequestFilter: func(ba roachpb.BatchRequest) *roachpb.Error { + if g, ok := ba.GetArg(roachpb.Get); ok && g.(*roachpb.GetRequest).Key.Equal(errKey) { + if injected { + return nil + } + injected = true + txn := ba.Txn.Clone() + refreshTS = txn.Timestamp.Add(0, 1) + pErr := roachpb.NewReadWithinUncertaintyIntervalError( + txn.OrigTimestamp, + refreshTS, + txn) + return roachpb.NewErrorWithTxn(pErr, txn) + } + return nil + }, + }) + defer s.Stop() + + err := s.DB.Txn(ctx, func(ctx context.Context, txn *client.Txn) error { + _, err := txn.Get(ctx, errKey) + if err != nil { + return err + } + if !injected { + return errors.Errorf("didn't inject err") + } + commitTS := txn.CommitTimestamp() + // We expect to have refreshed just after the timestamp injected by the error. + expTS := refreshTS.Add(0, 1) + if !commitTS.Equal(expTS) { + return errors.Errorf("expected refreshTS: %s, got: %s", refreshTS, commitTS) + } + return nil + }) + require.NoError(t, err) +}