diff --git a/control-plane/pkg/core/config/utils.go b/control-plane/pkg/core/config/utils.go index fa45f52966..2a9d3a51bb 100644 --- a/control-plane/pkg/core/config/utils.go +++ b/control-plane/pkg/core/config/utils.go @@ -21,6 +21,7 @@ import ( "crypto/x509" "fmt" "math" + "sort" "github.com/rickb777/date/period" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -124,6 +125,13 @@ func TrustBundles(lister corev1listers.ConfigMapNamespaceLister) ([]string, erro } } } + if len(trustBundles) == 0 { + return nil, nil + } + + sort.SliceStable(trustBundles, func(i, j int) bool { + return trustBundles[i] < trustBundles[j] + }) return trustBundles, nil } diff --git a/control-plane/pkg/reconciler/broker/broker.go b/control-plane/pkg/reconciler/broker/broker.go index 20cc869727..37d3e746d9 100644 --- a/control-plane/pkg/reconciler/broker/broker.go +++ b/control-plane/pkg/reconciler/broker/broker.go @@ -27,6 +27,7 @@ import ( "go.uber.org/zap" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -172,7 +173,8 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker) logger.Debug("Got contract data from config map", zap.Any(base.ContractLogKey, ct)) - if err := r.setTrustBundles(ct); err != nil { + trustBundlesChanged, err := r.setTrustBundles(ct) + if err != nil { return statusConditionManager.FailedToResolveConfig(err) } @@ -190,7 +192,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker) logger.Debug("Change detector", zap.Int("changed", changed)) - if changed == coreconfig.ResourceChanged { + if changed == coreconfig.ResourceChanged || trustBundlesChanged { // Resource changed, increment contract generation. coreconfig.IncrementContractGeneration(ct) @@ -741,11 +743,15 @@ func (r *Reconciler) getCaCerts() (*string, error) { return pointer.String(string(caCerts)), nil } -func (r *Reconciler) setTrustBundles(ct *contract.Contract) error { +func (r *Reconciler) setTrustBundles(ct *contract.Contract) (bool, error) { tb, err := coreconfig.TrustBundles(r.ConfigMapLister.ConfigMaps(r.SystemNamespace)) if err != nil { - return fmt.Errorf("failed to get trust bundles: %w", err) + return false, fmt.Errorf("failed to get trust bundles: %w", err) + } + changed := false + if !equality.Semantic.DeepEqual(tb, ct.TrustBundles) { + changed = true } ct.TrustBundles = tb - return nil + return changed, nil } diff --git a/control-plane/pkg/reconciler/channel/channel.go b/control-plane/pkg/reconciler/channel/channel.go index 2d2cb918b2..b07c6cc6a2 100644 --- a/control-plane/pkg/reconciler/channel/channel.go +++ b/control-plane/pkg/reconciler/channel/channel.go @@ -236,7 +236,8 @@ func (r *Reconciler) reconcileKind(ctx context.Context, channel *messagingv1beta } logger.Debug("Got contract data from config map", zap.Any(base.ContractLogKey, ct)) - if err := r.setTrustBundles(ct); err != nil { + trustBundlesChanged, err := r.setTrustBundles(ct) + if err != nil { return statusConditionManager.FailedToResolveConfig(err) } @@ -256,7 +257,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, channel *messagingv1beta changed := coreconfig.AddOrUpdateResourceConfig(ct, channelResource, channelIndex, logger) logger.Debug("Change detector", zap.Int("changed", changed)) - if changed == coreconfig.ResourceChanged || subscribersChanged == coreconfig.EgressChanged { + if changed == coreconfig.ResourceChanged || subscribersChanged == coreconfig.EgressChanged || trustBundlesChanged { // Resource changed, increment contract generation. coreconfig.IncrementContractGeneration(ct) @@ -808,11 +809,15 @@ func findSubscriptionStatus(kc *messagingv1beta1.KafkaChannel, subUID types.UID) return nil } -func (r *Reconciler) setTrustBundles(ct *contract.Contract) error { +func (r *Reconciler) setTrustBundles(ct *contract.Contract) (bool, error) { tb, err := coreconfig.TrustBundles(r.ConfigMapLister.ConfigMaps(r.SystemNamespace)) if err != nil { - return fmt.Errorf("failed to get trust bundles: %w", err) + return false, fmt.Errorf("failed to get trust bundles: %w", err) + } + changed := false + if !equality.Semantic.DeepEqual(tb, ct.TrustBundles) { + changed = true } ct.TrustBundles = tb - return nil + return changed, nil } diff --git a/control-plane/pkg/reconciler/consumer/consumer.go b/control-plane/pkg/reconciler/consumer/consumer.go index eda4cc81bc..3980524170 100644 --- a/control-plane/pkg/reconciler/consumer/consumer.go +++ b/control-plane/pkg/reconciler/consumer/consumer.go @@ -22,6 +22,7 @@ import ( "go.uber.org/zap" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/client-go/kubernetes" corelisters "k8s.io/client-go/listers/core/v1" @@ -383,11 +384,12 @@ func (r *Reconciler) schedule(ctx context.Context, logger *zap.Logger, c *kafkai return false, fmt.Errorf("failed to get contract from ConfigMap %s/%s: %w", p.GetNamespace(), cmName, err) } - if err := r.setTrustBundles(ct); err != nil { + trustBundlesChanged, err := r.setTrustBundles(ct) + if err != nil { return false, fmt.Errorf("failed to set trust bundles: %w", err) } - if changed := mutatorFunc(logger, ct, c); changed == coreconfig.ResourceChanged { + if changed := mutatorFunc(logger, ct, c); changed == coreconfig.ResourceChanged || trustBundlesChanged { logger.Debug("Contract changed", zap.Int("changed", changed)) ct.IncrementGeneration() @@ -447,11 +449,15 @@ func FalseAnyStatus(*corev1.Pod) bool { return false } -func (r *Reconciler) setTrustBundles(ct *contract.Contract) error { +func (r *Reconciler) setTrustBundles(ct *contract.Contract) (bool, error) { tb, err := coreconfig.TrustBundles(r.TrustBundleConfigMapLister) if err != nil { - return fmt.Errorf("failed to get trust bundles: %w", err) + return false, fmt.Errorf("failed to get trust bundles: %w", err) + } + changed := false + if !equality.Semantic.DeepEqual(tb, ct.TrustBundles) { + changed = true } ct.TrustBundles = tb - return nil + return changed, nil } diff --git a/control-plane/pkg/reconciler/sink/kafka_sink.go b/control-plane/pkg/reconciler/sink/kafka_sink.go index a77d09b915..dc406ba185 100644 --- a/control-plane/pkg/reconciler/sink/kafka_sink.go +++ b/control-plane/pkg/reconciler/sink/kafka_sink.go @@ -23,6 +23,7 @@ import ( "github.com/IBM/sarama" "go.uber.org/zap" + "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/types" corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/util/retry" @@ -174,7 +175,8 @@ func (r *Reconciler) reconcileKind(ctx context.Context, ks *eventing.KafkaSink) ct = &contract.Contract{} } - if err := r.setTrustBundles(ct); err != nil { + trustBundlesChanged, err := r.setTrustBundles(ct) + if err != nil { return statusConditionManager.FailedToResolveConfig(err) } @@ -222,7 +224,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, ks *eventing.KafkaSink) // Update contract data with the new sink configuration. changed := coreconfig.AddOrUpdateResourceConfig(ct, sinkConfig, sinkIndex, logger) - if changed == coreconfig.ResourceChanged { + if changed == coreconfig.ResourceChanged || trustBundlesChanged { // Resource changed, increment contract generation. coreconfig.IncrementContractGeneration(ct) // Update the configuration map with the new contract data. @@ -443,11 +445,15 @@ func (r *Reconciler) getCaCerts() (*string, error) { return pointer.String(string(caCerts)), nil } -func (r *Reconciler) setTrustBundles(ct *contract.Contract) error { +func (r *Reconciler) setTrustBundles(ct *contract.Contract) (bool, error) { tb, err := coreconfig.TrustBundles(r.ConfigMapLister.ConfigMaps(r.SystemNamespace)) if err != nil { - return fmt.Errorf("failed to get trust bundles: %w", err) + return false, fmt.Errorf("failed to get trust bundles: %w", err) + } + changed := false + if !equality.Semantic.DeepEqual(tb, ct.TrustBundles) { + changed = true } ct.TrustBundles = tb - return nil + return changed, nil }