Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ETs reference subscriptions/triggers on reply #7733

Merged
Merged
22 changes: 20 additions & 2 deletions cmd/broker/filter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,12 @@ import (
"knative.dev/eventing/pkg/apis/feature"
"knative.dev/eventing/pkg/auth"
"knative.dev/eventing/pkg/broker/filter"
eventingclient "knative.dev/eventing/pkg/client/injection/client"
brokerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker"
triggerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger"
eventtypeinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1beta2/eventtype"
"knative.dev/eventing/pkg/eventingtls"
"knative.dev/eventing/pkg/eventtype"
"knative.dev/eventing/pkg/reconciler/names"
)

Expand Down Expand Up @@ -114,7 +117,22 @@ func main() {
// Watch the observability config map and dynamically update request logs.
configMapWatcher.Watch(logging.ConfigMapName(), logging.UpdateLevelFromConfigMap(sl, atomicLevel, component))

featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"))
var featureStore *feature.Store
var handler *filter.Handler

featureStore = feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) {
featureFlags := value.(feature.Flags)
if featureFlags.IsEnabled(feature.EvenTypeAutoCreate) && featureStore != nil && handler != nil {
autoCreate := &eventtype.EventTypeAutoHandler{
EventTypeLister: eventtypeinformer.Get(ctx).Lister(),
EventingClient: eventingclient.Get(ctx).EventingV1beta2(),
FeatureStore: featureStore,
Logger: logger,
}
handler.EventTypeCreator = autoCreate
}

})
featureStore.WatchConfigs(configMapWatcher)

