Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bugs for ResourceDistribution #861

Merged
merged 1 commit into from
Dec 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ test: generate fmt vet manifests ## Run tests

##@ Build

build: manifests generate fmt vet ## Build manager binary.
build: generate fmt vet manifests ## Build manager binary.
go build -o bin/manager main.go

run: manifests generate fmt vet ## Run a controller from your host.
Expand Down
9 changes: 6 additions & 3 deletions apis/apps/v1alpha1/resourcedistribution_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,12 @@ const (
// +genclient
// +genclient:nonNamespaced
// +k8s:openapi-gen=true
//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
//+kubebuilder:resource:scope=Cluster
// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +kubebuilder:resource:scope=Cluster,shortName=distributor
// +kubebuilder:printcolumn:name="TOTAL",type="integer",JSONPath=".status.desired",description="The desired number of desired distribution and syncs."
// +kubebuilder:printcolumn:name="SUCCEED",type="integer",JSONPath=".status.succeeded",description="The number of successful distribution and syncs."
// +kubebuilder:printcolumn:name="FAILED",type="integer",JSONPath=".status.failed",description="The number of failed distributions and syncs."

// ResourceDistribution is the Schema for the resourcedistributions API.
type ResourceDistribution struct {
Expand Down
17 changes: 16 additions & 1 deletion config/crd/bases/apps.kruise.io_resourcedistributions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,25 @@ spec:
kind: ResourceDistribution
listKind: ResourceDistributionList
plural: resourcedistributions
shortNames:
- distributor
singular: resourcedistribution
scope: Cluster
versions:
- name: v1alpha1
- additionalPrinterColumns:
- description: The desired number of desired distribution and syncs.
jsonPath: .status.desired
name: TOTAL
type: integer
- description: The number of successful distribution and syncs.
jsonPath: .status.succeeded
name: SUCCEED
type: integer
- description: The number of failed distributions and syncs.
jsonPath: .status.failed
name: FAILED
type: integer
name: v1alpha1
schema:
openAPIV3Schema:
description: ResourceDistribution is the Schema for the resourcedistributions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,46 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
return err
}

// Watch for changes to Secrets
err = c.Watch(&source.Kind{Type: &corev1.Secret{}}, &handler.EnqueueRequestForOwner{
IsController: true, OwnerType: &appsv1alpha1.ResourceDistribution{},
}, predicate.Funcs{
CreateFunc: func(createEvent event.CreateEvent) bool {
return false
},
UpdateFunc: func(updateEvent event.UpdateEvent) bool {
oldObject, oldOK := updateEvent.ObjectOld.(*corev1.Secret)
newObject, newOK := updateEvent.ObjectNew.(*corev1.Secret)
if !oldOK || !newOK {
return false
}
return !reflect.DeepEqual(oldObject.Data, newObject.Data) || !reflect.DeepEqual(oldObject.StringData, newObject.StringData)
},
})
if err != nil {
return err
}

// Watch for changes to ConfigMap
err = c.Watch(&source.Kind{Type: &corev1.ConfigMap{}}, &handler.EnqueueRequestForOwner{
IsController: true, OwnerType: &appsv1alpha1.ResourceDistribution{},
}, predicate.Funcs{
CreateFunc: func(createEvent event.CreateEvent) bool {
return false
},
UpdateFunc: func(updateEvent event.UpdateEvent) bool {
oldObject, oldOK := updateEvent.ObjectOld.(*corev1.ConfigMap)
newObject, newOK := updateEvent.ObjectNew.(*corev1.ConfigMap)
if !oldOK || !newOK {
return false
}
return !reflect.DeepEqual(oldObject.Data, newObject.Data) || !reflect.DeepEqual(oldObject.BinaryData, newObject.BinaryData)
},
})
if err != nil {
return err
}

return nil
}

