Skip to content

Commit

Permalink
Extend pod runner (#1925)
Browse files Browse the repository at this point in the history
* Make PodWriter an interface (#1899)

* Pod controller interface (#1900)

* Introduce interfaces for extended POD manipulation

* Update copyright year

Co-authored-by: Pavan Navarathna <6504783+pavannd1@users.noreply.github.com>

* Fix spelling

* Describe purpose of interface

---------

Co-authored-by: Pavan Navarathna <6504783+pavannd1@users.noreply.github.com>

* Pod controller implementation (#1901)

* Implement PodController interface

* Add `corev1` alias

* Fix variable name.

* Fix naming

* Rename errors and minor rewording.

* Update copyright year and spelling.

Co-authored-by: Pavan Navarathna <6504783+pavannd1@users.noreply.github.com>

* Address review note

---------

Co-authored-by: Pavan Navarathna <6504783+pavannd1@users.noreply.github.com>

* Pod file writer implementation (#1902)

* Implement PodFileWriter interface

* Fix naming

* Rename `pfwp` property to `fileWriterProcessor` to make it more understandable.

* Update copyright year

Co-authored-by: Pavan Navarathna <6504783+pavannd1@users.noreply.github.com>

---------

Co-authored-by: Pavan Navarathna <6504783+pavannd1@users.noreply.github.com>

* Pod command executor implementation (#1903)

* Implement PodController interface

* Implement PodCommandExecutor interface

* Update copyright year

Co-authored-by: Pavan Navarathna <6504783+pavannd1@users.noreply.github.com>

---------

Co-authored-by: Pavan Navarathna <6504783+pavannd1@users.noreply.github.com>

* Refactor pod runner (#1904)

* Introduce interfaces for extended POD manipulation

* Refactor PodRunner, use PodController under the hood

---------

Co-authored-by: Pavan Navarathna <6504783+pavannd1@users.noreply.github.com>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
3 people committed Feb 23, 2023
1 parent 9bef15f commit 34de811
Show file tree
Hide file tree
Showing 11 changed files with 1,153 additions and 23 deletions.
4 changes: 2 additions & 2 deletions pkg/function/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func ValidateProfile(profile *param.Profile) error {
}

// GetPodWriter creates a file with Google credentials if the given profile points to a GCS location
func GetPodWriter(cli kubernetes.Interface, ctx context.Context, namespace, podName, containerName string, profile *param.Profile) (*kube.PodWriter, error) {
func GetPodWriter(cli kubernetes.Interface, ctx context.Context, namespace, podName, containerName string, profile *param.Profile) (kube.PodWriter, error) {
if profile.Location.Type == crv1alpha1.LocationTypeGCS {
pw := kube.NewPodWriter(cli, consts.GoogleCloudCredsFilePath, bytes.NewBufferString(profile.Credential.KeyPair.Secret))
if err := pw.Write(ctx, namespace, podName, containerName); err != nil {
Expand All @@ -112,7 +112,7 @@ func GetPodWriter(cli kubernetes.Interface, ctx context.Context, namespace, podN
}

// CleanUpCredsFile is used to remove the file created by the given PodWriter
func CleanUpCredsFile(ctx context.Context, pw *kube.PodWriter, namespace, podName, containerName string) {
func CleanUpCredsFile(ctx context.Context, pw kube.PodWriter, namespace, podName, containerName string) {
if pw != nil {
if err := pw.Remove(ctx, namespace, podName, containerName); err != nil {
log.Error().WithContext(ctx).Print("Could not delete the temp file")
Expand Down
76 changes: 76 additions & 0 deletions pkg/kube/pod_command_executor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright 2023 The Kanister Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kube

import (
"context"
"io"

"k8s.io/client-go/kubernetes"
)

// PodCommandExecutor allows us to execute command within the pod
type PodCommandExecutor interface {
Exec(ctx context.Context, command []string, stdin io.Reader, stdout, stderr io.Writer) error
}

// podCommandExecutorProcessor aids in unit testing
type podCommandExecutorProcessor interface {
execWithOptions(cli kubernetes.Interface, opts ExecOptions) (string, string, error)
}

// podCommandExecutor keeps everything required to execute command within a pod
type podCommandExecutor struct {
cli kubernetes.Interface
namespace string
podName string
containerName string

pcep podCommandExecutorProcessor
}

// Exec runs the command and logs stdout and stderr.
func (p *podCommandExecutor) Exec(ctx context.Context, command []string, stdin io.Reader, stdout, stderr io.Writer) error {
var (
opts = ExecOptions{
Command: command,
Namespace: p.namespace,
PodName: p.podName,
ContainerName: p.containerName,
Stdin: stdin,
Stdout: stdout,
Stderr: stderr,
}
cmdDone = make(chan struct{})
err error
)

go func() {
_, _, err = p.pcep.execWithOptions(p.cli, opts)
close(cmdDone)
}()

select {
case <-ctx.Done():
err = ctx.Err()
case <-cmdDone:
}

return err
}

func (p *podCommandExecutor) execWithOptions(cli kubernetes.Interface, opts ExecOptions) (string, string, error) {
return ExecWithOptions(p.cli, opts)
}
207 changes: 207 additions & 0 deletions pkg/kube/pod_command_executor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
// Copyright 2023 The Kanister Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kube

import (
"bytes"
"context"
"os"
"sync"
"time"

"github.com/pkg/errors"
. "gopkg.in/check.v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
)

type PodCommandExecutorTestSuite struct{}

var _ = Suite(&PodCommandExecutorTestSuite{})

const (
podCommandExecutorNS = "pod-runner-test"
podCommandExecutorPodName = "test-pod"
podCommandExecutorContainerName = "test-container"
)

func (s *PodCommandExecutorTestSuite) SetUpSuite(c *C) {
os.Setenv("POD_NAMESPACE", podCommandExecutorNS)
}

// testBarrier supports race-free synchronization between a controller and a background goroutine.
type testBarrier struct {
fgStartedChan chan (struct{})
bgStartedChan chan (struct{})
}

func (s *testBarrier) Setup() {
s.bgStartedChan = make(chan struct{})
s.fgStartedChan = make(chan struct{})
}

func (s *testBarrier) Sync() {
if s.bgStartedChan != nil {
<-s.bgStartedChan
close(s.fgStartedChan)
}
}

func (s *testBarrier) SyncWithController() { // background method
if s.bgStartedChan != nil {
close(s.bgStartedChan)
<-s.fgStartedChan
}
}

type fakePodCommandExecutorProcessor struct {
inExecWithOptionsCli kubernetes.Interface
inExecWithOptionsOpts *ExecOptions
execWithOptionsStdout string
execWithOptionsStderr string
execWithOptionsRet1 string
execWithOptionsRet2 string
execWithOptionsErr error

// Signal to `execWithOptions` to start "executing" command.
// Command will remain "executing" until `execWithOptionsSyncEnd.Sync()`
execWithOptionsSyncStart testBarrier
execWithOptionsSyncEnd testBarrier
}

func (fprp *fakePodCommandExecutorProcessor) execWithOptions(cli kubernetes.Interface, opts ExecOptions) (string, string, error) {
fprp.inExecWithOptionsCli = cli
fprp.inExecWithOptionsOpts = &opts
fprp.execWithOptionsSyncStart.SyncWithController()
if opts.Stdout != nil && len(fprp.execWithOptionsStdout) > 0 {
opts.Stdout.Write([]byte(fprp.execWithOptionsStdout)) //nolint: errcheck
}
if opts.Stderr != nil && len(fprp.execWithOptionsStderr) > 0 {
opts.Stderr.Write([]byte(fprp.execWithOptionsStderr)) //nolint: errcheck
}
fprp.execWithOptionsSyncEnd.SyncWithController()

return fprp.execWithOptionsRet1, fprp.execWithOptionsRet2, fprp.execWithOptionsErr
}

func (s *PodCommandExecutorTestSuite) TestPodRunnerExec(c *C) {
ctx := context.Background()
cli := fake.NewSimpleClientset()

//simulatedError := errors.New("SimulatedError")
command := []string{"command", "arg1"}

cases := map[string]func(ctx context.Context, pr PodCommandExecutor, prp *fakePodCommandExecutorProcessor){
"Timed out": func(ctx context.Context, pr PodCommandExecutor, prp *fakePodCommandExecutorProcessor) {
var err error
// Prepare context which will timeout immediately
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, time.Millisecond) // timeout within the call
defer cancel()

prp.execWithOptionsSyncStart.Setup()
prp.execWithOptionsSyncEnd.Setup()
var bStdin, bStdout, bStderr bytes.Buffer
var wg sync.WaitGroup
wg.Add(1)
go func() {
err = pr.Exec(ctx, command, &bStdin, &bStdout, &bStderr)
wg.Done()
}()
// Signal to `Exec` to start "executing" command. Command will remain "executing"
// until `syncEndKubeExecWithOptions.Sync()`, which won't happen until an error is returned
// from `Exec` and `WaitGroup` is released. This guarantees the error returned is from
// the expired Context.
prp.execWithOptionsSyncStart.Sync()
wg.Wait()
// allow the background goroutine to terminate (no-op if not Setup)
prp.execWithOptionsSyncEnd.Sync()

c.Assert(err, Not(IsNil))
c.Assert(errors.Is(err, context.DeadlineExceeded), Equals, true)
},
"Cancelled": func(ctx context.Context, pr PodCommandExecutor, prp *fakePodCommandExecutorProcessor) {
var err error
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, time.Second*100)
cancel() // prepare cancelled context
prp.execWithOptionsSyncStart.Setup()
prp.execWithOptionsSyncEnd.Setup()

var bStdin, bStdout, bStderr bytes.Buffer
var wg sync.WaitGroup
wg.Add(1)
go func() {
err = pr.Exec(ctx, command, &bStdin, &bStdout, &bStderr)
wg.Done()
}()
prp.execWithOptionsSyncStart.Sync() // Ensure ExecWithOptions is called
wg.Wait()
prp.execWithOptionsSyncEnd.Sync() // Release execWithOptions

c.Assert(err, Not(IsNil))
c.Assert(errors.Is(err, context.Canceled), Equals, true)
},
"Successfull execution": func(ctx context.Context, pr PodCommandExecutor, prp *fakePodCommandExecutorProcessor) {
var err error
prp.execWithOptionsStdout = "{\"where\":\"standard output\"}\n{\"what\":\"output json\"}"
prp.execWithOptionsStderr = "{\"where\":\"standard error\"}\n{\"what\":\"error json\"}"
expStdout := prp.execWithOptionsStdout
expStderr := prp.execWithOptionsStderr

var bStdin, bStdout, bStderr bytes.Buffer
var wg sync.WaitGroup
wg.Add(1)
go func() {
err = pr.Exec(ctx, command, &bStdin, &bStdout, &bStderr)
wg.Done()
}()
prp.execWithOptionsSyncStart.Sync() // Ensure ExecWithOptions is called
wg.Wait()
prp.execWithOptionsSyncEnd.Sync() // Release execWithOptions

c.Assert(err, IsNil)
c.Assert(prp.inExecWithOptionsCli, Equals, cli)
c.Assert(prp.inExecWithOptionsOpts, DeepEquals, &ExecOptions{
Command: command,
Namespace: podCommandExecutorNS,
PodName: podCommandExecutorPodName,
ContainerName: podCommandExecutorContainerName,
Stdin: &bStdin,
Stdout: &bStdout,
Stderr: &bStderr,
})
c.Assert(bStdout.Len() > 0, Equals, true)
c.Assert(bStderr.Len() > 0, Equals, true)
c.Assert(bStdout.String(), Equals, expStdout)
c.Assert(bStderr.String(), Equals, expStderr)
},
}

for l, tc := range cases {
c.Log(l)
prp := &fakePodCommandExecutorProcessor{}

pr := &podCommandExecutor{
cli: cli,
namespace: podCommandExecutorNS,
podName: podCommandExecutorPodName,
containerName: podCommandExecutorContainerName,
pcep: prp,
}

tc(ctx, pr, prp)
}
}
Loading

0 comments on commit 34de811

Please sign in to comment.