Skip to content

Commit

Permalink
add payload for manifestwork
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <liuweixa@redhat.com>
  • Loading branch information
skeeey committed Aug 29, 2023
1 parent e598981 commit 4494ac9
Show file tree
Hide file tree
Showing 16 changed files with 1,563 additions and 0 deletions.
197 changes: 197 additions & 0 deletions client/cloudevents/resources/manifestwork/codec/agent/manifest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
package agent

import (
"fmt"
"strconv"

cloudevents "github.com/cloudevents/sdk-go/v2"
cloudeventstypes "github.com/cloudevents/sdk-go/v2/types"

"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
kubetypes "k8s.io/apimachinery/pkg/types"

"open-cluster-management.io/api/client/cloudevents/resources/manifestwork/payload"
"open-cluster-management.io/api/client/cloudevents/types"
"open-cluster-management.io/api/utils/work/v1/utils"
"open-cluster-management.io/api/utils/work/v1/workvalidator"
workv1 "open-cluster-management.io/api/work/v1"
)

// ManifestCodec is a codec to encode/decode a ManifestWork/cloudevent with ManifestBundle for an agent.
type ManifestCodec struct {
restMapper meta.RESTMapper
}

func NewManifestCodec(restMapper meta.RESTMapper) *ManifestCodec {
return &ManifestCodec{
restMapper: restMapper,
}
}

// EventDataType returns the event data type for `io.open-cluster-management.works.v1alpha1.manifests`.
func (c *ManifestCodec) EventDataType() types.CloudEventsDataType {
return payload.ManifestEventDataType
}

// Encode the status of a ManifestWork to a cloudevent with SingleManifestStatus.
func (c *ManifestCodec) Encode(source string, eventType types.CloudEventsType, work *workv1.ManifestWork) (*cloudevents.Event, error) {
if eventType.CloudEventsDataType != payload.ManifestEventDataType {
return nil, fmt.Errorf("unsupported cloudevents data type %s", eventType.CloudEventsDataType)
}

resourceVersion, err := strconv.ParseInt(work.ResourceVersion, 10, 64)
if err != nil {
return nil, fmt.Errorf("failed to parse the resourceversion of the work %s, %v", work.UID, err)
}

originalSource, ok := work.Annotations[types.CloudEventsOriginalSourceAnnotationKey]
if !ok {
return nil, fmt.Errorf("failed to find originalsource from the work %s", work.UID)
}

if len(work.Spec.Workload.Manifests) != 1 {
return nil, fmt.Errorf("too many manifests in the work %s", work.UID)
}

evt := types.NewEventBuilder(source, eventType).
WithResourceID(string(work.UID)).
WithResourceVersion(resourceVersion).
WithClusterName(work.Namespace).
WithOriginalSource(originalSource).
NewEvent()

manifestStatus := &payload.SingleManifestStatus{
Conditions: work.Status.Conditions,
}

if len(work.Status.ResourceStatus.Manifests) != 0 {
resourceStatus := work.Status.ResourceStatus.Manifests[0]
manifestStatus.ResourceMeta = resourceStatus.ResourceMeta
for _, v := range resourceStatus.StatusFeedbacks.Values {
if v.Name == "status" && v.Value.Type == workv1.JsonRaw {
manifestStatus.ManifestStatus = &runtime.RawExtension{
Raw: []byte(*v.Value.JsonRaw),
}

break
}
}
}

if err := evt.SetData(cloudevents.ApplicationJSON, manifestStatus); err != nil {
return nil, fmt.Errorf("failed to encode manifestwork status to a cloudevent: %v", err)
}

return &evt, nil
}

// Decode a cloudevent whose data is SingleManifest to a ManifestWork.
func (c *ManifestCodec) Decode(evt *cloudevents.Event) (*workv1.ManifestWork, error) {
eventType, err := types.Parse(evt.Type())
if err != nil {
return nil, fmt.Errorf("failed to parse cloud event type %s, %v", evt.Type(), err)
}

if eventType.CloudEventsDataType != payload.ManifestEventDataType {
return nil, fmt.Errorf("unsupported cloudevents data type %s", eventType.CloudEventsDataType)
}

evtExtensions := evt.Context.GetExtensions()

resourceID, err := cloudeventstypes.ToString(evtExtensions[types.ExtensionResourceID])
if err != nil {
return nil, fmt.Errorf("failed to get resourceid extension: %v", err)
}

resourceVersion, err := cloudeventstypes.ToString(evtExtensions[types.ExtensionResourceVersion])
if err != nil {
return nil, fmt.Errorf("failed to get resourceversion extension: %v", err)
}

clusterName, err := cloudeventstypes.ToString(evtExtensions[types.ExtensionClusterName])
if err != nil {
return nil, fmt.Errorf("failed to get clustername extension: %v", err)
}

work := &workv1.ManifestWork{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{
UID: kubetypes.UID(resourceID),
ResourceVersion: resourceVersion,
Name: resourceID,
Namespace: clusterName,
Annotations: map[string]string{
types.CloudEventsDataTypeAnnotationKey: eventType.CloudEventsDataType.String(),
types.CloudEventsOriginalSourceAnnotationKey: evt.Source(),
},
},
}

if _, ok := evtExtensions[types.ExtensionDeletionTimestamp]; ok {
deletionTimestamp, err := cloudeventstypes.ToTime(evtExtensions[types.ExtensionDeletionTimestamp])
if err != nil {
return nil, fmt.Errorf("failed to get deletiontimestamp, %v", err)
}

work.DeletionTimestamp = &metav1.Time{Time: deletionTimestamp}
return work, nil
}

manifest := &payload.SingleManifest{}
if err := evt.DataAs(manifest); err != nil {
return nil, fmt.Errorf("failed to unmarshal event data %s, %v", string(evt.Data()), err)
}

unstructuredObj := &unstructured.Unstructured{}
if err := unstructuredObj.UnmarshalJSON(manifest.Manifest.Raw); err != nil {
return nil, fmt.Errorf("failed to unmarshal manifest, %v", err)
}

_, gvr, err := utils.BuildResourceMeta(0, unstructuredObj, c.restMapper)
if err != nil {
return nil, fmt.Errorf("failed to get manifest GVR from event %s, %v", string(evt.Data()), err)
}

work.Spec = workv1.ManifestWorkSpec{
Workload: workv1.ManifestsTemplate{
Manifests: []workv1.Manifest{manifest.Manifest},
},
DeleteOption: &workv1.DeleteOption{
PropagationPolicy: workv1.DeletePropagationPolicyTypeForeground,
},
ManifestConfigs: []workv1.ManifestConfigOption{
{
ResourceIdentifier: workv1.ResourceIdentifier{
Group: gvr.Group,
Resource: gvr.Resource,
Name: unstructuredObj.GetName(),
Namespace: unstructuredObj.GetNamespace(),
},
FeedbackRules: []workv1.FeedbackRule{
{
Type: workv1.JSONPathsType,
JsonPaths: []workv1.JsonPath{
{
Name: "status",
Path: ".status",
},
},
},
},
UpdateStrategy: &workv1.UpdateStrategy{
Type: workv1.UpdateStrategyTypeUpdate,
},
},
},
}

// validate the manifest
if err := workvalidator.ManifestValidator.ValidateManifests(work.Spec.Workload.Manifests); err != nil {
return nil, fmt.Errorf("manifest is invalid, %v", err)
}

return work, nil
}
Loading

0 comments on commit 4494ac9

Please sign in to comment.