diff --git a/pkg/apis/config/defaults.go b/pkg/apis/config/defaults.go index 368f1be7e6c..c3b06be5344 100644 --- a/pkg/apis/config/defaults.go +++ b/pkg/apis/config/defaults.go @@ -85,6 +85,22 @@ type Defaults struct { type ClassAndBrokerConfig struct { BrokerClass string `json:"brokerClass,omitempty"` *BrokerConfig `json:",inline"` + // BrokerClasses set the multiple configurations of different brokerClass. + // Different brokerClass use corresponding brokerConfig for all the namespaces. + BrokerClasses map[string]*BrokerConfigSpec `json:"brokerClasses,omitempty"` +} + +// BrokerConfigSpec contains concrete spec for broker. +type BrokerConfigSpec struct { + Spec *BrokerSpec `json:"spec,omitempty"` +} + +// BrokerSpec contains the Config and Delivery for broker. Allows configuring the Config +// which is a KReference that specifies configuration options for this Broker. +// Delivery contains the delivery spec for each trigger to this Broker. +type BrokerSpec struct { + Config *duckv1.KReference `json:"config,omitempty"` + Delivery *eventingduckv1.DeliverySpec `json:"delivery,omitempty"` } // BrokerConfig contains configuration for a given namespace for broker. Allows @@ -103,12 +119,24 @@ func (d *Defaults) GetBrokerConfig(ns string) (*BrokerConfig, error) { return nil, errors.New("Defaults are nil") } value, present := d.NamespaceDefaultsConfig[ns] + // get namespace default config if present && value.BrokerConfig != nil { return value.BrokerConfig, nil } + ndConfig := getClassConfig(d, value) + if ndConfig != nil { + return ndConfig, nil + } + + // get cluster default config + cdConfig := getClassConfig(d, d.ClusterDefault) + if cdConfig != nil { + return cdConfig, nil + } if d.ClusterDefault != nil && d.ClusterDefault.BrokerConfig != nil { return d.ClusterDefault.BrokerConfig, nil } + return nil, errors.New("Defaults for Broker Configurations have not been set up.") } @@ -128,3 +156,36 @@ func (d *Defaults) GetBrokerClass(ns string) (string, error) { } return "", errors.New("Defaults for Broker Configurations have not been set up.") } + +// getClassConfig get the corresponding class Config in multiple classes +func getClassConfig(d *Defaults, cb *ClassAndBrokerConfig) *BrokerConfig { + if cb == nil || cb.BrokerClass == "" { + return nil + } + if bCSpec := getBrokerClasses(d); bCSpec != nil { + bConfig := matchBrokerClass(cb.BrokerClass, bCSpec) + if bConfig != nil { + return bConfig + } + } + return nil +} + +// getBrokerClasses get the configurations of multiple brokerClass +func getBrokerClasses(d *Defaults) map[string]*BrokerConfigSpec { + if d.ClusterDefault != nil && d.ClusterDefault.BrokerClasses != nil { + return d.ClusterDefault.BrokerClasses + } + return nil +} + +// matchBrokerClass find the corresponding brokerConfig for a given brokerClass +func matchBrokerClass(brokerClass string, brokerClasses map[string]*BrokerConfigSpec) *BrokerConfig { + var bConfig BrokerConfig + if bCSpec, ok := brokerClasses[brokerClass]; ok { + bConfig.KReference = bCSpec.Spec.Config + bConfig.Delivery = bCSpec.Spec.Delivery + return &bConfig + } + return nil +} diff --git a/pkg/apis/config/defaults_test.go b/pkg/apis/config/defaults_test.go index 2a527a7c02d..ab675a4fb66 100644 --- a/pkg/apis/config/defaults_test.go +++ b/pkg/apis/config/defaults_test.go @@ -46,8 +46,11 @@ func TestGetBrokerConfig(t *testing.T) { if err != nil { t.Error("GetBrokerConfig Failed =", err) } - if c.Name != "somename" { - t.Error("GetBrokerConfig Failed, wanted somename, got:", c.Name) + if c.Name != "mt-test" { + t.Error("GetBrokerConfig Failed, wanted mt-test, got:", c.Name) + } + if c.Delivery.DeadLetterSink.Ref.Name != "mt-handle-error" { + t.Error("GetBrokerConfig Failed, wanted mt-handle-error, got:", c.Delivery.DeadLetterSink.Ref.Name) } c, err = defaults.GetBrokerConfig("some-namespace") if err != nil { @@ -56,6 +59,28 @@ func TestGetBrokerConfig(t *testing.T) { if c.Name != "someothername" { t.Error("GetBrokerConfig Failed, wanted someothername, got:", c.Name) } + // Test GetBrokerConfig in different namespace + c, err = defaults.GetBrokerConfig("some-namespace-two") + if err != nil { + t.Error("GetBrokerConfig Failed =", err) + } + if c.Name != "kafka-test" { + t.Error("GetBrokerConfig Failed, wanted kafka-test, got:", c.Name) + } + if c.Delivery.DeadLetterSink.Ref.Name != "kafka-handle-error" { + t.Error("GetBrokerConfig Failed, wanted kafka-handle-error, got:", c.Delivery.DeadLetterSink.Ref.Name) + } + + c, err = defaults.GetBrokerConfig("some-namespace-three") + if err != nil { + t.Error("GetBrokerConfig Failed =", err) + } + if c.Name != "kafka-test" { + t.Error("GetBrokerConfig Failed, wanted kafka-test, got:", c.Name) + } + if c.Delivery.DeadLetterSink.Ref.Name != "kafka-handle-error" { + t.Error("GetBrokerConfig Failed, wanted kafka-handle-error, got:", c.Delivery.DeadLetterSink.Ref.Name) + } // Nil and empty tests var nilDefaults *Defaults @@ -117,6 +142,34 @@ func TestGetBrokerClass(t *testing.T) { } func TestDefaultsConfiguration(t *testing.T) { + brokerClasses := make(map[string]*BrokerConfigSpec) + brokerSpec1 := &BrokerSpec{ + Config: &duckv1.KReference{ + Kind: "ConfigMap", + APIVersion: "v1", + Namespace: "knative-eventing", + Name: "mt-test", + }, + Delivery: nil, + } + brokerConfigSpec1 := &BrokerConfigSpec{ + Spec: brokerSpec1, + } + brokerSpec2 := &BrokerSpec{ + Config: &duckv1.KReference{ + Kind: "ConfigMap", + APIVersion: "v1", + Namespace: "knative-eventing", + Name: "kafka-test", + }, + Delivery: nil, + } + brokerConfigSpec2 := &BrokerConfigSpec{ + Spec: brokerSpec2, + } + brokerClasses["MTChannelBasedBroker"] = brokerConfigSpec1 + brokerClasses["KafkaBroker"] = brokerConfigSpec2 + configTests := []struct { name string wantErr bool @@ -415,6 +468,55 @@ func TestDefaultsConfiguration(t *testing.T) { kind: ConfigMap name: someothernametoo namespace: someothernamespacetoo +`, + }, + }, + }, { + name: "only clusterdefault specified values add brokerClasses", + wantErr: false, + wantDefaults: &Defaults{ + ClusterDefault: &ClassAndBrokerConfig{ + BrokerClass: "clusterbrokerclass", + BrokerConfig: &BrokerConfig{ + KReference: &duckv1.KReference{ + Kind: "ConfigMap", + APIVersion: "v1", + Namespace: "knative-eventing", + Name: "somename", + }, + Delivery: nil, + }, + BrokerClasses: brokerClasses, + }, + }, + config: &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: system.Namespace(), + Name: DefaultsConfigName, + }, + Data: map[string]string{ + "default-br-config": ` + clusterDefault: + brokerClass: clusterbrokerclass + apiVersion: v1 + kind: ConfigMap + name: somename + namespace: knative-eventing + brokerClasses: + MTChannelBasedBroker: + spec: + config: + apiVersion: v1 + kind: ConfigMap + name: mt-test + namespace: knative-eventing + KafkaBroker: + spec: + config: + apiVersion: v1 + kind: ConfigMap + name: kafka-test + namespace: knative-eventing `, }, }, diff --git a/pkg/apis/config/testdata/config-br-defaults.yaml b/pkg/apis/config/testdata/config-br-defaults.yaml index 15b8e1db29b..1656327a756 100644 --- a/pkg/apis/config/testdata/config-br-defaults.yaml +++ b/pkg/apis/config/testdata/config-br-defaults.yaml @@ -40,10 +40,45 @@ data: ref: apiVersion: serving.knative.dev/v1 kind: Service - name: handle-error + name: cluster-handle-error namespace: knative-eventing backoffPolicy: exponential backoffDelay: 3s + brokerClasses: + MTChannelBasedBroker: + spec: + delivery: + retry: 3 + deadLetterSink: + ref: + apiVersion: serving.knative.dev/v1 + kind: Service + name: mt-handle-error + namespace: knative-eventing + backoffPolicy: exponential + backoffDelay: 3s + config: + apiVersion: v1 + kind: ConfigMap + name: mt-test + namespace: knative-eventing + KafkaBroker: + spec: + delivery: + retry: 3 + deadLetterSink: + ref: + apiVersion: serving.knative. + kind: Service + name: kafka-handle-error + namespace: knative-eventing + backoffPolicy: exponential + backoffDelay: 3s + config: + apiVersion: v1 + kind: ConfigMap + name: kafka-test + namespace: knative-eventing namespaceDefaults: some-namespace: @@ -62,3 +97,7 @@ data: namespace: someothernamespace backoffPolicy: linear backoffDelay: 5s + some-namespace-two: + brokerClass: KafkaBroker + some-namespace-three: + brokerClass: KafkaBroker diff --git a/pkg/apis/config/zz_generated.deepcopy.go b/pkg/apis/config/zz_generated.deepcopy.go index a26083d98d5..417a4cc5f79 100644 --- a/pkg/apis/config/zz_generated.deepcopy.go +++ b/pkg/apis/config/zz_generated.deepcopy.go @@ -52,6 +52,53 @@ func (in *BrokerConfig) DeepCopy() *BrokerConfig { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *BrokerConfigSpec) DeepCopyInto(out *BrokerConfigSpec) { + *out = *in + if in.Spec != nil { + in, out := &in.Spec, &out.Spec + *out = new(BrokerSpec) + (*in).DeepCopyInto(*out) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BrokerConfigSpec. +func (in *BrokerConfigSpec) DeepCopy() *BrokerConfigSpec { + if in == nil { + return nil + } + out := new(BrokerConfigSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *BrokerSpec) DeepCopyInto(out *BrokerSpec) { + *out = *in + if in.Config != nil { + in, out := &in.Config, &out.Config + *out = new(v1.KReference) + **out = **in + } + if in.Delivery != nil { + in, out := &in.Delivery, &out.Delivery + *out = new(duckv1.DeliverySpec) + (*in).DeepCopyInto(*out) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BrokerSpec. +func (in *BrokerSpec) DeepCopy() *BrokerSpec { + if in == nil { + return nil + } + out := new(BrokerSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ClassAndBrokerConfig) DeepCopyInto(out *ClassAndBrokerConfig) { *out = *in @@ -60,6 +107,21 @@ func (in *ClassAndBrokerConfig) DeepCopyInto(out *ClassAndBrokerConfig) { *out = new(BrokerConfig) (*in).DeepCopyInto(*out) } + if in.BrokerClasses != nil { + in, out := &in.BrokerClasses, &out.BrokerClasses + *out = make(map[string]*BrokerConfigSpec, len(*in)) + for key, val := range *in { + var outVal *BrokerConfigSpec + if val == nil { + (*out)[key] = nil + } else { + in, out := &val, &outVal + *out = new(BrokerConfigSpec) + (*in).DeepCopyInto(*out) + } + (*out)[key] = outVal + } + } return } diff --git a/pkg/apis/eventing/v1/broker_defaults_test.go b/pkg/apis/eventing/v1/broker_defaults_test.go index 4f341a68a44..44e17658dfc 100644 --- a/pkg/apis/eventing/v1/broker_defaults_test.go +++ b/pkg/apis/eventing/v1/broker_defaults_test.go @@ -109,6 +109,35 @@ var ( }, }, }, + "mynamespace4": { + BrokerConfig: &config.BrokerConfig{ + KReference: &duckv1.KReference{ + APIVersion: "v1", + Kind: "ConfigMap", + Namespace: "knative-eventing", + Name: "kafka-channel", + }, + Delivery: &eventingduckv1.DeliverySpec{ + DeadLetterSink: &duckv1.Destination{ + Ref: &duckv1.KReference{ + Kind: "Service", + Namespace: "default", + Name: "handle-error", + APIVersion: "serving.knative.dev/v1", + }, + }, + Retry: pointer.Int32(5), + BackoffPolicy: (*eventingduckv1.BackoffPolicyType)(pointer.String("exponential")), + BackoffDelay: pointer.String("5s"), + }, + }, + }, + "mynamespace5": { + BrokerClass: "RabbitmqBroker", + }, + "mynamespace6": { + BrokerClass: "KafkaBroker", + }, }, ClusterDefault: &config.ClassAndBrokerConfig{ BrokerClass: eventing.MTChannelBrokerClassValue, @@ -139,7 +168,8 @@ var ( ) func TestBrokerSetDefaults(t *testing.T) { - testCases := map[string]struct { + // testCases1 is the original testcases + testCases1 := map[string]struct { initial Broker expected Broker }{ @@ -462,7 +492,7 @@ func TestBrokerSetDefaults(t *testing.T) { }, }, } - for n, tc := range testCases { + for n, tc := range testCases1 { t.Run(n, func(t *testing.T) { tc.initial.SetDefaults(config.ToContext(context.Background(), defaultConfig)) if diff := cmp.Diff(tc.expected, tc.initial); diff != "" { @@ -470,4 +500,211 @@ func TestBrokerSetDefaults(t *testing.T) { } }) } + + // Add cluster-level multi-class-based configs to defaultConfig + brokerClasses := make(map[string]*config.BrokerConfigSpec) + brokerSpecMTChannelBasedBroker := &config.BrokerSpec{ + Config: &duckv1.KReference{ + Kind: "ConfigMap", + APIVersion: "v1", + Namespace: "knative-eventing", + Name: "mt-test", + }, + Delivery: &eventingduckv1.DeliverySpec{ + DeadLetterSink: &duckv1.Destination{ + Ref: &duckv1.KReference{ + Kind: "Service", + Namespace: "default", + Name: "mt-error", + APIVersion: "serving.knative.dev/v1", + }, + }, + Retry: pointer.Int32(5), + BackoffPolicy: (*eventingduckv1.BackoffPolicyType)(pointer.String("exponential")), + BackoffDelay: pointer.String("5s"), + }, + } + brokerSpecKafkaBroker := &config.BrokerSpec{ + Config: &duckv1.KReference{ + Kind: "ConfigMap", + APIVersion: "v1", + Namespace: "knative-eventing", + Name: "kafka-test", + }, + Delivery: &eventingduckv1.DeliverySpec{ + DeadLetterSink: &duckv1.Destination{ + Ref: &duckv1.KReference{ + Kind: "Service", + Namespace: "default", + Name: "kafka-error", + APIVersion: "serving.knative.dev/v1", + }, + }, + Retry: pointer.Int32(5), + BackoffPolicy: (*eventingduckv1.BackoffPolicyType)(pointer.String("exponential")), + BackoffDelay: pointer.String("5s"), + }, + } + brokerSpecRabbitmqBroker := &config.BrokerSpec{ + Config: &duckv1.KReference{ + Kind: "ConfigMap", + APIVersion: "v1", + Namespace: "knative-eventing", + Name: "rabbitmq-test", + }, + Delivery: nil, + } + brokerConfigSpecMTCBroker := &config.BrokerConfigSpec{ + Spec: brokerSpecMTChannelBasedBroker, + } + brokerConfigSpecKafkaBroker := &config.BrokerConfigSpec{ + Spec: brokerSpecKafkaBroker, + } + brokerConfigSpecRabbitmqBroker := &config.BrokerConfigSpec{ + Spec: brokerSpecRabbitmqBroker, + } + brokerClasses["MTChannelBasedBroker"] = brokerConfigSpecMTCBroker + brokerClasses["KafkaBroker"] = brokerConfigSpecKafkaBroker + brokerClasses["RabbitmqBroker"] = brokerConfigSpecRabbitmqBroker + defaultConfig.Defaults.ClusterDefault.BrokerClasses = brokerClasses + + // After adding cluster-level multi-class-based configs to defaultConfig(BrokerClasses) + testCases2 := map[string]struct { + initial Broker + expected Broker + }{ + "no config, uses clusterDefault broker class and namespace4's broker config": { + initial: Broker{ + ObjectMeta: metav1.ObjectMeta{Namespace: "mynamespace4"}, + }, + expected: Broker{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "mynamespace4", + Annotations: map[string]string{ + eventing.BrokerClassKey: eventing.MTChannelBrokerClassValue, + }, + }, + Spec: BrokerSpec{ + Config: &duckv1.KReference{ + APIVersion: "v1", + Kind: "ConfigMap", + Namespace: "knative-eventing", + Name: "kafka-channel", + }, + Delivery: &eventingduckv1.DeliverySpec{ + DeadLetterSink: &duckv1.Destination{ + Ref: &duckv1.KReference{ + Kind: "Service", + Namespace: "default", + Name: "handle-error", + APIVersion: "serving.knative.dev/v1", + }, + }, + Retry: pointer.Int32(5), + BackoffPolicy: (*eventingduckv1.BackoffPolicyType)(pointer.String("exponential")), + BackoffDelay: pointer.String("5s"), + }, + }, + }, + }, + "no config, uses mynamespace5's broker class and corresponding config, cluster class": { + initial: Broker{ + ObjectMeta: metav1.ObjectMeta{Namespace: "mynamespace5"}, + }, + expected: Broker{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "mynamespace5", + Annotations: map[string]string{ + eventing.BrokerClassKey: "RabbitmqBroker", + }, + }, + Spec: BrokerSpec{ + Config: &duckv1.KReference{ + Kind: "ConfigMap", + APIVersion: "v1", + Namespace: "knative-eventing", + Name: "rabbitmq-test", + }, + Delivery: nil, + }, + }, + }, + "no config, uses mynamespace6's broker class and corresponding config, cluster class": { + initial: Broker{ + ObjectMeta: metav1.ObjectMeta{Namespace: "mynamespace6"}, + }, + expected: Broker{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "mynamespace6", + Annotations: map[string]string{ + eventing.BrokerClassKey: "KafkaBroker", + }, + }, + Spec: BrokerSpec{ + Config: &duckv1.KReference{ + Kind: "ConfigMap", + APIVersion: "v1", + Namespace: "knative-eventing", + Name: "kafka-test", + }, + Delivery: &eventingduckv1.DeliverySpec{ + DeadLetterSink: &duckv1.Destination{ + Ref: &duckv1.KReference{ + Kind: "Service", + Namespace: "default", + Name: "kafka-error", + APIVersion: "serving.knative.dev/v1", + }, + }, + Retry: pointer.Int32(5), + BackoffPolicy: (*eventingduckv1.BackoffPolicyType)(pointer.String("exponential")), + BackoffDelay: pointer.String("5s"), + }, + }, + }, + }, + "no config, missing namespace, uses clusterDefault broker class and corresponding config ": { + initial: Broker{ + ObjectMeta: metav1.ObjectMeta{Namespace: "mynamespace-random"}, + }, + expected: Broker{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "mynamespace-random", + Annotations: map[string]string{ + eventing.BrokerClassKey: eventing.MTChannelBrokerClassValue, + }, + }, + Spec: BrokerSpec{ + Config: &duckv1.KReference{ + Kind: "ConfigMap", + APIVersion: "v1", + Namespace: "knative-eventing", + Name: "mt-test", + }, + Delivery: &eventingduckv1.DeliverySpec{ + DeadLetterSink: &duckv1.Destination{ + Ref: &duckv1.KReference{ + Kind: "Service", + Namespace: "default", + Name: "mt-error", + APIVersion: "serving.knative.dev/v1", + }, + }, + Retry: pointer.Int32(5), + BackoffPolicy: (*eventingduckv1.BackoffPolicyType)(pointer.String("exponential")), + BackoffDelay: pointer.String("5s"), + }, + }, + }, + }, + } + for n, tc := range testCases2 { + t.Run(n, func(t *testing.T) { + tc.initial.SetDefaults(config.ToContext(context.Background(), defaultConfig)) + if diff := cmp.Diff(tc.expected, tc.initial); diff != "" { + t.Fatal("Unexpected defaults (-want, +got):", diff) + } + }) + } + }