Skip to content

Commit

Permalink
Provide a way to use PodController object to create PodRunner object (#…
Browse files Browse the repository at this point in the history
…2229)

* add comments to explain the issue better

* implement feedback

* revert unnecessary changes

* export fakePodControllerProcessor

* separate fake pod controller processor fromt the test

* move fake controller processor to a separate file

* export struct fields to be used outside

* export functions to be used outside

* pull out fake PodCommandExecutorProcessor into a separate file

* export pod command executor processor field

* create getter for pod command executor

* add podCommandExecutorProcessor to podController

* add fake pod controller and fake pod command executor

* export fields of fake pod controller

* export fields of fake pod controller

* revert redundant changes

* remove commented code

* add a new constructor for pod controller

* add a new constructor for pod controller

* address feedback

* Apply suggestions from code review

Co-authored-by: Pavan Navarathna <6504783+pavannd1@users.noreply.github.com>

* address feedbacl

---------

Co-authored-by: Pavan Navarathna <6504783+pavannd1@users.noreply.github.com>
  • Loading branch information
chaitalisg and pavannd1 committed Aug 10, 2023
1 parent ae250ce commit ae09b1f
Show file tree
Hide file tree
Showing 7 changed files with 329 additions and 115 deletions.
42 changes: 42 additions & 0 deletions pkg/kube/fake_pod_command_executor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright 2023 The Kanister Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kube

import (
"context"
"io"
)

type FakePodCommandExecutor struct {
ExecErr error
inExecCommand []string

ExecStdout string
ExecStderr string
}

// Exec
func (fce *FakePodCommandExecutor) Exec(_ context.Context, command []string, stdin io.Reader, stdout, stderr io.Writer) error {
fce.inExecCommand = make([]string, len(command))
copy(fce.inExecCommand, command)
if stdout != nil && len(fce.ExecStdout) > 0 {
stdout.Write([]byte(fce.ExecStdout)) //nolint: errcheck
}
if stderr != nil && len(fce.ExecStderr) > 0 {
stderr.Write([]byte(fce.ExecStderr)) //nolint: errcheck
}

return fce.ExecErr
}
124 changes: 124 additions & 0 deletions pkg/kube/fake_pod_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Copyright 2023 The Kanister Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kube

import (
"context"
"io"
"time"

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
)

type FakePodController struct {
Podname string
PodObj *corev1.Pod

StartPodCalled bool
StartPodErr error

WaitForPodReadyCalled bool
WaitForPodReadyErr error

GetCommandExecutorRet PodCommandExecutor
GetCommandExecutorErr error

GetFileWriterCalled bool
GetFileWriterRet *FakePodFileWriter
GetFileWriterErr error

StopPodCalled bool
StopPodErr error
InStopPodStopTimeout time.Duration
InStopPodGracePeriod int64
}

func (fpc *FakePodController) Pod() *corev1.Pod {
return nil
}

func (fpc *FakePodController) PodName() string {
return fpc.Podname
}

func (fpc *FakePodController) Run(ctx context.Context, fn func(context.Context, *corev1.Pod) (map[string]interface{}, error)) (map[string]interface{}, error) {
return nil, errors.New("Not implemented")
}

func (fpc *FakePodController) StartPod(_ context.Context) error {
fpc.StartPodCalled = true
return fpc.StartPodErr
}

func (fpc *FakePodController) WaitForPodReady(_ context.Context) error {
fpc.WaitForPodReadyCalled = true
return fpc.WaitForPodReadyErr
}

func (fpc *FakePodController) WaitForPodCompletion(_ context.Context) error {
return errors.New("Not implemented")
}

func (fpc *FakePodController) StreamPodLogs(_ context.Context) (io.ReadCloser, error) {
return nil, errors.New("Not implemented")
}

func (fpc *FakePodController) GetCommandExecutor() (PodCommandExecutor, error) {
return fpc.GetCommandExecutorRet, fpc.GetCommandExecutorErr
}

func (fpc *FakePodController) GetFileWriter() (PodFileWriter, error) {
fpc.GetFileWriterCalled = true
return fpc.GetFileWriterRet, fpc.GetFileWriterErr
}

