From 70377564a630de776c5c9ac3d68ddacaaeaceeff Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Tue, 21 Dec 2021 21:56:53 +0800 Subject: [PATCH 1/3] owner,scheduler(cdc): fix nil pointer panic in owner scheduler (#2980) --- cdc/owner/scheduler.go | 5 +++++ cdc/owner/scheduler_test.go | 19 ++++++++++++++++++- 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/cdc/owner/scheduler.go b/cdc/owner/scheduler.go index 00f0771d815..9dd8b7350be 100644 --- a/cdc/owner/scheduler.go +++ b/cdc/owner/scheduler.go @@ -300,6 +300,11 @@ func (s *scheduler) handleJobs(jobs []*schedulerJob) { func (s *scheduler) cleanUpFinishedOperations() { for captureID := range s.state.TaskStatuses { s.state.PatchTaskStatus(captureID, func(status *model.TaskStatus) (*model.TaskStatus, bool, error) { + if status == nil { + log.Warn("task status of the capture is not found, may be the key in etcd was deleted.") + return status, false, nil + } + changed := false for tableID, operation := range status.Operation { if operation.Status == model.OperFinished { diff --git a/cdc/owner/scheduler_test.go b/cdc/owner/scheduler_test.go index 669ab85a8f5..701e1838efb 100644 --- a/cdc/owner/scheduler_test.go +++ b/cdc/owner/scheduler_test.go @@ -19,6 +19,7 @@ import ( "github.com/pingcap/check" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/etcd" "github.com/pingcap/tiflow/pkg/orchestrator" "github.com/pingcap/tiflow/pkg/util/testleak" ) @@ -84,8 +85,24 @@ func (s *schedulerSuite) finishTableOperation(captureID model.CaptureID, tableID func (s *schedulerSuite) TestScheduleOneCapture(c *check.C) { defer testleak.AfterTest(c)() + + s.reset(c) + captureID := "test-capture-0" + s.addCapture(captureID) + + _, _ = s.scheduler.Tick(s.state, []model.TableID{}, s.captures) + + // Manually simulate the scenario where the corresponding key was deleted in the etcd + key := &etcd.CDCKey{ + Tp: etcd.CDCKeyTypeTaskStatus, + CaptureID: captureID, + ChangefeedID: s.state.ID, + } + s.tester.MustUpdate(key.String(), nil) + s.tester.MustApplyPatches() + s.reset(c) - captureID := "test-capture-1" + captureID = "test-capture-1" s.addCapture(captureID) // add three tables From 4e64ffeeb697358620c6814ecfe8041b86ca0618 Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Wed, 22 Dec 2021 10:40:57 +0800 Subject: [PATCH 2/3] add captureID and changefeedID fields to warn log --- cdc/owner/scheduler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdc/owner/scheduler.go b/cdc/owner/scheduler.go index 9dd8b7350be..43c269c91ca 100644 --- a/cdc/owner/scheduler.go +++ b/cdc/owner/scheduler.go @@ -301,7 +301,7 @@ func (s *scheduler) cleanUpFinishedOperations() { for captureID := range s.state.TaskStatuses { s.state.PatchTaskStatus(captureID, func(status *model.TaskStatus) (*model.TaskStatus, bool, error) { if status == nil { - log.Warn("task status of the capture is not found, may be the key in etcd was deleted.") + log.Warn("task status of the capture is not found, may be the key in etcd was deleted", zap.Any("captureID", captureID), zap.Any("changeFeedID", s.state.ID)) return status, false, nil } From 3fc1c5878b460833d1be3f8e17709494f82ccb79 Mon Sep 17 00:00:00 2001 From: CharlesCheung <61726649+CharlesCheung96@users.noreply.github.com> Date: Wed, 22 Dec 2021 10:50:42 +0800 Subject: [PATCH 3/3] handle comment Co-authored-by: amyangfei --- cdc/owner/scheduler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdc/owner/scheduler.go b/cdc/owner/scheduler.go index 43c269c91ca..7f47c61ff9f 100644 --- a/cdc/owner/scheduler.go +++ b/cdc/owner/scheduler.go @@ -301,7 +301,7 @@ func (s *scheduler) cleanUpFinishedOperations() { for captureID := range s.state.TaskStatuses { s.state.PatchTaskStatus(captureID, func(status *model.TaskStatus) (*model.TaskStatus, bool, error) { if status == nil { - log.Warn("task status of the capture is not found, may be the key in etcd was deleted", zap.Any("captureID", captureID), zap.Any("changeFeedID", s.state.ID)) + log.Warn("task status of the capture is not found, may be the key in etcd was deleted", zap.String("captureID", captureID), zap.String("changeFeedID", s.state.ID)) return status, false, nil }