From 0850b7a2199250983595100454df2d3589f37a71 Mon Sep 17 00:00:00 2001 From: Shinnosuke Sawada-Dazai Date: Tue, 3 Dec 2024 15:31:23 +0900 Subject: [PATCH 1/6] Implement PipedServiceClient for gRPC communication in pipedapi package Signed-off-by: Shinnosuke Sawada-Dazai --- pkg/plugin/pipedapi/client.go | 61 +++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) create mode 100644 pkg/plugin/pipedapi/client.go diff --git a/pkg/plugin/pipedapi/client.go b/pkg/plugin/pipedapi/client.go new file mode 100644 index 0000000000..e4635354c0 --- /dev/null +++ b/pkg/plugin/pipedapi/client.go @@ -0,0 +1,61 @@ +// Copyright 2024 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package planner provides a piped component +// that decides the deployment pipeline of a deployment. +// The planner bases on the changes from git commits +// then builds the deployment manifests to know the behavior of the deployment. +// From that behavior the planner can decides which pipeline should be applied. + +package pipedapi + +import ( + "context" + "slices" + + "google.golang.org/grpc" + + "github.com/pipe-cd/pipecd/pkg/app/pipedv1/cmd/piped/service" + "github.com/pipe-cd/pipecd/pkg/rpc/rpcclient" +) + +type PipedServiceClient struct { + service.PluginServiceClient + conn *grpc.ClientConn +} + +func NewClient(ctx context.Context, address string, opts ...rpcclient.DialOption) (*PipedServiceClient, error) { + // Clone the opts to avoid modifying the original opts slice. + opts = slices.Clone(opts) + + // Append the required options. + // The WithBlock option is required to make the client wait until the connection is up. + // The WithInsecure option is required to disable the transport security. + // The piped service does not require transport security because it is only used in localhost. + opts = append(opts, rpcclient.WithBlock(), rpcclient.WithInsecure()) + + conn, err := rpcclient.DialContext(ctx, address, opts...) + if err != nil { + return nil, err + } + + return &PipedServiceClient{ + PluginServiceClient: service.NewPluginServiceClient(conn), + conn: conn, + }, nil +} + +func (c *PipedServiceClient) Close() error { + return c.conn.Close() +} From d0cdf4f7b4621211f051d627f6271750f8960c3c Mon Sep 17 00:00:00 2001 From: Shinnosuke Sawada-Dazai Date: Tue, 3 Dec 2024 15:46:46 +0900 Subject: [PATCH 2/6] Add constructor for ToolRegistry Signed-off-by: Shinnosuke Sawada-Dazai --- pkg/app/pipedv1/plugin/toolregistry/toolregistry.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/app/pipedv1/plugin/toolregistry/toolregistry.go b/pkg/app/pipedv1/plugin/toolregistry/toolregistry.go index 4e674b79c9..debf54df2c 100644 --- a/pkg/app/pipedv1/plugin/toolregistry/toolregistry.go +++ b/pkg/app/pipedv1/plugin/toolregistry/toolregistry.go @@ -24,6 +24,12 @@ type ToolRegistry struct { client service.PluginServiceClient } +func NewToolRegistry(client service.PluginServiceClient) *ToolRegistry { + return &ToolRegistry{ + client: client, + } +} + func (r *ToolRegistry) InstallTool(ctx context.Context, name, version, script string) (path string, err error) { res, err := r.client.InstallTool(ctx, &service.InstallToolRequest{ Name: name, From 42a3602fab32a6987e0e9cb61bb3775149333fbd Mon Sep 17 00:00:00 2001 From: Shinnosuke Sawada-Dazai Date: Tue, 3 Dec 2024 15:48:17 +0900 Subject: [PATCH 3/6] Initialize piped plugin service client and pass Signed-off-by: Shinnosuke Sawada-Dazai --- pkg/app/pipedv1/plugin/kubernetes/server.go | 32 ++++++++++++++------- 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/pkg/app/pipedv1/plugin/kubernetes/server.go b/pkg/app/pipedv1/plugin/kubernetes/server.go index adc0b83b06..39b3a69ed9 100644 --- a/pkg/app/pipedv1/plugin/kubernetes/server.go +++ b/pkg/app/pipedv1/plugin/kubernetes/server.go @@ -16,10 +16,15 @@ package main import ( "context" + "net" + "strconv" "time" "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes/deployment" + "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/toolregistry" "github.com/pipe-cd/pipecd/pkg/cli" + "github.com/pipe-cd/pipecd/pkg/plugin/logpersister" + "github.com/pipe-cd/pipecd/pkg/plugin/pipedapi" "github.com/pipe-cd/pipecd/pkg/rpc" "github.com/spf13/cobra" "go.uber.org/zap" @@ -27,11 +32,12 @@ import ( ) type server struct { - apiPort int - gracePeriod time.Duration - tls bool - certFile string - keyFile string + apiPort int + pipedPluginServicePort int + gracePeriod time.Duration + tls bool + certFile string + keyFile string enableGRPCReflection bool } @@ -39,8 +45,9 @@ type server struct { // NewServerCommand creates a new cobra command for executing api server. func NewServerCommand() *cobra.Command { s := &server{ - apiPort: 10000, - gracePeriod: 30 * time.Second, + apiPort: 10000, + pipedPluginServicePort: 9087, + gracePeriod: 30 * time.Second, } cmd := &cobra.Command{ Use: "server", @@ -49,6 +56,7 @@ func NewServerCommand() *cobra.Command { } cmd.Flags().IntVar(&s.apiPort, "api-port", s.apiPort, "The port number used to run a grpc server for external apis.") + cmd.Flags().IntVar(&s.pipedPluginServicePort, "piped-plugin-service-port", s.pipedPluginServicePort, "The port number used to connect to the piped plugin service.") cmd.Flags().DurationVar(&s.gracePeriod, "grace-period", s.gracePeriod, "How long to wait for graceful shutdown.") cmd.Flags().BoolVar(&s.tls, "tls", s.tls, "Whether running the gRPC server with TLS or not.") @@ -68,15 +76,19 @@ func (s *server) run(ctx context.Context, input cli.Input) (runErr error) { group, ctx := errgroup.WithContext(ctx) - // TODO: create the piped plugin gRPC client here. + pipedapiClient, err := pipedapi.NewClient(ctx, net.JoinHostPort("localhost", strconv.Itoa(s.pipedPluginServicePort)), nil) + if err != nil { + input.Logger.Error("failed to create piped plugin service client", zap.Error(err)) + return err + } // Start a gRPC server for handling external API requests. { var ( service = deployment.NewDeploymentService( input.Logger, - nil, // TODO: set the tool registry client here. - nil, // TODO: set the log persister here. + toolregistry.NewToolRegistry(pipedapiClient), + logpersister.NewPersister(pipedapiClient, input.Logger), ) opts = []rpc.Option{ rpc.WithPort(s.apiPort), From 81bef1814ee9a775d9a92d34c4c7c8b89e9012f6 Mon Sep 17 00:00:00 2001 From: Shinnosuke Sawada-Dazai Date: Tue, 3 Dec 2024 17:43:42 +0900 Subject: [PATCH 4/6] Add validation for piped-plugin-service-port in server command Signed-off-by: Shinnosuke Sawada-Dazai --- pkg/app/pipedv1/plugin/kubernetes/server.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/pkg/app/pipedv1/plugin/kubernetes/server.go b/pkg/app/pipedv1/plugin/kubernetes/server.go index 39b3a69ed9..543a3f697c 100644 --- a/pkg/app/pipedv1/plugin/kubernetes/server.go +++ b/pkg/app/pipedv1/plugin/kubernetes/server.go @@ -16,6 +16,7 @@ package main import ( "context" + "errors" "net" "strconv" "time" @@ -46,7 +47,7 @@ type server struct { func NewServerCommand() *cobra.Command { s := &server{ apiPort: 10000, - pipedPluginServicePort: 9087, + pipedPluginServicePort: -1, // default as error value gracePeriod: 30 * time.Second, } cmd := &cobra.Command{ @@ -56,7 +57,7 @@ func NewServerCommand() *cobra.Command { } cmd.Flags().IntVar(&s.apiPort, "api-port", s.apiPort, "The port number used to run a grpc server for external apis.") - cmd.Flags().IntVar(&s.pipedPluginServicePort, "piped-plugin-service-port", s.pipedPluginServicePort, "The port number used to connect to the piped plugin service.") + cmd.Flags().IntVar(&s.pipedPluginServicePort, "piped-plugin-service-port", s.pipedPluginServicePort, "The port number used to connect to the piped plugin service.") // TODO: we should discuss about the name of this flag, or we should use environment variable instead. cmd.Flags().DurationVar(&s.gracePeriod, "grace-period", s.gracePeriod, "How long to wait for graceful shutdown.") cmd.Flags().BoolVar(&s.tls, "tls", s.tls, "Whether running the gRPC server with TLS or not.") @@ -76,6 +77,11 @@ func (s *server) run(ctx context.Context, input cli.Input) (runErr error) { group, ctx := errgroup.WithContext(ctx) + if s.pipedPluginServicePort == -1 { + input.Logger.Error("piped-plugin-service-port is required") + return errors.New("piped-plugin-service-port is required") + } + pipedapiClient, err := pipedapi.NewClient(ctx, net.JoinHostPort("localhost", strconv.Itoa(s.pipedPluginServicePort)), nil) if err != nil { input.Logger.Error("failed to create piped plugin service client", zap.Error(err)) From b66474b27b40adb0cb0bac05768a76d10c78f836 Mon Sep 17 00:00:00 2001 From: Shinnosuke Sawada-Dazai Date: Wed, 4 Dec 2024 10:52:17 +0900 Subject: [PATCH 5/6] Mark piped-plugin-service-port required and remove validation Signed-off-by: Shinnosuke Sawada-Dazai --- pkg/app/pipedv1/plugin/kubernetes/server.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/pkg/app/pipedv1/plugin/kubernetes/server.go b/pkg/app/pipedv1/plugin/kubernetes/server.go index 543a3f697c..0f3bb5773c 100644 --- a/pkg/app/pipedv1/plugin/kubernetes/server.go +++ b/pkg/app/pipedv1/plugin/kubernetes/server.go @@ -16,7 +16,6 @@ package main import ( "context" - "errors" "net" "strconv" "time" @@ -58,6 +57,7 @@ func NewServerCommand() *cobra.Command { cmd.Flags().IntVar(&s.apiPort, "api-port", s.apiPort, "The port number used to run a grpc server for external apis.") cmd.Flags().IntVar(&s.pipedPluginServicePort, "piped-plugin-service-port", s.pipedPluginServicePort, "The port number used to connect to the piped plugin service.") // TODO: we should discuss about the name of this flag, or we should use environment variable instead. + cmd.MarkFlagRequired("piped-plugin-service-port") cmd.Flags().DurationVar(&s.gracePeriod, "grace-period", s.gracePeriod, "How long to wait for graceful shutdown.") cmd.Flags().BoolVar(&s.tls, "tls", s.tls, "Whether running the gRPC server with TLS or not.") @@ -77,11 +77,6 @@ func (s *server) run(ctx context.Context, input cli.Input) (runErr error) { group, ctx := errgroup.WithContext(ctx) - if s.pipedPluginServicePort == -1 { - input.Logger.Error("piped-plugin-service-port is required") - return errors.New("piped-plugin-service-port is required") - } - pipedapiClient, err := pipedapi.NewClient(ctx, net.JoinHostPort("localhost", strconv.Itoa(s.pipedPluginServicePort)), nil) if err != nil { input.Logger.Error("failed to create piped plugin service client", zap.Error(err)) From 2d1da06cf08b164298c627d9b58a902adc64ab9b Mon Sep 17 00:00:00 2001 From: Shinnosuke Sawada-Dazai Date: Wed, 4 Dec 2024 10:57:44 +0900 Subject: [PATCH 6/6] Remove default error value for pipedPluginServicePort Signed-off-by: Shinnosuke Sawada-Dazai --- pkg/app/pipedv1/plugin/kubernetes/server.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pkg/app/pipedv1/plugin/kubernetes/server.go b/pkg/app/pipedv1/plugin/kubernetes/server.go index 0f3bb5773c..ce27d510c3 100644 --- a/pkg/app/pipedv1/plugin/kubernetes/server.go +++ b/pkg/app/pipedv1/plugin/kubernetes/server.go @@ -45,9 +45,8 @@ type server struct { // NewServerCommand creates a new cobra command for executing api server. func NewServerCommand() *cobra.Command { s := &server{ - apiPort: 10000, - pipedPluginServicePort: -1, // default as error value - gracePeriod: 30 * time.Second, + apiPort: 10000, + gracePeriod: 30 * time.Second, } cmd := &cobra.Command{ Use: "server",