
🌱 Add scale testing for upgrades #9077

Merged
158 changes: 153 additions & 5 deletions test/e2e/scale.go
@@ -41,6 +41,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
controlplanev1 "sigs.k8s.io/cluster-api/controlplane/kubeadm/api/v1beta1"
"sigs.k8s.io/cluster-api/test/e2e/internal/log"
"sigs.k8s.io/cluster-api/test/framework"
"sigs.k8s.io/cluster-api/test/framework/clusterctl"
@@ -123,6 +124,9 @@ type scaleSpecInput struct {
// if the test suite should fail as soon as c6 fails or if it should fail after all cluster creations are done.
FailFast bool

// SkipUpgrade if set to true will skip upgrading the workload clusters.
SkipUpgrade bool

// SkipCleanup if set to true will skip deleting the workload clusters.
SkipCleanup bool

@@ -133,7 +137,7 @@ type scaleSpecInput struct {
SkipWaitForCreation bool
}

// scaleSpec implements a scale test.
// scaleSpec implements a scale test for clusters with MachineDeployments.
func scaleSpec(ctx context.Context, inputGetter func() scaleSpecInput) {
var (
specName = "scale"
@@ -253,7 +257,7 @@ func scaleSpec(ctx context.Context, inputGetter func() scaleSpecInput) {
Flavor: flavor,
Namespace: scaleClusterNamespacePlaceholder,
ClusterName: scaleClusterNamePlaceholder,
KubernetesVersion: input.E2EConfig.GetVariable(KubernetesVersion),
KubernetesVersion: input.E2EConfig.GetVariable(KubernetesVersionUpgradeFrom),
ControlPlaneMachineCount: controlPlaneMachineCount,
WorkerMachineCount: workerMachineCount,
})
@@ -333,6 +337,47 @@ func scaleSpec(ctx context.Context, inputGetter func() scaleSpecInput) {
Fail("")
}

if !input.SkipUpgrade {
By("Upgrade the workload clusters concurrently")
// Get the upgrade function for upgrading the workload clusters.
upgrader := getClusterUpgradeAndWaitFn(framework.UpgradeClusterTopologyAndWaitForUpgradeInput{
ClusterProxy: input.BootstrapClusterProxy,
KubernetesUpgradeVersion: input.E2EConfig.GetVariable(KubernetesVersionUpgradeTo),
EtcdImageTag: input.E2EConfig.GetVariable(EtcdVersionUpgradeTo),
DNSImageTag: input.E2EConfig.GetVariable(CoreDNSVersionUpgradeTo),
WaitForMachinesToBeUpgraded: input.E2EConfig.GetIntervals(specName, "wait-machine-upgrade"),
WaitForKubeProxyUpgrade: input.E2EConfig.GetIntervals(specName, "wait-machine-upgrade"),
WaitForDNSUpgrade: input.E2EConfig.GetIntervals(specName, "wait-machine-upgrade"),
WaitForEtcdUpgrade: input.E2EConfig.GetIntervals(specName, "wait-machine-upgrade"),
})

clusterNamesToUpgrade := []string{}
for _, result := range clusterCreateResults {
clusterNamesToUpgrade = append(clusterNamesToUpgrade, result.clusterName)
}

// Upgrade all the workload clusters.
_, err = workConcurrentlyAndWait(ctx, workConcurrentlyAndWaitInput{
ClusterNames: clusterNamesToUpgrade,
Concurrency: concurrency,
FailFast: input.FailFast,
WorkerFunc: func(ctx context.Context, inputChan chan string, resultChan chan workResult, wg *sync.WaitGroup) {
upgradeClusterAndWaitWorker(ctx, inputChan, resultChan, wg, namespace.Name, input.DeployClusterInSeparateNamespaces, baseClusterTemplateYAML, upgrader)
},
})
if err != nil {
// Call Fail to notify ginkgo that the suite has failed.
// Ginkgo will print the first observed error failure in this case.
// Example: If clusters c1, c2 and c3 failed, ginkgo will only print the first
// observed failure among these 3 clusters.
// Since ginkgo only captures one failure, we also log the error, which contains
// the full stack trace of the failure for each cluster, to help with debugging.
// TODO(ykakarap): Follow-up: Explore options for improved error reporting.
log.Logf("Failed to upgrade clusters. Error: %s", err.Error())
Fail("")
}
}

// TODO(ykakarap): Follow-up: Dump resources for the failed clusters (creation).

clusterNamesToDelete := []string{}
@@ -494,11 +539,9 @@ func getClusterCreateAndWaitFn(input clusterctl.ApplyCustomClusterTemplateAndWai
CustomTemplateYAML: clusterTemplateYAML,
ClusterName: clusterName,
Namespace: namespace,
CNIManifestPath: input.CNIManifestPath,
WaitForClusterIntervals: input.WaitForClusterIntervals,
WaitForControlPlaneIntervals: input.WaitForControlPlaneIntervals,
WaitForMachineDeployments: input.WaitForMachineDeployments,
WaitForMachinePools: input.WaitForMachinePools,
Args: input.Args,
PreWaitForCluster: input.PreWaitForCluster,
PostMachinesProvisioned: input.PostMachinesProvisioned,
@@ -594,7 +637,7 @@ func deleteClusterAndWaitWorker(ctx context.Context, inputChan <-chan string, re
return true
case clusterName, open := <-inputChan:
// Read the cluster name from the channel.
// If the channel is closed it implies there is not more work to be done. Return.
// If the channel is closed it implies there is no more work to be done. Return.
if !open {
return true
}
@@ -648,6 +691,111 @@ func deleteClusterAndWaitWorker(ctx context.Context, inputChan <-chan string, re
}
}

type clusterUpgrader func(ctx context.Context, namespace, clusterName string, clusterTemplateYAML []byte)

func getClusterUpgradeAndWaitFn(input framework.UpgradeClusterTopologyAndWaitForUpgradeInput) clusterUpgrader {
return func(ctx context.Context, namespace, clusterName string, clusterTemplateYAML []byte) {
resources := getClusterResourcesForUpgrade(ctx, input.ClusterProxy.GetClient(), namespace, clusterName)
// Nb. We cannot directly modify and use `input` in this closure function because this function
// will be called multiple times and this closure will keep modifying the same `input` multiple
// times. It is safer to pass the values explicitly into `UpgradeClusterTopologyAndWaitForUpgradeInput`.
framework.UpgradeClusterTopologyAndWaitForUpgrade(ctx, framework.UpgradeClusterTopologyAndWaitForUpgradeInput{
ClusterProxy: input.ClusterProxy,
Cluster: resources.cluster,
ControlPlane: resources.controlPlane,
MachineDeployments: resources.machineDeployments,
KubernetesUpgradeVersion: input.KubernetesUpgradeVersion,
WaitForMachinesToBeUpgraded: input.WaitForMachinesToBeUpgraded,
WaitForKubeProxyUpgrade: input.WaitForKubeProxyUpgrade,
WaitForDNSUpgrade: input.WaitForDNSUpgrade,
WaitForEtcdUpgrade: input.WaitForEtcdUpgrade,
// TODO: (killianmuldoon) Checking the kube-proxy, etcd and DNS version doesn't work as we can't access the control plane endpoint for the workload cluster
// from the host. Need to figure out a way to route the calls to the workload Cluster correctly.
EtcdImageTag: "",
DNSImageTag: "",
SkipKubeProxyCheck: true,
})
}
}
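The "Nb." comment above is worth unpacking: getClusterUpgradeAndWaitFn returns a single closure that every upgrade worker calls, so mutating the captured input inside it would leak state (and race) across clusters. Below is a minimal, standalone sketch of that pitfall and of the copy-per-call fix the function uses; the type and names are illustrative, not taken from the PR.

// Illustrative sketch only (not part of the PR; names and versions are made up).
package main

import (
	"fmt"
	"sync"
)

type upgradeInput struct {
	ClusterName string
	Version     string
}

// Buggy variant: the returned closure is one shared value, so every in-flight
// call mutates the same captured `input` (a data race across clusters).
func getUpgraderShared(input upgradeInput) func(clusterName string) {
	return func(clusterName string) {
		input.ClusterName = clusterName // visible to every other concurrent caller
		fmt.Println("upgrading", input.ClusterName, "to", input.Version)
	}
}

// Safe variant (the approach getClusterUpgradeAndWaitFn takes): build a fresh
// per-call value from the captured fields plus the per-cluster arguments.
func getUpgraderCopied(input upgradeInput) func(clusterName string) {
	return func(clusterName string) {
		perCall := upgradeInput{ClusterName: clusterName, Version: input.Version}
		fmt.Println("upgrading", perCall.ClusterName, "to", perCall.Version)
	}
}

func main() {
	shared := getUpgraderShared(upgradeInput{Version: "v1.28.0"})
	copied := getUpgraderCopied(upgradeInput{Version: "v1.28.0"})
	var wg sync.WaitGroup
	for _, name := range []string{"c1", "c2", "c3"} {
		wg.Add(2)
		go func(n string) { defer wg.Done(); shared(n) }(name) // racy
		go func(n string) { defer wg.Done(); copied(n) }(name) // safe
	}
	wg.Wait()
}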

func upgradeClusterAndWaitWorker(ctx context.Context, inputChan <-chan string, resultChan chan<- workResult, wg *sync.WaitGroup, defaultNamespace string, deployClusterInSeparateNamespaces bool, clusterTemplateYAML []byte, upgrade clusterUpgrader) {
defer wg.Done()

for {
done := func() bool {
select {
case <-ctx.Done():
// If the context is cancelled, return and shutdown the worker.
return true
case clusterName, open := <-inputChan:
// Read the cluster name from the channel.
// If the channel is closed it implies there is no more work to be done. Return.
if !open {
return true
}
log.Logf("Upgrading cluster %s", clusterName)

// This defer will catch ginkgo failures and record them.
// The recorded panics are then handled by the parent goroutine.
defer func() {
e := recover()
resultChan <- workResult{
clusterName: clusterName,
err: e,
}
}()

// Calculate namespace.
namespaceName := defaultNamespace
if deployClusterInSeparateNamespaces {
namespaceName = clusterName
}
upgrade(ctx, namespaceName, clusterName, clusterTemplateYAML)
return false
}
}()
if done {
break
}
}
}

type clusterResources struct {
cluster *clusterv1.Cluster
machineDeployments []*clusterv1.MachineDeployment
controlPlane *controlplanev1.KubeadmControlPlane
}

func getClusterResourcesForUpgrade(ctx context.Context, c client.Client, namespace, clusterName string) clusterResources {
cluster := &clusterv1.Cluster{}
err := c.Get(ctx, client.ObjectKey{Namespace: namespace, Name: clusterName}, cluster)
Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("error getting Cluster %s: %s", klog.KRef(namespace, clusterName), err))

controlPlane := &controlplanev1.KubeadmControlPlane{}
err = c.Get(ctx, client.ObjectKey{Namespace: namespace, Name: cluster.Spec.ControlPlaneRef.Name}, controlPlane)
Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("error getting ControlPlane for Cluster %s: %s,", klog.KObj(cluster), err))

mds := []*clusterv1.MachineDeployment{}
machineDeployments := &clusterv1.MachineDeploymentList{}
err = c.List(ctx, machineDeployments,
client.MatchingLabels{
clusterv1.ClusterNameLabel: cluster.Name,
clusterv1.ClusterTopologyOwnedLabel: "",
},
client.InNamespace(cluster.Namespace),
)
Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("error getting MachineDeployments for Cluster %s: %s", klog.KObj(cluster), err))
for _, md := range machineDeployments.Items {
mds = append(mds, md.DeepCopy())
}

return clusterResources{
cluster: cluster,
machineDeployments: mds,
controlPlane: controlPlane,
}
}

type workResult struct {
clusterName string
err any
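For orientation: the upgrade block added above reuses the same fan-out machinery already used for cluster creation and deletion. workConcurrentlyAndWait feeds cluster names to a fixed number of workers and collects one workResult per cluster. The helper itself is not part of this diff; the following is a rough, self-contained sketch of that shape. The field names mirror the diff, but the body, the Concurrency type, and the FailFast handling are assumptions, not the PR's actual implementation.

// Rough sketch of a workConcurrentlyAndWait-style helper (assumed shape, not the PR's code).
package e2e

import (
	"context"
	"fmt"
	"sync"

	kerrors "k8s.io/apimachinery/pkg/util/errors"
)

// Mirrors the types used in test/e2e/scale.go.
type workResult struct {
	clusterName string
	err         any
}

type workConcurrentlyAndWaitInput struct {
	ClusterNames []string
	Concurrency  int64
	FailFast     bool // the real helper presumably stops early on the first failure; not implemented here
	WorkerFunc   func(ctx context.Context, inputChan chan string, resultChan chan workResult, wg *sync.WaitGroup)
}

func workConcurrentlyAndWait(ctx context.Context, input workConcurrentlyAndWaitInput) ([]workResult, error) {
	inputChan := make(chan string)
	resultChan := make(chan workResult)
	wg := &sync.WaitGroup{}

	// Start the workers; each reads cluster names until inputChan is closed.
	for i := int64(0); i < input.Concurrency; i++ {
		wg.Add(1)
		go input.WorkerFunc(ctx, inputChan, resultChan, wg)
	}

	// Feed the work, stopping early if the context is cancelled.
	go func() {
		defer close(inputChan)
		for _, name := range input.ClusterNames {
			select {
			case <-ctx.Done():
				return
			case inputChan <- name:
			}
		}
	}()

	// Close resultChan once every worker has exited.
	go func() {
		wg.Wait()
		close(resultChan)
	}()

	// Collect one result per processed cluster and aggregate the failures.
	results := []workResult{}
	errs := []error{}
	for r := range resultChan {
		results = append(results, r)
		if r.err != nil {
			errs = append(errs, fmt.Errorf("cluster %s: %v", r.clusterName, r.err))
		}
	}
	return results, kerrors.NewAggregate(errs)
}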
1 change: 1 addition & 0 deletions test/e2e/scale_test.go
@@ -25,6 +25,7 @@ import (
)

var _ = Describe("When testing the machinery for scale testing using in-memory provider", func() {
// Note: This test does not support MachinePools.
scaleSpec(ctx, func() scaleSpecInput {
return scaleSpecInput{
E2EConfig: e2eConfig,
14 changes: 9 additions & 5 deletions test/framework/cluster_topology_helpers.go
@@ -75,6 +75,7 @@ type UpgradeClusterTopologyAndWaitForUpgradeInput struct {
WaitForEtcdUpgrade []interface{}
PreWaitForControlPlaneToBeUpgraded func()
PreWaitForWorkersToBeUpgraded func()
SkipKubeProxyCheck bool
}

// UpgradeClusterTopologyAndWaitForUpgrade upgrades a Cluster topology and waits for it to be upgraded.
@@ -123,13 +124,16 @@ func UpgradeClusterTopologyAndWaitForUpgrade(ctx context.Context, input UpgradeC
KubernetesUpgradeVersion: input.KubernetesUpgradeVersion,
}, input.WaitForMachinesToBeUpgraded...)

log.Logf("Waiting for kube-proxy to have the upgraded Kubernetes version")
workloadCluster := input.ClusterProxy.GetWorkloadCluster(ctx, input.Cluster.Namespace, input.Cluster.Name)
workloadClient := workloadCluster.GetClient()
WaitForKubeProxyUpgrade(ctx, WaitForKubeProxyUpgradeInput{
Getter: workloadClient,
KubernetesVersion: input.KubernetesUpgradeVersion,
}, input.WaitForKubeProxyUpgrade...)

if !input.SkipKubeProxyCheck {
@killianmuldoon (Contributor, Author) commented on Jul 25, 2023:
Added this knob on the testing input to allow skipping this check. This requires routing from the host to the port on the in-memory provider pod which represents the API server of the workload cluster. I didn't figure out a way to enable this, but in this iteration upgrade checks for etcd, kube-proxy and coreDNS do not work.

log.Logf("Waiting for kube-proxy to have the upgraded Kubernetes version")
WaitForKubeProxyUpgrade(ctx, WaitForKubeProxyUpgradeInput{
Getter: workloadClient,
KubernetesVersion: input.KubernetesUpgradeVersion,
}, input.WaitForKubeProxyUpgrade...)
}

// Wait for the CoreDNS upgrade if the DNSImageTag is set.
if input.DNSImageTag != "" {
7 changes: 0 additions & 7 deletions test/framework/controlplane_helpers.go
@@ -393,13 +393,6 @@ func UpgradeControlPlaneAndWaitForUpgrade(ctx context.Context, input UpgradeCont
}
}

// controlPlaneMachineOptions returns a set of ListOptions that allows to get all machine objects belonging to control plane.
func controlPlaneMachineOptions() []client.ListOption {
return []client.ListOption{
client.HasLabels{clusterv1.MachineControlPlaneLabel},
}
}

type ScaleAndWaitControlPlaneInput struct {
ClusterProxy ClusterProxy
Cluster *clusterv1.Cluster
8 changes: 7 additions & 1 deletion test/framework/machine_helpers.go
@@ -127,7 +127,13 @@ func GetControlPlaneMachinesByCluster(ctx context.Context, input GetControlPlane
Expect(input.ClusterName).ToNot(BeEmpty(), "Invalid argument. input.ClusterName can't be empty when calling GetControlPlaneMachinesByCluster")
Expect(input.Namespace).ToNot(BeEmpty(), "Invalid argument. input.Namespace can't be empty when calling GetControlPlaneMachinesByCluster")

options := append(byClusterOptions(input.ClusterName, input.Namespace), controlPlaneMachineOptions()...)
@killianmuldoon (Contributor, Author) commented:
AFAICT this never worked with multiple clusters as one label selector overrode the other - this ended up getting all machineDeployments in the namespace for me. Replaced it with a simpler inline definition of the label selector.

A maintainer (Member) replied:
Fun! nice catch!

options := []client.ListOption{
client.InNamespace(input.Namespace),
client.MatchingLabels{
clusterv1.ClusterNameLabel: input.ClusterName,
clusterv1.MachineControlPlaneLabel: "",
},
}

machineList := &clusterv1.MachineList{}
Eventually(func() error {
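The exchange above points at a controller-runtime subtlety: at the controller-runtime version in use at the time, each label-based ListOption overwrote ListOptions.LabelSelector rather than merging with it, so appending byClusterOptions and controlPlaneMachineOptions silently dropped one of the two selectors. A minimal sketch of that behavior and of the single-selector fix follows; it assumes those overwrite semantics, while the label keys are the real cluster-api constants.

// Sketch of the override described in the review comments (assumes the
// controller-runtime behavior at the time: ApplyToList overwrites LabelSelector).
package main

import (
	"fmt"

	"sigs.k8s.io/controller-runtime/pkg/client"
)

func main() {
	// Old approach: two independent label options appended together.
	old := []client.ListOption{
		client.MatchingLabels{"cluster.x-k8s.io/cluster-name": "c1"},
		client.HasLabels{"cluster.x-k8s.io/control-plane"},
	}
	lo := &client.ListOptions{}
	for _, o := range old {
		o.ApplyToList(lo) // each call sets lo.LabelSelector, so the last option wins
	}
	fmt.Println(lo.LabelSelector) // only the control-plane "exists" requirement survives

	// Fix in this PR: one MatchingLabels carrying both requirements.
	fixed := &client.ListOptions{}
	client.MatchingLabels{
		"cluster.x-k8s.io/cluster-name":  "c1",
		"cluster.x-k8s.io/control-plane": "",
	}.ApplyToList(fixed)
	fmt.Println(fixed.LabelSelector) // cluster-name and control-plane requirements combined
}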
17 changes: 13 additions & 4 deletions test/infrastructure/inmemory/pkg/server/api/handler.go
@@ -35,6 +35,7 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/apis/meta/v1/validation"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
@@ -317,16 +318,24 @@ func (h *apiServerHandler) apiV1List(req *restful.Request, resp *restful.Respons
listOpts = append(listOpts, client.InNamespace(req.PathParameter("namespace")))
}

// TODO: The only field selector which works is for `spec.nodeName` on pods.
selector, err := fields.ParseSelector(req.QueryParameter("fieldSelector"))
// TODO: The only field Selector which works is for `spec.nodeName` on pods.
fieldSelector, err := fields.ParseSelector(req.QueryParameter("fieldSelector"))
if err != nil {
_ = resp.WriteErrorString(http.StatusInternalServerError, err.Error())
return
}
if selector != nil {
listOpts = append(listOpts, client.MatchingFieldsSelector{Selector: selector})
if fieldSelector != nil {
listOpts = append(listOpts, client.MatchingFieldsSelector{Selector: fieldSelector})
}

labelSelector, err := labels.Parse(req.QueryParameter("labelSelector"))
if err != nil {
_ = resp.WriteErrorString(http.StatusInternalServerError, err.Error())
return
}
if labelSelector != nil {
listOpts = append(listOpts, client.MatchingLabelsSelector{Selector: labelSelector})
}
if err := inmemoryClient.List(ctx, list, listOpts...); err != nil {
if status, ok := err.(apierrors.APIStatus); ok || errors.As(err, &status) {
_ = resp.WriteHeaderAndEntity(int(status.Status().Code), status)
12 changes: 10 additions & 2 deletions test/infrastructure/inmemory/pkg/server/mux_test.go
@@ -36,6 +36,7 @@ import (
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
@@ -155,7 +156,7 @@ func TestAPI_corev1_CRUD(t *testing.T) {
Spec: corev1.PodSpec{NodeName: n.Name},
})).To(Succeed())
g.Expect(c.Create(ctx, &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "notSelectedPod", Namespace: metav1.NamespaceDefault},
ObjectMeta: metav1.ObjectMeta{Name: "labelSelectedPod", Namespace: metav1.NamespaceDefault, Labels: map[string]string{"name": "labelSelectedPod"}},
})).To(Succeed())

pl := &corev1.PodList{}
@@ -166,8 +167,15 @@
g.Expect(pl.Items).To(HaveLen(1))
g.Expect(pl.Items[0].Name).To(Equal("bar"))

// get
// list with label selector on pod
labelSelector := &client.ListOptions{
LabelSelector: labels.SelectorFromSet(labels.Set{"name": "labelSelectedPod"}),
}
g.Expect(c.List(ctx, pl, labelSelector)).To(Succeed())
g.Expect(pl.Items).To(HaveLen(1))
g.Expect(pl.Items[0].Name).To(Equal("labelSelectedPod"))

// get
n = &corev1.Node{}
err = c.Get(ctx, client.ObjectKey{Name: "foo"}, n)
g.Expect(err).ToNot(HaveOccurred())