Skip to content

Commit

Permalink
implement patch function in ManifestWorkAgentClient
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <liuweixa@redhat.com>
  • Loading branch information
skeeey committed Sep 1, 2023
1 parent c7abb65 commit c6f518c
Show file tree
Hide file tree
Showing 3 changed files with 212 additions and 71 deletions.
158 changes: 87 additions & 71 deletions cloudevents/work/agent/client/manifestwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ import (
"context"
"fmt"

"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
kubetypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/klog/v2"
Expand All @@ -15,6 +18,7 @@ import (
"open-cluster-management.io/api/cloudevents/generic"
"open-cluster-management.io/api/cloudevents/generic/types"
"open-cluster-management.io/api/cloudevents/work/agent/codec"
"open-cluster-management.io/api/cloudevents/work/utils"
"open-cluster-management.io/api/cloudevents/work/watcher"
workv1 "open-cluster-management.io/api/work/v1"
)
Expand All @@ -26,117 +30,129 @@ const (
DeleteRequestAction = "delete_request"
)

// ManifestWorksAgentClient implements the ManifestWorkInterface. It sends the manifestworks status back to source by
// ManifestWorkAgentClient implements the ManifestWorkInterface. It sends the manifestworks status back to source by
// CloudEventAgentClient.
type ManifestWorksAgentClient struct {
cloudEventsClient generic.CloudEventAgentClient[*workv1.ManifestWork]
type ManifestWorkAgentClient struct {
cloudEventsClient *generic.CloudEventAgentClient[*workv1.ManifestWork]
watcher *watcher.ManifestWorkWatcher
lister workv1lister.ManifestWorkNamespaceLister
}

var _ workv1client.ManifestWorkInterface = &ManifestWorksAgentClient{}
var manifestWorkGR = schema.GroupResource{Group: workv1.GroupName, Resource: "manifestworks"}

func (c *ManifestWorksAgentClient) Create(ctx context.Context, manifestWork *workv1.ManifestWork, opts metav1.CreateOptions) (*workv1.ManifestWork, error) {
klog.Fatal("Create function for ManifestWorksAgentClient is unsupported")
return nil, nil
var _ workv1client.ManifestWorkInterface = &ManifestWorkAgentClient{}

func (c *ManifestWorkAgentClient) Create(ctx context.Context, manifestWork *workv1.ManifestWork, opts metav1.CreateOptions) (*workv1.ManifestWork, error) {
return nil, errors.NewMethodNotSupported(manifestWorkGR, "create")
}

func (c *ManifestWorksAgentClient) Update(ctx context.Context, manifestWork *workv1.ManifestWork, opts metav1.UpdateOptions) (*workv1.ManifestWork, error) {
// TODO (skeeey) using patch instead
klog.V(4).Infof("updating manifestwork %s", manifestWork.Name)
func (c *ManifestWorkAgentClient) Update(ctx context.Context, manifestWork *workv1.ManifestWork, opts metav1.UpdateOptions) (*workv1.ManifestWork, error) {
return nil, errors.NewMethodNotSupported(manifestWorkGR, "update")
}

if !manifestWork.DeletionTimestamp.IsZero() && len(manifestWork.Finalizers) == 0 {
// the finalizers of a deleting manifestwork are removed on a managed cluster, marking the manifest work status
// to deleted and send it back to source
meta.SetStatusCondition(&manifestWork.Status.Conditions, metav1.Condition{
Type: ManifestsDeleted,
Status: metav1.ConditionTrue,
Reason: "ManifestsDeleted",
Message: fmt.Sprintf("The manifests are deleted from the cluster %s", manifestWork.Namespace),
})
func (c *ManifestWorkAgentClient) UpdateStatus(ctx context.Context, manifestWork *workv1.ManifestWork, opts metav1.UpdateOptions) (*workv1.ManifestWork, error) {
return nil, errors.NewMethodNotSupported(manifestWorkGR, "updatestatus")
}

eventDataType, err := types.ParseCloudEventsDataType(manifestWork.Annotations[codec.CloudEventsDataTypeAnnotationKey])
if err != nil {
return nil, err
}
func (c *ManifestWorkAgentClient) Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error {
return errors.NewMethodNotSupported(manifestWorkGR, "delete")
}

eventType := types.CloudEventsType{
CloudEventsDataType: *eventDataType,
SubResource: types.SubResourceStatus,
Action: DeleteRequestAction,
}
func (c *ManifestWorkAgentClient) DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOpts metav1.ListOptions) error {
return errors.NewMethodNotSupported(manifestWorkGR, "deletecollection")
}

if err := c.cloudEventsClient.Publish(ctx, eventType, manifestWork); err != nil {
return nil, err
}
func (c *ManifestWorkAgentClient) Get(ctx context.Context, name string, opts metav1.GetOptions) (*workv1.ManifestWork, error) {
klog.V(4).Infof("getting manifestwork %s", name)
return c.lister.Get(name)
}

// also send the deleted event to delete the manifestwork from the ManifestWorkInformer
c.watcher.Receive(watch.Event{Type: watch.Deleted, Object: manifestWork})
return manifestWork, nil
func (c *ManifestWorkAgentClient) List(ctx context.Context, opts metav1.ListOptions) (*workv1.ManifestWorkList, error) {
klog.V(4).Infof("sync manifestworks")
// send resync request to fetch manifestworks from source when the ManifestWorkInformer status
if err := c.cloudEventsClient.Resync(ctx); err != nil {
return nil, err
}

return nil, nil
return &workv1.ManifestWorkList{}, nil
}

func (c *ManifestWorkAgentClient) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
// TODO (skeeey) consider resync the manifestworks when the ManifestWorkInformer reconnected
return c.watcher, nil
}

func (c *ManifestWorksAgentClient) UpdateStatus(ctx context.Context, manifestWork *workv1.ManifestWork, opts metav1.UpdateOptions) (*workv1.ManifestWork, error) {
// TODO (skeeey) using patch instead
klog.V(4).Infof("updating manifestwork %s status", manifestWork.Name)
func (c *ManifestWorkAgentClient) Patch(ctx context.Context, name string, pt kubetypes.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *workv1.ManifestWork, err error) {
klog.V(4).Infof("patching manifestwork %s", name)

_, err := c.lister.Get(manifestWork.Name)
lastWork, err := c.lister.Get(name)
if err != nil {
return nil, err
}

eventDataType, err := types.ParseCloudEventsDataType(manifestWork.Annotations[codec.CloudEventsDataTypeAnnotationKey])
patchedWork, err := utils.Patch(pt, lastWork, data)
if err != nil {
return nil, err
}

updatedWork := manifestWork.DeepCopy()
eventDataType, err := types.ParseCloudEventsDataType(patchedWork.Annotations[codec.CloudEventsDataTypeAnnotationKey])
if err != nil {
return nil, err
}

eventType := types.CloudEventsType{
CloudEventsDataType: *eventDataType,
SubResource: types.SubResourceStatus,
Action: UpdateRequestAction,
}

if err := c.cloudEventsClient.Publish(ctx, eventType, updatedWork); err != nil {
return nil, err
newWork := patchedWork.DeepCopy()

if isStatusUpdate(subresources) {
eventType.Action = UpdateRequestAction
if err := c.cloudEventsClient.Publish(ctx, eventType, newWork); err != nil {
return nil, err
}

// refresh the new work in the ManifestWorkInformer local cache to update its status.
c.watcher.Receive(watch.Event{Type: watch.Modified, Object: newWork})
return newWork, nil
}
return updatedWork, nil
}

func (c *ManifestWorksAgentClient) Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error {
klog.Fatal("Delete function for ManifestWorksAgentClient is unsupported")
return nil
}
if equality.Semantic.DeepEqual(lastWork.Finalizers, newWork.Finalizers) {
return nil, fmt.Errorf("cannot update the manifestwork %s, only allowed to update manifestwork finalizers or status", newWork.Name)
}

func (c *ManifestWorksAgentClient) DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOpts metav1.ListOptions) error {
klog.Fatal("DeleteCollection function for ManifestWorksAgentClient is unsupported")
return nil
}
// the finalizers of a deleting manifestwork are removed, marking the manifestwork status to deleted and sending
// it back to source
if !newWork.DeletionTimestamp.IsZero() && len(newWork.Finalizers) == 0 {
meta.SetStatusCondition(&newWork.Status.Conditions, metav1.Condition{
Type: ManifestsDeleted,
Status: metav1.ConditionTrue,
Reason: "ManifestsDeleted",
Message: fmt.Sprintf("The manifests are deleted from the cluster %s", newWork.Namespace),
})

func (c *ManifestWorksAgentClient) Get(ctx context.Context, name string, opts metav1.GetOptions) (*workv1.ManifestWork, error) {
klog.V(4).Infof("getting manifestwork %s", name)
return c.lister.Get(name)
}
eventType.Action = DeleteRequestAction
if err := c.cloudEventsClient.Publish(ctx, eventType, newWork); err != nil {
return nil, err
}

func (c *ManifestWorksAgentClient) List(ctx context.Context, opts metav1.ListOptions) (*workv1.ManifestWorkList, error) {
klog.V(4).Infof("sync manifestworks")
// send resync request to fetch manifestworks from source when the ManifestWorkInformer status
if err := c.cloudEventsClient.Resync(ctx); err != nil {
return nil, err
// delete the manifestwork from the ManifestWorkInformer local cache.
c.watcher.Receive(watch.Event{Type: watch.Deleted, Object: newWork})
return newWork, nil
}

return &workv1.ManifestWorkList{}, nil
// refresh the new work in the ManifestWorkInformer local cache to update its finalizers.
c.watcher.Receive(watch.Event{Type: watch.Modified, Object: newWork})
return newWork, nil
}

func (mw *ManifestWorksAgentClient) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
// TODO (skeeey) consider resync the manifestworks when the ManifestWorkInformer reconnected
return mw.watcher, nil
}
func isStatusUpdate(subresources []string) bool {
for _, subresource := range subresources {
if subresource == "status" {
return true
}
}

func (mw *ManifestWorksAgentClient) Patch(ctx context.Context, name string, pt kubetypes.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *workv1.ManifestWork, err error) {
klog.Fatal("Patch function for ManifestWorksAgentClient has not been implemented")
return nil, nil
return false
}
78 changes: 78 additions & 0 deletions cloudevents/work/utils/utiles_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package utils

import (
"encoding/json"
"testing"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"

workv1 "open-cluster-management.io/api/work/v1"
)

func TestPatch(t *testing.T) {
cases := []struct {
name string
patchType types.PatchType
work *workv1.ManifestWork
patch []byte
validate func(t *testing.T, work *workv1.ManifestWork)
}{
{
name: "json patch",
patchType: types.JSONPatchType,
work: &workv1.ManifestWork{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
},
patch: []byte("[{\"op\":\"replace\",\"path\":\"/metadata/name\",\"value\":\"test1\"}]"),
validate: func(t *testing.T, work *workv1.ManifestWork) {
if work.Name != "test1" {
t.Errorf("unexpected work %v", work)
}
},
},
{
name: "merge patch",
patchType: types.MergePatchType,
work: &workv1.ManifestWork{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
},
patch: func() []byte {
newWork := &workv1.ManifestWork{
ObjectMeta: metav1.ObjectMeta{
Name: "test2",
Namespace: "test2",
},
}
data, err := json.Marshal(newWork)
if err != nil {
t.Fatal(err)
}
return data
}(),
validate: func(t *testing.T, work *workv1.ManifestWork) {
if work.Name != "test2" {
t.Errorf("unexpected work %v", work)
}
if work.Namespace != "test2" {
t.Errorf("unexpected work %v", work)
}
},
},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
work, err := Patch(c.patchType, c.work, c.patch)
if err != nil {
t.Errorf("unexpected error %v", err)
}

c.validate(t, work)
})
}
}
47 changes: 47 additions & 0 deletions cloudevents/work/utils/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package utils

import (
"encoding/json"
"fmt"

jsonpatch "github.com/evanphx/json-patch"
"k8s.io/apimachinery/pkg/types"
workv1 "open-cluster-management.io/api/work/v1"
)

// Patch applies the patch to a work with the patch type.
func Patch(patchType types.PatchType, work *workv1.ManifestWork, patchData []byte) (*workv1.ManifestWork, error) {
workData, err := json.Marshal(work)
if err != nil {
return nil, err
}

var patchedData []byte
switch patchType {
case types.JSONPatchType:
var patchObj jsonpatch.Patch
patchObj, err = jsonpatch.DecodePatch(patchData)
if err != nil {
return nil, err
}
patchedData, err = patchObj.Apply(workData)
if err != nil {
return nil, err
}

case types.MergePatchType:
patchedData, err = jsonpatch.MergePatch(workData, patchData)
if err != nil {
return nil, err
}
default:
return nil, fmt.Errorf("unsupported patch type: %s", patchType)
}

patchedWork := &workv1.ManifestWork{}
if err := json.Unmarshal(patchedData, patchedWork); err != nil {
return nil, err
}

return patchedWork, nil
}

0 comments on commit c6f518c

Please sign in to comment.