Skip to content

Commit

Permalink
*: fix server startup panic (pingcap#298)
Browse files Browse the repository at this point in the history
  • Loading branch information
amyangfei authored Sep 25, 2019
1 parent d888160 commit d01a5c0
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 12 deletions.
2 changes: 2 additions & 0 deletions _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ ErrMasterOperNotFound,[code=38031:class=dm-master:scope=internal:level=high],"op
ErrMasterOperRespNotSuccess,[code=38032:class=dm-master:scope=internal:level=high],"operation not success: %s"
ErrMasterOperRequestTimeout,[code=38033:class=dm-master:scope=internal:level=high],"request is timeout, but request may be successful, please execute `query-status` to check status"
ErrMasterHandleHTTPApis,[code=38034:class=dm-master:scope=internal:level=high],"serve http apis to grpc"
ErrMasterHostPortNotValid,[code=38035:class=dm-master:scope=internal:level=high],"host:port '%s' not valid"
ErrWorkerParseFlagSet,[code=40001:class=dm-worker:scope=internal:level=medium],"parse dm-worker config flag set"
ErrWorkerInvalidFlag,[code=40002:class=dm-worker:scope=internal:level=medium],"'%s' is an invalid flag"
ErrWorkerDecodeConfigFromFile,[code=40003:class=dm-worker:scope=internal:level=medium],"toml decode file"
Expand Down Expand Up @@ -378,6 +379,7 @@ ErrWorkerExecDDLSyncerOnly,[code=40065:class=dm-worker:scope=internal:level=high
ErrWorkerExecDDLTimeout,[code=40066:class=dm-worker:scope=internal:level=high],"ExecuteDDL timeout, try use `query-status` to query whether the DDL is still blocking"
ErrWorkerWaitRelayCatchupTimeout,[code=40067:class=dm-worker:scope=internal:level=high],"wait relay catchup timeout, loader end binlog pos: %s, relay binlog pos: %s"
ErrWorkerRelayIsPurging,[code=40068:class=dm-worker:scope=internal:level=high],"relay log purger is purging, cannot start sub task %s, please try again later"
ErrWorkerHostPortNotValid,[code=40069:class=dm-worker:scope=internal:level=high],"host:port '%s' not valid"
ErrTracerParseFlagSet,[code=42001:class=dm-tracer:scope=internal:level=medium],"parse dm-tracer config flag set"
ErrTracerConfigTomlTransform,[code=42002:class=dm-tracer:scope=internal:level=medium],"config toml transform"
ErrTracerConfigInvalidFlag,[code=42003:class=dm-tracer:scope=internal:level=medium],"'%s' is an invalid flag"
Expand Down
30 changes: 24 additions & 6 deletions dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,20 @@ func NewServer(cfg *Config) *Server {
idGen: tracing.NewIDGen(),
ap: NewAgentPool(&RateLimitConfig{rate: cfg.RPCRateLimit, burst: cfg.RPCRateBurst}),
}
server.closed.Set(true)

return &server
}

// Start starts serving
func (s *Server) Start() error {
var err error

_, _, err = s.splitHostPort()
if err != nil {
return err
}

s.rootLis, err = net.Listen("tcp", s.cfg.MasterAddr)
if err != nil {
return terror.ErrMasterStartService.Delegate(err)
Expand Down Expand Up @@ -188,16 +195,18 @@ func (s *Server) Start() error {
return err
}

// Close close the RPC server
// Close closes the RPC server; this function can be called multiple times
func (s *Server) Close() {
s.Lock()
defer s.Unlock()
if s.closed.Get() {
return
}
err := s.rootLis.Close()
if err != nil && !common.IsErrNetClosing(err) {
log.L().Error("close net listener", zap.Error(err))
if s.rootLis != nil {
err := s.rootLis.Close()
if err != nil && !common.IsErrNetClosing(err) {
log.L().Error("close net listener", zap.Error(err))
}
}
if s.svr != nil {
s.svr.GracefulStop()
Expand Down Expand Up @@ -1974,9 +1983,9 @@ func (s *Server) workerArgsExtractor(args ...interface{}) (workerrpc.Client, str
// HandleHTTPApis handles http apis and translate to grpc request
func (s *Server) HandleHTTPApis(ctx context.Context, mux *http.ServeMux) error {
// MasterAddr's format may be "host:port" or ":port"
_, port, err := net.SplitHostPort(s.cfg.MasterAddr)
_, port, err := s.splitHostPort()
if err != nil {
return terror.ErrMasterHandleHTTPApis.Delegate(err)
return err
}

opts := []grpc.DialOption{grpc.WithInsecure()}
Expand All @@ -1994,3 +2003,12 @@ func (s *Server) HandleHTTPApis(ctx context.Context, mux *http.ServeMux) error {

return nil
}

// splitHostPort splits the configured MasterAddr into its host and port parts.
//
// MasterAddr is expected to look like "host:port" or ":port". When
// net.SplitHostPort rejects the address, the returned error is the result of
// terror.ErrMasterHostPortNotValid.Delegate, called with the underlying error
// and the offending address.
func (s *Server) splitHostPort() (host, port string, err error) {
	addr := s.cfg.MasterAddr
	if host, port, err = net.SplitHostPort(addr); err != nil {
		return host, port, terror.ErrMasterHostPortNotValid.Delegate(err, addr)
	}
	return host, port, nil
}
51 changes: 51 additions & 0 deletions dm/master/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ package master

import (
"context"
"io/ioutil"
"net/http"

"fmt"
"io"
"sync"
Expand All @@ -30,6 +33,8 @@ import (
"github.com/pingcap/dm/dm/master/workerrpc"
"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/dm/pbmock"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
)

// use task config from integration test `sharding`
Expand Down Expand Up @@ -1474,3 +1479,49 @@ func (t *testMaster) TestFetchWorkerDDLInfo(c *check.C) {
}()
wg.Wait()
}

// TestServer exercises the Start/Close life cycle of the DM-master server:
// a Start that fails on an invalid address followed by Close, a normal start,
// an HTTP request against the running server, a second server that fails to
// bind the same address, and the final Close.
func (t *testMaster) TestServer(c *check.C) {
cfg := NewConfig()
c.Assert(cfg.Parse([]string{"-config=./dm-master.toml"}), check.IsNil)

s := NewServer(cfg)

// An empty address cannot be split into host and port, so Start is expected
// to fail with ErrMasterHostPortNotValid. Close is then called on the server
// whose Start failed; the original address is restored afterwards.
masterAddr := cfg.MasterAddr
s.cfg.MasterAddr = ""
err := s.Start()
c.Assert(terror.ErrMasterHostPortNotValid.Equal(err), check.IsTrue)
s.Close()
s.cfg.MasterAddr = masterAddr

// Start runs in its own goroutine (presumably it blocks while serving; confirm
// against Server.Start).
// NOTE(review): c.Assert is called from a goroutine other than the test's own;
// confirm that gocheck reports a failure here as intended.
go func() {
err1 := s.Start()
c.Assert(err1, check.IsNil)
}()

// Wait until s.closed reports false, which is taken here as "server is up".
// Presumably WaitSomething polls up to 30 times at 100ms intervals; verify
// against utils.WaitSomething.
c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
return !s.closed.Get()
}), check.IsTrue)

// Request the "status" URI over HTTP from the running server.
t.testHTTPInterface(c, "status")

// A second server built from the same config must fail to start because the
// address is already bound by s.
dupServer := NewServer(cfg)
err = dupServer.Start()
c.Assert(terror.ErrMasterStartService.Equal(err), check.IsTrue)
c.Assert(err.Error(), check.Matches, ".*bind: address already in use")

// close
s.Close()

// After Close, s.closed is expected to report true again.
c.Assert(utils.WaitSomething(30, 10*time.Millisecond, func() bool {
return s.closed.Get()
}), check.IsTrue)
}

// testHTTPInterface sends a GET request for uri to the server at
// 127.0.0.1:8261 and asserts that the request succeeds, the response status
// is 200, and the body can be read completely.
// NOTE(review): the port is hard-coded; presumably it matches the master
// address in dm-master.toml. Confirm.
func (t *testMaster) testHTTPInterface(c *check.C, uri string) {
	target := "http://127.0.0.1:8261/" + uri

	resp, err := http.Get(target)
	c.Assert(err, check.IsNil)
	defer resp.Body.Close()

	c.Assert(resp.StatusCode, check.Equals, http.StatusOK)

	// The body content is not inspected; it only has to be readable.
	_, readErr := ioutil.ReadAll(resp.Body)
	c.Assert(readErr, check.IsNil)
}
31 changes: 25 additions & 6 deletions dm/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ func NewServer(cfg *Config) *Server {
// Start starts serving
func (s *Server) Start() error {
var err error

_, _, err = s.splitHostPort()
if err != nil {
return err
}

s.rootLis, err = net.Listen("tcp", s.cfg.WorkerAddr)
if err != nil {
return terror.ErrWorkerStartService.Delegate(err)
Expand Down Expand Up @@ -109,17 +115,19 @@ func (s *Server) Start() error {
return terror.ErrWorkerStartService.Delegate(err)
}

// Close close the RPC server
// Close closes the RPC server; this function can be called multiple times
func (s *Server) Close() {
s.Lock()
defer s.Unlock()
if s.closed.Get() {
return
}

err := s.rootLis.Close()
if err != nil && !common.IsErrNetClosing(err) {
log.L().Error("fail to close net listener", log.ShortError(err))
if s.rootLis != nil {
err := s.rootLis.Close()
if err != nil && !common.IsErrNetClosing(err) {
log.L().Error("fail to close net listener", log.ShortError(err))
}
}
if s.svr != nil {
// GracefulStop can not cancel active stream RPCs
Expand All @@ -129,8 +137,10 @@ func (s *Server) Close() {
}

// close worker and wait for return
s.worker.Close()
s.wg.Wait()
if s.worker != nil {
s.worker.Close()
s.wg.Wait()
}

s.closed.Set(true)
}
Expand Down Expand Up @@ -444,3 +454,12 @@ func makeCommonWorkerResponse(reqErr error) *pb.CommonWorkerResponse {
}
return resp
}

// splitHostPort splits the configured WorkerAddr into its host and port parts.
//
// WorkerAddr is expected to look like "host:port" or ":port". When
// net.SplitHostPort rejects the address, the returned error is the result of
// terror.ErrWorkerHostPortNotValid.Delegate, called with the underlying error
// and the offending address.
func (s *Server) splitHostPort() (host, port string, err error) {
	addr := s.cfg.WorkerAddr
	if host, port, err = net.SplitHostPort(addr); err != nil {
		return host, port, terror.ErrWorkerHostPortNotValid.Delegate(err, addr)
	}
	return host, port, nil
}
13 changes: 13 additions & 0 deletions dm/worker/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"google.golang.org/grpc"

"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
)

Expand All @@ -50,6 +51,13 @@ func (t *testServer) TestServer(c *C) {

s := NewServer(cfg)

workerAddr := cfg.WorkerAddr
s.cfg.WorkerAddr = ""
err := s.Start()
c.Assert(terror.ErrWorkerHostPortNotValid.Equal(err), IsTrue)
s.Close()
s.cfg.WorkerAddr = workerAddr

go func() {
err1 := s.Start()
c.Assert(err1, IsNil)
Expand Down Expand Up @@ -123,6 +131,11 @@ func (t *testServer) TestServer(c *C) {
c.Assert(err, IsNil)
c.Assert(opsp.Log.Success, IsFalse)

dupServer := NewServer(cfg)
err = dupServer.Start()
c.Assert(terror.ErrWorkerStartService.Equal(err), IsTrue)
c.Assert(err.Error(), Matches, ".*bind: address already in use")

// close
s.Close()

Expand Down
4 changes: 4 additions & 0 deletions pkg/terror/error_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ const (
codeMasterOperRespNotSuccess
codeMasterOperRequestTimeout
codeMasterHandleHTTPApis
codeMasterHostPortNotValid
)

// DM-worker error code
Expand Down Expand Up @@ -449,6 +450,7 @@ const (
codeWorkerExecDDLTimeout
codeWorkerWaitRelayCatchupTimeout
codeWorkerRelayIsPurging
codeWorkerHostPortNotValid
)

// DM-tracer error code
Expand Down Expand Up @@ -810,6 +812,7 @@ var (
ErrMasterOperRespNotSuccess = New(codeMasterOperRespNotSuccess, ClassDMMaster, ScopeInternal, LevelHigh, "operation not success: %s")
ErrMasterOperRequestTimeout = New(codeMasterOperRequestTimeout, ClassDMMaster, ScopeInternal, LevelHigh, "request is timeout, but request may be successful, please execute `query-status` to check status")
ErrMasterHandleHTTPApis = New(codeMasterHandleHTTPApis, ClassDMMaster, ScopeInternal, LevelHigh, "serve http apis to grpc")
ErrMasterHostPortNotValid = New(codeMasterHostPortNotValid, ClassDMMaster, ScopeInternal, LevelHigh, "host:port '%s' not valid")

// DM-worker error
ErrWorkerParseFlagSet = New(codeWorkerParseFlagSet, ClassDMWorker, ScopeInternal, LevelMedium, "parse dm-worker config flag set")
Expand Down Expand Up @@ -880,6 +883,7 @@ var (
ErrWorkerExecDDLTimeout = New(codeWorkerExecDDLTimeout, ClassDMWorker, ScopeInternal, LevelHigh, "ExecuteDDL timeout, try use `query-status` to query whether the DDL is still blocking")
ErrWorkerWaitRelayCatchupTimeout = New(codeWorkerWaitRelayCatchupTimeout, ClassDMWorker, ScopeInternal, LevelHigh, "wait relay catchup timeout, loader end binlog pos: %s, relay binlog pos: %s")
ErrWorkerRelayIsPurging = New(codeWorkerRelayIsPurging, ClassDMWorker, ScopeInternal, LevelHigh, "relay log purger is purging, cannot start sub task %s, please try again later")
ErrWorkerHostPortNotValid = New(codeWorkerHostPortNotValid, ClassDMWorker, ScopeInternal, LevelHigh, "host:port '%s' not valid")

// DM-tracer error
ErrTracerParseFlagSet = New(codeTracerParseFlagSet, ClassDMTracer, ScopeInternal, LevelMedium, "parse dm-tracer config flag set")
Expand Down

0 comments on commit d01a5c0

Please sign in to comment.