Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🌱 Allow using different resource group and listener name with the in memory server #10096

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ import (
)

const (
// ResourceGroupAnnotationName tracks the name of a resource group a InMemoryCluster cluster is linked to.
ResourceGroupAnnotationName = "inmemorycluster.infrastructure.cluster.x-k8s.io/resource-group"
// ListenerAnnotationName tracks the name of the listener a cluster is linked to.
// NOTE: the annotation must be added by the components that creates the listener only if using the HotRestart feature.
ListenerAnnotationName = "inmemorycluster.infrastructure.cluster.x-k8s.io/listener"

// ClusterFinalizer allows InMemoryClusterReconciler to clean up resources associated with InMemoryCluster before
// removing it from the API server.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,13 @@ func (r *InMemoryClusterReconciler) reconcileHotRestart(ctx context.Context) err
}

func (r *InMemoryClusterReconciler) reconcileNormal(_ context.Context, cluster *clusterv1.Cluster, inMemoryCluster *infrav1.InMemoryCluster) error {
// Compute the resource group unique name.
// Compute the name for resource group and listener.
// NOTE: we are using the same name for convenience, but it is not required.
resourceGroup := klog.KObj(cluster).String()
listenerName := klog.KObj(cluster).String()

// Store the resource group used by this inMemoryCluster.
inMemoryCluster.Annotations[infrav1.ResourceGroupAnnotationName] = resourceGroup
inMemoryCluster.Annotations[infrav1.ListenerAnnotationName] = listenerName

// Create a resource group for all the in memory resources belonging the workload cluster;
// if the resource group already exists, the operation is a no-op.
Expand All @@ -166,13 +168,13 @@ func (r *InMemoryClusterReconciler) reconcileNormal(_ context.Context, cluster *

// Initialize a listener for the workload cluster; if the listener has been already initialized
// the operation is a no-op.
// NOTE: We are using reconcilerGroup also as a name for the listener for sake of simplicity.
// IMPORTANT: The fact that both the listener and the resourceGroup for a workload cluster have
// the same name is used by the current implementation of the resourceGroup resolvers in the APIServerMux.
listener, err := r.APIServerMux.InitWorkloadClusterListener(resourceGroup)
listener, err := r.APIServerMux.InitWorkloadClusterListener(listenerName)
if err != nil {
return errors.Wrap(err, "failed to init the listener for the workload cluster")
}
if err := r.APIServerMux.RegisterResourceGroup(listenerName, resourceGroup); err != nil {
return errors.Wrap(err, "failed to register the resource group for the workload cluster")
}

// Surface the control plane endpoint
if inMemoryCluster.Spec.ControlPlaneEndpoint.Host == "" {
Expand All @@ -187,14 +189,16 @@ func (r *InMemoryClusterReconciler) reconcileNormal(_ context.Context, cluster *
}

func (r *InMemoryClusterReconciler) reconcileDelete(_ context.Context, cluster *clusterv1.Cluster, inMemoryCluster *infrav1.InMemoryCluster) error {
// Compute the resource group unique name.
// Compute the name for resource group and listener.
// NOTE: we are using the same name for convenience, but it is not required.
resourceGroup := klog.KObj(cluster).String()
listenerName := klog.KObj(cluster).String()

// Delete the resource group hosting all the in memory resources belonging the workload cluster;
r.InMemoryManager.DeleteResourceGroup(resourceGroup)

// Delete the listener for the workload cluster;
if err := r.APIServerMux.DeleteWorkloadClusterListener(resourceGroup); err != nil {
if err := r.APIServerMux.DeleteWorkloadClusterListener(listenerName); err != nil {
return err
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,7 @@ func (r *InMemoryMachineReconciler) reconcileNormal(ctx context.Context, cluster
}

func (r *InMemoryMachineReconciler) reconcileNormalCloudMachine(ctx context.Context, cluster *clusterv1.Cluster, _ *clusterv1.Machine, inMemoryMachine *infrav1.InMemoryMachine) (ctrl.Result, error) {
// Compute the resource group unique name.
// NOTE: We are using reconcilerGroup also as a name for the listener for sake of simplicity.
// Compute the name for resource group.
resourceGroup := klog.KObj(cluster).String()
inmemoryClient := r.InMemoryManager.GetResourceGroup(resourceGroup).GetClient()

Expand Down Expand Up @@ -320,8 +319,7 @@ func (r *InMemoryMachineReconciler) reconcileNormalNode(ctx context.Context, clu
return ctrl.Result{RequeueAfter: start.Add(provisioningDuration).Sub(now)}, nil
}

// Compute the resource group unique name.
// NOTE: We are using reconcilerGroup also as a name for the listener for sake of simplicity.
// Compute the name for resource group.
resourceGroup := klog.KObj(cluster).String()
inmemoryClient := r.InMemoryManager.GetResourceGroup(resourceGroup).GetClient()

Expand Down Expand Up @@ -405,9 +403,10 @@ func (r *InMemoryMachineReconciler) reconcileNormalETCD(ctx context.Context, clu
return ctrl.Result{RequeueAfter: start.Add(provisioningDuration).Sub(now)}, nil
}

// Compute the resource group unique name.
// NOTE: We are using reconcilerGroup also as a name for the listener for sake of simplicity.
// Compute the resource group and listener unique name.
// NOTE: we are using the same name for convenience, but it is not required.
resourceGroup := klog.KObj(cluster).String()
listenerName := klog.KObj(cluster).String()
inmemoryClient := r.InMemoryManager.GetResourceGroup(resourceGroup).GetClient()

// Create the etcd pod
Expand Down Expand Up @@ -484,7 +483,7 @@ func (r *InMemoryMachineReconciler) reconcileNormalETCD(ctx context.Context, clu
}

// If there is not yet an etcd member listener for this machine, add it to the server.
if !r.APIServerMux.HasEtcdMember(resourceGroup, etcdMember) {
if !r.APIServerMux.HasEtcdMember(listenerName, etcdMember) {
// Getting the etcd CA
s, err := secret.Get(ctx, r.Client, client.ObjectKeyFromObject(cluster), secret.EtcdCA)
if err != nil {
Expand All @@ -510,7 +509,7 @@ func (r *InMemoryMachineReconciler) reconcileNormalETCD(ctx context.Context, clu
return ctrl.Result{}, errors.Wrapf(err, "invalid etcd CA: invalid %s", secret.TLSKeyDataName)
}

if err := r.APIServerMux.AddEtcdMember(resourceGroup, etcdMember, cert, key.(*rsa.PrivateKey)); err != nil {
if err := r.APIServerMux.AddEtcdMember(listenerName, etcdMember, cert, key.(*rsa.PrivateKey)); err != nil {
return ctrl.Result{}, errors.Wrap(err, "failed to start etcd member")
}
}
Expand Down Expand Up @@ -609,9 +608,10 @@ func (r *InMemoryMachineReconciler) reconcileNormalAPIServer(ctx context.Context
return ctrl.Result{RequeueAfter: start.Add(provisioningDuration).Sub(now)}, nil
}

// Compute the resource group unique name.
// NOTE: We are using reconcilerGroup also as a name for the listener for sake of simplicity.
// Compute the resource group and listener unique name.
// NOTE: we are using the same name for convenience, but it is not required.
resourceGroup := klog.KObj(cluster).String()
listenerName := klog.KObj(cluster).String()
inmemoryClient := r.InMemoryManager.GetResourceGroup(resourceGroup).GetClient()

// Create the apiserver pod
Expand Down Expand Up @@ -651,7 +651,7 @@ func (r *InMemoryMachineReconciler) reconcileNormalAPIServer(ctx context.Context
}

// If there is not yet an API server listener for this machine.
if !r.APIServerMux.HasAPIServer(resourceGroup, apiServer) {
if !r.APIServerMux.HasAPIServer(listenerName, apiServer) {
// Getting the Kubernetes CA
s, err := secret.Get(ctx, r.Client, client.ObjectKeyFromObject(cluster), secret.ClusterCA)
if err != nil {
Expand Down Expand Up @@ -679,7 +679,7 @@ func (r *InMemoryMachineReconciler) reconcileNormalAPIServer(ctx context.Context

// Adding the APIServer.
// NOTE: When the first APIServer is added, the workload cluster listener is started.
if err := r.APIServerMux.AddAPIServer(resourceGroup, apiServer, cert, key.(*rsa.PrivateKey)); err != nil {
if err := r.APIServerMux.AddAPIServer(listenerName, apiServer, cert, key.(*rsa.PrivateKey)); err != nil {
return ctrl.Result{}, errors.Wrap(err, "failed to start API server")
}
}
Expand All @@ -703,7 +703,6 @@ func (r *InMemoryMachineReconciler) reconcileNormalScheduler(ctx context.Context
}

// Compute the resource group unique name.
// NOTE: We are using reconcilerGroup also as a name for the listener for sake of simplicity.
resourceGroup := klog.KObj(cluster).String()
inmemoryClient := r.InMemoryManager.GetResourceGroup(resourceGroup).GetClient()

Expand Down Expand Up @@ -751,7 +750,6 @@ func (r *InMemoryMachineReconciler) reconcileNormalControllerManager(ctx context
}

// Compute the resource group unique name.
// NOTE: We are using reconcilerGroup also as a name for the listener for sake of simplicity.
resourceGroup := klog.KObj(cluster).String()
inmemoryClient := r.InMemoryManager.GetResourceGroup(resourceGroup).GetClient()

Expand Down Expand Up @@ -791,7 +789,6 @@ func (r *InMemoryMachineReconciler) reconcileNormalKubeadmObjects(ctx context.Co
}

// Compute the resource group unique name.
// NOTE: We are using reconcilerGroup also as a name for the listener for sake of simplicity.
resourceGroup := klog.KObj(cluster).String()
inmemoryClient := r.InMemoryManager.GetResourceGroup(resourceGroup).GetClient()

Expand Down Expand Up @@ -861,7 +858,6 @@ func (r *InMemoryMachineReconciler) reconcileNormalKubeProxy(ctx context.Context
// TODO: Add provisioning time for KubeProxy.

// Compute the resource group unique name.
// NOTE: We are using reconcilerGroup also as a name for the listener for sake of simplicity.
resourceGroup := klog.KObj(cluster).String()
inmemoryClient := r.InMemoryManager.GetResourceGroup(resourceGroup).GetClient()

Expand Down Expand Up @@ -908,7 +904,6 @@ func (r *InMemoryMachineReconciler) reconcileNormalCoredns(ctx context.Context,
// TODO: Add provisioning time for CoreDNS.

// Compute the resource group unique name.
// NOTE: We are using reconcilerGroup also as a name for the listener for sake of simplicity.
resourceGroup := klog.KObj(cluster).String()
inmemoryClient := r.InMemoryManager.GetResourceGroup(resourceGroup).GetClient()

Expand Down Expand Up @@ -996,7 +991,6 @@ func (r *InMemoryMachineReconciler) reconcileDelete(ctx context.Context, cluster

func (r *InMemoryMachineReconciler) reconcileDeleteCloudMachine(ctx context.Context, cluster *clusterv1.Cluster, _ *clusterv1.Machine, inMemoryMachine *infrav1.InMemoryMachine) (ctrl.Result, error) {
// Compute the resource group unique name.
// NOTE: We are using reconcilerGroup also as a name for the listener for sake of simplicity.
resourceGroup := klog.KObj(cluster).String()
inmemoryClient := r.InMemoryManager.GetResourceGroup(resourceGroup).GetClient()

Expand All @@ -1015,7 +1009,6 @@ func (r *InMemoryMachineReconciler) reconcileDeleteCloudMachine(ctx context.Cont

func (r *InMemoryMachineReconciler) reconcileDeleteNode(ctx context.Context, cluster *clusterv1.Cluster, _ *clusterv1.Machine, inMemoryMachine *infrav1.InMemoryMachine) (ctrl.Result, error) {
// Compute the resource group unique name.
// NOTE: We are using reconcilerGroup also as a name for the listener for sake of simplicity.
resourceGroup := klog.KObj(cluster).String()
inmemoryClient := r.InMemoryManager.GetResourceGroup(resourceGroup).GetClient()

Expand All @@ -1040,9 +1033,10 @@ func (r *InMemoryMachineReconciler) reconcileDeleteETCD(ctx context.Context, clu
return ctrl.Result{}, nil
}

// Compute the resource group unique name.
// NOTE: We are using reconcilerGroup also as a name for the listener for sake of simplicity.
// Compute the resource group and listener unique name.
// NOTE: we are using the same name for convenience, but it is not required.
resourceGroup := klog.KObj(cluster).String()
listenerName := klog.KObj(cluster).String()
inmemoryClient := r.InMemoryManager.GetResourceGroup(resourceGroup).GetClient()

etcdMember := fmt.Sprintf("etcd-%s", inMemoryMachine.Name)
Expand All @@ -1055,7 +1049,7 @@ func (r *InMemoryMachineReconciler) reconcileDeleteETCD(ctx context.Context, clu
if err := inmemoryClient.Delete(ctx, etcdPod); err != nil && !apierrors.IsNotFound(err) {
return ctrl.Result{}, errors.Wrapf(err, "failed to delete etcd Pod")
}
if err := r.APIServerMux.DeleteEtcdMember(resourceGroup, etcdMember); err != nil {
if err := r.APIServerMux.DeleteEtcdMember(listenerName, etcdMember); err != nil {
return ctrl.Result{}, err
}

Expand All @@ -1073,9 +1067,10 @@ func (r *InMemoryMachineReconciler) reconcileDeleteAPIServer(ctx context.Context
return ctrl.Result{}, nil
}

// Compute the resource group unique name.
// NOTE: We are using reconcilerGroup also as a name for the listener for sake of simplicity.
// Compute the resource group and listener unique name.
// NOTE: we are using the same name for convenience, but it is not required.
resourceGroup := klog.KObj(cluster).String()
listenerName := klog.KObj(cluster).String()
inmemoryClient := r.InMemoryManager.GetResourceGroup(resourceGroup).GetClient()

apiServer := fmt.Sprintf("kube-apiserver-%s", inMemoryMachine.Name)
Expand All @@ -1088,7 +1083,7 @@ func (r *InMemoryMachineReconciler) reconcileDeleteAPIServer(ctx context.Context
if err := inmemoryClient.Delete(ctx, apiServerPod); err != nil && !apierrors.IsNotFound(err) {
return ctrl.Result{}, errors.Wrapf(err, "failed to delete apiServer Pod")
}
if err := r.APIServerMux.DeleteAPIServer(resourceGroup, apiServer); err != nil {
if err := r.APIServerMux.DeleteAPIServer(listenerName, apiServer); err != nil {
return ctrl.Result{}, err
}

Expand All @@ -1102,7 +1097,6 @@ func (r *InMemoryMachineReconciler) reconcileDeleteScheduler(ctx context.Context
}

// Compute the resource group unique name.
// NOTE: We are using reconcilerGroup also as a name for the listener for sake of simplicity.
resourceGroup := klog.KObj(cluster).String()
inmemoryClient := r.InMemoryManager.GetResourceGroup(resourceGroup).GetClient()

Expand All @@ -1126,7 +1120,6 @@ func (r *InMemoryMachineReconciler) reconcileDeleteControllerManager(ctx context
}

// Compute the resource group unique name.
// NOTE: We are using reconcilerGroup also as a name for the listener for sake of simplicity.
resourceGroup := klog.KObj(cluster).String()
inmemoryClient := r.InMemoryManager.GetResourceGroup(resourceGroup).GetClient()

Expand Down
7 changes: 7 additions & 0 deletions test/infrastructure/inmemory/pkg/server/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ type WorkloadClusterListener struct {
host string
port int

resourceGroup string

scheme *runtime.Scheme

apiServers sets.Set[string]
Expand All @@ -65,6 +67,11 @@ func (s *WorkloadClusterListener) Port() int {
return s.port
}

// ResourceGroup returns the resource group that hosts in memory resources for a WorkloadClusterListener.
func (s *WorkloadClusterListener) ResourceGroup() string {
return s.resourceGroup
}

// Address returns the address of a WorkloadClusterListener.
func (s *WorkloadClusterListener) Address() string {
return fmt.Sprintf("https://%s", s.HostPort())
Expand Down
67 changes: 61 additions & 6 deletions test/infrastructure/inmemory/pkg/server/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,16 +166,25 @@ func NewWorkloadClustersMux(manager inmemoryruntime.Manager, host string, opts .
func (m *WorkloadClustersMux) mixedHandler() http.Handler {
// Prepare a function that can identify which workloadCluster/resourceGroup a
// request targets to.
// IMPORTANT: this function assumes that both the listener and the resourceGroup
// for a workload cluster have the same name.
resourceGroupResolver := func(host string) (string, error) {
m.lock.RLock()
defer m.lock.RUnlock()
wclName, ok := m.workloadClusterNameByHost[host]
if !ok {
return "", errors.Errorf("failed to get workloadClusterListener for host %s", host)
}
return wclName, nil
wcl, ok := m.workloadClusterListeners[wclName]
if !ok {
// NOTE: this should never happen because initWorkloadClusterListenerWithPortLocked always add a workload cluster in both maps
panic(fmt.Sprintf("workloadCluster with name %s exists in workloadClusterNameByHost but not workloadClusterListeners", wclName))
}

resourceGroup := wcl.ResourceGroup()
if resourceGroup == "" {
return "", errors.Errorf("workloadClusterListener with name %s does not have a registered resource group", wclName)
}

return resourceGroup, nil
}

// build the handlers for API server and etcd.
Expand Down Expand Up @@ -260,12 +269,12 @@ func (m *WorkloadClustersMux) HotRestart(clusters *infrav1.InMemoryClusterList)
return errors.Errorf("unable to restart the WorkloadClustersMux, there are two or more clusters using port %d", c.Spec.ControlPlaneEndpoint.Port)
}

resourceGroup, ok := c.Annotations[infrav1.ResourceGroupAnnotationName]
listenerName, ok := c.Annotations[infrav1.ListenerAnnotationName]
if !ok {
return errors.Errorf("unable to restart the WorkloadClustersMux, cluster %s doesn't have the %s annotation", klog.KRef(c.Namespace, c.Name), infrav1.ResourceGroupAnnotationName)
return errors.Errorf("unable to restart the WorkloadClustersMux, cluster %s doesn't have the %s annotation", klog.KRef(c.Namespace, c.Name), infrav1.ListenerAnnotationName)
}

m.initWorkloadClusterListenerWithPortLocked(resourceGroup, c.Spec.ControlPlaneEndpoint.Port)
m.initWorkloadClusterListenerWithPortLocked(listenerName, c.Spec.ControlPlaneEndpoint.Port)

if maxPort < c.Spec.ControlPlaneEndpoint.Port {
maxPort = c.Spec.ControlPlaneEndpoint.Port
Expand Down Expand Up @@ -307,13 +316,59 @@ func (m *WorkloadClustersMux) initWorkloadClusterListenerWithPortLocked(wclName
etcdMembers: sets.New[string](),
etcdServingCertificates: map[string]*tls.Certificate{},
}

// NOTE: it is required to add on both maps and keep them in sync
// In order to get the resourceGroupResolver to work.
m.workloadClusterListeners[wclName] = wcl
m.workloadClusterNameByHost[wcl.HostPort()] = wclName

m.log.Info("Workload cluster listener created", "listenerName", wclName, "address", wcl.Address())
return wcl
}

// RegisterResourceGroup registers the resource group that host in memory resources for a WorkloadClusterListener.
func (m *WorkloadClustersMux) RegisterResourceGroup(wclName, resourceGroup string) error {
m.lock.Lock()
defer m.lock.Unlock()

wcl, ok := m.workloadClusterListeners[wclName]
if !ok {
return errors.Errorf("workloadClusterListener with name %s must be initialized before registering a resource group", wclName)
}
wcl.resourceGroup = resourceGroup
return nil
}

// ResourceGroupByWorkloadCluster returns the resource group that host in memory resources for a WorkloadClusterListener.
func (m *WorkloadClustersMux) ResourceGroupByWorkloadCluster(wclName string) (string, error) {
m.lock.Lock()
defer m.lock.Unlock()

wcl, ok := m.workloadClusterListeners[wclName]
if !ok {
return "", errors.Errorf("workloadClusterListener with name %s not yet initialized", wclName)
}

resourceGroup := wcl.ResourceGroup()
if resourceGroup == "" {
return "", errors.Errorf("workloadClusterListener with name %s does not have a registered resource group", wclName)
}
return resourceGroup, nil
}

// WorkloadClusterByResourceGroup returns the WorkloadClusterListener that serves resources from a given resource.
func (m *WorkloadClustersMux) WorkloadClusterByResourceGroup(resouceGroup string) (string, error) {
m.lock.Lock()
defer m.lock.Unlock()

for wclName, wcl := range m.workloadClusterListeners {
if wcl.ResourceGroup() == resouceGroup {
return wclName, nil
}
}
return "", errors.Errorf("resouceGroup with name %s not yet registered to a workloadClusterListener", resouceGroup)
}

// AddAPIServer mimics adding an API server instance behind the WorkloadClusterListener.
// When the first API server instance is added the serving certificates and the admin certificate
// for tests are generated, and the listener is started.
Expand Down
Loading
Loading