feat(sync): sync to no cluster when object is fed hpa
wy-lucky committed Nov 7, 2023
1 parent 070a686 commit d203f77
Showing 5 changed files with 142 additions and 58 deletions.
14 changes: 8 additions & 6 deletions pkg/controllers/common/constants.go
@@ -116,6 +116,14 @@ const (
TemplateGeneratorMergePatchAnnotation = FederateControllerPrefix + "template-generator-merge-patch"

LatestReplicasetDigestsAnnotation = DefaultPrefix + "latest-replicaset-digests"

HPAScaleTargetRefPath = DefaultPrefix + "scale-target-ref-path"
)

// The following consts are label key-values used by KubeAdmiral controllers.

const (
FedHPAEnableKey = DefaultPrefix + "fed-hpa-enabled"
)

// PropagatedAnnotationKeys and PropagatedLabelKeys are used to store the keys of annotations and labels that are present
@@ -190,9 +198,3 @@ var (
//
//nolint:lll
const MaxFederatedObjectNameLength = 253

// HPAScaleTargetRefPath defines the fed hpa annotations and labels
const (
HPAScaleTargetRefPath = DefaultPrefix + "scale-target-ref-path"
FedHPAEnableKey = DefaultPrefix + "fed-hpa-enabled"
)
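
A minimal sketch of how these two keys are consumed: the FTC annotation supplies the dotted path to the HPA's scaleTargetRef, and the label marks an object as a fed HPA. The helper and its arguments below are hypothetical; only the common.* constants come from this change.

package sketch

import (
	"github.com/kubewharf/kubeadmiral/pkg/controllers/common"
)

// sketchFedHPAKeys is a hypothetical helper illustrating the two constants above.
func sketchFedHPAKeys(ftcAnnotations, templateLabels map[string]string) (string, bool) {
	// Annotation on the FederatedTypeConfig: dotted path to the HPA's scaleTargetRef,
	// e.g. "spec.scaleTargetRef" (assumed example value).
	scaleTargetRefPath := ftcAnnotations[common.HPAScaleTargetRefPath]

	// Label stamped on the HPA (and carried into the federated object's template):
	// marks the object as a fed HPA so the sync controller can treat it specially.
	fedHPAEnabled := templateLabels[common.FedHPAEnableKey] == common.AnnotationValueTrue

	return scaleTargetRefPath, fedHPAEnabled
}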
41 changes: 21 additions & 20 deletions pkg/controllers/federatedhpa/controller.go
@@ -19,6 +19,7 @@ package federatedhpa
import (
"context"
"fmt"
"sync"
"time"

"github.com/pkg/errors"
@@ -80,9 +81,10 @@ type FederatedHPAController struct {
worker worker.ReconcileWorker[Resource]
cacheSyncRateLimiter workqueue.RateLimiter

scaleTargetRefMapping map[schema.GroupVersionKind]string
workloadHPAMapping *bijection.OneToManyRelation[Resource, Resource]
ppWorkloadMapping *bijection.OneToManyRelation[Resource, Resource]
gvkToScaleTargetRefLock sync.RWMutex
gvkToScaleTargetRef map[schema.GroupVersionKind]string
workloadHPAMapping *bijection.OneToManyRelation[Resource, Resource]
ppWorkloadMapping *bijection.OneToManyRelation[Resource, Resource]

metrics stats.Metrics
logger klog.Logger
@@ -114,9 +116,9 @@ func NewFederatedHPAController(

cacheSyncRateLimiter: workqueue.NewItemExponentialFailureRateLimiter(100*time.Millisecond, 10*time.Second),

scaleTargetRefMapping: map[schema.GroupVersionKind]string{},
workloadHPAMapping: bijection.NewOneToManyRelation[Resource, Resource](),
ppWorkloadMapping: bijection.NewOneToManyRelation[Resource, Resource](),
gvkToScaleTargetRef: map[schema.GroupVersionKind]string{},
workloadHPAMapping: bijection.NewOneToManyRelation[Resource, Resource](),
ppWorkloadMapping: bijection.NewOneToManyRelation[Resource, Resource](),

metrics: metrics,
logger: logger.WithValues("controller", FederatedHPAControllerName),
@@ -209,10 +211,13 @@ func (f *FederatedHPAController) enqueueFedHPAObjectsForPropagationPolicy(policy
func (f *FederatedHPAController) enqueueFedHPAObjectsForFTC(ftc *fedcorev1a1.FederatedTypeConfig) {
logger := f.logger.WithValues("ftc", ftc.GetName())

f.gvkToScaleTargetRefLock.Lock()
defer f.gvkToScaleTargetRefLock.Unlock()

if scaleTargetRefPath, ok := ftc.GetAnnotations()[common.HPAScaleTargetRefPath]; ok {
f.scaleTargetRefMapping[ftc.GetSourceTypeGVK()] = scaleTargetRefPath
f.gvkToScaleTargetRef[ftc.GetSourceTypeGVK()] = scaleTargetRefPath
} else {
delete(f.scaleTargetRefMapping, ftc.GetSourceTypeGVK())
delete(f.gvkToScaleTargetRef, ftc.GetSourceTypeGVK())
return
}

@@ -329,8 +334,7 @@ func (f *FederatedHPAController) reconcile(ctx context.Context, key Resource) (s
return worker.StatusAllOK
}

scaleTargetRef := f.scaleTargetRefMapping[key.gvk]
newWorkloadResource, err := scaleTargetRefToResource(hpaObject, scaleTargetRef)
newWorkloadResource, err := f.scaleTargetRefToResource(key.gvk, hpaObject)
if err != nil {
logger.Error(err, "Failed to get workload resource from hpa")
return worker.StatusError
@@ -360,8 +364,9 @@ func (f *FederatedHPAController) reconcile(ctx context.Context, key Resource) (s
}

var pp fedcorev1a1.GenericPropagationPolicy
var newPPResource *Resource
if workloadExist {
newPPResource := getPropagationPolicyResourceFromFedWorkload(fedWorkload)
newPPResource = getPropagationPolicyResourceFromFedWorkload(fedWorkload)
if newPPResource != nil {
_, exist = f.ppWorkloadMapping.LookupByT2(newWorkloadResource)
if exist {
@@ -388,11 +393,11 @@ func (f *FederatedHPAController) reconcile(ctx context.Context, key Resource) (s
return worker.StatusError
}

if !workloadExist || isPropagationPolicyDividedMode(pp) {
if !workloadExist || isPropagationPolicyExist(pp) && isPropagationPolicyDividedMode(pp) {
isHPAObjectUpdated = removeFedHPANotWorkReasonAnno(hpaObject, FedHPANotWorkReason)
isHPAObjectUpdated = isHPAObjectUpdated || addHPALabel(hpaObject, common.FedHPAEnableKey, common.AnnotationValueTrue)
} else {
hpaNotWorkReason := generateFederationHPANotWorkReason(isPropagationPolicyExist(pp), isPropagationPolicyDividedMode(pp))
hpaNotWorkReason := generateFederationHPANotWorkReason(newPPResource, pp)
f.eventRecorder.Eventf(
hpaObject,
corev1.EventTypeWarning,
@@ -408,7 +413,8 @@ func (f *FederatedHPAController) reconcile(ctx context.Context, key Resource) (s
case FederatedHPAModeDistributed, FederatedHPAModeDefault:
isHPAObjectUpdated = removeHPALabel(hpaObject, common.FedHPAEnableKey)

if !workloadExist || isPropagationPolicyDuplicateMode(pp) &&
if !workloadExist || isPropagationPolicyExist(pp) &&
isPropagationPolicyDuplicateMode(pp) &&
isPropagationPolicyFollowerEnabled(pp) &&
isWorkloadRetainReplicas(fedWorkload) &&
isHPAFollowTheWorkload(ctx, hpaObject, fedWorkload) {
@@ -417,12 +423,7 @@ func (f *FederatedHPAController) reconcile(ctx context.Context, key Resource) (s
return worker.StatusError
}
} else {
hpaNotWorkReason := generateDistributedHPANotWorkReason(
isPropagationPolicyExist(pp),
isPropagationPolicyDuplicateMode(pp),
isPropagationPolicyFollowerEnabled(pp),
isWorkloadRetainReplicas(fedWorkload),
isHPAFollowTheWorkload(ctx, hpaObject, fedWorkload))
hpaNotWorkReason := generateDistributedHPANotWorkReason(ctx, newPPResource, pp, fedWorkload, hpaObject)
f.eventRecorder.Eventf(
hpaObject,
corev1.EventTypeWarning,
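The rename from scaleTargetRefMapping to gvkToScaleTargetRef comes with a sync.RWMutex, because FTC event handlers now write the map while reconcile workers read it. A minimal sketch of that locking pattern, using a hypothetical wrapper type:

package sketch

import (
	"sync"

	"k8s.io/apimachinery/pkg/runtime/schema"
)

// gvkPathCache is a hypothetical wrapper showing the read/write locking pattern
// applied to gvkToScaleTargetRef in this commit.
type gvkPathCache struct {
	mu    sync.RWMutex
	paths map[schema.GroupVersionKind]string
}

func newGVKPathCache() *gvkPathCache {
	return &gvkPathCache{paths: map[schema.GroupVersionKind]string{}}
}

// set is called from FTC event handlers (writers).
func (c *gvkPathCache) set(gvk schema.GroupVersionKind, path string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.paths[gvk] = path
}

// get is called from reconcile workers (readers).
func (c *gvkPathCache) get(gvk schema.GroupVersionKind) (string, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	path, ok := c.paths[gvk]
	return path, ok
}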
91 changes: 61 additions & 30 deletions pkg/controllers/federatedhpa/util.go
@@ -73,58 +73,81 @@ func fedObjectToSourceObjectResource(object metav1.Object) (Resource, error) {
}

func policyObjectToResource(object metav1.Object) Resource {
if cpp, ok := object.(*fedcorev1a1.ClusterPropagationPolicy); ok {
if cpp, ok := object.(*fedcorev1a1.PropagationPolicy); ok {
return Resource{
name: cpp.GetName(),
namespace: cpp.GetNamespace(),
gvk: cpp.GroupVersionKind(),
gvk: schema.GroupVersionKind{
Group: fedcorev1a1.SchemeGroupVersion.Group,
Version: fedcorev1a1.SchemeGroupVersion.Version,
Kind: PropagationPolicyKind,
},
}
}

if pp, ok := object.(*fedcorev1a1.PropagationPolicy); ok {
if pp, ok := object.(*fedcorev1a1.ClusterPropagationPolicy); ok {
return Resource{
name: pp.GetName(),
namespace: pp.GetNamespace(),
gvk: pp.GroupVersionKind(),
gvk: schema.GroupVersionKind{
Group: fedcorev1a1.SchemeGroupVersion.Group,
Version: fedcorev1a1.SchemeGroupVersion.Version,
Kind: ClusterPropagationPolicyKind,
},
}
}

return Resource{}
}

func generateFederationHPANotWorkReason(isPropagationPolicyExist, isPropagationPolicyDividedMode bool) string {
func generateFederationHPANotWorkReason(newPPResource *Resource, pp fedcorev1a1.GenericPropagationPolicy) string {
var reasons []string
if !isPropagationPolicyExist {
reasons = append(reasons, "PropagationPolicy is not exist.")
if newPPResource == nil {
reasons = append(reasons, "The workload is not bound to any propagationPolicy.")
return fmt.Sprintf("%v", reasons)
}

ppKind, ppName := newPPResource.gvk.Kind, newPPResource.name
if !isPropagationPolicyExist(pp) {
reasons = append(reasons, fmt.Sprintf("The %s %s bound to the workload does not exist.", ppKind, ppName))
return fmt.Sprintf("%v", reasons)
}
if isPropagationPolicyExist && !isPropagationPolicyDividedMode {
reasons = append(reasons, "PropagationPolicy is not divide.")
if isPropagationPolicyExist(pp) && !isPropagationPolicyDividedMode(pp) {
reasons = append(reasons, fmt.Sprintf("The %s %s bound to the workload is not Divided mode.", ppKind, ppName))
}

return fmt.Sprintf("%v", reasons)
}

func generateDistributedHPANotWorkReason(
isPropagationPolicyExist,
isPropagationPolicyDuplicateMode,
isPropagationPolicyFollowerEnabled,
isWorkloadRetainReplicas,
isHPAFollowTheWorkload bool,
ctx context.Context,
newPPResource *Resource,
pp fedcorev1a1.GenericPropagationPolicy,
fedWorkload fedcorev1a1.GenericFederatedObject,
hpaObject *unstructured.Unstructured,
) string {
var reasons []string
if !isPropagationPolicyExist {
reasons = append(reasons, "PropagationPolicy is not exist.")
if newPPResource == nil {
reasons = append(reasons, "The workload is not bound to any propagationPolicy.")
return fmt.Sprintf("%v", reasons)
}
if !isPropagationPolicyDuplicateMode {
reasons = append(reasons, "PropagationPolicy is not Duplicate.")
if !isPropagationPolicyExist(pp) {
reasons = append(reasons, fmt.Sprintf("The %s %s bound to the workload does not exist.", pp.GetObjectKind(), pp.GetName()))
return fmt.Sprintf("%v", reasons)
}
if !isPropagationPolicyFollowerEnabled {
reasons = append(reasons, "PropagationPolicy follower is not enable.")

ppKind, ppName := newPPResource.gvk.Kind, newPPResource.name
if !isPropagationPolicyDuplicateMode(pp) {
reasons = append(reasons, fmt.Sprintf("The %s %s bound to the workload is not Duplicate mode.", ppKind, ppName))
}
if !isWorkloadRetainReplicas {
reasons = append(reasons, "Workload is not retain replicas.")
if !isPropagationPolicyFollowerEnabled(pp) {
reasons = append(reasons, fmt.Sprintf("The %s %s bound to the workload is not enabled for follower scheduling.", ppKind, ppName))
}
if !isHPAFollowTheWorkload {
reasons = append(reasons, "Hpa is not follow the workload.")
if !isWorkloadRetainReplicas(fedWorkload) {
reasons = append(reasons, "The workload is not enabled for retain replicas.")
}
if !isHPAFollowTheWorkload(ctx, hpaObject, fedWorkload) {
reasons = append(reasons, "The hpa is not follow the workload.")
}

return fmt.Sprintf("%v", reasons)
@@ -139,11 +162,11 @@ func isPropagationPolicyExist(pp fedcorev1a1.GenericPropagationPolicy) bool {
}

func isPropagationPolicyDividedMode(pp fedcorev1a1.GenericPropagationPolicy) bool {
return pp != nil && pp.GetSpec().SchedulingMode == fedcorev1a1.SchedulingModeDivide
return pp.GetSpec().SchedulingMode == fedcorev1a1.SchedulingModeDivide
}

func isPropagationPolicyDuplicateMode(pp fedcorev1a1.GenericPropagationPolicy) bool {
return pp != nil && pp.GetSpec().SchedulingMode == fedcorev1a1.SchedulingModeDuplicate
return pp.GetSpec().SchedulingMode == fedcorev1a1.SchedulingModeDuplicate
}

func isPropagationPolicyFollowerEnabled(pp fedcorev1a1.GenericPropagationPolicy) bool {
@@ -178,16 +201,19 @@ func (f *FederatedHPAController) isHPAType(resourceGVK schema.GroupVersionKind)
return false
}

f.gvkToScaleTargetRefLock.Lock()
defer f.gvkToScaleTargetRefLock.Unlock()

// HPA gvk has already been stored
if _, ok := f.scaleTargetRefMapping[resourceGVK]; ok {
if _, ok := f.gvkToScaleTargetRef[resourceGVK]; ok {
return true
}

if path, ok := ftc.Annotations[common.HPAScaleTargetRefPath]; ok {
f.scaleTargetRefMapping[resourceGVK] = path
f.gvkToScaleTargetRef[resourceGVK] = path
return true
} else {
delete(f.scaleTargetRefMapping, resourceGVK)
delete(f.gvkToScaleTargetRef, resourceGVK)
return false
}
}
@@ -196,7 +222,12 @@ func isWorkloadRetainReplicas(fedObj metav1.Object) bool {
return fedObj.GetAnnotations()[common.RetainReplicasAnnotation] == common.AnnotationValueTrue
}

func scaleTargetRefToResource(hpaUns *unstructured.Unstructured, scaleTargetRef string) (Resource, error) {
func (f *FederatedHPAController) scaleTargetRefToResource(
	gvk schema.GroupVersionKind,
	hpaUns *unstructured.Unstructured,
) (Resource, error) {
f.gvkToScaleTargetRefLock.RLock()
defer f.gvkToScaleTargetRefLock.RUnlock()

scaleTargetRef := f.gvkToScaleTargetRef[gvk]

fieldVal, found, err := unstructured.NestedFieldCopy(hpaUns.Object, strings.Split(scaleTargetRef, ".")...)
if err != nil || !found {
if err != nil {
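scaleTargetRefToResource now looks up the per-GVK dotted path under the read lock and resolves it with unstructured.NestedFieldCopy. A simplified sketch of that resolution, assuming an example path such as "spec.scaleTargetRef"; the helper name and error handling are illustrative only:

package sketch

import (
	"fmt"
	"strings"

	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

// resolveScaleTargetRef resolves a dotted field path against an unstructured HPA
// and returns the referenced object map (apiVersion, kind, name of the workload).
func resolveScaleTargetRef(hpa *unstructured.Unstructured, path string) (map[string]interface{}, error) {
	fieldVal, found, err := unstructured.NestedFieldCopy(hpa.Object, strings.Split(path, ".")...)
	if err != nil {
		return nil, err
	}
	if !found {
		return nil, fmt.Errorf("field %q not found in %s/%s", path, hpa.GetNamespace(), hpa.GetName())
	}
	ref, ok := fieldVal.(map[string]interface{})
	if !ok {
		return nil, fmt.Errorf("field %q in %s is not an object", path, hpa.GetName())
	}
	return ref, nil
}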
12 changes: 10 additions & 2 deletions pkg/controllers/sync/controller.go
@@ -409,7 +409,11 @@ func (s *SyncController) reconcile(ctx context.Context, federatedName common.Qua
keyedLogger.Error(err, "Failed to get pending controllers")
return worker.StatusError
}
if len(pendingControllers) > 0 {
isFedHPAObject, err := isFedHPAObject(ctx, fedResource.Object())
if err != nil {
return worker.StatusError
}
if len(pendingControllers) > 0 && !isFedHPAObject {
// upstream controllers have not finished processing, we wait for our turn
return worker.StatusAllOK
}
@@ -422,7 +426,7 @@ func (s *SyncController) reconcile(ctx context.Context, federatedName common.Qua
fedResource.RecordError("EnsureFinalizerError", errors.Wrap(err, "Failed to ensure finalizer"))
return worker.StatusError
}
clustersToSync, selectedClusters, err := s.prepareToSync(ctx, fedResource)
clustersToSync, selectedClusters, err := s.prepareToSync(ctx, fedResource, isFedHPAObject)
if err != nil {
fedResource.RecordError("PrepareToSyncError", errors.Wrap(err, "Failed to prepare to sync"))
return worker.StatusError
@@ -440,6 +444,7 @@ func (s *SyncController) reconcile(ctx context.Context, federatedName common.Qua
func (s *SyncController) prepareToSync(
ctx context.Context,
fedResource FederatedResource,
isFedHPAObject bool,
) (
requireSync []*fedcorev1a1.FederatedCluster,
selectedClusters sets.Set[string],
@@ -459,6 +464,9 @@ func (s *SyncController) prepareToSync(
}
return nil, nil, err
}
if isFedHPAObject {
clusters = nil
}
clusterMap := make(map[string]*fedcorev1a1.FederatedCluster, len(clusters))
for _, cluster := range clusters {
clusterMap[cluster.Name] = cluster
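Net effect of the two sync-controller changes above: a fed HPA object no longer waits for pending upstream controllers, and prepareToSync clears its cluster list so it is dispatched to no member cluster, living only as a federated object on the host. A minimal sketch of that gating, with a hypothetical helper name:

package sketch

import (
	fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1"
)

// clustersToDispatch mirrors the prepareToSync change: fed HPA objects are synced
// to zero member clusters.
func clustersToDispatch(clusters []*fedcorev1a1.FederatedCluster, isFedHPAObject bool) []*fedcorev1a1.FederatedCluster {
	if isFedHPAObject {
		return nil // keep the object on the host cluster only
	}
	return clusters
}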
42 changes: 42 additions & 0 deletions pkg/controllers/sync/util.go
@@ -0,0 +1,42 @@
/*
Copyright 2023 The KubeAdmiral Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package sync

import (
"context"

"k8s.io/klog/v2"

fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1"
"github.com/kubewharf/kubeadmiral/pkg/controllers/common"
)

func isFedHPAObject(ctx context.Context, fedObject fedcorev1a1.GenericFederatedObject) (bool, error) {
logger := klog.FromContext(ctx)

templateMetadata, err := fedObject.GetSpec().GetTemplateMetadata()
if err != nil {
logger.Error(err, "Failed to get TemplateMetadata")
return false, err
}

if value, ok := templateMetadata.GetLabels()[common.FedHPAEnableKey]; ok && value == common.AnnotationValueTrue {
return true, nil
}

return false, nil
}
