Skip to content

Commit

Permalink
koordlet: add allpods reconciler func (#2112)
Browse files Browse the repository at this point in the history
Signed-off-by: Zhang Kang <kang.zhang@intel.com>
  • Loading branch information
kangclzjc committed Jul 4, 2024
1 parent 091f73e commit 0db9ea2
Show file tree
Hide file tree
Showing 2 changed files with 213 additions and 21 deletions.
89 changes: 89 additions & 0 deletions pkg/koordlet/runtimehooks/reconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const (
PodLevel ReconcilerLevel = "pod"
ContainerLevel ReconcilerLevel = "container"
SandboxLevel ReconcilerLevel = "sandbox"
AllPodsLevel ReconcilerLevel = "allpods"
)

var globalCgroupReconcilers = struct {
Expand All @@ -49,12 +50,14 @@ var globalCgroupReconcilers = struct {
containerLevel map[string]*cgroupReconciler

sandboxContainerLevel map[string]*cgroupReconciler
allPodsLevel map[string]*cgroupReconciler
}{
kubeQOSLevel: map[string]*cgroupReconciler{},
podLevel: map[string]*cgroupReconciler{},
containerLevel: map[string]*cgroupReconciler{},

sandboxContainerLevel: map[string]*cgroupReconciler{},
allPodsLevel: map[string]*cgroupReconciler{},
}

type cgroupReconciler struct {
Expand All @@ -63,6 +66,7 @@ type cgroupReconciler struct {
level ReconcilerLevel
filter Filter
fn map[string]reconcileFunc
fn4AllPods map[string]reconcileFunc4AllPods
}

// Filter & Conditions:
Expand Down Expand Up @@ -134,6 +138,61 @@ func PodQOSFilter() Filter {
}

type reconcileFunc func(protocol.HooksProtocol) error
type reconcileFunc4AllPods func([]protocol.HooksProtocol) error

func RegisterCgroupReconciler4AllPods(level ReconcilerLevel, cgroupFile system.Resource, description string,
fn reconcileFunc4AllPods, filter Filter, conditions ...string) {
if len(conditions) <= 0 { // default condition
conditions = []string{NoneFilterCondition}
}

for _, r := range globalCgroupReconcilers.all {
if level != r.level || cgroupFile.ResourceType() != r.cgroupFile.ResourceType() {
continue
}

// if reconciler exist
if r.filter.Name() != filter.Name() {
klog.Fatalf("%v of level %v is already registered with filter %v by %v, cannot change to %v by %v",
cgroupFile.ResourceType(), level, r.filter.Name(), r.description, filter.Name(), description)
}

for _, condition := range conditions {
if _, ok := r.fn[condition]; ok {
klog.Fatalf("%v of level %v is already registered with condition %v by %v, cannot change by %v",
cgroupFile.ResourceType(), level, condition, r.description, description)
}

r.fn4AllPods[condition] = fn
}
klog.V(1).Infof("register reconcile function %v finished, info: level=%v, resourceType=%v, add conditions=%v",
description, level, cgroupFile.ResourceType(), conditions)
return
}

// if reconciler not exist
r := &cgroupReconciler{
cgroupFile: cgroupFile,
description: description,
level: level,
fn: map[string]reconcileFunc{},
fn4AllPods: map[string]reconcileFunc4AllPods{},
}

globalCgroupReconcilers.all = append(globalCgroupReconcilers.all, r)
switch level {
case AllPodsLevel:
r.filter = filter
for _, condition := range conditions {
r.fn4AllPods[condition] = fn
}
globalCgroupReconcilers.allPodsLevel[string(r.cgroupFile.ResourceType())] = r
default:
klog.Fatalf("cgroup level %v is not supported", level)
}
klog.V(1).Infof("register reconcile function %v finished, info: level=%v, resourceType=%v, filter=%v, conditions=%v",
description, level, cgroupFile.ResourceType(), filter.Name(), conditions)
}

// RegisterCgroupReconciler registers a cgroup reconciler according to the cgroup file, reconcile function and filter
// conditions. A cgroup file of one level can have multiple reconcile functions with different filtered conditions.
Expand Down Expand Up @@ -178,6 +237,7 @@ func RegisterCgroupReconciler(level ReconcilerLevel, cgroupFile system.Resource,
description: description,
level: level,
fn: map[string]reconcileFunc{},
fn4AllPods: map[string]reconcileFunc4AllPods{},
}

globalCgroupReconcilers.all = append(globalCgroupReconcilers.all, r)
Expand Down Expand Up @@ -385,6 +445,35 @@ func (c *reconciler) reconcilePodCgroup(stopCh <-chan struct{}) {
}
}
}

for _, r := range globalCgroupReconcilers.allPodsLevel {
currentPods := make([]protocol.HooksProtocol, 0)
fns := make(map[string]reconcileFunc4AllPods)
for _, podMeta := range podsMeta {
key := r.filter.Filter(podMeta)
if fn, ok := r.fn4AllPods[key]; ok {
podCtx := protocol.HooksProtocolBuilder.Pod(podMeta)
currentPods = append(currentPods, podCtx)
fns[key] = fn
}
}

if len(currentPods) > 0 {
if len(fns) == 0 {
klog.V(5).Infof("calling reconcile function %v aborted, condition %s not registered",
r.description, r.filter.Name())
continue
}

for k, fn := range fns {
if err := fn(currentPods); err != nil {
klog.Warningf("calling reconcile function %v for pod %v failed, error %v, condition %s",
r.description, err, k)
}
}
}
}

case <-stopCh:
klog.V(1).Infof("stop reconcile pod cgroup")
return
Expand Down
145 changes: 124 additions & 21 deletions pkg/koordlet/runtimehooks/reconciler/reconciler_test.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,36 @@ import (
"github.com/koordinator-sh/koordinator/pkg/koordlet/util/system"
)

type none1Filter struct{}

const (
None1FilterCondition = "none1"
None2FilterCondition = "none2"
None1FilterName = "none"
)

func (d *none1Filter) Name() string {
return None1FilterName
}

func (d *none1Filter) Filter(podMeta *statesinformer.PodMeta) string {
if podMeta.Pod.Name == "test1-pod-name" {
return None1FilterCondition
} else if podMeta.Pod.Name == "test2-pod-name" {
return None2FilterCondition
}
return podMeta.Pod.Name
}

var singletonNone1Filter *none1Filter

func None1Filter() Filter {
if singletonNone1Filter == nil {
singletonNone1Filter = &none1Filter{}
}
return singletonNone1Filter
}

func Test_doKubeQOSCgroup(t *testing.T) {
type args struct {
resource system.Resource
Expand Down Expand Up @@ -107,6 +137,7 @@ func Test_reconciler_reconcilePodCgroup(t *testing.T) {
}
podLevelOutput := map[string]string{}
containerLevelOutput := map[string]string{}
allPodsLevelOutput := map[string]string{}

podReconcilerFn := func(proto protocol.HooksProtocol) error {
podCtx := proto.(*protocol.PodContext)
Expand All @@ -123,15 +154,28 @@ func Test_reconciler_reconcilePodCgroup(t *testing.T) {
tryStopFn()
return nil
}
allpodReconcilerFn := func(protos []protocol.HooksProtocol) error {
for _, proto := range protos {
podCtx := proto.(*protocol.PodContext)
podKey := genPodKey(podCtx.Request.PodMeta.Namespace, podCtx.Request.PodMeta.Name)
allPodsLevelOutput[podKey] = podCtx.Request.PodMeta.UID
}
tryStopFn()
return nil
}

RegisterCgroupReconciler(PodLevel, system.CPUBVTWarpNs, "get pod uid", podReconcilerFn, NoneFilter())
RegisterCgroupReconciler(ContainerLevel, system.CPUBVTWarpNs, "get container uid", containerReconcilerFn, NoneFilter())
RegisterCgroupReconciler4AllPods(AllPodsLevel, system.CPUBVTWarpNs, "get all pods uid", allpodReconcilerFn, None1Filter(), None1FilterCondition)
RegisterCgroupReconciler4AllPods(AllPodsLevel, system.CPUBVTWarpNs, "get all pods uid", allpodReconcilerFn, None1Filter(), None2FilterCondition)

type fields struct {
podsMeta *statesinformer.PodMeta
podsMeta []*statesinformer.PodMeta
}
type wants struct {
wantPods map[string]string
wantContainers map[string]string
wantPods map[string]string
wantContainers map[string]string
wantPods4AllPods map[string]string
}

test := struct {
Expand All @@ -142,25 +186,75 @@ func Test_reconciler_reconcilePodCgroup(t *testing.T) {

name: "reconcile pod cgroup to get uid",
fields: fields{
podsMeta: &statesinformer.PodMeta{
Pod: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "test-ns",
Name: "test-pod-name",
UID: "test-pod-uid",
podsMeta: []*statesinformer.PodMeta{
{
Pod: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "test-ns",
Name: "test1-pod-name",
UID: "test1-pod-uid",
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "test1-container-name",
},
},
},
Status: corev1.PodStatus{
ContainerStatuses: []corev1.ContainerStatus{
{
Name: "test1-container-name",
ContainerID: "test1-container-id",
},
},
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "test-container-name",
},
{
Pod: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "test-ns",
Name: "test2-pod-name",
UID: "test2-pod-uid",
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "test2-container-name",
},
},
},
Status: corev1.PodStatus{
ContainerStatuses: []corev1.ContainerStatus{
{
Name: "test2-container-name",
ContainerID: "test2-container-id",
},
},
},
},
Status: corev1.PodStatus{
ContainerStatuses: []corev1.ContainerStatus{
{
Name: "test-container-name",
ContainerID: "test-container-id",
},
{
Pod: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "test-ns",
Name: "test3-pod-name",
UID: "test3-pod-uid",
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "test3-container-name",
},
},
},
Status: corev1.PodStatus{
ContainerStatuses: []corev1.ContainerStatus{
{
Name: "test3-container-name",
ContainerID: "test3-container-id",
},
},
},
},
Expand All @@ -169,16 +263,24 @@ func Test_reconciler_reconcilePodCgroup(t *testing.T) {
},
wants: wants{
wantPods: map[string]string{
genPodKey("test-ns", "test-pod-name"): "test-pod-uid",
genPodKey("test-ns", "test1-pod-name"): "test1-pod-uid",
genPodKey("test-ns", "test2-pod-name"): "test2-pod-uid",
genPodKey("test-ns", "test3-pod-name"): "test3-pod-uid",
},
wantContainers: map[string]string{
genContainerKey("test-ns", "test-pod-name", "test-container-name"): "test-container-id",
genContainerKey("test-ns", "test1-pod-name", "test1-container-name"): "test1-container-id",
genContainerKey("test-ns", "test2-pod-name", "test2-container-name"): "test2-container-id",
genContainerKey("test-ns", "test3-pod-name", "test3-container-name"): "test3-container-id",
},
wantPods4AllPods: map[string]string{
genPodKey("test-ns", "test1-pod-name"): "test1-pod-uid",
genPodKey("test-ns", "test2-pod-name"): "test2-pod-uid",
},
},
}
t.Run(test.name, func(t *testing.T) {
c := &reconciler{
podsMeta: []*statesinformer.PodMeta{test.fields.podsMeta},
podsMeta: test.fields.podsMeta,
podUpdated: make(chan struct{}, 1),
executor: resourceexecutor.NewTestResourceExecutor(),
}
Expand All @@ -189,6 +291,7 @@ func Test_reconciler_reconcilePodCgroup(t *testing.T) {
c.reconcilePodCgroup(stopCh)
assert.Equal(t, test.wants.wantPods, podLevelOutput, "pod reconciler should be equal")
assert.Equal(t, test.wants.wantContainers, containerLevelOutput, "container reconciler should be equal")
assert.Equal(t, test.wants.wantPods4AllPods, allPodsLevelOutput, "all pods reconciler should be equal")
})
}

Expand Down

0 comments on commit 0db9ea2

Please sign in to comment.