Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
leon-inf committed Aug 26, 2024
1 parent 6c0c6d3 commit a1bb01a
Show file tree
Hide file tree
Showing 6 changed files with 488 additions and 157 deletions.
60 changes: 40 additions & 20 deletions pkg/controller/component/lifecycle/kbagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (a *kbagent) PostProvision(ctx context.Context, cli client.Reader, opts *Op
compName: a.synthesizedComp.Name,
action: a.synthesizedComp.LifecycleActions.PostProvision,
}
return a.checkedCallAction(ctx, cli, lfa.action, lfa, opts)
return a.ignoreOutput(a.checkedCallAction(ctx, cli, lfa.action, lfa, opts))
}

func (a *kbagent) PreTerminate(ctx context.Context, cli client.Reader, opts *Options) error {
Expand All @@ -69,7 +69,11 @@ func (a *kbagent) PreTerminate(ctx context.Context, cli client.Reader, opts *Opt
compName: a.synthesizedComp.Name,
action: a.synthesizedComp.LifecycleActions.PreTerminate,
}
return a.checkedCallAction(ctx, cli, lfa.action, lfa, opts)
return a.ignoreOutput(a.checkedCallAction(ctx, cli, lfa.action, lfa, opts))
}

func (a *kbagent) RoleProbe(ctx context.Context, cli client.Reader, opts *Options) ([]byte, error) {
return a.checkedCallProbe(ctx, cli, a.synthesizedComp.LifecycleActions.RoleProbe, &roleProbe{}, opts)
}

func (a *kbagent) Switchover(ctx context.Context, cli client.Reader, opts *Options, candidate string) error {
Expand All @@ -80,7 +84,7 @@ func (a *kbagent) Switchover(ctx context.Context, cli client.Reader, opts *Optio
roles: a.synthesizedComp.Roles,
candidate: candidate,
}
return a.checkedCallAction(ctx, cli, a.synthesizedComp.LifecycleActions.Switchover, lfa, opts)
return a.ignoreOutput(a.checkedCallAction(ctx, cli, a.synthesizedComp.LifecycleActions.Switchover, lfa, opts))
}

func (a *kbagent) MemberJoin(ctx context.Context, cli client.Reader, opts *Options) error {
Expand All @@ -90,7 +94,7 @@ func (a *kbagent) MemberJoin(ctx context.Context, cli client.Reader, opts *Optio
compName: a.synthesizedComp.Name,
pod: a.pod,
}
return a.checkedCallAction(ctx, cli, a.synthesizedComp.LifecycleActions.MemberJoin, lfa, opts)
return a.ignoreOutput(a.checkedCallAction(ctx, cli, a.synthesizedComp.LifecycleActions.MemberJoin, lfa, opts))
}

func (a *kbagent) MemberLeave(ctx context.Context, cli client.Reader, opts *Options) error {
Expand All @@ -100,17 +104,17 @@ func (a *kbagent) MemberLeave(ctx context.Context, cli client.Reader, opts *Opti
compName: a.synthesizedComp.Name,
pod: a.pod,
}
return a.checkedCallAction(ctx, cli, a.synthesizedComp.LifecycleActions.MemberLeave, lfa, opts)
return a.ignoreOutput(a.checkedCallAction(ctx, cli, a.synthesizedComp.LifecycleActions.MemberLeave, lfa, opts))
}

func (a *kbagent) DataDump(ctx context.Context, cli client.Reader, opts *Options) error {
lfa := &dataDump{}
return a.checkedCallAction(ctx, cli, a.synthesizedComp.LifecycleActions.DataDump, lfa, opts)
return a.ignoreOutput(a.checkedCallAction(ctx, cli, a.synthesizedComp.LifecycleActions.DataDump, lfa, opts))
}

func (a *kbagent) DataLoad(ctx context.Context, cli client.Reader, opts *Options) error {
lfa := &dataLoad{}
return a.checkedCallAction(ctx, cli, a.synthesizedComp.LifecycleActions.DataLoad, lfa, opts)
return a.ignoreOutput(a.checkedCallAction(ctx, cli, a.synthesizedComp.LifecycleActions.DataLoad, lfa, opts))
}

func (a *kbagent) AccountProvision(ctx context.Context, cli client.Reader, opts *Options, statement, user, password string) error {
Expand All @@ -119,20 +123,31 @@ func (a *kbagent) AccountProvision(ctx context.Context, cli client.Reader, opts
user: user,
password: password,
}
return a.checkedCallAction(ctx, cli, a.synthesizedComp.LifecycleActions.AccountProvision, lfa, opts)
return a.ignoreOutput(a.checkedCallAction(ctx, cli, a.synthesizedComp.LifecycleActions.AccountProvision, lfa, opts))
}

func (a *kbagent) ignoreOutput(_ []byte, err error) error {
return err
}

