Skip to content

Commit

Permalink
mcs: init HTTP handler (tikv#6963)
Browse files Browse the repository at this point in the history
ref tikv#5839

Signed-off-by: Ryan Leung <rleungx@gmail.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
rleungx and ti-chi-bot[bot] committed Dec 1, 2023
1 parent 4c19ebb commit 05b0da7
Show file tree
Hide file tree
Showing 12 changed files with 491 additions and 422 deletions.
5 changes: 5 additions & 0 deletions cmd/pd-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ import (
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/server/join"
"go.uber.org/zap"

// register microservice API
_ "github.com/tikv/pd/pkg/mcs/resourcemanager/server/install"
_ "github.com/tikv/pd/pkg/mcs/scheduling/server/install"
_ "github.com/tikv/pd/pkg/mcs/tso/server/install"
)

const (
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/resourcemanager/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type Service struct {
}

// NewService creates a new resource manager service.
func NewService[T ResourceManagerConfigProvider](svr bs.Server) registry.RegistrableService {
func NewService[T ConfigProvider](svr bs.Server) registry.RegistrableService {
manager := NewManager[T](svr)

return &Service{
Expand Down
4 changes: 2 additions & 2 deletions pkg/mcs/resourcemanager/server/install/install.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ package install

import (
"github.com/tikv/pd/pkg/mcs/registry"
rm_server "github.com/tikv/pd/pkg/mcs/resourcemanager/server"
"github.com/tikv/pd/pkg/mcs/resourcemanager/server"

// init API group
_ "github.com/tikv/pd/pkg/mcs/resourcemanager/server/apis/v1"
Expand All @@ -28,5 +28,5 @@ func init() {

// Install registers the API group and grpc service.
func Install(register *registry.ServiceRegistry) {
register.RegisterService("ResourceManager", rm_server.NewService[*rm_server.Server])
register.RegisterService("ResourceManager", server.NewService[*server.Server])
}
8 changes: 4 additions & 4 deletions pkg/mcs/resourcemanager/server/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,15 @@ type Manager struct {
consumptionRecord map[string]time.Time
}

// ResourceManagerConfigProvider is used to get resource manager config from the given
// ConfigProvider is used to get resource manager config from the given
// `bs.server` without modifying its interface.
type ResourceManagerConfigProvider interface {
type ConfigProvider interface {
GetControllerConfig() *ControllerConfig
}

// NewManager returns a new manager base on the given server,
// which should implement the `ResourceManagerConfigProvider` interface.
func NewManager[T ResourceManagerConfigProvider](srv bs.Server) *Manager {
// which should implement the `ConfigProvider` interface.
func NewManager[T ConfigProvider](srv bs.Server) *Manager {
m := &Manager{
controllerConfig: srv.(T).GetControllerConfig(),
groups: make(map[string]*ResourceGroup),
Expand Down
207 changes: 105 additions & 102 deletions pkg/mcs/resourcemanager/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,30 +25,29 @@ import (
"os/signal"
"path"
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"

grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/diagnosticspb"
"github.com/pingcap/log"
"github.com/pingcap/sysutil"
"github.com/soheilhy/cmux"
"github.com/spf13/cobra"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/discovery"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/grpcutil"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/memberutil"
"github.com/tikv/pd/pkg/utils/metricutil"
"github.com/tikv/pd/pkg/versioninfo"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/pkg/types"
"go.uber.org/zap"
"google.golang.org/grpc"
)
Expand Down Expand Up @@ -76,9 +75,15 @@ type Server struct {
etcdClient *clientv3.Client
httpClient *http.Client

secure bool
muxListener net.Listener
grpcServer *grpc.Server
httpServer *http.Server
service *Service

// Store as map[string]*grpc.ClientConn
clientConns sync.Map

// Callback functions for different stages
// startCallbacks will be called after the server is started.
startCallbacks []func()
Expand All @@ -105,16 +110,19 @@ func (s *Server) GetAddr() string {

// Run runs the Resource Manager server.
func (s *Server) Run() (err error) {
if err = s.initClient(); err != nil {
return err
skipWaitAPIServiceReady := false
failpoint.Inject("skipWaitAPIServiceReady", func() {
skipWaitAPIServiceReady = true
})
if !skipWaitAPIServiceReady {
if err := utils.WaitAPIServiceReady(s); err != nil {
return err
}
}
if err = s.startServer(); err != nil {
if err := utils.InitClient(s); err != nil {
return err
}

s.startServerLoop()

return nil
return s.startServer()
}

func (s *Server) startServerLoop() {
Expand Down Expand Up @@ -211,6 +219,8 @@ func (s *Server) Close() {

log.Info("closing resource manager server ...")
s.serviceRegister.Deregister()
utils.StopHTTPServer(s)
utils.StopGRPCServer(s)
s.muxListener.Close()
s.serverLoopCancel()
s.serverLoopWg.Wait()
Expand Down Expand Up @@ -258,103 +268,97 @@ func (s *Server) IsClosed() bool {
return s != nil && atomic.LoadInt64(&s.isRunning) == 0
}

// IsSecure checks if the server enable TLS.
func (s *Server) IsSecure() bool {
return s.secure
}

// AddServiceReadyCallback adds callbacks when the server becomes the leader, if there is embedded etcd, or the primary otherwise.
func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) {
s.primaryCallbacks = append(s.primaryCallbacks, callbacks...)
}

func (s *Server) initClient() error {
tlsConfig, err := s.cfg.Security.ToTLSConfig()
if err != nil {
return err
}
u, err := types.NewURLs(strings.Split(s.cfg.BackendEndpoints, ","))
if err != nil {
return err
}
s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, []url.URL(u))
return err
// GetBackendEndpoints returns the backend endpoints.
func (s *Server) GetBackendEndpoints() string {
return s.cfg.BackendEndpoints
}

func (s *Server) startGRPCServer(l net.Listener) {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()
// GetClientConns returns the client connections.
func (s *Server) GetClientConns() *sync.Map {
return &s.clientConns
}

gs := grpc.NewServer()
s.service.RegisterGRPCService(gs)
err := gs.Serve(l)
log.Info("gRPC server stop serving")
// ServerLoopWgDone decreases the server loop wait group.
func (s *Server) ServerLoopWgDone() {
s.serverLoopWg.Done()
}

// Attempt graceful stop (waits for pending RPCs), but force a stop if
// it doesn't happen in a reasonable amount of time.
done := make(chan struct{})
go func() {
defer logutil.LogPanic()
log.Info("try to gracefully stop the server now")
gs.GracefulStop()
close(done)
}()
timer := time.NewTimer(utils.DefaultGRPCGracefulStopTimeout)
defer timer.Stop()
select {
case <-done:
case <-timer.C:
log.Info("stopping grpc gracefully is taking longer than expected and force stopping now", zap.Duration("default", utils.DefaultGRPCGracefulStopTimeout))
gs.Stop()
}
if s.IsClosed() {
log.Info("grpc server stopped")
} else {
log.Fatal("grpc server stopped unexpectedly", errs.ZapError(err))
}
// ServerLoopWgAdd increases the server loop wait group.
func (s *Server) ServerLoopWgAdd(n int) {
s.serverLoopWg.Add(n)
}

func (s *Server) startHTTPServer(l net.Listener) {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()
// GetHTTPServer returns the http server.
func (s *Server) GetHTTPServer() *http.Server {
return s.httpServer
}

handler, _ := SetUpRestHandler(s.service)
hs := &http.Server{
Handler: handler,
ReadTimeout: 5 * time.Minute,
ReadHeaderTimeout: 5 * time.Second,
}
err := hs.Serve(l)
log.Info("http server stop serving")
// SetHTTPServer sets the http server.
func (s *Server) SetHTTPServer(httpServer *http.Server) {
s.httpServer = httpServer
}

ctx, cancel := context.WithTimeout(context.Background(), utils.DefaultHTTPGracefulShutdownTimeout)
defer cancel()
if err := hs.Shutdown(ctx); err != nil {
log.Error("http server shutdown encountered problem", errs.ZapError(err))
} else {
log.Info("all http(s) requests finished")
}
if s.IsClosed() {
log.Info("http server stopped")
} else {
log.Fatal("http server stopped unexpectedly", errs.ZapError(err))
}
// SetUpRestHandler sets up the REST handler.
func (s *Server) SetUpRestHandler() (http.Handler, apiutil.APIServiceGroup) {
return SetUpRestHandler(s.service)
}

func (s *Server) startGRPCAndHTTPServers(l net.Listener) {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()
// GetGRPCServer returns the grpc server.
func (s *Server) GetGRPCServer() *grpc.Server {
return s.grpcServer
}

mux := cmux.New(l)
grpcL := mux.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"))
httpL := mux.Match(cmux.Any())
// SetGRPCServer sets the grpc server.
func (s *Server) SetGRPCServer(grpcServer *grpc.Server) {
s.grpcServer = grpcServer
}

s.serverLoopWg.Add(2)
go s.startGRPCServer(grpcL)
go s.startHTTPServer(httpL)
// RegisterGRPCService registers the grpc service.
func (s *Server) RegisterGRPCService(grpcServer *grpc.Server) {
s.service.RegisterGRPCService(grpcServer)
}

if err := mux.Serve(); err != nil {
if s.IsClosed() {
log.Info("mux stop serving", errs.ZapError(err))
} else {
log.Fatal("mux stop serving unexpectedly", errs.ZapError(err))
// SetETCDClient sets the etcd client.
func (s *Server) SetETCDClient(etcdClient *clientv3.Client) {
s.etcdClient = etcdClient
}

// SetHTTPClient sets the http client.
func (s *Server) SetHTTPClient(httpClient *http.Client) {
s.httpClient = httpClient
}

// GetTLSConfig gets the security config.
func (s *Server) GetTLSConfig() *grpcutil.TLSConfig {
return &s.cfg.Security.TLSConfig
}

// GetDelegateClient returns grpc client connection talking to the forwarded host
func (s *Server) GetDelegateClient(ctx context.Context, forwardedHost string) (*grpc.ClientConn, error) {
client, ok := s.clientConns.Load(forwardedHost)
if !ok {
tlsConfig, err := s.GetTLSConfig().ToTLSConfig()
if err != nil {
return nil, err
}
cc, err := grpcutil.GetClientConn(ctx, forwardedHost, tlsConfig)
if err != nil {
return nil, err
}
client = cc
s.clientConns.Store(forwardedHost, cc)
}
return client.(*grpc.ClientConn), nil
}

// GetLeaderListenUrls gets service endpoints from the leader in election group.
Expand All @@ -378,7 +382,10 @@ func (s *Server) startServer() (err error) {
s.participant = member.NewParticipant(s.etcdClient)
s.participant.InitInfo(uniqueName, uniqueID, path.Join(resourceManagerPrimaryPrefix, fmt.Sprintf("%05d", 0)),
utils.PrimaryKey, "primary election", s.cfg.AdvertiseListenAddr)

s.listenURL, err = url.Parse(s.cfg.ListenAddr)
if err != nil {
return err
}
s.service = &Service{
ctx: s.ctx,
manager: NewManager[*Server](s),
Expand All @@ -388,11 +395,8 @@ func (s *Server) startServer() (err error) {
if err != nil {
return err
}
s.listenURL, err = url.Parse(s.cfg.ListenAddr)
if err != nil {
return err
}
if tlsConfig != nil {
s.secure = true
s.muxListener, err = tls.Listen(utils.TCPNetworkStr, s.listenURL.Host, tlsConfig)
} else {
s.muxListener, err = net.Listen(utils.TCPNetworkStr, s.listenURL.Host)
Expand All @@ -401,8 +405,12 @@ func (s *Server) startServer() (err error) {
return err
}

serverReadyChan := make(chan struct{})
defer close(serverReadyChan)
s.serverLoopWg.Add(1)
go s.startGRPCAndHTTPServers(s.muxListener)
go utils.StartGRPCAndHTTPServers(s, serverReadyChan, s.muxListener)
<-serverReadyChan
s.startServerLoop()

// Run callbacks
log.Info("triggering the start callback functions")
Expand Down Expand Up @@ -455,7 +463,7 @@ func CreateServerWrapper(cmd *cobra.Command, args []string) {
return
} else if printVersion {
versioninfo.Print()
exit(0)
utils.Exit(0)
}

// New zap logger
Expand Down Expand Up @@ -500,13 +508,8 @@ func CreateServerWrapper(cmd *cobra.Command, args []string) {
svr.Close()
switch sig {
case syscall.SIGTERM:
exit(0)
utils.Exit(0)
default:
exit(1)
utils.Exit(1)
}
}

func exit(code int) {
log.Sync()
os.Exit(code)
}
Loading

0 comments on commit 05b0da7

Please sign in to comment.