Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🌱 combine the sender and receiver to one client #282

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 9 additions & 16 deletions cloudevents/generic/agentclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ import (
// to the source.
type CloudEventAgentClient[T ResourceObject] struct {
cloudEventsOptions options.CloudEventsOptions
sender cloudevents.Client
receiver cloudevents.Client
cloudEventsClient cloudevents.Client
lister Lister[T]
codecs map[types.CloudEventsDataType]Codec[T]
statusHashGetter StatusHashGetter[T]
Expand All @@ -46,12 +45,7 @@ func NewCloudEventAgentClient[T ResourceObject](
statusHashGetter StatusHashGetter[T],
codecs ...Codec[T],
) (*CloudEventAgentClient[T], error) {
sender, err := agentOptions.CloudEventsOptions.Sender(ctx)
if err != nil {
return nil, err
}

receiver, err := agentOptions.CloudEventsOptions.Receiver(ctx)
cloudEventsClient, err := agentOptions.CloudEventsOptions.Client(ctx)
if err != nil {
return nil, err
}
Expand All @@ -63,8 +57,7 @@ func NewCloudEventAgentClient[T ResourceObject](

return &CloudEventAgentClient[T]{
cloudEventsOptions: agentOptions.CloudEventsOptions,
sender: sender,
receiver: receiver,
cloudEventsClient: cloudEventsClient,
lister: lister,
codecs: evtCodes,
statusHashGetter: statusHashGetter,
Expand Down Expand Up @@ -113,12 +106,11 @@ func (c *CloudEventAgentClient[T]) Resync(ctx context.Context) error {
return err
}

if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.sender, evt); err != nil {
if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.cloudEventsClient, evt); err != nil {
return err
}

klog.V(4).Infof("Sent resync request:\n%s", evt)
}

return nil
}

Expand All @@ -143,19 +135,18 @@ func (c *CloudEventAgentClient[T]) Publish(ctx context.Context, eventType types.
return err
}

if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.sender, *evt); err != nil {
if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.cloudEventsClient, *evt); err != nil {
return err
}

klog.V(4).Infof("Sent event:\n%s", evt)
return nil
}

