From 71bc6feb3c565e26efd56bb9390e3bc3b66b0aee Mon Sep 17 00:00:00 2001 From: killianmuldoon Date: Tue, 25 Jul 2023 12:06:42 +0100 Subject: [PATCH] Add Upgrade flow to scale test Signed-off-by: killianmuldoon --- test/e2e/scale.go | 158 +++++++++++++++++- test/e2e/scale_test.go | 1 + test/framework/cluster_topology_helpers.go | 14 +- test/framework/controlplane_helpers.go | 7 - test/framework/machine_helpers.go | 8 +- .../inmemory/pkg/server/api/handler.go | 17 +- .../inmemory/pkg/server/mux_test.go | 12 +- 7 files changed, 193 insertions(+), 24 deletions(-) diff --git a/test/e2e/scale.go b/test/e2e/scale.go index edfee9ee372e..c6d400e4c723 100644 --- a/test/e2e/scale.go +++ b/test/e2e/scale.go @@ -41,6 +41,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" + controlplanev1 "sigs.k8s.io/cluster-api/controlplane/kubeadm/api/v1beta1" "sigs.k8s.io/cluster-api/test/e2e/internal/log" "sigs.k8s.io/cluster-api/test/framework" "sigs.k8s.io/cluster-api/test/framework/clusterctl" @@ -123,6 +124,9 @@ type scaleSpecInput struct { // if the test suit should fail as soon as c6 fails or if it should fail after all cluster creations are done. FailFast bool + // SkipUpgrade if set to true will skip upgrading the workload clusters. + SkipUpgrade bool + // SkipCleanup if set to true will skip deleting the workload clusters. SkipCleanup bool @@ -133,7 +137,7 @@ type scaleSpecInput struct { SkipWaitForCreation bool } -// scaleSpec implements a scale test. +// scaleSpec implements a scale test for clusters with MachineDeployments. func scaleSpec(ctx context.Context, inputGetter func() scaleSpecInput) { var ( specName = "scale" @@ -253,7 +257,7 @@ func scaleSpec(ctx context.Context, inputGetter func() scaleSpecInput) { Flavor: flavor, Namespace: scaleClusterNamespacePlaceholder, ClusterName: scaleClusterNamePlaceholder, - KubernetesVersion: input.E2EConfig.GetVariable(KubernetesVersion), + KubernetesVersion: input.E2EConfig.GetVariable(KubernetesVersionUpgradeFrom), ControlPlaneMachineCount: controlPlaneMachineCount, WorkerMachineCount: workerMachineCount, }) @@ -333,6 +337,47 @@ func scaleSpec(ctx context.Context, inputGetter func() scaleSpecInput) { Fail("") } + if !input.SkipUpgrade { + By("Upgrade the workload clusters concurrently") + // Get the upgrade function for upgrading the workload clusters. + upgrader := getClusterUpgradeAndWaitFn(framework.UpgradeClusterTopologyAndWaitForUpgradeInput{ + ClusterProxy: input.BootstrapClusterProxy, + KubernetesUpgradeVersion: input.E2EConfig.GetVariable(KubernetesVersionUpgradeTo), + EtcdImageTag: input.E2EConfig.GetVariable(EtcdVersionUpgradeTo), + DNSImageTag: input.E2EConfig.GetVariable(CoreDNSVersionUpgradeTo), + WaitForMachinesToBeUpgraded: input.E2EConfig.GetIntervals(specName, "wait-machine-upgrade"), + WaitForKubeProxyUpgrade: input.E2EConfig.GetIntervals(specName, "wait-machine-upgrade"), + WaitForDNSUpgrade: input.E2EConfig.GetIntervals(specName, "wait-machine-upgrade"), + WaitForEtcdUpgrade: input.E2EConfig.GetIntervals(specName, "wait-machine-upgrade"), + }) + + clusterNamesToUpgrade := []string{} + for _, result := range clusterCreateResults { + clusterNamesToUpgrade = append(clusterNamesToUpgrade, result.clusterName) + } + + // Upgrade all the workload clusters. 
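+ // The upgrade is fanned out to the concurrent workers below: each worker pulls cluster names
+ // from the input channel and runs the upgrader built above via upgradeClusterAndWaitWorker.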
+ _, err = workConcurrentlyAndWait(ctx, workConcurrentlyAndWaitInput{
+ ClusterNames: clusterNamesToUpgrade,
+ Concurrency: concurrency,
+ FailFast: input.FailFast,
+ WorkerFunc: func(ctx context.Context, inputChan chan string, resultChan chan workResult, wg *sync.WaitGroup) {
+ upgradeClusterAndWaitWorker(ctx, inputChan, resultChan, wg, namespace.Name, input.DeployClusterInSeparateNamespaces, baseClusterTemplateYAML, upgrader)
+ },
+ })
+ if err != nil {
+ // Call Fail to notify ginkgo that the suite has failed.
+ // Ginkgo will only print the first observed failure in this case.
+ // Example: If clusters c1, c2 and c3 failed, ginkgo will only print the first
+ // observed failure among these 3 clusters.
+ // Since ginkgo only captures one failure, we also log the error, which contains the
+ // full stack trace of the failure for each cluster, to help with debugging.
+ // TODO(ykakarap): Follow-up: Explore options for improved error reporting.
+ log.Logf("Failed to upgrade clusters. Error: %s", err.Error())
+ Fail("")
+ }
+ }
+
 // TODO(ykakarap): Follow-up: Dump resources for the failed clusters (creation).

 clusterNamesToDelete := []string{}
@@ -494,11 +539,9 @@ func getClusterCreateAndWaitFn(input clusterctl.ApplyCustomClusterTemplateAndWai
 CustomTemplateYAML: clusterTemplateYAML,
 ClusterName: clusterName,
 Namespace: namespace,
- CNIManifestPath: input.CNIManifestPath,
 WaitForClusterIntervals: input.WaitForClusterIntervals,
 WaitForControlPlaneIntervals: input.WaitForControlPlaneIntervals,
 WaitForMachineDeployments: input.WaitForMachineDeployments,
- WaitForMachinePools: input.WaitForMachinePools,
 Args: input.Args,
 PreWaitForCluster: input.PreWaitForCluster,
 PostMachinesProvisioned: input.PostMachinesProvisioned,
@@ -594,7 +637,7 @@ func deleteClusterAndWaitWorker(ctx context.Context, inputChan <-chan string, re
 return true
 case clusterName, open := <-inputChan:
 // Read the cluster name from the channel.
- // If the channel is closed it implies there is not more work to be done. Return.
+ // If the channel is closed it implies there is no more work to be done. Return.
 if !open {
 return true
 }
@@ -648,6 +691,111 @@ func deleteClusterAndWaitWorker(ctx context.Context, inputChan <-chan string, re
 }
 }

+type clusterUpgrader func(ctx context.Context, namespace, clusterName string, clusterTemplateYAML []byte)
+
+func getClusterUpgradeAndWaitFn(input framework.UpgradeClusterTopologyAndWaitForUpgradeInput) clusterUpgrader {
+ return func(ctx context.Context, namespace, clusterName string, clusterTemplateYAML []byte) {
+ resources := getClusterResourcesForUpgrade(ctx, input.ClusterProxy.GetClient(), namespace, clusterName)
+ // Nb. We cannot modify and reuse `input` directly in this closure because the closure is called
+ // once per cluster and would keep mutating the same `input`. It is safer to pass the values
+ // explicitly into a fresh `UpgradeClusterTopologyAndWaitForUpgradeInput`.
+ framework.UpgradeClusterTopologyAndWaitForUpgrade(ctx, framework.UpgradeClusterTopologyAndWaitForUpgradeInput{ + ClusterProxy: input.ClusterProxy, + Cluster: resources.cluster, + ControlPlane: resources.controlPlane, + MachineDeployments: resources.machineDeployments, + KubernetesUpgradeVersion: input.KubernetesUpgradeVersion, + WaitForMachinesToBeUpgraded: input.WaitForMachinesToBeUpgraded, + WaitForKubeProxyUpgrade: input.WaitForKubeProxyUpgrade, + WaitForDNSUpgrade: input.WaitForDNSUpgrade, + WaitForEtcdUpgrade: input.WaitForEtcdUpgrade, + // TODO: (killianmuldoon) Checking the kube-proxy, etcd and DNS version doesn't work as we can't access the control plane endpoint for the workload cluster + // from the host. Need to figure out a way to route the calls to the workload Cluster correctly. + EtcdImageTag: "", + DNSImageTag: "", + SkipKubeProxyCheck: true, + }) + } +} + +func upgradeClusterAndWaitWorker(ctx context.Context, inputChan <-chan string, resultChan chan<- workResult, wg *sync.WaitGroup, defaultNamespace string, deployClusterInSeparateNamespaces bool, clusterTemplateYAML []byte, upgrade clusterUpgrader) { + defer wg.Done() + + for { + done := func() bool { + select { + case <-ctx.Done(): + // If the context is cancelled, return and shutdown the worker. + return true + case clusterName, open := <-inputChan: + // Read the cluster name from the channel. + // If the channel is closed it implies there is no more work to be done. Return. + if !open { + return true + } + log.Logf("Upgrading cluster %s", clusterName) + + // This defer will catch ginkgo failures and record them. + // The recorded panics are then handled by the parent goroutine. + defer func() { + e := recover() + resultChan <- workResult{ + clusterName: clusterName, + err: e, + } + }() + + // Calculate namespace. 
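+ // If clusters are deployed in separate namespaces, each cluster lives in a namespace named
+ // after the cluster; otherwise all clusters share the default namespace passed to the worker.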
+ namespaceName := defaultNamespace + if deployClusterInSeparateNamespaces { + namespaceName = clusterName + } + upgrade(ctx, namespaceName, clusterName, clusterTemplateYAML) + return false + } + }() + if done { + break + } + } +} + +type clusterResources struct { + cluster *clusterv1.Cluster + machineDeployments []*clusterv1.MachineDeployment + controlPlane *controlplanev1.KubeadmControlPlane +} + +func getClusterResourcesForUpgrade(ctx context.Context, c client.Client, namespace, clusterName string) clusterResources { + cluster := &clusterv1.Cluster{} + err := c.Get(ctx, client.ObjectKey{Namespace: namespace, Name: clusterName}, cluster) + Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("error getting Cluster %s: %s", klog.KRef(namespace, clusterName), err)) + + controlPlane := &controlplanev1.KubeadmControlPlane{} + err = c.Get(ctx, client.ObjectKey{Namespace: namespace, Name: cluster.Spec.ControlPlaneRef.Name}, controlPlane) + Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("error getting ControlPlane for Cluster %s: %s,", klog.KObj(cluster), err)) + + mds := []*clusterv1.MachineDeployment{} + machineDeployments := &clusterv1.MachineDeploymentList{} + err = c.List(ctx, machineDeployments, + client.MatchingLabels{ + clusterv1.ClusterNameLabel: cluster.Name, + clusterv1.ClusterTopologyOwnedLabel: "", + }, + client.InNamespace(cluster.Namespace), + ) + Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("error getting MachineDeployments for Cluster %s: %s", klog.KObj(cluster), err)) + for _, md := range machineDeployments.Items { + mds = append(mds, md.DeepCopy()) + } + + return clusterResources{ + cluster: cluster, + machineDeployments: mds, + controlPlane: controlPlane, + } +} + type workResult struct { clusterName string err any diff --git a/test/e2e/scale_test.go b/test/e2e/scale_test.go index b804cec88590..ac184f78b66d 100644 --- a/test/e2e/scale_test.go +++ b/test/e2e/scale_test.go @@ -25,6 +25,7 @@ import ( ) var _ = Describe("When testing the machinery for scale testing using in-memory provider", func() { + // Note: This test does not support MachinePools. scaleSpec(ctx, func() scaleSpecInput { return scaleSpecInput{ E2EConfig: e2eConfig, diff --git a/test/framework/cluster_topology_helpers.go b/test/framework/cluster_topology_helpers.go index 13364d88779b..2c83da78d80c 100644 --- a/test/framework/cluster_topology_helpers.go +++ b/test/framework/cluster_topology_helpers.go @@ -75,6 +75,7 @@ type UpgradeClusterTopologyAndWaitForUpgradeInput struct { WaitForEtcdUpgrade []interface{} PreWaitForControlPlaneToBeUpgraded func() PreWaitForWorkersToBeUpgraded func() + SkipKubeProxyCheck bool } // UpgradeClusterTopologyAndWaitForUpgrade upgrades a Cluster topology and waits for it to be upgraded. @@ -123,13 +124,16 @@ func UpgradeClusterTopologyAndWaitForUpgrade(ctx context.Context, input UpgradeC KubernetesUpgradeVersion: input.KubernetesUpgradeVersion, }, input.WaitForMachinesToBeUpgraded...) - log.Logf("Waiting for kube-proxy to have the upgraded Kubernetes version") workloadCluster := input.ClusterProxy.GetWorkloadCluster(ctx, input.Cluster.Namespace, input.Cluster.Name) workloadClient := workloadCluster.GetClient() - WaitForKubeProxyUpgrade(ctx, WaitForKubeProxyUpgradeInput{ - Getter: workloadClient, - KubernetesVersion: input.KubernetesUpgradeVersion, - }, input.WaitForKubeProxyUpgrade...) 
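+ // Note: The kube-proxy version check is optional so that callers whose workload cluster control
+ // plane endpoint is not reachable from the test host (e.g. the in-memory provider used by the
+ // scale test) can skip it.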
+ + if !input.SkipKubeProxyCheck { + log.Logf("Waiting for kube-proxy to have the upgraded Kubernetes version") + WaitForKubeProxyUpgrade(ctx, WaitForKubeProxyUpgradeInput{ + Getter: workloadClient, + KubernetesVersion: input.KubernetesUpgradeVersion, + }, input.WaitForKubeProxyUpgrade...) + } // Wait for the CoreDNS upgrade if the DNSImageTag is set. if input.DNSImageTag != "" { diff --git a/test/framework/controlplane_helpers.go b/test/framework/controlplane_helpers.go index bb32c41f6819..301a43237f5e 100644 --- a/test/framework/controlplane_helpers.go +++ b/test/framework/controlplane_helpers.go @@ -393,13 +393,6 @@ func UpgradeControlPlaneAndWaitForUpgrade(ctx context.Context, input UpgradeCont } } -// controlPlaneMachineOptions returns a set of ListOptions that allows to get all machine objects belonging to control plane. -func controlPlaneMachineOptions() []client.ListOption { - return []client.ListOption{ - client.HasLabels{clusterv1.MachineControlPlaneLabel}, - } -} - type ScaleAndWaitControlPlaneInput struct { ClusterProxy ClusterProxy Cluster *clusterv1.Cluster diff --git a/test/framework/machine_helpers.go b/test/framework/machine_helpers.go index 8ff8b1480178..6ee3eb867979 100644 --- a/test/framework/machine_helpers.go +++ b/test/framework/machine_helpers.go @@ -127,7 +127,13 @@ func GetControlPlaneMachinesByCluster(ctx context.Context, input GetControlPlane Expect(input.ClusterName).ToNot(BeEmpty(), "Invalid argument. input.ClusterName can't be empty when calling GetControlPlaneMachinesByCluster") Expect(input.Namespace).ToNot(BeEmpty(), "Invalid argument. input.Namespace can't be empty when calling GetControlPlaneMachinesByCluster") - options := append(byClusterOptions(input.ClusterName, input.Namespace), controlPlaneMachineOptions()...) + options := []client.ListOption{ + client.InNamespace(input.Namespace), + client.MatchingLabels{ + clusterv1.ClusterNameLabel: input.ClusterName, + clusterv1.MachineControlPlaneLabel: "", + }, + } machineList := &clusterv1.MachineList{} Eventually(func() error { diff --git a/test/infrastructure/inmemory/pkg/server/api/handler.go b/test/infrastructure/inmemory/pkg/server/api/handler.go index 418d57652247..8f5636dc916b 100644 --- a/test/infrastructure/inmemory/pkg/server/api/handler.go +++ b/test/infrastructure/inmemory/pkg/server/api/handler.go @@ -35,6 +35,7 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/apis/meta/v1/validation" "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" @@ -317,16 +318,24 @@ func (h *apiServerHandler) apiV1List(req *restful.Request, resp *restful.Respons listOpts = append(listOpts, client.InNamespace(req.PathParameter("namespace"))) } - // TODO: The only field selector which works is for `spec.nodeName` on pods. - selector, err := fields.ParseSelector(req.QueryParameter("fieldSelector")) + // TODO: The only field Selector which works is for `spec.nodeName` on pods. 
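+ // Translate the fieldSelector and labelSelector query parameters of the request into
+ // controller-runtime list options.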
+ fieldSelector, err := fields.ParseSelector(req.QueryParameter("fieldSelector")) if err != nil { _ = resp.WriteErrorString(http.StatusInternalServerError, err.Error()) return } - if selector != nil { - listOpts = append(listOpts, client.MatchingFieldsSelector{Selector: selector}) + if fieldSelector != nil { + listOpts = append(listOpts, client.MatchingFieldsSelector{Selector: fieldSelector}) } + labelSelector, err := labels.Parse(req.QueryParameter("labelSelector")) + if err != nil { + _ = resp.WriteErrorString(http.StatusInternalServerError, err.Error()) + return + } + if labelSelector != nil { + listOpts = append(listOpts, client.MatchingLabelsSelector{Selector: labelSelector}) + } if err := inmemoryClient.List(ctx, list, listOpts...); err != nil { if status, ok := err.(apierrors.APIStatus); ok || errors.As(err, &status) { _ = resp.WriteHeaderAndEntity(int(status.Status().Code), status) diff --git a/test/infrastructure/inmemory/pkg/server/mux_test.go b/test/infrastructure/inmemory/pkg/server/mux_test.go index 7aba29c2b524..6390583d3a89 100644 --- a/test/infrastructure/inmemory/pkg/server/mux_test.go +++ b/test/infrastructure/inmemory/pkg/server/mux_test.go @@ -36,6 +36,7 @@ import ( rbacv1 "k8s.io/api/rbac/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" @@ -155,7 +156,7 @@ func TestAPI_corev1_CRUD(t *testing.T) { Spec: corev1.PodSpec{NodeName: n.Name}, })).To(Succeed()) g.Expect(c.Create(ctx, &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{Name: "notSelectedPod", Namespace: metav1.NamespaceDefault}, + ObjectMeta: metav1.ObjectMeta{Name: "labelSelectedPod", Namespace: metav1.NamespaceDefault, Labels: map[string]string{"name": "labelSelectedPod"}}, })).To(Succeed()) pl := &corev1.PodList{} @@ -166,8 +167,15 @@ func TestAPI_corev1_CRUD(t *testing.T) { g.Expect(pl.Items).To(HaveLen(1)) g.Expect(pl.Items[0].Name).To(Equal("bar")) - // get + // list with label selector on pod + labelSelector := &client.ListOptions{ + LabelSelector: labels.SelectorFromSet(labels.Set{"name": "labelSelectedPod"}), + } + g.Expect(c.List(ctx, pl, labelSelector)).To(Succeed()) + g.Expect(pl.Items).To(HaveLen(1)) + g.Expect(pl.Items[0].Name).To(Equal("labelSelectedPod")) + // get n = &corev1.Node{} err = c.Get(ctx, client.ObjectKey{Name: "foo"}, n) g.Expect(err).ToNot(HaveOccurred())
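As a usage note for the new knob: the sketch below shows how a spec could exercise the scale machinery with the upgrade phase switched off via the SkipUpgrade field added above. It is a sketch only, not part of this patch: it assumes the usual suite-level variables from test/e2e (ctx, e2eConfig, skipCleanup) are in scope, and a real spec would also set the cluster count, concurrency and flavor inputs as the existing in-memory spec does.

// Sketch only (not part of this patch). Assumes the standard e2e suite variables
// ctx, e2eConfig and skipCleanup are in scope, as they are for scale_test.go.
package e2e

import (
	. "github.com/onsi/ginkgo/v2"
)

var _ = Describe("When testing the machinery for scale testing without upgrading clusters", func() {
	scaleSpec(ctx, func() scaleSpecInput {
		return scaleSpecInput{
			E2EConfig:   e2eConfig,
			SkipUpgrade: true, // skip the workload cluster upgrade flow added in this change
			SkipCleanup: skipCleanup,
		}
	})
})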