From 775349ae51f522d53e32db3a8737dc64bb553276 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Thu, 12 Dec 2024 17:00:33 +0800 Subject: [PATCH] do not return error when add task & fix some cases --- br/pkg/streamhelper/advancer.go | 13 +-- br/pkg/streamhelper/advancer_test.go | 97 +++++++++-------------- br/pkg/streamhelper/basic_lib_for_test.go | 9 ++- 3 files changed, 53 insertions(+), 66 deletions(-) diff --git a/br/pkg/streamhelper/advancer.go b/br/pkg/streamhelper/advancer.go index 4de81bf57e71f..d2fd40ac9f1c2 100644 --- a/br/pkg/streamhelper/advancer.go +++ b/br/pkg/streamhelper/advancer.go @@ -425,8 +425,8 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error c.setCheckpoints(spans.Sorted(spans.NewFullWith(e.Ranges, 0))) globalCheckpointTs, err := c.env.GetGlobalCheckpointForTask(ctx, e.Name) if err != nil { - log.Error("failed to get global checkpoint, skipping.", logutil.ShortError(err)) - return err + // ignore the error, just log it + log.Warn("failed to get global checkpoint, skipping.", logutil.ShortError(err)) } if globalCheckpointTs < c.task.StartTs { globalCheckpointTs = c.task.StartTs @@ -571,8 +571,8 @@ func (c *CheckpointAdvancer) isCheckpointLagged(ctx context.Context) (bool, erro if err != nil { return false, err } - if globalTs <= c.task.StartTs { - // task is not started yet + if globalTs < c.task.StartTs { + // unreachable. 
return false, nil } @@ -599,7 +599,8 @@ func (c *CheckpointAdvancer) importantTick(ctx context.Context) error { } isLagged, err := c.isCheckpointLagged(ctx) if err != nil { - return errors.Annotate(err, "failed to check timestamp") + // ignore the error, just log it + log.Warn("failed to check timestamp", logutil.ShortError(err)) } if isLagged { err := c.env.PauseTask(ctx, c.task.Name) @@ -664,7 +665,7 @@ func (c *CheckpointAdvancer) tick(ctx context.Context) error { c.taskMu.Lock() defer c.taskMu.Unlock() if c.task == nil || c.isPaused.Load() { - log.Debug("No tasks yet, skipping advancing.") + log.Info("No tasks yet, skipping advancing.") return nil } diff --git a/br/pkg/streamhelper/advancer_test.go b/br/pkg/streamhelper/advancer_test.go index 1b8ef3b01cd5c..37aa697e67791 100644 --- a/br/pkg/streamhelper/advancer_test.go +++ b/br/pkg/streamhelper/advancer_test.go @@ -687,73 +687,47 @@ func TestUnregisterAfterPause(t *testing.T) { }) adv.StartTaskListener(ctx) - // No matter how many times the task is paused, after put a new one the task should run normally - // First sequence: pause -> unregister -> put + // wait for the task to be added + require.Eventually(t, func() bool { + return adv.HasTask() + }, 5*time.Second, 100*time.Millisecond) + + // the task should be paused when the global checkpoint is lagged, + // even if the global checkpoint is equal to the task start ts (not advanced all the time) c.advanceClusterTimeBy(1 * time.Minute) require.NoError(t, adv.OnTick(ctx)) env.PauseTask(ctx, "whole") c.advanceClusterTimeBy(1 * time.Minute) - require.NoError(t, adv.OnTick(ctx)) + require.Error(t, adv.OnTick(ctx), "checkpoint is lagged") env.unregisterTask() env.putTask() - require.NoError(t, adv.OnTick(ctx)) - // Second sequence: put -> pause -> unregister -> put - c.advanceClusterTimeBy(1 * time.Minute) - env.putTask() - env.PauseTask(ctx, "whole") - env.unregisterTask() - env.putTask() - require.NoError(t, adv.OnTick(ctx)) + // wait for the task to be added + 
require.Eventually(t, func() bool { + return adv.HasTask() + }, 5*time.Second, 100*time.Millisecond) - // Third sequence: put -> pause -> put -> unregister -> put - c.advanceClusterTimeBy(1 * time.Minute) - env.putTask() - env.PauseTask(ctx, "whole") - env.putTask() - require.NoError(t, adv.OnTick(ctx)) - env.unregisterTask() - env.putTask() - require.NoError(t, adv.OnTick(ctx)) + require.Error(t, adv.OnTick(ctx), "checkpoint is lagged") - // Fourth sequence: unregister -> put -> pause -> put -> unregister -> put - c.advanceClusterTimeBy(1 * time.Minute) env.unregisterTask() - env.putTask() - env.PauseTask(ctx, "whole") - time.Sleep(1 * time.Second) - env.putTask() + // wait for the task to be deleted + require.Eventually(t, func() bool { + return !adv.HasTask() + }, 5*time.Second, 100*time.Millisecond) + + // reset + c.advanceClusterTimeBy(-1 * time.Minute) require.NoError(t, adv.OnTick(ctx)) + env.PauseTask(ctx, "whole") + c.advanceClusterTimeBy(1 * time.Minute) env.unregisterTask() env.putTask() - require.NoError(t, adv.OnTick(ctx)) - - // Fifth sequence: multiple rapid operations with put before pause - for i := 0; i < 3; i++ { - c.advanceClusterTimeBy(1 * time.Minute) - env.putTask() - env.PauseTask(ctx, "whole") - env.unregisterTask() - env.putTask() - env.PauseTask(ctx, "whole") - env.putTask() - require.NoError(t, adv.OnTick(ctx)) - } - - // Sixth sequence: rapid alternating put and pause - for i := 0; i < 3; i++ { - c.advanceClusterTimeBy(1 * time.Minute) - env.putTask() - env.PauseTask(ctx, "whole") - env.putTask() - env.PauseTask(ctx, "whole") - env.putTask() - require.NoError(t, adv.OnTick(ctx)) - } + // wait for the task to be added + require.Eventually(t, func() bool { + return adv.HasTask() + }, 5*time.Second, 100*time.Millisecond) - // Final verification - c.advanceClusterTimeBy(1 * time.Minute) - require.NoError(t, adv.OnTick(ctx)) + require.Error(t, adv.OnTick(ctx), "checkpoint is lagged") } // If the start ts is *NOT* lagged, even both the cluster 
and pd are lagged, the task should run normally. @@ -865,13 +839,18 @@ func TestAddTaskWithLongRunTask2(t *testing.T) { adv.UpdateConfigWith(func(c *config.Config) { c.CheckPointLagLimit = 1 * time.Minute }) + adv.StartTaskListener(ctx) c.advanceClusterTimeBy(3 * time.Minute) c.advanceCheckpointBy(1 * time.Minute) env.advanceCheckpointBy(2 * time.Minute) env.mockPDConnectionError() - adv.StartTaskListener(ctx) - // Try update checkpoint - require.NoError(t, adv.OnTick(ctx)) + // if we cannot connect to pd, the checkpoint will be rolled back + // because at this point the global ts is 2 minutes + // and the local checkpoint ts is 1 minute + require.Error(t, adv.OnTick(ctx), "checkpoint rollback") + + // only when local checkpoint > global ts, the next tick will be normal + c.advanceCheckpointBy(12 * time.Minute) // Verify no err raised require.NoError(t, adv.OnTick(ctx)) } @@ -907,13 +886,13 @@ func TestAddTaskWithLongRunTask3(t *testing.T) { }) // advance cluster time to 4 minutes, and checkpoint to 1 minutes // if start ts equals to checkpoint, the task will not be paused - c.advanceClusterTimeBy(4 * time.Minute) + adv.StartTaskListener(ctx) + c.advanceClusterTimeBy(2 * time.Minute) c.advanceCheckpointBy(1 * time.Minute) env.advanceCheckpointBy(1 * time.Minute) - adv.StartTaskListener(ctx) require.NoError(t, adv.OnTick(ctx)) - // if start ts < checkpoint, the task will be paused + c.advanceClusterTimeBy(2 * time.Minute) c.advanceCheckpointBy(1 * time.Minute) env.advanceCheckpointBy(1 * time.Minute) // Try update checkpoint diff --git a/br/pkg/streamhelper/basic_lib_for_test.go b/br/pkg/streamhelper/basic_lib_for_test.go index 5b5cd35c79021..22a66c18e27af 100644 --- a/br/pkg/streamhelper/basic_lib_for_test.go +++ b/br/pkg/streamhelper/basic_lib_for_test.go @@ -687,7 +687,12 @@ func (t *testEnv) UploadV3GlobalCheckpointForTask(ctx context.Context, _ string, defer t.mu.Unlock() if checkpoint < t.checkpoint { - t.testCtx.Fatalf("checkpoint rolling back (from %d to %d)", 
t.checkpoint, checkpoint) + log.Error("checkpoint rolling back", + zap.Uint64("from", t.checkpoint), + zap.Uint64("to", checkpoint), + zap.Stack("stack")) + // t.testCtx.Fatalf("checkpoint rolling back (from %d to %d)", t.checkpoint, checkpoint) + return errors.New("checkpoint rolling back") } t.checkpoint = checkpoint return nil @@ -747,6 +752,8 @@ func (t *testEnv) advanceCheckpointBy(duration time.Duration) { t.mu.Lock() defer t.mu.Unlock() + log.Info("advance checkpoint", zap.Duration("duration", duration), zap.Uint64("from", t.checkpoint)) + t.checkpoint = oracle.GoTimeToTS(oracle.GetTimeFromTS(t.checkpoint).Add(duration)) }