Skip to content

Commit

Permalink
Refactor PodRunner and KubeTask to use PodController under the hood. (#…
Browse files Browse the repository at this point in the history
…1986)

* Remove dead code

* Introduce RunEx functionality in PodRunner

* Refactor kube_task

* Refactor copy_volume_data.

* Deduplicate code in PodRunner

* Better naming for internal context

Co-authored-by: Denis Voytyuk <5462781+denisvmedia@users.noreply.github.com>

* Add comment explaining purpose of PodRunner.RunEx

* Adjust return type of PodRunner.Run

* Keep PodRunner function signatures consistent

---------

Co-authored-by: Denis Voytyuk <5462781+denisvmedia@users.noreply.github.com>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
3 people committed May 23, 2023
1 parent 1cf25e5 commit 2babd1a
Show file tree
Hide file tree
Showing 8 changed files with 175 additions and 47 deletions.
44 changes: 29 additions & 15 deletions pkg/function/copy_volume_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"fmt"

"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
"go.uber.org/zap/buffer"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -80,20 +80,28 @@ func copyVolumeData(ctx context.Context, cli kubernetes.Interface, tp param.Temp
}
pr := kube.NewPodRunner(cli, options)
podFunc := copyVolumeDataPodFunc(cli, tp, namespace, mountPoint, targetPath, encryptionKey)
return pr.Run(ctx, podFunc)
return pr.RunEx(ctx, podFunc)
}

func copyVolumeDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, namespace, mountPoint, targetPath, encryptionKey string) func(ctx context.Context, pod *v1.Pod) (map[string]interface{}, error) {
return func(ctx context.Context, pod *v1.Pod) (map[string]interface{}, error) {
func copyVolumeDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, namespace, mountPoint, targetPath, encryptionKey string) func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) {
return func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) {
// Wait for pod to reach running state
if err := kube.WaitForPodReady(ctx, cli, pod.Namespace, pod.Name); err != nil {
return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to be ready", pod.Name)
if err := pc.WaitForPodReady(ctx); err != nil {
return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to be ready", pc.PodName())
}
pw, err := GetPodWriter(cli, ctx, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name, tp.Profile)
pw1, err := pc.GetFileWriter()
if err != nil {
return nil, err
return nil, errors.Wrapf(err, "Failed to write credentials to Pod %s", pc.PodName())
}
defer CleanUpCredsFile(ctx, pw, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name)

remover, err := WriteCredsToPod(ctx, pw1, tp.Profile)
if err != nil {
return nil, errors.Wrapf(err, "Failed to write credentials to Pod %s", pc.PodName())
}

defer remover.Remove(context.Background()) //nolint:errcheck

pod := pc.Pod()
// Get restic repository
if err := restic.GetOrCreateRepository(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, targetPath, encryptionKey, tp.Profile); err != nil {
return nil, err
Expand All @@ -104,18 +112,24 @@ func copyVolumeDataPodFunc(cli kubernetes.Interface, tp param.TemplateParams, na
if err != nil {
return nil, err
}
stdout, stderr, err := kube.Exec(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, cmd, nil)
format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout)
format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr)
ex, err := pc.GetCommandExecutor()
if err != nil {
return nil, err
}
var stdout buffer.Buffer
var stderr buffer.Buffer
err = ex.Exec(ctx, cmd, nil, &stdout, &stderr)
format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stdout.String())
format.LogWithCtx(ctx, pod.Name, pod.Spec.Containers[0].Name, stderr.String())
if err != nil {
return nil, errors.Wrapf(err, "Failed to create and upload backup")
}
// Get the snapshot ID from log
backupID := restic.SnapshotIDFromBackupLog(stdout)
backupID := restic.SnapshotIDFromBackupLog(stdout.String())
if backupID == "" {
return nil, errors.Errorf("Failed to parse the backup ID from logs, backup logs %s", stdout)
return nil, errors.Errorf("Failed to parse the backup ID from logs, backup logs %s", stdout.String())
}
fileCount, backupSize, phySize := restic.SnapshotStatsFromBackupLog(stdout)
fileCount, backupSize, phySize := restic.SnapshotStatsFromBackupLog(stdout.String())
if backupSize == "" {
log.Debug().Print("Could not parse backup stats from backup log")
}
Expand Down
20 changes: 9 additions & 11 deletions pkg/function/kube_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"

"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"

kanister "github.com/kanisterio/kanister/pkg"
Expand Down Expand Up @@ -62,19 +61,18 @@ func kubeTask(ctx context.Context, cli kubernetes.Interface, namespace, image st
}

pr := kube.NewPodRunner(cli, options)
podFunc := kubeTaskPodFunc(cli)
return pr.Run(ctx, podFunc)
podFunc := kubeTaskPodFunc()
return pr.RunEx(ctx, podFunc)
}

