Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use triggerv2 controller in shared main #3558

Merged
merged 40 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
cd2cbaa
Use triggerv2 controller in shared main
Cali0707 Jan 2, 2024
38db821
fixed broker deployment after v2 migration
Cali0707 Jan 2, 2024
52a46c8
updated consumer to properly reconcile new trigger filters
Cali0707 Jan 3, 2024
5697b7e
have namespaced broker reconcile data plane with stateful sets
Cali0707 Jan 3, 2024
f61df5b
fix permissions for statefulset
Cali0707 Jan 4, 2024
cc05ef5
fix error message
Cali0707 Jan 4, 2024
c6e70c4
update codegen
Cali0707 Jan 4, 2024
1c42c98
deploy the kafka-broker-receiver as a stateful set too
Cali0707 Jan 5, 2024
87a75ba
added broker migration post install job
Cali0707 Jan 5, 2024
f3304da
fix secret parsing for consumergroups
Cali0707 Jan 5, 2024
e0d8737
fix clusterrole for post install job
Cali0707 Jan 5, 2024
5bbb403
fix imports
Cali0707 Jan 5, 2024
528f899
correctly set consumer egress auth depending on secret format
Cali0707 Jan 8, 2024
107f00f
fix namespaced broker reconciliation of statefulsets
Cali0707 Jan 11, 2024
cf3820d
fix unit tests
Cali0707 Jan 11, 2024
426ee95
fix linter error
Cali0707 Jan 11, 2024
104c3bc
hack: set blockOwnerDeletion to false on consumergroups
Cali0707 Jan 16, 2024
5dcc93a
merged main
Cali0707 Jan 24, 2024
b25d2e8
Merge branch 'main' into use-triggerv2
Cali0707 Feb 9, 2024
e4a081f
fix: fixed consumergroup blockownerdeletion
Cali0707 Feb 9, 2024
a765040
hack: print cg in trigger status when cg is not ready
Cali0707 Feb 20, 2024
5aac963
Merge branch 'main' into use-triggerv2
Cali0707 Feb 21, 2024
8b4aba0
fixed merge conflict
Cali0707 Mar 4, 2024
70c1a38
Merge branch 'main' into use-triggerv2
Cali0707 Mar 12, 2024
ecc2285
fix: use kafka-broker-brokers-triggers for receiver cm
Cali0707 Mar 13, 2024
4b961a1
test fix: broker cm watcher watches correct cm
Cali0707 Mar 13, 2024
8424ad2
fix: OIDC works for triggerv2
Cali0707 Mar 13, 2024
1f84e84
use filtered informer
Cali0707 Mar 14, 2024
820e7b1
use filtered global resync for triggerv2
Cali0707 Mar 14, 2024
7fce2dc
maybe get more logs
Cali0707 Mar 14, 2024
3e76cbd
fix: upgrade tests cleanup triggerv2 resources
Cali0707 Mar 14, 2024
4b3776d
ignore not found error
Cali0707 Mar 15, 2024
9560405
upgrade tests fixes
Cali0707 Mar 15, 2024
cdd2d62
add logs to upgrade tests
Cali0707 Mar 18, 2024
e5aee80
fixed unit tests
Cali0707 Mar 18, 2024
9393f75
fix upgrade logger setup
Cali0707 Mar 18, 2024
9c54267
goimports
Cali0707 Mar 18, 2024
720bdef
fix linter error
Cali0707 Mar 18, 2024
d12c18f
use filtered global resync for namespaced triggers as well
Cali0707 Apr 2, 2024
8b07dc2
fix: triggerv2 supports cg id templates
Cali0707 Apr 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion control-plane/cmd/kafka-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
"knative.dev/pkg/injection/sharedmain"
"knative.dev/pkg/signals"

"knative.dev/eventing/pkg/auth"
"knative.dev/eventing/pkg/eventingtls"

"knative.dev/eventing-kafka-broker/control-plane/pkg/config"
Expand All @@ -38,6 +39,7 @@
"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/sink"
"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/source"
"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/trigger"
triggerv2 "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/trigger/v2"
)

