
Add Upgrade flow to scale test
Signed-off-by: killianmuldoon <kmuldoon@vmware.com>
killianmuldoon committed Jan 22, 2024
1 parent cd7de82 commit b1598e8
Showing 6 changed files with 192 additions and 22 deletions.
156 changes: 153 additions & 3 deletions test/e2e/scale.go
@@ -41,6 +41,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
controlplanev1 "sigs.k8s.io/cluster-api/controlplane/kubeadm/api/v1beta1"
"sigs.k8s.io/cluster-api/test/e2e/internal/log"
"sigs.k8s.io/cluster-api/test/framework"
"sigs.k8s.io/cluster-api/test/framework/clusterctl"
@@ -123,6 +124,9 @@ type scaleSpecInput struct {
// if the test suite should fail as soon as c6 fails or if it should fail after all cluster creations are done.
FailFast bool

// SkipUpgrade if set to true will skip upgrading the workload clusters.
SkipUpgrade bool

// SkipCleanup if set to true will skip deleting the workload clusters.
SkipCleanup bool

@@ -253,7 +257,7 @@ func scaleSpec(ctx context.Context, inputGetter func() scaleSpecInput) {
Flavor: flavor,
Namespace: scaleClusterNamespacePlaceholder,
ClusterName: scaleClusterNamePlaceholder,
KubernetesVersion: input.E2EConfig.GetVariable(KubernetesVersion),
KubernetesVersion: input.E2EConfig.GetVariable(KubernetesVersionUpgradeFrom),
ControlPlaneMachineCount: controlPlaneMachineCount,
WorkerMachineCount: workerMachineCount,
})
@@ -333,6 +337,49 @@ func scaleSpec(ctx context.Context, inputGetter func() scaleSpecInput) {
Fail("")
}

if !input.SkipUpgrade {
By("Upgrade the workload clusters concurrently")
// Get the upgrader function used to upgrade the workload clusters.
upgrader := getClusterUpgradeAndWaitFn(framework.UpgradeClusterTopologyAndWaitForUpgradeInput{
ClusterProxy: input.BootstrapClusterProxy,
KubernetesUpgradeVersion: input.E2EConfig.GetVariable(KubernetesVersionUpgradeTo),
EtcdImageTag: input.E2EConfig.GetVariable(EtcdVersionUpgradeTo),
DNSImageTag: input.E2EConfig.GetVariable(CoreDNSVersionUpgradeTo),
WaitForMachinesToBeUpgraded: input.E2EConfig.GetIntervals(specName, "wait-machine-upgrade"),
WaitForKubeProxyUpgrade: input.E2EConfig.GetIntervals(specName, "wait-machine-upgrade"),
WaitForDNSUpgrade: input.E2EConfig.GetIntervals(specName, "wait-machine-upgrade"),
WaitForEtcdUpgrade: input.E2EConfig.GetIntervals(specName, "wait-machine-upgrade"),
})

clusterNamesToUpgrade := []string{}
for _, result := range clusterCreateResults {
clusterNamesToUpgrade = append(clusterNamesToUpgrade, result.clusterName)
}

// Upgrade all the workload clusters.
_, err = workConcurrentlyAndWait(ctx, workConcurrentlyAndWaitInput{
ClusterNames: clusterNamesToUpgrade,
Concurrency: concurrency,
FailFast: input.FailFast,
WorkerFunc: func(ctx context.Context, inputChan chan string, resultChan chan workResult, wg *sync.WaitGroup) {
upgradeClusterAndWaitWorker(ctx, inputChan, resultChan, wg, namespace.Name, input.DeployClusterInSeparateNamespaces, baseClusterTemplateYAML, upgrader)
},
})
if err != nil {
// Call Fail to notify ginkgo that the suite has failed.
// Ginkgo will only print the first observed failure in this case.
// Example: if clusters c1, c2 and c3 failed, ginkgo will only print the first
// observed failure among these 3 clusters.
// Since ginkgo only captures one failure, we additionally log the error, which contains
// the full stack trace for each failed cluster, to help with debugging.
// TODO(ykakarap): Follow-up: Explore options for improved error reporting.
log.Logf("Failed to upgrade clusters. Error: %s", err.Error())
Fail("")
}
}

// TODO(ykakarap): Follow-up: Dump resources for the failed clusters (creation).

clusterNamesToDelete := []string{}
@@ -494,11 +541,9 @@ func getClusterCreateAndWaitFn(input clusterctl.ApplyCustomClusterTemplateAndWai
CustomTemplateYAML: clusterTemplateYAML,
ClusterName: clusterName,
Namespace: namespace,
CNIManifestPath: input.CNIManifestPath,
WaitForClusterIntervals: input.WaitForClusterIntervals,
WaitForControlPlaneIntervals: input.WaitForControlPlaneIntervals,
WaitForMachineDeployments: input.WaitForMachineDeployments,
WaitForMachinePools: input.WaitForMachinePools,
Args: input.Args,
PreWaitForCluster: input.PreWaitForCluster,
PostMachinesProvisioned: input.PostMachinesProvisioned,
@@ -648,6 +693,111 @@ func deleteClusterAndWaitWorker(ctx context.Context, inputChan <-chan string, re
}
}

type clusterUpgrader func(ctx context.Context, namespace, clusterName string, clusterTemplateYAML []byte)

func getClusterUpgradeAndWaitFn(input framework.UpgradeClusterTopologyAndWaitForUpgradeInput) clusterUpgrader {
return func(ctx context.Context, namespace, clusterName string, clusterTemplateYAML []byte) {
resources := getClusterResourcesForUpgrade(ctx, input.ClusterProxy.GetClient(), namespace, clusterName)
// NB: We cannot modify and reuse `input` directly in this closure because the closure is
// called multiple times and would keep mutating the same `input`. It is safer to pass the
// values explicitly into a fresh `UpgradeClusterTopologyAndWaitForUpgradeInput`.
framework.UpgradeClusterTopologyAndWaitForUpgrade(ctx, framework.UpgradeClusterTopologyAndWaitForUpgradeInput{
ClusterProxy: input.ClusterProxy,
Cluster: resources.cluster,
ControlPlane: resources.controlPlane,
MachineDeployments: resources.machineDeployments,
KubernetesUpgradeVersion: input.KubernetesUpgradeVersion,
WaitForMachinesToBeUpgraded: input.WaitForMachinesToBeUpgraded,
WaitForKubeProxyUpgrade: input.WaitForKubeProxyUpgrade,
WaitForDNSUpgrade: input.WaitForDNSUpgrade,
WaitForEtcdUpgrade: input.WaitForEtcdUpgrade,
// TODO: (killianmuldoon) Checking the kube-proxy, etcd and DNS versions doesn't work because we can't access the control plane endpoint of the workload cluster
// from the host. Need to figure out a way to route the calls to the workload Cluster correctly.
EtcdImageTag: "",
DNSImageTag: "",
SkipKubeProxyCheck: true,
})
}
}

func upgradeClusterAndWaitWorker(ctx context.Context, inputChan <-chan string, resultChan chan<- workResult, wg *sync.WaitGroup, defaultNamespace string, deployClusterInSeparateNamespaces bool, clusterTemplateYAML []byte, upgrade clusterUpgrader) {
defer wg.Done()

for {
done := func() bool {
select {
case <-ctx.Done():
// If the context is cancelled, return and shut down the worker.
return true
case clusterName, open := <-inputChan:
// Read the cluster name from the channel.
// If the channel is closed it implies there is no more work to be done. Return.
if !open {
return true
}
log.Logf("Upgrading cluster %s", clusterName)

// This defer will catch ginkgo failures and record them.
// The recorded panics are then handled by the parent goroutine.
defer func() {
e := recover()
resultChan <- workResult{
clusterName: clusterName,
err: e,
}
}()

// Calculate namespace.
namespaceName := defaultNamespace
if deployClusterInSeparateNamespaces {
namespaceName = clusterName
}
upgrade(ctx, namespaceName, clusterName, clusterTemplateYAML)
return false
}
}()
if done {
break
}
}
}

type clusterResources struct {
cluster *clusterv1.Cluster
machineDeployments []*clusterv1.MachineDeployment
controlPlane *controlplanev1.KubeadmControlPlane
}

func getClusterResourcesForUpgrade(ctx context.Context, c client.Client, namespace, clusterName string) clusterResources {
cluster := &clusterv1.Cluster{}
err := c.Get(ctx, client.ObjectKey{Namespace: namespace, Name: clusterName}, cluster)
Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("error getting Cluster %s: %s", klog.KRef(namespace, clusterName), err))

controlPlane := &controlplanev1.KubeadmControlPlane{}
err = c.Get(ctx, client.ObjectKey{Namespace: namespace, Name: cluster.Spec.ControlPlaneRef.Name}, controlPlane)
Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("error getting ControlPlane for Cluster %s: %s", klog.KObj(cluster), err))

mds := []*clusterv1.MachineDeployment{}
machineDeployments := &clusterv1.MachineDeploymentList{}
err = c.List(ctx, machineDeployments,
client.MatchingLabels{
clusterv1.ClusterNameLabel: cluster.Name,
clusterv1.ClusterTopologyOwnedLabel: "",
},
client.InNamespace(cluster.Namespace),
)
Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("error getting MachineDeployments for Cluster %s: %s", klog.KObj(cluster), err))
for _, md := range machineDeployments.Items {
mds = append(mds, md.DeepCopy())
}

return clusterResources{
cluster: cluster,
machineDeployments: mds,
controlPlane: controlPlane,
}
}

type workResult struct {
clusterName string
err any
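The new upgradeClusterAndWaitWorker above follows the same fan-out/fan-in shape as the existing create and delete workers: cluster names are fed through a shared channel to a fixed number of goroutines, and each goroutine recovers Ginkgo failures (which surface as panics) and forwards them to the parent over a result channel. Below is a minimal, self-contained sketch of that pattern only; runWorkers and result are placeholder names for illustration, not the test framework's actual workConcurrentlyAndWait helpers.

package main

import (
	"context"
	"fmt"
	"sync"
)

// result carries either a successfully processed cluster name or the value
// recovered from a panic in a worker.
type result struct {
	clusterName string
	err         any
}

// runWorkers fans clusterNames out to `concurrency` workers and collects one
// result per cluster. Panics raised by work() are recovered and reported
// instead of tearing down the whole test process.
func runWorkers(ctx context.Context, clusterNames []string, concurrency int, work func(ctx context.Context, clusterName string)) []result {
	inputChan := make(chan string)
	resultChan := make(chan result)
	wg := &sync.WaitGroup{}

	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for clusterName := range inputChan {
				func() {
					// Recover assertion failures / panics so the parent
					// goroutine can report them per cluster.
					defer func() {
						resultChan <- result{clusterName: clusterName, err: recover()}
					}()
					work(ctx, clusterName)
				}()
			}
		}()
	}

	// Feed the work queue, then close it so idle workers exit.
	go func() {
		for _, name := range clusterNames {
			inputChan <- name
		}
		close(inputChan)
	}()

	// Close the result channel once every worker has returned.
	go func() {
		wg.Wait()
		close(resultChan)
	}()

	results := []result{}
	for r := range resultChan {
		results = append(results, r)
	}
	return results
}

func main() {
	results := runWorkers(context.Background(), []string{"c1", "c2", "c3"}, 2, func(_ context.Context, name string) {
		if name == "c2" {
			panic(fmt.Sprintf("upgrade of %s failed", name))
		}
	})
	for _, r := range results {
		fmt.Printf("%s: err=%v\n", r.clusterName, r.err)
	}
}

The real worker in the diff additionally selects on ctx.Done() so a cancelled context shuts the pool down early, and it records the cluster name alongside the recovered value in the same way via workResult.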
14 changes: 9 additions & 5 deletions test/framework/cluster_topology_helpers.go
@@ -75,6 +75,7 @@ type UpgradeClusterTopologyAndWaitForUpgradeInput struct {
WaitForEtcdUpgrade []interface{}
PreWaitForControlPlaneToBeUpgraded func()
PreWaitForWorkersToBeUpgraded func()
SkipKubeProxyCheck bool
}

// UpgradeClusterTopologyAndWaitForUpgrade upgrades a Cluster topology and waits for it to be upgraded.
@@ -123,13 +124,16 @@ func UpgradeClusterTopologyAndWaitForUpgrade(ctx context.Context, input UpgradeC
KubernetesUpgradeVersion: input.KubernetesUpgradeVersion,
}, input.WaitForMachinesToBeUpgraded...)

log.Logf("Waiting for kube-proxy to have the upgraded Kubernetes version")
workloadCluster := input.ClusterProxy.GetWorkloadCluster(ctx, input.Cluster.Namespace, input.Cluster.Name)
workloadClient := workloadCluster.GetClient()
WaitForKubeProxyUpgrade(ctx, WaitForKubeProxyUpgradeInput{
Getter: workloadClient,
KubernetesVersion: input.KubernetesUpgradeVersion,
}, input.WaitForKubeProxyUpgrade...)

if !input.SkipKubeProxyCheck {
log.Logf("Waiting for kube-proxy to have the upgraded Kubernetes version")
WaitForKubeProxyUpgrade(ctx, WaitForKubeProxyUpgradeInput{
Getter: workloadClient,
KubernetesVersion: input.KubernetesUpgradeVersion,
}, input.WaitForKubeProxyUpgrade...)
}

// Wait for the CoreDNS upgrade if the DNSImageTag is set.
if input.DNSImageTag != "" {
7 changes: 0 additions & 7 deletions test/framework/controlplane_helpers.go
@@ -393,13 +393,6 @@ func UpgradeControlPlaneAndWaitForUpgrade(ctx context.Context, input UpgradeCont
}
}

// controlPlaneMachineOptions returns a set of ListOptions that allows to get all machine objects belonging to control plane.
func controlPlaneMachineOptions() []client.ListOption {
return []client.ListOption{
client.HasLabels{clusterv1.MachineControlPlaneLabel},
}
}

type ScaleAndWaitControlPlaneInput struct {
ClusterProxy ClusterProxy
Cluster *clusterv1.Cluster
8 changes: 7 additions & 1 deletion test/framework/machine_helpers.go
@@ -127,7 +127,13 @@ func GetControlPlaneMachinesByCluster(ctx context.Context, input GetControlPlane
Expect(input.ClusterName).ToNot(BeEmpty(), "Invalid argument. input.ClusterName can't be empty when calling GetControlPlaneMachinesByCluster")
Expect(input.Namespace).ToNot(BeEmpty(), "Invalid argument. input.Namespace can't be empty when calling GetControlPlaneMachinesByCluster")

options := append(byClusterOptions(input.ClusterName, input.Namespace), controlPlaneMachineOptions()...)
options := []client.ListOption{
client.InNamespace(input.Namespace),
client.MatchingLabels{
clusterv1.ClusterNameLabel: input.ClusterName,
clusterv1.MachineControlPlaneLabel: "",
},
}

machineList := &clusterv1.MachineList{}
Eventually(func() error {
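For context on the change above: the removed controlPlaneMachineOptions helper selected Machines via client.HasLabels, which matches any object that carries the control-plane label key, whereas the inlined client.MatchingLabels entry requires that key with an exact value (the empty string, matching how the label is written elsewhere in this diff). A small sketch contrasting the two controller-runtime list options; listControlPlaneMachines is a hypothetical helper, not part of this change.

package helpers

import (
	"context"

	clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// listControlPlaneMachines shows the two label-based ways of selecting the
// control plane Machines of a cluster.
func listControlPlaneMachines(ctx context.Context, c client.Client, namespace, clusterName string) (*clusterv1.MachineList, error) {
	machines := &clusterv1.MachineList{}

	// Variant 1: match on label key presence, regardless of value
	// (what the removed controlPlaneMachineOptions helper did).
	byPresence := []client.ListOption{
		client.InNamespace(namespace),
		client.MatchingLabels{clusterv1.ClusterNameLabel: clusterName},
		client.HasLabels{clusterv1.MachineControlPlaneLabel},
	}
	_ = byPresence

	// Variant 2: match the key with an explicit empty value
	// (what the updated GetControlPlaneMachinesByCluster does).
	byValue := []client.ListOption{
		client.InNamespace(namespace),
		client.MatchingLabels{
			clusterv1.ClusterNameLabel:         clusterName,
			clusterv1.MachineControlPlaneLabel: "",
		},
	}

	if err := c.List(ctx, machines, byValue...); err != nil {
		return nil, err
	}
	return machines, nil
}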
17 changes: 13 additions & 4 deletions test/infrastructure/inmemory/internal/server/api/handler.go
@@ -35,6 +35,7 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/apis/meta/v1/validation"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
@@ -317,16 +318,24 @@ func (h *apiServerHandler) apiV1List(req *restful.Request, resp *restful.Respons
listOpts = append(listOpts, client.InNamespace(req.PathParameter("namespace")))
}

// TODO: The only field selector which works is for `spec.nodeName` on pods.
selector, err := fields.ParseSelector(req.QueryParameter("fieldSelector"))
// TODO: The only field Selector which works is for `spec.nodeName` on pods.
fieldSelector, err := fields.ParseSelector(req.QueryParameter("fieldSelector"))
if err != nil {
_ = resp.WriteErrorString(http.StatusInternalServerError, err.Error())
return
}
if selector != nil {
listOpts = append(listOpts, client.MatchingFieldsSelector{Selector: selector})
if fieldSelector != nil {
listOpts = append(listOpts, client.MatchingFieldsSelector{Selector: fieldSelector})
}

labelSelector, err := labels.Parse(req.QueryParameter("labelSelector"))
if err != nil {
_ = resp.WriteErrorString(http.StatusInternalServerError, err.Error())
return
}
if labelSelector != nil {
listOpts = append(listOpts, client.MatchingLabelsSelector{Selector: labelSelector})
}
if err := cloudClient.List(ctx, list, listOpts...); err != nil {
if status, ok := err.(apierrors.APIStatus); ok || errors.As(err, &status) {
_ = resp.WriteHeaderAndEntity(int(status.Status().Code), status)
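The handler change above mirrors the existing fieldSelector handling: the labelSelector query parameter is parsed with apimachinery's labels package and, when present, appended as a list option. A short standalone sketch of just the parsing and matching step, independent of the in-memory server:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/labels"
)

func main() {
	// Parse a selector string as it would arrive in a "labelSelector"
	// query parameter.
	selector, err := labels.Parse("name=labelSelectedPod")
	if err != nil {
		panic(err)
	}

	// Evaluate the selector against the labels of two hypothetical pods.
	fmt.Println(selector.Matches(labels.Set{"name": "labelSelectedPod"})) // true
	fmt.Println(selector.Matches(labels.Set{"name": "bar"}))              // false
}

In the handler itself the parsed selector is wrapped as client.MatchingLabelsSelector{Selector: labelSelector} and passed to cloudClient.List, as shown in the diff.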
12 changes: 10 additions & 2 deletions test/infrastructure/inmemory/internal/server/mux_test.go
@@ -36,6 +36,7 @@ import (
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
@@ -155,7 +156,7 @@ func TestAPI_corev1_CRUD(t *testing.T) {
Spec: corev1.PodSpec{NodeName: n.Name},
})).To(Succeed())
g.Expect(c.Create(ctx, &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "notSelectedPod", Namespace: metav1.NamespaceDefault},
ObjectMeta: metav1.ObjectMeta{Name: "labelSelectedPod", Namespace: metav1.NamespaceDefault, Labels: map[string]string{"name": "labelSelectedPod"}},
})).To(Succeed())

pl := &corev1.PodList{}
@@ -166,8 +167,15 @@
g.Expect(pl.Items).To(HaveLen(1))
g.Expect(pl.Items[0].Name).To(Equal("bar"))

// get
// list with label selector on pod
labelSelector := &client.ListOptions{
LabelSelector: labels.SelectorFromSet(labels.Set{"name": "labelSelectedPod"}),
}
g.Expect(c.List(ctx, pl, labelSelector)).To(Succeed())
g.Expect(pl.Items).To(HaveLen(1))
g.Expect(pl.Items[0].Name).To(Equal("labelSelectedPod"))

// get
n = &corev1.Node{}
err = c.Get(ctx, client.ObjectKey{Name: "foo"}, n)
g.Expect(err).ToNot(HaveOccurred())
