Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize container launch priority performance #1490

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions apis/apps/pub/launch_priority.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,8 @@ const (
ContainerLaunchPriorityKey = "apps.kruise.io/container-launch-priority"
// ContainerLaunchOrdered is the annotation value that indicates containers in pod should be launched by ordinal.
ContainerLaunchOrdered = "Ordered"

// ContainerLaunchPriorityCompletedKey is the annotation indicates the pod has all its priorities
// patched into its barrier configmap.
ContainerLaunchPriorityCompletedKey = "apps.kruise.io/container-launch-priority-completed"
)
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,16 @@
import (
"context"
"fmt"
"strconv"
"sort"
"time"

"github.com/openkruise/kruise/pkg/util"
utilclient "github.com/openkruise/kruise/pkg/util/client"
utilcontainerlaunchpriority "github.com/openkruise/kruise/pkg/util/containerlaunchpriority"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"k8s.io/kube-openapi/pkg/util/sets"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
Expand All @@ -40,6 +38,11 @@
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

appspub "github.com/openkruise/kruise/apis/apps/pub"
"github.com/openkruise/kruise/pkg/util"
utilclient "github.com/openkruise/kruise/pkg/util/client"
utilcontainerlaunchpriority "github.com/openkruise/kruise/pkg/util/containerlaunchpriority"
)

const (
Expand Down Expand Up @@ -70,15 +73,11 @@
err = c.Watch(&source.Kind{Type: &v1.Pod{}}, &handler.EnqueueRequestForObject{}, predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
pod := e.Object.(*v1.Pod)
_, containersReady := podutil.GetPodCondition(&pod.Status, v1.ContainersReady)
// If in vk scenario, there will be not containerReady condition
return utilcontainerlaunchpriority.ExistsPriorities(pod) && (containersReady == nil || containersReady.Status != v1.ConditionTrue)
return shouldEnqueue(pod, mgr.GetCache())

Check warning on line 76 in pkg/controller/containerlaunchpriority/container_launch_priority_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/containerlaunchpriority/container_launch_priority_controller.go#L76

Added line #L76 was not covered by tests
},
UpdateFunc: func(e event.UpdateEvent) bool {
pod := e.ObjectNew.(*v1.Pod)
_, containersReady := podutil.GetPodCondition(&pod.Status, v1.ContainersReady)
// If in vk scenario, there will be not containerReady condition
return utilcontainerlaunchpriority.ExistsPriorities(pod) && (containersReady == nil || containersReady.Status != v1.ConditionTrue)
return shouldEnqueue(pod, mgr.GetCache())

Check warning on line 80 in pkg/controller/containerlaunchpriority/container_launch_priority_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/containerlaunchpriority/container_launch_priority_controller.go#L80

Added line #L80 was not covered by tests
},
DeleteFunc: func(e event.DeleteEvent) bool {
return false
Expand All @@ -94,6 +93,30 @@
return nil
}

func shouldEnqueue(pod *v1.Pod, r client.Reader) bool {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

plz add ut for shouldEnqueue

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

if pod.Annotations[appspub.ContainerLaunchPriorityCompletedKey] == "true" {
return false
}
if _, containersReady := podutil.GetPodCondition(&pod.Status, v1.ContainersReady); containersReady != nil && containersReady.Status == v1.ConditionTrue {
return false
}

Check warning on line 102 in pkg/controller/containerlaunchpriority/container_launch_priority_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/containerlaunchpriority/container_launch_priority_controller.go#L101-L102

Added lines #L101 - L102 were not covered by tests

nextPriorities := findNextPriorities(pod)
if len(nextPriorities) == 0 {
return false
}

Check warning on line 107 in pkg/controller/containerlaunchpriority/container_launch_priority_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/containerlaunchpriority/container_launch_priority_controller.go#L106-L107

Added lines #L106 - L107 were not covered by tests

var barrier = &v1.ConfigMap{}
var barrierNamespacedName = types.NamespacedName{
Namespace: pod.GetNamespace(),
Name: pod.Name + "-barrier",
}
if err := r.Get(context.TODO(), barrierNamespacedName, barrier); err != nil {
return true
}

Check warning on line 116 in pkg/controller/containerlaunchpriority/container_launch_priority_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/containerlaunchpriority/container_launch_priority_controller.go#L115-L116

Added lines #L115 - L116 were not covered by tests
return !isExistsInBarrier(nextPriorities[len(nextPriorities)-1], barrier)
}

var _ reconcile.Reconciler = &ReconcileContainerLaunchPriority{}

// ReconcileContainerLaunchPriority reconciles a Pod object
Expand Down Expand Up @@ -151,52 +174,73 @@
return reconcile.Result{}, err
}

