diff --git a/config/core/resources/trigger.yaml b/config/core/resources/trigger.yaml index dd199f9e011..712d3cd576b 100644 --- a/config/core/resources/trigger.yaml +++ b/config/core/resources/trigger.yaml @@ -117,6 +117,9 @@ spec: description: Retry is the minimum number of retries the sender should attempt when sending an event before moving it to the dead letter sink. type: integer format: int32 + format: + description: Format is the format used to serialize the event into a http request when delivering the event. It can be json (for structured events), binary (for binary events), or unset. + type: string filter: description: 'Filter is the filter to apply against all events from the Broker. Only events that pass this filter will be sent to the Subscriber. If not specified, will default to allowing all events.' type: object diff --git a/pkg/broker/filter/filter_handler.go b/pkg/broker/filter/filter_handler.go index 78064a3e417..ef2dfd49da7 100644 --- a/pkg/broker/filter/filter_handler.go +++ b/pkg/broker/filter/filter_handler.go @@ -423,10 +423,15 @@ func (h *Handler) handleDispatchToSubscriberRequest(ctx context.Context, trigger Audience: trigger.Status.SubscriberAudience, } - h.send(ctx, writer, utils.PassThroughHeaders(request.Header), target, reportArgs, event, trigger, ttl) + sendOptions := []kncloudevents.SendOption{} + if trigger.Spec.Delivery != nil && trigger.Spec.Delivery.Format != nil { + sendOptions = append(sendOptions, kncloudevents.WithEventFormat(trigger.Spec.Delivery.Format)) + } + + h.send(ctx, writer, utils.PassThroughHeaders(request.Header), target, reportArgs, event, trigger, ttl, sendOptions...) } -func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers http.Header, target duckv1.Addressable, reportArgs *ReportArgs, event *cloudevents.Event, t *eventingv1.Trigger, ttl int32) { +func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers http.Header, target duckv1.Addressable, reportArgs *ReportArgs, event *cloudevents.Event, t *eventingv1.Trigger, ttl int32, sendOpts ...kncloudevents.SendOption) { additionalHeaders := headers.Clone() additionalHeaders.Set(apis.KnNamespaceHeader, t.GetNamespace()) @@ -454,6 +459,10 @@ func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers })) } + if len(sendOpts) > 0 { + opts = append(opts, sendOpts...) + } + dispatchInfo, err := h.eventDispatcher.SendEvent(ctx, *event, target, opts...) if err != nil { h.logger.Error("failed to send event", zap.Error(err)) diff --git a/test/rekt/features/trigger/feature.go b/test/rekt/features/trigger/feature.go index ab8954ad57a..0d26de3ed51 100644 --- a/test/rekt/features/trigger/feature.go +++ b/test/rekt/features/trigger/feature.go @@ -18,6 +18,7 @@ package trigger import ( "context" + "fmt" "github.com/cloudevents/sdk-go/v2/test" "k8s.io/utils/pointer" @@ -96,6 +97,39 @@ func TriggerDependencyAnnotation() *feature.Feature { return f } +func TriggerSupportsDeliveryFormat() *feature.FeatureSet { + return &feature.FeatureSet{ + Name: "Trigger supports delivery format", + Features: []*feature.Feature{triggerWithDispatcherFormat("json"), triggerWithDispatcherFormat("binary")}, + } +} + +func triggerWithDispatcherFormat(format string) *feature.Feature { + f := feature.NewFeatureNamed(fmt.Sprintf("Trigger supports sending with %s delivery format", format)) + + brokerName := feature.MakeRandomK8sName("broker") + sourceName := feature.MakeRandomK8sName("source") + sinkName := feature.MakeRandomK8sName("sink") + triggerName := feature.MakeRandomK8sName("trigger") + eventToSend := test.FullEvent() + + f.Setup("Install Broker", broker.Install(brokerName, broker.WithEnvConfig()...)) + f.Setup("Broker is ready", broker.IsReady(brokerName)) + f.Setup("Broker is addressable", broker.IsAddressable(brokerName)) + + f.Setup("Install Sink", eventshub.Install(sinkName, eventshub.VerifyEventFormat(format), eventshub.StartReceiver)) + + f.Setup("Install trigger", trigger.Install(triggerName, trigger.WithBrokerName(brokerName), trigger.WithFormat(format), trigger.WithSubscriber(service.AsKReference(sinkName), ""))) + f.Setup("Trigger is ready", trigger.IsReady(triggerName)) + + f.Requirement("Install source", eventshub.Install(sourceName, eventshub.InputEvent(eventToSend), eventshub.StartSenderToResource(broker.GVR(), brokerName))) + + f.Alpha("trigger"). + Must("dispatch event with correct format", assert.OnStore(sinkName).MatchReceivedEvent(test.HasId(eventToSend.ID())).AtLeast(1)) + + return f +} + func TriggerWithTLSSubscriber() *feature.Feature { f := feature.NewFeatureNamed("Trigger with TLS subscriber") diff --git a/test/rekt/resources/delivery/delivery.go b/test/rekt/resources/delivery/delivery.go index c94a0c5ebd4..6faaa5259e3 100644 --- a/test/rekt/resources/delivery/delivery.go +++ b/test/rekt/resources/delivery/delivery.go @@ -117,6 +117,17 @@ func WithRetry(count int32, backoffPolicy *eventingv1.BackoffPolicyType, backoff } } +func WithFormat(format string) manifest.CfgFn { + return func(cfg map[string]interface{}) { + if _, set := cfg["delivery"]; !set { + cfg["delivery"] = map[string]interface{}{} + } + delivery := cfg["delivery"].(map[string]interface{}) + + delivery["format"] = format + } +} + // WithTimeout adds the timeout related config to the config. func WithTimeout(timeout string) manifest.CfgFn { return func(cfg map[string]interface{}) { diff --git a/test/rekt/resources/trigger/trigger.go b/test/rekt/resources/trigger/trigger.go index 1d64fb305b8..78f3ba62e85 100644 --- a/test/rekt/resources/trigger/trigger.go +++ b/test/rekt/resources/trigger/trigger.go @@ -196,6 +196,9 @@ var WithRetry = delivery.WithRetry // WithTimeout adds the timeout related config to the config. var WithTimeout = delivery.WithTimeout +// WithFormat adds the format related config to a Trigger spec +var WithFormat = delivery.WithFormat + // Install will create a Trigger resource, augmented with the config fn options. func Install(name string, opts ...manifest.CfgFn) feature.StepFn { cfg := map[string]interface{}{ diff --git a/test/rekt/resources/trigger/trigger.yaml b/test/rekt/resources/trigger/trigger.yaml index 9a27dcdf68b..b7825ffb5df 100644 --- a/test/rekt/resources/trigger/trigger.yaml +++ b/test/rekt/resources/trigger/trigger.yaml @@ -96,4 +96,7 @@ spec: {{ if .delivery.backoffDelay }} backoffDelay: "{{ .delivery.backoffDelay}}" {{ end }} + {{ if .delivery.format }} + format: {{ .delivery.format }} + {{ end }} {{ end }} diff --git a/test/rekt/trigger_test.go b/test/rekt/trigger_test.go index 00121550d9d..0d96fe2b86f 100644 --- a/test/rekt/trigger_test.go +++ b/test/rekt/trigger_test.go @@ -81,6 +81,18 @@ func TestTriggerDependencyAnnotation(t *testing.T) { env.Test(ctx, t, trigger.TriggerDependencyAnnotation()) } +func TestTriggerDeliveryFormat(t *testing.T) { + ctx, env := global.Environment( + knative.WithKnativeNamespace(system.Namespace()), + knative.WithLoggingConfig, + knative.WithTracingConfig, + k8s.WithEventListener, + environment.Managed(t), + ) + + env.TestSet(ctx, t, trigger.TriggerSupportsDeliveryFormat()) +} + func TestTriggerTLSSubscriber(t *testing.T) { t.Parallel()