Skip to content

Commit

Permalink
Modify prepare_data to use PodRunner (#5428)
Browse files Browse the repository at this point in the history
  • Loading branch information
DeepikaDixit authored and Ilya Kislenko committed Apr 18, 2019
1 parent 0044bfe commit 15ae295
Showing 1 changed file with 20 additions and 22 deletions.
42 changes: 20 additions & 22 deletions pkg/function/prepare_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"fmt"

"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

Expand Down Expand Up @@ -66,36 +66,34 @@ func prepareData(ctx context.Context, cli kubernetes.Interface, namespace, servi
return nil, errors.Wrapf(err, "Failed to retrieve PVC. Namespace %s, Name %s", namespace, pvc)
}
}
pod, err := kube.CreatePod(ctx, cli, &kube.PodOptions{
options := &kube.PodOptions{
Namespace: namespace,
GenerateName: prepareDataJobPrefix,
Image: image,
Command: command,
Volumes: vols,
ServiceAccountName: serviceAccount,
})
if err != nil {
return nil, errors.Wrapf(err, "Failed to create pod to run prepare data job")
}
defer func() {
if err := kube.DeletePod(context.Background(), cli, pod); err != nil {
log.Error("Failed to delete pod ", err.Error())
}
}()
pr := kube.NewPodRunner(cli, options)
podFunc := prepareDataPodFunc(cli)
return pr.Run(ctx, podFunc)
}

// Wait for pod completion
if err := kube.WaitForPodCompletion(ctx, cli, pod.Namespace, pod.Name); err != nil {
return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to complete", pod.Name)
}
// Fetch logs from the pod
logs, err := kube.GetPodLogs(ctx, cli, pod.Namespace, pod.Name)
if err != nil {
return nil, errors.Wrapf(err, "Failed to fetch logs from the pod")
func prepareDataPodFunc(cli kubernetes.Interface) func(ctx context.Context, pod *v1.Pod) (map[string]interface{}, error) {
return func(ctx context.Context, pod *v1.Pod) (map[string]interface{}, error) {
// Wait for pod completion
if err := kube.WaitForPodCompletion(ctx, cli, pod.Namespace, pod.Name); err != nil {
return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to complete", pod.Name)
}
// Fetch logs from the pod
logs, err := kube.GetPodLogs(ctx, cli, pod.Namespace, pod.Name)
if err != nil {
return nil, errors.Wrapf(err, "Failed to fetch logs from the pod")
}
format.Log(pod.Name, pod.Spec.Containers[0].Name, logs)
out, err := parseLogAndCreateOutput(logs)
return out, errors.Wrap(err, "Failed to parse phase output")
}
format.Log(pod.Name, pod.Spec.Containers[0].Name, logs)

out, err := parseLogAndCreateOutput(logs)
return out, errors.Wrap(err, "Failed to parse phase output")
}

func (*prepareDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) {
Expand Down

0 comments on commit 15ae295

Please sign in to comment.