Skip to content

Commit

Permalink
dm-worker/: refine query error (pingcap#512)
Browse files Browse the repository at this point in the history
* fix worker query error bug
  • Loading branch information
lichunzhu authored Mar 4, 2020
1 parent 9ff86cd commit 3ae47e8
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 19 deletions.
35 changes: 35 additions & 0 deletions dm/worker/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package worker

import (
"context"
"errors"
"io/ioutil"
"net/http"
"net/url"
Expand Down Expand Up @@ -410,6 +411,40 @@ func (t *testServer) TestGetMinPosInAllSubTasks(c *C) {
c.Assert(minPos.Pos, Equals, uint32(12))
}

func (t *testServer) TestQueryError(c *C) {
cfg := NewConfig()
c.Assert(cfg.Parse([]string{"-config=./dm-worker.toml"}), IsNil)

s := NewServer(cfg)
s.closed.Set(false)

sourceCfg := loadSourceConfigWithoutPassword(c)
sourceCfg.EnableRelay = false
w, err := NewWorker(&sourceCfg, nil)
c.Assert(err, IsNil)
w.closed.Set(closedFalse)

subtaskCfg := config.SubTaskConfig{}
err = subtaskCfg.DecodeFile(subtaskSampleFile, true)
c.Assert(err, IsNil)

// subtask failed just after it is started
st := NewSubTask(&subtaskCfg, nil)
st.fail(errors.New("mockSubtaskFail"))
w.subTaskHolder.recordSubTask(st)
s.setWorker(w, true)

ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
resp, err := s.QueryError(ctx, &pb.QueryErrorRequest{})
c.Assert(err, IsNil)
c.Assert(resp, NotNil)
c.Assert(resp.Result, IsTrue)
c.Assert(resp.Msg, HasLen, 0)
c.Assert(resp.SubTaskError, HasLen, 1)
c.Assert(resp.SubTaskError[0].String(), Matches, `[\s\S]*mockSubtaskFail[\s\S]*`)
}

func getFakePosForSubTask(ctx context.Context, subTaskCfg *config.SubTaskConfig) (minPos *mysql.Position, err error) {
switch subTaskCfg.Name {
case "test1":
Expand Down
48 changes: 31 additions & 17 deletions dm/worker/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"sort"

"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/pkg/utils"

"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
Expand All @@ -27,12 +28,18 @@ import (

// Status returns the status of the current sub task
func (st *SubTask) Status() interface{} {
return st.CurrUnit().Status()
if cu := st.CurrUnit(); cu != nil {
return cu.Status()
}
return nil
}

// Error returns the error of the current sub task
func (st *SubTask) Error() interface{} {
return st.CurrUnit().Error()
if cu := st.CurrUnit(); cu != nil {
return cu.Error()
}
return nil
}

// StatusJSON returns the status of the current sub task as json string
Expand Down Expand Up @@ -133,7 +140,7 @@ func (w *Worker) Error(stName string) []*pb.SubTaskError {
return nil // no sub task started
}

error := make([]*pb.SubTaskError, 0, len(sts))
errs := make([]*pb.SubTaskError, 0, len(sts))

// return error order by name
names := make([]string, 0, len(sts))
Expand All @@ -158,26 +165,33 @@ func (w *Worker) Error(stName string) []*pb.SubTaskError {
stError = pb.SubTaskError{
Name: name,
Stage: st.Stage(),
Unit: cu.Type(),
}

// oneof error
us := cu.Error()
switch cu.Type() {
case pb.UnitType_Check:
stError.Error = &pb.SubTaskError_Check{Check: us.(*pb.CheckError)}
case pb.UnitType_Dump:
stError.Error = &pb.SubTaskError_Dump{Dump: us.(*pb.DumpError)}
case pb.UnitType_Load:
stError.Error = &pb.SubTaskError_Load{Load: us.(*pb.LoadError)}
case pb.UnitType_Sync:
stError.Error = &pb.SubTaskError_Sync{Sync: us.(*pb.SyncError)}
if cu != nil {
// oneof error
stError.Unit = cu.Type()
us := cu.Error()
switch cu.Type() {
case pb.UnitType_Check:
stError.Error = &pb.SubTaskError_Check{Check: us.(*pb.CheckError)}
case pb.UnitType_Dump:
stError.Error = &pb.SubTaskError_Dump{Dump: us.(*pb.DumpError)}
case pb.UnitType_Load:
stError.Error = &pb.SubTaskError_Load{Load: us.(*pb.LoadError)}
case pb.UnitType_Sync:
stError.Error = &pb.SubTaskError_Sync{Sync: us.(*pb.SyncError)}
}
} else if result := st.Result(); result != nil {
processErrorMsg := utils.JoinProcessErrors(result.Errors)
if len(processErrorMsg) > 0 {
stError.Error = &pb.SubTaskError_Msg{Msg: processErrorMsg}
}
}
}
error = append(error, &stError)
errs = append(errs, &stError)
}

return error
return errs
}

// statusProcessResult returns a clone of *pb.ProcessResult, but omit the `Error` field, so no duplicated
Expand Down
4 changes: 2 additions & 2 deletions dm/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,13 @@ func NewWorker(cfg *config.SourceConfig, etcdClient *clientv3.Client) (w *Worker
if cfg.EnableRelay {
// initial relay holder, the cfg's password need decrypt
w.relayHolder = NewRelayHolder(cfg)
purger, err1 := w.relayHolder.Init([]purger.PurgeInterceptor{
purger1, err1 := w.relayHolder.Init([]purger.PurgeInterceptor{
w,
})
if err1 != nil {
return nil, err1
}
w.relayPurger = purger
w.relayPurger = purger1
}

// initial task status checker
Expand Down

0 comments on commit 3ae47e8

Please sign in to comment.