Skip to content

Commit

Permalink
add payload for manifestwork
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <liuweixa@redhat.com>
  • Loading branch information
skeeey committed Aug 30, 2023
1 parent e598981 commit c45a1f1
Show file tree
Hide file tree
Showing 31 changed files with 1,829 additions and 115 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package cloudevents
package generic

import (
"context"
Expand All @@ -9,9 +9,9 @@ import (

"k8s.io/klog/v2"

"open-cluster-management.io/api/client/cloudevents/options"
"open-cluster-management.io/api/client/cloudevents/payload"
"open-cluster-management.io/api/client/cloudevents/types"
"open-cluster-management.io/api/cloudevents/generic/options"
"open-cluster-management.io/api/cloudevents/generic/payload"
"open-cluster-management.io/api/cloudevents/generic/types"
)

// CloudEventAgentClient is a client for an agent to resync/send/receive its resources with cloud events.
Expand Down Expand Up @@ -71,7 +71,7 @@ func NewCloudEventAgentClient[T ResourceObject](
}

// Resync the resources spec by sending a spec resync request from an agent to all sources.
func (c *CloudEventAgentClient[T]) Resync(ctx context.Context, eventDataType types.CloudEventsDataType) error {
func (c *CloudEventAgentClient[T]) Resync(ctx context.Context) error {
// list the resource objects that are maintained by the current agent from all sources
objs, err := c.lister.List(types.ListOptions{ClusterName: c.clusterName, Source: types.SourceAll})
if err != nil {
Expand All @@ -91,27 +91,30 @@ func (c *CloudEventAgentClient[T]) Resync(ctx context.Context, eventDataType typ
}
}

eventType := types.CloudEventsType{
CloudEventsDataType: eventDataType,
SubResource: types.SubResourceSpec,
Action: types.ResyncRequestAction,
}
// only resync the resources whose event data type is registered
for eventDataType := range c.codecs {
eventType := types.CloudEventsType{
CloudEventsDataType: eventDataType,
SubResource: types.SubResourceSpec,
Action: types.ResyncRequestAction,
}

evt := types.NewEventBuilder(c.agentID, eventType).WithClusterName(c.clusterName).NewEvent()
if err := evt.SetData(cloudevents.ApplicationJSON, resources); err != nil {
return fmt.Errorf("failed to set data to cloud event: %v", err)
}
evt := types.NewEventBuilder(c.agentID, eventType).WithClusterName(c.clusterName).NewEvent()
if err := evt.SetData(cloudevents.ApplicationJSON, resources); err != nil {
return fmt.Errorf("failed to set data to cloud event: %v", err)
}

sendingContext, err := c.cloudEventsOptions.WithContext(ctx, evt.Context)
if err != nil {
return err
}
sendingContext, err := c.cloudEventsOptions.WithContext(ctx, evt.Context)
if err != nil {
return err
}

if result := c.sender.Send(sendingContext, evt); cloudevents.IsUndelivered(result) {
return fmt.Errorf("failed to send event %s, %v", evt, result)
}
if result := c.sender.Send(sendingContext, evt); cloudevents.IsUndelivered(result) {
return fmt.Errorf("failed to send event %s, %v", evt, result)
}

klog.V(4).Infof("Sent resync request:\n%s", evt)
klog.V(4).Infof("Sent resync request:\n%s", evt)
}
return nil
}

Expand Down Expand Up @@ -151,7 +154,7 @@ func (c *CloudEventAgentClient[T]) Subscribe(ctx context.Context, handlers ...Re
return c.receiver.StartReceiver(ctx, func(evt cloudevents.Event) {
klog.V(4).Infof("Received event:\n%s", evt)

eventType, err := types.Parse(evt.Type())
eventType, err := types.ParseCloudEventsType(evt.Type())
if err != nil {
klog.Errorf("failed to parse cloud event type %s, %v", evt.Type(), err)
return
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package cloudevents
package generic

import (
"context"
Expand All @@ -10,11 +10,11 @@ import (
"github.com/google/uuid"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apitypes "k8s.io/apimachinery/pkg/types"
kubetypes "k8s.io/apimachinery/pkg/types"

"open-cluster-management.io/api/client/cloudevents/options/fake"
"open-cluster-management.io/api/client/cloudevents/payload"
"open-cluster-management.io/api/client/cloudevents/types"
"open-cluster-management.io/api/cloudevents/generic/options/fake"
"open-cluster-management.io/api/cloudevents/generic/payload"
"open-cluster-management.io/api/cloudevents/generic/types"
)

const testAgentName = "mock-agent"
Expand Down Expand Up @@ -44,8 +44,8 @@ func TestAgentResync(t *testing.T) {
name: "has cached resources",
clusterName: "cluster2",
resources: []*mockResource{
{UID: apitypes.UID("test1"), ResourceVersion: "2"},
{UID: apitypes.UID("test2"), ResourceVersion: "3"},
{UID: kubetypes.UID("test1"), ResourceVersion: "2"},
{UID: kubetypes.UID("test2"), ResourceVersion: "3"},
},
eventType: types.CloudEventsType{SubResource: types.SubResourceSpec},
expectedItems: 2,
Expand All @@ -62,7 +62,7 @@ func TestAgentResync(t *testing.T) {
t.Errorf("unexpected error %v", err)
}

if err := agent.Resync(context.TODO(), mockEventDataType); err != nil {
if err := agent.Resync(context.TODO()); err != nil {
t.Errorf("unexpected error %v", err)
}

Expand Down Expand Up @@ -99,7 +99,7 @@ func TestAgentPublish(t *testing.T) {
name: "publish status",
clusterName: "cluster1",
resources: &mockResource{
UID: apitypes.UID("1234"),
UID: kubetypes.UID("1234"),
ResourceVersion: "2",
Status: "test-status",
Namespace: "cluster1",
Expand Down Expand Up @@ -132,7 +132,7 @@ func TestAgentPublish(t *testing.T) {
t.Errorf("unexpected error %v", err)
}

if c.resources.UID != apitypes.UID(fmt.Sprintf("%s", resourceID)) {
if c.resources.UID != kubetypes.UID(fmt.Sprintf("%s", resourceID)) {
t.Errorf("expected %s, but got %v", c.resources.UID, evt.Context)
}

Expand Down Expand Up @@ -217,8 +217,8 @@ func TestStatusResyncResponse(t *testing.T) {
return evt
}(),
resources: []*mockResource{
{UID: apitypes.UID("test1"), ResourceVersion: "2", Status: "test1"},
{UID: apitypes.UID("test2"), ResourceVersion: "3", Status: "test2"},
{UID: kubetypes.UID("test1"), ResourceVersion: "2", Status: "test1"},
{UID: kubetypes.UID("test2"), ResourceVersion: "3", Status: "test2"},
},
validate: func(pubEvents []cloudevents.Event) {
if len(pubEvents) != 2 {
Expand Down Expand Up @@ -251,9 +251,9 @@ func TestStatusResyncResponse(t *testing.T) {
return evt
}(),
resources: []*mockResource{
{UID: apitypes.UID("test0"), ResourceVersion: "2", Status: "test0"},
{UID: apitypes.UID("test1"), ResourceVersion: "2", Status: "test1"},
{UID: apitypes.UID("test2"), ResourceVersion: "3", Status: "test2-updated"},
{UID: kubetypes.UID("test0"), ResourceVersion: "2", Status: "test0"},
{UID: kubetypes.UID("test1"), ResourceVersion: "2", Status: "test1"},
{UID: kubetypes.UID("test2"), ResourceVersion: "3", Status: "test2-updated"},
},
validate: func(pubEvents []cloudevents.Event) {
if len(pubEvents) != 1 {
Expand Down Expand Up @@ -339,7 +339,7 @@ func TestReceiveResourceSpec(t *testing.T) {
Action: "test_create_request",
}

evt, _ := newMockResourceCodec().Encode(testAgentName, eventType, &mockResource{UID: apitypes.UID("test1"), ResourceVersion: "1"})
evt, _ := newMockResourceCodec().Encode(testAgentName, eventType, &mockResource{UID: kubetypes.UID("test1"), ResourceVersion: "1"})
return *evt
}(),
validate: func(event types.ResourceAction, resource *mockResource) {
Expand All @@ -358,12 +358,12 @@ func TestReceiveResourceSpec(t *testing.T) {
Action: "test_update_request",
}

evt, _ := newMockResourceCodec().Encode(testAgentName, eventType, &mockResource{UID: apitypes.UID("test1"), ResourceVersion: "2"})
evt, _ := newMockResourceCodec().Encode(testAgentName, eventType, &mockResource{UID: kubetypes.UID("test1"), ResourceVersion: "2"})
return *evt
}(),
resources: []*mockResource{
{UID: apitypes.UID("test1"), ResourceVersion: "1"},
{UID: apitypes.UID("test2"), ResourceVersion: "1"},
{UID: kubetypes.UID("test1"), ResourceVersion: "1"},
{UID: kubetypes.UID("test2"), ResourceVersion: "1"},
},
validate: func(event types.ResourceAction, resource *mockResource) {
if event != types.Modified {
Expand All @@ -387,12 +387,12 @@ func TestReceiveResourceSpec(t *testing.T) {
Action: "test_delete_request",
}
now := metav1.Now()
evt, _ := newMockResourceCodec().Encode(testAgentName, eventType, &mockResource{UID: apitypes.UID("test2"), ResourceVersion: "2", DeletionTimestamp: &now})
evt, _ := newMockResourceCodec().Encode(testAgentName, eventType, &mockResource{UID: kubetypes.UID("test2"), ResourceVersion: "2", DeletionTimestamp: &now})
return *evt
}(),
resources: []*mockResource{
{UID: apitypes.UID("test1"), ResourceVersion: "1"},
{UID: apitypes.UID("test2"), ResourceVersion: "1"},
{UID: kubetypes.UID("test1"), ResourceVersion: "1"},
{UID: kubetypes.UID("test2"), ResourceVersion: "1"},
},
validate: func(event types.ResourceAction, resource *mockResource) {
if event != types.Deleted {
Expand All @@ -413,12 +413,12 @@ func TestReceiveResourceSpec(t *testing.T) {
Action: "test_create_request",
}

evt, _ := newMockResourceCodec().Encode(testAgentName, eventType, &mockResource{UID: apitypes.UID("test1"), ResourceVersion: "2"})
evt, _ := newMockResourceCodec().Encode(testAgentName, eventType, &mockResource{UID: kubetypes.UID("test1"), ResourceVersion: "2"})
return *evt
}(),
resources: []*mockResource{
{UID: apitypes.UID("test1"), ResourceVersion: "2"},
{UID: apitypes.UID("test2"), ResourceVersion: "1"},
{UID: kubetypes.UID("test1"), ResourceVersion: "2"},
{UID: kubetypes.UID("test2"), ResourceVersion: "1"},
},
validate: func(event types.ResourceAction, resource *mockResource) {
if len(event) != 0 {
Expand Down Expand Up @@ -454,15 +454,15 @@ func TestReceiveResourceSpec(t *testing.T) {
}

type mockResource struct {
UID apitypes.UID `json:"uid"`
ResourceVersion string `json:"resourceVersion"`
DeletionTimestamp *metav1.Time `json:"deletionTimestamp,omitempty"`
UID kubetypes.UID `json:"uid"`
ResourceVersion string `json:"resourceVersion"`
DeletionTimestamp *metav1.Time `json:"deletionTimestamp,omitempty"`
Namespace string
Spec string `json:"spec"`
Status string `json:"status"`
}

func (r *mockResource) GetUID() apitypes.UID {
func (r *mockResource) GetUID() kubetypes.UID {
return r.UID
}

Expand Down Expand Up @@ -532,7 +532,7 @@ func (c *mockResourceCodec) Decode(evt *cloudevents.Event) (*mockResource, error
}

res := &mockResource{
UID: apitypes.UID(fmt.Sprintf("%s", resourceID)),
UID: kubetypes.UID(fmt.Sprintf("%s", resourceID)),
ResourceVersion: fmt.Sprintf("%s", resourceVersion),
Status: string(evt.Data()),
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package cloudevents
package generic

import (
"context"

cloudevents "github.com/cloudevents/sdk-go/v2"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apitypes "k8s.io/apimachinery/pkg/types"
kubetypes "k8s.io/apimachinery/pkg/types"

"open-cluster-management.io/api/client/cloudevents/types"
"open-cluster-management.io/api/cloudevents/generic/types"
)

// ResourceHandler handles the received resource object.
Expand All @@ -20,7 +20,7 @@ type StatusHashGetter[T ResourceObject] func(obj T) (string, error)
type ResourceObject interface {
// GetUID returns the resource ID of this object. The resource ID represents the unique identifier for this object.
// The source should ensure its uniqueness and consistency.
GetUID() apitypes.UID
GetUID() kubetypes.UID

// GetResourceVersion returns the resource version of this object. The resource version is a required int64 sequence
// number property that must be incremented by the source whenever this resource changes.
Expand Down Expand Up @@ -54,7 +54,7 @@ type Codec[T ResourceObject] interface {

type CloudEventsClient[T ResourceObject] interface {
// Resync the resources of one source/agent by sending resync request.
Resync(ctx context.Context, evtDataType types.CloudEventsDataType) error
Resync(ctx context.Context) error

// Publish the resources spec/status event to the broker.
Publish(ctx context.Context, eventType types.CloudEventsType, obj T) error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import (
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/event"
"github.com/cloudevents/sdk-go/v2/protocol"
"open-cluster-management.io/api/client/cloudevents/options"

"open-cluster-management.io/api/cloudevents/generic/options"
)

type CloudEventsFakeOptions struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (
cloudeventscontext "github.com/cloudevents/sdk-go/v2/context"
"github.com/eclipse/paho.golang/paho"

"open-cluster-management.io/api/client/cloudevents/options"
"open-cluster-management.io/api/client/cloudevents/types"
"open-cluster-management.io/api/cloudevents/generic/options"
"open-cluster-management.io/api/cloudevents/generic/types"
)

type mqttAgentOptions struct {
Expand All @@ -33,7 +33,7 @@ func NewAgentOptions(mqttOptions *MQTTOptions, clusterName, agentID string) *opt
}

func (o *mqttAgentOptions) WithContext(ctx context.Context, evtCtx cloudevents.EventContext) (context.Context, error) {
eventType, err := types.Parse(evtCtx.GetType())
eventType, err := types.ParseCloudEventsType(evtCtx.GetType())
if err != nil {
return nil, fmt.Errorf("unsupported event type %s, %v", eventType, err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import (

cloudevents "github.com/cloudevents/sdk-go/v2"
cloudeventscontext "github.com/cloudevents/sdk-go/v2/context"
"open-cluster-management.io/api/client/cloudevents/types"

"open-cluster-management.io/api/cloudevents/generic/types"
)

var mockEventDataType = types.CloudEventsDataType{
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ import (
cloudevents "github.com/cloudevents/sdk-go/v2"
cloudeventscontext "github.com/cloudevents/sdk-go/v2/context"
"github.com/eclipse/paho.golang/paho"
"open-cluster-management.io/api/client/cloudevents/options"
"open-cluster-management.io/api/client/cloudevents/types"

"open-cluster-management.io/api/cloudevents/generic/options"
"open-cluster-management.io/api/cloudevents/generic/types"
)

type mqttSourceOptions struct {
Expand All @@ -29,7 +30,7 @@ func NewSourceOptions(mqttOptions *MQTTOptions, sourceID string) *options.CloudE
}

func (o *mqttSourceOptions) WithContext(ctx context.Context, evtCtx cloudevents.EventContext) (context.Context, error) {
eventType, err := types.Parse(evtCtx.GetType())
eventType, err := types.ParseCloudEventsType(evtCtx.GetType())
if err != nil {
return nil, fmt.Errorf("unsupported event type %s, %v", eventType, err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import (

cloudevents "github.com/cloudevents/sdk-go/v2"
cloudeventscontext "github.com/cloudevents/sdk-go/v2/context"
"open-cluster-management.io/api/client/cloudevents/types"

"open-cluster-management.io/api/cloudevents/generic/types"
)

func TestSourceContext(t *testing.T) {
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
Loading

0 comments on commit c45a1f1

Please sign in to comment.