func kubeTaskPodFunc(cli kubernetes.Interface) func(ctx context.Context, pod *v1.Pod) (map[string]interface{}, error) {
return func(ctx context.Context, pod *v1.Pod) (map[string]interface{}, error) {
if err := kube.WaitForPodReady(ctx, cli, pod.Namespace, pod.Name); err != nil {
return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to be ready", pod.Name)
func kubeTaskPodFunc() func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) {
return func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) {
if err := pc.WaitForPodReady(ctx); err != nil {
return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to be ready", pc.PodName())
}
ctx = field.Context(ctx, consts.PodNameKey, pod.Name)
ctx = field.Context(ctx, consts.LogKindKey, consts.LogKindDatapath)
// Fetch logs from the pod
r, err := kube.StreamPodLogs(ctx, cli, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name)
r, err := pc.StreamPodLogs(ctx)
if err != nil {
return nil, errors.Wrapf(err, "Failed to fetch logs from the pod")
}
Expand All @@ -83,8 +81,8 @@ func kubeTaskPodFunc(cli kubernetes.Interface) func(ctx context.Context, pod *v1
return nil, err
}
// Wait for pod completion
if err := kube.WaitForPodCompletion(ctx, cli, pod.Namespace, pod.Name); err != nil {
return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to complete", pod.Name)
if err := pc.WaitForPodCompletion(ctx); err != nil {
return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to complete", pc.PodName())
}
return out, err
}
Expand Down
13 changes: 13 additions & 0 deletions pkg/function/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,19 @@ func ValidateProfile(profile *param.Profile) error {
return nil
}

// WriteCredsToPod creates a file with Google credentials if the given profile points to a GCS location
func WriteCredsToPod(ctx context.Context, writer kube.PodFileWriter, profile *param.Profile) (kube.PodFileRemover, error) {
if profile.Location.Type == crv1alpha1.LocationTypeGCS {
remover, err := writer.Write(ctx, consts.GoogleCloudCredsFilePath, bytes.NewBufferString(profile.Credential.KeyPair.Secret))
if err != nil {
return nil, errors.Wrapf(err, "Unable to write Google credentials to the pod.")
}

return remover, nil
}
return nil, nil
}

// GetPodWriter creates a file with Google credentials if the given profile points to a GCS location
func GetPodWriter(cli kubernetes.Interface, ctx context.Context, namespace, podName, containerName string, profile *param.Profile) (kube.PodWriter, error) {
if profile.Location.Type == crv1alpha1.LocationTypeGCS {
Expand Down
54 changes: 49 additions & 5 deletions pkg/kube/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package kube

import (
"context"
"io"
"time"

"github.com/pkg/errors"
Expand All @@ -42,15 +43,20 @@ type PodController interface {
Pod() *corev1.Pod
StartPod(ctx context.Context) error
WaitForPodReady(ctx context.Context) error
WaitForPodCompletion(ctx context.Context) error
StopPod(ctx context.Context, timeout time.Duration, gracePeriodSeconds int64) error

StreamPodLogs(ctx context.Context) (io.ReadCloser, error)

GetCommandExecutor() (PodCommandExecutor, error)
GetFileWriter() (PodFileWriter, error)
}

// podControllerProcessor aids in unit testing
type podControllerProcessor interface {
createPod(ctx context.Context, cli kubernetes.Interface, options *PodOptions) (*corev1.Pod, error)
waitForPodReady(ctx context.Context, podName string) error
waitForPodReady(ctx context.Context, namespace, podName string) error
waitForPodCompletion(ctx context.Context, namespace, podName string) error
deletePod(ctx context.Context, namespace string, podName string, opts metav1.DeleteOptions) error
}

Expand Down Expand Up @@ -127,7 +133,7 @@ func (p *podController) WaitForPodReady(ctx context.Context) error {
return ErrPodControllerPodNotStarted
}

if err := p.pcp.waitForPodReady(ctx, p.podName); err != nil {
if err := p.pcp.waitForPodReady(ctx, p.pod.Namespace, p.pod.Name); err != nil {
log.WithError(err).Print("Pod failed to become ready in time", field.M{"PodName": p.podName, "Namespace": p.podOptions.Namespace})
return errors.Wrap(err, "Pod failed to become ready in time")
}
Expand All @@ -137,6 +143,26 @@ func (p *podController) WaitForPodReady(ctx context.Context) error {
return nil
}

// WaitForPodCompletion waits for a pod to reach a terminal state.
func (p *podController) WaitForPodCompletion(ctx context.Context) error {
if p.podName == "" {
return ErrPodControllerPodNotStarted
}

if !p.podReady {
return ErrPodControllerPodNotReady
}

if err := p.pcp.waitForPodCompletion(ctx, p.pod.Namespace, p.pod.Name); err != nil {
log.WithError(err).Print("Pod failed to complete in time", field.M{"PodName": p.podName, "Namespace": p.podOptions.Namespace})
return errors.Wrap(err, "Pod failed to complete in time")
}

p.podReady = false

return nil
}

// StopPod stops the pod which was previously started, otherwise it will return ErrPodControllerPodNotStarted error.
// stopTimeout is used to limit execution time
// gracePeriodSeconds is used to specify pod deletion grace period. If set to zero, pod should be deleted immediately
Expand All @@ -163,6 +189,14 @@ func (p *podController) StopPod(ctx context.Context, stopTimeout time.Duration,
return nil
}

func (p *podController) StreamPodLogs(ctx context.Context) (io.ReadCloser, error) {
if p.podName == "" {
return nil, ErrPodControllerPodNotStarted
}

return StreamPodLogs(ctx, p.cli, p.pod.Namespace, p.pod.Name, p.pod.Spec.Containers[0].Name)
}

func (p *podController) GetCommandExecutor() (PodCommandExecutor, error) {
if p.podName == "" {
return nil, ErrPodControllerPodNotStarted
Expand All @@ -172,11 +206,16 @@ func (p *podController) GetCommandExecutor() (PodCommandExecutor, error) {
return nil, ErrPodControllerPodNotReady
}

containerName := p.podOptions.ContainerName
if containerName == "" {
containerName = p.pod.Spec.Containers[0].Name
}

pce := &podCommandExecutor{
cli: p.cli,
namespace: p.podOptions.Namespace,
podName: p.podName,
containerName: p.podOptions.ContainerName,
containerName: containerName,
}

pce.pcep = pce
Expand Down Expand Up @@ -211,8 +250,13 @@ func (p *podController) createPod(ctx context.Context, cli kubernetes.Interface,
}

// This is wrapped for unit testing.
func (p *podController) waitForPodReady(ctx context.Context, podName string) error {
return WaitForPodReady(ctx, p.cli, p.podOptions.Namespace, podName)
func (p *podController) waitForPodReady(ctx context.Context, namespace, podName string) error {
return WaitForPodReady(ctx, p.cli, namespace, podName)
}

// This is wrapped for unit testing
func (p *podController) waitForPodCompletion(ctx context.Context, namespace, podName string) error {
return WaitForPodCompletion(ctx, p.cli, namespace, podName)
}

// This is wrapped for unit testing.
Expand Down
27 changes: 23 additions & 4 deletions pkg/kube/pod_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,13 @@ func (s *PodControllerTestSuite) SetUpSuite(c *C) {
}

type fakePodControllerProcessor struct {
inWaitForPodReadyPodName string
waitForPodReadyErr error
inWaitForPodReadyNamespace string
inWaitForPodReadyPodName string
waitForPodReadyErr error

inWaitForPodCompletionNamespace string
inWaitForPodCompletionPodName string
waitForPodCompletionErr error

inDeletePodNamespace string
inDeletePodPodName string
Expand All @@ -62,8 +67,15 @@ func (f *fakePodControllerProcessor) createPod(_ context.Context, cli kubernetes
return f.createPodRet, f.createPodErr
}

func (f *fakePodControllerProcessor) waitForPodReady(_ context.Context, podName string) error {
func (f *fakePodControllerProcessor) waitForPodCompletion(ctx context.Context, namespace, podName string) error {
f.inWaitForPodCompletionNamespace = namespace
f.inWaitForPodCompletionPodName = podName
return f.waitForPodCompletionErr
}

func (f *fakePodControllerProcessor) waitForPodReady(ctx context.Context, namespace, podName string) error {
f.inWaitForPodReadyPodName = podName
f.inWaitForPodReadyNamespace = namespace
return f.waitForPodReadyErr
}

Expand Down Expand Up @@ -158,7 +170,8 @@ func (s *PodControllerTestSuite) TestPodControllerWaitPod(c *C) {
"Waiting failed due to timeout": func(pcp *fakePodControllerProcessor, pc PodController) {
pcp.createPodRet = &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podControllerPodName,
Name: podControllerPodName,
Namespace: podControllerNS,
},
}
err := pc.StartPod(ctx)
Expand All @@ -168,6 +181,7 @@ func (s *PodControllerTestSuite) TestPodControllerWaitPod(c *C) {
err = pc.WaitForPodReady(ctx)
c.Assert(err, Not(IsNil))
c.Assert(pcp.inWaitForPodReadyPodName, Equals, podControllerPodName)
c.Assert(pcp.inWaitForPodReadyNamespace, Equals, podControllerNS)
c.Assert(errors.Is(err, pcp.waitForPodReadyErr), Equals, true)
c.Assert(err.Error(), Equals, fmt.Sprintf("Pod failed to become ready in time: %s", simulatedError.Error()))
// Check that POD deletion was also invoked with expected arguments
Expand Down Expand Up @@ -304,6 +318,11 @@ func (s *PodControllerTestSuite) TestPodControllerGetCommandExecutorAndFileWrite
ObjectMeta: metav1.ObjectMeta{
Name: podControllerPodName,
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{Name: "some-test-pod"},
},
},
}
err := pc.StartPod(ctx)
c.Assert(err, IsNil)
Expand Down
16 changes: 14 additions & 2 deletions pkg/kube/pod_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ import (
// PodRunner allows us to start / stop pod, write file to pod and execute command within it
type PodRunner interface {
Run(ctx context.Context, fn func(context.Context, *v1.Pod) (map[string]interface{}, error)) (map[string]interface{}, error)
// RunEx utilizes the PodController interface and forwards it to the functor, simplifying the manipulation with
// particular pod from the functor.
// TODO: Since significant number of functions are currently using PodRunner, we'll keep Run for now.
// However, once all these functions have been refactored to use PodController,
// Run should be removed and RunEx has to be renamed to Run.
RunEx(ctx context.Context, fn func(context.Context, PodController) (map[string]interface{}, error)) (map[string]interface{}, error)
}

// PodRunner specifies Kubernetes Client and PodOptions needed for creating Pod
Expand All @@ -45,11 +51,17 @@ func NewPodRunner(cli kubernetes.Interface, options *PodOptions) PodRunner {

// Run will create a new Pod based on PodRunner contents and execute the given function
func (p *podRunner) Run(ctx context.Context, fn func(context.Context, *v1.Pod) (map[string]interface{}, error)) (map[string]interface{}, error) {
return p.RunEx(ctx, func(innerCtx context.Context, pc PodController) (map[string]interface{}, error) {
return fn(innerCtx, pc.Pod())
})
}

// RunEx will create a new Pod based on PodRunner contents and execute the given function
func (p *podRunner) RunEx(ctx context.Context, fn func(context.Context, PodController) (map[string]interface{}, error)) (map[string]interface{}, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

err := p.pc.StartPod(ctx)

if err != nil {
return nil, errors.Wrap(err, "Failed to create pod")
}
Expand All @@ -64,5 +76,5 @@ func (p *podRunner) Run(ctx context.Context, fn func(context.Context, *v1.Pod) (
log.WithError(err).Print("Failed to delete pod", field.M{"PodName": pod.Name})
}
}()
return fn(ctx, pod)
return fn(ctx, p.pc)
}
Loading

0 comments on commit 2babd1a

Please sign in to comment.