func (a *kbagent) checkedCallAction(ctx context.Context, cli client.Reader, spec *appsv1alpha1.Action, lfa lifecycleAction, opts *Options) error {
func (a *kbagent) checkedCallAction(ctx context.Context, cli client.Reader, spec *appsv1alpha1.Action, lfa lifecycleAction, opts *Options) ([]byte, error) {
if spec == nil || spec.Exec == nil {
return errors.Wrap(ErrActionNotDefined, lfa.name())
return nil, errors.Wrap(ErrActionNotDefined, lfa.name())
}
if err := a.precondition(ctx, cli, spec); err != nil {
return err
return nil, err
}
// TODO: exactly once
return a.callAction(ctx, cli, spec, lfa, opts)
}

func (a *kbagent) checkedCallProbe(ctx context.Context, cli client.Reader, spec *appsv1alpha1.Probe, lfa lifecycleAction, opts *Options) ([]byte, error) {
if spec == nil || spec.Exec == nil {
return nil, errors.Wrap(ErrActionNotDefined, lfa.name())
}
return a.checkedCallAction(ctx, cli, &spec.Action, lfa, opts)
}

func (a *kbagent) precondition(ctx context.Context, cli client.Reader, spec *appsv1alpha1.Action) error {
if spec.PreCondition == nil {
return nil
Expand Down Expand Up @@ -191,10 +206,10 @@ func (a *kbagent) readyCheck(ctx context.Context, cli client.Reader, name, kind
return nil
}

func (a *kbagent) callAction(ctx context.Context, cli client.Reader, spec *appsv1alpha1.Action, lfa lifecycleAction, opts *Options) error {
func (a *kbagent) callAction(ctx context.Context, cli client.Reader, spec *appsv1alpha1.Action, lfa lifecycleAction, opts *Options) ([]byte, error) {
req, err1 := a.buildActionRequest(ctx, cli, lfa, opts)
if err1 != nil {
return err1
return nil, err1
}
return a.callActionWithSelector(ctx, spec, lfa, req)
}
Expand Down Expand Up @@ -252,32 +267,37 @@ func (a *kbagent) templateVarsParameters() (map[string]string, error) {
return m, nil
}

func (a *kbagent) callActionWithSelector(ctx context.Context, spec *appsv1alpha1.Action, lfa lifecycleAction, req *proto.ActionRequest) error {
func (a *kbagent) callActionWithSelector(ctx context.Context, spec *appsv1alpha1.Action, lfa lifecycleAction, req *proto.ActionRequest) ([]byte, error) {
pods, err := a.selectTargetPods(spec)
if err != nil {
return err
return nil, err
}
if len(pods) == 0 {
return fmt.Errorf("no available pod to call action %s", lfa.name())
return nil, fmt.Errorf("no available pod to call action %s", lfa.name())
}

// TODO: impl
// - back-off to retry
// - timeout
var output []byte
for _, pod := range pods {
cli, err1 := kbacli.NewClient(*pod)
if err1 != nil {
return err1
return nil, err1
}
if cli == nil {
continue // not defined, for test only
}
_, err2 := cli.CallAction(ctx, *req)
rsp, err2 := cli.CallAction(ctx, *req)
if err2 != nil {
return a.error2(lfa, err2)
return nil, a.error2(lfa, err2)
}
// take first non-nil output
if output == nil && rsp.Output != nil {
output = rsp.Output
}
}
return nil
return output, nil
}

func (a *kbagent) selectTargetPods(spec *appsv1alpha1.Action) ([]*corev1.Pod, error) {
Expand Down
6 changes: 5 additions & 1 deletion pkg/controller/component/lifecycle/lfa_component.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ import (
"github.com/apecloud/kubeblocks/pkg/controller/model"
)

const (
hackedAllCompList = "KB_CLUSTER_COMPONENT_LIST" // declare it here for test
)

type postProvision struct {
namespace string
clusterName string
Expand Down Expand Up @@ -106,7 +110,7 @@ func hackParameters4Comp(ctx context.Context, cli client.Reader, namespace, clus
compPodIPList = "KB_CLUSTER_COMPONENT_POD_IP_LIST"
compPodHostNameList = "KB_CLUSTER_COMPONENT_POD_HOST_NAME_LIST"
compPodHostIPList = "KB_CLUSTER_COMPONENT_POD_HOST_IP_LIST"
allCompList = "KB_CLUSTER_COMPONENT_LIST"
allCompList = hackedAllCompList
deletingCompList = "KB_CLUSTER_COMPONENT_DELETING_LIST"
undeletedCompList = "KB_CLUSTER_COMPONENT_UNDELETED_LIST"
scalingInComp = "KB_CLUSTER_COMPONENT_IS_SCALING_IN"
Expand Down
109 changes: 105 additions & 4 deletions pkg/controller/component/lifecycle/lfa_member.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,68 @@ package lifecycle

import (
"context"
"fmt"

corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
"github.com/apecloud/kubeblocks/pkg/constant"
"github.com/apecloud/kubeblocks/pkg/controller/component"
)

const (
joinMemberPodFQDNVar = "KB_JOIN_MEMBER_POD_FQDN"
joinMemberPodNameVar = "KB_JOIN_MEMBER_POD_NAME"
leaveMemberPodFQDNVar = "KB_LEAVE_MEMBER_POD_FQDN"
leaveMemberPodNameVar = "KB_LEAVE_MEMBER_POD_NAME"
switchoverCandidateName = "KB_SWITCHOVER_CANDIDATE_NAME"
switchoverCandidateFQDN = "KB_SWITCHOVER_CANDIDATE_FQDN"
joinMemberPodFQDNVar = "KB_JOIN_MEMBER_POD_FQDN"
joinMemberPodNameVar = "KB_JOIN_MEMBER_POD_NAME"
leaveMemberPodFQDNVar = "KB_LEAVE_MEMBER_POD_FQDN"
leaveMemberPodNameVar = "KB_LEAVE_MEMBER_POD_NAME"
)

type roleProbe struct{}

var _ lifecycleAction = &roleProbe{}

func (a *roleProbe) name() string {
return "roleProbe"
}

func (a *roleProbe) parameters(ctx context.Context, cli client.Reader) (map[string]string, error) {
return nil, nil
}

type switchover struct {
namespace string
clusterName string
compName string
roles []appsv1alpha1.ReplicaRole
candidate string
}

var _ lifecycleAction = &switchover{}

func (a *switchover) name() string {
return "switchover"
}

func (a *switchover) parameters(ctx context.Context, cli client.Reader) (map[string]string, error) {
// The container executing this action has access to following variables:
//
// - KB_SWITCHOVER_CANDIDATE_NAME: The name of the pod for the new leader candidate, which may not be specified (empty).
// - KB_SWITCHOVER_CANDIDATE_FQDN: The FQDN of the new leader candidate's pod, which may not be specified (empty).
m, err := hackParameters4Switchover(ctx, cli, a.namespace, a.clusterName, a.compName, a.roles)
if err != nil {
return nil, err
}
if len(a.candidate) > 0 {
compName := constant.GenerateClusterComponentName(a.clusterName, a.compName)
m[switchoverCandidateName] = a.candidate
m[switchoverCandidateFQDN] = component.PodFQDN(a.namespace, compName, a.candidate)
}
return m, nil
}

type memberJoin struct {
namespace string
clusterName string
Expand Down Expand Up @@ -85,3 +132,57 @@ func (a *memberLeave) parameters(ctx context.Context, cli client.Reader) (map[st
leaveMemberPodNameVar: a.pod.Name,
}, nil
}

////////// hack for legacy Addons //////////
// The container executing this action has access to following variables:
//
// - KB_LEADER_POD_IP: The IP address of the current leader's pod prior to the switchover.
// - KB_LEADER_POD_NAME: The name of the current leader's pod prior to the switchover.
// - KB_LEADER_POD_FQDN: The FQDN of the current leader's pod prior to the switchover.

func hackParameters4Switchover(ctx context.Context, cli client.Reader, namespace, clusterName, compName string, roles []appsv1alpha1.ReplicaRole) (map[string]string, error) {
const (
leaderPodName = "KB_LEADER_POD_NAME"
leaderPodFQDN = "KB_LEADER_POD_FQDN"
leaderPodIP = "KB_LEADER_POD_IP"
)

role, err := leaderRole(roles)
if err != nil {
return nil, err
}

pods, err := component.ListOwnedPodsWithRole(ctx, cli, namespace, clusterName, compName, role)
if err != nil {
return nil, err
}
if len(pods) == 0 {
return nil, fmt.Errorf("has no pod with the leader role %s", role)
}
if len(pods) > 1 {
return nil, fmt.Errorf("more than one pod found as leader: %d, role: %s", len(pods), role)
}

pod := pods[0]
return map[string]string{
leaderPodName: pod.Name,
leaderPodFQDN: component.PodFQDN(namespace, constant.GenerateClusterComponentName(clusterName, compName), pod.Name),
leaderPodIP: pod.Status.PodIP,
}, nil
}

func leaderRole(roles []appsv1alpha1.ReplicaRole) (string, error) {
targetRole := ""
for _, role := range roles {
if role.Serviceable && role.Writable {
if targetRole != "" {
return "", fmt.Errorf("more than one role defined as leader: %s,%s", targetRole, role.Name)
}
targetRole = role.Name
}
}
if targetRole == "" {
return "", fmt.Errorf("%s", "has no appropriate role defined as leader")
}
return targetRole, nil
}
Loading

0 comments on commit a1bb01a

Please sign in to comment.