Skip to content
This repository has been archived by the owner on Sep 2, 2024. It is now read-only.

Add Kafka Source defaults, starting with KEDA annotations #855

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 20 additions & 14 deletions cmd/source/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,29 @@ import (
"context"
"os"

"knative.dev/eventing-kafka/pkg/apis/bindings"
"knative.dev/eventing-kafka/pkg/apis/sources"
"knative.dev/pkg/webhook/resourcesemantics/conversion"

"k8s.io/apimachinery/pkg/runtime/schema"

bindingsv1beta1 "knative.dev/eventing-kafka/pkg/apis/bindings/v1beta1"
sourcesv1beta1 "knative.dev/eventing-kafka/pkg/apis/sources/v1beta1"

"knative.dev/eventing-kafka/pkg/source/reconciler/binding"
"knative.dev/eventing-kafka/pkg/source/reconciler/source"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/injection"
"knative.dev/pkg/injection/sharedmain"
"knative.dev/pkg/logging"
"knative.dev/pkg/signals"
"knative.dev/pkg/webhook"
"knative.dev/pkg/webhook/certificates"
"knative.dev/pkg/webhook/psbinding"
"knative.dev/pkg/webhook/resourcesemantics"
"knative.dev/pkg/webhook/resourcesemantics/conversion"
"knative.dev/pkg/webhook/resourcesemantics/defaulting"
"knative.dev/pkg/webhook/resourcesemantics/validation"

"knative.dev/eventing-kafka/pkg/apis/bindings"
bindingsv1beta1 "knative.dev/eventing-kafka/pkg/apis/bindings/v1beta1"
"knative.dev/eventing-kafka/pkg/apis/sources"
kafkasourcedefaultconfig "knative.dev/eventing-kafka/pkg/apis/sources/config"
sourcesv1beta1 "knative.dev/eventing-kafka/pkg/apis/sources/v1beta1"
"knative.dev/eventing-kafka/pkg/source/reconciler/binding"
"knative.dev/eventing-kafka/pkg/source/reconciler/source"
)