// Subscribe the events that are from the source status resync request or source resource spec request.
// For status resync request, agent publish the current resources status back as response.
// For resource spec request, agent receives resource spec and handles the spec with resource handlers.
func (c *CloudEventAgentClient[T]) Subscribe(ctx context.Context, handlers ...ResourceHandler[T]) error {
return c.receiver.StartReceiver(ctx, func(evt cloudevents.Event) {
return c.cloudEventsClient.StartReceiver(ctx, func(evt cloudevents.Event) {
klog.V(4).Infof("Received event:\n%s", evt)

eventType, err := types.ParseCloudEventsType(evt.Type())
Expand Down Expand Up @@ -311,6 +302,8 @@ func sendEventWithLimit(sendingCtx context.Context, limiter flowcontrol.RateLimi
latency, evt))
}

klog.V(4).Infof("Sent event: %v\n%s", sendingCtx, evt)

if result := sender.Send(sendingCtx, evt); cloudevents.IsUndelivered(result) {
return fmt.Errorf("failed to send event %s, %v", evt, result)
}
Expand Down
6 changes: 1 addition & 5 deletions cloudevents/generic/options/fake/fakeoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,7 @@ func (o *CloudEventsFakeOptions) WithContext(ctx context.Context, evtCtx cloudev
return ctx, nil
}

func (o *CloudEventsFakeOptions) Sender(ctx context.Context) (cloudevents.Client, error) {
return o.client, nil
}

func (o *CloudEventsFakeOptions) Receiver(ctx context.Context) (cloudevents.Client, error) {
func (o *CloudEventsFakeOptions) Client(ctx context.Context) (cloudevents.Client, error) {
return o.client, nil
}

Expand Down
17 changes: 3 additions & 14 deletions cloudevents/generic/options/mqtt/agentoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,22 +55,11 @@ func (o *mqttAgentOptions) WithContext(ctx context.Context, evtCtx cloudevents.E
return cloudeventscontext.WithTopic(ctx, statusTopic), nil
}

func (o *mqttAgentOptions) Sender(ctx context.Context) (cloudevents.Client, error) {
sender, err := o.GetCloudEventsClient(
ctx,
fmt.Sprintf("%s-pub-client", o.agentID),
cloudeventsmqtt.WithPublish(&paho.Publish{QoS: byte(o.PubQoS)}),
)
if err != nil {
return nil, err
}
return sender, nil
}

func (o *mqttAgentOptions) Receiver(ctx context.Context) (cloudevents.Client, error) {
func (o *mqttAgentOptions) Client(ctx context.Context) (cloudevents.Client, error) {
receiver, err := o.GetCloudEventsClient(
ctx,
fmt.Sprintf("%s-sub-client", o.agentID),
fmt.Sprintf("%s-client", o.agentID),
cloudeventsmqtt.WithPublish(&paho.Publish{QoS: byte(o.PubQoS)}),
cloudeventsmqtt.WithSubscribe(
&paho.Subscribe{
Subscriptions: map[string]paho.SubscribeOptions{
Expand Down
8 changes: 4 additions & 4 deletions cloudevents/generic/options/mqtt/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,7 @@ func (o *MQTTOptions) GetMQTTConnectOption(clientID string) *paho.Connect {
func (o *MQTTOptions) GetCloudEventsClient(
ctx context.Context,
clientID string,
clientOpt cloudeventsmqtt.Option,
) (cloudevents.Client, error) {
clientOpts ...cloudeventsmqtt.Option) (cloudevents.Client, error) {
netConn, err := o.GetNetConn()
if err != nil {
return nil, err
Expand All @@ -136,8 +135,9 @@ func (o *MQTTOptions) GetCloudEventsClient(
Conn: netConn,
}

connectOpt := cloudeventsmqtt.WithConnect(o.GetMQTTConnectOption(clientID))
protocol, err := cloudeventsmqtt.New(ctx, config, connectOpt, clientOpt)
opts := []cloudeventsmqtt.Option{cloudeventsmqtt.WithConnect(o.GetMQTTConnectOption(clientID))}
opts = append(opts, clientOpts...)
protocol, err := cloudeventsmqtt.New(ctx, config, opts...)
if err != nil {
return nil, err
}
Expand Down
17 changes: 3 additions & 14 deletions cloudevents/generic/options/mqtt/sourceoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,22 +51,11 @@ func (o *mqttSourceOptions) WithContext(ctx context.Context, evtCtx cloudevents.
return cloudeventscontext.WithTopic(ctx, specTopic), nil
}

func (o *mqttSourceOptions) Sender(ctx context.Context) (cloudevents.Client, error) {
sender, err := o.GetCloudEventsClient(
ctx,
fmt.Sprintf("%s-pub-client", o.sourceID),
cloudeventsmqtt.WithPublish(&paho.Publish{QoS: byte(o.PubQoS)}),
)
if err != nil {
return nil, err
}
return sender, nil
}

func (o *mqttSourceOptions) Receiver(ctx context.Context) (cloudevents.Client, error) {
func (o *mqttSourceOptions) Client(ctx context.Context) (cloudevents.Client, error) {
receiver, err := o.GetCloudEventsClient(
ctx,
fmt.Sprintf("%s-sub-client", o.sourceID),
fmt.Sprintf("%s-client", o.sourceID),
cloudeventsmqtt.WithPublish(&paho.Publish{QoS: byte(o.PubQoS)}),
cloudeventsmqtt.WithSubscribe(
&paho.Subscribe{
Subscriptions: map[string]paho.SubscribeOptions{
Expand Down
7 changes: 2 additions & 5 deletions cloudevents/generic/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,8 @@ type CloudEventsOptions interface {
// the MQTT topic, for Kafka, the context should contain the message key, etc.
WithContext(ctx context.Context, evtContext cloudevents.EventContext) (context.Context, error)

// Sender returns a cloudevents client for sending the cloudevents
Sender(ctx context.Context) (cloudevents.Client, error)

// Receiver returns a cloudevents client for receiving the cloudevents
Receiver(ctx context.Context) (cloudevents.Client, error)
// Client returns a cloudevents client for sending and receiving cloudevents
Client(ctx context.Context) (cloudevents.Client, error)
}

// EventRateLimit for limiting the event sending rate.
Expand Down
25 changes: 8 additions & 17 deletions cloudevents/generic/sourceclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ import (
// handling resource requests.
type CloudEventSourceClient[T ResourceObject] struct {
cloudEventsOptions options.CloudEventsOptions
sender cloudevents.Client
receiver cloudevents.Client
cloudEventsClient cloudevents.Client
lister Lister[T]
codecs map[types.CloudEventsDataType]Codec[T]
statusHashGetter StatusHashGetter[T]
Expand All @@ -45,12 +44,7 @@ func NewCloudEventSourceClient[T ResourceObject](
statusHashGetter StatusHashGetter[T],
codecs ...Codec[T],
) (*CloudEventSourceClient[T], error) {
sender, err := sourceOptions.CloudEventsOptions.Sender(ctx)
if err != nil {
return nil, err
}

receiver, err := sourceOptions.CloudEventsOptions.Receiver(ctx)
cloudEventsClient, err := sourceOptions.CloudEventsOptions.Client(ctx)
if err != nil {
return nil, err
}
Expand All @@ -62,8 +56,7 @@ func NewCloudEventSourceClient[T ResourceObject](

return &CloudEventSourceClient[T]{
cloudEventsOptions: sourceOptions.CloudEventsOptions,
sender: sender,
receiver: receiver,
cloudEventsClient: cloudEventsClient,
lister: lister,
codecs: evtCodes,
statusHashGetter: statusHashGetter,
Expand Down Expand Up @@ -111,12 +104,11 @@ func (c *CloudEventSourceClient[T]) Resync(ctx context.Context) error {
return err
}

if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.sender, evt); err != nil {
if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.cloudEventsClient, evt); err != nil {
return err
}

klog.V(4).Infof("Sent resync request:\n%s", evt)
}

return nil
}

Expand All @@ -141,19 +133,18 @@ func (c *CloudEventSourceClient[T]) Publish(ctx context.Context, eventType types
return err
}

if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.sender, *evt); err != nil {
if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.cloudEventsClient, *evt); err != nil {
return err
}

klog.V(4).Infof("Sent event:\n%s", evt)
return nil
}

// Subscribe the events that are from the agent spec resync request or agent resource status request.
// For spec resync request, source publish the current resources spec back as response.
// For resource status request, source receives resource status and handles the status with resource handlers.
func (c *CloudEventSourceClient[T]) Subscribe(ctx context.Context, handlers ...ResourceHandler[T]) error {
if err := c.receiver.StartReceiver(ctx, func(evt cloudevents.Event) {
if err := c.cloudEventsClient.StartReceiver(ctx, func(evt cloudevents.Event) {
klog.V(4).Infof("Received event:\n%s", evt)

eventType, err := types.ParseCloudEventsType(evt.Type())
Expand Down Expand Up @@ -286,7 +277,7 @@ func (c *CloudEventSourceClient[T]) respondResyncSpecRequest(
return err
}

if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.sender, evt); err != nil {
if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.cloudEventsClient, evt); err != nil {
return err
}
}
Expand Down
2 changes: 1 addition & 1 deletion cloudevents/work/agent/client/manifestwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (c *ManifestWorkAgentClient) Get(ctx context.Context, name string, opts met

func (c *ManifestWorkAgentClient) List(ctx context.Context, opts metav1.ListOptions) (*workv1.ManifestWorkList, error) {
klog.V(4).Infof("sync manifestworks")
// send resync request to fetch manifestworks from source when the ManifestWorkInformer status
// send resync request to fetch manifestworks from source when the ManifestWorkInformer starts
if err := c.cloudEventsClient.Resync(ctx); err != nil {
return nil, err
}
Expand Down
1 change: 0 additions & 1 deletion cloudevents/work/clientbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,6 @@ func (b *ClientHolderBuilder) newAgentClients(ctx context.Context, config *mqtt.
go func() {
err := cloudEventsClient.Subscribe(ctx, agenthandler.NewManifestWorkAgentHandler(namespacedLister, watcher))
if err != nil {
// TODO (skeeey) consider how to retry to connect the broker again
klog.Errorf("failed to subscribe to %s, %v", config.BrokerHost, err)
}
}()
Expand Down
Loading