Skip to content

Commit

Permalink
Set original replica count as op artifact for ScaleWorkload fun (#2533
Browse files Browse the repository at this point in the history
)

* Set original replica count as op artifact for `ScaleWorkload` fun

* Address small review comments

1. Remove extra new line
2. Change STS -> StatefulSet

* Update example for scaleworkload

* Add test and update the scaleworkload tasks README
  • Loading branch information
viveksinghggits authored Dec 21, 2023
1 parent 0caa0df commit c4ccb53
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 14 deletions.
10 changes: 7 additions & 3 deletions docs/functions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -147,15 +147,18 @@ Example:
ScaleWorkload
-------------

ScaleWorkload is used to scale up or scale down a Kubernetes workload.
ScaleWorkload is used to scale up or scale down a Kubernetes workload. It
also sets the original replica count of the workload as output artifact with
the key ``originalReplicaCount``.
The function only returns after the desired replica state is achieved:

* When reducing the replica count, wait until all terminating pods
complete.

* When increasing the replica count, wait until all pods are ready.

Currently the function supports Deployments and StatefulSets.
Currently the function supports Deployments, StatefulSets and
DeploymentConfigs.

It is similar to running

Expand All @@ -165,7 +168,8 @@ It is similar to running
This can be useful if the workload needs to be shutdown before processing
certain data operations. For example, it may be useful to use ``ScaleWorkload``
to stop a database process before restoring files.
to stop a database process before restoring files. See
:ref:`scaleworkloadexample` for an example with new ``ScaleWorkload`` function.

.. csv-table::
:header: "Argument", "Required", "Type", "Description"
Expand Down
1 change: 1 addition & 0 deletions docs/tasks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ Tasks
tasks/argo.rst
tasks/logs_level.rst
tasks/logs.rst
tasks/scaleworkload.rst
46 changes: 46 additions & 0 deletions docs/tasks/scaleworkload.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
.. _scaleworkloadexample:

Using ScaleWorkload function with output artifact
-------------------------------------------------

``ScaleWorkload`` function can be used to scale a workload to specified
replicas. It automatically sets the original replica count of the workload
as output artifact, which makes using ``ScaleWorkload`` function in blueprints
a lot easier.

Below is an example of how this function can be used


.. code-block:: yaml
apiVersion: cr.kanister.io/v1alpha1
kind: Blueprint
metadata:
name: my-blueprint
actions:
backup:
outputArtifacts:
backupOutput:
keyValue:
origReplicas: "{{ .Phases.shutdownPod.Output.originalReplicaCount }}"
phases:
# before scaling replicas 0, ScaleWorkload will get the original replica count
# to set that as output artifact (originalReplicaCount)
- func: ScaleWorkload
name: shutdownPod
args:
namespace: "{{ .StatefulSet.Namespace }}"
name: "{{ .StatefulSet.Name }}"
kind: StatefulSet
replicas: 0 # this is the replica count, the STS will scaled to
restore:
inputArtifactNames:
- backupOutput
phases:
- func: ScaleWorkload
name: bringUpPod
args:
namespace: "{{ .StatefulSet.Namespace }}"
name: "{{ .StatefulSet.Name }}"
kind: StatefulSet
replicas: "{{ .ArtifactsIn.backupOutput.KeyValue.origReplicas }}"
42 changes: 34 additions & 8 deletions pkg/function/scale_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (s *ScaleSuite) TearDownTest(c *C) {
}
}

