Skip to content

Commit

Permalink
Add skeleton to initialize tso microservice
Browse files Browse the repository at this point in the history
Issue Number: ref tikv#5836

Signed-off-by: Bin Shi <binshi.bing@gmail.com>
  • Loading branch information
binshi-bing committed Feb 7, 2023
1 parent 2a7c8d4 commit a0ce7ee
Show file tree
Hide file tree
Showing 10 changed files with 329 additions and 89 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ ifeq ("$(WITH_RACE)", "1")
BUILD_CGO_ENABLED := 1
endif

LDFLAGS += -X "$(PD_PKG)/pkg/versioninfo.PDReleaseVersion=$(shell git describe --tags --dirty --always)"
LDFLAGS += -X "$(PD_PKG)/pkg/versioninfo.PDReleaseVersion=v6.5.0"
LDFLAGS += -X "$(PD_PKG)/pkg/versioninfo.PDBuildTS=$(shell date -u '+%Y-%m-%d %I:%M:%S')"
LDFLAGS += -X "$(PD_PKG)/pkg/versioninfo.PDGitHash=$(shell git rev-parse HEAD)"
LDFLAGS += -X "$(PD_PKG)/pkg/versioninfo.PDGitBranch=$(shell git rev-parse --abbrev-ref HEAD)"
Expand Down
78 changes: 50 additions & 28 deletions cmd/pd-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"flag"
"os"
"os/signal"
"strings"
"syscall"

grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
Expand All @@ -27,6 +28,8 @@ import (
"github.com/tikv/pd/pkg/autoscaling"
"github.com/tikv/pd/pkg/dashboard"
"github.com/tikv/pd/pkg/errs"
basicsvr "github.com/tikv/pd/pkg/basic_server"
tsoserver "github.com/tikv/pd/pkg/mcs/tso/server"
"github.com/tikv/pd/pkg/swaggerserver"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/metricutil"
Expand All @@ -46,8 +49,53 @@ import (
)

func main() {
var ctx context.Context
var cancel context.CancelFunc
var svr basicsvr.Server

if len(os.Args) < 2 {
ctx, cancel, svr = createServerWrapper(os.Args[1:])
} else {
switch strings.ToLower(os.Args[1]) {
case "service-mode-tso":
ctx, cancel, svr = tsoserver.CreateServerWrapper(os.Args[2:])
default:
ctx, cancel, svr = createServerWrapper(os.Args[2:])
}
}

sc := make(chan os.Signal, 1)
signal.Notify(sc,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT)

var sig os.Signal
go func() {
sig = <-sc
cancel()
}()

if err := svr.Run(); err != nil {
log.Fatal("run server failed", errs.ZapError(err))
}

<-ctx.Done()
log.Info("Got signal to exit", zap.String("signal", sig.String()))

svr.Close()
switch sig {
case syscall.SIGTERM:
exit(0)
default:
exit(1)
}
}

func createServerWrapper(args []string) (context.Context, context.CancelFunc, basicsvr.Server) {
cfg := config.NewConfig()
err := cfg.Parse(os.Args[1:])
err := cfg.Parse(args)

if cfg.Version {
server.PrintPDInfo()
Expand Down Expand Up @@ -104,33 +152,7 @@ func main() {
log.Fatal("create server failed", errs.ZapError(err))
}

sc := make(chan os.Signal, 1)
signal.Notify(sc,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT)

var sig os.Signal
go func() {
sig = <-sc
cancel()
}()

if err := svr.Run(); err != nil {
log.Fatal("run server failed", errs.ZapError(err))
}

<-ctx.Done()
log.Info("Got signal to exit", zap.String("signal", sig.String()))

svr.Close()
switch sig {
case syscall.SIGTERM:
exit(0)
default:
exit(1)
}
return ctx, cancel, svr
}

func exit(code int) {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ require (
github.com/pelletier/go-toml/v2 v2.0.1 // indirect
github.com/petermattis/goid v0.0.0-20211229010228-4d14c490ee36 // indirect
github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pkg/errors v0.9.1
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4 // indirect
github.com/prometheus/procfs v0.0.3 // indirect
Expand Down
41 changes: 41 additions & 0 deletions pkg/basic_server/basic_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright 2022 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package basicsvr

import (
"context"
"net/http"

"go.etcd.io/etcd/clientv3"
)

type Server interface {
// Name returns the unique etcd Name for this server in etcd cluster.
Name() string
// Context returns the context of server.
Context() context.Context

// Run runs the pd server.
Run() error
// Close closes the server.
Close()
// IsClosed checks whether server is closed or not.
IsClosed() bool

// GetClient returns builtin etcd client.
GetClient() *clientv3.Client
// GetHTTPClient returns builtin etcd client.
GetHTTPClient() *http.Client
}
26 changes: 25 additions & 1 deletion pkg/tso/config.go → pkg/mcs/tso/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package tso
package server

import (
"flag"
"time"

"github.com/pkg/errors"
"github.com/tikv/pd/pkg/utils/metricutil"
"github.com/tikv/pd/pkg/utils/typeutil"
)

Expand All @@ -30,7 +32,11 @@ const (
type Config struct {
flagSet *flag.FlagSet

Version bool `json:"-"`

ConfigCheck bool `json:"-"`
configFile string

// EnableLocalTSO is used to enable the Local TSO Allocator feature,
// which allows the PD server to generate Local TSO for certain DC-level transactions.
// To make this feature meaningful, user has to set the "zone" label for the PD server
Expand All @@ -46,6 +52,11 @@ type Config struct {
// This config is only valid in 1ms to 10s. If it's configured too long or too short, it will
// be automatically clamped to the range.
TSOUpdatePhysicalInterval typeutil.Duration `toml:"tso-update-physical-interval" json:"tso-update-physical-interval"`

// MaxResetTSGap is the max gap to reset the TSO.
MaxResetTSGap typeutil.Duration `toml:"max-gap-reset-ts" json:"max-gap-reset-ts"`

Metric metricutil.MetricConfig `toml:"metric" json:"metric"`
}

// NewConfig creates a new config.
Expand All @@ -58,3 +69,16 @@ func NewConfig() *Config {

return cfg
}

// Parse parses flag definitions from the argument list.
func (c *Config) Parse(arguments []string) error {
// Parse first to get config file.
err := c.flagSet.Parse(arguments)
if err != nil {
return errors.WithStack(err)
}

// TODO: Implement the main function body

return nil
}
128 changes: 128 additions & 0 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// Copyright 2016 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
"context"
"flag"
"net/http"
"os"

grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/pingcap/errors"
"github.com/pingcap/log"
basicsvr "github.com/tikv/pd/pkg/basic_server"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/metricutil"
"go.etcd.io/etcd/clientv3"
)

// if server doesn't implement all methods of basicsvr.Server, this line will result in
// clear error message "*MyTServerype does not implement basicsvr.Server (missing Method method)"
var _ basicsvr.Server = (*Server)(nil)

// Server is the TSO server. It implements basicsvr.Server
// nolint
type Server struct {
ctx context.Context
}

// TODO: Implement the following methos

// Name returns the unique etcd Name for this server in etcd cluster.
func (s *Server) Name() string {
return ""
}

// Context returns the context of server.
func (s *Server) Context() context.Context {
return s.ctx
}

// Run runs the pd server.
func (s *Server) Run() error {
return nil
}

// Close closes the server.
func (s *Server) Close() {
}

// IsClosed checks whether server is closed or not.
func (s *Server) IsClosed() bool {
return true
}

// GetClient returns builtin etcd client.
func (s *Server) GetClient() *clientv3.Client {
return nil
}

// GetHTTPClient returns builtin etcd client.
func (s *Server) GetHTTPClient() *http.Client {
return nil
}

func CreateServerWrapper(args []string) (context.Context, context.CancelFunc, basicsvr.Server) {
cfg := NewConfig()
err := cfg.Parse(os.Args[1:])

if cfg.Version {
printVersionInfo()
exit(0)
}

defer logutil.LogPanic()

switch errors.Cause(err) {
case nil:
case flag.ErrHelp:
exit(0)
default:
log.Fatal("parse cmd flags error", errs.ZapError(err))
}

if cfg.ConfigCheck {
printConfigCheckMsg(cfg)
exit(0)
}

// TODO: Initialize logger

// TODO: Make it configurable if it has big impact on performance.
grpcprometheus.EnableHandlingTimeHistogram()

metricutil.Push(&cfg.Metric)

// Joins the cluster
// ...

// Creates the server
// ...

return nil, nil, nil
}

func printVersionInfo() {
}

func printConfigCheckMsg(cfg *Config) {
}

func exit(code int) {
log.Sync()
os.Exit(code)
}
5 changes: 1 addition & 4 deletions pkg/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,10 +314,7 @@ func CalSuffixBits(maxSuffix int32) int {
func (am *AllocatorManager) SetUpAllocator(parentCtx context.Context, dcLocation string, leadership *election.Leadership) {
am.mu.Lock()
defer am.mu.Unlock()
if am.updatePhysicalInterval != defaultTSOUpdatePhysicalInterval {
log.Warn("tso update physical interval is non-default",
zap.Duration("update-physical-interval", am.updatePhysicalInterval))
}

if _, exist := am.mu.allocatorGroups[dcLocation]; exist {
return
}
Expand Down
Loading

0 comments on commit a0ce7ee

Please sign in to comment.