Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ add cloudevents clients #259

Merged

Conversation

skeeey
Copy link
Member

@skeeey skeeey commented Aug 10, 2023

No description provided.

@skeeey
Copy link
Member Author

skeeey commented Aug 16, 2023

@skeeey
Copy link
Member Author

skeeey commented Aug 16, 2023

/assign @qiujian16

@skeeey
Copy link
Member Author

skeeey commented Aug 16, 2023

/assign @morvencao

@skeeey
Copy link
Member Author

skeeey commented Aug 16, 2023

/cc @clyang82 @yanmxa

@openshift-ci openshift-ci bot requested review from clyang82 and yanmxa August 16, 2023 01:20
@skeeey skeeey changed the title WIP: add cloudevents clients add cloudevents clients Aug 16, 2023

cloudevents "github.com/cloudevents/sdk-go/v2"

"github.com/cloudevents/sdk-go/v2/event"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cloudevents.Event is alias of event.Event: https://github.com/cloudevents/sdk-go/blob/main/v2/alias.go#L29
Can we just use cloudevents.Event?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

using cloudevents.Event instead

}

for _, evt := range c.receivedEvents {
receiver(context.TODO(), evt)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this intent to use new context, instead of ctx?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed, it should not use a new context

return err
}

resources := &payload.ResourceVersionList{Versions: []payload.ResourceVersion{}}
Copy link
Member

@morvencao morvencao Aug 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: can we pre-allocate the ResourceVersion slice with length of objects?

resources := &payload.ResourceVersionList{Versions: make([]payload.ResourceVersion, 0, len(objs))}

this can optimize memory usage, when there are two many objects, to avoid slice resize.
this also can apply to status hashes construction.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Decode(event *cloudevents.Event) (T, error)
}

type StatusHashGetter[T ResourceObject] interface {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could this be a func instead of a struct?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

using a func instead

subClient, pubClient cloudevents.Client,
lister Lister[T],
statusHashGetter StatusHashGetter[T],
agentID, clusterName string,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

needs explanation on the input var. e.g. what does agentId, clusterName means. It seems to me agentId is actually source. We should not assume the client can only be used by agent. Also what if client is use by the consumer on server, it should not set clusterName when init the client.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this only for agent, description is added

}

// Resync the resources spec of the current agent by sending a spec resync request to sources
func (c *CloudEventAgentClient[T]) Resync(ctx context.Context, eventGVR types.GroupVersionResource) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why resync spec is public but resync status is private.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

resync status is used to handle the resync status request, it's a internal logical handler, the outside client doesn't call it, rename the func to respondResyncStatusRequest to avoid confusion

evt.SetSource(c.agentID)
evt.SetExtension(types.ExtensionClusterName, c.clusterName)

objs, err := c.lister.List(types.ListOptions{Source: types.SourceAll})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

who will sent this request? I am not quite clear on why here is sourceall but we do not set cluster either.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the agent will sent this, the agent need to resync its resource spec from all sources when it starts

"open-cluster-management.io/api/client/cloudevents/types"
)

type CloudEventSourceClient[T ResourceObject] struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment please. I think what you mean is this client is used by server but not agent?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, it is, the source and agent have different new func now

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

still need a comment on what CloudEventSourceClient indicates.

sourceID string
}

func NewCloudEventSourceClient[T ResourceObject](
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comments on all input var

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there seems to be something missing, e.g how to setup the sub/pub client for agent and source client, will it be same context or they need to use the different context, who takes the responsibility to set the context?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I try to introduce a CloudEventsClient interface to avoid using the cloudevents clients and context directly, please take a look

return types.StatusModified, nil
}

func (c *CloudEventSourceClient[T]) getObj(resourceID string, objs []T) (obj T, exists bool) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems not need to be in the struct

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed from the struct

Action: types.ResyncRequestAction,
}

evt := cloudevents.NewEvent()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think we should have a wrapper of NewEvent with generated uuid and time, and type/source as input.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a event builder for this

"strings"
)

const (
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add doc for all const.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

}

// GroupVersionResource identifies the type of the resource included in the cloud event payload.
type GroupVersionResource struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we cannot use schema.GroupVersionResource directly?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

