Skip to content

Commit

Permalink
mcs: tso service should not forward again (tikv#7348)
Browse files Browse the repository at this point in the history
ref tikv#5836

Signed-off-by: Ryan Leung <rleungx@gmail.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
rleungx and ti-chi-bot[bot] committed Dec 1, 2023
1 parent 5c3e40a commit a3f40a3
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 37 deletions.
34 changes: 0 additions & 34 deletions pkg/mcs/tso/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ import (
bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/mcs/registry"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/grpcutil"
"github.com/tikv/pd/pkg/utils/tsoutil"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -88,21 +86,9 @@ func (s *Service) RegisterRESTHandler(userDefineHandlers map[string]http.Handler

// Tso returns a stream of timestamps
func (s *Service) Tso(stream tsopb.TSO_TsoServer) error {
var (
doneCh chan struct{}
errCh chan error
)
ctx, cancel := context.WithCancel(stream.Context())
defer cancel()
for {
// Prevent unnecessary performance overhead of the channel.
if errCh != nil {
select {
case err := <-errCh:
return errors.WithStack(err)
default:
}
}
request, err := stream.Recv()
if err == io.EOF {
return nil
Expand All @@ -111,26 +97,6 @@ func (s *Service) Tso(stream tsopb.TSO_TsoServer) error {
return errors.WithStack(err)
}

streamCtx := stream.Context()
forwardedHost := grpcutil.GetForwardedHost(streamCtx)
if !s.IsLocalRequest(forwardedHost) {
clientConn, err := s.GetDelegateClient(s.Context(), s.GetTLSConfig(), forwardedHost)
if err != nil {
return errors.WithStack(err)
}

if errCh == nil {
doneCh = make(chan struct{})
defer close(doneCh)
errCh = make(chan error)
}

tsoProtoFactory := s.tsoProtoFactory
tsoRequest := tsoutil.NewTSOProtoRequest(forwardedHost, clientConn, request, stream)
s.tsoDispatcher.DispatchRequest(ctx, tsoRequest, tsoProtoFactory, doneCh, errCh)
continue
}

start := time.Now()
// TSO uses leader lease to determine validity. No need to check leader here.
if s.IsClosed() {
Expand Down
3 changes: 0 additions & 3 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,6 @@ type Server struct {
service *Service
keyspaceGroupManager *tso.KeyspaceGroupManager

// tsoDispatcher is used to dispatch the TSO requests to
// the corresponding forwarding TSO channels.
tsoDispatcher *tsoutil.TSODispatcher
// tsoProtoFactory is the abstract factory for creating tso
// related data structures defined in the tso grpc protocol
tsoProtoFactory *tsoutil.TSOProtoFactory
Expand Down

0 comments on commit a3f40a3

Please sign in to comment.