Skip to content

Commit

Permalink
feat: add static discovery provider (#72)
Browse files Browse the repository at this point in the history
* feat: add static discovery provider
* fix: data race
  • Loading branch information
Tochemey committed Jul 9, 2023
1 parent d5767a5 commit 7ca75e3
Show file tree
Hide file tree
Showing 10 changed files with 625 additions and 98 deletions.
22 changes: 7 additions & 15 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ import (
)

const (
clientPortName = "clients-port"
peersPortName = "peers-port"
memberAddThreshold = 5
)

Expand Down Expand Up @@ -102,7 +100,7 @@ func (c *Cluster) Start(ctx context.Context) error {
for _, discoNode := range discoNodes {
if !(hostNode.host == discoNode.Host) {
// build the peer URL
_, clientsURL := nodeURLs(discoNode)
_, clientsURL := discoNode.URLs()
// set the endpoints
existingEndpoints = append(existingEndpoints, clientsURL)
}
Expand Down Expand Up @@ -170,7 +168,7 @@ func (c *Cluster) Start(ctx context.Context) error {
endpoints := make([]string, len(discoNodes))
for i, discoNode := range discoNodes {
// build the peer URL
peersURL, clientsURL := nodeURLs(discoNode)
peersURL, clientsURL := discoNode.URLs()
// build the initial cluster builder
initialPeerURLs[i] = fmt.Sprintf("%s=%s", discoNode.Name, peersURL)
// set the endpoints
Expand Down Expand Up @@ -346,7 +344,7 @@ func (c *Cluster) handleClusterEvents(events <-chan discovery.Event) {
// when matching member is found
if member != nil {
// grab the latest URLs
peerURL, clientURL := nodeURLs(latest)
peerURL, clientURL := latest.URLs()
// create an instance of member and set the various URLs
m := &etcdserverpb.Member{
ID: member.GetID(),
Expand Down Expand Up @@ -401,9 +399,9 @@ func (c *Cluster) whoami(discoNodes []*discovery.Node) *hostNode {
// here the host node is found
if slices.Contains(addresses, discoNode.Host) {
// get the peer port
peersPort := discoNode.Ports[peersPortName]
peersPort := discoNode.PeersPort()
// get the clients port
clientsPort := discoNode.Ports[clientPortName]
clientsPort := discoNode.ClientsPort()

// let us build the host peer URLs and getClient URLs
peerURLs := goset.NewSet[string]()
Expand Down Expand Up @@ -458,7 +456,7 @@ func (c *Cluster) discoverNodes(ctx context.Context) []*discovery.Node {
// remove duplicate
for _, discoNode := range discoNodes {
// let us get the node URL
peersURL, _ := nodeURLs(discoNode)
peersURL, _ := discoNode.URLs()
// check whether the Cluster has been already discovered and ignore it
if _, ok := seen[peersURL]; ok {
continue
Expand All @@ -484,16 +482,10 @@ func (c *Cluster) discoverNodes(ctx context.Context) []*discovery.Node {
return nodes
}

// nodeURLs returns the actual node URLs
func nodeURLs(node *discovery.Node) (peersURL string, clientURL string) {
return fmt.Sprintf("http://%s:%d", node.Host, node.Ports[peersPortName]),
fmt.Sprintf("http://%s:%d", node.Host, node.Ports[clientPortName])
}

// locateMember helps find a given member using its peerURL
func locateMember(members []*etcdserverpb.Member, node *discovery.Node) *etcdserverpb.Member {
// grab the given node URL
peerURL, _ := nodeURLs(node)
peerURL, _ := node.URLs()
for _, member := range members {
if slices.Contains(member.GetPeerURLs(), peerURL) && member.GetName() == node.Name {
return member
Expand Down
Loading

0 comments on commit 7ca75e3

Please sign in to comment.