diff --git a/cdc/owner/scheduler_v1.go b/cdc/owner/scheduler_v1.go index 480a170d6a9..140d08057c3 100644 --- a/cdc/owner/scheduler_v1.go +++ b/cdc/owner/scheduler_v1.go @@ -309,6 +309,11 @@ func (s *oldScheduler) handleJobs(jobs []*schedulerJob) { func (s *oldScheduler) cleanUpFinishedOperations() { for captureID := range s.state.TaskStatuses { s.state.PatchTaskStatus(captureID, func(status *model.TaskStatus) (*model.TaskStatus, bool, error) { + if status == nil { + log.Warn("task status of the capture is not found, may be the key in etcd was deleted.") + return status, false, nil + } + changed := false for tableID, operation := range status.Operation { if operation.Status == model.OperFinished { diff --git a/cdc/owner/scheduler_v1_test.go b/cdc/owner/scheduler_v1_test.go index e732669873f..eb2950b7190 100644 --- a/cdc/owner/scheduler_v1_test.go +++ b/cdc/owner/scheduler_v1_test.go @@ -19,6 +19,7 @@ import ( "github.com/pingcap/check" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/etcd" "github.com/pingcap/tiflow/pkg/orchestrator" "github.com/pingcap/tiflow/pkg/util/testleak" ) @@ -83,8 +84,24 @@ func (s *schedulerSuite) finishTableOperation(captureID model.CaptureID, tableID func (s *schedulerSuite) TestScheduleOneCapture(c *check.C) { defer testleak.AfterTest(c)() + + s.reset(c) + captureID := "test-capture-0" + s.addCapture(captureID) + + _, _ = s.scheduler.Tick(s.state, []model.TableID{}, s.captures) + + // Manually simulate the scenario where the corresponding key was deleted in the etcd + key := &etcd.CDCKey{ + Tp: etcd.CDCKeyTypeTaskStatus, + CaptureID: captureID, + ChangefeedID: s.state.ID, + } + s.tester.MustUpdate(key.String(), nil) + s.tester.MustApplyPatches() + s.reset(c) - captureID := "test-capture-1" + captureID = "test-capture-1" s.addCapture(captureID) // add three tables