From a295d55d3ee16d3ab2c3031deba598fc2c1c4505 Mon Sep 17 00:00:00 2001 From: Steven DOng Date: Mon, 30 Aug 2021 17:02:00 -0400 Subject: [PATCH] add Kafka defaults, starting with KEDA annotations --- cmd/source/controller/main.go | 34 ++- cmd/source/mtcontroller/main.go | 31 +- .../config-kafka-source-defaults.yaml | 58 ++++ .../400-config-kafka-source-defaults.yaml | 1 + .../400-config-kafka-source-defaults.yaml | 1 + hack/tools.go | 1 + hack/update-checksums.sh | 29 ++ pkg/apis/sources/config/doc.go | 21 ++ pkg/apis/sources/config/kafka_defaults.go | 143 +++++++++ .../sources/config/kafka_defaults_test.go | 274 ++++++++++++++++++ pkg/apis/sources/config/store.go | 97 +++++++ pkg/apis/sources/config/store_test.go | 63 ++++ .../config-kafka-source-defaults.yaml | 56 ++++ pkg/apis/sources/v1beta1/kafka_defaults.go | 26 ++ .../sources/v1beta1/kafka_defaults_test.go | 105 ++++--- .../pkg/configmap/hash-gen/main.go | 141 +++++++++ .../pkg/configmap/testing/configmap.go | 109 +++++++ vendor/modules.txt | 2 + 18 files changed, 1130 insertions(+), 62 deletions(-) create mode 100644 config/source/common/configmaps/config-kafka-source-defaults.yaml create mode 120000 config/source/multi/400-config-kafka-source-defaults.yaml create mode 120000 config/source/single/400-config-kafka-source-defaults.yaml create mode 100755 hack/update-checksums.sh create mode 100644 pkg/apis/sources/config/doc.go create mode 100644 pkg/apis/sources/config/kafka_defaults.go create mode 100644 pkg/apis/sources/config/kafka_defaults_test.go create mode 100644 pkg/apis/sources/config/store.go create mode 100644 pkg/apis/sources/config/store_test.go create mode 100644 pkg/apis/sources/config/testdata/config-kafka-source-defaults.yaml create mode 100644 vendor/knative.dev/pkg/configmap/hash-gen/main.go create mode 100644 vendor/knative.dev/pkg/configmap/testing/configmap.go diff --git a/cmd/source/controller/main.go b/cmd/source/controller/main.go index cfee368e56..48fc086a11 100644 --- a/cmd/source/controller/main.go +++ b/cmd/source/controller/main.go @@ -20,28 +20,29 @@ import ( "context" "os" - "knative.dev/eventing-kafka/pkg/apis/bindings" - "knative.dev/eventing-kafka/pkg/apis/sources" - "knative.dev/pkg/webhook/resourcesemantics/conversion" - "k8s.io/apimachinery/pkg/runtime/schema" - bindingsv1beta1 "knative.dev/eventing-kafka/pkg/apis/bindings/v1beta1" - sourcesv1beta1 "knative.dev/eventing-kafka/pkg/apis/sources/v1beta1" - - "knative.dev/eventing-kafka/pkg/source/reconciler/binding" - "knative.dev/eventing-kafka/pkg/source/reconciler/source" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" "knative.dev/pkg/injection" "knative.dev/pkg/injection/sharedmain" + "knative.dev/pkg/logging" "knative.dev/pkg/signals" "knative.dev/pkg/webhook" "knative.dev/pkg/webhook/certificates" "knative.dev/pkg/webhook/psbinding" "knative.dev/pkg/webhook/resourcesemantics" + "knative.dev/pkg/webhook/resourcesemantics/conversion" "knative.dev/pkg/webhook/resourcesemantics/defaulting" "knative.dev/pkg/webhook/resourcesemantics/validation" + + "knative.dev/eventing-kafka/pkg/apis/bindings" + bindingsv1beta1 "knative.dev/eventing-kafka/pkg/apis/bindings/v1beta1" + "knative.dev/eventing-kafka/pkg/apis/sources" + kafkasourcedefaultconfig "knative.dev/eventing-kafka/pkg/apis/sources/config" + sourcesv1beta1 "knative.dev/eventing-kafka/pkg/apis/sources/v1beta1" + "knative.dev/eventing-kafka/pkg/source/reconciler/binding" + "knative.dev/eventing-kafka/pkg/source/reconciler/source" ) const ( @@ -57,6 +58,15 @@ var types = map[schema.GroupVersionKind]resourcesemantics.GenericCRD{ var callbacks = map[schema.GroupVersionKind]validation.Callback{} func NewDefaultingAdmissionController(ctx context.Context, cmw configmap.Watcher) *controller.Impl { + // Decorate contexts with the current state of the config. + kafkaStore := kafkasourcedefaultconfig.NewStore(logging.FromContext(ctx).Named("kafka-source-config-store")) + kafkaStore.WatchConfigs(cmw) + + // Decorate contexts with the current state of the config. + ctxFunc := func(ctx context.Context) context.Context { + return kafkaStore.ToContext(ctx) + } + return defaulting.NewAdmissionController(ctx, // Name of the resource webhook. @@ -69,11 +79,7 @@ func NewDefaultingAdmissionController(ctx context.Context, cmw configmap.Watcher types, // A function that infuses the context passed to Validate/SetDefaults with custom metadata. - func(ctx context.Context) context.Context { - // Here is where you would infuse the context with state - // (e.g. attach a store with configmap data) - return ctx - }, + ctxFunc, // Whether to disallow unknown fields. true, diff --git a/cmd/source/mtcontroller/main.go b/cmd/source/mtcontroller/main.go index 7a5481ca2f..ee2fb44537 100644 --- a/cmd/source/mtcontroller/main.go +++ b/cmd/source/mtcontroller/main.go @@ -20,21 +20,16 @@ import ( "context" "os" - "knative.dev/eventing-kafka/pkg/apis/bindings" - "knative.dev/eventing-kafka/pkg/apis/sources" "knative.dev/pkg/webhook/resourcesemantics/conversion" "k8s.io/apimachinery/pkg/runtime/schema" - bindingsv1beta1 "knative.dev/eventing-kafka/pkg/apis/bindings/v1beta1" - sourcesv1beta1 "knative.dev/eventing-kafka/pkg/apis/sources/v1beta1" - - "knative.dev/eventing-kafka/pkg/source/reconciler/binding" source "knative.dev/eventing-kafka/pkg/source/reconciler/mtsource" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" "knative.dev/pkg/injection" "knative.dev/pkg/injection/sharedmain" + "knative.dev/pkg/logging" "knative.dev/pkg/signals" "knative.dev/pkg/webhook" "knative.dev/pkg/webhook/certificates" @@ -42,6 +37,14 @@ import ( "knative.dev/pkg/webhook/resourcesemantics" "knative.dev/pkg/webhook/resourcesemantics/defaulting" "knative.dev/pkg/webhook/resourcesemantics/validation" + + "knative.dev/eventing-kafka/pkg/apis/bindings" + bindingsv1beta1 "knative.dev/eventing-kafka/pkg/apis/bindings/v1beta1" + "knative.dev/eventing-kafka/pkg/apis/sources" + sourcesv1beta1 "knative.dev/eventing-kafka/pkg/apis/sources/v1beta1" + "knative.dev/eventing-kafka/pkg/source/reconciler/binding" + + kafkasourcedefaultconfig "knative.dev/eventing-kafka/pkg/apis/sources/config" ) const ( @@ -57,6 +60,15 @@ var types = map[schema.GroupVersionKind]resourcesemantics.GenericCRD{ var callbacks = map[schema.GroupVersionKind]validation.Callback{} func NewDefaultingAdmissionController(ctx context.Context, cmw configmap.Watcher) *controller.Impl { + // Decorate contexts with the current state of the config. + kafkaStore := kafkasourcedefaultconfig.NewStore(logging.FromContext(ctx).Named("kafka-source-config-store")) + kafkaStore.WatchConfigs(cmw) + + // Decorate contexts with the current state of the config. + ctxFunc := func(ctx context.Context) context.Context { + return kafkaStore.ToContext(ctx) + } + return defaulting.NewAdmissionController(ctx, // Name of the resource webhook. @@ -69,11 +81,7 @@ func NewDefaultingAdmissionController(ctx context.Context, cmw configmap.Watcher types, // A function that infuses the context passed to Validate/SetDefaults with custom metadata. - func(ctx context.Context) context.Context { - // Here is where you would infuse the context with state - // (e.g. attach a store with configmap data) - return ctx - }, + ctxFunc, // Whether to disallow unknown fields. true, @@ -92,6 +100,7 @@ func NewValidationAdmissionController(ctx context.Context, cmw configmap.Watcher // The resources to validate. types, + // A function that infuses the context passed to Validate/SetDefaults with custom metadata. // A function that infuses the context passed to Validate/SetDefaults with custom metadata. func(ctx context.Context) context.Context { // Here is where you would infuse the context with state diff --git a/config/source/common/configmaps/config-kafka-source-defaults.yaml b/config/source/common/configmaps/config-kafka-source-defaults.yaml new file mode 100644 index 0000000000..a79338e33a --- /dev/null +++ b/config/source/common/configmaps/config-kafka-source-defaults.yaml @@ -0,0 +1,58 @@ +# Copyright 2020 The Knative Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: v1 +kind: ConfigMap +metadata: + name: config-kafka-source-defaults + namespace: knative-eventing + labels: + eventing.knative.dev/release: devel + annotations: + knative.dev/example-checksum: "b6ed351d" +data: + _example: | + ################################ + # # + # EXAMPLE CONFIGURATION # + # # + ################################ + + # This block is not actually functional configuration, + # but serves to illustrate the available configuration + # options and document them in a way that is accessible + # to users that `kubectl edit` this config map. + # + # These sample configuration options may be copied out of + # this example block and unindented to be in the data block + # to actually change the configuration. + + # autoscalingClass is the autoscaler class name to use. + # valid value: keda.autoscaling.knative.dev + # autoscalingClass: "" + + # minScale is the minimum number of replicas to scale down to. + # minScale: "1" + + # maxScale is the maximum number of replicas to scale up to. + # maxScale: "1" + + # pollingInterval is the interval in seconds KEDA uses to poll metrics. + # pollingInterval: "30" + + # cooldownPeriod is the period of time in seconds KEDA waits until it scales down. + # cooldownPeriod: "300" + + # kafkaLagThreshold is the lag (ie. number of messages in a partition) threshold for KEDA to scale up sources. + # kafkaLagThreshold: "10" diff --git a/config/source/multi/400-config-kafka-source-defaults.yaml b/config/source/multi/400-config-kafka-source-defaults.yaml new file mode 120000 index 0000000000..f54c6c367c --- /dev/null +++ b/config/source/multi/400-config-kafka-source-defaults.yaml @@ -0,0 +1 @@ +../common/configmaps/config-kafka-source-defaults.yaml \ No newline at end of file diff --git a/config/source/single/400-config-kafka-source-defaults.yaml b/config/source/single/400-config-kafka-source-defaults.yaml new file mode 120000 index 0000000000..f54c6c367c --- /dev/null +++ b/config/source/single/400-config-kafka-source-defaults.yaml @@ -0,0 +1 @@ +../common/configmaps/config-kafka-source-defaults.yaml \ No newline at end of file diff --git a/hack/tools.go b/hack/tools.go index 1dcec792fa..2c1c282f56 100644 --- a/hack/tools.go +++ b/hack/tools.go @@ -20,6 +20,7 @@ package tools import ( _ "knative.dev/hack" + _ "knative.dev/pkg/configmap/hash-gen" _ "knative.dev/pkg/hack" // Test images from eventing diff --git a/hack/update-checksums.sh b/hack/update-checksums.sh new file mode 100755 index 0000000000..3a23023f9e --- /dev/null +++ b/hack/update-checksums.sh @@ -0,0 +1,29 @@ +#!/usr/bin/env bash + +# Copyright 2020 The Knative Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -o errexit +set -o nounset +set -o pipefail + +export GO111MODULE=on + +if [ -z "${GOPATH:-}" ]; then + export GOPATH=$(go env GOPATH) +fi + +source $(dirname $0)/../vendor/knative.dev/hack/library.sh + +go run "${REPO_ROOT_DIR}/vendor/knative.dev/pkg/configmap/hash-gen" "${REPO_ROOT_DIR}"/config/source/common/configmaps/*.yaml diff --git a/pkg/apis/sources/config/doc.go b/pkg/apis/sources/config/doc.go new file mode 100644 index 0000000000..a0d4319efc --- /dev/null +++ b/pkg/apis/sources/config/doc.go @@ -0,0 +1,21 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// +k8s:deepcopy-gen=package + +// Package config holds the typed objects that define the schemas for +// ConfigMap objects that pertain to our API objects. +package config diff --git a/pkg/apis/sources/config/kafka_defaults.go b/pkg/apis/sources/config/kafka_defaults.go new file mode 100644 index 0000000000..6602f66b76 --- /dev/null +++ b/pkg/apis/sources/config/kafka_defaults.go @@ -0,0 +1,143 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +import ( + "fmt" + "strconv" + + corev1 "k8s.io/api/core/v1" +) + +const ( + // KafkaDefaultsConfigName is the name of config map for the default + // configs that KafkaSource should use. + KafkaDefaultsConfigName = "config-kafka-source-defaults" + + // DefaultAutoscalingClassKey is the name of autoscaler class annotation + DefaultAutoscalingClassKey = "autoscalingClass" + + // DefaultMinScaleKey is the name of the key corresponding to the KEDA minScale annotation + DefaultMinScaleKey = "minScale" + + // DefaultMaxScaleKey is the name of the KEDA maxScale annotation + DefaultMaxScaleKey = "maxScale" + + // DefaultPollingIntervalKey is the name of the KEDA pollingInterval annotation + DefaultPollingIntervalKey = "pollingInterval" + + // DefaultCooldownPeriodKey is the name of the KEDA cooldownPeriod annotation + DefaultCooldownPeriodKey = "cooldownPeriod" + + // DefaultKafkaLagThresholdKey is the name of the KEDA kafkaLagThreshold annotation + DefaultKafkaLagThresholdKey = "kafkaLagThreshold" + + // KedaAutoscalingClass is the class name for KEDA + KedaAutoscalingClass = "keda.autoscaling.knative.dev" + + // DefaultMinScaleValue is the default value for DefaultMinScaleKey + DefaultMinScaleValue = int64(1) + + // DefaultMaxScaleValue is the default value for DefaultMaxScaleKey + DefaultMaxScaleValue = int64(1) + + // DefaultPollingIntervalValue is the default value for DefaultPollingIntervalKey + DefaultPollingIntervalValue = int64(30) + + // DefaultCooldownPeriodValue is the default value for DefaultCooldownPeriodKey + DefaultCooldownPeriodValue = int64(300) + + // DefaultKafkaLagThresholdValue is the default value for DefaultKafkaLagThresholdKey + DefaultKafkaLagThresholdValue = int64(10) +) + +// NewKafkaDefaultsConfigFromMap creates a KafkaSourceDefaults from the supplied Map +func NewKafkaDefaultsConfigFromMap(data map[string]string) (*KafkaSourceDefaults, error) { + nc := &KafkaSourceDefaults{} + + value, present := data[DefaultAutoscalingClassKey] + if !present || value == "" { + return nc, nil + } + if value != "keda.autoscaling.knative.dev" { + return nil, fmt.Errorf("invalid value %q for %s. Only keda.autoscaling.knative.dev is allowed", value, DefaultAutoscalingClassKey) + } + nc.AutoscalingClass = value + + int64Value, err := parseInt64Entry(data, DefaultMinScaleKey, DefaultMinScaleValue) + if err != nil { + return nil, err + } + nc.MinScale = int64Value + + int64Value, err = parseInt64Entry(data, DefaultMaxScaleKey, DefaultMaxScaleValue) + if err != nil { + return nil, err + } + nc.MaxScale = int64Value + + int64Value, err = parseInt64Entry(data, DefaultPollingIntervalKey, DefaultPollingIntervalValue) + if err != nil { + return nil, err + } + nc.PollingInterval = int64Value + + int64Value, err = parseInt64Entry(data, DefaultCooldownPeriodKey, DefaultCooldownPeriodValue) + if err != nil { + return nil, err + } + nc.CooldownPeriod = int64Value + + int64Value, err = parseInt64Entry(data, DefaultKafkaLagThresholdKey, DefaultKafkaLagThresholdValue) + if err != nil { + return nil, err + } + nc.KafkaLagThreshold = int64Value + + return nc, nil +} + +// NewKafkaDefaultsConfigFromConfigMap creates a KafkaSourceDefaults from the supplied configMap +func NewKafkaDefaultsConfigFromConfigMap(config *corev1.ConfigMap) (*KafkaSourceDefaults, error) { + return NewKafkaDefaultsConfigFromMap(config.Data) +} + +type KafkaSourceDefaults struct { + AutoscalingClass string `json:"autoscalingClass,omitempty"` + MinScale int64 `json:"minScale,omitempty"` + MaxScale int64 `json:"maxScale,omitempty"` + PollingInterval int64 `json:"pollingInterval,omitempty"` + CooldownPeriod int64 `json:"cooldownPeriod,omitempty"` + KafkaLagThreshold int64 `json:"kafkaLagThreshold,omitempty"` +} + +func (d *KafkaSourceDefaults) DeepCopy() *KafkaSourceDefaults { + if d == nil { + return nil + } + out := new(KafkaSourceDefaults) + *out = *d + return out +} + +func parseInt64Entry(data map[string]string, key string, defaults int64) (int64, error) { + value, present := data[key] + if !present { + return defaults, nil + } + return strconv.ParseInt(value, 0, 64) +} diff --git a/pkg/apis/sources/config/kafka_defaults_test.go b/pkg/apis/sources/config/kafka_defaults_test.go new file mode 100644 index 0000000000..7436b2ccf8 --- /dev/null +++ b/pkg/apis/sources/config/kafka_defaults_test.go @@ -0,0 +1,274 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + . "knative.dev/pkg/configmap/testing" + "knative.dev/pkg/system" + _ "knative.dev/pkg/system/testing" +) + +func TestNewKafkaDefaultsConfigFromConfigMap(t *testing.T) { + _, example := ConfigMapsFromTestFile(t, KafkaDefaultsConfigName) + if _, err := NewKafkaDefaultsConfigFromConfigMap(example); err != nil { + t.Error("NewKafkaDefaultsConfigFromMap(example) =", err) + } +} + +func TestKafkaDefaultsConfiguration(t *testing.T) { + testCases := []struct { + name string + wantErr bool + wantDefault KafkaSourceDefaults + wantClass string + wantMinScale int64 + wantMaxScale int64 + wantPollingInterval int64 + wantCooldownPeriod int64 + wantKafkaLagThreshold int64 + config *corev1.ConfigMap + }{{ + name: "default config", + wantErr: false, + wantDefault: KafkaSourceDefaults{ + AutoscalingClass: "", + }, + config: &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: system.Namespace(), + Name: KafkaDefaultsConfigName, + }, + Data: map[string]string{}, + }, + }, { + name: "example text", + wantErr: false, + config: &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: system.Namespace(), + Name: KafkaDefaultsConfigName, + }, + Data: map[string]string{ + "_example": ` + ################################ + # # + # EXAMPLE CONFIGURATION # + # # + ################################ + + # This block is not actually functional configuration, + # but serves to illustrate the available configuration + # options and document them in a way that is accessible + # to users that kubectl edit this config map. + # + # These sample configuration options may be copied out of + # this example block and unindented to be in the data block + # to actually change the configuration. + + # autoscalingClass is the autoscaler class name to use. + # valid value: keda.autoscaling.knative.dev + autoscalingClass: "keda.autoscaling.knative.dev" + + # minScale is the minimum number of replicas to scale down to. + # minScale: "1" + + # maxScale is the maximum number of replicas to scale up to. + # maxScale: "1" + + # pollingInterval is the interval in seconds KEDA uses to poll metrics. + # pollingInterval: "30" + + # cooldownPeriod is the period of time in seconds KEDA waits until it scales down. + # cooldownPeriod: "300" + + # kafkaLagThreshold is the lag (ie. number of messages in a partition) threshold for KEDA to scale up sources. + # kafkaLagThreshold: "10" +`, + }, + }, + }, { + name: "valid autoscaler class", + wantErr: false, + wantDefault: KafkaSourceDefaults{ + AutoscalingClass: "keda.autoscaling.knative.dev", + MinScale: DefaultMinScaleValue, + MaxScale: DefaultMaxScaleValue, + PollingInterval: DefaultPollingIntervalValue, + CooldownPeriod: DefaultCooldownPeriodValue, + KafkaLagThreshold: DefaultKafkaLagThresholdValue, + }, + config: &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: system.Namespace(), + Name: KafkaDefaultsConfigName, + }, + Data: map[string]string{ + "autoscalingClass": "keda.autoscaling.knative.dev", + }, + }, + }, { + name: "invalid autoscaler class", + wantErr: true, + config: &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: system.Namespace(), + Name: KafkaDefaultsConfigName, + }, + Data: map[string]string{ + "autoscalingClass": "invalid", + }, + }, + }, { + name: "dangling key/value pair", + wantErr: true, + config: &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: system.Namespace(), + Name: KafkaDefaultsConfigName, + }, + Data: map[string]string{ + "autoscalingClass": "#nothing to see here", + }, + }, + }, { + name: "change minScale default", + wantErr: false, + wantDefault: KafkaSourceDefaults{ + AutoscalingClass: "keda.autoscaling.knative.dev", + MinScale: 40, + MaxScale: DefaultMaxScaleValue, + PollingInterval: DefaultPollingIntervalValue, + CooldownPeriod: DefaultCooldownPeriodValue, + KafkaLagThreshold: DefaultKafkaLagThresholdValue, + }, + config: &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: system.Namespace(), + Name: KafkaDefaultsConfigName, + }, + Data: map[string]string{ + "autoscalingClass": "keda.autoscaling.knative.dev", + "minScale": "40", + }, + }, + }, { + name: "change maxScale default", + wantErr: false, + wantDefault: KafkaSourceDefaults{ + AutoscalingClass: "keda.autoscaling.knative.dev", + MinScale: DefaultMinScaleValue, + MaxScale: 60, + PollingInterval: DefaultPollingIntervalValue, + CooldownPeriod: DefaultCooldownPeriodValue, + KafkaLagThreshold: DefaultKafkaLagThresholdValue, + }, + config: &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: system.Namespace(), + Name: KafkaDefaultsConfigName, + }, + Data: map[string]string{ + "autoscalingClass": "keda.autoscaling.knative.dev", + "maxScale": "60", + }, + }, + }, { + name: "change pollingInterval default", + wantErr: false, + wantDefault: KafkaSourceDefaults{ + AutoscalingClass: "keda.autoscaling.knative.dev", + MinScale: DefaultMinScaleValue, + MaxScale: DefaultMaxScaleValue, + PollingInterval: 500, + CooldownPeriod: DefaultCooldownPeriodValue, + KafkaLagThreshold: DefaultKafkaLagThresholdValue, + }, + config: &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: system.Namespace(), + Name: KafkaDefaultsConfigName, + }, + Data: map[string]string{ + "autoscalingClass": "keda.autoscaling.knative.dev", + "pollingInterval": "500", + }, + }, + }, { + name: "change cooldownPeriod default", + wantErr: false, + wantDefault: KafkaSourceDefaults{ + AutoscalingClass: "keda.autoscaling.knative.dev", + MinScale: DefaultMinScaleValue, + MaxScale: DefaultMaxScaleValue, + PollingInterval: DefaultPollingIntervalValue, + CooldownPeriod: 900, + KafkaLagThreshold: DefaultKafkaLagThresholdValue, + }, + config: &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: system.Namespace(), + Name: KafkaDefaultsConfigName, + }, + Data: map[string]string{ + "autoscalingClass": "keda.autoscaling.knative.dev", + "cooldownPeriod": "900", + }, + }, + }, { + name: "change kafkaLagThreshold default", + wantErr: false, + wantDefault: KafkaSourceDefaults{ + AutoscalingClass: "keda.autoscaling.knative.dev", + MinScale: DefaultMinScaleValue, + MaxScale: DefaultMaxScaleValue, + PollingInterval: DefaultPollingIntervalValue, + CooldownPeriod: DefaultCooldownPeriodValue, + KafkaLagThreshold: 800, + }, + config: &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: system.Namespace(), + Name: KafkaDefaultsConfigName, + }, + Data: map[string]string{ + "autoscalingClass": "keda.autoscaling.knative.dev", + "kafkaLagThreshold": "800", + }, + }, + }} + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + actualDefault, err := NewKafkaDefaultsConfigFromConfigMap(tc.config) + + if (err != nil) != tc.wantErr { + t.Fatalf("Test: %q: NewKafkaDefaultsConfigFromMap() error = %v, wantErr %v", tc.name, err, tc.wantErr) + } + if !tc.wantErr { + if diff := cmp.Diff(tc.wantDefault, *actualDefault); diff != "" { + t.Error("unexpected value (-want, +got)", diff) + } + } + }) + } +} diff --git a/pkg/apis/sources/config/store.go b/pkg/apis/sources/config/store.go new file mode 100644 index 0000000000..1770dca730 --- /dev/null +++ b/pkg/apis/sources/config/store.go @@ -0,0 +1,97 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +import ( + "context" + + "knative.dev/pkg/configmap" +) + +type kafkaCfgKey struct{} + +// Config holds the collection of configurations that we attach to contexts. +// +k8s:deepcopy-gen=false +type Config struct { + KafkaSourceDefaults *KafkaSourceDefaults +} + +// FromContext extracts a Config from the provided context. +func FromContext(ctx context.Context) *Config { + x, ok := ctx.Value(kafkaCfgKey{}).(*Config) + if ok { + return x + } + return nil +} + +// FromContextOrDefaults is like FromContext, but when no Config is attached it +// returns a Config populated with the defaults for each of the Config fields. +func FromContextOrDefaults(ctx context.Context) *Config { + if cfg := FromContext(ctx); cfg != nil { + return cfg + } + kafkaDefaults, err := NewKafkaDefaultsConfigFromMap(map[string]string{}) + if err != nil || kafkaDefaults == nil { + kafkaDefaults = &KafkaSourceDefaults{} + } + x := &Config{ + KafkaSourceDefaults: kafkaDefaults, + } + + return x +} + +// ToContext attaches the provided Config to the provided context, returning the +// new context with the Config attached. +func ToContext(ctx context.Context, c *Config) context.Context { + return context.WithValue(ctx, kafkaCfgKey{}, c) +} + +// Store is a typed wrapper around configmap.Untyped store to handle our configmaps. +// +k8s:deepcopy-gen=false +type Store struct { + *configmap.UntypedStore +} + +//NewStore creates a new store of Configs and optionally calls functions when ConfigMaps are updated. +func NewStore(logger configmap.Logger, onAfterStore ...func(name string, value interface{})) *Store { + store := &Store{ + UntypedStore: configmap.NewUntypedStore( + "kafkadefaults", + logger, + configmap.Constructors{ + KafkaDefaultsConfigName: NewKafkaDefaultsConfigFromConfigMap, + }, + onAfterStore..., + ), + } + + return store +} + +// ToContext attaches the current Config state to the provided context. +func (s *Store) ToContext(ctx context.Context) context.Context { + return ToContext(ctx, s.Load()) +} + +// Load creates a Config from the current config state of the Store. +func (s *Store) Load() *Config { + return &Config{ + KafkaSourceDefaults: s.UntypedLoad(KafkaDefaultsConfigName).(*KafkaSourceDefaults).DeepCopy(), + } +} diff --git a/pkg/apis/sources/config/store_test.go b/pkg/apis/sources/config/store_test.go new file mode 100644 index 0000000000..bb0f8e941e --- /dev/null +++ b/pkg/apis/sources/config/store_test.go @@ -0,0 +1,63 @@ +/* +Copyright 2021 The Knative Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +import ( + "context" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "k8s.io/apimachinery/pkg/api/resource" + logtesting "knative.dev/pkg/logging/testing" + + . "knative.dev/pkg/configmap/testing" +) + +var ignoreStuff = cmp.Options{ + cmpopts.IgnoreUnexported(resource.Quantity{}), +} + +func TestStoreLoadWithContext(t *testing.T) { + store := NewStore(logtesting.TestLogger(t)) + + _, defaultsConfig := ConfigMapsFromTestFile(t, KafkaDefaultsConfigName) + + store.OnConfigChanged(defaultsConfig) + + config := FromContextOrDefaults(store.ToContext(context.Background())) + + t.Run("defaults", func(t *testing.T) { + expected, _ := NewKafkaDefaultsConfigFromConfigMap(defaultsConfig) + if diff := cmp.Diff(expected, config.KafkaSourceDefaults, ignoreStuff...); diff != "" { + t.Error("Unexpected defaults config (-want, +got):", diff) + t.Fatal("Unexpected defaults config (-want, +got):", diff) + } + }) +} + +func TestStoreLoadWithContextOrDefaults(t *testing.T) { + defaultsConfig := ConfigMapFromTestFile(t, KafkaDefaultsConfigName) + config := FromContextOrDefaults(context.Background()) + + t.Run("defaults", func(t *testing.T) { + expected, _ := NewKafkaDefaultsConfigFromConfigMap(defaultsConfig) + if diff := cmp.Diff(expected, config.KafkaSourceDefaults, ignoreStuff...); diff != "" { + t.Error("Unexpected defaults config (-want, +got):", diff) + } + }) +} diff --git a/pkg/apis/sources/config/testdata/config-kafka-source-defaults.yaml b/pkg/apis/sources/config/testdata/config-kafka-source-defaults.yaml new file mode 100644 index 0000000000..72b605fed6 --- /dev/null +++ b/pkg/apis/sources/config/testdata/config-kafka-source-defaults.yaml @@ -0,0 +1,56 @@ +# Copyright 2020 The Knative Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: v1 +kind: ConfigMap +metadata: + name: config-kafka-source-defaults + namespace: knative-eventing +data: + _example: | + ################################ + # # + # EXAMPLE CONFIGURATION # + # # + ################################ + + # This block is not actually functional configuration, + # but serves to illustrate the available configuration + # options and document them in a way that is accessible + # to users that `kubectl edit` this config map. + # + # These sample configuration options may be copied out of + # this example block and unindented to be in the data block + # to actually change the configuration. + + # autoscalingClass is the autoscaler class name to use. + # valid value: keda.autoscaling.knative.dev + autoscalingClass: "keda.autoscaling.knative.dev" + + # minScale is the minimum number of replicas to scale down to. + # minScale: "1" + + # maxScale is the maximum number of replicas to scale up to. + # maxScale: "1" + + # pollingInterval is the interval in seconds KEDA uses to poll metrics. + # pollingInterval: "30" + + # cooldownPeriod is the period of time in seconds KEDA waits until it scales down. + # cooldownPeriod: "300" + + # kafkaLagThreshold is the lag (ie. number of messages in a partition) threshold for KEDA to scale up sources. + # kafkaLagThreshold: "10" + + diff --git a/pkg/apis/sources/v1beta1/kafka_defaults.go b/pkg/apis/sources/v1beta1/kafka_defaults.go index a09416fd6b..241aeca892 100644 --- a/pkg/apis/sources/v1beta1/kafka_defaults.go +++ b/pkg/apis/sources/v1beta1/kafka_defaults.go @@ -18,13 +18,23 @@ package v1beta1 import ( "context" + "strconv" "github.com/google/uuid" "k8s.io/utils/pointer" + + "knative.dev/eventing-kafka/pkg/apis/sources/config" ) const ( uuidPrefix = "knative-kafka-source-" + + classAnnotation = "autoscaling.knative.dev/class" + minScaleAnnotation = "autoscaling.knative.dev/minScale" + maxScaleAnnotation = "autoscaling.knative.dev/maxScale" + pollingIntervalAnnotation = "keda.autoscaling.knative.dev/pollingInterval" + cooldownPeriodAnnotation = "keda.autoscaling.knative.dev/cooldownPeriod" + kafkaLagThresholdAnnotation = "keda.autoscaling.knative.dev/kafkaLagThreshold" ) // SetDefaults ensures KafkaSource reflects the default values. @@ -40,4 +50,20 @@ func (k *KafkaSource) SetDefaults(ctx context.Context) { if k.Spec.InitialOffset == "" { k.Spec.InitialOffset = OffsetLatest } + + kafkaConfig := config.FromContextOrDefaults(ctx) + kafkaDefaults := kafkaConfig.KafkaSourceDefaults + if kafkaDefaults.AutoscalingClass == config.KedaAutoscalingClass { + if k.Annotations == nil { + k.Annotations = map[string]string{} + } + k.Annotations[classAnnotation] = kafkaDefaults.AutoscalingClass + + // Set all annotations regardless of defaults + k.Annotations[minScaleAnnotation] = strconv.FormatInt(kafkaDefaults.MinScale, 10) + k.Annotations[maxScaleAnnotation] = strconv.FormatInt(kafkaDefaults.MaxScale, 10) + k.Annotations[pollingIntervalAnnotation] = strconv.FormatInt(kafkaDefaults.PollingInterval, 10) + k.Annotations[cooldownPeriodAnnotation] = strconv.FormatInt(kafkaDefaults.CooldownPeriod, 10) + k.Annotations[kafkaLagThresholdAnnotation] = strconv.FormatInt(kafkaDefaults.KafkaLagThreshold, 10) + } } diff --git a/pkg/apis/sources/v1beta1/kafka_defaults_test.go b/pkg/apis/sources/v1beta1/kafka_defaults_test.go index fa36bcb0dc..121df51fcc 100644 --- a/pkg/apis/sources/v1beta1/kafka_defaults_test.go +++ b/pkg/apis/sources/v1beta1/kafka_defaults_test.go @@ -25,81 +25,112 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/uuid" "k8s.io/utils/pointer" + "knative.dev/eventing-kafka/pkg/apis/sources/config" ) +type assertFnType func(t *testing.T, ks KafkaSource, expected interface{}) + type defaultKafkaTestArgs struct { - Name string - Initial KafkaSource - Expected string - AssertFunc func(t *testing.T, ks KafkaSource, expected string) + Name string + Defaults config.KafkaSourceDefaults + Initial KafkaSource + Expected interface{} + AssertFuncs []assertFnType } func TestSetDefaults(t *testing.T) { - assertUUID := func(t *testing.T, ks KafkaSource, expected string) { + assertUUID := func(t *testing.T, ks KafkaSource, expected interface{}) { consumerGroup := strings.Split(ks.Spec.ConsumerGroup, uuidPrefix) _, err := uuid.Parse(consumerGroup[len(consumerGroup)-1]) if err != nil { t.Fatalf("Error Parsing UUID value: %s", err) } } - assertGivenGroup := func(t *testing.T, ks KafkaSource, expected string) { + assertGivenGroup := func(t *testing.T, ks KafkaSource, expected interface{}) { if diff := cmp.Diff(ks.Spec.ConsumerGroup, expected); diff != "" { t.Fatalf("Unexpected consumerGroup Set (-want, +got): %s", diff) } } - assertConsumers := func(t *testing.T, ks KafkaSource, expected string) { - i, _ := strconv.Atoi(expected) + assertConsumers := func(t *testing.T, ks KafkaSource, expected interface{}) { + i, _ := strconv.Atoi(expected.(string)) i32 := int32(i) if diff := cmp.Diff(ks.Spec.Consumers, &i32); diff != "" { t.Fatalf("Unexpected consumers (-want, +got): %s", diff) } } - assertOffset := func(t *testing.T, ks KafkaSource, expected string) { + assertOffset := func(t *testing.T, ks KafkaSource, expected interface{}) { if diff := cmp.Diff(ks.Spec.InitialOffset, OffsetLatest); diff != "" { t.Fatalf("Unexpected initial offset (-want, +got): %s", diff) } } + assertNoAnnotations := func(t *testing.T, ks KafkaSource, expected interface{}) { + if len(ks.Annotations) != 0 { + t.Fatalf("Unexpected annotations: %v", ks.Annotations) + } + } + assertAnnotations := func(t *testing.T, ks KafkaSource, expected interface{}) { + if diff := cmp.Diff(ks.Annotations, expected); diff != "" { + t.Fatalf("Unexpected annotations (-want, +got): %s", diff) + } + } testCases := []defaultKafkaTestArgs{ { - Name: "nil spec", - Initial: KafkaSource{}, - AssertFunc: assertUUID, - }, - { + Name: "nil spec", + Initial: KafkaSource{}, + AssertFuncs: []assertFnType{assertUUID, assertNoAnnotations}, + }, { Name: "Set consumerGroup", Initial: KafkaSource{ Spec: KafkaSourceSpec{ ConsumerGroup: "foo", }, }, - Expected: "foo", - AssertFunc: assertGivenGroup, - }, - { - Name: "consumers not set", - Initial: KafkaSource{}, - Expected: "1", - AssertFunc: assertConsumers, - }, - { - Name: "consumers set", - Initial: KafkaSource{Spec: KafkaSourceSpec{Consumers: pointer.Int32Ptr(4)}}, - Expected: "4", - AssertFunc: assertConsumers, - }, - { - Name: "offset set", - Initial: KafkaSource{}, - Expected: "", - AssertFunc: assertOffset, + Expected: "foo", + AssertFuncs: []assertFnType{assertGivenGroup, assertNoAnnotations}, + }, { + Name: "consumers not set", + Initial: KafkaSource{}, + Expected: "1", + AssertFuncs: []assertFnType{assertConsumers, assertNoAnnotations}, + }, { + Name: "consumers set", + Initial: KafkaSource{Spec: KafkaSourceSpec{Consumers: pointer.Int32Ptr(4)}}, + Expected: "4", + AssertFuncs: []assertFnType{assertConsumers, assertNoAnnotations}, + }, { + Name: "offset set", + Initial: KafkaSource{}, + Expected: "", + AssertFuncs: []assertFnType{assertOffset, assertNoAnnotations}, + }, { + Name: "autoscaling config", + Defaults: config.KafkaSourceDefaults{ + AutoscalingClass: "keda.autoscaling.knative.dev", + MinScale: 40, + MaxScale: 60, + PollingInterval: 500, + CooldownPeriod: 4000, + KafkaLagThreshold: 100, + }, + Initial: KafkaSource{}, + Expected: map[string]string{ + classAnnotation: "keda.autoscaling.knative.dev", + minScaleAnnotation: "40", + maxScaleAnnotation: "60", + pollingIntervalAnnotation: "500", + cooldownPeriodAnnotation: "4000", + kafkaLagThresholdAnnotation: "100", + }, + AssertFuncs: []assertFnType{assertAnnotations}, }, } for _, tc := range testCases { t.Run(tc.Name, func(t *testing.T) { - tc.Initial.SetDefaults(context.TODO()) - if tc.AssertFunc != nil { - tc.AssertFunc(t, tc.Initial, tc.Expected) + ctx := config.ToContext(context.Background(), &config.Config{KafkaSourceDefaults: &tc.Defaults}) + tc.Initial.SetDefaults(ctx) + for _, assertFunc := range tc.AssertFuncs { + assertFunc(t, tc.Initial, tc.Expected) } }) } diff --git a/vendor/knative.dev/pkg/configmap/hash-gen/main.go b/vendor/knative.dev/pkg/configmap/hash-gen/main.go new file mode 100644 index 0000000000..5846167f98 --- /dev/null +++ b/vendor/knative.dev/pkg/configmap/hash-gen/main.go @@ -0,0 +1,141 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "bytes" + "errors" + "fmt" + "io/ioutil" + "log" + "os" + + "gopkg.in/yaml.v3" + "knative.dev/pkg/configmap" +) + +func main() { + for _, fileName := range os.Args[1:] { + if err := processFile(fileName); err != nil { + log.Fatal(err) + } + } +} + +// processFile reads the ConfigMap manifest from a file and adds or updates the label +// containing the checksum of it's _example data if present. +func processFile(fileName string) error { + in, err := ioutil.ReadFile(fileName) + if err != nil { + return fmt.Errorf("failed to read file: %w", err) + } + + out, err := process(in) + if out == nil || err != nil { + return err + } + + //nolint:gosec // This is not security critical so open permissions are fine. + if err := ioutil.WriteFile(fileName, out, 0644); err != nil { + return fmt.Errorf("failed to write file: %w", err) + } + return nil +} + +// process processes a YAML file's bytes and adds or updates the label containing +// the checksum of it's _example data if present. +func process(data []byte) ([]byte, error) { + var doc yaml.Node + if err := yaml.Unmarshal(data, &doc); err != nil { + return nil, fmt.Errorf("failed to parse YAML: %w", err) + } + content := doc.Content[0] + + example := traverse(content, "data", configmap.ExampleKey) + if example == nil { + return nil, nil + } + + metadata := traverse(content, "metadata") + if metadata == nil { + return nil, errors.New("'metadata' not found") + } + + annotations := traverse(metadata, "annotations") + if annotations == nil { + annotations = &yaml.Node{Kind: yaml.MappingNode} + metadata.Content = append(metadata.Content, strNode("annotations"), annotations) + } + + checksum := configmap.Checksum(example.Value) + existingAnnotation := value(annotations, configmap.ExampleChecksumAnnotation) + if existingAnnotation != nil { + existingAnnotation.Value = checksum + } else { + sumNode := strNode(checksum) + sumNode.Style = yaml.DoubleQuotedStyle + annotations.Content = append(annotations.Content, + strNode(configmap.ExampleChecksumAnnotation), sumNode) + } + + var buffer bytes.Buffer + buffer.Grow(len(data)) + encoder := yaml.NewEncoder(&buffer) + encoder.SetIndent(2) + if err := encoder.Encode(&doc); err != nil { + return nil, fmt.Errorf("failed to encode YAML: %w", err) + } + return buffer.Bytes(), nil +} + +// traverse traverses the YAML nodes' children using the given path keys. Returns nil if +// one of the keys can't be found. +func traverse(parent *yaml.Node, path ...string) *yaml.Node { + if parent == nil || len(path) == 0 { + return parent + } + tail := path[1:] + child := value(parent, path[0]) + return traverse(child, tail...) +} + +// value returns the value of the current node under the given key. Returns nil if not +// found. +func value(parent *yaml.Node, key string) *yaml.Node { + for i := range parent.Content { + if parent.Content[i].Value == key { + // The Content array of a yaml.Node is just an array of more nodes. The keys + // to each field are therefore a string node right before the relevant field's + // value, hence we need to return the next value of the array to retrieve + // the value under a key. + if len(parent.Content) < i+2 { + return nil + } + return parent.Content[i+1] + } + } + return nil +} + +// strNode generate a node that forces the representation to be a string. +func strNode(value string) *yaml.Node { + return &yaml.Node{ + Kind: yaml.ScalarNode, + Tag: "!!str", + Value: value, + } +} diff --git a/vendor/knative.dev/pkg/configmap/testing/configmap.go b/vendor/knative.dev/pkg/configmap/testing/configmap.go new file mode 100644 index 0000000000..909036892a --- /dev/null +++ b/vendor/knative.dev/pkg/configmap/testing/configmap.go @@ -0,0 +1,109 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + "fmt" + "io/ioutil" + "strings" + "testing" + "unicode" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/sets" + "knative.dev/pkg/configmap" + "sigs.k8s.io/yaml" +) + +// ConfigMapFromTestFile creates a v1.ConfigMap from a YAML file +// It loads the YAML file from the testdata folder. +func ConfigMapFromTestFile(t testing.TB, name string, allowed ...string) *corev1.ConfigMap { + t.Helper() + + cm, _ := ConfigMapsFromTestFile(t, name, allowed...) + return cm +} + +// ConfigMapsFromTestFile creates two corev1.ConfigMap resources from the config +// file read from the testdata directory: +// 1. The raw configmap read in. +// 2. A second version of the configmap augmenting `data:` with what's parsed from the value of `_example:` +func ConfigMapsFromTestFile(t testing.TB, name string, allowed ...string) (*corev1.ConfigMap, *corev1.ConfigMap) { + t.Helper() + + b, err := ioutil.ReadFile(fmt.Sprintf("testdata/%s.yaml", name)) + if err != nil { + t.Fatal("ReadFile() =", err) + } + + var orig corev1.ConfigMap + + // Use sigs.k8s.io/yaml since it reads json struct + // tags so things unmarshal properly. + if err := yaml.Unmarshal(b, &orig); err != nil { + t.Fatal("yaml.Unmarshal() =", err) + } + + // We expect each of the allowed keys, and a key holding an example + // configuration for us to validate. + allowed = append(allowed, configmap.ExampleKey) + + if len(orig.Data) != len(allowed) { + // See here for why we only check in empty ConfigMaps: + // https://github.com/knative/serving/issues/2668 + t.Errorf("Data = %v, wanted %v", orig.Data, allowed) + } + allow := sets.NewString(allowed...) + for key := range orig.Data { + if !allow.Has(key) { + t.Errorf("Encountered key %q in %q that wasn't on the allowed list", key, name) + } + } + // With the length and membership checks, we know that the keyspace matches. + + exampleBody, hasExampleBody := orig.Data[configmap.ExampleKey] + // Check that exampleBody does not have lines that end in a trailing space. + for i, line := range strings.Split(exampleBody, "\n") { + if strings.TrimRightFunc(line, unicode.IsSpace) != line { + t.Errorf("line %d of %q example contains trailing spaces", i, name) + } + } + + // Check that the hashed exampleBody matches the assigned annotation, if present. + gotChecksum, hasExampleChecksumAnnotation := orig.Annotations[configmap.ExampleChecksumAnnotation] + if hasExampleBody && hasExampleChecksumAnnotation { + wantChecksum := configmap.Checksum(exampleBody) + if gotChecksum != wantChecksum { + t.Errorf("example checksum annotation = %s, want %s (you may need to re-run ./hack/update-codegen.sh)", gotChecksum, wantChecksum) + } + } + + // Parse exampleBody into exemplar.Data. + exemplar := orig.DeepCopy() + if err := yaml.Unmarshal([]byte(exampleBody), &exemplar.Data); err != nil { + t.Fatal("yaml.Unmarshal() =", err) + } + // Augment the sample with actual configuration. + for k, v := range orig.Data { + if _, ok := exemplar.Data[k]; ok { + continue + } + exemplar.Data[k] = v + } + + return &orig, exemplar +} diff --git a/vendor/modules.txt b/vendor/modules.txt index eeff4775ef..2b2bd0b206 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1283,7 +1283,9 @@ knative.dev/pkg/codegen/cmd/injection-gen knative.dev/pkg/codegen/cmd/injection-gen/args knative.dev/pkg/codegen/cmd/injection-gen/generators knative.dev/pkg/configmap +knative.dev/pkg/configmap/hash-gen knative.dev/pkg/configmap/informer +knative.dev/pkg/configmap/testing knative.dev/pkg/controller knative.dev/pkg/environment knative.dev/pkg/hack