Skip to content

Commit

Permalink
pkg/k8s-client: add more wait functions
Browse files Browse the repository at this point in the history
Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
  • Loading branch information
gyuho committed Jun 25, 2020
1 parent a8d34ce commit 98ecccc
Showing 1 changed file with 183 additions and 8 deletions.
191 changes: 183 additions & 8 deletions pkg/k8s-client/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,26 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

/*
https://github.com/kubernetes/perf-tests/blob/master/clusterloader2/pkg/framework/client/objects.go
https://github.com/kubernetes/kubernetes/blob/master/cmd/kubeadm/app/util/apiclient/wait.go#L49
*/

package k8sclient

import (
"context"
"errors"
"fmt"
"net"
"strings"
"time"

"go.uber.org/zap"
batchv1 "k8s.io/api/batch/v1"
batchv1beta1 "k8s.io/api/batch/v1beta1"
apiv1 "k8s.io/api/core/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand All @@ -35,11 +43,6 @@ import (
clientset "k8s.io/client-go/kubernetes"
)

/*
https://github.com/kubernetes/perf-tests/blob/master/clusterloader2/pkg/framework/client/objects.go
https://github.com/kubernetes/kubernetes/blob/master/cmd/kubeadm/app/util/apiclient/wait.go#L49
*/

const (
// Parameters for retrying with exponential backoff.
retryBackoffInitialDuration = 100 * time.Millisecond
Expand Down Expand Up @@ -205,7 +208,7 @@ func CreateNamespace(lg *zap.Logger, c clientset.Interface, namespace string) er
lg.Info("created namespace", zap.String("namespace", namespace))
return nil
}
if kerrors.IsAlreadyExists(err) {
if apierrs.IsAlreadyExists(err) {
lg.Info("namespace already exists", zap.String("namespace", namespace), zap.Error(err))
return nil
}
Expand Down Expand Up @@ -249,7 +252,7 @@ func deleteNamespace(lg *zap.Logger, c clientset.Interface, namespace string) er
lg.Info("deleted namespace", zap.String("namespace", namespace))
return nil
}
if kerrors.IsNotFound(err) || kerrors.IsGone(err) {
if apierrs.IsNotFound(err) || apierrs.IsGone(err) {
lg.Info("namespace already deleted", zap.String("namespace", namespace), zap.Error(err))
return nil
}
Expand All @@ -271,6 +274,7 @@ func waitForDeleteNamespace(lg *zap.Logger, c clientset.Interface, namespace str
if timeout == 0 {
timeout = DefaultNamespaceDeletionTimeout
}

retryWaitFunc := func() (done bool, err error) {
lg.Info("waiting for namespace deletion", zap.String("namespace", namespace))
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
Expand Down Expand Up @@ -500,6 +504,177 @@ func CreateObject(dynamicClient dynamic.Interface, namespace string, name string
return RetryWithExponentialBackOff(RetryFunction(createFunc, options...))
}

// WaitForJobCompletes waits for all Job completion,
// by counting the number of pods in the namespace.
func WaitForJobCompletes(
lg *zap.Logger,
stopc chan struct{},
k8sClient EKS,
initialWait time.Duration,
interval time.Duration,
timeout time.Duration,
namespace string,
jobName string,
target int,
opts ...OpOption) (job *batchv1.Job, pods []apiv1.Pod, err error) {
job, _, pods, err = waitForJobCompletes(false, lg, stopc, k8sClient, initialWait, interval, timeout, namespace, jobName, target, opts...)
return job, pods, err
}

// WaitForCronJobCompletes waits for all CronJob completion,
// by counting the number of pods in the namespace.
func WaitForCronJobCompletes(
lg *zap.Logger,
stopc chan struct{},
k8sClient EKS,
initialWait time.Duration,
interval time.Duration,
timeout time.Duration,
namespace string,
jobName string,
target int,
opts ...OpOption) (cronJob *batchv1beta1.CronJob, pods []apiv1.Pod, err error) {
_, cronJob, pods, err = waitForJobCompletes(true, lg, stopc, k8sClient, initialWait, interval, timeout, namespace, jobName, target, opts...)
return cronJob, pods, err
}

func waitForJobCompletes(
isCronJob bool,
lg *zap.Logger,
stopc chan struct{},
k8sClient EKS,
initialWait time.Duration,
interval time.Duration,
timeout time.Duration,
namespace string,
jobName string,
target int,
opts ...OpOption) (job *batchv1.Job, cronJob *batchv1beta1.CronJob, pods []apiv1.Pod, err error) {
ret := Op{}
ret.applyOpts(opts)

if interval == 0 {
interval = DefaultNamespaceDeletionInterval
}
if timeout == 0 {
timeout = DefaultNamespaceDeletionTimeout
}

lg.Info("waiting Job completes",
zap.String("namespace", namespace),
zap.String("job-name", jobName),
zap.Bool("cron-job", isCronJob),
zap.String("interval", interval.String()),
zap.String("timeout", timeout.String()),
zap.Int("target", target),
)
select {
case <-stopc:
return nil, nil, nil, errors.New("initial wait aborted")
case <-time.After(initialWait):
}

retryWaitFunc := func() (done bool, err error) {
select {
case <-stopc:
return true, errors.New("wait aborted")
default:
}

lg.Info("listing pods to check Job completion")
pods, err = k8sClient.ListPods(namespace, 150, 5*time.Second)
if err != nil {
lg.Warn("failed to list Pod", zap.Bool("retriable-error", IsRetryableAPIError(err)), zap.Error(err))
return false, err
}
if len(pods) == 0 {
lg.Warn("got an empty list of Pod")
return false, nil
}
podSucceededCnt := 0
for _, item := range pods {
jv, ok := item.Labels["job-name"]
match := ok && jv == jobName
if !match {
match = strings.HasPrefix(item.Name, jobName)
}
if !match {
continue
}
if item.Status.Phase != apiv1.PodSucceeded {
continue
}
podSucceededCnt++
}
if podSucceededCnt < target {
lg.Warn("polled not succeeded yet",
zap.String("namespace", namespace),
zap.String("job-name", jobName),
zap.Int("pod-succeeded-count", podSucceededCnt),
zap.Int("target", target),
)
return false, nil
}
lg.Info("polled pods",
zap.String("namespace", namespace),
zap.String("job-name", jobName),
zap.Int("pod-succeeded-count", podSucceededCnt),
zap.Int("target", target),
)

switch isCronJob {
case false:
lg.Info("checking Job object", zap.String("namespace", namespace))
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
job, err = k8sClient.KubernetesClientSet().
BatchV1().
Jobs(namespace).
Get(ctx, jobName, metav1.GetOptions{})
cancel()
if err != nil {
lg.Warn("failed to check Job", zap.Bool("retriable-error", IsRetryableAPIError(err)), zap.Error(err))
return false, err
}
for _, cond := range job.Status.Conditions {
if cond.Status != apiv1.ConditionTrue {
continue
}
if cond.Type == batchv1.JobFailed {
lg.Warn("job failed", zap.String("condition-type", fmt.Sprintf("%s", cond.Type)))
return true, fmt.Errorf("Job %q status %q", jobName, cond.Type)
}
if cond.Type == batchv1.JobComplete {
lg.Info("job complete", zap.String("condition-type", fmt.Sprintf("%s", cond.Type)))
return true, nil
}
lg.Warn("job not complete", zap.String("condition-type", fmt.Sprintf("%s", cond.Type)))
}

case true:
lg.Info("checking CronJob object", zap.String("namespace", namespace))
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
cronJob, err = k8sClient.KubernetesClientSet().
BatchV1beta1().
CronJobs(namespace).
Get(ctx, jobName, metav1.GetOptions{})
cancel()
if err != nil {
lg.Warn("failed to check CronJob", zap.Bool("retriable-error", IsRetryableAPIError(err)), zap.Error(err))
return false, err
}
lg.Info("checked CronJob object", zap.Int("active-jobs", len(cronJob.Status.Active)))
return true, nil
}

if ret.queryFunc != nil {
ret.queryFunc()
}
return false, nil
}
err = wait.PollImmediate(interval, timeout, retryWaitFunc)
return job, cronJob, pods, err
}

// Op represents a SSH operation.
type Op struct {
queryFunc func()
Expand Down

0 comments on commit 98ecccc

Please sign in to comment.