diff --git a/controllers/apps/component_controller_test.go b/controllers/apps/component_controller_test.go index 03015985cca..c974af10cfb 100644 --- a/controllers/apps/component_controller_test.go +++ b/controllers/apps/component_controller_test.go @@ -86,7 +86,7 @@ var mockKBAgentClient = func(mock func(*kbagent.MockClientMockRecorder)) { var mockKBAgentClientDefault = func() { mockKBAgentClient(func(recorder *kbagent.MockClientMockRecorder) { - recorder.CallAction(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req kbagentproto.ActionRequest) (kbagentproto.ActionResponse, error) { + recorder.Action(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req kbagentproto.ActionRequest) (kbagentproto.ActionResponse, error) { return kbagentproto.ActionResponse{}, nil }).AnyTimes() }) @@ -94,7 +94,7 @@ var mockKBAgentClientDefault = func() { var mockKBAgentClient4HScale = func(clusterKey types.NamespacedName, compName string, replicas int) { mockKBAgentClient(func(recorder *kbagent.MockClientMockRecorder) { - recorder.CallAction(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req kbagentproto.ActionRequest) (kbagentproto.ActionResponse, error) { + recorder.Action(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req kbagentproto.ActionRequest) (kbagentproto.ActionResponse, error) { rsp := kbagentproto.ActionResponse{} if req.Action != "memberLeave" { return rsp, nil diff --git a/pkg/controller/component/kbagent.go b/pkg/controller/component/kbagent.go index 2823a330523..303b04c5f6f 100644 --- a/pkg/controller/component/kbagent.go +++ b/pkg/controller/component/kbagent.go @@ -38,11 +38,7 @@ import ( ) const ( - kbAgentContainerName = "kbagent" - kbAgentInitContainerName = "init-kbagent" - kbAgentCommand = "/bin/kbagent" - kbAgentPortName = "http" - + kbAgentCommand = "/bin/kbagent" kbAgentSharedMountPath = "/kubeblocks" kbAgentCommandOnSharedMount = "/kubeblocks/kbagent" @@ -56,18 +52,18 @@ var ( ) func IsKBAgentContainer(c *corev1.Container) bool { - return c.Name == kbAgentContainerName || c.Name == kbAgentInitContainerName + return c.Name == kbagent.ContainerName || c.Name == kbagent.InitContainerName } func UpdateKBAgentContainer4HostNetwork(synthesizedComp *SynthesizedComponent) { - idx, c := intctrlutil.GetContainerByName(synthesizedComp.PodSpec.Containers, kbAgentContainerName) + idx, c := intctrlutil.GetContainerByName(synthesizedComp.PodSpec.Containers, kbagent.ContainerName) if c == nil { return } httpPort := 0 for _, port := range c.Ports { - if port.Name == kbAgentPortName { + if port.Name == kbagent.DefaultPortName { httpPort = int(port.ContainerPort) break } @@ -108,7 +104,7 @@ func buildKBAgentContainer(synthesizedComp *SynthesizedComponent) error { } port := int(ports[0]) - container := builder.NewContainerBuilder(kbAgentContainerName). + container := builder.NewContainerBuilder(kbagent.ContainerName). SetImage(viper.GetString(constant.KBToolsImage)). SetImagePullPolicy(corev1.PullIfNotPresent). AddCommands(kbAgentCommand). @@ -117,7 +113,7 @@ func buildKBAgentContainer(synthesizedComp *SynthesizedComponent) error { AddEnv(envVars...). AddPorts(corev1.ContainerPort{ ContainerPort: int32(port), - Name: kbAgentPortName, + Name: kbagent.DefaultPortName, Protocol: "TCP", }). SetStartupProbe(corev1.Probe{ @@ -139,7 +135,7 @@ func buildKBAgentContainer(synthesizedComp *SynthesizedComponent) error { synthesizedComp.HostNetwork.ContainerPorts, appsv1alpha1.HostNetworkContainerPort{ Container: container.Name, - Ports: []string{kbAgentPortName}, + Ports: []string{kbagent.DefaultPortName}, }) } @@ -229,7 +225,7 @@ func buildKBAgentStartupEnvs(synthesizedComp *SynthesizedComponent) ([]corev1.En probes = append(probes, *p) } - return kbagent.BuildStartupEnvs(actions, probes) + return kbagent.BuildStartupEnv(actions, probes) } func buildAction4KBAgent(action *appsv1alpha1.Action, name string) *proto.Action { @@ -360,7 +356,7 @@ func customExecActionImageNContainer(synthesizedComp *SynthesizedComponent) (str } func buildKBAgentInitContainer() *corev1.Container { - return builder.NewContainerBuilder(kbAgentInitContainerName). + return builder.NewContainerBuilder(kbagent.InitContainerName). SetImage(viper.GetString(constant.KBToolsImage)). SetImagePullPolicy(corev1.PullIfNotPresent). AddCommands([]string{"cp", "-r", kbAgentCommand, "/bin/curl", kbAgentSharedMountPath + "/"}...). diff --git a/pkg/controller/component/lifecycle/kbagent.go b/pkg/controller/component/lifecycle/kbagent.go index c1b4987a9ac..06ee92a3481 100644 --- a/pkg/controller/component/lifecycle/kbagent.go +++ b/pkg/controller/component/lifecycle/kbagent.go @@ -34,6 +34,8 @@ import ( "github.com/apecloud/kubeblocks/pkg/constant" "github.com/apecloud/kubeblocks/pkg/controller/component" "github.com/apecloud/kubeblocks/pkg/controller/instanceset" + intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil" + kbagt "github.com/apecloud/kubeblocks/pkg/kbagent" kbacli "github.com/apecloud/kubeblocks/pkg/kbagent/client" "github.com/apecloud/kubeblocks/pkg/kbagent/proto" ) @@ -264,16 +266,20 @@ func (a *kbagent) callActionWithSelector(ctx context.Context, spec *appsv1alpha1 // - back-off to retry // - timeout for _, pod := range pods { - cli, err1 := kbacli.NewClient(*pod) - if err1 != nil { - return err1 + host, port, err := a.serverEndpoint(pod) + if err != nil { + return err + } + cli, err := kbacli.NewClient(host, port) + if err != nil { + return err } if cli == nil { - continue // not defined, for test only + continue // not kb-agent container and port defined, for test only } - rsp, err2 := cli.CallAction(ctx, *req) - if err2 != nil { - return err2 + rsp, err := cli.Action(ctx, *req) + if err != nil { + return err // http error } if len(rsp.Error) > 0 { return a.formatError(lfa, rsp) @@ -323,6 +329,19 @@ func (a *kbagent) selectTargetPods(spec *appsv1alpha1.Action) ([]*corev1.Pod, er } } +func (a *kbagent) serverEndpoint(pod *corev1.Pod) (string, int32, error) { + port, err := intctrlutil.GetPortByName(*pod, kbagt.ContainerName, kbagt.DefaultPortName) + if err != nil { + // has no kb-agent defined + return "", 0, nil + } + host := pod.Status.PodIP + if host == "" { + return "", 0, fmt.Errorf("pod %v has no ip", pod.Name) + } + return host, port, nil +} + func (a *kbagent) formatError(lfa lifecycleAction, rsp proto.ActionResponse) error { wrapError := func(err error) error { return errors.Wrapf(err, "action: %s, error: %s", lfa.name(), rsp.Message) diff --git a/pkg/kbagent/client/client.go b/pkg/kbagent/client/client.go index d26e12a999c..d5cfc2bab58 100644 --- a/pkg/kbagent/client/client.go +++ b/pkg/kbagent/client/client.go @@ -21,27 +21,19 @@ package client import ( "context" - "fmt" "net" "net/http" "time" - corev1 "k8s.io/api/core/v1" - - intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil" "github.com/apecloud/kubeblocks/pkg/kbagent/proto" ) -// TODO: move to a common package const ( - kbAgentContainerName = "kbagent" - kbAgentPortName = "http" + defaultConnectTimeout = 5 * time.Second ) type Client interface { - CallAction(ctx context.Context, req proto.ActionRequest) (proto.ActionResponse, error) - - // LaunchProbe(ctx context.Context, probe proto.Probe) error + Action(ctx context.Context, req proto.ActionRequest) (proto.ActionResponse, error) } // HACK: for unit test only. @@ -62,36 +54,30 @@ func GetMockClient() Client { return mockClient } -func NewClient(pod corev1.Pod) (Client, error) { +func NewClient(host string, port int32) (Client, error) { if mockClient != nil || mockClientError != nil { return mockClient, mockClientError } - port, err := intctrlutil.GetPortByName(pod, kbAgentContainerName, kbAgentPortName) - if err != nil { - // has no kb-agent defined + if host == "" && port == 0 { return nil, nil } - ip := pod.Status.PodIP - if ip == "" { - return nil, fmt.Errorf("pod %v has no ip", pod.Name) - } - // don't use default http-client dialer := &net.Dialer{ - Timeout: 5 * time.Second, + Timeout: defaultConnectTimeout, } transport := &http.Transport{ Dial: dialer.Dial, - TLSHandshakeTimeout: 5 * time.Second, + TLSHandshakeTimeout: defaultConnectTimeout, } cli := &http.Client{ - Timeout: time.Second * 30, + // don't set timeout at client level + // Timeout: time.Second * 30, Transport: transport, } return &httpClient{ - host: ip, + host: host, port: port, client: cli, }, nil diff --git a/pkg/kbagent/client/client_mock.go b/pkg/kbagent/client/client_mock.go index ef37a8d9b92..48d2a55b209 100644 --- a/pkg/kbagent/client/client_mock.go +++ b/pkg/kbagent/client/client_mock.go @@ -56,17 +56,17 @@ func (m *MockClient) EXPECT() *MockClientMockRecorder { return m.recorder } -// CallAction mocks base method. -func (m *MockClient) CallAction(arg0 context.Context, arg1 proto.ActionRequest) (proto.ActionResponse, error) { +// Action mocks base method. +func (m *MockClient) Action(arg0 context.Context, arg1 proto.ActionRequest) (proto.ActionResponse, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "CallAction", arg0, arg1) + ret := m.ctrl.Call(m, "Action", arg0, arg1) ret0, _ := ret[0].(proto.ActionResponse) ret1, _ := ret[1].(error) return ret0, ret1 } -// CallAction indicates an expected call of CallAction. -func (mr *MockClientMockRecorder) CallAction(arg0, arg1 interface{}) *gomock.Call { +// Action indicates an expected call of Action. +func (mr *MockClientMockRecorder) Action(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CallAction", reflect.TypeOf((*MockClient)(nil).CallAction), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Action", reflect.TypeOf((*MockClient)(nil).Action), arg0, arg1) } diff --git a/pkg/kbagent/client/httpclient.go b/pkg/kbagent/client/httpclient.go index d33681e481e..413b11217f4 100644 --- a/pkg/kbagent/client/httpclient.go +++ b/pkg/kbagent/client/httpclient.go @@ -27,14 +27,11 @@ import ( "io" "net/http" - "github.com/valyala/fasthttp" - "github.com/apecloud/kubeblocks/pkg/kbagent/proto" ) const ( - urlTemplate = "http://%s:%d/%s" - actionServiceURI = "/v1.0/action" + urlTemplate = "http://%s:%d%s" ) type httpClient struct { @@ -45,20 +42,22 @@ type httpClient struct { var _ Client = &httpClient{} -func (c *httpClient) CallAction(ctx context.Context, req proto.ActionRequest) (proto.ActionResponse, error) { - url := fmt.Sprintf(urlTemplate, c.host, c.port, actionServiceURI) +func (c *httpClient) Action(ctx context.Context, req proto.ActionRequest) (proto.ActionResponse, error) { + rsp := proto.ActionResponse{} data, err := json.Marshal(req) if err != nil { - return proto.ActionResponse{}, err + return rsp, err } - payload, err := c.request(ctx, fasthttp.MethodPost, url, bytes.NewReader(data)) + url := fmt.Sprintf(urlTemplate, c.host, c.port, proto.ServiceAction.URI) + payload, err := c.request(ctx, http.MethodPost, url, bytes.NewReader(data)) if err != nil { - return proto.ActionResponse{}, err + return rsp, err } + defer payload.Close() - return c.decode(payload) + return decode(payload, &rsp) } func (c *httpClient) request(ctx context.Context, method, url string, body io.Reader) (io.ReadCloser, error) { @@ -69,26 +68,25 @@ func (c *httpClient) request(ctx context.Context, method, url string, body io.Re rsp, err := c.client.Do(req) if err != nil { - return nil, err + return nil, err // http error } switch rsp.StatusCode { - case http.StatusOK, http.StatusNotImplemented, http.StatusInternalServerError: + case http.StatusOK, http.StatusInternalServerError: return rsp.Body, nil default: - return nil, fmt.Errorf("http error: %s", rsp.Status) + return nil, fmt.Errorf("unexpected http status code: %s", rsp.Status) } } -func (c *httpClient) decode(body io.Reader) (proto.ActionResponse, error) { - rsp := proto.ActionResponse{} +func decode[T any](body io.Reader, rsp *T) (T, error) { data, err := io.ReadAll(body) if err != nil { - return rsp, err + return *rsp, err } err = json.Unmarshal(data, &rsp) if err != nil { - return rsp, err + return *rsp, err } - return rsp, nil + return *rsp, nil } diff --git a/pkg/kbagent/proto/proto.go b/pkg/kbagent/proto/proto.go index 736b31429e9..e30463f6afd 100644 --- a/pkg/kbagent/proto/proto.go +++ b/pkg/kbagent/proto/proto.go @@ -52,6 +52,8 @@ type ActionResponse struct { Output []byte `json:"output,omitempty"` } +// TODO: define the event spec for probe or async action + type Probe struct { Action string `json:"action"` InitialDelaySeconds int32 `json:"initialDelaySeconds,omitempty"` diff --git a/pkg/kbagent/proto/service.go b/pkg/kbagent/proto/service.go new file mode 100644 index 00000000000..04442167e55 --- /dev/null +++ b/pkg/kbagent/proto/service.go @@ -0,0 +1,39 @@ +/* +Copyright (C) 2022-2024 ApeCloud Co., Ltd + +This file is part of KubeBlocks project + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see . +*/ + +package proto + +type Service struct { + Kind string + Version string + URI string +} + +var ( + ServiceAction = &Service{ + Kind: "Action", + Version: "v1.0", + URI: "/v1.0/action", + } + ServiceProbe = &Service{ + Kind: "Probe", + Version: "v1.0", + URI: "/v1.0/probe", + } +) diff --git a/pkg/kbagent/server/httpserver.go b/pkg/kbagent/server/httpserver.go index 80edfae11a3..8109f1abdcc 100644 --- a/pkg/kbagent/server/httpserver.go +++ b/pkg/kbagent/server/httpserver.go @@ -24,14 +24,12 @@ import ( "errors" "fmt" "net" - "strings" "time" fasthttprouter "github.com/fasthttp/router" "github.com/go-logr/logr" "github.com/valyala/fasthttp" - "github.com/apecloud/kubeblocks/pkg/kbagent/proto" "github.com/apecloud/kubeblocks/pkg/kbagent/service" ) @@ -150,12 +148,8 @@ func (s *server) router() fasthttp.RequestHandler { } func (s *server) registerService(router *fasthttprouter.Router, svc service.Service) { - router.Handle(fasthttp.MethodPost, s.serviceURI(svc), s.dispatcher(svc)) - s.logger.Info("register service to server", "service", svc.Kind(), "method", fasthttp.MethodPost, "uri", s.serviceURI(svc)) -} - -func (s *server) serviceURI(svc service.Service) string { - return fmt.Sprintf("/%s/%s", svc.Version(), strings.ToLower(svc.Kind())) + router.Handle(fasthttp.MethodPost, svc.URI(), s.dispatcher(svc)) + s.logger.Info("register service to server", "service", svc.Kind(), "method", fasthttp.MethodPost, "uri", svc.URI()) } func (s *server) dispatcher(svc service.Service) func(*fasthttp.RequestCtx) { @@ -166,20 +160,19 @@ func (s *server) dispatcher(svc service.Service) func(*fasthttp.RequestCtx) { output, err := svc.HandleRequest(ctx, body) statusCode := fasthttp.StatusOK if err != nil { - if errors.Is(err, proto.ErrNotImplemented) { - statusCode = fasthttp.StatusNotImplemented - } else { - statusCode = fasthttp.StatusInternalServerError - } + statusCode = fasthttp.StatusInternalServerError } - respond(reqCtx, statusCode, output) + respond(reqCtx, statusCode, output, err) } } -func respond(ctx *fasthttp.RequestCtx, code int, body []byte) { +func respond(ctx *fasthttp.RequestCtx, code int, body []byte, err error) { ctx.Response.Header.SetContentType(jsonContentTypeHeader) ctx.Response.SetStatusCode(code) - if body != nil { + switch { + case err != nil: + ctx.Response.SetBodyString(err.Error()) + case body != nil: ctx.Response.SetBody(body) } } diff --git a/pkg/kbagent/server/types.go b/pkg/kbagent/server/server.go similarity index 100% rename from pkg/kbagent/server/types.go rename to pkg/kbagent/server/server.go diff --git a/pkg/kbagent/service/action.go b/pkg/kbagent/service/action.go index 011116ed451..3f5079b8a2c 100644 --- a/pkg/kbagent/service/action.go +++ b/pkg/kbagent/service/action.go @@ -33,11 +33,6 @@ import ( "github.com/apecloud/kubeblocks/pkg/kbagent/proto" ) -const ( - actionServiceName = "Action" - actionServiceVersion = "v1.0" -) - func newActionService(logger logr.Logger, actions []proto.Action) (*actionService, error) { sa := &actionService{ logger: logger, @@ -69,11 +64,11 @@ type runningAction struct { var _ Service = &actionService{} func (s *actionService) Kind() string { - return actionServiceName + return proto.ServiceAction.Kind } -func (s *actionService) Version() string { - return actionServiceVersion +func (s *actionService) URI() string { + return proto.ServiceAction.URI } func (s *actionService) Start() error { @@ -112,22 +107,18 @@ func (s *actionService) handleRequest(ctx context.Context, req *proto.ActionRequ if _, ok := s.actions[req.Action]; !ok { return nil, errors.Wrapf(proto.ErrNotDefined, "%s is not defined", req.Action) } - return s.handleActionRequest(ctx, req) -} - -func (s *actionService) handleActionRequest(ctx context.Context, req *proto.ActionRequest) ([]byte, error) { action := s.actions[req.Action] - if action.Exec != nil { - return s.handleExecAction(ctx, req, action) + if action.Exec == nil { + return nil, errors.Wrap(proto.ErrNotImplemented, "only exec action is supported") } - return nil, errors.Wrap(proto.ErrNotImplemented, "only exec action is supported") + return s.handleExecAction(ctx, req, action) } func (s *actionService) handleExecAction(ctx context.Context, req *proto.ActionRequest, action *proto.Action) ([]byte, error) { - if req.NonBlocking != nil && *req.NonBlocking { - return s.handleExecActionNonBlocking(ctx, req, action) + if req.NonBlocking == nil || !*req.NonBlocking { + return runCommand(ctx, action.Exec, req.Parameters, req.TimeoutSeconds) } - return runCommand(ctx, action.Exec, req.Parameters, req.TimeoutSeconds) + return s.handleExecActionNonBlocking(ctx, req, action) } func (s *actionService) handleExecActionNonBlocking(ctx context.Context, req *proto.ActionRequest, action *proto.Action) ([]byte, error) { diff --git a/pkg/kbagent/service/probe.go b/pkg/kbagent/service/probe.go index 92ac18dcb9a..a981ea70ab8 100644 --- a/pkg/kbagent/service/probe.go +++ b/pkg/kbagent/service/probe.go @@ -28,6 +28,7 @@ import ( "time" "github.com/go-logr/logr" + "github.com/pkg/errors" "golang.org/x/exp/maps" "github.com/apecloud/kubeblocks/pkg/kbagent/proto" @@ -35,8 +36,6 @@ import ( ) const ( - probeServiceName = "Probe" - probeServiceVersion = "v1.0" defaultProbePeriodSeconds = 60 ) @@ -67,11 +66,11 @@ type probeService struct { var _ Service = &probeService{} func (s *probeService) Kind() string { - return probeServiceName + return proto.ServiceProbe.Kind } -func (s *probeService) Version() string { - return probeServiceVersion +func (s *probeService) URI() string { + return proto.ServiceProbe.URI } func (s *probeService) Start() error { @@ -87,7 +86,7 @@ func (s *probeService) Start() error { } func (s *probeService) HandleRequest(ctx context.Context, payload []byte) ([]byte, error) { - return nil, proto.ErrNotImplemented + return nil, errors.Wrapf(proto.ErrNotImplemented, "service %s does not support request handling", s.Kind()) } type probeRunner struct { diff --git a/pkg/kbagent/service/probe_test.go b/pkg/kbagent/service/probe_test.go index 48012bc2d03..8cb2119be7a 100644 --- a/pkg/kbagent/service/probe_test.go +++ b/pkg/kbagent/service/probe_test.go @@ -67,8 +67,7 @@ var _ = Describe("probe", func() { service, err := newProbeService(logr.New(nil), actionSvc, probes) Expect(err).Should(BeNil()) Expect(service).ShouldNot(BeNil()) - Expect(service.Kind()).Should(Equal(probeServiceName)) - Expect(service.Version()).Should(Equal(probeServiceVersion)) + Expect(service.Kind()).Should(Equal(proto.ServiceProbe.Kind)) }) It("start", func() { diff --git a/pkg/kbagent/service/service.go b/pkg/kbagent/service/service.go index 24deb0e177d..e3dfecf792b 100644 --- a/pkg/kbagent/service/service.go +++ b/pkg/kbagent/service/service.go @@ -29,7 +29,7 @@ import ( type Service interface { Kind() string - Version() string + URI() string Start() error diff --git a/pkg/kbagent/setup.go b/pkg/kbagent/setup.go index c49afc9601b..8b5408b4ca5 100644 --- a/pkg/kbagent/setup.go +++ b/pkg/kbagent/setup.go @@ -31,11 +31,15 @@ import ( ) const ( + ContainerName = "kbagent" + InitContainerName = "init-kbagent" + DefaultPortName = "http" + actionEnvName = "KB_AGENT_ACTION" probeEnvName = "KB_AGENT_PROBE" ) -func BuildStartupEnvs(actions []proto.Action, probes []proto.Probe) ([]corev1.EnvVar, error) { +func BuildStartupEnv(actions []proto.Action, probes []proto.Probe) ([]corev1.EnvVar, error) { da, dp, err := serializeActionNProbe(actions, probes) if err != nil { return nil, err