Expand Down Expand Up @@ -170,7 +210,7 @@ func (r *ReconcileResourceDistribution) doReconcile(distributor *appsv1alpha1.Re
_, cleanErrList := r.cleanResource(distributor, unmatchedNamespaces, resource)

// 3. process all errors about resource distribution and cleanup
conditions, errList := r.handleErrors(distributor.Name, distributeErrList, cleanErrList)
conditions, errList := r.handleErrors(distributeErrList, cleanErrList)

// 4. update distributor status
newStatus := calculateNewStatus(distributor, conditions, int32(len(matchedNamespaces)), succeeded)
Expand All @@ -185,9 +225,18 @@ func (r *ReconcileResourceDistribution) distributeResource(distributor *appsv1al

resourceName := utils.ConvertToUnstructured(resource).GetName()
resourceKind := resource.GetObjectKind().GroupVersionKind().Kind
newResourceHashCode := hashResource(distributor.Spec.Resource)

resourceHashCode := hashResource(distributor.Spec.Resource)
return syncItSlowly(matchedNamespaces, 1, func(namespace string) *UnexpectedError {
ns := &corev1.Namespace{}
getNSErr := r.Client.Get(context.TODO(), types.NamespacedName{Name: namespace}, ns)
if errors.IsNotFound(getNSErr) || (getNSErr == nil && ns.DeletionTimestamp != nil) {
return &UnexpectedError{
err: fmt.Errorf("namespace not found or is terminating"),
namespace: namespace,
conditionID: NotExistConditionID,
}
}

// 1. try to fetch existing old resource
oldResource := &unstructured.Unstructured{}
oldResource.SetGroupVersionKind(resource.GetObjectKind().GroupVersionKind())
Expand All @@ -202,17 +251,10 @@ func (r *ReconcileResourceDistribution) distributeResource(distributor *appsv1al
}

// 2. if resource doesn't exist, create resource;
newResource := makeResourceObject(distributor, namespace, resource, newResourceHashCode).(client.Object)
if getErr != nil && errors.IsNotFound(getErr) {
if createErr := r.Client.Create(context.TODO(), newResource); createErr != nil {
newResource := makeResourceObject(distributor, namespace, resource, resourceHashCode, nil)
if createErr := r.Client.Create(context.TODO(), newResource.(client.Object)); createErr != nil {
klog.Errorf("Error occurred when creating resource in namespace %s, err: %v, name: %s", namespace, createErr, distributor.Name)
if errors.IsNotFound(createErr) {
return &UnexpectedError{
err: fmt.Errorf("namespace not found"),
namespace: namespace,
conditionID: NotExistConditionID,
}
}
return &UnexpectedError{
err: createErr,
namespace: namespace,
Expand All @@ -223,19 +265,20 @@ func (r *ReconcileResourceDistribution) distributeResource(distributor *appsv1al
return nil
}

// 3. if resource exits
annotations := oldResource.GetAnnotations()
if annotations == nil || annotations[utils.SourceResourceDistributionOfResource] != distributor.Name {
// if conflict occurred
// 3. check conflict
if !isControlledByDistributor(oldResource, distributor) {
klog.Errorf("Conflict with existing resource(%s/%s) in namespaces %s, name: %s", resourceKind, resourceName, namespace, distributor.Name)
return &UnexpectedError{
err: fmt.Errorf("conflict with existing resources because of the same namespace and name"),
err: fmt.Errorf("conflict with existing resources because of the same namespace, group, version, kind and name"),
namespace: namespace,
conditionID: ConflictConditionID,
}
} else if annotations[utils.ResourceHashCodeAnnotation] != newResourceHashCode {
// else if the resource needs to update
if updateErr := r.Client.Update(context.TODO(), newResource); updateErr != nil {
}

// 4. check whether resource need to update
if needToUpdate(oldResource, utils.ConvertToUnstructured(resource)) {
newResource := makeResourceObject(distributor, namespace, resource, resourceHashCode, oldResource)
if updateErr := r.Client.Update(context.TODO(), newResource.(client.Object)); updateErr != nil {
klog.Errorf("Error occurred when updating resource in namespace %s, err: %v, name: %s", namespace, updateErr, distributor.Name)
return &UnexpectedError{
err: updateErr,
Expand All @@ -255,6 +298,12 @@ func (r *ReconcileResourceDistribution) cleanResource(distributor *appsv1alpha1.
resourceName := utils.ConvertToUnstructured(resource).GetName()
resourceKind := resource.GetObjectKind().GroupVersionKind().Kind
return syncItSlowly(unmatchedNamespaces, 1, func(namespace string) *UnexpectedError {
ns := &corev1.Namespace{}
getNSErr := r.Client.Get(context.TODO(), types.NamespacedName{Name: namespace}, ns)
if errors.IsNotFound(getNSErr) || (getNSErr == nil && ns.DeletionTimestamp != nil) {
return nil
}

// 1. try to fetch existing old resource
oldResource := &unstructured.Unstructured{}
oldResource.SetGroupVersionKind(resource.GetObjectKind().GroupVersionKind())
Expand All @@ -270,11 +319,11 @@ func (r *ReconcileResourceDistribution) cleanResource(distributor *appsv1alpha1.
}
}

// 2. just return if the owner of the oldResource is not this distributor
annotations := oldResource.GetAnnotations()
if annotations == nil || annotations[utils.SourceResourceDistributionOfResource] != distributor.Name {
// 2. if the owner of the oldResource is not this distributor, just return
if !isControlledByDistributor(oldResource, distributor) {
return nil
}

// 3. else clean the resource
if deleteErr := r.Client.Delete(context.TODO(), oldResource); deleteErr != nil && !errors.IsNotFound(deleteErr) {
klog.Errorf("Error occurred when deleting resource in namespace %s from client, err: %v, name: %s", namespace, deleteErr, distributor.Name)
Expand All @@ -290,7 +339,7 @@ func (r *ReconcileResourceDistribution) cleanResource(distributor *appsv1alpha1.
}

// handlerErrors process all errors about resource distribution and clean, and record them to conditions
func (r *ReconcileResourceDistribution) handleErrors(distributorName string, errLists ...[]*UnexpectedError) ([]appsv1alpha1.ResourceDistributionCondition, field.ErrorList) {
func (r *ReconcileResourceDistribution) handleErrors(errLists ...[]*UnexpectedError) ([]appsv1alpha1.ResourceDistributionCondition, field.ErrorList) {
// init a status.conditions
conditions := make([]appsv1alpha1.ResourceDistributionCondition, NumberOfConditionTypes)
initConditionType(conditions)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,10 @@ func TestDoReconcile(t *testing.T) {
t.Fatalf("failed to get resource(%s.%s) from fake client, err %v", namespace, "test-secret-1", err)
}
// check resource source and version
if distributor.Name != resource.Annotations[utils.SourceResourceDistributionOfResource] ||
resourceVersion != resource.Annotations[utils.ResourceHashCodeAnnotation] {
if !isControlledByDistributor(resource, distributor) {
t.Fatalf("failed to sync resource(%s) for namespace (%s), owner set error", resource.Name, namespace)
}
if resourceVersion != resource.Annotations[utils.ResourceHashCodeAnnotation] {
t.Fatalf("failed to sync resource(%s) for namespace (%s) from %s, \nexpected version:%x \nactual version:%x", resource.Name, namespace, resource.Annotations[utils.SourceResourceDistributionOfResource], resourceVersion, resource.Annotations[utils.ResourceHashCodeAnnotation])
}
}
Expand Down Expand Up @@ -112,7 +114,13 @@ func buildResourceDistributionWithSecret() *appsv1alpha1.ResourceDistribution {

func buildResourceDistribution(raw runtime.RawExtension) *appsv1alpha1.ResourceDistribution {
return &appsv1alpha1.ResourceDistribution{
ObjectMeta: metav1.ObjectMeta{Name: "test-resource-distribution"},
TypeMeta: metav1.TypeMeta{
APIVersion: appsv1alpha1.GroupVersion.String(),
Kind: "ResourceDistribution",
},
ObjectMeta: metav1.ObjectMeta{
Name: "test-resource-distribution",
},
Spec: appsv1alpha1.ResourceDistributionSpec{
Resource: raw,
Targets: appsv1alpha1.ResourceDistributionTargets{
Expand Down Expand Up @@ -147,6 +155,7 @@ func buildResourceDistribution(raw runtime.RawExtension) *appsv1alpha1.ResourceD
}

func makeEnvironment() []runtime.Object {
distributor := buildResourceDistributionWithSecret()
return []runtime.Object{
&corev1.Namespace{ // for create
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -193,21 +202,16 @@ func makeEnvironment() []runtime.Object {
},
},
},
&corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: "kube-system",
Labels: map[string]string{
"group": "two",
"environment": "test",
},
},
},
&corev1.Secret{ // for update
ObjectMeta: metav1.ObjectMeta{
Name: "test-secret-1",
Namespace: "ns-1",
Annotations: map[string]string{
utils.SourceResourceDistributionOfResource: "test-resource-distribution",
utils.ResourceHashCodeAnnotation: hashResource(distributor.Spec.Resource),
},
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(distributor, distributor.GroupVersionKind()),
},
},
},
Expand All @@ -217,6 +221,10 @@ func makeEnvironment() []runtime.Object {
Namespace: "ns-4",
Annotations: map[string]string{
utils.SourceResourceDistributionOfResource: "test-resource-distribution",
utils.ResourceHashCodeAnnotation: hashResource(distributor.Spec.Resource),
},
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(distributor, distributor.GroupVersionKind()),
},
},
},
Expand Down
Loading