Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

koordlet: add allpods reconciler func #2112

Merged
merged 2 commits into from
Jul 4, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions pkg/koordlet/runtimehooks/reconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const (
PodLevel ReconcilerLevel = "pod"
ContainerLevel ReconcilerLevel = "container"
SandboxLevel ReconcilerLevel = "sandbox"
AllPodsLevel ReconcilerLevel = "allpods"
)

var globalCgroupReconcilers = struct {
Expand All @@ -49,12 +50,14 @@ var globalCgroupReconcilers = struct {
containerLevel map[string]*cgroupReconciler

sandboxContainerLevel map[string]*cgroupReconciler
allPodsLevel map[string]*cgroupReconciler
}{
kubeQOSLevel: map[string]*cgroupReconciler{},
podLevel: map[string]*cgroupReconciler{},
containerLevel: map[string]*cgroupReconciler{},

sandboxContainerLevel: map[string]*cgroupReconciler{},
allPodsLevel: map[string]*cgroupReconciler{},
}

type cgroupReconciler struct {
Expand All @@ -63,6 +66,7 @@ type cgroupReconciler struct {
level ReconcilerLevel
filter Filter
fn map[string]reconcileFunc
fn4AllPods map[string]reconcileFunc4AllPods
}

// Filter & Conditions:
Expand Down Expand Up @@ -134,6 +138,61 @@ func PodQOSFilter() Filter {
}

type reconcileFunc func(protocol.HooksProtocol) error
type reconcileFunc4AllPods func([]protocol.HooksProtocol) error

func RegisterCgroupReconciler4AllPods(level ReconcilerLevel, cgroupFile system.Resource, description string,
fn reconcileFunc4AllPods, filter Filter, conditions ...string) {
if len(conditions) <= 0 { // default condition
conditions = []string{NoneFilterCondition}
}

for _, r := range globalCgroupReconcilers.all {
if level != r.level || cgroupFile.ResourceType() != r.cgroupFile.ResourceType() {
continue
}

// if reconciler exist
if r.filter.Name() != filter.Name() {
klog.Fatalf("%v of level %v is already registered with filter %v by %v, cannot change to %v by %v",
cgroupFile.ResourceType(), level, r.filter.Name(), r.description, filter.Name(), description)
}

for _, condition := range conditions {
if _, ok := r.fn[condition]; ok {
klog.Fatalf("%v of level %v is already registered with condition %v by %v, cannot change by %v",
cgroupFile.ResourceType(), level, condition, r.description, description)
}

r.fn4AllPods[condition] = fn
}
klog.V(1).Infof("register reconcile function %v finished, info: level=%v, resourceType=%v, add conditions=%v",
description, level, cgroupFile.ResourceType(), conditions)
return
}

// if reconciler not exist
r := &cgroupReconciler{
cgroupFile: cgroupFile,
description: description,
level: level,
fn: map[string]reconcileFunc{},
fn4AllPods: map[string]reconcileFunc4AllPods{},
}

globalCgroupReconcilers.all = append(globalCgroupReconcilers.all, r)
switch level {
case AllPodsLevel:
r.filter = filter
for _, condition := range conditions {
r.fn4AllPods[condition] = fn
}
globalCgroupReconcilers.allPodsLevel[string(r.cgroupFile.ResourceType())] = r
default:
klog.Fatalf("cgroup level %v is not supported", level)
}
klog.V(1).Infof("register reconcile function %v finished, info: level=%v, resourceType=%v, filter=%v, conditions=%v",
description, level, cgroupFile.ResourceType(), filter.Name(), conditions)
}

// RegisterCgroupReconciler registers a cgroup reconciler according to the cgroup file, reconcile function and filter
// conditions. A cgroup file of one level can have multiple reconcile functions with different filtered conditions.
Expand Down Expand Up @@ -178,6 +237,7 @@ func RegisterCgroupReconciler(level ReconcilerLevel, cgroupFile system.Resource,
description: description,
level: level,
fn: map[string]reconcileFunc{},
fn4AllPods: map[string]reconcileFunc4AllPods{},
}

globalCgroupReconcilers.all = append(globalCgroupReconcilers.all, r)
Expand Down Expand Up @@ -385,6 +445,34 @@ func (c *reconciler) reconcilePodCgroup(stopCh <-chan struct{}) {
}
}
}

