Skip to content

Commit

Permalink
workloadspread support crd
Browse files Browse the repository at this point in the history
Signed-off-by: mingzhou.swx <mingzhou.swx@alibaba-inc.com>
  • Loading branch information
mingzhou.swx committed May 16, 2023
1 parent ccd94b2 commit 9f1b7d6
Show file tree
Hide file tree
Showing 15 changed files with 535 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,14 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
return err
}

whiteList, err := configuration.GetPPSWatchWatchCustomWorkloadWhiteList(mgr.GetClient())
whiteList, err := configuration.GetPPSWatchCustomWorkloadWhiteList(mgr.GetClient())
if err != nil {
return err
}
if whiteList != nil {
workloadHandler := &enqueueRequestForStatefulSetLike{reader: mgr.GetClient()}
for _, workload := range whiteList.Workloads {
if _, err := ctrlUtil.AddWatcherDynamically(c, workloadHandler, workload); err != nil {
if _, err := ctrlUtil.AddWatcherDynamically(c, workloadHandler, workload, "PPS"); err != nil {
return err
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (p *enqueueRequestForPod) updatePod(q workqueue.RateLimitingInterface, old,

func (p *enqueueRequestForPod) fetchPersistentPodState(pod *corev1.Pod) *appsv1alpha1.PersistentPodState {
ref := metav1.GetControllerOf(pod)
whiteList, err := configuration.GetPPSWatchWatchCustomWorkloadWhiteList(p.client)
whiteList, err := configuration.GetPPSWatchCustomWorkloadWhiteList(p.client)
if err != nil {
klog.Errorf("Failed to get persistent pod state config white list, error: %v\n", err.Error())
return nil
Expand Down
10 changes: 6 additions & 4 deletions pkg/controller/util/watch.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package util

import (
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -51,18 +52,19 @@ func DiscoverGVK(gvk schema.GroupVersionKind) bool {
return true
}

func AddWatcherDynamically(c controller.Controller, h handler.EventHandler, gvk schema.GroupVersionKind) (bool, error) {
if _, ok := watcherMap.Load(gvk); ok {
func AddWatcherDynamically(c controller.Controller, h handler.EventHandler, gvk schema.GroupVersionKind, controllerKey string) (bool, error) {
cacheKey := fmt.Sprintf("controller:%s, gvk:%s", controllerKey, gvk.String())
if _, ok := watcherMap.Load(cacheKey); ok {
return false, nil
}

if !DiscoverGVK(gvk) {
klog.Errorf("Failed to find GVK(%v) in cluster", gvk.String())
klog.Errorf("Failed to find GVK(%v) in cluster for %s", gvk.String(), controllerKey)
return false, nil
}

object := &unstructured.Unstructured{}
object.SetGroupVersionKind(gvk)
watcherMap.Store(gvk, true)
watcherMap.Store(cacheKey, true)
return true, c.Watch(&source.Kind{Type: object}, h)
}
21 changes: 16 additions & 5 deletions pkg/controller/workloadspread/workloadspread_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,10 @@ import (
"sigs.k8s.io/controller-runtime/pkg/source"

appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
ctrlUtil "github.com/openkruise/kruise/pkg/controller/util"
"github.com/openkruise/kruise/pkg/util"
utilclient "github.com/openkruise/kruise/pkg/util/client"
"github.com/openkruise/kruise/pkg/util/configuration"
"github.com/openkruise/kruise/pkg/util/controllerfinder"
utildiscovery "github.com/openkruise/kruise/pkg/util/discovery"
"github.com/openkruise/kruise/pkg/util/fieldindex"
Expand Down Expand Up @@ -152,6 +154,19 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
return err
}

// Watch for replicas changes to other CRD
whiteList, err := configuration.GetWSWatchCustomWorkloadWhiteList(mgr.GetClient())
if err != nil {
return err
}
if len(whiteList.Workloads) > 0 {
workloadHandler := &workloadEventHandler{Reader: mgr.GetClient()}
for _, workload := range whiteList.Workloads {
if _, err := ctrlUtil.AddWatcherDynamically(c, workloadHandler, workload.GroupVersionKind, "WorkloadSpread"); err != nil {
return err
}
}
}
return nil
}

Expand Down Expand Up @@ -272,14 +287,10 @@ func (r *ReconcileWorkloadSpread) getPodsForWorkloadSpread(ws *appsv1alpha1.Work
targetRef := ws.Spec.TargetReference

switch targetRef.Kind {
case controllerKindDep.Kind, controllerKindRS.Kind, controllerKruiseKindCS.Kind, controllerKindSts.Kind:
pods, workloadReplicas, err = r.controllerFinder.GetPodsForRef(targetRef.APIVersion, targetRef.Kind, ws.Namespace, targetRef.Name, false)
case controllerKindJob.Kind:
pods, workloadReplicas, err = r.getPodJob(targetRef, ws.Namespace)
default:
r.recorder.Eventf(ws, corev1.EventTypeWarning,
"TargetReferenceError", "targetReference is not been recognized")
return nil, -1, nil
pods, workloadReplicas, err = r.controllerFinder.GetPodsForRef(targetRef.APIVersion, targetRef.Kind, ws.Namespace, targetRef.Name, false)
}

if err != nil {
Expand Down
42 changes: 42 additions & 0 deletions pkg/controller/workloadspread/workloadspread_event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ package workloadspread
import (
"context"
"encoding/json"
"reflect"
"strings"

appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -36,6 +39,7 @@ import (

appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
appsv1beta1 "github.com/openkruise/kruise/apis/apps/v1beta1"
"github.com/openkruise/kruise/pkg/util/configuration"
wsutil "github.com/openkruise/kruise/pkg/util/workloadspread"
)

Expand Down Expand Up @@ -125,6 +129,10 @@ func (w workloadEventHandler) Update(evt event.UpdateEvent, q workqueue.RateLimi
oldReplicas = *evt.ObjectOld.(*appsv1beta1.StatefulSet).Spec.Replicas
newReplicas = *evt.ObjectNew.(*appsv1beta1.StatefulSet).Spec.Replicas
gvk = controllerKruiseKindSts
case *unstructured.Unstructured:
oldReplicas = w.getReplicasFromUnstructured(evt.ObjectOld.(*unstructured.Unstructured))
newReplicas = w.getReplicasFromUnstructured(evt.ObjectNew.(*unstructured.Unstructured))
gvk = evt.ObjectNew.(*unstructured.Unstructured).GroupVersionKind()
default:
return
}
Expand All @@ -150,6 +158,40 @@ func (w workloadEventHandler) Update(evt event.UpdateEvent, q workqueue.RateLimi
}
}

func (w *workloadEventHandler) getReplicasFromUnstructured(object *unstructured.Unstructured) int32 {
if object == nil || reflect.ValueOf(object).IsNil() {
return 0
}
whiteList, err := configuration.GetWSWatchCustomWorkloadWhiteList(w.Reader)
if err != nil {
klog.Errorf("Failed to get workloadSpread custom workload white list from kruise config map")
return 0
}

gvk := object.GroupVersionKind()
for _, workload := range whiteList.Workloads {
if workload.GroupVersionKind.GroupKind() != gvk.GroupKind() {
continue
}
var exists bool
var replicas int64
path := strings.Split(workload.ReplicasPath, ".")
if len(path) > 0 {
replicas, exists, err = unstructured.NestedInt64(object.Object, path...)
if err != nil || !exists {
klog.Errorf("Failed to get replicas from %v, replicas path %s", gvk, workload.ReplicasPath)
}
} else {
replicas, exists, err = unstructured.NestedInt64(object.Object, "spec", "replicas")
if err != nil || !exists {
klog.Errorf("Failed to get replicas from %v, replicas path %s", gvk, workload.ReplicasPath)
}
}
return int32(replicas)
}
return 0
}

func (w workloadEventHandler) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
w.handleWorkload(q, evt.Object, DeleteEventAction)
}
Expand Down
13 changes: 10 additions & 3 deletions pkg/util/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,24 @@ package client

import (
"fmt"

"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/cluster"
"sigs.k8s.io/controller-runtime/pkg/manager"
)

func NewClientFromManager(mgr manager.Manager, name string) client.Client {
cfg := rest.CopyConfig(mgr.GetConfig())
cfg.UserAgent = fmt.Sprintf("kruise-manager/%s", name)

delegatingClient, _ := cluster.DefaultNewClient(mgr.GetCache(), cfg, client.Options{Scheme: mgr.GetScheme(), Mapper: mgr.GetRESTMapper()})
c, err := client.New(cfg, client.Options{Scheme: mgr.GetScheme(), Mapper: mgr.GetRESTMapper()})
if err != nil {
panic(err)
}

delegatingClient, _ := client.NewDelegatingClient(client.NewDelegatingClientInput{
CacheReader: mgr.GetCache(),
Client: c,
CacheUnstructured: true,
})
return delegatingClient
}
24 changes: 21 additions & 3 deletions pkg/util/configuration/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ func GetSidecarSetPatchMetadataWhiteList(client client.Client) (*SidecarSetPatch
return whiteList, nil
}

func GetPPSWatchWatchCustomWorkloadWhiteList(client client.Client) (*PPSWatchWatchCustomWorkloadWhiteList, error) {
whiteList := &PPSWatchWatchCustomWorkloadWhiteList{Workloads: make([]schema.GroupVersionKind, 0)}
func GetPPSWatchCustomWorkloadWhiteList(client client.Client) (*CustomWorkloadWhiteList, error) {
whiteList := &CustomWorkloadWhiteList{Workloads: make([]schema.GroupVersionKind, 0)}
data, err := getKruiseConfiguration(client)
if err != nil {
return nil, err
Expand All @@ -69,7 +69,25 @@ func GetPPSWatchWatchCustomWorkloadWhiteList(client client.Client) (*PPSWatchWat
return whiteList, nil
}

func getKruiseConfiguration(c client.Client) (map[string]string, error) {
func GetWSWatchCustomWorkloadWhiteList(client client.Reader) (WSCustomWorkloadWhiteList, error) {
whiteList := WSCustomWorkloadWhiteList{}
data, err := getKruiseConfiguration(client)
if err != nil {
return whiteList, err
} else if len(data) == 0 {
return whiteList, nil
}
value, ok := data[WSWatchCustomWorkloadWhiteList]
if !ok {
return whiteList, nil
}
if err = json.Unmarshal([]byte(value), &whiteList); err != nil {
return whiteList, err
}
return whiteList, nil
}

func getKruiseConfiguration(c client.Reader) (map[string]string, error) {
cfg := &corev1.ConfigMap{}
err := c.Get(context.TODO(), client.ObjectKey{Namespace: util.GetKruiseNamespace(), Name: KruiseConfigurationName}, cfg)
if err != nil {
Expand Down
19 changes: 15 additions & 4 deletions pkg/util/configuration/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
const (
SidecarSetPatchPodMetadataWhiteListKey = "SidecarSet_PatchPodMetadata_WhiteList"
PPSWatchCustomWorkloadWhiteList = "PPS_Watch_Custom_Workload_WhiteList"
WSWatchCustomWorkloadWhiteList = "WorkloadSpread_Watch_Custom_Workload_WhiteList"
)

type SidecarSetPatchMetadataWhiteList struct {
Expand All @@ -40,11 +41,11 @@ type SidecarSetPatchMetadataWhiteRule struct {
AllowedAnnotationKeyExprs []string `json:"allowedAnnotationKeyExprs"`
}

type PPSWatchWatchCustomWorkloadWhiteList struct {
type CustomWorkloadWhiteList struct {
Workloads []schema.GroupVersionKind `json:"workloads,omitempty"`
}

func (p *PPSWatchWatchCustomWorkloadWhiteList) IsValid(gk metav1.GroupKind) bool {
func (p *CustomWorkloadWhiteList) IsValid(gk metav1.GroupKind) bool {
for _, workload := range p.Workloads {
if workload.Group == gk.Group && workload.Kind == gk.Kind {
return true
Expand All @@ -53,7 +54,7 @@ func (p *PPSWatchWatchCustomWorkloadWhiteList) IsValid(gk metav1.GroupKind) bool
return false
}

func (p *PPSWatchWatchCustomWorkloadWhiteList) ValidateAPIVersionAndKind(apiVersion, kind string) bool {
func (p *CustomWorkloadWhiteList) ValidateAPIVersionAndKind(apiVersion, kind string) bool {
if p.IsDefaultSupport(apiVersion, kind) {
return true
}
Expand All @@ -65,7 +66,7 @@ func (p *PPSWatchWatchCustomWorkloadWhiteList) ValidateAPIVersionAndKind(apiVers
return p.IsValid(gk)
}

func (p *PPSWatchWatchCustomWorkloadWhiteList) IsDefaultSupport(apiVersion, kind string) bool {
func (p *CustomWorkloadWhiteList) IsDefaultSupport(apiVersion, kind string) bool {
gv, err := schema.ParseGroupVersion(apiVersion)
if err != nil {
return false
Expand All @@ -75,3 +76,13 @@ func (p *PPSWatchWatchCustomWorkloadWhiteList) IsDefaultSupport(apiVersion, kind
}
return false
}

type WSCustomWorkloadWhiteList struct {
Workloads []CustomWorkload `json:"workloads,omitempty"`
}

type CustomWorkload struct {
schema.GroupVersionKind `json:",inline"`
SubResources []schema.GroupVersionKind `json:"subResources,omitempty"`
ReplicasPath string `json:"replicasPath,omitempty"`
}
2 changes: 1 addition & 1 deletion pkg/util/controllerfinder/controller_finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ func (r *ControllerFinder) getPodStatefulSetLike(ref ControllerReference, namesp
if err != nil {
return nil, nil
}
whiteList, err := configuration.GetPPSWatchWatchCustomWorkloadWhiteList(r.Client)
whiteList, err := configuration.GetPPSWatchCustomWorkloadWhiteList(r.Client)
if err != nil {
return nil, err
}
Expand Down
39 changes: 31 additions & 8 deletions pkg/util/controllerfinder/pods_finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
func (r *ControllerFinder) GetPodsForRef(apiVersion, kind, ns, name string, active bool) ([]*corev1.Pod, int32, error) {
var workloadUIDs []types.UID
var workloadReplicas int32
var labelSelector *metav1.LabelSelector
switch kind {
// ReplicaSet
case ControllerKindRS.Kind:
Expand Down Expand Up @@ -66,6 +67,7 @@ func (r *ControllerFinder) GetPodsForRef(apiVersion, kind, ns, name string, acti
return nil, 0, nil
}
workloadReplicas = obj.Scale
labelSelector = obj.Selector
// try to get replicaSets
rss, err := r.getReplicaSetsForObject(obj)
if err != nil {
Expand All @@ -83,16 +85,11 @@ func (r *ControllerFinder) GetPodsForRef(apiVersion, kind, ns, name string, acti
return nil, workloadReplicas, nil
}

// List all Pods owned by workload UID.
matchedPods := make([]*corev1.Pod, 0)
for _, uid := range workloadUIDs {
listPods := func(listOption *client.ListOptions) ([]*corev1.Pod, error) {
matchedPods := make([]*corev1.Pod, 0)
podList := &corev1.PodList{}
listOption := &client.ListOptions{
Namespace: ns,
FieldSelector: fields.SelectorFromSet(fields.Set{fieldindex.IndexNameForOwnerRefUID: string(uid)}),
}
if err := r.List(context.TODO(), podList, listOption, utilclient.DisableDeepCopy); err != nil {
return nil, -1, err
return nil, err
}
for i := range podList.Items {
pod := &podList.Items[i]
Expand All @@ -102,8 +99,34 @@ func (r *ControllerFinder) GetPodsForRef(apiVersion, kind, ns, name string, acti
}
matchedPods = append(matchedPods, pod)
}
return matchedPods, nil
}

var err error
var matchedPods []*corev1.Pod
for _, uid := range workloadUIDs {
listOption := client.ListOptions{
Namespace: ns,
FieldSelector: fields.SelectorFromSet(fields.Set{fieldindex.IndexNameForOwnerRefUID: string(uid)}),
}
pods, err := listPods(&listOption)
if err != nil {
return nil, -1, err
}
matchedPods = append(matchedPods, pods...)
}

if len(matchedPods) == 0 { // try again using label selector
selector, _ := metav1.LabelSelectorAsSelector(labelSelector)
listOption := client.ListOptions{
Namespace: ns,
LabelSelector: selector,
}
matchedPods, err = listPods(&listOption)
if err != nil {
return nil, -1, err
}
}
return matchedPods, workloadReplicas, nil
}

Expand Down
Loading

0 comments on commit 9f1b7d6

Please sign in to comment.