Skip to content

Commit

Permalink
Create eventtypes on reply events to triggers and subscriptions (#4077)
Browse files Browse the repository at this point in the history
* feat: refactor contract to have shared feature flags message

Signed-off-by: Calum Murray <cmurray@redhat.com>

* feat: control plane provisions new feature flag message in contract

Signed-off-by: Calum Murray <cmurray@redhat.com>

* feat: receiver uses new feature flags message in contract

Signed-off-by: Calum Murray <cmurray@redhat.com>

* feat: handle autocreate on reply events

Signed-off-by: Calum Murray <cmurray@redhat.com>

* feat: set up dispatcher to handle et autocreate

Signed-off-by: Calum Murray <cmurray@redhat.com>

* test: add e2e test to verify eventtypes are created on reply

Signed-off-by: Calum Murray <cmurray@redhat.com>

* fix: do not autocreate events on channel when owned by a broker

Signed-off-by: Calum Murray <cmurray@redhat.com>

* fix(test): kafka sink oidc tests work with featureflags contract

Signed-off-by: Calum Murray <cmurray@redhat.com>

* fix: base response handler build issue is resolved

Signed-off-by: Calum Murray <cmurray@redhat.com>

* fix: consumer provisions feature flags

Signed-off-by: Calum Murray <cmurray@redhat.com>

* fix: reference is correctly provisioned for dispatchers in control plance

Signed-off-by: Calum Murray <cmurray@redhat.com>

---------

Signed-off-by: Calum Murray <cmurray@redhat.com>
  • Loading branch information
Cali0707 committed Aug 26, 2024
1 parent fc37e5d commit 4adf16f
Show file tree
Hide file tree
Showing 23 changed files with 1,483 additions and 447 deletions.
294 changes: 179 additions & 115 deletions control-plane/pkg/contract/contract.pb.go

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions control-plane/pkg/reconciler/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,8 +630,10 @@ func (r *Reconciler) reconcilerBrokerResource(ctx context.Context, topic string,
Uid: string(broker.UID),
Topics: []string{topic},
Ingress: &contract.Ingress{
Path: receiver.PathFromObject(broker),
EnableAutoCreateEventTypes: features.IsEnabled(feature.EvenTypeAutoCreate),
Path: receiver.PathFromObject(broker),
},
FeatureFlags: &contract.FeatureFlags{
EnableEventTypeAutocreate: features.IsEnabled(feature.EvenTypeAutoCreate),
},
BootstrapServers: config.GetBootstrapServers(),
Reference: &contract.Reference{
Expand Down
31 changes: 30 additions & 1 deletion control-plane/pkg/reconciler/broker/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) {
Ingress: &contract.Ingress{Path: receiver.Path(BrokerNamespace, BrokerName)},
BootstrapServers: bootstrapServers,
Reference: BrokerReference(),
FeatureFlags: FeatureFlagsETAutocreate(false),
},
},
Generation: 1,
Expand Down Expand Up @@ -253,6 +254,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) {
Ingress: &contract.Ingress{Path: receiver.Path(BrokerNamespace, BrokerName)},
BootstrapServers: bootstrapServers,
Reference: BrokerReference(),
FeatureFlags: FeatureFlagsETAutocreate(false),
},
},
Generation: 1,
Expand Down Expand Up @@ -375,6 +377,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) {
Ingress: &contract.Ingress{Path: receiver.Path(BrokerNamespace, BrokerName)},
BootstrapServers: bootstrapServers,
Reference: BrokerReference(),
FeatureFlags: FeatureFlagsETAutocreate(false),
},
},
Generation: 1,
Expand Down Expand Up @@ -439,6 +442,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) {
Ingress: &contract.Ingress{Path: receiver.Path(BrokerNamespace, BrokerName)},
BootstrapServers: bootstrapServers,
Reference: BrokerReference(),
FeatureFlags: FeatureFlagsETAutocreate(false),
},
},
Generation: 1,
Expand Down Expand Up @@ -502,6 +506,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) {
BootstrapServers: bootstrapServers,
EgressConfig: &contract.EgressConfig{DeadLetter: ServiceURL},
Reference: BrokerReference(),
FeatureFlags: FeatureFlagsETAutocreate(false),
},
},
Generation: 2,
Expand Down Expand Up @@ -574,6 +579,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) {
BootstrapServers: bootstrapServers,
EgressConfig: &contract.EgressConfig{DeadLetter: ServiceURLFrom(BrokerNamespace, ServiceName)},
Reference: BrokerReference(),
FeatureFlags: FeatureFlagsETAutocreate(false),
},
},
Generation: 2,
Expand Down Expand Up @@ -694,6 +700,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) {
BootstrapServers: bootstrapServers,
EgressConfig: &contract.EgressConfig{DeadLetter: ServiceURL},
Reference: BrokerReference(),
FeatureFlags: FeatureFlagsETAutocreate(false),
},
},
Generation: 1,
Expand Down Expand Up @@ -763,6 +770,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) {
Ingress: &contract.Ingress{Path: receiver.Path(BrokerNamespace, BrokerName)},
BootstrapServers: bootstrapServers,
Reference: BrokerReference(),
FeatureFlags: FeatureFlagsETAutocreate(false),
},
},
Generation: 1,
Expand Down Expand Up @@ -857,6 +865,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) {
Ingress: &contract.Ingress{Path: receiver.Path(BrokerNamespace, BrokerName)},
BootstrapServers: bootstrapServers,
Reference: BrokerReference(),
FeatureFlags: FeatureFlagsETAutocreate(false),
},
},
Generation: 1,
Expand Down Expand Up @@ -957,6 +966,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) {
BootstrapServers: bootstrapServers,
EgressConfig: &contract.EgressConfig{DeadLetter: "http://www.my-sink.com/api"},
Reference: BrokerReference(),
FeatureFlags: FeatureFlagsETAutocreate(false),
},
},
Generation: 1,
Expand Down Expand Up @@ -1062,6 +1072,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) {
Ingress: &contract.Ingress{Path: receiver.Path(BrokerNamespace, BrokerName)},
BootstrapServers: bootstrapServers,
Reference: BrokerReference(),
FeatureFlags: FeatureFlagsETAutocreate(false),
},
},
Generation: 1,
Expand Down Expand Up @@ -1155,6 +1166,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) {
Ingress: &contract.Ingress{Path: receiver.Path(BrokerNamespace, BrokerName)},
BootstrapServers: bootstrapServers,
Reference: BrokerReference(),
FeatureFlags: FeatureFlagsETAutocreate(false),
},
},
Generation: 2,
Expand Down Expand Up @@ -1222,6 +1234,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) {
Ingress: &contract.Ingress{Path: receiver.Path(BrokerNamespace, BrokerName)},
BootstrapServers: bootstrapServers,
Reference: BrokerReference(),
FeatureFlags: FeatureFlagsETAutocreate(false),
},
},
Generation: 1,
Expand Down Expand Up @@ -1334,6 +1347,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) {
Ingress: &contract.Ingress{Path: receiver.Path(BrokerNamespace, BrokerName)},
BootstrapServers: bootstrapServers,
Reference: BrokerReference(),
FeatureFlags: FeatureFlagsETAutocreate(false),
},
},
Generation: 1,
Expand Down Expand Up @@ -1428,6 +1442,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) {
Version: SecretResourceVersion,
},
},
FeatureFlags: FeatureFlagsETAutocreate(false),
},
},
Generation: 1,
Expand Down Expand Up @@ -1661,7 +1676,9 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) {
EgressConfig: &contract.EgressConfig{DeadLetter: ServiceURL},
BootstrapServers: bootstrapServers,
Reference: BrokerReference(),
Ingress: &contract.Ingress{Path: receiver.Path(BrokerNamespace, BrokerName)},
FeatureFlags: FeatureFlagsETAutocreate(false),

Ingress: &contract.Ingress{Path: receiver.Path(BrokerNamespace, BrokerName)},
},
},
Generation: 2,
Expand Down Expand Up @@ -1772,6 +1789,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) {
BackoffPolicy: contract.BackoffPolicy_Exponential,
BackoffDelay: 2000,
},
FeatureFlags: FeatureFlagsETAutocreate(false),
},
},
Generation: 2,
Expand Down Expand Up @@ -1851,6 +1869,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) {
BackoffPolicy: contract.BackoffPolicy_Linear,
BackoffDelay: 2000,
},
FeatureFlags: FeatureFlagsETAutocreate(false),
},
},
Generation: 2,
Expand Down Expand Up @@ -1927,6 +1946,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) {
EgressConfig: &contract.EgressConfig{
DeadLetter: ServiceURL,
},
FeatureFlags: FeatureFlagsETAutocreate(false),
},
},
Generation: 2,
Expand Down Expand Up @@ -2006,6 +2026,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) {
BackoffPolicy: contract.BackoffPolicy_Linear,
BackoffDelay: env.DefaultBackoffDelayMs,
},
FeatureFlags: FeatureFlagsETAutocreate(false),
},
},
Generation: 2,
Expand Down Expand Up @@ -2068,6 +2089,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) {
BootstrapServers: bootstrapServers,
Reference: BrokerReference(),
EgressConfig: &contract.EgressConfig{DeadLetter: ServiceURL},
FeatureFlags: FeatureFlagsETAutocreate(false),
},
},
Generation: 1,
Expand Down Expand Up @@ -2129,6 +2151,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) {
BootstrapServers: bootstrapServers,
Reference: BrokerReference(),
EgressConfig: &contract.EgressConfig{DeadLetter: ServiceURL},
FeatureFlags: FeatureFlagsETAutocreate(false),
},
},
Generation: 1,
Expand Down Expand Up @@ -2206,6 +2229,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) {
Ingress: &contract.Ingress{Path: receiver.Path(BrokerNamespace, BrokerName)},
BootstrapServers: bootstrapServers,
Reference: BrokerReference(),
FeatureFlags: FeatureFlagsETAutocreate(false),
},
},
Generation: 1,
Expand Down Expand Up @@ -2317,6 +2341,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) {
Version: SecretResourceVersion,
},
},
FeatureFlags: FeatureFlagsETAutocreate(false),
},
},
Generation: 1,
Expand Down Expand Up @@ -2429,6 +2454,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) {
}},
BootstrapServers: bootstrapServers,
Reference: BrokerReference(),
FeatureFlags: FeatureFlagsETAutocreate(false),
},
},
Generation: 1,
Expand Down Expand Up @@ -2531,6 +2557,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) {
},
BootstrapServers: bootstrapServers,
Reference: BrokerReference(),
FeatureFlags: FeatureFlagsETAutocreate(false),
},
},
Generation: 1,
Expand Down Expand Up @@ -3084,6 +3111,7 @@ func brokerFinalization(t *testing.T, format string, env config.Env) {
BootstrapServers: bootstrapServers,
Reference: BrokerReference(),
EgressConfig: &contract.EgressConfig{DeadLetter: ServiceURL},
FeatureFlags: FeatureFlagsETAutocreate(false),
},
},
Generation: 1,
Expand Down Expand Up @@ -3155,6 +3183,7 @@ func brokerFinalization(t *testing.T, format string, env config.Env) {
BootstrapServers: bootstrapServers,
Reference: BrokerReference(),
EgressConfig: &contract.EgressConfig{DeadLetter: ServiceURL},
FeatureFlags: FeatureFlagsETAutocreate(false),
},
},
Generation: 1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ func namespacedBrokerReconciliation(t *testing.T, format string, env config.Env)
Ingress: &contract.Ingress{Path: receiver.Path(BrokerNamespace, BrokerName)},
BootstrapServers: bootstrapServers,
Reference: BrokerReference(),
FeatureFlags: FeatureFlagsETAutocreate(false),
},
},
Generation: 1,
Expand Down
18 changes: 15 additions & 3 deletions control-plane/pkg/reconciler/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -695,9 +695,11 @@ func (r *Reconciler) getChannelContractResource(ctx context.Context, topic strin
Uid: string(channel.UID),
Topics: []string{topic},
Ingress: &contract.Ingress{
Host: receiver.Host(channel.GetNamespace(), channel.GetName()),
EnableAutoCreateEventTypes: features.IsEnabled(feature.EvenTypeAutoCreate),
Path: receiver.Path(channel.GetNamespace(), channel.GetName()),
Host: receiver.Host(channel.GetNamespace(), channel.GetName()),
Path: receiver.Path(channel.GetNamespace(), channel.GetName()),
},
FeatureFlags: &contract.FeatureFlags{
EnableEventTypeAutocreate: features.IsEnabled(feature.EvenTypeAutoCreate) && !ownedByBroker(channel),
},
BootstrapServers: config.GetBootstrapServers(),
Reference: &contract.Reference{
Expand Down Expand Up @@ -862,3 +864,13 @@ func mergeDeliverySpecs(d1, d2 *v1.DeliverySpec) *v1.DeliverySpec {

return d
}

func ownedByBroker(channel *messagingv1beta1.KafkaChannel) bool {
for _, ref := range channel.OwnerReferences {
if strings.EqualFold(ref.Kind, "broker") {
return true
}
}

return false
}
Loading

0 comments on commit 4adf16f

Please sign in to comment.