Add Upgrade flow to scale test
Signed-off-by: killianmuldoon <kmuldoon@vmware.com>
killianmuldoon committed Jul 25, 2023
1 parent ce33c29 commit 846b51a
Showing 8 changed files with 208 additions and 36 deletions.
156 changes: 153 additions & 3 deletions test/e2e/scale.go
@@ -41,6 +41,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
controlplanev1 "sigs.k8s.io/cluster-api/controlplane/kubeadm/api/v1beta1"
"sigs.k8s.io/cluster-api/test/e2e/internal/log"
"sigs.k8s.io/cluster-api/test/framework"
"sigs.k8s.io/cluster-api/test/framework/clusterctl"
@@ -123,6 +124,9 @@ type scaleSpecInput struct {
// if the test suite should fail as soon as c6 fails or if it should fail after all cluster creations are done.
FailFast bool

// SkipUpgrade if set to true will skip upgrading the workload clusters.
SkipUpgrade bool

// SkipCleanup if set to true will skip deleting the workload clusters.
SkipCleanup bool

@@ -253,7 +257,7 @@ func scaleSpec(ctx context.Context, inputGetter func() scaleSpecInput) {
Flavor: flavor,
Namespace: scaleClusterNamespacePlaceholder,
ClusterName: scaleClusterNamePlaceholder,
KubernetesVersion: input.E2EConfig.GetVariable(KubernetesVersion),
KubernetesVersion: input.E2EConfig.GetVariable(KubernetesVersionUpgradeFrom),
ControlPlaneMachineCount: controlPlaneMachineCount,
WorkerMachineCount: workerMachineCount,
})
@@ -333,6 +337,49 @@ func scaleSpec(ctx context.Context, inputGetter func() scaleSpecInput) {
Fail("")
}

if !input.SkipUpgrade {
By("Upgrade the workload clusters concurrently")
// Get the upgrade function used to upgrade each workload cluster and wait for
// the upgrade to complete.
upgrader := getClusterUpgradeAndWaitFn(framework.UpgradeClusterTopologyAndWaitForUpgradeInput{
ClusterProxy: input.BootstrapClusterProxy,
KubernetesUpgradeVersion: input.E2EConfig.GetVariable(KubernetesVersionUpgradeTo),
EtcdImageTag: input.E2EConfig.GetVariable(EtcdVersionUpgradeTo),
DNSImageTag: input.E2EConfig.GetVariable(CoreDNSVersionUpgradeTo),
WaitForMachinesToBeUpgraded: input.E2EConfig.GetIntervals(specName, "wait-machine-upgrade"),
WaitForKubeProxyUpgrade: input.E2EConfig.GetIntervals(specName, "wait-machine-upgrade"),
WaitForDNSUpgrade: input.E2EConfig.GetIntervals(specName, "wait-machine-upgrade"),
WaitForEtcdUpgrade: input.E2EConfig.GetIntervals(specName, "wait-machine-upgrade"),
})

clusterNamesToUpgrade := []string{}
for _, result := range clusterCreateResults {
clusterNamesToUpgrade = append(clusterNamesToUpgrade, result.clusterName)
}

// Upgrade all the workload clusters.
_, err = workConcurrentlyAndWait(ctx, workConcurrentlyAndWaitInput{
ClusterNames: clusterNamesToUpgrade,
Concurrency: concurrency,
FailFast: input.FailFast,
WorkerFunc: func(ctx context.Context, inputChan chan string, resultChan chan workResult, wg *sync.WaitGroup) {
upgradeClusterAndWaitWorker(ctx, inputChan, resultChan, wg, namespace.Name, input.DeployClusterInSeparateNamespaces, baseClusterTemplateYAML, upgrader)
},
})
if err != nil {
// Call Fail to notify ginkgo that the suite has failed.
// Ginkgo will print only the first observed failure in this case.
// Example: If clusters c1, c2 and c3 failed, ginkgo will only print the first
// observed failure among these 3 clusters.
// Since ginkgo only captures one failure, we also log the error, which contains
// the full stack trace of the failure for each cluster, to help with debugging.
// TODO(ykakarap): Follow-up: Explore options for improved error reporting.
log.Logf("Failed to upgrade clusters. Error: %s", err.Error())
Fail("")
}
}

// TODO(ykakarap): Follow-up: Dump resources for the failed clusters (creation).

clusterNamesToDelete := []string{}
@@ -494,11 +541,9 @@ func getClusterCreateAndWaitFn(input clusterctl.ApplyCustomClusterTemplateAndWai
CustomTemplateYAML: clusterTemplateYAML,
ClusterName: clusterName,
Namespace: namespace,
CNIManifestPath: input.CNIManifestPath,
WaitForClusterIntervals: input.WaitForClusterIntervals,
WaitForControlPlaneIntervals: input.WaitForControlPlaneIntervals,
WaitForMachineDeployments: input.WaitForMachineDeployments,
WaitForMachinePools: input.WaitForMachinePools,
Args: input.Args,
PreWaitForCluster: input.PreWaitForCluster,
PostMachinesProvisioned: input.PostMachinesProvisioned,
@@ -648,6 +693,111 @@ func deleteClusterAndWaitWorker(ctx context.Context, inputChan <-chan string, re
}
}

type clusterUpgrader func(ctx context.Context, namespace, clusterName string, clusterTemplateYAML []byte)

func getClusterUpgradeAndWaitFn(input framework.UpgradeClusterTopologyAndWaitForUpgradeInput) clusterUpgrader {
return func(ctx context.Context, namespace, clusterName string, clusterTemplateYAML []byte) {
resources := getClusterResourcesForUpgrade(ctx, input.ClusterProxy.GetClient(), namespace, clusterName)
// NB: We cannot directly modify and reuse `input` in this closure because the closure
// is called multiple times and would keep mutating the same shared `input`. It is safer
// to pass the values explicitly into a fresh `UpgradeClusterTopologyAndWaitForUpgradeInput`.
framework.UpgradeClusterTopologyAndWaitForUpgrade(ctx, framework.UpgradeClusterTopologyAndWaitForUpgradeInput{
ClusterProxy: input.ClusterProxy,
Cluster: resources.cluster,
ControlPlane: resources.controlPlane,
MachineDeployments: resources.machineDeployments,
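// Note (assumption, not part of the original commit): leaving EtcdImageTag and
// DNSImageTag empty appears intended to skip the etcd/CoreDNS image bump for this
// upgrade; as the framework change below shows, the CoreDNS upgrade wait only runs
// when DNSImageTag is set, and the etcd wait is presumably gated the same way.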
EtcdImageTag: "",
DNSImageTag: "",
KubernetesUpgradeVersion: input.KubernetesUpgradeVersion,
WaitForMachinesToBeUpgraded: input.WaitForMachinesToBeUpgraded,
WaitForKubeProxyUpgrade: input.WaitForKubeProxyUpgrade,
WaitForDNSUpgrade: input.WaitForDNSUpgrade,
WaitForEtcdUpgrade: input.WaitForEtcdUpgrade,
// TODO: (killianmuldoon) Checking the kube-proxy version doesn't work as we can't access the control plane endpoint for the workload cluster
// from the host. Need to figure out a way to route the calls to the workload Cluster correctly.
SkipKubeProxyCheck: true,
})
}
}
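To illustrate the closure caveat in the comment above, here is a minimal, self-contained Go sketch (illustrative only, not part of this commit; the type and function names are made up) contrasting a closure that mutates a shared captured struct with one that builds a fresh value per call, which is the pattern getClusterUpgradeAndWaitFn follows:

package main

import "fmt"

type upgradeParams struct {
	clusterName string
	version     string
}

func main() {
	shared := upgradeParams{version: "v1.28.0"}

	// Risky: every call mutates the single captured value, so repeated or concurrent
	// invocations observe each other's changes.
	mutating := func(name string) *upgradeParams {
		shared.clusterName = name
		return &shared
	}

	// Safer (the pattern used by the test): copy the needed fields into a fresh value.
	fresh := func(name string) upgradeParams {
		return upgradeParams{clusterName: name, version: shared.version}
	}

	a := mutating("c1")
	b := fresh("c2")
	_ = mutating("c3") // also rewrites the struct that a points to
	fmt.Println(a.clusterName, b.clusterName) // prints: c3 c2
}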

func upgradeClusterAndWaitWorker(ctx context.Context, inputChan <-chan string, resultChan chan<- workResult, wg *sync.WaitGroup, defaultNamespace string, deployClusterInSeparateNamespaces bool, clusterTemplateYAML []byte, upgrade clusterUpgrader) {
defer wg.Done()

for {
done := func() bool {
select {
case <-ctx.Done():
// If the context is cancelled, return and shut down the worker.
return true
case clusterName, open := <-inputChan:
// Read the cluster name from the channel.
// If the channel is closed it implies there is no more work to be done. Return.
if !open {
return true
}
log.Logf("Upgrading cluster %s", clusterName)

// This defer will catch ginkgo failures and record them.
// The recorded panics are then handled by the parent goroutine.
defer func() {
e := recover()
resultChan <- workResult{
clusterName: clusterName,
err: e,
}
}()

// Calculate namespace.
namespaceName := defaultNamespace
if deployClusterInSeparateNamespaces {
namespaceName = clusterName
}
upgrade(ctx, namespaceName, clusterName, clusterTemplateYAML)
return false
}
}()
if done {
break
}
}
}
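As a companion to the defer/recover comments above, the following stripped-down sketch (an approximation; the real workConcurrentlyAndWait and worker plumbing carry more machinery, such as context cancellation) shows how a panic recovered in a worker goroutine travels over the result channel back to the coordinating goroutine:

package main

import (
	"fmt"
	"sync"
)

type workResult struct {
	clusterName string
	err         any
}

func worker(names <-chan string, results chan<- workResult, wg *sync.WaitGroup) {
	defer wg.Done()
	for name := range names {
		func(name string) {
			// recover() turns a panic (e.g. a ginkgo assertion failure) into a value
			// the coordinator can inspect instead of crashing the whole process.
			defer func() {
				results <- workResult{clusterName: name, err: recover()}
			}()
			if name == "c2" {
				panic(fmt.Sprintf("upgrade of cluster %s failed", name))
			}
		}(name)
	}
}

func main() {
	names := make(chan string, 3)
	results := make(chan workResult, 3)
	var wg sync.WaitGroup

	wg.Add(1)
	go worker(names, results, &wg)
	for _, n := range []string{"c1", "c2", "c3"} {
		names <- n
	}
	close(names)
	wg.Wait()
	close(results)

	for r := range results {
		if r.err != nil {
			fmt.Printf("cluster %s failed: %v\n", r.clusterName, r.err)
		}
	}
}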

type clusterResources struct {
cluster *clusterv1.Cluster
machineDeployments []*clusterv1.MachineDeployment
controlPlane *controlplanev1.KubeadmControlPlane
}

func getClusterResourcesForUpgrade(ctx context.Context, c client.Client, namespace, clusterName string) clusterResources {
cluster := &clusterv1.Cluster{}
err := c.Get(ctx, client.ObjectKey{Namespace: namespace, Name: clusterName}, cluster)
Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("error getting Cluster %s: %s", klog.KRef(namespace, clusterName), err))

controlPlane := &controlplanev1.KubeadmControlPlane{}
err = c.Get(ctx, client.ObjectKey{Namespace: namespace, Name: cluster.Spec.ControlPlaneRef.Name}, controlPlane)
Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("error getting ControlPlane for Cluster %s: %s,", klog.KObj(cluster), err))

mds := []*clusterv1.MachineDeployment{}
machineDeployments := &clusterv1.MachineDeploymentList{}
err = c.List(ctx, machineDeployments,
client.MatchingLabels{
clusterv1.ClusterNameLabel: cluster.Name,
clusterv1.ClusterTopologyOwnedLabel: "",
},
client.InNamespace(cluster.Namespace),
)
Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("error getting MachineDeployments for Cluster %s: %s", klog.KObj(cluster), err))
for _, md := range machineDeployments.Items {
mds = append(mds, &md)

[CI annotation] GitHub Actions / lint (test), check failure on line 791 in test/e2e/scale.go: G601: Implicit memory aliasing in for loop. (gosec)
}
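// A common fix for the G601 finding flagged above (editorial sketch, not part of this
// commit): with Go versions before 1.22, &md takes the address of the single range
// variable, so every appended pointer would end up referring to the last item in the
// list. Indexing the slice avoids the aliasing:
//
//	for i := range machineDeployments.Items {
//		mds = append(mds, &machineDeployments.Items[i])
//	}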

return clusterResources{
cluster: cluster,
machineDeployments: mds,
controlPlane: controlPlane,
}
}

type workResult struct {
clusterName string
err any
28 changes: 15 additions & 13 deletions test/e2e/scale_test.go
@@ -24,21 +24,23 @@ import (
"k8s.io/utils/pointer"
)

var _ = Describe("When scale testing using in-memory provider [Scale]", func() {
var _ = Describe("When scale testing using in-memory provider with upgrades [Scale]", func() {
scaleSpec(ctx, func() scaleSpecInput {
return scaleSpecInput{
E2EConfig: e2eConfig,
ClusterctlConfigPath: clusterctlConfigPath,
InfrastructureProvider: pointer.String("in-memory"),
BootstrapClusterProxy: bootstrapClusterProxy,
ArtifactFolder: artifactFolder,
ClusterCount: pointer.Int64(10),
Concurrency: pointer.Int64(5),
Flavor: pointer.String(""),
ControlPlaneMachineCount: pointer.Int64(1),
MachineDeploymentCount: pointer.Int64(1),
WorkerMachineCount: pointer.Int64(3),
SkipCleanup: skipCleanup,
E2EConfig: e2eConfig,
ClusterctlConfigPath: clusterctlConfigPath,
InfrastructureProvider: pointer.String("in-memory"),
BootstrapClusterProxy: bootstrapClusterProxy,
ArtifactFolder: artifactFolder,
ClusterCount: pointer.Int64(10),
Concurrency: pointer.Int64(5),
Flavor: pointer.String(""),
ControlPlaneMachineCount: pointer.Int64(1),
MachineDeploymentCount: pointer.Int64(1),
WorkerMachineCount: pointer.Int64(3),
SkipCleanup: skipCleanup,
SkipUpgrade: false,
DeployClusterInSeparateNamespaces: true,
}
})
})
14 changes: 9 additions & 5 deletions test/framework/cluster_topology_helpers.go
@@ -72,6 +72,7 @@ type UpgradeClusterTopologyAndWaitForUpgradeInput struct {
WaitForEtcdUpgrade []interface{}
PreWaitForControlPlaneToBeUpgraded func()
PreWaitForMachineDeploymentToBeUpgraded func()
SkipKubeProxyCheck bool
}

// UpgradeClusterTopologyAndWaitForUpgrade upgrades a Cluster topology and waits for it to be upgraded.
@@ -121,13 +122,16 @@ func UpgradeClusterTopologyAndWaitForUpgrade(ctx context.Context, input UpgradeC
KubernetesUpgradeVersion: input.KubernetesUpgradeVersion,
}, input.WaitForMachinesToBeUpgraded...)

log.Logf("Waiting for kube-proxy to have the upgraded Kubernetes version")
workloadCluster := input.ClusterProxy.GetWorkloadCluster(ctx, input.Cluster.Namespace, input.Cluster.Name)
workloadClient := workloadCluster.GetClient()
WaitForKubeProxyUpgrade(ctx, WaitForKubeProxyUpgradeInput{
Getter: workloadClient,
KubernetesVersion: input.KubernetesUpgradeVersion,
}, input.WaitForKubeProxyUpgrade...)

if !input.SkipKubeProxyCheck {
log.Logf("Waiting for kube-proxy to have the upgraded Kubernetes version")
WaitForKubeProxyUpgrade(ctx, WaitForKubeProxyUpgradeInput{
Getter: workloadClient,
KubernetesVersion: input.KubernetesUpgradeVersion,
}, input.WaitForKubeProxyUpgrade...)
}

// Wait for the CoreDNS upgrade if the DNSImageTag is set.
if input.DNSImageTag != "" {
2 changes: 1 addition & 1 deletion test/framework/clusterctl/clusterctl_helpers.go
@@ -360,7 +360,7 @@ func ApplyCustomClusterTemplateAndWait(ctx context.Context, input ApplyCustomClu
Expect(input.Namespace).NotTo(BeEmpty(), "Invalid argument. input.Namespace can't be empty when calling ApplyCustomClusterTemplateAndWait")
Expect(result).ToNot(BeNil(), "Invalid argument. result can't be nil when calling ApplyClusterTemplateAndWait")

log.Logf("Creating the workload cluster with name %q form the provided yaml", input.ClusterName)
log.Logf("Creating the workload cluster with name %q from the provided yaml", input.ClusterName)

// Ensure we have a Cluster for dump and cleanup steps in AfterEach even if ApplyClusterTemplateAndWait fails.
result.Cluster = &clusterv1.Cluster{
7 changes: 0 additions & 7 deletions test/framework/controlplane_helpers.go
@@ -387,13 +387,6 @@ func UpgradeControlPlaneAndWaitForUpgrade(ctx context.Context, input UpgradeCont
}, input.WaitForEtcdUpgrade...)
}

// controlPlaneMachineOptions returns a set of ListOptions that allows to get all machine objects belonging to control plane.
func controlPlaneMachineOptions() []client.ListOption {
return []client.ListOption{
client.HasLabels{clusterv1.MachineControlPlaneLabel},
}
}

type ScaleAndWaitControlPlaneInput struct {
ClusterProxy ClusterProxy
Cluster *clusterv1.Cluster
8 changes: 7 additions & 1 deletion test/framework/machine_helpers.go
@@ -127,7 +127,13 @@ func GetControlPlaneMachinesByCluster(ctx context.Context, input GetControlPlane
Expect(input.ClusterName).ToNot(BeEmpty(), "Invalid argument. input.ClusterName can't be empty when calling GetControlPlaneMachinesByCluster")
Expect(input.Namespace).ToNot(BeEmpty(), "Invalid argument. input.Namespace can't be empty when calling GetControlPlaneMachinesByCluster")

options := append(byClusterOptions(input.ClusterName, input.Namespace), controlPlaneMachineOptions()...)
options := []client.ListOption{
client.InNamespace(input.Namespace),
client.MatchingLabels{
clusterv1.ClusterNameLabel: input.ClusterName,
clusterv1.MachineControlPlaneLabel: "",
},
}

machineList := &clusterv1.MachineList{}
Eventually(func() error {
17 changes: 13 additions & 4 deletions test/infrastructure/inmemory/internal/server/api/handler.go
@@ -35,6 +35,7 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/apis/meta/v1/validation"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
@@ -313,16 +314,24 @@ func (h *apiServerHandler) apiV1List(req *restful.Request, resp *restful.Respons
listOpts = append(listOpts, client.InNamespace(req.PathParameter("namespace")))
}

// TODO: The only field selector which works is for `spec.nodeName` on pods.
selector, err := fields.ParseSelector(req.QueryParameter("fieldSelector"))
// TODO: The only field Selector which works is for `spec.nodeName` on pods.
fieldSelector, err := fields.ParseSelector(req.QueryParameter("fieldSelector"))
if err != nil {
_ = resp.WriteErrorString(http.StatusInternalServerError, err.Error())
return
}
if selector != nil {
listOpts = append(listOpts, client.MatchingFieldsSelector{Selector: selector})
if fieldSelector != nil {
listOpts = append(listOpts, client.MatchingFieldsSelector{Selector: fieldSelector})
}

labelSelector, err := labels.Parse(req.QueryParameter("labelSelector"))
if err != nil {
_ = resp.WriteErrorString(http.StatusInternalServerError, err.Error())
return
}
if labelSelector != nil {
listOpts = append(listOpts, client.MatchingLabelsSelector{Selector: labelSelector})
}
if err := cloudClient.List(ctx, list, listOpts...); err != nil {
_ = resp.WriteErrorString(http.StatusInternalServerError, err.Error())
return
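With this change the in-memory API server's list endpoint also honours the standard labelSelector query parameter alongside fieldSelector, so a request along the lines of GET /api/v1/namespaces/default/pods?labelSelector=name%3DlabelSelectedPod (a hypothetical example mirroring the updated test below) returns only the matching pods.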
12 changes: 10 additions & 2 deletions test/infrastructure/inmemory/internal/server/mux_test.go
@@ -36,6 +36,7 @@ import (
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
@@ -155,7 +156,7 @@ func TestAPI_corev1_CRUD(t *testing.T) {
Spec: corev1.PodSpec{NodeName: n.Name},
})).To(Succeed())
g.Expect(c.Create(ctx, &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "notSelectedPod", Namespace: metav1.NamespaceDefault},
ObjectMeta: metav1.ObjectMeta{Name: "labelSelectedPod", Namespace: metav1.NamespaceDefault, Labels: map[string]string{"name": "labelSelectedPod"}},
})).To(Succeed())

pl := &corev1.PodList{}
@@ -166,8 +167,15 @@
g.Expect(pl.Items).To(HaveLen(1))
g.Expect(pl.Items[0].Name).To(Equal("bar"))

// get
// list with label selector on pod
labelSelector := &client.ListOptions{
LabelSelector: labels.SelectorFromSet(labels.Set{"name": "labelSelectedPod"}),
}
g.Expect(c.List(ctx, pl, labelSelector)).To(Succeed())
g.Expect(pl.Items).To(HaveLen(1))
g.Expect(pl.Items[0].Name).To(Equal("labelSelectedPod"))

// get
n = &corev1.Node{}
err = c.Get(ctx, client.ObjectKey{Name: "foo"}, n)
g.Expect(err).ToNot(HaveOccurred())