Skip to content

Commit

Permalink
recheck
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <jinhao.hu@pingcap.com>
  • Loading branch information
HuSharp committed Jan 18, 2024
1 parent da58fbe commit a383beb
Show file tree
Hide file tree
Showing 9 changed files with 167 additions and 34 deletions.
6 changes: 3 additions & 3 deletions pkg/election/leadership_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func TestExitWatch(t *testing.T) {
checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) func() {
cfg1 := server.Config()
etcd2 := etcdutil.MustAddEtcdMember(t, &cfg1, client)
client2, err := etcdutil.CreateEtcdClient(nil, etcd2.Config().LCUrls)
client2, _, err := etcdutil.CreateEtcdClient(nil, etcd2.Config().LCUrls)
re.NoError(err)
// close the original leader
server.Server.HardStop()
Expand Down Expand Up @@ -189,7 +189,7 @@ func checkExitWatch(t *testing.T, leaderKey string, injectFunc func(server *embe
re := require.New(t)
servers, client1, clean := etcdutil.NewTestEtcdCluster(t, 1)
defer clean()
client2, err := etcdutil.CreateEtcdClient(nil, servers[0].Config().LCUrls)
client2, _, err := etcdutil.CreateEtcdClient(nil, servers[0].Config().LCUrls)
re.NoError(err)
defer client2.Close()

Expand Down Expand Up @@ -225,7 +225,7 @@ func TestRequestProgress(t *testing.T) {
defer os.RemoveAll(fname)
servers, client1, clean := etcdutil.NewTestEtcdCluster(t, 1)
defer clean()
client2, err := etcdutil.CreateEtcdClient(nil, servers[0].Config().LCUrls)
client2, _, err := etcdutil.CreateEtcdClient(nil, servers[0].Config().LCUrls)
re.NoError(err)
defer client2.Close()

Expand Down
9 changes: 8 additions & 1 deletion pkg/mcs/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/grpcutil"
"go.etcd.io/etcd/clientv3"
"google.golang.org/grpc"
Expand All @@ -33,7 +34,8 @@ import (
type BaseServer struct {
ctx context.Context
// etcd client
etcdClient *clientv3.Client
etcdClient *clientv3.Client
etcdHealthyChecker *etcdutil.HealthyChecker
// http client
httpClient *http.Client
grpcServer *grpc.Server
Expand Down Expand Up @@ -99,6 +101,11 @@ func (bs *BaseServer) SetETCDClient(etcdClient *clientv3.Client) {
bs.etcdClient = etcdClient
}

// SetETCDHealthyChecker records checker as the etcd healthy checker held by
// the base server, replacing whatever checker was stored before.
func (bs *BaseServer) SetETCDHealthyChecker(checker *etcdutil.HealthyChecker) {
	bs.etcdHealthyChecker = checker
}

// SetHTTPClient sets the http client.
func (bs *BaseServer) SetHTTPClient(httpClient *http.Client) {
bs.httpClient = httpClient
Expand Down
4 changes: 3 additions & 1 deletion pkg/mcs/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ type server interface {
SetGRPCServer(*grpc.Server)
SetHTTPServer(*http.Server)
SetETCDClient(*clientv3.Client)
SetETCDHealthyChecker(*etcdutil.HealthyChecker)
SetHTTPClient(*http.Client)
IsSecure() bool
RegisterGRPCService(*grpc.Server)
Expand Down Expand Up @@ -177,12 +178,13 @@ func InitClient(s server) error {
if err != nil {
return err
}
etcdClient, err := etcdutil.CreateEtcdClient(tlsConfig, backendUrls)
etcdClient, checker, err := etcdutil.CreateEtcdClient(tlsConfig, backendUrls)
if err != nil {
return err
}
s.SetETCDClient(etcdClient)
s.SetHTTPClient(etcdutil.CreateHTTPClient(tlsConfig))
s.SetETCDHealthyChecker(checker)
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/member/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,10 +187,10 @@ func (m *EmbeddedEtcdMember) CampaignLeader(ctx context.Context, leaseTimeout in
})

if m.leadership.GetCampaignTimesNum() >= campaignLeaderFrequencyTimes {
m.leadership.ResetCampaignTimes()
if err := m.ResignEtcdLeader(ctx, m.Name(), ""); err != nil {
return err
}
m.leadership.ResetCampaignTimes()
return errs.ErrLeaderFrequentlyChange.FastGenByArgs(m.Name(), m.GetLeaderPath())
}

Expand Down
67 changes: 51 additions & 16 deletions pkg/utils/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,26 +254,26 @@ func newClient(tlsConfig *tls.Config, endpoints ...string) (*clientv3.Client, er
}

// CreateEtcdClient creates etcd v3 client with detecting endpoints.
func CreateEtcdClient(tlsConfig *tls.Config, acURLs []url.URL) (*clientv3.Client, error) {
func CreateEtcdClient(tlsConfig *tls.Config, acURLs []url.URL) (*clientv3.Client, *HealthyChecker, error) {
urls := make([]string, 0, len(acURLs))
for _, u := range acURLs {
urls = append(urls, u.String())
}
client, err := newClient(tlsConfig, urls...)
if err != nil {
return nil, err
return nil, nil, err
}

tickerInterval := defaultDialKeepAliveTime
failpoint.Inject("fastTick", func() {
tickerInterval = 100 * time.Millisecond
})
failpoint.Inject("closeTick", func() {
failpoint.Return(client, err)
failpoint.Return(client, nil, err)
})
initHealthyChecker(tickerInterval, tlsConfig, client)
checker := initHealthyChecker(tickerInterval, tlsConfig, client)

return client, err
return client, checker, err
}

// healthyClient will wrap an etcd client and record its last health time.
Expand All @@ -285,21 +285,23 @@ type healthyClient struct {
lastHealth time.Time
}

// healthyChecker is used to check the health of etcd endpoints. Inside the checker,
// HealthyChecker is used to check the health of etcd endpoints. Inside the checker,
// we will maintain a map from each available etcd endpoint to its healthyClient.
type healthyChecker struct {
type HealthyChecker struct {
tickerInterval time.Duration
tlsConfig *tls.Config

isUnHealthy sync.Map // map[string]bool

sync.Map // map[string]*healthyClient
// client is the etcd client the healthy checker is guarding, it will be set with
// the checked healthy endpoints dynamically and periodically.
client *clientv3.Client
}

// initHealthyChecker initializes the healthy checker for etcd client.
func initHealthyChecker(tickerInterval time.Duration, tlsConfig *tls.Config, client *clientv3.Client) {
healthyChecker := &healthyChecker{
func initHealthyChecker(tickerInterval time.Duration, tlsConfig *tls.Config, client *clientv3.Client) *HealthyChecker {
healthyChecker := &HealthyChecker{
tickerInterval: tickerInterval,
tlsConfig: tlsConfig,
client: client,
Expand All @@ -310,9 +312,11 @@ func initHealthyChecker(tickerInterval time.Duration, tlsConfig *tls.Config, cli
go healthyChecker.syncer(ctx)
// Inspect the health of each endpoint by reading the health key periodically.
go healthyChecker.inspector(ctx)

return healthyChecker
}

func (checker *healthyChecker) syncer(ctx context.Context) {
func (checker *HealthyChecker) syncer(ctx context.Context) {
defer logutil.LogPanic()
checker.update()
ticker := time.NewTicker(checker.tickerInterval)
Expand All @@ -328,7 +332,7 @@ func (checker *healthyChecker) syncer(ctx context.Context) {
}
}

func (checker *healthyChecker) inspector(ctx context.Context) {
func (checker *HealthyChecker) inspector(ctx context.Context) {
defer logutil.LogPanic()
ticker := time.NewTicker(checker.tickerInterval)
defer ticker.Stop()
Expand Down Expand Up @@ -368,7 +372,7 @@ func (checker *healthyChecker) inspector(ctx context.Context) {
}
}

func (checker *healthyChecker) close() {
func (checker *HealthyChecker) close() {
checker.Range(func(key, value interface{}) bool {
client := value.(*healthyClient)
client.Close()
Expand All @@ -382,7 +386,20 @@ func resetClientEndpoints(client *clientv3.Client, endpoints ...string) {
client.SetEndpoints(endpoints...)
}

func (checker *healthyChecker) patrol(ctx context.Context) []string {
// SetClientStatusIsUnHealthy manually overrides the health status of the
// endpoint ep.
//
// When status is true, ep is flagged in the isUnHealthy map and, if the checker
// currently tracks a client for ep, that client is removed. When status is
// false, the flag is cleared and a client for ep is added with the current time
// as its last-health timestamp.
func (checker *HealthyChecker) SetClientStatusIsUnHealthy(ep string, status bool) {
	log.Info("set etcd client healthy status", zap.String("endpoint", ep), zap.Bool("status", status))

	// Healthy again: drop the flag and start tracking the endpoint right away.
	if !status {
		checker.isUnHealthy.Delete(ep)
		checker.addClient(ep, time.Now())
		return
	}

	// Unhealthy: remember the flag and stop tracking the endpoint's client.
	checker.isUnHealthy.Store(ep, true)
	if _, tracked := checker.Load(ep); tracked {
		checker.removeClient(ep)
	}
}

func (checker *HealthyChecker) patrol(ctx context.Context) []string {
// See https://github.com/etcd-io/etcd/blob/85b640cee793e25f3837c47200089d14a8392dc7/etcdctl/ctlv3/command/ep_command.go#L105-L145
count := 0
checker.Range(func(key, value interface{}) bool {
Expand Down Expand Up @@ -420,7 +437,7 @@ func (checker *healthyChecker) patrol(ctx context.Context) []string {
return healthyList
}

func (checker *healthyChecker) update() {
func (checker *HealthyChecker) update() {
eps := syncUrls(checker.client)
epMap := make(map[string]struct{}, len(eps))
for _, ep := range eps {
Expand All @@ -440,6 +457,13 @@ func (checker *healthyChecker) update() {
}
continue
}

if status, ok := checker.isUnHealthy.Load(ep); ok && status.(bool) {
log.Info("some etcd server is unhealthy", zap.String("endpoint", ep))
// set unhealthy etcd client to healthy
checker.isUnHealthy.Delete(ep)
continue
}
checker.addClient(ep, time.Now())
}

Expand All @@ -454,7 +478,7 @@ func (checker *healthyChecker) update() {
})
}

func (checker *healthyChecker) addClient(ep string, lastHealth time.Time) {
func (checker *HealthyChecker) addClient(ep string, lastHealth time.Time) {
client, err := newClient(checker.tlsConfig, ep)
if err != nil {
log.Error("failed to create etcd healthy client", zap.Error(err))
Expand All @@ -466,7 +490,7 @@ func (checker *healthyChecker) addClient(ep string, lastHealth time.Time) {
})
}

func (checker *healthyChecker) removeClient(ep string) {
func (checker *HealthyChecker) removeClient(ep string) {
if client, ok := checker.LoadAndDelete(ep); ok {
err := client.(*healthyClient).Close()
if err != nil {
Expand All @@ -493,6 +517,17 @@ func syncUrls(client *clientv3.Client) []string {
return eps
}

// GetAllClients collects every healthyClient currently stored in the checker
// into a new slice and returns it. The slice is never nil.
// It is only meant to be used by tests.
func (checker *HealthyChecker) GetAllClients() []*healthyClient {
	all := []*healthyClient{}
	checker.Range(func(_, v interface{}) bool {
		all = append(all, v.(*healthyClient))
		return true
	})
	return all
}

// CreateHTTPClient creates a http client with the given tls config.
func CreateHTTPClient(tlsConfig *tls.Config) *http.Client {
// FIXME: Currently, there is no timeout set for certain requests, such as GetRegions,
Expand Down
82 changes: 75 additions & 7 deletions pkg/utils/etcdutil/etcdutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,10 @@ func TestEtcdScaleInAndOut(t *testing.T) {
etcd1, cfg1 := servers[0], servers[0].Config()

// Create two etcd clients with etcd1 as endpoint.
client1, err := CreateEtcdClient(nil, cfg1.LCUrls) // execute member change operation with this client
client1, _, err := CreateEtcdClient(nil, cfg1.LCUrls) // execute member change operation with this client
re.NoError(err)
defer client1.Close()
client2, err := CreateEtcdClient(nil, cfg1.LCUrls) // check member change with this client
client2, _, err := CreateEtcdClient(nil, cfg1.LCUrls) // check member change with this client
re.NoError(err)
defer client2.Close()

Expand Down Expand Up @@ -287,7 +287,7 @@ func checkEtcdWithHangLeader(t *testing.T) error {
// Create an etcd client with etcd1 as endpoint.
urls, err := types.NewURLs([]string{proxyAddr})
re.NoError(err)
client1, err := CreateEtcdClient(nil, urls)
client1, _, err := CreateEtcdClient(nil, urls)
re.NoError(err)
defer client1.Close()

Expand Down Expand Up @@ -359,6 +359,74 @@ func ioCopy(dst io.Writer, src io.Reader, enableDiscard *atomic.Bool) (err error
return err
}

// TestEtcdWithHangLeaderWithRemoveClient builds a three-member etcd cluster and
// verifies that the healthy checker stops tracking an endpoint once it is
// marked unhealthy through SetClientStatusIsUnHealthy, tracks it again once it
// is marked healthy, and eventually tracks all endpoints again after an
// endpoint was marked unhealthy.
func TestEtcdWithHangLeaderWithRemoveClient(t *testing.T) {
	const fastTickFailpoint = "github.com/tikv/pd/pkg/utils/etcdutil/fastTick"

	re := require.New(t)
	re.NoError(failpoint.Enable(fastTickFailpoint, "return(true)"))
	// Registered first so it runs last. The error is deliberately ignored: when
	// the test aborts while the failpoint is temporarily disabled below, Disable
	// reports an error that must not mask the original failure.
	defer func() { _ = failpoint.Disable(fastTickFailpoint) }()

	count := 3
	servers := make([]*embed.Etcd, 0, count)
	// Close every started server even when an assertion aborts the test early.
	defer func() {
		for _, server := range servers {
			if server != nil {
				server.Close()
			}
		}
	}()

	cfg := NewTestSingleConfig()
	etcd, err := embed.StartEtcd(cfg)
	re.NoError(err)
	servers = append(servers, etcd)
	etcdClient, checker, err := CreateEtcdClient(nil, cfg.LCUrls)
	re.NoError(err)
	// Deferred after the server cleanup, so the client is closed before the servers.
	defer etcdClient.Close()
	<-etcd.Server.ReadyNotify()

	for i := 1; i < count; i++ {
		// Check the client can get the new member.
		listResp, err := ListEtcdMembers(etcdClient)
		re.NoError(err)
		re.Len(listResp.Members, i)
		// Add a new member.
		etcd2 := MustAddEtcdMember(t, cfg, etcdClient)
		cfg2 := etcd2.Config()
		cfg = &cfg2
		<-etcd2.Server.ReadyNotify()
		servers = append(servers, etcd2)
	}
	checkMembers(re, etcdClient, servers)

	// Give the checker time to pick up the newly added members.
	time.Sleep(1 * time.Second)
	clis := checker.GetAllClients()
	re.Len(clis, count)

	re.NoError(failpoint.Disable(fastTickFailpoint))

	// remove client endpoint
	endpoint := clis[0].Endpoints()[0]
	checker.SetClientStatusIsUnHealthy(endpoint, true)
	clis = checker.GetAllClients()
	re.Len(clis, count-1)

	// add client endpoint
	checker.SetClientStatusIsUnHealthy(endpoint, false)
	clis = checker.GetAllClients()
	re.Len(clis, count)

	// remove client endpoint
	checker.SetClientStatusIsUnHealthy(clis[1].Endpoints()[0], true)
	clis = checker.GetAllClients()
	re.Len(clis, count-1)

	re.NoError(failpoint.Enable(fastTickFailpoint, "return(true)"))
	// wait for update
	testutil.Eventually(re, func() bool {
		return len(checker.GetAllClients()) == count
	})
}

type loopWatcherTestSuite struct {
suite.Suite
ctx context.Context
Expand All @@ -383,7 +451,7 @@ func (suite *loopWatcherTestSuite) SetupSuite() {
suite.config = NewTestSingleConfig()
suite.config.Dir = suite.T().TempDir()
suite.startEtcd(re)
suite.client, err = CreateEtcdClient(nil, suite.config.LCUrls)
suite.client, _, err = CreateEtcdClient(nil, suite.config.LCUrls)
re.NoError(err)
suite.cleans = append(suite.cleans, func() {
suite.client.Close()
Expand Down Expand Up @@ -642,23 +710,23 @@ func (suite *loopWatcherTestSuite) TestWatcherBreak() {

// Case2: close the etcd client and put a new value after watcher restarts
suite.client.Close()
suite.client, err = CreateEtcdClient(nil, suite.config.LCUrls)
suite.client, _, err = CreateEtcdClient(nil, suite.config.LCUrls)
re.NoError(err)
watcher.updateClientCh <- suite.client
suite.put(re, "TestWatcherBreak", "2")
checkCache("2")

// Case3: close the etcd client and put a new value before watcher restarts
suite.client.Close()
suite.client, err = CreateEtcdClient(nil, suite.config.LCUrls)
suite.client, _, err = CreateEtcdClient(nil, suite.config.LCUrls)
re.NoError(err)
suite.put(re, "TestWatcherBreak", "3")
watcher.updateClientCh <- suite.client
checkCache("3")

// Case4: close the etcd client and put a new value with compact
suite.client.Close()
suite.client, err = CreateEtcdClient(nil, suite.config.LCUrls)
suite.client, _, err = CreateEtcdClient(nil, suite.config.LCUrls)
re.NoError(err)
suite.put(re, "TestWatcherBreak", "4")
resp, err := EtcdKVGet(suite.client, "TestWatcherBreak")
Expand Down
Loading

0 comments on commit a383beb

Please sign in to comment.