Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

log backup: use global checkpoint ts as source of truth #58135

Merged
merged 7 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion br/pkg/streamhelper/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ go_test(
],
flaky = True,
race = "on",
shard_count = 34,
shard_count = 35,
deps = [
":streamhelper",
"//br/pkg/errors",
Expand Down
19 changes: 14 additions & 5 deletions br/pkg/streamhelper/advancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,8 +426,8 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
c.setCheckpoints(spans.Sorted(spans.NewFullWith(e.Ranges, 0)))
globalCheckpointTs, err := c.env.GetGlobalCheckpointForTask(ctx, e.Name)
if err != nil {
log.Error("failed to get global checkpoint, skipping.", logutil.ShortError(err))
return err
// ignore the error, just log it
log.Warn("failed to get global checkpoint, skipping.", logutil.ShortError(err))
}
if globalCheckpointTs < c.task.StartTs {
globalCheckpointTs = c.task.StartTs
Expand Down Expand Up @@ -568,13 +568,21 @@ func (c *CheckpointAdvancer) isCheckpointLagged(ctx context.Context) (bool, erro
if c.cfg.CheckPointLagLimit <= 0 {
return false, nil
}
globalTs, err := c.env.GetGlobalCheckpointForTask(ctx, c.task.Name)
if err != nil {
return false, err
}
if globalTs < c.task.StartTs {
// unreachable.
return false, nil
}

now, err := c.env.FetchCurrentTS(ctx)
if err != nil {
return false, err
}

lagDuration := oracle.GetTimeFromTS(now).Sub(oracle.GetTimeFromTS(c.lastCheckpoint.TS))
lagDuration := oracle.GetTimeFromTS(now).Sub(oracle.GetTimeFromTS(globalTs))
if lagDuration > c.cfg.CheckPointLagLimit {
log.Warn("checkpoint lag is too large", zap.String("category", "log backup advancer"),
zap.Stringer("lag", lagDuration))
Expand All @@ -592,7 +600,8 @@ func (c *CheckpointAdvancer) importantTick(ctx context.Context) error {
}
isLagged, err := c.isCheckpointLagged(ctx)
if err != nil {
return errors.Annotate(err, "failed to check timestamp")
// ignore the error, just log it
log.Warn("failed to check timestamp", logutil.ShortError(err))
}
if isLagged {
err := c.env.PauseTask(ctx, c.task.Name)
Expand Down Expand Up @@ -657,7 +666,7 @@ func (c *CheckpointAdvancer) tick(ctx context.Context) error {
c.taskMu.Lock()
defer c.taskMu.Unlock()
if c.task == nil || c.isPaused.Load() {
log.Debug("No tasks yet, skipping advancing.")
log.Info("No tasks yet, skipping advancing.")
return nil
}

Expand Down
149 changes: 136 additions & 13 deletions br/pkg/streamhelper/advancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"bytes"
"context"
"fmt"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -518,6 +517,85 @@ func TestEnableCheckPointLimit(t *testing.T) {
}
}

func TestOwnerChangeCheckPointLagged(t *testing.T) {
c := createFakeCluster(t, 4, false)
defer func() {
fmt.Println(c)
}()
c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

env := newTestEnv(c, t)
rngs := env.ranges
if len(rngs) == 0 {
rngs = []kv.KeyRange{{}}
}
env.task = streamhelper.TaskEvent{
Type: streamhelper.EventAdd,
Name: "whole",
Info: &backup.StreamBackupTaskInfo{
Name: "whole",
StartTs: oracle.GoTimeToTS(oracle.GetTimeFromTS(0).Add(1 * time.Minute)),
},
Ranges: rngs,
}

adv := streamhelper.NewCheckpointAdvancer(env)
adv.UpdateConfigWith(func(c *config.Config) {
c.CheckPointLagLimit = 1 * time.Minute
})
ctx1, cancel1 := context.WithCancel(context.Background())
adv.OnStart(ctx1)
adv.OnBecomeOwner(ctx1)
log.Info("advancer1 become owner")
require.NoError(t, adv.OnTick(ctx1))

// another advancer but never advance checkpoint before
adv2 := streamhelper.NewCheckpointAdvancer(env)
adv2.UpdateConfigWith(func(c *config.Config) {
c.CheckPointLagLimit = 1 * time.Minute
})
ctx2, cancel2 := context.WithCancel(context.Background())
adv2.OnStart(ctx2)

for i := 0; i < 5; i++ {
c.advanceClusterTimeBy(2 * time.Minute)
c.advanceCheckpointBy(2 * time.Minute)
require.NoError(t, adv.OnTick(ctx1))
}
c.advanceClusterTimeBy(2 * time.Minute)
require.ErrorContains(t, adv.OnTick(ctx1), "lagged too large")

// resume task to make next tick normally
c.advanceCheckpointBy(2 * time.Minute)
env.ResumeTask(ctx)

// stop advancer1, and advancer2 should take over
cancel1()
log.Info("advancer1 owner canceled, and advancer2 become owner")
adv2.OnBecomeOwner(ctx2)
require.NoError(t, adv2.OnTick(ctx2))

// advancer2 should take over and tick normally
for i := 0; i < 10; i++ {
c.advanceClusterTimeBy(2 * time.Minute)
c.advanceCheckpointBy(2 * time.Minute)
require.NoError(t, adv2.OnTick(ctx2))
}
c.advanceClusterTimeBy(2 * time.Minute)
require.ErrorContains(t, adv2.OnTick(ctx2), "lagged too large")
// stop advancer2, and advancer1 should take over
c.advanceCheckpointBy(2 * time.Minute)
env.ResumeTask(ctx)
cancel2()
log.Info("advancer2 owner canceled, and advancer1 become owner")

adv.OnBecomeOwner(ctx)
// advancer1 should take over and tick normally when come back
require.NoError(t, adv.OnTick(ctx))
}

func TestCheckPointLagged(t *testing.T) {
c := createFakeCluster(t, 4, false)
defer func() {
Expand Down Expand Up @@ -548,8 +626,10 @@ func TestCheckPointLagged(t *testing.T) {
})
adv.StartTaskListener(ctx)
c.advanceClusterTimeBy(2 * time.Minute)
// if global ts is not advanced, the checkpoint will not be lagged
Copy link
Contributor

@RidRisR RidRisR Dec 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why? If there is a new task and the global checkpoint is never advanced, the task will never be paused even exceed the limit.

This implies that we should never pause a task that never advanced.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually it should be if global ts is less than task.start-ts which implies that could have some corner cases when start a new task.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

	if globalTs <= c.task.StartTs {
		// task is not started yet
		return false, nil
	}

Then maybe here should be < instead of <= ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if globalTs == c.task.StartTs this will only happen when new task created.

Since it's not the common case after task running for some time. I think it's better to make this not pause by default

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sometimes the task may be stuck from creating, say, the advancer doesn't work or one of TiKV didn't notice the task.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After talk with @YuJuncen , we had a agreement to make the check include when globalTs == task.StartTs. I changed it.

Additionally I found the unproper error return logic when add task. I also fixed it this PR, and fix the related test cases.

c.advanceCheckpointBy(2 * time.Minute)
require.NoError(t, adv.OnTick(ctx))
c.advanceClusterTimeBy(1 * time.Minute)
c.advanceClusterTimeBy(3 * time.Minute)
require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
// after some times, the isPaused will be set and ticks are skipped
require.Eventually(t, func() bool {
Expand All @@ -573,8 +653,10 @@ func TestCheckPointResume(t *testing.T) {
})
adv.StartTaskListener(ctx)
c.advanceClusterTimeBy(1 * time.Minute)
// if global ts is not advanced, the checkpoint will not be lagged
c.advanceCheckpointBy(1 * time.Minute)
require.NoError(t, adv.OnTick(ctx))
c.advanceClusterTimeBy(1 * time.Minute)
c.advanceClusterTimeBy(2 * time.Minute)
require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
require.Eventually(t, func() bool {
return assert.NoError(t, adv.OnTick(ctx))
Expand Down Expand Up @@ -604,18 +686,48 @@ func TestUnregisterAfterPause(t *testing.T) {
c.CheckPointLagLimit = 1 * time.Minute
})
adv.StartTaskListener(ctx)

// wait for the task to be added
require.Eventually(t, func() bool {
return adv.HasTask()
}, 5*time.Second, 100*time.Millisecond)

// task is should be paused when global checkpoint is laggeod
// even the global checkpoint is equal to task start ts(not advanced all the time)
c.advanceClusterTimeBy(1 * time.Minute)
require.NoError(t, adv.OnTick(ctx))
env.PauseTask(ctx, "whole")
time.Sleep(1 * time.Second)
c.advanceClusterTimeBy(1 * time.Minute)
require.Error(t, adv.OnTick(ctx), "checkpoint is lagged")
env.unregisterTask()
env.putTask()

// wait for the task to be added
require.Eventually(t, func() bool {
return adv.HasTask()
}, 5*time.Second, 100*time.Millisecond)

require.Error(t, adv.OnTick(ctx), "checkpoint is lagged")

env.unregisterTask()
// wait for the task to be deleted
require.Eventually(t, func() bool {
return !adv.HasTask()
}, 5*time.Second, 100*time.Millisecond)

// reset
c.advanceClusterTimeBy(-1 * time.Minute)
require.NoError(t, adv.OnTick(ctx))
env.PauseTask(ctx, "whole")
c.advanceClusterTimeBy(1 * time.Minute)
env.unregisterTask()
env.putTask()
// wait for the task to be add
require.Eventually(t, func() bool {
err := adv.OnTick(ctx)
return err != nil && strings.Contains(err.Error(), "check point lagged too large")
}, 5*time.Second, 300*time.Millisecond)
return adv.HasTask()
}, 5*time.Second, 100*time.Millisecond)

require.Error(t, adv.OnTick(ctx), "checkpoint is lagged")
}

// If the start ts is *NOT* lagged, even both the cluster and pd are lagged, the task should run normally.
Expand Down Expand Up @@ -727,13 +839,18 @@ func TestAddTaskWithLongRunTask2(t *testing.T) {
adv.UpdateConfigWith(func(c *config.Config) {
c.CheckPointLagLimit = 1 * time.Minute
})
adv.StartTaskListener(ctx)
c.advanceClusterTimeBy(3 * time.Minute)
c.advanceCheckpointBy(1 * time.Minute)
env.advanceCheckpointBy(2 * time.Minute)
env.mockPDConnectionError()
adv.StartTaskListener(ctx)
// Try update checkpoint
require.NoError(t, adv.OnTick(ctx))
// if cannot connect to pd, the checkpoint will be rolled back
// because at this point. the global ts is 2 minutes
// and the local checkpoint ts is 1 minute
require.Error(t, adv.OnTick(ctx), "checkpoint rollback")

// only when local checkpoint > global ts, the next tick will be normal
c.advanceCheckpointBy(12 * time.Minute)
// Verify no err raised
require.NoError(t, adv.OnTick(ctx))
}
Expand Down Expand Up @@ -767,11 +884,17 @@ func TestAddTaskWithLongRunTask3(t *testing.T) {
adv.UpdateConfigWith(func(c *config.Config) {
c.CheckPointLagLimit = 1 * time.Minute
})
c.advanceClusterTimeBy(3 * time.Minute)
// advance cluster time to 4 minutes, and checkpoint to 1 minutes
// if start ts equals to checkpoint, the task will not be paused
adv.StartTaskListener(ctx)
c.advanceClusterTimeBy(2 * time.Minute)
c.advanceCheckpointBy(1 * time.Minute)
env.advanceCheckpointBy(1 * time.Minute)
require.NoError(t, adv.OnTick(ctx))

c.advanceClusterTimeBy(2 * time.Minute)
c.advanceCheckpointBy(1 * time.Minute)
env.advanceCheckpointBy(1 * time.Minute)
env.mockPDConnectionError()
adv.StartTaskListener(ctx)
// Try update checkpoint
require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
// Verify no err raised after paused
Expand Down
12 changes: 10 additions & 2 deletions br/pkg/streamhelper/basic_lib_for_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,12 @@ func (t *testEnv) UploadV3GlobalCheckpointForTask(ctx context.Context, _ string,
defer t.mu.Unlock()

if checkpoint < t.checkpoint {
t.testCtx.Fatalf("checkpoint rolling back (from %d to %d)", t.checkpoint, checkpoint)
log.Error("checkpoint rolling back",
zap.Uint64("from", t.checkpoint),
zap.Uint64("to", checkpoint),
zap.Stack("stack"))
// t.testCtx.Fatalf("checkpoint rolling back (from %d to %d)", t.checkpoint, checkpoint)
return errors.New("checkpoint rolling back")
}
t.checkpoint = checkpoint
return nil
Expand Down Expand Up @@ -747,6 +752,8 @@ func (t *testEnv) advanceCheckpointBy(duration time.Duration) {
t.mu.Lock()
defer t.mu.Unlock()

log.Info("advance checkpoint", zap.Duration("duration", duration), zap.Uint64("from", t.checkpoint))

t.checkpoint = oracle.GoTimeToTS(oracle.GetTimeFromTS(t.checkpoint).Add(duration))
}

Expand All @@ -766,7 +773,8 @@ func (t *testEnv) putTask() {
Type: streamhelper.EventAdd,
Name: "whole",
Info: &backup.StreamBackupTaskInfo{
Name: "whole",
Name: "whole",
StartTs: 5,
},
Ranges: rngs,
}
Expand Down
Loading