func (fpc *FakePodController) StopPod(ctx context.Context, stopTimeout time.Duration, gracePeriodSeconds int64) error {
fpc.StopPodCalled = true
fpc.InStopPodStopTimeout = stopTimeout
fpc.InStopPodGracePeriod = gracePeriodSeconds
return fpc.StopPodErr
}

type FakePodFileWriter struct {
writeCalled bool
writeErr error
writeRet *FakePodFileRemover
inWriteFilePath string
inWriteContent io.Reader
}

func (fpfw *FakePodFileWriter) Write(_ context.Context, filePath string, content io.Reader) (PodFileRemover, error) {
fpfw.writeCalled = true
fpfw.inWriteFilePath = filePath
fpfw.inWriteContent = content

return PodFileRemover(fpfw.writeRet), fpfw.writeErr
}

type FakePodFileRemover struct {
removeCalled bool
removeErr error
path string
}

func (fr *FakePodFileRemover) Remove(_ context.Context) error {
fr.removeCalled = true
return fr.removeErr
}

func (fr *FakePodFileRemover) Path() string {
return fr.path
}
70 changes: 70 additions & 0 deletions pkg/kube/fake_pod_controller_processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2023 The Kanister Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kube

import (
"context"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)

// FakePodControllerProcessor implements podControllerProcessor
type FakePodControllerProcessor struct {
InWaitForPodReadyNamespace string
InWaitForPodReadyPodName string
WaitForPodReadyErr error

InWaitForPodCompletionNamespace string
InWaitForPodCompletionPodName string
WaitForPodCompletionErr error

InDeletePodNamespace string
InDeletePodPodName string
InDeletePodOptions metav1.DeleteOptions
DeletePodErr error

InCreatePodCli kubernetes.Interface
InCreatePodOptions *PodOptions
CreatePodRet *corev1.Pod
CreatePodErr error
}

func (f *FakePodControllerProcessor) CreatePod(_ context.Context, cli kubernetes.Interface, options *PodOptions) (*corev1.Pod, error) {
f.InCreatePodCli = cli
f.InCreatePodOptions = options
return f.CreatePodRet, f.CreatePodErr
}

func (f *FakePodControllerProcessor) WaitForPodCompletionPCP(ctx context.Context, namespace, podName string) error {
f.InWaitForPodCompletionNamespace = namespace
f.InWaitForPodCompletionPodName = podName
return f.WaitForPodCompletionErr
}

func (f *FakePodControllerProcessor) WaitForPodReadyPCP(ctx context.Context, namespace, podName string) error {
f.InWaitForPodReadyPodName = podName
f.InWaitForPodReadyNamespace = namespace
return f.WaitForPodReadyErr
}

