Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: expose with empty componentName #6712

Merged
merged 5 commits into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions apis/apps/v1alpha1/opsrequest_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,10 +359,13 @@ type Expose struct {

// Controls the expose operation.
// If set to Enable, the corresponding service will be exposed. Conversely, if set to Disable, the service will be removed.
//
// +kubebuilder:validation:Required
Switch ExposeSwitch `json:"switch"`

// A list of services that are to be exposed or removed.
// If componentNamem is not specified, each `OpsService` in the list must specify ports and selectors.
//
// +kubebuilder:validation:Required
// +kubebuilder:validation:Minitems=0
Services []OpsService `json:"services"`
Expand Down
33 changes: 33 additions & 0 deletions apis/apps/v1alpha1/opsrequest_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,10 +218,43 @@ func (r *OpsRequest) validateOps(ctx context.Context,
return r.validateSwitchover(ctx, k8sClient, cluster)
case DataScriptType:
return r.validateDataScript(ctx, k8sClient, cluster)
case ExposeType:
return r.validateExpose(ctx, cluster)
}
return nil
}

// validateExpose validates expose api when spec.type is Expose
func (r *OpsRequest) validateExpose(ctx context.Context, cluster *Cluster) error {
exposeList := r.Spec.ExposeList
if exposeList == nil {
return notEmptyError("spec.expose")
}

// compNames := make([]string, len(exposeList))
var componentNames []string
counter := 0
for _, v := range exposeList {
if len(v.ComponentName) > 0 {
componentNames = append(componentNames, v.ComponentName)
continue
} else {
counter++
}
if counter > 1 {
return fmt.Errorf("at most one spec.expose.componentName can be empty")
}
if v.Switch == EnableExposeSwitch {
for _, opssvc := range v.Services {
if len(opssvc.Ports) == 0 {
return fmt.Errorf("spec.expose.services.ports must be specified when componentName is empty")
}
}
}
}
return r.checkComponentExistence(cluster, componentNames)
}