for _, r := range globalCgroupReconcilers.allPodsLevel {
var reconcileFn reconcileFunc4AllPods
currentPods := make([]protocol.HooksProtocol, 0)
for _, podMeta := range podsMeta {
if fn, ok := r.fn4AllPods[r.filter.Filter(podMeta)]; ok {
podCtx := protocol.HooksProtocolBuilder.Pod(podMeta)
currentPods = append(currentPods, podCtx)
if reconcileFn == nil {
reconcileFn = fn
saintube marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

if len(currentPods) > 0 {
if reconcileFn == nil {
klog.V(5).Infof("calling reconcile function %v aborted, condition %s not registered",
r.description, r.filter.Name())
continue
}

if err := reconcileFn(currentPods); err != nil {
klog.Warningf("calling reconcile function %v for pod %v failed, error %v",
r.description, err)
}
}

}
case <-stopCh:
klog.V(1).Infof("stop reconcile pod cgroup")
return
Expand Down
81 changes: 61 additions & 20 deletions pkg/koordlet/runtimehooks/reconciler/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func Test_reconciler_reconcilePodCgroup(t *testing.T) {
}
podLevelOutput := map[string]string{}
containerLevelOutput := map[string]string{}
allPodsLevelOutput := map[string]string{}

podReconcilerFn := func(proto protocol.HooksProtocol) error {
podCtx := proto.(*protocol.PodContext)
Expand All @@ -123,11 +124,22 @@ func Test_reconciler_reconcilePodCgroup(t *testing.T) {
tryStopFn()
return nil
}
allpodReconcilerFn := func(protos []protocol.HooksProtocol) error {
for _, proto := range protos {
podCtx := proto.(*protocol.PodContext)
podKey := genPodKey(podCtx.Request.PodMeta.Namespace, podCtx.Request.PodMeta.Name)
allPodsLevelOutput[podKey] = podCtx.Request.PodMeta.UID
}
tryStopFn()
return nil
}

RegisterCgroupReconciler(PodLevel, system.CPUBVTWarpNs, "get pod uid", podReconcilerFn, NoneFilter())
RegisterCgroupReconciler(ContainerLevel, system.CPUBVTWarpNs, "get container uid", containerReconcilerFn, NoneFilter())
RegisterCgroupReconciler4AllPods(AllPodsLevel, system.CPUBVTWarpNs, "get all pods uid", allpodReconcilerFn, NoneFilter())

type fields struct {
podsMeta *statesinformer.PodMeta
podsMeta []*statesinformer.PodMeta
}
type wants struct {
wantPods map[string]string
Expand All @@ -142,25 +154,51 @@ func Test_reconciler_reconcilePodCgroup(t *testing.T) {

name: "reconcile pod cgroup to get uid",
fields: fields{
podsMeta: &statesinformer.PodMeta{
Pod: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "test-ns",
Name: "test-pod-name",
UID: "test-pod-uid",
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "test-container-name",
podsMeta: []*statesinformer.PodMeta{
{
Pod: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "test-ns",
Name: "test-pod-name",
UID: "test-pod-uid",
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "test-container-name",
},
},
},
Status: corev1.PodStatus{
ContainerStatuses: []corev1.ContainerStatus{
{
Name: "test-container-name",
ContainerID: "test-container-id",
},
},
},
},
Status: corev1.PodStatus{
ContainerStatuses: []corev1.ContainerStatus{
{
Name: "test-container-name",
ContainerID: "test-container-id",
},
{
Pod: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "test-ns",
Name: "test-pod-name-1",
UID: "test-pod-uid-1",
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "test-container-name-1",
},
},
},
Status: corev1.PodStatus{
ContainerStatuses: []corev1.ContainerStatus{
{
Name: "test-container-name-1",
ContainerID: "test-container-id-1",
},
},
},
},
Expand All @@ -169,16 +207,18 @@ func Test_reconciler_reconcilePodCgroup(t *testing.T) {
},
wants: wants{
wantPods: map[string]string{
genPodKey("test-ns", "test-pod-name"): "test-pod-uid",
genPodKey("test-ns", "test-pod-name"): "test-pod-uid",
genPodKey("test-ns", "test-pod-name-1"): "test-pod-uid-1",
},
wantContainers: map[string]string{
genContainerKey("test-ns", "test-pod-name", "test-container-name"): "test-container-id",
genContainerKey("test-ns", "test-pod-name", "test-container-name"): "test-container-id",
genContainerKey("test-ns", "test-pod-name-1", "test-container-name-1"): "test-container-id-1",
},
},
}
t.Run(test.name, func(t *testing.T) {
c := &reconciler{
podsMeta: []*statesinformer.PodMeta{test.fields.podsMeta},
podsMeta: test.fields.podsMeta,
podUpdated: make(chan struct{}, 1),
executor: resourceexecutor.NewTestResourceExecutor(),
}
Expand All @@ -189,6 +229,7 @@ func Test_reconciler_reconcilePodCgroup(t *testing.T) {
c.reconcilePodCgroup(stopCh)
assert.Equal(t, test.wants.wantPods, podLevelOutput, "pod reconciler should be equal")
assert.Equal(t, test.wants.wantContainers, containerLevelOutput, "container reconciler should be equal")
assert.Equal(t, test.wants.wantPods, allPodsLevelOutput, "all pods reconciler should be equal")
})
}

Expand Down