Skip to content

Commit

Permalink
Separate persistent config struct from LoadBalancer and make fields p…
Browse files Browse the repository at this point in the history
…rivate

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
  • Loading branch information
brandond committed Dec 6, 2024
1 parent 13e9113 commit 67fd5fa
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 15 deletions.
16 changes: 13 additions & 3 deletions pkg/agent/loadbalancer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,18 @@ import (
"github.com/k3s-io/k3s/pkg/agent/util"
)

// lbConfig stores loadbalancer state that should be persisted across restarts.
type lbConfig struct {
ServerURL string `json:"ServerURL"`
ServerAddresses []string `json:"ServerAddresses"`
}

func (lb *LoadBalancer) writeConfig() error {
configOut, err := json.MarshalIndent(lb, "", " ")
config := &lbConfig{
ServerURL: lb.serverURL,
ServerAddresses: lb.serverAddresses,
}
configOut, err := json.MarshalIndent(config, "", " ")
if err != nil {
return err
}
Expand All @@ -18,9 +28,9 @@ func (lb *LoadBalancer) writeConfig() error {
func (lb *LoadBalancer) updateConfig() error {
writeConfig := true
if configBytes, err := os.ReadFile(lb.configFile); err == nil {
config := &LoadBalancer{}
config := &lbConfig{}
if err := json.Unmarshal(configBytes, config); err == nil {
if config.ServerURL == lb.ServerURL {
if config.ServerURL == lb.serverURL {
writeConfig = false
lb.setServers(config.ServerAddresses)
}
Expand Down
18 changes: 12 additions & 6 deletions pkg/agent/loadbalancer/loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,12 @@ type LoadBalancer struct {
localAddress string
localServerURL string
defaultServerAddress string
ServerURL string
ServerAddresses []string
serverURL string
serverAddresses []string
randomServers []string
servers map[string]*server
currentServerAddress string
nextServerIndex int
Listener net.Listener
}

const RandomPort = 0
Expand Down Expand Up @@ -105,7 +104,7 @@ func New(ctx context.Context, dataDir, serviceName, serverURL string, lbServerPo
localServerURL: localServerURL,
defaultServerAddress: defaultServerAddress,
servers: make(map[string]*server),
ServerURL: serverURL,
serverURL: serverURL,
}

lb.setServers([]string{lb.defaultServerAddress})
Expand All @@ -127,7 +126,7 @@ func New(ctx context.Context, dataDir, serviceName, serverURL string, lbServerPo
if err := lb.proxy.Start(); err != nil {
return nil, err
}
logrus.Infof("Running load balancer %s %s -> %v [default: %s]", serviceName, lb.localAddress, lb.ServerAddresses, lb.defaultServerAddress)
logrus.Infof("Running load balancer %s %s -> %v [default: %s]", serviceName, lb.localAddress, lb.serverAddresses, lb.defaultServerAddress)

go lb.runHealthChecks(ctx)

Expand All @@ -141,7 +140,7 @@ func (lb *LoadBalancer) Update(serverAddresses []string) {
if !lb.setServers(serverAddresses) {
return
}
logrus.Infof("Updated load balancer %s server addresses -> %v [default: %s]", lb.serviceName, lb.ServerAddresses, lb.defaultServerAddress)
logrus.Infof("Updated load balancer %s server addresses -> %v [default: %s]", lb.serviceName, lb.serverAddresses, lb.defaultServerAddress)

if err := lb.writeConfig(); err != nil {
logrus.Warnf("Error updating load balancer %s config: %s", lb.serviceName, err)
Expand All @@ -155,6 +154,13 @@ func (lb *LoadBalancer) LoadBalancerServerURL() string {
return lb.localServerURL
}

func (lb *LoadBalancer) ServerAddresses() []string {
if lb == nil {
return nil
}
return lb.serverAddresses
}

func (lb *LoadBalancer) dialContext(ctx context.Context, network, _ string) (net.Conn, error) {
lb.mutex.RLock()
defer lb.mutex.RUnlock()
Expand Down
10 changes: 5 additions & 5 deletions pkg/agent/loadbalancer/servers.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func (lb *LoadBalancer) setServers(serverAddresses []string) bool {
defer lb.mutex.Unlock()

newAddresses := sets.NewString(serverAddresses...)
curAddresses := sets.NewString(lb.ServerAddresses...)
curAddresses := sets.NewString(lb.serverAddresses...)
if newAddresses.Equal(curAddresses) {
return false
}
Expand Down Expand Up @@ -53,8 +53,8 @@ func (lb *LoadBalancer) setServers(serverAddresses []string) bool {
}
}

lb.ServerAddresses = serverAddresses
lb.randomServers = append([]string{}, lb.ServerAddresses...)
lb.serverAddresses = serverAddresses
lb.randomServers = append([]string{}, lb.serverAddresses...)
rand.Shuffle(len(lb.randomServers), func(i, j int) {
lb.randomServers[i], lb.randomServers[j] = lb.randomServers[j], lb.randomServers[i]
})
Expand Down Expand Up @@ -155,7 +155,7 @@ func (lb *LoadBalancer) SetDefault(serverAddress string) {
lb.mutex.Lock()
defer lb.mutex.Unlock()

hasDefaultServer := slices.Contains(lb.ServerAddresses, lb.defaultServerAddress)
hasDefaultServer := slices.Contains(lb.serverAddresses, lb.defaultServerAddress)
// if the old default server is not currently in use, remove it from the server map
if server := lb.servers[lb.defaultServerAddress]; server != nil && !hasDefaultServer {
defer server.closeAll()
Expand Down Expand Up @@ -211,7 +211,7 @@ func (lb *LoadBalancer) runHealthChecks(ctx context.Context) {
// If there is at least one healthy server, and the default server is not in the server list,
// close all the connections to the default server so that clients reconnect and switch over
// to a preferred server.
hasDefaultServer := slices.Contains(lb.ServerAddresses, lb.defaultServerAddress)
hasDefaultServer := slices.Contains(lb.serverAddresses, lb.defaultServerAddress)
if healthyServerExists && !hasDefaultServer {
if server, ok := lb.servers[lb.defaultServerAddress]; ok {
defer server.closeAll()
Expand Down
2 changes: 1 addition & 1 deletion pkg/etcd/etcdproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (e *etcdproxy) Update(addresses []string) {
e.etcdLB.Update(addresses)

validEndpoint := map[string]bool{}
for _, address := range e.etcdLB.ServerAddresses {
for _, address := range e.etcdLB.ServerAddresses() {
validEndpoint[address] = true
if _, ok := e.disconnect[address]; !ok {
ctx, cancel := context.WithCancel(context.Background())
Expand Down

0 comments on commit 67fd5fa

Please sign in to comment.