From 91ab07e3420a51f3b12c0942a2aa06e24e036278 Mon Sep 17 00:00:00 2001 From: lwpk110 Date: Fri, 10 Jan 2025 16:23:06 +0800 Subject: [PATCH] fix: fix issue of e2e --- api/v1alpha1/kafkacluster_types.go | 2 +- internal/controller/configmap.go | 7 ++++++- internal/controller/statefulset.go | 6 ++++++ internal/controller/svc/cluster.go | 2 +- internal/controller/svc/pod.go | 2 +- internal/security/tls.go | 20 +++++--------------- 6 files changed, 20 insertions(+), 19 deletions(-) diff --git a/api/v1alpha1/kafkacluster_types.go b/api/v1alpha1/kafkacluster_types.go index 4da31d4..6d7c8ab 100644 --- a/api/v1alpha1/kafkacluster_types.go +++ b/api/v1alpha1/kafkacluster_types.go @@ -46,7 +46,7 @@ const ( const ( ImageRepository = "quay.io/zncdatadev/kafka" ImageTag = "3.7.1-kubedoop0.0.0-dev" - ImagePullPolicy = corev1.PullAlways + ImagePullPolicy = corev1.PullIfNotPresent KubedoopKafkaDataDirName = "data" // kafka log dirs KubedoopLogConfigDirName = "log-config" diff --git a/internal/controller/configmap.go b/internal/controller/configmap.go index f8434a3..ad8c4f4 100644 --- a/internal/controller/configmap.go +++ b/internal/controller/configmap.go @@ -44,12 +44,17 @@ func NewConfigMap( } } func (c *ConfigMapReconciler) Build(ctx context.Context) (client.Object, error) { + var loggingConfigSpec *kafkav1alpha1.LoggingConfigSpec + if c.MergedCfg.Config != nil && c.MergedCfg.Config.Logging != nil && c.MergedCfg.Config.Logging.Broker != nil { + loggingConfigSpec = c.MergedCfg.Config.Logging.Broker + } + builder := common.ConfigMapBuilder{ Name: common.CreateConfigName(c.Instance.GetName(), c.GroupName), Namespace: c.Instance.Namespace, Labels: c.Labels, ConfigGenerators: []common.ConfigGenerator{ - &config.Log4jConfGenerator{LoggingSpec: c.MergedCfg.Config.Logging.Broker, Container: string(common.Kafka)}, + &config.Log4jConfGenerator{LoggingSpec: loggingConfigSpec, Container: string(common.Kafka)}, &config.SecurityConfGenerator{}, &config.KafkaServerConfGenerator{KafkaTlsSecurity: c.KafkaTlsSecurity}, // &KafkaConfGenerator{sslSpec: c.MergedCfg.Config.Ssl}, diff --git a/internal/controller/statefulset.go b/internal/controller/statefulset.go index c1e0806..dd87250 100644 --- a/internal/controller/statefulset.go +++ b/internal/controller/statefulset.go @@ -13,6 +13,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -128,6 +129,11 @@ func (s *StatefulSetReconciler) makeKafkaContainer() []corev1.Container { groupSvcName := svc.CreateGroupServiceName(s.Instance.GetName(), s.GroupName) builder := container.NewKafkaContainerBuilder(imageName, util.ImagePullPolicy(imageSpec), zNode, resourceSpec, s.KafkaTlsSecurity, s.Instance.Namespace, groupSvcName) kafkaContainer := builder.Build(builder) + + // for test + kafkaContainer.SecurityContext = &corev1.SecurityContext{ + RunAsUser: ptr.To(int64(0)), + } return []corev1.Container{ kafkaContainer, } diff --git a/internal/controller/svc/cluster.go b/internal/controller/svc/cluster.go index b621c02..bfb3f14 100644 --- a/internal/controller/svc/cluster.go +++ b/internal/controller/svc/cluster.go @@ -21,7 +21,7 @@ func NewClusterService( headlessServiceType := common.Service serviceType := corev1.ServiceTypeNodePort builder := common.NewServiceBuilder( - CreateGroupServiceName(instance.GetName(), ""), + CreateClusterServiceName(instance.GetName()), instance.GetNamespace(), labels, makePorts(tlsSecurity), diff --git a/internal/controller/svc/pod.go b/internal/controller/svc/pod.go index 084f5c4..cba44fa 100644 --- a/internal/controller/svc/pod.go +++ b/internal/controller/svc/pod.go @@ -133,7 +133,7 @@ func (p *PodServiceReconciler) ServicePorts() []corev1.ServicePort { { Name: p.KafkaTlsSecurity.ClientPortName(), Port: int32(p.KafkaTlsSecurity.ClientPort()), - TargetPort: intstr.FromString(kafkav1alpha1.ClientPortName), + TargetPort: intstr.FromString(p.KafkaTlsSecurity.ClientPortName()), }, { Name: kafkav1alpha1.MetricsPortName, diff --git a/internal/security/tls.go b/internal/security/tls.go index 4c94195..6b06cf9 100644 --- a/internal/security/tls.go +++ b/internal/security/tls.go @@ -2,6 +2,7 @@ package security import ( "fmt" + "strings" kafkav1alpha1 "github.com/zncdatadev/kafka-operator/api/v1alpha1" "github.com/zncdatadev/kafka-operator/internal/util" @@ -169,7 +170,6 @@ func (k *KafkaTlsSecurity) KcatClientSsl(certDirectory string) []string { func (k *KafkaTlsSecurity) AddVolumeAndVolumeMounts(sts *appsv1.StatefulSet) { kafkaContainer := k.getContainer(sts.Spec.Template.Spec.Containers, "kafka") if tlsServerSecretClass := k.TlsServerSecretClass(); tlsServerSecretClass != "" { - k.AddVolume(sts, CreateTlsVolume(KubedoopTLSCertServerDirName, tlsServerSecretClass, k.SSLStorePassword)) // cbKcatProber.AddVolumeMount(KubedoopTLSCertServerDirName, KubedoopTLSCertServerDir) todo k.AddVolume(sts, CreateTlsKeystoreVolume(KubedoopTLSKeyStoreServerDirName, tlsServerSecretClass, k.SSLStorePassword)) k.AddVolumeMount(kafkaContainer, KubedoopTLSKeyStoreServerDirName, KubedoopTLSKeyStoreServerDir) @@ -235,28 +235,18 @@ func (k *KafkaTlsSecurity) ConfigSettings() map[string]string { config[InterSSLClientAuth] = "required" } // common - config[InterBrokerListenerName] = "internal" + config[InterBrokerListenerName] = "INTERNAL" return config } -func CreateTlsVolume(volumeName, secretClass, sslStorePassword string) corev1.Volume { - builder := util.SecretVolumeBuilder{VolumeName: volumeName} - builder.SetAnnotations(map[string]string{ - constants.AnnotationSecretsClass: secretClass, - constants.AnnotationSecretsScope: fmt.Sprintf("%s,%s", constants.PodScope, constants.NodeScope), - }) - if sslStorePassword != "" { - builder.AddAnnotation(constants.AnnotationSecretsPKCS12Password, sslStorePassword) - } - return builder.Build() -} - // // CreateTlsKeystoreVolume creates ephemeral volumes to mount the SecretClass into the Pods as keystores func CreateTlsKeystoreVolume(volumeName, secretClass, sslStorePassword string) corev1.Volume { builder := util.SecretVolumeBuilder{VolumeName: volumeName} + svcScope := fmt.Sprintf("%s=%s", constants.ServiceScope, "kafkacluster-sample") + secretScopes := []string{svcScope, string(constants.PodScope), string(constants.NodeScope)} builder.SetAnnotations(map[string]string{ constants.AnnotationSecretsClass: secretClass, - constants.AnnotationSecretsScope: fmt.Sprintf("%s,%s", constants.PodScope, constants.NodeScope), + constants.AnnotationSecretsScope: strings.Join(secretScopes, constants.CommonDelimiter), constants.AnnotationSecretsFormat: string(constants.TLSP12), }) if sslStorePassword != "" {