Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: mongodb supports connections from outside k8s using host network #6689

Merged
merged 49 commits into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
9fb1da6
refine lorry utils
xuriwuyun Jan 15, 2024
7cac1bc
add init container
xuriwuyun Jan 15, 2024
42ef83a
get role use command if specified
xuriwuyun Jan 15, 2024
149f5d6
add main container env to lorry
xuriwuyun Jan 15, 2024
b512f44
update lorry startup probe
xuriwuyun Jan 16, 2024
0fa2b0b
fix lorry spec copy
xuriwuyun Jan 16, 2024
c7129a7
update builtinhandler
xuriwuyun Jan 16, 2024
529850a
remove error log
xuriwuyun Jan 16, 2024
e285a63
fix
xuriwuyun Jan 16, 2024
2087916
add strerr log
xuriwuyun Jan 16, 2024
eea3c92
support specifing container for command
xuriwuyun Jan 17, 2024
3d76f54
fix lint
xuriwuyun Jan 17, 2024
9dee21b
update
xuriwuyun Jan 17, 2024
d5170c7
fix tests
xuriwuyun Jan 17, 2024
ff7ac5f
Merge branch 'main' into feature/lorry_support_cmd
xuriwuyun Feb 2, 2024
e7415f7
update
xuriwuyun Feb 2, 2024
ae2825c
redifine role probe
xuriwuyun Feb 4, 2024
39ddd1a
update role probe
xuriwuyun Feb 4, 2024
df04c9b
add comments
xuriwuyun Feb 4, 2024
d7f9547
add action envs
xuriwuyun Feb 19, 2024
fd0aeba
update api comments
xuriwuyun Feb 21, 2024
43b8dd5
fix tests
xuriwuyun Feb 21, 2024
60aed54
Merge branch 'main' into feature/lorry_support_cmd
xuriwuyun Feb 21, 2024
b9934b9
chore: auto update apis doc changes
xuriwuyun Feb 21, 2024
79689f8
fix bug
xuriwuyun Feb 21, 2024
a4ef723
Merge branch 'feature/lorry_support_cmd' of github.com:apecloud/kubeb…
xuriwuyun Feb 21, 2024
ade84ef
fix tests
xuriwuyun Feb 21, 2024
e7a3e29
add tests
xuriwuyun Feb 21, 2024
05c4e88
update
xuriwuyun Feb 21, 2024
fec8610
chore: auto update apis doc changes
xuriwuyun Feb 21, 2024
37f6bee
fix
xuriwuyun Feb 21, 2024
64e610b
Merge branch 'feature/lorry_support_cmd' of github.com:apecloud/kubeb…
xuriwuyun Feb 21, 2024
e57f4d7
update comments
xuriwuyun Feb 22, 2024
38d0ea5
chore: auto update apis doc changes
xuriwuyun Feb 22, 2024
3047814
update comments
xuriwuyun Feb 22, 2024
ae95280
Merge branch 'feature/lorry_support_cmd' of github.com:apecloud/kubeb…
xuriwuyun Feb 22, 2024
aafaeab
chore: auto update apis doc changes
xuriwuyun Feb 22, 2024
c3b68c4
Merge branch 'main' into feature/lorry_support_cmd
xuriwuyun Feb 22, 2024
4ddaf0e
Merge branch 'feature/lorry_support_cmd' of github.com:apecloud/kubeb…
xuriwuyun Feb 22, 2024
b482470
feat: mongodb support host network
xuriwuyun Feb 23, 2024
9482a81
chore: auto update apis doc changes
xuriwuyun Feb 23, 2024
ff8cf52
update member check
xuriwuyun Feb 23, 2024
dc3ec07
ha support mongodb hostnetwork
xuriwuyun Feb 26, 2024
d2e8f73
Merge branch 'feature/mongodb_support_hostnetwork' of github.com:apec…
xuriwuyun Feb 26, 2024
f1c66b5
Update MongoDB connect port
xuriwuyun Feb 26, 2024
c0fbb4e
add log
xuriwuyun Feb 26, 2024
96f7530
update
xuriwuyun Feb 26, 2024
b0ec502
Merge branch 'main' into feature/mongodb_support_hostnetwork
xuriwuyun Feb 27, 2024
3c7f127
fix test
xuriwuyun Feb 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/lorry/dcs/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,9 @@ func (store *KubernetesStore) GetMembers() ([]Member, error) {
member.DBPort = getDBPort(&pod)
member.LorryPort = getLorryPort(&pod)
member.UID = string(pod.UID)
if pod.Spec.HostNetwork {
member.UseIP = true
}
member.resource = pod.DeepCopy()
}

Expand Down
7 changes: 6 additions & 1 deletion pkg/lorry/dcs/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package dcs

