Skip to content

Commit

Permalink
config: refactor TSO config (#5902)
Browse files Browse the repository at this point in the history
ref #5839, ref #5901

Signed-off-by: Ryan Leung <rleungx@gmail.com>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
rleungx and ti-chi-bot authored Feb 6, 2023
1 parent 393b287 commit 2a7c8d4
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 62 deletions.
7 changes: 3 additions & 4 deletions pkg/member/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/server/config"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
"go.uber.org/zap"
Expand Down Expand Up @@ -250,12 +249,12 @@ func (m *Member) isSameLeader(leader *pdpb.Member) bool {
}

// MemberInfo initializes the member info.
func (m *Member) MemberInfo(cfg *config.Config, name string, rootPath string) {
func (m *Member) MemberInfo(advertiseClientUrls, advertisePeerUrls, name string, rootPath string) {
leader := &pdpb.Member{
Name: name,
MemberId: m.ID(),
ClientUrls: strings.Split(cfg.AdvertiseClientUrls, ","),
PeerUrls: strings.Split(cfg.AdvertisePeerUrls, ","),
ClientUrls: strings.Split(advertiseClientUrls, ","),
PeerUrls: strings.Split(advertisePeerUrls, ","),
}

data, err := leader.Marshal()
Expand Down
15 changes: 8 additions & 7 deletions pkg/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ const (
leaderTickInterval = 50 * time.Millisecond
localTSOAllocatorEtcdPrefix = "lta"
localTSOSuffixEtcdPrefix = "lts"
// The value should be the same as the variable defined in server's config.
defaultTSOUpdatePhysicalInterval = 50 * time.Millisecond
)

var (
Expand Down Expand Up @@ -134,17 +132,20 @@ type AllocatorManager struct {
func NewAllocatorManager(
m *member.Member,
rootPath string,
cfg config,
enableLocalTSO bool,
saveInterval time.Duration,
updatePhysicalInterval time.Duration,
tlsConfig *grpcutil.TLSConfig,
maxResetTSGap func() time.Duration,
) *AllocatorManager {
allocatorManager := &AllocatorManager{
enableLocalTSO: cfg.IsLocalTSOEnabled(),
enableLocalTSO: enableLocalTSO,
member: m,
rootPath: rootPath,
saveInterval: cfg.GetTSOSaveInterval(),
updatePhysicalInterval: cfg.GetTSOUpdatePhysicalInterval(),
saveInterval: saveInterval,
updatePhysicalInterval: updatePhysicalInterval,
maxResetTSGap: maxResetTSGap,
securityConfig: cfg.GetTLSConfig(),
securityConfig: tlsConfig,
}
allocatorManager.mu.allocatorGroups = make(map[string]*allocatorGroup)
allocatorManager.mu.clusterDCLocations = make(map[string]*DCLocationInfo)
Expand Down
44 changes: 38 additions & 6 deletions pkg/tso/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,46 @@
package tso

import (
"flag"
"time"

"github.com/tikv/pd/pkg/utils/grpcutil"
"github.com/tikv/pd/pkg/utils/typeutil"
)

type config interface {
IsLocalTSOEnabled() bool
GetTSOSaveInterval() time.Duration
GetTSOUpdatePhysicalInterval() time.Duration
GetTLSConfig() *grpcutil.TLSConfig
const (
// defaultTSOUpdatePhysicalInterval is the default value of the config `TSOUpdatePhysicalInterval`.
defaultTSOUpdatePhysicalInterval = 50 * time.Millisecond
)

// Config is the configuration for the TSO.
type Config struct {
flagSet *flag.FlagSet

configFile string
// EnableLocalTSO is used to enable the Local TSO Allocator feature,
// which allows the PD server to generate Local TSO for certain DC-level transactions.
// To make this feature meaningful, user has to set the "zone" label for the PD server
// to indicate which DC this PD belongs to.
EnableLocalTSO bool `toml:"enable-local-tso" json:"enable-local-tso"`

// TSOSaveInterval is the interval to save timestamp.
TSOSaveInterval typeutil.Duration `toml:"tso-save-interval" json:"tso-save-interval"`

// The interval to update physical part of timestamp. Usually, this config should not be set.
// At most 1<<18 (262144) TSOs can be generated in the interval. The smaller the value, the
// more TSOs provided, and at the same time consuming more CPU time.
// This config is only valid in 1ms to 10s. If it's configured too long or too short, it will
// be automatically clamped to the range.
TSOUpdatePhysicalInterval typeutil.Duration `toml:"tso-update-physical-interval" json:"tso-update-physical-interval"`
}

// NewConfig creates a new config.
func NewConfig() *Config {
cfg := &Config{}
cfg.flagSet = flag.NewFlagSet("pd", flag.ContinueOnError)
fs := cfg.flagSet

fs.StringVar(&cfg.configFile, "config", "", "config file")

return cfg
}
86 changes: 43 additions & 43 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,49 +457,6 @@ func (c *Config) Validate() error {
return nil
}

// Utility to test if a configuration is defined.
type configMetaData struct {
meta *toml.MetaData
path []string
}

func newConfigMetadata(meta *toml.MetaData) *configMetaData {
return &configMetaData{meta: meta}
}

func (m *configMetaData) IsDefined(key string) bool {
if m.meta == nil {
return false
}
keys := append([]string(nil), m.path...)
keys = append(keys, key)
return m.meta.IsDefined(keys...)
}

func (m *configMetaData) Child(path ...string) *configMetaData {
newPath := append([]string(nil), m.path...)
newPath = append(newPath, path...)
return &configMetaData{
meta: m.meta,
path: newPath,
}
}

func (m *configMetaData) CheckUndecoded() error {
if m.meta == nil {
return nil
}
undecoded := m.meta.Undecoded()
if len(undecoded) == 0 {
return nil
}
errInfo := "Config contains undefined item: "
for _, key := range undecoded {
errInfo += key.String() + ", "
}
return errors.New(errInfo[:len(errInfo)-2])
}

// Adjust is used to adjust the PD configurations.
func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error {
configMetaData := newConfigMetadata(meta)
Expand Down Expand Up @@ -613,6 +570,49 @@ func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error {
return nil
}

// Utility to test if a configuration is defined.
type configMetaData struct {
meta *toml.MetaData
path []string
}

func newConfigMetadata(meta *toml.MetaData) *configMetaData {
return &configMetaData{meta: meta}
}

func (m *configMetaData) IsDefined(key string) bool {
if m.meta == nil {
return false
}
keys := append([]string(nil), m.path...)
keys = append(keys, key)
return m.meta.IsDefined(keys...)
}

func (m *configMetaData) Child(path ...string) *configMetaData {
newPath := append([]string(nil), m.path...)
newPath = append(newPath, path...)
return &configMetaData{
meta: m.meta,
path: newPath,
}
}

func (m *configMetaData) CheckUndecoded() error {
if m.meta == nil {
return nil
}
undecoded := m.meta.Undecoded()
if len(undecoded) == 0 {
return nil
}
errInfo := "Config contains undefined item: "
for _, key := range undecoded {
errInfo += key.String() + ", "
}
return errors.New(errInfo[:len(errInfo)-2])
}

func (c *Config) adjustLog(meta *configMetaData) {
if !meta.IsDefined("disable-error-verbose") {
c.Log.DisableErrorVerbose = defaultDisableErrorVerbose
Expand Down
4 changes: 2 additions & 2 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ func (s *Server) startServer(ctx context.Context) error {
serverInfo.WithLabelValues(versioninfo.PDReleaseVersion, versioninfo.PDGitHash).Set(float64(time.Now().Unix()))

s.rootPath = path.Join(pdRootPath, strconv.FormatUint(s.clusterID, 10))
s.member.MemberInfo(s.cfg, s.Name(), s.rootPath)
s.member.MemberInfo(s.cfg.AdvertiseClientUrls, s.cfg.AdvertisePeerUrls, s.Name(), s.rootPath)
s.member.SetMemberDeployPath(s.member.ID())
s.member.SetMemberBinaryVersion(s.member.ID(), versioninfo.PDReleaseVersion)
s.member.SetMemberGitHash(s.member.ID(), versioninfo.PDGitHash)
Expand All @@ -374,7 +374,7 @@ func (s *Server) startServer(ctx context.Context) error {
Member: s.member.MemberValue(),
})
s.tsoAllocatorManager = tso.NewAllocatorManager(
s.member, s.rootPath, s.cfg,
s.member, s.rootPath, s.cfg.IsLocalTSOEnabled(), s.cfg.GetTSOSaveInterval(), s.cfg.GetTSOUpdatePhysicalInterval(), s.cfg.GetTLSConfig(),
func() time.Duration { return s.persistOptions.GetMaxResetTSGap() })
// Set up the Global TSO Allocator here, it will be initialized once the PD campaigns leader successfully.
s.tsoAllocatorManager.SetUpAllocator(ctx, tso.GlobalDCLocation, s.member.GetLeadership())
Expand Down

0 comments on commit 2a7c8d4

Please sign in to comment.