log backup: use global checkpoint ts as source of truth #58135

Merged: 7 commits, Dec 13, 2024
Changes from 6 commits
2 changes: 1 addition & 1 deletion br/pkg/streamhelper/BUILD.bazel
@@ -69,7 +69,7 @@ go_test(
],
flaky = True,
race = "on",
-shard_count = 34,
+shard_count = 35,
deps = [
":streamhelper",
"//br/pkg/errors",
10 changes: 9 additions & 1 deletion br/pkg/streamhelper/advancer.go
@@ -568,13 +568,21 @@ func (c *CheckpointAdvancer) isCheckpointLagged(ctx context.Context) (bool, erro
if c.cfg.CheckPointLagLimit <= 0 {
return false, nil
}
globalTs, err := c.env.GetGlobalCheckpointForTask(ctx, c.task.Name)
if err != nil {
return false, err
}
if globalTs <= c.task.StartTs {
// task is not started yet
return false, nil
}

now, err := c.env.FetchCurrentTS(ctx)
if err != nil {
return false, err
}

-lagDuration := oracle.GetTimeFromTS(now).Sub(oracle.GetTimeFromTS(c.lastCheckpoint.TS))
+lagDuration := oracle.GetTimeFromTS(now).Sub(oracle.GetTimeFromTS(globalTs))
if lagDuration > c.cfg.CheckPointLagLimit {
log.Warn("checkpoint lag is too large", zap.String("category", "log backup advancer"),
zap.Stringer("lag", lagDuration))
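
To make the change easier to read outside the diff, here is a minimal, self-contained sketch of the reworked check: the lag is now measured from the global checkpoint persisted for the task (the "source of truth" named in the PR title) rather than from the advancer's in-memory lastCheckpoint. This is an illustration, not the PR's code: the Env interface and physicalTime helper are simplified stand-ins for streamhelper's environment and oracle.GetTimeFromTS, the standalone function signature is a reduction of the CheckpointAdvancer method, and the TSO layout (physical milliseconds above 18 logical bits) is assumed from TiDB's usual timestamp format.

```go
package lagcheck

import (
	"context"
	"time"
)

// Env is a trimmed-down stand-in for the advancer's environment; only the two
// calls used by the reworked check are kept. The method names follow the diff
// above, but this interface is illustrative, not the real streamhelper.Env.
type Env interface {
	GetGlobalCheckpointForTask(ctx context.Context, taskName string) (uint64, error)
	FetchCurrentTS(ctx context.Context) (uint64, error)
}

// physicalTime approximates oracle.GetTimeFromTS under the assumption that a
// TSO packs the physical timestamp in milliseconds above 18 bits of logical
// counter.
func physicalTime(ts uint64) time.Time {
	return time.UnixMilli(int64(ts >> 18))
}

// isCheckpointLagged mirrors the logic introduced by this PR: the persisted
// global checkpoint, not an in-memory copy, decides whether the task lags
// behind the current cluster time.
func isCheckpointLagged(ctx context.Context, env Env, taskName string, startTs uint64, lagLimit time.Duration) (bool, error) {
	if lagLimit <= 0 {
		return false, nil
	}
	globalTs, err := env.GetGlobalCheckpointForTask(ctx, taskName)
	if err != nil {
		return false, err
	}
	if globalTs <= startTs {
		// task is not started yet, so there is nothing to measure against
		return false, nil
	}
	now, err := env.FetchCurrentTS(ctx)
	if err != nil {
		return false, err
	}
	return physicalTime(now).Sub(physicalTime(globalTs)) > lagLimit, nil
}
```

Because every advancer instance reads the same persisted value, an owner change cannot resurrect a stale in-memory checkpoint, which is what the new TestOwnerChangeCheckPointLagged below exercises.
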
164 changes: 154 additions & 10 deletions br/pkg/streamhelper/advancer_test.go
@@ -6,7 +6,6 @@ import (
"bytes"
"context"
"fmt"
"strings"
"sync"
"testing"
"time"
@@ -518,6 +517,85 @@ func TestEnableCheckPointLimit(t *testing.T) {
}
}

func TestOwnerChangeCheckPointLagged(t *testing.T) {
c := createFakeCluster(t, 4, false)
defer func() {
fmt.Println(c)
}()
c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

env := newTestEnv(c, t)
rngs := env.ranges
if len(rngs) == 0 {
rngs = []kv.KeyRange{{}}
}
env.task = streamhelper.TaskEvent{
Type: streamhelper.EventAdd,
Name: "whole",
Info: &backup.StreamBackupTaskInfo{
Name: "whole",
StartTs: oracle.GoTimeToTS(oracle.GetTimeFromTS(0).Add(1 * time.Minute)),
},
Ranges: rngs,
}

adv := streamhelper.NewCheckpointAdvancer(env)
adv.UpdateConfigWith(func(c *config.Config) {
c.CheckPointLagLimit = 1 * time.Minute
})
ctx1, cancel1 := context.WithCancel(context.Background())
adv.OnStart(ctx1)
adv.OnBecomeOwner(ctx1)
log.Info("advancer1 become owner")
require.NoError(t, adv.OnTick(ctx1))

// another advancer that has never advanced the checkpoint before
adv2 := streamhelper.NewCheckpointAdvancer(env)
adv2.UpdateConfigWith(func(c *config.Config) {
c.CheckPointLagLimit = 1 * time.Minute
})
ctx2, cancel2 := context.WithCancel(context.Background())
adv2.OnStart(ctx2)

for i := 0; i < 5; i++ {
c.advanceClusterTimeBy(2 * time.Minute)
c.advanceCheckpointBy(2 * time.Minute)
require.NoError(t, adv.OnTick(ctx1))
}
c.advanceClusterTimeBy(2 * time.Minute)
require.ErrorContains(t, adv.OnTick(ctx1), "lagged too large")

// resume the task so the next tick runs normally
c.advanceCheckpointBy(2 * time.Minute)
env.ResumeTask(ctx)

// stop advancer1, and advancer2 should take over
cancel1()
log.Info("advancer1 owner canceled, and advancer2 become owner")
adv2.OnBecomeOwner(ctx2)
require.NoError(t, adv2.OnTick(ctx2))

// advancer2 should take over and tick normally
for i := 0; i < 10; i++ {
c.advanceClusterTimeBy(2 * time.Minute)
c.advanceCheckpointBy(2 * time.Minute)
require.NoError(t, adv2.OnTick(ctx2))
}
c.advanceClusterTimeBy(2 * time.Minute)
require.ErrorContains(t, adv2.OnTick(ctx2), "lagged too large")
// stop advancer2, and advancer1 should take over
c.advanceCheckpointBy(2 * time.Minute)
env.ResumeTask(ctx)
cancel2()
log.Info("advancer2 owner canceled, and advancer1 become owner")

adv.OnBecomeOwner(ctx)
// advancer1 should take over and tick normally when it comes back
require.NoError(t, adv.OnTick(ctx))
}

func TestCheckPointLagged(t *testing.T) {
c := createFakeCluster(t, 4, false)
defer func() {
@@ -548,8 +626,10 @@ func TestCheckPointLagged(t *testing.T) {
})
adv.StartTaskListener(ctx)
c.advanceClusterTimeBy(2 * time.Minute)
// if global ts is not advanced, the checkpoint will not be lagged
Contributor @RidRisR commented on Dec 11, 2024:

Why? If there is a new task and the global checkpoint is never advanced, the task will never be paused, even if it exceeds the limit.

This implies that we should never pause a task that has never advanced.

Contributor (author) replied:

Actually, it should be "if global ts is less than task.start-ts", which implies there could be some corner cases when starting a new task.

Contributor replied:

	if globalTs <= c.task.StartTs {
		// task is not started yet
		return false, nil
	}

Then maybe this should be < instead of <= ?

Contributor (author) replied:

globalTs == c.task.StartTs will only happen when a new task is created. Since that is not the common case once the task has been running for some time, I think it's better not to pause by default.

Contributor replied:

Sometimes the task may be stuck from creation; say, the advancer doesn't work, or one of the TiKVs didn't notice the task.

Contributor (author) replied:

After talking with @YuJuncen, we agreed to make the check include the case globalTs == task.StartTs. I changed it.

Additionally, I found improper error-return logic when adding a task. I also fixed it in this PR, and fixed the related test cases.

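Note that the diff shown here ("Changes from 6 commits") still uses <=; per the thread above, the guard was presumably tightened in the follow-up commit so that a checkpoint stuck exactly at the task's start ts is no longer exempt from the lag check. The sketch below only illustrates that agreed boundary with a hypothetical helper name; the final committed condition is not visible in this view.

```go
package lagcheck

// taskNotStartedYet is a hypothetical helper illustrating the agreed-upon
// guard: skip the lag check only while the global checkpoint is strictly
// behind the task's start ts, so a task stuck right at StartTs can still
// be detected as lagged and paused.
func taskNotStartedYet(globalTs, startTs uint64) bool {
	return globalTs < startTs
}
```
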
c.advanceCheckpointBy(2 * time.Minute)
require.NoError(t, adv.OnTick(ctx))
-c.advanceClusterTimeBy(1 * time.Minute)
+c.advanceClusterTimeBy(3 * time.Minute)
require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
// after some time, isPaused will be set and ticks are skipped
require.Eventually(t, func() bool {
@@ -573,8 +653,10 @@ func TestCheckPointResume(t *testing.T) {
})
adv.StartTaskListener(ctx)
c.advanceClusterTimeBy(1 * time.Minute)
// if global ts is not advanced, the checkpoint will not be lagged
c.advanceCheckpointBy(1 * time.Minute)
require.NoError(t, adv.OnTick(ctx))
-c.advanceClusterTimeBy(1 * time.Minute)
+c.advanceClusterTimeBy(2 * time.Minute)
require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
require.Eventually(t, func() bool {
return assert.NoError(t, adv.OnTick(ctx))
@@ -604,18 +686,74 @@ func TestUnregisterAfterPause(t *testing.T) {
c.CheckPointLagLimit = 1 * time.Minute
})
adv.StartTaskListener(ctx)

// No matter how many times the task is paused, after putting a new one the task should run normally
// First sequence: pause -> unregister -> put
c.advanceClusterTimeBy(1 * time.Minute)
require.NoError(t, adv.OnTick(ctx))
env.PauseTask(ctx, "whole")
time.Sleep(1 * time.Second)
c.advanceClusterTimeBy(1 * time.Minute)
require.NoError(t, adv.OnTick(ctx))
env.unregisterTask()
env.putTask()
require.Eventually(t, func() bool {
err := adv.OnTick(ctx)
return err != nil && strings.Contains(err.Error(), "check point lagged too large")
}, 5*time.Second, 300*time.Millisecond)
require.NoError(t, adv.OnTick(ctx))

// Second sequence: put -> pause -> unregister -> put
c.advanceClusterTimeBy(1 * time.Minute)
env.putTask()
env.PauseTask(ctx, "whole")
env.unregisterTask()
env.putTask()
require.NoError(t, adv.OnTick(ctx))

// Third sequence: put -> pause -> put -> unregister -> put
c.advanceClusterTimeBy(1 * time.Minute)
env.putTask()
env.PauseTask(ctx, "whole")
env.putTask()
require.NoError(t, adv.OnTick(ctx))
env.unregisterTask()
env.putTask()
require.NoError(t, adv.OnTick(ctx))

// Fourth sequence: unregister -> put -> pause -> put -> unregister -> put
c.advanceClusterTimeBy(1 * time.Minute)
env.unregisterTask()
env.putTask()
env.PauseTask(ctx, "whole")
time.Sleep(1 * time.Second)
env.putTask()
require.NoError(t, adv.OnTick(ctx))
env.unregisterTask()
env.putTask()
require.NoError(t, adv.OnTick(ctx))

// Fifth sequence: multiple rapid operations with put before pause
for i := 0; i < 3; i++ {
c.advanceClusterTimeBy(1 * time.Minute)
env.putTask()
env.PauseTask(ctx, "whole")
env.unregisterTask()
env.putTask()
env.PauseTask(ctx, "whole")
env.putTask()
require.NoError(t, adv.OnTick(ctx))
}

// Sixth sequence: rapid alternating put and pause
for i := 0; i < 3; i++ {
c.advanceClusterTimeBy(1 * time.Minute)
env.putTask()
env.PauseTask(ctx, "whole")
env.putTask()
env.PauseTask(ctx, "whole")
env.putTask()
require.NoError(t, adv.OnTick(ctx))
}

// Final verification
c.advanceClusterTimeBy(1 * time.Minute)
require.NoError(t, adv.OnTick(ctx))
}

// If the start ts is *NOT* lagged, even if both the cluster and pd are lagged, the task should run normally.
@@ -767,11 +905,17 @@ func TestAddTaskWithLongRunTask3(t *testing.T) {
adv.UpdateConfigWith(func(c *config.Config) {
c.CheckPointLagLimit = 1 * time.Minute
})
c.advanceClusterTimeBy(3 * time.Minute)
// advance cluster time to 4 minutes, and checkpoint to 1 minute
// if start ts equals the checkpoint, the task will not be paused
c.advanceClusterTimeBy(4 * time.Minute)
c.advanceCheckpointBy(1 * time.Minute)
env.advanceCheckpointBy(1 * time.Minute)
env.mockPDConnectionError()
adv.StartTaskListener(ctx)
require.NoError(t, adv.OnTick(ctx))

// if start ts < checkpoint, the task will be paused
c.advanceCheckpointBy(1 * time.Minute)
env.advanceCheckpointBy(1 * time.Minute)
// Try update checkpoint
require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
// Verify no err raised after paused
3 changes: 2 additions & 1 deletion br/pkg/streamhelper/basic_lib_for_test.go
@@ -766,7 +766,8 @@ func (t *testEnv) putTask() {
Type: streamhelper.EventAdd,
Name: "whole",
Info: &backup.StreamBackupTaskInfo{
-Name: "whole",
+Name:    "whole",
+StartTs: 5,
},
Ranges: rngs,
}