func newScaleBlueprint(kind string) *crv1alpha1.Blueprint {
func newScaleBlueprint(kind string, scaleUpCount string) *crv1alpha1.Blueprint {
return &crv1alpha1.Blueprint{
Actions: map[string]*crv1alpha1.BlueprintAction{
"echoHello": {
Expand Down Expand Up @@ -122,7 +122,7 @@ func newScaleBlueprint(kind string) *crv1alpha1.Blueprint {
Name: "testScale",
Func: ScaleWorkloadFuncName,
Args: map[string]interface{}{
ScaleWorkloadReplicas: "2",
ScaleWorkloadReplicas: scaleUpCount,
},
},
},
Expand All @@ -133,7 +133,8 @@ func newScaleBlueprint(kind string) *crv1alpha1.Blueprint {

func (s *ScaleSuite) TestScaleDeployment(c *C) {
ctx := context.Background()
d := testutil.NewTestDeployment(1)
var originalReplicaCount int32 = 1
d := testutil.NewTestDeployment(originalReplicaCount)
d.Spec.Template.Spec.Containers[0].Lifecycle = &corev1.Lifecycle{
PreStop: &corev1.LifecycleHandler{
Exec: &corev1.ExecAction{
Expand All @@ -160,15 +161,27 @@ func (s *ScaleSuite) TestScaleDeployment(c *C) {
Namespace: s.namespace,
},
}
var scaleUpToReplicas int32 = 2
for _, action := range []string{"scaleUp", "echoHello", "scaleDown"} {
tp, err := param.New(ctx, s.cli, fake.NewSimpleDynamicClient(k8sscheme.Scheme, d), s.crCli, s.osCli, as)
c.Assert(err, IsNil)
bp := newScaleBlueprint(kind)
bp := newScaleBlueprint(kind, fmt.Sprintf("%d", scaleUpToReplicas))
phases, err := kanister.GetPhases(*bp, action, kanister.DefaultVersion, *tp)
c.Assert(err, IsNil)
for _, p := range phases {
_, err = p.Exec(context.Background(), *bp, action, *tp)
out, err := p.Exec(context.Background(), *bp, action, *tp)
c.Assert(err, IsNil)
// at the start workload has `originalReplicaCount` replicas, the first phase that is going to get executed is
// `scaleUp` which would change that count to 2, but the function would return the count that workload originally had
// i.e., `originalReplicaCount`
if action == "scaleUp" {
c.Assert(out[outputArtifactOriginalReplicaCount], Equals, originalReplicaCount)
}
// `scaleDown` is going to change the replica count to 0 from 2. Because the workload already had 2 replicas
// (previous phase), so ouptut artifact from the function this time would be what the workload already had i.e., 2
if action == "scaleDown" {
c.Assert(out[outputArtifactOriginalReplicaCount], Equals, scaleUpToReplicas)
}
}
ok, _, err := kube.DeploymentReady(ctx, s.cli, d.GetNamespace(), d.GetName())
c.Assert(err, IsNil)
Expand All @@ -182,7 +195,8 @@ func (s *ScaleSuite) TestScaleDeployment(c *C) {

func (s *ScaleSuite) TestScaleStatefulSet(c *C) {
ctx := context.Background()
ss := testutil.NewTestStatefulSet(1)
var originalReplicaCount int32 = 1
ss := testutil.NewTestStatefulSet(originalReplicaCount)
ss.Spec.Template.Spec.Containers[0].Lifecycle = &corev1.Lifecycle{
PreStop: &corev1.LifecycleHandler{
Exec: &corev1.ExecAction{
Expand All @@ -209,15 +223,27 @@ func (s *ScaleSuite) TestScaleStatefulSet(c *C) {
},
}

var scaleUpToReplicas int32 = 2
for _, action := range []string{"scaleUp", "echoHello", "scaleDown"} {
tp, err := param.New(ctx, s.cli, fake.NewSimpleDynamicClient(k8sscheme.Scheme, ss), s.crCli, s.osCli, as)
c.Assert(err, IsNil)
bp := newScaleBlueprint(kind)
bp := newScaleBlueprint(kind, fmt.Sprintf("%d", scaleUpToReplicas))
phases, err := kanister.GetPhases(*bp, action, kanister.DefaultVersion, *tp)
c.Assert(err, IsNil)
for _, p := range phases {
_, err = p.Exec(context.Background(), *bp, action, *tp)
out, err := p.Exec(context.Background(), *bp, action, *tp)
c.Assert(err, IsNil)
// at the start workload has `originalReplicaCount` replicas, the first phase that is going to get executed is
// `scaleUp` which would change that count to 2, but the function would return the count that workload originally had
// i.e., `originalReplicaCount`
if action == "scaleUp" {
c.Assert(out[outputArtifactOriginalReplicaCount], Equals, originalReplicaCount)
}
// `scaleDown` is going to change the replica count to 0 from 2. Because the workload already had 2 replicas
// (previous phase), so ouptut artifact from the function this time would be what the workload already had i.e., 2
if action == "scaleDown" {
c.Assert(out[outputArtifactOriginalReplicaCount], Equals, scaleUpToReplicas)
}
}
ok, _, err := kube.StatefulSetReady(ctx, s.cli, ss.GetNamespace(), ss.GetName())
c.Assert(err, IsNil)
Expand Down
26 changes: 23 additions & 3 deletions pkg/function/scale_workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ const (
ScaleWorkloadKindArg = "kind"
ScaleWorkloadReplicas = "replicas"
ScaleWorkloadWaitArg = "waitForReady"

outputArtifactOriginalReplicaCount = "originalReplicaCount"
)

func init() {
Expand Down Expand Up @@ -83,15 +85,33 @@ func (s *scaleWorkloadFunc) Exec(ctx context.Context, tp param.TemplateParams, a
}
switch strings.ToLower(s.kind) {
case param.StatefulSetKind:
return nil, kube.ScaleStatefulSet(ctx, cli, s.namespace, s.name, s.replicas, s.waitForReady)
count, err := kube.StatefulSetReplicas(ctx, cli, s.namespace, s.name)
if err != nil {
return nil, err
}
return map[string]interface{}{
outputArtifactOriginalReplicaCount: count,
}, kube.ScaleStatefulSet(ctx, cli, s.namespace, s.name, s.replicas, s.waitForReady)
case param.DeploymentKind:
return nil, kube.ScaleDeployment(ctx, cli, s.namespace, s.name, s.replicas, s.waitForReady)
count, err := kube.DeploymentReplicas(ctx, cli, s.namespace, s.name)
if err != nil {
return nil, err
}
return map[string]interface{}{
outputArtifactOriginalReplicaCount: count,
}, kube.ScaleDeployment(ctx, cli, s.namespace, s.name, s.replicas, s.waitForReady)
case param.DeploymentConfigKind:
osCli, err := osversioned.NewForConfig(cfg)
if err != nil {
return nil, errors.Wrapf(err, "Failed to create OpenShift client")
}
return nil, kube.ScaleDeploymentConfig(ctx, cli, osCli, s.namespace, s.name, s.replicas, s.waitForReady)
count, err := kube.DeploymentConfigReplicas(ctx, osCli, s.namespace, s.name)
if err != nil {
return nil, err
}
return map[string]interface{}{
outputArtifactOriginalReplicaCount: count,
}, kube.ScaleDeploymentConfig(ctx, cli, osCli, s.namespace, s.name, s.replicas, s.waitForReady)
}
return nil, errors.New("Workload type not supported " + s.kind)
}
Expand Down
24 changes: 24 additions & 0 deletions pkg/kube/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,3 +514,27 @@ func IsPodRunning(cli kubernetes.Interface, podName, podNamespace string) (bool,

return true, nil
}

func StatefulSetReplicas(ctx context.Context, kubeCli kubernetes.Interface, namespace, name string) (int32, error) {
sts, err := kubeCli.AppsV1().StatefulSets(namespace).Get(ctx, name, metav1.GetOptions{})
if err != nil {
return 0, errors.Wrapf(err, "Could not get StatefulSet{Namespace %s, Name: %s}, to figure out replicas", namespace, name)
}
return *sts.Spec.Replicas, nil
}

func DeploymentReplicas(ctx context.Context, kubeCli kubernetes.Interface, namespace, name string) (int32, error) {
d, err := kubeCli.AppsV1().Deployments(namespace).Get(ctx, name, metav1.GetOptions{})
if err != nil {
return 0, errors.Wrapf(err, "Could not get Deployment{Namespace %s, Name: %s}, to figure out replicas", namespace, name)
}
return *d.Spec.Replicas, nil
}

func DeploymentConfigReplicas(ctx context.Context, osCli osversioned.Interface, namespace, name string) (int32, error) {
dc, err := osCli.AppsV1().DeploymentConfigs(namespace).Get(ctx, name, metav1.GetOptions{})
if err != nil {
return 0, errors.Wrapf(err, "Could not get DeploymentConfig{Namespace %s, Name: %s}, to figure out replicas", namespace, name)
}
return dc.Spec.Replicas, nil
}

0 comments on commit c4ccb53

Please sign in to comment.