const (
Expand All @@ -57,6 +58,15 @@ var types = map[schema.GroupVersionKind]resourcesemantics.GenericCRD{
var callbacks = map[schema.GroupVersionKind]validation.Callback{}

func NewDefaultingAdmissionController(ctx context.Context, cmw configmap.Watcher) *controller.Impl {
// Decorate contexts with the current state of the config.
kafkaStore := kafkasourcedefaultconfig.NewStore(logging.FromContext(ctx).Named("kafka-source-config-store"))
kafkaStore.WatchConfigs(cmw)

// Decorate contexts with the current state of the config.
ctxFunc := func(ctx context.Context) context.Context {
return kafkaStore.ToContext(ctx)
}

return defaulting.NewAdmissionController(ctx,

// Name of the resource webhook.
Expand All @@ -69,11 +79,7 @@ func NewDefaultingAdmissionController(ctx context.Context, cmw configmap.Watcher
types,

// A function that infuses the context passed to Validate/SetDefaults with custom metadata.
func(ctx context.Context) context.Context {
// Here is where you would infuse the context with state
// (e.g. attach a store with configmap data)
return ctx
},
ctxFunc,

// Whether to disallow unknown fields.
true,
Expand Down
31 changes: 20 additions & 11 deletions cmd/source/mtcontroller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,31 @@ import (
"context"
"os"

"knative.dev/eventing-kafka/pkg/apis/bindings"
"knative.dev/eventing-kafka/pkg/apis/sources"
"knative.dev/pkg/webhook/resourcesemantics/conversion"

"k8s.io/apimachinery/pkg/runtime/schema"

bindingsv1beta1 "knative.dev/eventing-kafka/pkg/apis/bindings/v1beta1"
sourcesv1beta1 "knative.dev/eventing-kafka/pkg/apis/sources/v1beta1"

"knative.dev/eventing-kafka/pkg/source/reconciler/binding"
source "knative.dev/eventing-kafka/pkg/source/reconciler/mtsource"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/injection"
"knative.dev/pkg/injection/sharedmain"
"knative.dev/pkg/logging"
"knative.dev/pkg/signals"
"knative.dev/pkg/webhook"
"knative.dev/pkg/webhook/certificates"
"knative.dev/pkg/webhook/psbinding"
"knative.dev/pkg/webhook/resourcesemantics"
"knative.dev/pkg/webhook/resourcesemantics/defaulting"
"knative.dev/pkg/webhook/resourcesemantics/validation"

"knative.dev/eventing-kafka/pkg/apis/bindings"
bindingsv1beta1 "knative.dev/eventing-kafka/pkg/apis/bindings/v1beta1"
"knative.dev/eventing-kafka/pkg/apis/sources"
sourcesv1beta1 "knative.dev/eventing-kafka/pkg/apis/sources/v1beta1"
"knative.dev/eventing-kafka/pkg/source/reconciler/binding"

kafkasourcedefaultconfig "knative.dev/eventing-kafka/pkg/apis/sources/config"
)

const (
Expand All @@ -57,6 +60,15 @@ var types = map[schema.GroupVersionKind]resourcesemantics.GenericCRD{
var callbacks = map[schema.GroupVersionKind]validation.Callback{}

func NewDefaultingAdmissionController(ctx context.Context, cmw configmap.Watcher) *controller.Impl {
// Decorate contexts with the current state of the config.
kafkaStore := kafkasourcedefaultconfig.NewStore(logging.FromContext(ctx).Named("kafka-source-config-store"))
kafkaStore.WatchConfigs(cmw)

// Decorate contexts with the current state of the config.
ctxFunc := func(ctx context.Context) context.Context {
return kafkaStore.ToContext(ctx)
}

return defaulting.NewAdmissionController(ctx,

// Name of the resource webhook.
Expand All @@ -69,11 +81,7 @@ func NewDefaultingAdmissionController(ctx context.Context, cmw configmap.Watcher
types,

// A function that infuses the context passed to Validate/SetDefaults with custom metadata.
func(ctx context.Context) context.Context {
// Here is where you would infuse the context with state
// (e.g. attach a store with configmap data)
return ctx
},
ctxFunc,

// Whether to disallow unknown fields.
true,
Expand All @@ -92,6 +100,7 @@ func NewValidationAdmissionController(ctx context.Context, cmw configmap.Watcher
// The resources to validate.
types,

// A function that infuses the context passed to Validate/SetDefaults with custom metadata.
// A function that infuses the context passed to Validate/SetDefaults with custom metadata.
func(ctx context.Context) context.Context {
// Here is where you would infuse the context with state
Expand Down
58 changes: 58 additions & 0 deletions config/source/common/configmaps/config-kafka-source-defaults.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Copyright 2020 The Knative Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: v1
kind: ConfigMap
metadata:
name: config-kafka-source-defaults
namespace: knative-eventing
labels:
eventing.knative.dev/release: devel
annotations:
knative.dev/example-checksum: "b6ed351d"
data:
_example: |
################################
# #
# EXAMPLE CONFIGURATION #
# #
################################

# This block is not actually functional configuration,
# but serves to illustrate the available configuration
# options and document them in a way that is accessible
# to users that `kubectl edit` this config map.
#
# These sample configuration options may be copied out of
# this example block and unindented to be in the data block
# to actually change the configuration.

# autoscalingClass is the autoscaler class name to use.
# valid value: keda.autoscaling.knative.dev
# autoscalingClass: ""

# minScale is the minimum number of replicas to scale down to.
# minScale: "1"

# maxScale is the maximum number of replicas to scale up to.
# maxScale: "1"

# pollingInterval is the interval in seconds KEDA uses to poll metrics.
# pollingInterval: "30"

# cooldownPeriod is the period of time in seconds KEDA waits until it scales down.
# cooldownPeriod: "300"

# kafkaLagThreshold is the lag (ie. number of messages in a partition) threshold for KEDA to scale up sources.
# kafkaLagThreshold: "10"
1 change: 1 addition & 0 deletions config/source/multi/400-config-kafka-source-defaults.yaml
1 change: 1 addition & 0 deletions config/source/single/400-config-kafka-source-defaults.yaml
1 change: 1 addition & 0 deletions hack/tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package tools

import (
_ "knative.dev/hack"
_ "knative.dev/pkg/configmap/hash-gen"
_ "knative.dev/pkg/hack"

// Test images from eventing
Expand Down
29 changes: 29 additions & 0 deletions hack/update-checksums.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#!/usr/bin/env bash

# Copyright 2020 The Knative Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -o errexit
set -o nounset
set -o pipefail

export GO111MODULE=on

if [ -z "${GOPATH:-}" ]; then
export GOPATH=$(go env GOPATH)
fi

source $(dirname $0)/../vendor/knative.dev/hack/library.sh

go run "${REPO_ROOT_DIR}/vendor/knative.dev/pkg/configmap/hash-gen" "${REPO_ROOT_DIR}"/config/source/common/configmaps/*.yaml
21 changes: 21 additions & 0 deletions pkg/apis/sources/config/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
Copyright 2020 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// +k8s:deepcopy-gen=package

// Package config holds the typed objects that define the schemas for
// ConfigMap objects that pertain to our API objects.
package config
Loading