From 98f6939990be3e81d19cd59a329788f5a12bea98 Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Mon, 12 Aug 2024 11:10:41 -0400 Subject: [PATCH 1/3] feat: added delivery format to trigger crd Signed-off-by: Calum Murray --- config/core/resources/trigger.yaml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/config/core/resources/trigger.yaml b/config/core/resources/trigger.yaml index dd199f9e011..712d3cd576b 100644 --- a/config/core/resources/trigger.yaml +++ b/config/core/resources/trigger.yaml @@ -117,6 +117,9 @@ spec: description: Retry is the minimum number of retries the sender should attempt when sending an event before moving it to the dead letter sink. type: integer format: int32 + format: + description: Format is the format used to serialize the event into a http request when delivering the event. It can be json (for structured events), binary (for binary events), or unset. + type: string filter: description: 'Filter is the filter to apply against all events from the Broker. Only events that pass this filter will be sent to the Subscriber. If not specified, will default to allowing all events.' type: object From c604bad003adf7dff1f4de6e6ecc42852dc3ba6f Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Mon, 12 Aug 2024 11:10:56 -0400 Subject: [PATCH 2/3] feat: support delivery format in filter handler Signed-off-by: Calum Murray --- pkg/broker/filter/filter_handler.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/pkg/broker/filter/filter_handler.go b/pkg/broker/filter/filter_handler.go index 8ea2565ebfa..4154e095f9a 100644 --- a/pkg/broker/filter/filter_handler.go +++ b/pkg/broker/filter/filter_handler.go @@ -402,10 +402,15 @@ func (h *Handler) handleDispatchToSubscriberRequest(ctx context.Context, trigger Audience: trigger.Status.SubscriberAudience, } - h.send(ctx, writer, utils.PassThroughHeaders(request.Header), target, reportArgs, event, trigger, ttl) + sendOptions := []kncloudevents.SendOption{} + if trigger.Spec.Delivery != nil && trigger.Spec.Delivery.Format != nil { + sendOptions = append(sendOptions, kncloudevents.WithEventFormat(trigger.Spec.Delivery.Format)) + } + + h.send(ctx, writer, utils.PassThroughHeaders(request.Header), target, reportArgs, event, trigger, ttl, sendOptions...) } -func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers http.Header, target duckv1.Addressable, reportArgs *ReportArgs, event *cloudevents.Event, t *eventingv1.Trigger, ttl int32) { +func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers http.Header, target duckv1.Addressable, reportArgs *ReportArgs, event *cloudevents.Event, t *eventingv1.Trigger, ttl int32, sendOpts ...kncloudevents.SendOption) { additionalHeaders := headers.Clone() additionalHeaders.Set(apis.KnNamespaceHeader, t.GetNamespace()) @@ -433,6 +438,10 @@ func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers })) } + if len(sendOpts) > 0 { + opts = append(opts, sendOpts...) + } + dispatchInfo, err := h.eventDispatcher.SendEvent(ctx, *event, target, opts...) if err != nil { h.logger.Error("failed to send event", zap.Error(err)) From 4eba0b53f16d61dcbdae083712aee13036e11e05 Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Mon, 19 Aug 2024 10:48:56 -0400 Subject: [PATCH 3/3] test: add e2e test to verify trigger delivers events with the correct format Signed-off-by: Calum Murray --- test/rekt/features/trigger/feature.go | 34 ++++++++++++++++++++++++ test/rekt/resources/delivery/delivery.go | 11 ++++++++ test/rekt/resources/trigger/trigger.go | 3 +++ test/rekt/resources/trigger/trigger.yaml | 3 +++ test/rekt/trigger_test.go | 12 +++++++++ 5 files changed, 63 insertions(+) diff --git a/test/rekt/features/trigger/feature.go b/test/rekt/features/trigger/feature.go index ab8954ad57a..0d26de3ed51 100644 --- a/test/rekt/features/trigger/feature.go +++ b/test/rekt/features/trigger/feature.go @@ -18,6 +18,7 @@ package trigger import ( "context" + "fmt" "github.com/cloudevents/sdk-go/v2/test" "k8s.io/utils/pointer" @@ -96,6 +97,39 @@ func TriggerDependencyAnnotation() *feature.Feature { return f } +func TriggerSupportsDeliveryFormat() *feature.FeatureSet { + return &feature.FeatureSet{ + Name: "Trigger supports delivery format", + Features: []*feature.Feature{triggerWithDispatcherFormat("json"), triggerWithDispatcherFormat("binary")}, + } +} + +func triggerWithDispatcherFormat(format string) *feature.Feature { + f := feature.NewFeatureNamed(fmt.Sprintf("Trigger supports sending with %s delivery format", format)) + + brokerName := feature.MakeRandomK8sName("broker") + sourceName := feature.MakeRandomK8sName("source") + sinkName := feature.MakeRandomK8sName("sink") + triggerName := feature.MakeRandomK8sName("trigger") + eventToSend := test.FullEvent() + + f.Setup("Install Broker", broker.Install(brokerName, broker.WithEnvConfig()...)) + f.Setup("Broker is ready", broker.IsReady(brokerName)) + f.Setup("Broker is addressable", broker.IsAddressable(brokerName)) + + f.Setup("Install Sink", eventshub.Install(sinkName, eventshub.VerifyEventFormat(format), eventshub.StartReceiver)) + + f.Setup("Install trigger", trigger.Install(triggerName, trigger.WithBrokerName(brokerName), trigger.WithFormat(format), trigger.WithSubscriber(service.AsKReference(sinkName), ""))) + f.Setup("Trigger is ready", trigger.IsReady(triggerName)) + + f.Requirement("Install source", eventshub.Install(sourceName, eventshub.InputEvent(eventToSend), eventshub.StartSenderToResource(broker.GVR(), brokerName))) + + f.Alpha("trigger"). + Must("dispatch event with correct format", assert.OnStore(sinkName).MatchReceivedEvent(test.HasId(eventToSend.ID())).AtLeast(1)) + + return f +} + func TriggerWithTLSSubscriber() *feature.Feature { f := feature.NewFeatureNamed("Trigger with TLS subscriber") diff --git a/test/rekt/resources/delivery/delivery.go b/test/rekt/resources/delivery/delivery.go index c94a0c5ebd4..6faaa5259e3 100644 --- a/test/rekt/resources/delivery/delivery.go +++ b/test/rekt/resources/delivery/delivery.go @@ -117,6 +117,17 @@ func WithRetry(count int32, backoffPolicy *eventingv1.BackoffPolicyType, backoff } } +func WithFormat(format string) manifest.CfgFn { + return func(cfg map[string]interface{}) { + if _, set := cfg["delivery"]; !set { + cfg["delivery"] = map[string]interface{}{} + } + delivery := cfg["delivery"].(map[string]interface{}) + + delivery["format"] = format + } +} + // WithTimeout adds the timeout related config to the config. func WithTimeout(timeout string) manifest.CfgFn { return func(cfg map[string]interface{}) { diff --git a/test/rekt/resources/trigger/trigger.go b/test/rekt/resources/trigger/trigger.go index 1d64fb305b8..78f3ba62e85 100644 --- a/test/rekt/resources/trigger/trigger.go +++ b/test/rekt/resources/trigger/trigger.go @@ -196,6 +196,9 @@ var WithRetry = delivery.WithRetry // WithTimeout adds the timeout related config to the config. var WithTimeout = delivery.WithTimeout +// WithFormat adds the format related config to a Trigger spec +var WithFormat = delivery.WithFormat + // Install will create a Trigger resource, augmented with the config fn options. func Install(name string, opts ...manifest.CfgFn) feature.StepFn { cfg := map[string]interface{}{ diff --git a/test/rekt/resources/trigger/trigger.yaml b/test/rekt/resources/trigger/trigger.yaml index 9a27dcdf68b..b7825ffb5df 100644 --- a/test/rekt/resources/trigger/trigger.yaml +++ b/test/rekt/resources/trigger/trigger.yaml @@ -96,4 +96,7 @@ spec: {{ if .delivery.backoffDelay }} backoffDelay: "{{ .delivery.backoffDelay}}" {{ end }} + {{ if .delivery.format }} + format: {{ .delivery.format }} + {{ end }} {{ end }} diff --git a/test/rekt/trigger_test.go b/test/rekt/trigger_test.go index 00121550d9d..0d96fe2b86f 100644 --- a/test/rekt/trigger_test.go +++ b/test/rekt/trigger_test.go @@ -81,6 +81,18 @@ func TestTriggerDependencyAnnotation(t *testing.T) { env.Test(ctx, t, trigger.TriggerDependencyAnnotation()) } +func TestTriggerDeliveryFormat(t *testing.T) { + ctx, env := global.Environment( + knative.WithKnativeNamespace(system.Namespace()), + knative.WithLoggingConfig, + knative.WithTracingConfig, + k8s.WithEventListener, + environment.Managed(t), + ) + + env.TestSet(ctx, t, trigger.TriggerSupportsDeliveryFormat()) +} + func TestTriggerTLSSubscriber(t *testing.T) { t.Parallel()