Skip to content

Commit

Permalink
Extract PodControllerProcessor interface and default implementation (#…
Browse files Browse the repository at this point in the history
…2297)

* Extract PodControllerProcessor interface and default implementation

* Extract PodCommandExecutorProcessor interface and default implementation

* Extract PodFileWriterProcessor interface and default implementation

* Apply suggestions from code review - copyright header

Co-authored-by: Pavan Navarathna <6504783+pavannd1@users.noreply.github.com>

* Apply suggestions from code review - metav1

Co-authored-by: Pavan Navarathna <6504783+pavannd1@users.noreply.github.com>

---------

Co-authored-by: Pavan Navarathna <6504783+pavannd1@users.noreply.github.com>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
3 people committed Sep 14, 2023
1 parent f93f456 commit dbae8b8
Show file tree
Hide file tree
Showing 11 changed files with 159 additions and 86 deletions.
11 changes: 4 additions & 7 deletions pkg/kube/fake_pod_controller_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ import (

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)

// FakePodControllerProcessor implements podControllerProcessor
// FakePodControllerProcessor implements PodControllerProcessor
type FakePodControllerProcessor struct {
InWaitForPodReadyNamespace string
InWaitForPodReadyPodName string
Expand All @@ -37,25 +36,23 @@ type FakePodControllerProcessor struct {
InDeletePodOptions metav1.DeleteOptions
DeletePodErr error

InCreatePodCli kubernetes.Interface
InCreatePodOptions *PodOptions
CreatePodRet *corev1.Pod
CreatePodErr error
}

func (f *FakePodControllerProcessor) CreatePod(_ context.Context, cli kubernetes.Interface, options *PodOptions) (*corev1.Pod, error) {
f.InCreatePodCli = cli
func (f *FakePodControllerProcessor) CreatePod(_ context.Context, options *PodOptions) (*corev1.Pod, error) {
f.InCreatePodOptions = options
return f.CreatePodRet, f.CreatePodErr
}

func (f *FakePodControllerProcessor) WaitForPodCompletionPCP(ctx context.Context, namespace, podName string) error {
func (f *FakePodControllerProcessor) WaitForPodCompletion(_ context.Context, namespace, podName string) error {
f.InWaitForPodCompletionNamespace = namespace
f.InWaitForPodCompletionPodName = podName
return f.WaitForPodCompletionErr
}

func (f *FakePodControllerProcessor) WaitForPodReadyPCP(ctx context.Context, namespace, podName string) error {
func (f *FakePodControllerProcessor) WaitForPodReady(_ context.Context, namespace, podName string) error {
f.InWaitForPodReadyPodName = podName
f.InWaitForPodReadyNamespace = namespace
return f.WaitForPodReadyErr
Expand Down
13 changes: 2 additions & 11 deletions pkg/kube/pod_command_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,14 @@ type PodCommandExecutor interface {
Exec(ctx context.Context, command []string, stdin io.Reader, stdout, stderr io.Writer) error
}

// podCommandExecutorProcessor aids in unit testing
type podCommandExecutorProcessor interface {
execWithOptions(cli kubernetes.Interface, opts ExecOptions) (string, string, error)
}

// podCommandExecutor keeps everything required to execute command within a pod
type podCommandExecutor struct {
cli kubernetes.Interface
namespace string
podName string
containerName string

pcep podCommandExecutorProcessor
pcep PodCommandExecutorProcessor
}

// Exec runs the command and logs stdout and stderr.
Expand Down Expand Up @@ -95,7 +90,7 @@ func (p *podCommandExecutor) Exec(ctx context.Context, command []string, stdin i
}

go func() {
_, _, err = p.pcep.execWithOptions(p.cli, opts)
_, _, err = p.pcep.ExecWithOptions(opts)
close(cmdDone)
}()

Expand All @@ -114,7 +109,3 @@ func (p *podCommandExecutor) Exec(ctx context.Context, command []string, stdin i

return err
}

func (p *podCommandExecutor) execWithOptions(cli kubernetes.Interface, opts ExecOptions) (string, string, error) {
return ExecWithOptions(p.cli, opts)
}
35 changes: 35 additions & 0 deletions pkg/kube/pod_command_executor_processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2023 The Kanister Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kube

import (
"k8s.io/client-go/kubernetes"
)

// PodCommandExecutorProcessor is an interface wrapping kubernetes API invocation
// it is purposed to be replaced by fake implementation in tests
type PodCommandExecutorProcessor interface {
ExecWithOptions(opts ExecOptions) (string, string, error)
}

type podCommandExecutorProcessor struct {
cli kubernetes.Interface
}

// ExecWithOptions executes a command in the specified pod and container,
// returning stdout, stderr and error.
func (p *podCommandExecutorProcessor) ExecWithOptions(opts ExecOptions) (string, string, error) {
return ExecWithOptions(p.cli, opts)
}
14 changes: 5 additions & 9 deletions pkg/kube/pod_command_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (

"github.com/pkg/errors"
. "gopkg.in/check.v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
)

Expand Down Expand Up @@ -69,22 +68,20 @@ func (s *testBarrier) SyncWithController() { // background method
}

type fakePodCommandExecutorProcessor struct {
inExecWithOptionsCli kubernetes.Interface
inExecWithOptionsOpts *ExecOptions
execWithOptionsStdout string
execWithOptionsStderr string
execWithOptionsRet1 string
execWithOptionsRet2 string
execWithOptionsErr error

// Signal to `execWithOptions` to start "executing" command.
// Signal to `ExecWithOptions` to start "executing" command.
// Command will remain "executing" until `execWithOptionsSyncEnd.Sync()`
execWithOptionsSyncStart testBarrier
execWithOptionsSyncEnd testBarrier
}

func (fprp *fakePodCommandExecutorProcessor) execWithOptions(cli kubernetes.Interface, opts ExecOptions) (string, string, error) {
fprp.inExecWithOptionsCli = cli
func (fprp *fakePodCommandExecutorProcessor) ExecWithOptions(opts ExecOptions) (string, string, error) {
fprp.inExecWithOptionsOpts = &opts
fprp.execWithOptionsSyncStart.SyncWithController()
if opts.Stdout != nil && len(fprp.execWithOptionsStdout) > 0 {
Expand Down Expand Up @@ -150,7 +147,7 @@ func (s *PodCommandExecutorTestSuite) TestPodRunnerExec(c *C) {
}()
prp.execWithOptionsSyncStart.Sync() // Ensure ExecWithOptions is called
wg.Wait()
prp.execWithOptionsSyncEnd.Sync() // Release execWithOptions
prp.execWithOptionsSyncEnd.Sync() // Release ExecWithOptions

c.Assert(err, Not(IsNil))
c.Assert(errors.Is(err, context.Canceled), Equals, true)
Expand All @@ -171,10 +168,9 @@ func (s *PodCommandExecutorTestSuite) TestPodRunnerExec(c *C) {
}()
prp.execWithOptionsSyncStart.Sync() // Ensure ExecWithOptions is called
wg.Wait()
prp.execWithOptionsSyncEnd.Sync() // Release execWithOptions
prp.execWithOptionsSyncEnd.Sync() // Release ExecWithOptions

c.Assert(err, IsNil)
c.Assert(prp.inExecWithOptionsCli, Equals, cli)
c.Assert(prp.inExecWithOptionsOpts.Command, DeepEquals, command)
c.Assert(prp.inExecWithOptionsOpts.Namespace, Equals, podCommandExecutorNS)
c.Assert(prp.inExecWithOptionsOpts.PodName, Equals, podCommandExecutorPodName)
Expand Down Expand Up @@ -214,7 +210,7 @@ func (s *PodCommandExecutorTestSuite) TestPodRunnerExec(c *C) {
}()
prp.execWithOptionsSyncStart.Sync() // Ensure ExecWithOptions is called
wg.Wait()
prp.execWithOptionsSyncEnd.Sync() // Release execWithOptions
prp.execWithOptionsSyncEnd.Sync() // Release ExecWithOptions

c.Assert(err, Not(IsNil))
c.Assert(prp.inExecWithOptionsOpts.Stdout, Not(IsNil))
Expand Down
62 changes: 22 additions & 40 deletions pkg/kube/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,6 @@ type PodController interface {
GetFileWriter() (PodFileWriter, error)
}

// podControllerProcessor aids in unit testing
type podControllerProcessor interface {
CreatePod(ctx context.Context, cli kubernetes.Interface, options *PodOptions) (*corev1.Pod, error)
WaitForPodReadyPCP(ctx context.Context, namespace, podName string) error
WaitForPodCompletionPCP(ctx context.Context, namespace, podName string) error
DeletePod(ctx context.Context, namespace string, podName string, opts metav1.DeleteOptions) error
}

// podController specifies Kubernetes Client and PodOptions needed for creating
// a Pod. It implements the podControllerProcessor interface.
type podController struct {
Expand All @@ -70,13 +62,13 @@ type podController struct {
podReady bool
podName string

pcp podControllerProcessor
pcp PodControllerProcessor
}

type PodControllerOption func(p *podController)

// WithPodControllerProcessor provides mechanism for passing fake podControllerProcessor for testing purposes.
func WithPodControllerProcessor(processor podControllerProcessor) PodControllerOption {
func WithPodControllerProcessor(processor PodControllerProcessor) PodControllerOption {
return func(p *podController) {
p.pcp = processor
}
Expand All @@ -89,20 +81,28 @@ func NewPodController(cli kubernetes.Interface, options *PodOptions, opts ...Pod
podOptions: options,
}

r.pcp = r

for _, opt := range opts {
opt(r)
}

// If pod controller processor has not been set by PodControllerOption, we create default implementation here
if r.pcp == nil {
r.pcp = &podControllerProcessor{
cli: cli,
}
}

return r
}

// NewPodControllerForExistingPod returns a new PodController given Kubernetes
// Client and existing pod details.
func NewPodControllerForExistingPod(cli kubernetes.Interface, pod *corev1.Pod) PodController {
r := &podController{
cli: cli,
cli: cli,
pcp: &podControllerProcessor{
cli: cli,
},
pod: pod,
podName: pod.Name,
}
Expand All @@ -114,8 +114,6 @@ func NewPodControllerForExistingPod(cli kubernetes.Interface, pod *corev1.Pod) P
}
r.podOptions = options

r.pcp = r

return r
}

Expand All @@ -137,7 +135,7 @@ func (p *podController) StartPod(ctx context.Context) error {
return errors.Wrap(ErrPodControllerNotInitialized, "Failed to create pod")
}

pod, err := p.pcp.CreatePod(ctx, p.cli, p.podOptions)
pod, err := p.pcp.CreatePod(ctx, p.podOptions)
if err != nil {
log.WithError(err).Print("Failed to create pod", field.M{"PodName": p.podOptions.Name, "Namespace": p.podOptions.Namespace})
return errors.Wrap(err, "Failed to create pod")
Expand All @@ -155,7 +153,7 @@ func (p *podController) WaitForPodReady(ctx context.Context) error {
return ErrPodControllerPodNotStarted
}

if err := p.pcp.WaitForPodReadyPCP(ctx, p.pod.Namespace, p.pod.Name); err != nil {
if err := p.pcp.WaitForPodReady(ctx, p.pod.Namespace, p.pod.Name); err != nil {
log.WithError(err).Print("Pod failed to become ready in time", field.M{"PodName": p.podName, "Namespace": p.podOptions.Namespace})
return errors.Wrap(err, "Pod failed to become ready in time")
}
Expand All @@ -175,7 +173,7 @@ func (p *podController) WaitForPodCompletion(ctx context.Context) error {
return ErrPodControllerPodNotReady
}

if err := p.pcp.WaitForPodCompletionPCP(ctx, p.pod.Namespace, p.pod.Name); err != nil {
if err := p.pcp.WaitForPodCompletion(ctx, p.pod.Namespace, p.pod.Name); err != nil {
log.WithError(err).Print("Pod failed to complete in time", field.M{"PodName": p.podName, "Namespace": p.podOptions.Namespace})
return errors.Wrap(err, "Pod failed to complete in time")
}
Expand Down Expand Up @@ -249,7 +247,9 @@ func (p *podController) GetCommandExecutor() (PodCommandExecutor, error) {
containerName: p.getContainerName(),
}

pce.pcep = pce
pce.pcep = &podCommandExecutorProcessor{
cli: p.cli,
}

return pce, nil
}
Expand All @@ -270,27 +270,9 @@ func (p *podController) GetFileWriter() (PodFileWriter, error) {
containerName: p.getContainerName(),
}

pfw.fileWriterProcessor = pfw
pfw.fileWriterProcessor = &podFileWriterProcessor{
cli: p.cli,
}

return pfw, nil
}

// This is wrapped for unit testing.
func (p *podController) CreatePod(ctx context.Context, cli kubernetes.Interface, options *PodOptions) (*corev1.Pod, error) {
return CreatePod(ctx, cli, options)
}

// This is wrapped for unit testing.
func (p *podController) WaitForPodReadyPCP(ctx context.Context, namespace, podName string) error {
return WaitForPodReady(ctx, p.cli, namespace, podName)
}

// This is wrapped for unit testing
func (p *podController) WaitForPodCompletionPCP(ctx context.Context, namespace, podName string) error {
return WaitForPodCompletion(ctx, p.cli, namespace, podName)
}

// This is wrapped for unit testing.
func (p *podController) DeletePod(ctx context.Context, namespace string, podName string, opts metav1.DeleteOptions) error {
return p.cli.CoreV1().Pods(namespace).Delete(ctx, podName, opts)
}
51 changes: 51 additions & 0 deletions pkg/kube/pod_controller_processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright 2023 The Kanister Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kube

import (
"context"

"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)

// PodControllerProcessor is an interface wrapping kubernetes API invocation
// it is purposed to be replaced by fake implementation in tests
type PodControllerProcessor interface {
CreatePod(ctx context.Context, options *PodOptions) (*v1.Pod, error)
WaitForPodReady(ctx context.Context, namespace, podName string) error
WaitForPodCompletion(ctx context.Context, namespace, podName string) error
DeletePod(ctx context.Context, namespace string, podName string, opts metav1.DeleteOptions) error
}

type podControllerProcessor struct {
cli kubernetes.Interface
}

func (p *podControllerProcessor) CreatePod(ctx context.Context, options *PodOptions) (*v1.Pod, error) {
return CreatePod(ctx, p.cli, options)
}

func (p *podControllerProcessor) WaitForPodReady(ctx context.Context, namespace, podName string) error {
return WaitForPodReady(ctx, p.cli, namespace, podName)
}

func (p *podControllerProcessor) WaitForPodCompletion(ctx context.Context, namespace, podName string) error {
return WaitForPodCompletion(ctx, p.cli, namespace, podName)
}

func (p *podControllerProcessor) DeletePod(ctx context.Context, namespace string, podName string, opts metav1.DeleteOptions) error {
return p.cli.CoreV1().Pods(namespace).Delete(ctx, podName, opts)
}
Loading

0 comments on commit dbae8b8

Please sign in to comment.