Skip to content

Commit

Permalink
feat(sync): sync to none cluster when object is fed hpa
Browse files Browse the repository at this point in the history
  • Loading branch information
wy-lucky committed Nov 9, 2023
1 parent 070a686 commit 6f25440
Show file tree
Hide file tree
Showing 7 changed files with 536 additions and 79 deletions.
14 changes: 8 additions & 6 deletions pkg/controllers/common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,14 @@ const (
TemplateGeneratorMergePatchAnnotation = FederateControllerPrefix + "template-generator-merge-patch"

LatestReplicasetDigestsAnnotation = DefaultPrefix + "latest-replicaset-digests"

HPAScaleTargetRefPath = DefaultPrefix + "scale-target-ref-path"
)

// The following consts are labels key-values used by Kubeadmiral controllers.

const (
FedHPAEnableKey = DefaultPrefix + "fed-hpa-enabled"
)

// PropagatedAnnotationKeys and PropagatedLabelKeys are used to store the keys of annotations and labels that are present
Expand Down Expand Up @@ -190,9 +198,3 @@ var (
//
//nolint:lll
const MaxFederatedObjectNameLength = 253

// HPAScaleTargetRefPath defines the fed hpa annotations and labels
const (
HPAScaleTargetRefPath = DefaultPrefix + "scale-target-ref-path"
FedHPAEnableKey = DefaultPrefix + "fed-hpa-enabled"
)
55 changes: 28 additions & 27 deletions pkg/controllers/federatedhpa/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package federatedhpa
import (
"context"
"fmt"
"sync"
"time"

"github.com/pkg/errors"
Expand Down Expand Up @@ -80,9 +81,10 @@ type FederatedHPAController struct {
worker worker.ReconcileWorker[Resource]
cacheSyncRateLimiter workqueue.RateLimiter

scaleTargetRefMapping map[schema.GroupVersionKind]string
workloadHPAMapping *bijection.OneToManyRelation[Resource, Resource]
ppWorkloadMapping *bijection.OneToManyRelation[Resource, Resource]
gvkToScaleTargetRefLock sync.RWMutex
gvkToScaleTargetRef map[schema.GroupVersionKind]string
workloadHPAMapping *bijection.OneToManyRelation[Resource, Resource]
ppWorkloadMapping *bijection.OneToManyRelation[Resource, Resource]

metrics stats.Metrics
logger klog.Logger
Expand Down Expand Up @@ -114,9 +116,9 @@ func NewFederatedHPAController(

cacheSyncRateLimiter: workqueue.NewItemExponentialFailureRateLimiter(100*time.Millisecond, 10*time.Second),

scaleTargetRefMapping: map[schema.GroupVersionKind]string{},
workloadHPAMapping: bijection.NewOneToManyRelation[Resource, Resource](),
ppWorkloadMapping: bijection.NewOneToManyRelation[Resource, Resource](),
gvkToScaleTargetRef: map[schema.GroupVersionKind]string{},
workloadHPAMapping: bijection.NewOneToManyRelation[Resource, Resource](),
ppWorkloadMapping: bijection.NewOneToManyRelation[Resource, Resource](),

metrics: metrics,
logger: logger.WithValues("controller", FederatedHPAControllerName),
Expand Down Expand Up @@ -209,10 +211,13 @@ func (f *FederatedHPAController) enqueueFedHPAObjectsForPropagationPolicy(policy
func (f *FederatedHPAController) enqueueFedHPAObjectsForFTC(ftc *fedcorev1a1.FederatedTypeConfig) {
logger := f.logger.WithValues("ftc", ftc.GetName())

f.gvkToScaleTargetRefLock.Lock()
defer f.gvkToScaleTargetRefLock.Unlock()

if scaleTargetRefPath, ok := ftc.GetAnnotations()[common.HPAScaleTargetRefPath]; ok {
f.scaleTargetRefMapping[ftc.GetSourceTypeGVK()] = scaleTargetRefPath
f.gvkToScaleTargetRef[ftc.GetSourceTypeGVK()] = scaleTargetRefPath
} else {
delete(f.scaleTargetRefMapping, ftc.GetSourceTypeGVK())
delete(f.gvkToScaleTargetRef, ftc.GetSourceTypeGVK())
return
}

Expand Down Expand Up @@ -329,8 +334,7 @@ func (f *FederatedHPAController) reconcile(ctx context.Context, key Resource) (s
return worker.StatusAllOK
}

scaleTargetRef := f.scaleTargetRefMapping[key.gvk]
newWorkloadResource, err := scaleTargetRefToResource(hpaObject, scaleTargetRef)
newWorkloadResource, err := f.scaleTargetRefToResource(key.gvk, hpaObject)
if err != nil {
logger.Error(err, "Failed to get workload resource from hpa")
return worker.StatusError
Expand Down Expand Up @@ -360,8 +364,9 @@ func (f *FederatedHPAController) reconcile(ctx context.Context, key Resource) (s
}

var pp fedcorev1a1.GenericPropagationPolicy
var newPPResource *Resource
if workloadExist {
newPPResource := getPropagationPolicyResourceFromFedWorkload(fedWorkload)
newPPResource = getPropagationPolicyResourceFromFedWorkload(fedWorkload)
if newPPResource != nil {
_, exist = f.ppWorkloadMapping.LookupByT2(newWorkloadResource)
if exist {
Expand All @@ -388,11 +393,11 @@ func (f *FederatedHPAController) reconcile(ctx context.Context, key Resource) (s
return worker.StatusError
}

if !workloadExist || isPropagationPolicyDividedMode(pp) {
isHPAObjectUpdated = removeFedHPANotWorkReasonAnno(hpaObject, FedHPANotWorkReason)
isHPAObjectUpdated = isHPAObjectUpdated || addHPALabel(hpaObject, common.FedHPAEnableKey, common.AnnotationValueTrue)
if !workloadExist || isPropagationPolicyExist(pp) && isPropagationPolicyDividedMode(pp) {
isHPAObjectUpdated = removeFedHPANotWorkReasonAnno(ctx, hpaObject)
isHPAObjectUpdated = isHPAObjectUpdated || addFedHPAEnableLabel(ctx, hpaObject)
} else {
hpaNotWorkReason := generateFederationHPANotWorkReason(isPropagationPolicyExist(pp), isPropagationPolicyDividedMode(pp))
hpaNotWorkReason := generateFederationHPANotWorkReason(newPPResource, pp)
f.eventRecorder.Eventf(
hpaObject,
corev1.EventTypeWarning,
Expand All @@ -401,28 +406,24 @@ func (f *FederatedHPAController) reconcile(ctx context.Context, key Resource) (s
hpaNotWorkReason,
)

isHPAObjectUpdated = addFedHPANotWorkReasonAnno(hpaObject, FedHPANotWorkReason, hpaNotWorkReason)
isHPAObjectUpdated = isHPAObjectUpdated || removeHPALabel(hpaObject, common.FedHPAEnableKey)
isHPAObjectUpdated = addFedHPANotWorkReasonAnno(ctx, hpaObject, hpaNotWorkReason)
isHPAObjectUpdated = isHPAObjectUpdated || removeFedHPAEnableLabel(ctx, hpaObject)
}

case FederatedHPAModeDistributed, FederatedHPAModeDefault:
isHPAObjectUpdated = removeHPALabel(hpaObject, common.FedHPAEnableKey)
isHPAObjectUpdated = removeFedHPAEnableLabel(ctx, hpaObject)

if !workloadExist || isPropagationPolicyDuplicateMode(pp) &&
if !workloadExist || isPropagationPolicyExist(pp) &&
isPropagationPolicyDuplicateMode(pp) &&
isPropagationPolicyFollowerEnabled(pp) &&
isWorkloadRetainReplicas(fedWorkload) &&
isHPAFollowTheWorkload(ctx, hpaObject, fedWorkload) {
isHPAObjectUpdated = isHPAObjectUpdated || removeFedHPANotWorkReasonAnno(hpaObject, FedHPANotWorkReason)
isHPAObjectUpdated = isHPAObjectUpdated || removeFedHPANotWorkReasonAnno(ctx, hpaObject)
if isFedHPAObjectUpdated, err = removePendingController(ctx, hpaFTC, fedHPAObject); err != nil {
return worker.StatusError
}
} else {
hpaNotWorkReason := generateDistributedHPANotWorkReason(
isPropagationPolicyExist(pp),
isPropagationPolicyDuplicateMode(pp),
isPropagationPolicyFollowerEnabled(pp),
isWorkloadRetainReplicas(fedWorkload),
isHPAFollowTheWorkload(ctx, hpaObject, fedWorkload))
hpaNotWorkReason := generateDistributedHPANotWorkReason(ctx, newPPResource, pp, fedWorkload, hpaObject)
f.eventRecorder.Eventf(
hpaObject,
corev1.EventTypeWarning,
Expand All @@ -431,7 +432,7 @@ func (f *FederatedHPAController) reconcile(ctx context.Context, key Resource) (s
hpaNotWorkReason,
)

isHPAObjectUpdated = isHPAObjectUpdated || addFedHPANotWorkReasonAnno(hpaObject, FedHPANotWorkReason, hpaNotWorkReason)
isHPAObjectUpdated = isHPAObjectUpdated || addFedHPANotWorkReasonAnno(ctx, hpaObject, hpaNotWorkReason)
if isFedHPAObjectUpdated, err = addFedHPAPendingController(ctx, fedHPAObject, hpaFTC); err != nil {
return worker.StatusError
}
Expand Down
Loading

0 comments on commit 6f25440

Please sign in to comment.