diff --git a/_utils/terror_gen/errors_release.txt b/_utils/terror_gen/errors_release.txt index bd2c07e32d..896a295340 100644 --- a/_utils/terror_gen/errors_release.txt +++ b/_utils/terror_gen/errors_release.txt @@ -310,6 +310,7 @@ ErrMasterOperNotFound,[code=38031:class=dm-master:scope=internal:level=high],"op ErrMasterOperRespNotSuccess,[code=38032:class=dm-master:scope=internal:level=high],"operation not success: %s" ErrMasterOperRequestTimeout,[code=38033:class=dm-master:scope=internal:level=high],"request is timeout, but request may be successful, please execute `query-status` to check status" ErrMasterHandleHTTPApis,[code=38034:class=dm-master:scope=internal:level=high],"serve http apis to grpc" +ErrMasterHostPortNotValid,[code=38035:class=dm-master:scope=internal:level=high],"host:port '%s' not valid" ErrWorkerParseFlagSet,[code=40001:class=dm-worker:scope=internal:level=medium],"parse dm-worker config flag set" ErrWorkerInvalidFlag,[code=40002:class=dm-worker:scope=internal:level=medium],"'%s' is an invalid flag" ErrWorkerDecodeConfigFromFile,[code=40003:class=dm-worker:scope=internal:level=medium],"toml decode file" @@ -378,6 +379,7 @@ ErrWorkerExecDDLSyncerOnly,[code=40065:class=dm-worker:scope=internal:level=high ErrWorkerExecDDLTimeout,[code=40066:class=dm-worker:scope=internal:level=high],"ExecuteDDL timeout, try use `query-status` to query whether the DDL is still blocking" ErrWorkerWaitRelayCatchupTimeout,[code=40067:class=dm-worker:scope=internal:level=high],"wait relay catchup timeout, loader end binlog pos: %s, relay binlog pos: %s" ErrWorkerRelayIsPurging,[code=40068:class=dm-worker:scope=internal:level=high],"relay log purger is purging, cannot start sub task %s, please try again later" +ErrWorkerHostPortNotValid,[code=40069:class=dm-worker:scope=internal:level=high],"host:port '%s' not valid" ErrTracerParseFlagSet,[code=42001:class=dm-tracer:scope=internal:level=medium],"parse dm-tracer config flag set" ErrTracerConfigTomlTransform,[code=42002:class=dm-tracer:scope=internal:level=medium],"config toml transform" ErrTracerConfigInvalidFlag,[code=42003:class=dm-tracer:scope=internal:level=medium],"'%s' is an invalid flag" diff --git a/dm/master/server.go b/dm/master/server.go index 7d07a4ebdb..26deec4b73 100644 --- a/dm/master/server.go +++ b/dm/master/server.go @@ -88,6 +88,7 @@ func NewServer(cfg *Config) *Server { idGen: tracing.NewIDGen(), ap: NewAgentPool(&RateLimitConfig{rate: cfg.RPCRateLimit, burst: cfg.RPCRateBurst}), } + server.closed.Set(true) return &server } @@ -95,6 +96,12 @@ func NewServer(cfg *Config) *Server { // Start starts to serving func (s *Server) Start() error { var err error + + _, _, err = s.splitHostPort() + if err != nil { + return err + } + s.rootLis, err = net.Listen("tcp", s.cfg.MasterAddr) if err != nil { return terror.ErrMasterStartService.Delegate(err) @@ -188,16 +195,18 @@ func (s *Server) Start() error { return err } -// Close close the RPC server +// Close close the RPC server, this function can be called multiple times func (s *Server) Close() { s.Lock() defer s.Unlock() if s.closed.Get() { return } - err := s.rootLis.Close() - if err != nil && !common.IsErrNetClosing(err) { - log.L().Error("close net listener", zap.Error(err)) + if s.rootLis != nil { + err := s.rootLis.Close() + if err != nil && !common.IsErrNetClosing(err) { + log.L().Error("close net listener", zap.Error(err)) + } } if s.svr != nil { s.svr.GracefulStop() @@ -1974,9 +1983,9 @@ func (s *Server) workerArgsExtractor(args ...interface{}) (workerrpc.Client, str // HandleHTTPApis handles http apis and translate to grpc request func (s *Server) HandleHTTPApis(ctx context.Context, mux *http.ServeMux) error { // MasterAddr's format may be "host:port" or "":port" - _, port, err := net.SplitHostPort(s.cfg.MasterAddr) + _, port, err := s.splitHostPort() if err != nil { - return terror.ErrMasterHandleHTTPApis.Delegate(err) + return err } opts := []grpc.DialOption{grpc.WithInsecure()} @@ -1994,3 +2003,12 @@ func (s *Server) HandleHTTPApis(ctx context.Context, mux *http.ServeMux) error { return nil } + +func (s *Server) splitHostPort() (host, port string, err error) { + // MasterAddr's format may be "host:port" or ":port" + host, port, err = net.SplitHostPort(s.cfg.MasterAddr) + if err != nil { + err = terror.ErrMasterHostPortNotValid.Delegate(err, s.cfg.MasterAddr) + } + return +} diff --git a/dm/master/server_test.go b/dm/master/server_test.go index 3fe17317d9..2dbcaf344f 100644 --- a/dm/master/server_test.go +++ b/dm/master/server_test.go @@ -15,6 +15,9 @@ package master import ( "context" + "io/ioutil" + "net/http" + "fmt" "io" "sync" @@ -30,6 +33,8 @@ import ( "github.com/pingcap/dm/dm/master/workerrpc" "github.com/pingcap/dm/dm/pb" "github.com/pingcap/dm/dm/pbmock" + "github.com/pingcap/dm/pkg/terror" + "github.com/pingcap/dm/pkg/utils" ) // use task config from integration test `sharding` @@ -1474,3 +1479,49 @@ func (t *testMaster) TestFetchWorkerDDLInfo(c *check.C) { }() wg.Wait() } + +func (t *testMaster) TestServer(c *check.C) { + cfg := NewConfig() + c.Assert(cfg.Parse([]string{"-config=./dm-master.toml"}), check.IsNil) + + s := NewServer(cfg) + + masterAddr := cfg.MasterAddr + s.cfg.MasterAddr = "" + err := s.Start() + c.Assert(terror.ErrMasterHostPortNotValid.Equal(err), check.IsTrue) + s.Close() + s.cfg.MasterAddr = masterAddr + + go func() { + err1 := s.Start() + c.Assert(err1, check.IsNil) + }() + + c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool { + return !s.closed.Get() + }), check.IsTrue) + + t.testHTTPInterface(c, "status") + + dupServer := NewServer(cfg) + err = dupServer.Start() + c.Assert(terror.ErrMasterStartService.Equal(err), check.IsTrue) + c.Assert(err.Error(), check.Matches, ".*bind: address already in use") + + // close + s.Close() + + c.Assert(utils.WaitSomething(30, 10*time.Millisecond, func() bool { + return s.closed.Get() + }), check.IsTrue) +} + +func (t *testMaster) testHTTPInterface(c *check.C, uri string) { + resp, err := http.Get("http://127.0.0.1:8261/" + uri) + c.Assert(err, check.IsNil) + defer resp.Body.Close() + c.Assert(resp.StatusCode, check.Equals, 200) + _, err = ioutil.ReadAll(resp.Body) + c.Assert(err, check.IsNil) +} diff --git a/dm/worker/server.go b/dm/worker/server.go index 24b1102c18..ba392b5c2d 100644 --- a/dm/worker/server.go +++ b/dm/worker/server.go @@ -64,6 +64,12 @@ func NewServer(cfg *Config) *Server { // Start starts to serving func (s *Server) Start() error { var err error + + _, _, err = s.splitHostPort() + if err != nil { + return err + } + s.rootLis, err = net.Listen("tcp", s.cfg.WorkerAddr) if err != nil { return terror.ErrWorkerStartService.Delegate(err) @@ -109,7 +115,7 @@ func (s *Server) Start() error { return terror.ErrWorkerStartService.Delegate(err) } -// Close close the RPC server +// Close close the RPC server, this function can be called multiple times func (s *Server) Close() { s.Lock() defer s.Unlock() @@ -117,9 +123,11 @@ func (s *Server) Close() { return } - err := s.rootLis.Close() - if err != nil && !common.IsErrNetClosing(err) { - log.L().Error("fail to close net listener", log.ShortError(err)) + if s.rootLis != nil { + err := s.rootLis.Close() + if err != nil && !common.IsErrNetClosing(err) { + log.L().Error("fail to close net listener", log.ShortError(err)) + } } if s.svr != nil { // GracefulStop can not cancel active stream RPCs @@ -129,8 +137,10 @@ func (s *Server) Close() { } // close worker and wait for return - s.worker.Close() - s.wg.Wait() + if s.worker != nil { + s.worker.Close() + s.wg.Wait() + } s.closed.Set(true) } @@ -444,3 +454,12 @@ func makeCommonWorkerResponse(reqErr error) *pb.CommonWorkerResponse { } return resp } + +func (s *Server) splitHostPort() (host, port string, err error) { + // WorkerAddr's format may be "host:port" or ":port" + host, port, err = net.SplitHostPort(s.cfg.WorkerAddr) + if err != nil { + err = terror.ErrWorkerHostPortNotValid.Delegate(err, s.cfg.WorkerAddr) + } + return +} diff --git a/dm/worker/server_test.go b/dm/worker/server_test.go index 7ddc64cb54..16313aa601 100644 --- a/dm/worker/server_test.go +++ b/dm/worker/server_test.go @@ -24,6 +24,7 @@ import ( "google.golang.org/grpc" "github.com/pingcap/dm/dm/pb" + "github.com/pingcap/dm/pkg/terror" "github.com/pingcap/dm/pkg/utils" ) @@ -50,6 +51,13 @@ func (t *testServer) TestServer(c *C) { s := NewServer(cfg) + workerAddr := cfg.WorkerAddr + s.cfg.WorkerAddr = "" + err := s.Start() + c.Assert(terror.ErrWorkerHostPortNotValid.Equal(err), IsTrue) + s.Close() + s.cfg.WorkerAddr = workerAddr + go func() { err1 := s.Start() c.Assert(err1, IsNil) @@ -123,6 +131,11 @@ func (t *testServer) TestServer(c *C) { c.Assert(err, IsNil) c.Assert(opsp.Log.Success, IsFalse) + dupServer := NewServer(cfg) + err = dupServer.Start() + c.Assert(terror.ErrWorkerStartService.Equal(err), IsTrue) + c.Assert(err.Error(), Matches, ".*bind: address already in use") + // close s.Close() diff --git a/pkg/terror/error_list.go b/pkg/terror/error_list.go index 8684c8d15a..07c0551349 100644 --- a/pkg/terror/error_list.go +++ b/pkg/terror/error_list.go @@ -377,6 +377,7 @@ const ( codeMasterOperRespNotSuccess codeMasterOperRequestTimeout codeMasterHandleHTTPApis + codeMasterHostPortNotValid ) // DM-worker error code @@ -449,6 +450,7 @@ const ( codeWorkerExecDDLTimeout codeWorkerWaitRelayCatchupTimeout codeWorkerRelayIsPurging + codeWorkerHostPortNotValid ) // DM-tracer error code @@ -810,6 +812,7 @@ var ( ErrMasterOperRespNotSuccess = New(codeMasterOperRespNotSuccess, ClassDMMaster, ScopeInternal, LevelHigh, "operation not success: %s") ErrMasterOperRequestTimeout = New(codeMasterOperRequestTimeout, ClassDMMaster, ScopeInternal, LevelHigh, "request is timeout, but request may be successful, please execute `query-status` to check status") ErrMasterHandleHTTPApis = New(codeMasterHandleHTTPApis, ClassDMMaster, ScopeInternal, LevelHigh, "serve http apis to grpc") + ErrMasterHostPortNotValid = New(codeMasterHostPortNotValid, ClassDMMaster, ScopeInternal, LevelHigh, "host:port '%s' not valid") // DM-worker error ErrWorkerParseFlagSet = New(codeWorkerParseFlagSet, ClassDMWorker, ScopeInternal, LevelMedium, "parse dm-worker config flag set") @@ -880,6 +883,7 @@ var ( ErrWorkerExecDDLTimeout = New(codeWorkerExecDDLTimeout, ClassDMWorker, ScopeInternal, LevelHigh, "ExecuteDDL timeout, try use `query-status` to query whether the DDL is still blocking") ErrWorkerWaitRelayCatchupTimeout = New(codeWorkerWaitRelayCatchupTimeout, ClassDMWorker, ScopeInternal, LevelHigh, "wait relay catchup timeout, loader end binlog pos: %s, relay binlog pos: %s") ErrWorkerRelayIsPurging = New(codeWorkerRelayIsPurging, ClassDMWorker, ScopeInternal, LevelHigh, "relay log purger is purging, cannot start sub task %s, please try again later") + ErrWorkerHostPortNotValid = New(codeWorkerHostPortNotValid, ClassDMWorker, ScopeInternal, LevelHigh, "host:port '%s' not valid") // DM-tracer error ErrTracerParseFlagSet = New(codeTracerParseFlagSet, ClassDMTracer, ScopeInternal, LevelMedium, "parse dm-tracer config flag set")