Skip to content

Commit

Permalink
chore: remove the sharding selector of cluster service (#8210)
Browse files Browse the repository at this point in the history
  • Loading branch information
leon-inf authored Sep 26, 2024
1 parent beedc37 commit 54328df
Show file tree
Hide file tree
Showing 8 changed files with 145 additions and 238 deletions.
10 changes: 2 additions & 8 deletions apis/apps/v1/cluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,15 +606,9 @@ type ShardingSpec struct {
type ClusterService struct {
Service `json:",inline"`

// Extends the ServiceSpec.Selector by allowing the specification of a sharding name, which is defined in
// `cluster.spec.shardingSpecs[*].name`, to be used as a selector for the service.
// Note that this and the `componentSelector` are mutually exclusive and cannot be set simultaneously.
// Extends the ServiceSpec.Selector by allowing the specification of components, to be used as a selector for the service.
//
// +optional
ShardingSelector string `json:"shardingSelector,omitempty"`

// Extends the ServiceSpec.Selector by allowing the specification of a component, to be used as a selector for the service.
// Note that this and the `shardingSelector` are mutually exclusive and cannot be set simultaneously.
// If the `componentSelector` is set as the name of a sharding, the service will be exposed to all components in the sharding.
//
// +optional
ComponentSelector string `json:"componentSelector,omitempty"`
Expand Down
12 changes: 4 additions & 8 deletions config/crd/bases/apps.kubeblocks.io_clusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8350,8 +8350,10 @@ spec:
type: object
componentSelector:
description: |-
Extends the ServiceSpec.Selector by allowing the specification of a component, to be used as a selector for the service.
Note that this and the `shardingSelector` are mutually exclusive and cannot be set simultaneously.
Extends the ServiceSpec.Selector by allowing the specification of components, to be used as a selector for the service.


If the `componentSelector` is set as the name of a sharding, the service will be exposed to all components in the sharding.
type: string
name:
description: |-
Expand Down Expand Up @@ -8390,12 +8392,6 @@ spec:
maxLength: 25
pattern: ^[a-z]([a-z0-9\-]*[a-z0-9])?$
type: string
shardingSelector:
description: |-
Extends the ServiceSpec.Selector by allowing the specification of a sharding name, which is defined in
`cluster.spec.shardingSpecs[*].name`, to be used as a selector for the service.
Note that this and the `componentSelector` are mutually exclusive and cannot be set simultaneously.
type: string
spec:
description: |-
Spec defines the behavior of a service.
Expand Down
56 changes: 20 additions & 36 deletions controllers/apps/cluster_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ var _ = Describe("Cluster Controller", func() {
svcType corev1.ServiceType
}

validateClusterServiceList := func(g Gomega, expectServices map[string]expectService, compName string, shardCount *int, enableShardOrdinal bool) {
validateClusterServiceList := func(g Gomega, expectServices map[string]expectService, compName string, shardCount *int) {
svcList := &corev1.ServiceList{}
g.Expect(testCtx.Cli.List(testCtx.Ctx, svcList, client.MatchingLabels{
constant.AppInstanceLabelKey: clusterKey.Name,
Expand All @@ -553,9 +553,6 @@ var _ = Describe("Cluster Controller", func() {
g.Expect(svc.Spec.ClusterIP).ShouldNot(Equal(corev1.ClusterIPNone))
case svc.Spec.Type == corev1.ServiceTypeClusterIP && len(svcSpec.clusterIP) != 0:
g.Expect(svc.Spec.ClusterIP).Should(Equal(corev1.ClusterIPNone))
// for _, port := range getHeadlessSvcPorts(g, compDefName) {
// g.Expect(slices.Index(svc.Spec.Ports, port) >= 0).Should(BeTrue())
// }
}
}

Expand All @@ -571,20 +568,16 @@ var _ = Describe("Cluster Controller", func() {
}
g.Expect(len(expectServices)).Should(Equal(len(services)))
} else {
if enableShardOrdinal {
g.Expect(len(expectServices) * *shardCount).Should(Equal(len(services)))
} else {
for svcName, svcSpec := range expectServices {
idx := slices.IndexFunc(services, func(e *corev1.Service) bool {
return e.Name == constant.GenerateClusterServiceName(clusterObj.Name, svcName)
})
g.Expect(idx >= 0).To(BeTrue())
svc := services[idx]
g.Expect(svc.Spec.Selector).Should(HaveKeyWithValue(constant.KBAppShardingNameLabelKey, compName))
validateSvc(svc, svcSpec)
}
g.Expect(len(expectServices)).Should(Equal(len(services)))
for svcName, svcSpec := range expectServices {
idx := slices.IndexFunc(services, func(e *corev1.Service) bool {
return e.Name == constant.GenerateClusterServiceName(clusterObj.Name, svcName)
})
g.Expect(idx >= 0).To(BeTrue())
svc := services[idx]
g.Expect(svc.Spec.Selector).Should(HaveKeyWithValue(constant.KBAppShardingNameLabelKey, compName))
validateSvc(svc, svcSpec)
}
g.Expect(len(expectServices)).Should(Equal(len(services)))
}
}

Expand Down Expand Up @@ -635,7 +628,7 @@ var _ = Describe("Cluster Controller", func() {
Expect(testCtx.CheckedCreateObj(testCtx.Ctx, svcObj)).Should(Succeed())

By("check all services created")
Eventually(func(g Gomega) { validateClusterServiceList(g, expectServices, compName, nil, false) }).Should(Succeed())
Eventually(func(g Gomega) { validateClusterServiceList(g, expectServices, compName, nil) }).Should(Succeed())

By("delete a cluster service")
delete(expectServices, deleteService.Name)
Expand All @@ -650,14 +643,14 @@ var _ = Describe("Cluster Controller", func() {
})()).ShouldNot(HaveOccurred())

By("check the service has been deleted, and the non-managed service has not been deleted")
Eventually(func(g Gomega) { validateClusterServiceList(g, expectServices, compName, nil, false) }).Should(Succeed())
Eventually(func(g Gomega) { validateClusterServiceList(g, expectServices, compName, nil) }).Should(Succeed())

By("add the deleted service back")
expectServices[deleteService.Name] = expectService{deleteService.Spec.ClusterIP, deleteService.Spec.Type}
Expect(testapps.GetAndChangeObj(&testCtx, clusterKey, func(cluster *appsv1.Cluster) {
cluster.Spec.Services = append(cluster.Spec.Services, deleteService)
})()).ShouldNot(HaveOccurred())
Eventually(func(g Gomega) { validateClusterServiceList(g, expectServices, compName, nil, false) }).Should(Succeed())
Eventually(func(g Gomega) { validateClusterServiceList(g, expectServices, compName, nil) }).Should(Succeed())
}

testShardingClusterServiceCreateAndDelete := func(compTplName, compDefName string, createObj func(string, string, func(*testapps.MockClusterFactory))) {
Expand All @@ -680,7 +673,7 @@ var _ = Describe("Cluster Controller", func() {
ClusterIP: svc.clusterIP,
},
},
ShardingSelector: compTplName,
ComponentSelector: compTplName,
})
}
createObj(compTplName, compDefName, func(f *testapps.MockClusterFactory) {
Expand All @@ -690,19 +683,10 @@ var _ = Describe("Cluster Controller", func() {
shards := defaultShardCount
deleteService := services[0]

By("check only one service created for each shard when ShardSvcAnnotationKey is not set")
Eventually(func(g Gomega) { validateClusterServiceList(g, expectServices, compTplName, &shards, false) }).Should(Succeed())

By("check shards number services were created for each shard when ShardSvcAnnotationKey is set")
Expect(testapps.GetAndChangeObj(&testCtx, clusterKey, func(cluster *appsv1.Cluster) {
if cluster.Annotations == nil {
cluster.Annotations = map[string]string{}
}
cluster.Annotations[constant.ShardSvcAnnotationKey] = compTplName
})()).ShouldNot(HaveOccurred())
Eventually(func(g Gomega) { validateClusterServiceList(g, expectServices, compTplName, &shards, true) }).Should(Succeed())
By("check service created for sharding")
Eventually(func(g Gomega) { validateClusterServiceList(g, expectServices, compTplName, &shards) }).Should(Succeed())

By("delete a cluster shard service")
By("delete a sharding cluster service")
delete(expectServices, deleteService.Name)
Expect(testapps.GetAndChangeObj(&testCtx, clusterKey, func(cluster *appsv1.Cluster) {
var svcs []appsv1.ClusterService
Expand All @@ -715,14 +699,14 @@ var _ = Describe("Cluster Controller", func() {
})()).ShouldNot(HaveOccurred())

By("check the service has been deleted, and the non-managed service has not been deleted")
Eventually(func(g Gomega) { validateClusterServiceList(g, expectServices, compTplName, &shards, true) }).Should(Succeed())
Eventually(func(g Gomega) { validateClusterServiceList(g, expectServices, compTplName, &shards) }).Should(Succeed())

By("add the deleted service back")
expectServices[deleteService.Name] = expectService{deleteService.Spec.ClusterIP, deleteService.Spec.Type}
Expect(testapps.GetAndChangeObj(&testCtx, clusterKey, func(cluster *appsv1.Cluster) {
cluster.Spec.Services = append(cluster.Spec.Services, deleteService)
})()).ShouldNot(HaveOccurred())
Eventually(func(g Gomega) { validateClusterServiceList(g, expectServices, compTplName, &shards, true) }).Should(Succeed())
Eventually(func(g Gomega) { validateClusterServiceList(g, expectServices, compTplName, &shards) }).Should(Succeed())
}

testClusterFinalizer := func(compName string, createObj func(appsv1.TerminationPolicyType)) {
Expand Down Expand Up @@ -1122,7 +1106,7 @@ var _ = Describe("Cluster Controller", func() {
testClusterServiceCreateAndDelete(defaultCompName, compDefObj.Name, createClusterObj)
})

It("should create and delete shard topology cluster service correctly", func() {
It("should create and delete sharding cluster service correctly", func() {
testShardingClusterServiceCreateAndDelete(defaultCompName, compDefObj.Name, createClusterObjWithSharding)
})
})
Expand Down
13 changes: 10 additions & 3 deletions controllers/apps/transformer_cluster_component.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,7 @@ func (t *clusterComponentTransformer) reconcileComponents(transCtx *clusterTrans
return err
}

createCompSet := protoCompSet.Difference(runningCompSet)
updateCompSet := protoCompSet.Intersection(runningCompSet)
deleteCompSet := runningCompSet.Difference(protoCompSet)
createCompSet, deleteCompSet, updateCompSet := setDiff(runningCompSet, protoCompSet)

// component objects to be deleted (scale-in)
if err := deleteCompsInOrder(transCtx, dag, deleteCompSet, false); err != nil {
Expand Down Expand Up @@ -600,3 +598,12 @@ func shardingNameFromComp(transCtx *clusterTransformContext, compName string) st
}
return ""
}

// setDiff partitions two string sets into three sets, returned in order:
// elements present only in s2, elements present only in s1, and elements
// present in both.
func setDiff(s1, s2 sets.Set[string]) (sets.Set[string], sets.Set[string], sets.Set[string]) {
	onlyInS2 := s2.Difference(s1)
	onlyInS1 := s1.Difference(s2)
	inBoth := s1.Intersection(s2)
	return onlyInS2, onlyInS1, inBoth
}

// mapDiff compares the key sets of two maps and returns, in order: the keys
// present only in m2, the keys present only in m1, and the keys present in
// both. Values are ignored; only keys are compared.
func mapDiff[T any](m1, m2 map[string]T) (sets.Set[string], sets.Set[string], sets.Set[string]) {
	// Delegate to setDiff so both helpers share one definition of the ordering.
	return setDiff(sets.KeySet(m1), sets.KeySet(m2))
}
Loading

0 comments on commit 54328df

Please sign in to comment.