From c4ccb530da5c7b1671103fdeadb2c57f0e3b38fa Mon Sep 17 00:00:00 2001
From: Vivek Singh
Date: Thu, 21 Dec 2023 11:40:57 +0100
Subject: [PATCH] Set original replica count as op artifact for `ScaleWorkload`
 fun (#2533)

* Set original replica count as op artifact for `ScaleWorkload` fun

* Address small review comments

  1. Remove extra new line
  2. Change STS -> StatefulSet

* Update example for scaleworkload

* Add test and update the scaleworkload tasks README
---
 docs/functions.rst             | 10 +++++---
 docs/tasks.rst                 |  1 +
 docs/tasks/scaleworkload.rst   | 46 ++++++++++++++++++++++++++++++++++
 pkg/function/scale_test.go     | 42 +++++++++++++++++++++++++------
 pkg/function/scale_workload.go | 26 ++++++++++++++++---
 pkg/kube/workload.go           | 24 ++++++++++++++++++
 6 files changed, 135 insertions(+), 14 deletions(-)
 create mode 100644 docs/tasks/scaleworkload.rst

diff --git a/docs/functions.rst b/docs/functions.rst
index b6757f4f57..23cb82941a 100644
--- a/docs/functions.rst
+++ b/docs/functions.rst
@@ -147,7 +147,9 @@ Example:
 ScaleWorkload
 -------------
 
-ScaleWorkload is used to scale up or scale down a Kubernetes workload.
+ScaleWorkload is used to scale up or scale down a Kubernetes workload. It
+also sets the original replica count of the workload as an output artifact
+with the key ``originalReplicaCount``.
 The function only returns after the desired replica state is achieved:
 
 * When reducing the replica count, wait until all terminating pods
@@ -155,7 +157,8 @@ The function only returns after the desired replica state is achieved:
 
 * When increasing the replica count, wait until all pods are ready.
 
-Currently the function supports Deployments and StatefulSets.
+Currently, the function supports Deployments, StatefulSets, and
+DeploymentConfigs.
 
 It is similar to running
 
@@ -165,7 +168,8 @@ It is similar to running
 
 This can be useful if the workload needs to be shut down before processing
 certain data operations. For example, it may be useful to use ``ScaleWorkload``
-to stop a database process before restoring files.
+to stop a database process before restoring files. See
+:ref:`scaleworkloadexample` for an example using the new ``ScaleWorkload`` function.
 
 .. csv-table::
    :header: "Argument", "Required", "Type", "Description"
diff --git a/docs/tasks.rst b/docs/tasks.rst
index 3897ee7052..284b1ac1d0 100644
--- a/docs/tasks.rst
+++ b/docs/tasks.rst
@@ -8,3 +8,4 @@ Tasks
    tasks/argo.rst
    tasks/logs_level.rst
    tasks/logs.rst
+   tasks/scaleworkload.rst
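For readers unfamiliar with what the function does at the API level, the sketch below shows the capture-then-scale pattern this change implements, written against client-go's scale subresource. It is a minimal illustration, not the Kanister code; the package and helper names are assumptions:

    // Illustrative only: a minimal client-go sketch of the capture-then-scale
    // pattern this change implements. Package and helper names are assumptions.
    package scaling

    import (
    	"context"
    	"fmt"

    	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    	"k8s.io/client-go/kubernetes"
    )

    // scaleDeployment records the current replica count, then updates the
    // Deployment's scale subresource to the desired count, returning the
    // original count so a caller can expose it (e.g., as an output artifact).
    func scaleDeployment(ctx context.Context, cli kubernetes.Interface, namespace, name string, replicas int32) (int32, error) {
    	scale, err := cli.AppsV1().Deployments(namespace).GetScale(ctx, name, metav1.GetOptions{})
    	if err != nil {
    		return 0, fmt.Errorf("failed to get scale for %s/%s: %w", namespace, name, err)
    	}
    	original := scale.Spec.Replicas // this is what becomes originalReplicaCount
    	scale.Spec.Replicas = replicas
    	if _, err := cli.AppsV1().Deployments(namespace).UpdateScale(ctx, name, scale, metav1.UpdateOptions{}); err != nil {
    		return 0, fmt.Errorf("failed to scale %s/%s: %w", namespace, name, err)
    	}
    	return original, nil
    }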
diff --git a/docs/tasks/scaleworkload.rst b/docs/tasks/scaleworkload.rst
new file mode 100644
index 0000000000..777c0f0a0a
--- /dev/null
+++ b/docs/tasks/scaleworkload.rst
@@ -0,0 +1,46 @@
+.. _scaleworkloadexample:
+
+Using ScaleWorkload function with output artifact
+-------------------------------------------------
+
+The ``ScaleWorkload`` function can be used to scale a workload to a specified
+number of replicas. It automatically sets the original replica count of the
+workload as an output artifact, which makes using ``ScaleWorkload`` in
+blueprints much easier.
+
+Below is an example of how this function can be used:
+
+
+.. code-block:: yaml
+
+  apiVersion: cr.kanister.io/v1alpha1
+  kind: Blueprint
+  metadata:
+    name: my-blueprint
+  actions:
+    backup:
+      outputArtifacts:
+        backupOutput:
+          keyValue:
+            origReplicas: "{{ .Phases.shutdownPod.Output.originalReplicaCount }}"
+      phases:
+        # before scaling the replicas down to 0, ScaleWorkload will record the
+        # original replica count and set it as an output artifact (originalReplicaCount)
+        - func: ScaleWorkload
+          name: shutdownPod
+          args:
+            namespace: "{{ .StatefulSet.Namespace }}"
+            name: "{{ .StatefulSet.Name }}"
+            kind: StatefulSet
+            replicas: 0 # the replica count the StatefulSet will be scaled to
+    restore:
+      inputArtifactNames:
+        - backupOutput
+      phases:
+        - func: ScaleWorkload
+          name: bringUpPod
+          args:
+            namespace: "{{ .StatefulSet.Namespace }}"
+            name: "{{ .StatefulSet.Name }}"
+            kind: StatefulSet
+            replicas: "{{ .ArtifactsIn.backupOutput.KeyValue.origReplicas }}"
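Blueprint templates like the one above are rendered with Go's text/template syntax. The standalone sketch below mimics how the ``originalReplicaCount`` value returned by the ``shutdownPod`` phase gets substituted into the ``origReplicas`` artifact; the nested map shape here is an assumption for illustration, not Kanister's actual template-parameter type:

    // Illustrative only: how a phase output such as originalReplicaCount is
    // substituted into an output artifact via Go's text/template syntax.
    package main

    import (
    	"os"
    	"text/template"
    )

    func main() {
    	// Assumed data shape, standing in for the rendered template params.
    	params := map[string]interface{}{
    		"Phases": map[string]interface{}{
    			"shutdownPod": map[string]interface{}{
    				"Output": map[string]interface{}{
    					"originalReplicaCount": int32(3),
    				},
    			},
    		},
    	}
    	tmpl := template.Must(template.New("artifact").Parse(
    		`origReplicas: "{{ .Phases.shutdownPod.Output.originalReplicaCount }}"`))
    	// Prints: origReplicas: "3"
    	if err := tmpl.Execute(os.Stdout, params); err != nil {
    		panic(err)
    	}
    }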
diff --git a/pkg/function/scale_test.go b/pkg/function/scale_test.go
index 86b6730036..b4bd5d675d 100644
--- a/pkg/function/scale_test.go
+++ b/pkg/function/scale_test.go
@@ -85,7 +85,7 @@ func (s *ScaleSuite) TearDownTest(c *C) {
 	}
 }
 
-func newScaleBlueprint(kind string) *crv1alpha1.Blueprint {
+func newScaleBlueprint(kind string, scaleUpCount string) *crv1alpha1.Blueprint {
 	return &crv1alpha1.Blueprint{
 		Actions: map[string]*crv1alpha1.BlueprintAction{
 			"echoHello": {
@@ -122,7 +122,7 @@ func newScaleBlueprint(kind string) *crv1alpha1.Blueprint {
 						Name: "testScale",
 						Func: ScaleWorkloadFuncName,
 						Args: map[string]interface{}{
-							ScaleWorkloadReplicas: "2",
+							ScaleWorkloadReplicas: scaleUpCount,
 						},
 					},
 				},
@@ -133,7 +133,8 @@ func newScaleBlueprint(kind string) *crv1alpha1.Blueprint {
 
 func (s *ScaleSuite) TestScaleDeployment(c *C) {
 	ctx := context.Background()
-	d := testutil.NewTestDeployment(1)
+	var originalReplicaCount int32 = 1
+	d := testutil.NewTestDeployment(originalReplicaCount)
 	d.Spec.Template.Spec.Containers[0].Lifecycle = &corev1.Lifecycle{
 		PreStop: &corev1.LifecycleHandler{
 			Exec: &corev1.ExecAction{
@@ -160,15 +161,27 @@ func (s *ScaleSuite) TestScaleDeployment(c *C) {
 			Namespace: s.namespace,
 		},
 	}
+	var scaleUpToReplicas int32 = 2
 	for _, action := range []string{"scaleUp", "echoHello", "scaleDown"} {
 		tp, err := param.New(ctx, s.cli, fake.NewSimpleDynamicClient(k8sscheme.Scheme, d), s.crCli, s.osCli, as)
 		c.Assert(err, IsNil)
-		bp := newScaleBlueprint(kind)
+		bp := newScaleBlueprint(kind, fmt.Sprintf("%d", scaleUpToReplicas))
 		phases, err := kanister.GetPhases(*bp, action, kanister.DefaultVersion, *tp)
 		c.Assert(err, IsNil)
 		for _, p := range phases {
-			_, err = p.Exec(context.Background(), *bp, action, *tp)
+			out, err := p.Exec(context.Background(), *bp, action, *tp)
 			c.Assert(err, IsNil)
+			// At the start the workload has `originalReplicaCount` replicas. The first phase to
+			// run is `scaleUp`, which changes that count to 2, but the function returns the
+			// count the workload had before scaling, i.e., `originalReplicaCount`.
+			if action == "scaleUp" {
+				c.Assert(out[outputArtifactOriginalReplicaCount], Equals, originalReplicaCount)
+			}
+			// `scaleDown` changes the replica count from 2 to 0. Because the previous phase already
+			// scaled the workload to 2 replicas, the output artifact this time is that count, i.e., 2.
+			if action == "scaleDown" {
+				c.Assert(out[outputArtifactOriginalReplicaCount], Equals, scaleUpToReplicas)
+			}
 		}
 		ok, _, err := kube.DeploymentReady(ctx, s.cli, d.GetNamespace(), d.GetName())
 		c.Assert(err, IsNil)
@@ -182,7 +195,8 @@
 
 func (s *ScaleSuite) TestScaleStatefulSet(c *C) {
 	ctx := context.Background()
-	ss := testutil.NewTestStatefulSet(1)
+	var originalReplicaCount int32 = 1
+	ss := testutil.NewTestStatefulSet(originalReplicaCount)
 	ss.Spec.Template.Spec.Containers[0].Lifecycle = &corev1.Lifecycle{
 		PreStop: &corev1.LifecycleHandler{
 			Exec: &corev1.ExecAction{
@@ -209,15 +223,27 @@ func (s *ScaleSuite) TestScaleStatefulSet(c *C) {
 		},
 	}
 
+	var scaleUpToReplicas int32 = 2
 	for _, action := range []string{"scaleUp", "echoHello", "scaleDown"} {
 		tp, err := param.New(ctx, s.cli, fake.NewSimpleDynamicClient(k8sscheme.Scheme, ss), s.crCli, s.osCli, as)
 		c.Assert(err, IsNil)
-		bp := newScaleBlueprint(kind)
+		bp := newScaleBlueprint(kind, fmt.Sprintf("%d", scaleUpToReplicas))
 		phases, err := kanister.GetPhases(*bp, action, kanister.DefaultVersion, *tp)
 		c.Assert(err, IsNil)
 		for _, p := range phases {
-			_, err = p.Exec(context.Background(), *bp, action, *tp)
+			out, err := p.Exec(context.Background(), *bp, action, *tp)
 			c.Assert(err, IsNil)
+			// At the start the workload has `originalReplicaCount` replicas. The first phase to
+			// run is `scaleUp`, which changes that count to 2, but the function returns the
+			// count the workload had before scaling, i.e., `originalReplicaCount`.
+			if action == "scaleUp" {
+				c.Assert(out[outputArtifactOriginalReplicaCount], Equals, originalReplicaCount)
+			}
+			// `scaleDown` changes the replica count from 2 to 0. Because the previous phase already
+			// scaled the workload to 2 replicas, the output artifact this time is that count, i.e., 2.
+			if action == "scaleDown" {
+				c.Assert(out[outputArtifactOriginalReplicaCount], Equals, scaleUpToReplicas)
+			}
 		}
 		ok, _, err := kube.StatefulSetReady(ctx, s.cli, ss.GetNamespace(), ss.GetName())
 		c.Assert(err, IsNil)
diff --git a/pkg/function/scale_workload.go b/pkg/function/scale_workload.go
index 819e771759..45df873968 100644
--- a/pkg/function/scale_workload.go
+++ b/pkg/function/scale_workload.go
@@ -40,6 +40,8 @@ const (
 	ScaleWorkloadKindArg  = "kind"
 	ScaleWorkloadReplicas = "replicas"
 	ScaleWorkloadWaitArg  = "waitForReady"
+
+	outputArtifactOriginalReplicaCount = "originalReplicaCount"
 )
 
 func init() {
@@ -83,15 +85,33 @@ func (s *scaleWorkloadFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) {
 	}
 	switch strings.ToLower(s.kind) {
 	case param.StatefulSetKind:
-		return nil, kube.ScaleStatefulSet(ctx, cli, s.namespace, s.name, s.replicas, s.waitForReady)
+		count, err := kube.StatefulSetReplicas(ctx, cli, s.namespace, s.name)
+		if err != nil {
+			return nil, err
+		}
+		return map[string]interface{}{
+			outputArtifactOriginalReplicaCount: count,
+		}, kube.ScaleStatefulSet(ctx, cli, s.namespace, s.name, s.replicas, s.waitForReady)
 	case param.DeploymentKind:
-		return nil, kube.ScaleDeployment(ctx, cli, s.namespace, s.name, s.replicas, s.waitForReady)
+		count, err := kube.DeploymentReplicas(ctx, cli, s.namespace, s.name)
+		if err != nil {
+			return nil, err
+		}
+		return map[string]interface{}{
+			outputArtifactOriginalReplicaCount: count,
+		}, kube.ScaleDeployment(ctx, cli, s.namespace, s.name, s.replicas, s.waitForReady)
 	case param.DeploymentConfigKind:
 		osCli, err := osversioned.NewForConfig(cfg)
 		if err != nil {
 			return nil, errors.Wrapf(err, "Failed to create OpenShift client")
 		}
-		return nil, kube.ScaleDeploymentConfig(ctx, cli, osCli, s.namespace, s.name, s.replicas, s.waitForReady)
+		count, err := kube.DeploymentConfigReplicas(ctx, osCli, s.namespace, s.name)
+		if err != nil {
+			return nil, err
+		}
+		return map[string]interface{}{
+			outputArtifactOriginalReplicaCount: count,
+		}, kube.ScaleDeploymentConfig(ctx, cli, osCli, s.namespace, s.name, s.replicas, s.waitForReady)
 	}
 	return nil, errors.New("Workload type not supported " + s.kind)
 }
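One detail worth noting about the replica helpers added below in pkg/kube/workload.go: they dereference ``Spec.Replicas`` directly. That is safe for objects read back from the API server, which defaults the field, but a nil-safe accessor is a common defensive alternative when the object may have been constructed client-side. A minimal sketch; the helper name is an assumption, not part of this patch:

    // Illustrative only: a nil-safe accessor for *int32 replica fields.
    package scaling

    func replicasOrDefault(replicas *int32) int32 {
    	if replicas == nil {
    		// Deployments and StatefulSets default spec.replicas to 1.
    		return 1
    	}
    	return *replicas
    }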
diff --git a/pkg/kube/workload.go b/pkg/kube/workload.go
index bf08210cf4..8ead5548fd 100644
--- a/pkg/kube/workload.go
+++ b/pkg/kube/workload.go
@@ -514,3 +514,27 @@ func IsPodRunning(cli kubernetes.Interface, podName, podNamespace string) (bool, error) {
 
 	return true, nil
 }
+
+func StatefulSetReplicas(ctx context.Context, kubeCli kubernetes.Interface, namespace, name string) (int32, error) {
+	sts, err := kubeCli.AppsV1().StatefulSets(namespace).Get(ctx, name, metav1.GetOptions{})
+	if err != nil {
+		return 0, errors.Wrapf(err, "Could not get StatefulSet{Namespace: %s, Name: %s} to figure out replicas", namespace, name)
+	}
+	return *sts.Spec.Replicas, nil
+}
+
+func DeploymentReplicas(ctx context.Context, kubeCli kubernetes.Interface, namespace, name string) (int32, error) {
+	d, err := kubeCli.AppsV1().Deployments(namespace).Get(ctx, name, metav1.GetOptions{})
+	if err != nil {
+		return 0, errors.Wrapf(err, "Could not get Deployment{Namespace: %s, Name: %s} to figure out replicas", namespace, name)
+	}
+	return *d.Spec.Replicas, nil
+}
+
+func DeploymentConfigReplicas(ctx context.Context, osCli osversioned.Interface, namespace, name string) (int32, error) {
+	dc, err := osCli.AppsV1().DeploymentConfigs(namespace).Get(ctx, name, metav1.GetOptions{})
+	if err != nil {
+		return 0, errors.Wrapf(err, "Could not get DeploymentConfig{Namespace: %s, Name: %s} to figure out replicas", namespace, name)
+	}
+	return dc.Spec.Replicas, nil
+}
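Taken together, the new helpers enable the same capture-then-restore pattern outside of blueprints. A minimal sketch using the functions introduced by this patch; the enclosing function name and flow are illustrative, and error handling around the data operation is trimmed:

    // Illustrative only: capture the replica count, scale down, do work,
    // then restore the workload to its original size.
    package example

    import (
    	"context"

    	"github.com/kanisterio/kanister/pkg/kube"
    	"k8s.io/client-go/kubernetes"
    )

    func backupWithScaleDown(ctx context.Context, cli kubernetes.Interface, namespace, name string) error {
    	// Record the replica count before touching the workload.
    	original, err := kube.DeploymentReplicas(ctx, cli, namespace, name)
    	if err != nil {
    		return err
    	}
    	// Scale to zero and wait for the pods to terminate.
    	if err := kube.ScaleDeployment(ctx, cli, namespace, name, 0, true); err != nil {
    		return err
    	}
    	// ... perform the data operation while the workload is down ...

    	// Bring the workload back to its original size.
    	return kube.ScaleDeployment(ctx, cli, namespace, name, original, true)
    }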