Skip to content

Commit

Permalink
Merge branch 'main' into support/instance-update-strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
free6om committed Dec 19, 2024
2 parents c47b102 + 274793f commit d2c0b28
Show file tree
Hide file tree
Showing 101 changed files with 885 additions and 1,318 deletions.
4 changes: 2 additions & 2 deletions apis/apps/v1/component_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,12 @@ type ComponentSpec struct {
// Specifies Labels to override or add for underlying Pods, PVCs, Account & TLS Secrets, Services Owned by Component.
//
// +optional
Labels map[string]string `json:"labels,omitempty"`
Labels map[string]string `json:"labels"`

// Specifies Annotations to override or add for underlying Pods, PVCs, Account & TLS Secrets, Services Owned by Component.
//
// +optional
Annotations map[string]string `json:"annotations,omitempty"`
Annotations map[string]string `json:"annotations"`

// List of environment variables to add.
//
Expand Down
3 changes: 1 addition & 2 deletions apis/apps/v1/componentdefinition_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1264,11 +1264,10 @@ type TLS struct {
// +kubebuilder:validation:Required
MountPath string `json:"mountPath"`

// The default permissions for the mounted path.
// The permissions for the mounted path. Defaults to 0600.
//
// This field is immutable once set.
//
// +kubebuilder:default=0600
// +optional
DefaultMode *int32 `json:"defaultMode,omitempty"`

Expand Down
17 changes: 4 additions & 13 deletions apis/apps/v1alpha1/cluster_conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ func (r *Cluster) ConvertTo(dstRaw conversion.Hub) error {
if err := incrementConvertTo(r, dst); err != nil {
return err
}

// status
if err := copier.Copy(&dst.Status, &r.Status); err != nil {
return err
Expand All @@ -65,15 +64,13 @@ func (r *Cluster) ConvertFrom(srcRaw conversion.Hub) error {
if err := copier.Copy(&r.Spec, &src.Spec); err != nil {
return err
}
if err := incrementConvertFrom(r, src, &clusterConverter{}); err != nil {
return err
}

// status
if err := copier.Copy(&r.Status, &src.Status); err != nil {
return err
}

if err := incrementConvertFrom(r, src, &clusterConverter{}); err != nil {
return err
}
return nil
}

Expand Down Expand Up @@ -116,9 +113,6 @@ func (r *Cluster) changesToCluster(cluster *appsv1.Cluster) {
// status
// components
// - message: ComponentMessageMap -> map[string]string
if len(r.Spec.ClusterDefRef) > 0 {
cluster.Spec.ClusterDef = r.Spec.ClusterDefRef
}
if r.Spec.TerminationPolicy == Halt {
cluster.Spec.TerminationPolicy = appsv1.DoNotTerminate
} else {
Expand Down Expand Up @@ -176,9 +170,6 @@ func (r *Cluster) changesFromCluster(cluster *appsv1.Cluster) {
// status
// components
// - message: ComponentMessageMap -> map[string]string
if len(cluster.Spec.ClusterDef) > 0 {
r.Spec.ClusterDefRef = cluster.Spec.ClusterDef
}
// appsv1.TerminationPolicyType is a subset of appsv1alpha1.TerminationPolicyType, it can be converted directly.
for _, spec := range cluster.Spec.ComponentSpecs {
if spec.UpdateStrategy == nil || spec.UpdateStrategy.InstanceUpdatePolicy == nil {
Expand Down Expand Up @@ -212,7 +203,7 @@ type clusterConverter struct {
}

type clusterSpecConverter struct {
ClusterDefRef string `json:"clusterDefRef,omitempty"`
ClusterDefRef string `json:"clusterDefinitionRef,omitempty"`
ClusterVersionRef string `json:"clusterVersionRef,omitempty"`
TerminationPolicy TerminationPolicyType `json:"terminationPolicy"`
Affinity *Affinity `json:"affinity,omitempty"`
Expand Down
101 changes: 85 additions & 16 deletions apis/workloads/v1alpha1/instanceset_conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,19 @@ package v1alpha1

import (
"github.com/jinzhu/copier"
appsv1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/json"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/conversion"

workloadsv1 "github.com/apecloud/kubeblocks/apis/workloads/v1"
)

const (
kbIncrementConverterAK = "kb-increment-converter"
)

// ConvertTo converts this InstanceSet to the Hub version (v1).
func (r *InstanceSet) ConvertTo(dstRaw conversion.Hub) error {
dst := dstRaw.(*workloadsv1.InstanceSet)
Expand All @@ -47,6 +52,10 @@ func (r *InstanceSet) ConvertTo(dstRaw conversion.Hub) error {
return err
}

if err := r.incrementConvertTo(dst); err != nil {
return err
}

return nil
}

Expand All @@ -68,33 +77,87 @@ func (r *InstanceSet) ConvertFrom(srcRaw conversion.Hub) error {
return err
}

if err := r.incrementConvertFrom(src); err != nil {
return err
}
return nil
}

func (r *InstanceSet) incrementConvertTo(dstRaw metav1.Object) error {
if r.Spec.RoleProbe == nil {
return nil
}
// changed
instanceConvert := instanceSetConverter{
RoleProbe: r.Spec.RoleProbe,
}
bytes, err := json.Marshal(instanceConvert)
if err != nil {
return err
}
annotations := dstRaw.GetAnnotations()
if annotations == nil {
annotations = make(map[string]string)
}
annotations[kbIncrementConverterAK] = string(bytes)
dstRaw.SetAnnotations(annotations)
return nil
}

func (r *InstanceSet) incrementConvertFrom(srcRaw metav1.Object) error {
data, ok := srcRaw.GetAnnotations()[kbIncrementConverterAK]
if !ok {
return nil
}
instanceConvert := instanceSetConverter{}
if err := json.Unmarshal([]byte(data), &instanceConvert); err != nil {
return err
}
delete(srcRaw.GetAnnotations(), kbIncrementConverterAK)
r.Spec.RoleProbe = instanceConvert.RoleProbe
return nil
}

type instanceSetConverter struct {
RoleProbe *RoleProbe `json:"roleProbe,omitempty"`
}

func (r *InstanceSet) changesToInstanceSet(its *workloadsv1.InstanceSet) {
// changed:
// spec
// podUpdatePolicy -> updateStrategy.instanceUpdatePolicy
// memberUpdateStrategy -> updateStrategy.rollingUpdate.updateConcurrency
// updateStrategy.rollingUpdate.partition -> updateStrategy.rollingUpdate.replicas
// updateStrategy.partition -> updateStrategy.rollingUpdate.replicas
// updateStrategy.maxUnavailable -> updateStrategy.rollingUpdate.maxUnavailable
// updateStrategy.memberUpdateStrategy -> updateStrategy.rollingUpdate.updateConcurrency
if its.Spec.UpdateStrategy == nil {
its.Spec.UpdateStrategy = &workloadsv1.UpdateStrategy{}
}
its.Spec.UpdateStrategy.InstanceUpdatePolicy = (*workloadsv1.InstanceUpdatePolicyType)(&r.Spec.PodUpdatePolicy)
if r.Spec.MemberUpdateStrategy != nil {
initRollingUpdate := func() {
if its.Spec.UpdateStrategy.RollingUpdate == nil {
its.Spec.UpdateStrategy.RollingUpdate = &workloadsv1.RollingUpdate{}
}
its.Spec.UpdateStrategy.RollingUpdate.UpdateConcurrency = (*workloadsv1.UpdateConcurrency)(r.Spec.MemberUpdateStrategy)
}
if r.Spec.UpdateStrategy.RollingUpdate != nil {
if r.Spec.UpdateStrategy.RollingUpdate.Partition != nil {
if its.Spec.UpdateStrategy.RollingUpdate == nil {
its.Spec.UpdateStrategy.RollingUpdate = &workloadsv1.RollingUpdate{}
}
replicas := intstr.FromInt32(*r.Spec.UpdateStrategy.RollingUpdate.Partition)
}
setUpdateConcurrency := func(strategy *MemberUpdateStrategy) {
if strategy == nil {
return
}
initRollingUpdate()
its.Spec.UpdateStrategy.RollingUpdate.UpdateConcurrency = (*workloadsv1.UpdateConcurrency)(strategy)
}
setUpdateConcurrency(r.Spec.MemberUpdateStrategy)
if r.Spec.UpdateStrategy != nil {
setUpdateConcurrency(r.Spec.UpdateStrategy.MemberUpdateStrategy)
if r.Spec.UpdateStrategy.Partition != nil {
initRollingUpdate()
replicas := intstr.FromInt32(*r.Spec.UpdateStrategy.Partition)
its.Spec.UpdateStrategy.RollingUpdate.Replicas = &replicas
}
if r.Spec.UpdateStrategy.MaxUnavailable != nil {
initRollingUpdate()
its.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable = r.Spec.UpdateStrategy.MaxUnavailable
}
}
}

Expand All @@ -103,7 +166,9 @@ func (r *InstanceSet) changesFromInstanceSet(its *workloadsv1.InstanceSet) {
// spec
// podUpdatePolicy -> updateStrategy.instanceUpdatePolicy
// memberUpdateStrategy -> updateStrategy.rollingUpdate.updateConcurrency
// updateStrategy.rollingUpdate.partition -> updateStrategy.rollingUpdate.replicas
// updateStrategy.partition -> updateStrategy.rollingUpdate.replicas
// updateStrategy.maxUnavailable -> updateStrategy.rollingUpdate.maxUnavailable
// updateStrategy.memberUpdateStrategy -> updateStrategy.rollingUpdate.updateConcurrency
if its.Spec.UpdateStrategy == nil {
return
}
Expand All @@ -113,14 +178,18 @@ func (r *InstanceSet) changesFromInstanceSet(its *workloadsv1.InstanceSet) {
if its.Spec.UpdateStrategy.RollingUpdate == nil {
return
}
if r.Spec.UpdateStrategy == nil {
r.Spec.UpdateStrategy = &InstanceUpdateStrategy{}
}
if its.Spec.UpdateStrategy.RollingUpdate.UpdateConcurrency != nil {
r.Spec.MemberUpdateStrategy = (*MemberUpdateStrategy)(its.Spec.UpdateStrategy.RollingUpdate.UpdateConcurrency)
r.Spec.UpdateStrategy.MemberUpdateStrategy = r.Spec.MemberUpdateStrategy
}
if its.Spec.UpdateStrategy.RollingUpdate.Replicas != nil {
if r.Spec.UpdateStrategy.RollingUpdate == nil {
r.Spec.UpdateStrategy.RollingUpdate = &appsv1.RollingUpdateStatefulSetStrategy{}
}
partition, _ := intstr.GetScaledValueFromIntOrPercent(its.Spec.UpdateStrategy.RollingUpdate.Replicas, int(*its.Spec.Replicas), false)
r.Spec.UpdateStrategy.RollingUpdate.Partition = pointer.Int32(int32(partition))
r.Spec.UpdateStrategy.Partition = pointer.Int32(int32(partition))
}
if its.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable != nil {
r.Spec.UpdateStrategy.MaxUnavailable = its.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable
}
}
39 changes: 37 additions & 2 deletions apis/workloads/v1alpha1/instanceset_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,35 @@ type SchedulingPolicy struct {
TopologySpreadConstraints []corev1.TopologySpreadConstraint `json:"topologySpreadConstraints,omitempty"`
}

// InstanceUpdateStrategy indicates the strategy that the InstanceSet
// controller will use to perform updates. It includes any additional parameters
// necessary to perform the update for the indicated strategy.
type InstanceUpdateStrategy struct {
// Partition indicates the number of pods that should be updated during a rolling update.
// The remaining pods will remain untouched. This is helpful in defining how many pods
// should participate in the update process. The update process will follow the order
// of pod names in descending lexicographical (dictionary) order. The default value is
// Replicas (i.e., update all pods).
// +optional
Partition *int32 `json:"partition,omitempty"`
// The maximum number of pods that can be unavailable during the update.
// Value can be an absolute number (ex: 5) or a percentage of desired pods (ex: 10%).
// Absolute number is calculated from percentage by rounding up. This can not be 0.
// Defaults to 1. The field applies to all pods. That means if there is any unavailable pod,
// it will be counted towards MaxUnavailable.
// +optional
MaxUnavailable *intstr.IntOrString `json:"maxUnavailable,omitempty"`
// Members(Pods) update strategy.
//
// - serial: update Members one by one that guarantee minimum component unavailable time.
// - bestEffortParallel: update Members in parallel that guarantee minimum component un-writable time.
// - parallel: force parallel
//
// +kubebuilder:validation:Enum={Serial,BestEffortParallel,Parallel}
// +optional
MemberUpdateStrategy *MemberUpdateStrategy `json:"memberUpdateStrategy,omitempty"`
}

// Range represents a range with a start and an end value.
// It is used to define a continuous segment.
type Range struct {
Expand Down Expand Up @@ -326,10 +355,9 @@ type InstanceSetSpec struct {
// Indicates the StatefulSetUpdateStrategy that will be
// employed to update Pods in the InstanceSet when a revision is made to
// Template.
// UpdateStrategy.Type will be set to appsv1.OnDeleteStatefulSetStrategyType if MemberUpdateStrategy is not nil
//
// Note: This field will be removed in future version.
UpdateStrategy appsv1.StatefulSetUpdateStrategy `json:"updateStrategy,omitempty"`
UpdateStrategy *InstanceUpdateStrategy `json:"updateStrategy,omitempty"`

// A list of roles defined in the system.
//
Expand Down Expand Up @@ -559,6 +587,13 @@ const (

// RoleProbe defines how to observe role
type RoleProbe struct {
// Specifies the builtin handler name to use to probe the role of the main container.
// Available handlers include: mysql, postgres, mongodb, redis, etcd, kafka.
// Use CustomHandler to define a custom role probe function if none of the built-in handlers meet the requirement.
//
// +optional
BuiltinHandler *string `json:"builtinHandlerName,omitempty"`

// Defines a custom method for role probing.
// Actions defined here are executed in series.
// Upon completion of all actions, the final output should be a single string representing the role name defined in spec.Roles.
Expand Down
41 changes: 40 additions & 1 deletion apis/workloads/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions config/crd/bases/apps.kubeblocks.io_clusterdefinitions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9001,6 +9001,12 @@ spec:
roleProbe:
description: Defines the method used to probe a role.
properties:
builtinHandlerName:
description: |-
Specifies the builtin handler name to use to probe the role of the main container.
Available handlers include: mysql, postgres, mongodb, redis, etcd, kafka.
Use CustomHandler to define a custom role probe function if none of the built-in handlers meet the requirement.
type: string
customHandler:
description: |-
Defines a custom method for role probing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16786,9 +16786,8 @@ spec:
This field is immutable once set.
type: string
defaultMode:
default: 600
description: |-
The default permissions for the mounted path.
The permissions for the mounted path. Defaults to 0600.


This field is immutable once set.
Expand Down
Loading

0 comments on commit d2c0b28

Please sign in to comment.