Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support dispatcher format for triggers #8151

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions config/core/resources/trigger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ spec:
description: Retry is the minimum number of retries the sender should attempt when sending an event before moving it to the dead letter sink.
type: integer
format: int32
format:
description: Format is the format used to serialize the event into a http request when delivering the event. It can be json (for structured events), binary (for binary events), or unset.
type: string
filter:
description: 'Filter is the filter to apply against all events from the Broker. Only events that pass this filter will be sent to the Subscriber. If not specified, will default to allowing all events.'
type: object
Expand Down
13 changes: 11 additions & 2 deletions pkg/broker/filter/filter_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,10 +423,15 @@ func (h *Handler) handleDispatchToSubscriberRequest(ctx context.Context, trigger
Audience: trigger.Status.SubscriberAudience,
}

h.send(ctx, writer, utils.PassThroughHeaders(request.Header), target, reportArgs, event, trigger, ttl)
sendOptions := []kncloudevents.SendOption{}
if trigger.Spec.Delivery != nil && trigger.Spec.Delivery.Format != nil {
sendOptions = append(sendOptions, kncloudevents.WithEventFormat(trigger.Spec.Delivery.Format))
}

h.send(ctx, writer, utils.PassThroughHeaders(request.Header), target, reportArgs, event, trigger, ttl, sendOptions...)
}

func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers http.Header, target duckv1.Addressable, reportArgs *ReportArgs, event *cloudevents.Event, t *eventingv1.Trigger, ttl int32) {
func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers http.Header, target duckv1.Addressable, reportArgs *ReportArgs, event *cloudevents.Event, t *eventingv1.Trigger, ttl int32, sendOpts ...kncloudevents.SendOption) {
additionalHeaders := headers.Clone()
additionalHeaders.Set(apis.KnNamespaceHeader, t.GetNamespace())

Expand Down Expand Up @@ -454,6 +459,10 @@ func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers
}))
}

if len(sendOpts) > 0 {
opts = append(opts, sendOpts...)
}

dispatchInfo, err := h.eventDispatcher.SendEvent(ctx, *event, target, opts...)
if err != nil {
h.logger.Error("failed to send event", zap.Error(err))
Expand Down
34 changes: 34 additions & 0 deletions test/rekt/features/trigger/feature.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package trigger

import (
"context"
"fmt"

"github.com/cloudevents/sdk-go/v2/test"
"k8s.io/utils/pointer"
Expand Down Expand Up @@ -96,6 +97,39 @@ func TriggerDependencyAnnotation() *feature.Feature {
return f
}

func TriggerSupportsDeliveryFormat() *feature.FeatureSet {
return &feature.FeatureSet{
Name: "Trigger supports delivery format",
Features: []*feature.Feature{triggerWithDispatcherFormat("json"), triggerWithDispatcherFormat("binary")},
}
}

func triggerWithDispatcherFormat(format string) *feature.Feature {
f := feature.NewFeatureNamed(fmt.Sprintf("Trigger supports sending with %s delivery format", format))

brokerName := feature.MakeRandomK8sName("broker")
sourceName := feature.MakeRandomK8sName("source")
sinkName := feature.MakeRandomK8sName("sink")
triggerName := feature.MakeRandomK8sName("trigger")
eventToSend := test.FullEvent()

f.Setup("Install Broker", broker.Install(brokerName, broker.WithEnvConfig()...))
f.Setup("Broker is ready", broker.IsReady(brokerName))
f.Setup("Broker is addressable", broker.IsAddressable(brokerName))

f.Setup("Install Sink", eventshub.Install(sinkName, eventshub.VerifyEventFormat(format), eventshub.StartReceiver))

f.Setup("Install trigger", trigger.Install(triggerName, trigger.WithBrokerName(brokerName), trigger.WithFormat(format), trigger.WithSubscriber(service.AsKReference(sinkName), "")))
f.Setup("Trigger is ready", trigger.IsReady(triggerName))

f.Requirement("Install source", eventshub.Install(sourceName, eventshub.InputEvent(eventToSend), eventshub.StartSenderToResource(broker.GVR(), brokerName)))

f.Alpha("trigger").
Must("dispatch event with correct format", assert.OnStore(sinkName).MatchReceivedEvent(test.HasId(eventToSend.ID())).AtLeast(1))

return f
}

func TriggerWithTLSSubscriber() *feature.Feature {
f := feature.NewFeatureNamed("Trigger with TLS subscriber")

Expand Down
11 changes: 11 additions & 0 deletions test/rekt/resources/delivery/delivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,17 @@ func WithRetry(count int32, backoffPolicy *eventingv1.BackoffPolicyType, backoff
}
}

func WithFormat(format string) manifest.CfgFn {
return func(cfg map[string]interface{}) {
if _, set := cfg["delivery"]; !set {
cfg["delivery"] = map[string]interface{}{}
}
delivery := cfg["delivery"].(map[string]interface{})

delivery["format"] = format
}
}

// WithTimeout adds the timeout related config to the config.
func WithTimeout(timeout string) manifest.CfgFn {
return func(cfg map[string]interface{}) {
Expand Down
3 changes: 3 additions & 0 deletions test/rekt/resources/trigger/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,9 @@ var WithRetry = delivery.WithRetry
// WithTimeout adds the timeout related config to the config.
var WithTimeout = delivery.WithTimeout

// WithFormat adds the format related config to a Trigger spec
var WithFormat = delivery.WithFormat

// Install will create a Trigger resource, augmented with the config fn options.
func Install(name string, opts ...manifest.CfgFn) feature.StepFn {
cfg := map[string]interface{}{
Expand Down
3 changes: 3 additions & 0 deletions test/rekt/resources/trigger/trigger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,7 @@ spec:
{{ if .delivery.backoffDelay }}
backoffDelay: "{{ .delivery.backoffDelay}}"
{{ end }}
{{ if .delivery.format }}
format: {{ .delivery.format }}
{{ end }}
{{ end }}
12 changes: 12 additions & 0 deletions test/rekt/trigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,18 @@ func TestTriggerDependencyAnnotation(t *testing.T) {
env.Test(ctx, t, trigger.TriggerDependencyAnnotation())
}

func TestTriggerDeliveryFormat(t *testing.T) {
ctx, env := global.Environment(
knative.WithKnativeNamespace(system.Namespace()),
knative.WithLoggingConfig,
knative.WithTracingConfig,
k8s.WithEventListener,
environment.Managed(t),
)

env.TestSet(ctx, t, trigger.TriggerSupportsDeliveryFormat())
}

func TestTriggerTLSSubscriber(t *testing.T) {
t.Parallel()

Expand Down
Loading