Skip to content

Commit

Permalink
*: unify the key prefix (#6248)
Browse files Browse the repository at this point in the history
ref #5836

Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx authored Mar 30, 2023
1 parent 1948c24 commit 3a61876
Show file tree
Hide file tree
Showing 10 changed files with 43 additions and 33 deletions.
4 changes: 2 additions & 2 deletions pkg/mcs/discovery/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (
)

// Discover is used to get all the service instances of the specified service name.
func Discover(cli *clientv3.Client, serviceName string) ([]string, error) {
key := discoveryPath(serviceName) + "/"
func Discover(cli *clientv3.Client, clusterID, serviceName string) ([]string, error) {
key := discoveryPath(clusterID, serviceName) + "/"
endKey := clientv3.GetPrefixRangeEnd(key) + "/"

withRange := clientv3.WithRange(endKey)
Expand Down
16 changes: 8 additions & 8 deletions pkg/mcs/discovery/discover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ func TestDiscover(t *testing.T) {
re.NoError(err)

<-etcd.Server.ReadyNotify()
sr1 := NewServiceRegister(context.Background(), client, "test_service", "127.0.0.1:1", "127.0.0.1:1", 1)
sr1 := NewServiceRegister(context.Background(), client, "12345", "test_service", "127.0.0.1:1", "127.0.0.1:1", 1)
err = sr1.Register()
re.NoError(err)
sr2 := NewServiceRegister(context.Background(), client, "test_service", "127.0.0.1:2", "127.0.0.1:2", 1)
sr2 := NewServiceRegister(context.Background(), client, "12345", "test_service", "127.0.0.1:2", "127.0.0.1:2", 1)
err = sr2.Register()
re.NoError(err)

endpoints, err := Discover(client, "test_service")
endpoints, err := Discover(client, "12345", "test_service")
re.NoError(err)
re.Len(endpoints, 2)
re.Equal("127.0.0.1:1", endpoints[0])
Expand All @@ -57,7 +57,7 @@ func TestDiscover(t *testing.T) {
sr1.cancel()
sr2.cancel()
time.Sleep(3 * time.Second)
endpoints, err = Discover(client, "test_service")
endpoints, err = Discover(client, "12345", "test_service")
re.NoError(err)
re.Empty(endpoints)
}
Expand All @@ -81,17 +81,17 @@ func TestServiceRegistryEntry(t *testing.T) {
entry1 := &ServiceRegistryEntry{ServiceAddr: "127.0.0.1:1"}
s1, err := entry1.Serialize()
re.NoError(err)
sr1 := NewServiceRegister(context.Background(), client, "test_service", "127.0.0.1:1", s1, 1)
sr1 := NewServiceRegister(context.Background(), client, "12345", "test_service", "127.0.0.1:1", s1, 1)
err = sr1.Register()
re.NoError(err)
entry2 := &ServiceRegistryEntry{ServiceAddr: "127.0.0.1:2"}
s2, err := entry2.Serialize()
re.NoError(err)
sr2 := NewServiceRegister(context.Background(), client, "test_service", "127.0.0.1:2", s2, 1)
sr2 := NewServiceRegister(context.Background(), client, "12345", "test_service", "127.0.0.1:2", s2, 1)
err = sr2.Register()
re.NoError(err)

endpoints, err := Discover(client, "test_service")
endpoints, err := Discover(client, "12345", "test_service")
re.NoError(err)
re.Len(endpoints, 2)
returnedEntry1 := &ServiceRegistryEntry{}
Expand All @@ -104,7 +104,7 @@ func TestServiceRegistryEntry(t *testing.T) {
sr1.cancel()
sr2.cancel()
time.Sleep(3 * time.Second)
endpoints, err = Discover(client, "test_service")
endpoints, err = Discover(client, "12345", "test_service")
re.NoError(err)
re.Empty(endpoints)
}
12 changes: 6 additions & 6 deletions pkg/mcs/discovery/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,17 @@

package discovery

import "path"
import "strings"

const (
registryPrefix = "/pd/microservice"
registryPrefix = "/ms"
registryKey = "registry"
)

func registryPath(serviceName, serviceAddr string) string {
return path.Join(registryPrefix, serviceName, registryKey, serviceAddr)
func registryPath(clusterID, serviceName, serviceAddr string) string {
return strings.Join([]string{registryPrefix, clusterID, serviceName, registryKey, serviceAddr}, "/")
}

func discoveryPath(serviceName string) string {
return path.Join(registryPrefix, serviceName, registryKey)
func discoveryPath(clusterID, serviceName string) string {
return strings.Join([]string{registryPrefix, clusterID, serviceName, registryKey}, "/")
}
4 changes: 2 additions & 2 deletions pkg/mcs/discovery/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ type ServiceRegister struct {
}

// NewServiceRegister creates a new ServiceRegister.
func NewServiceRegister(ctx context.Context, cli *clientv3.Client, serviceName, serviceAddr, serializedValue string, ttl int64) *ServiceRegister {
func NewServiceRegister(ctx context.Context, cli *clientv3.Client, clusterID, serviceName, serviceAddr, serializedValue string, ttl int64) *ServiceRegister {
cctx, cancel := context.WithCancel(ctx)
serviceKey := registryPath(serviceName, serviceAddr)
serviceKey := registryPath(clusterID, serviceName, serviceAddr)
return &ServiceRegister{
ctx: cctx,
cancel: cancel,
Expand Down
8 changes: 5 additions & 3 deletions pkg/mcs/discovery/register_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,23 @@ func TestRegister(t *testing.T) {
re.NoError(err)

<-etcd.Server.ReadyNotify()
sr := NewServiceRegister(context.Background(), client, "test_service", "127.0.0.1:1", "127.0.0.1:1", 10)
// with http prefix
sr := NewServiceRegister(context.Background(), client, "12345", "test_service", "http://127.0.0.1:1", "http://127.0.0.1:1", 10)
re.NoError(err)
err = sr.Register()
re.NoError(err)
re.Equal("/ms/12345/test_service/registry/http://127.0.0.1:1", sr.key)
resp, err := client.Get(context.Background(), sr.key)
re.NoError(err)
re.Equal("127.0.0.1:1", string(resp.Kvs[0].Value))
re.Equal("http://127.0.0.1:1", string(resp.Kvs[0].Value))

err = sr.Deregister()
re.NoError(err)
resp, err = client.Get(context.Background(), sr.key)
re.NoError(err)
re.Empty(resp.Kvs)

sr = NewServiceRegister(context.Background(), client, "test_service", "127.0.0.1:2", "127.0.0.1:2", 1)
sr = NewServiceRegister(context.Background(), client, "12345", "test_service", "127.0.0.1:2", "127.0.0.1:2", 1)
re.NoError(err)
err = sr.Register()
re.NoError(err)
Expand Down
6 changes: 4 additions & 2 deletions pkg/mcs/resource_manager/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"os"
"os/signal"
"path"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -399,12 +400,13 @@ func (s *Server) startServer() (err error) {
}

// Server has started.
entry := &discovery.ServiceRegistryEntry{ServiceAddr: s.cfg.ListenAddr}
entry := &discovery.ServiceRegistryEntry{ServiceAddr: s.cfg.AdvertiseListenAddr}
serializedEntry, err := entry.Serialize()
if err != nil {
return err
}
s.serviceRegister = discovery.NewServiceRegister(s.ctx, s.etcdClient, utils.ResourceManagerServiceName, s.cfg.ListenAddr, serializedEntry, discovery.DefaultLeaseInSeconds)
s.serviceRegister = discovery.NewServiceRegister(s.ctx, s.etcdClient, strconv.FormatUint(s.clusterID, 10),
utils.ResourceManagerServiceName, s.cfg.AdvertiseListenAddr, serializedEntry, discovery.DefaultLeaseInSeconds)
if err := s.serviceRegister.Register(); err != nil {
log.Error("failed to regiser the service", zap.String("service-name", utils.ResourceManagerServiceName), errs.ZapError(err))
return err
Expand Down
7 changes: 4 additions & 3 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,14 +448,15 @@ func (s *Server) startServer() (err error) {
}

// Server has started.
entry := &discovery.ServiceRegistryEntry{ServiceAddr: s.cfg.ListenAddr}
entry := &discovery.ServiceRegistryEntry{ServiceAddr: s.cfg.AdvertiseListenAddr}
serializedEntry, err := entry.Serialize()
if err != nil {
return err
}
s.serviceRegister = discovery.NewServiceRegister(s.ctx, s.etcdClient, mcsutils.TSOServiceName, s.cfg.ListenAddr, serializedEntry, discovery.DefaultLeaseInSeconds)
s.serviceRegister = discovery.NewServiceRegister(s.ctx, s.etcdClient, strconv.FormatUint(s.clusterID, 10),
mcsutils.TSOServiceName, s.cfg.AdvertiseListenAddr, serializedEntry, discovery.DefaultLeaseInSeconds)
if err := s.serviceRegister.Register(); err != nil {
log.Error("failed to regiser the service", zap.String("service-name", mcsutils.TSOServiceName), errs.ZapError(err))
log.Error("failed to register the service", zap.String("service-name", mcsutils.TSOServiceName), errs.ZapError(err))
return err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/endpoint/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ const (
resourceGroupStatesPath = "states"
controllerConfigPath = "controller"
// tso storage endpoint has prefix `tso`
microserviceKey = "microservice"
microserviceKey = "ms"
tsoServiceKey = utils.TSOServiceName
timestampKey = "timestamp"

Expand Down
11 changes: 7 additions & 4 deletions tests/integrations/mcs/discovery/register_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package register_test

import (
"context"
"strconv"
"testing"

"github.com/stretchr/testify/suite"
Expand All @@ -39,6 +40,7 @@ type serverRegisterTestSuite struct {
cancel context.CancelFunc
cluster *tests.TestCluster
pdLeader *tests.TestServer
clusterID string
backendEndpoints string
}

Expand All @@ -59,6 +61,7 @@ func (suite *serverRegisterTestSuite) SetupSuite() {

leaderName := suite.cluster.WaitLeader()
suite.pdLeader = suite.cluster.GetServer(leaderName)
suite.clusterID = strconv.FormatUint(suite.pdLeader.GetClusterID(), 10)
suite.backendEndpoints = suite.pdLeader.GetAddr()
}

Expand All @@ -84,9 +87,9 @@ func (suite *serverRegisterTestSuite) checkServerRegister(serviceName string) {

addr := s.GetAddr()
client := suite.pdLeader.GetEtcdClient()

// test API server discovery
endpoints, err := discovery.Discover(client, serviceName)

endpoints, err := discovery.Discover(client, suite.clusterID, serviceName)
re.NoError(err)
returnedEntry := &discovery.ServiceRegistryEntry{}
returnedEntry.Deserialize([]byte(endpoints[0]))
Expand All @@ -99,7 +102,7 @@ func (suite *serverRegisterTestSuite) checkServerRegister(serviceName string) {

// test API server discovery after unregister
cleanup()
endpoints, err = discovery.Discover(client, serviceName)
endpoints, err = discovery.Discover(client, suite.clusterID, serviceName)
re.NoError(err)
re.Empty(endpoints)
}
Expand Down Expand Up @@ -134,7 +137,7 @@ func (suite *serverRegisterTestSuite) checkServerPrimaryChange(serviceName strin
expectedPrimary = mcs.WaitForPrimaryServing(suite.Require(), serverMap)
// test API server discovery
client := suite.pdLeader.GetEtcdClient()
endpoints, err := discovery.Discover(client, serviceName)
endpoints, err := discovery.Discover(client, suite.clusterID, serviceName)
re.NoError(err)
re.Len(endpoints, serverNum-1)

Expand Down
6 changes: 4 additions & 2 deletions tests/integrations/mcs/tso/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"net/http"
"strconv"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -216,10 +217,11 @@ func (suite *APIServerForwardTestSuite) TearDownSuite() {
suite.pdClient.Close()

etcdClient := suite.pdLeader.GetEtcdClient()
endpoints, err := discovery.Discover(etcdClient, utils.TSOServiceName)
clusterID := strconv.FormatUint(suite.pdLeader.GetClusterID(), 10)
endpoints, err := discovery.Discover(etcdClient, clusterID, utils.TSOServiceName)
suite.NoError(err)
if len(endpoints) != 0 {
endpoints, err = discovery.Discover(etcdClient, utils.TSOServiceName)
endpoints, err = discovery.Discover(etcdClient, clusterID, utils.TSOServiceName)
suite.NoError(err)
suite.Empty(endpoints)
}
Expand Down

0 comments on commit 3a61876

Please sign in to comment.