using the schema.GroupVersionResource

Copy link
Member

@morvencao morvencao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me, thanks.

// - cloudEventContextGetter gets a cloudevents context when sending a event, different protocol should have its own
// context.
// - cloudEventsSubClient is a cloudevents client, it is used to receive the event from a broker.
// - cloudEventsPubClient is a cloudevents client, it is used to send the event from a broker.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// - cloudEventsPubClient is a cloudevents client, it is used to send the event from a broker.
// - cloudEventsPubClient is a cloudevents client, it is used to send the event to a broker.

"open-cluster-management.io/api/client/cloudevents/types"
)

type CloudEventSourceClient[T ResourceObject] struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

still need a comment on what CloudEventSourceClient indicates.

return nil
}

// Publish a resource spec from a source to a spec.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Publish a resource spec from a source to a spec.
// Publish a resource spec from a source to an agent.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how to know which agent to send?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the event includes the cluster name extension, when we send the event, the CloudEventsClient will get the cluster name to determine the event will be sent to which cluster, e.g. for mqtt client, it use the cluster name to create the topic

// Subscribe the events that are from the agent spec resync request or agent resource status request.
// For spec resync request, source publish the current resources spec back as response.
// For resource status request, source receives resource status and handles the status with resource handlers.
func (c *CloudEventSourceClient[T]) Subscribe(ctx context.Context, handlers ...ResourceHandler[T]) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to set the sourceid in the context or it will be don implicitly by the client?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't, the sourceid will be get from the event

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a client for a certain source should only subscribe to a certain source, isn't it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, for a source, it only subscribe its own resource, for mqtt, it limits by the topic, e.g. /sources/maestro/clusters/+/status

sourceID string
}

func NewCloudEventSourceClient[T ResourceObject](
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there seems to be something missing, e.g how to setup the sub/pub client for agent and source client, will it be same context or they need to use the different context, who takes the responsibility to set the context?

@skeeey skeeey force-pushed the cloudevent branch 2 times, most recently from 689236b to b584b04 Compare August 22, 2023 03:41
@skeeey skeeey changed the title add cloudevents clients ✨ add cloudevents clients Aug 22, 2023
Copy link
Member

@qiujian16 qiujian16 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think should also add a README in the client/cloudevents to describe how the client can be used.

//
// Available implementations:
// - MQTT
type CloudEventsClient interface {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am thinking the interface should be like

type CloudEventsClient interface {
    Sender() cloudevents.Client
    Subscriber() cloudevents.Client
    WithContext(ctx context.Context, opts) context.Context
}

and the mqtt.options returns the interface with

options.Source()
options.Agent()

Copy link
Member

@yanmxa yanmxa Aug 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

// wording
Sender -> Receiver
Publisher -> Subscriber

// dynamic parameters?
WithContext(ctx context.Context, opts ...interface{}) context.Context

Copy link
Member Author

@skeeey skeeey Aug 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use CloudEventsSourceOptions and CloudEventsAgentOptions instead of this interface when building a cloudeventsclient

Signed-off-by: Wei Liu <liuweixa@redhat.com>
Copy link
Member

@qiujian16 qiujian16 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/approve
/lgtm
/hold

I think it is a good start. I am still thinking if statusHashGetter should be in the interface of codec, but we can decide this later. Would @morvencao and @yanmxa to review also? if there is no additional comment, feel free to unhold

@openshift-ci
Copy link
Contributor

openshift-ci bot commented Aug 25, 2023

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: qiujian16, skeeey

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@skeeey
Copy link
Member Author

skeeey commented Aug 25, 2023

I think should also add a README in the client/cloudevents to describe how the client can be used.

I will open a new pr to add the README after we introduced the work part

@yanmxa
Copy link
Member

yanmxa commented Aug 25, 2023

Thanks!
Looks good to me.

@skeeey
Copy link
Member Author

skeeey commented Aug 25, 2023

/unhold

@openshift-merge-robot openshift-merge-robot merged commit e598981 into open-cluster-management-io:main Aug 25, 2023
10 checks passed
@skeeey skeeey deleted the cloudevent branch September 4, 2023 06:53
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants