Skip to content

Commit

Permalink
do not return error when add task & fix some cases
Browse files Browse the repository at this point in the history
  • Loading branch information
3pointer authored and ti-chi-bot committed Dec 14, 2024
1 parent 6760855 commit 775349a
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 66 deletions.
13 changes: 7 additions & 6 deletions br/pkg/streamhelper/advancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,8 +425,8 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
c.setCheckpoints(spans.Sorted(spans.NewFullWith(e.Ranges, 0)))
globalCheckpointTs, err := c.env.GetGlobalCheckpointForTask(ctx, e.Name)
if err != nil {
log.Error("failed to get global checkpoint, skipping.", logutil.ShortError(err))
return err
// ignore the error, just log it
log.Warn("failed to get global checkpoint, skipping.", logutil.ShortError(err))
}
if globalCheckpointTs < c.task.StartTs {
globalCheckpointTs = c.task.StartTs
Expand Down Expand Up @@ -571,8 +571,8 @@ func (c *CheckpointAdvancer) isCheckpointLagged(ctx context.Context) (bool, erro
if err != nil {
return false, err
}
if globalTs <= c.task.StartTs {
// task is not started yet
if globalTs < c.task.StartTs {
// unreachable.
return false, nil
}

Expand All @@ -599,7 +599,8 @@ func (c *CheckpointAdvancer) importantTick(ctx context.Context) error {
}
isLagged, err := c.isCheckpointLagged(ctx)
if err != nil {
return errors.Annotate(err, "failed to check timestamp")
// ignore the error, just log it
log.Warn("failed to check timestamp", logutil.ShortError(err))
}
if isLagged {
err := c.env.PauseTask(ctx, c.task.Name)
Expand Down Expand Up @@ -664,7 +665,7 @@ func (c *CheckpointAdvancer) tick(ctx context.Context) error {
c.taskMu.Lock()
defer c.taskMu.Unlock()
if c.task == nil || c.isPaused.Load() {
log.Debug("No tasks yet, skipping advancing.")
log.Info("No tasks yet, skipping advancing.")
return nil
}

Expand Down
97 changes: 38 additions & 59 deletions br/pkg/streamhelper/advancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,73 +687,47 @@ func TestUnregisterAfterPause(t *testing.T) {
})
adv.StartTaskListener(ctx)

// No matter how many times the task is paused, after put a new one the task should run normally
// First sequence: pause -> unregister -> put
// wait for the task to be added
require.Eventually(t, func() bool {
return adv.HasTask()
}, 5*time.Second, 100*time.Millisecond)

// task is should be paused when global checkpoint is laggeod
// even the global checkpoint is equal to task start ts(not advanced all the time)
c.advanceClusterTimeBy(1 * time.Minute)
require.NoError(t, adv.OnTick(ctx))
env.PauseTask(ctx, "whole")
c.advanceClusterTimeBy(1 * time.Minute)
require.NoError(t, adv.OnTick(ctx))
require.Error(t, adv.OnTick(ctx), "checkpoint is lagged")
env.unregisterTask()
env.putTask()
require.NoError(t, adv.OnTick(ctx))

// Second sequence: put -> pause -> unregister -> put
c.advanceClusterTimeBy(1 * time.Minute)
env.putTask()
env.PauseTask(ctx, "whole")
env.unregisterTask()
env.putTask()
require.NoError(t, adv.OnTick(ctx))
// wait for the task to be added
require.Eventually(t, func() bool {
return adv.HasTask()
}, 5*time.Second, 100*time.Millisecond)

// Third sequence: put -> pause -> put -> unregister -> put
c.advanceClusterTimeBy(1 * time.Minute)
env.putTask()
env.PauseTask(ctx, "whole")
env.putTask()
require.NoError(t, adv.OnTick(ctx))
env.unregisterTask()
env.putTask()
require.NoError(t, adv.OnTick(ctx))
require.Error(t, adv.OnTick(ctx), "checkpoint is lagged")

// Fourth sequence: unregister -> put -> pause -> put -> unregister -> put
c.advanceClusterTimeBy(1 * time.Minute)
env.unregisterTask()
env.putTask()
env.PauseTask(ctx, "whole")
time.Sleep(1 * time.Second)
env.putTask()
// wait for the task to be deleted
require.Eventually(t, func() bool {
return !adv.HasTask()
}, 5*time.Second, 100*time.Millisecond)

// reset
c.advanceClusterTimeBy(-1 * time.Minute)
require.NoError(t, adv.OnTick(ctx))
env.PauseTask(ctx, "whole")
c.advanceClusterTimeBy(1 * time.Minute)
env.unregisterTask()
env.putTask()
require.NoError(t, adv.OnTick(ctx))

// Fifth sequence: multiple rapid operations with put before pause
for i := 0; i < 3; i++ {
c.advanceClusterTimeBy(1 * time.Minute)
env.putTask()
env.PauseTask(ctx, "whole")
env.unregisterTask()
env.putTask()
env.PauseTask(ctx, "whole")
env.putTask()
require.NoError(t, adv.OnTick(ctx))
}

// Sixth sequence: rapid alternating put and pause
for i := 0; i < 3; i++ {
c.advanceClusterTimeBy(1 * time.Minute)
env.putTask()
env.PauseTask(ctx, "whole")
env.putTask()
env.PauseTask(ctx, "whole")
env.putTask()
require.NoError(t, adv.OnTick(ctx))
}
// wait for the task to be add
require.Eventually(t, func() bool {
return adv.HasTask()
}, 5*time.Second, 100*time.Millisecond)

// Final verification
c.advanceClusterTimeBy(1 * time.Minute)
require.NoError(t, adv.OnTick(ctx))
require.Error(t, adv.OnTick(ctx), "checkpoint is lagged")
}

// If the start ts is *NOT* lagged, even both the cluster and pd are lagged, the task should run normally.
Expand Down Expand Up @@ -865,13 +839,18 @@ func TestAddTaskWithLongRunTask2(t *testing.T) {
adv.UpdateConfigWith(func(c *config.Config) {
c.CheckPointLagLimit = 1 * time.Minute
})
adv.StartTaskListener(ctx)
c.advanceClusterTimeBy(3 * time.Minute)
c.advanceCheckpointBy(1 * time.Minute)
env.advanceCheckpointBy(2 * time.Minute)
env.mockPDConnectionError()
adv.StartTaskListener(ctx)
// Try update checkpoint
require.NoError(t, adv.OnTick(ctx))
// if cannot connect to pd, the checkpoint will be rolled back
// because at this point. the global ts is 2 minutes
// and the local checkpoint ts is 1 minute
require.Error(t, adv.OnTick(ctx), "checkpoint rollback")

// only when local checkpoint > global ts, the next tick will be normal
c.advanceCheckpointBy(12 * time.Minute)
// Verify no err raised
require.NoError(t, adv.OnTick(ctx))
}
Expand Down Expand Up @@ -907,13 +886,13 @@ func TestAddTaskWithLongRunTask3(t *testing.T) {
})
// advance cluster time to 4 minutes, and checkpoint to 1 minutes
// if start ts equals to checkpoint, the task will not be paused
c.advanceClusterTimeBy(4 * time.Minute)
adv.StartTaskListener(ctx)
c.advanceClusterTimeBy(2 * time.Minute)
c.advanceCheckpointBy(1 * time.Minute)
env.advanceCheckpointBy(1 * time.Minute)
adv.StartTaskListener(ctx)
require.NoError(t, adv.OnTick(ctx))

// if start ts < checkpoint, the task will be paused
c.advanceClusterTimeBy(2 * time.Minute)
c.advanceCheckpointBy(1 * time.Minute)
env.advanceCheckpointBy(1 * time.Minute)
// Try update checkpoint
Expand Down
9 changes: 8 additions & 1 deletion br/pkg/streamhelper/basic_lib_for_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,12 @@ func (t *testEnv) UploadV3GlobalCheckpointForTask(ctx context.Context, _ string,
defer t.mu.Unlock()

if checkpoint < t.checkpoint {
t.testCtx.Fatalf("checkpoint rolling back (from %d to %d)", t.checkpoint, checkpoint)
log.Error("checkpoint rolling back",
zap.Uint64("from", t.checkpoint),
zap.Uint64("to", checkpoint),
zap.Stack("stack"))
// t.testCtx.Fatalf("checkpoint rolling back (from %d to %d)", t.checkpoint, checkpoint)
return errors.New("checkpoint rolling back")
}
t.checkpoint = checkpoint
return nil
Expand Down Expand Up @@ -747,6 +752,8 @@ func (t *testEnv) advanceCheckpointBy(duration time.Duration) {
t.mu.Lock()
defer t.mu.Unlock()

log.Info("advance checkpoint", zap.Duration("duration", duration), zap.Uint64("from", t.checkpoint))

t.checkpoint = oracle.GoTimeToTS(oracle.GetTimeFromTS(t.checkpoint).Add(duration))
}

Expand Down

0 comments on commit 775349a

Please sign in to comment.