Skip to content

Commit

Permalink
reconnecting the cloudevents client (#280)
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <liuweixa@redhat.com>
  • Loading branch information
skeeey authored Sep 27, 2023
1 parent bf4f47e commit c2b66fa
Show file tree
Hide file tree
Showing 13 changed files with 392 additions and 222 deletions.
168 changes: 68 additions & 100 deletions cloudevents/generic/agentclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@ import (
"context"
"fmt"
"strconv"
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"

"k8s.io/client-go/util/flowcontrol"
"k8s.io/klog/v2"

"open-cluster-management.io/api/cloudevents/generic/options"
Expand All @@ -21,14 +19,12 @@ import (
// An agent is a component that handles the deployment of requested resources on the managed cluster and status report
// to the source.
type CloudEventAgentClient[T ResourceObject] struct {
cloudEventsOptions options.CloudEventsOptions
cloudEventsClient cloudevents.Client
lister Lister[T]
codecs map[types.CloudEventsDataType]Codec[T]
statusHashGetter StatusHashGetter[T]
rateLimiter flowcontrol.RateLimiter
agentID string
clusterName string
*baseClient
lister Lister[T]
codecs map[types.CloudEventsDataType]Codec[T]
statusHashGetter StatusHashGetter[T]
agentID string
clusterName string
}

// NewCloudEventAgentClient returns an instance for CloudEventAgentClient. The following arguments are required to
Expand All @@ -45,8 +41,12 @@ func NewCloudEventAgentClient[T ResourceObject](
statusHashGetter StatusHashGetter[T],
codecs ...Codec[T],
) (*CloudEventAgentClient[T], error) {
cloudEventsClient, err := agentOptions.CloudEventsOptions.Client(ctx)
if err != nil {
baseClient := &baseClient{
cloudEventsOptions: agentOptions.CloudEventsOptions,
cloudEventsRateLimiter: NewRateLimiter(agentOptions.EventRateLimit),
}

if err := baseClient.connect(ctx); err != nil {
return nil, err
}

Expand All @@ -56,14 +56,12 @@ func NewCloudEventAgentClient[T ResourceObject](
}

return &CloudEventAgentClient[T]{
cloudEventsOptions: agentOptions.CloudEventsOptions,
cloudEventsClient: cloudEventsClient,
lister: lister,
codecs: evtCodes,
statusHashGetter: statusHashGetter,
rateLimiter: NewRateLimiter(agentOptions.EventRateLimit),
agentID: agentOptions.AgentID,
clusterName: agentOptions.ClusterName,
baseClient: baseClient,
lister: lister,
codecs: evtCodes,
statusHashGetter: statusHashGetter,
agentID: agentOptions.AgentID,
clusterName: agentOptions.ClusterName,
}, nil
}

Expand Down Expand Up @@ -101,12 +99,7 @@ func (c *CloudEventAgentClient[T]) Resync(ctx context.Context) error {
return fmt.Errorf("failed to set data to cloud event: %v", err)
}

sendingContext, err := c.cloudEventsOptions.WithContext(ctx, evt.Context)
if err != nil {
return err
}

if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.cloudEventsClient, evt); err != nil {
if err := c.publish(ctx, evt); err != nil {
return err
}
}
Expand All @@ -130,12 +123,7 @@ func (c *CloudEventAgentClient[T]) Publish(ctx context.Context, eventType types.
return err
}

sendingContext, err := c.cloudEventsOptions.WithContext(ctx, evt.Context)
if err != nil {
return err
}

if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.cloudEventsClient, *evt); err != nil {
if err := c.publish(ctx, *evt); err != nil {
return err
}

Expand All @@ -145,63 +133,67 @@ func (c *CloudEventAgentClient[T]) Publish(ctx context.Context, eventType types.
// Subscribe the events that are from the source status resync request or source resource spec request.
// For status resync request, agent publish the current resources status back as response.
// For resource spec request, agent receives resource spec and handles the spec with resource handlers.
func (c *CloudEventAgentClient[T]) Subscribe(ctx context.Context, handlers ...ResourceHandler[T]) error {
return c.cloudEventsClient.StartReceiver(ctx, func(evt cloudevents.Event) {
klog.V(4).Infof("Received event:\n%s", evt)

eventType, err := types.ParseCloudEventsType(evt.Type())
if err != nil {
klog.Errorf("failed to parse cloud event type %s, %v", evt.Type(), err)
return
}
func (c *CloudEventAgentClient[T]) Subscribe(ctx context.Context, handlers ...ResourceHandler[T]) {
c.subscribe(ctx, func(ctx context.Context, evt cloudevents.Event) {
c.receive(ctx, evt, handlers...)
})
}

if eventType.Action == types.ResyncRequestAction {
if eventType.SubResource != types.SubResourceStatus {
klog.Warningf("unsupported resync event type %s, ignore", eventType)
return
}
func (c *CloudEventAgentClient[T]) receive(ctx context.Context, evt cloudevents.Event, handlers ...ResourceHandler[T]) {
klog.V(4).Infof("Received event:\n%s", evt)

if err := c.respondResyncStatusRequest(ctx, eventType.CloudEventsDataType, evt); err != nil {
klog.Errorf("failed to resync manifestsstatus, %v", err)
}
eventType, err := types.ParseCloudEventsType(evt.Type())
if err != nil {
klog.Errorf("failed to parse cloud event type %s, %v", evt.Type(), err)
return
}

if eventType.Action == types.ResyncRequestAction {
if eventType.SubResource != types.SubResourceStatus {
klog.Warningf("unsupported resync event type %s, ignore", eventType)
return
}

if eventType.SubResource != types.SubResourceSpec {
klog.Warningf("unsupported event type %s, ignore", eventType)
return
if err := c.respondResyncStatusRequest(ctx, eventType.CloudEventsDataType, evt); err != nil {
klog.Errorf("failed to resync manifestsstatus, %v", err)
}

codec, ok := c.codecs[eventType.CloudEventsDataType]
if !ok {
klog.Warningf("failed to find the codec for event %s, ignore", eventType.CloudEventsDataType)
return
}
return
}

obj, err := codec.Decode(&evt)
if err != nil {
klog.Errorf("failed to decode spec, %v", err)
return
}
if eventType.SubResource != types.SubResourceSpec {
klog.Warningf("unsupported event type %s, ignore", eventType)
return
}

action, err := c.specAction(evt.Source(), obj)
if err != nil {
klog.Errorf("failed to generate spec action %s, %v", evt, err)
return
}
codec, ok := c.codecs[eventType.CloudEventsDataType]
if !ok {
klog.Warningf("failed to find the codec for event %s, ignore", eventType.CloudEventsDataType)
return
}

if len(action) == 0 {
// no action is required, ignore
return
}
obj, err := codec.Decode(&evt)
if err != nil {
klog.Errorf("failed to decode spec, %v", err)
return
}

for _, handler := range handlers {
if err := handler(action, obj); err != nil {
klog.Errorf("failed to handle spec event %s, %v", evt, err)
}
action, err := c.specAction(evt.Source(), obj)
if err != nil {
klog.Errorf("failed to generate spec action %s, %v", evt, err)
return
}

if len(action) == 0 {
// no action is required, ignore
return
}

for _, handler := range handlers {
if err := handler(action, obj); err != nil {
klog.Errorf("failed to handle spec event %s, %v", evt, err)
}
})
}
}

// Upon receiving the status resync event, the agent responds by sending resource status events to the broker as
Expand Down Expand Up @@ -287,30 +279,6 @@ func (c *CloudEventAgentClient[T]) specAction(source string, obj T) (evt types.R
return types.Modified, nil
}

func sendEventWithLimit(sendingCtx context.Context, limiter flowcontrol.RateLimiter,
sender cloudevents.Client, evt cloudevents.Event) error {
now := time.Now()

err := limiter.Wait(sendingCtx)
if err != nil {
return fmt.Errorf("client rate limiter Wait returned an error: %w", err)
}

latency := time.Since(now)
if latency > longThrottleLatency {
klog.Warningf(fmt.Sprintf("Waited for %v due to client-side throttling, not priority and fairness, request: %s",
latency, evt))
}

klog.V(4).Infof("Sent event: %v\n%s", sendingCtx, evt)

if result := sender.Send(sendingCtx, evt); cloudevents.IsUndelivered(result) {
return fmt.Errorf("failed to send event %s, %v", evt, result)
}

return nil
}

func getObj[T ResourceObject](resourceID string, objs []T) (obj T, exists bool) {
for _, obj := range objs {
if string(obj.GetUID()) == resourceID {
Expand Down
10 changes: 3 additions & 7 deletions cloudevents/generic/agentclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,9 +273,7 @@ func TestStatusResyncResponse(t *testing.T) {
t.Errorf("unexpected error %v", err)
}

if err := agent.Subscribe(context.TODO()); err != nil {
t.Errorf("unexpected error %v", err)
}
agent.receive(context.TODO(), c.requestEvent)

c.validate(client.GetSentEvents())
})
Expand Down Expand Up @@ -440,13 +438,11 @@ func TestReceiveResourceSpec(t *testing.T) {

var actualEvent types.ResourceAction
var actualRes *mockResource
if err := agent.Subscribe(context.TODO(), func(event types.ResourceAction, resource *mockResource) error {
agent.receive(context.TODO(), c.requestEvent, func(event types.ResourceAction, resource *mockResource) error {
actualEvent = event
actualRes = resource
return nil
}); err != nil {
t.Errorf("unexpected error %v", err)
}
})

c.validate(actualEvent, actualRes)
})
Expand Down
Loading

0 comments on commit c2b66fa

Please sign in to comment.