log backup: set a proper maxVersion to resolve lock (#57178)
close #57134
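
In short: the advancer's async lock resolving previously passed math.MaxUint64 as the scan version, so it scanned (and tried to resolve) locks from transactions far newer than the checkpoint it was stuck on. With this change it takes the largest checkpoint TS among the target spans and scans only locks below maxTs + 1. A minimal, self-contained sketch of that bound computation follows; the target type and maxVersionFor helper are illustrative stand-ins (the patch inlines the equivalent loop over spans.Valued targets inside asyncResolveLocksForRanges):

package main

import "fmt"

// target is an illustrative stand-in for spans.Valued: a key range whose
// Value is the checkpoint TS the advancer has recorded for that range.
type target struct {
	Value uint64
}

// maxVersionFor computes the lock-scanning upper bound used by the fix:
// the largest checkpoint among the stuck ranges plus one, so locks from
// transactions newer than those checkpoints are left alone instead of
// being scanned all the way up to math.MaxUint64.
func maxVersionFor(targets []target) uint64 {
	maxTs := uint64(0)
	for _, t := range targets {
		if t.Value > maxTs {
			maxTs = t.Value
		}
	}
	return maxTs + 1
}

func main() {
	fmt.Println(maxVersionFor([]target{{Value: 100}, {Value: 123}, {Value: 90}})) // 124
}
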
3pointer authored Nov 11, 2024
1 parent 91436e3 commit 042846e
Showing 3 changed files with 66 additions and 33 deletions.
13 changes: 8 additions & 5 deletions br/pkg/streamhelper/advancer.go
@@ -6,7 +6,6 @@ import (
"bytes"
"context"
"fmt"
"math"
"strings"
"sync"
"sync/atomic"
@@ -472,7 +471,7 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
return nil
}

func (c *CheckpointAdvancer) setCheckpoint(ctx context.Context, s spans.Valued) bool {
func (c *CheckpointAdvancer) setCheckpoint(s spans.Valued) bool {
cp := NewCheckpointWithSpan(s)
if cp.TS < c.lastCheckpoint.TS {
log.Warn("failed to update global checkpoint: stale",
@@ -498,7 +497,7 @@ func (c *CheckpointAdvancer) advanceCheckpointBy(ctx context.Context,
return err
}

if c.setCheckpoint(ctx, cp) {
if c.setCheckpoint(cp) {
log.Info("uploading checkpoint for task",
zap.Stringer("checkpoint", oracle.GetTimeFromTS(cp.Value)),
zap.Uint64("checkpoint", cp.Value),
@@ -585,7 +584,7 @@ func (c *CheckpointAdvancer) isCheckpointLagged(ctx context.Context) (bool, erro

func (c *CheckpointAdvancer) importantTick(ctx context.Context) error {
c.checkpointsMu.Lock()
c.setCheckpoint(ctx, c.checkpoints.Min())
c.setCheckpoint(c.checkpoints.Min())
c.checkpointsMu.Unlock()
if err := c.env.UploadV3GlobalCheckpointForTask(ctx, c.task.Name, c.lastCheckpoint.TS); err != nil {
return errors.Annotate(err, "failed to upload global checkpoint")
@@ -685,10 +684,14 @@ func (c *CheckpointAdvancer) asyncResolveLocksForRanges(ctx context.Context, tar
// do not block main tick here
go func() {
failpoint.Inject("AsyncResolveLocks", func() {})
maxTs := uint64(0)
for _, t := range targets {
maxTs = max(maxTs, t.Value)
}
handler := func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) {
// we will scan all locks and try to resolve them by checking txn status.
return tikv.ResolveLocksForRange(
ctx, c.env, math.MaxUint64, r.StartKey, r.EndKey, tikv.NewGcResolveLockMaxBackoffer, tikv.GCScanLockLimit)
ctx, c.env, maxTs+1, r.StartKey, r.EndKey, tikv.NewGcResolveLockMaxBackoffer, tikv.GCScanLockLimit)
}
workerPool := util.NewWorkerPool(uint(config.DefaultMaxConcurrencyAdvance), "advancer resolve locks")
var wg sync.WaitGroup
59 changes: 34 additions & 25 deletions br/pkg/streamhelper/advancer_test.go
@@ -3,6 +3,7 @@
package streamhelper_test

import (
"bytes"
"context"
"fmt"
"strings"
@@ -356,48 +357,60 @@ func TestResolveLock(t *testing.T) {
lockRegion := c.findRegionByKey([]byte("01"))
allLocks := []*txnlock.Lock{
{
Key: []byte{1},
Key: []byte("011"),
// TxnID == minCheckpoint
TxnID: minCheckpoint,
},
{
Key: []byte{2},
Key: []byte("012"),
// TxnID > minCheckpoint
TxnID: minCheckpoint + 1,
},
{
Key: []byte("013"),
// this lock cannot be resolved because its TxnID is above the scan's max version
TxnID: oracle.GoTimeToTS(oracle.GetTimeFromTS(minCheckpoint).Add(2 * time.Minute)),
},
}
c.LockRegion(lockRegion, allLocks)

// ensure resolve locks triggered and collect all locks from scan locks
resolveLockRef := atomic.NewBool(false)
env.resolveLocks = func(locks []*txnlock.Lock, loc *tikv.KeyLocation) (*tikv.KeyLocation, error) {
resolveLockRef.Store(true)
require.ElementsMatch(t, locks, allLocks)
// The third lock is skipped because its TxnID is not below the max version.
require.ElementsMatch(t, locks, allLocks[:2])
return loc, nil
}
adv := streamhelper.NewCheckpointAdvancer(env)
// make lastCheckpoint stuck at 123
adv.UpdateLastCheckpoint(streamhelper.NewCheckpointWithSpan(spans.Valued{
Key: kv.KeyRange{
StartKey: kv.Key([]byte("1")),
EndKey: kv.Key([]byte("2")),
},
Value: 123,
}))
adv.NewCheckpoints(
spans.Sorted(spans.NewFullWith([]kv.KeyRange{
{
StartKey: kv.Key([]byte("1")),
EndKey: kv.Key([]byte("2")),
},
}, 0)),
)
adv.StartTaskListener(ctx)
require.Eventually(t, func() bool { return adv.OnTick(ctx) == nil },
time.Second, 50*time.Millisecond)

maxTargetTs := uint64(0)
coll := streamhelper.NewClusterCollector(ctx, env)
coll.SetOnSuccessHook(func(u uint64, kr kv.KeyRange) {
adv.WithCheckpoints(func(s *spans.ValueSortedFull) {
for _, lock := range allLocks {
// if there is any lock key in the range
if bytes.Compare(kr.StartKey, lock.Key) <= 0 && (bytes.Compare(lock.Key, kr.EndKey) < 0 || len(kr.EndKey) == 0) {
// mock lock behavior, do not update checkpoint
s.Merge(spans.Valued{Key: kr, Value: minCheckpoint})
return
}
}
s.Merge(spans.Valued{Key: kr, Value: u})
maxTargetTs = max(maxTargetTs, u)
})
})
err := adv.GetCheckpointInRange(ctx, []byte{}, []byte{}, coll)
require.NoError(t, err)
r, err := coll.Finish(ctx)
require.NoError(t, err)
require.Len(t, r.FailureSubRanges, 0)
require.Equal(t, r.Checkpoint, minCheckpoint, "%d %d", r.Checkpoint, minCheckpoint)

env.maxTs = maxTargetTs + 1
require.Eventually(t, func() bool { return adv.OnTick(ctx) == nil },
time.Second, 50*time.Millisecond)
// now the lock state must be true, because the tick finished and asyncResolveLocks got stuck.
require.True(t, adv.GetInResolvingLock())
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/br/pkg/streamhelper/AsyncResolveLocks"))
@@ -406,10 +419,6 @@ func TestResolveLock(t *testing.T) {
// state must set to false after tick
require.Eventually(t, func() bool { return !adv.GetInResolvingLock() },
8*time.Second, 50*time.Microsecond)
r, err := coll.Finish(ctx)
require.NoError(t, err)
require.Len(t, r.FailureSubRanges, 0)
require.Equal(t, r.Checkpoint, minCheckpoint, "%d %d", r.Checkpoint, minCheckpoint)
}

func TestOwnerDropped(t *testing.T) {
27 changes: 24 additions & 3 deletions br/pkg/streamhelper/basic_lib_for_test.go
@@ -99,6 +99,7 @@ type fakeCluster struct {
idAlloced uint64
stores map[uint64]*fakeStore
regions []*region
maxTs uint64
testCtx *testing.T

onGetClient func(uint64) error
@@ -773,14 +774,24 @@ func (t *testEnv) putTask() {
}

func (t *testEnv) ScanLocksInOneRegion(bo *tikv.Backoffer, key []byte, maxVersion uint64, limit uint32) ([]*txnlock.Lock, *tikv.KeyLocation, error) {
if t.maxTs != maxVersion {
return nil, nil, errors.Errorf("unexpected max version in scan lock, expected %d, actual %d", t.maxTs, maxVersion)
}
for _, r := range t.regions {
if len(r.locks) != 0 {
return r.locks, &tikv.KeyLocation{
locks := make([]*txnlock.Lock, 0, len(r.locks))
for _, l := range r.locks {
// skip locks whose TxnID is at or above maxVersion
if l.TxnID < maxVersion {
locks = append(locks, l)
}
}
return locks, &tikv.KeyLocation{
Region: tikv.NewRegionVerID(r.id, 0, 0),
}, nil
}
}
return nil, nil, nil
return nil, &tikv.KeyLocation{}, nil
}

func (t *testEnv) ResolveLocksInOneRegion(bo *tikv.Backoffer, locks []*txnlock.Lock, loc *tikv.KeyLocation) (*tikv.KeyLocation, error) {
Expand All @@ -791,7 +802,7 @@ func (t *testEnv) ResolveLocksInOneRegion(bo *tikv.Backoffer, locks []*txnlock.L
return t.resolveLocks(locks, loc)
}
}
return nil, nil
return loc, nil
}

func (t *testEnv) Identifier() string {
@@ -856,6 +867,16 @@ func (p *mockPDClient) GetStore(_ context.Context, storeID uint64) (*metapb.Stor
}, nil
}

func (p *mockPDClient) GetAllStores(ctx context.Context, opts ...pd.GetStoreOption) ([]*metapb.Store, error) {
// only called once by GetRegionCache during lock resolving
return []*metapb.Store{
{
Id: 1,
Address: "127.0.0.1",
},
}, nil
}

func (p *mockPDClient) GetClusterID(ctx context.Context) uint64 {
return 1
}