Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
Merge remote-tracking branch 'origin/dmctl-refine-usage' into dmctl-r…
Browse files Browse the repository at this point in the history
…efine-usage
  • Loading branch information
csuzhangxc committed Sep 25, 2019
2 parents 43906c8 + 901f8f0 commit 2d71705
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 43 deletions.
18 changes: 10 additions & 8 deletions _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
ErrDBDriverError,[code=10001:class=database:scope=not-set:level=high],"database driver error"
ErrDBBadConn,[code=10002:class=database:scope=not-set:level=medium],"database driver"
ErrDBInvalidConn,[code=10003:class=database:scope=not-set:level=medium],"database driver"
ErrDBUnExpect,[code=10004:class=database:scope=not-set:level=medium],"unexpect database error: %s"
ErrDBBadConn,[code=10002:class=database:scope=not-set:level=high],"database driver"
ErrDBInvalidConn,[code=10003:class=database:scope=not-set:level=high],"database driver"
ErrDBUnExpect,[code=10004:class=database:scope=not-set:level=high],"unexpect database error: %s"
ErrDBQueryFailed,[code=10005:class=database:scope=not-set:level=high],"query statement failed: %s"
ErrDBExecuteFailed,[code=10006:class=database:scope=not-set:level=high],"execute statement failed: %s"
ErrParseMydumperMeta,[code=11001:class=functional:scope=internal:level=high],"parse mydumper metadata error: %s"
Expand Down Expand Up @@ -153,11 +153,11 @@ ErrTaskCheckFailedOpenDB,[code=26002:class=task-check:scope=internal:level=high]
ErrTaskCheckNewTableRouter,[code=26003:class=task-check:scope=internal:level=medium],"new table router error"
ErrTaskCheckNewColumnMapping,[code=26004:class=task-check:scope=internal:level=medium],"new column mapping error"
ErrTaskCheckSyncConfigError,[code=26005:class=task-check:scope=internal:level=medium],"%s %v: %v\n detail: %v"
ErrRelayParseUUIDIndex,[code=28001:class=relay-util:scope=internal:level=high],"parse server-uuid.index"
ErrRelayParseUUIDSuffix,[code=28002:class=relay-util:scope=internal:level=high],"UUID (with suffix) %s not valid"
ErrRelayUUIDWithSuffixNotFound,[code=28003:class=relay-util:scope=internal:level=high],"no UUID (with suffix) matched %s found in %s, all UUIDs are %v"
ErrRelayGenFakeRotateEvent,[code=28004:class=relay-util:scope=internal:level=high],"generate fake rotate event"
ErrRelayNoValidRelaySubDir,[code=28005:class=relay-util:scope=internal:level=high],"no valid relay sub directory exists"
ErrRelayParseUUIDIndex,[code=28001:class=relay-event-lib:scope=internal:level=high],"parse server-uuid.index"
ErrRelayParseUUIDSuffix,[code=28002:class=relay-event-lib:scope=internal:level=high],"UUID (with suffix) %s not valid"
ErrRelayUUIDWithSuffixNotFound,[code=28003:class=relay-event-lib:scope=internal:level=high],"no UUID (with suffix) matched %s found in %s, all UUIDs are %v"
ErrRelayGenFakeRotateEvent,[code=28004:class=relay-event-lib:scope=internal:level=high],"generate fake rotate event"
ErrRelayNoValidRelaySubDir,[code=28005:class=relay-event-lib:scope=internal:level=high],"no valid relay sub directory exists"
ErrRelayUUIDSuffixNotValid,[code=30001:class=relay-unit:scope=internal:level=high],"UUID %s suffix %d should be 1 larger than previous suffix %d"
ErrRelayUUIDSuffixLessThanPrev,[code=30002:class=relay-unit:scope=internal:level=high],"previous UUID %s has suffix larger than %s"
ErrRelayLoadMetaData,[code=30003:class=relay-unit:scope=internal:level=high],"load meta data"
Expand Down Expand Up @@ -310,6 +310,7 @@ ErrMasterOperNotFound,[code=38031:class=dm-master:scope=internal:level=high],"op
ErrMasterOperRespNotSuccess,[code=38032:class=dm-master:scope=internal:level=high],"operation not success: %s"
ErrMasterOperRequestTimeout,[code=38033:class=dm-master:scope=internal:level=high],"request is timeout, but request may be successful, please execute `query-status` to check status"
ErrMasterHandleHTTPApis,[code=38034:class=dm-master:scope=internal:level=high],"serve http apis to grpc"
ErrMasterHostPortNotValid,[code=38035:class=dm-master:scope=internal:level=high],"host:port '%s' not valid"
ErrWorkerParseFlagSet,[code=40001:class=dm-worker:scope=internal:level=medium],"parse dm-worker config flag set"
ErrWorkerInvalidFlag,[code=40002:class=dm-worker:scope=internal:level=medium],"'%s' is an invalid flag"
ErrWorkerDecodeConfigFromFile,[code=40003:class=dm-worker:scope=internal:level=medium],"toml decode file"
Expand Down Expand Up @@ -378,6 +379,7 @@ ErrWorkerExecDDLSyncerOnly,[code=40065:class=dm-worker:scope=internal:level=high
ErrWorkerExecDDLTimeout,[code=40066:class=dm-worker:scope=internal:level=high],"ExecuteDDL timeout, try use `query-status` to query whether the DDL is still blocking"
ErrWorkerWaitRelayCatchupTimeout,[code=40067:class=dm-worker:scope=internal:level=high],"wait relay catchup timeout, loader end binlog pos: %s, relay binlog pos: %s"
ErrWorkerRelayIsPurging,[code=40068:class=dm-worker:scope=internal:level=high],"relay log purger is purging, cannot start sub task %s, please try again later"
ErrWorkerHostPortNotValid,[code=40069:class=dm-worker:scope=internal:level=high],"host:port '%s' not valid"
ErrTracerParseFlagSet,[code=42001:class=dm-tracer:scope=internal:level=medium],"parse dm-tracer config flag set"
ErrTracerConfigTomlTransform,[code=42002:class=dm-tracer:scope=internal:level=medium],"config toml transform"
ErrTracerConfigInvalidFlag,[code=42003:class=dm-tracer:scope=internal:level=medium],"'%s' is an invalid flag"
Expand Down
30 changes: 24 additions & 6 deletions dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,20 @@ func NewServer(cfg *Config) *Server {
idGen: tracing.NewIDGen(),
ap: NewAgentPool(&RateLimitConfig{rate: cfg.RPCRateLimit, burst: cfg.RPCRateBurst}),
}
server.closed.Set(true)

return &server
}

