Skip to content

Commit

Permalink
server, allocator_manager: close grpc conn when close server (#7786)
Browse files Browse the repository at this point in the history
ref #7782

server: close grpc conn when close server

Signed-off-by: husharp <jinhao.hu@pingcap.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
HuSharp and ti-chi-bot[bot] authored Feb 6, 2024
1 parent f0699ba commit b2f40b6
Show file tree
Hide file tree
Showing 8 changed files with 34 additions and 4 deletions.
1 change: 1 addition & 0 deletions pkg/mcs/resourcemanager/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ func (s *Server) Close() {
utils.StopHTTPServer(s)
utils.StopGRPCServer(s)
s.GetListener().Close()
s.CloseClientConns()
s.serverLoopCancel()
s.serverLoopWg.Wait()

Expand Down
1 change: 1 addition & 0 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ func (s *Server) Close() {
utils.StopHTTPServer(s)
utils.StopGRPCServer(s)
s.GetListener().Close()
s.CloseClientConns()
s.serverLoopCancel()
s.serverLoopWg.Wait()

Expand Down
12 changes: 12 additions & 0 deletions pkg/mcs/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"sync"
"time"

"github.com/pingcap/log"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/utils/grpcutil"
"go.etcd.io/etcd/clientv3"
Expand Down Expand Up @@ -167,3 +168,14 @@ func (bs *BaseServer) IsSecure() bool {
func (bs *BaseServer) StartTimestamp() int64 {
return bs.startTimestamp
}

// CloseClientConns closes all client connections.
func (bs *BaseServer) CloseClientConns() {
bs.clientConns.Range(func(key, value any) bool {
conn := value.(*grpc.ClientConn)
if err := conn.Close(); err != nil {
log.Error("close client connection meet error")
}
return true
})
}
1 change: 1 addition & 0 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ func (s *Server) Close() {
utils.StopHTTPServer(s)
utils.StopGRPCServer(s)
s.GetListener().Close()
s.CloseClientConns()
s.serverLoopCancel()
s.serverLoopWg.Wait()

Expand Down
6 changes: 6 additions & 0 deletions pkg/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,12 @@ func (am *AllocatorManager) close() {
allocatorGroup.allocator.(*GlobalTSOAllocator).close()
}

for _, cc := range am.localAllocatorConn.clientConns {
if err := cc.Close(); err != nil {
log.Error("failed to close allocator manager grpc clientConn", errs.ZapError(errs.ErrCloseGRPCConn, err))
}
}

am.cancel()
am.svcLoopWG.Wait()

Expand Down
8 changes: 8 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,14 @@ func (s *Server) Close() {
cb()
}

s.clientConns.Range(func(key, value any) bool {
conn := value.(*grpc.ClientConn)
if err := conn.Close(); err != nil {
log.Error("close grpc client meet error", errs.ZapError(err))
}
return true
})

log.Info("close server")
}

Expand Down
1 change: 1 addition & 0 deletions tests/integrations/mcs/keyspace/tso_keyspace_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func (suite *keyspaceGroupTestSuite) TearDownTest() {
re := suite.Require()
suite.cleanupFunc()
suite.cluster.Destroy()
suite.dialClient.CloseIdleConnections()
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes"))
}

Expand Down
8 changes: 4 additions & 4 deletions tests/server/member/member_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ func TestMemberDelete(t *testing.T) {
{path: fmt.Sprintf("id/%d", members[1].GetServerID()), members: []*config.Config{leader.GetConfig()}},
}

httpClient := &http.Client{Timeout: 15 * time.Second}
httpClient := &http.Client{Timeout: 15 * time.Second, Transport: &http.Transport{DisableKeepAlives: true}}
defer httpClient.CloseIdleConnections()
for _, table := range tables {
t.Log(time.Now(), "try to delete:", table.path)
testutil.Eventually(re, func() bool {
Expand All @@ -103,7 +104,7 @@ func TestMemberDelete(t *testing.T) {
}
// Check by member list.
cluster.WaitLeader()
if err = checkMemberList(re, leader.GetConfig().ClientUrls, table.members); err != nil {
if err = checkMemberList(re, *httpClient, leader.GetConfig().ClientUrls, table.members); err != nil {
t.Logf("check member fail: %v", err)
time.Sleep(time.Second)
return false
Expand All @@ -120,8 +121,7 @@ func TestMemberDelete(t *testing.T) {
}
}

func checkMemberList(re *require.Assertions, clientURL string, configs []*config.Config) error {
httpClient := &http.Client{Timeout: 15 * time.Second}
func checkMemberList(re *require.Assertions, httpClient http.Client, clientURL string, configs []*config.Config) error {
addr := clientURL + "/pd/api/v1/members"
res, err := httpClient.Get(addr)
re.NoError(err)
Expand Down

0 comments on commit b2f40b6

Please sign in to comment.