From aead1ca7526fd8bdca2db844afd3229aad99bcd2 Mon Sep 17 00:00:00 2001 From: Michael Butler Date: Tue, 28 Jun 2022 11:02:58 -0400 Subject: [PATCH 1/3] kvserver: add Time Bound Iteration to DeleteRange Previously, a kv client could only pass an AOST timestamp to a DeleteRange request. Now, the user can pass a lower bound timestamp, causing the kvserver to leverage time bound iteration while issuing delete requests. Specifically, the server uses an MVCCIncrementalIterator to iterate over the target span at the client provided time bounds, track a continuous run of keys in that time bound, and flush the run via point and MVCC range tombstones depending on the size of the run. In a future pr, this operation will replace the use of RevertRange during IMPORT INTO rollbacks to make them MVCC compatible. Informs #70428 Release note: none --- pkg/kv/kvserver/batcheval/cmd_delete_range.go | 58 ++- .../batcheval/cmd_delete_range_test.go | 303 +++++++++----- pkg/roachpb/api.proto | 28 ++ pkg/storage/mvcc.go | 301 +++++++++++++- pkg/storage/mvcc_history_test.go | 37 ++ .../mvcc_histories/delete_range_predicate | 382 ++++++++++++++++++ 6 files changed, 999 insertions(+), 110 deletions(-) create mode 100644 pkg/storage/testdata/mvcc_histories/delete_range_predicate diff --git a/pkg/kv/kvserver/batcheval/cmd_delete_range.go b/pkg/kv/kvserver/batcheval/cmd_delete_range.go index f0c0c52cd82e..eb4a0c0eb1f9 100644 --- a/pkg/kv/kvserver/batcheval/cmd_delete_range.go +++ b/pkg/kv/kvserver/batcheval/cmd_delete_range.go @@ -58,6 +58,8 @@ func declareKeysDeleteRange( } } +const maxDeleteRangeBatchBytes = 32 << 20 + // DeleteRange deletes the range of key/value pairs specified by // start and end keys. func DeleteRange( @@ -67,7 +69,14 @@ func DeleteRange( h := cArgs.Header reply := resp.(*roachpb.DeleteRangeResponse) - // Use experimental MVCC range tombstone if requested. + if args.Predicates != (roachpb.DeleteRangePredicates{}) && !args.UseRangeTombstone { + // This ensures predicate based DeleteRange piggybacks on the version gate, + // roachpb api flags, and latch declarations used by the UseRangeTombstone. + return result.Result{}, errors.AssertionFailedf( + "UseRangeTombstones must be passed with predicate based Delete Range") + } + + // Use MVCC range tombstone if requested. if args.UseRangeTombstone { if cArgs.Header.Txn != nil { return result.Result{}, ErrTransactionUnsupported @@ -79,14 +88,55 @@ func DeleteRange( return result.Result{}, errors.AssertionFailedf( "ReturnKeys can't be used with range tombstones") } - desc := cArgs.EvalCtx.Desc() leftPeekBound, rightPeekBound := rangeTombstonePeekBounds( args.Key, args.EndKey, desc.StartKey.AsRawKey(), desc.EndKey.AsRawKey()) maxIntents := storage.MaxIntentsPerWriteIntentError.Get(&cArgs.EvalCtx.ClusterSettings().SV) - err := storage.MVCCDeleteRangeUsingTombstone(ctx, readWriter, cArgs.Stats, - args.Key, args.EndKey, h.Timestamp, cArgs.Now, leftPeekBound, rightPeekBound, maxIntents) + if args.Predicates == (roachpb.DeleteRangePredicates{}) { + // If no predicate parameters are passed, use the fast path. + err := storage.MVCCDeleteRangeUsingTombstone(ctx, readWriter, cArgs.Stats, + args.Key, args.EndKey, h.Timestamp, cArgs.Now, leftPeekBound, rightPeekBound, maxIntents) + return result.Result{}, err + } + + if h.MaxSpanRequestKeys == 0 { + // In production, MaxSpanRequestKeys must be greater than zero to ensure + // the DistSender serializes predicate based DeleteRange requests across + // ranges. This ensures that only one resumeSpan gets returned to the + // client. + // + // Also, note that DeleteRangeUsingTombstone requests pass the isAlone + // flag in roachpb.api.proto, ensuring the requests do not intermingle with + // other types of requests, preventing further resume span muddling. + return result.Result{}, errors.AssertionFailedf( + "MaxSpanRequestKeys must be greater than zero when using predicated based DeleteRange") + } + // TODO (msbutler): Tune the threshold once DeleteRange and DeleteRangeUsingTombstone have + // been further optimized. + defaultRangeTombstoneThreshold := int64(64) + resumeSpan, err := storage.MVCCPredicateDeleteRange(ctx, readWriter, cArgs.Stats, + args.Key, args.EndKey, h.Timestamp, cArgs.Now, leftPeekBound, rightPeekBound, + args.Predicates, h.MaxSpanRequestKeys, maxDeleteRangeBatchBytes, + defaultRangeTombstoneThreshold, maxIntents) + + if resumeSpan != nil { + reply.ResumeSpan = resumeSpan + reply.ResumeReason = roachpb.RESUME_KEY_LIMIT + + // Note: While MVCCPredicateDeleteRange _could_ return reply.NumKeys, as + // the number of keys iterated through, doing so could lead to a + // significant performance drawback. The DistSender would have used + // NumKeys to subtract the number of keys processed by one range from the + // MaxSpanRequestKeys limit given to the next range. In this case, that's + // specifically not what we want, because this request does not use the + // normal meaning of MaxSpanRequestKeys (i.e. number of keys to return). + // Here, MaxSpanRequest keys is used to limit the number of tombstones + // written. Thus, setting NumKeys would just reduce the limit available to + // the next range for no good reason. + } + // Return result is always empty, since the reply is populated into the + // passed in resp pointer. return result.Result{}, err } diff --git a/pkg/kv/kvserver/batcheval/cmd_delete_range_test.go b/pkg/kv/kvserver/batcheval/cmd_delete_range_test.go index 9692e243dc69..4e454a1001ea 100644 --- a/pkg/kv/kvserver/batcheval/cmd_delete_range_test.go +++ b/pkg/kv/kvserver/batcheval/cmd_delete_range_test.go @@ -12,6 +12,8 @@ package batcheval import ( "context" + "fmt" + "math" "testing" "github.com/cockroachdb/cockroach/pkg/keys" @@ -27,8 +29,8 @@ import ( "github.com/stretchr/testify/require" ) -// TestDeleteRangeTombstone tests DeleteRange range tombstones directly, using -// only a Pebble engine. +// TestDeleteRangeTombstone tests DeleteRange range tombstones and predicated based DeleteRange +// directly, using only a Pebble engine. // // MVCC range tombstone logic is tested exhaustively in the MVCC history tests, // this just tests the RPC plumbing. @@ -76,6 +78,11 @@ func TestDeleteRangeTombstone(t *testing.T) { inline bool returnKeys bool expectErr interface{} // error type, substring, or true (any) + + // The fields below test predicate based delete range rpc plumbing. + predicateStartTime int64 // if set, the test will only run with predicate based delete range + onlyPointKeys bool // if set UsingRangeTombstone arg is set to false + maxBatchSize int64 // if predicateStartTime is set, then MaxBatchSize must be set }{ "above points succeed": { start: "a", @@ -142,130 +149,216 @@ func TestDeleteRangeTombstone(t *testing.T) { ts: 1e9, expectErr: &roachpb.WriteTooOldError{}, }, + "predicate without UsingRangeTombstone error": { + start: "a", + end: "f", + ts: 10e9, + predicateStartTime: 1, + maxBatchSize: maxDeleteRangeBatchBytes, + onlyPointKeys: true, + expectErr: "UseRangeTombstones must be passed with predicate based Delete Range", + }, + "predicate maxBatchSize error": { + start: "a", + end: "f", + ts: 10e9, + predicateStartTime: 1, + maxBatchSize: 0, + expectErr: "MaxSpanRequestKeys must be greater than zero when using predicated based DeleteRange", + }, } for name, tc := range testcases { t.Run(name, func(t *testing.T) { - ctx := context.Background() - st := cluster.MakeTestingClusterSettings() - engine := storage.NewDefaultInMemForTesting() - defer engine.Close() - - writeInitialData(t, ctx, engine) + for _, runWithPredicates := range []bool{false, true} { + if tc.predicateStartTime > 0 && !runWithPredicates { + continue + } + t.Run(fmt.Sprintf("Predicates=%v", runWithPredicates), func(t *testing.T) { + ctx := context.Background() + st := cluster.MakeTestingClusterSettings() + engine := storage.NewDefaultInMemForTesting() + defer engine.Close() - rangeKey := storage.MVCCRangeKey{ - StartKey: roachpb.Key(tc.start), - EndKey: roachpb.Key(tc.end), - Timestamp: hlc.Timestamp{WallTime: tc.ts}, - } + writeInitialData(t, ctx, engine) - // Prepare the request and environment. - evalCtx := &MockEvalCtx{ - ClusterSettings: st, - Desc: &roachpb.RangeDescriptor{ - StartKey: roachpb.RKey(rangeStart), - EndKey: roachpb.RKey(rangeEnd), - }, - } - - h := roachpb.Header{ - Timestamp: rangeKey.Timestamp, - } - if tc.txn { - txn := roachpb.MakeTransaction("txn", nil /* baseKey */, roachpb.NormalUserPriority, rangeKey.Timestamp, 0, 0) - h.Txn = &txn - } + rangeKey := storage.MVCCRangeKey{ + StartKey: roachpb.Key(tc.start), + EndKey: roachpb.Key(tc.end), + Timestamp: hlc.Timestamp{WallTime: tc.ts}, + } - req := &roachpb.DeleteRangeRequest{ - RequestHeader: roachpb.RequestHeader{ - Key: rangeKey.StartKey, - EndKey: rangeKey.EndKey, - }, - UseRangeTombstone: true, - Inline: tc.inline, - ReturnKeys: tc.returnKeys, - } + // Prepare the request and environment. + evalCtx := &MockEvalCtx{ + ClusterSettings: st, + Desc: &roachpb.RangeDescriptor{ + StartKey: roachpb.RKey(rangeStart), + EndKey: roachpb.RKey(rangeEnd), + }, + } - ms := computeStats(t, engine, rangeStart, rangeEnd, rangeKey.Timestamp.WallTime) + h := roachpb.Header{ + Timestamp: rangeKey.Timestamp, + } + if tc.txn { + txn := roachpb.MakeTransaction("txn", nil /* baseKey */, roachpb.NormalUserPriority, rangeKey.Timestamp, 0, 0) + h.Txn = &txn + } + var predicates roachpb.DeleteRangePredicates + if runWithPredicates { + predicates = roachpb.DeleteRangePredicates{ + StartTime: hlc.Timestamp{WallTime: 1}, + } + h.MaxSpanRequestKeys = math.MaxInt64 + } + if tc.predicateStartTime > 0 { + predicates = roachpb.DeleteRangePredicates{ + StartTime: hlc.Timestamp{WallTime: tc.predicateStartTime}, + } + h.MaxSpanRequestKeys = tc.maxBatchSize + } - // Use a spanset batch to assert latching of all accesses. In particular, - // the additional seeks necessary to check for adjacent range keys that we - // may merge with (for stats purposes) which should not cross the range - // bounds. - var latchSpans, lockSpans spanset.SpanSet - declareKeysDeleteRange(evalCtx.Desc, &h, req, &latchSpans, &lockSpans, 0) - batch := spanset.NewBatchAt(engine.NewBatch(), &latchSpans, h.Timestamp) - defer batch.Close() + req := &roachpb.DeleteRangeRequest{ + RequestHeader: roachpb.RequestHeader{ + Key: rangeKey.StartKey, + EndKey: rangeKey.EndKey, + }, + UseRangeTombstone: !tc.onlyPointKeys, + Inline: tc.inline, + ReturnKeys: tc.returnKeys, + Predicates: predicates, + } - // Run the request. - resp := &roachpb.DeleteRangeResponse{} - _, err := DeleteRange(ctx, batch, CommandArgs{ - EvalCtx: evalCtx.EvalContext(), - Stats: &ms, - Now: now, - Header: h, - Args: req, - }, resp) + ms := computeStats(t, engine, rangeStart, rangeEnd, rangeKey.Timestamp.WallTime) - // Check the error. - if tc.expectErr != nil { - require.Error(t, err) - if b, ok := tc.expectErr.(bool); ok && b { - // any error is fine - } else if expectMsg, ok := tc.expectErr.(string); ok { - require.Contains(t, err.Error(), expectMsg) - } else if e, ok := tc.expectErr.(error); ok { - require.True(t, errors.HasType(err, e), "expected %T, got %v", e, err) - } else { - require.Fail(t, "invalid expectErr", "expectErr=%v", tc.expectErr) - } - return - } - require.NoError(t, err) - require.NoError(t, batch.Commit(true)) + // Use a spanset batch to assert latching of all accesses. In particular, + // the additional seeks necessary to check for adjacent range keys that we + // may merge with (for stats purposes) which should not cross the range + // bounds. + var latchSpans, lockSpans spanset.SpanSet + declareKeysDeleteRange(evalCtx.Desc, &h, req, &latchSpans, &lockSpans, 0) + batch := spanset.NewBatchAt(engine.NewBatch(), &latchSpans, h.Timestamp) + defer batch.Close() - // Check that the range tombstone was written successfully. - iter := engine.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{ - KeyTypes: storage.IterKeyTypeRangesOnly, - LowerBound: rangeKey.StartKey, - UpperBound: rangeKey.EndKey, - }) - defer iter.Close() - iter.SeekGE(storage.MVCCKey{Key: rangeKey.StartKey}) + // Run the request. + resp := &roachpb.DeleteRangeResponse{} + _, err := DeleteRange(ctx, batch, CommandArgs{ + EvalCtx: evalCtx.EvalContext(), + Stats: &ms, + Now: now, + Header: h, + Args: req, + }, resp) - var seen storage.MVCCRangeKeyValue - for { - ok, err := iter.Valid() - require.NoError(t, err) - if !ok { - break - } - require.True(t, ok) - for _, rkv := range iter.RangeKeys() { - if rkv.RangeKey.Timestamp.Equal(rangeKey.Timestamp) { - if len(seen.RangeKey.StartKey) == 0 { - seen = rkv.Clone() + // Check the error. + if tc.expectErr != nil { + require.Error(t, err) + if b, ok := tc.expectErr.(bool); ok && b { + // any error is fine + } else if expectMsg, ok := tc.expectErr.(string); ok { + require.Contains(t, err.Error(), expectMsg) + } else if e, ok := tc.expectErr.(error); ok { + require.True(t, errors.HasType(err, e), "expected %T, got %v", e, err) } else { - seen.RangeKey.EndKey = rkv.RangeKey.EndKey.Clone() - require.Equal(t, seen.Value, rkv.Value) + require.Fail(t, "invalid expectErr", "expectErr=%v", tc.expectErr) } - break + return } - } - iter.Next() - } - require.Equal(t, rangeKey, seen.RangeKey) + require.NoError(t, err) + require.NoError(t, batch.Commit(true)) - value, err := storage.DecodeMVCCValue(seen.Value) - require.NoError(t, err) - require.True(t, value.IsTombstone()) - require.Equal(t, now, value.LocalTimestamp) + if runWithPredicates { + checkPredicateDeleteRange(t, engine, rangeKey) + } else { + checkDeleteRangeTombstone(t, engine, rangeKey, now) + } - // Check that range tombstone stats were updated correctly. - require.Equal(t, computeStats(t, engine, rangeStart, rangeEnd, rangeKey.Timestamp.WallTime), ms) + // Check that range tombstone stats were updated correctly. + require.Equal(t, computeStats(t, engine, rangeStart, rangeEnd, rangeKey.Timestamp.WallTime), ms) + }) + } }) } } +// checkDeleteRangeTombstone checks that the span targeted by the predicate +// based delete range operation only has point tombstones, as the size of the +// spans in this test are below rangeTombstoneThreshold +// +// the passed in rangekey contains info on the span PredicateDeleteRange +// operated on. The command should not have written an actual rangekey! +func checkPredicateDeleteRange(t *testing.T, engine storage.Reader, rKeyInfo storage.MVCCRangeKey) { + + iter := engine.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{ + KeyTypes: storage.IterKeyTypePointsAndRanges, + LowerBound: rKeyInfo.StartKey, + UpperBound: rKeyInfo.EndKey, + }) + defer iter.Close() + + for iter.SeekGE(storage.MVCCKey{Key: rKeyInfo.StartKey}); ; iter.NextKey() { + ok, err := iter.Valid() + require.NoError(t, err) + if !ok { + break + } + hasPoint, hashRange := iter.HasPointAndRange() + if !hasPoint && hashRange { + // PredicateDeleteRange should not have written any delete tombstones; + // therefore, any range key tombstones in the span should have been + // written before the request was issued. + for _, rKey := range iter.RangeKeys() { + require.Equal(t, true, rKey.RangeKey.Timestamp.Less(rKeyInfo.Timestamp)) + } + continue + } + value, err := storage.DecodeMVCCValue(iter.UnsafeValue()) + require.NoError(t, err) + require.True(t, value.IsTombstone()) + } +} + +// checkDeleteRangeTombstone checks that the range tombstone was written successfully. +func checkDeleteRangeTombstone( + t *testing.T, engine storage.Reader, rangeKey storage.MVCCRangeKey, now hlc.ClockTimestamp, +) { + iter := engine.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{ + KeyTypes: storage.IterKeyTypeRangesOnly, + LowerBound: rangeKey.StartKey, + UpperBound: rangeKey.EndKey, + }) + defer iter.Close() + iter.SeekGE(storage.MVCCKey{Key: rangeKey.StartKey}) + + var seen storage.MVCCRangeKeyValue + for { + ok, err := iter.Valid() + require.NoError(t, err) + if !ok { + break + } + require.True(t, ok) + for _, rkv := range iter.RangeKeys() { + if rkv.RangeKey.Timestamp.Equal(rangeKey.Timestamp) { + if len(seen.RangeKey.StartKey) == 0 { + seen = rkv.Clone() + } else { + seen.RangeKey.EndKey = rkv.RangeKey.EndKey.Clone() + require.Equal(t, seen.Value, rkv.Value) + } + break + } + } + iter.Next() + } + rangeKey.StartKey.Equal(seen.RangeKey.StartKey) + require.Equal(t, rangeKey, seen.RangeKey) + + value, err := storage.DecodeMVCCValue(seen.Value) + require.NoError(t, err) + require.True(t, value.IsTombstone()) + require.Equal(t, now, value.LocalTimestamp) +} + // computeStats computes MVCC stats for the given range. // // TODO(erikgrinaker): This, storage.computeStats(), and engineStats() should be diff --git a/pkg/roachpb/api.proto b/pkg/roachpb/api.proto index cb94741f68a3..85afc74f43c3 100644 --- a/pkg/roachpb/api.proto +++ b/pkg/roachpb/api.proto @@ -356,6 +356,34 @@ message DeleteRangeRequest { // The caller must check the MVCCRangeTombstones version gate before using // this parameter, as it is new in 22.2. bool use_range_tombstone = 5; + + DeleteRangePredicates predicates = 6 [(gogoproto.nullable) = false]; +} + +// DeleteRangePredicates if specified, will conduct predicate based DeleteRange. +// Predicate based delete range will issue tombstones on live keys that match the +// filters provided by the caller. In particular, long runs of matched keys will +// get deleted with a range tombstone, while smaller runs will get deleted with +// point tombstones. Note that the keyspace across runs does not overlap. +// +// To pass DeleteRangePredicates, the client must also pass UseRangeTombstone. +message DeleteRangePredicates { + // StartTime specifies an exclusive lower bound to surface keys + // for deletion. If specified, DeleteRange will only issue tombstones to keys + // within the span [startKey, endKey) that also have MVCC versions with + // timestamps between (startTime, endTime), where endTime is the request timestamp. + // + // The main application for this is a rollback of IMPORT INTO on a non-empty + // table. Here, DeleteRange with startTime = ImportStartTime, must only delete + // keys written by the import. In other words, older, pre-import, data cannot + // be touched. Because IMPORT INTO takes a table offline and does not allow + // masking an existing key, this operation will not issue tombstones to + // pre-import data that were written at or below StartTime. + // + // In other words, this operation assumes that for a k@t in the importing table: + // - t must be < endTime + // - if t in (startTime, endTime), then there is no other k@t' where t' <= startTime. + util.hlc.Timestamp start_time = 6 [(gogoproto.nullable) = false]; } // A DeleteRangeResponse is the return value from the DeleteRange() diff --git a/pkg/storage/mvcc.go b/pkg/storage/mvcc.go index 41c3e00f7f84..705ddc85bde9 100644 --- a/pkg/storage/mvcc.go +++ b/pkg/storage/mvcc.go @@ -2321,8 +2321,16 @@ func MVCCClearTimeRange( }) defer iter.Close() + // clearedMetaKey is the latest surfaced key that will get cleared var clearedMetaKey MVCCKey - var clearedMeta, restoredMeta enginepb.MVCCMetadata + + // clearedMeta contains metadata on the clearedMetaKey + var clearedMeta enginepb.MVCCMetadata + + // restoredMeta contains metadata on the previous version the clearedMetaKey. + // Once the key in clearedMetaKey is cleared, the key represented in + // restoredMeta becomes the latest version of this MVCC key. + var restoredMeta enginepb.MVCCMetadata iter.SeekGE(MVCCKey{Key: key}) for { if ok, err := iter.Valid(); err != nil { @@ -2466,6 +2474,297 @@ func MVCCDeleteRange( return keys, res.ResumeSpan, res.NumKeys, nil } +// MVCCPredicateDeleteRange issues MVCC tombstones at endTime to live keys +// within the span [startKey, endKey) that also have MVCC versions that match +// the predicate filters. Long runs of matched keys will get deleted with a +// range Tombstone, while smaller runs will get deleted with point tombstones. +// The keyspaces of each run do not overlap. +// +// This operation is non-transactional, but will check for existing intents in +// the target key span, regardless of timestamp, and return a WriteIntentError +// containing up to maxIntents intents. +// +// MVCCPredicateDeleteRange will return with a resumeSpan if the number of tombstones +// written exceeds maxBatchSize or the size of the written tombstones exceeds maxByteSize. +// These constraints prevent overwhelming raft. +// +// If an MVCC key surfaced has a timestamp at or above endTime, +// MVCCPredicateDeleteRange returns a WriteTooOldError without a resumeSpan, +// even if tombstones were already written to disk. To resolve, the caller +// should retry the call at a higher timestamp, assuming they have the +// appropriate level of isolation (e.g. the span covers an offline table, in the +// case of IMPORT rollbacks). +// +// An example of how this works: Issuing DeleteRange[a,e)@3 with +// Predicate{StartTime=1} on the following keys would issue tombstones at a@3, +// b@3, and d@3. +// +// t3 +// t2 a2 b2 d2 e2 +// t1 b1 c1 +// a b c d e +func MVCCPredicateDeleteRange( + ctx context.Context, + rw ReadWriter, + ms *enginepb.MVCCStats, + startKey, endKey roachpb.Key, + endTime hlc.Timestamp, + localTimestamp hlc.ClockTimestamp, + leftPeekBound, rightPeekBound roachpb.Key, + predicates roachpb.DeleteRangePredicates, + maxBatchSize, maxBatchByteSize int64, + rangeTombstoneThreshold int64, + maxIntents int64, +) (*roachpb.Span, error) { + + if maxBatchSize == 0 { + // Set maxBatchSize to a large number to ensure MVCCPredicateDeleteRange + // doesn't return early due to batch size. Note that maxBatchSize is only + // set to 0 during testing. + maxBatchSize = math.MaxInt64 + } + + // batchSize is the number tombstones (point and range) that have been flushed. + var batchSize int64 + var batchByteSize int64 + + // runSize is the number tombstones (point and range) that will get flushed in + // the current run. + var runSize int64 + var runByteSize int64 + + var runStart, runEnd roachpb.Key + + buf := make([]roachpb.Key, rangeTombstoneThreshold) + + if ms == nil { + return nil, errors.AssertionFailedf( + "MVCCStats passed in to MVCCPredicateDeleteRange must be non-nil to ensure proper stats" + + " computation during Delete operations") + } + + // Check for any overlapping intents, and return them to be resolved. + if intents, err := ScanIntents(ctx, rw, startKey, endKey, maxIntents, 0); err != nil { + return nil, err + } else if len(intents) > 0 { + return nil, &roachpb.WriteIntentError{Intents: intents} + } + + // continueRun returns three bools: the first is true if the current run + // should continue; the second is true if the latest key is a point tombstone; + // the third is true if the latest key is a range tombstone. If a non-nil + // error is returned, the booleans are invalid. The run should continue if: + // + // 1) The latest version of the key is a point or range tombstone, with a + // timestamp below the client provided EndTime. Since the goal is to create + // long runs, any tombstoned key should continue the run. + // + // 2) The latest key is live, matches the predicates, and has a + // timestamp below EndTime. + continueRun := func(k MVCCKey, iter SimpleMVCCIterator, + ) (toContinue bool, isPointTombstone bool, isRangeTombstone bool, err error) { + hasPointKey, hasRangeKey := iter.HasPointAndRange() + if hasRangeKey { + // TODO (msbutler): cache the range keys while the range bounds remain + // constant, since iter.RangeKeys() is expensive. Manual caching may not be necessary if + // https://github.com/cockroachdb/cockroach/issues/84379 lands. + rangeKeys := iter.RangeKeys() + if endTime.LessEq(rangeKeys[0].RangeKey.Timestamp) { + return false, false, false, roachpb.NewWriteTooOldError(endTime, + rangeKeys[0].RangeKey.Timestamp.Next(), k.Key.Clone()) + } + if !hasPointKey { + // landed on bare range key. + return true, false, true, nil + } + if k.Timestamp.Less(rangeKeys[0].RangeKey.Timestamp) { + // The latest range tombstone shadows the point key; ok to continue run. + return true, false, true, nil + } + } + + // At this point, there exists a point key that shadows all range keys, + // if they exist. + vRaw := iter.UnsafeValue() + + if endTime.LessEq(k.Timestamp) { + return false, false, false, roachpb.NewWriteTooOldError(endTime, k.Timestamp.Next(), + k.Key.Clone()) + } + if len(vRaw) == 0 { + // The latest version of the key is a point tombstone. + return true, true, false, nil + } + + // The latest key is a live point key. Conduct predicate filtering. + if k.Timestamp.LessEq(predicates.StartTime) { + return false, false, false, nil + } + + // TODO (msbutler): use MVCCValueHeader to match on job ID predicate + _, err = DecodeMVCCValue(vRaw) + if err != nil { + return false, false, false, err + } + return true, false, false, nil + } + + // Create some reusable machinery for flushing a run with point tombstones + // that is typically used in a single MVCCPut call. + pointTombstoneIter := newMVCCIterator(rw, endTime, false /* rangeKeyMasking */, IterOptions{ + KeyTypes: IterKeyTypePointsAndRanges, + Prefix: true, + }) + defer pointTombstoneIter.Close() + pointTombstoneBuf := newPutBuffer() + defer pointTombstoneBuf.release() + + flushDeleteKeys := func() error { + if runSize == 0 { + return nil + } + if runSize >= rangeTombstoneThreshold || + // Even if we didn't get a large enough number of keys to switch to + // using range tombstones, the byte size of the keys we did get is now too large to + // encode them all within the byte size limit, so use a range tombstone anyway. + batchByteSize+runByteSize >= maxBatchByteSize { + if err := MVCCDeleteRangeUsingTombstone(ctx, rw, ms, + runStart, runEnd.Next(), endTime, localTimestamp, leftPeekBound, rightPeekBound, + maxIntents); err != nil { + return err + } + batchByteSize += int64(MVCCRangeKey{StartKey: runStart, EndKey: runEnd, Timestamp: endTime}.EncodedSize()) + batchSize++ + } else { + // Use Point tombstones + for i := int64(0); i < runSize; i++ { + if err := mvccPutInternal(ctx, rw, pointTombstoneIter, ms, buf[i], endTime, localTimestamp, noValue, + nil, pointTombstoneBuf, nil); err != nil { + return err + } + } + batchByteSize += runByteSize + batchSize += runSize + } + runSize = 0 + runStart = roachpb.Key{} + return nil + } + + // Using the IncrementalIterator with the time-bound iter optimization could + // potentially be a big win here -- the expected use-case for this is to run + // over an entire table's span with a very recent timestamp, issuing tombstones to + // writes of some failed IMPORT and that could very likely only have hit + // some small subset of the table's keyspace. + // + // The MVCCIncrementalIterator uses a non-time-bound iter as its source + // of truth, and only uses the TBI iterator as an optimization when finding + // the next KV to iterate over. This pattern allows us to quickly skip over + // swaths of uninteresting keys, but then iterates over the latest key of each MVCC key. + // + // Notice that the iterator's EndTime is set to hlc.MaxTimestamp, in order to + // detect and fail on any keys written at or after the client provided + // endTime. We don't _expect_ to hit intents or newer keys in the client + // provided span since the MVCCPredicateDeleteRange is only intended for + // non-live key spans, but there could be an intent leftover. + iter := NewMVCCIncrementalIterator(rw, MVCCIncrementalIterOptions{ + EndKey: endKey, + StartTime: predicates.StartTime, + EndTime: hlc.MaxTimestamp, + RangeKeyMaskingBelow: endTime, + KeyTypes: IterKeyTypePointsAndRanges, + }) + defer iter.Close() + + iter.SeekGE(MVCCKey{Key: startKey}) + for { + if ok, err := iter.Valid(); err != nil { + return nil, err + } else if !ok { + break + } + k := iter.UnsafeKey() + toContinue, isPointTombstone, isRangeTombstone, err := continueRun(k, iter) + if err != nil { + return nil, err + } + + // If the latest version of the key is a tombstone at a timestamp < endtime, + // the timestamp could be less than predicates.startTime. In this case, the + // run can continue and Since there's no need to issue another tombstone, + // don't update runSize or buf. + if isRangeTombstone { + // Because range key information can be inferred at point keys, + // skip over the surfaced range key, and reason about shadowed keys at + // the surfaced point key. + // + // E.g. Scanning the keys below: + // 2 a2 + // 1 o---o + // a b + // + // would result in two surfaced keys: + // {a-b}@1; + // a2, {a-b}@1 + // + // Note that the range key gets surfaced before the point key, + // even though the point key shadows it. + iter.NextIgnoringTime() + } else if isPointTombstone { + // Since the latest version of this key is a point tombstone, skip over + // older versions of this key, and move the iterator to the next key + // even if it lies outside (startTime, endTime), to see if there's a + // need to flush. + iter.NextKeyIgnoringTime() + } else if toContinue { + // The latest version of the key is live, matches the predicate filters + // -- e.g. has a timestamp between (predicates.startTime, Endtime); + // therefore, plan to delete it. + if batchSize+runSize >= maxBatchSize || batchByteSize+runByteSize >= maxBatchByteSize { + // The matched key will be the start the resume span. + if err := flushDeleteKeys(); err != nil { + return nil, err + } + return &roachpb.Span{Key: k.Key.Clone(), EndKey: endKey}, nil + } + if runSize == 0 { + runStart = append(runStart[:0], k.Key...) + } + runEnd = append(runEnd[:0], k.Key...) + + if runSize < rangeTombstoneThreshold { + // Only buffer keys if there's a possibility of issuing point tombstones. + // + // To avoid unecessary memory allocation, overwrite the previous key at + // buffer's current position. No data corruption occurs because the + // buffer is flushed up to runSize. + buf[runSize] = append(buf[runSize][:0], runEnd...) + } + + runSize++ + runByteSize += int64(k.EncodedSize()) + + // Move the iterator to the next key in linear iteration even if it lies + // outside (startTime, endTime), to see if there's a need to flush. We can + // skip to the next key, as we don't care about older versions of the + // current key we're about to delete. + iter.NextKeyIgnoringTime() + } else { + // This key does not match. Flush the run of matching keys, + // to prevent issuing tombstones on keys that do not match the predicates. + if err := flushDeleteKeys(); err != nil { + return nil, err + } + // Move the incremental iterator to the next valid MVCC key that can be + // deleted. If TBI was enabled when initializing the incremental iterator, + // this step could jump over large swaths of keys that do not qualify for + // clearing. + iter.NextKey() + } + } + return nil, flushDeleteKeys() +} + // MVCCDeleteRangeUsingTombstone deletes the given MVCC keyspan at the given // timestamp using an MVCC range tombstone (rather than MVCC point tombstones). // This operation is non-transactional, but will check for existing intents and diff --git a/pkg/storage/mvcc_history_test.go b/pkg/storage/mvcc_history_test.go index ec8e44e2fe0e..bd753e7a3494 100644 --- a/pkg/storage/mvcc_history_test.go +++ b/pkg/storage/mvcc_history_test.go @@ -13,6 +13,7 @@ package storage import ( "context" "fmt" + "math" "path/filepath" "regexp" "sort" @@ -73,6 +74,7 @@ var sstIterVerify = util.ConstantWithMetamorphicTestBool("mvcc-histories-sst-ite // del [t=] [ts=[,]] [localTs=[,]] [resolve [status=]] k= // del_range [t=] [ts=[,]] [localTs=[,]] [resolve [status=]] k= [end=] [max=] [returnKeys] // del_range_ts [ts=[,]] [localTs=[,]] k= end= +// del_range_pred [ts=[,]] [localTs=[,]] k= end= [startTime=,max=,maxBytes=,rangeThreshold=] // increment [t=] [ts=[,]] [localTs=[,]] [resolve [status=]] k= [inc=] // initput [t=] [ts=[,]] [resolve [status=]] k= v= [raw] [failOnTombstones] // merge [t=] [ts=[,]] [resolve [status=]] k= v= [raw] @@ -659,6 +661,7 @@ var commands = map[string]cmd{ "del": {typDataUpdate, cmdDelete}, "del_range": {typDataUpdate, cmdDeleteRange}, "del_range_ts": {typDataUpdate, cmdDeleteRangeTombstone}, + "del_range_pred": {typDataUpdate, cmdDeleteRangePredicate}, "export": {typReadOnly, cmdExport}, "get": {typReadOnly, cmdGet}, "increment": {typDataUpdate, cmdIncrement}, @@ -1019,6 +1022,40 @@ func cmdDeleteRangeTombstone(e *evalCtx) error { }) } +func cmdDeleteRangePredicate(e *evalCtx) error { + key, endKey := e.getKeyRange() + ts := e.getTs(nil) + localTs := hlc.ClockTimestamp(e.getTsWithName("localTs")) + + max := math.MaxInt64 + if e.hasArg("max") { + e.scanArg("max", &max) + } + + maxBytes := math.MaxInt64 + if e.hasArg("maxBytes") { + e.scanArg("maxBytes", &maxBytes) + } + predicates := roachpb.DeleteRangePredicates{ + StartTime: e.getTsWithName("startTime"), + } + rangeThreshold := 64 + if e.hasArg("rangeThreshold") { + e.scanArg("rangeThreshold", &rangeThreshold) + } + return e.withWriter("del_range_pred", func(rw ReadWriter) error { + resumeSpan, err := MVCCPredicateDeleteRange(e.ctx, rw, e.ms, key, endKey, ts, + localTs, nil, nil, predicates, int64(max), int64(maxBytes), int64(rangeThreshold), 0) + + if resumeSpan != nil { + e.results.buf.Printf("del_range_pred: resume span [%s,%s)\n", resumeSpan.Key, + resumeSpan.EndKey) + } + return err + }, + ) +} + func cmdGet(e *evalCtx) error { txn := e.getTxn(optional) key := e.getKey() diff --git a/pkg/storage/testdata/mvcc_histories/delete_range_predicate b/pkg/storage/testdata/mvcc_histories/delete_range_predicate new file mode 100644 index 000000000000..b3c8e31b4b0d --- /dev/null +++ b/pkg/storage/testdata/mvcc_histories/delete_range_predicate @@ -0,0 +1,382 @@ +# Tests MVCC Del Range with timestamp predicate. +# +# Set up some point keys, point tombstones x, range tombstones o--o, +# and intents []. +# +# 7 [i7] +# 6 +# 5 +# 4 x d4 f4 x h4 o-------------------o +# 3 b3 +# 2 a2 e2 g2 +# 1 d1 +# 0 +# a b c d e f g h i j k l m n o p +run ok +put k=a ts=2 v=a2 +del k=a ts=4 +put k=b ts=3 v=b3 +put k=d ts=1 v=d1 +put k=d ts=4 v=d4 +put k=e ts=2 v=e2 +put k=f ts=4 v=f4 +put k=g ts=2 v=g2 +del k=g ts=4 +put k=h ts=4 v=h4 +del_range_ts k=k end=p ts=4 +with t=A + txn_begin ts=7 + put k=i v=i7 +---- +>> at end: +txn: "A" meta={id=00000000 key=/Min pri=0.00000000 epo=0 ts=7.000000000,0 min=0,0 seq=0} lock=true stat=PENDING rts=7.000000000,0 wto=false gul=0,0 +rangekey: {k-p}/[4.000000000,0=/] +data: "a"/4.000000000,0 -> / +data: "a"/2.000000000,0 -> /BYTES/a2 +data: "b"/3.000000000,0 -> /BYTES/b3 +data: "d"/4.000000000,0 -> /BYTES/d4 +data: "d"/1.000000000,0 -> /BYTES/d1 +data: "e"/2.000000000,0 -> /BYTES/e2 +data: "f"/4.000000000,0 -> /BYTES/f4 +data: "g"/4.000000000,0 -> / +data: "g"/2.000000000,0 -> /BYTES/g2 +data: "h"/4.000000000,0 -> /BYTES/h4 +meta: "i"/0,0 -> txn={id=00000000 key=/Min pri=0.00000000 epo=0 ts=7.000000000,0 min=0,0 seq=0} ts=7.000000000,0 del=false klen=12 vlen=7 mergeTs= txnDidNotUpdateMeta=true +data: "i"/7.000000000,0 -> /BYTES/i7 + +# Writing next to or above point keys and tombstones should work. +run stats ok +del_range_pred k=a end=i ts=5 startTime=3 rangeThreshold=2 +---- +>> del_range_pred k=a end=i ts=5 startTime=3 rangeThreshold=2 +stats: key_bytes=+12 val_count=+1 range_key_count=+1 range_key_bytes=+14 range_val_count=+1 live_count=-3 live_bytes=-63 gc_bytes_age=+8455 +>> at end: +rangekey: {f-h\x00}/[5.000000000,0=/] +rangekey: {k-p}/[4.000000000,0=/] +data: "a"/4.000000000,0 -> / +data: "a"/2.000000000,0 -> /BYTES/a2 +data: "b"/3.000000000,0 -> /BYTES/b3 +data: "d"/5.000000000,0 -> / +data: "d"/4.000000000,0 -> /BYTES/d4 +data: "d"/1.000000000,0 -> /BYTES/d1 +data: "e"/2.000000000,0 -> /BYTES/e2 +data: "f"/4.000000000,0 -> /BYTES/f4 +data: "g"/4.000000000,0 -> / +data: "g"/2.000000000,0 -> /BYTES/g2 +data: "h"/4.000000000,0 -> /BYTES/h4 +meta: "i"/0,0 -> txn={id=00000000 key=/Min pri=0.00000000 epo=0 ts=7.000000000,0 min=0,0 seq=0} ts=7.000000000,0 del=false klen=12 vlen=7 mergeTs= txnDidNotUpdateMeta=true +data: "i"/7.000000000,0 -> /BYTES/i7 +stats: key_count=8 key_bytes=160 val_count=12 val_bytes=111 range_key_count=2 range_key_bytes=27 range_val_count=2 live_count=3 live_bytes=111 gc_bytes_age=17863 intent_count=1 intent_bytes=19 separated_intent_count=1 intent_age=93 + +# error on intent, no tombstones should be written +run stats error +del_range_pred k=a end=p ts=6 startTime=1 +---- +>> del_range_pred k=a end=p ts=6 startTime=1 +stats: no change +>> at end: +rangekey: {f-h\x00}/[5.000000000,0=/] +rangekey: {k-p}/[4.000000000,0=/] +data: "a"/4.000000000,0 -> / +data: "a"/2.000000000,0 -> /BYTES/a2 +data: "b"/3.000000000,0 -> /BYTES/b3 +data: "d"/5.000000000,0 -> / +data: "d"/4.000000000,0 -> /BYTES/d4 +data: "d"/1.000000000,0 -> /BYTES/d1 +data: "e"/2.000000000,0 -> /BYTES/e2 +data: "f"/4.000000000,0 -> /BYTES/f4 +data: "g"/4.000000000,0 -> / +data: "g"/2.000000000,0 -> /BYTES/g2 +data: "h"/4.000000000,0 -> /BYTES/h4 +meta: "i"/0,0 -> txn={id=00000000 key=/Min pri=0.00000000 epo=0 ts=7.000000000,0 min=0,0 seq=0} ts=7.000000000,0 del=false klen=12 vlen=7 mergeTs= txnDidNotUpdateMeta=true +data: "i"/7.000000000,0 -> /BYTES/i7 +stats: key_count=8 key_bytes=160 val_count=12 val_bytes=111 range_key_count=2 range_key_bytes=27 range_val_count=2 live_count=3 live_bytes=111 gc_bytes_age=17863 intent_count=1 intent_bytes=19 separated_intent_count=1 intent_age=93 +error: (*roachpb.WriteIntentError:) conflicting intents on "i" + +# error encountering point key at d5. +# a tombstone should not get written at c5 or e5, since +# DeleteRange didn't flush before reaching d5. +run stats error +put k=c ts=2 v=c2 +del_range_pred k=c end=f ts=5 startTime=1 +---- +>> put k=c ts=2 v=c2 +stats: key_count=+1 key_bytes=+14 val_count=+1 val_bytes=+7 live_count=+1 live_bytes=+21 +>> del_range_pred k=c end=f ts=5 startTime=1 +stats: no change +>> at end: +rangekey: {f-h\x00}/[5.000000000,0=/] +rangekey: {k-p}/[4.000000000,0=/] +data: "a"/4.000000000,0 -> / +data: "a"/2.000000000,0 -> /BYTES/a2 +data: "b"/3.000000000,0 -> /BYTES/b3 +data: "c"/2.000000000,0 -> /BYTES/c2 +data: "d"/5.000000000,0 -> / +data: "d"/4.000000000,0 -> /BYTES/d4 +data: "d"/1.000000000,0 -> /BYTES/d1 +data: "e"/2.000000000,0 -> /BYTES/e2 +data: "f"/4.000000000,0 -> /BYTES/f4 +data: "g"/4.000000000,0 -> / +data: "g"/2.000000000,0 -> /BYTES/g2 +data: "h"/4.000000000,0 -> /BYTES/h4 +meta: "i"/0,0 -> txn={id=00000000 key=/Min pri=0.00000000 epo=0 ts=7.000000000,0 min=0,0 seq=0} ts=7.000000000,0 del=false klen=12 vlen=7 mergeTs= txnDidNotUpdateMeta=true +data: "i"/7.000000000,0 -> /BYTES/i7 +stats: key_count=9 key_bytes=174 val_count=13 val_bytes=118 range_key_count=2 range_key_bytes=27 range_val_count=2 live_count=4 live_bytes=132 gc_bytes_age=17863 intent_count=1 intent_bytes=19 separated_intent_count=1 intent_age=93 +error: (*roachpb.WriteTooOldError:) WriteTooOldError: write for key "d" at timestamp 5.000000000,0 too old; wrote at 5.000000000,1 + +# error encountering range key at k4. +# a tombstones should not get written to j4 or q4 since +# DeleteRange did not flush before reaching rangekey {k-p}4. +run stats error +put k=j ts=2 v=j2 +put k=q ts=2 v=q2 +del_range_pred k=j end=r ts=4 startTime=1 rangeThreshold=2 +---- +>> put k=j ts=2 v=j2 +stats: key_count=+1 key_bytes=+14 val_count=+1 val_bytes=+7 live_count=+1 live_bytes=+21 +>> put k=q ts=2 v=q2 +stats: key_count=+1 key_bytes=+14 val_count=+1 val_bytes=+7 live_count=+1 live_bytes=+21 +>> del_range_pred k=j end=r ts=4 startTime=1 rangeThreshold=2 +stats: no change +>> at end: +rangekey: {f-h\x00}/[5.000000000,0=/] +rangekey: {k-p}/[4.000000000,0=/] +data: "a"/4.000000000,0 -> / +data: "a"/2.000000000,0 -> /BYTES/a2 +data: "b"/3.000000000,0 -> /BYTES/b3 +data: "c"/2.000000000,0 -> /BYTES/c2 +data: "d"/5.000000000,0 -> / +data: "d"/4.000000000,0 -> /BYTES/d4 +data: "d"/1.000000000,0 -> /BYTES/d1 +data: "e"/2.000000000,0 -> /BYTES/e2 +data: "f"/4.000000000,0 -> /BYTES/f4 +data: "g"/4.000000000,0 -> / +data: "g"/2.000000000,0 -> /BYTES/g2 +data: "h"/4.000000000,0 -> /BYTES/h4 +meta: "i"/0,0 -> txn={id=00000000 key=/Min pri=0.00000000 epo=0 ts=7.000000000,0 min=0,0 seq=0} ts=7.000000000,0 del=false klen=12 vlen=7 mergeTs= txnDidNotUpdateMeta=true +data: "i"/7.000000000,0 -> /BYTES/i7 +data: "j"/2.000000000,0 -> /BYTES/j2 +data: "q"/2.000000000,0 -> /BYTES/q2 +stats: key_count=11 key_bytes=202 val_count=15 val_bytes=132 range_key_count=2 range_key_bytes=27 range_val_count=2 live_count=6 live_bytes=174 gc_bytes_age=17863 intent_count=1 intent_bytes=19 separated_intent_count=1 intent_age=93 +error: (*roachpb.WriteTooOldError:) WriteTooOldError: write for key "k" at timestamp 4.000000000,0 too old; wrote at 4.000000000,1 + +# At this point the keyspace looks like this: +# 7 [i7] +# 6 +# 5 x o-----------o +# 4 x d4 f4 x h4 o-------------------o +# 3 b3 +# 2 a2 c2 e2 g2 j2 q2 +# 1 d1 +# 0 +# a b c d e f g h i j k l m n o p q + +# check that del_range will not write anything if no live keys are in its span +# and predicate ts. Note that the range keys bounds are [firstMatchingKey,LastMatchingKey.Next()]. +run stats ok +del_range_pred k=j end=r ts=5 startTime=2 rangeThreshold=2 +---- +>> del_range_pred k=j end=r ts=5 startTime=2 rangeThreshold=2 +stats: no change +>> at end: +rangekey: {f-h\x00}/[5.000000000,0=/] +rangekey: {k-p}/[4.000000000,0=/] +data: "a"/4.000000000,0 -> / +data: "a"/2.000000000,0 -> /BYTES/a2 +data: "b"/3.000000000,0 -> /BYTES/b3 +data: "c"/2.000000000,0 -> /BYTES/c2 +data: "d"/5.000000000,0 -> / +data: "d"/4.000000000,0 -> /BYTES/d4 +data: "d"/1.000000000,0 -> /BYTES/d1 +data: "e"/2.000000000,0 -> /BYTES/e2 +data: "f"/4.000000000,0 -> /BYTES/f4 +data: "g"/4.000000000,0 -> / +data: "g"/2.000000000,0 -> /BYTES/g2 +data: "h"/4.000000000,0 -> /BYTES/h4 +meta: "i"/0,0 -> txn={id=00000000 key=/Min pri=0.00000000 epo=0 ts=7.000000000,0 min=0,0 seq=0} ts=7.000000000,0 del=false klen=12 vlen=7 mergeTs= txnDidNotUpdateMeta=true +data: "i"/7.000000000,0 -> /BYTES/i7 +data: "j"/2.000000000,0 -> /BYTES/j2 +data: "q"/2.000000000,0 -> /BYTES/q2 +stats: key_count=11 key_bytes=202 val_count=15 val_bytes=132 range_key_count=2 range_key_bytes=27 range_val_count=2 live_count=6 live_bytes=174 gc_bytes_age=17863 intent_count=1 intent_bytes=19 separated_intent_count=1 intent_age=93 + +# try the same call as above, except with startTime set to 1 +# check that delrange properly continues the run over a range tombstone +run stats ok +del_range_pred k=j end=r ts=5 startTime=1 rangeThreshold=2 +---- +>> del_range_pred k=j end=r ts=5 startTime=1 rangeThreshold=2 +stats: range_key_count=+2 range_key_bytes=+36 range_val_count=+3 live_count=-2 live_bytes=-42 gc_bytes_age=+7406 +>> at end: +rangekey: {f-h\x00}/[5.000000000,0=/] +rangekey: {j-k}/[5.000000000,0=/] +rangekey: {k-p}/[5.000000000,0=/ 4.000000000,0=/] +rangekey: {p-q\x00}/[5.000000000,0=/] +data: "a"/4.000000000,0 -> / +data: "a"/2.000000000,0 -> /BYTES/a2 +data: "b"/3.000000000,0 -> /BYTES/b3 +data: "c"/2.000000000,0 -> /BYTES/c2 +data: "d"/5.000000000,0 -> / +data: "d"/4.000000000,0 -> /BYTES/d4 +data: "d"/1.000000000,0 -> /BYTES/d1 +data: "e"/2.000000000,0 -> /BYTES/e2 +data: "f"/4.000000000,0 -> /BYTES/f4 +data: "g"/4.000000000,0 -> / +data: "g"/2.000000000,0 -> /BYTES/g2 +data: "h"/4.000000000,0 -> /BYTES/h4 +meta: "i"/0,0 -> txn={id=00000000 key=/Min pri=0.00000000 epo=0 ts=7.000000000,0 min=0,0 seq=0} ts=7.000000000,0 del=false klen=12 vlen=7 mergeTs= txnDidNotUpdateMeta=true +data: "i"/7.000000000,0 -> /BYTES/i7 +data: "j"/2.000000000,0 -> /BYTES/j2 +data: "q"/2.000000000,0 -> /BYTES/q2 +stats: key_count=11 key_bytes=202 val_count=15 val_bytes=132 range_key_count=4 range_key_bytes=63 range_val_count=5 live_count=4 live_bytes=132 gc_bytes_age=25269 intent_count=1 intent_bytes=19 separated_intent_count=1 intent_age=93 + +# check that we flush with a range tombstone, if maxBytes is exceeded +# even though range tombstone threshold has not been met. +# Return a resume span. Note that the run extends past key d, since +# its latest value is a point tombstone, and is therefore not counted +# in runByteSize. +run stats ok +del_range_pred k=c end=i ts=6 startTime=1 maxBytes=1 +---- +>> del_range_pred k=c end=i ts=6 startTime=1 maxBytes=1 +del_range_pred: resume span ["e","i") +stats: range_key_count=+1 range_key_bytes=+14 range_val_count=+1 live_count=-1 live_bytes=-21 gc_bytes_age=+3290 +>> at end: +rangekey: c{-\x00}/[6.000000000,0=/] +rangekey: {f-h\x00}/[5.000000000,0=/] +rangekey: {j-k}/[5.000000000,0=/] +rangekey: {k-p}/[5.000000000,0=/ 4.000000000,0=/] +rangekey: {p-q\x00}/[5.000000000,0=/] +data: "a"/4.000000000,0 -> / +data: "a"/2.000000000,0 -> /BYTES/a2 +data: "b"/3.000000000,0 -> /BYTES/b3 +data: "c"/2.000000000,0 -> /BYTES/c2 +data: "d"/5.000000000,0 -> / +data: "d"/4.000000000,0 -> /BYTES/d4 +data: "d"/1.000000000,0 -> /BYTES/d1 +data: "e"/2.000000000,0 -> /BYTES/e2 +data: "f"/4.000000000,0 -> /BYTES/f4 +data: "g"/4.000000000,0 -> / +data: "g"/2.000000000,0 -> /BYTES/g2 +data: "h"/4.000000000,0 -> /BYTES/h4 +meta: "i"/0,0 -> txn={id=00000000 key=/Min pri=0.00000000 epo=0 ts=7.000000000,0 min=0,0 seq=0} ts=7.000000000,0 del=false klen=12 vlen=7 mergeTs= txnDidNotUpdateMeta=true +data: "i"/7.000000000,0 -> /BYTES/i7 +data: "j"/2.000000000,0 -> /BYTES/j2 +data: "q"/2.000000000,0 -> /BYTES/q2 +stats: key_count=11 key_bytes=202 val_count=15 val_bytes=132 range_key_count=5 range_key_bytes=77 range_val_count=6 live_count=3 live_bytes=111 gc_bytes_age=28559 intent_count=1 intent_bytes=19 separated_intent_count=1 intent_age=93 + +# check that we flush properly if maxBatchSize is exceeded. +# Since max is 1, write a tombstone to e, and as soon as it sees the +# next eligible key to delete (f), return a resume span. +# Note that we dont count shadowed tombstones in the batchSize +run stats ok +put k=f ts=6 v=f6 +del_range_pred k=c end=i ts=7 startTime=1 max=1 +---- +>> put k=f ts=6 v=f6 +stats: key_bytes=+12 val_count=+1 val_bytes=+7 live_count=+1 live_bytes=+21 gc_bytes_age=-190 +>> del_range_pred k=c end=i ts=7 startTime=1 max=1 +del_range_pred: resume span ["f","i") +stats: key_bytes=+12 val_count=+1 live_count=-1 live_bytes=-21 gc_bytes_age=+3069 +>> at end: +rangekey: c{-\x00}/[6.000000000,0=/] +rangekey: {f-h\x00}/[5.000000000,0=/] +rangekey: {j-k}/[5.000000000,0=/] +rangekey: {k-p}/[5.000000000,0=/ 4.000000000,0=/] +rangekey: {p-q\x00}/[5.000000000,0=/] +data: "a"/4.000000000,0 -> / +data: "a"/2.000000000,0 -> /BYTES/a2 +data: "b"/3.000000000,0 -> /BYTES/b3 +data: "c"/2.000000000,0 -> /BYTES/c2 +data: "d"/5.000000000,0 -> / +data: "d"/4.000000000,0 -> /BYTES/d4 +data: "d"/1.000000000,0 -> /BYTES/d1 +data: "e"/7.000000000,0 -> / +data: "e"/2.000000000,0 -> /BYTES/e2 +data: "f"/6.000000000,0 -> /BYTES/f6 +data: "f"/4.000000000,0 -> /BYTES/f4 +data: "g"/4.000000000,0 -> / +data: "g"/2.000000000,0 -> /BYTES/g2 +data: "h"/4.000000000,0 -> /BYTES/h4 +meta: "i"/0,0 -> txn={id=00000000 key=/Min pri=0.00000000 epo=0 ts=7.000000000,0 min=0,0 seq=0} ts=7.000000000,0 del=false klen=12 vlen=7 mergeTs= txnDidNotUpdateMeta=true +data: "i"/7.000000000,0 -> /BYTES/i7 +data: "j"/2.000000000,0 -> /BYTES/j2 +data: "q"/2.000000000,0 -> /BYTES/q2 +stats: key_count=11 key_bytes=226 val_count=17 val_bytes=139 range_key_count=5 range_key_bytes=77 range_val_count=6 live_count=3 live_bytes=111 gc_bytes_age=31438 intent_count=1 intent_bytes=19 separated_intent_count=1 intent_age=93 + +# Run the same DeleteRange as above at ts 8 +# No resume span should get returned because the iterator goes through +# the whole span without encountering a second eligible key to delete +run stats ok +del_range_pred k=c end=i ts=8 startTime=1 max=1 +---- +>> del_range_pred k=c end=i ts=8 startTime=1 max=1 +stats: key_bytes=+12 val_count=+1 live_count=-1 live_bytes=-21 gc_bytes_age=+3036 +>> at end: +rangekey: c{-\x00}/[6.000000000,0=/] +rangekey: {f-h\x00}/[5.000000000,0=/] +rangekey: {j-k}/[5.000000000,0=/] +rangekey: {k-p}/[5.000000000,0=/ 4.000000000,0=/] +rangekey: {p-q\x00}/[5.000000000,0=/] +data: "a"/4.000000000,0 -> / +data: "a"/2.000000000,0 -> /BYTES/a2 +data: "b"/3.000000000,0 -> /BYTES/b3 +data: "c"/2.000000000,0 -> /BYTES/c2 +data: "d"/5.000000000,0 -> / +data: "d"/4.000000000,0 -> /BYTES/d4 +data: "d"/1.000000000,0 -> /BYTES/d1 +data: "e"/7.000000000,0 -> / +data: "e"/2.000000000,0 -> /BYTES/e2 +data: "f"/8.000000000,0 -> / +data: "f"/6.000000000,0 -> /BYTES/f6 +data: "f"/4.000000000,0 -> /BYTES/f4 +data: "g"/4.000000000,0 -> / +data: "g"/2.000000000,0 -> /BYTES/g2 +data: "h"/4.000000000,0 -> /BYTES/h4 +meta: "i"/0,0 -> txn={id=00000000 key=/Min pri=0.00000000 epo=0 ts=7.000000000,0 min=0,0 seq=0} ts=7.000000000,0 del=false klen=12 vlen=7 mergeTs= txnDidNotUpdateMeta=true +data: "i"/7.000000000,0 -> /BYTES/i7 +data: "j"/2.000000000,0 -> /BYTES/j2 +data: "q"/2.000000000,0 -> /BYTES/q2 +stats: key_count=11 key_bytes=238 val_count=18 val_bytes=139 range_key_count=5 range_key_bytes=77 range_val_count=6 live_count=2 live_bytes=90 gc_bytes_age=34474 intent_count=1 intent_bytes=19 separated_intent_count=1 intent_age=93 + +# Write some new keys on a and b and ensure a run of point tombstones gets properly written +run stats ok +put k=a ts=5 v=a5 +put k=b ts=5 v=a5 +del_range_pred k=a end=c ts=6 startTime=1 +---- +>> put k=a ts=5 v=a5 +stats: key_bytes=+12 val_count=+1 val_bytes=+7 live_count=+1 live_bytes=+21 gc_bytes_age=-192 +>> put k=b ts=5 v=a5 +stats: key_bytes=+12 val_count=+1 val_bytes=+7 gc_bytes_age=+1805 +>> del_range_pred k=a end=c ts=6 startTime=1 +stats: key_bytes=+24 val_count=+2 live_count=-2 live_bytes=-42 gc_bytes_age=+6204 +>> at end: +rangekey: c{-\x00}/[6.000000000,0=/] +rangekey: {f-h\x00}/[5.000000000,0=/] +rangekey: {j-k}/[5.000000000,0=/] +rangekey: {k-p}/[5.000000000,0=/ 4.000000000,0=/] +rangekey: {p-q\x00}/[5.000000000,0=/] +data: "a"/6.000000000,0 -> / +data: "a"/5.000000000,0 -> /BYTES/a5 +data: "a"/4.000000000,0 -> / +data: "a"/2.000000000,0 -> /BYTES/a2 +data: "b"/6.000000000,0 -> / +data: "b"/5.000000000,0 -> /BYTES/a5 +data: "b"/3.000000000,0 -> /BYTES/b3 +data: "c"/2.000000000,0 -> /BYTES/c2 +data: "d"/5.000000000,0 -> / +data: "d"/4.000000000,0 -> /BYTES/d4 +data: "d"/1.000000000,0 -> /BYTES/d1 +data: "e"/7.000000000,0 -> / +data: "e"/2.000000000,0 -> /BYTES/e2 +data: "f"/8.000000000,0 -> / +data: "f"/6.000000000,0 -> /BYTES/f6 +data: "f"/4.000000000,0 -> /BYTES/f4 +data: "g"/4.000000000,0 -> / +data: "g"/2.000000000,0 -> /BYTES/g2 +data: "h"/4.000000000,0 -> /BYTES/h4 +meta: "i"/0,0 -> txn={id=00000000 key=/Min pri=0.00000000 epo=0 ts=7.000000000,0 min=0,0 seq=0} ts=7.000000000,0 del=false klen=12 vlen=7 mergeTs= txnDidNotUpdateMeta=true +data: "i"/7.000000000,0 -> /BYTES/i7 +data: "j"/2.000000000,0 -> /BYTES/j2 +data: "q"/2.000000000,0 -> /BYTES/q2 +stats: key_count=11 key_bytes=286 val_count=22 val_bytes=153 range_key_count=5 range_key_bytes=77 range_val_count=6 live_count=1 live_bytes=69 gc_bytes_age=42291 intent_count=1 intent_bytes=19 separated_intent_count=1 intent_age=93 From 878aa2525a082a02b4fd5558260cc31ddee0bb0b Mon Sep 17 00:00:00 2001 From: David Taylor Date: Thu, 21 Jul 2022 22:53:13 +0000 Subject: [PATCH 2/3] backupccl: do not backup or restore system.jobs Previously system.jobs was backed up by cluster backups and restored by cluster restore, albeit with all but schema change jobs moved into a reverting state. However job records almost always include specific tables' IDs or IDs of other objects in their persisted state which means a restored job is only valid, even just for cancellation, if all of the objects that it could reference still have the same IDs after they're restored. At one point, this was true in cluster restore, but is becoming increasingly difficult to maintain as the portions of the ID space that are reserved changes. The only real reason to restore jobs, given that most were cancelled anyway, was so that schema changes mentioned in the descriptors of restored tables would be completed or rolled back -- absent a job, the mutation would otherwise just remain incomplete indefinitely. However during database or table RESTORE, jobs are not restored but rather the required schema change job is synthesized from the information in the table descriptor. This approach can be used during cluster restore as well instead of restoring the jobs table, eliminating the majority of ID-mentioning fields in restored system tables, and, in particular, those that are burried deep in binary payloads rather than easy to find columns (ala zones). Release note (sql change): Cluster BACKUP and RESTORE no longer includes job records, which previously were usually only restored in a cancelling state with the exception of schema changes which were restored to their initial running state. Instead any schema change jobs required for restored tables are recreated after restoring the tables. --- .../full_cluster_backup_restore_test.go | 61 ----------- pkg/ccl/backupccl/restore_job.go | 19 ++-- .../restore_mid_schema_change_test.go | 67 +++--------- pkg/ccl/backupccl/system_schema.go | 101 +----------------- 4 files changed, 21 insertions(+), 227 deletions(-) diff --git a/pkg/ccl/backupccl/full_cluster_backup_restore_test.go b/pkg/ccl/backupccl/full_cluster_backup_restore_test.go index fcbcfb908474..04ae6cce272d 100644 --- a/pkg/ccl/backupccl/full_cluster_backup_restore_test.go +++ b/pkg/ccl/backupccl/full_cluster_backup_restore_test.go @@ -87,19 +87,6 @@ func TestFullClusterBackup(t *testing.T) { } } - // The claim_session_id field in jobs is a uuid and so needs to be excluded - // when comparing jobs pre/post restore. The span config reconciliation job - // too is something we exclude; because it's a singleton job, when restored - // into another cluster it self-terminates. - const jobsQuery = ` -SELECT id, status, created, payload, progress, created_by_type, created_by_id, claim_instance_id -FROM system.jobs -WHERE id NOT IN -( - SELECT job_id FROM [SHOW AUTOMATIC JOBS] - WHERE job_type = 'AUTO SPAN CONFIG RECONCILIATION' -) - ` // Pause SQL Stats compaction job to ensure the test is deterministic. sqlDB.Exec(t, `PAUSE SCHEDULES SELECT id FROM [SHOW SCHEDULES FOR SQL STATISTICS]`) @@ -151,7 +138,6 @@ CREATE TABLE data2.foo (a int); // should appear in the restore. // This job will eventually fail since it will run from a new cluster. sqlDB.Exec(t, `BACKUP data.bank TO 'nodelocal://0/throwawayjob'`) - preBackupJobs := sqlDB.QueryStr(t, jobsQuery) // Populate system.settings. sqlDB.Exec(t, `SET CLUSTER SETTING kv.bulk_io_write.concurrent_addsstable_requests = 5`) sqlDB.Exec(t, `INSERT INTO system.ui (key, value, "lastUpdated") VALUES ($1, $2, now())`, "some_key", "some_val") @@ -323,27 +309,6 @@ CREATE TABLE data2.foo (a int); sqlDBRestore.CheckQueryResults(t, grantCheck, sqlDB.QueryStr(t, grantCheck)) }) - t.Run("ensure that jobs are restored", func(t *testing.T) { - // Ensure that the jobs in the RESTORE cluster is a superset of the jobs - // that were in the BACKUP cluster (before the full cluster BACKUP job was - // run). There may be more jobs now because the restore can run jobs of - // its own. - newJobsStr := sqlDBRestore.QueryStr(t, jobsQuery) - newJobs := make(map[string][]string) - - for _, newJob := range newJobsStr { - // The first element of the slice is the job id. - newJobs[newJob[0]] = newJob - } - for _, oldJob := range preBackupJobs { - newJob, ok := newJobs[oldJob[0]] - if !ok { - t.Errorf("Expected to find job %+v in RESTORE cluster, but not found", oldJob) - } - require.Equal(t, oldJob, newJob) - } - }) - t.Run("zone_configs", func(t *testing.T) { // The restored zones should be a superset of the zones in the backed up // cluster. @@ -676,7 +641,6 @@ func TestClusterRestoreFailCleanup(t *testing.T) { {"comments"}, {"database_role_settings"}, {"external_connections"}, - {"jobs"}, {"locations"}, {"role_members"}, {"role_options"}, @@ -768,7 +732,6 @@ func TestClusterRestoreFailCleanup(t *testing.T) { {"comments"}, {"database_role_settings"}, {"external_connections"}, - {"jobs"}, {"locations"}, {"role_members"}, {"role_options"}, @@ -1043,30 +1006,6 @@ func TestReintroduceOfflineSpans(t *testing.T) { expectedCount = srcDB.QueryStr(t, checkQuery) destDB.CheckQueryResults(t, `SELECT count(*) FROM restoredb.bank@new_idx`, expectedCount) }) - - t.Run("restore-canceled", func(t *testing.T) { - args := base.TestClusterArgs{ServerArgs: base.TestServerArgs{ - Knobs: base.TestingKnobs{JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals()}}, - } - _, destDB, cleanupDst := backupRestoreTestSetupEmpty(t, singleNode, tempDir, InitManualReplication, args) - defer cleanupDst() - - destDB.Exec(t, `RESTORE FROM $1 AS OF SYSTEM TIME `+tsMidRestore, clusterBackupLoc) - - // Wait for the cluster restore job to finish, as well as the restored RESTORE TABLE - // job to cancel. - destDB.CheckQueryResultsRetry(t, ` - SELECT description, status FROM [SHOW JOBS] - WHERE job_type = 'RESTORE' AND status NOT IN ('succeeded', 'canceled')`, - [][]string{}, - ) - // The cluster restore should succeed, but the table restore should have failed. - destDB.CheckQueryResults(t, - `SELECT status, count(*) FROM [SHOW JOBS] WHERE job_type = 'RESTORE' GROUP BY status ORDER BY status`, - [][]string{{"canceled", "1"}, {"succeeded", "1"}}) - - destDB.ExpectErr(t, `relation "restoredb.bank" does not exist`, `SELECT count(*) FROM restoredb.bank`) - }) } // TestClusterRevisionDoesNotBackupOptOutSystemTables is a regression test for a diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index 96907cae8350..af61e4136ca1 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -1825,18 +1825,15 @@ func (r *restoreResumer) publishDescriptors( mutTable.RowLevelTTL.ScheduleID = j.ScheduleID() } newTables = append(newTables, mutTable.TableDesc()) - // For cluster restores, all the jobs are restored directly from the jobs - // table, so there is no need to re-create ongoing schema change jobs, - // otherwise we'll create duplicate jobs. - if details.DescriptorCoverage != tree.AllDescriptors || len(badIndexes) > 0 { - // Convert any mutations that were in progress on the table descriptor - // when the backup was taken, and convert them to schema change jobs. - if err := createSchemaChangeJobsFromMutations(ctx, - r.execCfg.JobRegistry, r.execCfg.Codec, txn, r.job.Payload().UsernameProto.Decode(), mutTable, - ); err != nil { - return err - } + + // Convert any mutations that were in progress on the table descriptor + // when the backup was taken, and convert them to schema change jobs. + if err := createSchemaChangeJobsFromMutations(ctx, + r.execCfg.JobRegistry, r.execCfg.Codec, txn, r.job.Payload().UsernameProto.Decode(), mutTable, + ); err != nil { + return err } + } // For all of the newly created types, make type schema change jobs for any // type descriptors that were backed up in the middle of a type schema change. diff --git a/pkg/ccl/backupccl/restore_mid_schema_change_test.go b/pkg/ccl/backupccl/restore_mid_schema_change_test.go index 124f1c19c73b..3b89d579de1e 100644 --- a/pkg/ccl/backupccl/restore_mid_schema_change_test.go +++ b/pkg/ccl/backupccl/restore_mid_schema_change_test.go @@ -95,7 +95,7 @@ func TestRestoreMidSchemaChange(t *testing.T) { for _, backupDir := range backupDirs { fullBackupDir, err := filepath.Abs(filepath.Join(fullClusterVersionDir, backupDir.Name())) require.NoError(t, err) - t.Run(backupDir.Name(), restoreMidSchemaChange(fullBackupDir, backupDir.Name(), isClusterRestore, blockLocation == "after")) + t.Run(backupDir.Name(), restoreMidSchemaChange(fullBackupDir, backupDir.Name(), isClusterRestore)) } }) } @@ -107,52 +107,25 @@ func TestRestoreMidSchemaChange(t *testing.T) { // expectedSCJobCount returns the expected number of schema change jobs // we expect to find. -func expectedSCJobCount(scName string, isClusterRestore, after bool) int { +func expectedSCJobCount(scName string) int { // The number of schema change under test. These will be the ones that are // synthesized in database restore. var expNumSCJobs int - var numBackgroundSCJobs int // Some test cases may have more than 1 background schema change job. switch scName { case "midmany": - numBackgroundSCJobs = 1 // the create table - // This test runs 3 schema changes on a single table. expNumSCJobs = 3 case "midmultitable": - numBackgroundSCJobs = 2 // this test creates 2 tables - expNumSCJobs = 2 // this test perform a schema change for each table + expNumSCJobs = 2 // this test perform a schema change for each table case "midprimarykeyswap": - // Create table + alter column is done in the prep stage of this test. - numBackgroundSCJobs = 2 // PK change + PK cleanup expNumSCJobs = 2 - if isClusterRestore && after { - expNumSCJobs = 1 - } case "midprimarykeyswapcleanup": - // This test performs an ALTER COLUMN, and the original ALTER PRIMARY - // KEY that is being cleaned up. - numBackgroundSCJobs = 3 expNumSCJobs = 1 default: // Most test cases only have 1 schema change under test. expNumSCJobs = 1 - // Most test cases have just a CREATE TABLE job that created the table - // under test. - numBackgroundSCJobs = 1 - } - - // We drop defaultdb and postgres for full cluster restores - numBackgroundDropDatabaseSCJobs := 2 - // Since we're doing a cluster restore, we need to account for all of - // the schema change jobs that existed in the backup. - if isClusterRestore { - expNumSCJobs += numBackgroundSCJobs + numBackgroundDropDatabaseSCJobs - - // If we're performing a cluster restore, we also need to include the drop - // crdb_temp_system job. - expNumSCJobs++ } return expNumSCJobs @@ -189,32 +162,16 @@ func getTablesInTest(scName string) (tableNames []string) { return } -func verifyMidSchemaChange( - t *testing.T, scName string, kvDB *kv.DB, sqlDB *sqlutils.SQLRunner, isClusterRestore, after bool, -) { +func verifyMidSchemaChange(t *testing.T, scName string, kvDB *kv.DB, sqlDB *sqlutils.SQLRunner) { tables := getTablesInTest(scName) // Check that we are left with the expected number of schema change jobs. - expNumSchemaChangeJobs := expectedSCJobCount(scName, isClusterRestore, after) - schemaChangeJobs := sqlDB.QueryStr(t, "SELECT description FROM crdb_internal.jobs WHERE job_type = 'SCHEMA CHANGE'") - require.Equal(t, expNumSchemaChangeJobs, len(schemaChangeJobs), - "Expected %d schema change jobs but found %v", expNumSchemaChangeJobs, schemaChangeJobs) - if isClusterRestore { - // Cluster restores should be restoring the exact job entries that were - // backed up, and therefore should not create jobs that contains "RESTORING" - // in the description. - schemaChangeJobs := sqlDB.QueryStr(t, - "SELECT description FROM crdb_internal.jobs WHERE job_type = 'SCHEMA CHANGE' AND description NOT LIKE '%RESTORING%'") - require.Equal(t, expNumSchemaChangeJobs, len(schemaChangeJobs), - "Expected %d schema change jobs but found %v", expNumSchemaChangeJobs, schemaChangeJobs) - } else { - // Non-cluster restores should create jobs with "RESTORE" in the job - // description. - schemaChangeJobs := sqlDB.QueryStr(t, - "SELECT description FROM crdb_internal.jobs WHERE job_type = 'SCHEMA CHANGE' AND description LIKE '%RESTORING%'") - require.Equal(t, expNumSchemaChangeJobs, len(schemaChangeJobs), - "Expected %d schema change jobs but found %v", expNumSchemaChangeJobs, schemaChangeJobs) - } + expNumSchemaChangeJobs := expectedSCJobCount(scName) + + synthesizedSchemaChangeJobs := sqlDB.QueryStr(t, + "SELECT description FROM crdb_internal.jobs WHERE job_type = 'SCHEMA CHANGE' AND description LIKE '%RESTORING%'") + require.Equal(t, expNumSchemaChangeJobs, len(synthesizedSchemaChangeJobs), + "Expected %d schema change jobs but found %v", expNumSchemaChangeJobs, synthesizedSchemaChangeJobs) for _, tableName := range tables { validateTable(t, kvDB, sqlDB, "defaultdb", tableName) @@ -226,7 +183,7 @@ func verifyMidSchemaChange( } func restoreMidSchemaChange( - backupDir, schemaChangeName string, isClusterRestore bool, after bool, + backupDir, schemaChangeName string, isClusterRestore bool, ) func(t *testing.T) { return func(t *testing.T) { ctx := context.Background() @@ -265,6 +222,6 @@ func restoreMidSchemaChange( // Wait for all jobs to terminate. Some may fail since we don't restore // adding spans. sqlDB.CheckQueryResultsRetry(t, "SELECT * FROM crdb_internal.jobs WHERE job_type = 'SCHEMA CHANGE' AND NOT (status = 'succeeded' OR status = 'failed')", [][]string{}) - verifyMidSchemaChange(t, schemaChangeName, kvDB, sqlDB, isClusterRestore, after) + verifyMidSchemaChange(t, schemaChangeName, kvDB, sqlDB) } } diff --git a/pkg/ccl/backupccl/system_schema.go b/pkg/ccl/backupccl/system_schema.go index 17775dc69162..a28e32fa0692 100644 --- a/pkg/ccl/backupccl/system_schema.go +++ b/pkg/ccl/backupccl/system_schema.go @@ -12,19 +12,14 @@ import ( "context" fmt "fmt" "math" - "strings" - "github.com/cockroachdb/cockroach/pkg/jobs" - "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv" - "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog" descpb "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" - "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" ) @@ -155,98 +150,6 @@ func queryTableRowCount( return int64(*count), nil } -// jobsMigrationFunc resets the progress on schema change jobs, and marks all -// other jobs as reverting. -func jobsMigrationFunc( - ctx context.Context, execCfg *sql.ExecutorConfig, txn *kv.Txn, tempTableName string, -) (err error) { - executor := execCfg.InternalExecutor - - const statesToRevert = `('` + string(jobs.StatusRunning) + `', ` + - `'` + string(jobs.StatusPauseRequested) + `', ` + - `'` + string(jobs.StatusPaused) + `')` - - jobsToRevert := make([]int64, 0) - query := `SELECT id, payload FROM ` + tempTableName + ` WHERE status IN ` + statesToRevert - it, err := executor.QueryIteratorEx( - ctx, "restore-fetching-job-payloads", txn, - sessiondata.InternalExecutorOverride{User: username.RootUserName()}, - query) - if err != nil { - return errors.Wrap(err, "fetching job payloads") - } - defer func() { - closeErr := it.Close() - if err == nil { - err = closeErr - } - }() - for { - ok, err := it.Next(ctx) - if !ok { - if err != nil { - return err - } - break - } - - r := it.Cur() - id, payloadBytes := r[0], r[1] - rawJobID, ok := id.(*tree.DInt) - if !ok { - return errors.Errorf("job: failed to read job id as DInt (was %T)", id) - } - jobID := int64(*rawJobID) - - payload, err := jobs.UnmarshalPayload(payloadBytes) - if err != nil { - return errors.Wrap(err, "failed to unmarshal job to restore") - } - if payload.Type() == jobspb.TypeImport || payload.Type() == jobspb.TypeRestore { - jobsToRevert = append(jobsToRevert, jobID) - } - } - - // Update the status for other jobs. - var updateStatusQuery strings.Builder - fmt.Fprintf(&updateStatusQuery, "UPDATE %s SET status = $1 WHERE id IN ", tempTableName) - fmt.Fprint(&updateStatusQuery, "(") - for i, job := range jobsToRevert { - if i > 0 { - fmt.Fprint(&updateStatusQuery, ", ") - } - fmt.Fprintf(&updateStatusQuery, "'%d'", job) - } - fmt.Fprint(&updateStatusQuery, ")") - - if _, err := executor.Exec(ctx, "updating-job-status", txn, updateStatusQuery.String(), jobs.StatusCancelRequested); err != nil { - return errors.Wrap(err, "updating restored jobs as reverting") - } - - return nil -} - -// When restoring the jobs table we don't want to remove existing jobs, since -// that includes the restore that we're running. -func jobsRestoreFunc( - ctx context.Context, - execCfg *sql.ExecutorConfig, - txn *kv.Txn, - systemTableName, tempTableName string, -) error { - executor := execCfg.InternalExecutor - - // When restoring jobs, don't clear the existing table. - - restoreQuery := fmt.Sprintf("INSERT INTO system.%s (SELECT * FROM %s) ON CONFLICT DO NOTHING;", - systemTableName, tempTableName) - opName := systemTableName + "-data-insert" - if _, err := executor.Exec(ctx, opName, txn, restoreQuery); err != nil { - return errors.Wrapf(err, "inserting data to system.%s", systemTableName) - } - return nil -} - // When restoring the settings table, we want to make sure to not override the // version. func settingsRestoreFunc( @@ -314,9 +217,7 @@ var systemTableBackupConfiguration = map[string]systemBackupConfiguration{ shouldIncludeInClusterBackup: optInToClusterBackup, }, systemschema.JobsTable.GetName(): { - shouldIncludeInClusterBackup: optInToClusterBackup, - migrationFunc: jobsMigrationFunc, - customRestoreFunc: jobsRestoreFunc, + shouldIncludeInClusterBackup: optOutOfClusterBackup, }, systemschema.ScheduledJobsTable.GetName(): { shouldIncludeInClusterBackup: optInToClusterBackup, From 11718f3d135c9edc117e154b4395d3829d6e0ed7 Mon Sep 17 00:00:00 2001 From: David Taylor Date: Sat, 23 Jul 2022 21:05:28 +0000 Subject: [PATCH 3/3] backupccl: check crdb_internal.invalid_objects is empty after restore-mid-schema-change tests Release notes: none. --- pkg/ccl/backupccl/restore_mid_schema_change_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/ccl/backupccl/restore_mid_schema_change_test.go b/pkg/ccl/backupccl/restore_mid_schema_change_test.go index 3b89d579de1e..a1f024856142 100644 --- a/pkg/ccl/backupccl/restore_mid_schema_change_test.go +++ b/pkg/ccl/backupccl/restore_mid_schema_change_test.go @@ -223,5 +223,6 @@ func restoreMidSchemaChange( // adding spans. sqlDB.CheckQueryResultsRetry(t, "SELECT * FROM crdb_internal.jobs WHERE job_type = 'SCHEMA CHANGE' AND NOT (status = 'succeeded' OR status = 'failed')", [][]string{}) verifyMidSchemaChange(t, schemaChangeName, kvDB, sqlDB) + sqlDB.CheckQueryResultsRetry(t, "SELECT * from crdb_internal.invalid_objects", [][]string{}) } }