// set next starting containers
_, containersReady := podutil.GetPodCondition(&pod.Status, v1.ContainersReady)
if containersReady != nil && containersReady.Status != v1.ConditionTrue {
patchKey := r.findNextPatchKey(pod)
if patchKey == nil {
return reconcile.Result{}, nil
}
key := "p_" + strconv.Itoa(*patchKey)
if err = r.patchOnKeyNotExist(barrier, key); err != nil {
return reconcile.Result{}, err
}
// handle the pod and barrier
if err = r.handle(pod, barrier); err != nil {
return reconcile.Result{}, err

Check warning on line 179 in pkg/controller/containerlaunchpriority/container_launch_priority_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/containerlaunchpriority/container_launch_priority_controller.go#L179

Added line #L179 was not covered by tests
}

return reconcile.Result{}, nil
}

func (r *ReconcileContainerLaunchPriority) findNextPatchKey(pod *v1.Pod) *int {
var priority *int
var containerPendingSet = make(map[string]bool)
func (r *ReconcileContainerLaunchPriority) handle(pod *v1.Pod, barrier *v1.ConfigMap) error {
nextPriorities := findNextPriorities(pod)

// If there is no more priorities, or the lowest priority exists in barrier, mask as completed.
if len(nextPriorities) == 0 || isExistsInBarrier(nextPriorities[0], barrier) {
return r.patchCompleted(pod)
}

// Try to add the current priority if not exists.
if !isExistsInBarrier(nextPriorities[len(nextPriorities)-1], barrier) {
if err := r.addPriorityIntoBarrier(barrier, nextPriorities[len(nextPriorities)-1]); err != nil {
return err
}

Check warning on line 197 in pkg/controller/containerlaunchpriority/container_launch_priority_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/containerlaunchpriority/container_launch_priority_controller.go#L196-L197

Added lines #L196 - L197 were not covered by tests
}

// After adding the current priority, if the lowest priority is same to the current one, mark as completed.
if nextPriorities[len(nextPriorities)-1] == nextPriorities[0] {
return r.patchCompleted(pod)
}
return nil
}

func (r *ReconcileContainerLaunchPriority) addPriorityIntoBarrier(barrier *v1.ConfigMap, priority int) error {
klog.V(3).Infof("Adding priority %d into barrier %s/%s", priority, barrier.Namespace, barrier.Name)
body := fmt.Sprintf(`{"data":{"%s":"true"}}`, utilcontainerlaunchpriority.GetKey(priority))
return r.Client.Patch(context.TODO(), barrier, client.RawPatch(types.StrategicMergePatchType, []byte(body)))
}

func (r *ReconcileContainerLaunchPriority) patchCompleted(pod *v1.Pod) error {
klog.V(3).Infof("Marking pod %s/%s as launch priority completed", pod.Namespace, pod.Name)
body := fmt.Sprintf(`{"metadata":{"annotations":{"%s":"true"}}}`, appspub.ContainerLaunchPriorityCompletedKey)
return r.Client.Patch(context.TODO(), pod, client.RawPatch(types.StrategicMergePatchType, []byte(body)))
}

func findNextPriorities(pod *v1.Pod) (priorities []int) {
containerReadySet := sets.NewString()
for _, status := range pod.Status.ContainerStatuses {
if status.Ready {
continue
containerReadySet.Insert(status.Name)
}
containerPendingSet[status.Name] = true
}
for _, c := range pod.Spec.Containers {
if _, ok := containerPendingSet[c.Name]; ok {
p := utilcontainerlaunchpriority.GetContainerPriority(&c)
if p == nil {
continue
}
if priority == nil || *p > *priority {
priority = p
}
if containerReadySet.Has(c.Name) {
continue
}
priority := utilcontainerlaunchpriority.GetContainerPriority(&c)
if priority == nil {
continue
}

priorities = append(priorities, *priority)
}
if len(priorities) > 0 {
sort.Ints(priorities)
}
return priority
return
}

func (r *ReconcileContainerLaunchPriority) patchOnKeyNotExist(barrier *v1.ConfigMap, key string) error {
if _, ok := barrier.Data[key]; !ok {
body := fmt.Sprintf(
`{"data":{"%s":"true"}}`,
key,
)
return r.Client.Patch(context.TODO(), barrier, client.RawPatch(types.StrategicMergePatchType, []byte(body)))
}
return nil
func isExistsInBarrier(priority int, barrier *v1.ConfigMap) bool {
_, exists := barrier.Data[utilcontainerlaunchpriority.GetKey(priority)]
return exists
}
Loading
Loading