// validateUpgrade validates spec.restart
func (r *OpsRequest) validateRestart(cluster *Cluster) error {
restartList := r.Spec.RestartList
Expand Down
2 changes: 2 additions & 0 deletions config/crd/bases/apps.kubeblocks.io_opsrequests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ spec:
type: string
services:
description: A list of services that are to be exposed or removed.
If componentNamem is not specified, each `OpsService` in the
list must specify ports and selectors.
items:
properties:
annotations:
Expand Down
100 changes: 66 additions & 34 deletions controllers/apps/operations/expose.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,33 @@ func (e ExposeOpsHandler) Action(reqCtx intctrlutil.RequestCtx, cli client.Clien
exposeMap = opsRes.OpsRequest.Spec.ToExposeListToMap()
)
reqCtx.Log.Info("cluster service before action", "clusterService", opsRes.Cluster.Spec.Services)
for _, clusterCompSpec := range opsRes.Cluster.Spec.ComponentSpecs {
expose, ok := exposeMap[clusterCompSpec.Name]
if !ok {
continue

compMap := make(map[string]appsv1alpha1.ClusterComponentSpec)
for _, comp := range opsRes.Cluster.Spec.ComponentSpecs {
compMap[comp.Name] = comp
}

for _, expose := range exposeMap {
clusterCompSpecName := ""
clusterCompDef := ""
clusterCompDefRefName := ""
if len(expose.ComponentName) > 0 {
clusterCompSpec, ok := compMap[expose.ComponentName]
if !ok {
return fmt.Errorf("component spec not found: %s", expose.ComponentName)
}
clusterCompSpecName = clusterCompSpec.Name
clusterCompDef = clusterCompSpec.ComponentDef
clusterCompDefRefName = clusterCompSpec.ComponentDefRef
}

switch expose.Switch {
case appsv1alpha1.EnableExposeSwitch:
if err := e.buildClusterServices(reqCtx, cli, opsRes.Cluster, &clusterCompSpec, expose.Services); err != nil {
if err := e.buildClusterServices(reqCtx, cli, opsRes.Cluster, clusterCompSpecName, clusterCompDef, clusterCompDefRefName, expose.Services); err != nil {
return err
}
case appsv1alpha1.DisableExposeSwitch:
if err := e.removeClusterServices(opsRes.Cluster, &clusterCompSpec, expose.Services); err != nil {
if err := e.removeClusterServices(opsRes.Cluster, clusterCompSpecName, expose.Services); err != nil {
return err
}
default:
Expand Down Expand Up @@ -172,12 +187,16 @@ func (e ExposeOpsHandler) handleComponentServices(reqCtx intctrlutil.RequestCtx,
continue
}

for _, ingress := range service.Status.LoadBalancer.Ingress {
if ingress.Hostname == "" && ingress.IP == "" {
continue
if item.ServiceType == corev1.ServiceTypeLoadBalancer {
for _, ingress := range service.Status.LoadBalancer.Ingress {
if ingress.Hostname == "" && ingress.IP == "" {
continue
}
actualCount += 1
break
}
} else {
actualCount += 1
break
}
}
}
Expand Down Expand Up @@ -222,16 +241,16 @@ func (e ExposeOpsHandler) SaveLastConfiguration(reqCtx intctrlutil.RequestCtx, c
}

func (e ExposeOpsHandler) removeClusterServices(cluster *appsv1alpha1.Cluster,
clusterCompSpec *appsv1alpha1.ClusterComponentSpec,
clusterCompSpecName string,
exposeServices []appsv1alpha1.OpsService) error {
if cluster == nil || clusterCompSpec == nil || len(exposeServices) == 0 {
if cluster == nil || len(exposeServices) == 0 {
return nil
}
for _, exposeService := range exposeServices {
genServiceName := fmt.Sprintf("%s-%s", clusterCompSpec.Name, exposeService.Name)
genServiceName := generateServiceName(clusterCompSpecName, exposeService.Name)
for i, clusterService := range cluster.Spec.Services {
// remove service from cluster
if clusterService.Name == genServiceName && clusterService.ComponentSelector == clusterCompSpec.Name {
if clusterService.Name == genServiceName && clusterService.ComponentSelector == clusterCompSpecName {
cluster.Spec.Services = append(cluster.Spec.Services[:i], cluster.Spec.Services[i+1:]...)
break
}
Expand All @@ -243,21 +262,25 @@ func (e ExposeOpsHandler) removeClusterServices(cluster *appsv1alpha1.Cluster,
func (e ExposeOpsHandler) buildClusterServices(reqCtx intctrlutil.RequestCtx,
cli client.Client,
cluster *appsv1alpha1.Cluster,
clusterCompSpec *appsv1alpha1.ClusterComponentSpec,
clusterCompSpecName string,
clusterCompDefName string,
clusterCompDefRefName string,
exposeServices []appsv1alpha1.OpsService) error {
if cluster == nil || clusterCompSpec == nil || len(exposeServices) == 0 {
if cluster == nil || len(exposeServices) == 0 {
return nil
}

checkServiceExist := func(exposeService appsv1alpha1.OpsService) bool {
if len(cluster.Spec.Services) == 0 {
return false
}

genServiceName := generateServiceName(clusterCompSpecName, exposeService.Name)

for _, clusterService := range cluster.Spec.Services {
if clusterService.ComponentSelector != clusterCompSpec.Name {
if clusterService.ComponentSelector != clusterCompSpecName {
continue
}
genServiceName := fmt.Sprintf("%s-%s", clusterCompSpec.Name, exposeService.Name)
if clusterService.Name == genServiceName {
return true
}
Expand All @@ -267,7 +290,7 @@ func (e ExposeOpsHandler) buildClusterServices(reqCtx intctrlutil.RequestCtx,

convertDefaultCompDefServicePorts := func(compServices []appsv1alpha1.ComponentService) ([]corev1.ServicePort, error) {
if len(compServices) == 0 {
return nil, fmt.Errorf("component service is not defined, expose operation is not supported, cluster: %s, component: %s", cluster.Name, clusterCompSpec.Name)
return nil, fmt.Errorf("component service is not defined, expose operation is not supported, cluster: %s, component: %s", cluster.Name, clusterCompSpecName)
}
defaultServicePorts := make([]corev1.ServicePort, 0, len(compServices))
portsSet := make(map[string]bool) // to avoid duplicate ports
Expand All @@ -293,36 +316,36 @@ func (e ExposeOpsHandler) buildClusterServices(reqCtx intctrlutil.RequestCtx,
}
}
if len(defaultServicePorts) == 0 {
return nil, fmt.Errorf("component does not define an available service, expose operation is not supported, cluster: %s, component: %s", cluster.Name, clusterCompSpec.Name)
return nil, fmt.Errorf("component does not define an available service, expose operation is not supported, cluster: %s, component: %s", cluster.Name, clusterCompSpecName)
}
return defaultServicePorts, nil
}

defaultServicePortsFunc := func() ([]corev1.ServicePort, error) {
if clusterCompSpec.ComponentDef != "" {
compDef, err := component.GetCompDefinition(reqCtx, cli, cluster, clusterCompSpec.Name)
if clusterCompDefName != "" {
compDef, err := component.GetCompDefinition(reqCtx, cli, cluster, clusterCompSpecName)
if err != nil {
return nil, err
}
return convertDefaultCompDefServicePorts(compDef.Spec.Services)
}
if cluster.Spec.ClusterDefRef != "" && clusterCompSpec.ComponentDefRef != "" {
if cluster.Spec.ClusterDefRef != "" && clusterCompDefRefName != "" {
clusterDef, err := getClusterDefByName(reqCtx.Ctx, cli, cluster.Spec.ClusterDefRef)
if err != nil {
return nil, err
}
clusterCompDef := clusterDef.GetComponentDefByName(clusterCompSpec.ComponentDefRef)
clusterCompDef := clusterDef.GetComponentDefByName(clusterCompDefRefName)
if clusterCompDef == nil || clusterCompDef.Service == nil {
return nil, fmt.Errorf("referenced cluster component definition or services is not defined: %s", clusterCompSpec.ComponentDefRef)
return nil, fmt.Errorf("referenced cluster component definition or services is not defined: %s", clusterCompDefRefName)
}
return clusterCompDef.Service.ToSVCPorts(), nil
}
return nil, fmt.Errorf("component definition is not defined, cluster: %s, component: %s", cluster.Name, clusterCompSpec.Name)
return nil, fmt.Errorf("component definition is not defined, cluster: %s, component: %s", cluster.Name, clusterCompSpecName)
}

defaultRoleSelectorFunc := func() (string, error) {
if clusterCompSpec.ComponentDef != "" {
compDef, err := component.GetCompDefinition(reqCtx, cli, cluster, clusterCompSpec.Name)
if clusterCompDefName != "" {
compDef, err := component.GetCompDefinition(reqCtx, cli, cluster, clusterCompSpecName)
if err != nil {
return "", err
}
Expand All @@ -336,14 +359,14 @@ func (e ExposeOpsHandler) buildClusterServices(reqCtx intctrlutil.RequestCtx,
}
return "", nil
}
if cluster.Spec.ClusterDefRef != "" && clusterCompSpec.ComponentDefRef != "" {
if cluster.Spec.ClusterDefRef != "" && clusterCompDefRefName != "" {
clusterDef, err := getClusterDefByName(reqCtx.Ctx, cli, cluster.Spec.ClusterDefRef)
if err != nil {
return "", err
}
clusterCompDef := clusterDef.GetComponentDefByName(clusterCompSpec.ComponentDefRef)
clusterCompDef := clusterDef.GetComponentDefByName(clusterCompDefRefName)
if clusterCompDef == nil {
return "", fmt.Errorf("referenced cluster component definition is not defined: %s", clusterCompSpec.ComponentDefRef)
return "", fmt.Errorf("referenced cluster component definition is not defined: %s", clusterCompDefRefName)
}
switch clusterCompDef.WorkloadType {
case appsv1alpha1.Replication:
Expand All @@ -363,7 +386,9 @@ func (e ExposeOpsHandler) buildClusterServices(reqCtx intctrlutil.RequestCtx,
reqCtx.Log.Info("cluster service already exists, skip", "service", exposeService.Name)
continue
}
genServiceName := fmt.Sprintf("%s-%s", clusterCompSpec.Name, exposeService.Name)

genServiceName := generateServiceName(clusterCompSpecName, exposeService.Name)

clusterService := appsv1alpha1.ClusterService{
Service: appsv1alpha1.Service{
Name: genServiceName,
Expand All @@ -373,7 +398,7 @@ func (e ExposeOpsHandler) buildClusterServices(reqCtx intctrlutil.RequestCtx,
Type: exposeService.ServiceType,
},
},
ComponentSelector: clusterCompSpec.Name,
ComponentSelector: clusterCompSpecName,
}

// set service selector
Expand All @@ -395,7 +420,7 @@ func (e ExposeOpsHandler) buildClusterServices(reqCtx intctrlutil.RequestCtx,
// set role selector
if len(exposeService.RoleSelector) != 0 {
clusterService.RoleSelector = exposeService.RoleSelector
} else {
} else if len(clusterCompSpecName) > 0 {
defaultRoleSelector, err := defaultRoleSelectorFunc()
if err != nil {
return err
Expand All @@ -408,3 +433,10 @@ func (e ExposeOpsHandler) buildClusterServices(reqCtx intctrlutil.RequestCtx,
}
return nil
}

func generateServiceName(clusterCompSpecName, exposeServiceName string) string {
if len(clusterCompSpecName) > 0 {
return fmt.Sprintf("%s-%s", clusterCompSpecName, exposeServiceName)
}
return exposeServiceName
}
45 changes: 45 additions & 0 deletions controllers/apps/operations/expose_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"

"sigs.k8s.io/controller-runtime/pkg/client"

Expand Down Expand Up @@ -102,5 +103,49 @@ var _ = Describe("", func() {
_, err = GetOpsManager().Reconcile(reqCtx, k8sClient, opsRes)
Expect(err).ShouldNot(HaveOccurred())
})

It("Test expose OpsRequest with empty ComponentName", func() {
reqCtx := intctrlutil.RequestCtx{Ctx: testCtx.Ctx}
opsRes, _, clusterObject := initOperationsResources(clusterDefinitionName, clusterVersionName, clusterName)

By("create Expose opsRequest")
ops := testapps.NewOpsRequestObj("expose-expose-"+randomStr, testCtx.DefaultNamespace,
clusterObject.Name, appsv1alpha1.ExposeType)
ops.Spec.ExposeList = []appsv1alpha1.Expose{
{
ComponentOps: appsv1alpha1.ComponentOps{ComponentName: ""},
Switch: appsv1alpha1.EnableExposeSwitch,
Services: []appsv1alpha1.OpsService{
{
Name: testapps.ServiceVPCName,
ServiceType: corev1.ServiceTypeLoadBalancer,
Ports: []corev1.ServicePort{
{
Name: "http",
Port: 80,
TargetPort: intstr.FromInt(80),
},
},
},
},
},
}
opsRes.OpsRequest = testapps.CreateOpsRequest(ctx, testCtx, ops)
// set ops phase to Pending
opsRes.OpsRequest.Status.Phase = appsv1alpha1.OpsPendingPhase

By("mock expose OpsRequest phase is Creating")
_, err := GetOpsManager().Do(reqCtx, k8sClient, opsRes)
Expect(err).ShouldNot(HaveOccurred())
Eventually(testapps.GetOpsRequestPhase(&testCtx, client.ObjectKeyFromObject(opsRes.OpsRequest))).Should(Equal(appsv1alpha1.OpsCreatingPhase))

// do expose action
_, err = GetOpsManager().Do(reqCtx, k8sClient, opsRes)
Expect(err).ShouldNot(HaveOccurred())

By("Test OpsManager.MainEnter function")
_, err = GetOpsManager().Reconcile(reqCtx, k8sClient, opsRes)
Expect(err).ShouldNot(HaveOccurred())
})
})
})
2 changes: 2 additions & 0 deletions deploy/helm/crds/apps.kubeblocks.io_opsrequests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ spec:
type: string
services:
description: A list of services that are to be exposed or removed.
If componentNamem is not specified, each `OpsService` in the
list must specify ports and selectors.
items:
properties:
annotations:
Expand Down
5 changes: 3 additions & 2 deletions pkg/lorry/engines/dbmanager_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading