Skip to content

Commit

Permalink
*: change "Job" RestartPolicy, use "WaitForDeploymentCompletes"
Browse files Browse the repository at this point in the history
Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
  • Loading branch information
gyuho committed Jul 12, 2020
1 parent a8a69c5 commit 0d6e2c9
Show file tree
Hide file tree
Showing 15 changed files with 582 additions and 982 deletions.
37 changes: 22 additions & 15 deletions ec2/ec2.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,30 +394,30 @@ func (ts *Tester) down() (err error) {
}()

var errs []string

fmt.Fprintf(ts.logWriter, ts.color("\n\n[yellow]*********************************\n"))
fmt.Fprintf(ts.logWriter, ts.color("[light_blue]deleteSSM [default](%q)\n"), ts.cfg.ConfigPath)

if err := ts.deleteSSM(); err != nil {
ts.lg.Warn("deleteSSM failed", zap.Error(err))
errs = append(errs, err.Error())
}

fmt.Fprintf(ts.logWriter, ts.color("\n\n[yellow]*********************************\n"))
fmt.Fprintf(ts.logWriter, ts.color("[light_blue]deleteASGs [default](%q)\n"), ts.cfg.ConfigPath)

if err := ts.deleteASGs(); err != nil {
ts.lg.Warn("deleteASGs failed", zap.Error(err))
errs = append(errs, err.Error())
}

fmt.Fprintf(ts.logWriter, ts.color("\n\n[yellow]*********************************\n"))
fmt.Fprintf(ts.logWriter, ts.color("[light_blue]deleteKeyPair [default](%q)\n"), ts.cfg.ConfigPath)

if err := ts.deleteKeyPair(); err != nil {
ts.lg.Warn("deleteKeyPair failed", zap.Error(err))
errs = append(errs, err.Error())
}

fmt.Fprintf(ts.logWriter, ts.color("\n\n[yellow]*********************************\n"))
fmt.Fprintf(ts.logWriter, ts.color("[light_blue]deleteRole [default](%q)\n"), ts.cfg.ConfigPath)

if err := ts.deleteRole(); err != nil {
ts.lg.Warn("deleteRole failed", zap.Error(err))
errs = append(errs, err.Error())
Expand All @@ -428,16 +428,16 @@ func (ts *Tester) down() (err error) {
ts.lg.Info("sleeping before VPC deletion", zap.Duration("wait", waitDur))
time.Sleep(waitDur)
}

fmt.Fprintf(ts.logWriter, ts.color("\n\n[yellow]*********************************\n"))
fmt.Fprintf(ts.logWriter, ts.color("[light_blue]deleteVPC [default](%q)\n"), ts.cfg.ConfigPath)

if err := ts.deleteVPC(); err != nil {
ts.lg.Warn("deleteVPC failed", zap.Error(err))
errs = append(errs, err.Error())
}

fmt.Fprintf(ts.logWriter, ts.color("\n\n[yellow]*********************************\n"))
fmt.Fprintf(ts.logWriter, ts.color("[light_blue]deleteS3 [default](%q)\n"), ts.cfg.ConfigPath)

if err := ts.deleteS3(); err != nil {
ts.lg.Warn("deleteS3 failed", zap.Error(err))
errs = append(errs, err.Error())
Expand All @@ -449,21 +449,28 @@ func (ts *Tester) down() (err error) {
return ts.cfg.Sync()
}

// catchInterrupt starts run in its own goroutine and blocks until one of three
// things happens: a receive on stopc succeeds, an OS signal arrives, or run
// returns by itself. In the first two cases it still waits for run's result
// and then returns a non-nil error describing the interruption; in the third
// case it returns nil when run returned nil, otherwise a wrapped message.
//
// NOTE(review): this span comes from a rendered commit diff whose +/- markers
// were stripped, so two versions of the function are interleaved and the text
// is not compilable as-is. The first signature and the select cases that use
// "once"/"sigc" appear to be the removed (pre-commit) lines; the second
// signature and the cases that use "stopcCloseOnce"/"osSigCh" appear to be the
// added (post-commit) lines — confirm against the repository.
func catchInterrupt(lg *zap.Logger, stopc chan struct{}, once *sync.Once, sigc chan os.Signal, run func() error) (err error) {
func catchInterrupt(lg *zap.Logger, stopc chan struct{}, stopcCloseOnce *sync.Once, osSigCh chan os.Signal, run func() error) (err error) {
// errc is unbuffered; every select branch below receives from it, so the
// goroutine's send on errc is always matched by a receive.
errc := make(chan error)
go func() {
errc <- run()
}()

select {
// Apparent pre-commit cases (parameter names "once" and "sigc").
case <-stopc:
lg.Info("interrupting")
serr := <-errc
lg.Info("interrupted", zap.Error(serr))
err = fmt.Errorf("interrupted (run function returned %v)", serr)
case sig := <-sigc:
// The sync.Once guards the close so stopc is not closed twice through it.
once.Do(func() { close(stopc) })
err = fmt.Errorf("received os signal %v, closed stopc (interrupted %v)", sig, <-errc)
// Apparent post-commit cases (parameter names "stopcCloseOnce" and "osSigCh").
// "ok" is false when stopc was closed rather than sent to; it is only reported
// in the error text.
case _, ok := <-stopc:
rerr := <-errc
lg.Info("interrupted; stopc received, errc received", zap.Error(rerr))
err = fmt.Errorf("stopc returned, stopc open %v, run function returned %v", ok, rerr)

case osSig := <-osSigCh:
// Close stopc (at most once via the sync.Once), then wait for run to return.
stopcCloseOnce.Do(func() { close(stopc) })
rerr := <-errc
lg.Info("OS signal received, errc received", zap.String("signal", osSig.String()), zap.Error(rerr))
err = fmt.Errorf("received os signal %v, closed stopc, run function returned %v", osSig, rerr)

// run finished on its own: pass nil through, otherwise wrap the message.
case err = <-errc:
if err != nil {
err = fmt.Errorf("run function returned %v", err)
}
}
return err
}
202 changes: 64 additions & 138 deletions eks/alb-2048/alb-2048.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,78 +594,41 @@ func (ts *tester) deleteALBDeployment() error {
return ts.cfg.EKSConfig.Sync()
}

// waitDeploymentALB waits for the ALB ingress controller Deployment
// (albIngressControllerDeploymentName in namespace "kube-system") and writes
// "kubectl describe deployment" output to ts.cfg.LogWriter.
//
// NOTE(review): this span comes from a rendered commit diff whose +/- markers
// were stripped, so the old and new bodies are interleaved line by line and the
// text is not compilable as-is. Judging by the two signatures, the lines that
// poll the Deployment by hand appear to be the removed version and the lines
// that call k8s_client.WaitForDeploymentCompletes appear to be the added
// version — confirm against the repository.
func (ts *tester) waitDeploymentALB() error {
ts.cfg.Logger.Info("waiting for ALB Deployment")
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
output, err := exec.New().CommandContext(
// Apparent post-commit version starts here: the overall deadline is 7 minutes
// plus one minute per desired ALB replica.
func (ts *tester) waitDeploymentALB() (err error) {
timeout := 7*time.Minute + time.Duration(ts.cfg.EKSConfig.AddOnALB2048.DeploymentReplicasALB)*time.Minute
ctx, cancel := context.WithTimeout(context.Background(), timeout)
_, err = k8s_client.WaitForDeploymentCompletes(
ctx,
ts.cfg.EKSConfig.KubectlPath,
"--kubeconfig="+ts.cfg.EKSConfig.KubeConfigPath,
"--namespace=kube-system",
"describe",
"deployment",
ts.cfg.Logger,
ts.cfg.LogWriter,
ts.cfg.Stopc,
ts.cfg.K8SClient,
// NOTE(review): the two durations are presumably the initial wait and the
// poll interval of WaitForDeploymentCompletes — confirm against its signature.
time.Minute,
20*time.Second,
"kube-system",
albIngressControllerDeploymentName,
).CombinedOutput()
cancel()
if err != nil {
return fmt.Errorf("'kubectl describe deployment' failed %v", err)
}
out := string(output)
fmt.Fprintf(ts.cfg.LogWriter, "\n\n\"kubectl describe deployment\" output:\n%s\n\n", out)

// Apparent pre-commit polling loop: re-check every 15 seconds until waitDur
// has elapsed, returning early with "check aborted" when Stopc fires.
ready := false
waitDur := 7*time.Minute + time.Duration(ts.cfg.EKSConfig.AddOnALB2048.DeploymentReplicasALB)*time.Minute
retryStart := time.Now()
for time.Now().Sub(retryStart) < waitDur {
select {
case <-ts.cfg.Stopc:
return errors.New("check aborted")
case <-time.After(15 * time.Second):
}

// Each Get is bounded by its own one-minute timeout.
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
dresp, err := ts.cfg.K8SClient.KubernetesClientSet().
AppsV1().
Deployments("kube-system").
Get(ctx, albIngressControllerDeploymentName, metav1.GetOptions{})
cancel()
if err != nil {
return fmt.Errorf("failed to get Deployment (%v)", err)
}
ts.cfg.Logger.Info("get deployment",
zap.Int32("desired-replicas", dresp.Status.Replicas),
zap.Int32("available-replicas", dresp.Status.AvailableReplicas),
zap.Int32("unavailable-replicas", dresp.Status.UnavailableReplicas),
zap.Int32("ready-replicas", dresp.Status.ReadyReplicas),
)
available := false
for _, cond := range dresp.Status.Conditions {
ts.cfg.Logger.Info("condition",
zap.String("last-updated", cond.LastUpdateTime.String()),
zap.String("type", string(cond.Type)),
zap.String("status", string(cond.Status)),
zap.String("reason", cond.Reason),
zap.String("message", cond.Message),
)
if cond.Status != v1.ConditionTrue {
continue
// Apparent post-commit continuation of the WaitForDeploymentCompletes
// arguments: the desired replica count, then a callback passed through
// k8s_client.WithQueryFunc that logs "kubectl describe deployment". A describe
// failure is only logged as a warning; it is not returned.
ts.cfg.EKSConfig.AddOnALB2048.DeploymentReplicasALB,
k8s_client.WithQueryFunc(func() {
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
output, err := exec.New().CommandContext(
ctx,
ts.cfg.EKSConfig.KubectlPath,
"--kubeconfig="+ts.cfg.EKSConfig.KubeConfigPath,
"--namespace=kube-system",
"describe",
"deployment",
albIngressControllerDeploymentName,
).CombinedOutput()
cancel()
if err != nil {
ts.cfg.Logger.Warn("'kubectl describe deployment' failed", zap.Error(err))
}
if cond.Type == appsv1.DeploymentAvailable {
available = true
break
}
}
// Apparent pre-commit readiness rule: a true "Available" condition and at
// least the configured number of available replicas.
if available && dresp.Status.AvailableReplicas >= ts.cfg.EKSConfig.AddOnALB2048.DeploymentReplicasALB {
ready = true
break
}
}
if !ready {
return errors.New("deployment not ready")
}

ts.cfg.Logger.Info("waited for ALB Deployment")
return ts.cfg.EKSConfig.Sync()
out := string(output)
fmt.Fprintf(ts.cfg.LogWriter, "\n\n\"kubectl describe deployment\" output:\n%s\n\n", out)
}),
)
// Apparent post-commit tail: release the timeout context and return whatever
// error WaitForDeploymentCompletes produced.
cancel()
return err
}

// https://docs.aws.amazon.com/eks/latest/userguide/alb-ingress.html
Expand Down Expand Up @@ -764,78 +727,41 @@ func (ts *tester) delete2048Deployment() error {
return ts.cfg.EKSConfig.Sync()
}

// waitDeployment2048 waits for the 2048 Deployment (alb2048DeploymentName in
// namespace ts.cfg.EKSConfig.AddOnALB2048.Namespace) and writes
// "kubectl describe deployment" output to ts.cfg.LogWriter.
//
// NOTE(review): this span comes from a rendered commit diff whose +/- markers
// were stripped, so the old and new bodies are interleaved line by line and the
// text is not compilable as-is. Judging by the two signatures, the lines that
// poll the Deployment by hand appear to be the removed version and the lines
// that call k8s_client.WaitForDeploymentCompletes appear to be the added
// version — confirm against the repository.
func (ts *tester) waitDeployment2048() error {
ts.cfg.Logger.Info("waiting for 2048 Deployment")
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
output, err := exec.New().CommandContext(
// Apparent post-commit version starts here: the overall deadline is 7 minutes
// plus one minute per desired 2048 replica.
func (ts *tester) waitDeployment2048() (err error) {
timeout := 7*time.Minute + time.Duration(ts.cfg.EKSConfig.AddOnALB2048.DeploymentReplicas2048)*time.Minute
ctx, cancel := context.WithTimeout(context.Background(), timeout)
_, err = k8s_client.WaitForDeploymentCompletes(
ctx,
ts.cfg.EKSConfig.KubectlPath,
"--kubeconfig="+ts.cfg.EKSConfig.KubeConfigPath,
"--namespace="+ts.cfg.EKSConfig.AddOnALB2048.Namespace,
"describe",
"deployment",
ts.cfg.Logger,
ts.cfg.LogWriter,
ts.cfg.Stopc,
ts.cfg.K8SClient,
// NOTE(review): the two durations are presumably the initial wait and the
// poll interval of WaitForDeploymentCompletes — confirm against its signature.
time.Minute,
20*time.Second,
ts.cfg.EKSConfig.AddOnALB2048.Namespace,
alb2048DeploymentName,
).CombinedOutput()
cancel()
if err != nil {
return fmt.Errorf("'kubectl describe deployment' failed %v", err)
}
out := string(output)
fmt.Fprintf(ts.cfg.LogWriter, "\n\n\"kubectl describe deployment\" output:\n%s\n\n", out)

// Apparent pre-commit polling loop: re-check every 15 seconds until waitDur
// has elapsed, returning early with "check aborted" when Stopc fires.
ready := false
waitDur := 7*time.Minute + time.Duration(ts.cfg.EKSConfig.AddOnALB2048.DeploymentReplicas2048)*time.Minute
retryStart := time.Now()
for time.Now().Sub(retryStart) < waitDur {
select {
case <-ts.cfg.Stopc:
return errors.New("check aborted")
case <-time.After(15 * time.Second):
}

// Each Get is bounded by its own one-minute timeout.
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
dresp, err := ts.cfg.K8SClient.KubernetesClientSet().
AppsV1().
Deployments(ts.cfg.EKSConfig.AddOnALB2048.Namespace).
Get(ctx, alb2048DeploymentName, metav1.GetOptions{})
cancel()
if err != nil {
return fmt.Errorf("failed to get Deployment (%v)", err)
}
ts.cfg.Logger.Info("get deployment",
zap.Int32("desired-replicas", dresp.Status.Replicas),
zap.Int32("available-replicas", dresp.Status.AvailableReplicas),
zap.Int32("unavailable-replicas", dresp.Status.UnavailableReplicas),
zap.Int32("ready-replicas", dresp.Status.ReadyReplicas),
)
available := false
for _, cond := range dresp.Status.Conditions {
ts.cfg.Logger.Info("condition",
zap.String("last-updated", cond.LastUpdateTime.String()),
zap.String("type", string(cond.Type)),
zap.String("status", string(cond.Status)),
zap.String("reason", cond.Reason),
zap.String("message", cond.Message),
)
if cond.Status != v1.ConditionTrue {
continue
// Apparent post-commit continuation of the WaitForDeploymentCompletes
// arguments: the desired replica count, then a callback passed through
// k8s_client.WithQueryFunc that logs "kubectl describe deployment". A describe
// failure is only logged as a warning; it is not returned.
ts.cfg.EKSConfig.AddOnALB2048.DeploymentReplicas2048,
k8s_client.WithQueryFunc(func() {
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
output, err := exec.New().CommandContext(
ctx,
ts.cfg.EKSConfig.KubectlPath,
"--kubeconfig="+ts.cfg.EKSConfig.KubeConfigPath,
"--namespace="+ts.cfg.EKSConfig.AddOnALB2048.Namespace,
"describe",
"deployment",
alb2048DeploymentName,
).CombinedOutput()
cancel()
if err != nil {
ts.cfg.Logger.Warn("'kubectl describe deployment' failed", zap.Error(err))
}
if cond.Type == appsv1.DeploymentAvailable {
available = true
break
}
}
// Apparent pre-commit readiness rule: a true "Available" condition and at
// least the configured number of available replicas.
if available && dresp.Status.AvailableReplicas >= ts.cfg.EKSConfig.AddOnALB2048.DeploymentReplicas2048 {
ready = true
break
}
}
if !ready {
return errors.New("deployment not ready")
}

ts.cfg.Logger.Info("waited for 2048 Deployment")
return ts.cfg.EKSConfig.Sync()
out := string(output)
fmt.Fprintf(ts.cfg.LogWriter, "\n\n\"kubectl describe deployment\" output:\n%s\n\n", out)
}),
)
// Apparent post-commit tail: release the timeout context and return whatever
// error WaitForDeploymentCompletes produced.
cancel()
return err
}

// https://docs.aws.amazon.com/eks/latest/userguide/alb-ingress.html
Expand Down
25 changes: 24 additions & 1 deletion eks/configmaps/remote/configmaps.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
rbacv1 "k8s.io/api/rbac/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/exec"
)

// Config defines configmaps configuration.
Expand Down Expand Up @@ -137,6 +138,26 @@ func (ts *tester) Create() (err error) {
ts.cfg.EKSConfig.AddOnConfigmapsRemote.Namespace,
configmapsJobName,
ts.cfg.EKSConfig.AddOnConfigmapsRemote.Completes,
// Query callback: run "kubectl describe job" for the configmaps Job and copy
// its output to the log writer. Best effort: a failing describe is logged as
// a warning and never aborts the surrounding wait.
k8s_client.WithQueryFunc(func() {
	descArgs := []string{
		ts.cfg.EKSConfig.KubectlPath,
		"--kubeconfig=" + ts.cfg.EKSConfig.KubeConfigPath,
		// Describe the Job in the configmaps add-on namespace, the same
		// namespace handed to the enclosing call, not the CSRs add-on
		// namespace, which would look for the Job in the wrong place.
		"--namespace=" + ts.cfg.EKSConfig.AddOnConfigmapsRemote.Namespace,
		"describe",
		"job",
		configmapsJobName,
	}
	descCmd := strings.Join(descArgs, " ")
	ts.cfg.Logger.Info("describing job", zap.String("describe-command", descCmd))

	// Bound the kubectl invocation so a hung command cannot stall the wait.
	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
	descOutput, err := exec.New().CommandContext(ctx, descArgs[0], descArgs[1:]...).CombinedOutput()
	cancel()
	if err != nil {
		ts.cfg.Logger.Warn("'kubectl describe job' failed", zap.Error(err))
	}

	// Print whatever output was captured, even when the command failed.
	out := string(descOutput)
	fmt.Fprintf(ts.cfg.LogWriter, "\n\n\n\"%s\" output:\n\n%s\n\n", descCmd, out)
}),
)
cancel()
if err != nil {
Expand Down Expand Up @@ -564,7 +585,9 @@ func (ts *tester) createObject() (batchv1.Job, string, error) {
ServiceAccountName: configmapsServiceAccountName,

// spec.template.spec.restartPolicy: Unsupported value: "Always": supported values: "OnFailure", "Never"
RestartPolicy: v1.RestartPolicyOnFailure,
// ref. https://github.com/kubernetes/kubernetes/issues/54870
RestartPolicy: v1.RestartPolicyNever,

// TODO: set resource limits
Containers: []v1.Container{
{
Expand Down
Loading

0 comments on commit 0d6e2c9

Please sign in to comment.