func (f *FakePodControllerProcessor) DeletePod(_ context.Context, namespace string, podName string, opts metav1.DeleteOptions) error {
f.InDeletePodNamespace = namespace
f.InDeletePodPodName = podName
f.InDeletePodOptions = opts

return f.DeletePodErr
}
41 changes: 28 additions & 13 deletions pkg/kube/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,14 @@ type PodController interface {

// podControllerProcessor aids in unit testing
type podControllerProcessor interface {
createPod(ctx context.Context, cli kubernetes.Interface, options *PodOptions) (*corev1.Pod, error)
waitForPodReady(ctx context.Context, namespace, podName string) error
waitForPodCompletion(ctx context.Context, namespace, podName string) error
deletePod(ctx context.Context, namespace string, podName string, opts metav1.DeleteOptions) error
CreatePod(ctx context.Context, cli kubernetes.Interface, options *PodOptions) (*corev1.Pod, error)
WaitForPodReadyPCP(ctx context.Context, namespace, podName string) error
WaitForPodCompletionPCP(ctx context.Context, namespace, podName string) error
DeletePod(ctx context.Context, namespace string, podName string, opts metav1.DeleteOptions) error
}

// podController specifies Kubernetes Client and PodOptions needed for creating Pod
// podController specifies Kubernetes Client and PodOptions needed for creating
// a Pod. It implements the podControllerProcessor interface.
type podController struct {
cli kubernetes.Interface
podOptions *PodOptions
Expand Down Expand Up @@ -97,6 +98,20 @@ func NewPodController(cli kubernetes.Interface, options *PodOptions, opts ...Pod
return r
}

// NewPodControllerForExistingPod returns a new PodController given Kubernetes
// Client and existing pod details.
func NewPodControllerForExistingPod(cli kubernetes.Interface, pod *corev1.Pod) PodController {
r := &podController{
cli: cli,
pod: pod,
podName: pod.Name,
}

r.pcp = r

return r
}

func (p *podController) PodName() string {
return p.podName
}
Expand All @@ -115,7 +130,7 @@ func (p *podController) StartPod(ctx context.Context) error {
return errors.Wrap(ErrPodControllerNotInitialized, "Failed to create pod")
}

pod, err := p.pcp.createPod(ctx, p.cli, p.podOptions)
pod, err := p.pcp.CreatePod(ctx, p.cli, p.podOptions)
if err != nil {
log.WithError(err).Print("Failed to create pod", field.M{"PodName": p.podOptions.Name, "Namespace": p.podOptions.Namespace})
return errors.Wrap(err, "Failed to create pod")
Expand All @@ -133,7 +148,7 @@ func (p *podController) WaitForPodReady(ctx context.Context) error {
return ErrPodControllerPodNotStarted
}

if err := p.pcp.waitForPodReady(ctx, p.pod.Namespace, p.pod.Name); err != nil {
if err := p.pcp.WaitForPodReadyPCP(ctx, p.pod.Namespace, p.pod.Name); err != nil {
log.WithError(err).Print("Pod failed to become ready in time", field.M{"PodName": p.podName, "Namespace": p.podOptions.Namespace})
return errors.Wrap(err, "Pod failed to become ready in time")
}
Expand All @@ -153,7 +168,7 @@ func (p *podController) WaitForPodCompletion(ctx context.Context) error {
return ErrPodControllerPodNotReady
}

if err := p.pcp.waitForPodCompletion(ctx, p.pod.Namespace, p.pod.Name); err != nil {
if err := p.pcp.WaitForPodCompletionPCP(ctx, p.pod.Namespace, p.pod.Name); err != nil {
log.WithError(err).Print("Pod failed to complete in time", field.M{"PodName": p.podName, "Namespace": p.podOptions.Namespace})
return errors.Wrap(err, "Pod failed to complete in time")
}
Expand All @@ -177,7 +192,7 @@ func (p *podController) StopPod(ctx context.Context, stopTimeout time.Duration,
defer cancel()
}

if err := p.pcp.deletePod(ctx, p.pod.Namespace, p.podName, metav1.DeleteOptions{GracePeriodSeconds: &gracePeriodSeconds}); err != nil {
if err := p.pcp.DeletePod(ctx, p.pod.Namespace, p.podName, metav1.DeleteOptions{GracePeriodSeconds: &gracePeriodSeconds}); err != nil {
log.WithError(err).Print("Failed to delete pod", field.M{"PodName": p.podName, "Namespace": p.pod.Namespace})
return err
}
Expand Down Expand Up @@ -245,21 +260,21 @@ func (p *podController) GetFileWriter() (PodFileWriter, error) {
}

// This is wrapped for unit testing.
func (p *podController) createPod(ctx context.Context, cli kubernetes.Interface, options *PodOptions) (*corev1.Pod, error) {
func (p *podController) CreatePod(ctx context.Context, cli kubernetes.Interface, options *PodOptions) (*corev1.Pod, error) {
return CreatePod(ctx, cli, options)
}

// This is wrapped for unit testing.
func (p *podController) waitForPodReady(ctx context.Context, namespace, podName string) error {
func (p *podController) WaitForPodReadyPCP(ctx context.Context, namespace, podName string) error {
return WaitForPodReady(ctx, p.cli, namespace, podName)
}

// This is wrapped for unit testing
func (p *podController) waitForPodCompletion(ctx context.Context, namespace, podName string) error {
func (p *podController) WaitForPodCompletionPCP(ctx context.Context, namespace, podName string) error {
return WaitForPodCompletion(ctx, p.cli, namespace, podName)
}

// This is wrapped for unit testing.
func (p *podController) deletePod(ctx context.Context, namespace string, podName string, opts metav1.DeleteOptions) error {
func (p *podController) DeletePod(ctx context.Context, namespace string, podName string, opts metav1.DeleteOptions) error {
return p.cli.CoreV1().Pods(namespace).Delete(ctx, podName, opts)
}
Loading

0 comments on commit ae09b1f

Please sign in to comment.