diff --git a/pkg/internal/client/mock_transactional_sender.go b/pkg/internal/client/mock_transactional_sender.go index 8a0416f578f9..8d595278975d 100644 --- a/pkg/internal/client/mock_transactional_sender.go +++ b/pkg/internal/client/mock_transactional_sender.go @@ -135,6 +135,21 @@ func (m *MockTransactionalSender) IsSerializablePushAndRefreshNotPossible() bool return false } +// CreateSavepoint is part of the client.TxnSender interface. +func (m *MockTransactionalSender) CreateSavepoint(context.Context) (SavepointToken, error) { + panic("unimplemented") +} + +// RollbackToSavepoint is part of the client.TxnSender interface. +func (m *MockTransactionalSender) RollbackToSavepoint(context.Context, SavepointToken) error { + panic("unimplemented") +} + +// ReleaseSavepoint is part of the client.TxnSender interface. +func (m *MockTransactionalSender) ReleaseSavepoint(context.Context, SavepointToken) error { + panic("unimplemented") +} + // Epoch is part of the TxnSender interface. func (m *MockTransactionalSender) Epoch() enginepb.TxnEpoch { panic("unimplemented") } diff --git a/pkg/internal/client/sender.go b/pkg/internal/client/sender.go index 47f6ca66d969..eaadf27ba791 100644 --- a/pkg/internal/client/sender.go +++ b/pkg/internal/client/sender.go @@ -133,6 +133,35 @@ type TxnSender interface { // TxnStatus exports the txn's status. TxnStatus() roachpb.TransactionStatus + // CreateSavepoint establishes a savepoint. + // This method is only valid when called on RootTxns. + // + // Committing (or aborting) the transaction causes every open + // savepoint to be released (or, respectively, rolled back) + // implicitly. + CreateSavepoint(context.Context) (SavepointToken, error) + + // RollbackToSavepoint rolls back to the given savepoint. + // All savepoints "under" the savepoint being rolled back + // are also rolled back and their token must not be used any more. + // The token of the savepoint being rolled back remains valid + // and can be reused later (e.g. to release or roll back again). + // Aborting the txn implicitly rolls back all savepoints + // that are still open. + // + // This method is only valid when called on RootTxns. + RollbackToSavepoint(context.Context, SavepointToken) error + + // ReleaseSavepoint releases the given savepoint. + // The savepoint must not have been rolled back or released already. + // All savepoints "under" the savepoint being released + // are also released and their token must not be used any more. + // Committing the txn implicitly releases all savepoints + // that are still open. + // + // This method is only valid when called on RootTxns. + ReleaseSavepoint(context.Context, SavepointToken) error + // SetFixedTimestamp makes the transaction run in an unusual way, at // a "fixed timestamp": Timestamp and ReadTimestamp are set to ts, // there's no clock uncertainty, and the txn's deadline is set to ts @@ -274,6 +303,12 @@ const ( SteppingEnabled SteppingMode = true ) +// SavepointToken represents a savepoint. +type SavepointToken interface { + // SavepointToken is a marker interface. + SavepointToken() +} + // TxnStatusOpt represents options for TxnSender.GetMeta(). type TxnStatusOpt int diff --git a/pkg/internal/client/txn.go b/pkg/internal/client/txn.go index 513759af2c80..36f1780868bb 100644 --- a/pkg/internal/client/txn.go +++ b/pkg/internal/client/txn.go @@ -1217,3 +1217,35 @@ func (txn *Txn) ConfigureStepping(ctx context.Context, mode SteppingMode) (prevM defer txn.mu.Unlock() return txn.mu.sender.ConfigureStepping(ctx, mode) } + +// CreateSavepoint establishes a savepoint. +// This method is only valid when called on RootTxns. +func (txn *Txn) CreateSavepoint(ctx context.Context) (SavepointToken, error) { + txn.mu.Lock() + defer txn.mu.Unlock() + return txn.mu.sender.CreateSavepoint(ctx) +} + +// RollbackToSavepoint rolls back to the given savepoint. +// All savepoints "under" the savepoint being rolled back +// are also rolled back and their token must not be used any more. +// The token of the savepoint being rolled back remains valid +// and can be reused later (e.g. to release or roll back again). +// +// This method is only valid when called on RootTxns. +func (txn *Txn) RollbackToSavepoint(ctx context.Context, s SavepointToken) error { + txn.mu.Lock() + defer txn.mu.Unlock() + return txn.mu.sender.RollbackToSavepoint(ctx, s) +} + +// ReleaseSavepoint releases the given savepoint. The savepoint +// must not have been rolled back or released already. +// All savepoints "under" the savepoint being released +// are also released and their token must not be used any more. +// This method is only valid when called on RootTxns. +func (txn *Txn) ReleaseSavepoint(ctx context.Context, s SavepointToken) error { + txn.mu.Lock() + defer txn.mu.Unlock() + return txn.mu.sender.ReleaseSavepoint(ctx, s) +} diff --git a/pkg/kv/testdata/savepoints b/pkg/kv/testdata/savepoints new file mode 100644 index 000000000000..c973d48ae72f --- /dev/null +++ b/pkg/kv/testdata/savepoints @@ -0,0 +1,374 @@ +subtest release + +begin +---- +0 + +put k a +---- + +get k +---- +"k" -> a + +savepoint x +---- +1 + +put k b +---- + +get k +---- +"k" -> b + +release x +---- +2 + +get k +---- +"k" -> b + +commit +---- + +begin +---- +0 + +get k +---- +"k" -> b + +commit +---- + +subtest end + + +subtest rollback + +begin +---- +0 + +put k a +---- + +get k +---- +"k" -> a + +savepoint x +---- +1 + +put k b +---- + +get k +---- +"k" -> b + +rollback x +---- +2 [2-2] + +get k +---- +"k" -> a + +commit +---- + +begin +---- +0 + +get k +---- +"k" -> a + +commit +---- + +subtest end + +subtest rollback_after_nested_release + +begin +---- +0 + +put k ar +---- + +savepoint x +---- +1 + +put k br +---- + +savepoint y +---- +2 + +put k cr +---- + +release y +---- +3 + +put k dr +---- + +rollback x +---- +4 [2-4] + +get k +---- +"k" -> ar + +commit +---- + +begin +---- +0 + +get k +---- +"k" -> ar + +commit +---- + +subtest end + +subtest disjoin_rollbacks + +begin +---- +0 + +put a d1 +---- + +put b d1 +---- + +savepoint x +---- +2 + +put a d2 +---- + +rollback x +---- +3 [3-3] + +put c d1 +---- + +savepoint x +---- +4 [3-3] + +put b 2 +---- + +rollback x +---- +5 [3-3][5-5] + +put d 1 +---- + + +get a +---- +"a" -> d1 + +get b +---- +"b" -> d1 + +get c +---- +"c" -> d1 + +get d +---- +"d" -> 1 + +commit +---- + +begin +---- +0 + +get a +---- +"a" -> d1 + +get b +---- +"b" -> d1 + +get c +---- +"c" -> d1 + +get d +---- +"d" -> 1 + +commit +---- + +subtest end + + +subtest rollback_with_no_op + +begin +---- +0 + +put k nop +---- + +savepoint x +---- +1 + +rollback x +---- +1 + +release x +---- +1 + +commit +---- + +subtest end + +subtest double_rollback_ok + +begin +---- +0 + +put k init +---- + +commit +---- + +begin +---- +0 + +savepoint x +---- +0 + +put k da +---- + +rollback x +---- +1 [1-1] + +rollback x +---- +1 [1-1] + +get k +---- +"k" -> init + +put k db +---- + +rollback x +---- +2 [1-2] + +commit +---- + +begin +---- +0 + +get k +---- +"k" -> init + +commit +---- + +subtest end + +subtest rollback_across_retry + +# TODO(knz): change this test when rolling back across retries becomes +# supported. + +begin +---- +0 + +savepoint x +---- +0 + +retry +---- +synthetic error: TransactionRetryWithProtoRefreshError: forced retry +epoch: 0 -> 1 + +release x +---- +(*withstack.withStack) cannot release savepoint across transaction retries + +rollback x +---- +(*withstack.withStack) cannot rollback savepoint across transaction retries + +subtest end + +subtest rollback_across_abort + +begin +---- +0 + +savepoint x +---- +0 + +abort +---- +(*roachpb.TransactionRetryWithProtoRefreshError) +txn id changed + +release x +---- +(*withstack.withStack) cannot release savepoint across transaction retries + +rollback x +---- +(*withstack.withStack) cannot rollback savepoint across transaction retries + + +subtest end diff --git a/pkg/kv/txn_coord_sender_savepoints.go b/pkg/kv/txn_coord_sender_savepoints.go new file mode 100644 index 000000000000..f83ea41b92f5 --- /dev/null +++ b/pkg/kv/txn_coord_sender_savepoints.go @@ -0,0 +1,186 @@ +// Copyright 2019 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kv + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/internal/client" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" + "github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/cockroachdb/errors" +) + +// savepointToken captures the state in the TxnCoordSender necessary +// to restore that state upon a savepoint rollback. +// +// TODO(knz,andrei): Currently this definition is only sufficient for +// just a few cases of rollbacks. This should be extended to cover +// more ground: +// +// - We also need the current size of txnSpanRefresher.refreshSpans the +// list of tracked reads, such that upon rollback we tell the +// refresher interceptor to discard further reads. +// - We also need something about in-flight writes +// (txnPipeliner.ifWrites). There I guess we need to take some sort of +// snapshot of the current in-flight writes and, on rollback, discard +// in-flight writes that are not part of the savepoint. But, on +// rollback, I don't think we should (nor am I sure that we could) +// simply overwrite the set of in-flight writes with the ones from the +// savepoint because writes that have been verified since the snapshot +// has been taken should continue to be verified. Basically, on +// rollback I think we need to intersect the savepoint with the +// current set of in-flight writes. +type savepointToken struct { + // seqNum is currently the only field that helps to "restore" + // anything upon a rollback. When used, it does not change anything + // in the TCS; instead it simply configures the txn to ignore all + // seqnums from this value until the most recent seqnum emitted by + // the TCS. + seqNum enginepb.TxnSeq + + // txnID is used to verify that a rollback is not used to paper + // over a txn abort error. + txnID uuid.UUID + // epoch is used to verify that a savepoint rollback is not + // used to paper over a retry error. + // TODO(knz,andrei): expand savepoint rollbacks to recover + // from retry errors. + // TODO(knz,andrei): remove the epoch mechanism entirely in + // favor of seqnums and savepoint rollbacks. + epoch enginepb.TxnEpoch +} + +var _ client.SavepointToken = (*savepointToken)(nil) + +// SavepointToken implements the client.SavepointToken interface. +func (s *savepointToken) SavepointToken() {} + +// CreateSavepoint is part of the client.TxnSender interface. +func (tc *TxnCoordSender) CreateSavepoint(ctx context.Context) (client.SavepointToken, error) { + if tc.typ != client.RootTxn { + return nil, errors.AssertionFailedf("cannot get savepoint in non-root txn") + } + + tc.mu.Lock() + defer tc.mu.Unlock() + + if err := tc.assertNotFinalized(); err != nil { + return nil, err + } + + if tc.mu.txnState != txnPending { + return nil, ErrSavepointOperationInErrorTxn + } + + return &savepointToken{ + txnID: tc.mu.txn.ID, + epoch: tc.mu.txn.Epoch, + seqNum: tc.interceptorAlloc.txnSeqNumAllocator.writeSeq, + }, nil +} + +// RollbackToSavepoint is part of the client.TxnSender interface. +func (tc *TxnCoordSender) RollbackToSavepoint(ctx context.Context, s client.SavepointToken) error { + if tc.typ != client.RootTxn { + return errors.AssertionFailedf("cannot rollback savepoint in non-root txn") + } + + tc.mu.Lock() + defer tc.mu.Unlock() + + if err := tc.assertNotFinalized(); err != nil { + return err + } + + st, err := tc.checkSavepointLocked(s, "rollback") + if err != nil { + return err + } + + // TODO(knz): handle recoverable errors. + if tc.mu.txnState == txnError { + return unimplemented.New("rollback_error", "savepoint rollback after error") + } + + if st.seqNum == tc.interceptorAlloc.txnSeqNumAllocator.writeSeq { + // No operations since savepoint was taken. No-op. + return nil + } + + tc.mu.txn.IgnoredSeqNums = roachpb.AddIgnoredSeqNumRange( + tc.mu.txn.IgnoredSeqNums, + enginepb.IgnoredSeqNumRange{ + Start: st.seqNum + 1, End: tc.interceptorAlloc.txnSeqNumAllocator.writeSeq, + }) + + return nil +} + +// ReleaseSavepoint is part of the client.TxnSender interface. +func (tc *TxnCoordSender) ReleaseSavepoint(ctx context.Context, s client.SavepointToken) error { + if tc.typ != client.RootTxn { + return errors.AssertionFailedf("cannot release savepoint in non-root txn") + } + + tc.mu.Lock() + defer tc.mu.Unlock() + + if tc.mu.txnState != txnPending { + return ErrSavepointOperationInErrorTxn + } + + _, err := tc.checkSavepointLocked(s, "release") + return err +} + +type errSavepointOperationInErrorTxn struct{} + +// ErrSavepointOperationInErrorTxn is reported when CreateSavepoint() +// or ReleaseSavepoint() is called over a txn currently in error. +var ErrSavepointOperationInErrorTxn error = errSavepointOperationInErrorTxn{} + +func (err errSavepointOperationInErrorTxn) Error() string { + return "cannot create or release savepoint after an error has occurred" +} + +func (tc *TxnCoordSender) assertNotFinalized() error { + if tc.mu.txnState == txnFinalized { + return errors.AssertionFailedf("operation invalid for finalized txns") + } + return nil +} + +func (tc *TxnCoordSender) checkSavepointLocked( + s client.SavepointToken, opName string, +) (*savepointToken, error) { + st, ok := s.(*savepointToken) + if !ok { + return nil, errors.AssertionFailedf("expected savepointToken, got %T", s) + } + + if st.txnID != tc.mu.txn.ID { + return nil, errors.Newf("cannot %s savepoint across transaction retries", opName) + } + + if st.epoch != tc.mu.txn.Epoch { + return nil, errors.Newf("cannot %s savepoint across transaction retries", opName) + } + + if st.seqNum < 0 || st.seqNum > tc.interceptorAlloc.txnSeqNumAllocator.writeSeq { + return nil, errors.AssertionFailedf("invalid savepoint: got %d, expected 0-%d", + st.seqNum, tc.interceptorAlloc.txnSeqNumAllocator.writeSeq) + } + + return st, nil +} diff --git a/pkg/kv/txn_coord_sender_savepoints_test.go b/pkg/kv/txn_coord_sender_savepoints_test.go new file mode 100644 index 000000000000..8c7295576eb8 --- /dev/null +++ b/pkg/kv/txn_coord_sender_savepoints_test.go @@ -0,0 +1,156 @@ +// Copyright 2019 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kv + +import ( + "context" + "fmt" + "strings" + "sync/atomic" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/internal/client" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/storage/storagebase" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/datadriven" +) + +func TestSavepoints(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + abortKey := roachpb.Key("abort") + + datadriven.Walk(t, "testdata/savepoints", func(t *testing.T, path string) { + // We want to inject txn abort errors in some cases. + // + // We do this by injecting the error from "underneath" the + // TxnCoordSender, from storage. + params := base.TestServerArgs{} + var doAbort int64 + params.Knobs.Store = &storage.StoreTestingKnobs{ + EvalKnobs: storagebase.BatchEvalTestingKnobs{ + TestingEvalFilter: func(args storagebase.FilterArgs) *roachpb.Error { + if atomic.LoadInt64(&doAbort) != 0 && args.Req.Header().Key.Equal(abortKey) { + return roachpb.NewErrorWithTxn( + roachpb.NewTransactionAbortedError(roachpb.ABORT_REASON_UNKNOWN), args.Hdr.Txn) + } + return nil + }, + }, + } + + // New database for each test file. + s, _, db := serverutils.StartServer(t, params) + defer s.Stopper().Stop(ctx) + + // Transient state during the test. + sp := make(map[string]client.SavepointToken) + var txn *client.Txn + + datadriven.RunTest(t, path, func(t *testing.T, td *datadriven.TestData) string { + var buf strings.Builder + + ptxn := func() { + tc := txn.Sender().(*TxnCoordSender) + fmt.Fprintf(&buf, "%d ", tc.interceptorAlloc.txnSeqNumAllocator.writeSeq) + if len(tc.mu.txn.IgnoredSeqNums) == 0 { + buf.WriteString("") + } + for _, r := range tc.mu.txn.IgnoredSeqNums { + fmt.Fprintf(&buf, "[%d-%d]", r.Start, r.End) + } + fmt.Fprintln(&buf) + } + + switch td.Cmd { + case "begin": + txn = client.NewTxn(ctx, db, 0) + ptxn() + + case "commit": + if err := txn.CommitOrCleanup(ctx); err != nil { + fmt.Fprintf(&buf, "(%T) %v\n", err, err) + } + + case "retry": + epochBefore := txn.Epoch() + retryErr := txn.GenerateForcedRetryableError(ctx, "forced retry") + epochAfter := txn.Epoch() + fmt.Fprintf(&buf, "synthetic error: %v\n", retryErr) + fmt.Fprintf(&buf, "epoch: %d -> %d\n", epochBefore, epochAfter) + + case "abort": + prevID := txn.ID() + atomic.StoreInt64(&doAbort, 1) + defer func() { atomic.StoreInt64(&doAbort, 00) }() + err := txn.Put(ctx, abortKey, []byte("value")) + fmt.Fprintf(&buf, "(%T)\n", err) + changed := "changed" + if prevID == txn.ID() { + changed = "not changed" + } + fmt.Fprintf(&buf, "txn id %s\n", changed) + + case "put": + if err := txn.Put(ctx, + roachpb.Key(td.CmdArgs[0].Key), + []byte(td.CmdArgs[1].Key)); err != nil { + fmt.Fprintf(&buf, "(%T) %v\n", err, err) + } + + case "get": + v, err := txn.Get(ctx, td.CmdArgs[0].Key) + if err != nil { + fmt.Fprintf(&buf, "(%T) %v\n", err, err) + } else { + ba, _ := v.Value.GetBytes() + fmt.Fprintf(&buf, "%v -> %v\n", v.Key, string(ba)) + } + + case "savepoint": + spt, err := txn.CreateSavepoint(ctx) + if err != nil { + fmt.Fprintf(&buf, "(%T) %v\n", err, err) + } else { + sp[td.CmdArgs[0].Key] = spt + ptxn() + } + + case "release": + spn := td.CmdArgs[0].Key + spt := sp[spn] + if err := txn.ReleaseSavepoint(ctx, spt); err != nil { + fmt.Fprintf(&buf, "(%T) %v\n", err, err) + } else { + ptxn() + } + + case "rollback": + spn := td.CmdArgs[0].Key + spt := sp[spn] + if err := txn.RollbackToSavepoint(ctx, spt); err != nil { + fmt.Fprintf(&buf, "(%T) %v\n", err, err) + } else { + ptxn() + } + + default: + td.Fatalf(t, "unknown directive: %s", td.Cmd) + } + return buf.String() + }) + }) +} diff --git a/pkg/roachpb/ignored_seqnums.go b/pkg/roachpb/ignored_seqnums.go new file mode 100644 index 000000000000..70433209ff69 --- /dev/null +++ b/pkg/roachpb/ignored_seqnums.go @@ -0,0 +1,60 @@ +// Copyright 2019 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package roachpb + +import "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" + +// AddIgnoredSeqNumRange adds the given range to the given list of +// ignored seqnum ranges. +// +// The following invariants are assumed to hold and are preserved: +// - the list contains no overlapping ranges +// - the list contains no contiguous ranges +// - the list is sorted, with larger seqnums at the end +// +// Additionally, the caller must ensure: +// +// 1) if the new range overlaps with some range in the list, then it +// also overlaps with every subsequent range in the list. +// +// 2) the new range's "end" seqnum is larger or equal to the "end" +// seqnum of the last element in the list. +// +// For example: +// current list [3 5] [10 20] [22 24] +// new item: [8 26] +// final list: [3 5] [8 26] +// +// current list [3 5] [10 20] [22 24] +// new item: [28 32] +// final list: [3 5] [10 20] [22 24] [28 32] +// +// This corresponds to savepoints semantics: +// +// - Property 1 says that a rollback to an earlier savepoint +// rolls back over all writes following that savepoint. +// - Property 2 comes from that the new range's 'end' seqnum is the +// current write seqnum and thus larger than or equal to every +// previously seen value. +func AddIgnoredSeqNumRange( + list []enginepb.IgnoredSeqNumRange, newRange enginepb.IgnoredSeqNumRange, +) []enginepb.IgnoredSeqNumRange { + // Truncate the list at the last element not included in the new range. + i := 0 + for ; i < len(list); i++ { + if list[i].End < newRange.Start { + continue + } + break + } + list = list[:i] + return append(list, newRange) +} diff --git a/pkg/roachpb/ignored_seqnums_test.go b/pkg/roachpb/ignored_seqnums_test.go new file mode 100644 index 000000000000..ba5335d4924c --- /dev/null +++ b/pkg/roachpb/ignored_seqnums_test.go @@ -0,0 +1,53 @@ +// Copyright 2019 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package roachpb + +import ( + "testing" + + "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" + "github.com/stretchr/testify/require" +) + +func TestAddIgnoredSeqNumRange(t *testing.T) { + type r = enginepb.IgnoredSeqNumRange + + mr := func(a, b enginepb.TxnSeq) r { + return r{Start: a, End: b} + } + + testData := []struct { + list []r + newRange r + exp []r + }{ + {[]r{}, + mr(1, 2), + []r{mr(1, 2)}}, + {[]r{mr(1, 2)}, + mr(1, 4), + []r{mr(1, 4)}}, + {[]r{mr(1, 2), mr(3, 6)}, + mr(8, 10), + []r{mr(1, 2), mr(3, 6), mr(8, 10)}}, + {[]r{mr(1, 2), mr(5, 6)}, + mr(3, 8), + []r{mr(1, 2), mr(3, 8)}}, + {[]r{mr(1, 2), mr(5, 6)}, + mr(1, 8), + []r{mr(1, 8)}}, + } + + for _, tc := range testData { + nl := AddIgnoredSeqNumRange(tc.list, tc.newRange) + require.Equal(t, tc.exp, nl) + } +}