const (
Expand All @@ -64,6 +66,7 @@
ctx := signals.NewContext()
ctx = filteredFactory.WithSelectors(ctx,
eventingtls.TrustBundleLabelSelector,
auth.OIDCLabelSelector,

Check warning on line 69 in control-plane/cmd/kafka-controller/main.go

View check run for this annotation

Codecov / codecov/patch

control-plane/cmd/kafka-controller/main.go#L69

Added line #L69 was not covered by tests
)
ctx = clientpool.WithKafkaClientPool(ctx)

Expand All @@ -81,7 +84,7 @@
injection.NamedControllerConstructor{
Name: "trigger-controller",
ControllerConstructor: func(ctx context.Context, watcher configmap.Watcher) *controller.Impl {
return trigger.NewController(ctx, watcher, brokerEnv)
return triggerv2.NewController(ctx, watcher, brokerEnv)

Check warning on line 87 in control-plane/cmd/kafka-controller/main.go

View check run for this annotation

Codecov / codecov/patch

control-plane/cmd/kafka-controller/main.go#L87

Added line #L87 was not covered by tests
},
},

Expand Down
78 changes: 78 additions & 0 deletions control-plane/cmd/post-install/kafka_broker_deployment_deleter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright 2024 The Knative Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package main

import (
"context"
"fmt"
"time"

apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"knative.dev/pkg/system"
)

type kafkaDeploymentDeleter struct {
k8s kubernetes.Interface
}

func (k *kafkaDeploymentDeleter) DeleteBrokerDeployments(ctx context.Context) error {
deployments := []string{
"kafka-broker-receiver",
"kafka-broker-dispatcher",

Check warning on line 38 in control-plane/cmd/post-install/kafka_broker_deployment_deleter.go

View check run for this annotation

Codecov / codecov/patch

control-plane/cmd/post-install/kafka_broker_deployment_deleter.go#L35-L38

Added lines #L35 - L38 were not covered by tests
}

for _, deployment := range deployments {
if err := k.deleteDeployment(ctx, deployment); err != nil {
return fmt.Errorf("failed to delete deployment %s: %v", deployment, err)

Check warning on line 43 in control-plane/cmd/post-install/kafka_broker_deployment_deleter.go

View check run for this annotation

Codecov / codecov/patch

control-plane/cmd/post-install/kafka_broker_deployment_deleter.go#L41-L43

Added lines #L41 - L43 were not covered by tests
}
}

return nil

Check warning on line 47 in control-plane/cmd/post-install/kafka_broker_deployment_deleter.go

View check run for this annotation

Codecov / codecov/patch

control-plane/cmd/post-install/kafka_broker_deployment_deleter.go#L47

Added line #L47 was not covered by tests
}

func (k *kafkaDeploymentDeleter) deleteDeployment(ctx context.Context, deploymentName string) error {
err := k.waiteStatefulSetExists(ctx, deploymentName)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo waite.... unless it's a word I don't know about

if err != nil {
return fmt.Errorf("failed while waiting for statefulset to come up: %w", err)

Check warning on line 53 in control-plane/cmd/post-install/kafka_broker_deployment_deleter.go

View check run for this annotation

Codecov / codecov/patch

control-plane/cmd/post-install/kafka_broker_deployment_deleter.go#L50-L53

Added lines #L50 - L53 were not covered by tests
}

err = k.k8s.
AppsV1().
Deployments(system.Namespace()).
Delete(ctx, deploymentName, metav1.DeleteOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return fmt.Errorf("failed to delete deployment %s/%s: %w", system.Namespace(), deploymentName, err)

Check warning on line 61 in control-plane/cmd/post-install/kafka_broker_deployment_deleter.go

View check run for this annotation

Codecov / codecov/patch

control-plane/cmd/post-install/kafka_broker_deployment_deleter.go#L56-L61

Added lines #L56 - L61 were not covered by tests
}

return nil

Check warning on line 64 in control-plane/cmd/post-install/kafka_broker_deployment_deleter.go

View check run for this annotation

Codecov / codecov/patch

control-plane/cmd/post-install/kafka_broker_deployment_deleter.go#L64

Added line #L64 was not covered by tests
}

func (k *kafkaDeploymentDeleter) waiteStatefulSetExists(ctx context.Context, statefulSetName string) error {
return wait.PollUntilContextTimeout(ctx, 10*time.Second, 10*time.Minute, false, func(ctx context.Context) (done bool, err error) {
_, err = k.k8s.AppsV1().StatefulSets(system.Namespace()).Get(ctx, statefulSetName, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
return false, nil

Check warning on line 71 in control-plane/cmd/post-install/kafka_broker_deployment_deleter.go

View check run for this annotation

Codecov / codecov/patch

control-plane/cmd/post-install/kafka_broker_deployment_deleter.go#L67-L71

Added lines #L67 - L71 were not covered by tests
}
if err != nil {
return false, fmt.Errorf("failed to get statefulset %s/%s: %w", system.Namespace(), statefulSetName, err)

Check warning on line 74 in control-plane/cmd/post-install/kafka_broker_deployment_deleter.go

View check run for this annotation

Codecov / codecov/patch

control-plane/cmd/post-install/kafka_broker_deployment_deleter.go#L73-L74

Added lines #L73 - L74 were not covered by tests
}
return true, nil

Check warning on line 76 in control-plane/cmd/post-install/kafka_broker_deployment_deleter.go

View check run for this annotation

Codecov / codecov/patch

control-plane/cmd/post-install/kafka_broker_deployment_deleter.go#L76

Added line #L76 was not covered by tests
})
}
48 changes: 48 additions & 0 deletions control-plane/cmd/post-install/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,54 @@

package main

import (
"context"
"flag"
"fmt"
"log"

"k8s.io/client-go/kubernetes"

"knative.dev/pkg/environment"
"knative.dev/pkg/logging"
"knative.dev/pkg/signals"
)

func main() {
ctx := signals.NewContext()

Check warning on line 33 in control-plane/cmd/post-install/main.go

View check run for this annotation

Codecov / codecov/patch

control-plane/cmd/post-install/main.go#L33

Added line #L33 was not covered by tests

config, err := logging.NewConfigFromMap(nil)
if err != nil {
log.Fatal("Failed to create logging config: ", err)

Check warning on line 37 in control-plane/cmd/post-install/main.go

View check run for this annotation

Codecov / codecov/patch

control-plane/cmd/post-install/main.go#L35-L37

Added lines #L35 - L37 were not covered by tests
}

logger, _ := logging.NewLoggerFromConfig(config, "kafka-broker-post-install")
defer logger.Sync()

Check warning on line 41 in control-plane/cmd/post-install/main.go

View check run for this annotation

Codecov / codecov/patch

control-plane/cmd/post-install/main.go#L40-L41

Added lines #L40 - L41 were not covered by tests

logging.WithLogger(ctx, logger)

Check warning on line 43 in control-plane/cmd/post-install/main.go

View check run for this annotation

Codecov / codecov/patch

control-plane/cmd/post-install/main.go#L43

Added line #L43 was not covered by tests

if err := run(ctx); err != nil {
logger.Fatal(err)

Check warning on line 46 in control-plane/cmd/post-install/main.go

View check run for this annotation

Codecov / codecov/patch

control-plane/cmd/post-install/main.go#L45-L46

Added lines #L45 - L46 were not covered by tests
}
}

func run(ctx context.Context) error {
env := environment.ClientConfig{}
env.InitFlags(flag.CommandLine)
flag.Parse()

Check warning on line 53 in control-plane/cmd/post-install/main.go

View check run for this annotation

Codecov / codecov/patch

control-plane/cmd/post-install/main.go#L50-L53

Added lines #L50 - L53 were not covered by tests

config, err := env.GetRESTConfig()
if err != nil {
return fmt.Errorf("failed to get kubeconfig: %w", err)

Check warning on line 57 in control-plane/cmd/post-install/main.go

View check run for this annotation

Codecov / codecov/patch

control-plane/cmd/post-install/main.go#L55-L57

Added lines #L55 - L57 were not covered by tests
}

deploymentDeleter := &kafkaDeploymentDeleter{
k8s: kubernetes.NewForConfigOrDie(config),

Check warning on line 61 in control-plane/cmd/post-install/main.go

View check run for this annotation

Codecov / codecov/patch

control-plane/cmd/post-install/main.go#L60-L61

Added lines #L60 - L61 were not covered by tests
}

if err := deploymentDeleter.DeleteBrokerDeployments(ctx); err != nil {
return fmt.Errorf("broker migration failed: %v", err)

Check warning on line 65 in control-plane/cmd/post-install/main.go

View check run for this annotation

Codecov / codecov/patch

control-plane/cmd/post-install/main.go#L64-L65

Added lines #L64 - L65 were not covered by tests
}

return nil

Check warning on line 68 in control-plane/cmd/post-install/main.go

View check run for this annotation

Codecov / codecov/patch

control-plane/cmd/post-install/main.go#L68

Added line #L68 was not covered by tests
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ rules:
- watch
- update
- patch
- create
- delete

# Internal APIs
- apiGroups:
Expand Down Expand Up @@ -293,3 +295,4 @@ rules:
- update
- create
- delete

Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,20 @@ metadata:
name: knative-kafka-controller-post-install
labels:
app.kubernetes.io/version: devel
rules: []
rules:
# we need to be able to delete old deployments
- apiGroups:
- "apps"
resources:
- "deployments"
verbs:
- "delete"
# we need to get statefulsets
- apiGroups:
- "apps"
resources:
- "statefulsets"
verbs:
- "get"
- "list"

7 changes: 6 additions & 1 deletion control-plane/pkg/kafka/clientpool/clientpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,12 @@
}

func (cp *ClientPool) makeSaramaClient(bootstrapServers []string, secret *corev1.Secret) (sarama.Client, error) {
config, err := kafka.GetSaramaConfig(security.NewSaramaSecurityOptionFromSecret(secret), kafka.DisableOffsetAutoCommitConfigOption)
secretOpt, err := security.NewSaramaSecurityOptionFromSecret(secret)
if err != nil {
return nil, err

Check warning on line 159 in control-plane/pkg/kafka/clientpool/clientpool.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/kafka/clientpool/clientpool.go#L159

Added line #L159 was not covered by tests
}

config, err := kafka.GetSaramaConfig(secretOpt, kafka.DisableOffsetAutoCommitConfigOption)
if err != nil {
return nil, err
}
Expand Down
58 changes: 45 additions & 13 deletions control-plane/pkg/reconciler/broker/namespaced_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
ServiceAccountLister corelisters.ServiceAccountLister
ServiceLister corelisters.ServiceLister
ClusterRoleBindingLister rbaclisters.ClusterRoleBindingLister
DeploymentLister appslisters.DeploymentLister
StatefulSetLister appslisters.StatefulSetLister
BrokerLister eventinglisters.BrokerLister

// GetKafkaClusterAdmin creates new sarama ClusterAdmin. It's convenient to add this as Reconciler field so that we can
Expand Down Expand Up @@ -316,7 +316,7 @@
}
resources = append(resources, additionalConfigMaps...)

additionalDeployments, err := r.deploymentsFromSystemNamespace(broker)
additionalDeployments, err := r.statefulSetsFromSystemNamespace(broker)
if err != nil {
return mf.Manifest{}, err
}
Expand Down Expand Up @@ -366,14 +366,14 @@
return mf.ManifestFrom(mf.Slice(additionalResources), mf.UseClient(r.ManifestivalClient))
}

