Skip to content

Commit

Permalink
fix: fix issue of e2e
Browse files Browse the repository at this point in the history
  • Loading branch information
lwpk110 committed Jan 10, 2025
1 parent 7ea4f27 commit 91ab07e
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 19 deletions.
2 changes: 1 addition & 1 deletion api/v1alpha1/kafkacluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ const (
const (
ImageRepository = "quay.io/zncdatadev/kafka"
ImageTag = "3.7.1-kubedoop0.0.0-dev"
ImagePullPolicy = corev1.PullAlways
ImagePullPolicy = corev1.PullIfNotPresent

KubedoopKafkaDataDirName = "data" // kafka log dirs
KubedoopLogConfigDirName = "log-config"
Expand Down
7 changes: 6 additions & 1 deletion internal/controller/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,17 @@ func NewConfigMap(
}
}
func (c *ConfigMapReconciler) Build(ctx context.Context) (client.Object, error) {
var loggingConfigSpec *kafkav1alpha1.LoggingConfigSpec
if c.MergedCfg.Config != nil && c.MergedCfg.Config.Logging != nil && c.MergedCfg.Config.Logging.Broker != nil {
loggingConfigSpec = c.MergedCfg.Config.Logging.Broker
}

builder := common.ConfigMapBuilder{
Name: common.CreateConfigName(c.Instance.GetName(), c.GroupName),
Namespace: c.Instance.Namespace,
Labels: c.Labels,
ConfigGenerators: []common.ConfigGenerator{
&config.Log4jConfGenerator{LoggingSpec: c.MergedCfg.Config.Logging.Broker, Container: string(common.Kafka)},
&config.Log4jConfGenerator{LoggingSpec: loggingConfigSpec, Container: string(common.Kafka)},
&config.SecurityConfGenerator{},
&config.KafkaServerConfGenerator{KafkaTlsSecurity: c.KafkaTlsSecurity},
// &KafkaConfGenerator{sslSpec: c.MergedCfg.Config.Ssl},
Expand Down
6 changes: 6 additions & 0 deletions internal/controller/statefulset.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
)

Expand Down Expand Up @@ -128,6 +129,11 @@ func (s *StatefulSetReconciler) makeKafkaContainer() []corev1.Container {
groupSvcName := svc.CreateGroupServiceName(s.Instance.GetName(), s.GroupName)
builder := container.NewKafkaContainerBuilder(imageName, util.ImagePullPolicy(imageSpec), zNode, resourceSpec, s.KafkaTlsSecurity, s.Instance.Namespace, groupSvcName)
kafkaContainer := builder.Build(builder)

// for test
kafkaContainer.SecurityContext = &corev1.SecurityContext{
RunAsUser: ptr.To(int64(0)),
}
return []corev1.Container{
kafkaContainer,
}
Expand Down
2 changes: 1 addition & 1 deletion internal/controller/svc/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func NewClusterService(
headlessServiceType := common.Service
serviceType := corev1.ServiceTypeNodePort
builder := common.NewServiceBuilder(
CreateGroupServiceName(instance.GetName(), ""),
CreateClusterServiceName(instance.GetName()),
instance.GetNamespace(),
labels,
makePorts(tlsSecurity),
Expand Down
2 changes: 1 addition & 1 deletion internal/controller/svc/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (p *PodServiceReconciler) ServicePorts() []corev1.ServicePort {
{
Name: p.KafkaTlsSecurity.ClientPortName(),
Port: int32(p.KafkaTlsSecurity.ClientPort()),
TargetPort: intstr.FromString(kafkav1alpha1.ClientPortName),
TargetPort: intstr.FromString(p.KafkaTlsSecurity.ClientPortName()),
},
{
Name: kafkav1alpha1.MetricsPortName,
Expand Down
20 changes: 5 additions & 15 deletions internal/security/tls.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package security

import (
"fmt"
"strings"

kafkav1alpha1 "github.com/zncdatadev/kafka-operator/api/v1alpha1"
"github.com/zncdatadev/kafka-operator/internal/util"
Expand Down Expand Up @@ -169,7 +170,6 @@ func (k *KafkaTlsSecurity) KcatClientSsl(certDirectory string) []string {
func (k *KafkaTlsSecurity) AddVolumeAndVolumeMounts(sts *appsv1.StatefulSet) {
kafkaContainer := k.getContainer(sts.Spec.Template.Spec.Containers, "kafka")
if tlsServerSecretClass := k.TlsServerSecretClass(); tlsServerSecretClass != "" {
k.AddVolume(sts, CreateTlsVolume(KubedoopTLSCertServerDirName, tlsServerSecretClass, k.SSLStorePassword))
// cbKcatProber.AddVolumeMount(KubedoopTLSCertServerDirName, KubedoopTLSCertServerDir) todo
k.AddVolume(sts, CreateTlsKeystoreVolume(KubedoopTLSKeyStoreServerDirName, tlsServerSecretClass, k.SSLStorePassword))
k.AddVolumeMount(kafkaContainer, KubedoopTLSKeyStoreServerDirName, KubedoopTLSKeyStoreServerDir)
Expand Down Expand Up @@ -235,28 +235,18 @@ func (k *KafkaTlsSecurity) ConfigSettings() map[string]string {
config[InterSSLClientAuth] = "required"
}
// common
config[InterBrokerListenerName] = "internal"
config[InterBrokerListenerName] = "INTERNAL"
return config
}

func CreateTlsVolume(volumeName, secretClass, sslStorePassword string) corev1.Volume {
builder := util.SecretVolumeBuilder{VolumeName: volumeName}
builder.SetAnnotations(map[string]string{
constants.AnnotationSecretsClass: secretClass,
constants.AnnotationSecretsScope: fmt.Sprintf("%s,%s", constants.PodScope, constants.NodeScope),
})
if sslStorePassword != "" {
builder.AddAnnotation(constants.AnnotationSecretsPKCS12Password, sslStorePassword)
}
return builder.Build()
}

// // CreateTlsKeystoreVolume creates ephemeral volumes to mount the SecretClass into the Pods as keystores
func CreateTlsKeystoreVolume(volumeName, secretClass, sslStorePassword string) corev1.Volume {
builder := util.SecretVolumeBuilder{VolumeName: volumeName}
svcScope := fmt.Sprintf("%s=%s", constants.ServiceScope, "kafkacluster-sample")
secretScopes := []string{svcScope, string(constants.PodScope), string(constants.NodeScope)}
builder.SetAnnotations(map[string]string{
constants.AnnotationSecretsClass: secretClass,
constants.AnnotationSecretsScope: fmt.Sprintf("%s,%s", constants.PodScope, constants.NodeScope),
constants.AnnotationSecretsScope: strings.Join(secretScopes, constants.CommonDelimiter),
constants.AnnotationSecretsFormat: string(constants.TLSP12),
})
if sslStorePassword != "" {
Expand Down

0 comments on commit 91ab07e

Please sign in to comment.