Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ add payload for manifestwork #269

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package cloudevents
package generic

import (
"context"
Expand All @@ -9,9 +9,9 @@ import (

"k8s.io/klog/v2"

"open-cluster-management.io/api/client/cloudevents/options"
"open-cluster-management.io/api/client/cloudevents/payload"
"open-cluster-management.io/api/client/cloudevents/types"
"open-cluster-management.io/api/cloudevents/generic/options"
"open-cluster-management.io/api/cloudevents/generic/payload"
"open-cluster-management.io/api/cloudevents/generic/types"
)

// CloudEventAgentClient is a client for an agent to resync/send/receive its resources with cloud events.
Expand Down Expand Up @@ -71,7 +71,7 @@ func NewCloudEventAgentClient[T ResourceObject](
}

// Resync the resources spec by sending a spec resync request from an agent to all sources.
func (c *CloudEventAgentClient[T]) Resync(ctx context.Context, eventDataType types.CloudEventsDataType) error {
func (c *CloudEventAgentClient[T]) Resync(ctx context.Context) error {
// list the resource objects that are maintained by the current agent from all sources
objs, err := c.lister.List(types.ListOptions{ClusterName: c.clusterName, Source: types.SourceAll})
if err != nil {
Expand All @@ -91,27 +91,30 @@ func (c *CloudEventAgentClient[T]) Resync(ctx context.Context, eventDataType typ
}
}

eventType := types.CloudEventsType{
CloudEventsDataType: eventDataType,
SubResource: types.SubResourceSpec,
Action: types.ResyncRequestAction,
}
// only resync the resources whose event data type is registered
for eventDataType := range c.codecs {
eventType := types.CloudEventsType{
CloudEventsDataType: eventDataType,
SubResource: types.SubResourceSpec,
Action: types.ResyncRequestAction,
}

evt := types.NewEventBuilder(c.agentID, eventType).WithClusterName(c.clusterName).NewEvent()
if err := evt.SetData(cloudevents.ApplicationJSON, resources); err != nil {
return fmt.Errorf("failed to set data to cloud event: %v", err)
}
evt := types.NewEventBuilder(c.agentID, eventType).WithClusterName(c.clusterName).NewEvent()
if err := evt.SetData(cloudevents.ApplicationJSON, resources); err != nil {
return fmt.Errorf("failed to set data to cloud event: %v", err)
}

sendingContext, err := c.cloudEventsOptions.WithContext(ctx, evt.Context)
if err != nil {
return err
}
sendingContext, err := c.cloudEventsOptions.WithContext(ctx, evt.Context)
if err != nil {
return err
}

if result := c.sender.Send(sendingContext, evt); cloudevents.IsUndelivered(result) {
return fmt.Errorf("failed to send event %s, %v", evt, result)
}
if result := c.sender.Send(sendingContext, evt); cloudevents.IsUndelivered(result) {
return fmt.Errorf("failed to send event %s, %v", evt, result)
}

klog.V(4).Infof("Sent resync request:\n%s", evt)
klog.V(4).Infof("Sent resync request:\n%s", evt)
}
return nil
}

Expand Down Expand Up @@ -151,7 +154,7 @@ func (c *CloudEventAgentClient[T]) Subscribe(ctx context.Context, handlers ...Re
return c.receiver.StartReceiver(ctx, func(evt cloudevents.Event) {
klog.V(4).Infof("Received event:\n%s", evt)

eventType, err := types.Parse(evt.Type())
eventType, err := types.ParseCloudEventsType(evt.Type())
if err != nil {
klog.Errorf("failed to parse cloud event type %s, %v", evt.Type(), err)
return
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package cloudevents
package generic

import (
"context"
Expand All @@ -10,11 +10,11 @@ import (
"github.com/google/uuid"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apitypes "k8s.io/apimachinery/pkg/types"
kubetypes "k8s.io/apimachinery/pkg/types"

"open-cluster-management.io/api/client/cloudevents/options/fake"
"open-cluster-management.io/api/client/cloudevents/payload"
"open-cluster-management.io/api/client/cloudevents/types"
"open-cluster-management.io/api/cloudevents/generic/options/fake"
"open-cluster-management.io/api/cloudevents/generic/payload"
"open-cluster-management.io/api/cloudevents/generic/types"
)

const testAgentName = "mock-agent"
Expand Down Expand Up @@ -44,8 +44,8 @@ func TestAgentResync(t *testing.T) {
name: "has cached resources",
clusterName: "cluster2",
resources: []*mockResource{
{UID: apitypes.UID("test1"), ResourceVersion: "2"},
{UID: apitypes.UID("test2"), ResourceVersion: "3"},
{UID: kubetypes.UID("test1"), ResourceVersion: "2"},
{UID: kubetypes.UID("test2"), ResourceVersion: "3"},
},
eventType: types.CloudEventsType{SubResource: types.SubResourceSpec},
expectedItems: 2,
Expand All @@ -62,7 +62,7 @@ func TestAgentResync(t *testing.T) {
t.Errorf("unexpected error %v", err)
}

if err := agent.Resync(context.TODO(), mockEventDataType); err != nil {
if err := agent.Resync(context.TODO()); err != nil {
t.Errorf("unexpected error %v", err)
}

Expand Down Expand Up @@ -99,7 +99,7 @@ func TestAgentPublish(t *testing.T) {
name: "publish status",
clusterName: "cluster1",
resources: &mockResource{
UID: apitypes.UID("1234"),
UID: kubetypes.UID("1234"),
ResourceVersion: "2",
Status: "test-status",
Namespace: "cluster1",
Expand Down Expand Up @@ -132,7 +132,7 @@ func TestAgentPublish(t *testing.T) {
t.Errorf("unexpected error %v", err)
}

if c.resources.UID != apitypes.UID(fmt.Sprintf("%s", resourceID)) {
if c.resources.UID != kubetypes.UID(fmt.Sprintf("%s", resourceID)) {
t.Errorf("expected %s, but got %v", c.resources.UID, evt.Context)
}

Expand Down Expand Up @@ -217,8 +217,8 @@ func TestStatusResyncResponse(t *testing.T) {
return evt
}(),
resources: []*mockResource{
{UID: apitypes.UID("test1"), ResourceVersion: "2", Status: "test1"},
{UID: apitypes.UID("test2"), ResourceVersion: "3", Status: "test2"},
{UID: kubetypes.UID("test1"), ResourceVersion: "2", Status: "test1"},
{UID: kubetypes.UID("test2"), ResourceVersion: "3", Status: "test2"},
},
validate: func(pubEvents []cloudevents.Event) {
if len(pubEvents) != 2 {
Expand Down Expand Up @@ -251,9 +251,9 @@ func TestStatusResyncResponse(t *testing.T) {
return evt
}(),
resources: []*mockResource{
{UID: apitypes.UID("test0"), ResourceVersion: "2", Status: "test0"},
{UID: apitypes.UID("test1"), ResourceVersion: "2", Status: "test1"},
{UID: apitypes.UID("test2"), ResourceVersion: "3", Status: "test2-updated"},
{UID: kubetypes.UID("test0"), ResourceVersion: "2", Status: "test0"},
{UID: kubetypes.UID("test1"), ResourceVersion: "2", Status: "test1"},
{UID: kubetypes.UID("test2"), ResourceVersion: "3", Status: "test2-updated"},
},
validate: func(pubEvents []cloudevents.Event) {
if len(pubEvents) != 1 {
Expand Down Expand Up @@ -339,7 +339,7 @@ func TestReceiveResourceSpec(t *testing.T) {
Action: "test_create_request",
}

evt, _ := newMockResourceCodec().Encode(testAgentName, eventType, &mockResource{UID: apitypes.UID("test1"), ResourceVersion: "1"})
evt, _ := newMockResourceCodec().Encode(testAgentName, eventType, &mockResource{UID: kubetypes.UID("test1"), ResourceVersion: "1"})
return *evt
}(),
validate: func(event types.ResourceAction, resource *mockResource) {
Expand All @@ -358,12 +358,12 @@ func TestReceiveResourceSpec(t *testing.T) {
Action: "test_update_request",
}

evt, _ := newMockResourceCodec().Encode(testAgentName, eventType, &mockResource{UID: apitypes.UID("test1"), ResourceVersion: "2"})
evt, _ := newMockResourceCodec().Encode(testAgentName, eventType, &mockResource{UID: kubetypes.UID("test1"), ResourceVersion: "2"})
return *evt
}(),
resources: []*mockResource{
{UID: apitypes.UID("test1"), ResourceVersion: "1"},
{UID: apitypes.UID("test2"), ResourceVersion: "1"},
{UID: kubetypes.UID("test1"), ResourceVersion: "1"},
{UID: kubetypes.UID("test2"), ResourceVersion: "1"},
},
validate: func(event types.ResourceAction, resource *mockResource) {
if event != types.Modified {
Expand All @@ -387,12 +387,12 @@ func TestReceiveResourceSpec(t *testing.T) {
Action: "test_delete_request",
}
now := metav1.Now()
evt, _ := newMockResourceCodec().Encode(testAgentName, eventType, &mockResource{UID: apitypes.UID("test2"), ResourceVersion: "2", DeletionTimestamp: &now})
evt, _ := newMockResourceCodec().Encode(testAgentName, eventType, &mockResource{UID: kubetypes.UID("test2"), ResourceVersion: "2", DeletionTimestamp: &now})
return *evt
}(),
resources: []*mockResource{
{UID: apitypes.UID("test1"), ResourceVersion: "1"},
{UID: apitypes.UID("test2"), ResourceVersion: "1"},
{UID: kubetypes.UID("test1"), ResourceVersion: "1"},
{UID: kubetypes.UID("test2"), ResourceVersion: "1"},
},
validate: func(event types.ResourceAction, resource *mockResource) {
if event != types.Deleted {
Expand All @@ -413,12 +413,12 @@ func TestReceiveResourceSpec(t *testing.T) {
Action: "test_create_request",
}

evt, _ := newMockResourceCodec().Encode(testAgentName, eventType, &mockResource{UID: apitypes.UID("test1"), ResourceVersion: "2"})
evt, _ := newMockResourceCodec().Encode(testAgentName, eventType, &mockResource{UID: kubetypes.UID("test1"), ResourceVersion: "2"})
return *evt
}(),
resources: []*mockResource{
{UID: apitypes.UID("test1"), ResourceVersion: "2"},
{UID: apitypes.UID("test2"), ResourceVersion: "1"},
{UID: kubetypes.UID("test1"), ResourceVersion: "2"},
{UID: kubetypes.UID("test2"), ResourceVersion: "1"},
},
validate: func(event types.ResourceAction, resource *mockResource) {
if len(event) != 0 {
Expand Down Expand Up @@ -454,15 +454,15 @@ func TestReceiveResourceSpec(t *testing.T) {
}

type mockResource struct {
UID apitypes.UID `json:"uid"`
ResourceVersion string `json:"resourceVersion"`
DeletionTimestamp *metav1.Time `json:"deletionTimestamp,omitempty"`
UID kubetypes.UID `json:"uid"`
ResourceVersion string `json:"resourceVersion"`
DeletionTimestamp *metav1.Time `json:"deletionTimestamp,omitempty"`
Namespace string
Spec string `json:"spec"`
Status string `json:"status"`
}

func (r *mockResource) GetUID() apitypes.UID {
func (r *mockResource) GetUID() kubetypes.UID {
return r.UID
}

Expand Down Expand Up @@ -532,7 +532,7 @@ func (c *mockResourceCodec) Decode(evt *cloudevents.Event) (*mockResource, error
}

res := &mockResource{
UID: apitypes.UID(fmt.Sprintf("%s", resourceID)),
UID: kubetypes.UID(fmt.Sprintf("%s", resourceID)),
ResourceVersion: fmt.Sprintf("%s", resourceVersion),
Status: string(evt.Data()),
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package cloudevents
package generic

import (
"context"

cloudevents "github.com/cloudevents/sdk-go/v2"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apitypes "k8s.io/apimachinery/pkg/types"
kubetypes "k8s.io/apimachinery/pkg/types"

"open-cluster-management.io/api/client/cloudevents/types"
"open-cluster-management.io/api/cloudevents/generic/types"
)

// ResourceHandler handles the received resource object.
Expand All @@ -20,7 +20,7 @@ type StatusHashGetter[T ResourceObject] func(obj T) (string, error)
type ResourceObject interface {
// GetUID returns the resource ID of this object. The resource ID represents the unique identifier for this object.
// The source should ensure its uniqueness and consistency.
GetUID() apitypes.UID
GetUID() kubetypes.UID

// GetResourceVersion returns the resource version of this object. The resource version is a required int64 sequence
// number property that must be incremented by the source whenever this resource changes.
Expand Down Expand Up @@ -54,7 +54,7 @@ type Codec[T ResourceObject] interface {

type CloudEventsClient[T ResourceObject] interface {
// Resync the resources of one source/agent by sending resync request.
Resync(ctx context.Context, evtDataType types.CloudEventsDataType) error
Resync(ctx context.Context) error

// Publish the resources spec/status event to the broker.
Publish(ctx context.Context, eventType types.CloudEventsType, obj T) error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import (
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/event"
"github.com/cloudevents/sdk-go/v2/protocol"
"open-cluster-management.io/api/client/cloudevents/options"

"open-cluster-management.io/api/cloudevents/generic/options"
)

type CloudEventsFakeOptions struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (
cloudeventscontext "github.com/cloudevents/sdk-go/v2/context"
"github.com/eclipse/paho.golang/paho"

"open-cluster-management.io/api/client/cloudevents/options"
"open-cluster-management.io/api/client/cloudevents/types"
"open-cluster-management.io/api/cloudevents/generic/options"
"open-cluster-management.io/api/cloudevents/generic/types"
)

type mqttAgentOptions struct {
Expand All @@ -33,7 +33,7 @@ func NewAgentOptions(mqttOptions *MQTTOptions, clusterName, agentID string) *opt
}

func (o *mqttAgentOptions) WithContext(ctx context.Context, evtCtx cloudevents.EventContext) (context.Context, error) {
eventType, err := types.Parse(evtCtx.GetType())
eventType, err := types.ParseCloudEventsType(evtCtx.GetType())
if err != nil {
return nil, fmt.Errorf("unsupported event type %s, %v", eventType, err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import (

cloudevents "github.com/cloudevents/sdk-go/v2"
cloudeventscontext "github.com/cloudevents/sdk-go/v2/context"
"open-cluster-management.io/api/client/cloudevents/types"

"open-cluster-management.io/api/cloudevents/generic/types"
)

var mockEventDataType = types.CloudEventsDataType{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ import (
cloudevents "github.com/cloudevents/sdk-go/v2"
cloudeventscontext "github.com/cloudevents/sdk-go/v2/context"
"github.com/eclipse/paho.golang/paho"
"open-cluster-management.io/api/client/cloudevents/options"
"open-cluster-management.io/api/client/cloudevents/types"

"open-cluster-management.io/api/cloudevents/generic/options"
"open-cluster-management.io/api/cloudevents/generic/types"
)

type mqttSourceOptions struct {
Expand All @@ -29,7 +30,7 @@ func NewSourceOptions(mqttOptions *MQTTOptions, sourceID string) *options.CloudE
}

func (o *mqttSourceOptions) WithContext(ctx context.Context, evtCtx cloudevents.EventContext) (context.Context, error) {
eventType, err := types.Parse(evtCtx.GetType())
eventType, err := types.ParseCloudEventsType(evtCtx.GetType())
if err != nil {
return nil, fmt.Errorf("unsupported event type %s, %v", eventType, err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import (

cloudevents "github.com/cloudevents/sdk-go/v2"
cloudeventscontext "github.com/cloudevents/sdk-go/v2/context"
"open-cluster-management.io/api/client/cloudevents/types"

"open-cluster-management.io/api/cloudevents/generic/types"
)

func TestSourceContext(t *testing.T) {
Expand Down
Loading