Skip to content

Commit

Permalink
Register tso/resource_manager services with ServiceRegistryEntry json…
Browse files Browse the repository at this point in the history
… struct (#6171)

close #5836

Register tso/resource_manager services with ServiceRegistryEntry json struct

Signed-off-by: Bin Shi <binshi.bing@gmail.com>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
binshi-bing and ti-chi-bot committed Mar 16, 2023
1 parent bc12668 commit 266c021
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 3 deletions.
47 changes: 47 additions & 0 deletions pkg/mcs/discovery/discover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,50 @@ func TestDiscover(t *testing.T) {
re.NoError(err)
re.Empty(endpoints)
}

func TestServiceRegistryEntry(t *testing.T) {
re := require.New(t)
cfg := etcdutil.NewTestSingleConfig(t)
etcd, err := embed.StartEtcd(cfg)
defer func() {
etcd.Close()
}()
re.NoError(err)

ep := cfg.LCUrls[0].String()
re.NoError(err)

client, err := clientv3.NewFromURL(ep)
re.NoError(err)

<-etcd.Server.ReadyNotify()
entry1 := &ServiceRegistryEntry{ServiceAddr: "127.0.0.1:1"}
s1, err := entry1.Serialize()
re.NoError(err)
sr1 := NewServiceRegister(context.Background(), client, "test_service", "127.0.0.1:1", s1, 1)
err = sr1.Register()
re.NoError(err)
entry2 := &ServiceRegistryEntry{ServiceAddr: "127.0.0.1:2"}
s2, err := entry2.Serialize()
re.NoError(err)
sr2 := NewServiceRegister(context.Background(), client, "test_service", "127.0.0.1:2", s2, 1)
err = sr2.Register()
re.NoError(err)

endpoints, err := Discover(client, "test_service")
re.NoError(err)
re.Len(endpoints, 2)
returnedEntry1 := &ServiceRegistryEntry{}
returnedEntry1.Deserialize([]byte(endpoints[0]))
re.Equal("127.0.0.1:1", returnedEntry1.ServiceAddr)
returnedEntry2 := &ServiceRegistryEntry{}
returnedEntry2.Deserialize([]byte(endpoints[1]))
re.Equal("127.0.0.1:2", returnedEntry2.ServiceAddr)

sr1.cancel()
sr2.cancel()
time.Sleep(3 * time.Second)
endpoints, err = Discover(client, "test_service")
re.NoError(err)
re.Empty(endpoints)
}
46 changes: 46 additions & 0 deletions pkg/mcs/discovery/registry_entry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package discovery

import (
"encoding/json"

"github.com/pingcap/log"
"go.uber.org/zap"
)

// ServiceRegistryEntry is the registry entry of a service
type ServiceRegistryEntry struct {
ServiceAddr string `json:"serviceAddr"`
}

// Serialize this service registry entry
func (e *ServiceRegistryEntry) Serialize() (serializedValue string, err error) {
data, err := json.Marshal(e)
if err != nil {
log.Error("json marshal the service registry entry failed", zap.Error(err))
return "", err
}
return string(data), nil
}

// Deserialize the data to this service registry entry
func (e *ServiceRegistryEntry) Deserialize(data []byte) error {
if err := json.Unmarshal(data, e); err != nil {
log.Error("json unmarshal the service registry entry failed", zap.Error(err))
return err
}
return nil
}
7 changes: 6 additions & 1 deletion pkg/mcs/resource_manager/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,12 @@ func (s *Server) startServer() (err error) {

// Server has started.
atomic.StoreInt64(&s.isServing, 1)
s.serviceRegister = discovery.NewServiceRegister(s.ctx, s.etcdClient, "resource_manager", s.cfg.ListenAddr, s.cfg.ListenAddr, discovery.DefaultLeaseInSeconds)
entry := &discovery.ServiceRegistryEntry{ServiceAddr: s.cfg.ListenAddr}
serializedEntry, err := entry.Serialize()
if err != nil {
return err
}
s.serviceRegister = discovery.NewServiceRegister(s.ctx, s.etcdClient, "resource_manager", s.cfg.ListenAddr, serializedEntry, discovery.DefaultLeaseInSeconds)
s.serviceRegister.Register()
return nil
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,12 @@ func (s *Server) startServer() (err error) {

// Server has started.
atomic.StoreInt64(&s.isServing, 1)
s.serviceRegister = discovery.NewServiceRegister(s.ctx, s.etcdClient, "tso", s.cfg.ListenAddr, s.cfg.ListenAddr, discovery.DefaultLeaseInSeconds)
entry := &discovery.ServiceRegistryEntry{ServiceAddr: s.cfg.ListenAddr}
serializedEntry, err := entry.Serialize()
if err != nil {
return err
}
s.serviceRegister = discovery.NewServiceRegister(s.ctx, s.etcdClient, "tso", s.cfg.ListenAddr, serializedEntry, discovery.DefaultLeaseInSeconds)
s.serviceRegister.Register()
return nil
}
Expand Down
4 changes: 3 additions & 1 deletion tests/mcs/discovery/register_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ func (suite *serverRegisterTestSuite) checkServerRegister(serviceName string) {
// test API server discovery
endpoints, err := discovery.Discover(client, serviceName)
re.NoError(err)
re.Equal(addr, endpoints[0])
returnedEntry := &discovery.ServiceRegistryEntry{}
returnedEntry.Deserialize([]byte(endpoints[0]))
re.Equal(addr, returnedEntry.ServiceAddr)

// test primary when only one server
primary, exist := suite.pdLeader.GetServer().GetServicePrimaryAddr(suite.ctx, serviceName)
Expand Down

0 comments on commit 266c021

Please sign in to comment.