Skip to content

Commit

Permalink
Return apiserver addresses from both etcd and endpoints
Browse files Browse the repository at this point in the history
Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
  • Loading branch information
brandond committed Dec 6, 2024
1 parent 71918e0 commit 168b344
Showing 1 changed file with 81 additions and 14 deletions.
95 changes: 81 additions & 14 deletions pkg/server/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/k3s-io/k3s/pkg/bootstrap"
"github.com/k3s-io/k3s/pkg/cli/cmds"
"github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/etcd"
"github.com/k3s-io/k3s/pkg/nodepassword"
"github.com/k3s-io/k3s/pkg/server/auth"
"github.com/k3s-io/k3s/pkg/util"
Expand All @@ -31,6 +32,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/json"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/endpoints/request"
Expand Down Expand Up @@ -305,22 +307,15 @@ func fileHandler(fileName ...string) http.Handler {
})
}

// apiserversHandler returns a list of apiserver addresses.
// It attempts to merge results from both the apiserver and directly from etcd,
// in case we are recovering from an apiserver outage that rendered the endpoint list unavailable.
func apiserversHandler(server *config.Control) http.Handler {
var endpointsClient typedcorev1.EndpointsInterface
collectAddresses := getAddressCollector(server)
return http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) {
ctx := req.Context()
var endpoints []string
if endpointsClient == nil {
if server.Runtime.Core != nil {
endpointsClient = server.Runtime.K8s.CoreV1().Endpoints(metav1.NamespaceDefault)
}
}
if endpointsClient != nil {
if endpoint, _ := endpointsClient.Get(ctx, "kubernetes", metav1.GetOptions{}); endpoint != nil {
endpoints = util.GetAddresses(endpoint)
}
}

ctx, cancel := context.WithTimeout(req.Context(), 5*time.Second)
defer cancel()
endpoints := collectAddresses(ctx)
resp.Header().Set("content-type", "application/json")
if err := json.NewEncoder(resp).Encode(endpoints); err != nil {
util.SendError(errors.Wrap(err, "failed to encode apiserver endpoints"), resp, req, http.StatusInternalServerError)
Expand Down Expand Up @@ -526,3 +521,75 @@ func ensureSecret(ctx context.Context, config *Config, node *nodeInfo) {
return false, nil
})
}

// addressGetter is a common signature for functions that return an address channel
type addressGetter func(ctx context.Context) <-chan []string

// kubernetesGetter returns a function that returns a channel that can be read to get apiserver addresses from kubernetes endpoints
func kubernetesGetter(server *config.Control) addressGetter {
var endpointsClient typedcorev1.EndpointsInterface
return func(ctx context.Context) <-chan []string {
ch := make(chan []string, 1)
go func() {
if endpointsClient == nil {
if server.Runtime.K8s != nil {
endpointsClient = server.Runtime.K8s.CoreV1().Endpoints(metav1.NamespaceDefault)
}
}
if endpointsClient != nil {
if endpoint, err := endpointsClient.Get(ctx, "kubernetes", metav1.GetOptions{}); err != nil {
logrus.Debugf("Failed to get apiserver addresses from kubernetes: %v", err)
} else {
ch <- util.GetAddresses(endpoint)
}
}
close(ch)
}()
return ch
}
}

// etcdGetter returns a function that returns a channel that can be read to get apiserver addresses from etcd
func etcdGetter(server *config.Control) addressGetter {
return func(ctx context.Context) <-chan []string {
ch := make(chan []string, 1)
go func() {
if addresses, err := etcd.GetAPIServerURLsFromETCD(ctx, server); err != nil {
logrus.Debugf("Failed to get apiserver addresses from etcd: %v", err)
} else {
ch <- addresses
}
close(ch)
}()
return ch
}
}

// getAddressCollector returns a function that can be called to return
// apiserver addresses from both kubernetes and etcd
func getAddressCollector(server *config.Control) func(ctx context.Context) []string {
getFromKubernetes := kubernetesGetter(server)
getFromEtcd := etcdGetter(server)

// read from both kubernetes and etcd in parallel, returning the collected results
return func(ctx context.Context) []string {
a := sets.Set[string]{}
r := []string{}
k8sCh := getFromKubernetes(ctx)
k8sOk := true
etcdCh := getFromEtcd(ctx)
etcdOk := true

for k8sOk || etcdOk {
select {
case r, k8sOk = <-k8sCh:
a.Insert(r...)
case r, etcdOk = <-etcdCh:
a.Insert(r...)
case <-ctx.Done():
return a.UnsortedList()
}
}
return a.UnsortedList()
}
}

0 comments on commit 168b344

Please sign in to comment.