Skip to content

Commit

Permalink
openapi(dm): add error_msg in task status (pingcap#4421)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ehco1996 authored and zhaoxinyu committed Feb 16, 2022
1 parent 799c9ad commit f56b46f
Show file tree
Hide file tree
Showing 7 changed files with 217 additions and 174 deletions.
21 changes: 13 additions & 8 deletions dm/dm/master/openapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -936,6 +936,16 @@ func getOpenAPISubtaskStatusByTaskName(taskName string,
return nil, terror.ErrOpenAPICommonError.New("worker's query-status response is nil")
}
sourceStatus := workerStatus.SourceStatus
openapiSubTaskStatus := openapi.SubTaskStatus{
Name: taskName,
SourceName: sourceStatus.GetSource(),
WorkerName: sourceStatus.GetWorker(),
}
if !workerStatus.Result {
openapiSubTaskStatus.ErrorMsg = &workerStatus.Msg
subTaskStatusList = append(subTaskStatusList, openapiSubTaskStatus)
continue
}
// find right task name
var subTaskStatus *pb.SubTaskStatus
for _, cfg := range workerStatus.SubTaskStatus {
Expand All @@ -947,14 +957,9 @@ func getOpenAPISubtaskStatusByTaskName(taskName string,
// not find
continue
}
openapiSubTaskStatus := openapi.SubTaskStatus{
Name: taskName,
SourceName: sourceStatus.GetSource(),
WorkerName: sourceStatus.GetWorker(),
Stage: subTaskStatus.GetStage().String(),
Unit: subTaskStatus.GetUnit().String(),
UnresolvedDdlLockId: &subTaskStatus.UnresolvedDDLLockID,
}
openapiSubTaskStatus.Stage = subTaskStatus.GetStage().String()
openapiSubTaskStatus.Unit = subTaskStatus.GetUnit().String()
openapiSubTaskStatus.UnresolvedDdlLockId = &subTaskStatus.UnresolvedDDLLockID
// add load status
if loadS := subTaskStatus.GetLoad(); loadS != nil {
openapiSubTaskStatus.LoadStatus = &openapi.LoadStatus{
Expand Down
104 changes: 66 additions & 38 deletions dm/dm/master/openapi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ func (t *openAPISuite) TestTaskAPI(c *check.C) {

// get task status
mockWorkerClient := pbmock.NewMockWorkerClient(ctrl)
mockTaskQueryStatus(mockWorkerClient, task.Name, source1.SourceName, workerName1)
mockTaskQueryStatus(mockWorkerClient, task.Name, source1.SourceName, workerName1, false)
s.scheduler.SetWorkerClientForTest(workerName1, newMockRPCClient(mockWorkerClient))
taskStatusURL := fmt.Sprintf("%s/%s/status", taskURL, task.Name)
result = testutil.NewRequest().Get(taskStatusURL).GoWithHTTPHandler(t.testT, s.openapiHandles)
Expand Down Expand Up @@ -576,6 +576,21 @@ func (t *openAPISuite) TestTaskAPI(c *check.C) {
c.Assert(status.WorkerName, check.Equals, workerName1)
c.Assert(status.Name, check.Equals, task.Name)

// test some error happened on worker
mockWorkerClient = pbmock.NewMockWorkerClient(ctrl)
mockTaskQueryStatus(mockWorkerClient, task.Name, source1.SourceName, workerName1, true)
s.scheduler.SetWorkerClientForTest(workerName1, newMockRPCClient(mockWorkerClient))
result = testutil.NewRequest().Get(taskURL+"?with_status=true").GoWithHTTPHandler(t.testT, s.openapiHandles)
c.Assert(result.Code(), check.Equals, http.StatusOK)
c.Assert(result.UnmarshalBodyToObject(&resultListTask), check.IsNil)
c.Assert(resultListTask.Data, check.HasLen, 1)
c.Assert(resultListTask.Total, check.Equals, 1)
c.Assert(resultListTask.Data[0].StatusList, check.NotNil)
statusList = *resultListTask.Data[0].StatusList
c.Assert(statusList, check.HasLen, 1)
status = statusList[0]
c.Assert(status.ErrorMsg, check.NotNil)

// stop task
result = testutil.NewRequest().Delete(fmt.Sprintf("%s/%s", taskURL, task.Name)).GoWithHTTPHandler(t.testT, s.openapiHandles)
c.Assert(result.Code(), check.Equals, http.StatusNoContent)
Expand Down Expand Up @@ -840,49 +855,62 @@ func mockRelayQueryStatus(
}

func mockTaskQueryStatus(
mockWorkerClient *pbmock.MockWorkerClient, taskName, sourceName, workerName string) {
queryResp := &pb.QueryStatusResponse{
Result: true,
SourceStatus: &pb.SourceStatus{
Worker: workerName,
Source: sourceName,
},
SubTaskStatus: []*pb.SubTaskStatus{
{
Stage: pb.Stage_Running,
Name: taskName,
Status: &pb.SubTaskStatus_Sync{
Sync: &pb.SyncStatus{
TotalEvents: 0,
TotalTps: 0,
RecentTps: 0,
MasterBinlog: "",
MasterBinlogGtid: "",
SyncerBinlog: "",
SyncerBinlogGtid: "",
BlockingDDLs: nil,
UnresolvedGroups: nil,
Synced: false,
BinlogType: "",
SecondsBehindMaster: 0,
mockWorkerClient *pbmock.MockWorkerClient, taskName, sourceName, workerName string, needError bool) {
var queryResp *pb.QueryStatusResponse
if needError {
queryResp = &pb.QueryStatusResponse{
Result: false,
Msg: "some error happened",
SourceStatus: &pb.SourceStatus{
Worker: workerName,
Source: sourceName,
},
}
} else {
queryResp = &pb.QueryStatusResponse{
Result: true,
SourceStatus: &pb.SourceStatus{
Worker: workerName,
Source: sourceName,
},
SubTaskStatus: []*pb.SubTaskStatus{
{
Stage: pb.Stage_Running,
Name: taskName,
Status: &pb.SubTaskStatus_Sync{
Sync: &pb.SyncStatus{
TotalEvents: 0,
TotalTps: 0,
RecentTps: 0,
MasterBinlog: "",
MasterBinlogGtid: "",
SyncerBinlog: "",
SyncerBinlogGtid: "",
BlockingDDLs: nil,
UnresolvedGroups: nil,
Synced: false,
BinlogType: "",
SecondsBehindMaster: 0,
},
},
},
},
{
Stage: pb.Stage_Running,
Name: taskName,
Status: &pb.SubTaskStatus_Dump{
Dump: &pb.DumpStatus{
CompletedTables: 0.0,
EstimateTotalRows: 10.0,
FinishedBytes: 0.0,
FinishedRows: 5.0,
TotalTables: 1,
{
Stage: pb.Stage_Running,
Name: taskName,
Status: &pb.SubTaskStatus_Dump{
Dump: &pb.DumpStatus{
CompletedTables: 0.0,
EstimateTotalRows: 10.0,
FinishedBytes: 0.0,
FinishedRows: 5.0,
TotalTables: 1,
},
},
},
},
},
}
}

mockWorkerClient.EXPECT().QueryStatus(
gomock.Any(),
gomock.Any(),
Expand Down
Loading

0 comments on commit f56b46f

Please sign in to comment.