Skip to content

Commit

Permalink
Modify copy_volume_data to use PodRunner (#5422)
Browse files Browse the repository at this point in the history
* Add podOptions to closure func

* Address review comments

* Remove podOptions from closure func

* Modify copyVolumeData to use PodRunner

* Change podFunc name
  • Loading branch information
DeepikaDixit authored and Ilya Kislenko committed Apr 17, 2019
1 parent 0b46e3d commit 34518c6
Showing 1 changed file with 33 additions and 35 deletions.
68 changes: 33 additions & 35 deletions pkg/function/copy_volume_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"

"github.com/pkg/errors"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -49,49 +50,46 @@ func copyVolumeData(ctx context.Context, cli kubernetes.Interface, tp param.Temp
}
// Create a pod with PVCs attached
mountPoint := fmt.Sprintf(copyVolumeDataMountPoint, pvc)
pod, err := kube.CreatePod(ctx, cli, &kube.PodOptions{
options := &kube.PodOptions{
Namespace: namespace,
GenerateName: copyVolumeDataJobPrefix,
Image: kanisterToolsImage,
Command: []string{"sh", "-c", "tail -f /dev/null"},
Volumes: map[string]string{pvc: mountPoint},
})
if err != nil {
return nil, errors.Wrapf(err, "Failed to create pod to copy volume data")
}
defer kube.DeletePod(context.Background(), cli, pod)

// Wait for pod to reach running state
if err := kube.WaitForPodReady(ctx, cli, pod.Namespace, pod.Name); err != nil {
return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to be ready", pod.Name)
}

// Get restic repository
if err = restic.GetOrCreateRepository(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, targetPath, encryptionKey, tp.Profile); err != nil {
return nil, err
}
pr := kube.NewPodRunner(cli, options)
podFunc := copyVolumeDataPodFunc(cli, tp, namespace, mountPoint, targetPath, encryptionKey)
return pr.Run(ctx, podFunc)
}

// Copy data to object store
backupTag := rand.String(10)
cmd := restic.BackupCommandByTag(tp.Profile, targetPath, backupTag, mountPoint, encryptionKey)
stdout, stderr, err := kube.Exec(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, cmd)
format.Log(pod.Name, pod.Spec.Containers[0].Name, stdout)
format.Log(pod.Name, pod.Spec.Containers[0].Name, stderr)
if err != nil {
return nil, errors.Wrapf(err, "Failed to create and upload backup")
}
// Get the snapshot ID from log
backupID := restic.SnapshotIDFromBackupLog(stdout)
if backupID == "" {
return nil, errors.New("Failed to parse the backup ID from logs")
func copyVolumeDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, namespace, mountPoint, targetPath, encryptionKey string) func(ctx context.Context, pod *v1.Pod) (map[string]interface{}, error) {
return func(ctx context.Context, pod *v1.Pod) (map[string]interface{}, error) {
// Get restic repository
if err := restic.GetOrCreateRepository(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, targetPath, encryptionKey, tp.Profile); err != nil {
return nil, err
}
// Copy data to object store
backupTag := rand.String(10)
cmd := restic.BackupCommandByTag(tp.Profile, targetPath, backupTag, mountPoint, encryptionKey)
stdout, stderr, err := kube.Exec(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, cmd)
format.Log(pod.Name, pod.Spec.Containers[0].Name, stdout)
format.Log(pod.Name, pod.Spec.Containers[0].Name, stderr)
if err != nil {
return nil, errors.Wrapf(err, "Failed to create and upload backup")
}
// Get the snapshot ID from log
backupID := restic.SnapshotIDFromBackupLog(stdout)
if backupID == "" {
return nil, errors.New("Failed to parse the backup ID from logs")
}
return map[string]interface{}{
CopyVolumeDataOutputBackupID: backupID,
CopyVolumeDataOutputBackupRoot: mountPoint,
CopyVolumeDataOutputBackupArtifactLocation: targetPath,
CopyVolumeDataOutputBackupTag: backupTag,
},
nil
}
return map[string]interface{}{
CopyVolumeDataOutputBackupID: backupID,
CopyVolumeDataOutputBackupRoot: mountPoint,
CopyVolumeDataOutputBackupArtifactLocation: targetPath,
CopyVolumeDataOutputBackupTag: backupTag,
},
nil
}

func (*copyVolumeDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) {
Expand Down

0 comments on commit 34518c6

Please sign in to comment.