diff --git a/dm/master/etcd.go b/dm/master/etcd.go index a45aa193b1..eb0b97cd0a 100644 --- a/dm/master/etcd.go +++ b/dm/master/etcd.go @@ -41,7 +41,7 @@ const ( // startEtcd starts an embedded etcd server. func startEtcd(etcdCfg *embed.Config, gRPCSvr func(*grpc.Server), - httpHandles map[string]http.Handler) (*embed.Etcd, error) { + httpHandles map[string]http.Handler, startTimeout time.Duration) (*embed.Etcd, error) { // attach extra gRPC and HTTP server if gRPCSvr != nil { etcdCfg.ServiceRegister = gRPCSvr @@ -57,10 +57,17 @@ func startEtcd(etcdCfg *embed.Config, select { case <-e.Server.ReadyNotify(): - case <-time.After(etcdStartTimeout): + case <-time.After(startTimeout): + // if etcd fails to start up, the etcd server may still be blocking in + // https://github.com/etcd-io/etcd/blob/3cf2f69b5738fb702ba1a935590f36b52b18979b/embed/serve.go#L92 + // then `e.Close` will block in + // https://github.com/etcd-io/etcd/blob/3cf2f69b5738fb702ba1a935590f36b52b18979b/embed/etcd.go#L377 + // because `close(sctx.serversC)` has not been called in + // https://github.com/etcd-io/etcd/blob/3cf2f69b5738fb702ba1a935590f36b52b18979b/embed/serve.go#L200. + // so for `ReadyNotify` timeout, we choose to only call `e.Server.Stop()` now, + // and we should exit the DM-master process after this function returns with an error. 
e.Server.Stop() - e.Close() - return nil, terror.ErrMasterStartEmbedEtcdFail.Generatef("start embed etcd timeout %v", etcdStartTimeout) + return nil, terror.ErrMasterStartEmbedEtcdFail.Generatef("start embed etcd timeout %v", startTimeout) } return e, nil } diff --git a/dm/master/etcd_test.go b/dm/master/etcd_test.go index 5614e9856a..cd24811365 100644 --- a/dm/master/etcd_test.go +++ b/dm/master/etcd_test.go @@ -40,6 +40,28 @@ func (t *testEtcdSuite) SetUpSuite(c *check.C) { log.InitLogger(&log.Config{}) } +func (t *testEtcdSuite) TestStartEtcdFail(c *check.C) { + cfgCluster := NewConfig() + cfgCluster.Name = "dm-master-1" + cfgCluster.DataDir = c.MkDir() + cfgCluster.MasterAddr = tempurl.Alloc()[len("http://"):] + cfgCluster.PeerUrls = tempurl.Alloc() + c.Assert(cfgCluster.adjust(), check.IsNil) + + // add another non-existing member for bootstrapping. + cfgCluster.InitialCluster = fmt.Sprintf("%s=%s,%s=%s", + cfgCluster.Name, cfgCluster.AdvertisePeerUrls, + "dm-master-2", tempurl.Alloc()) + c.Assert(cfgCluster.adjust(), check.IsNil) + + // start an etcd cluster + cfgClusterEtcd, err := cfgCluster.genEmbedEtcdConfig() + c.Assert(err, check.IsNil) + e, err := startEtcd(cfgClusterEtcd, nil, nil, 3*time.Second) + c.Assert(terror.ErrMasterStartEmbedEtcdFail.Equal(err), check.IsTrue) + c.Assert(e, check.IsNil) +} + func (t *testEtcdSuite) TestPrepareJoinEtcd(c *check.C) { cfgCluster := NewConfig() // used to start an etcd cluster cfgCluster.Name = "dm-master-1" @@ -103,7 +125,7 @@ func (t *testEtcdSuite) TestPrepareJoinEtcd(c *check.C) { c.Assert(os.RemoveAll(memberDP), check.IsNil) // remove previous data // start an etcd cluster - e1, err := startEtcd(cfgClusterEtcd, nil, nil) + e1, err := startEtcd(cfgClusterEtcd, nil, nil, etcdStartTimeout) c.Assert(err, check.IsNil) defer e1.Close() @@ -156,7 +178,7 @@ func (t *testEtcdSuite) TestPrepareJoinEtcd(c *check.C) { // start the joining etcd cfgAfterEtcd, err := cfgAfter.genEmbedEtcdConfig() c.Assert(err, check.IsNil) - 
e2, err := startEtcd(cfgAfterEtcd, nil, nil) + e2, err := startEtcd(cfgAfterEtcd, nil, nil, etcdStartTimeout) c.Assert(err, check.IsNil) defer e2.Close() diff --git a/dm/master/server.go b/dm/master/server.go index 87f8bbaf15..c7229293c2 100644 --- a/dm/master/server.go +++ b/dm/master/server.go @@ -161,7 +161,7 @@ func (s *Server) Start(ctx context.Context) (err error) { gRPCSvr := func(gs *grpc.Server) { pb.RegisterMasterServer(gs, s) } // start embed etcd server, gRPC API server and HTTP (API, status and debug) server. - s.etcd, err = startEtcd(etcdCfg, gRPCSvr, userHandles) + s.etcd, err = startEtcd(etcdCfg, gRPCSvr, userHandles, etcdStartTimeout) if err != nil { return }