// Decorate contexts with the current state of the feature config.
Expand All @@ -135,7 +153,7 @@ func main() {
// the messages to the triggers' subscribers) in this binary.
oidcTokenVerifier := auth.NewOIDCTokenVerifier(ctx)
trustBundleConfigMapInformer := configmapinformer.Get(ctx, eventingtls.TrustBundleLabelSelector).Lister().ConfigMaps(system.Namespace())
handler, err := filter.NewHandler(logger, oidcTokenVerifier, oidcTokenProvider, triggerinformer.Get(ctx), brokerinformer.Get(ctx), reporter, trustBundleConfigMapInformer, ctxFunc)
handler, err = filter.NewHandler(logger, oidcTokenVerifier, oidcTokenProvider, triggerinformer.Get(ctx), brokerinformer.Get(ctx), reporter, trustBundleConfigMapInformer, ctxFunc)
if err != nil {
logger.Fatal("Error creating Handler", zap.Error(err))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,12 @@ rules:
- "serviceaccounts/token"
verbs:
- create
- apiGroups:
- "eventing.knative.dev"
resources:
- "eventtypes"
verbs:
- "get"
- "list"
- "watch"
- "create"
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,12 @@ spec:
description: Generation of the origin of the subscriber with uid:UID.
type: integer
format: int64
name:
description: The name of the subscription
type: string
namespace:
description: The namespace of the subscription
type: string
replyUri:
description: ReplyURI is the endpoint for the reply
type: string
Expand Down
3 changes: 3 additions & 0 deletions config/core/resources/channel.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ spec:
description: Generation of the origin of the subscriber with uid:UID.
type: integer
format: int64
name:
description: The name of the subscription
type: string
replyUri:
description: ReplyURI is the endpoint for the reply
type: string
Expand Down
12 changes: 12 additions & 0 deletions docs/eventing-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,18 @@ section of the resource.</p>
<tbody>
<tr>
<td>
<code>name</code><br/>
<em>
string
</em>
</td>
<td>
<em>(Optional)</em>
<p>Name is used to identify the original subscription object.</p>
</td>
</tr>
<tr>
<td>
<code>uid</code><br/>
<em>
<a href="https://godoc.org/k8s.io/apimachinery/pkg/types#UID">
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ require (
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/spf13/cobra v1.7.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect
go.opentelemetry.io/otel/metric v1.24.0 // indirect
Expand Down
3 changes: 3 additions & 0 deletions pkg/apis/duck/v1/subscribable_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ var _ duck.Implementable = (*Subscribable)(nil)
//
// At least one of SubscriberURI and ReplyURI must be present
type SubscriberSpec struct {
// Name is used to identify the original subscription object.
// +optional
Name *string `json:"name,omitempty"`
// UID is used to understand the origin of the subscriber.
// +optional
UID types.UID `json:"uid,omitempty"`
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/duck/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 21 additions & 6 deletions pkg/broker/filter/filter_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
"knative.dev/eventing/pkg/eventfilter"
"knative.dev/eventing/pkg/eventfilter/attributes"
"knative.dev/eventing/pkg/eventfilter/subscriptionsapi"
"knative.dev/eventing/pkg/eventtype"
"knative.dev/eventing/pkg/kncloudevents"
"knative.dev/eventing/pkg/reconciler/sugar/trigger/path"
"knative.dev/eventing/pkg/tracing"
Expand All @@ -78,12 +79,13 @@ type Handler struct {

eventDispatcher *kncloudevents.Dispatcher

triggerLister eventinglisters.TriggerLister
brokerLister eventinglisters.BrokerLister
logger *zap.Logger
withContext func(ctx context.Context) context.Context
filtersMap *subscriptionsapi.FiltersMap
tokenVerifier *auth.OIDCTokenVerifier
triggerLister eventinglisters.TriggerLister
brokerLister eventinglisters.BrokerLister
logger *zap.Logger
withContext func(ctx context.Context) context.Context
filtersMap *subscriptionsapi.FiltersMap
tokenVerifier *auth.OIDCTokenVerifier
EventTypeCreator *eventtype.EventTypeAutoHandler
}

// NewHandler creates a new Handler and its associated EventReceiver.
Expand Down Expand Up @@ -367,6 +369,19 @@ func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers
kncloudevents.WithHeader(additionalHeaders),
}

if h.EventTypeCreator != nil {
opts = append(opts, kncloudevents.WithEventTypeAutoHandler(
h.EventTypeCreator,
&duckv1.KReference{
Name: t.Name,
Namespace: t.Namespace,
APIVersion: eventingv1.SchemeGroupVersion.String(),
Kind: "Trigger",
},
t.UID,
))
}

if t.Status.Auth != nil && t.Status.Auth.ServiceAccountName != nil {
opts = append(opts, kncloudevents.WithOIDCAuthentication(&types.NamespacedName{
Name: *t.Status.Auth.ServiceAccountName,
Expand Down
25 changes: 24 additions & 1 deletion pkg/channel/fanout/fanout_event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (

"knative.dev/eventing/pkg/apis"
eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1"
"knative.dev/eventing/pkg/channel"
"knative.dev/eventing/pkg/eventtype"
"knative.dev/eventing/pkg/kncloudevents"
Expand All @@ -51,6 +52,9 @@ type Subscription struct {
DeadLetter *duckv1.Addressable
RetryConfig *kncloudevents.RetryConfig
ServiceAccount *types.NamespacedName
Name string
Namespace string
UID types.UID
}

// Config for a fanout.EventHandler.
Expand Down Expand Up @@ -167,7 +171,13 @@ func SubscriberSpecToFanoutConfig(sub eventingduckv1.SubscriberSpec) (*Subscript
}
}

return &Subscription{Subscriber: destination, Reply: reply, DeadLetter: deadLetter, RetryConfig: retryConfig}, nil
s := &Subscription{Subscriber: destination, Reply: reply, DeadLetter: deadLetter, RetryConfig: retryConfig, UID: sub.UID}

if sub.Name != nil {
s.Name = *sub.Name
}

return s, nil
}

func (f *FanoutEventHandler) SetSubscriptions(ctx context.Context, subs []Subscription) {
Expand Down Expand Up @@ -361,6 +371,19 @@ func (f *FanoutEventHandler) makeFanoutRequest(ctx context.Context, event event.
kncloudevents.WithRetryConfig(sub.RetryConfig),
}

if f.eventTypeHandler != nil && sub.Name != "" && sub.Namespace != "" && sub.UID != types.UID("") {
dispatchOptions = append(dispatchOptions, kncloudevents.WithEventTypeAutoHandler(
f.eventTypeHandler,
&duckv1.KReference{
Name: sub.Name,
Namespace: sub.Namespace,
APIVersion: messagingv1.SchemeGroupVersion.String(),
Kind: "Subscription",
},
sub.UID,
))
}

if sub.ServiceAccount != nil {
dispatchOptions = append(dispatchOptions, kncloudevents.WithOIDCAuthentication(sub.ServiceAccount))
}
Expand Down
50 changes: 44 additions & 6 deletions pkg/kncloudevents/event_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/binding/buffering"
"github.com/cloudevents/sdk-go/v2/event"
cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
"github.com/hashicorp/go-retryablehttp"
Expand All @@ -42,6 +43,7 @@ import (
eventingapis "knative.dev/eventing/pkg/apis"
"knative.dev/eventing/pkg/auth"
"knative.dev/eventing/pkg/eventingtls"
"knative.dev/eventing/pkg/eventtype"
"knative.dev/eventing/pkg/utils"

"knative.dev/eventing/pkg/broker"
Expand Down Expand Up @@ -115,13 +117,29 @@ func WithOIDCAuthentication(serviceAccount *types.NamespacedName) SendOption {
}
}

func WithEventTypeAutoHandler(handler *eventtype.EventTypeAutoHandler, ref *duckv1.KReference, ownerUID types.UID) SendOption {
return func(sc *senderConfig) error {
if handler != nil && (ref == nil || ownerUID == types.UID("")) {
return fmt.Errorf("addressable and ownerUID must be provided if using the eventtype auto handler")
}
sc.eventTypeAutoHandler = handler
sc.eventTypeRef = ref
sc.eventTypeOnwerUID = ownerUID

return nil
}
}

type senderConfig struct {
reply *duckv1.Addressable
deadLetterSink *duckv1.Addressable
additionalHeaders http.Header
retryConfig *RetryConfig
transformers binding.Transformers
oidcServiceAccount *types.NamespacedName
reply *duckv1.Addressable
deadLetterSink *duckv1.Addressable
additionalHeaders http.Header
retryConfig *RetryConfig
transformers binding.Transformers
oidcServiceAccount *types.NamespacedName
eventTypeAutoHandler *eventtype.EventTypeAutoHandler
eventTypeRef *duckv1.KReference
eventTypeOnwerUID types.UID
}

type Dispatcher struct {
Expand Down Expand Up @@ -229,6 +247,10 @@ func (d *Dispatcher) send(ctx context.Context, message binding.Message, destinat

messagesToFinish = append(messagesToFinish, responseMessage)

if config.eventTypeAutoHandler != nil {
d.handleAutocreate(ctx, responseMessage, config)
}

if config.reply == nil {
return dispatchExecutionInfo, nil
}
Expand Down Expand Up @@ -332,6 +354,22 @@ func (d *Dispatcher) executeRequest(ctx context.Context, target duckv1.Addressab
return ctx, responseMessage, &dispatchInfo, nil
}

func (d *Dispatcher) handleAutocreate(ctx context.Context, responseMessage binding.Message, config *senderConfig) {
// messages can only be read once, so we need to make a copy of it
messageCopy, err := buffering.CopyMessage(ctx, responseMessage)
if err != nil {
return
}
defer responseMessage.Finish(nil)

responseEvent, err := binding.ToEvent(ctx, messageCopy)
if err != nil {
return
}

config.eventTypeAutoHandler.AutoCreateEventType(ctx, responseEvent, config.eventTypeRef, config.eventTypeOnwerUID)
}

func (d *Dispatcher) createRequest(ctx context.Context, message binding.Message, target duckv1.Addressable, additionalHeaders http.Header, oidcServiceAccount *types.NamespacedName, transformers ...binding.Transformer) (*http.Request, error) {
request, err := http.NewRequestWithContext(ctx, "POST", target.URL.String(), nil)
if err != nil {
Expand Down
9 changes: 6 additions & 3 deletions pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,15 +230,18 @@ func newConfigForInMemoryChannel(ctx context.Context, imc *v1.InMemoryChannel) (

for i, sub := range imc.Spec.Subscribers {
conf, err := fanout.SubscriberSpecToFanoutConfig(sub)
if err != nil {
return nil, err
}

conf.Namespace = imc.Namespace
if isOIDCEnabled {
conf.ServiceAccount = &types.NamespacedName{
Name: *sub.Auth.ServiceAccountName,
Namespace: imc.Namespace,
}
}
if err != nil {
return nil, err
}

subs[i] = *conf
}

Expand Down
Loading
Loading