Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
leon-inf committed Aug 26, 2024
1 parent 71d0a85 commit b3ed795
Show file tree
Hide file tree
Showing 14 changed files with 136 additions and 116 deletions.
4 changes: 2 additions & 2 deletions controllers/apps/component_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,15 @@ var mockKBAgentClient = func(mock func(*kbagent.MockClientMockRecorder)) {

var mockKBAgentClientDefault = func() {
mockKBAgentClient(func(recorder *kbagent.MockClientMockRecorder) {
recorder.CallAction(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req kbagentproto.ActionRequest) (kbagentproto.ActionResponse, error) {
recorder.Action(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req kbagentproto.ActionRequest) (kbagentproto.ActionResponse, error) {
return kbagentproto.ActionResponse{}, nil
}).AnyTimes()
})
}

var mockKBAgentClient4HScale = func(clusterKey types.NamespacedName, compName string, replicas int) {
mockKBAgentClient(func(recorder *kbagent.MockClientMockRecorder) {
recorder.CallAction(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req kbagentproto.ActionRequest) (kbagentproto.ActionResponse, error) {
recorder.Action(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req kbagentproto.ActionRequest) (kbagentproto.ActionResponse, error) {
rsp := kbagentproto.ActionResponse{}
if req.Action != "memberLeave" {
return rsp, nil
Expand Down
22 changes: 9 additions & 13 deletions pkg/controller/component/kbagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,7 @@ import (
)

const (
kbAgentContainerName = "kbagent"
kbAgentInitContainerName = "init-kbagent"
kbAgentCommand = "/bin/kbagent"
kbAgentPortName = "http"

kbAgentCommand = "/bin/kbagent"
kbAgentSharedMountPath = "/kubeblocks"
kbAgentCommandOnSharedMount = "/kubeblocks/kbagent"

Expand All @@ -56,18 +52,18 @@ var (
)

func IsKBAgentContainer(c *corev1.Container) bool {
return c.Name == kbAgentContainerName || c.Name == kbAgentInitContainerName
return c.Name == kbagent.ContainerName || c.Name == kbagent.InitContainerName
}

func UpdateKBAgentContainer4HostNetwork(synthesizedComp *SynthesizedComponent) {
idx, c := intctrlutil.GetContainerByName(synthesizedComp.PodSpec.Containers, kbAgentContainerName)
idx, c := intctrlutil.GetContainerByName(synthesizedComp.PodSpec.Containers, kbagent.ContainerName)
if c == nil {
return
}

httpPort := 0
for _, port := range c.Ports {
if port.Name == kbAgentPortName {
if port.Name == kbagent.DefaultPortName {
httpPort = int(port.ContainerPort)
break
}
Expand Down Expand Up @@ -108,7 +104,7 @@ func buildKBAgentContainer(synthesizedComp *SynthesizedComponent) error {
}

port := int(ports[0])
container := builder.NewContainerBuilder(kbAgentContainerName).
container := builder.NewContainerBuilder(kbagent.ContainerName).
SetImage(viper.GetString(constant.KBToolsImage)).
SetImagePullPolicy(corev1.PullIfNotPresent).
AddCommands(kbAgentCommand).
Expand All @@ -117,7 +113,7 @@ func buildKBAgentContainer(synthesizedComp *SynthesizedComponent) error {
AddEnv(envVars...).
AddPorts(corev1.ContainerPort{
ContainerPort: int32(port),
Name: kbAgentPortName,
Name: kbagent.DefaultPortName,
Protocol: "TCP",
}).
SetStartupProbe(corev1.Probe{
Expand All @@ -139,7 +135,7 @@ func buildKBAgentContainer(synthesizedComp *SynthesizedComponent) error {
synthesizedComp.HostNetwork.ContainerPorts,
appsv1alpha1.HostNetworkContainerPort{
Container: container.Name,
Ports: []string{kbAgentPortName},
Ports: []string{kbagent.DefaultPortName},
})
}

Expand Down Expand Up @@ -229,7 +225,7 @@ func buildKBAgentStartupEnvs(synthesizedComp *SynthesizedComponent) ([]corev1.En
probes = append(probes, *p)
}

return kbagent.BuildStartupEnvs(actions, probes)
return kbagent.BuildStartupEnv(actions, probes)
}

func buildAction4KBAgent(action *appsv1alpha1.Action, name string) *proto.Action {
Expand Down Expand Up @@ -360,7 +356,7 @@ func customExecActionImageNContainer(synthesizedComp *SynthesizedComponent) (str
}

func buildKBAgentInitContainer() *corev1.Container {
return builder.NewContainerBuilder(kbAgentInitContainerName).
return builder.NewContainerBuilder(kbagent.InitContainerName).
SetImage(viper.GetString(constant.KBToolsImage)).
SetImagePullPolicy(corev1.PullIfNotPresent).
AddCommands([]string{"cp", "-r", kbAgentCommand, "/bin/curl", kbAgentSharedMountPath + "/"}...).
Expand Down
33 changes: 26 additions & 7 deletions pkg/controller/component/lifecycle/kbagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import (
"github.com/apecloud/kubeblocks/pkg/constant"
"github.com/apecloud/kubeblocks/pkg/controller/component"
"github.com/apecloud/kubeblocks/pkg/controller/instanceset"
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
kbagt "github.com/apecloud/kubeblocks/pkg/kbagent"
kbacli "github.com/apecloud/kubeblocks/pkg/kbagent/client"
"github.com/apecloud/kubeblocks/pkg/kbagent/proto"
)
Expand Down Expand Up @@ -264,16 +266,20 @@ func (a *kbagent) callActionWithSelector(ctx context.Context, spec *appsv1alpha1
// - back-off to retry
// - timeout
for _, pod := range pods {
cli, err1 := kbacli.NewClient(*pod)
if err1 != nil {
return err1
host, port, err := a.serverEndpoint(pod)
if err != nil {
return err
}
if cli == nil {
if host == "" && port == 0 {
continue // not defined, for test only
}
rsp, err2 := cli.CallAction(ctx, *req)
if err2 != nil {
return err2
cli, err := kbacli.NewClient(host, port)
if err != nil {
return err
}
rsp, err := cli.Action(ctx, *req)
if err != nil {
return err // http error
}
if len(rsp.Error) > 0 {
return a.formatError(lfa, rsp)
Expand Down Expand Up @@ -323,6 +329,19 @@ func (a *kbagent) selectTargetPods(spec *appsv1alpha1.Action) ([]*corev1.Pod, er
}
}

func (a *kbagent) serverEndpoint(pod *corev1.Pod) (string, int32, error) {
port, err := intctrlutil.GetPortByName(*pod, kbagt.ContainerName, kbagt.DefaultPortName)
if err != nil {
// has no kb-agent defined
return "", 0, nil
}
host := pod.Status.PodIP
if host == "" {
return "", 0, fmt.Errorf("pod %v has no ip", pod.Name)
}
return host, port, nil
}

func (a *kbagent) formatError(lfa lifecycleAction, rsp proto.ActionResponse) error {
wrapError := func(err error) error {
return errors.Wrapf(err, "action: %s, error: %s", lfa.name(), rsp.Message)
Expand Down
34 changes: 8 additions & 26 deletions pkg/kbagent/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,19 @@ package client

import (
"context"
"fmt"
"net"
"net/http"
"time"

corev1 "k8s.io/api/core/v1"

intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
"github.com/apecloud/kubeblocks/pkg/kbagent/proto"
)

// TODO: move to a common package
const (
kbAgentContainerName = "kbagent"
kbAgentPortName = "http"
defaultConnectTimeout = 5 * time.Second
)

type Client interface {
CallAction(ctx context.Context, req proto.ActionRequest) (proto.ActionResponse, error)

// LaunchProbe(ctx context.Context, probe proto.Probe) error
Action(ctx context.Context, req proto.ActionRequest) (proto.ActionResponse, error)
}

// HACK: for unit test only.
Expand All @@ -62,36 +54,26 @@ func GetMockClient() Client {
return mockClient
}

func NewClient(pod corev1.Pod) (Client, error) {
func NewClient(host string, port int32) (Client, error) {
if mockClient != nil || mockClientError != nil {
return mockClient, mockClientError
}

port, err := intctrlutil.GetPortByName(pod, kbAgentContainerName, kbAgentPortName)
if err != nil {
// has no kb-agent defined
return nil, nil
}

ip := pod.Status.PodIP
if ip == "" {
return nil, fmt.Errorf("pod %v has no ip", pod.Name)
}

// don't use default http-client
dialer := &net.Dialer{
Timeout: 5 * time.Second,
Timeout: defaultConnectTimeout,
}
transport := &http.Transport{
Dial: dialer.Dial,
TLSHandshakeTimeout: 5 * time.Second,
TLSHandshakeTimeout: defaultConnectTimeout,
}
cli := &http.Client{
Timeout: time.Second * 30,
// don't set timeout at client level
// Timeout: time.Second * 30,
Transport: transport,
}
return &httpClient{
host: ip,
host: host,
port: port,
client: cli,
}, nil
Expand Down
12 changes: 6 additions & 6 deletions pkg/kbagent/client/client_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 16 additions & 18 deletions pkg/kbagent/client/httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,11 @@ import (
"io"
"net/http"

"github.com/valyala/fasthttp"

"github.com/apecloud/kubeblocks/pkg/kbagent/proto"
)

const (
urlTemplate = "http://%s:%d/%s"
actionServiceURI = "/v1.0/action"
urlTemplate = "http://%s:%d/%s"
)

type httpClient struct {
Expand All @@ -45,20 +42,22 @@ type httpClient struct {

var _ Client = &httpClient{}

func (c *httpClient) CallAction(ctx context.Context, req proto.ActionRequest) (proto.ActionResponse, error) {
url := fmt.Sprintf(urlTemplate, c.host, c.port, actionServiceURI)
func (c *httpClient) Action(ctx context.Context, req proto.ActionRequest) (proto.ActionResponse, error) {
rsp := proto.ActionResponse{}

data, err := json.Marshal(req)
if err != nil {
return proto.ActionResponse{}, err
return rsp, err
}

payload, err := c.request(ctx, fasthttp.MethodPost, url, bytes.NewReader(data))
url := fmt.Sprintf(urlTemplate, c.host, c.port, proto.ServiceAction.URI)
payload, err := c.request(ctx, http.MethodPost, url, bytes.NewReader(data))
if err != nil {
return proto.ActionResponse{}, err
return rsp, err
}

defer payload.Close()
return c.decode(payload)
return decode(payload, &rsp)
}

func (c *httpClient) request(ctx context.Context, method, url string, body io.Reader) (io.ReadCloser, error) {
Expand All @@ -69,26 +68,25 @@ func (c *httpClient) request(ctx context.Context, method, url string, body io.Re

rsp, err := c.client.Do(req)
if err != nil {
return nil, err
return nil, err // http error
}

switch rsp.StatusCode {
case http.StatusOK, http.StatusNotImplemented, http.StatusInternalServerError:
case http.StatusOK, http.StatusInternalServerError:
return rsp.Body, nil
default:
return nil, fmt.Errorf("http error: %s", rsp.Status)
return nil, fmt.Errorf("unexpected http status code: %s", rsp.Status)
}
}

func (c *httpClient) decode(body io.Reader) (proto.ActionResponse, error) {
rsp := proto.ActionResponse{}
func decode[T any](body io.Reader, rsp *T) (T, error) {
data, err := io.ReadAll(body)
if err != nil {
return rsp, err
return *rsp, err
}
err = json.Unmarshal(data, &rsp)
if err != nil {
return rsp, err
return *rsp, err
}
return rsp, nil
return *rsp, nil
}
39 changes: 39 additions & 0 deletions pkg/kbagent/proto/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
Copyright (C) 2022-2024 ApeCloud Co., Ltd
This file is part of KubeBlocks project
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package proto

type Service struct {
Kind string
Version string
URI string
}

var (
ServiceAction = &Service{
Kind: "Action",
Version: "v1.0",
URI: "/v1.0/action",
}
ServiceProbe = &Service{
Kind: "Probe",
Version: "v1.0",
URI: "/v1.0/probe",
}
)
Loading

0 comments on commit b3ed795

Please sign in to comment.