// Start starts to serving
func (s *Server) Start() error {
var err error

_, _, err = s.splitHostPort()
if err != nil {
return err
}

s.rootLis, err = net.Listen("tcp", s.cfg.MasterAddr)
if err != nil {
return terror.ErrMasterStartService.Delegate(err)
Expand Down Expand Up @@ -188,16 +195,18 @@ func (s *Server) Start() error {
return err
}

// Close close the RPC server
// Close close the RPC server, this function can be called multiple times
func (s *Server) Close() {
s.Lock()
defer s.Unlock()
if s.closed.Get() {
return
}
err := s.rootLis.Close()
if err != nil && !common.IsErrNetClosing(err) {
log.L().Error("close net listener", zap.Error(err))
if s.rootLis != nil {
err := s.rootLis.Close()
if err != nil && !common.IsErrNetClosing(err) {
log.L().Error("close net listener", zap.Error(err))
}
}
if s.svr != nil {
s.svr.GracefulStop()
Expand Down Expand Up @@ -1974,9 +1983,9 @@ func (s *Server) workerArgsExtractor(args ...interface{}) (workerrpc.Client, str
// HandleHTTPApis handles http apis and translate to grpc request
func (s *Server) HandleHTTPApis(ctx context.Context, mux *http.ServeMux) error {
// MasterAddr's format may be "host:port" or "":port"
_, port, err := net.SplitHostPort(s.cfg.MasterAddr)
_, port, err := s.splitHostPort()
if err != nil {
return terror.ErrMasterHandleHTTPApis.Delegate(err)
return err
}

opts := []grpc.DialOption{grpc.WithInsecure()}
Expand All @@ -1994,3 +2003,12 @@ func (s *Server) HandleHTTPApis(ctx context.Context, mux *http.ServeMux) error {

return nil
}

func (s *Server) splitHostPort() (host, port string, err error) {
// MasterAddr's format may be "host:port" or ":port"
host, port, err = net.SplitHostPort(s.cfg.MasterAddr)
if err != nil {
err = terror.ErrMasterHostPortNotValid.Delegate(err, s.cfg.MasterAddr)
}
return
}
51 changes: 51 additions & 0 deletions dm/master/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ package master

import (
"context"
"io/ioutil"
"net/http"

"fmt"
"io"
"sync"
Expand All @@ -30,6 +33,8 @@ import (
"github.com/pingcap/dm/dm/master/workerrpc"
"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/dm/pbmock"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
)

// use task config from integration test `sharding`
Expand Down Expand Up @@ -1474,3 +1479,49 @@ func (t *testMaster) TestFetchWorkerDDLInfo(c *check.C) {
}()
wg.Wait()
}

func (t *testMaster) TestServer(c *check.C) {
cfg := NewConfig()
c.Assert(cfg.Parse([]string{"-config=./dm-master.toml"}), check.IsNil)

s := NewServer(cfg)

masterAddr := cfg.MasterAddr
s.cfg.MasterAddr = ""
err := s.Start()
c.Assert(terror.ErrMasterHostPortNotValid.Equal(err), check.IsTrue)
s.Close()
s.cfg.MasterAddr = masterAddr

go func() {
err1 := s.Start()
c.Assert(err1, check.IsNil)
}()

c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
return !s.closed.Get()
}), check.IsTrue)

t.testHTTPInterface(c, "status")

dupServer := NewServer(cfg)
err = dupServer.Start()
c.Assert(terror.ErrMasterStartService.Equal(err), check.IsTrue)
c.Assert(err.Error(), check.Matches, ".*bind: address already in use")

// close
s.Close()

c.Assert(utils.WaitSomething(30, 10*time.Millisecond, func() bool {
return s.closed.Get()
}), check.IsTrue)
}

func (t *testMaster) testHTTPInterface(c *check.C, uri string) {
resp, err := http.Get("http://127.0.0.1:8261/" + uri)
c.Assert(err, check.IsNil)
defer resp.Body.Close()
c.Assert(resp.StatusCode, check.Equals, 200)
_, err = ioutil.ReadAll(resp.Body)
c.Assert(err, check.IsNil)
}
31 changes: 25 additions & 6 deletions dm/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ func NewServer(cfg *Config) *Server {
// Start starts to serving
func (s *Server) Start() error {
var err error

_, _, err = s.splitHostPort()
if err != nil {
return err
}

s.rootLis, err = net.Listen("tcp", s.cfg.WorkerAddr)
if err != nil {
return terror.ErrWorkerStartService.Delegate(err)
Expand Down Expand Up @@ -109,17 +115,19 @@ func (s *Server) Start() error {
return terror.ErrWorkerStartService.Delegate(err)
}

// Close close the RPC server
// Close close the RPC server, this function can be called multiple times
func (s *Server) Close() {
s.Lock()
defer s.Unlock()
if s.closed.Get() {
return
}

err := s.rootLis.Close()
if err != nil && !common.IsErrNetClosing(err) {
log.L().Error("fail to close net listener", log.ShortError(err))
if s.rootLis != nil {
err := s.rootLis.Close()
if err != nil && !common.IsErrNetClosing(err) {
log.L().Error("fail to close net listener", log.ShortError(err))
}
}
if s.svr != nil {
// GracefulStop can not cancel active stream RPCs
Expand All @@ -129,8 +137,10 @@ func (s *Server) Close() {
}

// close worker and wait for return
s.worker.Close()
s.wg.Wait()
if s.worker != nil {
s.worker.Close()
s.wg.Wait()
}

s.closed.Set(true)
}
Expand Down Expand Up @@ -444,3 +454,12 @@ func makeCommonWorkerResponse(reqErr error) *pb.CommonWorkerResponse {
}
return resp
}

func (s *Server) splitHostPort() (host, port string, err error) {
// WorkerAddr's format may be "host:port" or ":port"
host, port, err = net.SplitHostPort(s.cfg.WorkerAddr)
if err != nil {
err = terror.ErrWorkerHostPortNotValid.Delegate(err, s.cfg.WorkerAddr)
}
return
}
13 changes: 13 additions & 0 deletions dm/worker/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"google.golang.org/grpc"

"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
)

Expand All @@ -50,6 +51,13 @@ func (t *testServer) TestServer(c *C) {

s := NewServer(cfg)

workerAddr := cfg.WorkerAddr
s.cfg.WorkerAddr = ""
err := s.Start()
c.Assert(terror.ErrWorkerHostPortNotValid.Equal(err), IsTrue)
s.Close()
s.cfg.WorkerAddr = workerAddr

go func() {
err1 := s.Start()
c.Assert(err1, IsNil)
Expand Down Expand Up @@ -123,6 +131,11 @@ func (t *testServer) TestServer(c *C) {
c.Assert(err, IsNil)
c.Assert(opsp.Log.Success, IsFalse)

dupServer := NewServer(cfg)
err = dupServer.Start()
c.Assert(terror.ErrWorkerStartService.Equal(err), IsTrue)
c.Assert(err.Error(), Matches, ".*bind: address already in use")

// close
s.Close()

Expand Down
20 changes: 12 additions & 8 deletions pkg/terror/error_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ const (
codeMasterOperRespNotSuccess
codeMasterOperRequestTimeout
codeMasterHandleHTTPApis
codeMasterHostPortNotValid
)

// DM-worker error code
Expand Down Expand Up @@ -449,6 +450,7 @@ const (
codeWorkerExecDDLTimeout
codeWorkerWaitRelayCatchupTimeout
codeWorkerRelayIsPurging
codeWorkerHostPortNotValid
)

// DM-tracer error code
Expand All @@ -469,10 +471,10 @@ const (
var (
// Database operation related error
ErrDBDriverError = New(codeDBDriverError, ClassDatabase, ScopeNotSet, LevelHigh, "database driver error")
ErrDBBadConn = New(codeDBBadConn, ClassDatabase, ScopeNotSet, LevelMedium, "database driver")
ErrDBInvalidConn = New(codeDBInvalidConn, ClassDatabase, ScopeNotSet, LevelMedium, "database driver")
ErrDBBadConn = New(codeDBBadConn, ClassDatabase, ScopeNotSet, LevelHigh, "database driver")
ErrDBInvalidConn = New(codeDBInvalidConn, ClassDatabase, ScopeNotSet, LevelHigh, "database driver")

ErrDBUnExpect = New(codeDBUnExpect, ClassDatabase, ScopeNotSet, LevelMedium, "unexpect database error: %s")
ErrDBUnExpect = New(codeDBUnExpect, ClassDatabase, ScopeNotSet, LevelHigh, "unexpect database error: %s")
ErrDBQueryFailed = New(codeDBQueryFailed, ClassDatabase, ScopeNotSet, LevelHigh, "query statement failed: %s")
ErrDBExecuteFailed = New(codeDBExecuteFailed, ClassDatabase, ScopeNotSet, LevelHigh, "execute statement failed: %s")

Expand Down Expand Up @@ -642,11 +644,11 @@ var (
ErrTaskCheckSyncConfigError = New(codeTaskCheckSyncConfigError, ClassTaskCheck, ScopeInternal, LevelMedium, "%s %v: %v\n detail: %v")

// Relay log basic API error
ErrRelayParseUUIDIndex = New(codeRelayParseUUIDIndex, ClassRelayUtil, ScopeInternal, LevelHigh, "parse server-uuid.index")
ErrRelayParseUUIDSuffix = New(codeRelayParseUUIDSuffix, ClassRelayUtil, ScopeInternal, LevelHigh, "UUID (with suffix) %s not valid")
ErrRelayUUIDWithSuffixNotFound = New(codeRelayUUIDWithSuffixNotFound, ClassRelayUtil, ScopeInternal, LevelHigh, "no UUID (with suffix) matched %s found in %s, all UUIDs are %v")
ErrRelayGenFakeRotateEvent = New(codeRelayGenFakeRotateEvent, ClassRelayUtil, ScopeInternal, LevelHigh, "generate fake rotate event")
ErrRelayNoValidRelaySubDir = New(codeRelayNoValidRelaySubDir, ClassRelayUtil, ScopeInternal, LevelHigh, "no valid relay sub directory exists")
ErrRelayParseUUIDIndex = New(codeRelayParseUUIDIndex, ClassRelayEventLib, ScopeInternal, LevelHigh, "parse server-uuid.index")
ErrRelayParseUUIDSuffix = New(codeRelayParseUUIDSuffix, ClassRelayEventLib, ScopeInternal, LevelHigh, "UUID (with suffix) %s not valid")
ErrRelayUUIDWithSuffixNotFound = New(codeRelayUUIDWithSuffixNotFound, ClassRelayEventLib, ScopeInternal, LevelHigh, "no UUID (with suffix) matched %s found in %s, all UUIDs are %v")
ErrRelayGenFakeRotateEvent = New(codeRelayGenFakeRotateEvent, ClassRelayEventLib, ScopeInternal, LevelHigh, "generate fake rotate event")
ErrRelayNoValidRelaySubDir = New(codeRelayNoValidRelaySubDir, ClassRelayEventLib, ScopeInternal, LevelHigh, "no valid relay sub directory exists")

// Relay unit error
ErrRelayUUIDSuffixNotValid = New(codeRelayUUIDSuffixNotValid, ClassRelayUnit, ScopeInternal, LevelHigh, "UUID %s suffix %d should be 1 larger than previous suffix %d")
Expand Down Expand Up @@ -810,6 +812,7 @@ var (
ErrMasterOperRespNotSuccess = New(codeMasterOperRespNotSuccess, ClassDMMaster, ScopeInternal, LevelHigh, "operation not success: %s")
ErrMasterOperRequestTimeout = New(codeMasterOperRequestTimeout, ClassDMMaster, ScopeInternal, LevelHigh, "request is timeout, but request may be successful, please execute `query-status` to check status")
ErrMasterHandleHTTPApis = New(codeMasterHandleHTTPApis, ClassDMMaster, ScopeInternal, LevelHigh, "serve http apis to grpc")
ErrMasterHostPortNotValid = New(codeMasterHostPortNotValid, ClassDMMaster, ScopeInternal, LevelHigh, "host:port '%s' not valid")

// DM-worker error
ErrWorkerParseFlagSet = New(codeWorkerParseFlagSet, ClassDMWorker, ScopeInternal, LevelMedium, "parse dm-worker config flag set")
Expand Down Expand Up @@ -880,6 +883,7 @@ var (
ErrWorkerExecDDLTimeout = New(codeWorkerExecDDLTimeout, ClassDMWorker, ScopeInternal, LevelHigh, "ExecuteDDL timeout, try use `query-status` to query whether the DDL is still blocking")
ErrWorkerWaitRelayCatchupTimeout = New(codeWorkerWaitRelayCatchupTimeout, ClassDMWorker, ScopeInternal, LevelHigh, "wait relay catchup timeout, loader end binlog pos: %s, relay binlog pos: %s")
ErrWorkerRelayIsPurging = New(codeWorkerRelayIsPurging, ClassDMWorker, ScopeInternal, LevelHigh, "relay log purger is purging, cannot start sub task %s, please try again later")
ErrWorkerHostPortNotValid = New(codeWorkerHostPortNotValid, ClassDMWorker, ScopeInternal, LevelHigh, "host:port '%s' not valid")

// DM-tracer error
ErrTracerParseFlagSet = New(codeTracerParseFlagSet, ClassDMTracer, ScopeInternal, LevelMedium, "parse dm-tracer config flag set")
Expand Down
30 changes: 15 additions & 15 deletions pkg/terror/terror.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const (
ClassBinlogOp
ClassCheckpoint
ClassTaskCheck
ClassRelayUtil
ClassRelayEventLib
ClassRelayUnit
ClassDumpUnit
ClassLoadUnit
Expand All @@ -50,20 +50,20 @@ const (
)

var errClass2Str = map[ErrClass]string{
ClassDatabase: "database",
ClassFunctional: "functional",
ClassConfig: "config",
ClassBinlogOp: "binlog-op",
ClassCheckpoint: "checkpoint",
ClassTaskCheck: "task-check",
ClassRelayUtil: "relay-util",
ClassRelayUnit: "relay-unit",
ClassDumpUnit: "dump-unit",
ClassLoadUnit: "load-unit",
ClassSyncUnit: "sync-unit",
ClassDMMaster: "dm-master",
ClassDMWorker: "dm-worker",
ClassDMTracer: "dm-tracer",
ClassDatabase: "database",
ClassFunctional: "functional",
ClassConfig: "config",
ClassBinlogOp: "binlog-op",
ClassCheckpoint: "checkpoint",
ClassTaskCheck: "task-check",
ClassRelayEventLib: "relay-event-lib",
ClassRelayUnit: "relay-unit",
ClassDumpUnit: "dump-unit",
ClassLoadUnit: "load-unit",
ClassSyncUnit: "sync-unit",
ClassDMMaster: "dm-master",
ClassDMWorker: "dm-worker",
ClassDMTracer: "dm-tracer",
}

// String implements fmt.Stringer interface
Expand Down

0 comments on commit 2d71705

Please sign in to comment.