Skip to content

Commit

Permalink
✨ Implement ManifestWorkReplicaSet RollOut strategy (#259)
Browse files Browse the repository at this point in the history
* Implement rollout strategy

Signed-off-by: melserngawy <melserng@redhat.com>

* Update API and new logic

Signed-off-by: melserngawy <melserng@redhat.com>

---------

Signed-off-by: melserngawy <melserng@redhat.com>
  • Loading branch information
serngawy authored Nov 2, 2023
1 parent 370dbba commit 35680c3
Show file tree
Hide file tree
Showing 13 changed files with 951 additions and 104 deletions.
2 changes: 1 addition & 1 deletion pkg/common/testing/assertion.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func AssertErrorWithPrefix(t *testing.T, actual error, expectedErrorPrefix strin
func AssertActions(t *testing.T, actualActions []clienttesting.Action, expectedVerbs ...string) {
t.Helper()
if len(actualActions) != len(expectedVerbs) {
t.Fatalf("expected %d call but got: %#v", len(expectedVerbs), actualActions)
t.Fatalf("expected %d call but got %d: %#v", len(expectedVerbs), len(actualActions), actualActions)
}
for i, expected := range expectedVerbs {
if actualActions[i].GetVerb() != expected {
Expand Down
26 changes: 26 additions & 0 deletions pkg/work/helper/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/dynamic"
"k8s.io/klog/v2"

clusterlister "open-cluster-management.io/api/client/cluster/listers/cluster/v1beta1"
clusterv1beta1 "open-cluster-management.io/api/cluster/v1beta1"
workapiv1 "open-cluster-management.io/api/work/v1"
)

Expand Down Expand Up @@ -467,3 +471,25 @@ func BuildResourceMeta(
resourceMeta.Resource = mapping.Resource.Resource
return resourceMeta, mapping.Resource, err
}

// PlacementDecisionGetter wraps a PlacementDecisionLister so that it can be handed to
// clusterv1beta1.NewPlacementDecisionClustersTracker as its decision getter.
type PlacementDecisionGetter struct {
	// Client is the lister used to look up PlacementDecisions.
	Client clusterlister.PlacementDecisionLister
}

// List returns the PlacementDecisions in the given namespace that match selector,
// delegating to the namespaced lister obtained from Client.
func (g PlacementDecisionGetter) List(selector labels.Selector, namespace string) ([]*clusterv1beta1.PlacementDecision, error) {
	namespaced := g.Client.PlacementDecisions(namespace)
	return namespaced.List(selector)
}

// GetClusters returns the names of the clusters added and deleted for the placement,
// as reported by GetClusterChanges on a tracker built from the placement and
// existingClusters.
func GetClusters(client clusterlister.PlacementDecisionLister, placement *clusterv1beta1.Placement,
	existingClusters sets.Set[string]) (sets.Set[string], sets.Set[string], error) {
	return GetPlacementTracker(client, placement, existingClusters).GetClusterChanges()
}

// GetPlacementTracker builds a PlacementDecisionClustersTracker for the placement. The
// lister is wrapped in a PlacementDecisionGetter and existingClusters is passed through
// to the tracker constructor unchanged.
func GetPlacementTracker(client clusterlister.PlacementDecisionLister, placement *clusterv1beta1.Placement,
	existingClusters sets.Set[string]) *clusterv1beta1.PlacementDecisionClustersTracker {
	getter := PlacementDecisionGetter{Client: client}
	return clusterv1beta1.NewPlacementDecisionClustersTracker(placement, getter, existingClusters)
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ const (
// TODO move this to the api repo
ManifestWorkReplicaSetControllerNameLabelKey = "work.open-cluster-management.io/manifestworkreplicaset"

// ManifestWorkReplicaSetPlacementNameLabelKey is the label key set on a ManifestWork to reference the Placement
// (from the ManifestWorkReplicaSet's PlacementRefs) that selected the managed cluster.
ManifestWorkReplicaSetPlacementNameLabelKey = "work.open-cluster-management.io/placementname"

// ManifestWorkReplicaSetFinalizer is the name of the finalizer added to ManifestWorkReplicaSet. It is used to ensure
// related manifestworks are deleted
ManifestWorkReplicaSetFinalizer = "work.open-cluster-management.io/manifest-work-cleanup"
Expand Down Expand Up @@ -124,7 +128,7 @@ func newController(workClient workclientset.Interface,
}
}

// sync is the main reconcile loop for placeManifest work. It is triggered every 15sec
// sync is the main reconcile loop for ManifestWorkReplicaSet. It is triggered every 15sec
func (m *ManifestWorkReplicaSetController) sync(ctx context.Context, controllerContext factory.SyncContext) error {
key := controllerContext.QueueKey()
klog.V(4).Infof("Reconciling ManifestWorkReplicaSet %q", key)
Expand Down Expand Up @@ -180,3 +184,19 @@ func listManifestWorksByManifestWorkReplicaSet(mwrs *workapiv1alpha1.ManifestWor
selector := labels.NewSelector().Add(*req)
return manifestWorkLister.List(selector)
}

// listManifestWorksByMWRSetPlacementRef lists the ManifestWorks that carry both the
// ManifestWorkReplicaSet label for mwrs and the placement-name label equal to
// placementName. It returns an error if either label requirement cannot be built or if
// the lister fails.
func listManifestWorksByMWRSetPlacementRef(mwrs *workapiv1alpha1.ManifestWorkReplicaSet, placementName string,
	manifestWorkLister worklisterv1.ManifestWorkLister) ([]*workapiv1.ManifestWork, error) {
	// Every entry is an exact-match requirement; all of them must hold for a ManifestWork to be listed.
	wanted := []struct{ key, value string }{
		{ManifestWorkReplicaSetControllerNameLabelKey, manifestWorkReplicaSetKey(mwrs)},
		{ManifestWorkReplicaSetPlacementNameLabelKey, placementName},
	}

	selector := labels.NewSelector()
	for _, label := range wanted {
		requirement, err := labels.NewRequirement(label.key, selection.Equals, []string{label.value})
		if err != nil {
			return nil, err
		}
		selector = selector.Add(*requirement)
	}

	return manifestWorkLister.List(selector)
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func TestManifestWorkReplicaSetControllerPatchStatus(t *testing.T) {
w.Finalizers = []string{ManifestWorkReplicaSetFinalizer}
return w
}(),
works: helpertest.CreateTestManifestWorks("test", "default", "cluster1", "cluster2"),
works: helpertest.CreateTestManifestWorks("test", "default", "placement", "cluster1", "cluster2"),
placement: func() *clusterv1beta1.Placement {
p, _ := helpertest.CreateTestPlacement("placement", "default", "cluster1", "cluster2")
return p
Expand Down Expand Up @@ -181,7 +181,7 @@ func TestManifestWorkReplicaSetControllerPatchStatus(t *testing.T) {
w.Finalizers = []string{ManifestWorkReplicaSetFinalizer}
return w
}(),
works: helpertest.CreateTestManifestWorks("test", "default", "cluster1", "cluster2"),
works: helpertest.CreateTestManifestWorks("test", "default", "placement", "cluster1", "cluster2"),
placement: func() *clusterv1beta1.Placement {
p, _ := helpertest.CreateTestPlacement("placement", "default", "cluster2", "cluster3", "cluster4")
return p
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ import (

clusterlister "open-cluster-management.io/api/client/cluster/listers/cluster/v1beta1"
worklisterv1 "open-cluster-management.io/api/client/work/listers/work/v1"
clusterv1beta1 "open-cluster-management.io/api/cluster/v1beta1"
clusterv1alpha1 "open-cluster-management.io/api/cluster/v1alpha1"
"open-cluster-management.io/api/utils/work/v1/workapplier"
workv1 "open-cluster-management.io/api/work/v1"
workapiv1alpha1 "open-cluster-management.io/api/work/v1alpha1"

"open-cluster-management.io/ocm/pkg/common/helpers"
"open-cluster-management.io/ocm/pkg/work/helper"
)

// deployReconciler is to manage ManifestWork based on the placement.
Expand All @@ -31,90 +31,126 @@ type deployReconciler struct {
func (d *deployReconciler) reconcile(ctx context.Context, mwrSet *workapiv1alpha1.ManifestWorkReplicaSet,
) (*workapiv1alpha1.ManifestWorkReplicaSet, reconcileState, error) {
// Manifestwork create/update/delete logic.
var placements []*clusterv1beta1.Placement
var errs []error
var plcsSummary []workapiv1alpha1.PlacementSummary
count, total := 0, 0
// Getting the placements and the created ManifestWorks related to each placement
for _, placementRef := range mwrSet.Spec.PlacementRefs {
var existingRolloutClsStatus []clusterv1alpha1.ClusterRolloutStatus
existingClusterNames := sets.New[string]()
placement, err := d.placementLister.Placements(mwrSet.Namespace).Get(placementRef.Name)

if errors.IsNotFound(err) {
apimeta.SetStatusCondition(&mwrSet.Status.Conditions, GetPlacementDecisionVerified(workapiv1alpha1.ReasonPlacementDecisionNotFound, ""))
return mwrSet, reconcileStop, nil
}

if err != nil {
return mwrSet, reconcileContinue, fmt.Errorf("failed get placement %w", err)
}
placements = append(placements, placement)
}

manifestWorks, err := listManifestWorksByManifestWorkReplicaSet(mwrSet, d.manifestWorkLister)
if err != nil {
return mwrSet, reconcileContinue, err
}
manifestWorks, err := listManifestWorksByMWRSetPlacementRef(mwrSet, placementRef.Name, d.manifestWorkLister)
if err != nil {
return mwrSet, reconcileContinue, err
}

var errs []error
addedClusters, deletedClusters, existingClusters := sets.New[string](), sets.New[string](), sets.New[string]()
for _, mw := range manifestWorks {
existingClusters.Insert(mw.Namespace)
}
for _, mw := range manifestWorks {
// Check if ManifestWorkTemplate changes, ManifestWork will need to be updated.
newMW := &workv1.ManifestWork{}
mw.ObjectMeta.DeepCopyInto(&newMW.ObjectMeta)
mwrSet.Spec.ManifestWorkTemplate.DeepCopyInto(&newMW.Spec)

// TODO: Create NeedToApply function by workApplier to check the manifestWork->spec hash value from the cache.
if !workapplier.ManifestWorkEqual(newMW, mw) {
continue
}

existingClusterNames.Insert(mw.Namespace)
rolloutClusterStatus, err := d.clusterRolloutStatusFunc(mw.Namespace, *mw)

for _, placement := range placements {
added, deleted, err := helpers.GetClusterChanges(d.placeDecisionLister, placement, existingClusters)
if err != nil {
errs = append(errs, err)
continue
}
existingRolloutClsStatus = append(existingRolloutClsStatus, rolloutClusterStatus)
}

placeTracker := helper.GetPlacementTracker(d.placeDecisionLister, placement, existingClusterNames)
rolloutHandler, err := clusterv1alpha1.NewRolloutHandler(placeTracker, d.clusterRolloutStatusFunc)
if err != nil {
apimeta.SetStatusCondition(&mwrSet.Status.Conditions, GetPlacementDecisionVerified(workapiv1alpha1.ReasonNotAsExpected, ""))

return mwrSet, reconcileContinue, utilerrors.NewAggregate(errs)
}

addedClusters = addedClusters.Union(added)
deletedClusters = deletedClusters.Union(deleted)
}

// Create manifestWork for added clusters
for cls := range addedClusters {
mw, err := CreateManifestWork(mwrSet, cls)
err = placeTracker.Refresh()
if err != nil {
errs = append(errs, err)
continue
}

_, err = d.workApplier.Apply(ctx, mw)
_, rolloutResult, err := rolloutHandler.GetRolloutCluster(placementRef.RolloutStrategy, existingRolloutClsStatus)

if err != nil {
errs = append(errs, err)
apimeta.SetStatusCondition(&mwrSet.Status.Conditions, GetPlacementDecisionVerified(workapiv1alpha1.ReasonNotAsExpected, ""))

continue
}
}

// Update manifestWorks in case there are changes at ManifestWork or ManifestWorkReplicaSet
for cls := range existingClusters {
// Delete manifestWork for deleted clusters
if deletedClusters.Has(cls) {
err = d.workApplier.Delete(ctx, cls, mwrSet.Name)
// Create ManifestWorks
for _, rolloutStatue := range rolloutResult.ClustersToRollout {
if rolloutStatue.Status == clusterv1alpha1.ToApply {
mw, err := CreateManifestWork(mwrSet, rolloutStatue.ClusterName, placementRef.Name)
if err != nil {
errs = append(errs, err)
continue
}

_, err = d.workApplier.Apply(ctx, mw)
if err != nil {
errs = append(errs, err)
}
if !existingClusterNames.Has(rolloutStatue.ClusterName) {
existingClusterNames.Insert(rolloutStatue.ClusterName)
}
}
}

for _, cls := range rolloutResult.ClustersRemoved {
// Delete manifestWork for removed clusters
err = d.workApplier.Delete(ctx, cls.ClusterName, mwrSet.Name)
if err != nil {
errs = append(errs, err)
continue
}
continue
existingClusterNames.Delete(cls.ClusterName)
}

mw, err := CreateManifestWork(mwrSet, cls)
if err != nil {
errs = append(errs, err)
continue
total = total + int(placement.Status.NumberOfSelectedClusters)
plcSummary := workapiv1alpha1.PlacementSummary{
Name: placementRef.Name,
AvailableDecisionGroups: getAvailableDecisionGroupProgressMessage(len(placement.Status.DecisionGroups),
len(existingClusterNames), placement.Status.NumberOfSelectedClusters),
}

_, err = d.workApplier.Apply(ctx, mw)
if err != nil {
errs = append(errs, err)
mwrSetSummary := workapiv1alpha1.ManifestWorkReplicaSetSummary{
Total: len(existingClusterNames),
}
plcSummary.Summary = mwrSetSummary
plcsSummary = append(plcsSummary, plcSummary)

count = count + len(existingClusterNames)
}
// Set the placements summary
mwrSet.Status.PlacementsSummary = plcsSummary

// Set the Summary
if mwrSet.Status.Summary == (workapiv1alpha1.ManifestWorkReplicaSetSummary{}) {
mwrSet.Status.Summary = workapiv1alpha1.ManifestWorkReplicaSetSummary{}
}
total := len(existingClusters) - len(deletedClusters) + len(addedClusters)
if total < 0 {
total = 0
}

mwrSet.Status.Summary.Total = total
if total == 0 {
mwrSet.Status.Summary.Total = count
if count == 0 {
mwrSet.Status.Summary.Applied = 0
mwrSet.Status.Summary.Available = 0
mwrSet.Status.Summary.Degraded = 0
Expand All @@ -124,9 +160,54 @@ func (d *deployReconciler) reconcile(ctx context.Context, mwrSet *workapiv1alpha
apimeta.SetStatusCondition(&mwrSet.Status.Conditions, GetPlacementDecisionVerified(workapiv1alpha1.ReasonAsExpected, ""))
}

if total == count {
apimeta.SetStatusCondition(&mwrSet.Status.Conditions, GetPlacementRollOut(workapiv1alpha1.ReasonComplete, ""))
} else {
apimeta.SetStatusCondition(&mwrSet.Status.Conditions, GetPlacementRollOut(workapiv1alpha1.ReasonProgressing, ""))
}

return mwrSet, reconcileContinue, utilerrors.NewAggregate(errs)
}

// clusterRolloutStatusFunc derives the rollout status of a cluster from the conditions of
// its ManifestWork. LastTransitionTime is set to the ManifestWork creation timestamp.
//
// The status is resolved as follows:
//   - no Applied condition: ToApply.
//   - Applied is True, or Progressing is True: Progressing, then refined below.
//   - otherwise, Applied is False: Failed (returned immediately).
//   - any other Applied status: stays ToApply, then refined below.
//
// Refinement: Available True yields Succeeded; otherwise Degraded True yields Failed.
// The returned error is always nil.
func (d *deployReconciler) clusterRolloutStatusFunc(clusterName string, manifestWork workv1.ManifestWork) (clusterv1alpha1.ClusterRolloutStatus, error) {
	conditions := manifestWork.Status.Conditions
	result := clusterv1alpha1.ClusterRolloutStatus{
		ClusterName:        clusterName,
		LastTransitionTime: &manifestWork.CreationTimestamp,
		// ToApply is the default until a condition says otherwise.
		Status: clusterv1alpha1.ToApply,
	}

	applied := apimeta.FindStatusCondition(conditions, workv1.WorkApplied)
	switch {
	case applied == nil:
		// Nothing has been reported yet for this ManifestWork.
		return result, nil
	case applied.Status == metav1.ConditionTrue ||
		apimeta.IsStatusConditionTrue(conditions, workv1.WorkProgressing):
		// ManifestWork Progressing status is not defined however the check is made for future work availability.
		result.Status = clusterv1alpha1.Progressing
	case applied.Status == metav1.ConditionFalse:
		result.Status = clusterv1alpha1.Failed
		return result, nil
	}

	switch {
	case apimeta.IsStatusConditionTrue(conditions, workv1.WorkAvailable):
		// Available wins over Degraded.
		result.Status = clusterv1alpha1.Succeeded
	case apimeta.IsStatusConditionTrue(conditions, workv1.WorkDegraded):
		// ManifestWork Degraded status is not defined however the check is made for future work availability.
		result.Status = clusterv1alpha1.Failed
	}

	return result, nil
}

// GetManifestworkApplied returns a condition with True status only if all clusters have manifests applied as expected
func GetManifestworkApplied(reason string, message string) metav1.Condition {
if reason == workapiv1alpha1.ReasonAsExpected {
Expand All @@ -146,6 +227,15 @@ func GetPlacementDecisionVerified(reason string, message string) metav1.Conditio
return getCondition(workapiv1alpha1.ManifestWorkReplicaSetConditionPlacementVerified, reason, message, metav1.ConditionFalse)
}

// GetPlacementRollOut builds the PlacementRolledOut condition with the given reason and
// message. Its status is True only when reason is ReasonComplete, and False otherwise.
func GetPlacementRollOut(reason string, message string) metav1.Condition {
	status := metav1.ConditionFalse
	if reason == workapiv1alpha1.ReasonComplete {
		status = metav1.ConditionTrue
	}

	return getCondition(workapiv1alpha1.ManifestWorkReplicaSetConditionPlacementRolledOut, reason, message, status)
}

func getCondition(conditionType string, reason string, message string, status metav1.ConditionStatus) metav1.Condition {
return metav1.Condition{
Type: conditionType,
Expand All @@ -156,7 +246,7 @@ func getCondition(conditionType string, reason string, message string, status me
}
}

func CreateManifestWork(mwrSet *workapiv1alpha1.ManifestWorkReplicaSet, clusterNS string) (*workv1.ManifestWork, error) {
func CreateManifestWork(mwrSet *workapiv1alpha1.ManifestWorkReplicaSet, clusterNS string, placementRefName string) (*workv1.ManifestWork, error) {
if clusterNS == "" {
return nil, fmt.Errorf("invalid cluster namespace")
}
Expand All @@ -165,7 +255,12 @@ func CreateManifestWork(mwrSet *workapiv1alpha1.ManifestWorkReplicaSet, clusterN
ObjectMeta: metav1.ObjectMeta{
Name: mwrSet.Name,
Namespace: clusterNS,
Labels: map[string]string{ManifestWorkReplicaSetControllerNameLabelKey: manifestWorkReplicaSetKey(mwrSet)},
Labels: map[string]string{ManifestWorkReplicaSetControllerNameLabelKey: manifestWorkReplicaSetKey(mwrSet),
ManifestWorkReplicaSetPlacementNameLabelKey: placementRefName},
},
Spec: mwrSet.Spec.ManifestWorkTemplate}, nil
}

// getAvailableDecisionGroupProgressMessage formats a progress summary of the form
// "<groupNum> (<existingClsCount> / <totalCls> clusters applied)".
func getAvailableDecisionGroupProgressMessage(groupNum int, existingClsCount int, totalCls int32) string {
	const progressFormat = "%d (%d / %d clusters applied)"

	message := fmt.Sprintf(progressFormat, groupNum, existingClsCount, totalCls)
	return message
}
Loading

0 comments on commit 35680c3

Please sign in to comment.