Commit

workloadspread support crd (#1286)
Signed-off-by: mingzhou.swx <mingzhou.swx@alibaba-inc.com>
Co-authored-by: mingzhou.swx <mingzhou.swx@alibaba-inc.com>
veophi and mingzhou.swx authored May 23, 2023
1 parent 40e62c6 commit 95e42f3
Showing 15 changed files with 589 additions and 60 deletions.
@@ -123,14 +123,14 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
 		return err
 	}

-	whiteList, err := configuration.GetPPSWatchWatchCustomWorkloadWhiteList(mgr.GetClient())
+	whiteList, err := configuration.GetPPSWatchCustomWorkloadWhiteList(mgr.GetClient())
 	if err != nil {
 		return err
 	}
 	if whiteList != nil {
 		workloadHandler := &enqueueRequestForStatefulSetLike{reader: mgr.GetClient()}
 		for _, workload := range whiteList.Workloads {
-			if _, err := ctrlUtil.AddWatcherDynamically(c, workloadHandler, workload); err != nil {
+			if _, err := ctrlUtil.AddWatcherDynamically(c, workloadHandler, workload, "PPS"); err != nil {
 				return err
 			}
 		}
@@ -92,7 +92,7 @@ func (p *enqueueRequestForPod) updatePod(q workqueue.RateLimitingInterface, old,

 func (p *enqueueRequestForPod) fetchPersistentPodState(pod *corev1.Pod) *appsv1alpha1.PersistentPodState {
 	ref := metav1.GetControllerOf(pod)
-	whiteList, err := configuration.GetPPSWatchWatchCustomWorkloadWhiteList(p.client)
+	whiteList, err := configuration.GetPPSWatchCustomWorkloadWhiteList(p.client)
 	if err != nil {
 		klog.Errorf("Failed to get persistent pod state config white list, error: %v\n", err.Error())
 		return nil
10 changes: 6 additions & 4 deletions pkg/controller/util/watch.go
@@ -1,6 +1,7 @@
 package util

 import (
+	"fmt"
 	"sync"
 	"time"

@@ -51,18 +52,19 @@ func DiscoverGVK(gvk schema.GroupVersionKind) bool {
 	return true
 }

-func AddWatcherDynamically(c controller.Controller, h handler.EventHandler, gvk schema.GroupVersionKind) (bool, error) {
-	if _, ok := watcherMap.Load(gvk); ok {
+func AddWatcherDynamically(c controller.Controller, h handler.EventHandler, gvk schema.GroupVersionKind, controllerKey string) (bool, error) {
+	cacheKey := fmt.Sprintf("controller:%s, gvk:%s", controllerKey, gvk.String())
+	if _, ok := watcherMap.Load(cacheKey); ok {
 		return false, nil
 	}

 	if !DiscoverGVK(gvk) {
-		klog.Errorf("Failed to find GVK(%v) in cluster", gvk.String())
+		klog.Errorf("Failed to find GVK(%v) in cluster for %s", gvk.String(), controllerKey)
 		return false, nil
 	}

 	object := &unstructured.Unstructured{}
 	object.SetGroupVersionKind(gvk)
-	watcherMap.Store(gvk, true)
+	watcherMap.Store(cacheKey, true)
 	return true, c.Watch(&source.Kind{Type: object}, h)
 }
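
A minimal usage sketch of the new signature (the GVK, handler, and variable names here are illustrative, not from this commit). Keying the watcher cache by controller name lets two controllers, e.g. PPS and WorkloadSpread, each register a watch on the same GVK without the second registration being skipped as a duplicate:

    // Hypothetical caller inside a controller's add() function.
    gvk := schema.GroupVersionKind{Group: "apps.example.com", Version: "v1", Kind: "GameServerSet"}
    added, err := ctrlUtil.AddWatcherDynamically(c, workloadHandler, gvk, "WorkloadSpread")
    if err != nil {
        return err
    }
    if added {
        klog.Infof("started watching %s for WorkloadSpread", gvk.String())
    }
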
21 changes: 16 additions & 5 deletions pkg/controller/workloadspread/workloadspread_controller.go
@@ -46,8 +46,10 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/source"

 	appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
+	ctrlUtil "github.com/openkruise/kruise/pkg/controller/util"
 	"github.com/openkruise/kruise/pkg/util"
 	utilclient "github.com/openkruise/kruise/pkg/util/client"
+	"github.com/openkruise/kruise/pkg/util/configuration"
 	"github.com/openkruise/kruise/pkg/util/controllerfinder"
 	utildiscovery "github.com/openkruise/kruise/pkg/util/discovery"
 	"github.com/openkruise/kruise/pkg/util/fieldindex"
@@ -152,6 +154,19 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
 		return err
 	}

+	// Watch for replica changes of custom workloads (CRDs) in the whitelist
+	whiteList, err := configuration.GetWSWatchCustomWorkloadWhiteList(mgr.GetClient())
+	if err != nil {
+		return err
+	}
+	if len(whiteList.Workloads) > 0 {
+		workloadHandler := &workloadEventHandler{Reader: mgr.GetClient()}
+		for _, workload := range whiteList.Workloads {
+			if _, err := ctrlUtil.AddWatcherDynamically(c, workloadHandler, workload.GroupVersionKind, "WorkloadSpread"); err != nil {
+				return err
+			}
+		}
+	}
 	return nil
 }

@@ -272,14 +287,10 @@ func (r *ReconcileWorkloadSpread) getPodsForWorkloadSpread(ws *appsv1alpha1.WorkloadSpread)
 	targetRef := ws.Spec.TargetReference

 	switch targetRef.Kind {
-	case controllerKindDep.Kind, controllerKindRS.Kind, controllerKruiseKindCS.Kind, controllerKindSts.Kind:
-		pods, workloadReplicas, err = r.controllerFinder.GetPodsForRef(targetRef.APIVersion, targetRef.Kind, ws.Namespace, targetRef.Name, false)
 	case controllerKindJob.Kind:
 		pods, workloadReplicas, err = r.getPodJob(targetRef, ws.Namespace)
 	default:
-		r.recorder.Eventf(ws, corev1.EventTypeWarning,
-			"TargetReferenceError", "targetReference is not been recognized")
-		return nil, -1, nil
+		pods, workloadReplicas, err = r.controllerFinder.GetPodsForRef(targetRef.APIVersion, targetRef.Kind, ws.Namespace, targetRef.Name, false)
 	}

 	if err != nil {
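The net effect of the switch rewrite: only Job keeps a dedicated branch, and every other kind, including custom workloads, now flows through GetPodsForRef (which resolves them via the scale subresource) instead of being rejected with a TargetReferenceError event. A sketch of a target this enables, assuming a made-up CRD:

    // Hypothetical WorkloadSpread targeting a CRD; this used to hit the old
    // default branch and be rejected, and is now resolved by the controller finder.
    ws := &appsv1alpha1.WorkloadSpread{
        Spec: appsv1alpha1.WorkloadSpreadSpec{
            TargetReference: &appsv1alpha1.TargetReference{
                APIVersion: "apps.example.com/v1",
                Kind:       "GameServerSet",
                Name:       "demo",
            },
        },
    }
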
42 changes: 42 additions & 0 deletions pkg/controller/workloadspread/workloadspread_event_handler.go
@@ -19,10 +19,13 @@ package workloadspread
 import (
 	"context"
 	"encoding/json"
+	"reflect"
+	"strings"

 	appsv1 "k8s.io/api/apps/v1"
 	batchv1 "k8s.io/api/batch/v1"
 	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/types"
@@ -36,6 +39,7 @@ import (

 	appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
 	appsv1beta1 "github.com/openkruise/kruise/apis/apps/v1beta1"
+	"github.com/openkruise/kruise/pkg/util/configuration"
 	wsutil "github.com/openkruise/kruise/pkg/util/workloadspread"
 )

@@ -125,6 +129,10 @@ func (w workloadEventHandler) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface)
 		oldReplicas = *evt.ObjectOld.(*appsv1beta1.StatefulSet).Spec.Replicas
 		newReplicas = *evt.ObjectNew.(*appsv1beta1.StatefulSet).Spec.Replicas
 		gvk = controllerKruiseKindSts
+	case *unstructured.Unstructured:
+		oldReplicas = w.getReplicasFromUnstructured(evt.ObjectOld.(*unstructured.Unstructured))
+		newReplicas = w.getReplicasFromUnstructured(evt.ObjectNew.(*unstructured.Unstructured))
+		gvk = evt.ObjectNew.(*unstructured.Unstructured).GroupVersionKind()
 	default:
 		return
 	}
@@ -150,6 +158,40 @@ func (w workloadEventHandler) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface)
 	}
 }

+func (w *workloadEventHandler) getReplicasFromUnstructured(object *unstructured.Unstructured) int32 {
+	if object == nil || reflect.ValueOf(object).IsNil() {
+		return 0
+	}
+	whiteList, err := configuration.GetWSWatchCustomWorkloadWhiteList(w.Reader)
+	if err != nil {
+		klog.Errorf("Failed to get WorkloadSpread custom workload whitelist from kruise configmap, error: %v", err)
+		return 0
+	}
+
+	gvk := object.GroupVersionKind()
+	for _, workload := range whiteList.Workloads {
+		if workload.GroupVersionKind.GroupKind() != gvk.GroupKind() {
+			continue
+		}
+		var exists bool
+		var replicas int64
+		// strings.Split("", ".") yields [""], so check the path itself before splitting.
+		if workload.ReplicasPath != "" {
+			replicas, exists, err = unstructured.NestedInt64(object.Object, strings.Split(workload.ReplicasPath, ".")...)
+			if err != nil || !exists {
+				klog.Errorf("Failed to get replicas from %v, replicas path %s", gvk, workload.ReplicasPath)
+			}
+		} else {
+			replicas, exists, err = unstructured.NestedInt64(object.Object, "spec", "replicas")
+			if err != nil || !exists {
+				klog.Errorf("Failed to get replicas from %v, default replicas path spec.replicas", gvk)
+			}
+		}
+		return int32(replicas)
+	}
+	return 0
+}
+
 func (w workloadEventHandler) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
 	w.handleWorkload(q, evt.Object, DeleteEventAction)
 }
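A minimal sketch of the nested lookup getReplicasFromUnstructured performs, assuming a whitelist entry whose replicasPath is "spec.replicas":

    // Hypothetical object standing in for a watched CRD instance.
    obj := &unstructured.Unstructured{Object: map[string]interface{}{
        "spec": map[string]interface{}{"replicas": int64(3)},
    }}
    // The dotted path is split into fields and walked by NestedInt64.
    replicas, found, err := unstructured.NestedInt64(obj.Object, strings.Split("spec.replicas", ".")...)
    // replicas == 3, found == true, err == nil
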
12 changes: 10 additions & 2 deletions pkg/util/client/client.go
@@ -21,14 +21,22 @@ import (

 	"k8s.io/client-go/rest"
 	"sigs.k8s.io/controller-runtime/pkg/client"
-	"sigs.k8s.io/controller-runtime/pkg/cluster"
 	"sigs.k8s.io/controller-runtime/pkg/manager"
 )

 func NewClientFromManager(mgr manager.Manager, name string) client.Client {
 	cfg := rest.CopyConfig(mgr.GetConfig())
 	cfg.UserAgent = fmt.Sprintf("kruise-manager/%s", name)

-	delegatingClient, _ := cluster.DefaultNewClient(mgr.GetCache(), cfg, client.Options{Scheme: mgr.GetScheme(), Mapper: mgr.GetRESTMapper()})
+	c, err := client.New(cfg, client.Options{Scheme: mgr.GetScheme(), Mapper: mgr.GetRESTMapper()})
+	if err != nil {
+		panic(err)
+	}
+
+	delegatingClient, _ := client.NewDelegatingClient(client.NewDelegatingClientInput{
+		CacheReader:       mgr.GetCache(),
+		Client:            c,
+		CacheUnstructured: true,
+	})
 	return delegatingClient
 }
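
The switch to client.NewDelegatingClient with CacheUnstructured: true means reads of unstructured objects are served from the manager's informer cache instead of always hitting the API server, which the dynamic watches on custom workloads rely on. A hedged sketch of a cached read (names are illustrative):

    // Once an informer for this GVK has been started (e.g. by
    // AddWatcherDynamically), the Get below is answered from the cache.
    obj := &unstructured.Unstructured{}
    obj.SetGroupVersionKind(schema.GroupVersionKind{Group: "apps.example.com", Version: "v1", Kind: "GameServerSet"})
    err := delegatingClient.Get(context.TODO(), client.ObjectKey{Namespace: "default", Name: "demo"}, obj)
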
24 changes: 21 additions & 3 deletions pkg/util/configuration/configuration.go
@@ -51,8 +51,8 @@ func GetSidecarSetPatchMetadataWhiteList(client client.Client) (*SidecarSetPatchMetadataWhiteList, error)
 	return whiteList, nil
 }

-func GetPPSWatchWatchCustomWorkloadWhiteList(client client.Client) (*PPSWatchWatchCustomWorkloadWhiteList, error) {
-	whiteList := &PPSWatchWatchCustomWorkloadWhiteList{Workloads: make([]schema.GroupVersionKind, 0)}
+func GetPPSWatchCustomWorkloadWhiteList(client client.Client) (*CustomWorkloadWhiteList, error) {
+	whiteList := &CustomWorkloadWhiteList{Workloads: make([]schema.GroupVersionKind, 0)}
 	data, err := getKruiseConfiguration(client)
 	if err != nil {
 		return nil, err
@@ -69,7 +69,25 @@ func GetPPSWatchWatchCustomWorkloadWhiteList(client client.Client) (*PPSWatchWatchCustomWorkloadWhiteList, error)
 	return whiteList, nil
 }

-func getKruiseConfiguration(c client.Client) (map[string]string, error) {
+func GetWSWatchCustomWorkloadWhiteList(client client.Reader) (WSCustomWorkloadWhiteList, error) {
+	whiteList := WSCustomWorkloadWhiteList{}
+	data, err := getKruiseConfiguration(client)
+	if err != nil {
+		return whiteList, err
+	} else if len(data) == 0 {
+		return whiteList, nil
+	}
+	value, ok := data[WSWatchCustomWorkloadWhiteList]
+	if !ok {
+		return whiteList, nil
+	}
+	if err = json.Unmarshal([]byte(value), &whiteList); err != nil {
+		return whiteList, err
+	}
+	return whiteList, nil
+}
+
+func getKruiseConfiguration(c client.Reader) (map[string]string, error) {
 	cfg := &corev1.ConfigMap{}
 	err := c.Get(context.TODO(), client.ObjectKey{Namespace: util.GetKruiseNamespace(), Name: KruiseConfigurationName}, cfg)
 	if err != nil {
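For illustration, a sketch of the JSON this parser expects under the WorkloadSpread_Watch_Custom_Workload_WhiteList key (the GVK and path are made-up; the embedded GroupVersionKind's untagged fields are matched case-insensitively by encoding/json):

    raw := `{"workloads":[{"Group":"apps.example.com","Version":"v1","Kind":"GameServerSet","replicasPath":"spec.replicas"}]}`
    whiteList := WSCustomWorkloadWhiteList{}
    if err := json.Unmarshal([]byte(raw), &whiteList); err != nil {
        klog.Errorf("Failed to parse whitelist, error: %v", err)
    }
    // whiteList.Workloads[0].Kind == "GameServerSet"
    // whiteList.Workloads[0].ReplicasPath == "spec.replicas"
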
20 changes: 16 additions & 4 deletions pkg/util/configuration/types.go
@@ -26,6 +26,7 @@ import (
 const (
 	SidecarSetPatchPodMetadataWhiteListKey = "SidecarSet_PatchPodMetadata_WhiteList"
 	PPSWatchCustomWorkloadWhiteList        = "PPS_Watch_Custom_Workload_WhiteList"
+	WSWatchCustomWorkloadWhiteList         = "WorkloadSpread_Watch_Custom_Workload_WhiteList"
 )

 type SidecarSetPatchMetadataWhiteList struct {
@@ -40,11 +41,11 @@ type SidecarSetPatchMetadataWhiteRule struct {
 	AllowedAnnotationKeyExprs []string `json:"allowedAnnotationKeyExprs"`
 }

-type PPSWatchWatchCustomWorkloadWhiteList struct {
+type CustomWorkloadWhiteList struct {
 	Workloads []schema.GroupVersionKind `json:"workloads,omitempty"`
 }

-func (p *PPSWatchWatchCustomWorkloadWhiteList) IsValid(gk metav1.GroupKind) bool {
+func (p *CustomWorkloadWhiteList) IsValid(gk metav1.GroupKind) bool {
 	for _, workload := range p.Workloads {
 		if workload.Group == gk.Group && workload.Kind == gk.Kind {
 			return true
@@ -53,7 +54,7 @@ func (p *PPSWatchWatchCustomWorkloadWhiteList) IsValid(gk metav1.GroupKind) bool
 	return false
 }

-func (p *PPSWatchWatchCustomWorkloadWhiteList) ValidateAPIVersionAndKind(apiVersion, kind string) bool {
+func (p *CustomWorkloadWhiteList) ValidateAPIVersionAndKind(apiVersion, kind string) bool {
 	if p.IsDefaultSupport(apiVersion, kind) {
 		return true
 	}
@@ -65,7 +66,7 @@ func (p *PPSWatchWatchCustomWorkloadWhiteList) ValidateAPIVersionAndKind(apiVersion, kind string) bool
 	return p.IsValid(gk)
 }

-func (p *PPSWatchWatchCustomWorkloadWhiteList) IsDefaultSupport(apiVersion, kind string) bool {
+func (p *CustomWorkloadWhiteList) IsDefaultSupport(apiVersion, kind string) bool {
 	gv, err := schema.ParseGroupVersion(apiVersion)
 	if err != nil {
 		return false
@@ -75,3 +76,14 @@ func (p *PPSWatchWatchCustomWorkloadWhiteList) IsDefaultSupport(apiVersion, kind string) bool
 	}
 	return false
 }
+
+type WSCustomWorkloadWhiteList struct {
+	Workloads []CustomWorkload `json:"workloads,omitempty"`
+}
+
+type CustomWorkload struct {
+	schema.GroupVersionKind `json:",inline"`
+	SubResources            []schema.GroupVersionKind `json:"subResources,omitempty"`
+	// ReplicasPath is the replicas field path of this type of workload, such as "spec.replicas"
+	ReplicasPath string `json:"replicasPath,omitempty"`
+}
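
A brief usage sketch of the renamed whitelist type (the custom GVK is a made-up example):

    whiteList := CustomWorkloadWhiteList{Workloads: []schema.GroupVersionKind{
        {Group: "apps.example.com", Version: "v1", Kind: "GameServerSet"},
    }}
    whiteList.IsValid(metav1.GroupKind{Group: "apps.example.com", Kind: "GameServerSet"}) // true: whitelisted
    whiteList.IsValid(metav1.GroupKind{Group: "example.com", Kind: "Unknown"})            // false
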
2 changes: 1 addition & 1 deletion pkg/util/controllerfinder/controller_finder.go
@@ -398,7 +398,7 @@ func (r *ControllerFinder) getPodStatefulSetLike(ref ControllerReference, namespace
 	if err != nil {
 		return nil, nil
 	}
-	whiteList, err := configuration.GetPPSWatchWatchCustomWorkloadWhiteList(r.Client)
+	whiteList, err := configuration.GetPPSWatchCustomWorkloadWhiteList(r.Client)
 	if err != nil {
 		return nil, err
 	}
56 changes: 46 additions & 10 deletions pkg/util/controllerfinder/pods_finder.go
@@ -36,6 +36,7 @@ import (
 func (r *ControllerFinder) GetPodsForRef(apiVersion, kind, ns, name string, active bool) ([]*corev1.Pod, int32, error) {
 	var workloadUIDs []types.UID
 	var workloadReplicas int32
+	var labelSelector *metav1.LabelSelector
 	switch kind {
 	// ReplicaSet
 	case ControllerKindRS.Kind:
@@ -57,8 +58,8 @@
 		}
 		workloadReplicas = obj.Scale
 		workloadUIDs = append(workloadUIDs, obj.UID)
-	// Deployment, Deployment-like workload or other custom workload (support scale sub-resources)
-	default:
+	// Deployment
+	case ControllerKindDep.Kind:
 		obj, err := r.GetScaleAndSelectorForRef(apiVersion, kind, ns, name, "")
 		if err != nil {
 			return nil, -1, err
@@ -78,21 +79,27 @@ func (r *ControllerFinder) GetPodsForRef(apiVersion, kind, ns, name string, active bool) ([]*corev1.Pod, int32, error) {
 				workloadUIDs = append(workloadUIDs, rs.UID)
 			}
 		}
+	// Other custom workloads (support the scale subresource)
+	default:
+		obj, err := r.GetScaleAndSelectorForRef(apiVersion, kind, ns, name, "")
+		if err != nil {
+			return nil, -1, err
+		} else if obj == nil || !obj.Metadata.DeletionTimestamp.IsZero() {
+			return nil, 0, nil
+		}
+		workloadReplicas = obj.Scale
+		labelSelector = obj.Selector
+		workloadUIDs = append(workloadUIDs, obj.UID)
 	}
 	if workloadReplicas == 0 {
 		return nil, workloadReplicas, nil
 	}

 	// List all Pods owned by workload UID.
-	matchedPods := make([]*corev1.Pod, 0)
-	for _, uid := range workloadUIDs {
+	listPods := func(listOption *client.ListOptions) ([]*corev1.Pod, error) {
+		matchedPods := make([]*corev1.Pod, 0)
 		podList := &corev1.PodList{}
-		listOption := &client.ListOptions{
-			Namespace:     ns,
-			FieldSelector: fields.SelectorFromSet(fields.Set{fieldindex.IndexNameForOwnerRefUID: string(uid)}),
-		}
 		if err := r.List(context.TODO(), podList, listOption, utilclient.DisableDeepCopy); err != nil {
-			return nil, -1, err
+			return nil, err
 		}
 		for i := range podList.Items {
 			pod := &podList.Items[i]
@@ -102,8 +109,37 @@ func (r *ControllerFinder) GetPodsForRef(apiVersion, kind, ns, name string, active bool) ([]*corev1.Pod, int32, error) {
 			}
 			matchedPods = append(matchedPods, pod)
 		}
+		return matchedPods, nil
+	}
+
+	var err error
+	var matchedPods []*corev1.Pod
+	for _, uid := range workloadUIDs {
+		listOption := client.ListOptions{
+			Namespace:     ns,
+			FieldSelector: fields.SelectorFromSet(fields.Set{fieldindex.IndexNameForOwnerRefUID: string(uid)}),
+		}
+		pods, err := listPods(&listOption)
+		if err != nil {
+			return nil, -1, err
+		}
+		matchedPods = append(matchedPods, pods...)
+	}
+
+	// For workloads like Deployment that do not manage their Pods directly,
+	// the Pods' ownerReferences do not point at the workload, so fall back to
+	// listing the Pods by the label selector.
+	if labelSelector != nil && len(matchedPods) == 0 {
+		selector, _ := metav1.LabelSelectorAsSelector(labelSelector)
+		listOption := client.ListOptions{
+			Namespace:     ns,
+			LabelSelector: selector,
+		}
+		matchedPods, err = listPods(&listOption)
+		if err != nil {
+			return nil, -1, err
+		}
+	}
 	return matchedPods, workloadReplicas, nil
 }

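Why the label-selector fallback matters, in a hypothetical sketch reusing the listPods closure above: a Deployment's Pods are owned by its ReplicaSets, so an ownerRef-UID index lookup keyed on the Deployment's own UID returns nothing, while the scale subresource's selector still matches the Pods by labels.

    // Owner-UID index lookup: empty for a Deployment-like workload whose Pods
    // are owned by an intermediate object (the UID below is illustrative).
    pods, _ := listPods(&client.ListOptions{
        Namespace:     "default",
        FieldSelector: fields.SelectorFromSet(fields.Set{fieldindex.IndexNameForOwnerRefUID: "uid-of-the-deployment"}),
    }) // len(pods) == 0

    // Label-selector fallback: matches the Pods regardless of ownership chain.
    selector, _ := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: map[string]string{"app": "demo"}})
    pods, _ = listPods(&client.ListOptions{Namespace: "default", LabelSelector: selector}) // Pods found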