func (r *NamespacedReconciler) deploymentsFromSystemNamespace(broker *eventing.Broker) ([]unstructured.Unstructured, error) {
func (r *NamespacedReconciler) statefulSetsFromSystemNamespace(broker *eventing.Broker) ([]unstructured.Unstructured, error) {
deployments := []string{
"kafka-broker-receiver",
"kafka-broker-dispatcher",
}
resources := make([]unstructured.Unstructured, 0, len(deployments))
for _, name := range deployments {
resource, err := r.createManifestFromSystemDeployment(broker, name)
resource, err := r.createManifestFromSystemStatefulSet(broker, name)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -420,21 +420,53 @@
return unstructuredFromObject(cm)
}

func (r *NamespacedReconciler) createManifestFromSystemDeployment(broker *eventing.Broker, name string) (unstructured.Unstructured, error) {
sysDeployment, err := r.DeploymentLister.Deployments(r.SystemNamespace).Get(name)
func (r *NamespacedReconciler) createManifestFromSystemStatefulSet(broker *eventing.Broker, name string) (unstructured.Unstructured, error) {
sysStatefulSet, err := r.StatefulSetLister.StatefulSets(r.SystemNamespace).Get(name)
if err != nil {
return unstructured.Unstructured{}, fmt.Errorf("failed to get Deployment %s/%s: %w", r.SystemNamespace, name, err)
return unstructured.Unstructured{}, fmt.Errorf("failed to get StatefulSet %s/%s: %w", r.SystemNamespace, name, err)
}

cm := &appsv1.Deployment{
TypeMeta: metav1.TypeMeta{Kind: "Deployment", APIVersion: appsv1.SchemeGroupVersion.String()},
spec := sysStatefulSet.Spec
if spec.Replicas != nil && *spec.Replicas != 1 {
spec.Replicas = pointer.Int32(1)
}

Check warning on line 432 in control-plane/pkg/reconciler/broker/namespaced_broker.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/broker/namespaced_broker.go#L432

Added line #L432 was not covered by tests

expectedVolume := corev1.Volume{
Name: "contract-resources",
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: "kafka-broker-brokers-triggers",
},
},
},
}

foundContractResource := false
for i, volume := range spec.Template.Spec.Volumes {
if volume.Name == "contract-resources" {
foundContractResource = true
if volume.ConfigMap == nil || volume.ConfigMap.Name != "kafka-broker-brokers-triggers" {
spec.Template.Spec.Volumes[i] = expectedVolume
}

Check warning on line 451 in control-plane/pkg/reconciler/broker/namespaced_broker.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/broker/namespaced_broker.go#L451

Added line #L451 was not covered by tests
}
}

if !foundContractResource {
// need to add the contract resource volume to the spec
spec.Template.Spec.Volumes = append(spec.Template.Spec.Volumes, expectedVolume)

Check warning on line 458 in control-plane/pkg/reconciler/broker/namespaced_broker.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/broker/namespaced_broker.go#L458

Added line #L458 was not covered by tests
}

cm := &appsv1.StatefulSet{
TypeMeta: metav1.TypeMeta{Kind: "StatefulSet", APIVersion: appsv1.SchemeGroupVersion.String()},
ObjectMeta: metav1.ObjectMeta{
Namespace: broker.GetNamespace(),
Name: sysDeployment.Name,
Labels: sysDeployment.Labels,
Annotations: sysDeployment.Annotations,
Name: sysStatefulSet.Name,
Labels: sysStatefulSet.Labels,
Annotations: sysStatefulSet.Annotations,
},
Spec: sysDeployment.Spec,
Spec: spec,
}
return unstructuredFromObject(cm)
}
Expand Down
14 changes: 7 additions & 7 deletions control-plane/pkg/reconciler/broker/namespaced_broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ func namespacedBrokerReconciliation(t *testing.T, format string, env config.Env)
base.VolumeGenerationAnnotationKey: "0",
"annotation_to_preserve": "value_to_preserve",
}),
reconcilertesting.NewDeployment("kafka-broker-receiver", SystemNamespace),
reconcilertesting.NewDeployment("kafka-broker-dispatcher", SystemNamespace),
NewStatefulSet("kafka-broker-receiver", SystemNamespace),
NewStatefulSet("kafka-broker-dispatcher", SystemNamespace),
NewServiceAccount(SystemNamespace, "knative-kafka-broker-data-plane"),
reconcilertesting.NewService("kafka-broker-ingress", SystemNamespace),
NewClusterRoleBinding("knative-kafka-broker-data-plane",
Expand Down Expand Up @@ -190,12 +190,12 @@ func namespacedBrokerReconciliation(t *testing.T, format string, env config.Env)
WithNamespacedLabel,
),
ToManifestivalResource(t,
reconcilertesting.NewDeployment("kafka-broker-receiver", BrokerNamespace),
NewStatefulSet("kafka-broker-receiver", BrokerNamespace),
WithNamespacedBrokerOwnerRef,
WithNamespacedLabel,
),
ToManifestivalResource(t,
reconcilertesting.NewDeployment("kafka-broker-dispatcher", BrokerNamespace),
NewStatefulSet("kafka-broker-dispatcher", BrokerNamespace),
WithNamespacedBrokerOwnerRef,
WithNamespacedLabel,
),
Expand Down Expand Up @@ -361,8 +361,8 @@ func namespacedBrokerFinalization(t *testing.T, format string, env config.Env) {
reconcilertesting.NewConfigMap(env.DataPlaneConfigConfigMapName, SystemNamespace),
reconcilertesting.NewConfigMap("config-tracing", SystemNamespace),
reconcilertesting.NewConfigMap("kafka-config-logging", SystemNamespace),
reconcilertesting.NewDeployment("kafka-broker-receiver", SystemNamespace),
reconcilertesting.NewDeployment("kafka-broker-dispatcher", SystemNamespace),
NewStatefulSet("kafka-broker-receiver", SystemNamespace),
NewStatefulSet("kafka-broker-dispatcher", SystemNamespace),
NewServiceAccount(SystemNamespace, "knative-kafka-broker-data-plane"),
reconcilertesting.NewService("kafka-broker-ingress", SystemNamespace),
NewClusterRoleBinding("knative-kafka-broker-data-plane",
Expand Down Expand Up @@ -492,7 +492,7 @@ func useTableNamespaced(t *testing.T, table TableTest, env *config.Env) {
},
NamespaceLister: listers.GetNamespaceLister(),
ConfigMapLister: listers.GetConfigMapLister(),
DeploymentLister: listers.GetDeploymentLister(),
StatefulSetLister: listers.GetStatefulSetLister(),
BrokerLister: listers.GetBrokerLister(),
ServiceAccountLister: listers.GetServiceAccountLister(),
ServiceLister: listers.GetServiceLister(),
Expand Down
8 changes: 4 additions & 4 deletions control-plane/pkg/reconciler/broker/namespaced_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@

brokerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker"
brokerreconciler "knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1/broker"
deploymentinformer "knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment"
statefulsetinformer "knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset"
configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap"
namespaceinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/namespace"
podinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod"
Expand Down Expand Up @@ -107,7 +107,7 @@
ServiceAccountLister: serviceaccountinformer.Get(ctx).Lister(),
ServiceLister: serviceinformer.Get(ctx).Lister(),
ClusterRoleBindingLister: clusterrolebindinginformer.Get(ctx).Lister(),
DeploymentLister: deploymentinformer.Get(ctx).Lister(),
StatefulSetLister: statefulsetinformer.Get(ctx).Lister(),

Check warning on line 110 in control-plane/pkg/reconciler/broker/namespaced_controller.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/broker/namespaced_controller.go#L110

Added line #L110 was not covered by tests
BrokerLister: brokerinformer.Get(ctx).Lister(),
Env: env,
Counter: counter.NewExpiringCounter(ctx),
Expand Down Expand Up @@ -173,14 +173,14 @@
globalResync(configMap)
})

deploymentinformer.Get(ctx).Informer().AddEventHandler(cache.FilteringResourceEventHandler{
statefulsetinformer.Get(ctx).Informer().AddEventHandler(cache.FilteringResourceEventHandler{

Check warning on line 176 in control-plane/pkg/reconciler/broker/namespaced_controller.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/broker/namespaced_controller.go#L176

Added line #L176 was not covered by tests
FilterFunc: kafka.FilterAny(
kafka.FilterWithLabel("app", "kafka-broker-dispatcher"),
kafka.FilterWithLabel("app", "kafka-broker-receiver"),
),
Handler: controller.HandleAll(controller.EnsureTypeMeta(
globalResync,
appsv1.SchemeGroupVersion.WithKind("Deployment"),
appsv1.SchemeGroupVersion.WithKind("StatefulSet"),

Check warning on line 183 in control-plane/pkg/reconciler/broker/namespaced_controller.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/broker/namespaced_controller.go#L183

Added line #L183 was not covered by tests
)),
})

Expand Down
Loading
Loading