Skip to content

Commit

Permalink
chore: refine kb-agent http client/server to pass errors to controller (
Browse files Browse the repository at this point in the history
#8022)

Co-authored-by: Ursasi <zshprint@163.com>
  • Loading branch information
leon-inf and Ursasi authored Aug 26, 2024
1 parent f8b783e commit 9b0c4f0
Show file tree
Hide file tree
Showing 19 changed files with 305 additions and 246 deletions.
4 changes: 2 additions & 2 deletions controllers/apps/component_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,15 @@ var mockKBAgentClient = func(mock func(*kbagent.MockClientMockRecorder)) {

var mockKBAgentClientDefault = func() {
mockKBAgentClient(func(recorder *kbagent.MockClientMockRecorder) {
recorder.CallAction(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req kbagentproto.ActionRequest) (kbagentproto.ActionResponse, error) {
recorder.Action(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req kbagentproto.ActionRequest) (kbagentproto.ActionResponse, error) {
return kbagentproto.ActionResponse{}, nil
}).AnyTimes()
})
}

var mockKBAgentClient4HScale = func(clusterKey types.NamespacedName, compName string, replicas int) {
mockKBAgentClient(func(recorder *kbagent.MockClientMockRecorder) {
recorder.CallAction(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req kbagentproto.ActionRequest) (kbagentproto.ActionResponse, error) {
recorder.Action(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req kbagentproto.ActionRequest) (kbagentproto.ActionResponse, error) {
rsp := kbagentproto.ActionResponse{}
if req.Action != "memberLeave" {
return rsp, nil
Expand Down
22 changes: 9 additions & 13 deletions pkg/controller/component/kbagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,7 @@ import (
)

const (
kbAgentContainerName = "kbagent"
kbAgentInitContainerName = "init-kbagent"
kbAgentCommand = "/bin/kbagent"
kbAgentPortName = "http"

kbAgentCommand = "/bin/kbagent"
kbAgentSharedMountPath = "/kubeblocks"
kbAgentCommandOnSharedMount = "/kubeblocks/kbagent"

Expand All @@ -56,18 +52,18 @@ var (
)

func IsKBAgentContainer(c *corev1.Container) bool {
return c.Name == kbAgentContainerName || c.Name == kbAgentInitContainerName
return c.Name == kbagent.ContainerName || c.Name == kbagent.InitContainerName
}

func UpdateKBAgentContainer4HostNetwork(synthesizedComp *SynthesizedComponent) {
idx, c := intctrlutil.GetContainerByName(synthesizedComp.PodSpec.Containers, kbAgentContainerName)
idx, c := intctrlutil.GetContainerByName(synthesizedComp.PodSpec.Containers, kbagent.ContainerName)
if c == nil {
return
}

httpPort := 0
for _, port := range c.Ports {
if port.Name == kbAgentPortName {
if port.Name == kbagent.DefaultPortName {
httpPort = int(port.ContainerPort)
break
}
Expand Down Expand Up @@ -108,7 +104,7 @@ func buildKBAgentContainer(synthesizedComp *SynthesizedComponent) error {
}

port := int(ports[0])
container := builder.NewContainerBuilder(kbAgentContainerName).
container := builder.NewContainerBuilder(kbagent.ContainerName).
SetImage(viper.GetString(constant.KBToolsImage)).
SetImagePullPolicy(corev1.PullIfNotPresent).
AddCommands(kbAgentCommand).
Expand All @@ -117,7 +113,7 @@ func buildKBAgentContainer(synthesizedComp *SynthesizedComponent) error {
AddEnv(envVars...).
AddPorts(corev1.ContainerPort{
ContainerPort: int32(port),
Name: kbAgentPortName,
Name: kbagent.DefaultPortName,
Protocol: "TCP",
}).
SetStartupProbe(corev1.Probe{
Expand All @@ -139,7 +135,7 @@ func buildKBAgentContainer(synthesizedComp *SynthesizedComponent) error {
synthesizedComp.HostNetwork.ContainerPorts,
appsv1alpha1.HostNetworkContainerPort{
Container: container.Name,
Ports: []string{kbAgentPortName},
Ports: []string{kbagent.DefaultPortName},
})
}

Expand Down Expand Up @@ -229,7 +225,7 @@ func buildKBAgentStartupEnvs(synthesizedComp *SynthesizedComponent) ([]corev1.En
probes = append(probes, *p)
}

return kbagent.BuildStartupEnvs(actions, probes)
return kbagent.BuildStartupEnv(actions, probes)
}

func buildAction4KBAgent(action *appsv1alpha1.Action, name string) *proto.Action {
Expand Down Expand Up @@ -360,7 +356,7 @@ func customExecActionImageNContainer(synthesizedComp *SynthesizedComponent) (str
}

func buildKBAgentInitContainer() *corev1.Container {
return builder.NewContainerBuilder(kbAgentInitContainerName).
return builder.NewContainerBuilder(kbagent.InitContainerName).
SetImage(viper.GetString(constant.KBToolsImage)).
SetImagePullPolicy(corev1.PullIfNotPresent).
AddCommands([]string{"cp", "-r", kbAgentCommand, "/bin/curl", kbAgentSharedMountPath + "/"}...).
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/component/lifecycle/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ var (
ErrActionNotImplemented = errors.New("action is not implemented")
ErrActionInProgress = errors.New("action is in progress")
ErrActionBusy = errors.New("action is busy")
ErrActionTimeout = errors.New("action timeout")
ErrActionTimedOut = errors.New("action timed-out")
ErrActionFailed = errors.New("action failed")
ErrActionCanceled = errors.New("action canceled")
ErrActionInternalError = errors.New("action internal error")
Expand Down
75 changes: 51 additions & 24 deletions pkg/controller/component/lifecycle/kbagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ import (
"github.com/apecloud/kubeblocks/pkg/constant"
"github.com/apecloud/kubeblocks/pkg/controller/component"
"github.com/apecloud/kubeblocks/pkg/controller/instanceset"
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
kbagt "github.com/apecloud/kubeblocks/pkg/kbagent"
kbacli "github.com/apecloud/kubeblocks/pkg/kbagent/client"
"github.com/apecloud/kubeblocks/pkg/kbagent/proto"
"github.com/apecloud/kubeblocks/pkg/kbagent/service"
)

type lifecycleAction interface {
Expand Down Expand Up @@ -265,16 +266,23 @@ func (a *kbagent) callActionWithSelector(ctx context.Context, spec *appsv1alpha1
// - back-off to retry
// - timeout
for _, pod := range pods {
cli, err1 := kbacli.NewClient(*pod)
if err1 != nil {
return err1
host, port, err := a.serverEndpoint(pod)
if err != nil {
return err
}
cli, err := kbacli.NewClient(host, port)
if err != nil {
return err
}
if cli == nil {
continue // not defined, for test only
continue // not kb-agent container and port defined, for test only
}
rsp, err := cli.Action(ctx, *req)
if err != nil {
return err // http error
}
_, err2 := cli.CallAction(ctx, *req)
if err2 != nil {
return a.error2(lfa, err2)
if len(rsp.Error) > 0 {
return a.formatError(lfa, rsp)
}
}
return nil
Expand Down Expand Up @@ -321,25 +329,44 @@ func (a *kbagent) selectTargetPods(spec *appsv1alpha1.Action) ([]*corev1.Pod, er
}
}

func (a *kbagent) error2(lfa lifecycleAction, err error) error {
func (a *kbagent) serverEndpoint(pod *corev1.Pod) (string, int32, error) {
port, err := intctrlutil.GetPortByName(*pod, kbagt.ContainerName, kbagt.DefaultPortName)
if err != nil {
// has no kb-agent defined
return "", 0, nil
}
host := pod.Status.PodIP
if host == "" {
return "", 0, fmt.Errorf("pod %v has no ip", pod.Name)
}
return host, port, nil
}

func (a *kbagent) formatError(lfa lifecycleAction, rsp proto.ActionResponse) error {
wrapError := func(err error) error {
return errors.Wrapf(err, "action: %s, error: %s", lfa.name(), rsp.Message)
}
err := proto.Type2Error(rsp.Error)
switch {
case err == nil:
return nil
case errors.Is(err, service.ErrNotDefined):
return errors.Wrap(ErrActionNotDefined, lfa.name())
case errors.Is(err, service.ErrNotImplemented):
return errors.Wrap(ErrActionNotImplemented, lfa.name())
case errors.Is(err, service.ErrInProgress):
return errors.Wrap(ErrActionInProgress, lfa.name())
case errors.Is(err, service.ErrBusy):
return errors.Wrap(ErrActionBusy, lfa.name())
case errors.Is(err, service.ErrTimeout):
return errors.Wrap(ErrActionTimeout, lfa.name())
case errors.Is(err, service.ErrFailed):
return errors.Wrap(ErrActionFailed, lfa.name())
case errors.Is(err, service.ErrInternalError):
return errors.Wrap(ErrActionInternalError, lfa.name())
case errors.Is(err, proto.ErrNotDefined):
return wrapError(ErrActionNotDefined)
case errors.Is(err, proto.ErrNotImplemented):
return wrapError(ErrActionNotImplemented)
case errors.Is(err, proto.ErrBadRequest):
return wrapError(ErrActionInternalError)
case errors.Is(err, proto.ErrInProgress):
return wrapError(ErrActionInProgress)
case errors.Is(err, proto.ErrBusy):
return wrapError(ErrActionBusy)
case errors.Is(err, proto.ErrTimedOut):
return wrapError(ErrActionTimedOut)
case errors.Is(err, proto.ErrFailed):
return wrapError(ErrActionFailed)
case errors.Is(err, proto.ErrInternalError):
return wrapError(ErrActionInternalError)
default:
return err
return wrapError(err)
}
}
32 changes: 9 additions & 23 deletions pkg/kbagent/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,19 @@ package client

import (
"context"
"fmt"
"net"
"net/http"
"time"

corev1 "k8s.io/api/core/v1"

intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
"github.com/apecloud/kubeblocks/pkg/kbagent/proto"
)

// TODO: move to a common package
const (
kbAgentContainerName = "kbagent"
kbAgentPortName = "http"
defaultConnectTimeout = 5 * time.Second
)

type Client interface {
CallAction(ctx context.Context, req proto.ActionRequest) (proto.ActionResponse, error)

// LaunchProbe(ctx context.Context, probe proto.Probe) error
Action(ctx context.Context, req proto.ActionRequest) (proto.ActionResponse, error)
}

// HACK: for unit test only.
Expand All @@ -62,36 +54,30 @@ func GetMockClient() Client {
return mockClient
}

func NewClient(pod corev1.Pod) (Client, error) {
func NewClient(host string, port int32) (Client, error) {
if mockClient != nil || mockClientError != nil {
return mockClient, mockClientError
}

port, err := intctrlutil.GetPortByName(pod, kbAgentContainerName, kbAgentPortName)
if err != nil {
// has no kb-agent defined
if host == "" && port == 0 {
return nil, nil
}

ip := pod.Status.PodIP
if ip == "" {
return nil, fmt.Errorf("pod %v has no ip", pod.Name)
}

// don't use default http-client
dialer := &net.Dialer{
Timeout: 5 * time.Second,
Timeout: defaultConnectTimeout,
}
transport := &http.Transport{
Dial: dialer.Dial,
TLSHandshakeTimeout: 5 * time.Second,
TLSHandshakeTimeout: defaultConnectTimeout,
}
cli := &http.Client{
Timeout: time.Second * 30,
// don't set timeout at client level
// Timeout: time.Second * 30,
Transport: transport,
}
return &httpClient{
host: ip,
host: host,
port: port,
client: cli,
}, nil
Expand Down
12 changes: 6 additions & 6 deletions pkg/kbagent/client/client_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 9b0c4f0

Please sign in to comment.