Skip to content

Commit

Permalink
combine the sender and receiver to one client (#282)
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <liuweixa@redhat.com>
  • Loading branch information
skeeey authored Sep 19, 2023
1 parent ad02e82 commit d0a4952
Show file tree
Hide file tree
Showing 9 changed files with 31 additions and 77 deletions.
25 changes: 9 additions & 16 deletions cloudevents/generic/agentclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ import (
// to the source.
type CloudEventAgentClient[T ResourceObject] struct {
cloudEventsOptions options.CloudEventsOptions
sender cloudevents.Client
receiver cloudevents.Client
cloudEventsClient cloudevents.Client
lister Lister[T]
codecs map[types.CloudEventsDataType]Codec[T]
statusHashGetter StatusHashGetter[T]
Expand All @@ -46,12 +45,7 @@ func NewCloudEventAgentClient[T ResourceObject](
statusHashGetter StatusHashGetter[T],
codecs ...Codec[T],
) (*CloudEventAgentClient[T], error) {
sender, err := agentOptions.CloudEventsOptions.Sender(ctx)
if err != nil {
return nil, err
}

receiver, err := agentOptions.CloudEventsOptions.Receiver(ctx)
cloudEventsClient, err := agentOptions.CloudEventsOptions.Client(ctx)
if err != nil {
return nil, err
}
Expand All @@ -63,8 +57,7 @@ func NewCloudEventAgentClient[T ResourceObject](

return &CloudEventAgentClient[T]{
cloudEventsOptions: agentOptions.CloudEventsOptions,
sender: sender,
receiver: receiver,
cloudEventsClient: cloudEventsClient,
lister: lister,
codecs: evtCodes,
statusHashGetter: statusHashGetter,
Expand Down Expand Up @@ -113,12 +106,11 @@ func (c *CloudEventAgentClient[T]) Resync(ctx context.Context) error {
return err
}

if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.sender, evt); err != nil {
if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.cloudEventsClient, evt); err != nil {
return err
}

klog.V(4).Infof("Sent resync request:\n%s", evt)
}

return nil
}

Expand All @@ -143,19 +135,18 @@ func (c *CloudEventAgentClient[T]) Publish(ctx context.Context, eventType types.
return err
}

if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.sender, *evt); err != nil {
if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.cloudEventsClient, *evt); err != nil {
return err
}

klog.V(4).Infof("Sent event:\n%s", evt)
return nil
}

// Subscribe the events that are from the source status resync request or source resource spec request.
// For status resync request, agent publish the current resources status back as response.
// For resource spec request, agent receives resource spec and handles the spec with resource handlers.
func (c *CloudEventAgentClient[T]) Subscribe(ctx context.Context, handlers ...ResourceHandler[T]) error {
return c.receiver.StartReceiver(ctx, func(evt cloudevents.Event) {
return c.cloudEventsClient.StartReceiver(ctx, func(evt cloudevents.Event) {
klog.V(4).Infof("Received event:\n%s", evt)

eventType, err := types.ParseCloudEventsType(evt.Type())
Expand Down Expand Up @@ -311,6 +302,8 @@ func sendEventWithLimit(sendingCtx context.Context, limiter flowcontrol.RateLimi
latency, evt))
}

klog.V(4).Infof("Sent event: %v\n%s", sendingCtx, evt)

if result := sender.Send(sendingCtx, evt); cloudevents.IsUndelivered(result) {
return fmt.Errorf("failed to send event %s, %v", evt, result)
}
Expand Down
6 changes: 1 addition & 5 deletions cloudevents/generic/options/fake/fakeoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,7 @@ func (o *CloudEventsFakeOptions) WithContext(ctx context.Context, evtCtx cloudev
return ctx, nil
}

func (o *CloudEventsFakeOptions) Sender(ctx context.Context) (cloudevents.Client, error) {
return o.client, nil
}

func (o *CloudEventsFakeOptions) Receiver(ctx context.Context) (cloudevents.Client, error) {
func (o *CloudEventsFakeOptions) Client(ctx context.Context) (cloudevents.Client, error) {
return o.client, nil
}

Expand Down
17 changes: 3 additions & 14 deletions cloudevents/generic/options/mqtt/agentoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,22 +55,11 @@ func (o *mqttAgentOptions) WithContext(ctx context.Context, evtCtx cloudevents.E
return cloudeventscontext.WithTopic(ctx, statusTopic), nil
}

func (o *mqttAgentOptions) Sender(ctx context.Context) (cloudevents.Client, error) {
sender, err := o.GetCloudEventsClient(
ctx,
fmt.Sprintf("%s-pub-client", o.agentID),
cloudeventsmqtt.WithPublish(&paho.Publish{QoS: byte(o.PubQoS)}),
)
if err != nil {
return nil, err
}
return sender, nil
}

func (o *mqttAgentOptions) Receiver(ctx context.Context) (cloudevents.Client, error) {
func (o *mqttAgentOptions) Client(ctx context.Context) (cloudevents.Client, error) {
receiver, err := o.GetCloudEventsClient(
ctx,
fmt.Sprintf("%s-sub-client", o.agentID),
fmt.Sprintf("%s-client", o.agentID),
cloudeventsmqtt.WithPublish(&paho.Publish{QoS: byte(o.PubQoS)}),
cloudeventsmqtt.WithSubscribe(
&paho.Subscribe{
Subscriptions: map[string]paho.SubscribeOptions{
Expand Down
8 changes: 4 additions & 4 deletions cloudevents/generic/options/mqtt/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,7 @@ func (o *MQTTOptions) GetMQTTConnectOption(clientID string) *paho.Connect {
func (o *MQTTOptions) GetCloudEventsClient(
ctx context.Context,
clientID string,
clientOpt cloudeventsmqtt.Option,
) (cloudevents.Client, error) {
clientOpts ...cloudeventsmqtt.Option) (cloudevents.Client, error) {
netConn, err := o.GetNetConn()
if err != nil {
return nil, err
Expand All @@ -136,8 +135,9 @@ func (o *MQTTOptions) GetCloudEventsClient(
Conn: netConn,
}

connectOpt := cloudeventsmqtt.WithConnect(o.GetMQTTConnectOption(clientID))
protocol, err := cloudeventsmqtt.New(ctx, config, connectOpt, clientOpt)
opts := []cloudeventsmqtt.Option{cloudeventsmqtt.WithConnect(o.GetMQTTConnectOption(clientID))}
opts = append(opts, clientOpts...)
protocol, err := cloudeventsmqtt.New(ctx, config, opts...)
if err != nil {
return nil, err
}
Expand Down
17 changes: 3 additions & 14 deletions cloudevents/generic/options/mqtt/sourceoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,22 +51,11 @@ func (o *mqttSourceOptions) WithContext(ctx context.Context, evtCtx cloudevents.
return cloudeventscontext.WithTopic(ctx, specTopic), nil
}

func (o *mqttSourceOptions) Sender(ctx context.Context) (cloudevents.Client, error) {
sender, err := o.GetCloudEventsClient(
ctx,
fmt.Sprintf("%s-pub-client", o.sourceID),
cloudeventsmqtt.WithPublish(&paho.Publish{QoS: byte(o.PubQoS)}),
)
if err != nil {
return nil, err
}
return sender, nil
}

func (o *mqttSourceOptions) Receiver(ctx context.Context) (cloudevents.Client, error) {
func (o *mqttSourceOptions) Client(ctx context.Context) (cloudevents.Client, error) {
receiver, err := o.GetCloudEventsClient(
ctx,
fmt.Sprintf("%s-sub-client", o.sourceID),
fmt.Sprintf("%s-client", o.sourceID),
cloudeventsmqtt.WithPublish(&paho.Publish{QoS: byte(o.PubQoS)}),
cloudeventsmqtt.WithSubscribe(
&paho.Subscribe{
Subscriptions: map[string]paho.SubscribeOptions{
Expand Down
7 changes: 2 additions & 5 deletions cloudevents/generic/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,8 @@ type CloudEventsOptions interface {
// the MQTT topic, for Kafka, the context should contain the message key, etc.
WithContext(ctx context.Context, evtContext cloudevents.EventContext) (context.Context, error)

// Sender returns a cloudevents client for sending the cloudevents
Sender(ctx context.Context) (cloudevents.Client, error)

// Receiver returns a cloudevents client for receiving the cloudevents
Receiver(ctx context.Context) (cloudevents.Client, error)
// Client returns a cloudevents client for sending and receiving cloudevents
Client(ctx context.Context) (cloudevents.Client, error)
}

// EventRateLimit for limiting the event sending rate.
Expand Down
25 changes: 8 additions & 17 deletions cloudevents/generic/sourceclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ import (
// handling resource requests.
type CloudEventSourceClient[T ResourceObject] struct {
cloudEventsOptions options.CloudEventsOptions
sender cloudevents.Client
receiver cloudevents.Client
cloudEventsClient cloudevents.Client
lister Lister[T]
codecs map[types.CloudEventsDataType]Codec[T]
statusHashGetter StatusHashGetter[T]
Expand All @@ -45,12 +44,7 @@ func NewCloudEventSourceClient[T ResourceObject](
statusHashGetter StatusHashGetter[T],
codecs ...Codec[T],
) (*CloudEventSourceClient[T], error) {
sender, err := sourceOptions.CloudEventsOptions.Sender(ctx)
if err != nil {
return nil, err
}

receiver, err := sourceOptions.CloudEventsOptions.Receiver(ctx)
cloudEventsClient, err := sourceOptions.CloudEventsOptions.Client(ctx)
if err != nil {
return nil, err
}
Expand All @@ -62,8 +56,7 @@ func NewCloudEventSourceClient[T ResourceObject](

return &CloudEventSourceClient[T]{
cloudEventsOptions: sourceOptions.CloudEventsOptions,
sender: sender,
receiver: receiver,
cloudEventsClient: cloudEventsClient,
lister: lister,
codecs: evtCodes,
statusHashGetter: statusHashGetter,
Expand Down Expand Up @@ -111,12 +104,11 @@ func (c *CloudEventSourceClient[T]) Resync(ctx context.Context) error {
return err
}

if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.sender, evt); err != nil {
if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.cloudEventsClient, evt); err != nil {
return err
}

klog.V(4).Infof("Sent resync request:\n%s", evt)
}

return nil
}

Expand All @@ -141,19 +133,18 @@ func (c *CloudEventSourceClient[T]) Publish(ctx context.Context, eventType types
return err
}

if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.sender, *evt); err != nil {
if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.cloudEventsClient, *evt); err != nil {
return err
}

klog.V(4).Infof("Sent event:\n%s", evt)
return nil
}

// Subscribe the events that are from the agent spec resync request or agent resource status request.
// For spec resync request, source publish the current resources spec back as response.
// For resource status request, source receives resource status and handles the status with resource handlers.
func (c *CloudEventSourceClient[T]) Subscribe(ctx context.Context, handlers ...ResourceHandler[T]) error {
if err := c.receiver.StartReceiver(ctx, func(evt cloudevents.Event) {
if err := c.cloudEventsClient.StartReceiver(ctx, func(evt cloudevents.Event) {
klog.V(4).Infof("Received event:\n%s", evt)

eventType, err := types.ParseCloudEventsType(evt.Type())
Expand Down Expand Up @@ -286,7 +277,7 @@ func (c *CloudEventSourceClient[T]) respondResyncSpecRequest(
return err
}

if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.sender, evt); err != nil {
if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.cloudEventsClient, evt); err != nil {
return err
}
}
Expand Down
2 changes: 1 addition & 1 deletion cloudevents/work/agent/client/manifestwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (c *ManifestWorkAgentClient) Get(ctx context.Context, name string, opts met

func (c *ManifestWorkAgentClient) List(ctx context.Context, opts metav1.ListOptions) (*workv1.ManifestWorkList, error) {
klog.V(4).Infof("sync manifestworks")
// send resync request to fetch manifestworks from source when the ManifestWorkInformer status
// send resync request to fetch manifestworks from source when the ManifestWorkInformer starts
if err := c.cloudEventsClient.Resync(ctx); err != nil {
return nil, err
}
Expand Down
1 change: 0 additions & 1 deletion cloudevents/work/clientbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,6 @@ func (b *ClientHolderBuilder) newAgentClients(ctx context.Context, config *mqtt.
go func() {
err := cloudEventsClient.Subscribe(ctx, agenthandler.NewManifestWorkAgentHandler(namespacedLister, watcher))
if err != nil {
// TODO (skeeey) consider how to retry to connect the broker again
klog.Errorf("failed to subscribe to %s, %v", config.BrokerHost, err)
}
}()
Expand Down

0 comments on commit d0a4952

Please sign in to comment.