Skip to content

Commit

Permalink
server: implement dynamic forward (tikv#8744)
Browse files Browse the repository at this point in the history
ref tikv#8477

Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx authored Oct 30, 2024
1 parent e257097 commit de92fc5
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 7 deletions.
36 changes: 31 additions & 5 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ const (
heartbeatTaskRunner = "heartbeat-async"
miscTaskRunner = "misc-async"
logTaskRunner = "log-async"

// TODO: make it configurable
IsTSODynamicSwitchingEnabled = false
)

// Server is the interface for cluster.
Expand Down Expand Up @@ -409,11 +412,30 @@ func (c *RaftCluster) checkSchedulingService() {
// checkTSOService checks the TSO service.
func (c *RaftCluster) checkTSOService() {
if c.isAPIServiceMode {
if IsTSODynamicSwitchingEnabled {
servers, err := discovery.Discover(c.etcdClient, constant.TSOServiceName)
if err != nil || len(servers) == 0 {
if err := c.startTSOJobsIfNeeded(); err != nil {
log.Error("failed to start TSO jobs", errs.ZapError(err))
return
}
log.Info("TSO is provided by PD")
c.UnsetServiceIndependent(constant.TSOServiceName)
} else {
if err := c.startTSOJobsIfNeeded(); err != nil {
log.Error("failed to stop TSO jobs", errs.ZapError(err))
return
}
log.Info("TSO is provided by TSO server")
if !c.IsServiceIndependent(constant.TSOServiceName) {
c.SetServiceIndependent(constant.TSOServiceName)
}
}
}
return
}

if err := c.startTSOJobs(); err != nil {
// If there is an error, need to wait for the next check.
if err := c.startTSOJobsIfNeeded(); err != nil {
log.Error("failed to start TSO jobs", errs.ZapError(err))
return
}
Expand All @@ -428,6 +450,8 @@ func (c *RaftCluster) runServiceCheckJob() {
schedulingTicker.Reset(time.Millisecond)
})
defer schedulingTicker.Stop()
tsoTicker := time.NewTicker(tsoServiceCheckInterval)
defer tsoTicker.Stop()

for {
select {
Expand All @@ -436,11 +460,13 @@ func (c *RaftCluster) runServiceCheckJob() {
return
case <-schedulingTicker.C:
c.checkSchedulingService()
case <-tsoTicker.C:
c.checkTSOService()
}
}
}

func (c *RaftCluster) startTSOJobs() error {
func (c *RaftCluster) startTSOJobsIfNeeded() error {
allocator, err := c.tsoAllocator.GetAllocator(tso.GlobalDCLocation)
if err != nil {
log.Error("failed to get global TSO allocator", errs.ZapError(err))
Expand All @@ -456,7 +482,7 @@ func (c *RaftCluster) startTSOJobs() error {
return nil
}

func (c *RaftCluster) stopTSOJobs() error {
func (c *RaftCluster) stopTSOJobsIfNeeded() error {
allocator, err := c.tsoAllocator.GetAllocator(tso.GlobalDCLocation)
if err != nil {
log.Error("failed to get global TSO allocator", errs.ZapError(err))
Expand Down Expand Up @@ -824,7 +850,7 @@ func (c *RaftCluster) Stop() {
if !c.IsServiceIndependent(constant.SchedulingServiceName) {
c.stopSchedulingJobs()
}
if err := c.stopTSOJobs(); err != nil {
if err := c.stopTSOJobsIfNeeded(); err != nil {
log.Error("failed to stop tso jobs", errs.ZapError(err))
}
c.heartbeatRunner.Stop()
Expand Down
34 changes: 34 additions & 0 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,10 +529,29 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error {
return s.forwardTSO(stream)
}

tsDeadlineCh := make(chan *tsoutil.TSDeadline, 1)
go tsoutil.WatchTSDeadline(stream.Context(), tsDeadlineCh)

var (
doneCh chan struct{}
errCh chan error
// The following are tso forward stream related variables.
forwardStream tsopb.TSO_TsoClient
cancelForward context.CancelFunc
forwardCtx context.Context
tsoStreamErr error
lastForwardedHost string
)

defer func() {
if cancelForward != nil {
cancelForward()
}
if grpcutil.NeedRebuildConnection(tsoStreamErr) {
s.closeDelegateClient(lastForwardedHost)
}
}()

ctx, cancel := context.WithCancel(stream.Context())
defer cancel()
for {
Expand Down Expand Up @@ -570,6 +589,21 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error {
continue
}

if s.IsServiceIndependent(constant.TSOServiceName) {
if request.GetCount() == 0 {
err = errs.ErrGenerateTimestamp.FastGenByArgs("tso count should be positive")
return status.Error(codes.Unknown, err.Error())
}
forwardCtx, cancelForward, forwardStream, lastForwardedHost, tsoStreamErr, err = s.handleTSOForwarding(forwardCtx, forwardStream, stream, nil, request, tsDeadlineCh, lastForwardedHost, cancelForward)
if tsoStreamErr != nil {
return tsoStreamErr
}
if err != nil {
return err
}
continue
}

start := time.Now()
// TSO uses leader lease to determine validity. No need to check leader here.
if s.IsClosed() {
Expand Down
3 changes: 1 addition & 2 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1411,8 +1411,7 @@ func (s *Server) GetRaftCluster() *cluster.RaftCluster {
// IsServiceIndependent returns whether the service is independent.
func (s *Server) IsServiceIndependent(name string) bool {
if s.mode == APIServiceMode && !s.IsClosed() {
// TODO: remove it after we support tso discovery
if name == constant.TSOServiceName {
if name == constant.TSOServiceName && !cluster.IsTSODynamicSwitchingEnabled {
return true
}
return s.cluster.IsServiceIndependent(name)
Expand Down

0 comments on commit de92fc5

Please sign in to comment.