From 62d40391841fce13697717bde1254a8f78e324e8 Mon Sep 17 00:00:00 2001 From: EasonBall <592838129@qq.com> Date: Thu, 11 Jan 2024 17:27:32 +0800 Subject: [PATCH] disttask: fix submit task with same key (#50317) close pingcap/tidb#50318 --- pkg/disttask/framework/handle/handle.go | 2 +- pkg/disttask/framework/handle/handle_test.go | 12 ++++++++++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/pkg/disttask/framework/handle/handle.go b/pkg/disttask/framework/handle/handle.go index b774fd7aa01bf..4c1605b4fb419 100644 --- a/pkg/disttask/framework/handle/handle.go +++ b/pkg/disttask/framework/handle/handle.go @@ -60,7 +60,7 @@ func SubmitTask(ctx context.Context, taskKey string, taskType proto.TaskType, co if err != nil { return nil, err } - task, err := taskManager.GetTaskByKey(ctx, taskKey) + task, err := taskManager.GetTaskByKeyWithHistory(ctx, taskKey) if err != nil && err != storage.ErrTaskNotFound { return nil, err } diff --git a/pkg/disttask/framework/handle/handle_test.go b/pkg/disttask/framework/handle/handle_test.go index 7a51944056a04..260cf05167a2a 100644 --- a/pkg/disttask/framework/handle/handle_test.go +++ b/pkg/disttask/framework/handle/handle_test.go @@ -40,8 +40,7 @@ func TestHandle(t *testing.T) { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu")) }) - ctx := context.Background() - ctx = util.WithInternalSourceType(ctx, "handle_test") + ctx := util.WithInternalSourceType(context.Background(), "handle_test") store := testkit.CreateMockStore(t) gtk := testkit.NewTestKit(t, store) @@ -87,6 +86,15 @@ func TestHandle(t *testing.T) { // pause and resume task. require.NoError(t, handle.PauseTask(ctx, "2")) require.NoError(t, handle.ResumeTask(ctx, "2")) + + // submit task with same key + task, err = handle.SubmitTask(ctx, "3", proto.TaskTypeExample, 2, proto.EmptyMeta) + require.NoError(t, err) + require.Equal(t, int64(3), task.ID) + require.NoError(t, mgr.TransferTasks2History(ctx, []*proto.Task{task})) + task, err = handle.SubmitTask(ctx, "3", proto.TaskTypeExample, 2, proto.EmptyMeta) + require.Nil(t, task) + require.Error(t, storage.ErrTaskAlreadyExists, err) } func TestRunWithRetry(t *testing.T) {