Skip to content

Commit

Permalink
chore: handle host port configmap update conflict (#6218)
Browse files Browse the repository at this point in the history
  • Loading branch information
iziang authored Dec 25, 2023
1 parent 4ed7a92 commit 5f8d5dd
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 21 deletions.
1 change: 1 addition & 0 deletions controllers/apps/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ var _ = BeforeSuite(func() {
viper.SetDefault(constant.KBToolsImage, "apecloud/kubeblocks-tools:latest")
viper.SetDefault("PROBE_SERVICE_PORT", 3501)
viper.SetDefault("PROBE_SERVICE_LOG_LEVEL", "info")
viper.SetDefault("CM_NAMESPACE", "default")
viper.SetDefault(constant.EnableRBACManager, true)

err = intctrlutil.InitHostPortManager(k8sClient)
Expand Down
64 changes: 43 additions & 21 deletions pkg/controllerutil/controller_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (

"github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
"github.com/apecloud/kubeblocks/pkg/constant"
viper "github.com/apecloud/kubeblocks/pkg/viperx"
)

// ResultToP converts a Result object to a pointer.
Expand Down Expand Up @@ -290,20 +291,16 @@ const (
// The TCP/IP port numbers below 1024 are special in that normal users are not allowed to run servers on them.
// This is a security feaure, in that if you connect to a service on one of these ports you can be fairly sure
// that you have the real thing, and not a fake which some hacker has put up for you.
hostPortMin = int32(1025)
hostPortMax = int32(65536)
hostPortConfigMapName = "kubeblocks-host-ports"
hostPortConfigMapNamespace = "default"
hostPortMin = int32(1025)
hostPortMax = int32(65536)
hostPortConfigMapName = "kubeblocks-host-ports"
)

func InitHostPortManager(cli client.Client) error {
cm := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: hostPortConfigMapName,
// TODO use the namespace where the operator is running in
Namespace: hostPortConfigMapNamespace,
// TODO add finalizers
Finalizers: make([]string, 0),
Name: hostPortConfigMapName,
Namespace: viper.GetString(constant.CfgKeyCtrlrMgrNS),
},
Data: make(map[string]string),
}
Expand All @@ -313,7 +310,7 @@ func InitHostPortManager(cli client.Client) error {
return err
}
}
portManager, err = NewPortManager(context.Background(), hostPortMin, hostPortMax, cli)
portManager, err = NewPortManager(hostPortMin, hostPortMax, cli)
return err
}

Expand All @@ -337,11 +334,26 @@ type PortManager struct {
// NewPortManager creates a new PortManager
// TODO[ziang] Putting all the port information in one configmap may have performance issues and is not secure enough.
// There is a risk of accidental deletion leading to the loss of cluster port information.
func NewPortManager(ctx context.Context, min, max int32, cli client.Client) (*PortManager, error) {
cm := &corev1.ConfigMap{}
if err := cli.Get(ctx, types.NamespacedName{Name: hostPortConfigMapName, Namespace: hostPortConfigMapNamespace}, cm); err != nil {
func NewPortManager(min, max int32, cli client.Client) (*PortManager, error) {
pm := &PortManager{min: min, max: max, cli: cli, portsUsed: make(map[int32]bool)}
if err := pm.sync(); err != nil {
return nil, err
}
return pm, nil
}

func (pm *PortManager) sync() error {
cm := &corev1.ConfigMap{}
objKey := types.NamespacedName{
Name: hostPortConfigMapName,
Namespace: viper.GetString(constant.CfgKeyCtrlrMgrNS),
}
if err := pm.cli.Get(context.Background(), objKey, cm); err != nil {
return err
}
if cm.Data == nil {
cm.Data = make(map[string]string)
}
portsUsed := make(map[int32]bool)
for _, item := range cm.Data {
port, err := strconv.ParseInt(strings.TrimSpace(item), 10, 32)
Expand All @@ -350,17 +362,27 @@ func NewPortManager(ctx context.Context, min, max int32, cli client.Client) (*Po
}
portsUsed[int32(port)] = true
}
return &PortManager{min: min, max: max, cm: cm, cli: cli, portsUsed: make(map[int32]bool)}, nil
pm.cm = cm
pm.portsUsed = portsUsed
return nil
}

func (pm *PortManager) update() error {
var err error
defer func() {
if apierrors.IsConflict(err) {
_ = pm.sync()
}
}()
err = pm.cli.Update(context.Background(), pm.cm)
return err
}

func (pm *PortManager) UsePort(key string, port int32) error {
pm.Lock()
defer pm.Unlock()
if pm.cm.Data == nil {
pm.cm.Data = make(map[string]string)
}
pm.cm.Data[key] = fmt.Sprintf("%d", port)
if err := pm.cli.Update(context.Background(), pm.cm); err != nil {
if err := pm.update(); err != nil {
return err
}
pm.portsUsed[port] = true
Expand All @@ -386,7 +408,7 @@ func (pm *PortManager) AllocatePort(key string) (int32, error) {
continue
}
pm.cm.Data[key] = fmt.Sprintf("%d", port)
if err := pm.cli.Update(context.Background(), pm.cm); err != nil {
if err := pm.update(); err != nil {
return 0, err
}
pm.portsUsed[port] = true
Expand All @@ -405,7 +427,7 @@ func (pm *PortManager) ReleasePorts(keys []string) error {
for _, key := range keys {
delete(pm.cm.Data, key)
}
if err := pm.cli.Update(context.Background(), pm.cm); err != nil {
if err := pm.update(); err != nil {
return err
}
return nil
Expand All @@ -422,7 +444,7 @@ func (pm *PortManager) ReleaseByPrefix(prefix string) error {
delete(pm.cm.Data, key)
}
}
if err := pm.cli.Update(context.Background(), pm.cm); err != nil {
if err := pm.update(); err != nil {
return err
}
return nil
Expand Down

0 comments on commit 5f8d5dd

Please sign in to comment.