Skip to content

Commit

Permalink
add rate limit for cloudevents clients
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <liuweixa@redhat.com>
  • Loading branch information
skeeey committed Sep 12, 2023
1 parent 886b462 commit 36f81ed
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 23 deletions.
36 changes: 30 additions & 6 deletions cloudevents/generic/agentclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import (
"context"
"fmt"
"strconv"
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"

"k8s.io/client-go/util/flowcontrol"
"k8s.io/klog/v2"

"open-cluster-management.io/api/cloudevents/generic/options"
Expand All @@ -18,15 +20,14 @@ import (
//
// An agent is a component that handles the deployment of requested resources on the managed cluster and reports
// their status to the source.
//
// TODO support limiting the message sending rate with a configuration.
type CloudEventAgentClient[T ResourceObject] struct {
// cloudEventsOptions holds the protocol-specific cloudevents options this client was built with.
cloudEventsOptions options.CloudEventsOptions
// sender is the cloudevents client that events are sent with (Resync and Publish pass it to sendEventWithLimit).
sender cloudevents.Client
// receiver is the cloudevents client for incoming events.
// NOTE(review): its use is not visible in this excerpt - confirm.
receiver cloudevents.Client
// lister, codecs and statusHashGetter are supplied to NewCloudEventAgentClient.
// NOTE(review): presumably used to list resources, encode/decode events per data type and compute status
// hashes - confirm against the methods that use them.
lister Lister[T]
codecs map[types.CloudEventsDataType]Codec[T]
statusHashGetter StatusHashGetter[T]
// rateLimiter throttles event sending; it is built by NewRateLimiter from the agent options' EventRateLimit.
rateLimiter flowcontrol.RateLimiter
// agentID and clusterName are copied from the agent options (AgentID and ClusterName).
agentID string
clusterName string
}
Expand Down Expand Up @@ -67,6 +68,7 @@ func NewCloudEventAgentClient[T ResourceObject](
lister: lister,
codecs: evtCodes,
statusHashGetter: statusHashGetter,
rateLimiter: NewRateLimiter(agentOptions.EventRateLimit),
agentID: agentOptions.AgentID,
clusterName: agentOptions.ClusterName,
}, nil
Expand Down Expand Up @@ -111,8 +113,8 @@ func (c *CloudEventAgentClient[T]) Resync(ctx context.Context) error {
return err
}

if result := c.sender.Send(sendingContext, evt); cloudevents.IsUndelivered(result) {
return fmt.Errorf("failed to send event %s, %v", evt, result)
if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.sender, evt); err != nil {
return err
}

klog.V(4).Infof("Sent resync request:\n%s", evt)
Expand Down Expand Up @@ -141,8 +143,8 @@ func (c *CloudEventAgentClient[T]) Publish(ctx context.Context, eventType types.
return err
}

if result := c.sender.Send(sendingContext, *evt); cloudevents.IsUndelivered(result) {
return fmt.Errorf("failed to send event %s, %v", evt, result)
if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.sender, *evt); err != nil {
return err
}

klog.V(4).Infof("Sent event:\n%s", evt)
Expand Down Expand Up @@ -294,6 +296,28 @@ func (c *CloudEventAgentClient[T]) specAction(source string, obj T) (evt types.R
return types.Modified, nil
}

// sendEventWithLimit blocks until the rate limiter admits the request and then sends the event
// with the given sender.
//
// It returns an error when the limiter's Wait fails (for example because sendingCtx is done) or
// when the sender reports the event as undelivered; both errors wrap their cause. A warning is
// logged when the time spent waiting on the limiter exceeds longThrottleLatency.
func sendEventWithLimit(sendingCtx context.Context, limiter flowcontrol.RateLimiter,
	sender cloudevents.Client, evt cloudevents.Event) error {
	now := time.Now()

	err := limiter.Wait(sendingCtx)
	if err != nil {
		return fmt.Errorf("client rate limiter Wait returned an error: %w", err)
	}

	latency := time.Since(now)
	if latency > longThrottleLatency {
		// Hand the arguments to Warningf directly. Pre-rendering the message with fmt.Sprintf
		// would turn the event text into the format string, so any '%' in the event would be
		// parsed as a formatting verb and garble the log line.
		klog.Warningf("Waited for %v due to client-side throttling, not priority and fairness, request: %s",
			latency, evt)
	}

	if result := sender.Send(sendingCtx, evt); cloudevents.IsUndelivered(result) {
		// Wrap the delivery result so callers can inspect it with errors.Is / errors.As.
		return fmt.Errorf("failed to send event %s, %w", evt, result)
	}

	return nil
}

func getObj[T ResourceObject](resourceID string, objs []T) (obj T, exists bool) {
for _, obj := range objs {
if string(obj.GetUID()) == resourceID {
Expand Down
17 changes: 17 additions & 0 deletions cloudevents/generic/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,17 @@ type CloudEventsOptions interface {
Receiver(ctx context.Context) (cloudevents.Client, error)
}

// EventRateLimit configures the rate limit that is applied when sending events.
// NewRateLimiter turns it into a token bucket rate limiter.
type EventRateLimit struct {
// QPS is the maximum sustained rate, in events per second, at which events are sent.
// If it's less than or equal to zero, the DefaultQPS (50) will be used.
QPS float32

// Burst is the maximum number of events that may be sent in a burst exceeding QPS.
// If it's less than or equal to zero, the DefaultBurst (100) will be used.
Burst int
}

// CloudEventsSourceOptions provides the required options to build a source CloudEventsClient
type CloudEventsSourceOptions struct {
// CloudEventsOptions provides cloudevents clients to send/receive cloudevents based on different event protocol.
Expand All @@ -32,6 +43,9 @@ type CloudEventsSourceOptions struct {
// URL and appending the controller name. Similarly, a RESTful service can select a unique name or generate a unique
// ID in the associated database for its source identification.
SourceID string

// EventRateLimit limits the event sending rate.
EventRateLimit EventRateLimit
}

// CloudEventsAgentOptions provides the required options to build an agent CloudEventsClient
Expand All @@ -45,4 +59,7 @@ type CloudEventsAgentOptions struct {

// ClusterName is the name of a managed cluster on which the agent runs.
ClusterName string

// EventRateLimit limits the event sending rate.
EventRateLimit EventRateLimit
}
34 changes: 34 additions & 0 deletions cloudevents/generic/ratelimiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package generic

import (
"time"

"k8s.io/client-go/util/flowcontrol"

"open-cluster-management.io/api/cloudevents/generic/options"
)

// longThrottleLatency defines the threshold for logging throttled requests. Every request
// that waits on the provided rateLimiter for more than longThrottleLatency is logged
// as a warning.
const longThrottleLatency = 1 * time.Second

const (
// TODO we may adjust these after performance test

// DefaultQPS is the QPS NewRateLimiter uses when the configured QPS is less than or equal to zero.
DefaultQPS float32 = 50.0
// DefaultBurst is the burst NewRateLimiter uses when the configured burst is less than or equal to zero.
DefaultBurst int = 100
)

// NewRateLimiter builds a token bucket rate limiter from the given event rate limit.
// A QPS or Burst that is less than or equal to zero is replaced by DefaultQPS or
// DefaultBurst respectively.
func NewRateLimiter(limit options.EventRateLimit) flowcontrol.RateLimiter {
	effectiveQPS, effectiveBurst := limit.QPS, limit.Burst

	// Fall back to the package defaults for unset (or non-positive) values.
	if effectiveQPS <= 0.0 {
		effectiveQPS = DefaultQPS
	}
	if effectiveBurst <= 0 {
		effectiveBurst = DefaultBurst
	}

	return flowcontrol.NewTokenBucketRateLimiter(effectiveQPS, effectiveBurst)
}
22 changes: 14 additions & 8 deletions cloudevents/generic/sourceclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
cloudevents "github.com/cloudevents/sdk-go/v2"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/klog/v2"

"open-cluster-management.io/api/cloudevents/generic/options"
Expand All @@ -19,15 +20,14 @@ import (
//
// A source is a component that runs on a server; it can be a controller on the hub cluster or a RESTful service
// handling resource requests.
//
// TODO support limiting the message sending rate with a configuration.
type CloudEventSourceClient[T ResourceObject] struct {
// cloudEventsOptions holds the protocol-specific cloudevents options; its WithContext method builds the
// sending context for an event before it is sent.
cloudEventsOptions options.CloudEventsOptions
// sender is the cloudevents client that events are sent with (it is passed to sendEventWithLimit).
sender cloudevents.Client
// receiver is the cloudevents client for incoming events.
// NOTE(review): its use is not visible in this excerpt - confirm.
receiver cloudevents.Client
// lister, codecs and statusHashGetter are supplied to NewCloudEventSourceClient.
// NOTE(review): presumably used to list resources, encode/decode events per data type and compute status
// hashes - confirm against the methods that use them.
lister Lister[T]
codecs map[types.CloudEventsDataType]Codec[T]
statusHashGetter StatusHashGetter[T]
// rateLimiter throttles event sending; it is built by NewRateLimiter from the source options' EventRateLimit.
rateLimiter flowcontrol.RateLimiter
// sourceID is copied from the source options (SourceID).
sourceID string
}

Expand Down Expand Up @@ -67,6 +67,7 @@ func NewCloudEventSourceClient[T ResourceObject](
lister: lister,
codecs: evtCodes,
statusHashGetter: statusHashGetter,
rateLimiter: NewRateLimiter(sourceOptions.EventRateLimit),
sourceID: sourceOptions.SourceID,
}, nil
}
Expand Down Expand Up @@ -105,8 +106,13 @@ func (c *CloudEventSourceClient[T]) Resync(ctx context.Context) error {
return fmt.Errorf("failed to set data to cloud event: %v", err)
}

if result := c.sender.Send(ctx, evt); cloudevents.IsUndelivered(result) {
return fmt.Errorf("failed to send: %v", result)
sendingContext, err := c.cloudEventsOptions.WithContext(ctx, evt.Context)
if err != nil {
return err
}

if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.sender, evt); err != nil {
return err
}

klog.V(4).Infof("Sent resync request:\n%s", evt)
Expand Down Expand Up @@ -135,8 +141,8 @@ func (c *CloudEventSourceClient[T]) Publish(ctx context.Context, eventType types
return err
}

if result := c.sender.Send(sendingContext, *evt); cloudevents.IsUndelivered(result) {
return fmt.Errorf("failed to send event %s, %v", evt, result)
if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.sender, *evt); err != nil {
return err
}

klog.V(4).Infof("Sent event:\n%s", evt)
Expand Down Expand Up @@ -280,8 +286,8 @@ func (c *CloudEventSourceClient[T]) respondResyncSpecRequest(
return err
}

if result := c.sender.Send(sendingContext, evt); cloudevents.IsUndelivered(result) {
return fmt.Errorf("failed to send event %s, %v", evt, result)
if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.sender, evt); err != nil {
return err
}
}

Expand Down
5 changes: 4 additions & 1 deletion test/integration-test.mk
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,13 @@ clean-integration-test:

clean: clean-integration-test

test-integration: test-api-integration test-cloudevents-integration
.PHONY: test-integration

test-api-integration: ensure-kubebuilder-tools
go test -c ./test/integration/api
./api.test -ginkgo.slowSpecThreshold=15 -ginkgo.v -ginkgo.failFast
.PHONY: test-integration
.PHONY: test-api-integration

test-cloudevents-integration: ensure-kubebuilder-tools
go test -c ./test/integration/cloudevents
Expand Down
9 changes: 1 addition & 8 deletions test/integration/cloudevents/cloudevents_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,6 @@ import (
)

var _ = ginkgo.Describe("Cloudevents clients test", func() {
ginkgo.BeforeEach(func() {
ginkgo.By("init resource source store", func() {
source.GetStore().Add(source.NewResource("cluster1", "resource1"))
source.GetStore().Add(source.NewResource("cluster2", "resource1"))
})
})

ginkgo.Context("Resync resources", func() {
ginkgo.It("resync resources between source and agent", func() {
ginkgo.By("start an agent on cluster1")
Expand Down Expand Up @@ -72,7 +65,7 @@ var _ = ginkgo.Describe("Cloudevents clients test", func() {
return nil
}, 10*time.Second, 1*time.Second).Should(gomega.Succeed())

// resync the status from source
ginkgo.By("resync the status from source")
err = sourceCloudEventsClient.Resync(context.TODO())
gomega.Expect(err).ToNot(gomega.HaveOccurred())

Expand Down
5 changes: 5 additions & 0 deletions test/integration/cloudevents/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ var _ = ginkgo.BeforeSuite(func(done ginkgo.Done) {
gomega.Expect(err).ToNot(gomega.HaveOccurred())
}()

ginkgo.By("init the resource source store")
source.GetStore().Add(source.NewResource("cluster1", "resource1"))
source.GetStore().Add(source.NewResource("cluster2", "resource1"))

ginkgo.By("start the resource source")
mqttOptions = mqtt.NewMQTTOptions()
mqttOptions.BrokerHost = mqttBrokerHost
sourceCloudEventsClient, err = source.StartResourceSourceClient(context.TODO(), mqttOptions)
Expand Down

0 comments on commit 36f81ed

Please sign in to comment.