Skip to content

Commit

Permalink
Allow using different resource group and listner name with the in mem…
Browse files Browse the repository at this point in the history
…ory server (kubernetes-sigs#10096)
  • Loading branch information
fabriziopandini authored and Dhairya-Arora01 committed May 25, 2024
1 parent d4fcdbe commit 8af84e7
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ import (
)

const (
// ResourceGroupAnnotationName tracks the name of a resource group a InMemoryCluster cluster is linked to.
ResourceGroupAnnotationName = "inmemorycluster.infrastructure.cluster.x-k8s.io/resource-group"
// ListenerAnnotationName tracks the name of the listener a cluster is linked to.
// NOTE: the annotation is required only when using the HotRestart feature; it must be added by the component that creates the listener.
ListenerAnnotationName = "inmemorycluster.infrastructure.cluster.x-k8s.io/listener"

// ClusterFinalizer allows InMemoryClusterReconciler to clean up resources associated with InMemoryCluster before
// removing it from the API server.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,13 @@ func (r *InMemoryClusterReconciler) reconcileHotRestart(ctx context.Context) err
}

func (r *InMemoryClusterReconciler) reconcileNormal(_ context.Context, cluster *clusterv1.Cluster, inMemoryCluster *infrav1.InMemoryCluster) error {
// Compute the resource group unique name.
// Compute the name for resource group and listener.
// NOTE: we are using the same name for convenience, but it is not required.
resourceGroup := klog.KObj(cluster).String()
listenerName := klog.KObj(cluster).String()

// Store the resource group used by this inMemoryCluster.
inMemoryCluster.Annotations[infrav1.ResourceGroupAnnotationName] = resourceGroup
inMemoryCluster.Annotations[infrav1.ListenerAnnotationName] = listenerName

// Create a resource group for all the in memory resources belonging to the workload cluster;
// if the resource group already exists, the operation is a no-op.
Expand All @@ -166,13 +168,13 @@ func (r *InMemoryClusterReconciler) reconcileNormal(_ context.Context, cluster *

// Initialize a listener for the workload cluster; if the listener has been already initialized
// the operation is a no-op.
// NOTE: We are using reconcilerGroup also as a name for the listener for sake of simplicity.
// IMPORTANT: The fact that both the listener and the resourceGroup for a workload cluster have
// the same name is used by the current implementation of the resourceGroup resolvers in the APIServerMux.
listener, err := r.APIServerMux.InitWorkloadClusterListener(resourceGroup)
listener, err := r.APIServerMux.InitWorkloadClusterListener(listenerName)
if err != nil {
return errors.Wrap(err, "failed to init the listener for the workload cluster")
}
if err := r.APIServerMux.RegisterResourceGroup(listenerName, resourceGroup); err != nil {
return errors.Wrap(err, "failed to register the resource group for the workload cluster")
}

// Surface the control plane endpoint
if inMemoryCluster.Spec.ControlPlaneEndpoint.Host == "" {
Expand All @@ -187,14 +189,16 @@ func (r *InMemoryClusterReconciler) reconcileNormal(_ context.Context, cluster *
}

func (r *InMemoryClusterReconciler) reconcileDelete(_ context.Context, cluster *clusterv1.Cluster, inMemoryCluster *infrav1.InMemoryCluster) error {
// Compute the resource group unique name.
// Compute the name for resource group and listener.
// NOTE: we are using the same name for convenience, but it is not required.
resourceGroup := klog.KObj(cluster).String()
listenerName := klog.KObj(cluster).String()

// Delete the resource group hosting all the in memory resources belonging to the workload cluster.
r.InMemoryManager.DeleteResourceGroup(resourceGroup)

// Delete the listener for the workload cluster;
if err := r.APIServerMux.DeleteWorkloadClusterListener(resourceGroup); err != nil {
if err := r.APIServerMux.DeleteWorkloadClusterListener(listenerName); err != nil {
return err
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,7 @@ func (r *InMemoryMachineReconciler) reconcileNormal(ctx context.Context, cluster
}

func (r *InMemoryMachineReconciler) reconcileNormalCloudMachine(ctx context.Context, cluster *clusterv1.Cluster, _ *clusterv1.Machine, inMemoryMachine *infrav1.InMemoryMachine) (ctrl.Result, error) {
// Compute the resource group unique name.
// NOTE: We are using reconcilerGroup also as a name for the listener for sake of simplicity.
// Compute the name for resource group.
resourceGroup := klog.KObj(cluster).String()
inmemoryClient := r.InMemoryManager.GetResourceGroup(resourceGroup).GetClient()

Expand Down Expand Up @@ -320,8 +319,7 @@ func (r *InMemoryMachineReconciler) reconcileNormalNode(ctx context.Context, clu
return ctrl.Result{RequeueAfter: start.Add(provisioningDuration).Sub(now)}, nil
}

// Compute the resource group unique name.
// NOTE: We are using reconcilerGroup also as a name for the listener for sake of simplicity.
// Compute the name for resource group.
resourceGroup := klog.KObj(cluster).String()
inmemoryClient := r.InMemoryManager.GetResourceGroup(resourceGroup).GetClient()

Expand Down Expand Up @@ -405,9 +403,10 @@ func (r *InMemoryMachineReconciler) reconcileNormalETCD(ctx context.Context, clu
return ctrl.Result{RequeueAfter: start.Add(provisioningDuration).Sub(now)}, nil
}

// Compute the resource group unique name.
// NOTE: We are using reconcilerGroup also as a name for the listener for sake of simplicity.
// Compute the resource group and listener unique name.
// NOTE: we are using the same name for convenience, but it is not required.
resourceGroup := klog.KObj(cluster).String()
listenerName := klog.KObj(cluster).String()
inmemoryClient := r.InMemoryManager.GetResourceGroup(resourceGroup).GetClient()

// Create the etcd pod
Expand Down Expand Up @@ -484,7 +483,7 @@ func (r *InMemoryMachineReconciler) reconcileNormalETCD(ctx context.Context, clu
}

// If there is not yet an etcd member listener for this machine, add it to the server.
if !r.APIServerMux.HasEtcdMember(resourceGroup, etcdMember) {
if !r.APIServerMux.HasEtcdMember(listenerName, etcdMember) {
// Getting the etcd CA
s, err := secret.Get(ctx, r.Client, client.ObjectKeyFromObject(cluster), secret.EtcdCA)
if err != nil {
Expand All @@ -510,7 +509,7 @@ func (r *InMemoryMachineReconciler) reconcileNormalETCD(ctx context.Context, clu
return ctrl.Result{}, errors.Wrapf(err, "invalid etcd CA: invalid %s", secret.TLSKeyDataName)
}

if err := r.APIServerMux.AddEtcdMember(resourceGroup, etcdMember, cert, key.(*rsa.PrivateKey)); err != nil {
if err := r.APIServerMux.AddEtcdMember(listenerName, etcdMember, cert, key.(*rsa.PrivateKey)); err != nil {
return ctrl.Result{}, errors.Wrap(err, "failed to start etcd member")
}
}
Expand Down Expand Up @@ -609,9 +608,10 @@ func (r *InMemoryMachineReconciler) reconcileNormalAPIServer(ctx context.Context
return ctrl.Result{RequeueAfter: start.Add(provisioningDuration).Sub(now)}, nil
}

// Compute the resource group unique name.
// NOTE: We are using reconcilerGroup also as a name for the listener for sake of simplicity.
// Compute the resource group and listener unique name.
// NOTE: we are using the same name for convenience, but it is not required.
resourceGroup := klog.KObj(cluster).String()
listenerName := klog.KObj(cluster).String()
inmemoryClient := r.InMemoryManager.GetResourceGroup(resourceGroup).GetClient()

// Create the apiserver pod
Expand Down Expand Up @@ -651,7 +651,7 @@ func (r *InMemoryMachineReconciler) reconcileNormalAPIServer(ctx context.Context
}

// If there is not yet an API server listener for this machine.
if !r.APIServerMux.HasAPIServer(resourceGroup, apiServer) {
if !r.APIServerMux.HasAPIServer(listenerName, apiServer) {
// Getting the Kubernetes CA
s, err := secret.Get(ctx, r.Client, client.ObjectKeyFromObject(cluster), secret.ClusterCA)
if err != nil {
Expand Down Expand Up @@ -679,7 +679,7 @@ func (r *InMemoryMachineReconciler) reconcileNormalAPIServer(ctx context.Context

// Adding the APIServer.
// NOTE: When the first APIServer is added, the workload cluster listener is started.
if err := r.APIServerMux.AddAPIServer(resourceGroup, apiServer, cert, key.(*rsa.PrivateKey)); err != nil {
if err := r.APIServerMux.AddAPIServer(listenerName, apiServer, cert, key.(*rsa.PrivateKey)); err != nil {
return ctrl.Result{}, errors.Wrap(err, "failed to start API server")
}
}
Expand All @@ -703,7 +703,6 @@ func (r *InMemoryMachineReconciler) reconcileNormalScheduler(ctx context.Context
}

// Compute the resource group unique name.
// NOTE: We are using reconcilerGroup also as a name for the listener for sake of simplicity.
resourceGroup := klog.KObj(cluster).String()
inmemoryClient := r.InMemoryManager.GetResourceGroup(resourceGroup).GetClient()

Expand Down Expand Up @@ -751,7 +750,6 @@ func (r *InMemoryMachineReconciler) reconcileNormalControllerManager(ctx context
}

// Compute the resource group unique name.
// NOTE: We are using reconcilerGroup also as a name for the listener for sake of simplicity.
resourceGroup := klog.KObj(cluster).String()
inmemoryClient := r.InMemoryManager.GetResourceGroup(resourceGroup).GetClient()

Expand Down Expand Up @@ -791,7 +789,6 @@ func (r *InMemoryMachineReconciler) reconcileNormalKubeadmObjects(ctx context.Co
}

// Compute the resource group unique name.
// NOTE: We are using reconcilerGroup also as a name for the listener for sake of simplicity.
resourceGroup := klog.KObj(cluster).String()
inmemoryClient := r.InMemoryManager.GetResourceGroup(resourceGroup).GetClient()

Expand Down Expand Up @@ -861,7 +858,6 @@ func (r *InMemoryMachineReconciler) reconcileNormalKubeProxy(ctx context.Context
// TODO: Add provisioning time for KubeProxy.

// Compute the resource group unique name.
// NOTE: We are using reconcilerGroup also as a name for the listener for sake of simplicity.
resourceGroup := klog.KObj(cluster).String()
inmemoryClient := r.InMemoryManager.GetResourceGroup(resourceGroup).GetClient()

Expand Down Expand Up @@ -908,7 +904,6 @@ func (r *InMemoryMachineReconciler) reconcileNormalCoredns(ctx context.Context,
// TODO: Add provisioning time for CoreDNS.

// Compute the resource group unique name.
// NOTE: We are using reconcilerGroup also as a name for the listener for sake of simplicity.
resourceGroup := klog.KObj(cluster).String()
inmemoryClient := r.InMemoryManager.GetResourceGroup(resourceGroup).GetClient()

Expand Down Expand Up @@ -996,7 +991,6 @@ func (r *InMemoryMachineReconciler) reconcileDelete(ctx context.Context, cluster

func (r *InMemoryMachineReconciler) reconcileDeleteCloudMachine(ctx context.Context, cluster *clusterv1.Cluster, _ *clusterv1.Machine, inMemoryMachine *infrav1.InMemoryMachine) (ctrl.Result, error) {
// Compute the resource group unique name.
// NOTE: We are using reconcilerGroup also as a name for the listener for sake of simplicity.
resourceGroup := klog.KObj(cluster).String()
inmemoryClient := r.InMemoryManager.GetResourceGroup(resourceGroup).GetClient()

Expand All @@ -1015,7 +1009,6 @@ func (r *InMemoryMachineReconciler) reconcileDeleteCloudMachine(ctx context.Cont

func (r *InMemoryMachineReconciler) reconcileDeleteNode(ctx context.Context, cluster *clusterv1.Cluster, _ *clusterv1.Machine, inMemoryMachine *infrav1.InMemoryMachine) (ctrl.Result, error) {
// Compute the resource group unique name.
// NOTE: We are using reconcilerGroup also as a name for the listener for sake of simplicity.
resourceGroup := klog.KObj(cluster).String()
inmemoryClient := r.InMemoryManager.GetResourceGroup(resourceGroup).GetClient()

Expand All @@ -1040,9 +1033,10 @@ func (r *InMemoryMachineReconciler) reconcileDeleteETCD(ctx context.Context, clu
return ctrl.Result{}, nil
}

// Compute the resource group unique name.
// NOTE: We are using reconcilerGroup also as a name for the listener for sake of simplicity.
// Compute the resource group and listener unique name.
// NOTE: we are using the same name for convenience, but it is not required.
resourceGroup := klog.KObj(cluster).String()
listenerName := klog.KObj(cluster).String()
inmemoryClient := r.InMemoryManager.GetResourceGroup(resourceGroup).GetClient()

etcdMember := fmt.Sprintf("etcd-%s", inMemoryMachine.Name)
Expand All @@ -1055,7 +1049,7 @@ func (r *InMemoryMachineReconciler) reconcileDeleteETCD(ctx context.Context, clu
if err := inmemoryClient.Delete(ctx, etcdPod); err != nil && !apierrors.IsNotFound(err) {
return ctrl.Result{}, errors.Wrapf(err, "failed to delete etcd Pod")
}
if err := r.APIServerMux.DeleteEtcdMember(resourceGroup, etcdMember); err != nil {
if err := r.APIServerMux.DeleteEtcdMember(listenerName, etcdMember); err != nil {
return ctrl.Result{}, err
}

Expand All @@ -1073,9 +1067,10 @@ func (r *InMemoryMachineReconciler) reconcileDeleteAPIServer(ctx context.Context
return ctrl.Result{}, nil
}

// Compute the resource group unique name.
// NOTE: We are using reconcilerGroup also as a name for the listener for sake of simplicity.
// Compute the resource group and listener unique name.
// NOTE: we are using the same name for convenience, but it is not required.
resourceGroup := klog.KObj(cluster).String()
listenerName := klog.KObj(cluster).String()
inmemoryClient := r.InMemoryManager.GetResourceGroup(resourceGroup).GetClient()

apiServer := fmt.Sprintf("kube-apiserver-%s", inMemoryMachine.Name)
Expand All @@ -1088,7 +1083,7 @@ func (r *InMemoryMachineReconciler) reconcileDeleteAPIServer(ctx context.Context
if err := inmemoryClient.Delete(ctx, apiServerPod); err != nil && !apierrors.IsNotFound(err) {
return ctrl.Result{}, errors.Wrapf(err, "failed to delete apiServer Pod")
}
if err := r.APIServerMux.DeleteAPIServer(resourceGroup, apiServer); err != nil {
if err := r.APIServerMux.DeleteAPIServer(listenerName, apiServer); err != nil {
return ctrl.Result{}, err
}

Expand All @@ -1102,7 +1097,6 @@ func (r *InMemoryMachineReconciler) reconcileDeleteScheduler(ctx context.Context
}

// Compute the resource group unique name.
// NOTE: We are using reconcilerGroup also as a name for the listener for sake of simplicity.
resourceGroup := klog.KObj(cluster).String()
inmemoryClient := r.InMemoryManager.GetResourceGroup(resourceGroup).GetClient()

Expand All @@ -1126,7 +1120,6 @@ func (r *InMemoryMachineReconciler) reconcileDeleteControllerManager(ctx context
}

// Compute the resource group unique name.
// NOTE: We are using reconcilerGroup also as a name for the listener for sake of simplicity.
resourceGroup := klog.KObj(cluster).String()
inmemoryClient := r.InMemoryManager.GetResourceGroup(resourceGroup).GetClient()

Expand Down
7 changes: 7 additions & 0 deletions test/infrastructure/inmemory/pkg/server/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ type WorkloadClusterListener struct {
host string
port int

resourceGroup string

scheme *runtime.Scheme

apiServers sets.Set[string]
Expand All @@ -65,6 +67,11 @@ func (s *WorkloadClusterListener) Port() int {
return s.port
}

// ResourceGroup returns the name of the resource group that hosts the in memory resources for a WorkloadClusterListener.
// NOTE: the WorkloadClustersMux treats an empty string as "no resource group registered for this listener".
func (s *WorkloadClusterListener) ResourceGroup() string {
return s.resourceGroup
}

// Address returns the address of a WorkloadClusterListener.
func (s *WorkloadClusterListener) Address() string {
return fmt.Sprintf("https://%s", s.HostPort())
Expand Down
67 changes: 61 additions & 6 deletions test/infrastructure/inmemory/pkg/server/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,16 +166,25 @@ func NewWorkloadClustersMux(manager inmemoryruntime.Manager, host string, opts .
func (m *WorkloadClustersMux) mixedHandler() http.Handler {
// Prepare a function that can identify which workloadCluster/resourceGroup a
// request targets to.
// IMPORTANT: this function assumes that both the listener and the resourceGroup
// for a workload cluster have the same name.
resourceGroupResolver := func(host string) (string, error) {
m.lock.RLock()
defer m.lock.RUnlock()
wclName, ok := m.workloadClusterNameByHost[host]
if !ok {
return "", errors.Errorf("failed to get workloadClusterListener for host %s", host)
}
return wclName, nil
wcl, ok := m.workloadClusterListeners[wclName]
if !ok {
// NOTE: this should never happen because initWorkloadClusterListenerWithPortLocked always adds a workload cluster listener to both maps.
panic(fmt.Sprintf("workloadCluster with name %s exists in workloadClusterNameByHost but not workloadClusterListeners", wclName))
}

resourceGroup := wcl.ResourceGroup()
if resourceGroup == "" {
return "", errors.Errorf("workloadClusterListener with name %s does not have a registered resource group", wclName)
}

return resourceGroup, nil
}

// build the handlers for API server and etcd.
Expand Down Expand Up @@ -260,12 +269,12 @@ func (m *WorkloadClustersMux) HotRestart(clusters *infrav1.InMemoryClusterList)
return errors.Errorf("unable to restart the WorkloadClustersMux, there are two or more clusters using port %d", c.Spec.ControlPlaneEndpoint.Port)
}

resourceGroup, ok := c.Annotations[infrav1.ResourceGroupAnnotationName]
listenerName, ok := c.Annotations[infrav1.ListenerAnnotationName]
if !ok {
return errors.Errorf("unable to restart the WorkloadClustersMux, cluster %s doesn't have the %s annotation", klog.KRef(c.Namespace, c.Name), infrav1.ResourceGroupAnnotationName)
return errors.Errorf("unable to restart the WorkloadClustersMux, cluster %s doesn't have the %s annotation", klog.KRef(c.Namespace, c.Name), infrav1.ListenerAnnotationName)
}

m.initWorkloadClusterListenerWithPortLocked(resourceGroup, c.Spec.ControlPlaneEndpoint.Port)
m.initWorkloadClusterListenerWithPortLocked(listenerName, c.Spec.ControlPlaneEndpoint.Port)

if maxPort < c.Spec.ControlPlaneEndpoint.Port {
maxPort = c.Spec.ControlPlaneEndpoint.Port
Expand Down Expand Up @@ -307,13 +316,59 @@ func (m *WorkloadClustersMux) initWorkloadClusterListenerWithPortLocked(wclName
etcdMembers: sets.New[string](),
etcdServingCertificates: map[string]*tls.Certificate{},
}

// NOTE: the listener must be added to both maps, and the two maps must be kept in sync,
// in order for the resourceGroupResolver to work.
m.workloadClusterListeners[wclName] = wcl
m.workloadClusterNameByHost[wcl.HostPort()] = wclName

m.log.Info("Workload cluster listener created", "listenerName", wclName, "address", wcl.Address())
return wcl
}

// RegisterResourceGroup binds a resource group to an existing WorkloadClusterListener; the resource group
// is the one hosting the in memory resources for that listener.
// An error is returned if no listener with the given name has been initialized yet.
func (m *WorkloadClustersMux) RegisterResourceGroup(wclName, resourceGroup string) error {
	m.lock.Lock()
	defer m.lock.Unlock()

	// The listener must already be tracked by the mux, otherwise there is nothing to bind to.
	listener, found := m.workloadClusterListeners[wclName]
	if !found {
		return errors.Errorf("workloadClusterListener with name %s must be initialized before registering a resource group", wclName)
	}

	listener.resourceGroup = resourceGroup
	return nil
}

// ResourceGroupByWorkloadCluster returns the name of the resource group that hosts the in memory resources
// for the WorkloadClusterListener with the given name.
// An error is returned if the listener has not been initialized yet, or if no resource group
// has been registered for it.
func (m *WorkloadClustersMux) ResourceGroupByWorkloadCluster(wclName string) (string, error) {
	// This is a read-only lookup, so a read lock is sufficient and avoids
	// blocking other readers of the mux state.
	m.lock.RLock()
	defer m.lock.RUnlock()

	wcl, ok := m.workloadClusterListeners[wclName]
	if !ok {
		return "", errors.Errorf("workloadClusterListener with name %s not yet initialized", wclName)
	}

	// An empty resource group means RegisterResourceGroup has not been called for this listener.
	resourceGroup := wcl.ResourceGroup()
	if resourceGroup == "" {
		return "", errors.Errorf("workloadClusterListener with name %s does not have a registered resource group", wclName)
	}
	return resourceGroup, nil
}

// WorkloadClusterByResourceGroup returns the name of the WorkloadClusterListener that serves resources
// from the given resource group.
// An error is returned if the resource group name is empty, or if no listener has the resource group registered.
func (m *WorkloadClustersMux) WorkloadClusterByResourceGroup(resourceGroup string) (string, error) {
	// An empty name would match any listener that does not have a resource group registered yet
	// (in random map iteration order), so it is rejected upfront.
	if resourceGroup == "" {
		return "", errors.New("resourceGroup name must not be empty")
	}

	// This is a read-only scan, so a read lock is sufficient and avoids
	// blocking other readers of the mux state.
	m.lock.RLock()
	defer m.lock.RUnlock()

	for wclName, wcl := range m.workloadClusterListeners {
		if wcl.ResourceGroup() == resourceGroup {
			return wclName, nil
		}
	}
	return "", errors.Errorf("resourceGroup with name %s not yet registered to a workloadClusterListener", resourceGroup)
}

// AddAPIServer mimics adding an API server instance behind the WorkloadClusterListener.
// When the first API server instance is added the serving certificates and the admin certificate
// for tests are generated, and the listener is started.
Expand Down
Loading

0 comments on commit 8af84e7

Please sign in to comment.