Skip to content

Commit

Permalink
Allow using different resource group and listener name with the in mem…
Browse files Browse the repository at this point in the history
…ory server (#10096)
  • Loading branch information
fabriziopandini committed Feb 9, 2024
1 parent adb6738 commit 4f7645b
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ import (
)

const (
// ResourceGroupAnnotationName tracks the name of a resource group a InMemoryCluster cluster is linked to.
ResourceGroupAnnotationName = "inmemorycluster.infrastructure.cluster.x-k8s.io/resource-group"
// ListenerAnnotationName tracks the name of the listener a cluster is linked to.
// NOTE: the annotation must be added by the component that creates the listener, and only when using the HotRestart feature.
ListenerAnnotationName = "inmemorycluster.infrastructure.cluster.x-k8s.io/listener"

// ClusterFinalizer allows InMemoryClusterReconciler to clean up resources associated with InMemoryCluster before
// removing it from the API server.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,13 @@ func (r *InMemoryClusterReconciler) reconcileHotRestart(ctx context.Context) err
}

func (r *InMemoryClusterReconciler) reconcileNormal(_ context.Context, cluster *clusterv1.Cluster, inMemoryCluster *infrav1.InMemoryCluster) error {
// Compute the resource group unique name.
// Compute the name for resource group and listener.
// NOTE: we are using the same name for convenience, but it is not required.
resourceGroup := klog.KObj(cluster).String()
listenerName := klog.KObj(cluster).String()

// Store the resource group used by this inMemoryCluster.
inMemoryCluster.Annotations[infrav1.ResourceGroupAnnotationName] = resourceGroup
inMemoryCluster.Annotations[infrav1.ListenerAnnotationName] = listenerName

// Create a resource group for all the in memory resources belonging to the workload cluster;
// if the resource group already exists, the operation is a no-op.
Expand All @@ -166,13 +168,13 @@ func (r *InMemoryClusterReconciler) reconcileNormal(_ context.Context, cluster *

// Initialize a listener for the workload cluster; if the listener has been already initialized
// the operation is a no-op.
// NOTE: We are using reconcilerGroup also as a name for the listener for sake of simplicity.
// IMPORTANT: The fact that both the listener and the resourceGroup for a workload cluster have
// the same name is used by the current implementation of the resourceGroup resolvers in the APIServerMux.
listener, err := r.APIServerMux.InitWorkloadClusterListener(resourceGroup)
listener, err := r.APIServerMux.InitWorkloadClusterListener(listenerName)
if err != nil {
return errors.Wrap(err, "failed to init the listener for the workload cluster")
}
if err := r.APIServerMux.RegisterResourceGroup(listenerName, resourceGroup); err != nil {
return errors.Wrap(err, "failed to register the resource group for the workload cluster")
}

// Surface the control plane endpoint
if inMemoryCluster.Spec.ControlPlaneEndpoint.Host == "" {
Expand All @@ -187,14 +189,16 @@ func (r *InMemoryClusterReconciler) reconcileNormal(_ context.Context, cluster *
}

func (r *InMemoryClusterReconciler) reconcileDelete(_ context.Context, cluster *clusterv1.Cluster, inMemoryCluster *infrav1.InMemoryCluster) error {
// Compute the resource group unique name.
// Compute the name for resource group and listener.
// NOTE: we are using the same name for convenience, but it is not required.
resourceGroup := klog.KObj(cluster).String()
listenerName := klog.KObj(cluster).String()

// Delete the resource group hosting all the in memory resources belonging to the workload cluster;
r.InMemoryManager.DeleteResourceGroup(resourceGroup)

// Delete the listener for the workload cluster;
if err := r.APIServerMux.DeleteWorkloadClusterListener(resourceGroup); err != nil {
if err := r.APIServerMux.DeleteWorkloadClusterListener(listenerName); err != nil {
return err
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,7 @@ func (r *InMemoryMachineReconciler) reconcileNormal(ctx context.Context, cluster
}

func (r *InMemoryMachineReconciler) reconcileNormalCloudMachine(ctx context.Context, cluster *clusterv1.Cluster, _ *clusterv1.Machine, inMemoryMachine *infrav1.InMemoryMachine) (ctrl.Result, error) {
// Compute the resource group unique name.
// NOTE: We are using reconcilerGroup also as a name for the listener for sake of simplicity.
// Compute the name for resource group.
resourceGroup := klog.KObj(cluster).String()
inmemoryClient := r.InMemoryManager.GetResourceGroup(resourceGroup).GetClient()

Expand Down Expand Up @@ -320,8 +319,7 @@ func (r *InMemoryMachineReconciler) reconcileNormalNode(ctx context.Context, clu
return ctrl.Result{RequeueAfter: start.Add(provisioningDuration).Sub(now)}, nil
}

// Compute the resource group unique name.
// NOTE: We are using reconcilerGroup also as a name for the listener for sake of simplicity.
// Compute the name for resource group.
resourceGroup := klog.KObj(cluster).String()
inmemoryClient := r.InMemoryManager.GetResourceGroup(resourceGroup).GetClient()

Expand Down Expand Up @@ -405,9 +403,10 @@ func (r *InMemoryMachineReconciler) reconcileNormalETCD(ctx context.Context, clu
return ctrl.Result{RequeueAfter: start.Add(provisioningDuration).Sub(now)}, nil
}

// Compute the resource group unique name.
// NOTE: We are using reconcilerGroup also as a name for the listener for sake of simplicity.
// Compute the resource group and listener unique name.
// NOTE: we are using the same name for convenience, but it is not required.
resourceGroup := klog.KObj(cluster).String()
listenerName := klog.KObj(cluster).String()
inmemoryClient := r.InMemoryManager.GetResourceGroup(resourceGroup).GetClient()

// Create the etcd pod
Expand Down Expand Up @@ -484,7 +483,7 @@ func (r *InMemoryMachineReconciler) reconcileNormalETCD(ctx context.Context, clu
}

// If there is not yet an etcd member listener for this machine, add it to the server.
if !r.APIServerMux.HasEtcdMember(resourceGroup, etcdMember) {
if !r.APIServerMux.HasEtcdMember(listenerName, etcdMember) {
// Getting the etcd CA
s, err := secret.Get(ctx, r.Client, client.ObjectKeyFromObject(cluster), secret.EtcdCA)
if err != nil {
Expand All @@ -510,7 +509,7 @@ func (r *InMemoryMachineReconciler) reconcileNormalETCD(ctx context.Context, clu
return ctrl.Result{}, errors.Wrapf(err, "invalid etcd CA: invalid %s", secret.TLSKeyDataName)
}

if err := r.APIServerMux.AddEtcdMember(resourceGroup, etcdMember, cert, key.(*rsa.PrivateKey)); err != nil {
if err := r.APIServerMux.AddEtcdMember(listenerName, etcdMember, cert, key.(*rsa.PrivateKey)); err != nil {
return ctrl.Result{}, errors.Wrap(err, "failed to start etcd member")
}
}
Expand Down Expand Up @@ -609,9 +608,10 @@ func (r *InMemoryMachineReconciler) reconcileNormalAPIServer(ctx context.Context
return ctrl.Result{RequeueAfter: start.Add(provisioningDuration).Sub(now)}, nil
}

// Compute the resource group unique name.
// NOTE: We are using reconcilerGroup also as a name for the listener for sake of simplicity.
// Compute the resource group and listener unique name.
// NOTE: we are using the same name for convenience, but it is not required.
resourceGroup := klog.KObj(cluster).String()
listenerName := klog.KObj(cluster).String()
inmemoryClient := r.InMemoryManager.GetResourceGroup(resourceGroup).GetClient()

// Create the apiserver pod
Expand Down Expand Up @@ -651,7 +651,7 @@ func (r *InMemoryMachineReconciler) reconcileNormalAPIServer(ctx context.Context
}

// If there is not yet an API server listener for this machine.
if !r.APIServerMux.HasAPIServer(resourceGroup, apiServer) {
if !r.APIServerMux.HasAPIServer(listenerName, apiServer) {
// Getting the Kubernetes CA
s, err := secret.Get(ctx, r.Client, client.ObjectKeyFromObject(cluster), secret.ClusterCA)
if err != nil {
Expand Down Expand Up @@ -679,7 +679,7 @@ func (r *InMemoryMachineReconciler) reconcileNormalAPIServer(ctx context.Context

// Adding the APIServer.
// NOTE: When the first APIServer is added, the workload cluster listener is started.
if err := r.APIServerMux.AddAPIServer(resourceGroup, apiServer, cert, key.(*rsa.PrivateKey)); err != nil {
if err := r.APIServerMux.AddAPIServer(listenerName, apiServer, cert, key.(*rsa.PrivateKey)); err != nil {
return ctrl.Result{}, errors.Wrap(err, "failed to start API server")
}
}
Expand All @@ -703,7 +703,6 @@ func (r *InMemoryMachineReconciler) reconcileNormalScheduler(ctx context.Context
}

// Compute the resource group unique name.
// NOTE: We are using reconcilerGroup also as a name for the listener for sake of simplicity.
resourceGroup := klog.KObj(cluster).String()
inmemoryClient := r.InMemoryManager.GetResourceGroup(resourceGroup).GetClient()

Expand Down Expand Up @@ -751,7 +750,6 @@ func (r *InMemoryMachineReconciler) reconcileNormalControllerManager(ctx context
}

// Compute the resource group unique name.
// NOTE: We are using reconcilerGroup also as a name for the listener for sake of simplicity.
resourceGroup := klog.KObj(cluster).String()
inmemoryClient := r.InMemoryManager.GetResourceGroup(resourceGroup).GetClient()

Expand Down Expand Up @@ -791,7 +789,6 @@ func (r *InMemoryMachineReconciler) reconcileNormalKubeadmObjects(ctx context.Co
}

// Compute the resource group unique name.
// NOTE: We are using reconcilerGroup also as a name for the listener for sake of simplicity.
resourceGroup := klog.KObj(cluster).String()
inmemoryClient := r.InMemoryManager.GetResourceGroup(resourceGroup).GetClient()

Expand Down Expand Up @@ -861,7 +858,6 @@ func (r *InMemoryMachineReconciler) reconcileNormalKubeProxy(ctx context.Context
// TODO: Add provisioning time for KubeProxy.

// Compute the resource group unique name.
// NOTE: We are using reconcilerGroup also as a name for the listener for sake of simplicity.
resourceGroup := klog.KObj(cluster).String()
inmemoryClient := r.InMemoryManager.GetResourceGroup(resourceGroup).GetClient()

Expand Down Expand Up @@ -908,7 +904,6 @@ func (r *InMemoryMachineReconciler) reconcileNormalCoredns(ctx context.Context,
// TODO: Add provisioning time for CoreDNS.

// Compute the resource group unique name.
// NOTE: We are using reconcilerGroup also as a name for the listener for sake of simplicity.
resourceGroup := klog.KObj(cluster).String()
inmemoryClient := r.InMemoryManager.GetResourceGroup(resourceGroup).GetClient()

Expand Down Expand Up @@ -996,7 +991,6 @@ func (r *InMemoryMachineReconciler) reconcileDelete(ctx context.Context, cluster

func (r *InMemoryMachineReconciler) reconcileDeleteCloudMachine(ctx context.Context, cluster *clusterv1.Cluster, _ *clusterv1.Machine, inMemoryMachine *infrav1.InMemoryMachine) (ctrl.Result, error) {
// Compute the resource group unique name.
// NOTE: We are using reconcilerGroup also as a name for the listener for sake of simplicity.
resourceGroup := klog.KObj(cluster).String()
inmemoryClient := r.InMemoryManager.GetResourceGroup(resourceGroup).GetClient()

Expand All @@ -1015,7 +1009,6 @@ func (r *InMemoryMachineReconciler) reconcileDeleteCloudMachine(ctx context.Cont

func (r *InMemoryMachineReconciler) reconcileDeleteNode(ctx context.Context, cluster *clusterv1.Cluster, _ *clusterv1.Machine, inMemoryMachine *infrav1.InMemoryMachine) (ctrl.Result, error) {
// Compute the resource group unique name.
// NOTE: We are using reconcilerGroup also as a name for the listener for sake of simplicity.
resourceGroup := klog.KObj(cluster).String()
inmemoryClient := r.InMemoryManager.GetResourceGroup(resourceGroup).GetClient()

Expand All @@ -1040,9 +1033,10 @@ func (r *InMemoryMachineReconciler) reconcileDeleteETCD(ctx context.Context, clu
return ctrl.Result{}, nil
}

// Compute the resource group unique name.
// NOTE: We are using reconcilerGroup also as a name for the listener for sake of simplicity.
// Compute the resource group and listener unique name.
// NOTE: we are using the same name for convenience, but it is not required.
resourceGroup := klog.KObj(cluster).String()
listenerName := klog.KObj(cluster).String()
inmemoryClient := r.InMemoryManager.GetResourceGroup(resourceGroup).GetClient()

etcdMember := fmt.Sprintf("etcd-%s", inMemoryMachine.Name)
Expand All @@ -1055,7 +1049,7 @@ func (r *InMemoryMachineReconciler) reconcileDeleteETCD(ctx context.Context, clu
if err := inmemoryClient.Delete(ctx, etcdPod); err != nil && !apierrors.IsNotFound(err) {
return ctrl.Result{}, errors.Wrapf(err, "failed to delete etcd Pod")
}
if err := r.APIServerMux.DeleteEtcdMember(resourceGroup, etcdMember); err != nil {
if err := r.APIServerMux.DeleteEtcdMember(listenerName, etcdMember); err != nil {
return ctrl.Result{}, err
}

Expand All @@ -1073,9 +1067,10 @@ func (r *InMemoryMachineReconciler) reconcileDeleteAPIServer(ctx context.Context
return ctrl.Result{}, nil
}

// Compute the resource group unique name.
// NOTE: We are using reconcilerGroup also as a name for the listener for sake of simplicity.
// Compute the resource group and listener unique name.
// NOTE: we are using the same name for convenience, but it is not required.
resourceGroup := klog.KObj(cluster).String()
listenerName := klog.KObj(cluster).String()
inmemoryClient := r.InMemoryManager.GetResourceGroup(resourceGroup).GetClient()

apiServer := fmt.Sprintf("kube-apiserver-%s", inMemoryMachine.Name)
Expand All @@ -1088,7 +1083,7 @@ func (r *InMemoryMachineReconciler) reconcileDeleteAPIServer(ctx context.Context
if err := inmemoryClient.Delete(ctx, apiServerPod); err != nil && !apierrors.IsNotFound(err) {
return ctrl.Result{}, errors.Wrapf(err, "failed to delete apiServer Pod")
}
if err := r.APIServerMux.DeleteAPIServer(resourceGroup, apiServer); err != nil {
if err := r.APIServerMux.DeleteAPIServer(listenerName, apiServer); err != nil {
return ctrl.Result{}, err
}

Expand All @@ -1102,7 +1097,6 @@ func (r *InMemoryMachineReconciler) reconcileDeleteScheduler(ctx context.Context
}

// Compute the resource group unique name.
// NOTE: We are using reconcilerGroup also as a name for the listener for sake of simplicity.
resourceGroup := klog.KObj(cluster).String()
inmemoryClient := r.InMemoryManager.GetResourceGroup(resourceGroup).GetClient()

Expand All @@ -1126,7 +1120,6 @@ func (r *InMemoryMachineReconciler) reconcileDeleteControllerManager(ctx context
}

// Compute the resource group unique name.
// NOTE: We are using reconcilerGroup also as a name for the listener for sake of simplicity.
resourceGroup := klog.KObj(cluster).String()
inmemoryClient := r.InMemoryManager.GetResourceGroup(resourceGroup).GetClient()

Expand Down
7 changes: 7 additions & 0 deletions test/infrastructure/inmemory/pkg/server/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ type WorkloadClusterListener struct {
host string
port int

resourceGroup string

scheme *runtime.Scheme

apiServers sets.Set[string]
Expand All @@ -65,6 +67,11 @@ func (s *WorkloadClusterListener) Port() int {
return s.port
}

// ResourceGroup returns the resource group that hosts in memory resources for a WorkloadClusterListener.
// The value is read directly from the listener's resourceGroup field; the WorkloadClustersMux lookups
// treat an empty string as "no resource group registered".
func (s *WorkloadClusterListener) ResourceGroup() string {
return s.resourceGroup
}

// Address returns the address of a WorkloadClusterListener.
func (s *WorkloadClusterListener) Address() string {
return fmt.Sprintf("https://%s", s.HostPort())
Expand Down
67 changes: 61 additions & 6 deletions test/infrastructure/inmemory/pkg/server/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,16 +166,25 @@ func NewWorkloadClustersMux(manager inmemoryruntime.Manager, host string, opts .
func (m *WorkloadClustersMux) mixedHandler() http.Handler {
// Prepare a function that can identify which workloadCluster/resourceGroup a
// request targets to.
// IMPORTANT: this function assumes that both the listener and the resourceGroup
// for a workload cluster have the same name.
resourceGroupResolver := func(host string) (string, error) {
m.lock.RLock()
defer m.lock.RUnlock()
wclName, ok := m.workloadClusterNameByHost[host]
if !ok {
return "", errors.Errorf("failed to get workloadClusterListener for host %s", host)
}
return wclName, nil
wcl, ok := m.workloadClusterListeners[wclName]
if !ok {
// NOTE: this should never happen because initWorkloadClusterListenerWithPortLocked always add a workload cluster in both maps
panic(fmt.Sprintf("workloadCluster with name %s exists in workloadClusterNameByHost but not workloadClusterListeners", wclName))
}

resourceGroup := wcl.ResourceGroup()
if resourceGroup == "" {
return "", errors.Errorf("workloadClusterListener with name %s does not have a registered resource group", wclName)
}

return resourceGroup, nil
}

// build the handlers for API server and etcd.
Expand Down Expand Up @@ -260,12 +269,12 @@ func (m *WorkloadClustersMux) HotRestart(clusters *infrav1.InMemoryClusterList)
return errors.Errorf("unable to restart the WorkloadClustersMux, there are two or more clusters using port %d", c.Spec.ControlPlaneEndpoint.Port)
}

resourceGroup, ok := c.Annotations[infrav1.ResourceGroupAnnotationName]
listenerName, ok := c.Annotations[infrav1.ListenerAnnotationName]
if !ok {
return errors.Errorf("unable to restart the WorkloadClustersMux, cluster %s doesn't have the %s annotation", klog.KRef(c.Namespace, c.Name), infrav1.ResourceGroupAnnotationName)
return errors.Errorf("unable to restart the WorkloadClustersMux, cluster %s doesn't have the %s annotation", klog.KRef(c.Namespace, c.Name), infrav1.ListenerAnnotationName)
}

m.initWorkloadClusterListenerWithPortLocked(resourceGroup, c.Spec.ControlPlaneEndpoint.Port)
m.initWorkloadClusterListenerWithPortLocked(listenerName, c.Spec.ControlPlaneEndpoint.Port)

if maxPort < c.Spec.ControlPlaneEndpoint.Port {
maxPort = c.Spec.ControlPlaneEndpoint.Port
Expand Down Expand Up @@ -307,13 +316,59 @@ func (m *WorkloadClustersMux) initWorkloadClusterListenerWithPortLocked(wclName
etcdMembers: sets.New[string](),
etcdServingCertificates: map[string]*tls.Certificate{},
}

// NOTE: the listener must be added to both maps, and the maps must be kept in sync,
// in order for the resourceGroupResolver to work.
m.workloadClusterListeners[wclName] = wcl
m.workloadClusterNameByHost[wcl.HostPort()] = wclName

m.log.Info("Workload cluster listener created", "listenerName", wclName, "address", wcl.Address())
return wcl
}

// RegisterResourceGroup records resourceGroup as the resource group hosting the in memory
// resources served by the WorkloadClusterListener named wclName.
// The listener must already exist in the mux; otherwise an error is returned and nothing is changed.
func (m *WorkloadClustersMux) RegisterResourceGroup(wclName, resourceGroup string) error {
	m.lock.Lock()
	defer m.lock.Unlock()

	// Happy path: the listener is known, so attach the resource group to it.
	if listener, found := m.workloadClusterListeners[wclName]; found {
		listener.resourceGroup = resourceGroup
		return nil
	}

	return errors.Errorf("workloadClusterListener with name %s must be initialized before registering a resource group", wclName)
}

// ResourceGroupByWorkloadCluster returns the resource group that hosts in memory resources for a WorkloadClusterListener.
// It returns an error if no listener named wclName has been initialized, or if the listener
// does not have a resource group registered.
func (m *WorkloadClustersMux) ResourceGroupByWorkloadCluster(wclName string) (string, error) {
	// This is a read-only lookup: a read lock is sufficient and does not serialize
	// concurrent readers (e.g. the resourceGroupResolver used by the handlers).
	m.lock.RLock()
	defer m.lock.RUnlock()

	wcl, ok := m.workloadClusterListeners[wclName]
	if !ok {
		return "", errors.Errorf("workloadClusterListener with name %s not yet initialized", wclName)
	}

	// An empty value is treated as "no resource group registered" for this listener.
	resourceGroup := wcl.ResourceGroup()
	if resourceGroup == "" {
		return "", errors.Errorf("workloadClusterListener with name %s does not have a registered resource group", wclName)
	}
	return resourceGroup, nil
}

// WorkloadClusterByResourceGroup returns the name of the WorkloadClusterListener that serves resources
// from the given resource group.
// It returns an error if resourceGroup is empty or if no listener has that resource group registered.
// NOTE: if more than one listener is registered to the same resource group, which one is returned
// is unspecified, because Go map iteration order is not deterministic.
func (m *WorkloadClustersMux) WorkloadClusterByResourceGroup(resourceGroup string) (string, error) {
	// This is a read-only scan: a read lock is sufficient.
	m.lock.RLock()
	defer m.lock.RUnlock()

	// A listener without a registered resource group reports "", so an empty argument
	// must never be matched against the listeners; otherwise an arbitrary unregistered
	// listener would be returned.
	if resourceGroup != "" {
		for wclName, wcl := range m.workloadClusterListeners {
			if wcl.ResourceGroup() == resourceGroup {
				return wclName, nil
			}
		}
	}
	return "", errors.Errorf("resourceGroup with name %s not yet registered to a workloadClusterListener", resourceGroup)
}

// AddAPIServer mimics adding an API server instance behind the WorkloadClusterListener.
// When the first API server instance is added the serving certificates and the admin certificate
// for tests are generated, and the listener is started.
Expand Down
Loading

0 comments on commit 4f7645b

Please sign in to comment.