Skip to content

Commit

Permalink
Merge branch 'main' into support/test-for-kbagent
Browse files Browse the repository at this point in the history
  • Loading branch information
leon-inf committed Aug 26, 2024
2 parents a0aad1a + 9b0c4f0 commit 60d684b
Show file tree
Hide file tree
Showing 21 changed files with 334 additions and 272 deletions.
4 changes: 2 additions & 2 deletions controllers/apps/component_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,15 @@ var mockKBAgentClient = func(mock func(*kbagent.MockClientMockRecorder)) {

var mockKBAgentClientDefault = func() {
mockKBAgentClient(func(recorder *kbagent.MockClientMockRecorder) {
recorder.CallAction(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req kbagentproto.ActionRequest) (kbagentproto.ActionResponse, error) {
recorder.Action(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req kbagentproto.ActionRequest) (kbagentproto.ActionResponse, error) {
return kbagentproto.ActionResponse{}, nil
}).AnyTimes()
})
}

var mockKBAgentClient4HScale = func(clusterKey types.NamespacedName, compName string, replicas int) {
mockKBAgentClient(func(recorder *kbagent.MockClientMockRecorder) {
recorder.CallAction(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req kbagentproto.ActionRequest) (kbagentproto.ActionResponse, error) {
recorder.Action(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req kbagentproto.ActionRequest) (kbagentproto.ActionResponse, error) {
rsp := kbagentproto.ActionResponse{}
if req.Action != "memberLeave" {
return rsp, nil
Expand Down
22 changes: 9 additions & 13 deletions pkg/controller/component/kbagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,7 @@ import (
)

const (
kbAgentContainerName = "kbagent"
kbAgentInitContainerName = "init-kbagent"
kbAgentCommand = "/bin/kbagent"
kbAgentPortName = "http"

kbAgentCommand = "/bin/kbagent"
kbAgentSharedMountPath = "/kubeblocks"
kbAgentCommandOnSharedMount = "/kubeblocks/kbagent"

Expand All @@ -56,18 +52,18 @@ var (
)

func IsKBAgentContainer(c *corev1.Container) bool {
return c.Name == kbAgentContainerName || c.Name == kbAgentInitContainerName
return c.Name == kbagent.ContainerName || c.Name == kbagent.InitContainerName
}

func UpdateKBAgentContainer4HostNetwork(synthesizedComp *SynthesizedComponent) {
idx, c := intctrlutil.GetContainerByName(synthesizedComp.PodSpec.Containers, kbAgentContainerName)
idx, c := intctrlutil.GetContainerByName(synthesizedComp.PodSpec.Containers, kbagent.ContainerName)
if c == nil {
return
}

httpPort := 0
for _, port := range c.Ports {
if port.Name == kbAgentPortName {
if port.Name == kbagent.DefaultPortName {
httpPort = int(port.ContainerPort)
break
}
Expand Down Expand Up @@ -108,7 +104,7 @@ func buildKBAgentContainer(synthesizedComp *SynthesizedComponent) error {
}

port := int(ports[0])
container := builder.NewContainerBuilder(kbAgentContainerName).
container := builder.NewContainerBuilder(kbagent.ContainerName).
SetImage(viper.GetString(constant.KBToolsImage)).
SetImagePullPolicy(corev1.PullIfNotPresent).
AddCommands(kbAgentCommand).
Expand All @@ -117,7 +113,7 @@ func buildKBAgentContainer(synthesizedComp *SynthesizedComponent) error {
AddEnv(envVars...).
AddPorts(corev1.ContainerPort{
ContainerPort: int32(port),
Name: kbAgentPortName,
Name: kbagent.DefaultPortName,
Protocol: "TCP",
}).
SetStartupProbe(corev1.Probe{
Expand All @@ -139,7 +135,7 @@ func buildKBAgentContainer(synthesizedComp *SynthesizedComponent) error {
synthesizedComp.HostNetwork.ContainerPorts,
appsv1alpha1.HostNetworkContainerPort{
Container: container.Name,
Ports: []string{kbAgentPortName},
Ports: []string{kbagent.DefaultPortName},
})
}

Expand Down Expand Up @@ -229,7 +225,7 @@ func buildKBAgentStartupEnvs(synthesizedComp *SynthesizedComponent) ([]corev1.En
probes = append(probes, *p)
}

return kbagent.BuildStartupEnvs(actions, probes)
return kbagent.BuildStartupEnv(actions, probes)
}

func buildAction4KBAgent(action *appsv1alpha1.Action, name string) *proto.Action {
Expand Down Expand Up @@ -360,7 +356,7 @@ func customExecActionImageNContainer(synthesizedComp *SynthesizedComponent) (str
}

func buildKBAgentInitContainer() *corev1.Container {
return builder.NewContainerBuilder(kbAgentInitContainerName).
return builder.NewContainerBuilder(kbagent.InitContainerName).
SetImage(viper.GetString(constant.KBToolsImage)).
SetImagePullPolicy(corev1.PullIfNotPresent).
AddCommands([]string{"cp", "-r", kbAgentCommand, "/bin/curl", kbAgentSharedMountPath + "/"}...).
Expand Down
5 changes: 3 additions & 2 deletions pkg/controller/component/kbagent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
corev1 "k8s.io/api/core/v1"

appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
kbagent "github.com/apecloud/kubeblocks/pkg/kbagent"
)

var _ = Describe("kb-agent", func() {
Expand All @@ -55,7 +56,7 @@ var _ = Describe("kb-agent", func() {

kbAgentContainer := func() *corev1.Container {
for _, c := range synthesizedComp.PodSpec.Containers {
if c.Name == kbAgentContainerName {
if c.Name == kbagent.ContainerName {
return &c
}
}
Expand All @@ -64,7 +65,7 @@ var _ = Describe("kb-agent", func() {

kbAgentInitContainer := func() *corev1.Container {
for _, c := range synthesizedComp.PodSpec.InitContainers {
if c.Name == kbAgentInitContainerName {
if c.Name == kbagent.InitContainerName {
return &c
}
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/controller/component/lifecycle/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@ var (
ErrActionNotImplemented = errors.New("action is not implemented")
ErrActionInProgress = errors.New("action is in progress")
ErrActionBusy = errors.New("action is busy")
ErrActionTimeout = errors.New("action timeout")
ErrActionTimedOut = errors.New("action timed-out")
ErrActionFailed = errors.New("action failed")
ErrActionCanceled = errors.New("action canceled")
ErrActionInternalError = errors.New("action internal error")
)

Expand Down
75 changes: 51 additions & 24 deletions pkg/controller/component/lifecycle/kbagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ import (
"github.com/apecloud/kubeblocks/pkg/constant"
"github.com/apecloud/kubeblocks/pkg/controller/component"
"github.com/apecloud/kubeblocks/pkg/controller/instanceset"
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
kbagt "github.com/apecloud/kubeblocks/pkg/kbagent"
kbacli "github.com/apecloud/kubeblocks/pkg/kbagent/client"
"github.com/apecloud/kubeblocks/pkg/kbagent/proto"
"github.com/apecloud/kubeblocks/pkg/kbagent/service"
)

type lifecycleAction interface {
Expand Down Expand Up @@ -281,16 +282,23 @@ func (a *kbagent) callActionWithSelector(ctx context.Context, spec *appsv1alpha1
// - timeout
var output []byte
for _, pod := range pods {
cli, err1 := kbacli.NewClient(*pod)
if err1 != nil {
return nil, err1
host, port, err := a.serverEndpoint(pod)
if err != nil {
return nil, err
}
cli, err := kbacli.NewClient(host, port)
if err != nil {
return nil, err
}
if cli == nil {
continue // not defined, for test only
continue // not kb-agent container and port defined, for test only
}
rsp, err := cli.Action(ctx, *req)
if err != nil {
return nil, err // http error
}
rsp, err2 := cli.CallAction(ctx, *req)
if err2 != nil {
return nil, a.error2(lfa, err2)
if len(rsp.Error) > 0 {
return nil, a.formatError(lfa, rsp)
}
// take first non-nil output
if output == nil && rsp.Output != nil {
Expand Down Expand Up @@ -341,25 +349,44 @@ func (a *kbagent) selectTargetPods(spec *appsv1alpha1.Action) ([]*corev1.Pod, er
}
}

func (a *kbagent) error2(lfa lifecycleAction, err error) error {
func (a *kbagent) serverEndpoint(pod *corev1.Pod) (string, int32, error) {
port, err := intctrlutil.GetPortByName(*pod, kbagt.ContainerName, kbagt.DefaultPortName)
if err != nil {
// has no kb-agent defined
return "", 0, nil
}
host := pod.Status.PodIP
if host == "" {
return "", 0, fmt.Errorf("pod %v has no ip", pod.Name)
}
return host, port, nil
}

func (a *kbagent) formatError(lfa lifecycleAction, rsp proto.ActionResponse) error {
wrapError := func(err error) error {
return errors.Wrapf(err, "action: %s, error: %s", lfa.name(), rsp.Message)
}
err := proto.Type2Error(rsp.Error)
switch {
case err == nil:
return nil
case errors.Is(err, service.ErrNotDefined):
return errors.Wrap(ErrActionNotDefined, lfa.name())
case errors.Is(err, service.ErrNotImplemented):
return errors.Wrap(ErrActionNotImplemented, lfa.name())
case errors.Is(err, service.ErrInProgress):
return errors.Wrap(ErrActionInProgress, lfa.name())
case errors.Is(err, service.ErrBusy):
return errors.Wrap(ErrActionBusy, lfa.name())
case errors.Is(err, service.ErrTimeout):
return errors.Wrap(ErrActionTimeout, lfa.name())
case errors.Is(err, service.ErrFailed):
return errors.Wrap(ErrActionFailed, lfa.name())
case errors.Is(err, service.ErrInternalError):
return errors.Wrap(ErrActionInternalError, lfa.name())
case errors.Is(err, proto.ErrNotDefined):
return wrapError(ErrActionNotDefined)
case errors.Is(err, proto.ErrNotImplemented):
return wrapError(ErrActionNotImplemented)
case errors.Is(err, proto.ErrBadRequest):
return wrapError(ErrActionInternalError)
case errors.Is(err, proto.ErrInProgress):
return wrapError(ErrActionInProgress)
case errors.Is(err, proto.ErrBusy):
return wrapError(ErrActionBusy)
case errors.Is(err, proto.ErrTimedOut):
return wrapError(ErrActionTimedOut)
case errors.Is(err, proto.ErrFailed):
return wrapError(ErrActionFailed)
case errors.Is(err, proto.ErrInternalError):
return wrapError(ErrActionInternalError)
default:
return err
return wrapError(err)
}
}
49 changes: 26 additions & 23 deletions pkg/controller/component/lifecycle/lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (
"github.com/apecloud/kubeblocks/pkg/controller/component"
kbacli "github.com/apecloud/kubeblocks/pkg/kbagent/client"
"github.com/apecloud/kubeblocks/pkg/kbagent/proto"
"github.com/apecloud/kubeblocks/pkg/kbagent/service"
)

type mockReader struct {
Expand Down Expand Up @@ -210,7 +209,7 @@ var _ = Describe("lifecycle", func() {

action := synthesizedComp.LifecycleActions.PostProvision
mockKBAgentClient(func(recorder *kbacli.MockClientMockRecorder) {
recorder.CallAction(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req proto.ActionRequest) (proto.ActionResponse, error) {
recorder.Action(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req proto.ActionRequest) (proto.ActionResponse, error) {
Expect(req.Action).Should(Equal("postProvision"))
Expect(req.Parameters).ShouldNot(BeNil()) // legacy parameters for post-provision action
Expect(req.NonBlocking).ShouldNot(BeNil())
Expand Down Expand Up @@ -239,7 +238,7 @@ var _ = Describe("lifecycle", func() {
Expect(lifecycle).ShouldNot(BeNil())

mockKBAgentClient(func(recorder *kbacli.MockClientMockRecorder) {
recorder.CallAction(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req proto.ActionRequest) (proto.ActionResponse, error) {
recorder.Action(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req proto.ActionRequest) (proto.ActionResponse, error) {
return proto.ActionResponse{}, nil
}).AnyTimes()
})
Expand All @@ -254,7 +253,7 @@ var _ = Describe("lifecycle", func() {
Expect(lifecycle).ShouldNot(BeNil())

mockKBAgentClient(func(recorder *kbacli.MockClientMockRecorder) {
recorder.CallAction(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req proto.ActionRequest) (proto.ActionResponse, error) {
recorder.Action(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req proto.ActionRequest) (proto.ActionResponse, error) {
return proto.ActionResponse{
Output: []byte("role-probe"),
}, nil
Expand All @@ -273,38 +272,42 @@ var _ = Describe("lifecycle", func() {

unknownErr := fmt.Errorf("%s", "unknown error")
mockKBAgentClient(func(recorder *kbacli.MockClientMockRecorder) {
recorder.CallAction(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req proto.ActionRequest) (proto.ActionResponse, error) {
return proto.ActionResponse{}, service.ErrNotDefined
recorder.Action(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req proto.ActionRequest) (proto.ActionResponse, error) {
return proto.ActionResponse{}, proto.ErrNotDefined
}).MaxTimes(1)
recorder.CallAction(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req proto.ActionRequest) (proto.ActionResponse, error) {
return proto.ActionResponse{}, service.ErrNotImplemented
recorder.Action(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req proto.ActionRequest) (proto.ActionResponse, error) {
return proto.ActionResponse{}, proto.ErrNotImplemented
}).MaxTimes(1)
recorder.CallAction(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req proto.ActionRequest) (proto.ActionResponse, error) {
return proto.ActionResponse{}, service.ErrInProgress
recorder.Action(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req proto.ActionRequest) (proto.ActionResponse, error) {
return proto.ActionResponse{}, proto.ErrBadRequest
}).MaxTimes(1)
recorder.CallAction(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req proto.ActionRequest) (proto.ActionResponse, error) {
return proto.ActionResponse{}, service.ErrBusy
recorder.Action(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req proto.ActionRequest) (proto.ActionResponse, error) {
return proto.ActionResponse{}, proto.ErrInProgress
}).MaxTimes(1)
recorder.CallAction(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req proto.ActionRequest) (proto.ActionResponse, error) {
return proto.ActionResponse{}, service.ErrTimeout
recorder.Action(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req proto.ActionRequest) (proto.ActionResponse, error) {
return proto.ActionResponse{}, proto.ErrBusy
}).MaxTimes(1)
recorder.CallAction(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req proto.ActionRequest) (proto.ActionResponse, error) {
return proto.ActionResponse{}, service.ErrFailed
recorder.Action(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req proto.ActionRequest) (proto.ActionResponse, error) {
return proto.ActionResponse{}, proto.ErrTimedOut
}).MaxTimes(1)
recorder.CallAction(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req proto.ActionRequest) (proto.ActionResponse, error) {
return proto.ActionResponse{}, service.ErrInternalError
recorder.Action(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req proto.ActionRequest) (proto.ActionResponse, error) {
return proto.ActionResponse{}, proto.ErrFailed
}).MaxTimes(1)
recorder.CallAction(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req proto.ActionRequest) (proto.ActionResponse, error) {
recorder.Action(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req proto.ActionRequest) (proto.ActionResponse, error) {
return proto.ActionResponse{}, proto.ErrInternalError
}).MaxTimes(1)
recorder.Action(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req proto.ActionRequest) (proto.ActionResponse, error) {
return proto.ActionResponse{}, unknownErr
}).MaxTimes(1)
})

for _, expected := range []error{
ErrActionNotDefined,
ErrActionNotImplemented,
ErrActionInternalError,
ErrActionInProgress,
ErrActionBusy,
ErrActionTimeout,
ErrActionTimedOut,
ErrActionFailed,
ErrActionInternalError,
unknownErr,
Expand Down Expand Up @@ -349,7 +352,7 @@ var _ = Describe("lifecycle", func() {
}

mockKBAgentClient(func(recorder *kbacli.MockClientMockRecorder) {
recorder.CallAction(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req proto.ActionRequest) (proto.ActionResponse, error) {
recorder.Action(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req proto.ActionRequest) (proto.ActionResponse, error) {
Expect(req.Action).Should(Equal("postProvision"))
Expect(req.Parameters).ShouldNot(BeNil()) // legacy parameters for post-provision action
Expect(req.Parameters[hackedAllCompList]).Should(Equal(strings.Join([]string{synthesizedComp.Name, "another"}, ",")))
Expand All @@ -371,7 +374,7 @@ var _ = Describe("lifecycle", func() {
Expect(lifecycle).ShouldNot(BeNil())

mockKBAgentClient(func(recorder *kbacli.MockClientMockRecorder) {
recorder.CallAction(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req proto.ActionRequest) (proto.ActionResponse, error) {
recorder.Action(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req proto.ActionRequest) (proto.ActionResponse, error) {
Expect(req.Action).Should(Equal("roleProbe"))
Expect(req.Parameters).ShouldNot(BeNil())
Expect(req.Parameters[key]).Should(Equal(val))
Expand Down Expand Up @@ -410,7 +413,7 @@ var _ = Describe("lifecycle", func() {
}

mockKBAgentClient(func(recorder *kbacli.MockClientMockRecorder) {
recorder.CallAction(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req proto.ActionRequest) (proto.ActionResponse, error) {
recorder.Action(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req proto.ActionRequest) (proto.ActionResponse, error) {
return proto.ActionResponse{}, nil
}).AnyTimes()
})
Expand Down
Loading

0 comments on commit 60d684b

Please sign in to comment.