import (
"fmt"
"strings"

"github.com/apecloud/kubeblocks/pkg/constant"
viper "github.com/apecloud/kubeblocks/pkg/viperx"
Expand Down Expand Up @@ -67,7 +68,7 @@ func (c *Cluster) GetMemberWithName(name string) *Member {

func (c *Cluster) GetMemberWithHost(host string) *Member {
for _, m := range c.Members {
if host == c.GetMemberAddr(m) {
if strings.HasPrefix(host, m.Name) || strings.HasPrefix(host, m.PodIP) {
return &m
}
}
Expand All @@ -94,6 +95,9 @@ func (c *Cluster) GetMemberAddrWithPort(member Member) string {
}

func (c *Cluster) GetMemberAddr(member Member) string {
if member.UseIP {
return member.PodIP
}
clusterDomain := viper.GetString(constant.KubernetesClusterDomainEnv)
return fmt.Sprintf("%s.%s-headless.%s.svc.%s", member.Name, c.ClusterCompName, c.Namespace, clusterDomain)
}
Expand Down Expand Up @@ -201,6 +205,7 @@ type Member struct {
DBPort string
LorryPort string
UID string
UseIP bool
resource any
}

Expand Down
8 changes: 5 additions & 3 deletions pkg/lorry/engines/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (

type DBManagerBase struct {
CurrentMemberName string
CurrentMemberIP string
ClusterCompName string
Namespace string
DataDir string
Expand All @@ -52,6 +53,7 @@ func NewDBManagerBase(logger logr.Logger) (*DBManagerBase, error) {

mgr := DBManagerBase{
CurrentMemberName: currentMemberName,
CurrentMemberIP: viper.GetString(constant.KBEnvPodIP),
ClusterCompName: viper.GetString(constant.KBEnvClusterCompName),
Namespace: viper.GetString(constant.KBEnvNamespace),
Logger: logger,
Expand All @@ -60,7 +62,7 @@ func NewDBManagerBase(logger logr.Logger) (*DBManagerBase, error) {
}

func (mgr *DBManagerBase) IsDBStartupReady() bool {
return true
return mgr.DBStartupReady
}

func (mgr *DBManagerBase) GetLogger() logr.Logger {
Expand Down Expand Up @@ -95,8 +97,8 @@ func (mgr *DBManagerBase) Follow(context.Context, *dcs.Cluster) error {
return errors.New("not implemented")
}

func (mgr *DBManagerBase) Recover(context.Context) error {
return errors.New("not implemented")
func (mgr *DBManagerBase) Recover(context.Context, *dcs.Cluster) error {
return nil
}

func (mgr *DBManagerBase) IsLeader(context.Context, *dcs.Cluster) (bool, error) {
Expand Down
1 change: 1 addition & 0 deletions pkg/lorry/engines/custom/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func NewManager(properties engines.Properties) (engines.DBManager, error) {
return nil, err
}

managerBase.DBStartupReady = true
mgr := &Manager{
actionSvcPorts: &[]int{},
DBManagerBase: *managerBase,
Expand Down
27 changes: 6 additions & 21 deletions pkg/lorry/engines/dbmanager_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/lorry/engines/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ type DBManager interface {
Promote(context.Context, *dcs.Cluster) error
Demote(context.Context) error
Follow(context.Context, *dcs.Cluster) error
Recover(context.Context) error
Recover(context.Context, *dcs.Cluster) error

// Start and Stop just send signal to lorryctl
Start(context.Context, *dcs.Cluster) error
Expand Down
2 changes: 1 addition & 1 deletion pkg/lorry/engines/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (*MockManager) Follow(context.Context, *dcs.Cluster) error {
return fmt.Errorf("NotSupported")
}

func (*MockManager) Recover(context.Context) error {
func (*MockManager) Recover(context.Context, *dcs.Cluster) error {
return nil

}
Expand Down
4 changes: 2 additions & 2 deletions pkg/lorry/engines/mongodb/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@ mongosh mongodb://%s:%s@%s/%s
func (r Commands) ConnectCommand(connectInfo *engines.AuthInfo) []string {
userName := r.info.UserEnv
userPass := r.info.PasswordEnv
dsn := fmt.Sprintf("mongodb://%s:%s@$KB_POD_FQDN:27017/admin?replicaSet=$KB_CLUSTER_COMP_NAME", userName, userPass)
dsn := fmt.Sprintf("mongodb://%s:%s@$KB_POD_FQDN:$SERVICE_PORT/admin?replicaSet=$KB_CLUSTER_COMP_NAME", userName, userPass)
if connectInfo != nil {
userName = connectInfo.UserName
userPass = connectInfo.UserPasswd
dsn = fmt.Sprintf("mongodb://%s:%s@$KB_POD_FQDN:27017/admin?replicaSet=$KB_CLUSTER_COMP_NAME", userName, userPass)
dsn = fmt.Sprintf("mongodb://%s:%s@$KB_POD_FQDN:$SERVICE_PORT/admin?replicaSet=$KB_CLUSTER_COMP_NAME", userName, userPass)
}

mongodbCmd := fmt.Sprintf("export CLIENT=`which mongosh>/dev/null&&echo %s||echo mongo`; $CLIENT %s", r.info.Client, dsn)
Expand Down
107 changes: 84 additions & 23 deletions pkg/lorry/engines/mongodb/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (mgr *Manager) InitiateReplSet(ctx context.Context, cluster *dcs.Cluster) e
for i, member := range cluster.Members {
configMembers[i].ID = i
configMembers[i].Host = cluster.GetMemberAddrWithPort(member)
if strings.HasPrefix(member.Name, mgr.CurrentMemberName) {
if strings.HasPrefix(member.Name, mgr.CurrentMemberName) || strings.HasPrefix(member.Name, mgr.CurrentMemberIP) {
configMembers[i].Priority = PrimaryPriority
} else {
configMembers[i].Priority = SecondaryPriority
Expand Down Expand Up @@ -306,7 +306,7 @@ func (mgr *Manager) IsLeaderMember(ctx context.Context, cluster *dcs.Cluster, dc
return false, err
}
for _, member := range status.Members {
if strings.HasPrefix(member.Name, dcsMember.Name) {
if strings.HasPrefix(member.Name, dcsMember.Name) || strings.HasPrefix(member.Name, dcsMember.PodIP) {
if member.StateStr == "PRIMARY" {
return true, nil
}
Expand Down Expand Up @@ -423,7 +423,7 @@ func (mgr *Manager) IsCurrentMemberInCluster(ctx context.Context, cluster *dcs.C
}

for _, member := range rsConfig.Members {
if strings.HasPrefix(member.Host, mgr.GetCurrentMemberName()) {
if strings.HasPrefix(member.Host, mgr.CurrentMemberName) || strings.HasPrefix(member.Host, mgr.CurrentMemberIP) {
return true
}
}
Expand All @@ -443,21 +443,72 @@ func (mgr *Manager) IsMemberHealthy(ctx context.Context, cluster *dcs.Cluster, m
memberName = mgr.CurrentMemberName
}

rsStatus, _ := mgr.GetReplSetStatus(ctx)
rsStatus, err := mgr.GetReplSetStatus(ctx)
if err != nil {
mgr.Logger.Info("get replset status failed", "error", err.Error())
return false
}

if rsStatus == nil {
return false
}

for _, member := range rsStatus.Members {
if strings.HasPrefix(member.Name, memberName) && member.Health == 1 {
if (strings.HasPrefix(member.Name, memberName) || strings.HasPrefix(member.Name, mgr.CurrentMemberIP)) &&
member.Health == 1 {
return true
}
}
return false
}

func (mgr *Manager) Recover(context.Context) error {
return nil
func (mgr *Manager) Recover(ctx context.Context, cluster *dcs.Cluster) error {
if mgr.IsCurrentMemberInCluster(ctx, cluster) {
return nil
}
return mgr.UpdateCurrentMemberHost(ctx, cluster)
}

func (mgr *Manager) UpdateCurrentMemberHost(ctx context.Context, cluster *dcs.Cluster) error {
client, err := mgr.GetReplSetClient(ctx, cluster)
if err != nil {
return err
}
defer client.Disconnect(ctx) //nolint:errcheck

currentMember := cluster.GetMemberWithName(mgr.GetCurrentMemberName())
currentHost := cluster.GetMemberAddrWithPort(*currentMember)
rsConfig, err := GetReplSetConfig(ctx, client)
if rsConfig == nil {
mgr.Logger.Error(err, "Get replSet config failed")
return err
}

var invalidMembers []*ConfigMember
for i, configMember := range rsConfig.Members {
host := configMember.Host
isInvalid := true
for _, member := range cluster.Members {
if strings.HasPrefix(host, member.Name) || strings.HasPrefix(host, member.PodIP) {
isInvalid = false
continue
}
}
if isInvalid {
invalidMembers = append(invalidMembers, &rsConfig.Members[i])
}
}
if len(invalidMembers) > 1 {
return errors.Errorf("the replica set has more than one invalid members: %v", invalidMembers)
}
if len(invalidMembers) == 0 {
return nil
}
configMember := invalidMembers[0]
configMember.Host = currentHost

rsConfig.Version++
return SetReplSetConfig(ctx, client, rsConfig)
}

func (mgr *Manager) JoinCurrentMemberToCluster(ctx context.Context, cluster *dcs.Cluster) error {
Expand Down Expand Up @@ -508,7 +559,7 @@ func (mgr *Manager) LeaveMemberFromCluster(ctx context.Context, cluster *dcs.Clu
configMembers := make([]ConfigMember, 0, len(rsConfig.Members)-1)
isDeleted := true
for _, configMember := range rsConfig.Members {
if strings.HasPrefix(configMember.Host, memberName) {
if strings.HasPrefix(configMember.Host, memberName) || strings.HasPrefix(configMember.Host, mgr.CurrentMemberIP) {
isDeleted = false
continue
}
Expand Down Expand Up @@ -557,7 +608,8 @@ func (mgr *Manager) IsPromoted(ctx context.Context) bool {
return false
}
for i := range rsConfig.Members {
if strings.HasPrefix(rsConfig.Members[i].Host, mgr.CurrentMemberName) {
host := rsConfig.Members[i].Host
if strings.HasPrefix(host, mgr.CurrentMemberName) || strings.HasPrefix(host, mgr.CurrentMemberIP) {
if rsConfig.Members[i].Priority == PrimaryPriority {
return true
}
Expand All @@ -574,7 +626,8 @@ func (mgr *Manager) Promote(ctx context.Context, cluster *dcs.Cluster) error {
}

for i := range rsConfig.Members {
if strings.HasPrefix(rsConfig.Members[i].Host, mgr.CurrentMemberName) {
host := rsConfig.Members[i].Host
if strings.HasPrefix(host, mgr.CurrentMemberName) || strings.HasPrefix(host, mgr.CurrentMemberIP) {
if rsConfig.Members[i].Priority == PrimaryPriority {
mgr.Logger.Info("Current member already has the highest priority!")
return nil
Expand Down Expand Up @@ -616,9 +669,13 @@ func (mgr *Manager) GetHealthiestMember(cluster *dcs.Cluster, candidate string)
var leader string
for _, member := range rsStatus.Members {
if member.Health == 1 {
memberName := strings.Split(member.Name, ".")[0]
m := cluster.GetMemberWithHost(member.Name)
if m == nil {
continue
}
memberName := m.Name
if memberName == candidate {
return cluster.GetMemberWithName(candidate)
return m
}
healthyMembers = append(healthyMembers, memberName)
if member.State == 1 {
Expand Down Expand Up @@ -651,20 +708,20 @@ func (mgr *Manager) HasOtherHealthyLeader(ctx context.Context, cluster *dcs.Clus
healthMembers := map[string]struct{}{}
var otherLeader string
for _, member := range rsStatus.Members {
memberName := strings.Split(member.Name, ".")[0]
memberName := member.Name
if member.State == 1 || member.State == 2 {
healthMembers[memberName] = struct{}{}
}

if member.State != 1 {
continue
}
if memberName != mgr.CurrentMemberName {
if !strings.HasPrefix(memberName, mgr.CurrentMemberName) && !strings.HasPrefix(memberName, mgr.CurrentMemberIP) {
otherLeader = memberName
}
}
if otherLeader != "" {
return cluster.GetMemberWithName(otherLeader)
return cluster.GetMemberWithHost(otherLeader)
}

rsConfig, err := mgr.GetReplSetConfig(ctx)
Expand All @@ -674,16 +731,16 @@ func (mgr *Manager) HasOtherHealthyLeader(ctx context.Context, cluster *dcs.Clus
}

for _, mb := range rsConfig.Members {
memberName := strings.Split(mb.Host, ".")[0]
if mb.Priority == PrimaryPriority && memberName != mgr.CurrentMemberName {
memberName := mb.Host
if mb.Priority == PrimaryPriority && !strings.HasPrefix(memberName, mgr.CurrentMemberName) && !strings.HasPrefix(memberName, mgr.CurrentMemberIP) {
if _, ok := healthMembers[memberName]; ok {
otherLeader = memberName
}
}
}

if otherLeader != "" {
return cluster.GetMemberWithName(otherLeader)
return cluster.GetMemberWithHost(otherLeader)
}

return nil
Expand All @@ -698,17 +755,21 @@ func (mgr *Manager) HasOtherHealthyMembers(ctx context.Context, cluster *dcs.Clu
}

for _, member := range rsStatus.Members {
if member == nil {
continue
}
if member.Health != 1 {
continue
}
memberName := strings.Split(member.Name, ".")[0]
if memberName == leader {
m := cluster.GetMemberWithHost(member.Name)
if m == nil {
continue
}
member := cluster.GetMemberWithName(memberName)
if member != nil {
members = append(members, member)
memberName := m.Name
if memberName == leader {
continue
}
members = append(members, m)
}

return members
Expand Down
Loading
Loading