Skip to content

Commit

Permalink
fix test
Browse files Browse the repository at this point in the history
Signed-off-by: acejilam <acejilam@gmail.com>
  • Loading branch information
ls-2018 committed Dec 12, 2023
1 parent 12e6c38 commit cfe5886
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 14 deletions.
4 changes: 4 additions & 0 deletions apis/apps/pub/launch_priority.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ const (
// ContainerLaunchPriorityEnvName is the env name that users have to define in pod container
// to identity the launch priority of this container.
ContainerLaunchPriorityEnvName = "KRUISE_CONTAINER_PRIORITY"
// ContainerLaunchTimeOutEnvName is high priority container startup times out.
ContainerLaunchTimeOutEnvName = "KRUISE_CONTAINER_LAUNCH_TIMEOUT"
// ContainerLaunchPriorityUpdateTimeKey a label used to record the update time.
ContainerLaunchPriorityUpdateTimeKey = "apps.kruise.io/container-launch-priority-update-time"
// ContainerLaunchBarrierEnvName is the env name that Kruise webhook will inject into containers
// if the pod have configured launch priority.
ContainerLaunchBarrierEnvName = "KRUISE_CONTAINER_BARRIER"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"strconv"
"time"

appspub "github.com/openkruise/kruise/apis/apps/pub"

"github.com/openkruise/kruise/pkg/util"
utilclient "github.com/openkruise/kruise/pkg/util/client"
utilcontainerlaunchpriority "github.com/openkruise/kruise/pkg/util/containerlaunchpriority"
Expand All @@ -43,7 +45,8 @@ import (
)

const (
concurrentReconciles = 4
concurrentReconciles = 4
defaultContainerLaunchTimeout = 60
)

func Add(mgr manager.Manager) error {
Expand Down Expand Up @@ -132,6 +135,9 @@ func (r *ReconcileContainerLaunchPriority) Reconcile(_ context.Context, request
}
err = r.Get(context.TODO(), barrierNamespacedName, barrier)
if errors.IsNotFound(err) {
barrier.Annotations = map[string]string{
appspub.ContainerLaunchPriorityUpdateTimeKey: time.Now().Format(time.RFC3339),
}

Check warning on line 140 in pkg/controller/containerlaunchpriority/container_launch_priority_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/containerlaunchpriority/container_launch_priority_controller.go#L138-L140

Added lines #L138 - L140 were not covered by tests
barrier.Namespace = pod.GetNamespace()
barrier.Name = pod.Name + "-barrier"
barrier.OwnerReferences = append(barrier.OwnerReferences, metav1.OwnerReference{
Expand All @@ -151,23 +157,42 @@ func (r *ReconcileContainerLaunchPriority) Reconcile(_ context.Context, request
return reconcile.Result{}, err
}

var requeueTime time.Duration
// set next starting containers
_, containersReady := podutil.GetPodCondition(&pod.Status, v1.ContainersReady)
if containersReady != nil && containersReady.Status != v1.ConditionTrue {
patchKey := r.findNextPatchKey(pod)
patchKey, timeout, containers := r.findNextPatchKey(pod)
if patchKey == nil {
return reconcile.Result{}, nil
}
updateTime := time.Now()
if barrier.Annotations != nil {
updateStr := barrier.Annotations[appspub.ContainerLaunchPriorityUpdateTimeKey]
parse, err := time.Parse(time.RFC3339, updateStr)
if err == nil {
updateTime = parse
}
}
for _, container := range containers {
containerStatus := util.GetContainerStatus(container.Name, pod)
if timeout > 0 && time.Duration(timeout)*time.Second < time.Since(updateTime) && (containerStatus == nil || containerStatus.Ready == false) {
r.recorder.Eventf(barrier, v1.EventTypeWarning, "ContainerLaunchTimeout", "Container %s has not launched successfully more than %ss.", container.Name, strconv.Itoa(timeout))
}
}

if time.Duration(timeout)*time.Second-time.Since(updateTime) > 0 {
requeueTime = time.Duration(timeout)*time.Second - time.Since(updateTime)
}
key := "p_" + strconv.Itoa(*patchKey)
if err = r.patchOnKeyNotExist(barrier, key); err != nil {
return reconcile.Result{}, err
}
}

return reconcile.Result{}, nil
return reconcile.Result{RequeueAfter: requeueTime}, nil
}

func (r *ReconcileContainerLaunchPriority) findNextPatchKey(pod *v1.Pod) *int {
func (r *ReconcileContainerLaunchPriority) findNextPatchKey(pod *v1.Pod) (*int, int, []*v1.Container) {
var priority *int
var containerPendingSet = make(map[string]bool)
for _, status := range pod.Status.ContainerStatuses {
Expand All @@ -176,6 +201,9 @@ func (r *ReconcileContainerLaunchPriority) findNextPatchKey(pod *v1.Pod) *int {
}
containerPendingSet[status.Name] = true
}

timeout := 0
var priorityMap = make(map[int][]*v1.Container, len(pod.Spec.Containers))
for _, c := range pod.Spec.Containers {
if _, ok := containerPendingSet[c.Name]; ok {
p := utilcontainerlaunchpriority.GetContainerPriority(&c)
Expand All @@ -184,19 +212,37 @@ func (r *ReconcileContainerLaunchPriority) findNextPatchKey(pod *v1.Pod) *int {
}
if priority == nil || *p > *priority {
priority = p
timeout = getTimeout(c)
priorityMap[timeout] = append(priorityMap[timeout], &c)
}
}
}
return priority
return priority, timeout, priorityMap[timeout]
}

func (r *ReconcileContainerLaunchPriority) patchOnKeyNotExist(barrier *v1.ConfigMap, key string) error {
if _, ok := barrier.Data[key]; !ok {
body := fmt.Sprintf(
`{"data":{"%s":"true"}}`,
key,
`{"data":{"%s":"true"},"metadata":{"annotations":{"%s":"%s"}}}`,
key, appspub.ContainerLaunchPriorityUpdateTimeKey, time.Now().Format(time.RFC3339),
)
return r.Client.Patch(context.TODO(), barrier, client.RawPatch(types.StrategicMergePatchType, []byte(body)))
}
return nil
}

func parseContainerLaunchTimeOut(v string) int {
p, _ := strconv.Atoi(v)
if p < 0 {
return defaultContainerLaunchTimeout
}

Check warning on line 238 in pkg/controller/containerlaunchpriority/container_launch_priority_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/containerlaunchpriority/container_launch_priority_controller.go#L237-L238

Added lines #L237 - L238 were not covered by tests
return p
}
func getTimeout(c v1.Container) int {
for _, e := range c.Env {
if e.Name == appspub.ContainerLaunchTimeOutEnvName {
return parseContainerLaunchTimeOut(e.Value)
}
}
return 0
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ package containerlauchpriority
import (
"context"
"testing"
"time"

appspub "github.com/openkruise/kruise/apis/apps/pub"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
Expand Down Expand Up @@ -81,6 +83,9 @@ func TestReconcile(t *testing.T) {
Key: "p_1000",
},
},
}, {
Name: appspub.ContainerLaunchTimeOutEnvName,
Value: "3",
}},
}},
},
Expand All @@ -103,15 +108,27 @@ func TestReconcile(t *testing.T) {
}

barrier0 := &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{Namespace: metav1.NamespaceDefault, Name: "pod0-barrier"},
ObjectMeta: metav1.ObjectMeta{
Namespace: metav1.NamespaceDefault,
Name: "pod0-barrier",
Annotations: map[string]string{
appspub.ContainerLaunchPriorityUpdateTimeKey: time.Now().Format(time.RFC3339),
},
},
}
barrier1 := &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{Namespace: metav1.NamespaceDefault, Name: "pod1-barrier"},
ObjectMeta: metav1.ObjectMeta{
Namespace: metav1.NamespaceDefault,
Name: "pod1-barrier",
Annotations: map[string]string{
appspub.ContainerLaunchPriorityUpdateTimeKey: time.Now().Format(time.RFC3339),
},
},
}

