diff --git a/control-plane/pkg/core/config/utils.go b/control-plane/pkg/core/config/utils.go index fa45f52966..2a9d3a51bb 100644 --- a/control-plane/pkg/core/config/utils.go +++ b/control-plane/pkg/core/config/utils.go @@ -21,6 +21,7 @@ import ( "crypto/x509" "fmt" "math" + "sort" "github.com/rickb777/date/period" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -124,6 +125,13 @@ func TrustBundles(lister corev1listers.ConfigMapNamespaceLister) ([]string, erro } } } + if len(trustBundles) == 0 { + return nil, nil + } + + sort.SliceStable(trustBundles, func(i, j int) bool { + return trustBundles[i] < trustBundles[j] + }) return trustBundles, nil } diff --git a/control-plane/pkg/reconciler/broker/broker.go b/control-plane/pkg/reconciler/broker/broker.go index 269a00677c..7e0c89f943 100644 --- a/control-plane/pkg/reconciler/broker/broker.go +++ b/control-plane/pkg/reconciler/broker/broker.go @@ -22,6 +22,7 @@ import ( "strings" "time" + "k8s.io/apimachinery/pkg/api/equality" "knative.dev/eventing/pkg/auth" "knative.dev/pkg/logging" @@ -170,7 +171,8 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker) logger.Debug("Got contract data from config map", zap.Any(base.ContractLogKey, ct)) - if err := r.setTrustBundles(ct); err != nil { + trustBundlesChanged, err := r.setTrustBundles(ct) + if err != nil { return statusConditionManager.FailedToResolveConfig(err) } @@ -188,7 +190,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker) logger.Debug("Change detector", zap.Int("changed", changed)) - if changed == coreconfig.ResourceChanged { + if changed == coreconfig.ResourceChanged || trustBundlesChanged { // Resource changed, increment contract generation. coreconfig.IncrementContractGeneration(ct) @@ -735,11 +737,15 @@ func (r *Reconciler) getCaCerts() (*string, error) { return pointer.String(string(caCerts)), nil } -func (r *Reconciler) setTrustBundles(ct *contract.Contract) error { +func (r *Reconciler) setTrustBundles(ct *contract.Contract) (bool, error) { tb, err := coreconfig.TrustBundles(r.ConfigMapLister.ConfigMaps(r.SystemNamespace)) if err != nil { - return fmt.Errorf("failed to get trust bundles: %w", err) + return false, fmt.Errorf("failed to get trust bundles: %w", err) + } + changed := false + if !equality.Semantic.DeepEqual(tb, ct.TrustBundles) { + changed = true } ct.TrustBundles = tb - return nil + return changed, nil } diff --git a/control-plane/pkg/reconciler/channel/channel.go b/control-plane/pkg/reconciler/channel/channel.go index 60a2a88caa..c2c55cf774 100644 --- a/control-plane/pkg/reconciler/channel/channel.go +++ b/control-plane/pkg/reconciler/channel/channel.go @@ -221,7 +221,8 @@ func (r *Reconciler) reconcileKind(ctx context.Context, channel *messagingv1beta } logger.Debug("Got contract data from config map", zap.Any(base.ContractLogKey, ct)) - if err := r.setTrustBundles(ct); err != nil { + trustBundlesChanged, err := r.setTrustBundles(ct) + if err != nil { return statusConditionManager.FailedToResolveConfig(err) } @@ -240,7 +241,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, channel *messagingv1beta changed := coreconfig.AddOrUpdateResourceConfig(ct, channelResource, channelIndex, logger) logger.Debug("Change detector", zap.Int("changed", changed)) - if changed == coreconfig.ResourceChanged || subscribersChanged == coreconfig.EgressChanged { + if changed == coreconfig.ResourceChanged || subscribersChanged == coreconfig.EgressChanged || trustBundlesChanged { // Resource changed, increment contract generation. coreconfig.IncrementContractGeneration(ct) @@ -782,11 +783,15 @@ func findSubscriptionStatus(kc *messagingv1beta1.KafkaChannel, subUID types.UID) return nil } -func (r *Reconciler) setTrustBundles(ct *contract.Contract) error { +func (r *Reconciler) setTrustBundles(ct *contract.Contract) (bool, error) { tb, err := coreconfig.TrustBundles(r.ConfigMapLister.ConfigMaps(r.SystemNamespace)) if err != nil { - return fmt.Errorf("failed to get trust bundles: %w", err) + return false, fmt.Errorf("failed to get trust bundles: %w", err) + } + changed := false + if !equality.Semantic.DeepEqual(tb, ct.TrustBundles) { + changed = true } ct.TrustBundles = tb - return nil + return changed, nil } diff --git a/control-plane/pkg/reconciler/consumer/consumer.go b/control-plane/pkg/reconciler/consumer/consumer.go index c27d4d93f3..b7ee94fd41 100644 --- a/control-plane/pkg/reconciler/consumer/consumer.go +++ b/control-plane/pkg/reconciler/consumer/consumer.go @@ -24,6 +24,7 @@ import ( "go.uber.org/zap" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/client-go/kubernetes" corelisters "k8s.io/client-go/listers/core/v1" @@ -401,11 +402,12 @@ func (r *Reconciler) schedule(ctx context.Context, logger *zap.Logger, c *kafkai return false, fmt.Errorf("failed to get contract from ConfigMap %s/%s: %w", p.GetNamespace(), cmName, err) } - if err := r.setTrustBundles(ct); err != nil { + trustBundlesChanged, err := r.setTrustBundles(ct) + if err != nil { return false, fmt.Errorf("failed to set trust bundles: %w", err) } - if changed := mutatorFunc(logger, ct, c); changed == coreconfig.ResourceChanged { + if changed := mutatorFunc(logger, ct, c); changed == coreconfig.ResourceChanged || trustBundlesChanged { logger.Debug("Contract changed", zap.Int("changed", changed)) ct.IncrementGeneration() @@ -465,13 +467,17 @@ func FalseAnyStatus(*corev1.Pod) bool { return false } -func (r *Reconciler) setTrustBundles(ct *contract.Contract) error { +func (r *Reconciler) setTrustBundles(ct *contract.Contract) (bool, error) { tb, err := coreconfig.TrustBundles(r.TrustBundleConfigMapLister) if err != nil { - return fmt.Errorf("failed to get trust bundles: %w", err) + return false, fmt.Errorf("failed to get trust bundles: %w", err) + } + changed := false + if !equality.Semantic.DeepEqual(tb, ct.TrustBundles) { + changed = true } ct.TrustBundles = tb - return nil + return changed, nil } type PodStatusSummary struct { diff --git a/control-plane/pkg/reconciler/sink/kafka_sink.go b/control-plane/pkg/reconciler/sink/kafka_sink.go index 74bf4d7a8b..40bb6e1cca 100644 --- a/control-plane/pkg/reconciler/sink/kafka_sink.go +++ b/control-plane/pkg/reconciler/sink/kafka_sink.go @@ -23,6 +23,7 @@ import ( "github.com/IBM/sarama" "go.uber.org/zap" + "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/types" corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/util/retry" @@ -166,7 +167,8 @@ func (r *Reconciler) reconcileKind(ctx context.Context, ks *eventing.KafkaSink) ct = &contract.Contract{} } - if err := r.setTrustBundles(ct); err != nil { + trustBundlesChanged, err := r.setTrustBundles(ct) + if err != nil { return statusConditionManager.FailedToResolveConfig(err) } @@ -214,7 +216,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, ks *eventing.KafkaSink) // Update contract data with the new sink configuration. changed := coreconfig.AddOrUpdateResourceConfig(ct, sinkConfig, sinkIndex, logger) - if changed == coreconfig.ResourceChanged { + if changed == coreconfig.ResourceChanged || trustBundlesChanged { // Resource changed, increment contract generation. coreconfig.IncrementContractGeneration(ct) // Update the configuration map with the new contract data. @@ -425,11 +427,15 @@ func (r *Reconciler) getCaCerts() (*string, error) { return pointer.String(string(caCerts)), nil } -func (r *Reconciler) setTrustBundles(ct *contract.Contract) error { +func (r *Reconciler) setTrustBundles(ct *contract.Contract) (bool, error) { tb, err := coreconfig.TrustBundles(r.ConfigMapLister.ConfigMaps(r.SystemNamespace)) if err != nil { - return fmt.Errorf("failed to get trust bundles: %w", err) + return false, fmt.Errorf("failed to get trust bundles: %w", err) + } + changed := false + if !equality.Semantic.DeepEqual(tb, ct.TrustBundles) { + changed = true } ct.TrustBundles = tb - return nil + return changed, nil }