diff --git a/pkg/basicserver/basic_server.go b/pkg/basicserver/basic_server.go index 3a21da0f9d90..afb56c2edd91 100644 --- a/pkg/basicserver/basic_server.go +++ b/pkg/basicserver/basic_server.go @@ -23,7 +23,7 @@ import ( // Server defines the common basic behaviors of a server type Server interface { - // Name returns the unique Name for this server in the cluster. + // Name returns the unique name for this server in the cluster. Name() string // GetAddr returns the address of the server. GetAddr() string diff --git a/pkg/mcs/resourcemanager/server/server.go b/pkg/mcs/resourcemanager/server/server.go index 7cb04c2619e6..5cb924599929 100644 --- a/pkg/mcs/resourcemanager/server/server.go +++ b/pkg/mcs/resourcemanager/server/server.go @@ -16,11 +16,8 @@ package server import ( "context" - "crypto/tls" "fmt" - "net" "net/http" - "net/url" "os" "os/signal" "path" @@ -36,8 +33,10 @@ import ( "github.com/pingcap/log" "github.com/pingcap/sysutil" "github.com/spf13/cobra" + bs "github.com/tikv/pd/pkg/basicserver" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/mcs/discovery" + "github.com/tikv/pd/pkg/mcs/server" "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/member" "github.com/tikv/pd/pkg/storage/endpoint" @@ -47,60 +46,40 @@ import ( "github.com/tikv/pd/pkg/utils/memberutil" "github.com/tikv/pd/pkg/utils/metricutil" "github.com/tikv/pd/pkg/versioninfo" - "go.etcd.io/etcd/clientv3" "go.uber.org/zap" "google.golang.org/grpc" ) +var _ bs.Server = (*Server)(nil) + // Server is the resource manager server, and it implements bs.Server. type Server struct { + *server.BaseServer diagnosticspb.DiagnosticsServer // Server state. 0 is not running, 1 is running. isRunning int64 - // Server start timestamp - startTimestamp int64 - ctx context.Context serverLoopCtx context.Context serverLoopCancel func() serverLoopWg sync.WaitGroup cfg *Config clusterID uint64 - name string - listenURL *url.URL // for the primary election of resource manager participant *member.Participant - etcdClient *clientv3.Client - httpClient *http.Client - - secure bool - muxListener net.Listener - grpcServer *grpc.Server - httpServer *http.Server - service *Service - // Store as map[string]*grpc.ClientConn - clientConns sync.Map + service *Service - // Callback functions for different stages - // startCallbacks will be called after the server is started. - startCallbacks []func() // primaryCallbacks will be called after the server becomes leader. primaryCallbacks []func(context.Context) serviceRegister *discovery.ServiceRegister } -// Name returns the unique etcd name for this server in etcd cluster. +// Name returns the unique name for this server in the resource manager cluster. func (s *Server) Name() string { - return s.name -} - -// Context returns the context. -func (s *Server) Context() context.Context { - return s.ctx + return s.cfg.Name } // GetAddr returns the server address. @@ -126,7 +105,7 @@ func (s *Server) Run() (err error) { } func (s *Server) startServerLoop() { - s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx) + s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.Context()) s.serverLoopWg.Add(1) go s.primaryElectionLoop() } @@ -221,18 +200,18 @@ func (s *Server) Close() { s.serviceRegister.Deregister() utils.StopHTTPServer(s) utils.StopGRPCServer(s) - s.muxListener.Close() + s.GetListener().Close() s.serverLoopCancel() s.serverLoopWg.Wait() - if s.etcdClient != nil { - if err := s.etcdClient.Close(); err != nil { + if s.GetClient() != nil { + if err := s.GetClient().Close(); err != nil { log.Error("close etcd client meet error", errs.ZapError(errs.ErrCloseEtcdClient, err)) } } - if s.httpClient != nil { - s.httpClient.CloseIdleConnections() + if s.GetHTTPClient() != nil { + s.GetHTTPClient().CloseIdleConnections() } log.Info("resource manager server is closed") @@ -243,21 +222,6 @@ func (s *Server) GetControllerConfig() *ControllerConfig { return &s.cfg.Controller } -// GetClient returns builtin etcd client. -func (s *Server) GetClient() *clientv3.Client { - return s.etcdClient -} - -// GetHTTPClient returns builtin http client. -func (s *Server) GetHTTPClient() *http.Client { - return s.httpClient -} - -// AddStartCallback adds a callback in the startServer phase. -func (s *Server) AddStartCallback(callbacks ...func()) { - s.startCallbacks = append(s.startCallbacks, callbacks...) -} - // IsServing returns whether the server is the leader, if there is embedded etcd, or the primary otherwise. func (s *Server) IsServing() bool { return !s.IsClosed() && s.participant.IsLeader() @@ -268,11 +232,6 @@ func (s *Server) IsClosed() bool { return s != nil && atomic.LoadInt64(&s.isRunning) == 0 } -// IsSecure checks if the server enable TLS. -func (s *Server) IsSecure() bool { - return s.secure -} - // AddServiceReadyCallback adds callbacks when the server becomes the leader, if there is embedded etcd, or the primary otherwise. func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) { s.primaryCallbacks = append(s.primaryCallbacks, callbacks...) @@ -283,11 +242,6 @@ func (s *Server) GetBackendEndpoints() string { return s.cfg.BackendEndpoints } -// GetClientConns returns the client connections. -func (s *Server) GetClientConns() *sync.Map { - return &s.clientConns -} - // ServerLoopWgDone decreases the server loop wait group. func (s *Server) ServerLoopWgDone() { s.serverLoopWg.Done() @@ -298,76 +252,28 @@ func (s *Server) ServerLoopWgAdd(n int) { s.serverLoopWg.Add(n) } -// GetHTTPServer returns the http server. -func (s *Server) GetHTTPServer() *http.Server { - return s.httpServer -} - -// SetHTTPServer sets the http server. -func (s *Server) SetHTTPServer(httpServer *http.Server) { - s.httpServer = httpServer -} - // SetUpRestHandler sets up the REST handler. func (s *Server) SetUpRestHandler() (http.Handler, apiutil.APIServiceGroup) { return SetUpRestHandler(s.service) } -// GetGRPCServer returns the grpc server. -func (s *Server) GetGRPCServer() *grpc.Server { - return s.grpcServer -} - -// SetGRPCServer sets the grpc server. -func (s *Server) SetGRPCServer(grpcServer *grpc.Server) { - s.grpcServer = grpcServer -} - // RegisterGRPCService registers the grpc service. func (s *Server) RegisterGRPCService(grpcServer *grpc.Server) { s.service.RegisterGRPCService(grpcServer) } -// SetETCDClient sets the etcd client. -func (s *Server) SetETCDClient(etcdClient *clientv3.Client) { - s.etcdClient = etcdClient -} - -// SetHTTPClient sets the http client. -func (s *Server) SetHTTPClient(httpClient *http.Client) { - s.httpClient = httpClient -} - // GetTLSConfig gets the security config. func (s *Server) GetTLSConfig() *grpcutil.TLSConfig { return &s.cfg.Security.TLSConfig } -// GetDelegateClient returns grpc client connection talking to the forwarded host -func (s *Server) GetDelegateClient(ctx context.Context, forwardedHost string) (*grpc.ClientConn, error) { - client, ok := s.clientConns.Load(forwardedHost) - if !ok { - tlsConfig, err := s.GetTLSConfig().ToTLSConfig() - if err != nil { - return nil, err - } - cc, err := grpcutil.GetClientConn(ctx, forwardedHost, tlsConfig) - if err != nil { - return nil, err - } - client = cc - s.clientConns.Store(forwardedHost, cc) - } - return client.(*grpc.ClientConn), nil -} - // GetLeaderListenUrls gets service endpoints from the leader in election group. func (s *Server) GetLeaderListenUrls() []string { return s.participant.GetLeaderListenUrls() } func (s *Server) startServer() (err error) { - if s.clusterID, err = utils.InitClusterID(s.ctx, s.etcdClient); err != nil { + if s.clusterID, err = utils.InitClusterID(s.Context(), s.GetClient()); err != nil { return err } log.Info("init cluster id", zap.Uint64("cluster-id", s.clusterID)) @@ -379,42 +285,29 @@ func (s *Server) startServer() (err error) { uniqueID := memberutil.GenerateUniqueID(uniqueName) log.Info("joining primary election", zap.String("participant-name", uniqueName), zap.Uint64("participant-id", uniqueID)) resourceManagerPrimaryPrefix := endpoint.ResourceManagerSvcRootPath(s.clusterID) - s.participant = member.NewParticipant(s.etcdClient) + s.participant = member.NewParticipant(s.GetClient()) s.participant.InitInfo(uniqueName, uniqueID, path.Join(resourceManagerPrimaryPrefix, fmt.Sprintf("%05d", 0)), utils.PrimaryKey, "primary election", s.cfg.AdvertiseListenAddr) - s.listenURL, err = url.Parse(s.cfg.ListenAddr) - if err != nil { - return err - } + s.service = &Service{ - ctx: s.ctx, + ctx: s.Context(), manager: NewManager[*Server](s), } - tlsConfig, err := s.cfg.Security.ToTLSConfig() - if err != nil { - return err - } - if tlsConfig != nil { - s.secure = true - s.muxListener, err = tls.Listen(utils.TCPNetworkStr, s.listenURL.Host, tlsConfig) - } else { - s.muxListener, err = net.Listen(utils.TCPNetworkStr, s.listenURL.Host) - } - if err != nil { + if err := s.InitListener(s.GetTLSConfig(), s.cfg.ListenAddr); err != nil { return err } serverReadyChan := make(chan struct{}) defer close(serverReadyChan) s.serverLoopWg.Add(1) - go utils.StartGRPCAndHTTPServers(s, serverReadyChan, s.muxListener) + go utils.StartGRPCAndHTTPServers(s, serverReadyChan, s.GetListener()) <-serverReadyChan s.startServerLoop() // Run callbacks log.Info("triggering the start callback functions") - for _, cb := range s.startCallbacks { + for _, cb := range s.GetStartCallbacks() { cb() } @@ -424,7 +317,7 @@ func (s *Server) startServer() (err error) { if err != nil { return err } - s.serviceRegister = discovery.NewServiceRegister(s.ctx, s.etcdClient, strconv.FormatUint(s.clusterID, 10), + s.serviceRegister = discovery.NewServiceRegister(s.Context(), s.GetClient(), strconv.FormatUint(s.clusterID, 10), utils.ResourceManagerServiceName, s.cfg.AdvertiseListenAddr, serializedEntry, discovery.DefaultLeaseInSeconds) if err := s.serviceRegister.Register(); err != nil { log.Error("failed to register the service", zap.String("service-name", utils.ResourceManagerServiceName), errs.ZapError(err)) @@ -437,10 +330,9 @@ func (s *Server) startServer() (err error) { // CreateServer creates the Server func CreateServer(ctx context.Context, cfg *Config) *Server { svr := &Server{ + BaseServer: server.NewBaseServer(ctx), DiagnosticsServer: sysutil.NewDiagnosticsServer(cfg.Log.File.Filename), - startTimestamp: time.Now().Unix(), cfg: cfg, - ctx: ctx, } return svr } diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index 3591974d4396..0f4562cce8c3 100644 --- a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -16,11 +16,8 @@ package server import ( "context" - "crypto/tls" "fmt" - "net" "net/http" - "net/url" "os" "os/signal" "path" @@ -36,11 +33,13 @@ import ( "github.com/pingcap/log" "github.com/pingcap/sysutil" "github.com/spf13/cobra" + bs "github.com/tikv/pd/pkg/basicserver" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/mcs/discovery" "github.com/tikv/pd/pkg/mcs/scheduling/server/config" "github.com/tikv/pd/pkg/mcs/scheduling/server/rule" + "github.com/tikv/pd/pkg/mcs/server" "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/member" "github.com/tikv/pd/pkg/schedule" @@ -53,51 +52,33 @@ import ( "github.com/tikv/pd/pkg/utils/memberutil" "github.com/tikv/pd/pkg/utils/metricutil" "github.com/tikv/pd/pkg/versioninfo" - "go.etcd.io/etcd/clientv3" "go.uber.org/zap" "google.golang.org/grpc" ) +var _ bs.Server = (*Server)(nil) + // Server is the scheduling server, and it implements bs.Server. type Server struct { + *server.BaseServer diagnosticspb.DiagnosticsServer // Server state. 0 is not running, 1 is running. isRunning int64 - // Server start timestamp - startTimestamp int64 - ctx context.Context serverLoopCtx context.Context serverLoopCancel func() serverLoopWg sync.WaitGroup cfg *config.Config - name string clusterID uint64 - listenURL *url.URL persistConfig *config.PersistConfig - // etcd client - etcdClient *clientv3.Client - // http client - httpClient *http.Client - // for the primary election of scheduling participant *member.Participant - secure bool - muxListener net.Listener - grpcServer *grpc.Server - httpServer *http.Server - service *Service - - // Store as map[string]*grpc.ClientConn - clientConns sync.Map + service *Service - // Callback functions for different stages - // startCallbacks will be called after the server is started. - startCallbacks []func() // primaryCallbacks will be called after the server becomes leader. primaryCallbacks []func(context.Context) @@ -114,14 +95,9 @@ type Server struct { ruleWatcher *rule.Watcher } -// Name returns the unique etcd name for this server in etcd cluster. +// Name returns the unique name for this server in the scheduling cluster. func (s *Server) Name() string { - return s.name -} - -// Context returns the context. -func (s *Server) Context() context.Context { - return s.ctx + return s.cfg.Name } // GetAddr returns the server address. @@ -134,11 +110,6 @@ func (s *Server) GetBackendEndpoints() string { return s.cfg.BackendEndpoints } -// GetClientConns returns the client connections. -func (s *Server) GetClientConns() *sync.Map { - return &s.clientConns -} - // Run runs the scheduling server. func (s *Server) Run() error { skipWaitAPIServiceReady := false @@ -158,7 +129,7 @@ func (s *Server) Run() error { } func (s *Server) startServerLoop() { - s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx) + s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.Context()) s.serverLoopWg.Add(1) go s.primaryElectionLoop() } @@ -253,38 +224,23 @@ func (s *Server) Close() { s.serviceRegister.Deregister() utils.StopHTTPServer(s) utils.StopGRPCServer(s) - s.muxListener.Close() + s.GetListener().Close() s.GetCoordinator().Stop() s.serverLoopCancel() s.serverLoopWg.Wait() - if s.etcdClient != nil { - if err := s.etcdClient.Close(); err != nil { + if s.GetClient() != nil { + if err := s.GetClient().Close(); err != nil { log.Error("close etcd client meet error", errs.ZapError(errs.ErrCloseEtcdClient, err)) } } - if s.httpClient != nil { - s.httpClient.CloseIdleConnections() + if s.GetHTTPClient() != nil { + s.GetHTTPClient().CloseIdleConnections() } log.Info("scheduling server is closed") } -// GetClient returns builtin etcd client. -func (s *Server) GetClient() *clientv3.Client { - return s.etcdClient -} - -// GetHTTPClient returns builtin http client. -func (s *Server) GetHTTPClient() *http.Client { - return s.httpClient -} - -// AddStartCallback adds a callback in the startServer phase. -func (s *Server) AddStartCallback(callbacks ...func()) { - s.startCallbacks = append(s.startCallbacks, callbacks...) -} - // IsServing returns whether the server is the leader, if there is embedded etcd, or the primary otherwise. func (s *Server) IsServing() bool { return !s.IsClosed() && s.participant.IsLeader() @@ -295,34 +251,11 @@ func (s *Server) IsClosed() bool { return s != nil && atomic.LoadInt64(&s.isRunning) == 0 } -// IsSecure checks if the server enable TLS. -func (s *Server) IsSecure() bool { - return s.secure -} - // AddServiceReadyCallback adds callbacks when the server becomes the leader, if there is embedded etcd, or the primary otherwise. func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) { s.primaryCallbacks = append(s.primaryCallbacks, callbacks...) } -// GetDelegateClient returns grpc client connection talking to the forwarded host -func (s *Server) GetDelegateClient(ctx context.Context, forwardedHost string) (*grpc.ClientConn, error) { - client, ok := s.clientConns.Load(forwardedHost) - if !ok { - tlsConfig, err := s.GetTLSConfig().ToTLSConfig() - if err != nil { - return nil, err - } - cc, err := grpcutil.GetClientConn(ctx, forwardedHost, tlsConfig) - if err != nil { - return nil, err - } - client = cc - s.clientConns.Store(forwardedHost, cc) - } - return client.(*grpc.ClientConn), nil -} - // GetTLSConfig gets the security config. func (s *Server) GetTLSConfig() *grpcutil.TLSConfig { return &s.cfg.Security.TLSConfig @@ -348,53 +281,23 @@ func (s *Server) ServerLoopWgAdd(n int) { s.serverLoopWg.Add(n) } -// GetHTTPServer returns the http server. -func (s *Server) GetHTTPServer() *http.Server { - return s.httpServer -} - -// SetHTTPServer sets the http server. -func (s *Server) SetHTTPServer(httpServer *http.Server) { - s.httpServer = httpServer -} - // SetUpRestHandler sets up the REST handler. func (s *Server) SetUpRestHandler() (http.Handler, apiutil.APIServiceGroup) { return SetUpRestHandler(s.service) } -// GetGRPCServer returns the grpc server. -func (s *Server) GetGRPCServer() *grpc.Server { - return s.grpcServer -} - -// SetGRPCServer sets the grpc server. -func (s *Server) SetGRPCServer(grpcServer *grpc.Server) { - s.grpcServer = grpcServer -} - // RegisterGRPCService registers the grpc service. func (s *Server) RegisterGRPCService(grpcServer *grpc.Server) { s.service.RegisterGRPCService(grpcServer) } -// SetETCDClient sets the etcd client. -func (s *Server) SetETCDClient(etcdClient *clientv3.Client) { - s.etcdClient = etcdClient -} - -// SetHTTPClient sets the http client. -func (s *Server) SetHTTPClient(httpClient *http.Client) { - s.httpClient = httpClient -} - // GetLeaderListenUrls gets service endpoints from the leader in election group. func (s *Server) GetLeaderListenUrls() []string { return s.participant.GetLeaderListenUrls() } func (s *Server) startServer() (err error) { - if s.clusterID, err = utils.InitClusterID(s.ctx, s.etcdClient); err != nil { + if s.clusterID, err = utils.InitClusterID(s.Context(), s.GetClient()); err != nil { return err } log.Info("init cluster id", zap.Uint64("cluster-id", s.clusterID)) @@ -406,36 +309,23 @@ func (s *Server) startServer() (err error) { uniqueID := memberutil.GenerateUniqueID(uniqueName) log.Info("joining primary election", zap.String("participant-name", uniqueName), zap.Uint64("participant-id", uniqueID)) schedulingPrimaryPrefix := endpoint.SchedulingSvcRootPath(s.clusterID) - s.participant = member.NewParticipant(s.etcdClient) + s.participant = member.NewParticipant(s.GetClient()) s.participant.InitInfo(uniqueName, uniqueID, path.Join(schedulingPrimaryPrefix, fmt.Sprintf("%05d", 0)), utils.PrimaryKey, "primary election", s.cfg.AdvertiseListenAddr) s.storage = endpoint.NewStorageEndpoint( - kv.NewEtcdKVBase(s.etcdClient, endpoint.PDRootPath(s.clusterID)), nil) + kv.NewEtcdKVBase(s.GetClient(), endpoint.PDRootPath(s.clusterID)), nil) basicCluster := core.NewBasicCluster() - s.hbStreams = hbstream.NewHeartbeatStreams(s.ctx, s.clusterID, basicCluster) - s.cluster, err = NewCluster(s.ctx, s.cfg, s.storage, basicCluster, s.hbStreams) + s.hbStreams = hbstream.NewHeartbeatStreams(s.Context(), s.clusterID, basicCluster) + s.cluster, err = NewCluster(s.Context(), s.cfg, s.storage, basicCluster, s.hbStreams) if err != nil { return err } - s.listenURL, err = url.Parse(s.cfg.ListenAddr) - if err != nil { - return err - } s.service = &Service{Server: s} - tlsConfig, err := s.cfg.Security.ToTLSConfig() - if err != nil { - return err - } - if tlsConfig != nil { - s.secure = true - s.muxListener, err = tls.Listen(utils.TCPNetworkStr, s.listenURL.Host, tlsConfig) - } else { - s.muxListener, err = net.Listen(utils.TCPNetworkStr, s.listenURL.Host) - } - if err != nil { + if err := s.InitListener(s.GetTLSConfig(), s.cfg.ListenAddr); err != nil { return err } + err = s.startWatcher() if err != nil { return err @@ -445,13 +335,13 @@ func (s *Server) startServer() (err error) { serverReadyChan := make(chan struct{}) defer close(serverReadyChan) s.serverLoopWg.Add(1) - go utils.StartGRPCAndHTTPServers(s, serverReadyChan, s.muxListener) + go utils.StartGRPCAndHTTPServers(s, serverReadyChan, s.GetListener()) <-serverReadyChan s.startServerLoop() // Run callbacks log.Info("triggering the start callback functions") - for _, cb := range s.startCallbacks { + for _, cb := range s.GetStartCallbacks() { cb() } @@ -460,7 +350,7 @@ func (s *Server) startServer() (err error) { if err != nil { return err } - s.serviceRegister = discovery.NewServiceRegister(s.ctx, s.etcdClient, strconv.FormatUint(s.clusterID, 10), + s.serviceRegister = discovery.NewServiceRegister(s.Context(), s.GetClient(), strconv.FormatUint(s.clusterID, 10), utils.SchedulingServiceName, s.cfg.AdvertiseListenAddr, serializedEntry, discovery.DefaultLeaseInSeconds) if err := s.serviceRegister.Register(); err != nil { log.Error("failed to register the service", zap.String("service-name", utils.SchedulingServiceName), errs.ZapError(err)) @@ -472,13 +362,13 @@ func (s *Server) startServer() (err error) { func (s *Server) startWatcher() (err error) { s.configWatcher, err = config.NewWatcher( - s.ctx, s.etcdClient, s.clusterID, s.persistConfig, + s.Context(), s.GetClient(), s.clusterID, s.persistConfig, ) if err != nil { return err } s.ruleWatcher, err = rule.NewWatcher( - s.ctx, s.etcdClient, s.clusterID, + s.Context(), s.GetClient(), s.clusterID, ) return err } @@ -486,11 +376,10 @@ func (s *Server) startWatcher() (err error) { // CreateServer creates the Server func CreateServer(ctx context.Context, cfg *config.Config) *Server { svr := &Server{ + BaseServer: server.NewBaseServer(ctx), DiagnosticsServer: sysutil.NewDiagnosticsServer(cfg.Log.File.Filename), - startTimestamp: time.Now().Unix(), cfg: cfg, persistConfig: config.NewPersistConfig(cfg), - ctx: ctx, } return svr } diff --git a/pkg/mcs/server/server.go b/pkg/mcs/server/server.go new file mode 100644 index 000000000000..c5f05687d95c --- /dev/null +++ b/pkg/mcs/server/server.go @@ -0,0 +1,164 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "context" + "crypto/tls" + "net" + "net/http" + "net/url" + "sync" + "time" + + "github.com/tikv/pd/pkg/mcs/utils" + "github.com/tikv/pd/pkg/utils/grpcutil" + "go.etcd.io/etcd/clientv3" + "google.golang.org/grpc" +) + +// BaseServer is a basic server that provides some common functionality. +type BaseServer struct { + ctx context.Context + // etcd client + etcdClient *clientv3.Client + // http client + httpClient *http.Client + grpcServer *grpc.Server + httpServer *http.Server + // Store as map[string]*grpc.ClientConn + clientConns sync.Map + secure bool + muxListener net.Listener + // Callback functions for different stages + // startCallbacks will be called after the server is started. + startCallbacks []func() + startTimestamp int64 +} + +// NewBaseServer creates a new BaseServer. +func NewBaseServer(ctx context.Context) *BaseServer { + return &BaseServer{ + ctx: ctx, + startTimestamp: time.Now().Unix(), + } +} + +// Context returns the context of server. +func (bs *BaseServer) Context() context.Context { + return bs.ctx +} + +// GetDelegateClient returns grpc client connection talking to the forwarded host +func (bs *BaseServer) GetDelegateClient(ctx context.Context, tlsCfg *grpcutil.TLSConfig, forwardedHost string) (*grpc.ClientConn, error) { + client, ok := bs.clientConns.Load(forwardedHost) + if !ok { + tlsConfig, err := tlsCfg.ToTLSConfig() + if err != nil { + return nil, err + } + cc, err := grpcutil.GetClientConn(ctx, forwardedHost, tlsConfig) + if err != nil { + return nil, err + } + client = cc + bs.clientConns.Store(forwardedHost, cc) + } + return client.(*grpc.ClientConn), nil +} + +// GetClientConns returns the client connections. +func (bs *BaseServer) GetClientConns() *sync.Map { + return &bs.clientConns +} + +// GetClient returns builtin etcd client. +func (bs *BaseServer) GetClient() *clientv3.Client { + return bs.etcdClient +} + +// GetHTTPClient returns builtin http client. +func (bs *BaseServer) GetHTTPClient() *http.Client { + return bs.httpClient +} + +// SetETCDClient sets the etcd client. +func (bs *BaseServer) SetETCDClient(etcdClient *clientv3.Client) { + bs.etcdClient = etcdClient +} + +// SetHTTPClient sets the http client. +func (bs *BaseServer) SetHTTPClient(httpClient *http.Client) { + bs.httpClient = httpClient +} + +// AddStartCallback adds a callback in the startServer phase. +func (bs *BaseServer) AddStartCallback(callbacks ...func()) { + bs.startCallbacks = append(bs.startCallbacks, callbacks...) +} + +// GetStartCallbacks returns the start callbacks. +func (bs *BaseServer) GetStartCallbacks() []func() { + return bs.startCallbacks +} + +// GetHTTPServer returns the http server. +func (bs *BaseServer) GetHTTPServer() *http.Server { + return bs.httpServer +} + +// SetHTTPServer sets the http server. +func (bs *BaseServer) SetHTTPServer(httpServer *http.Server) { + bs.httpServer = httpServer +} + +// GetGRPCServer returns the grpc server. +func (bs *BaseServer) GetGRPCServer() *grpc.Server { + return bs.grpcServer +} + +// SetGRPCServer sets the grpc server. +func (bs *BaseServer) SetGRPCServer(grpcServer *grpc.Server) { + bs.grpcServer = grpcServer +} + +// InitListener initializes the listener. +func (bs *BaseServer) InitListener(tlsCfg *grpcutil.TLSConfig, listenAddr string) error { + listenURL, err := url.Parse(listenAddr) + if err != nil { + return err + } + tlsConfig, err := tlsCfg.ToTLSConfig() + if err != nil { + return err + } + if tlsConfig != nil { + bs.secure = true + bs.muxListener, err = tls.Listen(utils.TCPNetworkStr, listenURL.Host, tlsConfig) + } else { + bs.muxListener, err = net.Listen(utils.TCPNetworkStr, listenURL.Host) + } + return err +} + +// GetListener returns the listener. +func (bs *BaseServer) GetListener() net.Listener { + return bs.muxListener +} + +// IsSecure checks if the server enable TLS. +func (bs *BaseServer) IsSecure() bool { + return bs.secure +} diff --git a/pkg/mcs/tso/server/grpc_service.go b/pkg/mcs/tso/server/grpc_service.go index e9fdddf79abb..ce46a11b0d3a 100644 --- a/pkg/mcs/tso/server/grpc_service.go +++ b/pkg/mcs/tso/server/grpc_service.go @@ -113,7 +113,7 @@ func (s *Service) Tso(stream tsopb.TSO_TsoServer) error { streamCtx := stream.Context() forwardedHost := grpcutil.GetForwardedHost(streamCtx) if !s.IsLocalRequest(forwardedHost) { - clientConn, err := s.GetDelegateClient(s.ctx, forwardedHost) + clientConn, err := s.GetDelegateClient(s.Context(), s.GetTLSConfig(), forwardedHost) if err != nil { return errors.WithStack(err) } diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go index 7c7db1be8047..01b1d03ef03e 100644 --- a/pkg/mcs/tso/server/server.go +++ b/pkg/mcs/tso/server/server.go @@ -16,11 +16,8 @@ package server import ( "context" - "crypto/tls" "fmt" - "net" "net/http" - "net/url" "os" "os/signal" "strconv" @@ -39,6 +36,7 @@ import ( bs "github.com/tikv/pd/pkg/basicserver" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/mcs/discovery" + "github.com/tikv/pd/pkg/mcs/server" "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/member" "github.com/tikv/pd/pkg/storage/endpoint" @@ -50,7 +48,6 @@ import ( "github.com/tikv/pd/pkg/utils/metricutil" "github.com/tikv/pd/pkg/utils/tsoutil" "github.com/tikv/pd/pkg/versioninfo" - "go.etcd.io/etcd/clientv3" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -62,35 +59,22 @@ var _ tso.ElectionMember = (*member.Participant)(nil) // Server is the TSO server, and it implements bs.Server. type Server struct { + *server.BaseServer diagnosticspb.DiagnosticsServer // Server state. 0 is not running, 1 is running. isRunning int64 - // Server start timestamp - startTimestamp int64 - ctx context.Context serverLoopCtx context.Context serverLoopCancel func() serverLoopWg sync.WaitGroup cfg *Config clusterID uint64 - listenURL *url.URL - // etcd client - etcdClient *clientv3.Client - // http client - httpClient *http.Client - - secure bool - muxListener net.Listener - grpcServer *grpc.Server - httpServer *http.Server service *Service keyspaceGroupManager *tso.KeyspaceGroupManager - // Store as map[string]*grpc.ClientConn - clientConns sync.Map + // tsoDispatcher is used to dispatch the TSO requests to // the corresponding forwarding TSO channels. tsoDispatcher *tsoutil.TSODispatcher @@ -98,10 +82,6 @@ type Server struct { // related data structures defined in the tso grpc protocol tsoProtoFactory *tsoutil.TSOProtoFactory - // Callback functions for different stages - // startCallbacks will be called after the server is started. - startCallbacks []func() - // for service registry serviceID *discovery.ServiceRegistryEntry serviceRegister *discovery.ServiceRegister @@ -109,16 +89,11 @@ type Server struct { // Implement the following methods defined in bs.Server -// Name returns the unique Name for this server in the TSO cluster. +// Name returns the unique name for this server in the TSO cluster. func (s *Server) Name() string { return s.cfg.Name } -// Context returns the context of server. -func (s *Server) Context() context.Context { - return s.ctx -} - // GetBasicServer returns the basic server. func (s *Server) GetBasicServer() bs.Server { return s @@ -134,11 +109,6 @@ func (s *Server) GetBackendEndpoints() string { return s.cfg.BackendEndpoints } -// GetClientConns returns the client connections. -func (s *Server) GetClientConns() *sync.Map { - return &s.clientConns -} - // ServerLoopWgDone decreases the server loop wait group. func (s *Server) ServerLoopWgDone() { s.serverLoopWg.Done() @@ -149,46 +119,16 @@ func (s *Server) ServerLoopWgAdd(n int) { s.serverLoopWg.Add(n) } -// GetHTTPServer returns the http server. -func (s *Server) GetHTTPServer() *http.Server { - return s.httpServer -} - -// SetHTTPServer sets the http server. -func (s *Server) SetHTTPServer(httpServer *http.Server) { - s.httpServer = httpServer -} - // SetUpRestHandler sets up the REST handler. func (s *Server) SetUpRestHandler() (http.Handler, apiutil.APIServiceGroup) { return SetUpRestHandler(s.service) } -// GetGRPCServer returns the grpc server. -func (s *Server) GetGRPCServer() *grpc.Server { - return s.grpcServer -} - -// SetGRPCServer sets the grpc server. -func (s *Server) SetGRPCServer(grpcServer *grpc.Server) { - s.grpcServer = grpcServer -} - // RegisterGRPCService registers the grpc service. func (s *Server) RegisterGRPCService(grpcServer *grpc.Server) { s.service.RegisterGRPCService(grpcServer) } -// SetETCDClient sets the etcd client. -func (s *Server) SetETCDClient(etcdClient *clientv3.Client) { - s.etcdClient = etcdClient -} - -// SetHTTPClient sets the http client. -func (s *Server) SetHTTPClient(httpClient *http.Client) { - s.httpClient = httpClient -} - // Run runs the TSO server. func (s *Server) Run() error { skipWaitAPIServiceReady := false @@ -200,7 +140,7 @@ func (s *Server) Run() error { return err } } - go systimemon.StartMonitor(s.ctx, time.Now, func() { + go systimemon.StartMonitor(s.Context(), time.Now, func() { log.Error("system time jumps backward", errs.ZapError(errs.ErrIncorrectSystemTime)) timeJumpBackCounter.Inc() }) @@ -224,37 +164,22 @@ func (s *Server) Close() { s.serviceRegister.Deregister() utils.StopHTTPServer(s) utils.StopGRPCServer(s) - s.muxListener.Close() + s.GetListener().Close() s.serverLoopCancel() s.serverLoopWg.Wait() - if s.etcdClient != nil { - if err := s.etcdClient.Close(); err != nil { + if s.GetClient() != nil { + if err := s.GetClient().Close(); err != nil { log.Error("close etcd client meet error", errs.ZapError(errs.ErrCloseEtcdClient, err)) } } - if s.httpClient != nil { - s.httpClient.CloseIdleConnections() + if s.GetHTTPClient() != nil { + s.GetHTTPClient().CloseIdleConnections() } log.Info("tso server is closed") } -// GetClient returns builtin etcd client. -func (s *Server) GetClient() *clientv3.Client { - return s.etcdClient -} - -// GetHTTPClient returns builtin http client. -func (s *Server) GetHTTPClient() *http.Client { - return s.httpClient -} - -// AddStartCallback adds a callback in the startServer phase. -func (s *Server) AddStartCallback(callbacks ...func()) { - s.startCallbacks = append(s.startCallbacks, callbacks...) -} - // IsServing implements basicserver. It returns whether the server is the leader // if there is embedded etcd, or the primary otherwise. func (s *Server) IsServing() bool { @@ -328,11 +253,6 @@ func (s *Server) IsClosed() bool { return atomic.LoadInt64(&s.isRunning) == 0 } -// IsSecure checks if the server enable TLS. -func (s *Server) IsSecure() bool { - return s.secure -} - // GetKeyspaceGroupManager returns the manager of keyspace group. func (s *Server) GetKeyspaceGroupManager() *tso.KeyspaceGroupManager { return s.keyspaceGroupManager @@ -352,24 +272,6 @@ func (s *Server) IsLocalRequest(forwardedHost string) bool { return forwardedHost == "" } -// GetDelegateClient returns grpc client connection talking to the forwarded host -func (s *Server) GetDelegateClient(ctx context.Context, forwardedHost string) (*grpc.ClientConn, error) { - client, ok := s.clientConns.Load(forwardedHost) - if !ok { - tlsConfig, err := s.GetTLSConfig().ToTLSConfig() - if err != nil { - return nil, err - } - cc, err := grpcutil.GetClientConn(ctx, forwardedHost, tlsConfig) - if err != nil { - return nil, err - } - client = cc - s.clientConns.Store(forwardedHost, cc) - } - return client.(*grpc.ClientConn), nil -} - // ValidateInternalRequest checks if server is closed, which is used to validate // the gRPC communication between TSO servers internally. // TODO: Check if the sender is from the global TSO allocator @@ -437,7 +339,7 @@ func (s *Server) GetTLSConfig() *grpcutil.TLSConfig { } func (s *Server) startServer() (err error) { - if s.clusterID, err = utils.InitClusterID(s.ctx, s.etcdClient); err != nil { + if s.clusterID, err = utils.InitClusterID(s.Context(), s.GetClient()); err != nil { return err } log.Info("init cluster id", zap.Uint64("cluster-id", s.clusterID)) @@ -448,18 +350,13 @@ func (s *Server) startServer() (err error) { // different service modes provided by the same pd-server binary serverInfo.WithLabelValues(versioninfo.PDReleaseVersion, versioninfo.PDGitHash).Set(float64(time.Now().Unix())) - s.listenURL, err = url.Parse(s.cfg.ListenAddr) - if err != nil { - return err - } - // Initialize the TSO service. - s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx) + s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.Context()) legacySvcRootPath := endpoint.LegacyRootPath(s.clusterID) tsoSvcRootPath := endpoint.TSOSvcRootPath(s.clusterID) s.serviceID = &discovery.ServiceRegistryEntry{ServiceAddr: s.cfg.AdvertiseListenAddr} s.keyspaceGroupManager = tso.NewKeyspaceGroupManager( - s.serverLoopCtx, s.serviceID, s.etcdClient, s.httpClient, s.cfg.AdvertiseListenAddr, + s.serverLoopCtx, s.serviceID, s.GetClient(), s.GetHTTPClient(), s.cfg.AdvertiseListenAddr, discovery.TSOPath(s.clusterID), legacySvcRootPath, tsoSvcRootPath, s.cfg) if err := s.keyspaceGroupManager.Initialize(); err != nil { return err @@ -468,29 +365,19 @@ func (s *Server) startServer() (err error) { s.tsoProtoFactory = &tsoutil.TSOProtoFactory{} s.service = &Service{Server: s} - tlsConfig, err := s.cfg.Security.ToTLSConfig() - if err != nil { - return err - } - if tlsConfig != nil { - s.secure = true - s.muxListener, err = tls.Listen(utils.TCPNetworkStr, s.listenURL.Host, tlsConfig) - } else { - s.muxListener, err = net.Listen(utils.TCPNetworkStr, s.listenURL.Host) - } - if err != nil { + if err := s.InitListener(s.GetTLSConfig(), s.cfg.ListenAddr); err != nil { return err } serverReadyChan := make(chan struct{}) defer close(serverReadyChan) s.serverLoopWg.Add(1) - go utils.StartGRPCAndHTTPServers(s, serverReadyChan, s.muxListener) + go utils.StartGRPCAndHTTPServers(s, serverReadyChan, s.GetListener()) <-serverReadyChan // Run callbacks log.Info("triggering the start callback functions") - for _, cb := range s.startCallbacks { + for _, cb := range s.GetStartCallbacks() { cb() } @@ -499,7 +386,7 @@ func (s *Server) startServer() (err error) { if err != nil { return err } - s.serviceRegister = discovery.NewServiceRegister(s.ctx, s.etcdClient, strconv.FormatUint(s.clusterID, 10), + s.serviceRegister = discovery.NewServiceRegister(s.Context(), s.GetClient(), strconv.FormatUint(s.clusterID, 10), utils.TSOServiceName, s.cfg.AdvertiseListenAddr, serializedEntry, discovery.DefaultLeaseInSeconds) if err := s.serviceRegister.Register(); err != nil { log.Error("failed to register the service", zap.String("service-name", utils.TSOServiceName), errs.ZapError(err)) @@ -513,10 +400,9 @@ func (s *Server) startServer() (err error) { // CreateServer creates the Server func CreateServer(ctx context.Context, cfg *Config) *Server { svr := &Server{ + BaseServer: server.NewBaseServer(ctx), DiagnosticsServer: sysutil.NewDiagnosticsServer(cfg.Log.File.Filename), - startTimestamp: time.Now().Unix(), cfg: cfg, - ctx: ctx, } return svr } diff --git a/pkg/mcs/utils/util.go b/pkg/mcs/utils/util.go index 5418abcb3147..2390f5be4f04 100644 --- a/pkg/mcs/utils/util.go +++ b/pkg/mcs/utils/util.go @@ -83,7 +83,7 @@ type server interface { Context() context.Context GetTLSConfig() *grpcutil.TLSConfig GetClientConns() *sync.Map - GetDelegateClient(ctx context.Context, forwardedHost string) (*grpc.ClientConn, error) + GetDelegateClient(ctx context.Context, tlsCfg *grpcutil.TLSConfig, forwardedHost string) (*grpc.ClientConn, error) ServerLoopWgDone() ServerLoopWgAdd(int) IsClosed() bool @@ -130,7 +130,7 @@ func isAPIServiceReady(s server) (bool, error) { if len(urls) == 0 { return false, errors.New("no backend endpoints") } - cc, err := s.GetDelegateClient(s.Context(), urls[0]) + cc, err := s.GetDelegateClient(s.Context(), s.GetTLSConfig(), urls[0]) if err != nil { return false, err } diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go index 6fb31004db0b..d2fb8f04b0e5 100644 --- a/pkg/tso/allocator_manager.go +++ b/pkg/tso/allocator_manager.go @@ -103,7 +103,7 @@ type ElectionMember interface { // server id of a cluster or the unique keyspace group replica id of the election // group comprised of the replicas of a keyspace group. ID() uint64 - // ID returns the unique Name in the election group. + // ID returns the unique name in the election group. Name() string // MemberValue returns the member value. MemberValue() string