From e59e4b1051e7e730796e23eafc6a7e941faceadc Mon Sep 17 00:00:00 2001 From: Wei Liu Date: Fri, 15 Sep 2023 09:33:04 +0800 Subject: [PATCH] combine the sender and receiver to one client Signed-off-by: Wei Liu --- cloudevents/generic/agentclient.go | 25 +++++++------------ .../generic/options/fake/fakeoptions.go | 6 +---- .../generic/options/mqtt/agentoptions.go | 17 +++---------- cloudevents/generic/options/mqtt/options.go | 8 +++--- .../generic/options/mqtt/sourceoptions.go | 17 +++---------- cloudevents/generic/options/options.go | 7 ++---- cloudevents/generic/sourceclient.go | 25 ++++++------------- cloudevents/work/agent/client/manifestwork.go | 2 +- cloudevents/work/clientbuilder.go | 1 - 9 files changed, 31 insertions(+), 77 deletions(-) diff --git a/cloudevents/generic/agentclient.go b/cloudevents/generic/agentclient.go index 3e9a8403..412e0bc3 100644 --- a/cloudevents/generic/agentclient.go +++ b/cloudevents/generic/agentclient.go @@ -22,8 +22,7 @@ import ( // to the source. type CloudEventAgentClient[T ResourceObject] struct { cloudEventsOptions options.CloudEventsOptions - sender cloudevents.Client - receiver cloudevents.Client + cloudEventsClient cloudevents.Client lister Lister[T] codecs map[types.CloudEventsDataType]Codec[T] statusHashGetter StatusHashGetter[T] @@ -46,12 +45,7 @@ func NewCloudEventAgentClient[T ResourceObject]( statusHashGetter StatusHashGetter[T], codecs ...Codec[T], ) (*CloudEventAgentClient[T], error) { - sender, err := agentOptions.CloudEventsOptions.Sender(ctx) - if err != nil { - return nil, err - } - - receiver, err := agentOptions.CloudEventsOptions.Receiver(ctx) + cloudEventsClient, err := agentOptions.CloudEventsOptions.Client(ctx) if err != nil { return nil, err } @@ -63,8 +57,7 @@ func NewCloudEventAgentClient[T ResourceObject]( return &CloudEventAgentClient[T]{ cloudEventsOptions: agentOptions.CloudEventsOptions, - sender: sender, - receiver: receiver, + cloudEventsClient: cloudEventsClient, lister: lister, codecs: evtCodes, statusHashGetter: statusHashGetter, @@ -113,12 +106,11 @@ func (c *CloudEventAgentClient[T]) Resync(ctx context.Context) error { return err } - if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.sender, evt); err != nil { + if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.cloudEventsClient, evt); err != nil { return err } - - klog.V(4).Infof("Sent resync request:\n%s", evt) } + return nil } @@ -143,11 +135,10 @@ func (c *CloudEventAgentClient[T]) Publish(ctx context.Context, eventType types. return err } - if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.sender, *evt); err != nil { + if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.cloudEventsClient, *evt); err != nil { return err } - klog.V(4).Infof("Sent event:\n%s", evt) return nil } @@ -155,7 +146,7 @@ func (c *CloudEventAgentClient[T]) Publish(ctx context.Context, eventType types. // For status resync request, agent publish the current resources status back as response. // For resource spec request, agent receives resource spec and handles the spec with resource handlers. func (c *CloudEventAgentClient[T]) Subscribe(ctx context.Context, handlers ...ResourceHandler[T]) error { - return c.receiver.StartReceiver(ctx, func(evt cloudevents.Event) { + return c.cloudEventsClient.StartReceiver(ctx, func(evt cloudevents.Event) { klog.V(4).Infof("Received event:\n%s", evt) eventType, err := types.ParseCloudEventsType(evt.Type()) @@ -311,6 +302,8 @@ func sendEventWithLimit(sendingCtx context.Context, limiter flowcontrol.RateLimi latency, evt)) } + klog.V(4).Infof("Sent event: %v\n%s", sendingCtx, evt) + if result := sender.Send(sendingCtx, evt); cloudevents.IsUndelivered(result) { return fmt.Errorf("failed to send event %s, %v", evt, result) } diff --git a/cloudevents/generic/options/fake/fakeoptions.go b/cloudevents/generic/options/fake/fakeoptions.go index 900b4c87..0d9574f5 100644 --- a/cloudevents/generic/options/fake/fakeoptions.go +++ b/cloudevents/generic/options/fake/fakeoptions.go @@ -34,11 +34,7 @@ func (o *CloudEventsFakeOptions) WithContext(ctx context.Context, evtCtx cloudev return ctx, nil } -func (o *CloudEventsFakeOptions) Sender(ctx context.Context) (cloudevents.Client, error) { - return o.client, nil -} - -func (o *CloudEventsFakeOptions) Receiver(ctx context.Context) (cloudevents.Client, error) { +func (o *CloudEventsFakeOptions) Client(ctx context.Context) (cloudevents.Client, error) { return o.client, nil } diff --git a/cloudevents/generic/options/mqtt/agentoptions.go b/cloudevents/generic/options/mqtt/agentoptions.go index bd6aa206..1525ff68 100644 --- a/cloudevents/generic/options/mqtt/agentoptions.go +++ b/cloudevents/generic/options/mqtt/agentoptions.go @@ -55,22 +55,11 @@ func (o *mqttAgentOptions) WithContext(ctx context.Context, evtCtx cloudevents.E return cloudeventscontext.WithTopic(ctx, statusTopic), nil } -func (o *mqttAgentOptions) Sender(ctx context.Context) (cloudevents.Client, error) { - sender, err := o.GetCloudEventsClient( - ctx, - fmt.Sprintf("%s-pub-client", o.agentID), - cloudeventsmqtt.WithPublish(&paho.Publish{QoS: byte(o.PubQoS)}), - ) - if err != nil { - return nil, err - } - return sender, nil -} - -func (o *mqttAgentOptions) Receiver(ctx context.Context) (cloudevents.Client, error) { +func (o *mqttAgentOptions) Client(ctx context.Context) (cloudevents.Client, error) { receiver, err := o.GetCloudEventsClient( ctx, - fmt.Sprintf("%s-sub-client", o.agentID), + fmt.Sprintf("%s-client", o.agentID), + cloudeventsmqtt.WithPublish(&paho.Publish{QoS: byte(o.PubQoS)}), cloudeventsmqtt.WithSubscribe( &paho.Subscribe{ Subscriptions: map[string]paho.SubscribeOptions{ diff --git a/cloudevents/generic/options/mqtt/options.go b/cloudevents/generic/options/mqtt/options.go index 27815942..e79e0af6 100644 --- a/cloudevents/generic/options/mqtt/options.go +++ b/cloudevents/generic/options/mqtt/options.go @@ -124,8 +124,7 @@ func (o *MQTTOptions) GetMQTTConnectOption(clientID string) *paho.Connect { func (o *MQTTOptions) GetCloudEventsClient( ctx context.Context, clientID string, - clientOpt cloudeventsmqtt.Option, -) (cloudevents.Client, error) { + clientOpts ...cloudeventsmqtt.Option) (cloudevents.Client, error) { netConn, err := o.GetNetConn() if err != nil { return nil, err @@ -136,8 +135,9 @@ func (o *MQTTOptions) GetCloudEventsClient( Conn: netConn, } - connectOpt := cloudeventsmqtt.WithConnect(o.GetMQTTConnectOption(clientID)) - protocol, err := cloudeventsmqtt.New(ctx, config, connectOpt, clientOpt) + opts := []cloudeventsmqtt.Option{cloudeventsmqtt.WithConnect(o.GetMQTTConnectOption(clientID))} + opts = append(opts, clientOpts...) + protocol, err := cloudeventsmqtt.New(ctx, config, opts...) if err != nil { return nil, err } diff --git a/cloudevents/generic/options/mqtt/sourceoptions.go b/cloudevents/generic/options/mqtt/sourceoptions.go index a3c002fd..095c1352 100644 --- a/cloudevents/generic/options/mqtt/sourceoptions.go +++ b/cloudevents/generic/options/mqtt/sourceoptions.go @@ -51,22 +51,11 @@ func (o *mqttSourceOptions) WithContext(ctx context.Context, evtCtx cloudevents. return cloudeventscontext.WithTopic(ctx, specTopic), nil } -func (o *mqttSourceOptions) Sender(ctx context.Context) (cloudevents.Client, error) { - sender, err := o.GetCloudEventsClient( - ctx, - fmt.Sprintf("%s-pub-client", o.sourceID), - cloudeventsmqtt.WithPublish(&paho.Publish{QoS: byte(o.PubQoS)}), - ) - if err != nil { - return nil, err - } - return sender, nil -} - -func (o *mqttSourceOptions) Receiver(ctx context.Context) (cloudevents.Client, error) { +func (o *mqttSourceOptions) Client(ctx context.Context) (cloudevents.Client, error) { receiver, err := o.GetCloudEventsClient( ctx, - fmt.Sprintf("%s-sub-client", o.sourceID), + fmt.Sprintf("%s-client", o.sourceID), + cloudeventsmqtt.WithPublish(&paho.Publish{QoS: byte(o.PubQoS)}), cloudeventsmqtt.WithSubscribe( &paho.Subscribe{ Subscriptions: map[string]paho.SubscribeOptions{ diff --git a/cloudevents/generic/options/options.go b/cloudevents/generic/options/options.go index 5dc4c86b..68c9dec4 100644 --- a/cloudevents/generic/options/options.go +++ b/cloudevents/generic/options/options.go @@ -16,11 +16,8 @@ type CloudEventsOptions interface { // the MQTT topic, for Kafka, the context should contain the message key, etc. WithContext(ctx context.Context, evtContext cloudevents.EventContext) (context.Context, error) - // Sender returns a cloudevents client for sending the cloudevents - Sender(ctx context.Context) (cloudevents.Client, error) - - // Receiver returns a cloudevents client for receiving the cloudevents - Receiver(ctx context.Context) (cloudevents.Client, error) + // Client returns a cloudevents client for sending and receiving cloudevents + Client(ctx context.Context) (cloudevents.Client, error) } // EventRateLimit for limiting the event sending rate. diff --git a/cloudevents/generic/sourceclient.go b/cloudevents/generic/sourceclient.go index 855ba8ff..25f790dd 100644 --- a/cloudevents/generic/sourceclient.go +++ b/cloudevents/generic/sourceclient.go @@ -22,8 +22,7 @@ import ( // handling resource requests. type CloudEventSourceClient[T ResourceObject] struct { cloudEventsOptions options.CloudEventsOptions - sender cloudevents.Client - receiver cloudevents.Client + cloudEventsClient cloudevents.Client lister Lister[T] codecs map[types.CloudEventsDataType]Codec[T] statusHashGetter StatusHashGetter[T] @@ -45,12 +44,7 @@ func NewCloudEventSourceClient[T ResourceObject]( statusHashGetter StatusHashGetter[T], codecs ...Codec[T], ) (*CloudEventSourceClient[T], error) { - sender, err := sourceOptions.CloudEventsOptions.Sender(ctx) - if err != nil { - return nil, err - } - - receiver, err := sourceOptions.CloudEventsOptions.Receiver(ctx) + cloudEventsClient, err := sourceOptions.CloudEventsOptions.Client(ctx) if err != nil { return nil, err } @@ -62,8 +56,7 @@ func NewCloudEventSourceClient[T ResourceObject]( return &CloudEventSourceClient[T]{ cloudEventsOptions: sourceOptions.CloudEventsOptions, - sender: sender, - receiver: receiver, + cloudEventsClient: cloudEventsClient, lister: lister, codecs: evtCodes, statusHashGetter: statusHashGetter, @@ -111,12 +104,11 @@ func (c *CloudEventSourceClient[T]) Resync(ctx context.Context) error { return err } - if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.sender, evt); err != nil { + if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.cloudEventsClient, evt); err != nil { return err } - - klog.V(4).Infof("Sent resync request:\n%s", evt) } + return nil } @@ -141,11 +133,10 @@ func (c *CloudEventSourceClient[T]) Publish(ctx context.Context, eventType types return err } - if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.sender, *evt); err != nil { + if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.cloudEventsClient, *evt); err != nil { return err } - klog.V(4).Infof("Sent event:\n%s", evt) return nil } @@ -153,7 +144,7 @@ func (c *CloudEventSourceClient[T]) Publish(ctx context.Context, eventType types // For spec resync request, source publish the current resources spec back as response. // For resource status request, source receives resource status and handles the status with resource handlers. func (c *CloudEventSourceClient[T]) Subscribe(ctx context.Context, handlers ...ResourceHandler[T]) error { - if err := c.receiver.StartReceiver(ctx, func(evt cloudevents.Event) { + if err := c.cloudEventsClient.StartReceiver(ctx, func(evt cloudevents.Event) { klog.V(4).Infof("Received event:\n%s", evt) eventType, err := types.ParseCloudEventsType(evt.Type()) @@ -286,7 +277,7 @@ func (c *CloudEventSourceClient[T]) respondResyncSpecRequest( return err } - if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.sender, evt); err != nil { + if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.cloudEventsClient, evt); err != nil { return err } } diff --git a/cloudevents/work/agent/client/manifestwork.go b/cloudevents/work/agent/client/manifestwork.go index 2b797965..54261d88 100644 --- a/cloudevents/work/agent/client/manifestwork.go +++ b/cloudevents/work/agent/client/manifestwork.go @@ -79,7 +79,7 @@ func (c *ManifestWorkAgentClient) Get(ctx context.Context, name string, opts met func (c *ManifestWorkAgentClient) List(ctx context.Context, opts metav1.ListOptions) (*workv1.ManifestWorkList, error) { klog.V(4).Infof("sync manifestworks") - // send resync request to fetch manifestworks from source when the ManifestWorkInformer status + // send resync request to fetch manifestworks from source when the ManifestWorkInformer starts if err := c.cloudEventsClient.Resync(ctx); err != nil { return nil, err } diff --git a/cloudevents/work/clientbuilder.go b/cloudevents/work/clientbuilder.go index 28014ae2..ca8a6a5a 100644 --- a/cloudevents/work/clientbuilder.go +++ b/cloudevents/work/clientbuilder.go @@ -147,7 +147,6 @@ func (b *ClientHolderBuilder) newAgentClients(ctx context.Context, config *mqtt. go func() { err := cloudEventsClient.Subscribe(ctx, agenthandler.NewManifestWorkAgentHandler(namespacedLister, watcher)) if err != nil { - // TODO (skeeey) consider how to retry to connect the broker again klog.Errorf("failed to subscribe to %s, %v", config.BrokerHost, err) } }()