Skip to content

Commit

Permalink
Switch Kanister KubeTask from job to pod (#4387)
Browse files Browse the repository at this point in the history
1. Switch KubeTask to spin up a pod instead of a Kubernetes Job
2. Parse the pods logs and capture output
3. Modify unit tests
  • Loading branch information
pavannd1 authored and Ilya Kislenko committed Nov 16, 2018
1 parent 711af01 commit 8db13c4
Show file tree
Hide file tree
Showing 7 changed files with 126 additions and 42 deletions.
2 changes: 1 addition & 1 deletion docs/functions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ Example:
KubeTask
--------

KubeTask spins up a new container and executes a command via a Kubernetes job.
KubeTask spins up a new container and executes a command via a Pod.
This allows you to run a new Pod from a Blueprint.

.. csv-table::
Expand Down
5 changes: 5 additions & 0 deletions pkg/function/copy_volume_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ func copyVolumeData(ctx context.Context, cli kubernetes.Interface, tp param.Temp
}
defer kube.DeletePod(context.Background(), cli, pod)

// Wait for pod to reach running state
if err := kube.WaitForPodReady(ctx, cli, pod.Namespace, pod.Name); err != nil {
return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to be ready", pod.Name)
}

// Get restic repository
if err = restic.GetOrCreateRepository(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, targetPath, encryptionKey, tp.Profile); err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/function/delete_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (*deleteDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args m
// Generate delete command
cmd := generateDeleteCommand(artifact, tp.Profile)
// Use KubeTask to delete the artifact
return nil, kubeTask(ctx, namespace, "kanisterio/kanister-tools:0.13.0", cmd)
return kubeTask(ctx, namespace, "kanisterio/kanister-tools:0.13.0", cmd)
}

func (*deleteDataFunc) RequiredArgs() []string {
Expand Down
42 changes: 28 additions & 14 deletions pkg/function/kube_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"strconv"

kanister "github.com/kanisterio/kanister/pkg"
"github.com/kanisterio/kanister/pkg/format"
"github.com/kanisterio/kanister/pkg/kube"
"github.com/kanisterio/kanister/pkg/param"
"github.com/pkg/errors"
Expand Down Expand Up @@ -35,36 +36,49 @@ func generateJobName(jobPrefix string) string {
return jobPrefix + jobNameSuffix
}

func kubeTask(ctx context.Context, namespace, image string, command []string) error {
func kubeTask(ctx context.Context, namespace, image string, command []string) (map[string]interface{}, error) {
var serviceAccount string
var err error
clientset, err := kube.NewClient()
if err != nil {
return errors.Wrapf(err, "Failed to create Kubernetes client")
return nil, errors.Wrapf(err, "Failed to create Kubernetes client")
}
if namespace == "" {
namespace, err = kube.GetControllerNamespace()
if err != nil {
return errors.Wrapf(err, "Failed to get controller namespace")
return nil, errors.Wrapf(err, "Failed to get controller namespace")
}
serviceAccount, err = kube.GetControllerServiceAccount(clientset)
if err != nil {
return errors.Wrap(err, "Failed to get Controller Service Account")
return nil, errors.Wrap(err, "Failed to get Controller Service Account")
}
}
jobName := generateJobName(jobPrefix)
job, err := kube.NewJob(clientset, jobName, namespace, serviceAccount, image, nil, command...)
// Create a pod to run the command
pod, err := kube.CreatePod(ctx, clientset, &kube.PodOptions{
Namespace: namespace,
GenerateName: jobPrefix,
Image: image,
Command: command,
ServiceAccountName: serviceAccount,
})
if err != nil {
return errors.Wrap(err, "Failed to create job")
return nil, errors.Wrapf(err, "Failed to create pod for KubeTask")
}
if err := job.Create(); err != nil {
return errors.Wrapf(err, "Failed to create job %s in Kubernetes", jobName)
defer kube.DeletePod(context.Background(), clientset, pod)

// Wait for pod completion
if err := kube.WaitForPodCompletion(ctx, clientset, pod.Namespace, pod.Name); err != nil {
return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to complete", pod.Name)
}
defer job.Delete()
if err := job.WaitForCompletion(ctx); err != nil {
return errors.Wrapf(err, "Failed while waiting for job %s to complete", jobName)
// Fetch logs from the pod
logs, err := kube.GetPodLogs(ctx, clientset, pod.Namespace, pod.Name)
if err != nil {
return nil, errors.Wrapf(err, "Failed to fetch logs from the pod")
}
return nil
format.Log(pod.Name, pod.Spec.Containers[0].Name, logs)

out, err := parseLogAndCreateOutput(logs)
return out, errors.Wrap(err, "Failed to generate output")
}

func (ktf *kubeTaskFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) {
Expand All @@ -80,7 +94,7 @@ func (ktf *kubeTaskFunc) Exec(ctx context.Context, tp param.TemplateParams, args
if err = OptArg(args, KubeTaskNamespaceArg, &namespace, ""); err != nil {
return nil, err
}
return nil, kubeTask(ctx, namespace, image, command)
return kubeTask(ctx, namespace, image, command)
}

func (*kubeTaskFunc) RequiredArgs() []string {
Expand Down
22 changes: 14 additions & 8 deletions pkg/function/kube_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package function
import (
"context"
"os"
"time"

. "gopkg.in/check.v1"
"k8s.io/api/core/v1"
Expand Down Expand Up @@ -53,19 +54,20 @@ func newTaskBlueprint(namespace string) *crv1alpha1.Blueprint {
Kind: "StatefulSet",
Phases: []crv1alpha1.BlueprintPhase{
{
Name: "test",
Name: "testOutput",
Func: "KubeTask",
Args: map[string]interface{}{
KubeTaskNamespaceArg: namespace,
KubeTaskImageArg: "busybox",
KubeTaskImageArg: "kanisterio/kanister-tools:0.13.0",
KubeTaskCommandArg: []string{
"sleep",
"2",
"sh",
"-c",
"kando output version 0.13.0",
},
},
},
{
Name: "test2",
Name: "testSleep",
Func: "KubeTask",
Args: map[string]interface{}{
KubeTaskNamespaceArg: namespace,
Expand All @@ -83,7 +85,8 @@ func newTaskBlueprint(namespace string) *crv1alpha1.Blueprint {
}

func (s *KubeTaskSuite) TestKubeTask(c *C) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
tp := param.TemplateParams{
StatefulSet: &param.StatefulSetParams{
Namespace: s.namespace,
Expand All @@ -95,7 +98,10 @@ func (s *KubeTaskSuite) TestKubeTask(c *C) {
phases, err := kanister.GetPhases(*bp, action, tp)
c.Assert(err, IsNil)
for _, p := range phases {
_, err = p.Exec(ctx, *bp, action, tp)
c.Assert(err, IsNil)
out, err := p.Exec(ctx, *bp, action, tp)
c.Assert(err, IsNil, Commentf("Phase %s failed", p.Name()))
if out != nil {
c.Assert(out["version"], NotNil)
}
}
}
74 changes: 57 additions & 17 deletions pkg/kube/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kube

import (
"context"
"io/ioutil"

"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
Expand All @@ -14,11 +15,12 @@ import (

// PodOptions specifies options for `CreatePod`
type PodOptions struct {
Namespace string
GenerateName string
Image string
Command []string
Volumes map[string]string
Namespace string
GenerateName string
Image string
Command []string
Volumes map[string]string
ServiceAccountName string
}

// CreatePod creates a pod with a single container based on the specified image
Expand All @@ -39,24 +41,18 @@ func CreatePod(ctx context.Context, cli kubernetes.Interface, opts *PodOptions)
VolumeMounts: volumeMounts,
},
},
Volumes: podVolumes,
// RestartPolicy dictates when the containers of the pod should be restarted.
// The possible values include Always, OnFailure and Never with Always being the default.
// OnFailure policy will result in failed containers being restarted with an exponential back-off delay.
RestartPolicy: v1.RestartPolicyOnFailure,
Volumes: podVolumes,
ServiceAccountName: opts.ServiceAccountName,
},
}
pod, err := cli.Core().Pods(opts.Namespace).Create(pod)
if err != nil {
return nil, errors.Wrapf(err, "Failed to create pod. Namespace: %s, NameFmt: %s", opts.Namespace, opts.GenerateName)
}
err = poll.Wait(ctx, func(ctx context.Context) (bool, error) {
p, err := cli.Core().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{})
if err != nil {
return true, err
}
return (p.Status.Phase == v1.PodRunning), nil
})
if err != nil {
defer DeletePod(context.Background(), cli, pod)
return nil, errors.Wrapf(err, "Pod did not transition to running state. Namespace:%s, Name:%s", pod.Namespace, pod.Name)
}
return pod, nil
}

Expand All @@ -67,3 +63,47 @@ func DeletePod(ctx context.Context, cli kubernetes.Interface, pod *v1.Pod) error
}
return nil
}

// GetPodLogs fetches the logs from the given pod
func GetPodLogs(ctx context.Context, cli kubernetes.Interface, namespace, name string) (string, error) {
reader, err := cli.Core().Pods(namespace).GetLogs(name, &v1.PodLogOptions{}).Stream()
if err != nil {
return "", err
}
defer reader.Close()
bytes, err := ioutil.ReadAll(reader)
if err != nil {
return "", err
}
return string(bytes), nil
}

// WaitForPodReady waits for a pod to reach Running state
func WaitForPodReady(ctx context.Context, cli kubernetes.Interface, namespace, name string) error {
err := poll.Wait(ctx, func(ctx context.Context) (bool, error) {
p, err := cli.Core().Pods(namespace).Get(name, metav1.GetOptions{})
if err != nil {
return true, err
}
return (p.Status.Phase == v1.PodRunning), nil
})
if err == nil {
return nil
}
return errors.Wrapf(err, "Pod did not transition into running state. Namespace:%s, Name:%s", namespace, name)
}

// WaitForPodCompletion waits for a pod to reach a terminal state
func WaitForPodCompletion(ctx context.Context, cli kubernetes.Interface, namespace, name string) error {
err := poll.Wait(ctx, func(ctx context.Context) (bool, error) {
p, err := cli.Core().Pods(namespace).Get(name, metav1.GetOptions{})
if err != nil {
return true, err
}
return (p.Status.Phase == v1.PodSucceeded) || (p.Status.Phase == v1.PodFailed), nil
})
if err == nil {
return nil
}
return errors.Wrapf(err, "Pod did not transition into a terminal state. Namespace:%s, Name:%s", namespace, name)
}
21 changes: 20 additions & 1 deletion pkg/kube/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package kube
import (
"context"
"fmt"
"strings"
"time"

. "gopkg.in/check.v1"
Expand Down Expand Up @@ -47,13 +48,14 @@ func (s *PodSuite) TearDownSuite(c *C) {
func (s *PodSuite) TestPod(c *C) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
pod, err := CreatePod(ctx, s.cli, &PodOptions{
pod, err := CreatePod(context.Background(), s.cli, &PodOptions{
Namespace: s.namespace,
GenerateName: "test-",
Image: "kanisterio/kanister-tools:0.13.0",
Command: []string{"sh", "-c", "tail -f /dev/null"},
})
c.Assert(err, IsNil)
c.Assert(WaitForPodReady(ctx, s.cli, s.namespace, pod.Name), IsNil)
c.Assert(DeletePod(context.Background(), s.cli, pod), IsNil)
}

Expand Down Expand Up @@ -85,3 +87,20 @@ func (s *PodSuite) TestPodWithVolumes(c *C) {
c.Assert(pod.Spec.Volumes[0].VolumeSource.PersistentVolumeClaim.ClaimName, Equals, "pvc-test")
c.Assert(pod.Spec.Containers[0].VolumeMounts[0].MountPath, Equals, "/mnt/data1")
}

func (s *PodSuite) TestGetPodLogs(c *C) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
pod, err := CreatePod(context.Background(), s.cli, &PodOptions{
Namespace: s.namespace,
GenerateName: "test-",
Image: "kanisterio/kanister-tools:0.13.0",
Command: []string{"sh", "-c", "echo hello"},
})
c.Assert(err, IsNil)
c.Assert(WaitForPodCompletion(ctx, s.cli, s.namespace, pod.Name), IsNil)
logs, err := GetPodLogs(ctx, s.cli, s.namespace, pod.Name)
c.Assert(err, IsNil)
c.Assert(strings.Contains(logs, "hello"), Equals, true)
c.Assert(DeletePod(context.Background(), s.cli, pod), IsNil)
}

0 comments on commit 8db13c4

Please sign in to comment.