Skip to content

Commit

Permalink
Pod controller change stop signature (#2056)
Browse files Browse the repository at this point in the history
* When WaitingForPodReady we should not stop pod.

* Change StartPod and StopPod signatures. Timeout and grace period should be passed to StopPod only.

c

* Update PodRunner according to changed signature of StartPod and StopPod
  • Loading branch information
e-sumin committed May 11, 2023
1 parent 9ce4426 commit 8a60b23
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 57 deletions.
27 changes: 11 additions & 16 deletions pkg/kube/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ var (
type PodController interface {
PodName() string
Pod() *corev1.Pod
StartPod(ctx context.Context, stopTimeout time.Duration) error
StartPod(ctx context.Context) error
WaitForPodReady(ctx context.Context) error
StopPod(ctx context.Context) error
StopPod(ctx context.Context, timeout time.Duration, gracePeriodSeconds int64) error
GetCommandExecutor() (PodCommandExecutor, error)
GetFileWriter() (PodFileWriter, error)
}
Expand All @@ -59,10 +59,9 @@ type podController struct {
cli kubernetes.Interface
podOptions *PodOptions

pod *corev1.Pod
podReady bool
podName string
stopTimeout time.Duration
pod *corev1.Pod
podReady bool
podName string

pcp podControllerProcessor
}
Expand Down Expand Up @@ -101,8 +100,7 @@ func (p *podController) Pod() *corev1.Pod {
}

// StartPod creates pod and in case of success, it stores pod name for further use.
// stopTimeout is also stored and will be used when StopPod will be called
func (p *podController) StartPod(ctx context.Context, stopTimeout time.Duration) error {
func (p *podController) StartPod(ctx context.Context) error {
if p.podName != "" {
return errors.Wrap(ErrPodControllerPodAlreadyStarted, "Failed to create pod")
}
Expand All @@ -119,7 +117,6 @@ func (p *podController) StartPod(ctx context.Context, stopTimeout time.Duration)

p.pod = pod
p.podName = pod.Name
p.stopTimeout = stopTimeout

return nil
}
Expand All @@ -132,7 +129,6 @@ func (p *podController) WaitForPodReady(ctx context.Context) error {

if err := p.pcp.waitForPodReady(ctx, p.podName); err != nil {
log.WithError(err).Print("Pod failed to become ready in time", field.M{"PodName": p.podName, "Namespace": p.podOptions.Namespace})
_ = p.StopPod(ctx) // best-effort
return errors.Wrap(err, "Pod failed to become ready in time")
}

Expand All @@ -142,20 +138,19 @@ func (p *podController) WaitForPodReady(ctx context.Context) error {
}

// StopPod stops the pod which was previously started, otherwise it will return ErrPodControllerPodNotStarted error.
// stopTimeout passed to Start will be used
func (p *podController) StopPod(ctx context.Context) error {
// stopTimeout is used to limit execution time
// gracePeriodSeconds is used to specify pod deletion grace period. If set to zero, pod should be deleted immediately
func (p *podController) StopPod(ctx context.Context, stopTimeout time.Duration, gracePeriodSeconds int64) error {
if p.podName == "" {
return ErrPodControllerPodNotStarted
}

if p.stopTimeout != PodControllerInfiniteStopTime {
if stopTimeout != PodControllerInfiniteStopTime {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, p.stopTimeout)
ctx, cancel = context.WithTimeout(ctx, stopTimeout)
defer cancel()
}

gracePeriodSeconds := int64(0) // force immediate deletion

if err := p.pcp.deletePod(ctx, p.podOptions.Namespace, p.podName, metav1.DeleteOptions{GracePeriodSeconds: &gracePeriodSeconds}); err != nil {
log.WithError(err).Print("Failed to delete pod", field.M{"PodName": p.podName, "Namespace": p.podOptions.Namespace})
return err
Expand Down
52 changes: 13 additions & 39 deletions pkg/kube/pod_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (s *PodControllerTestSuite) TestPodControllerStartPod(c *C) {
cases := map[string]func(prp *fakePodControllerProcessor, pr PodController){
"Pod creation failure": func(pcp *fakePodControllerProcessor, pc PodController) {
pcp.createPodErr = simulatedError
err := pc.StartPod(ctx, 30*time.Second)
err := pc.StartPod(ctx)
c.Assert(err, Not(IsNil))
c.Assert(errors.Is(err, simulatedError), Equals, true)
c.Assert(pcp.inCreatePodCli, Equals, cli)
Expand All @@ -99,7 +99,7 @@ func (s *PodControllerTestSuite) TestPodControllerStartPod(c *C) {
Name: podControllerPodName,
},
}
err := pr.StartPod(ctx, 30*time.Second)
err := pr.StartPod(ctx)
c.Assert(err, IsNil)
c.Assert(pr.PodName(), Equals, podControllerPodName)
},
Expand All @@ -110,7 +110,7 @@ func (s *PodControllerTestSuite) TestPodControllerStartPod(c *C) {
},
}

err := pr.StartPod(ctx, 30*time.Second)
err := pr.StartPod(ctx)
c.Assert(err, IsNil)
c.Assert(prp.inCreatePodCli, Equals, cli)

Expand All @@ -119,7 +119,7 @@ func (s *PodControllerTestSuite) TestPodControllerStartPod(c *C) {
prp.createPodRet = nil
prp.createPodErr = errors.New("CreatePod should not be invoked")

err = pr.StartPod(ctx, 30*time.Second)
err = pr.StartPod(ctx)
c.Assert(err, Not(IsNil))
c.Assert(errors.Is(err, ErrPodControllerPodAlreadyStarted), Equals, true)
c.Assert(prp.inCreatePodCli, IsNil)
Expand Down Expand Up @@ -161,7 +161,7 @@ func (s *PodControllerTestSuite) TestPodControllerWaitPod(c *C) {
Name: podControllerPodName,
},
}
err := pc.StartPod(ctx, 30*time.Second)
err := pc.StartPod(ctx)
c.Assert(err, IsNil)

pcp.waitForPodReadyErr = simulatedError
Expand All @@ -171,40 +171,14 @@ func (s *PodControllerTestSuite) TestPodControllerWaitPod(c *C) {
c.Assert(errors.Is(err, pcp.waitForPodReadyErr), Equals, true)
c.Assert(err.Error(), Equals, fmt.Sprintf("Pod failed to become ready in time: %s", simulatedError.Error()))
// Check that POD deletion was also invoked with expected arguments
c.Assert(pcp.inDeletePodPodName, Equals, podControllerPodName)
c.Assert(pcp.inDeletePodNamespace, Equals, podControllerNS)
gracePeriodSeconds := int64(0)
c.Assert(pcp.inDeletePodOptions, DeepEquals, metav1.DeleteOptions{GracePeriodSeconds: &gracePeriodSeconds})
},
"Waiting failure returned even if pod deletion failed too": func(pcp *fakePodControllerProcessor, pc PodController) { // TODO(e-sumin): Both errors should be returned
pcp.createPodRet = &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podControllerPodName,
},
}
err := pc.StartPod(ctx, 30*time.Second)
c.Assert(err, IsNil)

pcp.waitForPodReadyErr = simulatedError
pcp.deletePodErr = errors.New("This error should not be returned")
err = pc.WaitForPodReady(ctx)
c.Assert(err, Not(IsNil))
c.Assert(pcp.inWaitForPodReadyPodName, Equals, podControllerPodName)
c.Assert(errors.Is(err, pcp.waitForPodReadyErr), Equals, true)
c.Assert(err.Error(), Equals, fmt.Sprintf("Pod failed to become ready in time: %s", simulatedError.Error()))
// Check that POD deletion was invoked
c.Assert(pcp.inDeletePodPodName, Equals, podControllerPodName)
c.Assert(pcp.inDeletePodNamespace, Equals, podControllerNS)
gracePeriodSeconds := int64(0)
c.Assert(pcp.inDeletePodOptions, DeepEquals, metav1.DeleteOptions{GracePeriodSeconds: &gracePeriodSeconds})
},
"Waiting succeeded": func(pcp *fakePodControllerProcessor, pc PodController) {
pcp.createPodRet = &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podControllerPodName,
},
}
err := pc.StartPod(ctx, 30*time.Second)
err := pc.StartPod(ctx)
c.Assert(err, IsNil)
err = pc.WaitForPodReady(ctx)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -234,7 +208,7 @@ func (s *PodControllerTestSuite) TestPodControllerStopPod(c *C) {

cases := map[string]func(pcp *fakePodControllerProcessor, pc PodController){
"Pod not started yet": func(pcp *fakePodControllerProcessor, pc PodController) {
err := pc.StopPod(ctx)
err := pc.StopPod(ctx, 30*time.Second, int64(0))
c.Assert(err, Not(IsNil))
c.Assert(errors.Is(err, ErrPodControllerPodNotStarted), Equals, true)
c.Assert(pcp.inDeletePodPodName, Equals, untouchedStr)
Expand All @@ -246,11 +220,11 @@ func (s *PodControllerTestSuite) TestPodControllerStopPod(c *C) {
Name: podControllerPodName,
},
}
err := pc.StartPod(ctx, 30*time.Second)
err := pc.StartPod(ctx)
c.Assert(err, IsNil)

pcp.deletePodErr = simulatedError
err = pc.StopPod(ctx)
err = pc.StopPod(ctx, 30*time.Second, int64(0))
c.Assert(err, Not(IsNil))
c.Assert(errors.Is(err, simulatedError), Equals, true)
},
Expand All @@ -260,10 +234,10 @@ func (s *PodControllerTestSuite) TestPodControllerStopPod(c *C) {
Name: podControllerPodName,
},
}
err := pc.StartPod(ctx, 30*time.Second)
err := pc.StartPod(ctx)
c.Assert(err, IsNil)

err = pc.StopPod(ctx)
err = pc.StopPod(ctx, 30*time.Second, int64(0))
c.Assert(err, IsNil)
c.Assert(pcp.inDeletePodPodName, Equals, podControllerPodName)
c.Assert(pcp.inDeletePodNamespace, Equals, podControllerNS)
Expand Down Expand Up @@ -312,7 +286,7 @@ func (s *PodControllerTestSuite) TestPodControllerGetCommandExecutorAndFileWrite
Name: podControllerPodName,
},
}
err := pc.StartPod(ctx, 30*time.Second)
err := pc.StartPod(ctx)
c.Assert(err, IsNil)

pce, err := pc.GetCommandExecutor()
Expand All @@ -331,7 +305,7 @@ func (s *PodControllerTestSuite) TestPodControllerGetCommandExecutorAndFileWrite
Name: podControllerPodName,
},
}
err := pc.StartPod(ctx, 30*time.Second)
err := pc.StartPod(ctx)
c.Assert(err, IsNil)

err = pc.WaitForPodReady(ctx)
Expand Down
4 changes: 2 additions & 2 deletions pkg/kube/pod_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (p *podRunner) Run(ctx context.Context, fn func(context.Context, *v1.Pod) (
ctx, cancel := context.WithCancel(ctx)
defer cancel()

err := p.pc.StartPod(ctx, PodControllerInfiniteStopTime)
err := p.pc.StartPod(ctx)

if err != nil {
return nil, errors.Wrap(err, "Failed to create pod")
Expand All @@ -59,7 +59,7 @@ func (p *podRunner) Run(ctx context.Context, fn func(context.Context, *v1.Pod) (
ctx = field.Context(ctx, consts.ContainerNameKey, pod.Spec.Containers[0].Name)
go func() {
<-ctx.Done()
err := p.pc.StopPod(context.Background())
err := p.pc.StopPod(context.Background(), PodControllerInfiniteStopTime, int64(0))
if err != nil {
log.WithError(err).Print("Failed to delete pod", field.M{"PodName": pod.Name})
}
Expand Down

0 comments on commit 8a60b23

Please sign in to comment.