Skip to content

Commit

Permalink
Switch Kanister PrepareData to use pod instead of a job (#4411)
Browse files Browse the repository at this point in the history
* Modify PrepareData to use pod - K10-1711

* Update prepareData to create pod

* Minor changes

* Doc update

* Apply Suggestions
  • Loading branch information
pavannd1 authored and Ilya Kislenko committed Nov 26, 2018
1 parent 830c8d4 commit 3383bd5
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 26 deletions.
12 changes: 6 additions & 6 deletions docs/functions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ Example of scaling up:
PrepareData
-----------

This function allows running a Kubernetes Job that will mount one or more PVCs
This function allows running a new Pod that will mount one or more PVCs
and execute a command or script that manipulates the data on the PVCs.

The function can be useful when it is necessary to perform operations on the
Expand Down Expand Up @@ -302,13 +302,13 @@ RestoreData
-----------

This function restores data backed up by the BackupData function.
It creates a Kubernetes Job that mounts the PVCs referenced
by the specified Pod and restores data to the specified path.
It creates a new Pod that mounts the PVCs referenced by the specified Pod
and restores data to the specified path.

.. note::
It is extremely important that, the PVCs are not be currently
in use by an active application container, as the Kubernetes Job
requires to mount the PVCs to a new Pod (ensure by using
in use by an active application container, as they are required
to be mounted to the new Pod (ensure by using
ScaleWorkload with replicas=0 first).
For advanced use cases, it is possible to have concurrent access but
the PV needs to have RWX mode enabled and the volume needs to use a
Expand Down Expand Up @@ -406,7 +406,7 @@ If the ActionSet `Object` is a PersistentVolumeClaim:
DeleteData
----------

This function uses a Kubernetes Job to delete the specified artifact
This function uses a new Pod to delete the specified artifact
from an S3 compatible object store.

.. csv-table::
Expand Down
2 changes: 1 addition & 1 deletion pkg/function/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func newRestoreDataBlueprint(pvc string) *crv1alpha1.Blueprint {
RestoreDataNamespaceArg: "{{ .StatefulSet.Namespace }}",
RestoreDataImageArg: "kanisterio/kanister-tools:0.13.0",
RestoreDataBackupArtifactPrefixArg: "{{ .Profile.Location.S3Compliant.Bucket }}/{{ .Profile.Location.S3Compliant.Prefix }}",
RestoreDataRestorePathArg: "/",
RestoreDataRestorePathArg: "/mnt/data",
RestoreDataBackupIdentifierArg: "{{ .Time }}",
RestoreDataEncryptionKeyArg: "{{ .Secrets.backupKey.Data.password | toString }}",
RestoreDataVolsArg: map[string]string{
Expand Down
9 changes: 7 additions & 2 deletions pkg/function/kube_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/kanisterio/kanister/pkg/kube"
"github.com/kanisterio/kanister/pkg/param"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)

const (
Expand Down Expand Up @@ -64,7 +65,11 @@ func kubeTask(ctx context.Context, namespace, image string, command []string) (m
if err != nil {
return nil, errors.Wrapf(err, "Failed to create pod for KubeTask")
}
defer kube.DeletePod(context.Background(), clientset, pod)
defer func() {
if err := kube.DeletePod(context.Background(), clientset, pod); err != nil {
log.Error("Failed to delete pod ", err.Error())
}
}()

// Wait for pod completion
if err := kube.WaitForPodCompletion(ctx, clientset, pod.Namespace, pod.Name); err != nil {
Expand All @@ -78,7 +83,7 @@ func kubeTask(ctx context.Context, namespace, image string, command []string) (m
format.Log(pod.Name, pod.Spec.Containers[0].Name, logs)

out, err := parseLogAndCreateOutput(logs)
return out, errors.Wrap(err, "Failed to generate output")
return out, errors.Wrap(err, "Failed to parse phase output")
}

func (ktf *kubeTaskFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/function/kube_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (s *KubeTaskSuite) SetUpSuite(c *C) {

ns := &v1.Namespace{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "kanisterdeletetest-",
GenerateName: "kanisterkubetasktest-",
},
}
cns, err := s.cli.Core().Namespaces().Create(ns)
Expand Down Expand Up @@ -85,7 +85,7 @@ func newTaskBlueprint(namespace string) *crv1alpha1.Blueprint {
}

func (s *KubeTaskSuite) TestKubeTask(c *C) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
tp := param.TemplateParams{
StatefulSet: &param.StatefulSetParams{
Expand Down
43 changes: 31 additions & 12 deletions pkg/function/prepare_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import (
"fmt"

"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

kanister "github.com/kanisterio/kanister/pkg"
"github.com/kanisterio/kanister/pkg/format"
"github.com/kanisterio/kanister/pkg/kube"
"github.com/kanisterio/kanister/pkg/param"
)
Expand Down Expand Up @@ -57,26 +59,43 @@ func getVolumes(tp param.TemplateParams) (map[string]string, error) {
return vols, nil
}

func prepareData(ctx context.Context, cli kubernetes.Interface, namespace, serviceAccount, image string, vols map[string]string, command ...string) error {
func prepareData(ctx context.Context, cli kubernetes.Interface, namespace, serviceAccount, image string, vols map[string]string, command ...string) (map[string]interface{}, error) {
// Validate volumes
for pvc := range vols {
if _, err := cli.CoreV1().PersistentVolumeClaims(namespace).Get(pvc, metav1.GetOptions{}); err != nil {
return errors.Wrapf(err, "Failed to retrieve PVC. Namespace %s, Name %s", namespace, pvc)
return nil, errors.Wrapf(err, "Failed to retrieve PVC. Namespace %s, Name %s", namespace, pvc)
}
}
jobName := generateJobName(prepareDataJobPrefix)
job, err := kube.NewJob(cli, jobName, namespace, serviceAccount, image, vols, command...)
pod, err := kube.CreatePod(ctx, cli, &kube.PodOptions{
Namespace: namespace,
GenerateName: prepareDataJobPrefix,
Image: image,
Command: command,
Volumes: vols,
ServiceAccountName: serviceAccount,
})
if err != nil {
return errors.Wrap(err, "Failed to create prepare data job")
return nil, errors.Wrapf(err, "Failed to create pod to run prepare data job")
}
if err := job.Create(); err != nil {
return errors.Wrapf(err, "Failed to create job %s in Kubernetes", jobName)
defer func() {
if err := kube.DeletePod(context.Background(), cli, pod); err != nil {
log.Error("Failed to delete pod ", err.Error())
}
}()

// Wait for pod completion
if err := kube.WaitForPodCompletion(ctx, cli, pod.Namespace, pod.Name); err != nil {
return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to complete", pod.Name)
}
defer job.Delete()
if err := job.WaitForCompletion(ctx); err != nil {
return errors.Wrapf(err, "Failed while waiting for job %s to complete", jobName)
// Fetch logs from the pod
logs, err := kube.GetPodLogs(ctx, cli, pod.Namespace, pod.Name)
if err != nil {
return nil, errors.Wrapf(err, "Failed to fetch logs from the pod")
}
return nil
format.Log(pod.Name, pod.Spec.Containers[0].Name, logs)

out, err := parseLogAndCreateOutput(logs)
return out, errors.Wrap(err, "Failed to parse phase output")
}

func (*prepareDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) {
Expand Down Expand Up @@ -108,7 +127,7 @@ func (*prepareDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args
return nil, err
}
}
return nil, prepareData(ctx, cli, namespace, serviceAccount, image, vols, command...)
return prepareData(ctx, cli, namespace, serviceAccount, image, vols, command...)
}

func (*prepareDataFunc) RequiredArgs() []string {
Expand Down
2 changes: 1 addition & 1 deletion pkg/function/restore_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (*restoreDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args
if err != nil {
return nil, errors.Wrapf(err, "Failed to create Kubernetes client")
}
return nil, prepareData(ctx, cli, namespace, "", image, vols, cmd...)
return prepareData(ctx, cli, namespace, "", image, vols, cmd...)
}

func (*restoreDataFunc) RequiredArgs() []string {
Expand Down
7 changes: 5 additions & 2 deletions pkg/kube/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,13 @@ func WaitForPodCompletion(ctx context.Context, cli kubernetes.Interface, namespa
if err != nil {
return true, err
}
return (p.Status.Phase == v1.PodSucceeded) || (p.Status.Phase == v1.PodFailed), nil
if p.Status.Phase == v1.PodFailed {
return false, errors.Errorf("Pod %s failed", name)
}
return (p.Status.Phase == v1.PodSucceeded), nil
})
if err == nil {
return nil
}
return errors.Wrapf(err, "Pod did not transition into a terminal state. Namespace:%s, Name:%s", namespace, name)
return errors.Wrap(err, "Pod did not transition into complete state")
}

0 comments on commit 3383bd5

Please sign in to comment.