Skip to content

Commit

Permalink
etcdutil, server: sort out the initialization functions of etcd client (
Browse files Browse the repository at this point in the history
#7725)

ref #7499

Sort out the initialization functions of etcd client.

Signed-off-by: JmPotato <ghzpotato@gmail.com>
  • Loading branch information
JmPotato authored Jan 17, 2024
1 parent af84465 commit aa9c83c
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 49 deletions.
4 changes: 2 additions & 2 deletions pkg/mcs/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,12 +177,12 @@ func InitClient(s server) error {
if err != nil {
return err
}
etcdClient, httpClient, err := etcdutil.CreateClients(tlsConfig, backendUrls)
etcdClient, err := etcdutil.CreateEtcdClient(tlsConfig, backendUrls)
if err != nil {
return err
}
s.SetETCDClient(etcdClient)
s.SetHTTPClient(httpClient)
s.SetHTTPClient(etcdutil.CreateHTTPClient(tlsConfig))
return nil
}

Expand Down
14 changes: 2 additions & 12 deletions pkg/utils/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,18 +460,8 @@ func syncUrls(client *clientv3.Client) []string {
return eps
}

// CreateClients creates etcd v3 client and http client.
func CreateClients(tlsConfig *tls.Config, acUrls []url.URL) (*clientv3.Client, *http.Client, error) {
client, err := CreateEtcdClient(tlsConfig, acUrls)
if err != nil {
return nil, nil, errs.ErrNewEtcdClient.Wrap(err).GenWithStackByCause()
}
httpClient := createHTTPClient(tlsConfig)
return client, httpClient, nil
}

// createHTTPClient creates a http client with the given tls config.
func createHTTPClient(tlsConfig *tls.Config) *http.Client {
// CreateHTTPClient creates a http client with the given tls config.
func CreateHTTPClient(tlsConfig *tls.Config) *http.Client {
// FIXME: Currently, there is no timeout set for certain requests, such as GetRegions,
// which may take a significant amount of time. However, it might be necessary to
// define an appropriate timeout in the future.
Expand Down
72 changes: 37 additions & 35 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,36 +348,16 @@ func (s *Server) startEtcd(ctx context.Context) error {
return errs.ErrCancelStartEtcd.FastGenByArgs()
}

// start client
s.client, s.httpClient, err = s.startClient()
// Start the etcd and HTTP clients, then init the member.
err = s.startClient()
if err != nil {
return err
}

s.electionClient, err = s.startElectionClient()
err = s.initMember(etcd)
if err != nil {
return err
}

// update advertise peer urls.
etcdMembers, err := etcdutil.ListEtcdMembers(s.client)
if err != nil {
return err
}
etcdServerID := uint64(etcd.Server.ID())
for _, m := range etcdMembers.Members {
if etcdServerID == m.ID {
etcdPeerURLs := strings.Join(m.PeerURLs, ",")
if s.cfg.AdvertisePeerUrls != etcdPeerURLs {
log.Info("update advertise peer urls", zap.String("from", s.cfg.AdvertisePeerUrls), zap.String("to", etcdPeerURLs))
s.cfg.AdvertisePeerUrls = etcdPeerURLs
}
}
}
failpoint.Inject("memberNil", func() {
time.Sleep(1500 * time.Millisecond)
})
s.member = member.NewMember(etcd, s.electionClient, etcdServerID)
s.initGRPCServiceLabels()
return nil
}
Expand All @@ -392,29 +372,51 @@ func (s *Server) initGRPCServiceLabels() {
}
}

func (s *Server) startClient() (*clientv3.Client, *http.Client, error) {
func (s *Server) startClient() error {
tlsConfig, err := s.cfg.Security.ToTLSConfig()
if err != nil {
return nil, nil, err
return err
}
etcdCfg, err := s.cfg.GenEmbedEtcdConfig()
if err != nil {
return nil, nil, err
return err
}
return etcdutil.CreateClients(tlsConfig, etcdCfg.ACUrls)
}

func (s *Server) startElectionClient() (*clientv3.Client, error) {
tlsConfig, err := s.cfg.Security.ToTLSConfig()
/* Starting two different etcd clients here is to avoid the throttling. */
// This etcd client will be used to access the etcd cluster to read and write all kinds of meta data.
s.client, err = etcdutil.CreateEtcdClient(tlsConfig, etcdCfg.ACUrls)
if err != nil {
return nil, err
return errs.ErrNewEtcdClient.Wrap(err).GenWithStackByCause()
}
etcdCfg, err := s.cfg.GenEmbedEtcdConfig()
// This etcd client will only be used to read and write the election-related data, such as leader key.
s.electionClient, err = etcdutil.CreateEtcdClient(tlsConfig, etcdCfg.ACUrls)
if err != nil {
return nil, err
return errs.ErrNewEtcdClient.Wrap(err).GenWithStackByCause()
}
s.httpClient = etcdutil.CreateHTTPClient(tlsConfig)
return nil
}

return etcdutil.CreateEtcdClient(tlsConfig, etcdCfg.ACUrls)
func (s *Server) initMember(etcd *embed.Etcd) error {
// Update advertise peer URLs.
etcdMembers, err := etcdutil.ListEtcdMembers(s.client)
if err != nil {
return err
}
etcdServerID := uint64(etcd.Server.ID())
for _, m := range etcdMembers.Members {
if etcdServerID == m.ID {
etcdPeerURLs := strings.Join(m.PeerURLs, ",")
if s.cfg.AdvertisePeerUrls != etcdPeerURLs {
log.Info("update advertise peer urls", zap.String("from", s.cfg.AdvertisePeerUrls), zap.String("to", etcdPeerURLs))
s.cfg.AdvertisePeerUrls = etcdPeerURLs
}
}
}
failpoint.Inject("memberNil", func() {
time.Sleep(1500 * time.Millisecond)
})
s.member = member.NewMember(etcd, s.electionClient, etcdServerID)
return nil
}

// AddStartCallback adds a callback in the startServer phase.
Expand Down

0 comments on commit aa9c83c

Please sign in to comment.