fakeClient := fake.NewFakeClientWithScheme(clientgoscheme.Scheme, pod0, pod1, barrier0, barrier1)
reconciler := &ReconcileContainerLaunchPriority{Client: fakeClient}

recorder := record.NewFakeRecorder(100)
reconciler := &ReconcileContainerLaunchPriority{Client: fakeClient, recorder: recorder}
_, err := reconciler.Reconcile(context.TODO(), reconcile.Request{NamespacedName: types.NamespacedName{Namespace: pod0.Namespace, Name: pod0.Name}})
if err != nil {
t.Fatal(err)
Expand All @@ -135,9 +152,52 @@ func TestReconcile(t *testing.T) {
if err := fakeClient.Get(context.TODO(), types.NamespacedName{Namespace: barrier1.Namespace, Name: barrier1.Name}, newBarrier1); err != nil {
t.Fatal(err)
}
if v, ok := newBarrier1.Data["p_1000"]; !ok {
if _, ok := newBarrier1.Data["p_1000"]; !ok {
t.Fatalf("expect barrier1 env set, but not")
} else if v != "true" {
t.Fatalf("expect barrier1 p_1000 to be true, but get %s", v)
}

if _, ok := newBarrier1.Data["p_100"]; ok {
t.Fatalf("expect barrier1 p_100 not to be set, but get ")
}
time.Sleep(time.Second * 4)
_, err = reconciler.Reconcile(context.TODO(), reconcile.Request{NamespacedName: types.NamespacedName{Namespace: pod1.Namespace, Name: pod1.Name}})
if err != nil {
t.Fatal(err)
}
events := collectEvents(recorder.Events)
if eventCount := len(events); eventCount != 1 {
t.Fatal("expect a event")
}
pod1.Status.ContainerStatuses[0].Ready = true
err = fakeClient.Update(context.Background(), pod1)
if err != nil {
t.Fatal(err)
}
_, err = reconciler.Reconcile(context.TODO(), reconcile.Request{NamespacedName: types.NamespacedName{Namespace: pod1.Namespace, Name: pod1.Name}})
if err != nil {
t.Fatal(err)
}

newBarrier1 = &v1.ConfigMap{}
if err := fakeClient.Get(context.TODO(), types.NamespacedName{Namespace: barrier1.Namespace, Name: barrier1.Name}, newBarrier1); err != nil {
t.Fatal(err)
}

if _, ok := newBarrier1.Data["p_100"]; !ok {
t.Fatalf("expect barrier1 p_100 to be set, but not ")
}

}
func collectEvents(source <-chan string) []string {
done := false
events := make([]string, 0)
for !done {
select {
case event := <-source:
events = append(events, event)
default:
done = true
}
}
return events
}

0 comments on commit cfe5886

Please sign in to comment.