Skip to content

Commit

Permalink
*: remove api mode concept (tikv#8952)
Browse files Browse the repository at this point in the history
ref tikv#8477

Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx authored Jan 7, 2025
1 parent 5c4ab57 commit 973234d
Show file tree
Hide file tree
Showing 61 changed files with 455 additions and 469 deletions.
26 changes: 13 additions & 13 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,8 +360,8 @@ func newClientWithKeyspaceName(
c := &client{
callerComponent: adjustCallerComponent(callerComponent),
inner: &innerClient{
// Create a PD service discovery with null keyspace id, then query the real id with the keyspace name,
// finally update the keyspace id to the PD service discovery for the following interactions.
// Create a service discovery with null keyspace id, then query the real id with the keyspace name,
// finally update the keyspace id to the service discovery for the following interactions.
keyspaceID: constants.NullKeyspaceID,
updateTokenConnectionCh: make(chan struct{}, 1),
ctx: clientCtx,
Expand All @@ -384,7 +384,7 @@ func newClientWithKeyspaceName(
}
c.inner.keyspaceID = keyspaceMeta.GetId()
// c.keyspaceID is the source of truth for keyspace id.
c.inner.pdSvcDiscovery.SetKeyspaceID(c.inner.keyspaceID)
c.inner.serviceDiscovery.SetKeyspaceID(c.inner.keyspaceID)
return nil
}

Expand Down Expand Up @@ -412,17 +412,17 @@ func (c *client) ResetTSOClient() {

// GetClusterID returns the ClusterID.
func (c *client) GetClusterID(context.Context) uint64 {
return c.inner.pdSvcDiscovery.GetClusterID()
return c.inner.serviceDiscovery.GetClusterID()
}

// GetLeaderURL returns the leader URL.
func (c *client) GetLeaderURL() string {
return c.inner.pdSvcDiscovery.GetServingURL()
return c.inner.serviceDiscovery.GetServingURL()
}

// GetServiceDiscovery returns the client-side service discovery object
func (c *client) GetServiceDiscovery() sd.ServiceDiscovery {
return c.inner.pdSvcDiscovery
return c.inner.serviceDiscovery
}

// UpdateOption updates the client option.
Expand All @@ -438,7 +438,7 @@ func (c *client) UpdateOption(option opt.DynamicOption, value any) error {
}
case opt.EnableTSOFollowerProxy:
if c.inner.getServiceMode() != pdpb.ServiceMode_PD_SVC_MODE {
return errors.New("[pd] tso follower proxy is only supported in PD service mode")
return errors.New("[pd] tso follower proxy is only supported in PD mode")
}
enable, ok := value.(bool)
if !ok {
Expand Down Expand Up @@ -485,7 +485,7 @@ func (c *client) GetAllMembers(ctx context.Context) ([]*pdpb.Member, error) {
// getClientAndContext returns the leader pd client and the original context. If leader is unhealthy, it returns
// follower pd client and the context which holds forward information.
func (c *client) getClientAndContext(ctx context.Context) (pdpb.PDClient, context.Context) {
serviceClient := c.inner.pdSvcDiscovery.GetServiceClient()
serviceClient := c.inner.serviceDiscovery.GetServiceClient()
if serviceClient == nil || serviceClient.GetClientConn() == nil {
return nil, ctx
}
Expand Down Expand Up @@ -526,7 +526,7 @@ func (c *client) GetLocalTS(ctx context.Context, _ string) (physical int64, logi

// GetMinTS implements the TSOClient interface.
func (c *client) GetMinTS(ctx context.Context) (physical int64, logical int64, err error) {
// Handle compatibility issue in case of PD/API server doesn't support GetMinTS API.
// Handle compatibility issue in case of PD/PD service doesn't support GetMinTS API.
serviceMode := c.inner.getServiceMode()
switch serviceMode {
case pdpb.ServiceMode_UNKNOWN_SVC_MODE:
Expand Down Expand Up @@ -598,7 +598,7 @@ func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs

var resp *pdpb.GetRegionResponse
for _, url := range memberURLs {
conn, err := c.inner.pdSvcDiscovery.GetOrCreateGRPCConn(url)
conn, err := c.inner.serviceDiscovery.GetOrCreateGRPCConn(url)
if err != nil {
log.Error("[pd] can't get grpc connection", zap.String("member-URL", url), errs.ZapError(err))
continue
Expand All @@ -619,7 +619,7 @@ func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs

if resp == nil {
metrics.CmdFailedDurationGetRegion.Observe(time.Since(start).Seconds())
c.inner.pdSvcDiscovery.ScheduleCheckMemberChanged()
c.inner.serviceDiscovery.ScheduleCheckMemberChanged()
errorMsg := fmt.Sprintf("[pd] can't get region info from member URLs: %+v", memberURLs)
return nil, errors.WithStack(errors.New(errorMsg))
}
Expand Down Expand Up @@ -1150,7 +1150,7 @@ func (c *client) SplitRegions(ctx context.Context, splitKeys [][]byte, opts ...o

func (c *client) requestHeader() *pdpb.RequestHeader {
return &pdpb.RequestHeader{
ClusterId: c.inner.pdSvcDiscovery.GetClusterID(),
ClusterId: c.inner.serviceDiscovery.GetClusterID(),
CallerId: string(caller.GetCallerID()),
CallerComponent: string(c.callerComponent),
}
Expand Down Expand Up @@ -1334,7 +1334,7 @@ func (c *client) respForErr(observer prometheus.Observer, start time.Time, err e
if err != nil || header.GetError() != nil {
observer.Observe(time.Since(start).Seconds())
if err != nil {
c.inner.pdSvcDiscovery.ScheduleCheckMemberChanged()
c.inner.serviceDiscovery.ScheduleCheckMemberChanged()
return errors.WithStack(err)
}
return errors.WithStack(errors.New(header.GetError().String()))
Expand Down
2 changes: 1 addition & 1 deletion client/clients/tso/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ func (c *Cli) DispatchRequest(request *Request) (bool, error) {
// Client is closed, no need to retry.
return false, request.clientCtx.Err()
case <-c.ctx.Done():
// tsoClient is closed due to the PD service mode switch, which is retryable.
// tsoClient is closed due to the service mode switch, which is retryable.
return true, c.ctx.Err()
default:
// This failpoint will increase the possibility that the request is sent to a closed dispatcher.
Expand Down
2 changes: 1 addition & 1 deletion client/clients/tso/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (m *mockTSOServiceProvider) getOption() *opt.Option {
}

func (*mockTSOServiceProvider) getServiceDiscovery() sd.ServiceDiscovery {
return sd.NewMockPDServiceDiscovery([]string{mockStreamURL}, nil)
return sd.NewMockServiceDiscovery([]string{mockStreamURL}, nil)
}

func (m *mockTSOServiceProvider) getConnectionCtxMgr() *cctx.Manager[*tsoStream] {
Expand Down
10 changes: 5 additions & 5 deletions client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func (ci *clientInner) doRequest(
if readErr != nil {
logFields = append(logFields, zap.NamedError("read-body-error", err))
} else {
// API server will return a JSON body containing the detailed error message
// PD service will return a JSON body containing the detailed error message
// when the status code is not `http.StatusOK` 200.
bs = bytes.TrimSpace(bs)
logFields = append(logFields, zap.ByteString("body", bs))
Expand Down Expand Up @@ -304,7 +304,7 @@ func WithMetrics(
}
}

// NewClientWithServiceDiscovery creates a PD HTTP client with the given PD service discovery.
// NewClientWithServiceDiscovery creates a PD HTTP client with the given service discovery.
func NewClientWithServiceDiscovery(
source string,
sd sd.ServiceDiscovery,
Expand Down Expand Up @@ -332,7 +332,7 @@ func NewClient(
for _, opt := range opts {
opt(c)
}
sd := sd.NewDefaultPDServiceDiscovery(ctx, cancel, pdAddrs, c.inner.tlsConf)
sd := sd.NewDefaultServiceDiscovery(ctx, cancel, pdAddrs, c.inner.tlsConf)
if err := sd.Init(); err != nil {
log.Error("[pd] init service discovery failed",
zap.String("source", source), zap.Strings("pd-addrs", pdAddrs), zap.Error(err))
Expand Down Expand Up @@ -420,7 +420,7 @@ func NewHTTPClientWithRequestChecker(checker requestChecker) *http.Client {
}
}

// newClientWithMockServiceDiscovery creates a new PD HTTP client with a mock PD service discovery.
// newClientWithMockServiceDiscovery creates a new PD HTTP client with a mock service discovery.
func newClientWithMockServiceDiscovery(
source string,
pdAddrs []string,
Expand All @@ -432,7 +432,7 @@ func newClientWithMockServiceDiscovery(
for _, opt := range opts {
opt(c)
}
sd := sd.NewMockPDServiceDiscovery(pdAddrs, c.inner.tlsConf)
sd := sd.NewMockServiceDiscovery(pdAddrs, c.inner.tlsConf)
if err := sd.Init(); err != nil {
log.Error("[pd] init mock service discovery failed",
zap.String("source", source), zap.Strings("pd-addrs", pdAddrs), zap.Error(err))
Expand Down
36 changes: 18 additions & 18 deletions client/inner_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ const (
)

type innerClient struct {
keyspaceID uint32
svrUrls []string
pdSvcDiscovery sd.ServiceDiscovery
tokenDispatcher *tokenDispatcher
keyspaceID uint32
svrUrls []string
serviceDiscovery sd.ServiceDiscovery
tokenDispatcher *tokenDispatcher

// For service mode switching.
serviceModeKeeper
Expand All @@ -45,13 +45,13 @@ type innerClient struct {
}

func (c *innerClient) init(updateKeyspaceIDCb sd.UpdateKeyspaceIDFunc) error {
c.pdSvcDiscovery = sd.NewPDServiceDiscovery(
c.serviceDiscovery = sd.NewServiceDiscovery(
c.ctx, c.cancel, &c.wg, c.setServiceMode,
updateKeyspaceIDCb, c.keyspaceID, c.svrUrls, c.tlsCfg, c.option)
if err := c.setup(); err != nil {
c.cancel()
if c.pdSvcDiscovery != nil {
c.pdSvcDiscovery.Close()
if c.serviceDiscovery != nil {
c.serviceDiscovery.Close()
}
return err
}
Expand Down Expand Up @@ -92,10 +92,10 @@ func (c *innerClient) resetTSOClientLocked(mode pdpb.ServiceMode) {
switch mode {
case pdpb.ServiceMode_PD_SVC_MODE:
newTSOCli = tso.NewClient(c.ctx, c.option,
c.pdSvcDiscovery, &tso.PDStreamBuilderFactory{})
c.serviceDiscovery, &tso.PDStreamBuilderFactory{})
case pdpb.ServiceMode_API_SVC_MODE:
newTSOSvcDiscovery = sd.NewTSOServiceDiscovery(
c.ctx, c, c.pdSvcDiscovery,
c.ctx, c, c.serviceDiscovery,
c.keyspaceID, c.tlsCfg, c.option)
// At this point, the keyspace group isn't known yet. Starts from the default keyspace group,
// and will be updated later.
Expand All @@ -119,12 +119,12 @@ func (c *innerClient) resetTSOClientLocked(mode pdpb.ServiceMode) {
oldTSOClient.Close()
// Replace the old TSO service discovery if needed.
oldTSOSvcDiscovery := c.tsoSvcDiscovery
// If newTSOSvcDiscovery is nil, that's expected, as it means we are switching to PD service mode and
// If newTSOSvcDiscovery is nil, that's expected, as it means we are switching to PD mode and
// no tso microservice discovery is needed.
c.tsoSvcDiscovery = newTSOSvcDiscovery
// Close the old TSO service discovery safely after both the old client and service discovery are replaced.
if oldTSOSvcDiscovery != nil {
// We are switching from API service mode to PD service mode, so delete the old tso microservice discovery.
// We are switching from PD service mode to PD mode, so delete the old tso microservice discovery.
oldTSOSvcDiscovery.Close()
}
}
Expand Down Expand Up @@ -153,7 +153,7 @@ func (c *innerClient) close() {
c.wg.Wait()

c.serviceModeKeeper.close()
c.pdSvcDiscovery.Close()
c.serviceDiscovery.Close()

if c.tokenDispatcher != nil {
tokenErr := errors.WithStack(errs.ErrClosing)
Expand All @@ -169,12 +169,12 @@ func (c *innerClient) setup() error {
}

// Init the client base.
if err := c.pdSvcDiscovery.Init(); err != nil {
if err := c.serviceDiscovery.Init(); err != nil {
return err
}

// Register callbacks
c.pdSvcDiscovery.AddServingURLSwitchedCallback(c.scheduleUpdateTokenConnection)
c.serviceDiscovery.AddServingURLSwitchedCallback(c.scheduleUpdateTokenConnection)

// Create dispatchers
c.createTokenDispatcher()
Expand All @@ -186,12 +186,12 @@ func (c *innerClient) setup() error {
func (c *innerClient) getRegionAPIClientAndContext(ctx context.Context, allowFollower bool) (sd.ServiceClient, context.Context) {
var serviceClient sd.ServiceClient
if allowFollower {
serviceClient = c.pdSvcDiscovery.GetServiceClientByKind(sd.UniversalAPIKind)
serviceClient = c.serviceDiscovery.GetServiceClientByKind(sd.UniversalAPIKind)
if serviceClient != nil {
return serviceClient, serviceClient.BuildGRPCTargetContext(ctx, !allowFollower)
}
}
serviceClient = c.pdSvcDiscovery.GetServiceClient()
serviceClient = c.serviceDiscovery.GetServiceClient()
if serviceClient == nil || serviceClient.GetClientConn() == nil {
return nil, ctx
}
Expand All @@ -201,12 +201,12 @@ func (c *innerClient) getRegionAPIClientAndContext(ctx context.Context, allowFol
// gRPCErrorHandler is used to handle the gRPC error returned by the resource manager service.
func (c *innerClient) gRPCErrorHandler(err error) {
if errs.IsLeaderChange(err) {
c.pdSvcDiscovery.ScheduleCheckMemberChanged()
c.serviceDiscovery.ScheduleCheckMemberChanged()
}
}

func (c *innerClient) getOrCreateGRPCConn() (*grpc.ClientConn, error) {
cc, err := c.pdSvcDiscovery.GetOrCreateGRPCConn(c.pdSvcDiscovery.GetServingURL())
cc, err := c.serviceDiscovery.GetOrCreateGRPCConn(c.serviceDiscovery.GetServingURL())
if err != nil {
return nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions client/keyspace_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type KeyspaceClient interface {

// keyspaceClient returns the KeyspaceClient from current PD leader.
func (c *client) keyspaceClient() keyspacepb.KeyspaceClient {
if client := c.inner.pdSvcDiscovery.GetServingEndpointClientConn(); client != nil {
if client := c.inner.serviceDiscovery.GetServingEndpointClientConn(); client != nil {
return keyspacepb.NewKeyspaceClient(client)
}
return nil
Expand Down Expand Up @@ -70,7 +70,7 @@ func (c *client) LoadKeyspace(ctx context.Context, name string) (*keyspacepb.Key

if err != nil {
metrics.CmdFailedDurationLoadKeyspace.Observe(time.Since(start).Seconds())
c.inner.pdSvcDiscovery.ScheduleCheckMemberChanged()
c.inner.serviceDiscovery.ScheduleCheckMemberChanged()
return nil, err
}

Expand Down Expand Up @@ -115,7 +115,7 @@ func (c *client) UpdateKeyspaceState(ctx context.Context, id uint32, state keysp

if err != nil {
metrics.CmdFailedDurationUpdateKeyspaceState.Observe(time.Since(start).Seconds())
c.inner.pdSvcDiscovery.ScheduleCheckMemberChanged()
c.inner.serviceDiscovery.ScheduleCheckMemberChanged()
return nil, err
}

Expand Down Expand Up @@ -159,7 +159,7 @@ func (c *client) GetAllKeyspaces(ctx context.Context, startID uint32, limit uint

if err != nil {
metrics.CmdDurationGetAllKeyspaces.Observe(time.Since(start).Seconds())
c.inner.pdSvcDiscovery.ScheduleCheckMemberChanged()
c.inner.serviceDiscovery.ScheduleCheckMemberChanged()
return nil, err
}

Expand Down
8 changes: 4 additions & 4 deletions client/meta_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (

// metaStorageClient gets the meta storage client from current PD leader.
func (c *innerClient) metaStorageClient() meta_storagepb.MetaStorageClient {
if client := c.pdSvcDiscovery.GetServingEndpointClientConn(); client != nil {
if client := c.serviceDiscovery.GetServingEndpointClientConn(); client != nil {
return meta_storagepb.NewMetaStorageClient(client)
}
return nil
Expand Down Expand Up @@ -74,7 +74,7 @@ func (c *innerClient) Put(ctx context.Context, key, value []byte, opts ...opt.Me
Lease: options.Lease,
PrevKv: options.PrevKv,
}
ctx = grpcutil.BuildForwardContext(ctx, c.pdSvcDiscovery.GetServingURL())
ctx = grpcutil.BuildForwardContext(ctx, c.serviceDiscovery.GetServingURL())
cli := c.metaStorageClient()
if cli == nil {
cancel()
Expand Down Expand Up @@ -113,7 +113,7 @@ func (c *innerClient) Get(ctx context.Context, key []byte, opts ...opt.MetaStora
Limit: options.Limit,
Revision: options.Revision,
}
ctx = grpcutil.BuildForwardContext(ctx, c.pdSvcDiscovery.GetServingURL())
ctx = grpcutil.BuildForwardContext(ctx, c.serviceDiscovery.GetServingURL())
cli := c.metaStorageClient()
if cli == nil {
cancel()
Expand Down Expand Up @@ -179,7 +179,7 @@ func (c *innerClient) respForMetaStorageErr(observer prometheus.Observer, start
if err != nil || header.GetError() != nil {
observer.Observe(time.Since(start).Seconds())
if err != nil {
c.pdSvcDiscovery.ScheduleCheckMemberChanged()
c.serviceDiscovery.ScheduleCheckMemberChanged()
return errors.WithStack(err)
}
return errors.WithStack(errors.New(header.GetError().String()))
Expand Down
4 changes: 2 additions & 2 deletions client/resource_manager_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ func (c *innerClient) handleResourceTokenDispatcher(dispatcherCtx context.Contex
// If the stream is still nil, return an error.
if stream == nil {
firstRequest.done <- errors.Errorf("failed to get the stream connection")
c.pdSvcDiscovery.ScheduleCheckMemberChanged()
c.serviceDiscovery.ScheduleCheckMemberChanged()
connection.reset()
continue
}
Expand All @@ -343,7 +343,7 @@ func (c *innerClient) handleResourceTokenDispatcher(dispatcherCtx context.Contex
default:
}
if err = c.processTokenRequests(stream, firstRequest); err != nil {
c.pdSvcDiscovery.ScheduleCheckMemberChanged()
c.serviceDiscovery.ScheduleCheckMemberChanged()
connection.reset()
log.Info("[resource_manager] token request error", zap.Error(err))
}
Expand Down
Loading

0 comments on commit 973234d

Please sign in to comment.