diff --git a/pkg/internal/client/mock_transactional_sender.go b/pkg/internal/client/mock_transactional_sender.go index 8a0416f578f9..8d595278975d 100644 --- a/pkg/internal/client/mock_transactional_sender.go +++ b/pkg/internal/client/mock_transactional_sender.go @@ -135,6 +135,21 @@ func (m *MockTransactionalSender) IsSerializablePushAndRefreshNotPossible() bool return false } +// CreateSavepoint is part of the client.TxnSender interface. +func (m *MockTransactionalSender) CreateSavepoint(context.Context) (SavepointToken, error) { + panic("unimplemented") +} + +// RollbackToSavepoint is part of the client.TxnSender interface. +func (m *MockTransactionalSender) RollbackToSavepoint(context.Context, SavepointToken) error { + panic("unimplemented") +} + +// ReleaseSavepoint is part of the client.TxnSender interface. +func (m *MockTransactionalSender) ReleaseSavepoint(context.Context, SavepointToken) error { + panic("unimplemented") +} + // Epoch is part of the TxnSender interface. func (m *MockTransactionalSender) Epoch() enginepb.TxnEpoch { panic("unimplemented") } diff --git a/pkg/internal/client/sender.go b/pkg/internal/client/sender.go index 47f6ca66d969..a52f0eef2716 100644 --- a/pkg/internal/client/sender.go +++ b/pkg/internal/client/sender.go @@ -133,6 +133,24 @@ type TxnSender interface { // TxnStatus exports the txn's status. TxnStatus() roachpb.TransactionStatus + // CreateSavepoint establishes a savepoint. + // This method is only valid when called on RootTxns. + CreateSavepoint(context.Context) (SavepointToken, error) + + // RollbackToSavepoint rolls back to the given savepoint. The + // savepoint must not have been rolled back or released already. + // All savepoints "under" the savepoint being rolled back + // are also rolled back and their token must not be used any more. + // This method is only valid when called on RootTxns. + RollbackToSavepoint(context.Context, SavepointToken) error + + // ReleaseSavepoint releases the given savepoint. The savepoint + // must not have been rolled back or released already. + // All savepoints "under" the savepoint being released + // are also released and their token must not be used any more. + // This method is only valid when called on RootTxns. + ReleaseSavepoint(context.Context, SavepointToken) error + // SetFixedTimestamp makes the transaction run in an unusual way, at // a "fixed timestamp": Timestamp and ReadTimestamp are set to ts, // there's no clock uncertainty, and the txn's deadline is set to ts @@ -274,6 +292,12 @@ const ( SteppingEnabled SteppingMode = true ) +// SavepointToken represents a savepoint. +type SavepointToken interface { + // SavepointToken is a marker interface. + SavepointToken() +} + // TxnStatusOpt represents options for TxnSender.GetMeta(). type TxnStatusOpt int diff --git a/pkg/internal/client/txn.go b/pkg/internal/client/txn.go index 513759af2c80..a7f63cc6a632 100644 --- a/pkg/internal/client/txn.go +++ b/pkg/internal/client/txn.go @@ -1217,3 +1217,33 @@ func (txn *Txn) ConfigureStepping(ctx context.Context, mode SteppingMode) (prevM defer txn.mu.Unlock() return txn.mu.sender.ConfigureStepping(ctx, mode) } + +// CreateSavepoint establishes a savepoint. +// This method is only valid when called on RootTxns. +func (txn *Txn) CreateSavepoint(ctx context.Context) (SavepointToken, error) { + txn.mu.Lock() + defer txn.mu.Unlock() + return txn.mu.sender.CreateSavepoint(ctx) +} + +// RollbackToSavepoint rolls back to the given savepoint. The +// savepoint must not have been rolled back or released already. +// All savepoints "under" the savepoint being rolled back +// are also rolled back and their token must not be used any more. +// This method is only valid when called on RootTxns. +func (txn *Txn) RollbackToSavepoint(ctx context.Context, s SavepointToken) error { + txn.mu.Lock() + defer txn.mu.Unlock() + return txn.mu.sender.RollbackToSavepoint(ctx, s) +} + +// ReleaseSavepoint releases the given savepoint. The savepoint +// must not have been rolled back or released already. +// All savepoints "under" the savepoint being released +// are also released and their token must not be used any more. +// This method is only valid when called on RootTxns. +func (txn *Txn) ReleaseSavepoint(ctx context.Context, s SavepointToken) error { + txn.mu.Lock() + defer txn.mu.Unlock() + return txn.mu.sender.ReleaseSavepoint(ctx, s) +} diff --git a/pkg/kv/testdata/savepoints b/pkg/kv/testdata/savepoints new file mode 100644 index 000000000000..0046fddf5fd0 --- /dev/null +++ b/pkg/kv/testdata/savepoints @@ -0,0 +1,353 @@ +subtest release + +begin +---- +0 + +put k a +---- + +get k +---- +"k" -> a + +savepoint x +---- +1 + +put k b +---- + +get k +---- +"k" -> b + +release x +---- +2 + +get k +---- +"k" -> b + +commit +---- + +begin +---- +0 + +get k +---- +"k" -> b + +commit +---- + +subtest end + + +subtest rollback + +begin +---- +0 + +put k a +---- + +get k +---- +"k" -> a + +savepoint x +---- +1 + +put k b +---- + +get k +---- +"k" -> b + +rollback x +---- +2 [2-2] + +get k +---- +"k" -> a + +commit +---- + +begin +---- +0 + +get k +---- +"k" -> a + +commit +---- + +subtest end + +subtest rollback_after_nested_release + +begin +---- +0 + +put k a +---- + +savepoint x +---- +1 + +put k b +---- + +savepoint y +---- +2 + +put k c +---- + +release y +---- +3 + +put k d +---- + +rollback x +---- +4 [2-4] + +get k +---- +"k" -> a + +commit +---- + +begin +---- +0 + +get k +---- +"k" -> a + +commit +---- + +subtest end + +subtest disjoin_rollbacks + +begin +---- +0 + +put a 1 +---- + +put b 1 +---- + +savepoint x +---- +2 + +put a 2 +---- + +rollback x +---- +3 [3-3] + +put c 1 +---- + +savepoint x +---- +4 [3-3] + +put b 2 +---- + +rollback x +---- +5 [3-3][5-5] + +put d 1 +---- + + +get a +---- +"a" -> 1 + +get b +---- +"b" -> 1 + +get c +---- +"c" -> 1 + +get d +---- +"d" -> 1 + +commit +---- + +begin +---- +0 + +get a +---- +"a" -> 1 + +get b +---- +"b" -> 1 + +get c +---- +"c" -> 1 + +get d +---- +"d" -> 1 + +commit +---- + +subtest end + + +subtest rollback_with_no_op + +begin +---- +0 + +put k a +---- + +savepoint x +---- +1 + +rollback x +---- +1 + +rollback x +---- +1 + +commit +---- + +subtest end + +subtest no_double_rollback + +begin +---- +0 + +savepoint x +---- +0 + +put k a +---- + +rollback x +---- +1 [1-1] + +rollback x +---- +(*assert.withAssertionFailure) savepoint is already rolled back + +commit +---- + +begin +---- +0 + +get k +---- +"k" -> a + +commit +---- + +subtest end + +subtest rollback_across_retry + +# TODO(knz): change this test when rolling back across retries becomes +# supported. + +begin +---- +0 + +savepoint x +---- +0 + +retry +---- +synthetic error: TransactionRetryWithProtoRefreshError: forced retry +epoch: 0 -> 1 + +release x +---- +(*assert.withAssertionFailure) savepoint from wrong txn epoch: got 0, expected 1 + +rollback x +---- +(*assert.withAssertionFailure) savepoint from wrong txn epoch: got 0, expected 1 + +subtest end + +subtest rollback_across_abort + +begin +---- +0 + +savepoint x +---- +0 + +retry +---- +synthetic error: XXXX (FIXME) +id: 0 -> 1 + +release x +---- +(*assert.withAssertionFailure) savepoint from wrong txn ID: got X, expected Y + +rollback x +---- +(*assert.withAssertionFailure) savepoint from wrong txn ID: got X, expected Y + + +subtest end diff --git a/pkg/kv/txn_coord_sender_savepoints.go b/pkg/kv/txn_coord_sender_savepoints.go new file mode 100644 index 000000000000..cae43fe75fe7 --- /dev/null +++ b/pkg/kv/txn_coord_sender_savepoints.go @@ -0,0 +1,175 @@ +// Copyright 2019 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kv + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/internal/client" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" + "github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/cockroachdb/errors" +) + +// savepointToken captures the state in the TxnCoordSender necessary +// to restore that state upon a savepoint rollback. +// +// TODO(knz,andrei): Currently this definition is only sufficient for +// just a few cases of rollbacks. This should be extended to cover +// more ground: +// +// - We also need the current size of txnSpanRefresher.refreshSpans the +// list of tracked reads, such that upon rollback we tell the +// refresher interceptor to discard further reads. +// - We also need something about in-flight writes +// (txnPipeliner.ifWrites). There I guess we need to take some sort of +// snapshot of the current in-flight writes and, on rollback, discard +// in-flight writes that are not part of the savepoint. But, on +// rollback, I don't think we should (nor am I sure that we could) +// simply overwrite the set of in-flight writes with the ones from the +// savepoint because writes that have been verified since the snapshot +// has been taken should continue to be verified. Basically, on +// rollback I think we need to intersect the savepoint with the +// current set of in-flight writes. +type savepointToken struct { + // seqNum is currently the only field that helps to "restore" + // anything upon a rollback. When used, it does not change anything + // in the TCS; instead it simply configures the txn to ignore all + // seqnums from this value until the most recent seqnum emitted by + // the TCS. + seqNum enginepb.TxnSeq + + // txnID is used to verify that a rollback is not used to paper + // over a txn abort error. + txnID uuid.UUID + // epoch is used to verify that a savepoint rollback is not + // used to paper over a retry error. + // TODO(knz,andrei): expand savepoint rollbacks to recover + // from retry errors. + // TODO(knz,andrei): remove the epoch mechanism entirely in + // favor of seqnums and savepoint rollbacks. + epoch enginepb.TxnEpoch +} + +var _ client.SavepointToken = (*savepointToken)(nil) + +// SavepointToken implements the client.SavepointToken interface. +func (s *savepointToken) SavepointToken() {} + +// CreateSavepoint is part of the client.TxnSender interface. +func (tc *TxnCoordSender) CreateSavepoint(ctx context.Context) (client.SavepointToken, error) { + if tc.typ != client.RootTxn { + return nil, errors.AssertionFailedf("cannot get savepoint in non-root txn") + } + + tc.mu.Lock() + defer tc.mu.Unlock() + if tc.mu.txnState != txnPending || tc.mu.closed { + return nil, errors.AssertionFailedf("cannot get savepoint in this txn state (%v, %v)", + tc.mu.txnState, tc.mu.closed) + } + return &savepointToken{ + txnID: tc.mu.txn.ID, + epoch: tc.mu.txn.Epoch, + seqNum: tc.interceptorAlloc.txnSeqNumAllocator.writeSeq, + }, nil +} + +// RollbackToSavepoint is part of the client.TxnSender interface. +func (tc *TxnCoordSender) RollbackToSavepoint(ctx context.Context, s client.SavepointToken) error { + if tc.typ != client.RootTxn { + return errors.AssertionFailedf("cannot rollback savepoint in non-root txn") + } + + tc.mu.Lock() + defer tc.mu.Unlock() + + st, err := tc.checkSavepointLocked(true /*allowError*/, s) + if err != nil { + return err + } + + // TODO(knz): handle recoverable errors. + if tc.mu.txnState == txnError { + return unimplemented.New("rollback_error", "savepoint rollback after error") + } + + if st.seqNum == tc.interceptorAlloc.txnSeqNumAllocator.writeSeq { + // No operations since savepoint was taken. No-op. + return nil + } + + tc.mu.txn.IgnoredSeqNums = roachpb.AddIgnoredSeqNumRange( + tc.mu.txn.IgnoredSeqNums, + enginepb.IgnoredSeqNumRange{ + Start: st.seqNum + 1, End: tc.interceptorAlloc.txnSeqNumAllocator.writeSeq, + }) + + return nil +} + +// ReleaseSavepoint is part of the client.TxnSender interface. +func (tc *TxnCoordSender) ReleaseSavepoint(ctx context.Context, s client.SavepointToken) error { + if tc.typ != client.RootTxn { + return errors.AssertionFailedf("cannot release savepoint in non-root txn") + } + + tc.mu.Lock() + defer tc.mu.Unlock() + + _, err := tc.checkSavepointLocked(false /*allowError*/, s) + if err != nil { + return err + } + + // Release is otherwise a no-op. + + return nil +} + +func (tc *TxnCoordSender) checkSavepointLocked( + allowError bool, s client.SavepointToken, +) (*savepointToken, error) { + if (tc.mu.txnState != txnPending && + (!allowError || tc.mu.txnState != txnError)) || + tc.mu.closed { + return nil, errors.AssertionFailedf("cannot run savepoint operation in this txn state (%v, %v)", + tc.mu.txnState, tc.mu.closed) + } + + st, ok := s.(*savepointToken) + if !ok { + return nil, errors.AssertionFailedf("expected savepointToken, got %T", s) + } + + if st.txnID != tc.mu.txn.ID { + return nil, errors.AssertionFailedf("savepoint from wrong txn ID: got %s, expected %s", + st.txnID, tc.mu.txn.ID) + } + + if st.epoch != tc.mu.txn.Epoch { + return nil, errors.AssertionFailedf("savepoint from wrong txn epoch: got %d, expected %d", + st.epoch, tc.mu.txn.Epoch) + } + + if st.seqNum < 0 || st.seqNum > tc.interceptorAlloc.txnSeqNumAllocator.writeSeq { + return nil, errors.AssertionFailedf("invalid savepoint: got %d, expected 0-%d", + st.seqNum, tc.interceptorAlloc.txnSeqNumAllocator.writeSeq) + } + + if enginepb.TxnSeqIsIgnored(st.seqNum+1, tc.mu.txn.IgnoredSeqNums) { + return nil, errors.AssertionFailedf("savepoint is already rolled back") + } + + return st, nil +} diff --git a/pkg/kv/txn_coord_sender_savepoints_test.go b/pkg/kv/txn_coord_sender_savepoints_test.go new file mode 100644 index 000000000000..ddc32cff6114 --- /dev/null +++ b/pkg/kv/txn_coord_sender_savepoints_test.go @@ -0,0 +1,124 @@ +// Copyright 2019 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kv + +import ( + "context" + "fmt" + "strings" + "testing" + + "github.com/cockroachdb/cockroach/pkg/internal/client" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/datadriven" +) + +func TestSavepoints(t *testing.T) { + defer leaktest.AfterTest(t)() + + datadriven.Walk(t, "testdata/savepoints", func(t *testing.T, path string) { + // New database for each test file. + s := createTestDB(t) + defer s.Stop() + ctx := context.Background() + + // Transient state during the test. + sp := make(map[string]client.SavepointToken) + var txn *client.Txn + + datadriven.RunTest(t, path, func(t *testing.T, td *datadriven.TestData) string { + var buf strings.Builder + + ptxn := func() { + tc := txn.Sender().(*TxnCoordSender) + fmt.Fprintf(&buf, "%d ", tc.interceptorAlloc.txnSeqNumAllocator.writeSeq) + if len(tc.mu.txn.IgnoredSeqNums) == 0 { + buf.WriteString("") + } + for _, r := range tc.mu.txn.IgnoredSeqNums { + fmt.Fprintf(&buf, "[%d-%d]", r.Start, r.End) + } + fmt.Fprintln(&buf) + } + + switch td.Cmd { + case "begin": + txn = client.NewTxn(ctx, s.DB, 0) + ptxn() + + case "commit": + if err := txn.CommitOrCleanup(ctx); err != nil { + fmt.Fprintf(&buf, "(%T) %v\n", err, err) + } + + case "retry": + epochBefore := txn.Epoch() + retryErr := txn.GenerateForcedRetryableError(ctx, "forced retry") + epochAfter := txn.Epoch() + fmt.Fprintf(&buf, "synthetic error: %v\n", retryErr) + fmt.Fprintf(&buf, "epoch: %d -> %d\n", epochBefore, epochAfter) + + case "abort": + // TODO(knz): add code missing here: simulate an abort error + // that bumps the txn ID. + t.Error("unsupported yet") + + case "put": + if err := txn.Put(ctx, + roachpb.Key(td.CmdArgs[0].Key), + []byte(td.CmdArgs[1].Key)); err != nil { + fmt.Fprintf(&buf, "(%T) %v\n", err, err) + } + + case "get": + v, err := txn.Get(ctx, td.CmdArgs[0].Key) + if err != nil { + fmt.Fprintf(&buf, "(%T) %v\n", err, err) + } else { + ba, _ := v.Value.GetBytes() + fmt.Fprintf(&buf, "%v -> %v\n", v.Key, string(ba)) + } + + case "savepoint": + spt, err := txn.CreateSavepoint(ctx) + if err != nil { + fmt.Fprintf(&buf, "(%T) %v\n", err, err) + } else { + sp[td.CmdArgs[0].Key] = spt + ptxn() + } + + case "release": + spn := td.CmdArgs[0].Key + spt := sp[spn] + if err := txn.ReleaseSavepoint(ctx, spt); err != nil { + fmt.Fprintf(&buf, "(%T) %v\n", err, err) + } else { + ptxn() + } + + case "rollback": + spn := td.CmdArgs[0].Key + spt := sp[spn] + if err := txn.RollbackToSavepoint(ctx, spt); err != nil { + fmt.Fprintf(&buf, "(%T) %v\n", err, err) + } else { + ptxn() + } + + default: + td.Fatalf(t, "unknown directive: %s", td.Cmd) + } + return buf.String() + }) + }) +} diff --git a/pkg/roachpb/ignored_seqnums.go b/pkg/roachpb/ignored_seqnums.go new file mode 100644 index 000000000000..70433209ff69 --- /dev/null +++ b/pkg/roachpb/ignored_seqnums.go @@ -0,0 +1,60 @@ +// Copyright 2019 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package roachpb + +import "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" + +// AddIgnoredSeqNumRange adds the given range to the given list of +// ignored seqnum ranges. +// +// The following invariants are assumed to hold and are preserved: +// - the list contains no overlapping ranges +// - the list contains no contiguous ranges +// - the list is sorted, with larger seqnums at the end +// +// Additionally, the caller must ensure: +// +// 1) if the new range overlaps with some range in the list, then it +// also overlaps with every subsequent range in the list. +// +// 2) the new range's "end" seqnum is larger or equal to the "end" +// seqnum of the last element in the list. +// +// For example: +// current list [3 5] [10 20] [22 24] +// new item: [8 26] +// final list: [3 5] [8 26] +// +// current list [3 5] [10 20] [22 24] +// new item: [28 32] +// final list: [3 5] [10 20] [22 24] [28 32] +// +// This corresponds to savepoints semantics: +// +// - Property 1 says that a rollback to an earlier savepoint +// rolls back over all writes following that savepoint. +// - Property 2 comes from that the new range's 'end' seqnum is the +// current write seqnum and thus larger than or equal to every +// previously seen value. +func AddIgnoredSeqNumRange( + list []enginepb.IgnoredSeqNumRange, newRange enginepb.IgnoredSeqNumRange, +) []enginepb.IgnoredSeqNumRange { + // Truncate the list at the last element not included in the new range. + i := 0 + for ; i < len(list); i++ { + if list[i].End < newRange.Start { + continue + } + break + } + list = list[:i] + return append(list, newRange) +} diff --git a/pkg/roachpb/ignored_seqnums_test.go b/pkg/roachpb/ignored_seqnums_test.go new file mode 100644 index 000000000000..ba5335d4924c --- /dev/null +++ b/pkg/roachpb/ignored_seqnums_test.go @@ -0,0 +1,53 @@ +// Copyright 2019 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package roachpb + +import ( + "testing" + + "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" + "github.com/stretchr/testify/require" +) + +func TestAddIgnoredSeqNumRange(t *testing.T) { + type r = enginepb.IgnoredSeqNumRange + + mr := func(a, b enginepb.TxnSeq) r { + return r{Start: a, End: b} + } + + testData := []struct { + list []r + newRange r + exp []r + }{ + {[]r{}, + mr(1, 2), + []r{mr(1, 2)}}, + {[]r{mr(1, 2)}, + mr(1, 4), + []r{mr(1, 4)}}, + {[]r{mr(1, 2), mr(3, 6)}, + mr(8, 10), + []r{mr(1, 2), mr(3, 6), mr(8, 10)}}, + {[]r{mr(1, 2), mr(5, 6)}, + mr(3, 8), + []r{mr(1, 2), mr(3, 8)}}, + {[]r{mr(1, 2), mr(5, 6)}, + mr(1, 8), + []r{mr(1, 8)}}, + } + + for _, tc := range testData { + nl := AddIgnoredSeqNumRange(tc.list, tc.newRange) + require.Equal(t, tc.exp, nl) + } +}